From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/js/test/unit/ipc/helpers.ts | 202 ++++++++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 src/arrow/js/test/unit/ipc/helpers.ts (limited to 'src/arrow/js/test/unit/ipc/helpers.ts') diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts new file mode 100644 index 000000000..9fccefec9 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/helpers.ts @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; + +import { + Table, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import * as fs from 'fs'; +import { fs as memfs } from 'memfs'; +import { Readable, PassThrough } from 'stream'; +import randomatic from 'randomatic'; + +export abstract class ArrowIOTestHelper { + + constructor(public table: Table) {} + + public static file(table: Table) { return new ArrowFileIOTestHelper(table); } + public static json(table: Table) { return new ArrowJsonIOTestHelper(table); } + public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); } + + protected abstract writer(table: Table): RecordBatchWriter; + protected async filepath(table: Table): Promise { + const path = `/${randomatic('a0', 20)}.arrow`; + const data = await this.writer(table).toUint8Array(); + await memfs.promises.writeFile(path, data); + return path; + } + + buffer(testFn: (buffer: Uint8Array) => void | Promise) { + return async () => { + expect.hasAssertions(); + await testFn(await this.writer(this.table).toUint8Array()); + }; + } + iterable(testFn: (iterable: Generator) => void | Promise) { + return async () => { + expect.hasAssertions(); + await testFn(chunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + asyncIterable(testFn: (asyncIterable: AsyncGenerator) => void | Promise) { + return async () => { + expect.hasAssertions(); + await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn( await memfs.promises.open(path, 'r')); + await memfs.promises.unlink(path); + }; + } + fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn( memfs.createReadStream(path)); + await memfs.promises.unlink(path); + }; + } + nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise) { + return async () => { + expect.hasAssertions(); + const sink = new PassThrough(); + sink.end(await this.writer(this.table).toUint8Array()); + await testFn(sink); + }; + } + whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path))); + await memfs.promises.unlink(path); + }; + } + whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' })); + await memfs.promises.unlink(path); + }; + } +} + +class ArrowFileIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchFileWriter.writeAll(table); + } +} + +class ArrowJsonIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchJSONWriter.writeAll(table); + } +} + +class ArrowStreamIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchStreamWriter.writeAll(table); + } +} + +export function* chunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function* asyncChunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function concatBuffersAsync(iterator: AsyncIterable | ReadableStream) { + if (iterator instanceof ReadableStream) { + iterator = readableDOMStreamToAsyncIterator(iterator); + } + let chunks = [], total = 0; + for await (const chunk of iterator) { + chunks.push(chunk); + total += chunk.byteLength; + } + return chunks.reduce((x, buffer) => { + x.buffer.set(buffer, x.offset); + x.offset += buffer.byteLength; + return x; + }, { offset: 0, buffer: new Uint8Array(total) }).buffer; +} + +export async function* readableDOMStreamToAsyncIterator(stream: ReadableStream) { + // Get a lock on the stream + const reader = stream.getReader(); + try { + while (true) { + // Read from the stream + const { done, value } = await reader.read(); + // Exit if we're done + if (done) { break; } + // Else yield the chunk + yield value as T; + } + } finally { + try { stream.locked && reader.releaseLock(); } catch (e) {} + } +} + +export function nodeToDOMStream(stream: NodeJS.ReadableStream, opts: any = {}) { + stream = new Readable((stream as any)._readableState).wrap(stream); + return new ReadableStream({ + ...opts, + start(controller) { + stream.pause(); + stream.on('data', (chunk) => { + controller.enqueue(chunk); + stream.pause(); + }); + stream.on('end', () => controller.close()); + stream.on('error', e => controller.error(e)); + }, + pull() { stream.resume(); }, + cancel(reason) { + stream.pause(); + if (typeof (stream as any).cancel === 'function') { + return (stream as any).cancel(reason); + } else if (typeof (stream as any).destroy === 'function') { + return (stream as any).destroy(reason); + } + } + }); +} -- cgit v1.2.3