summaryrefslogtreecommitdiffstats
path: root/src/arrow/js/src/io/node/builder.ts
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/js/src/io/node/builder.ts
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/js/src/io/node/builder.ts')
-rw-r--r--src/arrow/js/src/io/node/builder.ts98
1 files changed, 98 insertions, 0 deletions
diff --git a/src/arrow/js/src/io/node/builder.ts b/src/arrow/js/src/io/node/builder.ts
new file mode 100644
index 000000000..eb9579536
--- /dev/null
+++ b/src/arrow/js/src/io/node/builder.ts
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { Duplex } from 'stream';
+import { DataType } from '../../type';
+import { Builder, BuilderOptions } from '../../builder/index';
+
+/** @ignore */
+export interface BuilderDuplexOptions<T extends DataType = any, TNull = any> extends BuilderOptions<T, TNull> {
+ autoDestroy?: boolean;
+ highWaterMark?: number;
+ queueingStrategy?: 'bytes' | 'count';
+ dictionaryHashFunction?: (value: any) => string | number;
+ valueToChildTypeId?: (builder: Builder<T, TNull>, value: any, offset: number) => number;
+}
+
+/** @ignore */
+export function builderThroughNodeStream<T extends DataType = any, TNull = any>(options: BuilderDuplexOptions<T, TNull>) {
+ return new BuilderDuplex(Builder.new(options), options);
+}
+
+/** @ignore */
+type CB = (error?: Error | null | undefined) => void;
+
+/** @ignore */
+class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
+
+ private _finished: boolean;
+ private _numChunks: number;
+ private _desiredSize: number;
+ private _builder: Builder<T, TNull>;
+ private _getSize: (builder: Builder<T, TNull>) => number;
+
+ constructor(builder: Builder<T, TNull>, options: BuilderDuplexOptions<T, TNull>) {
+
+ const { queueingStrategy = 'count', autoDestroy = true } = options;
+ const { highWaterMark = queueingStrategy !== 'bytes' ? 1000 : 2 ** 14 } = options;
+
+ super({ autoDestroy, highWaterMark: 1, allowHalfOpen: true, writableObjectMode: true, readableObjectMode: true });
+
+ this._numChunks = 0;
+ this._finished = false;
+ this._builder = builder;
+ this._desiredSize = highWaterMark;
+ this._getSize = queueingStrategy !== 'bytes' ? builderLength : builderByteLength;
+ }
+ _read(size: number) {
+ this._maybeFlush(this._builder, this._desiredSize = size);
+ }
+ _final(cb?: CB) {
+ this._maybeFlush(this._builder.finish(), this._desiredSize);
+ cb && cb();
+ }
+ _write(value: any, _: string, cb?: CB) {
+ const result = this._maybeFlush(
+ this._builder.append(value),
+ this._desiredSize
+ );
+ cb && cb();
+ return result;
+ }
+ _destroy(err: Error | null, cb?: (error: Error | null) => void) {
+ this._builder.clear();
+ cb && cb(err);
+ }
+ private _maybeFlush(builder: Builder<T, TNull>, size: number) {
+ if (this._getSize(builder) >= size) {
+ ++this._numChunks && this.push(builder.toVector());
+ }
+ if (builder.finished) {
+ if (builder.length > 0 || this._numChunks === 0) {
+ ++this._numChunks && this.push(builder.toVector());
+ }
+ if (!this._finished && (this._finished = true)) {
+ this.push(null);
+ }
+ return false;
+ }
+ return this._getSize(builder) < this.writableHighWaterMark;
+ }
+}
+
+/** @ignore */ const builderLength = <T extends DataType = any>(builder: Builder<T>) => builder.length;
+/** @ignore */ const builderByteLength = <T extends DataType = any>(builder: Builder<T>) => builder.byteLength;