Diffstat (limited to 'src/arrow/js/test/unit/ipc/helpers.ts')
 src/arrow/js/test/unit/ipc/helpers.ts | 202 ++++++++++++++++++++++++++++++++
 1 file changed, 202 insertions(+), 0 deletions(-)
diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts
new file mode 100644
index 000000000..9fccefec9
--- /dev/null
+++ b/src/arrow/js/test/unit/ipc/helpers.ts
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import '../../jest-extensions';
+
+import {
+    Table,
+    RecordBatchWriter,
+    RecordBatchFileWriter,
+    RecordBatchJSONWriter,
+    RecordBatchStreamWriter,
+} from 'apache-arrow';
+
+import * as fs from 'fs';
+import { fs as memfs } from 'memfs';
+import { Readable, PassThrough } from 'stream';
+import randomatic from 'randomatic';
+
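+// Wraps a Table and builds async Jest test bodies that serialize it through
+// a RecordBatchWriter and hand the bytes to `testFn` in one of several
+// shapes: a plain buffer, a (sync or async) chunked iterable, a memfs-backed
+// file handle or read stream, a Node stream, or a WHATWG ReadableStream.
+// Illustrative use (a sketch; the `table` value and the `toEqualTable`
+// matcher from ../../jest-extensions are assumed):
+//
+//   const helper = ArrowIOTestHelper.stream(table);
+//   it('reads bytes', helper.buffer((buf) => {
+//       expect(Table.from(buf)).toEqualTable(table);
+//   }));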
+export abstract class ArrowIOTestHelper {
+
+    constructor(public table: Table) {}
+
+    public static file(table: Table) { return new ArrowFileIOTestHelper(table); }
+    public static json(table: Table) { return new ArrowJsonIOTestHelper(table); }
+    public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); }
+
+    protected abstract writer(table: Table): RecordBatchWriter;
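+    // Serializes the table with this helper's writer and saves the bytes to
+    // an in-memory file (memfs) under a random 20-character name.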
+    protected async filepath(table: Table): Promise<fs.PathLike> {
+        const path = `/${randomatic('a0', 20)}.arrow`;
+        const data = await this.writer(table).toUint8Array();
+        await memfs.promises.writeFile(path, data);
+        return path;
+    }
+
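+    // Each adapter below returns an async, zero-argument Jest test body that
+    // serializes the table and hands it to `testFn` in a particular shape.
+    // `expect.hasAssertions()` fails the test if `testFn` never asserts.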
+    buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            await testFn(await this.writer(this.table).toUint8Array());
+        };
+    }
+    iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            await testFn(chunkedIterable(await this.writer(this.table).toUint8Array()));
+        };
+    }
+    asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array()));
+        };
+    }
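+    // The memfs handle and stream objects mirror the Node `fs` APIs, so they
+    // are cast to the `fs` types the readers under test expect.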
+    fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            const path = await this.filepath(this.table);
+            await testFn(<any> await memfs.promises.open(path, 'r'));
+            await memfs.promises.unlink(path);
+        };
+    }
+    fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            const path = await this.filepath(this.table);
+            await testFn(<any> memfs.createReadStream(path));
+            await memfs.promises.unlink(path);
+        };
+    }
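+    // A PassThrough pre-loaded with the serialized bytes stands in for an
+    // arbitrary Node readable stream.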
+    nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            const sink = new PassThrough();
+            sink.end(await this.writer(this.table).toUint8Array());
+            await testFn(sink);
+        };
+    }
+    whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            const path = await this.filepath(this.table);
+            await testFn(nodeToDOMStream(memfs.createReadStream(path)));
+            await memfs.promises.unlink(path);
+        };
+    }
+    whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) {
+        return async () => {
+            expect.hasAssertions();
+            const path = await this.filepath(this.table);
+            await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' }));
+            await memfs.promises.unlink(path);
+        };
+    }
+}
+
+class ArrowFileIOTestHelper extends ArrowIOTestHelper {
+    constructor(table: Table) { super(table); }
+    protected writer(table: Table) {
+        return RecordBatchFileWriter.writeAll(table);
+    }
+}
+
+class ArrowJsonIOTestHelper extends ArrowIOTestHelper {
+    constructor(table: Table) { super(table); }
+    protected writer(table: Table) {
+        return RecordBatchJSONWriter.writeAll(table);
+    }
+}
+
+class ArrowStreamIOTestHelper extends ArrowIOTestHelper {
+    constructor(table: Table) { super(table); }
+    protected writer(table: Table) {
+        return RecordBatchStreamWriter.writeAll(table);
+    }
+}
+
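+// Yields sub-slices of `buffer`. The consumer chooses each chunk's length by
+// passing a number to `next(size)`; `next()` with no argument (NaN) yields
+// everything remaining. Because `size` starts at 0, the priming `next()`
+// call receives an empty chunk.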
+export function* chunkedIterable(buffer: Uint8Array) {
+    let offset = 0, size = 0;
+    while (offset < buffer.byteLength) {
+        size = yield buffer.subarray(offset, offset +=
+            (isNaN(+size) ? buffer.byteLength - offset : size));
+    }
+}
+
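+// Async twin of chunkedIterable, with the same next(size) protocol, for
+// readers that consume AsyncIterables.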
+export async function* asyncChunkedIterable(buffer: Uint8Array) {
+    let offset = 0, size = 0;
+    while (offset < buffer.byteLength) {
+        size = yield buffer.subarray(offset, offset +=
+            (isNaN(+size) ? buffer.byteLength - offset : size));
+    }
+}
+
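+// Drains an AsyncIterable (or WHATWG ReadableStream) of Uint8Array chunks
+// and concatenates them into one contiguous Uint8Array.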
+export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) {
+    if (iterator instanceof ReadableStream) {
+        iterator = readableDOMStreamToAsyncIterator(iterator);
+    }
+    let chunks = [], total = 0;
+    for await (const chunk of iterator) {
+        chunks.push(chunk);
+        total += chunk.byteLength;
+    }
+    return chunks.reduce((x, buffer) => {
+        x.buffer.set(buffer, x.offset);
+        x.offset += buffer.byteLength;
+        return x;
+    }, { offset: 0, buffer: new Uint8Array(total) }).buffer;
+}
+
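+// Adapts a WHATWG ReadableStream into an async generator by taking a reader
+// lock and pulling chunks until the stream reports done.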
+export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) {
+    // Get a lock on the stream
+    const reader = stream.getReader();
+    try {
+        while (true) {
+            // Read from the stream
+            const { done, value } = await reader.read();
+            // Exit if we're done
+            if (done) { break; }
+            // Else yield the chunk
+            yield value as T;
+        }
+    } finally {
+        try { stream.locked && reader.releaseLock(); } catch (e) {}
+    }
+}
+
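+// Bridges a Node readable into a WHATWG ReadableStream. Re-wrapping the
+// source via `new Readable(...).wrap(stream)` normalizes older streams to
+// the modern API; backpressure is honored by pausing after every 'data'
+// event and resuming only when the DOM stream pulls.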
+export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) {
+    stream = new Readable((stream as any)._readableState).wrap(stream);
+    return new ReadableStream<T>({
+        ...opts,
+        start(controller) {
+            stream.pause();
+            stream.on('data', (chunk) => {
+                controller.enqueue(chunk);
+                stream.pause();
+            });
+            stream.on('end', () => controller.close());
+            stream.on('error', e => controller.error(e));
+        },
+        pull() { stream.resume(); },
+        cancel(reason) {
+            stream.pause();
+            if (typeof (stream as any).cancel === 'function') {
+                return (stream as any).cancel(reason);
+            } else if (typeof (stream as any).destroy === 'function') {
+                return (stream as any).destroy(reason);
+            }
+        }
+    });
+}