Diffstat (limited to 'src/arrow/js/test/unit/ipc/writer')
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts     46
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts     46
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts  119
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts    272
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts   274
5 files changed, 757 insertions, 0 deletions
diff --git a/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
new file mode 100644
index 000000000..fa639e5f6
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchFileWriter } from 'apache-arrow';
+
+describe('RecordBatchFileWriter', () => {
+ for (const table of generateRandomTables([10, 20, 30])) {
+ testFileWriter(table, `[${table.schema.fields.join(', ')}]`);
+ }
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ testFileWriter(table, `${table.schema.fields[0]}`);
+ }
+});
+
+function testFileWriter(table: Table, name: string) {
+ describe(`should write the Arrow IPC file format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table));
+ });
+}
+
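+// Round-trip sanity check: serialize the Table to the Arrow IPC file format,
+// read the bytes back into a new Table, and verify it equals the source.
+// The iterator check also asserts the source contains 3 record batches
+// (one per batch length in [10, 20, 30] above).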
+async function validateTable(source: Table) {
+ const writer = RecordBatchFileWriter.writeAll(source);
+ const result = await Table.from(writer.toUint8Array());
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
new file mode 100644
index 000000000..05be0e272
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchJSONWriter } from 'apache-arrow';
+
+describe('RecordBatchJSONWriter', () => {
+ for (const table of generateRandomTables([10, 20, 30])) {
+ testJSONWriter(table, `[${table.schema.fields.join(', ')}]`);
+ }
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ testJSONWriter(table, `${table.schema.fields[0]}`);
+ }
+});
+
+function testJSONWriter(table: Table, name: string) {
+ describe(`should write the Arrow IPC JSON format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table));
+ });
+}
+
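+// Same round-trip as the file-writer test, but through the Arrow integration
+// JSON format: the writer's string output is parsed with JSON.parse before
+// being rebuilt into a Table.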
+async function validateTable(source: Table) {
+ const writer = RecordBatchJSONWriter.writeAll(source);
+ const result = Table.from(JSON.parse(await writer.toString()));
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
new file mode 100644
index 000000000..a83aa39da
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import * as generate from '../../../generate-test-data';
+import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
+import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow';
+import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow';
+
+describe('RecordBatchStreamWriter', () => {
+
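+ // Each table is round-tripped twice: once with `writeLegacyIpcFormat: true`
+ // (the pre-0.15 message framing, without the 0xFFFFFFFF continuation
+ // marker) and once with the current IPC stream format.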
+ (() => {
+ const type = generate.sparseUnion(0, 0).vector.type;
+ const schema = new Schema([new Field('dictSparseUnion', type)]);
+ const table = generate.table([10, 20, 30], schema).table;
+ const testName = `[${table.schema.fields.join(', ')}]`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ })();
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+ const testName = `[${table.schema.fields.join(', ')}]`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ }
+
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ const testName = `${table.schema.fields[0]}`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ }
+
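+ // With `autoDestroy: false` the writer survives repeated writeAll() calls,
+ // so multiple tables can share one output stream. The reader side drains
+ // them with readAll() until the writer is explicitly close()d below.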
+ it(`should write multiple tables to the same output stream`, async () => {
+ const tables = [] as Table[];
+ const writer = new RecordBatchStreamWriter({ autoDestroy: false });
+ const validate = (async () => {
+ for await (const reader of RecordBatchReader.readAll(writer)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+ })();
+ for (const table of generateRandomTables([10, 20, 30])) {
+ tables.push(table);
+ await writer.writeAll((async function*() {
+ for (const chunk of table.chunks) {
+ yield chunk; // insert some asynchrony
+ await new Promise((r) => setTimeout(r, 1));
+ }
+ }()));
+ }
+ writer.close();
+ await validate;
+ });
+
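+ // The builder below flushes a new 50-element chunk each time its queue
+ // fills, and each chunk may extend the dictionary, so the writer emits the
+ // additions as delta dictionary batches. 1000 values at 50 per chunk yields
+ // the 20 dictionary chunks asserted at the end.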
+ it('should write delta dictionary batches', async () => {
+
+ const name = 'dictionary_encoded_uint32';
+ const chunks: DictionaryVector<Uint32, Int32>[] = [];
+ const {
+ vector: sourceVector, values: sourceValues,
+ } = generate.dictionary(1000, 20, new Uint32(), new Int32());
+
+ const writer = RecordBatchStreamWriter.writeAll((function* () {
+ const transform = Builder.throughIterable({
+ type: sourceVector.type, nullValues: [null],
+ queueingStrategy: 'count', highWaterMark: 50,
+ });
+ for (const chunk of transform(sourceValues())) {
+ chunks.push(chunk);
+ yield RecordBatch.new({ [name]: chunk });
+ }
+ })());
+
+ expect(Chunked.concat(chunks)).toEqualVector(sourceVector);
+
+ type T = { [name]: Dictionary<Uint32, Int32> };
+ const sourceTable = Table.new({ [name]: sourceVector });
+ const resultTable = await Table.from<T>(writer.toUint8Array());
+
+ const { dictionary } = resultTable.getColumn(name);
+
+ expect(resultTable).toEqualTable(sourceTable);
+ expect((dictionary as Chunked)).toBeInstanceOf(Chunked);
+ expect((dictionary as Chunked).chunks).toHaveLength(20);
+ });
+});
+
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
+ describe(`should write the Arrow IPC stream format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table, options));
+ });
+}
+
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+ const writer = RecordBatchStreamWriter.writeAll(source, options);
+ const result = await Table.from(writer.toUint8Array());
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
new file mode 100644
index 000000000..a19ddcdd7
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchWriter,
+ RecordBatchFileWriter,
+ RecordBatchJSONWriter,
+ RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+ ArrowIOTestHelper,
+ concatBuffersAsync,
+ readableDOMStreamToAsyncIterator
+} from '../helpers';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader,
+ validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+ if (process.env.TEST_DOM_STREAMS !== 'true') {
+ return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchWriter.throughDOM (${name})`, () => {
+
+ describe('file', () => {
+ describe(`convert`, () => {
+ test('ReadableStream', file.whatwgReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ test('ReadableByteStream', file.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ });
+ describe(`through`, () => {
+ test('ReadableStream', file.whatwgReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ test('ReadableByteStream', file.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ });
+ });
+
+ describe('stream', () => {
+ describe(`convert`, () => {
+ test('ReadableStream', stream.whatwgReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ test('ReadableByteStream', stream.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ });
+ describe(`through`, () => {
+ test('ReadableStream', stream.whatwgReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ test('ReadableByteStream', stream.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ });
+ });
+
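+ // 'convert' pipes the source through a writer of the *other* IPC format
+ // (file -> stream, stream -> file) and validates the converted output;
+ // 'through' writes the same format back out and validates that a second
+ // reader can still consume it.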
+ async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+ const stream = source
+ .pipeThrough(RecordBatchReader.throughDOM())
+ .pipeThrough(RBWImplementation.throughDOM());
+ const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+ await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+ }
+
+ async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+ const stream = source
+ .pipeThrough(RecordBatchReader.throughDOM())
+ .pipeThrough(RBWImplementation.throughDOM())
+ .pipeThrough(RecordBatchReader.throughDOM());
+ await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream));
+ }
+ });
+
+ describe(`toDOMStream (${name})`, () => {
+
+ const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+ describe(`RecordBatchJSONWriter`, () => {
+
+ const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+ test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+ test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+ async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+ const reader = await RecordBatchReader.from(<any> source);
+ const writer = await RecordBatchJSONWriter.writeAll(reader);
+ const buffer = await concatBuffersAsync(writer.toDOMStream());
+ validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+ }
+ });
+
+ describe(`RecordBatchFileWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchFileWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('file', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
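+ // writeAll() is deliberately not awaited: the write proceeds concurrently
+ // while the reader drains writer.toDOMStream(), exercising the async path.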
+ async function validate(source: any) {
+ const writer = new RecordBatchFileWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('file', 3, reader);
+ }
+ });
+ });
+
+ describe(`RecordBatchStreamWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchStreamWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('stream', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const writer = new RecordBatchStreamWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('stream', 3, reader);
+ }
+ });
+ });
+ });
+ }
+
+ describe(`RecordBatchStreamWriter.throughDOM`, () => {
+
+ const opts = { autoDestroy: false };
+ const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
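+ // throughDOM(opts) yields a transform whose writable side, thanks to
+ // `autoDestroy: false`, accepts several tables before closing. Once
+ // readAll() completes, the readable side should be unlocked again.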
+ it(`should write a stream of tables to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const stream: ReadableStream<any> = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(stream.locked).toBe(false);
+ });
+
+ it(`should write a stream of record batches to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ // flatMap from Table -> RecordBatches[]
+ .pipe(flatMap((table) => as(table.chunks)))
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(stream.locked).toBe(false);
+ });
+ });
+
+})();
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
new file mode 100644
index 000000000..662129b1b
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+import 'ix/Ix.node';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchWriter,
+ RecordBatchFileWriter,
+ RecordBatchJSONWriter,
+ RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+ ArrowIOTestHelper,
+ concatBuffersAsync
+} from '../helpers';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader,
+ validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+ if (process.env.TEST_NODE_STREAMS !== 'true') {
+ return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchWriter.throughNode (${name})`, () => {
+
+ describe('file', () => {
+ describe(`convert`, () => {
+ test('fs.ReadStream', file.fsReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ test('stream.Readable', file.nodeReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ });
+ describe(`through`, () => {
+ test('fs.ReadStream', file.fsReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ test('stream.Readable', file.nodeReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ });
+ });
+
+ describe('stream', () => {
+ describe(`convert`, () => {
+ test('fs.ReadStream', stream.fsReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ test('stream.Readable', stream.nodeReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ });
+ describe(`through`, () => {
+ test('fs.ReadStream', stream.fsReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ test('stream.Readable', stream.nodeReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ });
+ });
+
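+ // As in the DOM-stream suite: 'convert' pipes through a writer of the
+ // opposite IPC format and validates the conversion, while 'through'
+ // round-trips the same format through a second reader.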
+ async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+ const stream = source
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RBWImplementation.throughNode());
+ const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+ await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+ }
+
+ async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+ const stream = source
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RBWImplementation.throughNode())
+ .pipe(RecordBatchReader.throughNode());
+ await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]());
+ }
+ });
+
+ describe(`toNodeStream (${name})`, () => {
+
+ const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+ describe(`RecordBatchJSONWriter`, () => {
+
+ const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+ test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+ test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+ async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+ const reader = await RecordBatchReader.from(<any> source);
+ const writer = await RecordBatchJSONWriter.writeAll(reader);
+ const buffer = await concatBuffersAsync(writer.toNodeStream());
+ validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+ }
+ });
+
+ describe(`RecordBatchFileWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchFileWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('file', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
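+ // writeAll() is deliberately not awaited: the write proceeds concurrently
+ // while the reader drains writer.toNodeStream(), exercising the async path.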
+ async function validate(source: any) {
+ const writer = new RecordBatchFileWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('file', 3, reader);
+ }
+ });
+ });
+
+ describe(`RecordBatchStreamWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchStreamWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('stream', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const writer = new RecordBatchStreamWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('stream', 3, reader);
+ }
+ });
+ });
+ });
+ }
+
+ describe(`RecordBatchStreamWriter.throughNode`, () => {
+
+ const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
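+ // `autoDestroy: false` keeps the duplex writer open while multiple tables
+ // are piped in; after readAll() drains them all and the stream ends, the
+ // tests assert the writer is no longer readable and has been destroyed.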
+ it(`should write a stream of tables to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipe(writer);
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(writer.readable).toBe(false);
+ expect((writer as any).destroyed).toBe(true);
+ });
+
+ it(`should write a stream of record batches to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipe(flatMap((table) => as(table.chunks)))
+ .pipe(writer);
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(writer.readable).toBe(false);
+ expect((writer as any).destroyed).toBe(true);
+ });
+
+ });
+})();