author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/js/bin
parent     Initial commit. (diff)
download   ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/js/bin')
-rwxr-xr-x  src/arrow/js/bin/arrow2csv.js                28
-rwxr-xr-x  src/arrow/js/bin/file-to-stream.js           40
-rwxr-xr-x  src/arrow/js/bin/integration.js             255
-rwxr-xr-x  src/arrow/js/bin/json-to-arrow.js           108
-rwxr-xr-x  src/arrow/js/bin/print-buffer-alignment.js   81
-rwxr-xr-x  src/arrow/js/bin/stream-to-file.js           40
6 files changed, 552 insertions(+), 0 deletions(-)
diff --git a/src/arrow/js/bin/arrow2csv.js b/src/arrow/js/bin/arrow2csv.js
new file mode 100755
index 000000000..0e446fabe
--- /dev/null
+++ b/src/arrow/js/bin/arrow2csv.js
@@ -0,0 +1,28 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+const Path = require(`path`);
+const here = Path.resolve(__dirname, '../');
+const tsnode = require.resolve(`ts-node/register`);
+const arrow2csv = Path.join(here, `src/bin/arrow2csv.ts`);
+const env = { ...process.env, TS_NODE_TRANSPILE_ONLY: `true` };
+
+require('child_process').spawn(`node`, [
+    `-r`, tsnode, arrow2csv, ...process.argv.slice(2)
+], { cwd: here, env, stdio: `inherit` });
diff --git a/src/arrow/js/bin/file-to-stream.js b/src/arrow/js/bin/file-to-stream.js
new file mode 100755
index 000000000..090cd0b0e
--- /dev/null
+++ b/src/arrow/js/bin/file-to-stream.js
@@ -0,0 +1,40 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatchReader, RecordBatchStreamWriter } = require(`../index${extension}`);
+
+(async () => {
+
+    const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+    const writable = process.argv.length < 4 ? process.stdout : fs.createWriteStream(path.resolve(process.argv[3]));
+
+    const fileToStream = readable
+        .pipe(RecordBatchReader.throughNode())
+        .pipe(RecordBatchStreamWriter.throughNode())
+        .pipe(writable);
+
+    await eos(fileToStream);
+
+})().catch((e) => { console.error(e); process.exit(1); });
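[Editor's note: file-to-stream.js above is a thin wrapper over the Arrow JS streaming transforms. As a minimal in-process sketch of the same conversion — assuming the published apache-arrow npm package in place of the in-tree `../index` build; the function and file names are illustrative:]

    // Sketch: read the Arrow IPC *file* format, re-encode as the IPC *stream* format.
    const fs = require('fs');
    const eos = require('util').promisify(require('stream').finished);
    const { RecordBatchReader, RecordBatchStreamWriter } = require('apache-arrow');

    async function fileToStream(inPath, outPath) {
        const pipeline = fs.createReadStream(inPath)
            .pipe(RecordBatchReader.throughNode())       // decode record batches
            .pipe(RecordBatchStreamWriter.throughNode()) // re-encode as a stream
            .pipe(fs.createWriteStream(outPath));
        await eos(pipeline); // resolves once the write side finishes
    }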
diff --git a/src/arrow/js/bin/integration.js b/src/arrow/js/bin/integration.js
new file mode 100755
index 000000000..507514eba
--- /dev/null
+++ b/src/arrow/js/bin/integration.js
@@ -0,0 +1,255 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-nocheck
+
+const fs = require('fs');
+const Path = require('path');
+const { promisify } = require('util');
+const glob = promisify(require('glob'));
+const { zip } = require('ix/iterable/zip');
+const { parse: bignumJSONParse } = require('json-bignum');
+const argv = require(`command-line-args`)(cliOpts(), { partial: true });
+const {
+    Table,
+    RecordBatchReader,
+    util: { createElementComparator }
+} = require('../targets/apache-arrow/');
+
+const exists = async (p) => {
+    try {
+        return !!(await fs.promises.stat(p));
+    } catch (e) { return false; }
+}
+
+(async () => {
+
+    if (!argv.mode) { return print_usage(); }
+
+    let mode = argv.mode.toUpperCase();
+    let jsonPaths = [...(argv.json || [])];
+    let arrowPaths = [...(argv.arrow || [])];
+
+    if (mode === 'VALIDATE' && !jsonPaths.length) {
+        [jsonPaths, arrowPaths] = await loadLocalJSONAndArrowPathsForDebugging(jsonPaths, arrowPaths);
+    }
+
+    if (!jsonPaths.length) { return print_usage(); }
+
+    switch (mode) {
+        case 'VALIDATE':
+            for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) {
+                await validate(jsonPath, arrowPath);
+            }
+            break;
+        default:
+            return print_usage();
+    }
+})()
+.then((x) => +x || 0, (e) => {
+    e && process.stderr.write(`${e?.stack || e}\n`);
+    return process.exitCode || 1;
+}).then((code) => process.exit(code));
+
+function cliOpts() {
+    return [
+        {
+            type: String,
+            name: 'mode',
+            description: 'The integration test to run'
+        },
+        {
+            type: String,
+            name: 'arrow', alias: 'a',
+            multiple: true, defaultValue: [],
+            description: 'The Arrow file[s] to read/write'
+        },
+        {
+            type: String,
+            name: 'json', alias: 'j',
+            multiple: true, defaultValue: [],
+            description: 'The JSON file[s] to read/write'
+        }
+    ];
+}
+
+function print_usage() {
+    console.log(require('command-line-usage')([
+        {
+            header: 'integration',
+            content: 'Script for running Arrow integration tests'
+        },
+        {
+            header: 'Synopsis',
+            content: [
+                '$ integration.js -j file.json -a file.arrow --mode validate'
+            ]
+        },
+        {
+            header: 'Options',
+            optionList: [
+                ...cliOpts(),
+                {
+                    name: 'help',
+                    description: 'Print this usage guide.'
+                }
+            ]
+        },
+    ]));
+    return 1;
+}
+
+async function validate(jsonPath, arrowPath) {
+
+    const files = await Promise.all([
+        fs.promises.readFile(arrowPath),
+        fs.promises.readFile(jsonPath, 'utf8'),
+    ]);
+
+    const arrowData = files[0];
+    const jsonData = bignumJSONParse(files[1]);
+
+    validateReaderIntegration(jsonData, arrowData);
+    validateTableFromBuffersIntegration(jsonData, arrowData);
+    validateTableToBuffersIntegration('json', 'file')(jsonData, arrowData);
+    validateTableToBuffersIntegration('json', 'stream')(jsonData, arrowData);
+    validateTableToBuffersIntegration('binary', 'file')(jsonData, arrowData);
+    validateTableToBuffersIntegration('binary', 'stream')(jsonData, arrowData);
+}
+
+function validateReaderIntegration(jsonData, arrowBuffer) {
+    const msg = `json and arrow record batches report the same values`;
+    try {
+        const jsonReader = RecordBatchReader.from(jsonData);
+        const binaryReader = RecordBatchReader.from(arrowBuffer);
+        for (const [jsonRecordBatch, binaryRecordBatch] of zip(jsonReader, binaryReader)) {
+            compareTableIsh(jsonRecordBatch, binaryRecordBatch);
+        }
+    } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+    process.stdout.write(`${msg}: pass\n`);
+}
+
+function validateTableFromBuffersIntegration(jsonData, arrowBuffer) {
+    const msg = `json and arrow tables report the same values`;
+    try {
+        const jsonTable = Table.from(jsonData);
+        const binaryTable = Table.from(arrowBuffer);
+        compareTableIsh(jsonTable, binaryTable);
+    } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+    process.stdout.write(`${msg}: pass\n`);
+}
+
+function validateTableToBuffersIntegration(srcFormat, arrowFormat) {
+    const refFormat = srcFormat === `json` ? `binary` : `json`;
+    return function testTableToBuffersIntegration(jsonData, arrowBuffer) {
+        const msg = `serialized ${srcFormat} ${arrowFormat} reports the same values as the ${refFormat} ${arrowFormat}`;
+        try {
+            const refTable = Table.from(refFormat === `json` ? jsonData : arrowBuffer);
+            const srcTable = Table.from(srcFormat === `json` ? jsonData : arrowBuffer);
+            const dstTable = Table.from(srcTable.serialize(`binary`, arrowFormat === `stream`));
+            compareTableIsh(dstTable, refTable);
+        } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+        process.stdout.write(`${msg}: pass\n`);
+    };
+}
+
+function compareTableIsh(actual, expected) {
+    if (actual.length !== expected.length) {
+        throw new Error(`length: ${actual.length} !== ${expected.length}`);
+    }
+    if (actual.numCols !== expected.numCols) {
+        throw new Error(`numCols: ${actual.numCols} !== ${expected.numCols}`);
+    }
+    (() => {
+        const getChildAtFn = expected instanceof Table ? 'getColumnAt' : 'getChildAt';
+        for (let i = -1, n = actual.numCols; ++i < n;) {
+            const v1 = actual[getChildAtFn](i);
+            const v2 = expected[getChildAtFn](i);
+            compareVectors(v1, v2);
+        }
+    })();
+}
+
+function compareVectors(actual, expected) {
+
+    if ((actual == null && expected != null) || (expected == null && actual != null)) {
+        throw new Error(`${actual == null ? `actual` : `expected`} is null, was expecting ${actual ?? expected} to be that also`);
+    }
+
+    let props = ['type', 'length', 'nullCount'];
+
+    (() => {
+        for (let i = -1, n = props.length; ++i < n;) {
+            const prop = props[i];
+            if (`${actual[prop]}` !== `${expected[prop]}`) {
+                throw new Error(`${prop}: ${actual[prop]} !== ${expected[prop]}`);
+            }
+        }
+    })();
+
+    (() => {
+        for (let i = -1, n = actual.length; ++i < n;) {
+            let x1 = actual.get(i), x2 = expected.get(i);
+            if (!createElementComparator(x2)(x1)) {
+                throw new Error(`${i}: ${x1} !== ${x2}`);
+            }
+        }
+    })();
+
+    (() => {
+        let i = -1;
+        for (let [x1, x2] of zip(actual, expected)) {
+            ++i;
+            if (!createElementComparator(x2)(x1)) {
+                throw new Error(`${i}: ${x1} !== ${x2}`);
+            }
+        }
+    })();
+}
+
+async function loadLocalJSONAndArrowPathsForDebugging(jsonPaths, arrowPaths) {
+
+    const sourceJSONPaths = await glob(Path.resolve(__dirname, `../test/data/json/`, `*.json`));
+
+    if (!arrowPaths.length) {
+        await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'file');
+        await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'file');
+        await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'stream');
+        await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'stream');
+    }
+
+    for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) {
+        console.log(`jsonPath: ${jsonPath}`);
+        console.log(`arrowPath: ${arrowPath}`);
+    }
+
+    return [jsonPaths, arrowPaths];
+
+    async function loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, source, format) {
+        for (const jsonPath of sourceJSONPaths) {
+            const { name } = Path.parse(jsonPath);
+            const arrowPath = Path.resolve(__dirname, `../test/data/${source}/${format}/${name}.arrow`);
+            if (await exists(arrowPath)) {
+                jsonPaths.push(jsonPath);
+                arrowPaths.push(arrowPath);
+            }
+        }
+        return [jsonPaths, arrowPaths];
+    }
+}
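[Editor's note: every validator in integration.js ultimately reduces to pairwise element comparison via createElementComparator. A condensed sketch of that core check, using the same imports the script uses — file paths and the helper name are illustrative:]

    // Condensed sketch of the VALIDATE core: compare a JSON-defined table
    // against its binary Arrow counterpart, column by column, element by element.
    const fs = require('fs');
    const { parse: bignumJSONParse } = require('json-bignum');
    const { Table, util: { createElementComparator } } = require('../targets/apache-arrow/');

    async function tablesMatch(jsonPath, arrowPath) {
        const jsonTable = Table.from(bignumJSONParse(await fs.promises.readFile(jsonPath, 'utf8')));
        const binaryTable = Table.from(await fs.promises.readFile(arrowPath));
        if (jsonTable.numCols !== binaryTable.numCols) { return false; }
        for (let c = 0; c < jsonTable.numCols; ++c) {
            const v1 = jsonTable.getColumnAt(c), v2 = binaryTable.getColumnAt(c);
            for (let i = 0; i < v1.length; ++i) {
                // comparator is built from the expected element, then applied to the actual one
                if (!createElementComparator(v2.get(i))(v1.get(i))) { return false; }
            }
        }
        return true;
    }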
diff --git a/src/arrow/js/bin/json-to-arrow.js b/src/arrow/js/bin/json-to-arrow.js
new file mode 100755
index 000000000..8f3fbd3fc
--- /dev/null
+++ b/src/arrow/js/bin/json-to-arrow.js
@@ -0,0 +1,108 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const Path = require('path');
+const { parse } = require('json-bignum');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const argv = require(`command-line-args`)(cliOpts(), { partial: true });
+const { RecordBatchReader, RecordBatchFileWriter, RecordBatchStreamWriter } = require(`../index${extension}`);
+
+const jsonPaths = [...(argv.json || [])];
+const arrowPaths = [...(argv.arrow || [])];
+
+(async () => {
+
+    if (!jsonPaths.length || !arrowPaths.length || (jsonPaths.length !== arrowPaths.length)) {
+        return print_usage();
+    }
+
+    await Promise.all(jsonPaths.map(async (path, i) => {
+
+        const RecordBatchWriter = argv.format !== 'stream'
+            ? RecordBatchFileWriter
+            : RecordBatchStreamWriter;
+
+        const reader = RecordBatchReader.from(parse(
+            await fs.promises.readFile(Path.resolve(path), 'utf8')));
+
+        const jsonToArrow = reader
+            .pipe(RecordBatchWriter.throughNode())
+            .pipe(fs.createWriteStream(arrowPaths[i]));
+
+        await eos(jsonToArrow);
+
+    }));
+})()
+.then((x) => +x || 0, (e) => {
+    e && process.stderr.write(`${e}`);
+    return process.exitCode || 1;
+}).then((code = 0) => process.exit(code));
+
+function cliOpts() {
+    return [
+        {
+            type: String,
+            name: 'format', alias: 'f',
+            multiple: false, defaultValue: 'file',
+            description: 'The Arrow format to write, either "file" or "stream"'
+        },
+        {
+            type: String,
+            name: 'arrow', alias: 'a',
+            multiple: true, defaultValue: [],
+            description: 'The Arrow file[s] to write'
+        },
+        {
+            type: String,
+            name: 'json', alias: 'j',
+            multiple: true, defaultValue: [],
+            description: 'The JSON file[s] to read'
+        }
+    ];
+}
+
+function print_usage() {
+    console.log(require('command-line-usage')([
+        {
+            header: 'json-to-arrow',
+            content: 'Script for converting a JSON Arrow file to a binary Arrow file'
+        },
+        {
+            header: 'Synopsis',
+            content: [
+                '$ json-to-arrow.js -j in.json -a out.arrow -f stream'
+            ]
+        },
+        {
+            header: 'Options',
+            optionList: [
+                ...cliOpts(),
+                {
+                    name: 'help',
+                    description: 'Print this usage guide.'
+                }
+            ]
+        },
+    ]));
+    return 1;
+}
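[Editor's note: stripped of its CLI plumbing, json-to-arrow.js is one reader-to-writer pipeline per input file. A single-file sketch using the same calls the script makes — the apache-arrow package import and the paths are assumptions for illustration:]

    // One-file sketch of json-to-arrow: parse the integration JSON with
    // json-bignum (keeps 64-bit integer values intact), then stream the
    // decoded record batches into an Arrow IPC file.
    const fs = require('fs');
    const { parse } = require('json-bignum');
    const eos = require('util').promisify(require('stream').finished);
    const { RecordBatchReader, RecordBatchFileWriter } = require('apache-arrow');

    async function jsonToArrowFile(jsonPath, arrowPath) {
        const reader = RecordBatchReader.from(parse(await fs.promises.readFile(jsonPath, 'utf8')));
        const sink = reader
            .pipe(RecordBatchFileWriter.throughNode())
            .pipe(fs.createWriteStream(arrowPath));
        await eos(sink);
    }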
diff --git a/src/arrow/js/bin/print-buffer-alignment.js b/src/arrow/js/bin/print-buffer-alignment.js
new file mode 100755
index 000000000..4c3260397
--- /dev/null
+++ b/src/arrow/js/bin/print-buffer-alignment.js
@@ -0,0 +1,81 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatch, AsyncMessageReader } = require(`../index${extension}`);
+const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`);
+
+(async () => {
+
+    const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+    const reader = new AsyncMessageReader(readable);
+
+    let schema, recordBatchIndex = 0, dictionaryBatchIndex = 0;
+
+    for await (let message of reader) {
+
+        let bufferRegions = [];
+
+        if (message.isSchema()) {
+            schema = message.header();
+            continue;
+        } else if (message.isRecordBatch()) {
+            const header = message.header();
+            bufferRegions = header.buffers;
+            const body = await reader.readMessageBody(message.bodyLength);
+            const recordBatch = loadRecordBatch(schema, header, body);
+            console.log(`record batch ${++recordBatchIndex}: ${JSON.stringify({
+                offset: body.byteOffset,
+                length: body.byteLength,
+                numRows: recordBatch.length,
+            })}`);
+        } else if (message.isDictionaryBatch()) {
+            const header = message.header();
+            bufferRegions = header.data.buffers;
+            const type = schema.dictionaries.get(header.id);
+            const body = await reader.readMessageBody(message.bodyLength);
+            const recordBatch = loadDictionaryBatch(header.data, body, type);
+            console.log(`dictionary batch ${++dictionaryBatchIndex}: ${JSON.stringify({
+                offset: body.byteOffset,
+                length: body.byteLength,
+                numRows: recordBatch.length,
+                dictionaryId: header.id,
+            })}`);
+        }
+
+        bufferRegions.forEach(({ offset, length }, i) => {
+            console.log(`\tbuffer ${i + 1}: { offset: ${offset}, length: ${length} }`);
+        });
+    }
+
+    await reader.return();
+
+})().catch((e) => { console.error(e); process.exit(1); });
+
+function loadRecordBatch(schema, header, body) {
+    return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields));
+}
+
+function loadDictionaryBatch(header, body, dictionaryType) {
+    return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]));
+}
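[Editor's note: unlike the other scripts, print-buffer-alignment.js walks the raw IPC message stream instead of decoded tables, which is what makes it useful for inspecting buffer offsets and padding. A trimmed sketch of that message loop, using the same AsyncMessageReader API as above — the apache-arrow package import and input path are assumptions:]

    // Trimmed sketch: enumerate IPC messages and report each body's byte offset,
    // which should land on an 8-byte boundary in a well-formed Arrow file.
    const fs = require('fs');
    const { AsyncMessageReader } = require('apache-arrow');

    (async () => {
        const reader = new AsyncMessageReader(fs.createReadStream('data.arrow'));
        for await (const message of reader) {
            if (message.isSchema()) { continue; } // schema messages carry no body
            const body = await reader.readMessageBody(message.bodyLength);
            console.log(`bodyOffset=${body.byteOffset} aligned=${body.byteOffset % 8 === 0}`);
        }
        await reader.return();
    })();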
diff --git a/src/arrow/js/bin/stream-to-file.js b/src/arrow/js/bin/stream-to-file.js
new file mode 100755
index 000000000..015a5eace
--- /dev/null
+++ b/src/arrow/js/bin/stream-to-file.js
@@ -0,0 +1,40 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatchReader, RecordBatchFileWriter } = require(`../index${extension}`);
+
+(async () => {
+
+    const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+    const writable = process.argv.length < 4 ? process.stdout : fs.createWriteStream(path.resolve(process.argv[3]));
+
+    const streamToFile = readable
+        .pipe(RecordBatchReader.throughNode())
+        .pipe(RecordBatchFileWriter.throughNode())
+        .pipe(writable);
+
+    await eos(streamToFile);
+
+})().catch((e) => { console.error(e); process.exit(1); });
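[Editor's note: stream-to-file.js and file-to-stream.js are inverses, so a quick round-trip makes a reasonable smoke test. A hypothetical invocation sketch via child_process — the file names are invented for illustration:]

    // Hypothetical round-trip smoke test: IPC file -> IPC stream -> IPC file.
    const { execFileSync } = require('child_process');
    execFileSync('./bin/file-to-stream.js', ['in.arrow', 'tmp.stream']);
    execFileSync('./bin/stream-to-file.js', ['tmp.stream', 'out.arrow']);
    // in.arrow and out.arrow should now contain the same record batches.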