summaryrefslogtreecommitdiffstats
path: root/src/arrow/js/src/io/node
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/js/src/io/node
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/js/src/io/node')
-rw-r--r--src/arrow/js/src/io/node/builder.ts98
-rw-r--r--src/arrow/js/src/io/node/iterable.ts113
-rw-r--r--src/arrow/js/src/io/node/reader.ts86
-rw-r--r--src/arrow/js/src/io/node/writer.ts77
4 files changed, 374 insertions, 0 deletions
diff --git a/src/arrow/js/src/io/node/builder.ts b/src/arrow/js/src/io/node/builder.ts
new file mode 100644
index 000000000..eb9579536
--- /dev/null
+++ b/src/arrow/js/src/io/node/builder.ts
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { Duplex } from 'stream';
+import { DataType } from '../../type';
+import { Builder, BuilderOptions } from '../../builder/index';
+
+/**
+ * Options accepted by `builderThroughNodeStream()`: everything in `BuilderOptions`
+ * plus the stream settings below. The same object is handed to `Builder.new()` and
+ * to the `BuilderDuplex` constructor.
+ *
+ * - `autoDestroy`: forwarded to the `Duplex` constructor; `BuilderDuplex` defaults it to `true`.
+ * - `highWaterMark`: initial size the builder must reach before a chunk is pushed;
+ *   `BuilderDuplex` defaults it to 1000 when counting values and to 2 ** 14 when counting bytes.
+ * - `queueingStrategy`: `'bytes'` measures the builder by `byteLength`; any other value
+ *   (default `'count'`) measures it by `length`.
+ * - `dictionaryHashFunction`, `valueToChildTypeId`: not read anywhere in this file;
+ *   presumably consumed by `Builder.new()` (TODO confirm against the builder implementation).
+ *
+ * @ignore
+ */
+export interface BuilderDuplexOptions<T extends DataType = any, TNull = any> extends BuilderOptions<T, TNull> {
+ autoDestroy?: boolean;
+ highWaterMark?: number;
+ queueingStrategy?: 'bytes' | 'count';
+ dictionaryHashFunction?: (value: any) => string | number;
+ valueToChildTypeId?: (builder: Builder<T, TNull>, value: any, offset: number) => number;
+}
+
+/**
+ * Creates a Node.js duplex stream whose writable side feeds values into a
+ * `Builder` and whose readable side emits the vectors that builder produces.
+ *
+ * @param options Builder configuration plus stream settings; the same object is
+ * passed to `Builder.new()` and to the duplex wrapper.
+ * @returns A new `BuilderDuplex` wrapping the freshly created builder.
+ * @ignore
+ */
+export function builderThroughNodeStream<T extends DataType = any, TNull = any>(options: BuilderDuplexOptions<T, TNull>) {
+    const builder = Builder.new(options);
+    return new BuilderDuplex(builder, options);
+}
+
+/**
+ * Completion callback taken by the `_final` and `_write` hooks of `BuilderDuplex`;
+ * it may be called with no argument or with an error.
+ * @ignore
+ */
+type CB = (error?: Error | null | undefined) => void;
+
+/**
+ * Object-mode duplex stream around a `Builder`. Written values are appended to
+ * the builder; once the builder's measured size reaches the desired size its
+ * contents are pushed to the readable side as a vector. Ending the writable side
+ * finishes the builder, pushes whatever is left and then ends the readable side.
+ * @ignore
+ */
+class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
+
+    private _finished: boolean;
+    private _numChunks: number;
+    private _desiredSize: number;
+    private _builder: Builder<T, TNull>;
+    private _getSize: (builder: Builder<T, TNull>) => number;
+
+    /**
+     * @param builder The builder that accumulates written values.
+     * @param options Stream settings; `queueingStrategy` defaults to `'count'`,
+     * `autoDestroy` to `true`, and `highWaterMark` to 1000 (count) or 2 ** 14 (bytes).
+     */
+    constructor(builder: Builder<T, TNull>, options: BuilderDuplexOptions<T, TNull>) {
+
+        const { queueingStrategy = 'count', autoDestroy = true } = options;
+        const measureBytes = queueingStrategy === 'bytes';
+        const { highWaterMark = measureBytes ? 2 ** 14 : 1000 } = options;
+
+        // The Duplex itself always runs in object mode with a highWaterMark of 1;
+        // the user-facing highWaterMark only seeds `_desiredSize`.
+        super({ autoDestroy, highWaterMark: 1, allowHalfOpen: true, writableObjectMode: true, readableObjectMode: true });
+
+        this._builder = builder;
+        this._finished = false;
+        this._numChunks = 0;
+        this._desiredSize = highWaterMark;
+        this._getSize = measureBytes ? builderByteLength : builderLength;
+    }
+
+    /**
+     * Readable-side hook: records `size` as the new desired size and flushes the
+     * builder if it already holds that much.
+     */
+    _read(size: number) {
+        // NOTE(review): this replaces the threshold taken from `options.highWaterMark`
+        // with whatever size the stream machinery requests -- confirm that is intended.
+        this._desiredSize = size;
+        this._maybeFlush(this._builder, size);
+    }
+
+    /** Writable-side hook run when the stream is ended: finishes the builder and flushes it. */
+    _final(cb?: CB) {
+        const finished = this._builder.finish();
+        this._maybeFlush(finished, this._desiredSize);
+        if (cb) { cb(); }
+    }
+
+    /** Writable-side hook: appends `value` to the builder and flushes if the threshold is met. */
+    _write(value: any, _: string, cb?: CB) {
+        const appended = this._builder.append(value);
+        const result = this._maybeFlush(appended, this._desiredSize);
+        if (cb) { cb(); }
+        return result;
+    }
+
+    /** Clears the builder and hands `err` back to the stream machinery. */
+    _destroy(err: Error | null, cb?: (error: Error | null) => void) {
+        this._builder.clear();
+        if (cb) { cb(err); }
+    }
+
+    /**
+     * Pushes the builder's contents when its measured size has reached `size`,
+     * and completes the readable side once the builder reports `finished`.
+     *
+     * @returns `false` once the builder is finished; otherwise whether the
+     * builder's measured size is still below `writableHighWaterMark`.
+     */
+    private _maybeFlush(builder: Builder<T, TNull>, size: number) {
+        if (this._getSize(builder) >= size) {
+            this._pushChunk(builder);
+        }
+        if (!builder.finished) {
+            return this._getSize(builder) < this.writableHighWaterMark;
+        }
+        // Push the remainder, or a single vector if nothing has been pushed yet.
+        if (builder.length > 0 || this._numChunks === 0) {
+            this._pushChunk(builder);
+        }
+        if (!this._finished) {
+            this._finished = true;
+            this.push(null);
+        }
+        return false;
+    }
+
+    /** Counts one more chunk, then pushes the builder's current vector downstream. */
+    private _pushChunk(builder: Builder<T, TNull>) {
+        this._numChunks += 1;
+        this.push(builder.toVector());
+    }
+}
+
+/**
+ * Size measure used unless the queueing strategy is `'bytes'`: the builder's `length`.
+ * @ignore
+ */
+const builderLength = <T extends DataType = any>(builder: Builder<T>) => {
+    return builder.length;
+};
+/**
+ * Size measure used for the `'bytes'` queueing strategy: the builder's `byteLength`.
+ * @ignore
+ */
+const builderByteLength = <T extends DataType = any>(builder: Builder<T>) => {
+    return builder.byteLength;
+};
diff --git a/src/arrow/js/src/io/node/iterable.ts b/src/arrow/js/src/io/node/iterable.ts
new file mode 100644
index 000000000..457bc894d
--- /dev/null
+++ b/src/arrow/js/src/io/node/iterable.ts
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { Readable } from 'stream';
+import { isIterable, isAsyncIterable } from '../../util/compat';
+
+/**
+ * Options type of Node's `Readable` constructor, referenced through a
+ * type-only `import('stream')` expression.
+ * @ignore
+ */
+type ReadableOptions = import('stream').ReadableOptions;
+/**
+ * Synchronous generator yielding `T` whose `next()` accepts `number | null`.
+ * `IterableReadable` passes the requested read size in byte mode and `null`
+ * in object mode.
+ * @ignore
+ */
+type SourceIterator<T> = Generator<T, void, number | null>;
+/**
+ * Asynchronous counterpart of `SourceIterator`: an async generator yielding `T`
+ * whose `next()` accepts `number | null`, as consumed by `AsyncIterableReadable`.
+ * @ignore
+ */
+type AsyncSourceIterator<T> = AsyncGenerator<T, void, number | null>;
+
+/**
+ * Wraps an iterable or async iterable in a Node.js `Readable`.
+ *
+ * Async iterability is checked first, so a source that implements both
+ * protocols is consumed through its async iterator.
+ *
+ * @param source The iterable or async iterable to read from.
+ * @param options Optional `Readable` constructor options, passed through unchanged.
+ * @returns A `Readable` that pulls its chunks from `source`.
+ * @throws Error when `source` is neither iterable nor async iterable.
+ * @ignore
+ */
+export function toNodeStream<T>(source: Iterable<T> | AsyncIterable<T>, options?: ReadableOptions): Readable {
+    if (isAsyncIterable<T>(source)) {
+        const asyncIt = source[Symbol.asyncIterator]() as AsyncSourceIterator<T>;
+        return new AsyncIterableReadable(asyncIt, options);
+    }
+    if (isIterable<T>(source)) {
+        const syncIt = source[Symbol.iterator]() as SourceIterator<T>;
+        return new IterableReadable(syncIt, options);
+    }
+    /* istanbul ignore next */
+    throw new Error(`toNodeStream() must be called with an Iterable or AsyncIterable`);
+}
+
+/**
+ * `Readable` that pulls its chunks from a synchronous iterator.
+ *
+ * In byte mode (no options, or `objectMode` not set) the requested read size is
+ * passed to `iterator.next()` and the remaining size is reduced by the
+ * `byteLength` of each pushed view; otherwise `null` is passed and each value
+ * counts as 1.
+ *
+ * Errors thrown by the iterator, either while pulling or while being torn down,
+ * are reported through the stream's destroy path instead of escaping from
+ * `_read()` / `_destroy()`.
+ * @ignore
+ */
+class IterableReadable<T extends Uint8Array | any> extends Readable {
+    private _pulling: boolean;
+    private _bytesMode: boolean;
+    private _iterator: SourceIterator<T>;
+
+    /**
+     * @param it The iterator to drain.
+     * @param options Optional `Readable` options; byte mode is used unless `objectMode` is set.
+     */
+    constructor(it: SourceIterator<T>, options?: ReadableOptions) {
+        super(options);
+        this._iterator = it;
+        this._pulling = false;
+        this._bytesMode = !options || !options.objectMode;
+    }
+
+    /** Starts a pull unless one is already in progress. */
+    _read(size: number) {
+        const it = this._iterator;
+        if (it && !this._pulling && (this._pulling = true)) {
+            try {
+                this._pulling = this._pull(size, it);
+            } catch (err) {
+                // The iterator threw while producing a value: fail the stream
+                // rather than letting the exception escape from the read call.
+                this.destroy(err instanceof Error ? err : new Error(String(err)));
+            }
+        }
+    }
+
+    /**
+     * Tears the iterator down: `throw(e)` when destroyed with an error and the
+     * iterator supports it, otherwise `return(e)`. The callback is always
+     * invoked; it receives whatever the iterator threw during teardown, or
+     * `null` when teardown completed normally.
+     */
+    _destroy(e: Error | null, cb: (e: Error | null) => void) {
+        const it = this._iterator;
+        let fn: any;
+        it && (fn = e != null && it.throw || it.return);
+        let failure: Error | null = null;
+        try {
+            fn?.call(it, e);
+        } catch (err) {
+            failure = err instanceof Error ? err : new Error(String(err));
+        }
+        // Called outside the try block so an exception raised by `cb` itself
+        // cannot trigger a second invocation.
+        cb && cb(failure);
+    }
+
+    /**
+     * Pushes values from the iterator until it is exhausted, the stream stops
+     * being readable, `push()` signals back-pressure, or `size` is used up.
+     *
+     * @returns `true` when the stream is no longer readable, which keeps
+     * `_pulling` set and so prevents further pulls.
+     */
+    private _pull(size: number, it: SourceIterator<T>) {
+        const bm = this._bytesMode;
+        let r: IteratorResult<T> | null = null;
+        while (this.readable && !(r = it.next(bm ? size : null)).done) {
+            if (size != null) {
+                size -= (bm && ArrayBuffer.isView(r.value) ? r.value.byteLength : 1);
+            }
+            if (!this.push(r.value) || size <= 0) { break; }
+        }
+        if (r?.done || !this.readable) {
+            // Signal end-of-stream, then let the iterator release its resources.
+            this.push(null);
+            it.return && it.return();
+        }
+        return !this.readable;
+    }
+}
+
+/**
+ * `Readable` that pulls its chunks from an asynchronous iterator.
+ *
+ * In byte mode (no options, or `objectMode` not set) the requested read size is
+ * passed to `iterator.next()` and the remaining size is reduced by the
+ * `byteLength` of each pushed view; otherwise `null` is passed and each value
+ * counts as 1.
+ *
+ * Rejections from the iterator, either while pulling or while being torn down,
+ * are reported through the stream's destroy path so they neither become
+ * unhandled promise rejections nor leave the stream hanging.
+ * @ignore
+ */
+class AsyncIterableReadable<T extends Uint8Array | any> extends Readable {
+    private _pulling: boolean;
+    private _bytesMode: boolean;
+    private _iterator: AsyncSourceIterator<T>;
+
+    /**
+     * @param it The async iterator to drain.
+     * @param options Optional `Readable` options; byte mode is used unless `objectMode` is set.
+     */
+    constructor(it: AsyncSourceIterator<T>, options?: ReadableOptions) {
+        super(options);
+        this._iterator = it;
+        this._pulling = false;
+        this._bytesMode = !options || !options.objectMode;
+    }
+
+    /** Starts an asynchronous pull unless one is already in progress. */
+    _read(size: number) {
+        const it = this._iterator;
+        if (it && !this._pulling && (this._pulling = true)) {
+            this._pull(size, it).then(
+                (pulling) => { this._pulling = pulling; },
+                // The iterator rejected: fail the stream instead of leaving a
+                // floating rejected promise and a permanently set `_pulling`.
+                (err) => { this.destroy(err instanceof Error ? err : new Error(String(err))); }
+            );
+        }
+    }
+
+    /**
+     * Tears the iterator down: `throw(e)` when destroyed with an error and the
+     * iterator supports it, otherwise `return(e)`. The callback is always
+     * invoked exactly once: with `null` after teardown settles successfully (or
+     * when the iterator has neither method), or with the teardown failure.
+     */
+    _destroy(e: Error | null, cb: (e: Error | null) => void) {
+        const it = this._iterator;
+        let fn: any;
+        it && (fn = e != null && it.throw || it.return);
+        if (!fn) {
+            cb && cb(null);
+            return;
+        }
+        let pending: unknown;
+        try {
+            pending = fn.call(it, e);
+        } catch (err) {
+            cb && cb(err instanceof Error ? err : new Error(String(err)));
+            return;
+        }
+        // `Promise.resolve` tolerates iterators whose teardown method returns a
+        // plain value rather than a promise.
+        Promise.resolve(pending).then(
+            () => { cb && cb(null); },
+            (err) => { cb && cb(err instanceof Error ? err : new Error(String(err))); }
+        );
+    }
+
+    /**
+     * Pushes values from the iterator until it is exhausted, the stream stops
+     * being readable, `push()` signals back-pressure, or `size` is used up.
+     *
+     * @returns A promise for `true` when the stream is no longer readable, which
+     * keeps `_pulling` set and so prevents further pulls.
+     */
+    private async _pull(size: number, it: AsyncSourceIterator<T>) {
+        const bm = this._bytesMode;
+        let r: IteratorResult<T> | null = null;
+        while (this.readable && !(r = await it.next(bm ? size : null)).done) {
+            if (size != null) {
+                size -= (bm && ArrayBuffer.isView(r.value) ? r.value.byteLength : 1);
+            }
+            if (!this.push(r.value) || size <= 0) { break; }
+        }
+        if (r?.done || !this.readable) {
+            // Signal end-of-stream, then let the iterator release its resources.
+            this.push(null);
+            it.return && it.return();
+        }
+        return !this.readable;
+    }
+}
diff --git a/src/arrow/js/src/io/node/reader.ts b/src/arrow/js/src/io/node/reader.ts
new file mode 100644
index 000000000..a51fb0b40
--- /dev/null
+++ b/src/arrow/js/src/io/node/reader.ts
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { DataType } from '../../type';
+import { Duplex, DuplexOptions } from 'stream';
+import { RecordBatch } from '../../recordbatch';
+import { AsyncByteQueue } from '../../io/stream';
+import { RecordBatchReader } from '../../ipc/reader';
+
+/**
+ * Creates a duplex stream that accepts bytes on its writable side and emits
+ * the record batches read from them on its readable side.
+ *
+ * @param options Optional duplex options; `autoDestroy` is also forwarded to
+ * the record batch reader when it is opened.
+ * @returns A new `RecordBatchReaderDuplex`.
+ * @ignore
+ */
+export function recordBatchReaderThroughNodeStream<T extends { [key: string]: DataType } = any>(options?: DuplexOptions & { autoDestroy: boolean }) {
+    const duplex = new RecordBatchReaderDuplex<T>(options);
+    return duplex;
+}
+
+/**
+ * Completion callback taken by the `_final` and `_write` hooks of
+ * `RecordBatchReaderDuplex`; it may be called with no argument or with an error.
+ * @ignore
+ */
+type CB = (error?: Error | null | undefined) => void;
+
+/**
+ * Duplex stream whose writable side takes bytes and whose readable side emits
+ * record batches. Written chunks go into an `AsyncByteQueue`; on the first read
+ * a `RecordBatchReader` is opened over that queue and then drained on demand.
+ * @ignore
+ */
+class RecordBatchReaderDuplex<T extends { [key: string]: DataType } = any> extends Duplex {
+    private _pulling = false;
+    private _autoDestroy = true;
+    private _reader: RecordBatchReader | null;
+    private _asyncQueue: AsyncByteQueue | null;
+
+    /**
+     * @param options Optional duplex options. `allowHalfOpen` defaults to `false`
+     * and can be overridden; the object modes are always forced (readable: object,
+     * writable: bytes). A boolean `autoDestroy` is remembered for the reader.
+     */
+    constructor(options?: DuplexOptions & { autoDestroy: boolean }) {
+        super({ allowHalfOpen: false, ...options, readableObjectMode: true, writableObjectMode: false });
+        this._reader = null;
+        this._pulling = false;
+        this._asyncQueue = new AsyncByteQueue();
+        this._autoDestroy = true;
+        if (options && typeof options.autoDestroy === 'boolean') {
+            this._autoDestroy = options.autoDestroy;
+        }
+    }
+
+    /** Writable side ended: close the byte queue (if it still exists). */
+    _final(cb?: CB) {
+        const queue = this._asyncQueue;
+        queue?.close();
+        if (cb) { cb(); }
+    }
+
+    /** Hands a written chunk to the byte queue (if it still exists). */
+    _write(x: any, _: string, cb: CB) {
+        const queue = this._asyncQueue;
+        queue?.write(x);
+        if (cb) { cb(); }
+        return true;
+    }
+
+    /** Starts an asynchronous pull unless one is in progress or the queue is gone. */
+    _read(size: number) {
+        const queue = this._asyncQueue;
+        if (!queue || this._pulling) { return; }
+        this._pulling = true;
+        // NOTE(review): the promise returned by this async function is not observed,
+        // so a rejection from `_open()` or `_pull()` would go unhandled -- confirm
+        // how read failures are meant to surface.
+        (async () => {
+            if (!this._reader) {
+                this._reader = await this._open(queue);
+            }
+            this._pulling = await this._pull(size, this._reader);
+        })();
+    }
+
+    /**
+     * Aborts the byte queue with `err`, or closes it when there is no error, then
+     * drops the queue and reader references. The callback always receives `null`.
+     */
+    _destroy(err: Error | null, cb: (error: Error | null) => void) {
+        const queue = this._asyncQueue;
+        if (queue) {
+            if (err) {
+                queue.abort(err);
+            } else {
+                queue.close();
+            }
+        }
+        this._reader = null;
+        this._asyncQueue = null;
+        cb(null);
+    }
+
+    /** Creates a `RecordBatchReader` over `source` and opens it with the stored `autoDestroy` flag. */
+    async _open(source: AsyncByteQueue) {
+        const reader = await RecordBatchReader.from<T>(source);
+        return await reader.open({ autoDestroy: this._autoDestroy });
+    }
+
+    /**
+     * Pushes record batches until the reader is done, the stream stops being
+     * readable, `push()` signals back-pressure, or `size` batches were pushed.
+     *
+     * @returns A promise for `true` when the stream is no longer readable.
+     */
+    async _pull(size: number, reader: RecordBatchReader<T>) {
+        let r: IteratorResult<RecordBatch<T>> | null = null;
+        while (this.readable) {
+            r = await reader.next();
+            if (r.done) { break; }
+            if (!this.push(r.value)) { break; }
+            if (size != null && --size <= 0) { break; }
+        }
+        // End the stream when it is no longer readable, or when the reader is done
+        // and either auto-destroys or reports `closed` after being reset and re-opened.
+        let finish: boolean;
+        if (!this.readable) {
+            finish = true;
+        } else if (r?.done) {
+            finish = !!(reader.autoDestroy || (await reader.reset().open()).closed);
+        } else {
+            finish = false;
+        }
+        if (finish) {
+            this.push(null);
+            await reader.cancel();
+        }
+        return !this.readable;
+    }
+}
diff --git a/src/arrow/js/src/io/node/writer.ts b/src/arrow/js/src/io/node/writer.ts
new file mode 100644
index 000000000..79d61b9a3
--- /dev/null
+++ b/src/arrow/js/src/io/node/writer.ts
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { DataType } from '../../type';
+import { Duplex, DuplexOptions } from 'stream';
+import { AsyncByteStream } from '../../io/stream';
+import { RecordBatchWriter } from '../../ipc/writer';
+
+/**
+ * Creates a duplex stream that accepts values on its object-mode writable side,
+ * feeds them to a record batch writer, and emits the writer's bytes on its
+ * readable side.
+ *
+ * Must be invoked with a `RecordBatchWriter` constructor as `this`; that
+ * constructor is used to create the underlying writer.
+ *
+ * @param options Optional settings. They are passed to the writer constructor
+ * and also to the duplex wrapper, so stream options such as `highWaterMark`
+ * take effect instead of being silently dropped.
+ * @returns A new `RecordBatchWriterDuplex` wrapping the created writer.
+ * @ignore
+ */
+export function recordBatchWriterThroughNodeStream<T extends { [key: string]: DataType } = any>(this: typeof RecordBatchWriter, options?: DuplexOptions & { autoDestroy: boolean }) {
+    return new RecordBatchWriterDuplex(new this<T>(options), options);
+}
+
+/**
+ * Completion callback taken by the `_final` and `_write` hooks of
+ * `RecordBatchWriterDuplex`; it may be called with no argument or with an error.
+ * @ignore
+ */
+type CB = (error?: Error | null | undefined) => void;
+
+/**
+ * Duplex stream whose object-mode writable side feeds a `RecordBatchWriter`
+ * and whose readable side emits the bytes that writer produces, read through
+ * an `AsyncByteStream` constructed over the writer.
+ * @ignore
+ */
+class RecordBatchWriterDuplex<T extends { [key: string]: DataType } = any> extends Duplex {
+    private _pulling = false;
+    private _reader: AsyncByteStream | null;
+    private _writer: RecordBatchWriter | null;
+
+    /**
+     * @param writer The writer that receives everything written to this stream.
+     * @param options Optional duplex options. `allowHalfOpen` defaults to `false`
+     * and can be overridden; the object modes are always forced (writable: object,
+     * readable: bytes).
+     */
+    constructor(writer: RecordBatchWriter<T>, options?: DuplexOptions) {
+        super({ allowHalfOpen: false, ...options, writableObjectMode: true, readableObjectMode: false });
+        this._writer = writer;
+        this._reader = new AsyncByteStream(writer);
+    }
+
+    /** Writable side ended: close the writer (if it still exists). */
+    _final(cb?: CB) {
+        const sink = this._writer;
+        sink?.close();
+        if (cb) { cb(); }
+    }
+
+    /** Hands a written value to the writer (if it still exists). */
+    _write(x: any, _: string, cb: CB) {
+        const sink = this._writer;
+        sink?.write(x);
+        if (cb) { cb(); }
+        return true;
+    }
+
+    /** Starts an asynchronous pull unless one is in progress or the byte stream is gone. */
+    _read(size: number) {
+        const source = this._reader;
+        if (!source || this._pulling) { return; }
+        this._pulling = true;
+        // NOTE(review): the promise returned by this async function is not observed,
+        // so a rejection from `_pull()` would go unhandled -- confirm how read
+        // failures are meant to surface.
+        (async () => {
+            this._pulling = await this._pull(size, source);
+        })();
+    }
+
+    /**
+     * Aborts the writer with `err`, or closes it when there is no error, then
+     * drops the writer and byte-stream references. The callback always receives `null`.
+     */
+    _destroy(err: Error | null, cb: (error: Error | null) => void) {
+        const sink = this._writer;
+        if (sink) {
+            if (err) {
+                sink.abort(err);
+            } else {
+                sink.close();
+            }
+        }
+        this._writer = null;
+        this._reader = null;
+        cb(null);
+    }
+
+    /**
+     * Pushes byte chunks until the byte stream is done, this stream stops being
+     * readable, `push()` signals back-pressure, or `size` bytes were pushed.
+     *
+     * @returns A promise for `true` when the stream is no longer readable.
+     */
+    async _pull(size: number, reader: AsyncByteStream) {
+        let r: IteratorResult<Uint8Array> | null = null;
+        while (this.readable) {
+            // A size of 0 is requested as `null`.
+            r = await reader.next(size || null);
+            if (r.done) { break; }
+            if (size != null && r.value) {
+                size -= r.value.byteLength;
+            }
+            const wantsMore = this.push(r.value);
+            if (!wantsMore || size <= 0) { break; }
+        }
+        if (r?.done || !this.readable) {
+            this.push(null);
+            await reader.cancel();
+        }
+        return !this.readable;
+    }
+}