diff options
Diffstat (limited to 'src/arrow/js/test/unit/ipc/helpers.ts')
-rw-r--r-- | src/arrow/js/test/unit/ipc/helpers.ts | 202 |
1 file changed, 202 insertions, 0 deletions
diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts new file mode 100644 index 000000000..9fccefec9 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/helpers.ts @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import '../../jest-extensions'; + +import { + Table, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import * as fs from 'fs'; +import { fs as memfs } from 'memfs'; +import { Readable, PassThrough } from 'stream'; +import randomatic from 'randomatic'; + +export abstract class ArrowIOTestHelper { + + constructor(public table: Table) {} + + public static file(table: Table) { return new ArrowFileIOTestHelper(table); } + public static json(table: Table) { return new ArrowJsonIOTestHelper(table); } + public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); } + + protected abstract writer(table: Table): RecordBatchWriter; + protected async filepath(table: Table): Promise<fs.PathLike> { + const path = `/${randomatic('a0', 20)}.arrow`; + const data = await this.writer(table).toUint8Array(); + await memfs.promises.writeFile(path, data); + return path; + } + + buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(await this.writer(this.table).toUint8Array()); + }; + } + iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(chunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> await memfs.promises.open(path, 'r')); + await memfs.promises.unlink(path); + }; + } + fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) { + return async () => { + 
expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> memfs.createReadStream(path)); + await memfs.promises.unlink(path); + }; + } + nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const sink = new PassThrough(); + sink.end(await this.writer(this.table).toUint8Array()); + await testFn(sink); + }; + } + whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path))); + await memfs.promises.unlink(path); + }; + } + whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' })); + await memfs.promises.unlink(path); + }; + } +} + +class ArrowFileIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchFileWriter.writeAll(table); + } +} + +class ArrowJsonIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchJSONWriter.writeAll(table); + } +} + +class ArrowStreamIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchStreamWriter.writeAll(table); + } +} + +export function* chunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? 
buffer.byteLength - offset : size)); + } +} + +export async function* asyncChunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) { + if (iterator instanceof ReadableStream) { + iterator = readableDOMStreamToAsyncIterator(iterator); + } + let chunks = [], total = 0; + for await (const chunk of iterator) { + chunks.push(chunk); + total += chunk.byteLength; + } + return chunks.reduce((x, buffer) => { + x.buffer.set(buffer, x.offset); + x.offset += buffer.byteLength; + return x; + }, { offset: 0, buffer: new Uint8Array(total) }).buffer; +} + +export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) { + // Get a lock on the stream + const reader = stream.getReader(); + try { + while (true) { + // Read from the stream + const { done, value } = await reader.read(); + // Exit if we're done + if (done) { break; } + // Else yield the chunk + yield value as T; + } + } finally { + try { stream.locked && reader.releaseLock(); } catch (e) {} + } +} + +export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) { + stream = new Readable((stream as any)._readableState).wrap(stream); + return new ReadableStream<T>({ + ...opts, + start(controller) { + stream.pause(); + stream.on('data', (chunk) => { + controller.enqueue(chunk); + stream.pause(); + }); + stream.on('end', () => controller.close()); + stream.on('error', e => controller.error(e)); + }, + pull() { stream.resume(); }, + cancel(reason) { + stream.pause(); + if (typeof (stream as any).cancel === 'function') { + return (stream as any).cancel(reason); + } else if (typeof (stream as any).destroy === 'function') { + return (stream as any).destroy(reason); + } + } + }); +} |