Diffstat (limited to 'src/arrow/js/test/unit/ipc/writer')
 src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts   |  46 +
 src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts   |  46 +
 src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts | 119 +
 src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts   | 272 +
 src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts  | 274 +
5 files changed, 757 insertions(+), 0 deletions(-)
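All five suites check the same round-trip invariant: a Table serialized by a RecordBatchWriter subclass must deserialize to an equal Table. As a minimal sketch of that pattern (not part of the patch; it uses only calls the tests themselves exercise -- RecordBatchFileWriter.writeAll, toUint8Array, and Table.from from the apache-arrow package of this vintage; the roundTrip helper name is ours):

    import { Table, RecordBatchFileWriter } from 'apache-arrow';

    // Sketch: serialize a Table to the Arrow IPC file format and read it back.
    // Any Table works; the tests build theirs with generateRandomTables().
    async function roundTrip(source: Table): Promise<Table> {
        const writer = RecordBatchFileWriter.writeAll(source);  // queue all chunks
        const result = await Table.from(writer.toUint8Array()); // flush and re-read
        return result; // the tests then assert expect(result).toEqualTable(source)
    }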
diff --git a/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
new file mode 100644
index 000000000..fa639e5f6
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchFileWriter } from 'apache-arrow';
+
+describe('RecordBatchFileWriter', () => {
+    for (const table of generateRandomTables([10, 20, 30])) {
+        testFileWriter(table, `[${table.schema.fields.join(', ')}]`);
+    }
+    for (const table of generateDictionaryTables([10, 20, 30])) {
+        testFileWriter(table, `${table.schema.fields[0]}`);
+    }
+});
+
+function testFileWriter(table: Table, name: string) {
+    describe(`should write the Arrow IPC file format (${name})`, () => {
+        test(`Table`, validateTable.bind(0, table));
+    });
+}
+
+async function validateTable(source: Table) {
+    const writer = RecordBatchFileWriter.writeAll(source);
+    const result = await Table.from(writer.toUint8Array());
+    validateRecordBatchIterator(3, source.chunks);
+    expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
new file mode 100644
index 000000000..05be0e272
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchJSONWriter } from 'apache-arrow';
+
+describe('RecordBatchJSONWriter', () => {
+    for (const table of generateRandomTables([10, 20, 30])) {
+        testJSONWriter(table, `[${table.schema.fields.join(', ')}]`);
+    }
+    for (const table of generateDictionaryTables([10, 20, 30])) {
+        testJSONWriter(table, `${table.schema.fields[0]}`);
+    }
+});
+
+function testJSONWriter(table: Table, name: string) {
+    describe(`should write the Arrow IPC JSON format (${name})`, () => {
+        test(`Table`, validateTable.bind(0, table));
+    });
+}
+
+async function validateTable(source: Table) {
+    const writer = RecordBatchJSONWriter.writeAll(source);
+    const result = Table.from(JSON.parse(await writer.toString()));
+    validateRecordBatchIterator(3, source.chunks);
+    expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
new file mode 100644
index 000000000..a83aa39da
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    generateDictionaryTables
+} from '../../../data/tables';
+
+import * as generate from '../../../generate-test-data';
+import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
+import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow';
+import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow';
+
+describe('RecordBatchStreamWriter', () => {
+
+    (() => {
+        const type = generate.sparseUnion(0, 0).vector.type;
+        const schema = new Schema([new Field('dictSparseUnion', type)]);
+        const table = generate.table([10, 20, 30], schema).table;
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    })();
+
+    for (const table of generateRandomTables([10, 20, 30])) {
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    }
+
+    for (const table of generateDictionaryTables([10, 20, 30])) {
+        const testName = `${table.schema.fields[0]}`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+    }
+
+    it(`should write multiple tables to the same output stream`, async () => {
+        const tables = [] as Table[];
+        const writer = new RecordBatchStreamWriter({ autoDestroy: false });
+        const validate = (async () => {
+            for await (const reader of RecordBatchReader.readAll(writer)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+        })();
+        for (const table of generateRandomTables([10, 20, 30])) {
+            tables.push(table);
+            await writer.writeAll((async function*() {
+                for (const chunk of table.chunks) {
+                    yield chunk; // insert some asynchrony
+                    await new Promise((r) => setTimeout(r, 1));
+                }
+            }()));
+        }
+        writer.close();
+        await validate;
+    });
+
+    it('should write delta dictionary batches', async () => {
+
+        const name = 'dictionary_encoded_uint32';
+        const chunks: DictionaryVector<Uint32, Int32>[] = [];
+        const {
+            vector: sourceVector, values: sourceValues,
+        } = generate.dictionary(1000, 20, new Uint32(), new Int32());
+
+        const writer = RecordBatchStreamWriter.writeAll((function* () {
+            const transform = Builder.throughIterable({
+                type: sourceVector.type, nullValues: [null],
+                queueingStrategy: 'count', highWaterMark: 50,
+            });
+            for (const chunk of transform(sourceValues())) {
+                chunks.push(chunk);
+                yield RecordBatch.new({ [name]: chunk });
+            }
+        })());
+
+        expect(Chunked.concat(chunks)).toEqualVector(sourceVector);
+
+        type T = { [name]: Dictionary<Uint32, Int32> };
+        const sourceTable = Table.new({ [name]: sourceVector });
+        const resultTable = await Table.from<T>(writer.toUint8Array());
+
+        const { dictionary } = resultTable.getColumn(name);
+
+        expect(resultTable).toEqualTable(sourceTable);
+        expect((dictionary as Chunked)).toBeInstanceOf(Chunked);
+        expect((dictionary as Chunked).chunks).toHaveLength(20);
+    });
+});
+
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
+    describe(`should write the Arrow IPC stream format (${name})`, () => {
+        test(`Table`, validateTable.bind(0, table, options));
+    });
+}
+
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+    const writer = RecordBatchStreamWriter.writeAll(source, options);
+    const result = await Table.from(writer.toUint8Array());
+    validateRecordBatchIterator(3, source.chunks);
+    expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
new file mode 100644
index 000000000..a19ddcdd7
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+
+import {
+    Table,
+    RecordBatchReader,
+    RecordBatchWriter,
+    RecordBatchFileWriter,
+    RecordBatchJSONWriter,
+    RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+    ArrowIOTestHelper,
+    concatBuffersAsync,
+    readableDOMStreamToAsyncIterator
+} from '../helpers';
+
+import {
+    validateRecordBatchReader,
+    validateAsyncRecordBatchReader,
+    validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+    if (process.env.TEST_DOM_STREAMS !== 'true') {
+        return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {});
+    }
+
+    for (const table of generateRandomTables([10, 20, 30])) {
+
+        const file = ArrowIOTestHelper.file(table);
+        const json = ArrowIOTestHelper.json(table);
+        const stream = ArrowIOTestHelper.stream(table);
+        const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+        describe(`RecordBatchWriter.throughDOM (${name})`, () => {
+
+            describe('file', () => {
+                describe(`convert`, () => {
+                    test('ReadableStream', file.whatwgReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+                    test('ReadableByteStream', file.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+                });
+                describe(`through`, () => {
+                    test('ReadableStream', file.whatwgReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+                    test('ReadableByteStream', file.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchFileWriter)));
+                });
+            });
+
+            describe('stream', () => {
+                describe(`convert`, () => {
+                    test('ReadableStream', stream.whatwgReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+                    test('ReadableByteStream', stream.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchFileWriter)));
+                });
+                describe(`through`, () => {
+                    test('ReadableStream', stream.whatwgReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+                    test('ReadableByteStream', stream.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+                });
+            });
+
+            async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+                const stream = source
+                    .pipeThrough(RecordBatchReader.throughDOM())
+                    .pipeThrough(RBWImplementation.throughDOM());
+                const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+                await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+            }
+
+            async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+                const stream = source
+                    .pipeThrough(RecordBatchReader.throughDOM())
+                    .pipeThrough(RBWImplementation.throughDOM())
+                    .pipeThrough(RecordBatchReader.throughDOM());
+                await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream));
+            }
+        });
+
+        describe(`toDOMStream (${name})`, () => {
+
+            const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+            describe(`RecordBatchJSONWriter`, () => {
+
+                const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+                test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+                test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+                async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+                    const reader = await RecordBatchReader.from(<any> source);
+                    const writer = await RecordBatchJSONWriter.writeAll(reader);
+                    const buffer = await concatBuffersAsync(writer.toDOMStream());
+                    validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+                }
+            });
+
+            describe(`RecordBatchFileWriter`, () => {
+
+                describe(`sync write/read`, () => {
+
+                    test(`Uint8Array`, file.buffer(validate));
+                    test(`Iterable`, file.iterable(validate));
+                    test('AsyncIterable', file.asyncIterable(validate));
+                    test('fs.FileHandle', file.fsFileHandle(validate));
+                    test('fs.ReadStream', file.fsReadableStream(validate));
+                    test('stream.Readable', file.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const reader = await RecordBatchReader.from(source);
+                        const writer = await RecordBatchFileWriter.writeAll(reader);
+                        const stream = await RecordBatchReader.from(writer.toDOMStream());
+                        await validateAsyncRecordBatchReader('file', 3, stream);
+                    }
+                });
+
+                describe(`async write/read`, () => {
+
+                    test(`Uint8Array`, file.buffer(validate));
+                    test(`Iterable`, file.iterable(validate));
+                    test('AsyncIterable', file.asyncIterable(validate));
+                    test('fs.FileHandle', file.fsFileHandle(validate));
+                    test('fs.ReadStream', file.fsReadableStream(validate));
+                    test('stream.Readable', file.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const writer = new RecordBatchFileWriter();
+                        /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+                        const reader = await RecordBatchReader.from(writer.toDOMStream());
+                        await validateAsyncRecordBatchReader('file', 3, reader);
+                    }
+                });
+            });
+
+            describe(`RecordBatchStreamWriter`, () => {
+
+                describe(`sync write/read`, () => {
+
+                    test(`Uint8Array`, stream.buffer(validate));
+                    test(`Iterable`, stream.iterable(validate));
+                    test('AsyncIterable', stream.asyncIterable(validate));
+                    test('fs.FileHandle', stream.fsFileHandle(validate));
+                    test('fs.ReadStream', stream.fsReadableStream(validate));
+                    test('stream.Readable', stream.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const reader = await RecordBatchReader.from(source);
+                        const writer = await RecordBatchStreamWriter.writeAll(reader);
+                        const stream = await RecordBatchReader.from(writer.toDOMStream());
+                        await validateAsyncRecordBatchReader('stream', 3, stream);
+                    }
+                });
+
+                describe(`async write/read`, () => {
+
+                    test(`Uint8Array`, stream.buffer(validate));
+                    test(`Iterable`, stream.iterable(validate));
+                    test('AsyncIterable', stream.asyncIterable(validate));
+                    test('fs.FileHandle', stream.fsFileHandle(validate));
+                    test('fs.ReadStream', stream.fsReadableStream(validate));
+                    test('stream.Readable', stream.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const writer = new RecordBatchStreamWriter();
+                        /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+                        const reader = await RecordBatchReader.from(writer.toDOMStream());
+                        await validateAsyncRecordBatchReader('stream', 3, reader);
+                    }
+                });
+            });
+        });
+    }
+
+    describe(`RecordBatchStreamWriter.throughDOM`, () => {
+
+        const opts = { autoDestroy: false };
+        const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
+        it(`should write a stream of tables to the same output stream`, async () => {
+
+            const tables = [] as Table[];
+            const stream: ReadableStream<any> = from(generateRandomTables([10, 20, 30]))
+                // insert some asynchrony
+                .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+            for await (const reader of RecordBatchReader.readAll(stream)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+
+            expect(tables).toHaveLength(0);
+            expect(stream.locked).toBe(false);
+        });
+
+        it(`should write a stream of record batches to the same output stream`, async () => {
+
+            const tables = [] as Table[];
+            const stream = from(generateRandomTables([10, 20, 30]))
+                // insert some asynchrony
+                .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+                // flatMap from Table -> RecordBatches[]
+                .pipe(flatMap((table) => as(table.chunks)))
+                .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+            for await (const reader of RecordBatchReader.readAll(stream)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+
+            expect(tables).toHaveLength(0);
+            expect(stream.locked).toBe(false);
+        });
+    });
+
+})();
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
new file mode 100644
index 000000000..662129b1b
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    generateRandomTables,
+    // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+import 'ix/Ix.node';
+
+import {
+    Table,
+    RecordBatchReader,
+    RecordBatchWriter,
+    RecordBatchFileWriter,
+    RecordBatchJSONWriter,
+    RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+    ArrowIOTestHelper,
+    concatBuffersAsync
+} from '../helpers';
+
+import {
+    validateRecordBatchReader,
+    validateAsyncRecordBatchReader,
+    validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+    if (process.env.TEST_NODE_STREAMS !== 'true') {
+        return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {});
+    }
+
+    for (const table of generateRandomTables([10, 20, 30])) {
+
+        const file = ArrowIOTestHelper.file(table);
+        const json = ArrowIOTestHelper.json(table);
+        const stream = ArrowIOTestHelper.stream(table);
+        const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+        describe(`RecordBatchWriter.throughNode (${name})`, () => {
+
+            describe('file', () => {
+                describe(`convert`, () => {
+                    test('fs.ReadStream', file.fsReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+                    test('stream.Readable', file.nodeReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+                });
+                describe(`through`, () => {
+                    test('fs.ReadStream', file.fsReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+                    test('stream.Readable', file.nodeReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+                });
+            });
+
+            describe('stream', () => {
+                describe(`convert`, () => {
+                    test('fs.ReadStream', stream.fsReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+                    test('stream.Readable', stream.nodeReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+                });
+                describe(`through`, () => {
+                    test('fs.ReadStream', stream.fsReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+                    test('stream.Readable', stream.nodeReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+                });
+            });
+
+            async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+                const stream = source
+                    .pipe(RecordBatchReader.throughNode())
+                    .pipe(RBWImplementation.throughNode());
+                const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+                await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+            }
+
+            async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+                const stream = source
+                    .pipe(RecordBatchReader.throughNode())
+                    .pipe(RBWImplementation.throughNode())
+                    .pipe(RecordBatchReader.throughNode());
+                await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]());
+            }
+        });
+
+        describe(`toNodeStream (${name})`, () => {
+
+            const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+            describe(`RecordBatchJSONWriter`, () => {
+
+                const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+                test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+                test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+                async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+                    const reader = await RecordBatchReader.from(<any> source);
+                    const writer = await RecordBatchJSONWriter.writeAll(reader);
+                    const buffer = await concatBuffersAsync(writer.toNodeStream());
+                    validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+                }
+            });
+
+            describe(`RecordBatchFileWriter`, () => {
+
+                describe(`sync write/read`, () => {
+
+                    test(`Uint8Array`, file.buffer(validate));
+                    test(`Iterable`, file.iterable(validate));
+                    test('AsyncIterable', file.asyncIterable(validate));
+                    test('fs.FileHandle', file.fsFileHandle(validate));
+                    test('fs.ReadStream', file.fsReadableStream(validate));
+                    test('stream.Readable', file.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const reader = await RecordBatchReader.from(source);
+                        const writer = await RecordBatchFileWriter.writeAll(reader);
+                        const stream = await RecordBatchReader.from(writer.toNodeStream());
+                        await validateAsyncRecordBatchReader('file', 3, stream);
+                    }
+                });
+
+                describe(`async write/read`, () => {
+
+                    test(`Uint8Array`, file.buffer(validate));
+                    test(`Iterable`, file.iterable(validate));
+                    test('AsyncIterable', file.asyncIterable(validate));
+                    test('fs.FileHandle', file.fsFileHandle(validate));
+                    test('fs.ReadStream', file.fsReadableStream(validate));
+                    test('stream.Readable', file.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const writer = new RecordBatchFileWriter();
+                        /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+                        const reader = await RecordBatchReader.from(writer.toNodeStream());
+                        await validateAsyncRecordBatchReader('file', 3, reader);
+                    }
+                });
+            });
+
+            describe(`RecordBatchStreamWriter`, () => {
+
+                describe(`sync write/read`, () => {
+
+                    test(`Uint8Array`, stream.buffer(validate));
+                    test(`Iterable`, stream.iterable(validate));
+                    test('AsyncIterable', stream.asyncIterable(validate));
+                    test('fs.FileHandle', stream.fsFileHandle(validate));
+                    test('fs.ReadStream', stream.fsReadableStream(validate));
+                    test('stream.Readable', stream.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const reader = await RecordBatchReader.from(source);
+                        const writer = await RecordBatchStreamWriter.writeAll(reader);
+                        const stream = await RecordBatchReader.from(writer.toNodeStream());
+                        await validateAsyncRecordBatchReader('stream', 3, stream);
+                    }
+                });
+
+                describe(`async write/read`, () => {
+
+                    test(`Uint8Array`, stream.buffer(validate));
+                    test(`Iterable`, stream.iterable(validate));
+                    test('AsyncIterable', stream.asyncIterable(validate));
+                    test('fs.FileHandle', stream.fsFileHandle(validate));
+                    test('fs.ReadStream', stream.fsReadableStream(validate));
+                    test('stream.Readable', stream.nodeReadableStream(validate));
+                    test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+                    test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+                    test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+                    test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+                    test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+                    test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+                    async function validate(source: any) {
+                        const writer = new RecordBatchStreamWriter();
+                        /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+                        const reader = await RecordBatchReader.from(writer.toNodeStream());
+                        await validateAsyncRecordBatchReader('stream', 3, reader);
+                    }
+                });
+            });
+        });
+    }
+
+    describe(`RecordBatchStreamWriter.throughNode`, () => {
+
+        const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
+        it(`should write a stream of tables to the same output stream`, async () => {
+
+            const tables = [] as Table[];
+            const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+            const stream = from(generateRandomTables([10, 20, 30]))
+                // insert some asynchrony
+                .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+                .pipe(writer);
+
+            for await (const reader of RecordBatchReader.readAll(stream)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+
+            expect(tables).toHaveLength(0);
+            expect(writer.readable).toBe(false);
+            expect((writer as any).destroyed).toBe(true);
+        });
+
+        it(`should write a stream of record batches to the same output stream`, async () => {
+
+            const tables = [] as Table[];
+            const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+            const stream = from(generateRandomTables([10, 20, 30]))
+                // insert some asynchrony
+                .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+                // flatMap from Table -> RecordBatches[]
+                .pipe(flatMap((table) => as(table.chunks)))
+                .pipe(writer);
+
+            for await (const reader of RecordBatchReader.readAll(stream)) {
+                const sourceTable = tables.shift()!;
+                const streamTable = await Table.from(reader);
+                expect(streamTable).toEqualTable(sourceTable);
+            }
+
+            expect(tables).toHaveLength(0);
+            expect(writer.readable).toBe(false);
+            expect((writer as any).destroyed).toBe(true);
+        });
+
+    });
+})();
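A note on the autoDestroy: false option used throughout the multi-table tests above: it keeps a stream writer open after each table, so several logical IPC streams share one output, and RecordBatchReader.readAll then yields one reader per logical stream. A sketch of that pattern outside Jest, built only from calls that appear in this patch (the echoTables helper name and the console output are ours):

    import { Table, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow';

    // Sketch: write several Tables into one IPC byte stream, then read each back.
    async function echoTables(tables: Iterable<Table>) {
        const writer = new RecordBatchStreamWriter({ autoDestroy: false });
        const reading = (async () => {
            // readAll() yields a new RecordBatchReader per logical IPC stream.
            for await (const reader of RecordBatchReader.readAll(writer)) {
                console.log(`read back ${(await Table.from(reader)).length} rows`);
            }
        })();
        for (const table of tables) {
            await writer.writeAll(table); // schema + batches; stream stays open
        }
        writer.close(); // end-of-stream, lets readAll() terminate
        await reading;
    }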