Diffstat (limited to 'src/arrow/js/test/unit/ipc')
-rw-r--r--  src/arrow/js/test/unit/ipc/helpers.ts                      | 202
-rw-r--r--  src/arrow/js/test/unit/ipc/message-reader-tests.ts         | 109
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts     | 123
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts  | 150
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts     |  40
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts   |  65
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts     | 227
-rw-r--r--  src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts    | 219
-rw-r--r--  src/arrow/js/test/unit/ipc/validate.ts                     |  74
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts     |  46
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts     |  46
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts   | 119
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts     | 272
-rw-r--r--  src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts    | 274
14 files changed, 1966 insertions(+), 0 deletions(-)
diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts
new file mode 100644
index 000000000..9fccefec9
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/helpers.ts
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import '../../jest-extensions';
+
+import {
+ Table,
+ RecordBatchWriter,
+ RecordBatchFileWriter,
+ RecordBatchJSONWriter,
+ RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import * as fs from 'fs';
+import { fs as memfs } from 'memfs';
+import { Readable, PassThrough } from 'stream';
+import randomatic from 'randomatic';
+
+export abstract class ArrowIOTestHelper {
+
+ constructor(public table: Table) {}
+
+ public static file(table: Table) { return new ArrowFileIOTestHelper(table); }
+ public static json(table: Table) { return new ArrowJsonIOTestHelper(table); }
+ public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); }
+
+ protected abstract writer(table: Table): RecordBatchWriter;
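+ // Serializes the table with the subclass's writer and stages the bytes in an
+ // in-memory filesystem (memfs), so fs-based readers can be tested hermetically.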
+ protected async filepath(table: Table): Promise<fs.PathLike> {
+ const path = `/${randomatic('a0', 20)}.arrow`;
+ const data = await this.writer(table).toUint8Array();
+ await memfs.promises.writeFile(path, data);
+ return path;
+ }
+
+ buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ await testFn(await this.writer(this.table).toUint8Array());
+ };
+ }
+ iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ await testFn(chunkedIterable(await this.writer(this.table).toUint8Array()));
+ };
+ }
+ asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array()));
+ };
+ }
+ fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ const path = await this.filepath(this.table);
+ await testFn(<any> await memfs.promises.open(path, 'r'));
+ await memfs.promises.unlink(path);
+ };
+ }
+ fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ const path = await this.filepath(this.table);
+ await testFn(<any> memfs.createReadStream(path));
+ await memfs.promises.unlink(path);
+ };
+ }
+ nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ const sink = new PassThrough();
+ sink.end(await this.writer(this.table).toUint8Array());
+ await testFn(sink);
+ };
+ }
+ whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ const path = await this.filepath(this.table);
+ await testFn(nodeToDOMStream(memfs.createReadStream(path)));
+ await memfs.promises.unlink(path);
+ };
+ }
+ whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
+ return async () => {
+ expect.hasAssertions();
+ const path = await this.filepath(this.table);
+ await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' }));
+ await memfs.promises.unlink(path);
+ };
+ }
+}
+
+class ArrowFileIOTestHelper extends ArrowIOTestHelper {
+ constructor(table: Table) { super(table); }
+ protected writer(table: Table) {
+ return RecordBatchFileWriter.writeAll(table);
+ }
+}
+
+class ArrowJsonIOTestHelper extends ArrowIOTestHelper {
+ constructor(table: Table) { super(table); }
+ protected writer(table: Table) {
+ return RecordBatchJSONWriter.writeAll(table);
+ }
+}
+
+class ArrowStreamIOTestHelper extends ArrowIOTestHelper {
+ constructor(table: Table) { super(table); }
+ protected writer(table: Table) {
+ return RecordBatchStreamWriter.writeAll(table);
+ }
+}
+
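+// Yields `buffer` in chunks; the caller can pick each chunk's byte length via
+// the value passed to `next(size)`. A non-numeric size (e.g. plain iteration)
+// yields the rest of the buffer in one piece.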
+export function* chunkedIterable(buffer: Uint8Array) {
+ let offset = 0, size = 0;
+ while (offset < buffer.byteLength) {
+ size = yield buffer.subarray(offset, offset +=
+ (isNaN(+size) ? buffer.byteLength - offset : size));
+ }
+}
+
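+// Async variant of `chunkedIterable` with the same `next(size)` protocol.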
+export async function* asyncChunkedIterable(buffer: Uint8Array) {
+ let offset = 0, size = 0;
+ while (offset < buffer.byteLength) {
+ size = yield buffer.subarray(offset, offset +=
+ (isNaN(+size) ? buffer.byteLength - offset : size));
+ }
+}
+
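+// Drains an async byte source (or WHATWG ReadableStream) and concatenates its
+// chunks into a single contiguous Uint8Array.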
+export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) {
+ if (iterator instanceof ReadableStream) {
+ iterator = readableDOMStreamToAsyncIterator(iterator);
+ }
+ let chunks = [], total = 0;
+ for await (const chunk of iterator) {
+ chunks.push(chunk);
+ total += chunk.byteLength;
+ }
+ return chunks.reduce((x, buffer) => {
+ x.buffer.set(buffer, x.offset);
+ x.offset += buffer.byteLength;
+ return x;
+ }, { offset: 0, buffer: new Uint8Array(total) }).buffer;
+}
+
+export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) {
+ // Get a lock on the stream
+ const reader = stream.getReader();
+ try {
+ while (true) {
+ // Read from the stream
+ const { done, value } = await reader.read();
+ // Exit if we're done
+ if (done) { break; }
+ // Else yield the chunk
+ yield value as T;
+ }
+ } finally {
+ try { stream.locked && reader.releaseLock(); } catch (e) {}
+ }
+}
+
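+// Adapts a Node.js Readable into a WHATWG ReadableStream, translating DOM
+// backpressure into Node semantics: pull() resumes the stream, each 'data'
+// event re-pauses it, and cancel() destroys the source.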
+export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) {
+ stream = new Readable((stream as any)._readableState).wrap(stream);
+ return new ReadableStream<T>({
+ ...opts,
+ start(controller) {
+ stream.pause();
+ stream.on('data', (chunk) => {
+ controller.enqueue(chunk);
+ stream.pause();
+ });
+ stream.on('end', () => controller.close());
+ stream.on('error', e => controller.error(e));
+ },
+ pull() { stream.resume(); },
+ cancel(reason) {
+ stream.pause();
+ if (typeof (stream as any).cancel === 'function') {
+ return (stream as any).cancel(reason);
+ } else if (typeof (stream as any).destroy === 'function') {
+ return (stream as any).destroy(reason);
+ }
+ }
+ });
+}
diff --git a/src/arrow/js/test/unit/ipc/message-reader-tests.ts b/src/arrow/js/test/unit/ipc/message-reader-tests.ts
new file mode 100644
index 000000000..c48aa2ce1
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/message-reader-tests.ts
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// import * as fs from 'fs';
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../data/tables';
+
+import { ArrowIOTestHelper } from './helpers';
+import { MessageReader, AsyncMessageReader } from 'apache-arrow';
+
+for (const table of generateRandomTables([10, 20, 30])) {
+
+ const io = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+ const numMessages = table.chunks.reduce((numMessages, batch) => {
+ return numMessages +
+ /* recordBatch message */ 1 +
+ /* dictionary messages */ batch.dictionaries.size;
+ }, /* schema message */ 1);
+
+ const validate = validateMessageReader.bind(0, numMessages);
+ const validateAsync = validateAsyncMessageReader.bind(0, numMessages);
+
+ describe(`MessageReader (${name})`, () => {
+ describe(`should read all Messages`, () => {
+ test(`Uint8Array`, io.buffer(validate));
+ test(`Iterable`, io.iterable(validate));
+ });
+ });
+
+ describe(`AsyncMessageReader (${name})`, () => {
+ describe(`should read all Messages`, () => {
+ test('AsyncIterable', io.asyncIterable(validateAsync));
+ test('fs.FileHandle', io.fsFileHandle(validateAsync));
+ test('fs.ReadStream', io.fsReadableStream(validateAsync));
+ test('stream.Readable', io.nodeReadableStream(validateAsync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync));
+ });
+ });
+}
+
+export function validateMessageReader(numMessages: number, source: any) {
+ const reader = new MessageReader(source);
+ let index = 0;
+ for (let message of reader) {
+
+ if (index === 0) {
+ expect(message.isSchema()).toBe(true);
+ expect(message.bodyLength).toBe(0);
+ } else {
+ expect(message.isSchema()).toBe(false);
+ expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true);
+ }
+
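+ // message bodies in the Arrow IPC format are padded to 8-byte boundaries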
+ try {
+ expect(message.bodyLength % 8).toBe(0);
+ } catch (e) { throw new Error(`bodyLength: ${e}`); }
+
+ const body = reader.readMessageBody(message.bodyLength);
+ expect(body).toBeInstanceOf(Uint8Array);
+ expect(body.byteLength).toBe(message.bodyLength);
+ expect(index++).toBeLessThan(numMessages);
+ }
+ expect(index).toBe(numMessages);
+ reader.return();
+}
+
+export async function validateAsyncMessageReader(numMessages: number, source: any) {
+ const reader = new AsyncMessageReader(source);
+ let index = 0;
+ for await (let message of reader) {
+
+ if (index === 0) {
+ expect(message.isSchema()).toBe(true);
+ expect(message.bodyLength).toBe(0);
+ } else {
+ expect(message.isSchema()).toBe(false);
+ expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true);
+ }
+
+ try {
+ expect(message.bodyLength % 8).toBe(0);
+ } catch (e) { throw new Error(`bodyLength: ${e}`); }
+
+ const body = await reader.readMessageBody(message.bodyLength);
+ expect(body).toBeInstanceOf(Uint8Array);
+ expect(body.byteLength).toBe(message.bodyLength);
+ expect(index++).toBeLessThan(numMessages);
+ }
+ expect(index).toBe(numMessages);
+ await reader.return();
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts
new file mode 100644
index 000000000..a7ddfc940
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+import { ArrowIOTestHelper } from '../helpers';
+import { toArray } from 'ix/asynciterable/toarray';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader
+} from '../validate';
+
+import {
+ RecordBatchReader,
+ RecordBatchFileReader,
+ AsyncRecordBatchFileReader
+} from 'apache-arrow';
+
+for (const table of generateRandomTables([10, 20, 30])) {
+
+ const io = ArrowIOTestHelper.file(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ const validate = (source: any) => { validateRecordBatchReader('file', 3, RecordBatchReader.from(source)); };
+ const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(source)); };
+ const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(Promise.resolve(source))); };
+
+ describe(`RecordBatchFileReader (${name})`, () => {
+ describe(`should read all RecordBatches`, () => {
+ test(`Uint8Array`, io.buffer(validate));
+ test(`Iterable`, io.iterable(validate));
+ });
+ describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => {
+ test(`Uint8Array`, io.buffer(validateRandomAccess));
+ test(`Iterable`, io.iterable(validateRandomAccess));
+ });
+ });
+
+ describe(`AsyncRecordBatchFileReader (${name})`, () => {
+ describe(`should read all RecordBatches`, () => {
+
+ test('AsyncIterable', io.asyncIterable(validateAsync));
+ test('fs.FileHandle', io.fsFileHandle(validateAsync));
+ test('fs.ReadStream', io.fsReadableStream(validateAsync));
+ test('stream.Readable', io.nodeReadableStream(validateAsync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync));
+
+ test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped));
+ test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped));
+ test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped));
+ test('Promise<stream.Readable>', io.nodeReadableStream(validateAsyncWrapped));
+ test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped));
+ test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped));
+ });
+
+ describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => {
+
+ test('AsyncIterable', io.asyncIterable(validateRandomAccessAsync));
+ test('fs.FileHandle', io.fsFileHandle(validateRandomAccessAsync));
+ test('fs.ReadStream', io.fsReadableStream(validateRandomAccessAsync));
+ test('stream.Readable', io.nodeReadableStream(validateRandomAccessAsync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(validateRandomAccessAsync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateRandomAccessAsync));
+
+ test('Promise<AsyncIterable>', io.asyncIterable((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', io.fsFileHandle((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', io.fsReadableStream((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ test('Promise<stream.Readable>', io.nodeReadableStream((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ test('Promise<ReadableStream>', io.whatwgReadableStream((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ test('Promise<ReadableByteStream>', io.whatwgReadableByteStream((source) => validateRandomAccessAsync(Promise.resolve(source))));
+ });
+ });
+}
+
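+// With autoDestroy=false the file reader survives a full iteration, so record
+// batches can then be re-read by index (here in reverse order) until cancel().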
+function validateRandomAccess(source: any) {
+ const reader = RecordBatchReader.from(source) as RecordBatchFileReader;
+ const schema = reader.open({ autoDestroy: false }).schema;
+ const batches = [...reader];
+ expect(reader.closed).toBe(false);
+ expect(reader.schema).toBe(schema);
+ while (batches.length > 0) {
+ const expected = batches.pop()!;
+ const actual = reader.readRecordBatch(batches.length);
+ expect(actual).toEqualRecordBatch(expected);
+ }
+ reader.cancel();
+ expect(reader.closed).toBe(true);
+ expect(reader.schema).toBeUndefined();
+}
+
+async function validateRandomAccessAsync(source: any) {
+ const reader = (await RecordBatchReader.from(source)) as AsyncRecordBatchFileReader;
+ const schema = (await reader.open({ autoDestroy: false })).schema;
+ const batches = await toArray(reader);
+ expect(reader.closed).toBe(false);
+ expect(reader.schema).toBe(schema);
+ while (batches.length > 0) {
+ const expected = batches.pop()!;
+ const actual = await reader.readRecordBatch(batches.length);
+ expect(actual).toEqualRecordBatch(expected);
+ }
+ await reader.cancel();
+ expect(reader.closed).toBe(true);
+ expect(reader.schema).toBeUndefined();
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts
new file mode 100644
index 000000000..c444b78fc
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { ArrowIOTestHelper } from '../helpers';
+import {
+ RecordBatchReader,
+ RecordBatchFileReader,
+ RecordBatchStreamReader,
+ AsyncRecordBatchFileReader,
+ AsyncRecordBatchStreamReader
+} from 'apache-arrow';
+
+for (const table of generateRandomTables([10, 20, 30])) {
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+ // eslint-disable-next-line jest/valid-describe
+ describe('RecordBatchReader.from', ((table, name) => () => {
+ testFromFile(ArrowIOTestHelper.file(table), name);
+ testFromJSON(ArrowIOTestHelper.json(table), name);
+ testFromStream(ArrowIOTestHelper.stream(table), name);
+ })(table, name));
+}
+
+function testFromJSON(io: ArrowIOTestHelper, name: string) {
+ describe(`should return a RecordBatchJSONReader (${name})`, () => {
+ test(`Uint8Array`, io.buffer((buffer) => {
+ const json = JSON.parse(`${Buffer.from(buffer)}`);
+ const reader = RecordBatchReader.from(json);
+ expect(reader.isSync()).toEqual(true);
+ expect(reader.isAsync()).toEqual(false);
+ expect(reader).toBeInstanceOf(RecordBatchStreamReader);
+ }));
+ });
+}
+
+function testFromFile(io: ArrowIOTestHelper, name: string) {
+
+ describe(`should return a RecordBatchFileReader (${name})`, () => {
+
+ test(`Uint8Array`, io.buffer(syncSync));
+ test(`Iterable`, io.iterable(syncSync));
+ test('AsyncIterable', io.asyncIterable(asyncSync));
+ test('fs.FileHandle', io.fsFileHandle(asyncAsync));
+ test('fs.ReadStream', io.fsReadableStream(asyncSync));
+ test('stream.Readable', io.nodeReadableStream(asyncSync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(asyncSync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncSync));
+
+ test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source))));
+ test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncSync(Promise.resolve(source))));
+ });
+
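+ // naming: the first word is how `from()` resolves (sync value vs. Promise),
+ // the second is whether the resulting reader is synchronous or asynchronous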
+ function syncSync(source: any) {
+ const reader = RecordBatchReader.from(source);
+ expect(reader.isSync()).toEqual(true);
+ expect(reader.isAsync()).toEqual(false);
+ expect(reader).toBeInstanceOf(RecordBatchFileReader);
+ }
+
+ async function asyncSync(source: any) {
+ const pending = RecordBatchReader.from(source);
+ expect(pending).toBeInstanceOf(Promise);
+ const reader = await pending;
+ expect(reader.isSync()).toEqual(true);
+ expect(reader.isAsync()).toEqual(false);
+ expect(reader).toBeInstanceOf(RecordBatchFileReader);
+ }
+
+ async function asyncAsync(source: any) {
+ const pending = RecordBatchReader.from(source);
+ expect(pending).toBeInstanceOf(Promise);
+ const reader = await pending;
+ expect(reader.isSync()).toEqual(false);
+ expect(reader.isAsync()).toEqual(true);
+ expect(reader).toBeInstanceOf(AsyncRecordBatchFileReader);
+ }
+}
+
+function testFromStream(io: ArrowIOTestHelper, name: string) {
+
+ describe(`should return a RecordBatchStreamReader (${name})`, () => {
+
+ test(`Uint8Array`, io.buffer(syncSync));
+ test(`Iterable`, io.iterable(syncSync));
+ test('AsyncIterable', io.asyncIterable(asyncAsync));
+ test('fs.FileHandle', io.fsFileHandle(asyncAsync));
+ test('fs.ReadStream', io.fsReadableStream(asyncAsync));
+ test('stream.Readable', io.nodeReadableStream(asyncAsync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(asyncAsync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncAsync));
+
+ test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source))));
+ test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source))));
+ test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncAsync(Promise.resolve(source))));
+ test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncAsync(Promise.resolve(source))));
+ });
+
+ function syncSync(source: any) {
+ const reader = RecordBatchReader.from(source);
+ expect(reader.isSync()).toEqual(true);
+ expect(reader.isAsync()).toEqual(false);
+ expect(reader).toBeInstanceOf(RecordBatchStreamReader);
+ }
+
+ async function asyncSync(source: any) {
+ const pending = RecordBatchReader.from(source);
+ expect(pending).toBeInstanceOf(Promise);
+ const reader = await pending;
+ expect(reader.isSync()).toEqual(true);
+ expect(reader.isAsync()).toEqual(false);
+ expect(reader).toBeInstanceOf(RecordBatchStreamReader);
+ }
+
+ async function asyncAsync(source: any) {
+ const pending = RecordBatchReader.from(source);
+ expect(pending).toBeInstanceOf(Promise);
+ const reader = await pending;
+ expect(reader.isSync()).toEqual(false);
+ expect(reader.isAsync()).toEqual(true);
+ expect(reader).toBeInstanceOf(AsyncRecordBatchStreamReader);
+ }
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts
new file mode 100644
index 000000000..9bd1e3466
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { ArrowIOTestHelper } from '../helpers';
+import { RecordBatchReader } from 'apache-arrow';
+import { validateRecordBatchReader } from '../validate';
+
+for (const table of generateRandomTables([10, 20, 30])) {
+
+ const io = ArrowIOTestHelper.json(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchJSONReader (${name})`, () => {
+ describe(`should read all RecordBatches`, () => {
+ test(`Uint8Array`, io.buffer((buffer) => {
+ const json = JSON.parse(Buffer.from(buffer).toString());
+ validateRecordBatchReader('json', 3, RecordBatchReader.from(json));
+ }));
+ });
+ });
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts
new file mode 100644
index 000000000..23879cf79
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader
+} from '../validate';
+
+import { ArrowIOTestHelper } from '../helpers';
+import { RecordBatchReader } from 'apache-arrow';
+
+for (const table of generateRandomTables([10, 20, 30])) {
+
+ const io = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ const validate = (source: any) => { validateRecordBatchReader('stream', 3, RecordBatchReader.from(source)); };
+ const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(source)); };
+ const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(Promise.resolve(source))); };
+
+ describe(`RecordBatchStreamReader (${name})`, () => {
+ describe(`should read all RecordBatches`, () => {
+ test(`Uint8Array`, io.buffer(validate));
+ test(`Iterable`, io.iterable(validate));
+ });
+ });
+
+ describe(`AsyncRecordBatchStreamReader (${name})`, () => {
+ describe(`should read all RecordBatches`, () => {
+
+ test('AsyncIterable', io.asyncIterable(validateAsync));
+ test('fs.FileHandle', io.fsFileHandle(validateAsync));
+ test('fs.ReadStream', io.fsReadableStream(validateAsync));
+ test('stream.Readable', io.nodeReadableStream(validateAsync));
+ test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync));
+ test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync));
+
+ test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped));
+ test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped));
+ test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped));
+ test('Promise<stream.Readable>', io.nodeReadableStream(validateAsyncWrapped));
+ test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped));
+ test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped));
+ });
+ });
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts
new file mode 100644
index 000000000..a380e1619
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchStreamWriter
+} from 'apache-arrow';
+
+import { validateRecordBatchAsyncIterator } from '../validate';
+import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers';
+
+(() => {
+
+ if (process.env.TEST_DOM_STREAMS !== 'true') {
+ return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchReader.throughDOM (${name})`, () => {
+ describe('file', () => {
+ test('ReadableStream', file.whatwgReadableStream(validate));
+ test('ReadableByteStream', file.whatwgReadableByteStream(validate));
+ });
+ describe('stream', () => {
+ test('ReadableStream', stream.whatwgReadableStream(validate));
+ test('ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ });
+ async function validate(source: ReadableStream) {
+ const stream = source.pipeThrough(RecordBatchReader.throughDOM());
+ await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream));
+ }
+ });
+
+ describe(`toDOMStream (${name})`, () => {
+
+ describe(`RecordBatchJSONReader`, () => {
+ test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`))));
+ });
+
+ describe(`RecordBatchFileReader`, () => {
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
+ });
+
+ describe(`RecordBatchStreamReader`, () => {
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
+ });
+
+ async function validate(source: any) {
+ const reader: RecordBatchReader = await RecordBatchReader.from(source);
+ const iterator = readableDOMStreamToAsyncIterator(reader.toDOMStream());
+ await validateRecordBatchAsyncIterator(3, iterator);
+ }
+ });
+ }
+
+ it('readAll() should pipe to separate WhatWG WritableStreams', async () => {
+ // @ts-ignore
+ const { concatStream } = await import('@openpgp/web-stream-tools');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = concatStream(tables.map((table, i) =>
+ RecordBatchStreamWriter.writeAll(table).toDOMStream({
+ // Alternate between bytes mode and regular mode because code coverage
+ type: i % 2 === 0 ? 'bytes' : undefined
+ })
+ )) as ReadableStream<Uint8Array>;
+
+ let tableIndex = -1;
+ let reader: RecordBatchReader | undefined;
+
+ for await (reader of RecordBatchReader.readAll(stream)) {
+
+ validateStreamState(reader, stream, false);
+
+ const output = reader
+ .pipeThrough(RecordBatchStreamWriter.throughDOM())
+ .pipeThrough(new TransformStream());
+
+ validateStreamState(reader, output, false, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(output);
+ expect(streamTable).toEqualTable(sourceTable);
+ expect(output.locked).toBe(false);
+ }
+
+ expect(reader).toBeDefined();
+ validateStreamState(reader!, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
+ it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => {
+ // @ts-ignore
+ const { concatStream } = await import('@openpgp/web-stream-tools');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = concatStream(tables.map((table, i) =>
+ RecordBatchStreamWriter.writeAll(table).toDOMStream({
+ // Alternate between bytes mode and regular mode for code coverage
+ type: i % 2 === 0 ? 'bytes' : undefined
+ })
+ )) as ReadableStream<Uint8Array>;
+
+ let tableIndex = -1;
+ let reader = await RecordBatchReader.from(stream);
+
+ validateStreamState(reader, stream, false);
+
+ for await (reader of RecordBatchReader.readAll(reader)) {
+
+ validateStreamState(reader, stream, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ validateStreamState(reader, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
+ it('should close the underlying WhatWG ReadableStream when reading multiple tables and we break early', async () => {
+ // @ts-ignore
+ const { concatStream } = await import('@openpgp/web-stream-tools');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = concatStream(tables.map((table, i) =>
+ RecordBatchStreamWriter.writeAll(table).toDOMStream({
+ // Alternate between bytes mode and regular mode for code coverage
+ type: i % 2 === 0 ? 'bytes' : undefined
+ })
+ )) as ReadableStream<Uint8Array>;
+
+ let tableIndex = -1;
+ let reader = await RecordBatchReader.from(stream);
+
+ validateStreamState(reader, stream, false);
+
+ for await (reader of RecordBatchReader.readAll(reader)) {
+
+ validateStreamState(reader, stream, false);
+
+ let batchIndex = -1;
+ const sourceTable = tables[++tableIndex];
+ const breakEarly = tableIndex === (tables.length / 2 | 0);
+
+ for await (const streamBatch of reader) {
+ expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]);
+ if (breakEarly && batchIndex === 1) { break; }
+ }
+ if (breakEarly) {
+ // the reader should stay open until we break from the outermost loop
+ validateStreamState(reader, stream, false);
+ break;
+ }
+ }
+
+ validateStreamState(reader, stream, true);
+ expect(tableIndex).toBe(tables.length / 2 | 0);
+ });
+})();
+
+function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) {
+ expect(reader.closed).toBe(closed);
+ expect(stream.locked).toBe(locked);
+}
diff --git a/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts
new file mode 100644
index 000000000..822f99350
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables
+} from '../../../data/tables';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchStreamWriter
+} from 'apache-arrow';
+
+import { ArrowIOTestHelper } from '../helpers';
+import { validateRecordBatchAsyncIterator } from '../validate';
+
+(() => {
+
+ if (process.env.TEST_NODE_STREAMS !== 'true') {
+ return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchReader.throughNode (${name})`, () => {
+ describe('file', () => {
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ });
+ describe('stream', () => {
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ });
+ async function validate(source: NodeJS.ReadableStream) {
+ const stream = source.pipe(RecordBatchReader.throughNode());
+ await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]());
+ }
+ });
+
+ describe(`toNodeStream (${name})`, () => {
+
+ describe(`RecordBatchJSONReader`, () => {
+ test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`))));
+ });
+
+ describe(`RecordBatchFileReader`, () => {
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
+ });
+
+ describe(`RecordBatchStreamReader`, () => {
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source))));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source))));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source))));
+ });
+
+ async function validate(source: any) {
+ const reader: RecordBatchReader = await RecordBatchReader.from(source);
+ await validateRecordBatchAsyncIterator(3, reader.toNodeStream()[Symbol.asyncIterator]());
+ }
+ });
+ }
+
+ it('readAll() should pipe to separate NodeJS WritableStreams', async () => {
+ // @ts-ignore
+ const { default: MultiStream } = await import('multistream');
+ const { PassThrough } = await import('stream');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = new MultiStream(tables.map((table) =>
+ () => RecordBatchStreamWriter.writeAll(table).toNodeStream()
+ )) as NodeJS.ReadableStream;
+
+ let tableIndex = -1;
+ let reader: RecordBatchReader | undefined;
+
+ for await (reader of RecordBatchReader.readAll(stream)) {
+
+ validateStreamState(reader, stream, false);
+
+ const output = reader
+ .pipe(RecordBatchStreamWriter.throughNode())
+ .pipe(new PassThrough());
+
+ validateStreamState(reader, output, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(output);
+ expect(streamTable).toEqualTable(sourceTable);
+ expect(Boolean(output.readableFlowing)).toBe(false);
+ }
+
+ expect(reader).toBeDefined();
+ validateStreamState(reader!, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
+ it('should not close the underlying NodeJS ReadableStream when reading multiple tables to completion', async () => {
+ // @ts-ignore
+ const { default: MultiStream } = await import('multistream');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = new MultiStream(tables.map((table) =>
+ () => RecordBatchStreamWriter.writeAll(table).toNodeStream()
+ )) as NodeJS.ReadableStream;
+
+ let tableIndex = -1;
+ let reader = await RecordBatchReader.from(stream);
+
+ validateStreamState(reader, stream, false);
+
+ for await (reader of RecordBatchReader.readAll(reader)) {
+
+ validateStreamState(reader, stream, false);
+
+ const sourceTable = tables[++tableIndex];
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ validateStreamState(reader, stream, true);
+ expect(tableIndex).toBe(tables.length - 1);
+ });
+
+ it('should close the underlying NodeJS ReadableStream when reading multiple tables and we break early', async () => {
+ // @ts-ignore
+ const { default: MultiStream } = await import('multistream');
+
+ expect.hasAssertions();
+
+ const tables = [...generateRandomTables([10, 20, 30])];
+
+ const stream = new MultiStream(tables.map((table) =>
+ () => RecordBatchStreamWriter.writeAll(table).toNodeStream()
+ )) as NodeJS.ReadableStream;
+
+ let tableIndex = -1;
+ let reader = await RecordBatchReader.from(stream);
+
+ validateStreamState(reader, stream, false);
+
+ for await (reader of RecordBatchReader.readAll(reader)) {
+
+ validateStreamState(reader, stream, false);
+
+ let batchIndex = -1;
+ const sourceTable = tables[++tableIndex];
+ const breakEarly = tableIndex === (tables.length / 2 | 0);
+
+ for await (const streamBatch of reader) {
+ expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]);
+ if (breakEarly && batchIndex === 1) { break; }
+ }
+ if (breakEarly) {
+ // the reader should stay open until we break from the outermost loop
+ validateStreamState(reader, stream, false);
+ break;
+ }
+ }
+
+ validateStreamState(reader, stream, true, true);
+ expect(tableIndex).toBe(tables.length / 2 | 0);
+ });
+})();
+
+function validateStreamState(reader: RecordBatchReader, stream: NodeJS.ReadableStream, closed: boolean, readable = !closed) {
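+ // a closed reader implies a destroyed source; in either state the reader
+ // should leave the stream paused (readableFlowing is never true between reads)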
+ expect(reader.closed).toBe(closed);
+ expect(Boolean(stream.readable)).toBe(readable);
+ expect(Boolean((stream as any).destroyed)).toBe(closed);
+ expect(Boolean((stream as any).readableFlowing)).toBe(false);
+}
diff --git a/src/arrow/js/test/unit/ipc/validate.ts b/src/arrow/js/test/unit/ipc/validate.ts
new file mode 100644
index 000000000..aedf87a2d
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/validate.ts
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import '../../jest-extensions';
+
+import {
+ Schema,
+ RecordBatch,
+ RecordBatchReader,
+ RecordBatchFileReader,
+ RecordBatchStreamReader,
+} from 'apache-arrow';
+
+export function validateRecordBatchReader<T extends RecordBatchFileReader | RecordBatchStreamReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) {
+ const reader = r.open();
+ expect(reader).toBeInstanceOf(RecordBatchReader);
+ expect(type === 'file' ? reader.isFile() : reader.isStream()).toBe(true);
+ expect(reader.schema).toBeInstanceOf(Schema);
+ validateRecordBatchIterator(numBatches, reader[Symbol.iterator]());
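+ // the reader auto-closes after its iterator is exhausted only when autoDestroy is set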
+ expect(reader.closed).toBe(reader.autoDestroy);
+ return reader;
+}
+
+export async function validateAsyncRecordBatchReader<T extends RecordBatchReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) {
+ const reader = await r.open();
+ expect(reader).toBeInstanceOf(RecordBatchReader);
+ expect(reader.schema).toBeInstanceOf(Schema);
+ expect(type === 'file' ? reader.isFile() : reader.isStream()).toBe(true);
+ await validateRecordBatchAsyncIterator(numBatches, reader[Symbol.asyncIterator]());
+ expect(reader.closed).toBe(reader.autoDestroy);
+ return reader;
+}
+
+export function validateRecordBatchIterator(numBatches: number, iterator: Iterable<RecordBatch> | IterableIterator<RecordBatch>) {
+ let i = 0;
+ try {
+ for (const recordBatch of iterator) {
+ expect(recordBatch).toBeInstanceOf(RecordBatch);
+ expect(i++).toBeLessThan(numBatches);
+ }
+ } catch (e) { throw new Error(`${i}: ${e}`); }
+ expect(i).toBe(numBatches);
+ if (typeof (iterator as any).return === 'function') {
+ (iterator as any).return();
+ }
+}
+
+export async function validateRecordBatchAsyncIterator(numBatches: number, iterator: AsyncIterable<RecordBatch> | AsyncIterableIterator<RecordBatch>) {
+ let i = 0;
+ try {
+ for await (const recordBatch of iterator) {
+ expect(recordBatch).toBeInstanceOf(RecordBatch);
+ expect(i++).toBeLessThan(numBatches);
+ }
+ } catch (e) { throw new Error(`${i}: ${e}`); }
+ expect(i).toBe(numBatches);
+ if (typeof (iterator as any).return === 'function') {
+ await (iterator as any).return();
+ }
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
new file mode 100644
index 000000000..fa639e5f6
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchFileWriter } from 'apache-arrow';
+
+describe('RecordBatchFileWriter', () => {
+ for (const table of generateRandomTables([10, 20, 30])) {
+ testFileWriter(table, `[${table.schema.fields.join(', ')}]`);
+ }
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ testFileWriter(table, `${table.schema.fields[0]}`);
+ }
+});
+
+function testFileWriter(table: Table, name: string) {
+ describe(`should write the Arrow IPC file format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table));
+ });
+}
+
+async function validateTable(source: Table) {
+ const writer = RecordBatchFileWriter.writeAll(source);
+ const result = await Table.from(writer.toUint8Array());
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
new file mode 100644
index 000000000..05be0e272
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import { validateRecordBatchIterator } from '../validate';
+import { Table, RecordBatchJSONWriter } from 'apache-arrow';
+
+describe('RecordBatchJSONWriter', () => {
+ for (const table of generateRandomTables([10, 20, 30])) {
+ testJSONWriter(table, `[${table.schema.fields.join(', ')}]`);
+ }
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ testJSONWriter(table, `${table.schema.fields[0]}`);
+ }
+});
+
+function testJSONWriter(table: Table, name: string) {
+ describe(`should write the Arrow IPC JSON format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table));
+ });
+}
+
+async function validateTable(source: Table) {
+ const writer = RecordBatchJSONWriter.writeAll(source);
+ const result = Table.from(JSON.parse(await writer.toString()));
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
new file mode 100644
index 000000000..a83aa39da
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ generateDictionaryTables
+} from '../../../data/tables';
+
+import * as generate from '../../../generate-test-data';
+import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
+import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow';
+import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow';
+
+describe('RecordBatchStreamWriter', () => {
+
+ (() => {
+ const type = generate.sparseUnion(0, 0).vector.type;
+ const schema = new Schema([new Field('dictSparseUnion', type)]);
+ const table = generate.table([10, 20, 30], schema).table;
+ const testName = `[${table.schema.fields.join(', ')}]`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ })();
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+ const testName = `[${table.schema.fields.join(', ')}]`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ }
+
+ for (const table of generateDictionaryTables([10, 20, 30])) {
+ const testName = `${table.schema.fields[0]}`;
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+ testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
+ }
+
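+ // `autoDestroy: false` keeps the writer open after each writeAll() call, so
+ // several tables can be written back-to-back into one output stream and
+ // read again as separate IPC streams via RecordBatchReader.readAll().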
+ it(`should write multiple tables to the same output stream`, async () => {
+ const tables = [] as Table[];
+ const writer = new RecordBatchStreamWriter({ autoDestroy: false });
+ const validate = (async () => {
+ for await (const reader of RecordBatchReader.readAll(writer)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+ })();
+ for (const table of generateRandomTables([10, 20, 30])) {
+ tables.push(table);
+ await writer.writeAll((async function*() {
+ for (const chunk of table.chunks) {
+ yield chunk;
+ await new Promise((r) => setTimeout(r, 1)); // insert some asynchrony
+ }
+ }()));
+ }
+ writer.close();
+ await validate;
+ });
+
+ it('should write delta dictionary batches', async () => {
+
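+ // The builder flushes a chunk every 50 values ('count' queueing with
+ // highWaterMark: 50), so the 1000 source values become 20 record batches;
+ // after the first, each batch should carry only its newly-seen dictionary
+ // entries as delta dictionary batches.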
+ const name = 'dictionary_encoded_uint32';
+ const chunks: DictionaryVector<Uint32, Int32>[] = [];
+ const {
+ vector: sourceVector, values: sourceValues,
+ } = generate.dictionary(1000, 20, new Uint32(), new Int32());
+
+ const writer = RecordBatchStreamWriter.writeAll((function* () {
+ const transform = Builder.throughIterable({
+ type: sourceVector.type, nullValues: [null],
+ queueingStrategy: 'count', highWaterMark: 50,
+ });
+ for (const chunk of transform(sourceValues())) {
+ chunks.push(chunk);
+ yield RecordBatch.new({ [name]: chunk });
+ }
+ })());
+
+ expect(Chunked.concat(chunks)).toEqualVector(sourceVector);
+
+ type T = { [name]: Dictionary<Uint32, Int32> };
+ const sourceTable = Table.new({ [name]: sourceVector });
+ const resultTable = await Table.from<T>(writer.toUint8Array());
+
+ const { dictionary } = resultTable.getColumn(name);
+
+ expect(resultTable).toEqualTable(sourceTable);
+ expect(dictionary).toBeInstanceOf(Chunked);
+ expect((dictionary as Chunked).chunks).toHaveLength(20);
+ });
+});
+
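+// Registers a suite that writes `table` with the given stream-writer options
+// and checks the decoded result round-trips back to the source.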
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
+ describe(`should write the Arrow IPC stream format (${name})`, () => {
+ test(`Table`, validateTable.bind(0, table, options));
+ });
+}
+
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+ const writer = RecordBatchStreamWriter.writeAll(source, options);
+ const result = await Table.from(writer.toUint8Array());
+ validateRecordBatchIterator(3, source.chunks);
+ expect(result).toEqualTable(source);
+}
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
new file mode 100644
index 000000000..a19ddcdd7
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchWriter,
+ RecordBatchFileWriter,
+ RecordBatchJSONWriter,
+ RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+ ArrowIOTestHelper,
+ concatBuffersAsync,
+ readableDOMStreamToAsyncIterator
+} from '../helpers';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader,
+ validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+ if (process.env.TEST_DOM_STREAMS !== 'true') {
+ return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchWriter.throughDOM (${name})`, () => {
+
+ describe('file', () => {
+ describe(`convert`, () => {
+ test('ReadableStream', file.whatwgReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ test('ReadableByteStream', file.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ });
+ describe(`through`, () => {
+ test('ReadableStream', file.whatwgReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ test('ReadableByteStream', file.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ });
+ });
+
+ describe('stream', () => {
+ describe(`convert`, () => {
+ test('ReadableStream', stream.whatwgReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ test('ReadableByteStream', stream.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ });
+ describe(`through`, () => {
+ test('ReadableStream', stream.whatwgReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ test('ReadableByteStream', stream.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ });
+ });
+
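+ // `convert` pipes one IPC format through the *other* format's writer and
+ // validates the converted output; `through` round-trips through a
+ // same-format writer and validates the batches that come out the far side.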
+ async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+ const stream = source
+ .pipeThrough(RecordBatchReader.throughDOM())
+ .pipeThrough(RBWImplementation.throughDOM());
+ const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+ await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+ }
+
+ async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) {
+ const stream = source
+ .pipeThrough(RecordBatchReader.throughDOM())
+ .pipeThrough(RBWImplementation.throughDOM())
+ .pipeThrough(RecordBatchReader.throughDOM());
+ await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream));
+ }
+ });
+
+ describe(`toDOMStream (${name})`, () => {
+
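+ // Wraps a validator so its argument arrives as a Promise, covering the
+ // Promise<...> source variants below.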
+ const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+ describe(`RecordBatchJSONWriter`, () => {
+
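+ // The JSON writer emits text, so the helper below stringifies the bytes
+ // and re-parses them before handing the result to RecordBatchReader.from().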
+ const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+ test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+ test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+ async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+ const reader = await RecordBatchReader.from(<any> source);
+ const writer = await RecordBatchJSONWriter.writeAll(reader);
+ const buffer = await concatBuffersAsync(writer.toDOMStream());
+ validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+ }
+ });
+
+ describe(`RecordBatchFileWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchFileWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('file', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
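+ // The writeAll() call is deliberately not awaited: the writer fills its
+ // DOM stream while the reader below consumes it concurrently.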
+ async function validate(source: any) {
+ const writer = new RecordBatchFileWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('file', 3, reader);
+ }
+ });
+ });
+
+ describe(`RecordBatchStreamWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchStreamWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('stream', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const writer = new RecordBatchStreamWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toDOMStream());
+ await validateAsyncRecordBatchReader('stream', 3, reader);
+ }
+ });
+ });
+ });
+ }
+
+ describe(`RecordBatchStreamWriter.throughDOM`, () => {
+
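+ // With `autoDestroy: false`, the transform stream stays open between
+ // tables, so one writer carries a whole sequence of IPC streams.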
+ const opts = { autoDestroy: false };
+ const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
+ it(`should write a stream of tables to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const stream: ReadableStream<any> = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(stream.locked).toBe(false);
+ });
+
+ it(`should write a stream of record batches to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ // flatMap from Table -> RecordBatches[]
+ .pipe(flatMap((table) => as(table.chunks)))
+ .pipeThrough(RecordBatchStreamWriter.throughDOM(opts));
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(stream.locked).toBe(false);
+ });
+ });
+
+})();
diff --git a/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
new file mode 100644
index 000000000..662129b1b
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ generateRandomTables,
+ // generateDictionaryTables
+} from '../../../data/tables';
+
+import { from, as } from 'ix/asynciterable';
+import { tap, flatMap } from 'ix/asynciterable/operators';
+import 'ix/Ix.node';
+
+import {
+ Table,
+ RecordBatchReader,
+ RecordBatchWriter,
+ RecordBatchFileWriter,
+ RecordBatchJSONWriter,
+ RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import {
+ ArrowIOTestHelper,
+ concatBuffersAsync
+} from '../helpers';
+
+import {
+ validateRecordBatchReader,
+ validateAsyncRecordBatchReader,
+ validateRecordBatchAsyncIterator
+} from '../validate';
+
+(() => {
+
+ if (process.env.TEST_NODE_STREAMS !== 'true') {
+ return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {});
+ }
+
+ for (const table of generateRandomTables([10, 20, 30])) {
+
+ const file = ArrowIOTestHelper.file(table);
+ const json = ArrowIOTestHelper.json(table);
+ const stream = ArrowIOTestHelper.stream(table);
+ const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
+
+ describe(`RecordBatchWriter.throughNode (${name})`, () => {
+
+ describe('file', () => {
+ describe(`convert`, () => {
+ test('fs.ReadStream', file.fsReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ test('stream.Readable', file.nodeReadableStream(validateConvert.bind(0, RecordBatchStreamWriter)));
+ });
+ describe(`through`, () => {
+ test('fs.ReadStream', file.fsReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ test('stream.Readable', file.nodeReadableStream(validateThrough.bind(0, RecordBatchFileWriter)));
+ });
+ });
+
+ describe('stream', () => {
+ describe(`convert`, () => {
+ test('fs.ReadStream', stream.fsReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ test('stream.Readable', stream.nodeReadableStream(validateConvert.bind(0, RecordBatchFileWriter)));
+ });
+ describe(`through`, () => {
+ test('fs.ReadStream', stream.fsReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ test('stream.Readable', stream.nodeReadableStream(validateThrough.bind(0, RecordBatchStreamWriter)));
+ });
+ });
+
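+ // As with the DOM suite: `convert` writes the source through the opposite
+ // format's writer, while `through` round-trips the same format.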
+ async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+ const stream = source
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RBWImplementation.throughNode());
+ const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream';
+ await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream));
+ }
+
+ async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) {
+ const stream = source
+ .pipe(RecordBatchReader.throughNode())
+ .pipe(RBWImplementation.throughNode())
+ .pipe(RecordBatchReader.throughNode());
+ await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]());
+ }
+ });
+
+ describe(`toNodeStream (${name})`, () => {
+
+ const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x));
+
+ describe(`RecordBatchJSONWriter`, () => {
+
+ const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`);
+
+ test('Uint8Array', json.buffer((source) => validate(toJSON(source))));
+ test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source)))));
+
+ async function validate(source: { schema: any } | Promise<{ schema: any }>) {
+ const reader = await RecordBatchReader.from(<any> source);
+ const writer = await RecordBatchJSONWriter.writeAll(reader);
+ const buffer = await concatBuffersAsync(writer.toNodeStream());
+ validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer)));
+ }
+ });
+
+ describe(`RecordBatchFileWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchFileWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('file', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, file.buffer(validate));
+ test(`Iterable`, file.iterable(validate));
+ test('AsyncIterable', file.asyncIterable(validate));
+ test('fs.FileHandle', file.fsFileHandle(validate));
+ test('fs.ReadStream', file.fsReadableStream(validate));
+ test('stream.Readable', file.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', file.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
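+ // Deliberately not awaited: the writer feeds the Node stream while the
+ // reader consumes it concurrently.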
+ async function validate(source: any) {
+ const writer = new RecordBatchFileWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('file', 3, reader);
+ }
+ });
+ });
+
+ describe(`RecordBatchStreamWriter`, () => {
+
+ describe(`sync write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const reader = await RecordBatchReader.from(source);
+ const writer = await RecordBatchStreamWriter.writeAll(reader);
+ const stream = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('stream', 3, stream);
+ }
+ });
+
+ describe(`async write/read`, () => {
+
+ test(`Uint8Array`, stream.buffer(validate));
+ test(`Iterable`, stream.iterable(validate));
+ test('AsyncIterable', stream.asyncIterable(validate));
+ test('fs.FileHandle', stream.fsFileHandle(validate));
+ test('fs.ReadStream', stream.fsReadableStream(validate));
+ test('stream.Readable', stream.nodeReadableStream(validate));
+ test('whatwg.ReadableStream', stream.whatwgReadableStream(validate));
+ test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate));
+ test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate)));
+ test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate)));
+ test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate)));
+ test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate)));
+ test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate)));
+
+ async function validate(source: any) {
+ const writer = new RecordBatchStreamWriter();
+ /* no await */ writer.writeAll(await RecordBatchReader.from(source));
+ const reader = await RecordBatchReader.from(writer.toNodeStream());
+ await validateAsyncRecordBatchReader('stream', 3, reader);
+ }
+ });
+ });
+ });
+ }
+
+ describe(`RecordBatchStreamWriter.throughNode`, () => {
+
+ const sleep = (n: number) => new Promise((r) => setTimeout(r, n));
+
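+ // As in the DOM variant, `autoDestroy: false` keeps the Duplex returned by
+ // throughNode() open across multiple tables (or record batches).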
+ it(`should write a stream of tables to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipe(writer);
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(writer.readable).toBe(false);
+ expect((writer as any).destroyed).toBe(true);
+ });
+
+ it(`should write a stream of record batches to the same output stream`, async () => {
+
+ const tables = [] as Table[];
+ const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
+ const stream = from(generateRandomTables([10, 20, 30]))
+ // insert some asynchrony
+ .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
+ .pipe(flatMap((table) => as(table.chunks)))
+ .pipe(writer);
+
+ for await (const reader of RecordBatchReader.readAll(stream)) {
+ const sourceTable = tables.shift()!;
+ const streamTable = await Table.from(reader);
+ expect(streamTable).toEqualTable(sourceTable);
+ }
+
+ expect(tables).toHaveLength(0);
+ expect(writer.readable).toBe(false);
+ expect((writer as any).destroyed).toBe(true);
+ });
+
+ });
+})();