Diffstat (limited to 'src/arrow/js/test/unit/ipc')
-rw-r--r-- | src/arrow/js/test/unit/ipc/helpers.ts | 202
-rw-r--r-- | src/arrow/js/test/unit/ipc/message-reader-tests.ts | 109
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts | 123
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts | 150
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts | 40
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts | 65
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts | 227
-rw-r--r-- | src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts | 219
-rw-r--r-- | src/arrow/js/test/unit/ipc/validate.ts | 74
-rw-r--r-- | src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts | 46
-rw-r--r-- | src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts | 46
-rw-r--r-- | src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts | 119
-rw-r--r-- | src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts | 272
-rw-r--r-- | src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts | 274
14 files changed, 1966 insertions, 0 deletions
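Every suite added here exercises the same round trip: a generated Table is serialized by one of the RecordBatchWriter subclasses, then read back and compared against the source. A minimal sketch of that round trip, using only APIs that appear in these tests (`table` is a stand-in for any populated Table):

    import { Table, RecordBatchStreamWriter } from 'apache-arrow';

    async function roundTrip(table: Table): Promise<Table> {
        // writeAll() serializes the schema and every chunk of the table;
        // toUint8Array() collects the IPC stream into a single buffer
        const bytes = await RecordBatchStreamWriter.writeAll(table).toUint8Array();
        // Table.from() infers the IPC format from the bytes and rebuilds the table
        return Table.from(bytes);
    }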
diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts new file mode 100644 index 000000000..9fccefec9 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/helpers.ts @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; + +import { + Table, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import * as fs from 'fs'; +import { fs as memfs } from 'memfs'; +import { Readable, PassThrough } from 'stream'; +import randomatic from 'randomatic'; + +export abstract class ArrowIOTestHelper { + + constructor(public table: Table) {} + + public static file(table: Table) { return new ArrowFileIOTestHelper(table); } + public static json(table: Table) { return new ArrowJsonIOTestHelper(table); } + public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); } + + protected abstract writer(table: Table): RecordBatchWriter; + protected async filepath(table: Table): Promise<fs.PathLike> { + const path = `/${randomatic('a0', 20)}.arrow`; + const data = await this.writer(table).toUint8Array(); + await memfs.promises.writeFile(path, data); + return path; + } + + buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(await this.writer(this.table).toUint8Array()); + }; + } + iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(chunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> await memfs.promises.open(path, 'r')); + await memfs.promises.unlink(path); + }; + } + fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> memfs.createReadStream(path)); + await memfs.promises.unlink(path); + }; + } + nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const sink = new PassThrough(); + sink.end(await this.writer(this.table).toUint8Array()); + await testFn(sink); + }; + } + whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { 
+ expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path))); + await memfs.promises.unlink(path); + }; + } + whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' })); + await memfs.promises.unlink(path); + }; + } +} + +class ArrowFileIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchFileWriter.writeAll(table); + } +} + +class ArrowJsonIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchJSONWriter.writeAll(table); + } +} + +class ArrowStreamIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchStreamWriter.writeAll(table); + } +} + +export function* chunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function* asyncChunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) { + if (iterator instanceof ReadableStream) { + iterator = readableDOMStreamToAsyncIterator(iterator); + } + let chunks = [], total = 0; + for await (const chunk of iterator) { + chunks.push(chunk); + total += chunk.byteLength; + } + return chunks.reduce((x, buffer) => { + x.buffer.set(buffer, x.offset); + x.offset += buffer.byteLength; + return x; + }, { offset: 0, buffer: new Uint8Array(total) }).buffer; +} + +export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) { + // Get a lock on the stream + const reader = stream.getReader(); + try { + while (true) { + // Read from the stream + const { done, value } = await reader.read(); + // Exit if we're done + if (done) { break; } + // Else yield the chunk + yield value as T; + } + } finally { + try { stream.locked && reader.releaseLock(); } catch (e) {} + } +} + +export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) { + stream = new Readable((stream as any)._readableState).wrap(stream); + return new ReadableStream<T>({ + ...opts, + start(controller) { + stream.pause(); + stream.on('data', (chunk) => { + controller.enqueue(chunk); + stream.pause(); + }); + stream.on('end', () => controller.close()); + stream.on('error', e => controller.error(e)); + }, + pull() { stream.resume(); }, + cancel(reason) { + stream.pause(); + if (typeof (stream as any).cancel === 'function') { + return (stream as any).cancel(reason); + } else if (typeof (stream as any).destroy === 'function') { + return (stream as any).destroy(reason); + } + } + }); +} diff --git a/src/arrow/js/test/unit/ipc/message-reader-tests.ts b/src/arrow/js/test/unit/ipc/message-reader-tests.ts new file mode 100644 index 000000000..c48aa2ce1 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/message-reader-tests.ts @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// import * as fs from 'fs'; +import { + generateRandomTables, + // generateDictionaryTables +} from '../../data/tables'; + +import { ArrowIOTestHelper } from './helpers'; +import { MessageReader, AsyncMessageReader } from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + const numMessages = table.chunks.reduce((numMessages, batch) => { + return numMessages + + /* recordBatch message */ 1 + + /* dictionary messages */ batch.dictionaries.size; + }, /* schema message */ 1); + + const validate = validateMessageReader.bind(0, numMessages); + const validateAsync = validateAsyncMessageReader.bind(0, numMessages); + + describe(`MessageReader (${name})`, () => { + describe(`should read all Messages`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + }); + + describe(`AsyncMessageReader (${name})`, () => { + describe(`should read all Messages`, () => { + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + }); + }); +} + +export function validateMessageReader(numMessages: number, source: any) { + const reader = new MessageReader(source); + let index = 0; + for (let message of reader) { + + if (index === 0) { + expect(message.isSchema()).toBe(true); + expect(message.bodyLength).toBe(0); + } else { + expect(message.isSchema()).toBe(false); + expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true); + } + + try { + expect(message.bodyLength % 8).toBe(0); + } catch (e) { throw new Error(`bodyLength: ${e}`); } + + const body = reader.readMessageBody(message.bodyLength); + expect(body).toBeInstanceOf(Uint8Array); + expect(body.byteLength).toBe(message.bodyLength); + expect(index++).toBeLessThan(numMessages); + } + expect(index).toBe(numMessages); + reader.return(); +} + +export async function validateAsyncMessageReader(numMessages: number, source: any) { + const reader = new AsyncMessageReader(source); + let index = 0; + for await (let message of reader) { + + if (index === 0) { + expect(message.isSchema()).toBe(true); + expect(message.bodyLength).toBe(0); + } else { + expect(message.isSchema()).toBe(false); + expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true); + } + + try { + expect(message.bodyLength % 8).toBe(0); + } catch (e) { throw new Error(`bodyLength: ${e}`); } + + const body = await 
reader.readMessageBody(message.bodyLength); + expect(body).toBeInstanceOf(Uint8Array); + expect(body.byteLength).toBe(message.bodyLength); + expect(index++).toBeLessThan(numMessages); + } + expect(index).toBe(numMessages); + await reader.return(); +} diff --git a/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts new file mode 100644 index 000000000..a7ddfc940 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; +import { ArrowIOTestHelper } from '../helpers'; +import { toArray } from 'ix/asynciterable/toarray'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader +} from '../validate'; + +import { + RecordBatchReader, + RecordBatchFileReader, + AsyncRecordBatchFileReader +} from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.file(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + const validate = (source: any) => { validateRecordBatchReader('file', 3, RecordBatchReader.from(source)); }; + const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(source)); }; + const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(Promise.resolve(source))); }; + + describe(`RecordBatchFileReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => { + test(`Uint8Array`, io.buffer(validateRandomAccess)); + test(`Iterable`, io.iterable(validateRandomAccess)); + }); + }); + + describe(`AsyncRecordBatchFileReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped)); + test('Promise<stream.Readable>', 
io.nodeReadableStream(validateAsyncWrapped)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped)); + }); + + describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => { + + test('AsyncIterable', io.asyncIterable(validateRandomAccessAsync)); + test('fs.FileHandle', io.fsFileHandle(validateRandomAccessAsync)); + test('fs.ReadStream', io.fsReadableStream(validateRandomAccessAsync)); + test('stream.Readable', io.nodeReadableStream(validateRandomAccessAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateRandomAccessAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateRandomAccessAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateRandomAccessAsync)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateRandomAccessAsync)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateRandomAccessAsync)); + test('Promise<stream.Readable>', io.nodeReadableStream(validateRandomAccessAsync)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateRandomAccessAsync)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateRandomAccessAsync)); + }); + }); +} + +function validateRandomAccess(source: any) { + const reader = RecordBatchReader.from(source) as RecordBatchFileReader; + const schema = reader.open({ autoDestroy: false }).schema; + const batches = [...reader]; + expect(reader.closed).toBe(false); + expect(reader.schema).toBe(schema); + while (batches.length > 0) { + const expected = batches.pop()!; + const actual = reader.readRecordBatch(batches.length); + expect(actual).toEqualRecordBatch(expected); + } + reader.cancel(); + expect(reader.closed).toBe(true); + expect(reader.schema).toBeUndefined(); +} + +async function validateRandomAccessAsync(source: any) { + const reader = (await RecordBatchReader.from(source)) as AsyncRecordBatchFileReader; + const schema = (await reader.open({ autoDestroy: false })).schema; + const batches = await toArray(reader); + expect(reader.closed).toBe(false); + expect(reader.schema).toBe(schema); + while (batches.length > 0) { + const expected = batches.pop()!; + const actual = await reader.readRecordBatch(batches.length); + expect(actual).toEqualRecordBatch(expected); + } + await reader.cancel(); + expect(reader.closed).toBe(true); + expect(reader.schema).toBeUndefined(); +} diff --git a/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts new file mode 100644 index 000000000..c444b78fc --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { ArrowIOTestHelper } from '../helpers'; +import { + RecordBatchReader, + RecordBatchFileReader, + RecordBatchStreamReader, + AsyncRecordBatchFileReader, + AsyncRecordBatchStreamReader +} from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + // eslint-disable-next-line jest/valid-describe + describe('RecordBatchReader.from', ((table, name) => () => { + testFromFile(ArrowIOTestHelper.file(table), name); + testFromJSON(ArrowIOTestHelper.json(table), name); + testFromStream(ArrowIOTestHelper.stream(table), name); + })(table, name)); +} + +function testFromJSON(io: ArrowIOTestHelper, name: string) { + describe(`should return a RecordBatchJSONReader (${name})`, () => { + test(`Uint8Array`, io.buffer((buffer) => { + const json = JSON.parse(`${Buffer.from(buffer)}`); + const reader = RecordBatchReader.from(json); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + })); + }); +} + +function testFromFile(io: ArrowIOTestHelper, name: string) { + + describe(`should return a RecordBatchFileReader (${name})`, () => { + + test(`Uint8Array`, io.buffer(syncSync)); + test(`Iterable`, io.iterable(syncSync)); + test('AsyncIterable', io.asyncIterable(asyncSync)); + test('fs.FileHandle', io.fsFileHandle(asyncAsync)); + test('fs.ReadStream', io.fsReadableStream(asyncSync)); + test('stream.Readable', io.nodeReadableStream(asyncSync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(asyncSync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncSync)); + + test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source)))); + test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncSync(Promise.resolve(source)))); + }); + + function syncSync(source: any) { + const reader = RecordBatchReader.from(source); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchFileReader); + } + + async function asyncSync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchFileReader); + } + + async function asyncAsync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(false); + expect(reader.isAsync()).toEqual(true); + 
expect(reader).toBeInstanceOf(AsyncRecordBatchFileReader); + } +} + +function testFromStream(io: ArrowIOTestHelper, name: string) { + + describe(`should return a RecordBatchStreamReader (${name})`, () => { + + test(`Uint8Array`, io.buffer(syncSync)); + test(`Iterable`, io.iterable(syncSync)); + test('AsyncIterable', io.asyncIterable(asyncAsync)); + test('fs.FileHandle', io.fsFileHandle(asyncAsync)); + test('fs.ReadStream', io.fsReadableStream(asyncAsync)); + test('stream.Readable', io.nodeReadableStream(asyncAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(asyncAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncAsync)); + + test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source)))); + test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncAsync(Promise.resolve(source)))); + }); + + function syncSync(source: any) { + const reader = RecordBatchReader.from(source); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + } + + async function asyncSync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + } + + async function asyncAsync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(false); + expect(reader.isAsync()).toEqual(true); + expect(reader).toBeInstanceOf(AsyncRecordBatchStreamReader); + } +} diff --git a/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts new file mode 100644 index 000000000..9bd1e3466 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
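The JSON reader tests that follow feed the reader the parsed integration-test JSON produced by RecordBatchJSONWriter rather than raw IPC bytes. A sketch of that path, assuming `buffer` holds the JSON writer's output:

    import { RecordBatchReader } from 'apache-arrow';

    const json = JSON.parse(Buffer.from(buffer).toString());
    const reader = RecordBatchReader.from(json); // synchronous for in-memory JSON
    for (const batch of reader) {
        // each batch is a RecordBatch rebuilt from the JSON payload
    }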
+ +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { ArrowIOTestHelper } from '../helpers'; +import { RecordBatchReader } from 'apache-arrow'; +import { validateRecordBatchReader } from '../validate'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.json(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchJSONReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer((buffer) => { + const json = JSON.parse(Buffer.from(buffer).toString()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(json)); + })); + }); + }); +} diff --git a/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts new file mode 100644 index 000000000..23879cf79 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
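The split between RecordBatchStreamReader and AsyncRecordBatchStreamReader in the next suite mirrors the source type: RecordBatchReader.from() returns a reader directly for in-memory sources and a Promise of one for asynchronous sources. A sketch of the asynchronous path (the file path is hypothetical):

    import * as fs from 'fs';
    import { RecordBatchReader } from 'apache-arrow';

    async function readBatches(path: string) {
        // from() resolves to an AsyncRecordBatchStreamReader for stream sources
        const reader = await RecordBatchReader.from(fs.createReadStream(path));
        for await (const batch of reader) {
            // batches arrive incrementally as the stream is consumed
        }
    }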
+ +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader +} from '../validate'; + +import { ArrowIOTestHelper } from '../helpers'; +import { RecordBatchReader } from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + const validate = (source: any) => { validateRecordBatchReader('stream', 3, RecordBatchReader.from(source)); }; + const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(source)); }; + const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(Promise.resolve(source))); }; + + describe(`RecordBatchStreamReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + }); + + describe(`AsyncRecordBatchStreamReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped)); + test('Promise<stream.Readable>', io.nodeReadableStream(validateAsyncWrapped)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped)); + }); + }); +} diff --git a/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts new file mode 100644 index 000000000..a380e1619 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
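The throughDOM() tests below treat the reader as a WhatWG transform. A sketch, assuming `source` is a ReadableStream of IPC stream bytes:

    import { RecordBatchReader } from 'apache-arrow';

    declare const source: ReadableStream<Uint8Array>; // assumed IPC byte stream
    // throughDOM() returns a { writable, readable } pair, so the byte stream can
    // be piped through it and RecordBatches come out the readable side
    const batches = source.pipeThrough(RecordBatchReader.throughDOM());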
+ +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { + Table, + RecordBatchReader, + RecordBatchStreamWriter +} from 'apache-arrow'; + +import { validateRecordBatchAsyncIterator } from '../validate'; +import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'; + +(() => { + + if (process.env.TEST_DOM_STREAMS !== 'true') { + return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchReader.throughDOM (${name})`, () => { + describe('file', () => { + test('ReadableStream', file.whatwgReadableStream(validate)); + test('ReadableByteStream', file.whatwgReadableByteStream(validate)); + }); + describe('stream', () => { + test('ReadableStream', stream.whatwgReadableStream(validate)); + test('ReadableByteStream', stream.whatwgReadableByteStream(validate)); + }); + async function validate(source: ReadableStream) { + const stream = source.pipeThrough(RecordBatchReader.throughDOM()); + await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream)); + } + }); + + describe(`toDOMStream (${name})`, () => { + + describe(`RecordBatchJSONReader`, () => { + test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`)))); + }); + + describe(`RecordBatchFileReader`, () => { + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + describe(`RecordBatchStreamReader`, () => { + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => 
validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + async function validate(source: any) { + const reader: RecordBatchReader = await RecordBatchReader.from(source); + const iterator = readableDOMStreamToAsyncIterator(reader.toDOMStream()); + await validateRecordBatchAsyncIterator(3, iterator); + } + }); + } + + it('readAll() should pipe to separate WhatWG WritableStreams', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader: RecordBatchReader | undefined; + + for await (reader of RecordBatchReader.readAll(stream)) { + + validateStreamState(reader, stream, false); + + const output = reader + .pipeThrough(RecordBatchStreamWriter.throughDOM()) + .pipeThrough(new TransformStream()); + + validateStreamState(reader, output, false, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(output); + expect(streamTable).toEqualTable(sourceTable); + expect(output.locked).toBe(false); + } + + expect(reader).toBeDefined(); + validateStreamState(reader!, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should close the underlying WhatWG ReadableStream when reading multiple tables and we break early', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 
'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + let batchIndex = -1; + const sourceTable = tables[++tableIndex]; + const breakEarly = tableIndex === (tables.length / 2 | 0); + + for await (const streamBatch of reader) { + expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]); + if (breakEarly && batchIndex === 1) { break; } + } + if (breakEarly) { + // the reader should stay open until we break from the outermost loop + validateStreamState(reader, stream, false); + break; + } + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length / 2 | 0); + }); +})(); + +function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) { + expect(reader.closed).toBe(closed); + expect(stream.locked).toBe(locked); +} diff --git a/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts new file mode 100644 index 000000000..822f99350 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
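The Node equivalent below uses throughNode(), a Duplex stream: IPC bytes are piped in and RecordBatches come out, consumable through the async iterator protocol. A sketch, assuming `source` is any NodeJS.ReadableStream of IPC bytes:

    import { RecordBatchReader } from 'apache-arrow';

    declare const source: NodeJS.ReadableStream; // assumed IPC byte stream
    async function consume() {
        const batches = source.pipe(RecordBatchReader.throughNode());
        for await (const batch of batches) {
            // each chunk emitted by the transform is a RecordBatch
        }
    }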
+
+import {
+    generateRandomTables
+} from '../../../data/tables';
+
+import {
+    Table,
+    RecordBatchReader,
+    RecordBatchStreamWriter
+} from 'apache-arrow';
+
+import { ArrowIOTestHelper } from '../helpers';
+import { validateRecordBatchAsyncIterator } from '../validate';
+
+(() => {
+
+    if (process.env.TEST_NODE_STREAMS !== 'true') {
+        return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {});
+    }
+
+    for (const table of generateRandomTables([10, 20, 30])) {
+
+        const file = ArrowIOTestHelper.file(table);
+        const json = ArrowIOTestHelper.json(table);
+        const stream = ArrowIOTestHelper.stream(table);
+        const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+        describe(`RecordBatchReader.throughNode (${name})`, () => {
+            describe('file', () => {
+                test('fs.ReadStream', file.fsReadableStream(validate));
+                test('stream.Readable', file.nodeReadableStream(validate));
+            });
+            describe('stream', () => {
+                test('fs.ReadStream', stream.fsReadableStream(validate));
+                test('stream.Readable', stream.nodeReadableStream(validate));
+            });
+            async function validate(source: NodeJS.ReadableStream) {
+                const stream = source.pipe(RecordBatchReader.throughNode());
+                await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]());
+            }
+        });
+
+        describe(`toNodeStream (${name})`, () => {
+
+            describe(`RecordBatchJSONReader`, () => {
+                test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`))));
+            });
+
+            describe(`RecordBatchFileReader`, () => {
+                test(`Uint8Array`, file.buffer(validate));
+                test(`Iterable`, file.iterable(validate));
+                test('AsyncIterable', file.asyncIterable(validate));
+                test('fs.FileHandle', file.fsFileHandle(validate));
+                test('fs.ReadStream', file.fsReadableStream(validate));
+                test('stream.Readable', file.nodeReadableStream(validate));
+                test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+                test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+                test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source))));
+                test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source))));
+                test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source))));
+                test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source))));
+                test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source))));
+                test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
+            });
+
+            describe(`RecordBatchStreamReader`, () => {
+                test(`Uint8Array`, stream.buffer(validate));
+                test(`Iterable`, stream.iterable(validate));
+                test('AsyncIterable', stream.asyncIterable(validate));
+                test('fs.FileHandle', stream.fsFileHandle(validate));
+                test('fs.ReadStream', stream.fsReadableStream(validate));
+                test('stream.Readable', stream.nodeReadableStream(validate));
+                test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+                test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+                test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source))));
+                test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source))));
+                test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source))));
+                test('Promise<stream.Readable>',
stream.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + async function validate(source: any) { + const reader: RecordBatchReader = await RecordBatchReader.from(source); + await validateRecordBatchAsyncIterator(3, reader.toNodeStream()[Symbol.asyncIterator]()); + } + }); + } + + it('readAll() should pipe to separate NodeJS WritableStreams', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + const { PassThrough } = await import('stream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader: RecordBatchReader | undefined; + + for await (reader of RecordBatchReader.readAll(stream)) { + + validateStreamState(reader, stream, false); + + const output = reader + .pipe(RecordBatchStreamWriter.throughNode()) + .pipe(new PassThrough()); + + validateStreamState(reader, output, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(output); + expect(streamTable).toEqualTable(sourceTable); + expect(Boolean(output.readableFlowing)).toBe(false); + } + + expect(reader).toBeDefined(); + validateStreamState(reader!, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should not close the underlying NodeJS ReadableStream when reading multiple tables to completion', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should close the underlying NodeJS ReadableStream when reading multiple tables and we break early', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + let batchIndex = -1; + const sourceTable = tables[++tableIndex]; + const breakEarly = tableIndex === (tables.length / 2 | 0); + + for await (const streamBatch of reader) { + expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]); + if (breakEarly && batchIndex === 1) { break; } + } + 
if (breakEarly) { + // the reader should stay open until we break from the outermost loop + validateStreamState(reader, stream, false); + break; + } + } + + validateStreamState(reader, stream, true, true); + expect(tableIndex).toBe(tables.length / 2 | 0); + }); +})(); + +function validateStreamState(reader: RecordBatchReader, stream: NodeJS.ReadableStream, closed: boolean, readable = !closed) { + expect(reader.closed).toBe(closed); + expect(Boolean(stream.readable)).toBe(readable); + expect(Boolean((stream as any).destroyed)).toBe(closed); + expect(Boolean((stream as any).readableFlowing)).toBe(false); +} diff --git a/src/arrow/js/test/unit/ipc/validate.ts b/src/arrow/js/test/unit/ipc/validate.ts new file mode 100644 index 000000000..aedf87a2d --- /dev/null +++ b/src/arrow/js/test/unit/ipc/validate.ts @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; + +import { + Schema, + RecordBatch, + RecordBatchReader, + RecordBatchFileReader, + RecordBatchStreamReader, +} from 'apache-arrow'; + +export function validateRecordBatchReader<T extends RecordBatchFileReader | RecordBatchStreamReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) { + const reader = r.open(); + expect(reader).toBeInstanceOf(RecordBatchReader); + expect(type === 'file' ? reader.isFile() : reader.isStream()).toBe(true); + expect(reader.schema).toBeInstanceOf(Schema); + validateRecordBatchIterator(numBatches, reader[Symbol.iterator]()); + expect(reader.closed).toBe(reader.autoDestroy); + return reader; +} + +export async function validateAsyncRecordBatchReader<T extends RecordBatchReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) { + const reader = await r.open(); + expect(reader).toBeInstanceOf(RecordBatchReader); + expect(reader.schema).toBeInstanceOf(Schema); + expect(type === 'file' ? 
reader.isFile() : reader.isStream()).toBe(true); + await validateRecordBatchAsyncIterator(numBatches, reader[Symbol.asyncIterator]()); + expect(reader.closed).toBe(reader.autoDestroy); + return reader; +} + +export function validateRecordBatchIterator(numBatches: number, iterator: Iterable<RecordBatch> | IterableIterator<RecordBatch>) { + let i = 0; + try { + for (const recordBatch of iterator) { + expect(recordBatch).toBeInstanceOf(RecordBatch); + expect(i++).toBeLessThan(numBatches); + } + } catch (e) { throw new Error(`${i}: ${e}`); } + expect(i).toBe(numBatches); + if (typeof (iterator as any).return === 'function') { + (iterator as any).return(); + } +} + +export async function validateRecordBatchAsyncIterator(numBatches: number, iterator: AsyncIterable<RecordBatch> | AsyncIterableIterator<RecordBatch>) { + let i = 0; + try { + for await (const recordBatch of iterator) { + expect(recordBatch).toBeInstanceOf(RecordBatch); + expect(i++).toBeLessThan(numBatches); + } + } catch (e) { throw new Error(`${i}: ${e}`); } + expect(i).toBe(numBatches); + if (typeof (iterator as any).return === 'function') { + await (iterator as any).return(); + } +} diff --git a/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts new file mode 100644 index 000000000..fa639e5f6 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
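The file-format writer tested next adds the magic bytes and footer needed for random access on top of the stream format. A sketch of the serialization step these tests validate:

    import { Table, RecordBatchFileWriter } from 'apache-arrow';

    async function toFileFormat(table: Table): Promise<Uint8Array> {
        // writeAll() emits the schema, the record batches, and the closing file footer
        return RecordBatchFileWriter.writeAll(table).toUint8Array();
    }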
+ +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import { validateRecordBatchIterator } from '../validate'; +import { Table, RecordBatchFileWriter } from 'apache-arrow'; + +describe('RecordBatchFileWriter', () => { + for (const table of generateRandomTables([10, 20, 30])) { + testFileWriter(table, `[${table.schema.fields.join(', ')}]`); + } + for (const table of generateDictionaryTables([10, 20, 30])) { + testFileWriter(table, `${table.schema.fields[0]}`); + } +}); + +function testFileWriter(table: Table, name: string) { + describe(`should write the Arrow IPC file format (${name})`, () => { + test(`Table`, validateTable.bind(0, table)); + }); +} + +async function validateTable(source: Table) { + const writer = RecordBatchFileWriter.writeAll(source); + const result = await Table.from(writer.toUint8Array()); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts new file mode 100644 index 000000000..05be0e272 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import { validateRecordBatchIterator } from '../validate'; +import { Table, RecordBatchJSONWriter } from 'apache-arrow'; + +describe('RecordBatchJSONWriter', () => { + for (const table of generateRandomTables([10, 20, 30])) { + testJSONWriter(table, `[${table.schema.fields.join(', ')}]`); + } + for (const table of generateDictionaryTables([10, 20, 30])) { + testJSONWriter(table, `${table.schema.fields[0]}`); + } +}); + +function testJSONWriter(table: Table, name: string) { + describe(`should write the Arrow IPC JSON format (${name})`, () => { + test(`Table`, validateTable.bind(0, table)); + }); +} + +async function validateTable(source: Table) { + const writer = RecordBatchJSONWriter.writeAll(source); + const result = Table.from(JSON.parse(await writer.toString())); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts new file mode 100644 index 000000000..a83aa39da --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import * as generate from '../../../generate-test-data'; +import { validateRecordBatchIterator } from '../validate'; +import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer'; +import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow'; +import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow'; + +describe('RecordBatchStreamWriter', () => { + + (() => { + const type = generate.sparseUnion(0, 0).vector.type; + const schema = new Schema([new Field('dictSparseUnion', type)]); + const table = generate.table([10, 20, 30], schema).table; + const testName = `[${table.schema.fields.join(', ')}]`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + })(); + + for (const table of generateRandomTables([10, 20, 30])) { + const testName = `[${table.schema.fields.join(', ')}]`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + } + + for (const table of generateDictionaryTables([10, 20, 30])) { + const testName = `${table.schema.fields[0]}`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + } + + it(`should write multiple tables to the same output stream`, async () => { + const tables = [] as Table[]; + const writer = new RecordBatchStreamWriter({ autoDestroy: false }); + const validate = (async () => { + for await (const reader of RecordBatchReader.readAll(writer)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + })(); + for (const table of generateRandomTables([10, 20, 30])) { + tables.push(table); + await writer.writeAll((async function*() { + for (const chunk of table.chunks) { + yield chunk; // insert some asynchrony + await new Promise((r) => setTimeout(r, 1)); + } + }())); + } + writer.close(); + await validate; + }); + + it('should write delta dictionary batches', async () => { + + const name = 'dictionary_encoded_uint32'; + const chunks: DictionaryVector<Uint32, Int32>[] = []; + const { + vector: sourceVector, values: sourceValues, + } = generate.dictionary(1000, 20, new Uint32(), new Int32()); + + const writer = RecordBatchStreamWriter.writeAll((function* () { + const transform = Builder.throughIterable({ + type: sourceVector.type, nullValues: [null], + queueingStrategy: 'count', highWaterMark: 50, + }); + for (const chunk of transform(sourceValues())) { + chunks.push(chunk); + yield RecordBatch.new({ [name]: chunk }); + } + })()); + + expect(Chunked.concat(chunks)).toEqualVector(sourceVector); + + type T = { [name]: Dictionary<Uint32, Int32> }; + const 
sourceTable = Table.new({ [name]: sourceVector }); + const resultTable = await Table.from<T>(writer.toUint8Array()); + + const { dictionary } = resultTable.getColumn(name); + + expect(resultTable).toEqualTable(sourceTable); + expect((dictionary as Chunked)).toBeInstanceOf(Chunked); + expect((dictionary as Chunked).chunks).toHaveLength(20); + }); +}); + +function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) { + describe(`should write the Arrow IPC stream format (${name})`, () => { + test(`Table`, validateTable.bind(0, table, options)); + }); +} + +async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) { + const writer = RecordBatchStreamWriter.writeAll(source, options); + const result = await Table.from(writer.toUint8Array()); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts new file mode 100644 index 000000000..a19ddcdd7 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
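+
+// These tests round-trip each generated table through the WHATWG-stream
+// transforms. A minimal sketch of the conversion step they exercise (the
+// `source: ReadableStream<Uint8Array>` of Arrow IPC bytes is an assumed
+// input; the transform and reader calls are the ones used below):
+//
+//     const out = source
+//         .pipeThrough(RecordBatchReader.throughDOM())      // bytes -> RecordBatches
+//         .pipeThrough(RecordBatchFileWriter.throughDOM()); // RecordBatches -> file-format bytes
+//     const reader = await RecordBatchReader.from(out);     // re-read to verify the round trip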
+ +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { from, as } from 'ix/asynciterable'; +import { tap, flatMap } from 'ix/asynciterable/operators'; + +import { + Table, + RecordBatchReader, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import { + ArrowIOTestHelper, + concatBuffersAsync, + readableDOMStreamToAsyncIterator +} from '../helpers'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader, + validateRecordBatchAsyncIterator +} from '../validate'; + +(() => { + + if (process.env.TEST_DOM_STREAMS !== 'true') { + return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchWriter.throughDOM (${name})`, () => { + + describe('file', () => { + describe(`convert`, () => { + test('ReadableStream', file.whatwgReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + test('ReadableByteStream', file.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchStreamWriter))); + }); + describe(`through`, () => { + test('ReadableStream', file.whatwgReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + test('ReadableByteStream', file.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchFileWriter))); + }); + }); + + describe('stream', () => { + describe(`convert`, () => { + test('ReadableStream', stream.whatwgReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + test('ReadableByteStream', stream.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchFileWriter))); + }); + describe(`through`, () => { + test('ReadableStream', stream.whatwgReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + test('ReadableByteStream', stream.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchStreamWriter))); + }); + }); + + async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) { + const stream = source + .pipeThrough(RecordBatchReader.throughDOM()) + .pipeThrough(RBWImplementation.throughDOM()); + const type = RBWImplementation === RecordBatchFileWriter ? 
'file' : 'stream'; + await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream)); + } + + async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) { + const stream = source + .pipeThrough(RecordBatchReader.throughDOM()) + .pipeThrough(RBWImplementation.throughDOM()) + .pipeThrough(RecordBatchReader.throughDOM()); + await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream)); + } + }); + + describe(`toDOMStream (${name})`, () => { + + const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x)); + + describe(`RecordBatchJSONWriter`, () => { + + const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`); + + test('Uint8Array', json.buffer((source) => validate(toJSON(source)))); + test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source))))); + + async function validate(source: { schema: any } | Promise<{ schema: any }>) { + const reader = await RecordBatchReader.from(<any> source); + const writer = await RecordBatchJSONWriter.writeAll(reader); + const buffer = await concatBuffersAsync(writer.toDOMStream()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer))); + } + }); + + describe(`RecordBatchFileWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchFileWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('file', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', 
file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchFileWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('file', 3, reader); + } + }); + }); + + describe(`RecordBatchStreamWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchStreamWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('stream', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchStreamWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('stream', 3, reader); + } + }); + }); + }); + } + + describe(`RecordBatchStreamWriter.throughDOM`, () => { + + const opts = { autoDestroy: false }; + const sleep = (n: number) => new Promise((r) => setTimeout(r, n)); + + it(`should write a stream of tables to the same output stream`, async () => { + + const tables = [] as Table[]; + const stream: 
ReadableStream<any> = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(stream.locked).toBe(false); + }); + + it(`should write a stream of record batches to the same output stream`, async () => { + + const tables = [] as Table[]; + const stream = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + // flatMap from Table -> RecordBatches[] + .pipe(flatMap((table) => as(table.chunks))) + .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(stream.locked).toBe(false); + }); + }); + +})(); diff --git a/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts new file mode 100644 index 000000000..662129b1b --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
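+
+// These tests mirror the DOM-stream suite above using Node streams. A
+// minimal sketch of the pipeline they exercise (the `source:
+// NodeJS.ReadableStream` of Arrow IPC bytes is an assumed input; the
+// transform and reader calls are the ones used below):
+//
+//     const out = source
+//         .pipe(RecordBatchReader.throughNode())       // bytes -> RecordBatches
+//         .pipe(RecordBatchFileWriter.throughNode());  // RecordBatches -> file-format bytes
+//     const reader = await RecordBatchReader.from(out); // re-read to verify the round trip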
+ +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { from, as } from 'ix/asynciterable'; +import { tap, flatMap } from 'ix/asynciterable/operators'; +import 'ix/Ix.node'; + +import { + Table, + RecordBatchReader, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import { + ArrowIOTestHelper, + concatBuffersAsync +} from '../helpers'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader, + validateRecordBatchAsyncIterator +} from '../validate'; + +(() => { + + if (process.env.TEST_NODE_STREAMS !== 'true') { + return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchWriter.throughNode (${name})`, () => { + + describe('file', () => { + describe(`convert`, () => { + test('fs.ReadStream', file.fsReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + test('stream.Readable', file.nodeReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + }); + describe(`through`, () => { + test('fs.ReadStream', file.fsReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + test('stream.Readable', file.nodeReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + }); + }); + + describe('stream', () => { + describe(`convert`, () => { + test('fs.ReadStream', stream.fsReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + test('stream.Readable', stream.nodeReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + }); + describe(`through`, () => { + test('fs.ReadStream', stream.fsReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + test('stream.Readable', stream.nodeReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + }); + }); + + async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) { + const stream = source + .pipe(RecordBatchReader.throughNode()) + .pipe(RBWImplementation.throughNode()); + const type = RBWImplementation === RecordBatchFileWriter ? 
'file' : 'stream'; + await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream)); + } + + async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) { + const stream = source + .pipe(RecordBatchReader.throughNode()) + .pipe(RBWImplementation.throughNode()) + .pipe(RecordBatchReader.throughNode()); + await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]()); + } + }); + + describe(`toNodeStream (${name})`, () => { + + const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x)); + + describe(`RecordBatchJSONWriter`, () => { + + const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`); + + test('Uint8Array', json.buffer((source) => validate(toJSON(source)))); + test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source))))); + + async function validate(source: { schema: any } | Promise<{ schema: any }>) { + const reader = await RecordBatchReader.from(<any> source); + const writer = await RecordBatchJSONWriter.writeAll(reader); + const buffer = await concatBuffersAsync(writer.toNodeStream()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer))); + } + }); + + describe(`RecordBatchFileWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchFileWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('file', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', 
file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchFileWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('file', 3, reader); + } + }); + }); + + describe(`RecordBatchStreamWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchStreamWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('stream', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchStreamWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('stream', 3, reader); + } + }); + }); + }); + } + + describe(`RecordBatchStreamWriter.throughNode`, () => { + + const sleep = (n: number) => new Promise((r) => setTimeout(r, n)); + + it(`should write a stream of tables to the same output stream`, async () => { + + const tables = [] as Table[]; + const writer = 
RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+        const stream = from(generateRandomTables([10, 20, 30]))
+            // insert some asynchrony
+            .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+            .pipe(writer);
+
+        for await (const reader of RecordBatchReader.readAll(stream)) {
+            const sourceTable = tables.shift()!;
+            const streamTable = await Table.from(reader);
+            expect(streamTable).toEqualTable(sourceTable);
+        }
+
+        expect(tables).toHaveLength(0);
+        expect(writer.readable).toBe(false);
+        expect((writer as any).destroyed).toBe(true);
+    });
+
+    it(`should write a stream of record batches to the same output stream`, async () => {
+
+        const tables = [] as Table[];
+        const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+        const stream = from(generateRandomTables([10, 20, 30]))
+            // insert some asynchrony
+            .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+            .pipe(flatMap((table) => as(table.chunks)))
+            .pipe(writer);
+
+        for await (const reader of RecordBatchReader.readAll(stream)) {
+            const sourceTable = tables.shift()!;
+            const streamTable = await Table.from(reader);
+            expect(streamTable).toEqualTable(sourceTable);
+        }
+
+        expect(tables).toHaveLength(0);
+        expect(writer.readable).toBe(false);
+        expect((writer as any).destroyed).toBe(true);
+    });
+
+    });
+})();