author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/js/bin
parent    Initial commit. (diff)
download  ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
          ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip

Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/js/bin')
-rwxr-xr-x  src/arrow/js/bin/arrow2csv.js                28
-rwxr-xr-x  src/arrow/js/bin/file-to-stream.js           40
-rwxr-xr-x  src/arrow/js/bin/integration.js             255
-rwxr-xr-x  src/arrow/js/bin/json-to-arrow.js           108
-rwxr-xr-x  src/arrow/js/bin/print-buffer-alignment.js   81
-rwxr-xr-x  src/arrow/js/bin/stream-to-file.js           40
6 files changed, 552 insertions, 0 deletions
diff --git a/src/arrow/js/bin/arrow2csv.js b/src/arrow/js/bin/arrow2csv.js
new file mode 100755
index 000000000..0e446fabe
--- /dev/null
+++ b/src/arrow/js/bin/arrow2csv.js
@@ -0,0 +1,28 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+const Path = require(`path`);
+const here = Path.resolve(__dirname, '../');
+const tsnode = require.resolve(`ts-node/register`);
+const arrow2csv = Path.join(here, `src/bin/arrow2csv.ts`);
+const env = { ...process.env, TS_NODE_TRANSPILE_ONLY: `true` };
+
+require('child_process').spawn(`node`, [
+ `-r`, tsnode, arrow2csv, ...process.argv.slice(2)
+], { cwd: here, env, stdio: `inherit` });
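
Note: the wrapper above is a launcher, not the converter itself. It re-executes node with ts-node/register preloaded so the TypeScript implementation in src/bin/arrow2csv.ts runs without a build step, and TS_NODE_TRANSPILE_ONLY=true skips type-checking for faster startup. A hypothetical invocation, assuming an Arrow IPC file named table.arrow exists:

$ src/arrow/js/bin/arrow2csv.js table.arrow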
diff --git a/src/arrow/js/bin/file-to-stream.js b/src/arrow/js/bin/file-to-stream.js
new file mode 100755
index 000000000..090cd0b0e
--- /dev/null
+++ b/src/arrow/js/bin/file-to-stream.js
@@ -0,0 +1,40 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatchReader, RecordBatchStreamWriter } = require(`../index${extension}`);
+
+(async () => {
+
+ const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+ const writable = process.argv.length < 4 ? process.stdout : fs.createWriteStream(path.resolve(process.argv[3]));
+
+ const fileToStream = readable
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RecordBatchStreamWriter.throughNode())
+ .pipe(writable);
+
+ await eos(fileToStream);
+
+})().catch((e) => { console.error(e); process.exit(1); });
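
file-to-stream.js converts the Arrow IPC random-access file format into the streaming format. RecordBatchReader.throughNode() and RecordBatchStreamWriter.throughNode() each return a Node duplex stream, so the whole conversion is a single pipe chain, and stream.finished (promisified here as eos) resolves once the last batch has been flushed. Both path arguments are optional, falling back to stdin and stdout. Hypothetical invocations, with placeholder file names:

$ bin/file-to-stream.js table.arrow table.stream
$ cat table.arrow | bin/file-to-stream.js > table.stream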
diff --git a/src/arrow/js/bin/integration.js b/src/arrow/js/bin/integration.js
new file mode 100755
index 000000000..507514eba
--- /dev/null
+++ b/src/arrow/js/bin/integration.js
@@ -0,0 +1,255 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-nocheck
+
+const fs = require('fs');
+const Path = require('path');
+const { promisify } = require('util');
+const glob = promisify(require('glob'));
+const { zip } = require('ix/iterable/zip');
+const { parse: bignumJSONParse } = require('json-bignum');
+const argv = require(`command-line-args`)(cliOpts(), { partial: true });
+const {
+ Table,
+ RecordBatchReader,
+ util: { createElementComparator }
+} = require('../targets/apache-arrow/');
+
+const exists = async (p) => {
+ try {
+ return !!(await fs.promises.stat(p));
+ } catch (e) { return false; }
+}
+
+(async () => {
+
+ if (!argv.mode) { return print_usage(); }
+
+ let mode = argv.mode.toUpperCase();
+ let jsonPaths = [...(argv.json || [])];
+ let arrowPaths = [...(argv.arrow || [])];
+
+ if (mode === 'VALIDATE' && !jsonPaths.length) {
+ [jsonPaths, arrowPaths] = await loadLocalJSONAndArrowPathsForDebugging(jsonPaths, arrowPaths);
+ }
+
+ if (!jsonPaths.length) { return print_usage(); }
+
+ switch (mode) {
+ case 'VALIDATE':
+ for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) {
+ await validate(jsonPath, arrowPath);
+ }
+ break;
+ default:
+ return print_usage();
+ }
+})()
+.then((x) => +x || 0, (e) => {
+ e && process.stderr.write(`${e?.stack || e}\n`);
+ return process.exitCode || 1;
+}).then((code) => process.exit(code));
+
+function cliOpts() {
+ return [
+ {
+ type: String,
+ name: 'mode',
+ description: 'The integration test to run'
+ },
+ {
+ type: String,
+ name: 'arrow', alias: 'a',
+ multiple: true, defaultValue: [],
+ description: 'The Arrow file[s] to read/write'
+ },
+ {
+ type: String,
+ name: 'json', alias: 'j',
+ multiple: true, defaultValue: [],
+ description: 'The JSON file[s] to read/write'
+ }
+ ];
+}
+
+function print_usage() {
+ console.log(require('command-line-usage')([
+ {
+ header: 'integration',
+ content: 'Script for running Arrow integration tests'
+ },
+ {
+ header: 'Synopsis',
+ content: [
+ '$ integration.js -j file.json -a file.arrow --mode validate'
+ ]
+ },
+ {
+ header: 'Options',
+ optionList: [
+ ...cliOpts(),
+ {
+ name: 'help',
+ description: 'Print this usage guide.'
+ }
+ ]
+ },
+ ]));
+ return 1;
+}
+
+async function validate(jsonPath, arrowPath) {
+
+ const files = await Promise.all([
+ fs.promises.readFile(arrowPath),
+ fs.promises.readFile(jsonPath, 'utf8'),
+ ]);
+
+ const arrowData = files[0];
+ const jsonData = bignumJSONParse(files[1]);
+
+ validateReaderIntegration(jsonData, arrowData);
+ validateTableFromBuffersIntegration(jsonData, arrowData);
+    validateTableToBuffersIntegration('json', 'file')(jsonData, arrowData);
+    validateTableToBuffersIntegration('json', 'stream')(jsonData, arrowData);
+    validateTableToBuffersIntegration('binary', 'file')(jsonData, arrowData);
+    validateTableToBuffersIntegration('binary', 'stream')(jsonData, arrowData);
+}
+
+function validateReaderIntegration(jsonData, arrowBuffer) {
+ const msg = `json and arrow record batches report the same values`;
+ try {
+ const jsonReader = RecordBatchReader.from(jsonData);
+ const binaryReader = RecordBatchReader.from(arrowBuffer);
+ for (const [jsonRecordBatch, binaryRecordBatch] of zip(jsonReader, binaryReader)) {
+ compareTableIsh(jsonRecordBatch, binaryRecordBatch);
+ }
+ } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+ process.stdout.write(`${msg}: pass\n`);
+}
+
+function validateTableFromBuffersIntegration(jsonData, arrowBuffer) {
+ const msg = `json and arrow tables report the same values`;
+ try {
+ const jsonTable = Table.from(jsonData);
+ const binaryTable = Table.from(arrowBuffer);
+ compareTableIsh(jsonTable, binaryTable);
+ } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+ process.stdout.write(`${msg}: pass\n`);
+}
+
+function validateTableToBuffersIntegration(srcFormat, arrowFormat) {
+ const refFormat = srcFormat === `json` ? `binary` : `json`;
+ return function testTableToBuffersIntegration(jsonData, arrowBuffer) {
+ const msg = `serialized ${srcFormat} ${arrowFormat} reports the same values as the ${refFormat} ${arrowFormat}`;
+ try {
+ const refTable = Table.from(refFormat === `json` ? jsonData : arrowBuffer);
+ const srcTable = Table.from(srcFormat === `json` ? jsonData : arrowBuffer);
+ const dstTable = Table.from(srcTable.serialize(`binary`, arrowFormat === `stream`));
+ compareTableIsh(dstTable, refTable);
+ } catch (e) { throw new Error(`${msg}: fail \n ${e?.stack || e}`); }
+ process.stdout.write(`${msg}: pass\n`);
+ };
+}
+
+function compareTableIsh(actual, expected) {
+ if (actual.length !== expected.length) {
+ throw new Error(`length: ${actual.length} !== ${expected.length}`);
+ }
+ if (actual.numCols !== expected.numCols) {
+ throw new Error(`numCols: ${actual.numCols} !== ${expected.numCols}`);
+ }
+ (() => {
+ const getChildAtFn = expected instanceof Table ? 'getColumnAt' : 'getChildAt';
+ for (let i = -1, n = actual.numCols; ++i < n;) {
+ const v1 = actual[getChildAtFn](i);
+ const v2 = expected[getChildAtFn](i);
+ compareVectors(v1, v2);
+ }
+ })();
+}
+
+function compareVectors(actual, expected) {
+
+ if ((actual == null && expected != null) || (expected == null && actual != null)) {
+        throw new Error(`${actual == null ? `actual` : `expected`} is null, expected ${actual ?? expected} to be null as well`);
+ }
+
+ let props = ['type', 'length', 'nullCount'];
+
+ (() => {
+ for (let i = -1, n = props.length; ++i < n;) {
+ const prop = props[i];
+ if (`${actual[prop]}` !== `${expected[prop]}`) {
+ throw new Error(`${prop}: ${actual[prop]} !== ${expected[prop]}`);
+ }
+ }
+ })();
+
+ (() => {
+ for (let i = -1, n = actual.length; ++i < n;) {
+ let x1 = actual.get(i), x2 = expected.get(i);
+ if (!createElementComparator(x2)(x1)) {
+ throw new Error(`${i}: ${x1} !== ${x2}`);
+ }
+ }
+ })();
+
+ (() => {
+ let i = -1;
+ for (let [x1, x2] of zip(actual, expected)) {
+ ++i;
+ if (!createElementComparator(x2)(x1)) {
+ throw new Error(`${i}: ${x1} !== ${x2}`);
+ }
+ }
+ })();
+}
+
+async function loadLocalJSONAndArrowPathsForDebugging(jsonPaths, arrowPaths) {
+
+ const sourceJSONPaths = await glob(Path.resolve(__dirname, `../test/data/json/`, `*.json`));
+
+ if (!arrowPaths.length) {
+ await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'file');
+ await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'file');
+ await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'cpp', 'stream');
+ await loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, 'java', 'stream');
+ }
+
+ for (let [jsonPath, arrowPath] of zip(jsonPaths, arrowPaths)) {
+ console.log(`jsonPath: ${jsonPath}`);
+ console.log(`arrowPath: ${arrowPath}`);
+ }
+
+ return [jsonPaths, arrowPaths];
+
+ async function loadJSONAndArrowPaths(sourceJSONPaths, jsonPaths, arrowPaths, source, format) {
+ for (const jsonPath of sourceJSONPaths) {
+ const { name } = Path.parse(jsonPath);
+ const arrowPath = Path.resolve(__dirname, `../test/data/${source}/${format}/${name}.arrow`);
+ if (await exists(arrowPath)) {
+ jsonPaths.push(jsonPath);
+ arrowPaths.push(arrowPath);
+ }
+ }
+ return [jsonPaths, arrowPaths];
+ }
+}
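
integration.js checks that a JSON integration file and its binary Arrow counterpart decode to identical data, three ways: batch by batch (validateReaderIntegration), as whole tables (validateTableFromBuffersIntegration), and after re-serializing each source through both the file and stream IPC formats (validateTableToBuffersIntegration). When run with --mode validate and no explicit paths, loadLocalJSONAndArrowPathsForDebugging pairs each test/data/json/*.json with the matching cpp/java file and stream binaries. A hypothetical explicit invocation, with placeholder paths:

$ bin/integration.js --mode validate -j test/data/json/primitive.json -a test/data/cpp/file/primitive.arrow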
diff --git a/src/arrow/js/bin/json-to-arrow.js b/src/arrow/js/bin/json-to-arrow.js
new file mode 100755
index 000000000..8f3fbd3fc
--- /dev/null
+++ b/src/arrow/js/bin/json-to-arrow.js
@@ -0,0 +1,108 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const Path = require('path');
+const { parse } = require('json-bignum');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const argv = require(`command-line-args`)(cliOpts(), { partial: true });
+const { RecordBatchReader, RecordBatchFileWriter, RecordBatchStreamWriter } = require(`../index${extension}`);
+
+const jsonPaths = [...(argv.json || [])];
+const arrowPaths = [...(argv.arrow || [])];
+
+(async () => {
+
+ if (!jsonPaths.length || !arrowPaths.length || (jsonPaths.length !== arrowPaths.length)) {
+ return print_usage();
+ }
+
+ await Promise.all(jsonPaths.map(async (path, i) => {
+
+ const RecordBatchWriter = argv.format !== 'stream'
+ ? RecordBatchFileWriter
+ : RecordBatchStreamWriter;
+
+ const reader = RecordBatchReader.from(parse(
+ await fs.promises.readFile(Path.resolve(path), 'utf8')));
+
+ const jsonToArrow = reader
+ .pipe(RecordBatchWriter.throughNode())
+ .pipe(fs.createWriteStream(arrowPaths[i]));
+
+ await eos(jsonToArrow);
+
+ }));
+})()
+.then((x) => +x || 0, (e) => {
+ e && process.stderr.write(`${e}`);
+ return process.exitCode || 1;
+}).then((code = 0) => process.exit(code));
+
+function cliOpts() {
+ return [
+ {
+ type: String,
+ name: 'format', alias: 'f',
+ multiple: false, defaultValue: 'file',
+ description: 'The Arrow format to write, either "file" or "stream"'
+ },
+ {
+ type: String,
+ name: 'arrow', alias: 'a',
+ multiple: true, defaultValue: [],
+ description: 'The Arrow file[s] to write'
+ },
+ {
+ type: String,
+ name: 'json', alias: 'j',
+ multiple: true, defaultValue: [],
+ description: 'The JSON file[s] to read'
+ }
+ ];
+}
+
+function print_usage() {
+ console.log(require('command-line-usage')([
+ {
+ header: 'json-to-arrow',
+ content: 'Script for converting a JSON Arrow file to a binary Arrow file'
+ },
+ {
+ header: 'Synopsis',
+ content: [
+ '$ json-to-arrow.js -j in.json -a out.arrow -f stream'
+ ]
+ },
+ {
+ header: 'Options',
+ optionList: [
+ ...cliOpts(),
+ {
+ name: 'help',
+ description: 'Print this usage guide.'
+ }
+ ]
+ },
+ ]));
+ return 1;
+}
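
Because the Promise.all callback pairs the i-th --json path with the i-th --arrow path, the two lists must be the same length; each JSON file is parsed with json-bignum (so 64-bit integers survive the round trip) and serialized through either RecordBatchFileWriter or RecordBatchStreamWriter depending on --format. A hypothetical multi-file invocation, with placeholder paths:

$ bin/json-to-arrow.js -j a.json -j b.json -a a.arrow -a b.arrow -f stream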
diff --git a/src/arrow/js/bin/print-buffer-alignment.js b/src/arrow/js/bin/print-buffer-alignment.js
new file mode 100755
index 000000000..4c3260397
--- /dev/null
+++ b/src/arrow/js/bin/print-buffer-alignment.js
@@ -0,0 +1,81 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatch, AsyncMessageReader } = require(`../index${extension}`);
+const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`);
+
+(async () => {
+
+ const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+ const reader = new AsyncMessageReader(readable);
+
+ let schema, recordBatchIndex = 0, dictionaryBatchIndex = 0;
+
+ for await (let message of reader) {
+
+ let bufferRegions = [];
+
+ if (message.isSchema()) {
+ schema = message.header();
+ continue;
+ } else if (message.isRecordBatch()) {
+ const header = message.header();
+ bufferRegions = header.buffers;
+ const body = await reader.readMessageBody(message.bodyLength);
+ const recordBatch = loadRecordBatch(schema, header, body);
+ console.log(`record batch ${++recordBatchIndex}: ${JSON.stringify({
+ offset: body.byteOffset,
+ length: body.byteLength,
+ numRows: recordBatch.length,
+ })}`);
+ } else if (message.isDictionaryBatch()) {
+ const header = message.header();
+ bufferRegions = header.data.buffers;
+ const type = schema.dictionaries.get(header.id);
+ const body = await reader.readMessageBody(message.bodyLength);
+ const recordBatch = loadDictionaryBatch(header.data, body, type);
+ console.log(`dictionary batch ${++dictionaryBatchIndex}: ${JSON.stringify({
+ offset: body.byteOffset,
+ length: body.byteLength,
+ numRows: recordBatch.length,
+ dictionaryId: header.id,
+ })}`);
+ }
+
+ bufferRegions.forEach(({ offset, length }, i) => {
+ console.log(`\tbuffer ${i + 1}: { offset: ${offset}, length: ${length} }`);
+ });
+ }
+
+ await reader.return();
+
+})().catch((e) => { console.error(e); process.exit(1); });
+
+function loadRecordBatch(schema, header, body) {
+ return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields));
+}
+
+function loadDictionaryBatch(header, body, dictionaryType) {
+ return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]));
+}
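
print-buffer-alignment.js walks an IPC stream message by message, loads each record or dictionary batch, and prints the byte offset and length of every message body and buffer region. Those numbers are what you need to confirm that a writer honors Arrow's 8-byte buffer alignment (the spec recommends 64-byte padding). A minimal sketch of the check the printed offsets enable, using a hypothetical helper that is not part of the script above:

// isAligned is a hypothetical helper: Arrow IPC buffers should start on 8-byte boundaries.
const isAligned = ({ offset }) => offset % 8 === 0;

A hypothetical invocation, reading a stream-format file:

$ bin/print-buffer-alignment.js table.stream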
diff --git a/src/arrow/js/bin/stream-to-file.js b/src/arrow/js/bin/stream-to-file.js
new file mode 100755
index 000000000..015a5eace
--- /dev/null
+++ b/src/arrow/js/bin/stream-to-file.js
@@ -0,0 +1,40 @@
+#! /usr/bin/env node
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// @ts-check
+
+const fs = require('fs');
+const path = require('path');
+const eos = require('util').promisify(require('stream').finished);
+const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
+const { RecordBatchReader, RecordBatchFileWriter } = require(`../index${extension}`);
+
+(async () => {
+
+ const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
+ const writable = process.argv.length < 4 ? process.stdout : fs.createWriteStream(path.resolve(process.argv[3]));
+
+ const streamToFile = readable
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RecordBatchFileWriter.throughNode())
+ .pipe(writable);
+
+ await eos(streamToFile);
+
+})().catch((e) => { console.error(e); process.exit(1); });
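
stream-to-file.js is the inverse of file-to-stream.js: it re-serializes streaming-format input into the random-access file format, whose footer enables random access to individual record batches. As above, omitted arguments fall back to stdin and stdout. A hypothetical invocation, with placeholder file names:

$ bin/stream-to-file.js table.stream table.arrow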