diff options
Diffstat (limited to '')
39 files changed, 5841 insertions, 0 deletions
diff --git a/src/arrow/js/test/unit/bit-tests.ts b/src/arrow/js/test/unit/bit-tests.ts new file mode 100644 index 000000000..cdfb37c16 --- /dev/null +++ b/src/arrow/js/test/unit/bit-tests.ts @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import * as Arrow from 'apache-arrow'; +const { BitIterator, getBool } = Arrow.util; + +describe('Bits', () => { + test('BitIterator produces correct bits for single byte', () => { + const byte = new Uint8Array([0b11110000]); + expect([...new BitIterator(byte, 0, 8, null, getBool)]).toEqual( + [false, false, false, false, true, true, true, true]); + + expect([...new BitIterator(byte, 2, 5, null, getBool)]).toEqual( + [false, false, true, true, true]); + }); + + test('BitIterator produces correct bits for multiple bytes', () => { + const byte = new Uint8Array([0b11110000, 0b10101010]); + expect([...new BitIterator(byte, 0, 16, null, getBool)]).toEqual( + [false, false, false, false, true, true, true, true, + false, true, false, true, false, true, false, true]); + + expect([...new BitIterator(byte, 2, 11, null, getBool)]).toEqual( + [false, false, true, true, true, true, + false, true, false, true, false]); + }); +}); diff --git a/src/arrow/js/test/unit/builders/builder-tests.ts b/src/arrow/js/test/unit/builders/builder-tests.ts new file mode 100644 index 000000000..b6fa60271 --- /dev/null +++ b/src/arrow/js/test/unit/builders/builder-tests.ts @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; +import { from, fromDOMStream, toArray } from 'ix/asynciterable'; +import { fromNodeStream } from 'ix/asynciterable/fromnodestream'; +import { validateVector } from './utils'; +import * as generate from '../../generate-test-data'; +import { Type, DataType, Chunked, util, Builder, UnionVector } from 'apache-arrow'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +describe('Generated Test Data', () => { + describe('NullBuilder', () => { validateBuilder(generate.null_); }); + describe('BoolBuilder', () => { validateBuilder(generate.bool); }); + describe('Int8Builder', () => { validateBuilder(generate.int8); }); + describe('Int16Builder', () => { validateBuilder(generate.int16); }); + describe('Int32Builder', () => { validateBuilder(generate.int32); }); + describe('Int64Builder', () => { validateBuilder(generate.int64); }); + describe('Uint8Builder', () => { validateBuilder(generate.uint8); }); + describe('Uint16Builder', () => { validateBuilder(generate.uint16); }); + describe('Uint32Builder', () => { validateBuilder(generate.uint32); }); + describe('Uint64Builder', () => { validateBuilder(generate.uint64); }); + describe('Float16Builder', () => { validateBuilder(generate.float16); }); + describe('Float32Builder', () => { validateBuilder(generate.float32); }); + describe('Float64Builder', () => { validateBuilder(generate.float64); }); + describe('Utf8Builder', () => { validateBuilder(generate.utf8); }); + describe('BinaryBuilder', () => { validateBuilder(generate.binary); }); + describe('FixedSizeBinaryBuilder', () => { validateBuilder(generate.fixedSizeBinary); }); + describe('DateDayBuilder', () => { validateBuilder(generate.dateDay); }); + describe('DateMillisecondBuilder', () => { validateBuilder(generate.dateMillisecond); }); + describe('TimestampSecondBuilder', () => { validateBuilder(generate.timestampSecond); }); + describe('TimestampMillisecondBuilder', () => { validateBuilder(generate.timestampMillisecond); }); + describe('TimestampMicrosecondBuilder', () => { validateBuilder(generate.timestampMicrosecond); }); + describe('TimestampNanosecondBuilder', () => { validateBuilder(generate.timestampNanosecond); }); + describe('TimeSecondBuilder', () => { validateBuilder(generate.timeSecond); }); + describe('TimeMillisecondBuilder', () => { validateBuilder(generate.timeMillisecond); }); + describe('TimeMicrosecondBuilder', () => { validateBuilder(generate.timeMicrosecond); }); + describe('TimeNanosecondBuilder', () => { validateBuilder(generate.timeNanosecond); }); + describe('DecimalBuilder', () => { validateBuilder(generate.decimal); }); + describe('ListBuilder', () => { validateBuilder(generate.list); }); + describe('StructBuilder', () => { validateBuilder(generate.struct); }); + describe('DenseUnionBuilder', () => { validateBuilder(generate.denseUnion); }); + describe('SparseUnionBuilder', () => { validateBuilder(generate.sparseUnion); }); + describe('DictionaryBuilder', () => { validateBuilder(generate.dictionary); }); + describe('IntervalDayTimeBuilder', () => { validateBuilder(generate.intervalDayTime); }); + describe('IntervalYearMonthBuilder', () => { validateBuilder(generate.intervalYearMonth); }); + describe('FixedSizeListBuilder', () => { validateBuilder(generate.fixedSizeList); }); + describe('MapBuilder', () => { validateBuilder(generate.map); }); +}); + +function validateBuilder(generate: (length?: number, nullCount?: number, ...args: any[]) => generate.GeneratedVector) { + + const type = generate(0, 0).vector.type; + + for (let i = -1; ++i < 1;) { + validateBuilderWithNullValues(`no nulls`, [], generate(100, 0)); + validateBuilderWithNullValues(`with nulls`, [null], generate(100)); + if (DataType.isUtf8(type)) { + validateBuilderWithNullValues(`with \\0`, ['\0'], generate(100)); + validateBuilderWithNullValues(`with n/a`, ['n/a'], generate(100)); + } else if (DataType.isFloat(type)) { + validateBuilderWithNullValues(`with NaNs`, [NaN], generate(100)); + } else if (DataType.isInt(type)) { + validateBuilderWithNullValues(`with MAX_INT`, [ + type.bitWidth < 64 ? 0x7fffffff : + new Uint32Array([0x7fffffff, 0x7fffffff])], generate(100)); + } + } +} + +const countQueueingStrategy = { highWaterMark: 10 }; +const byteLengthQueueingStrategy = { highWaterMark: 64 }; + +const iterableBuilderOptions = <T extends DataType = any>({ vector }: generate.GeneratedVector, { type, ...opts }: BuilderOptions<T>) => ({ + ...opts, type, + valueToChildTypeId: !DataType.isUnion(type) ? undefined : (() => { + let { typeIds } = vector as UnionVector; + let lastChunkLength = 0, chunksLength = 0; + return (builder: Builder<T>, _value: any, index: number) => { + if (index === 0) { + chunksLength += lastChunkLength; + } + lastChunkLength = builder.length + 1; + return typeIds[chunksLength + index]; + }; + })() +}); + +const domStreamBuilderOptions = <T extends DataType = any>({ vector }: generate.GeneratedVector, { type, queueingStrategy, ...opts }: Partial<BuilderTransformOptions<T>>) => ({ + ...opts, type, + valueToChildTypeId: !DataType.isUnion(type) ? undefined : (() => { + let { typeIds } = vector as UnionVector; + let lastChunkLength = 0, chunksLength = 0; + return (builder: Builder<T>, _value: any, index: number) => { + if (index === 0) { + chunksLength += lastChunkLength; + } + lastChunkLength = builder.length + 1; + return typeIds[chunksLength + index]; + }; + })(), + queueingStrategy, + readableStrategy: queueingStrategy === 'bytes' ? byteLengthQueueingStrategy : countQueueingStrategy, + writableStrategy: queueingStrategy === 'bytes' ? byteLengthQueueingStrategy : countQueueingStrategy, +}); + +const nodeStreamBuilderOptions = <T extends DataType = any>({ vector }: generate.GeneratedVector, { type, queueingStrategy, ...opts }: Partial<BuilderDuplexOptions<T>>) => ({ + ...opts, type, + valueToChildTypeId: !DataType.isUnion(type) ? undefined : (() => { + let { typeIds } = vector as UnionVector; + let lastChunkLength = 0, chunksLength = 0; + return (builder: Builder<T>, _value: any, index: number) => { + if (index === 0) { + chunksLength += lastChunkLength; + } + lastChunkLength = builder.length + 1; + return typeIds[chunksLength + index]; + }; + })(), + queueingStrategy, + highWaterMark: queueingStrategy === 'bytes' ? 64 : 10 +}); + +function validateBuilderWithNullValues(suiteName: string, nullValues: any[], generated: generate.GeneratedVector) { + + const type = generated.vector.type; + const referenceNullValues = nullValues.slice(); + const originalValues = generated.values().slice(); + const typeName = Type[type.typeId].toLowerCase(); + + let values: any[]; + const opts: any = { type, nullValues }; + + if (DataType.isNull(type) || (nullValues.length === 1 && nullValues[0] === null)) { + values = originalValues.slice(); + } else if (nullValues.length > 0) { + values = fillNA(originalValues, nullValues); + } else { + values = fillNADefault(originalValues, [originalValues.find((x) => x !== null)]); + } + + if (DataType.isInt(type) && type.bitWidth === 64 && ArrayBuffer.isView(nullValues[0])) { + referenceNullValues[0] = util.BN.new<any>(nullValues[0])[Symbol.toPrimitive]('default'); + } + + describe(suiteName, () => { + it(`encodes ${typeName} single`, async () => { + const opts_ = iterableBuilderOptions(generated, { ...opts }); + const vector = await encodeSingle(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + it(`encodes ${typeName} chunks by count`, async () => { + const highWaterMark = Math.max(5, (Math.random() * values.length - 5) | 0); + const opts_ = iterableBuilderOptions(generated, { ...opts, highWaterMark, queueingStrategy: 'count' }); + const vector = await encodeChunks(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + it(`encodes ${typeName} chunks by bytes`, async () => { + const highWaterMark = 64; + const opts_ = iterableBuilderOptions(generated, { ...opts, highWaterMark, queueingStrategy: 'bytes' }); + const vector = await encodeChunks(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + if (testDOMStreams) { + it(`encodes ${typeName} chunks from a DOM stream by count`, async () => { + const opts_ = domStreamBuilderOptions(generated, { ...opts, queueingStrategy: 'count' }); + const vector = await encodeChunksDOM(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + it(`encodes ${typeName} chunks from a DOM stream by bytes`, async () => { + const opts_ = domStreamBuilderOptions(generated, { ...opts, queueingStrategy: 'bytes' }); + const vector = await encodeChunksDOM(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + } + if (testNodeStreams) { + it(`encodes ${typeName} chunks from a Node stream by count`, async () => { + const opts_ = nodeStreamBuilderOptions(generated, { ...opts, queueingStrategy: 'count' }); + const vector = await encodeChunksNode(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + it(`encodes ${typeName} chunks from a Node stream by bytes`, async () => { + const opts_ = nodeStreamBuilderOptions(generated, { ...opts, queueingStrategy: 'bytes' }); + const vector = await encodeChunksNode(values.slice(), opts_); + validateVector(values, vector, referenceNullValues); + }); + } + }); +} + +function fillNA(values: any[], nulls: any[]): any[] { + const n = nulls.length - 1; + return values.map((x) => { + if (x === null) { + return nulls[Math.round(n * Math.random())]; + } + return x; + }); +} + +function fillNADefault(values: any[], nulls: any[]): any[] { + const n = nulls.length - 1; + return values.map((x) => { + if (x === null) { + return nulls[Math.round(n * Math.random())]; + } else if (Array.isArray(x) && x.length > 0) { + let defaultValue = x.find((y) => y !== null); + if (defaultValue === undefined) { defaultValue = 0; } + return fillNADefault(x, [defaultValue]); + } + return x; + }); +} + +type BuilderOptions<T extends DataType = any, TNull = any> = import('apache-arrow/builder').BuilderOptions<T, TNull>; +type BuilderDuplexOptions<T extends DataType = any, TNull = any> = import('apache-arrow/io/node/builder').BuilderDuplexOptions<T, TNull>; +type BuilderTransformOptions<T extends DataType = any, TNull = any> = import('apache-arrow/io/whatwg/builder').BuilderTransformOptions<T, TNull>; + +async function encodeSingle<T extends DataType, TNull = any>(values: (T['TValue'] | TNull)[], options: BuilderOptions<T, TNull>) { + const builder = Builder.new(options); + values.forEach((x) => builder.append(x)); + return builder.finish().toVector(); +} + +async function encodeChunks<T extends DataType, TNull = any>(values: (T['TValue'] | TNull)[], options: BuilderOptions<T, TNull>) { + return Chunked.concat(...Builder.throughIterable(options)(values)); +} + +async function encodeChunksDOM<T extends DataType, TNull = any>(values: (T['TValue'] | TNull)[], options: BuilderTransformOptions<T, TNull>) { + + const stream = from(values).toDOMStream() + .pipeThrough(Builder.throughDOM(options)); + + const chunks = await fromDOMStream(stream).pipe(toArray); + + return Chunked.concat(...chunks); +} + +async function encodeChunksNode<T extends DataType, TNull = any>(values: (T['TValue'] | TNull)[], options: BuilderDuplexOptions<T, TNull>) { + + if (options.nullValues) { + options.nullValues = [...options.nullValues, undefined] as TNull[]; + } + + const stream = from(fillNA(values, [undefined])) + .toNodeStream({ objectMode: true }) + .pipe(Builder.throughNode(options)); + + const chunks: any[] = await fromNodeStream(stream, options.highWaterMark).pipe(toArray); + + return Chunked.concat(...chunks); +} diff --git a/src/arrow/js/test/unit/builders/date-tests.ts b/src/arrow/js/test/unit/builders/date-tests.ts new file mode 100644 index 000000000..5a9cc092b --- /dev/null +++ b/src/arrow/js/test/unit/builders/date-tests.ts @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { validateVector } from './utils'; +import { Vector, DateDay, DateMillisecond } from 'apache-arrow'; +import { + encodeAll, + encodeEach, + encodeEachDOM, + encodeEachNode, + date32sNoNulls, + date64sNoNulls, + date32sWithNulls, + date64sWithNulls +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +describe('DateDayBuilder', () => { + runTestsWithEncoder('encodeAll', encodeAll(() => new DateDay())); + runTestsWithEncoder('encodeEach: 5', encodeEach(() => new DateDay(), 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(() => new DateDay(), 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new DateDay())); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new DateDay(), 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new DateDay(), 25)); + + function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Promise<Vector<DateDay>>) { + describe(`${encode.name} ${name}`, () => { + it(`encodes dates no nulls`, async () => { + const vals = date32sNoNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes dates with nulls`, async () => { + const vals = date32sWithNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + }); + } +}); + +describe('DateMillisecondBuilder', () => { + runTestsWithEncoder('encodeAll', encodeAll(() => new DateMillisecond())); + runTestsWithEncoder('encodeEach: 5', encodeEach(() => new DateMillisecond(), 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(() => new DateMillisecond(), 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new DateMillisecond())); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new DateMillisecond(), 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new DateMillisecond(), 25)); + + function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Promise<Vector<DateMillisecond>>) { + describe(`${encode.name} ${name}`, () => { + it(`encodes dates no nulls`, async () => { + const vals = date64sNoNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes dates with nulls`, async () => { + const vals = date64sWithNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + }); + } +}); + +describe('DateMillisecondBuilder with nulls', () => { + const encode = encodeAll(() => new DateMillisecond()); + const dates = [ + null, + '2019-03-19T13:40:14.746Z', + '2019-03-06T21:12:50.912Z', + '2019-03-22T12:50:56.854Z', + '2019-02-25T03:34:30.916Z', + null, + null, + null, + null, + null, + null, + '2019-03-18T18:12:37.293Z', + '2019-03-26T21:58:35.307Z', + '2019-04-02T03:03:46.464Z', + '2019-03-24T18:45:25.763Z', + null, + '2019-03-19T01:10:59.189Z', + '2019-03-10T21:15:32.237Z', + '2019-03-21T07:25:34.864Z', + null + ].map((x) => x === null ? x : new Date(x)); + it(`encodes dates with nulls`, async () => { + const vals = dates.slice(); + validateVector(vals, await encode(vals, [null]), [null]); + }); +}); diff --git a/src/arrow/js/test/unit/builders/dictionary-tests.ts b/src/arrow/js/test/unit/builders/dictionary-tests.ts new file mode 100644 index 000000000..19b3603bc --- /dev/null +++ b/src/arrow/js/test/unit/builders/dictionary-tests.ts @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { validateVector } from './utils'; +import { Dictionary, Utf8, Int32, Vector } from 'apache-arrow'; +import { + encodeAll, + encodeEach, + encodeEachDOM, + encodeEachNode, + duplicateItems, + stringsNoNulls, + stringsWithNAs, + stringsWithNulls, + stringsWithEmpties +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +describe('DictionaryBuilder', () => { + describe('<Utf8, Int32>', () => { + runTestsWithEncoder('encodeAll', encodeAll(() => new Dictionary(new Utf8(), new Int32()))); + runTestsWithEncoder('encodeEach: 5', encodeEach(() => new Dictionary(new Utf8(), new Int32()), 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(() => new Dictionary(new Utf8(), new Int32()), 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new Dictionary(new Utf8(), new Int32()), void 0)); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new Dictionary(new Utf8(), new Int32()), 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new Dictionary(new Utf8(), new Int32()), 25)); + }); +}); + +function runTestsWithEncoder(name: string, encode: (vals: (string | null)[], nullVals?: any[]) => Promise<Vector<Dictionary<Utf8, Int32>>>) { + describe(`${encode.name} ${name}`, () => { + it(`dictionary-encodes strings no nulls`, async () => { + const vals = duplicateItems(20, stringsNoNulls(10)); + validateVector(vals, await encode(vals, []), []); + }); + it(`dictionary-encodes strings with nulls`, async () => { + const vals = duplicateItems(20, stringsWithNulls(10)); + validateVector(vals, await encode(vals, [null]), [null]); + }); + it(`dictionary-encodes strings using n/a as the null value rep`, async () => { + const vals = duplicateItems(20, stringsWithNAs(10)); + validateVector(vals, await encode(vals, ['n/a']), ['n/a']); + }); + it(`dictionary-encodes strings using \\0 as the null value rep`, async () => { + const vals = duplicateItems(20, stringsWithEmpties(10)); + validateVector(vals, await encode(vals, ['\0']), ['\0']); + }); + }); +} diff --git a/src/arrow/js/test/unit/builders/int64-tests.ts b/src/arrow/js/test/unit/builders/int64-tests.ts new file mode 100644 index 000000000..876ce7030 --- /dev/null +++ b/src/arrow/js/test/unit/builders/int64-tests.ts @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { util, Vector, DataType, Int64 } from 'apache-arrow'; +import { + validateVector, + encodeAll, encodeEach, encodeEachDOM, encodeEachNode, + int64sNoNulls, int64sWithNulls, int64sWithMaxInts, +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +const typeFactory = () => new Int64(); +const valueName = `Int64`.toLowerCase(); +const encode0 = encodeAll(typeFactory); +const encode1 = encodeEach(typeFactory); +const encode2 = encodeEach(typeFactory, 5); +const encode3 = encodeEach(typeFactory, 25); +const encode4 = encodeEachDOM(typeFactory, 25); +const encode5 = encodeEachNode(typeFactory, 25); + +const nulls0: any[] = [0x7fffffff]; +const nulls1: any[] = [0x7fffffff]; +nulls0[0] = new Uint32Array([0x7fffffff, 0x7fffffff]); +nulls1[0] = util.BN.new(nulls0[0])[Symbol.toPrimitive](); + +type EncodeValues<T extends DataType> = (values: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>; + +function encodeAndValidate<T extends DataType>(encode: EncodeValues<T>, providedNulls: any[] = [], expectedNulls = providedNulls) { + return (values: any[]) => { + return async () => { + const vector = await encode(values, providedNulls); + const expected = values.map((x) => { + switch (typeof x) { + case 'number': return new Int32Array([x, 0]); + case 'bigint': return new Int32Array(new BigInt64Array([x]).buffer); + } + return x ? x.slice() : x; + }); + return validateVector(expected, vector, expectedNulls); + }; + }; +} + +describe(`Int64Builder`, () => { + describe(`encode single chunk`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode0, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode0, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode0, nulls0, nulls1)(int64sWithMaxInts(20))); + }); + describe(`encode chunks length default`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode1, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode1, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode1, nulls0, nulls1)(int64sWithMaxInts(20))); + }); + describe(`encode chunks length 5`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode2, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode2, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode2, nulls0, nulls1)(int64sWithMaxInts(20))); + }); + describe(`encode chunks length 25`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode3, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode3, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode3, nulls0, nulls1)(int64sWithMaxInts(20))); + }); + testDOMStreams && describe(`encode chunks length 25, WhatWG stream`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode4, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode4, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode4, nulls0, nulls1)(int64sWithMaxInts(20))); + }); + testNodeStreams && describe(`encode chunks length 25, NodeJS stream`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode5, [], [])(int64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode5, [null], [null])(int64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode5, nulls0, nulls1)(int64sWithMaxInts(20))); + }); +}); diff --git a/src/arrow/js/test/unit/builders/primitive-tests.ts b/src/arrow/js/test/unit/builders/primitive-tests.ts new file mode 100644 index 000000000..3fd515bf4 --- /dev/null +++ b/src/arrow/js/test/unit/builders/primitive-tests.ts @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + Vector, DataType, + Bool, Int8, Int16, Int32, Uint8, Uint16, Uint32, Float16, Float32, Float64 +} from 'apache-arrow'; + +import { + validateVector, + encodeAll, encodeEach, encodeEachDOM, encodeEachNode, + boolsNoNulls, boolsWithNulls, + int8sNoNulls, int8sWithNulls, int8sWithMaxInts, + int16sNoNulls, int16sWithNulls, int16sWithMaxInts, + int32sNoNulls, int32sWithNulls, int32sWithMaxInts, + uint8sNoNulls, uint8sWithNulls, uint8sWithMaxInts, + uint16sNoNulls, uint16sWithNulls, uint16sWithMaxInts, + uint32sNoNulls, uint32sWithNulls, uint32sWithMaxInts, + float16sNoNulls, float16sWithNulls, float16sWithNaNs, + float32sNoNulls, float32sWithNulls, float64sWithNaNs, + float64sNoNulls, float64sWithNulls, float32sWithNaNs, +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +describe('BoolBuilder', () => { + + runTestsWithEncoder('encodeAll: 5', encodeAll(() => new Bool())); + runTestsWithEncoder('encodeEach: 5', encodeEach(() => new Bool(), 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(() => new Bool(), 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new Bool())); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new Bool(), 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new Bool(), 25)); + + function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) { + describe(`${encode.name} ${name}`, () => { + it(`encodes bools no nulls`, async () => { + const vals = boolsNoNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes bools with nulls`, async () => { + const vals = boolsWithNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + }); + } +}); + +type PrimitiveTypeOpts<T extends DataType> = [ + new (...args: any[]) => T, + (count: number) => (T['TValue'] | null)[], + (count: number) => (T['TValue'] | null)[], + (count: number) => (T['TValue'] | null)[] +]; + +[ + [Int8, int8sNoNulls, int8sWithNulls, int8sWithMaxInts] as PrimitiveTypeOpts<Int8>, + [Int16, int16sNoNulls, int16sWithNulls, int16sWithMaxInts] as PrimitiveTypeOpts<Int16>, + [Int32, int32sNoNulls, int32sWithNulls, int32sWithMaxInts] as PrimitiveTypeOpts<Int32>, + [Uint8, uint8sNoNulls, uint8sWithNulls, uint8sWithMaxInts] as PrimitiveTypeOpts<Uint8>, + [Uint16, uint16sNoNulls, uint16sWithNulls, uint16sWithMaxInts] as PrimitiveTypeOpts<Uint16>, + [Uint32, uint32sNoNulls, uint32sWithNulls, uint32sWithMaxInts] as PrimitiveTypeOpts<Uint32>, +].forEach(([TypeCtor, noNulls, withNulls, withNaNs]) => { + + describe(`${TypeCtor.name}Builder`, () => { + + const typeFactory = () => new TypeCtor(); + const valueName = TypeCtor.name.toLowerCase(); + + runTestsWithEncoder('encodeAll', encodeAll(typeFactory)); + runTestsWithEncoder('encodeEach: 5', encodeEach(typeFactory, 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(typeFactory, 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(typeFactory)); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(typeFactory, 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(typeFactory, 25)); + + function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) { + describe(`${name}`, () => { + it(`encodes ${valueName} no nulls`, async () => { + const vals = noNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes ${valueName} with nulls`, async () => { + const vals = withNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + it(`encodes ${valueName} with MAX_INT`, async () => { + const vals = withNaNs(20); + validateVector(vals, await encode(vals, [0x7fffffff]), [0x7fffffff]); + }); + }); + } + }); +}); + +[ + [Float16, float16sNoNulls, float16sWithNulls, float16sWithNaNs] as PrimitiveTypeOpts<Float16>, + [Float32, float32sNoNulls, float32sWithNulls, float32sWithNaNs] as PrimitiveTypeOpts<Float32>, + [Float64, float64sNoNulls, float64sWithNulls, float64sWithNaNs] as PrimitiveTypeOpts<Float64>, +].forEach(([TypeCtor, noNulls, withNulls, withNaNs]) => { + + describe(`${TypeCtor.name}Builder`, () => { + + const typeFactory = () => new TypeCtor(); + const valueName = TypeCtor.name.toLowerCase(); + + runTestsWithEncoder('encodeAll', encodeAll(typeFactory)); + runTestsWithEncoder('encodeEach: 5', encodeEach(typeFactory, 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(typeFactory, 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(typeFactory)); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(typeFactory, 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(typeFactory, 25)); + + function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) { + describe(`${name}`, () => { + it(`encodes ${valueName} no nulls`, async () => { + const vals = noNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes ${valueName} with nulls`, async () => { + const vals = withNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + it(`encodes ${valueName} with NaNs`, async () => { + const vals = withNaNs(20); + validateVector(vals, await encode(vals, [NaN]), [NaN]); + }); + }); + } + }); +}); + +describe('Float16Builder', () => { + const encode = encodeAll(() => new Float16()); + it(`encodes the weird values`, async () => { + const vals = [0, 5.960464477539063e-8, NaN, 65504, 2, -0]; + validateVector(vals, await encode(vals, []), []); + }); +}); diff --git a/src/arrow/js/test/unit/builders/uint64-tests.ts b/src/arrow/js/test/unit/builders/uint64-tests.ts new file mode 100644 index 000000000..e08e25b5c --- /dev/null +++ b/src/arrow/js/test/unit/builders/uint64-tests.ts @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { util, Vector, DataType, Uint64 } from 'apache-arrow'; +import { + validateVector, + encodeAll, encodeEach, encodeEachDOM, encodeEachNode, + uint64sNoNulls, uint64sWithNulls, uint64sWithMaxInts, +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +const typeFactory = () => new Uint64(); +const valueName = `Uint64`.toLowerCase(); +const encode0 = encodeAll(typeFactory); +const encode1 = encodeEach(typeFactory); +const encode2 = encodeEach(typeFactory, 5); +const encode3 = encodeEach(typeFactory, 25); +const encode4 = encodeEachDOM(typeFactory, 25); +const encode5 = encodeEachNode(typeFactory, 25); + +const nulls0: any[] = [0x7fffffff]; +const nulls1: any[] = [0x7fffffff]; +nulls0[0] = new Uint32Array([0x7fffffff, 0x7fffffff]); +nulls1[0] = util.BN.new(nulls0[0])[Symbol.toPrimitive](); + +type ValuesToVector<T extends DataType> = (values: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>; + +function encodeAndValidate<T extends DataType>(encode: ValuesToVector<T>, providedNulls: any[] = [], expectedNulls = providedNulls) { + return (values: any[]) => { + return async () => { + const vector = await encode(values, providedNulls); + const expected = values.map((x) => { + switch (typeof x) { + case 'number': return new Uint32Array([x, 0]); + case 'bigint': return new Uint32Array(new BigUint64Array([x]).buffer); + } + return x ? x.slice() : x; + }); + return validateVector(expected, vector, expectedNulls); + }; + }; +} + +describe(`Uint64Builder`, () => { + describe(`encode single chunk`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode0, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode0, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode0, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); + describe(`encode chunks length default`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode1, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode1, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode1, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); + describe(`encode chunks length 5`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode2, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode2, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode2, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); + describe(`encode chunks length 25`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode3, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode3, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode3, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); + testDOMStreams && describe(`encode chunks length 25, WhatWG stream`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode4, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode4, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode4, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); + testNodeStreams && describe(`encode chunks length 25, NodeJS stream`, () => { + it(`encodes ${valueName} no nulls`, encodeAndValidate(encode5, [], [])(uint64sNoNulls(20))); + it(`encodes ${valueName} with nulls`, encodeAndValidate(encode5, [null], [null])(uint64sWithNulls(20))); + it(`encodes ${valueName} with MAX_INT`, encodeAndValidate(encode5, nulls0, nulls1)(uint64sWithMaxInts(20))); + }); +}); diff --git a/src/arrow/js/test/unit/builders/utf8-tests.ts b/src/arrow/js/test/unit/builders/utf8-tests.ts new file mode 100644 index 000000000..212879ab4 --- /dev/null +++ b/src/arrow/js/test/unit/builders/utf8-tests.ts @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { validateVector } from './utils'; +import { Vector, Utf8 } from 'apache-arrow'; +import { + encodeAll, + encodeEach, + encodeEachDOM, + encodeEachNode, + stringsNoNulls, + stringsWithNAs, + stringsWithNulls, + stringsWithEmpties +} from './utils'; + +const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true'; +const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true'; + +describe('Utf8Builder', () => { + runTestsWithEncoder('encodeAll', encodeAll(() => new Utf8())); + runTestsWithEncoder('encodeEach: 5', encodeEach(() => new Utf8(), 5)); + runTestsWithEncoder('encodeEach: 25', encodeEach(() => new Utf8(), 25)); + runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new Utf8(), void 0)); + testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new Utf8(), 25)); + testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new Utf8(), 25)); +}); + +function runTestsWithEncoder(name: string, encode: (vals: (string | null)[], nullVals?: any[]) => Promise<Vector<Utf8>>) { + describe(`${encode.name} ${name}`, () => { + it(`encodes strings no nulls`, async () => { + const vals = stringsNoNulls(20); + validateVector(vals, await encode(vals, []), []); + }); + it(`encodes strings with nulls`, async () => { + const vals = stringsWithNulls(20); + validateVector(vals, await encode(vals, [null]), [null]); + }); + it(`encodes strings using n/a as the null value rep`, async () => { + const vals = stringsWithNAs(20); + validateVector(vals, await encode(vals, ['n/a']), ['n/a']); + }); + it(`encodes strings using \\0 as the null value rep`, async () => { + const vals = stringsWithEmpties(20); + validateVector(vals, await encode(vals, ['\0']), ['\0']); + }); + }); +} diff --git a/src/arrow/js/test/unit/builders/utils.ts b/src/arrow/js/test/unit/builders/utils.ts new file mode 100644 index 000000000..9bd16fff3 --- /dev/null +++ b/src/arrow/js/test/unit/builders/utils.ts @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; +import { from, fromDOMStream, toArray } from 'ix/asynciterable'; +import { fromNodeStream } from 'ix/asynciterable/fromnodestream'; +import 'ix/Ix.node'; +import { util } from 'apache-arrow'; +import { Builder } from 'apache-arrow'; +import { DataType, Vector, Chunked } from 'apache-arrow'; +import randstr from 'randomatic'; + +const rand = Math.random.bind(Math); +const randnulls = <T, TNull = null>(values: T[], n: TNull = <any> null) => values.map((x) => Math.random() > 0.25 ? x : n) as (T | TNull)[]; + +export const randomBytes = (length: number) => fillRandom(Uint8Array, length); +export const randomString = ((opts) => (length: number) => + randstr('?', length, opts) +)({ chars: `abcdefghijklmnopqrstuvwxyz0123456789_` }); + +export const stringsNoNulls = (length = 20) => Array.from({ length }, (_) => randomString(1 + (Math.random() * 19 | 0))); +export const timestamp32sNoNulls = (length = 20, now = Date.now() / 86400000 | 0) => + Array.from({ length }, (_) => (now + (rand() * 10000 * (rand() > 0.5 ? -1 : 1)) | 0) * 86400000); + +export const timestamp64sNoNulls = (length = 20, now = Date.now()) => Array.from({ length }, (_) => { + const ms = now + (rand() * 31557600000 * (rand() > 0.5 ? -1 : 1) | 0); + return new Int32Array([(ms % 4294967296) | 0, (ms / 4294967296) | 0]); +}); + +export const timestamp32sWithNulls = (length = 20) => randnulls(timestamp32sNoNulls(length), null); +export const timestamp64sWithNulls = (length = 20) => randnulls(timestamp64sNoNulls(length), null); +export const timestamp32sWithMaxInts = (length = 20) => randnulls(timestamp32sNoNulls(length), 0x7fffffff); +export const timestamp64sWithMaxInts = (length = 20) => randnulls(timestamp64sNoNulls(length), new Int32Array([0x7fffffff, 0x7fffffff])); + +export const boolsNoNulls = (length = 20) => Array.from({ length }, () => rand() > 0.5); +export const date32sNoNulls = (length = 20) => timestamp32sNoNulls(length).map((x) => new Date(x)); +export const date64sNoNulls = (length = 20) => timestamp64sNoNulls(length).map((x) => new Date(4294967296 * x[1] + (x[0] >>> 0))); +export const int8sNoNulls = (length = 20) => Array.from(new Int8Array(randomBytes(length * Int8Array.BYTES_PER_ELEMENT).buffer)); +export const int16sNoNulls = (length = 20) => Array.from(new Int16Array(randomBytes(length * Int16Array.BYTES_PER_ELEMENT).buffer)); +export const int32sNoNulls = (length = 20) => Array.from(new Int32Array(randomBytes(length * Int32Array.BYTES_PER_ELEMENT).buffer)); +export const int64sNoNulls = (length = 20) => Array.from({ length }, (_, i) => { + const bn = util.BN.new(new Int32Array(randomBytes(2 * 4).buffer)); + // Evenly distribute the three types of arguments we support in the Int64 + // builder + switch (i % 3) { + // Int32Array (util.BN is-a Int32Array) + case 0: return bn; + // BigInt + case 1: return bn[Symbol.toPrimitive](); + // number + case 2: + default: return bn[0]; + } +}); + +export const uint8sNoNulls = (length = 20) => Array.from(new Uint8Array(randomBytes(length * Uint8Array.BYTES_PER_ELEMENT).buffer)); +export const uint16sNoNulls = (length = 20) => Array.from(new Uint16Array(randomBytes(length * Uint16Array.BYTES_PER_ELEMENT).buffer)); +export const uint32sNoNulls = (length = 20) => Array.from(new Uint32Array(randomBytes(length * Uint32Array.BYTES_PER_ELEMENT).buffer)); +export const uint64sNoNulls = (length = 20) => Array.from({ length }, (_, i) => { + const bn = util.BN.new(new Uint32Array(randomBytes(2 * 4).buffer)); + // Evenly distribute the three types of arguments we support in the Uint64 + // builder + switch (i % 3) { + // UInt32Array (util.BN is-a Uint32Array) + case 0: return bn; + // BigInt + case 1: return bn[Symbol.toPrimitive](); + // number + case 2: + default: return bn[0]; + } +}); +export const float16sNoNulls = (length = 20) => Array.from(new Uint16Array(randomBytes(length * Uint16Array.BYTES_PER_ELEMENT).buffer)).map(util.uint16ToFloat64); +export const float32sNoNulls = (length = 20) => Array.from(new Float32Array(randomBytes(length * Float32Array.BYTES_PER_ELEMENT).buffer)); +export const float64sNoNulls = (length = 20) => Array.from(new Float64Array(randomBytes(length * Float64Array.BYTES_PER_ELEMENT).buffer)); + +export const stringsWithNAs = (length = 20) => randnulls(stringsNoNulls(length), 'n/a'); +export const stringsWithNulls = (length = 20) => randnulls(stringsNoNulls(length), null); +export const stringsWithEmpties = (length = 20) => randnulls(stringsNoNulls(length), '\0'); + +export const boolsWithNulls = (length = 20) => randnulls(boolsNoNulls(length), null); +export const date32sWithNulls = (length = 20) => randnulls(date32sNoNulls(length), null); +export const date64sWithNulls = (length = 20) => randnulls(date64sNoNulls(length), null); +export const int8sWithNulls = (length = 20) => randnulls(int8sNoNulls(length), null); +export const int16sWithNulls = (length = 20) => randnulls(int16sNoNulls(length), null); +export const int32sWithNulls = (length = 20) => randnulls(int32sNoNulls(length), null); +export const int64sWithNulls = (length = 20) => randnulls(int64sNoNulls(length), null); +export const uint8sWithNulls = (length = 20) => randnulls(uint8sNoNulls(length), null); +export const uint16sWithNulls = (length = 20) => randnulls(uint16sNoNulls(length), null); +export const uint32sWithNulls = (length = 20) => randnulls(uint32sNoNulls(length), null); +export const uint64sWithNulls = (length = 20) => randnulls(uint64sNoNulls(length), null); +export const float16sWithNulls = (length = 20) => randnulls(float16sNoNulls(length), null); +export const float32sWithNulls = (length = 20) => randnulls(float32sNoNulls(length), null); +export const float64sWithNulls = (length = 20) => randnulls(float64sNoNulls(length), null); + +export const int8sWithMaxInts = (length = 20) => randnulls(int8sNoNulls(length), 0x7fffffff); +export const int16sWithMaxInts = (length = 20) => randnulls(int16sNoNulls(length), 0x7fffffff); +export const int32sWithMaxInts = (length = 20) => randnulls(int32sNoNulls(length), 0x7fffffff); +export const int64sWithMaxInts = (length = 20) => randnulls(int64sNoNulls(length), new Int32Array([0x7fffffff, 0x7fffffff])); +export const uint8sWithMaxInts = (length = 20) => randnulls(uint8sNoNulls(length), 0x7fffffff); +export const uint16sWithMaxInts = (length = 20) => randnulls(uint16sNoNulls(length), 0x7fffffff); +export const uint32sWithMaxInts = (length = 20) => randnulls(uint32sNoNulls(length), 0x7fffffff); +export const uint64sWithMaxInts = (length = 20) => randnulls(uint64sNoNulls(length), new Uint32Array([0x7fffffff, 0x7fffffff])); +export const float16sWithNaNs = (length = 20) => randnulls(float16sNoNulls(length), NaN); +export const float32sWithNaNs = (length = 20) => randnulls(float32sNoNulls(length), NaN); +export const float64sWithNaNs = (length = 20) => randnulls(float64sNoNulls(length), NaN); + +export const duplicateItems = (n: number, xs: (any | null)[]) => { + const out = new Array<string | null>(n); + for (let i = -1, k = xs.length; ++i < n;) { + out[i] = xs[Math.random() * k | 0]; + } + return out; +}; + +export function encodeAll<T extends DataType>(typeFactory: () => T) { + return async function encodeAll<TNull = any>(values: (T['TValue'] | TNull)[], nullValues?: TNull[]) { + const type = typeFactory(); + const builder = Builder.new({ type, nullValues }); + values.forEach(builder.append.bind(builder)); + return builder.finish().toVector(); + }; +} + +export function encodeEach<T extends DataType>(typeFactory: () => T, chunkLen?: number) { + return async function encodeEach<TNull = any>(vals: (T['TValue'] | TNull)[], nullValues?: TNull[]) { + const type = typeFactory(); + const opts = { type, nullValues, highWaterMark: chunkLen }; + const chunks = [...Builder.throughIterable(opts)(vals)]; + return Chunked.concat(...chunks) as Chunked<T>; + }; +} + +export function encodeEachDOM<T extends DataType>(typeFactory: () => T, chunkLen?: number) { + return async function encodeEachDOM<TNull = any>(vals: (T['TValue'] | TNull)[], nullValues?: TNull[]) { + const type = typeFactory(); + const strategy = { highWaterMark: chunkLen }; + const source = from(vals).toDOMStream(); + const builder = Builder.throughDOM({ type, nullValues, readableStrategy: strategy, writableStrategy: strategy }); + const chunks = await fromDOMStream(source.pipeThrough(builder)).pipe(toArray); + return Chunked.concat(...chunks) as Chunked<T>; + }; +} + +export function encodeEachNode<T extends DataType>(typeFactory: () => T, chunkLen?: number) { + return async function encodeEachNode<TNull = any>(vals: (T['TValue'] | TNull)[], nullValues?: TNull[]) { + const type = typeFactory(); + const vals_ = vals.map((x) => x === null ? undefined : x); + const source = from(vals_).toNodeStream({ objectMode: true }); + const nulls_ = nullValues ? nullValues.map((x) => x === null ? undefined : x) : nullValues; + const builder = Builder.throughNode({ type, nullValues: nulls_, highWaterMark: chunkLen }); + const chunks: any[] = await fromNodeStream(source.pipe(builder), chunkLen).pipe(toArray); + return Chunked.concat(...chunks) as Chunked<T>; + }; +} + +const isInt64Null = (nulls: Map<any, any>, x: any) => { + if (ArrayBuffer.isView(x)) { + const bn = util.BN.new<Int32Array>(x as Int32Array); + return nulls.has((<any> bn)[Symbol.toPrimitive]('default')); + } + return false; +}; + +export function validateVector<T extends DataType>(vals: (T['TValue'] | null)[], vec: Vector, nullVals: any[]) { + let i = 0, x: T['TValue'] | null, y: T['TValue'] | null; + const nulls = nullVals.reduce((m, x) => m.set(x, x), new Map()); + try { + for (x of vec) { + if (nulls.has(y = vals[i])) { + expect(x).toBeNull(); + } else if (isInt64Null(nulls, y)) { + expect(x).toBeNull(); + } else { + expect(x).toArrowCompare(y); + } + i++; + } + } catch (e) { + // Uncomment these two lines to catch and debug the value retrieval that failed + // debugger; + // vec.get(i); + throw new Error([ + `${(vec as any).VectorName}[${i}]: ${e?.stack || e}`, + `nulls: [${nullVals.join(', ')}]`, + `values: [${vals.join(', ')}]`, + ].join('\n')); + } +} + +function fillRandom<T extends TypedArrayConstructor>(ArrayType: T, length: number) { + const BPE = ArrayType.BYTES_PER_ELEMENT; + const array = new ArrayType(length); + const max = (2 ** (8 * BPE)) - 1; + for (let i = -1; ++i < length; array[i] = rand() * max * (rand() > 0.5 ? -1 : 1)) { } + return array as InstanceType<T>; +} + +type TypedArrayConstructor = + (typeof Int8Array) | + (typeof Int16Array) | + (typeof Int32Array) | + (typeof Uint8Array) | + (typeof Uint16Array) | + (typeof Uint32Array) | + (typeof Float32Array) | + (typeof Float64Array); diff --git a/src/arrow/js/test/unit/dataframe-tests.ts b/src/arrow/js/test/unit/dataframe-tests.ts new file mode 100644 index 000000000..9e87e372d --- /dev/null +++ b/src/arrow/js/test/unit/dataframe-tests.ts @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../jest-extensions'; +import { + predicate, DataFrame, RecordBatch +} from 'apache-arrow'; +import { test_data } from './table-tests'; +import { jest } from '@jest/globals'; + +const { col, lit, custom, and, or, And, Or } = predicate; + +const F32 = 0, I32 = 1, DICT = 2; + +describe(`DataFrame`, () => { + + for (let datum of test_data) { + describe(datum.name, () => { + + describe(`scan()`, () => { + test(`yields all values`, () => { + const df = new DataFrame(datum.table()); + let expected_idx = 0; + df.scan((idx, batch) => { + const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!); + expect(columns.map((c) => c.get(idx))).toEqual(values[expected_idx++]); + }); + }); + test(`calls bind function with every batch`, () => { + const df = new DataFrame(datum.table()); + let bind = jest.fn(); + df.scan(() => { }, bind); + for (let batch of df.chunks) { + expect(bind).toHaveBeenCalledWith(batch); + } + }); + }); + describe(`scanReverse()`, () => { + test(`yields all values`, () => { + const df = new DataFrame(datum.table()); + let expected_idx = values.length; + df.scanReverse((idx, batch) => { + const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!); + expect(columns.map((c) => c.get(idx))).toEqual(values[--expected_idx]); + }); + }); + test(`calls bind function with every batch`, () => { + const df = new DataFrame(datum.table()); + let bind = jest.fn(); + df.scanReverse(() => { }, bind); + for (let batch of df.chunks) { + expect(bind).toHaveBeenCalledWith(batch); + } + }); + }); + test(`count() returns the correct length`, () => { + const df = new DataFrame(datum.table()); + const values = datum.values(); + expect(df.count()).toEqual(values.length); + }); + test(`getColumnIndex`, () => { + const df = new DataFrame(datum.table()); + expect(df.getColumnIndex('i32')).toEqual(I32); + expect(df.getColumnIndex('f32')).toEqual(F32); + expect(df.getColumnIndex('dictionary')).toEqual(DICT); + }); + const df = new DataFrame(datum.table()); + const values = datum.values(); + let get_i32: (idx: number) => number, get_f32: (idx: number) => number; + const filter_tests = [ + { + name: `filter on f32 >= 0`, + filtered: df.filter(col('f32').ge(0)), + expected: values.filter((row) => row[F32] >= 0) + }, { + name: `filter on 0 <= f32`, + filtered: df.filter(lit(0).le(col('f32'))), + expected: values.filter((row) => 0 <= row[F32]) + }, { + name: `filter on i32 <= 0`, + filtered: df.filter(col('i32').le(0)), + expected: values.filter((row) => row[I32] <= 0) + }, { + name: `filter on 0 >= i32`, + filtered: df.filter(lit(0).ge(col('i32'))), + expected: values.filter((row) => 0 >= row[I32]) + }, { + name: `filter on f32 < 0`, + filtered: df.filter(col('f32').lt(0)), + expected: values.filter((row) => row[F32] < 0) + }, { + name: `filter on i32 > 1 (empty)`, + filtered: df.filter(col('i32').gt(0)), + expected: values.filter((row) => row[I32] > 0) + }, { + name: `filter on f32 <= -.25 || f3 >= .25`, + filtered: df.filter(col('f32').le(-.25).or(col('f32').ge(.25))), + expected: values.filter((row) => row[F32] <= -.25 || row[F32] >= .25) + }, { + name: `filter on !(f32 <= -.25 || f3 >= .25) (not)`, + filtered: df.filter(col('f32').le(-.25).or(col('f32').ge(.25)).not()), + expected: values.filter((row) => !(row[F32] <= -.25 || row[F32] >= .25)) + }, { + name: `filter method combines predicates (f32 >= 0 && i32 <= 0)`, + filtered: df.filter(col('i32').le(0)).filter(col('f32').ge(0)), + expected: values.filter((row) => row[I32] <= 0 && row[F32] >= 0) + }, { + name: `filter on dictionary == 'a'`, + filtered: df.filter(col('dictionary').eq('a')), + expected: values.filter((row) => row[DICT] === 'a') + }, { + name: `filter on 'a' == dictionary (commutativity)`, + filtered: df.filter(lit('a').eq(col('dictionary'))), + expected: values.filter((row) => row[DICT] === 'a') + }, { + name: `filter on dictionary != 'b'`, + filtered: df.filter(col('dictionary').ne('b')), + expected: values.filter((row) => row[DICT] !== 'b') + }, { + name: `filter on f32 >= i32`, + filtered: df.filter(col('f32').ge(col('i32'))), + expected: values.filter((row) => row[F32] >= row[I32]) + }, { + name: `filter on f32 <= i32`, + filtered: df.filter(col('f32').le(col('i32'))), + expected: values.filter((row) => row[F32] <= row[I32]) + }, { + name: `filter on f32*i32 > 0 (custom predicate)`, + filtered: df.filter(custom( + (idx: number) => (get_f32(idx) * get_i32(idx) > 0), + (batch: RecordBatch) => { + get_f32 = col('f32').bind(batch); + get_i32 = col('i32').bind(batch); + })), + expected: values.filter((row) => (row[F32] as number) * (row[I32] as number) > 0) + }, { + name: `filter out all records`, + filtered: df.filter(lit(1).eq(0)), + expected: [] + } + ]; + for (let this_test of filter_tests) { + const { name, filtered, expected } = this_test; + describe(name, () => { + test(`count() returns the correct length`, () => { + expect(filtered.count()).toEqual(expected.length); + }); + describe(`scan()`, () => { + test(`iterates over expected values`, () => { + let expected_idx = 0; + filtered.scan((idx, batch) => { + const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!); + expect(columns.map((c) => c.get(idx))).toEqual(expected[expected_idx++]); + }); + }); + test(`calls bind function lazily`, () => { + let bind = jest.fn(); + filtered.scan(() => { }, bind); + if (expected.length) { + expect(bind).toHaveBeenCalled(); + } else { + expect(bind).not.toHaveBeenCalled(); + } + }); + }); + describe(`scanReverse()`, () => { + test(`iterates over expected values in reverse`, () => { + let expected_idx = expected.length; + filtered.scanReverse((idx, batch) => { + const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!); + expect(columns.map((c) => c.get(idx))).toEqual(expected[--expected_idx]); + }); + }); + test(`calls bind function lazily`, () => { + let bind = jest.fn(); + filtered.scanReverse(() => { }, bind); + if (expected.length) { + expect(bind).toHaveBeenCalled(); + } else { + expect(bind).not.toHaveBeenCalled(); + } + }); + }); + }); + } + test(`countBy on dictionary returns the correct counts`, () => { + // Make sure countBy works both with and without the Col wrapper + // class + let expected: { [key: string]: number } = { 'a': 0, 'b': 0, 'c': 0 }; + for (let row of values) { + expected[row[DICT]] += 1; + } + + expect(df.countBy(col('dictionary')).toJSON()).toEqual(expected); + expect(df.countBy('dictionary').toJSON()).toEqual(expected); + }); + test(`countBy on dictionary with filter returns the correct counts`, () => { + let expected: { [key: string]: number } = { 'a': 0, 'b': 0, 'c': 0 }; + for (let row of values) { + if (row[I32] === 1) { expected[row[DICT]] += 1; } + } + + expect(df.filter(col('i32').eq(1)).countBy('dictionary').toJSON()).toEqual(expected); + }); + test(`countBy on non dictionary column throws error`, () => { + expect(() => { df.countBy('i32'); }).toThrow(); + expect(() => { df.filter(col('dict').eq('a')).countBy('i32'); }).toThrow(); + }); + test(`countBy on non-existent column throws error`, () => { + expect(() => { df.countBy('FAKE' as any); }).toThrow(); + }); + test(`table.select() basic tests`, () => { + let selected = df.select('f32', 'dictionary'); + expect(selected.schema.fields).toHaveLength(2); + expect(selected.schema.fields[0]).toEqual(df.schema.fields[0]); + expect(selected.schema.fields[1]).toEqual(df.schema.fields[2]); + + expect(selected).toHaveLength(values.length); + let idx = 0, expected_row; + for (let row of selected) { + expected_row = values[idx++]; + expect(row.f32).toEqual(expected_row[F32]); + expect(row.dictionary).toEqual(expected_row[DICT]); + } + }); + test(`table.filter(..).count() on always false predicates returns 0`, () => { + expect(df.filter(col('i32').ge(100)).count()).toEqual(0); + expect(df.filter(col('dictionary').eq('z')).count()).toEqual(0); + }); + describe(`lit-lit comparison`, () => { + test(`always-false count() returns 0`, () => { + expect(df.filter(lit('abc').eq('def')).count()).toEqual(0); + expect(df.filter(lit(0).ge(1)).count()).toEqual(0); + }); + test(`always-true count() returns length`, () => { + expect(df.filter(lit('abc').eq('abc')).count()).toEqual(df.length); + expect(df.filter(lit(-100).le(0)).count()).toEqual(df.length); + }); + }); + describe(`col-col comparison`, () => { + test(`always-false count() returns 0`, () => { + expect(df.filter(col('dictionary').eq(col('i32'))).count()).toEqual(0); + }); + test(`always-true count() returns length`, () => { + expect(df.filter(col('dictionary').eq(col('dictionary'))).count()).toEqual(df.length); + }); + }); + }); + } +}); + +describe(`Predicate`, () => { + const p1 = col('a').gt(100); + const p2 = col('a').lt(1000); + const p3 = col('b').eq('foo'); + const p4 = col('c').eq('bar'); + const expected = [p1, p2, p3, p4]; + test(`and flattens children`, () => { + expect(and(p1, p2, p3, p4).children).toEqual(expected); + expect(and(p1.and(p2), new And(p3, p4)).children).toEqual(expected); + expect(and(p1.and(p2, p3, p4)).children).toEqual(expected); + }); + test(`or flattens children`, () => { + expect(or(p1, p2, p3, p4).children).toEqual(expected); + expect(or(p1.or(p2), new Or(p3, p4)).children).toEqual(expected); + expect(or(p1.or(p2, p3, p4)).children).toEqual(expected); + }); +}); diff --git a/src/arrow/js/test/unit/generated-data-tests.ts b/src/arrow/js/test/unit/generated-data-tests.ts new file mode 100644 index 000000000..ab1276f76 --- /dev/null +++ b/src/arrow/js/test/unit/generated-data-tests.ts @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../jest-extensions'; +import * as generate from '../generate-test-data'; +import { validateTable, validateRecordBatch, validateVector } from './generated-data-validators'; + +describe('Generated Test Data', () => { + describe('Table', () => { validateTable(generate.table([100, 150, 75])).run(); }); + describe('RecordBatch', () => { validateRecordBatch(generate.recordBatch()).run(); }); + describe('NullVector', () => { validateVector(generate.null_()).run(); }); + describe('BoolVector', () => { validateVector(generate.bool()).run(); }); + describe('Int8Vector', () => { validateVector(generate.int8()).run(); }); + describe('Int16Vector', () => { validateVector(generate.int16()).run(); }); + describe('Int32Vector', () => { validateVector(generate.int32()).run(); }); + describe('Int64Vector', () => { validateVector(generate.int64()).run(); }); + describe('Uint8Vector', () => { validateVector(generate.uint8()).run(); }); + describe('Uint16Vector', () => { validateVector(generate.uint16()).run(); }); + describe('Uint32Vector', () => { validateVector(generate.uint32()).run(); }); + describe('Uint64Vector', () => { validateVector(generate.uint64()).run(); }); + describe('Float16Vector', () => { validateVector(generate.float16()).run(); }); + describe('Float32Vector', () => { validateVector(generate.float32()).run(); }); + describe('Float64Vector', () => { validateVector(generate.float64()).run(); }); + describe('Utf8Vector', () => { validateVector(generate.utf8()).run(); }); + describe('BinaryVector', () => { validateVector(generate.binary()).run(); }); + describe('FixedSizeBinaryVector', () => { validateVector(generate.fixedSizeBinary()).run(); }); + describe('DateDayVector', () => { validateVector(generate.dateDay()).run(); }); + describe('DateMillisecondVector', () => { validateVector(generate.dateMillisecond()).run(); }); + describe('TimestampSecondVector', () => { validateVector(generate.timestampSecond()).run(); }); + describe('TimestampMillisecondVector', () => { validateVector(generate.timestampMillisecond()).run(); }); + describe('TimestampMicrosecondVector', () => { validateVector(generate.timestampMicrosecond()).run(); }); + describe('TimestampNanosecondVector', () => { validateVector(generate.timestampNanosecond()).run(); }); + describe('TimeSecondVector', () => { validateVector(generate.timeSecond()).run(); }); + describe('TimeMillisecondVector', () => { validateVector(generate.timeMillisecond()).run(); }); + describe('TimeMicrosecondVector', () => { validateVector(generate.timeMicrosecond()).run(); }); + describe('TimeNanosecondVector', () => { validateVector(generate.timeNanosecond()).run(); }); + describe('DecimalVector', () => { validateVector(generate.decimal()).run(); }); + describe('ListVector', () => { validateVector(generate.list()).run(); }); + describe('StructVector', () => { validateVector(generate.struct()).run(); }); + describe('DenseUnionVector', () => { validateVector(generate.denseUnion()).run(); }); + describe('SparseUnionVector', () => { validateVector(generate.sparseUnion()).run(); }); + describe('DictionaryVector', () => { validateVector(generate.dictionary()).run(); }); + describe('IntervalDayTimeVector', () => { validateVector(generate.intervalDayTime()).run(); }); + describe('IntervalYearMonthVector', () => { validateVector(generate.intervalYearMonth()).run(); }); + describe('FixedSizeListVector', () => { validateVector(generate.fixedSizeList()).run(); }); + describe('MapVector', () => { validateVector(generate.map()).run(); }); +}); diff --git a/src/arrow/js/test/unit/generated-data-validators.ts b/src/arrow/js/test/unit/generated-data-validators.ts new file mode 100644 index 000000000..910386d4a --- /dev/null +++ b/src/arrow/js/test/unit/generated-data-validators.ts @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../jest-extensions'; +import { + GeneratedTable, + GeneratedRecordBatch, + GeneratedVector +} from '../generate-test-data'; + +import { util } from 'apache-arrow'; +const { createElementComparator: compare } = util; + +type DeferredTest = { description: string; tests?: DeferredTest[]; run: (...args: any[]) => any }; + +function deferTest(description: string, run: (...args: any[]) => any) { + return { description, run: () => test(description, run) } as DeferredTest; +} + +function deferDescribe(description: string, tests: DeferredTest | DeferredTest[]) { + const t = (Array.isArray(tests) ? tests : [tests]).filter(Boolean); + return { description, tests: t, run: () => describe(description, () => { t.forEach((x) => x.run()); } ) }; +} + +export function validateTable({ keys, rows, cols, rowBatches, colBatches, keyBatches, table }: GeneratedTable) { + return deferDescribe(`Table: ${table.schema}`, ([] as DeferredTest[]).concat( + validateVector({ values: rows, vector: table }), + table.chunks.map((recordBatch, i) => + deferDescribe(`recordBatch ${i}`, validateRecordBatch({ + keys: keyBatches[i], rows: rowBatches[i], cols: colBatches[i], recordBatch + })) + ), + table.schema.fields.map((field, i) => + deferDescribe(`column ${i}: ${field}`, validateVector({ + keys: keys()[i], + values: () => cols()[i], + vector: table.getColumnAt(i)! + })) + ) + )); +} + +export function validateRecordBatch({ rows, cols, keys, recordBatch }: GeneratedRecordBatch) { + return deferDescribe(`RecordBatch: ${recordBatch.schema}`, ([] as DeferredTest[]).concat( + validateVector({ values: rows, vector: recordBatch }), + recordBatch.schema.fields.map((field, i) => + deferDescribe(`Field: ${field}`, validateVector({ + keys: keys()[i], + values: () => cols()[i], + vector: recordBatch.getChildAt(i)! + })) + ) + )); +} + +export function validateVector({ values: createTestValues, vector, keys }: GeneratedVector, sliced = false) { + + const values = createTestValues(); + const suites = [ + deferDescribe(`Validate ${vector.type} (sliced=${sliced})`, [ + deferTest(`length is correct`, () => { + expect(vector).toHaveLength(values.length); + }), + deferTest(`gets expected values`, () => { + expect.hasAssertions(); + let i = -1, n = vector.length, actual, expected; + try { + while (++i < n) { + actual = vector.get(i); + expected = values[i]; + expect(actual).toArrowCompare(expected); + } + } catch (e) { throw new Error(`${vector}[${i}]: ${e}`); } + }), + (keys && keys.length > 0) && deferTest(`dictionary indices should match`, () => { + expect.hasAssertions(); + let indices = (vector as any).indices; + let i = -1, n = indices.length; + try { + while (++i < n) { + indices.isValid(i) + ? expect(indices.get(i)).toBe(keys[i]) + : expect(indices.get(i)).toBeNull(); + } + } catch (e) { throw new Error(`${indices}[${i}]: ${e}`); } + }) || null as any as DeferredTest, + deferTest(`sets expected values`, () => { + expect.hasAssertions(); + let i = -1, n = vector.length, actual, expected; + try { + while (++i < n) { + expected = vector.get(i); + vector.set(i, expected); + actual = vector.get(i); + expect(actual).toArrowCompare(expected); + } + } catch (e) { throw new Error(`${vector}[${i}]: ${e}`); } + }), + deferTest(`iterates expected values`, () => { + expect.hasAssertions(); + let i = -1, actual, expected; + try { + for (actual of vector) { + expected = values[++i]; + expect(actual).toArrowCompare(expected); + } + } catch (e) { throw new Error(`${vector}[${i}]: ${e}`); } + }), + deferTest(`indexOf returns expected values`, () => { + expect.hasAssertions(); + let i = -1, n = vector.length; + const shuffled = shuffle(values); + let value: any, actual, expected; + try { + while (++i < n) { + value = shuffled[i]; + actual = vector.indexOf(value); + expected = values.findIndex(compare(value)); + expect(actual).toBe(expected); + } + // I would be pretty surprised if randomatic ever generates these values + expect(vector.indexOf('purple elephants')).toBe(-1); + expect(vector.indexOf('whistling wombats')).toBe(-1); + expect(vector.indexOf('carnivorous novices')).toBe(-1); + } catch (e) { throw new Error(`${vector}[${i}]: ${e}`); } + }) + ]) + ] as DeferredTest[]; + + if (!sliced) { + const begin = (values.length * .25) | 0; + const end = (values.length * .75) | 0; + suites.push( + // test slice with no args + validateVector({ + vector: vector.slice(), + values: () => values.slice(), + keys: keys ? keys.slice() : undefined + }, true), + // test slicing half the array + validateVector({ + vector: vector.slice(begin, end), + values: () => values.slice(begin, end), + keys: keys ? keys.slice(begin, end) : undefined + }, true), + // test concat each end together + validateVector({ + vector: vector.slice(0, begin).concat(vector.slice(end)), + values: () => values.slice(0, begin).concat(values.slice(end)), + keys: keys ? [...keys.slice(0, begin), ...keys.slice(end)] : undefined + }, true) + ); + + return deferDescribe(`Vector`, suites); + } + + return suites[0]; +} + +function shuffle(input: any[]) { + const result = input.slice(); + let j, tmp, i = result.length; + while (--i > 0) { + j = (Math.random() * (i + 1)) | 0; + tmp = result[i]; + result[i] = result[j]; + result[j] = tmp; + } + return result; +} diff --git a/src/arrow/js/test/unit/int-tests.ts b/src/arrow/js/test/unit/int-tests.ts new file mode 100644 index 000000000..15c75e1a1 --- /dev/null +++ b/src/arrow/js/test/unit/int-tests.ts @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import * as Arrow from 'apache-arrow'; +const { Int64, Uint64, Int128 } = Arrow.util; + +describe(`Uint64`, () => { + test(`gets expected high/low bytes`, () => { + let i = new Uint64(new Uint32Array([5, 0])); + expect(i.high()).toEqual(0); + expect(i.low()).toEqual(5); + }); + test(`adds 32-bit numbers`, () => { + let a = new Uint64(new Uint32Array([5, 0])); + let b = new Uint64(new Uint32Array([9, 0])); + let expected = new Uint64(new Uint32Array([14, 0])); + expect(a.plus(b)).toEqual(expected); + }); + test(`addition overflows 32-bit numbers`, () => { + let a = new Uint64(new Uint32Array([0xffffffff, 0])); + let b = new Uint64(new Uint32Array([9, 0])); + let expected = new Uint64(new Uint32Array([8, 1])); + expect(a.plus(b)).toEqual(expected); + }); + test(`multiplies 32-bit numbers`, () => { + let a = new Uint64(new Uint32Array([5, 0])); + let b = new Uint64(new Uint32Array([9, 0])); + let expected = new Uint64(new Uint32Array([45, 0])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication overflows 32-bit numbers`, () => { + let a = new Uint64(new Uint32Array([0x80000000, 0])); + let b = new Uint64(new Uint32Array([3, 0])); + let expected = new Uint64(new Uint32Array([0x80000000, 1])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication is associative`, () => { + let a = new Uint64(new Uint32Array([0x80000000, 0])); + let b = new Uint64(new Uint32Array([3, 0])); + expect(Uint64.multiply(a, b)).toEqual(Uint64.multiply(b,a)); + }); + test(`lessThan works on 32-bit numbers`, () => { + let a = new Uint64(new Uint32Array([0x0000abcd, 0])); + let b = new Uint64(new Uint32Array([0x0000abcf, 0])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`lessThan works on 64-bit numbers`, () => { + let a = new Uint64(new Uint32Array([123, 32])); + let b = new Uint64(new Uint32Array([568, 32])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`fromString parses string`, () => { + expect(Uint64.fromString('6789123456789')).toEqual(new Int64(new Uint32Array([0xb74abf15, 0x62c]))); + }); + test(`fromString parses big (full unsigned 64-bit) string`, () => { + expect(Uint64.fromString('18364758544493064720')).toEqual(new Uint64(new Uint32Array([0x76543210, 0xfedcba98]))); + }); + test(`fromNumber converts 53-ish bit number`, () => { + expect(Uint64.fromNumber(8086463330923024)).toEqual(new Uint64(new Uint32Array([0x76543210, 0x001cba98]))); + }); +}); + +describe(`Int64`, () => { + test(`gets expected high/low bytes`, () => { + let i = new Int64(new Uint32Array([5, 0])); + expect(i.high()).toEqual(0); + expect(i.low()).toEqual(5); + }); + test(`adds 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([5, 0])); + let b = new Int64(new Uint32Array([9, 0])); + let expected = new Int64(new Uint32Array([14, 0])); + expect(a.plus(b)).toEqual(expected); + }); + test(`adds negative 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([56789 , 0])); + let b = new Int64(new Uint32Array([-66789, -1])); + let expected = new Int64(new Uint32Array([-10000, -1])); + expect(a.plus(b)).toEqual(expected); + }); + test(`addition overflows 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([0xffffffff, 0])); + let b = new Int64(new Uint32Array([9, 0])); + let expected = new Int64(new Uint32Array([8, 1])); + expect(a.plus(b)).toEqual(expected); + }); + test(`multiplies 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([5, 0])); + let b = new Int64(new Uint32Array([9, 0])); + let expected = new Int64(new Uint32Array([45, 0])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication overflows 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([0x80000000, 0])); + let b = new Int64(new Uint32Array([3, 0])); + let expected = new Int64(new Uint32Array([0x80000000, 1])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication works on negative numbers`, () => { + let a = new Int64(new Uint32Array([-5, -1])); + let b = new Int64(new Uint32Array([-100, -1])); + expect(a.times(b)).toEqual(new Int64(new Uint32Array([ 500, 0]))); + expect(a.times(b)).toEqual(new Int64(new Uint32Array([ -50000, -1]))); + expect(a.times(b)).toEqual(new Int64(new Uint32Array([5000000, 0]))); + }); + test(`multiplication is associative`, () => { + let a = new Int64(new Uint32Array([0x80000000, 0])); + let b = new Int64(new Uint32Array([3, 0])); + expect(Int64.multiply(a, b)).toEqual(Int64.multiply(b,a)); + }); + test(`lessThan works on 32-bit numbers`, () => { + let a = new Int64(new Uint32Array([0x0000abcd, 0])); + let b = new Int64(new Uint32Array([0x0000abcf, 0])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`lessThan works on 64-bit numbers`, () => { + let a = new Int64(new Uint32Array([123, 32])); + let b = new Int64(new Uint32Array([568, 32])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`lessThan works on negative numbers`, () => { + let a = new Int64(new Uint32Array([0, -158])); + let b = new Int64(new Uint32Array([-3, -1])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`lessThan works on mixed numbers`, () => { + let a = new Int64(new Uint32Array([-3, -1])); + let b = new Int64(new Uint32Array([ 0, 3])); + expect(a.lessThan(b)).toBeTruthy(); + }); + test(`negate works on 32-bit number`, () => { + expect (new Int64(new Uint32Array([123456, 0])).negate()).toEqual(new Int64(new Uint32Array([-123456, -1]))); + }); + test(`double negation is noop`, () => { + let test = new Int64(new Uint32Array([6789, 12345])); + let expected = new Int64(new Uint32Array([6789, 12345])); + expect(test.negate().negate()).toEqual(expected); + }); + test(`negate works on 64-bit number`, () => { + expect (new Int64(new Uint32Array([0xb74abf15, 0x62c])).negate()).toEqual(new Int64(new Uint32Array([0x48b540eb, 0xfffff9d3]))); + }); + test(`fromString parses string`, () => { + expect(Int64.fromString('6789123456789')).toEqual(new Int64(new Uint32Array([0xb74abf15, 0x62c]))); + }); + test(`fromString parses negative string`, () => { + expect(Int64.fromString('-6789123456789')).toEqual(new Int64(new Uint32Array([0x48b540eb, 0xfffff9d3]))); + }); + test(`fromNumber converts 53-ish bit number`, () => { + expect(Int64.fromNumber(8086463330923024)).toEqual(new Int64(new Uint32Array([0x76543210, 0x001cba98]))); + expect(Int64.fromNumber(-8086463330923024)).toEqual(new Int64(new Uint32Array([0x89abcdf0, 0xffe34567]))); + }); +}); + +describe(`Int128`, () => { + test(`gets expected bytes`, () => { + let i = new Int128(new Uint32Array([4, 3, 2, 1])); + expect(i.high().high()).toEqual(1); + expect(i.high().low() ).toEqual(2); + expect(i.low().high() ).toEqual(3); + expect(i.low().low() ).toEqual(4); + }); + test(`adds 32-bit numbers`, () => { + let a = new Int128(new Uint32Array([5, 0, 0, 0])); + let b = new Int128(new Uint32Array([9, 0, 0, 0])); + let expected = new Int128(new Uint32Array([14, 0, 0, 0])); + expect(a.plus(b)).toEqual(expected); + }); + test(`adds negative 32-bit numbers`, () => { + let a = new Int128(new Uint32Array([56789 , 0, 0, 0])); + let b = new Int128(new Uint32Array([-66789, -1, -1, -1])); + let expected = new Int128(new Uint32Array([-10000, -1, -1, -1])); + expect(a.plus(b)).toEqual(expected); + }); + test(`addition overflows 32-bit numbers`, () => { + let a = new Int128(new Uint32Array([0xffffffff, 0, 0, 0])); + let b = new Int128(new Uint32Array([9, 0, 0, 0])); + let expected = new Int128(new Uint32Array([8, 1, 0, 0])); + expect(a.plus(b)).toEqual(expected); + }); + test(`multiplies 32-bit numbers`, () => { + let a = new Int128(new Uint32Array([5, 0, 0, 0])); + let b = new Int128(new Uint32Array([9, 0, 0, 0])); + let expected = new Int128(new Uint32Array([45, 0, 0, 0])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication overflows 32-bit numbers`, () => { + let a = new Int128(new Uint32Array([0x80000000, 0, 0, 0])); + let b = new Int128(new Uint32Array([3, 0, 0, 0])); + let expected = new Int128(new Uint32Array([0x80000000, 1, 0, 0])); + expect(a.times(b)).toEqual(expected); + }); + test(`multiplication works on negative numbers`, () => { + let a = new Int128(new Uint32Array([-5, -1, -1, -1])); + let b = new Int128(new Uint32Array([-100, -1, -1, -1])); + expect(a.times(b)).toEqual(new Int128(new Uint32Array([ 500, 0, 0, 0]))); + expect(a.times(b)).toEqual(new Int128(new Uint32Array([ -50000, -1, -1, -1]))); + expect(a.times(b)).toEqual(new Int128(new Uint32Array([5000000, 0, 0, 0]))); + }); + test(`multiplication is associative`, () => { + let a = new Int128(new Uint32Array([4, 3, 2, 1])); + let b = new Int128(new Uint32Array([3, 0, 0, 0])); + expect(Int128.multiply(a, b)).toEqual(Int128.multiply(b,a)); + }); + test(`multiplication can produce 128-bit number`, () => { + let a = new Int128(new Uint32Array([0, 0xf0000000, 0, 0])); + let b = new Int128(new Uint32Array([0, 0x10000000, 0, 0])); + expect(a.times(b)).toEqual(new Int128(new Uint32Array([0x00000000, 0x00000000, 0x00000000, 0xf000000]))); + }); + test(`fromString parses string`, () => { + expect(Int128.fromString('1002111867823618826746863804903129070')) + .toEqual(new Int64(new Uint32Array([0x00c0ffee, + 0x00c0ffee, + 0x00c0ffee, + 0x00c0ffee]))); + }); + test(`fromString parses negative string`, () => { + expect(Int128.fromString('-12345678901234567890123456789012345678')) + .toEqual(new Int64(new Uint32Array([0x21c70cb2, + 0x3bb66faf, + 0x0ffdccec, + 0xf6b64f09]))); + }); + test(`fromNumber converts 53-ish bit number`, () => { + expect(Int128.fromNumber(8086463330923024)).toEqual(new Int128(new Uint32Array([0x76543210, 0x001cba98, 0, 0]))); + expect(Int128.fromNumber(-8086463330923024)).toEqual(new Int128(new Uint32Array([0x89abcdf0, 0xffe34567, 0xffffffff, 0xffffffff]))); + }); +}); diff --git a/src/arrow/js/test/unit/ipc/helpers.ts b/src/arrow/js/test/unit/ipc/helpers.ts new file mode 100644 index 000000000..9fccefec9 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/helpers.ts @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; + +import { + Table, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import * as fs from 'fs'; +import { fs as memfs } from 'memfs'; +import { Readable, PassThrough } from 'stream'; +import randomatic from 'randomatic'; + +export abstract class ArrowIOTestHelper { + + constructor(public table: Table) {} + + public static file(table: Table) { return new ArrowFileIOTestHelper(table); } + public static json(table: Table) { return new ArrowJsonIOTestHelper(table); } + public static stream(table: Table) { return new ArrowStreamIOTestHelper(table); } + + protected abstract writer(table: Table): RecordBatchWriter; + protected async filepath(table: Table): Promise<fs.PathLike> { + const path = `/${randomatic('a0', 20)}.arrow`; + const data = await this.writer(table).toUint8Array(); + await memfs.promises.writeFile(path, data); + return path; + } + + buffer(testFn: (buffer: Uint8Array) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(await this.writer(this.table).toUint8Array()); + }; + } + iterable(testFn: (iterable: Generator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(chunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + asyncIterable(testFn: (asyncIterable: AsyncGenerator<Uint8Array>) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + await testFn(asyncChunkedIterable(await this.writer(this.table).toUint8Array())); + }; + } + fsFileHandle(testFn: (handle: fs.promises.FileHandle) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> await memfs.promises.open(path, 'r')); + await memfs.promises.unlink(path); + }; + } + fsReadableStream(testFn: (stream: fs.ReadStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(<any> memfs.createReadStream(path)); + await memfs.promises.unlink(path); + }; + } + nodeReadableStream(testFn: (stream: NodeJS.ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const sink = new PassThrough(); + sink.end(await this.writer(this.table).toUint8Array()); + await testFn(sink); + }; + } + whatwgReadableStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path))); + await memfs.promises.unlink(path); + }; + } + whatwgReadableByteStream(testFn: (stream: ReadableStream) => void | Promise<void>) { + return async () => { + expect.hasAssertions(); + const path = await this.filepath(this.table); + await testFn(nodeToDOMStream(memfs.createReadStream(path), { type: 'bytes' })); + await memfs.promises.unlink(path); + }; + } +} + +class ArrowFileIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchFileWriter.writeAll(table); + } +} + +class ArrowJsonIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchJSONWriter.writeAll(table); + } +} + +class ArrowStreamIOTestHelper extends ArrowIOTestHelper { + constructor(table: Table) { super(table); } + protected writer(table: Table) { + return RecordBatchStreamWriter.writeAll(table); + } +} + +export function* chunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function* asyncChunkedIterable(buffer: Uint8Array) { + let offset = 0, size = 0; + while (offset < buffer.byteLength) { + size = yield buffer.subarray(offset, offset += + (isNaN(+size) ? buffer.byteLength - offset : size)); + } +} + +export async function concatBuffersAsync(iterator: AsyncIterable<Uint8Array> | ReadableStream) { + if (iterator instanceof ReadableStream) { + iterator = readableDOMStreamToAsyncIterator(iterator); + } + let chunks = [], total = 0; + for await (const chunk of iterator) { + chunks.push(chunk); + total += chunk.byteLength; + } + return chunks.reduce((x, buffer) => { + x.buffer.set(buffer, x.offset); + x.offset += buffer.byteLength; + return x; + }, { offset: 0, buffer: new Uint8Array(total) }).buffer; +} + +export async function* readableDOMStreamToAsyncIterator<T>(stream: ReadableStream<T>) { + // Get a lock on the stream + const reader = stream.getReader(); + try { + while (true) { + // Read from the stream + const { done, value } = await reader.read(); + // Exit if we're done + if (done) { break; } + // Else yield the chunk + yield value as T; + } + } finally { + try { stream.locked && reader.releaseLock(); } catch (e) {} + } +} + +export function nodeToDOMStream<T = any>(stream: NodeJS.ReadableStream, opts: any = {}) { + stream = new Readable((stream as any)._readableState).wrap(stream); + return new ReadableStream<T>({ + ...opts, + start(controller) { + stream.pause(); + stream.on('data', (chunk) => { + controller.enqueue(chunk); + stream.pause(); + }); + stream.on('end', () => controller.close()); + stream.on('error', e => controller.error(e)); + }, + pull() { stream.resume(); }, + cancel(reason) { + stream.pause(); + if (typeof (stream as any).cancel === 'function') { + return (stream as any).cancel(reason); + } else if (typeof (stream as any).destroy === 'function') { + return (stream as any).destroy(reason); + } + } + }); +} diff --git a/src/arrow/js/test/unit/ipc/message-reader-tests.ts b/src/arrow/js/test/unit/ipc/message-reader-tests.ts new file mode 100644 index 000000000..c48aa2ce1 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/message-reader-tests.ts @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// import * as fs from 'fs'; +import { + generateRandomTables, + // generateDictionaryTables +} from '../../data/tables'; + +import { ArrowIOTestHelper } from './helpers'; +import { MessageReader, AsyncMessageReader } from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + const numMessages = table.chunks.reduce((numMessages, batch) => { + return numMessages + + /* recordBatch message */ 1 + + /* dictionary messages */ batch.dictionaries.size; + }, /* schema message */ 1); + + const validate = validateMessageReader.bind(0, numMessages); + const validateAsync = validateAsyncMessageReader.bind(0, numMessages); + + describe(`MessageReader (${name})`, () => { + describe(`should read all Messages`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + }); + + describe(`AsyncMessageReader (${name})`, () => { + describe(`should read all Messages`, () => { + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + }); + }); +} + +export function validateMessageReader(numMessages: number, source: any) { + const reader = new MessageReader(source); + let index = 0; + for (let message of reader) { + + if (index === 0) { + expect(message.isSchema()).toBe(true); + expect(message.bodyLength).toBe(0); + } else { + expect(message.isSchema()).toBe(false); + expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true); + } + + try { + expect(message.bodyLength % 8).toBe(0); + } catch (e) { throw new Error(`bodyLength: ${e}`); } + + const body = reader.readMessageBody(message.bodyLength); + expect(body).toBeInstanceOf(Uint8Array); + expect(body.byteLength).toBe(message.bodyLength); + expect(index++).toBeLessThan(numMessages); + } + expect(index).toBe(numMessages); + reader.return(); +} + +export async function validateAsyncMessageReader(numMessages: number, source: any) { + const reader = new AsyncMessageReader(source); + let index = 0; + for await (let message of reader) { + + if (index === 0) { + expect(message.isSchema()).toBe(true); + expect(message.bodyLength).toBe(0); + } else { + expect(message.isSchema()).toBe(false); + expect(message.isRecordBatch() || message.isDictionaryBatch()).toBe(true); + } + + try { + expect(message.bodyLength % 8).toBe(0); + } catch (e) { throw new Error(`bodyLength: ${e}`); } + + const body = await reader.readMessageBody(message.bodyLength); + expect(body).toBeInstanceOf(Uint8Array); + expect(body.byteLength).toBe(message.bodyLength); + expect(index++).toBeLessThan(numMessages); + } + expect(index).toBe(numMessages); + await reader.return(); +} diff --git a/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts new file mode 100644 index 000000000..a7ddfc940 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/file-reader-tests.ts @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; +import { ArrowIOTestHelper } from '../helpers'; +import { toArray } from 'ix/asynciterable/toarray'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader +} from '../validate'; + +import { + RecordBatchReader, + RecordBatchFileReader, + AsyncRecordBatchFileReader +} from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.file(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + const validate = (source: any) => { validateRecordBatchReader('file', 3, RecordBatchReader.from(source)); }; + const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(source)); }; + const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('file', 3, await RecordBatchReader.from(Promise.resolve(source))); }; + + describe(`RecordBatchFileReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => { + test(`Uint8Array`, io.buffer(validateRandomAccess)); + test(`Iterable`, io.iterable(validateRandomAccess)); + }); + }); + + describe(`AsyncRecordBatchFileReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped)); + test('Promise<stream.Readable>', io.nodeReadableStream(validateAsyncWrapped)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped)); + }); + + describe(`should allow random access to record batches after iterating when autoDestroy=false`, () => { + + test('AsyncIterable', io.asyncIterable(validateRandomAccessAsync)); + test('fs.FileHandle', io.fsFileHandle(validateRandomAccessAsync)); + test('fs.ReadStream', io.fsReadableStream(validateRandomAccessAsync)); + test('stream.Readable', io.nodeReadableStream(validateRandomAccessAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateRandomAccessAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateRandomAccessAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateRandomAccessAsync)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateRandomAccessAsync)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateRandomAccessAsync)); + test('Promise<stream.Readable>', io.nodeReadableStream(validateRandomAccessAsync)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateRandomAccessAsync)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateRandomAccessAsync)); + }); + }); +} + +function validateRandomAccess(source: any) { + const reader = RecordBatchReader.from(source) as RecordBatchFileReader; + const schema = reader.open({ autoDestroy: false }).schema; + const batches = [...reader]; + expect(reader.closed).toBe(false); + expect(reader.schema).toBe(schema); + while (batches.length > 0) { + const expected = batches.pop()!; + const actual = reader.readRecordBatch(batches.length); + expect(actual).toEqualRecordBatch(expected); + } + reader.cancel(); + expect(reader.closed).toBe(true); + expect(reader.schema).toBeUndefined(); +} + +async function validateRandomAccessAsync(source: any) { + const reader = (await RecordBatchReader.from(source)) as AsyncRecordBatchFileReader; + const schema = (await reader.open({ autoDestroy: false })).schema; + const batches = await toArray(reader); + expect(reader.closed).toBe(false); + expect(reader.schema).toBe(schema); + while (batches.length > 0) { + const expected = batches.pop()!; + const actual = await reader.readRecordBatch(batches.length); + expect(actual).toEqualRecordBatch(expected); + } + await reader.cancel(); + expect(reader.closed).toBe(true); + expect(reader.schema).toBeUndefined(); +} diff --git a/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts new file mode 100644 index 000000000..c444b78fc --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/from-inference-tests.ts @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { ArrowIOTestHelper } from '../helpers'; +import { + RecordBatchReader, + RecordBatchFileReader, + RecordBatchStreamReader, + AsyncRecordBatchFileReader, + AsyncRecordBatchStreamReader +} from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + // eslint-disable-next-line jest/valid-describe + describe('RecordBatchReader.from', ((table, name) => () => { + testFromFile(ArrowIOTestHelper.file(table), name); + testFromJSON(ArrowIOTestHelper.json(table), name); + testFromStream(ArrowIOTestHelper.stream(table), name); + })(table, name)); +} + +function testFromJSON(io: ArrowIOTestHelper, name: string) { + describe(`should return a RecordBatchJSONReader (${name})`, () => { + test(`Uint8Array`, io.buffer((buffer) => { + const json = JSON.parse(`${Buffer.from(buffer)}`); + const reader = RecordBatchReader.from(json); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + })); + }); +} + +function testFromFile(io: ArrowIOTestHelper, name: string) { + + describe(`should return a RecordBatchFileReader (${name})`, () => { + + test(`Uint8Array`, io.buffer(syncSync)); + test(`Iterable`, io.iterable(syncSync)); + test('AsyncIterable', io.asyncIterable(asyncSync)); + test('fs.FileHandle', io.fsFileHandle(asyncAsync)); + test('fs.ReadStream', io.fsReadableStream(asyncSync)); + test('stream.Readable', io.nodeReadableStream(asyncSync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(asyncSync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncSync)); + + test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source)))); + test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncSync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncSync(Promise.resolve(source)))); + }); + + function syncSync(source: any) { + const reader = RecordBatchReader.from(source); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchFileReader); + } + + async function asyncSync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchFileReader); + } + + async function asyncAsync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(false); + expect(reader.isAsync()).toEqual(true); + expect(reader).toBeInstanceOf(AsyncRecordBatchFileReader); + } +} + +function testFromStream(io: ArrowIOTestHelper, name: string) { + + describe(`should return a RecordBatchStreamReader (${name})`, () => { + + test(`Uint8Array`, io.buffer(syncSync)); + test(`Iterable`, io.iterable(syncSync)); + test('AsyncIterable', io.asyncIterable(asyncAsync)); + test('fs.FileHandle', io.fsFileHandle(asyncAsync)); + test('fs.ReadStream', io.fsReadableStream(asyncAsync)); + test('stream.Readable', io.nodeReadableStream(asyncAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(asyncAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(asyncAsync)); + + test(`Promise<Uint8Array>`, io.buffer((source) => asyncSync(Promise.resolve(source)))); + test(`Promise<Iterable>`, io.iterable((source) => asyncSync(Promise.resolve(source)))); + test('Promise<AsyncIterable>', io.asyncIterable((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', io.fsFileHandle((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', io.fsReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<stream.Readable>', io.nodeReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableStream>', io.whatwgReadableStream((source) => asyncAsync(Promise.resolve(source)))); + test('Promise<whatwg.ReadableByteStream>', io.whatwgReadableByteStream((source) => asyncAsync(Promise.resolve(source)))); + }); + + function syncSync(source: any) { + const reader = RecordBatchReader.from(source); + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + } + + async function asyncSync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(true); + expect(reader.isAsync()).toEqual(false); + expect(reader).toBeInstanceOf(RecordBatchStreamReader); + } + + async function asyncAsync(source: any) { + const pending = RecordBatchReader.from(source); + expect(pending).toBeInstanceOf(Promise); + const reader = await pending; + expect(reader.isSync()).toEqual(false); + expect(reader.isAsync()).toEqual(true); + expect(reader).toBeInstanceOf(AsyncRecordBatchStreamReader); + } +} diff --git a/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts new file mode 100644 index 000000000..9bd1e3466 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/json-reader-tests.ts @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { ArrowIOTestHelper } from '../helpers'; +import { RecordBatchReader } from 'apache-arrow'; +import { validateRecordBatchReader } from '../validate'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.json(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchJSONReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer((buffer) => { + const json = JSON.parse(Buffer.from(buffer).toString()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(json)); + })); + }); + }); +} diff --git a/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts new file mode 100644 index 000000000..23879cf79 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/stream-reader-tests.ts @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader +} from '../validate'; + +import { ArrowIOTestHelper } from '../helpers'; +import { RecordBatchReader } from 'apache-arrow'; + +for (const table of generateRandomTables([10, 20, 30])) { + + const io = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + const validate = (source: any) => { validateRecordBatchReader('stream', 3, RecordBatchReader.from(source)); }; + const validateAsync = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(source)); }; + const validateAsyncWrapped = async (source: any) => { await validateAsyncRecordBatchReader('stream', 3, await RecordBatchReader.from(Promise.resolve(source))); }; + + describe(`RecordBatchStreamReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + test(`Uint8Array`, io.buffer(validate)); + test(`Iterable`, io.iterable(validate)); + }); + }); + + describe(`AsyncRecordBatchStreamReader (${name})`, () => { + describe(`should read all RecordBatches`, () => { + + test('AsyncIterable', io.asyncIterable(validateAsync)); + test('fs.FileHandle', io.fsFileHandle(validateAsync)); + test('fs.ReadStream', io.fsReadableStream(validateAsync)); + test('stream.Readable', io.nodeReadableStream(validateAsync)); + test('whatwg.ReadableStream', io.whatwgReadableStream(validateAsync)); + test('whatwg.ReadableByteStream', io.whatwgReadableByteStream(validateAsync)); + + test('Promise<AsyncIterable>', io.asyncIterable(validateAsyncWrapped)); + test('Promise<fs.FileHandle>', io.fsFileHandle(validateAsyncWrapped)); + test('Promise<fs.ReadStream>', io.fsReadableStream(validateAsyncWrapped)); + test('Promise<stream.Readable>', io.nodeReadableStream(validateAsyncWrapped)); + test('Promise<ReadableStream>', io.whatwgReadableStream(validateAsyncWrapped)); + test('Promise<ReadableByteStream>', io.whatwgReadableByteStream(validateAsyncWrapped)); + }); + }); +} diff --git a/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts new file mode 100644 index 000000000..a380e1619 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/streams-dom-tests.ts @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { + Table, + RecordBatchReader, + RecordBatchStreamWriter +} from 'apache-arrow'; + +import { validateRecordBatchAsyncIterator } from '../validate'; +import { ArrowIOTestHelper, readableDOMStreamToAsyncIterator } from '../helpers'; + +(() => { + + if (process.env.TEST_DOM_STREAMS !== 'true') { + return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchReader.throughDOM (${name})`, () => { + describe('file', () => { + test('ReadableStream', file.whatwgReadableStream(validate)); + test('ReadableByteStream', file.whatwgReadableByteStream(validate)); + }); + describe('stream', () => { + test('ReadableStream', stream.whatwgReadableStream(validate)); + test('ReadableByteStream', stream.whatwgReadableByteStream(validate)); + }); + async function validate(source: ReadableStream) { + const stream = source.pipeThrough(RecordBatchReader.throughDOM()); + await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream)); + } + }); + + describe(`toDOMStream (${name})`, () => { + + describe(`RecordBatchJSONReader`, () => { + test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`)))); + }); + + describe(`RecordBatchFileReader`, () => { + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + describe(`RecordBatchStreamReader`, () => { + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + async function validate(source: any) { + const reader: RecordBatchReader = await RecordBatchReader.from(source); + const iterator = readableDOMStreamToAsyncIterator(reader.toDOMStream()); + await validateRecordBatchAsyncIterator(3, iterator); + } + }); + } + + it('readAll() should pipe to separate WhatWG WritableStreams', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader: RecordBatchReader | undefined; + + for await (reader of RecordBatchReader.readAll(stream)) { + + validateStreamState(reader, stream, false); + + const output = reader + .pipeThrough(RecordBatchStreamWriter.throughDOM()) + .pipeThrough(new TransformStream()); + + validateStreamState(reader, output, false, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(output); + expect(streamTable).toEqualTable(sourceTable); + expect(output.locked).toBe(false); + } + + expect(reader).toBeDefined(); + validateStreamState(reader!, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should not close the underlying WhatWG ReadableStream when reading multiple tables to completion', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should close the underlying WhatWG ReadableStream when reading multiple tables and we break early', async () => { + // @ts-ignore + const { concatStream } = await import('@openpgp/web-stream-tools'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = concatStream(tables.map((table, i) => + RecordBatchStreamWriter.writeAll(table).toDOMStream({ + // Alternate between bytes mode and regular mode because code coverage + type: i % 2 === 0 ? 'bytes' : undefined + }) + )) as ReadableStream<Uint8Array>; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + let batchIndex = -1; + const sourceTable = tables[++tableIndex]; + const breakEarly = tableIndex === (tables.length / 2 | 0); + + for await (const streamBatch of reader) { + expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]); + if (breakEarly && batchIndex === 1) { break; } + } + if (breakEarly) { + // the reader should stay open until we break from the outermost loop + validateStreamState(reader, stream, false); + break; + } + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length / 2 | 0); + }); +})(); + +function validateStreamState(reader: RecordBatchReader, stream: ReadableStream, closed: boolean, locked = !closed) { + expect(reader.closed).toBe(closed); + expect(stream.locked).toBe(locked); +} diff --git a/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts new file mode 100644 index 000000000..822f99350 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/reader/streams-node-tests.ts @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables +} from '../../../data/tables'; + +import { + Table, + RecordBatchReader, + RecordBatchStreamWriter +} from 'apache-arrow'; + +import { ArrowIOTestHelper } from '../helpers'; +import { validateRecordBatchAsyncIterator } from '../validate'; + +(() => { + + if (process.env.TEST_NODE_STREAMS !== 'true') { + return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchReader.throughNode (${name})`, () => { + describe('file', () => { + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + }); + describe('stream', () => { + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + }); + async function validate(source: NodeJS.ReadableStream) { + const stream = source.pipe(RecordBatchReader.throughNode()); + await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]()); + } + }); + + describe(`toNodeStream (${name})`, () => { + + describe(`RecordBatchJSONReader`, () => { + test('Uint8Array', json.buffer((source) => validate(JSON.parse(`${Buffer.from(source)}`)))); + }); + + describe(`RecordBatchFileReader`, () => { + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', file.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', file.fsReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', file.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', file.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + describe(`RecordBatchStreamReader`, () => { + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable((source) => validate(Promise.resolve(source)))); + test('Promise<fs.FileHandle>', stream.fsFileHandle((source) => validate(Promise.resolve(source)))); + test('Promise<fs.ReadStream>', stream.fsReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<stream.Readable>', stream.nodeReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableStream>', stream.whatwgReadableStream((source) => validate(Promise.resolve(source)))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream((source) => validate(Promise.resolve(source)))); + }); + + async function validate(source: any) { + const reader: RecordBatchReader = await RecordBatchReader.from(source); + await validateRecordBatchAsyncIterator(3, reader.toNodeStream()[Symbol.asyncIterator]()); + } + }); + } + + it('readAll() should pipe to separate NodeJS WritableStreams', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + const { PassThrough } = await import('stream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader: RecordBatchReader | undefined; + + for await (reader of RecordBatchReader.readAll(stream)) { + + validateStreamState(reader, stream, false); + + const output = reader + .pipe(RecordBatchStreamWriter.throughNode()) + .pipe(new PassThrough()); + + validateStreamState(reader, output, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(output); + expect(streamTable).toEqualTable(sourceTable); + expect(Boolean(output.readableFlowing)).toBe(false); + } + + expect(reader).toBeDefined(); + validateStreamState(reader!, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should not close the underlying NodeJS ReadableStream when reading multiple tables to completion', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + const sourceTable = tables[++tableIndex]; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + validateStreamState(reader, stream, true); + expect(tableIndex).toBe(tables.length - 1); + }); + + it('should close the underlying NodeJS ReadableStream when reading multiple tables and we break early', async () => { + // @ts-ignore + const { default: MultiStream } = await import('multistream'); + + expect.hasAssertions(); + + const tables = [...generateRandomTables([10, 20, 30])]; + + const stream = new MultiStream(tables.map((table) => + () => RecordBatchStreamWriter.writeAll(table).toNodeStream() + )) as NodeJS.ReadableStream; + + let tableIndex = -1; + let reader = await RecordBatchReader.from(stream); + + validateStreamState(reader, stream, false); + + for await (reader of RecordBatchReader.readAll(reader)) { + + validateStreamState(reader, stream, false); + + let batchIndex = -1; + const sourceTable = tables[++tableIndex]; + const breakEarly = tableIndex === (tables.length / 2 | 0); + + for await (const streamBatch of reader) { + expect(streamBatch).toEqualRecordBatch(sourceTable.chunks[++batchIndex]); + if (breakEarly && batchIndex === 1) { break; } + } + if (breakEarly) { + // the reader should stay open until we break from the outermost loop + validateStreamState(reader, stream, false); + break; + } + } + + validateStreamState(reader, stream, true, true); + expect(tableIndex).toBe(tables.length / 2 | 0); + }); +})(); + +function validateStreamState(reader: RecordBatchReader, stream: NodeJS.ReadableStream, closed: boolean, readable = !closed) { + expect(reader.closed).toBe(closed); + expect(Boolean(stream.readable)).toBe(readable); + expect(Boolean((stream as any).destroyed)).toBe(closed); + expect(Boolean((stream as any).readableFlowing)).toBe(false); +} diff --git a/src/arrow/js/test/unit/ipc/validate.ts b/src/arrow/js/test/unit/ipc/validate.ts new file mode 100644 index 000000000..aedf87a2d --- /dev/null +++ b/src/arrow/js/test/unit/ipc/validate.ts @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; + +import { + Schema, + RecordBatch, + RecordBatchReader, + RecordBatchFileReader, + RecordBatchStreamReader, +} from 'apache-arrow'; + +export function validateRecordBatchReader<T extends RecordBatchFileReader | RecordBatchStreamReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) { + const reader = r.open(); + expect(reader).toBeInstanceOf(RecordBatchReader); + expect(type === 'file' ? reader.isFile() : reader.isStream()).toBe(true); + expect(reader.schema).toBeInstanceOf(Schema); + validateRecordBatchIterator(numBatches, reader[Symbol.iterator]()); + expect(reader.closed).toBe(reader.autoDestroy); + return reader; +} + +export async function validateAsyncRecordBatchReader<T extends RecordBatchReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) { + const reader = await r.open(); + expect(reader).toBeInstanceOf(RecordBatchReader); + expect(reader.schema).toBeInstanceOf(Schema); + expect(type === 'file' ? reader.isFile() : reader.isStream()).toBe(true); + await validateRecordBatchAsyncIterator(numBatches, reader[Symbol.asyncIterator]()); + expect(reader.closed).toBe(reader.autoDestroy); + return reader; +} + +export function validateRecordBatchIterator(numBatches: number, iterator: Iterable<RecordBatch> | IterableIterator<RecordBatch>) { + let i = 0; + try { + for (const recordBatch of iterator) { + expect(recordBatch).toBeInstanceOf(RecordBatch); + expect(i++).toBeLessThan(numBatches); + } + } catch (e) { throw new Error(`${i}: ${e}`); } + expect(i).toBe(numBatches); + if (typeof (iterator as any).return === 'function') { + (iterator as any).return(); + } +} + +export async function validateRecordBatchAsyncIterator(numBatches: number, iterator: AsyncIterable<RecordBatch> | AsyncIterableIterator<RecordBatch>) { + let i = 0; + try { + for await (const recordBatch of iterator) { + expect(recordBatch).toBeInstanceOf(RecordBatch); + expect(i++).toBeLessThan(numBatches); + } + } catch (e) { throw new Error(`${i}: ${e}`); } + expect(i).toBe(numBatches); + if (typeof (iterator as any).return === 'function') { + await (iterator as any).return(); + } +} diff --git a/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts new file mode 100644 index 000000000..fa639e5f6 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/file-writer-tests.ts @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import { validateRecordBatchIterator } from '../validate'; +import { Table, RecordBatchFileWriter } from 'apache-arrow'; + +describe('RecordBatchFileWriter', () => { + for (const table of generateRandomTables([10, 20, 30])) { + testFileWriter(table, `[${table.schema.fields.join(', ')}]`); + } + for (const table of generateDictionaryTables([10, 20, 30])) { + testFileWriter(table, `${table.schema.fields[0]}`); + } +}); + +function testFileWriter(table: Table, name: string) { + describe(`should write the Arrow IPC file format (${name})`, () => { + test(`Table`, validateTable.bind(0, table)); + }); +} + +async function validateTable(source: Table) { + const writer = RecordBatchFileWriter.writeAll(source); + const result = await Table.from(writer.toUint8Array()); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts new file mode 100644 index 000000000..05be0e272 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/json-writer-tests.ts @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import { validateRecordBatchIterator } from '../validate'; +import { Table, RecordBatchJSONWriter } from 'apache-arrow'; + +describe('RecordBatchJSONWriter', () => { + for (const table of generateRandomTables([10, 20, 30])) { + testJSONWriter(table, `[${table.schema.fields.join(', ')}]`); + } + for (const table of generateDictionaryTables([10, 20, 30])) { + testJSONWriter(table, `${table.schema.fields[0]}`); + } +}); + +function testJSONWriter(table: Table, name: string) { + describe(`should write the Arrow IPC JSON format (${name})`, () => { + test(`Table`, validateTable.bind(0, table)); + }); +} + +async function validateTable(source: Table) { + const writer = RecordBatchJSONWriter.writeAll(source); + const result = Table.from(JSON.parse(await writer.toString())); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts new file mode 100644 index 000000000..a83aa39da --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/stream-writer-tests.ts @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + generateDictionaryTables +} from '../../../data/tables'; + +import * as generate from '../../../generate-test-data'; +import { validateRecordBatchIterator } from '../validate'; +import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer'; +import { DictionaryVector, Dictionary, Uint32, Int32 } from 'apache-arrow'; +import { Table, Schema, Field, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from 'apache-arrow'; + +describe('RecordBatchStreamWriter', () => { + + (() => { + const type = generate.sparseUnion(0, 0).vector.type; + const schema = new Schema([new Field('dictSparseUnion', type)]); + const table = generate.table([10, 20, 30], schema).table; + const testName = `[${table.schema.fields.join(', ')}]`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + })(); + + for (const table of generateRandomTables([10, 20, 30])) { + const testName = `[${table.schema.fields.join(', ')}]`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + } + + for (const table of generateDictionaryTables([10, 20, 30])) { + const testName = `${table.schema.fields[0]}`; + testStreamWriter(table, testName, { writeLegacyIpcFormat: true }); + testStreamWriter(table, testName, { writeLegacyIpcFormat: false }); + } + + it(`should write multiple tables to the same output stream`, async () => { + const tables = [] as Table[]; + const writer = new RecordBatchStreamWriter({ autoDestroy: false }); + const validate = (async () => { + for await (const reader of RecordBatchReader.readAll(writer)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + })(); + for (const table of generateRandomTables([10, 20, 30])) { + tables.push(table); + await writer.writeAll((async function*() { + for (const chunk of table.chunks) { + yield chunk; // insert some asynchrony + await new Promise((r) => setTimeout(r, 1)); + } + }())); + } + writer.close(); + await validate; + }); + + it('should write delta dictionary batches', async () => { + + const name = 'dictionary_encoded_uint32'; + const chunks: DictionaryVector<Uint32, Int32>[] = []; + const { + vector: sourceVector, values: sourceValues, + } = generate.dictionary(1000, 20, new Uint32(), new Int32()); + + const writer = RecordBatchStreamWriter.writeAll((function* () { + const transform = Builder.throughIterable({ + type: sourceVector.type, nullValues: [null], + queueingStrategy: 'count', highWaterMark: 50, + }); + for (const chunk of transform(sourceValues())) { + chunks.push(chunk); + yield RecordBatch.new({ [name]: chunk }); + } + })()); + + expect(Chunked.concat(chunks)).toEqualVector(sourceVector); + + type T = { [name]: Dictionary<Uint32, Int32> }; + const sourceTable = Table.new({ [name]: sourceVector }); + const resultTable = await Table.from<T>(writer.toUint8Array()); + + const { dictionary } = resultTable.getColumn(name); + + expect(resultTable).toEqualTable(sourceTable); + expect((dictionary as Chunked)).toBeInstanceOf(Chunked); + expect((dictionary as Chunked).chunks).toHaveLength(20); + }); +}); + +function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) { + describe(`should write the Arrow IPC stream format (${name})`, () => { + test(`Table`, validateTable.bind(0, table, options)); + }); +} + +async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) { + const writer = RecordBatchStreamWriter.writeAll(source, options); + const result = await Table.from(writer.toUint8Array()); + validateRecordBatchIterator(3, source.chunks); + expect(result).toEqualTable(source); +} diff --git a/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts new file mode 100644 index 000000000..a19ddcdd7 --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/streams-dom-tests.ts @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { from, as } from 'ix/asynciterable'; +import { tap, flatMap } from 'ix/asynciterable/operators'; + +import { + Table, + RecordBatchReader, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import { + ArrowIOTestHelper, + concatBuffersAsync, + readableDOMStreamToAsyncIterator +} from '../helpers'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader, + validateRecordBatchAsyncIterator +} from '../validate'; + +(() => { + + if (process.env.TEST_DOM_STREAMS !== 'true') { + return test('not testing DOM streams because process.env.TEST_DOM_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchWriter.throughDOM (${name})`, () => { + + describe('file', () => { + describe(`convert`, () => { + test('ReadableStream', file.whatwgReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + test('ReadableByteStream', file.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchStreamWriter))); + }); + describe(`through`, () => { + test('ReadableStream', file.whatwgReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + test('ReadableByteStream', file.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchFileWriter))); + }); + }); + + describe('stream', () => { + describe(`convert`, () => { + test('ReadableStream', stream.whatwgReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + test('ReadableByteStream', stream.whatwgReadableByteStream(validateConvert.bind(0, RecordBatchFileWriter))); + }); + describe(`through`, () => { + test('ReadableStream', stream.whatwgReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + test('ReadableByteStream', stream.whatwgReadableByteStream(validateThrough.bind(0, RecordBatchStreamWriter))); + }); + }); + + async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) { + const stream = source + .pipeThrough(RecordBatchReader.throughDOM()) + .pipeThrough(RBWImplementation.throughDOM()); + const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream'; + await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream)); + } + + async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: ReadableStream) { + const stream = source + .pipeThrough(RecordBatchReader.throughDOM()) + .pipeThrough(RBWImplementation.throughDOM()) + .pipeThrough(RecordBatchReader.throughDOM()); + await validateRecordBatchAsyncIterator(3, readableDOMStreamToAsyncIterator(stream)); + } + }); + + describe(`toDOMStream (${name})`, () => { + + const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x)); + + describe(`RecordBatchJSONWriter`, () => { + + const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`); + + test('Uint8Array', json.buffer((source) => validate(toJSON(source)))); + test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source))))); + + async function validate(source: { schema: any } | Promise<{ schema: any }>) { + const reader = await RecordBatchReader.from(<any> source); + const writer = await RecordBatchJSONWriter.writeAll(reader); + const buffer = await concatBuffersAsync(writer.toDOMStream()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer))); + } + }); + + describe(`RecordBatchFileWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchFileWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('file', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchFileWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('file', 3, reader); + } + }); + }); + + describe(`RecordBatchStreamWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchStreamWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('stream', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchStreamWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toDOMStream()); + await validateAsyncRecordBatchReader('stream', 3, reader); + } + }); + }); + }); + } + + describe(`RecordBatchStreamWriter.throughDOM`, () => { + + const opts = { autoDestroy: false }; + const sleep = (n: number) => new Promise((r) => setTimeout(r, n)); + + it(`should write a stream of tables to the same output stream`, async () => { + + const tables = [] as Table[]; + const stream: ReadableStream<any> = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(stream.locked).toBe(false); + }); + + it(`should write a stream of record batches to the same output stream`, async () => { + + const tables = [] as Table[]; + const stream = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + // flatMap from Table -> RecordBatches[] + .pipe(flatMap((table) => as(table.chunks))) + .pipeThrough(RecordBatchStreamWriter.throughDOM(opts)); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(stream.locked).toBe(false); + }); + }); + +})(); diff --git a/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts new file mode 100644 index 000000000..662129b1b --- /dev/null +++ b/src/arrow/js/test/unit/ipc/writer/streams-node-tests.ts @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + generateRandomTables, + // generateDictionaryTables +} from '../../../data/tables'; + +import { from, as } from 'ix/asynciterable'; +import { tap, flatMap } from 'ix/asynciterable/operators'; +import 'ix/Ix.node'; + +import { + Table, + RecordBatchReader, + RecordBatchWriter, + RecordBatchFileWriter, + RecordBatchJSONWriter, + RecordBatchStreamWriter, +} from 'apache-arrow'; + +import { + ArrowIOTestHelper, + concatBuffersAsync +} from '../helpers'; + +import { + validateRecordBatchReader, + validateAsyncRecordBatchReader, + validateRecordBatchAsyncIterator +} from '../validate'; + +(() => { + + if (process.env.TEST_NODE_STREAMS !== 'true') { + return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {}); + } + + for (const table of generateRandomTables([10, 20, 30])) { + + const file = ArrowIOTestHelper.file(table); + const json = ArrowIOTestHelper.json(table); + const stream = ArrowIOTestHelper.stream(table); + const name = `[\n ${table.schema.fields.join(',\n ')}\n]`; + + describe(`RecordBatchWriter.throughNode (${name})`, () => { + + describe('file', () => { + describe(`convert`, () => { + test('fs.ReadStream', file.fsReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + test('stream.Readable', file.nodeReadableStream(validateConvert.bind(0, RecordBatchStreamWriter))); + }); + describe(`through`, () => { + test('fs.ReadStream', file.fsReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + test('stream.Readable', file.nodeReadableStream(validateThrough.bind(0, RecordBatchFileWriter))); + }); + }); + + describe('stream', () => { + describe(`convert`, () => { + test('fs.ReadStream', stream.fsReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + test('stream.Readable', stream.nodeReadableStream(validateConvert.bind(0, RecordBatchFileWriter))); + }); + describe(`through`, () => { + test('fs.ReadStream', stream.fsReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + test('stream.Readable', stream.nodeReadableStream(validateThrough.bind(0, RecordBatchStreamWriter))); + }); + }); + + async function validateConvert(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) { + const stream = source + .pipe(RecordBatchReader.throughNode()) + .pipe(RBWImplementation.throughNode()); + const type = RBWImplementation === RecordBatchFileWriter ? 'file' : 'stream'; + await validateAsyncRecordBatchReader(type, 3, await RecordBatchReader.from(stream)); + } + + async function validateThrough(RBWImplementation: typeof RecordBatchWriter, source: NodeJS.ReadableStream) { + const stream = source + .pipe(RecordBatchReader.throughNode()) + .pipe(RBWImplementation.throughNode()) + .pipe(RecordBatchReader.throughNode()); + await validateRecordBatchAsyncIterator(3, stream[Symbol.asyncIterator]()); + } + }); + + describe(`toNodeStream (${name})`, () => { + + const wrapArgInPromise = (fn: (p: Promise<any>) => any) => (x: any) => fn(Promise.resolve(x)); + + describe(`RecordBatchJSONWriter`, () => { + + const toJSON = (x: any): { schema: any } => JSON.parse(`${Buffer.from(x)}`); + + test('Uint8Array', json.buffer((source) => validate(toJSON(source)))); + test('Promise<Uint8Array>', json.buffer((source) => validate(Promise.resolve(toJSON(source))))); + + async function validate(source: { schema: any } | Promise<{ schema: any }>) { + const reader = await RecordBatchReader.from(<any> source); + const writer = await RecordBatchJSONWriter.writeAll(reader); + const buffer = await concatBuffersAsync(writer.toNodeStream()); + validateRecordBatchReader('json', 3, RecordBatchReader.from(toJSON(buffer))); + } + }); + + describe(`RecordBatchFileWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchFileWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('file', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, file.buffer(validate)); + test(`Iterable`, file.iterable(validate)); + test('AsyncIterable', file.asyncIterable(validate)); + test('fs.FileHandle', file.fsFileHandle(validate)); + test('fs.ReadStream', file.fsReadableStream(validate)); + test('stream.Readable', file.nodeReadableStream(validate)); + test('whatwg.ReadableStream', file.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', file.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', file.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', file.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', file.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', file.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', file.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', file.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchFileWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('file', 3, reader); + } + }); + }); + + describe(`RecordBatchStreamWriter`, () => { + + describe(`sync write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const reader = await RecordBatchReader.from(source); + const writer = await RecordBatchStreamWriter.writeAll(reader); + const stream = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('stream', 3, stream); + } + }); + + describe(`async write/read`, () => { + + test(`Uint8Array`, stream.buffer(validate)); + test(`Iterable`, stream.iterable(validate)); + test('AsyncIterable', stream.asyncIterable(validate)); + test('fs.FileHandle', stream.fsFileHandle(validate)); + test('fs.ReadStream', stream.fsReadableStream(validate)); + test('stream.Readable', stream.nodeReadableStream(validate)); + test('whatwg.ReadableStream', stream.whatwgReadableStream(validate)); + test('whatwg.ReadableByteStream', stream.whatwgReadableByteStream(validate)); + test('Promise<AsyncIterable>', stream.asyncIterable(wrapArgInPromise(validate))); + test('Promise<fs.FileHandle>', stream.fsFileHandle(wrapArgInPromise(validate))); + test('Promise<fs.ReadStream>', stream.fsReadableStream(wrapArgInPromise(validate))); + test('Promise<stream.Readable>', stream.nodeReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableStream>', stream.whatwgReadableStream(wrapArgInPromise(validate))); + test('Promise<ReadableByteStream>', stream.whatwgReadableByteStream(wrapArgInPromise(validate))); + + async function validate(source: any) { + const writer = new RecordBatchStreamWriter(); + /* no await */ writer.writeAll(await RecordBatchReader.from(source)); + const reader = await RecordBatchReader.from(writer.toNodeStream()); + await validateAsyncRecordBatchReader('stream', 3, reader); + } + }); + }); + }); + } + + describe(`RecordBatchStreamWriter.throughNode`, () => { + + const sleep = (n: number) => new Promise((r) => setTimeout(r, n)); + + it(`should write a stream of tables to the same output stream`, async () => { + + const tables = [] as Table[]; + const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false }); + const stream = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + .pipe(writer); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(writer.readable).toBe(false); + expect((writer as any).destroyed).toBe(true); + }); + + it(`should write a stream of record batches to the same output stream`, async () => { + + const tables = [] as Table[]; + const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false }); + const stream = from(generateRandomTables([10, 20, 30])) + // insert some asynchrony + .pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } })) + .pipe(flatMap((table) => as(table.chunks))) + .pipe(writer); + + for await (const reader of RecordBatchReader.readAll(stream)) { + const sourceTable = tables.shift()!; + const streamTable = await Table.from(reader); + expect(streamTable).toEqualTable(sourceTable); + } + + expect(tables).toHaveLength(0); + expect(writer.readable).toBe(false); + expect((writer as any).destroyed).toBe(true); + }); + + }); +})(); diff --git a/src/arrow/js/test/unit/math-tests.ts b/src/arrow/js/test/unit/math-tests.ts new file mode 100644 index 000000000..7e3ffcd8f --- /dev/null +++ b/src/arrow/js/test/unit/math-tests.ts @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import * as Arrow from 'apache-arrow'; +const { float64ToUint16, uint16ToFloat64 } = Arrow.util; + +describe('Float16', () => { + test('Uint16 to Float64 works', () => { + + const uNaN = 0x7E00 /* NaN */; + const pInf = 0x7C00 /* 1/0 */; + const nInf = 0xFC00 /*-1/0 */; + let value = 0, expected = value; + + do { + + expected = value; + + // if exponent is all 1s, either Infinity or NaN + if ((value & 0x7C00) === 0x7C00) { + // if significand, must be NaN + if (((value << 6) & 0xFFFF) !== 0) { + expected = uNaN; + } else { + // otherwise +/- Infinity + expected = (value >>> 15) !== 0 ? nInf : pInf; + } + } + + expect(float64ToUint16(uint16ToFloat64(value))).toEqual(expected); + } while (++value < 65536); + }); +}); diff --git a/src/arrow/js/test/unit/recordbatch/record-batch-tests.ts b/src/arrow/js/test/unit/recordbatch/record-batch-tests.ts new file mode 100644 index 000000000..520c04f84 --- /dev/null +++ b/src/arrow/js/test/unit/recordbatch/record-batch-tests.ts @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; +import { + Data, RecordBatch, + Vector, Int32Vector, Float32Vector, Float32, Int32, +} from 'apache-arrow'; +import { arange } from '../utils'; + +function numsRecordBatch(i32Len: number, f32Len: number) { + return RecordBatch.new({ + i32: Int32Vector.from(new Int32Array(arange(new Array(i32Len)))) as Int32Vector, + f32: Float32Vector.from(new Float32Array(arange(new Array(f32Len)))) as Float32Vector + }); +} + +describe(`RecordBatch`, () => { + describe(`new()`, () => { + + test(`creates a new RecordBatch from a Vector`, () => { + + const i32s = new Int32Array(arange(new Array<number>(10))); + + let i32 = Vector.new(Data.Int(new Int32(), 0, i32s.length, 0, null, i32s)); + expect(i32).toHaveLength(i32s.length); + expect(i32.nullCount).toBe(0); + + const batch = RecordBatch.new([i32], ['i32']); + i32 = batch.getChildAt(0) as Int32Vector; + + expect(batch.schema.fields[0].name).toBe('i32'); + expect(i32).toHaveLength(i32s.length); + expect(i32.nullCount).toBe(0); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + }); + + test(`creates a new RecordBatch from Vectors`, () => { + + const i32s = new Int32Array(arange(new Array<number>(10))); + const f32s = new Float32Array(arange(new Array<number>(10))); + + let i32 = Vector.new(Data.Int(new Int32(), 0, i32s.length, 0, null, i32s)); + let f32 = Vector.new(Data.Float(new Float32(), 0, f32s.length, 0, null, f32s)); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + const batch = RecordBatch.new([i32, f32], ['i32', 'f32']); + i32 = batch.getChildAt(0) as Int32Vector; + f32 = batch.getChildAt(1) as Float32Vector; + + expect(batch.schema.fields[0].name).toBe('i32'); + expect(batch.schema.fields[1].name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + expect(f32).toEqualVector(Float32Vector.from(f32s)); + }); + + test(`creates a new RecordBatch from Vectors with different lengths`, () => { + + const i32s = new Int32Array(arange(new Array<number>(20))); + const f32s = new Float32Array(arange(new Array<number>(8))); + + let i32 = Int32Vector.from(i32s); + let f32 = Float32Vector.from(f32s); + + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + const batch = RecordBatch.new([i32, f32]); + i32 = batch.getChildAt(0) as Int32Vector; + f32 = batch.getChildAt(1) as Float32Vector; + + expect(batch.schema.fields[0].name).toBe('0'); + expect(batch.schema.fields[1].name).toBe('1'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(i32s.length); // new length should be the same as the longest sibling + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(i32s.length - f32s.length); + + const f32Expected = Data.Float( + f32.type, 0, i32s.length, + i32s.length - f32s.length, + new Uint8Array(8).fill(255, 0, 1), f32s); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + expect(f32).toEqualVector(new Float32Vector(f32Expected)); + }); + }); + + describe(`select()`, () => { + test(`can select recordbatch children by name`, () => { + const batch = numsRecordBatch(32, 27); + const i32sBatch = batch.select('i32'); + expect(i32sBatch.numCols).toBe(1); + expect(i32sBatch).toHaveLength(32); + }); + }); + describe(`selectAt()`, () => { + test(`can select recordbatch children by index`, () => { + const batch = numsRecordBatch(32, 45); + const f32sBatch = batch.selectAt(1); + expect(f32sBatch.numCols).toBe(1); + expect(f32sBatch).toHaveLength(45); + }); + }); +}); diff --git a/src/arrow/js/test/unit/table-tests.ts b/src/arrow/js/test/unit/table-tests.ts new file mode 100644 index 000000000..2f138182b --- /dev/null +++ b/src/arrow/js/test/unit/table-tests.ts @@ -0,0 +1,406 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../jest-extensions'; +import { + Data, Schema, Field, Table, RecordBatch, Column, + Vector, Int32Vector, Float32Vector, Utf8Vector, DictionaryVector, + Struct, Float32, Int32, Dictionary, Utf8, Int8 +} from 'apache-arrow'; +import { arange } from './utils'; + +const NAMES = ['f32', 'i32', 'dictionary'] as (keyof TestDataSchema)[]; +const F32 = 0, I32 = 1, DICT = 2; +export const test_data = [ + { + name: `single record batch`, + table: getSingleRecordBatchTable, + // Use Math.fround to coerce to float32 + values: () => [ + [Math.fround(-0.3), -1, 'a'], + [Math.fround(-0.2), 1, 'b'], + [Math.fround(-0.1), -1, 'c'], + [Math.fround(0), 1, 'a'], + [Math.fround(0.1), -1, 'b'], + [Math.fround(0.2), 1, 'c'], + [Math.fround(0.3), -1, 'a'] + ] + }, { + name: `multiple record batches`, + table: getMultipleRecordBatchesTable, + values: () => [ + [Math.fround(-0.3), -1, 'a'], + [Math.fround(-0.2), 1, 'b'], + [Math.fround(-0.1), -1, 'c'], + [Math.fround(0), 1, 'a'], + [Math.fround(0.1), -1, 'b'], + [Math.fround(0.2), 1, 'c'], + [Math.fround(0.3), -1, 'a'], + [Math.fround(0.2), 1, 'b'], + [Math.fround(0.1), -1, 'c'], + ] + }, { + name: `struct`, + table: () => Table.fromStruct(getStructTable().getColumn('struct')!), + // Use Math.fround to coerce to float32 + values: () => [ + [Math.fround(-0.3), -1, 'a'], + [Math.fround(-0.2), 1, 'b'], + [Math.fround(-0.1), -1, 'c'], + [Math.fround(0), 1, 'a'], + [Math.fround(0.1), -1, 'b'], + [Math.fround(0.2), 1, 'c'], + [Math.fround(0.3), -1, 'a'] + ] + }, +]; + +function compareBatchAndTable(source: Table, offset: number, batch: RecordBatch, table: Table) { + expect(batch).toHaveLength(table.length); + expect(table.numCols).toEqual(source.numCols); + expect(batch.numCols).toEqual(source.numCols); + for (let i = -1, n = source.numCols; ++i < n;) { + const v0 = source.getColumnAt(i)!.slice(offset, offset + batch.length); + const v1 = batch.getChildAt(i); + const v2 = table.getColumnAt(i); + const name = source.schema.fields[i].name; + expect([v1, `batch`, name]).toEqualVector([v0, `source`]); + expect([v2, `table`, name]).toEqualVector([v0, `source`]); + } +} + +describe(`Table`, () => { + test(`can create an empty table`, () => { + expect(Table.empty()).toHaveLength(0); + }); + test(`Table.from([]) creates an empty table`, () => { + expect(Table.from([])).toHaveLength(0); + }); + test(`Table.from() creates an empty table`, () => { + expect(Table.from()).toHaveLength(0); + }); + + describe(`new()`, () => { + test(`creates an empty Table with Columns`, () => { + let i32 = Column.new('i32', Data.new(new Int32(), 0, 0)); + let f32 = Column.new('f32', Data.new(new Float32(), 0, 0)); + const table = Table.new(i32, f32); + i32 = table.getColumn('i32')!; + f32 = table.getColumn('f32')!; + expect(table).toHaveLength(0); + expect(i32).toHaveLength(0); + expect(f32).toHaveLength(0); + expect(i32.toArray()).toBeInstanceOf(Int32Array); + expect(f32.toArray()).toBeInstanceOf(Float32Array); + }); + + test(`creates a new Table from a Column`, () => { + + const i32s = new Int32Array(arange(new Array<number>(10))); + + let i32 = Column.new('i32', Data.Int(new Int32(), 0, i32s.length, 0, null, i32s)); + expect(i32.name).toBe('i32'); + expect(i32).toHaveLength(i32s.length); + expect(i32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + + const table = Table.new(i32); + i32 = table.getColumnAt(0)!; + + expect(i32.name).toBe('i32'); + expect(i32).toHaveLength(i32s.length); + expect(i32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + }); + + test(`creates a new Table from Columns`, () => { + + const i32s = new Int32Array(arange(new Array<number>(10))); + const f32s = new Float32Array(arange(new Array<number>(10))); + + let i32 = Column.new('i32', Data.Int(new Int32(), 0, i32s.length, 0, null, i32s)); + let f32 = Column.new('f32', Data.Float(new Float32(), 0, f32s.length, 0, null, f32s)); + expect(i32.name).toBe('i32'); + expect(f32.name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + const table = Table.new(i32, f32); + i32 = table.getColumnAt(0)!; + f32 = table.getColumnAt(1)!; + + expect(i32.name).toBe('i32'); + expect(f32.name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + expect(f32).toEqualVector(Float32Vector.from(f32s)); + }); + + test(`creates a new Table from Columns with different lengths`, () => { + + const i32s = new Int32Array(arange(new Array<number>(20))); + const f32s = new Float32Array(arange(new Array<number>(8))); + + let i32 = Column.new('i32', Int32Vector.from(i32s)); + let f32 = Column.new('f32', Float32Vector.from(f32s)); + + expect(i32.name).toBe('i32'); + expect(f32.name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + const table = Table.new([i32, f32]); + i32 = table.getColumnAt(0)!; + f32 = table.getColumnAt(1)!; + + expect(i32.name).toBe('i32'); + expect(f32.name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(i32s.length); // new length should be the same as the longest sibling + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); // true, with 12 additional nulls + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(i32s.length - f32s.length); + + const f32Expected = Data.Float( + f32.type, 0, i32s.length, + i32s.length - f32s.length, + new Uint8Array(8).fill(255, 0, 1), f32s); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + expect(f32).toEqualVector(new Float32Vector(f32Expected)); + }); + + test(`creates a new Table from Columns with different lengths and number of inner chunks`, () => { + + const i32s = new Int32Array(arange(new Array<number>(20))); + const f32s = new Float32Array(arange(new Array<number>(16))); + + let i32 = Column.new('i32', Int32Vector.from(i32s)); + let f32 = Column.new('f32', Float32Vector.from(f32s.slice(0, 8)), Float32Vector.from(f32s.slice(8, 16))); + + expect(i32.name).toBe('i32'); + expect(f32.name).toBe('f32'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(f32s.length); + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(0); + + const table = Table.new({ i32Renamed: i32, f32Renamed: f32 }); + i32 = table.getColumn('i32Renamed'); + f32 = table.getColumn('f32Renamed'); + + expect(i32.name).toBe('i32Renamed'); + expect(f32.name).toBe('f32Renamed'); + expect(i32).toHaveLength(i32s.length); + expect(f32).toHaveLength(i32s.length); // new length should be the same as the longest sibling + expect(i32.nullable).toBe(true); + expect(f32.nullable).toBe(true); // true, with 4 additional nulls + expect(i32.nullCount).toBe(0); + expect(f32.nullCount).toBe(i32s.length - f32s.length); + + const f32Expected = Data.Float( + f32.type, 0, i32s.length, + i32s.length - f32s.length, + new Uint8Array(8).fill(255, 0, 2), f32s); + + expect(i32).toEqualVector(Int32Vector.from(i32s)); + expect(f32).toEqualVector(new Float32Vector(f32Expected)); + }); + + test(`creates a new Table from Typed Arrays`, () => { + let i32s = Int32Array.from({length: 10}, (_, i) => i); + let f32s = Float32Array.from({length: 10}, (_, i) => i); + const table = Table.new({ i32s, f32s }); + const i32 = table.getColumn('i32s')!; + const f32 = table.getColumn('f32s')!; + + expect(table).toHaveLength(10); + expect(i32).toHaveLength(10); + expect(f32).toHaveLength(10); + expect(i32.toArray()).toBeInstanceOf(Int32Array); + expect(f32.toArray()).toBeInstanceOf(Float32Array); + expect(i32.toArray()).toEqual(i32s); + expect(f32.toArray()).toEqual(f32s); + }); + }); + + test(`Table.serialize() serializes sliced RecordBatches`, () => { + + const table = getSingleRecordBatchTable(); + const batch = table.chunks[0], half = batch.length / 2 | 0; + + // First compare what happens when slicing from the batch level + let [batch1, batch2] = [batch.slice(0, half), batch.slice(half)]; + + compareBatchAndTable(table, 0, batch1, Table.from(new Table(batch1).serialize())); + compareBatchAndTable(table, half, batch2, Table.from(new Table(batch2).serialize())); + + // Then compare what happens when creating a RecordBatch by slicing each child individually + batch1 = new RecordBatch(batch1.schema, batch1.length, batch1.schema.fields.map((_, i) => { + return batch.getChildAt(i)!.slice(0, half); + })); + + batch2 = new RecordBatch(batch2.schema, batch2.length, batch2.schema.fields.map((_, i) => { + return batch.getChildAt(i)!.slice(half); + })); + + compareBatchAndTable(table, 0, batch1, Table.from(new Table(batch1).serialize())); + compareBatchAndTable(table, half, batch2, Table.from(new Table(batch2).serialize())); + }); + + for (let datum of test_data) { + describe(datum.name, () => { + test(`has the correct length`, () => { + const table = datum.table(); + const values = datum.values(); + expect(table).toHaveLength(values.length); + }); + test(`gets expected values`, () => { + const table = datum.table(); + const values = datum.values(); + for (let i = -1; ++i < values.length;) { + const row = table.get(i); + const expected = values[i]; + expect(row.f32).toEqual(expected[F32]); + expect(row.i32).toEqual(expected[I32]); + expect(row.dictionary).toEqual(expected[DICT]); + } + }); + test(`iterates expected values`, () => { + let i = 0; + const table = datum.table(); + const values = datum.values(); + for (let row of table) { + const expected = values[i++]; + expect(row.f32).toEqual(expected[F32]); + expect(row.i32).toEqual(expected[I32]); + expect(row.dictionary).toEqual(expected[DICT]); + } + }); + test(`serialize and de-serialize is a no-op`, () => { + const table = datum.table(); + const clone = Table.from(table.serialize()); + expect(clone).toEqualTable(table); + }); + + test(`count() returns the correct length`, () => { + const table = datum.table(); + const values = datum.values(); + expect(table.count()).toEqual(values.length); + }); + test(`getColumnIndex`, () => { + const table = datum.table(); + expect(table.getColumnIndex('i32')).toEqual(I32); + expect(table.getColumnIndex('f32')).toEqual(F32); + expect(table.getColumnIndex('dictionary')).toEqual(DICT); + }); + + const table = datum.table(); + const values = datum.values(); + + test(`table.select() basic tests`, () => { + let selected = table.select('f32', 'dictionary'); + expect(selected.schema.fields).toHaveLength(2); + expect(selected.schema.fields[0]).toEqual(table.schema.fields[0]); + expect(selected.schema.fields[1]).toEqual(table.schema.fields[2]); + + expect(selected).toHaveLength(values.length); + let idx = 0, expected_row; + for (let row of selected) { + expected_row = values[idx++]; + expect(row.f32).toEqual(expected_row[F32]); + expect(row.dictionary).toEqual(expected_row[DICT]); + } + }); + }); + } +}); + +type TestDataSchema = { f32: Float32; i32: Int32; dictionary: Dictionary<Utf8, Int8> }; + +function getTestVectors(f32Values: number[], i32Values: number[], dictIndices: number[]) { + + const values = Utf8Vector.from(['a', 'b', 'c']); + const i32Data = Data.Int(new Int32(), 0, i32Values.length, 0, null, i32Values); + const f32Data = Data.Float(new Float32(), 0, f32Values.length, 0, null, f32Values); + + return [Vector.new(f32Data), Vector.new(i32Data), DictionaryVector.from(values, new Int8(), dictIndices)]; +} + +function getSingleRecordBatchTable() { + const vectors = getTestVectors( + [-0.3, -0.2, -0.1, 0, 0.1, 0.2, 0.3], + [-1, 1, -1, 1, -1, 1, -1], + [0, 1, 2, 0, 1, 2, 0] + ); + + return Table.new<TestDataSchema>(vectors, NAMES); +} + +function getMultipleRecordBatchesTable() { + + const types = getTestVectors([], [], []).map((vec) => vec.type); + const fields = NAMES.map((name, i) => Field.new(name, types[i])); + const schema = new Schema<TestDataSchema>(fields); + + const b1 = new RecordBatch(schema, 3, getTestVectors( + [-0.3, -0.2, -0.1], + [-1, 1, -1], + [0, 1, 2] + )); + + const b2 = new RecordBatch(schema, 3, getTestVectors( + [0, 0.1, 0.2], + [1, -1, 1], + [0, 1, 2] + )); + + const b3 = new RecordBatch(schema, 3, getTestVectors( + [0.3, 0.2, 0.1], + [-1, 1, -1], + [0, 1, 2] + )); + + return new Table<TestDataSchema>([b1, b2, b3]); +} + +function getStructTable() { + const table = getSingleRecordBatchTable(); + const struct = new Struct<TestDataSchema>(table.schema.fields); + const children = table.schema.fields.map((_, i) => table.getColumnAt(i)!); + const structVec = Vector.new(Data.Struct(struct, 0, table.length, 0, null, children)); + return Table.new<{ struct: Struct<TestDataSchema> }>([structVec], ['struct']); +} diff --git a/src/arrow/js/test/unit/table/assign-tests.ts b/src/arrow/js/test/unit/table/assign-tests.ts new file mode 100644 index 000000000..fa1dacbc6 --- /dev/null +++ b/src/arrow/js/test/unit/table/assign-tests.ts @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/* eslint-disable jest/no-standalone-expect */ + +import '../../jest-extensions'; +import { zip } from 'ix/iterable'; +import * as generate from '../../generate-test-data'; +import { validateTable } from '../generated-data-validators'; +import { + Schema, Field, DataType, Int32, Float32, Utf8 +} from 'apache-arrow'; + +const toSchema = (...xs: [string, DataType][]) => new Schema(xs.map((x) => new Field(...x))); +const schema1 = toSchema(['a', new Int32()], ['b', new Float32()], ['c', new Utf8()]); +const partialOverlapWith1 = toSchema(['a', new Int32()], ['b', new Float32()], ['f', new Utf8()]); +const schema2 = toSchema(['d', new Int32()], ['e', new Float32()], ['f', new Utf8()]); + +describe('Table.assign()', () => { + describe(`should assign non-overlapping fields`, () => { + const lhs = generate.table([20], schema1); + const rhs = generate.table([20], schema2); + const table = lhs.table.assign(rhs.table); + const f = assignGeneratedTables(lhs, rhs); + expect(table.schema.fields.map((f) => f.name)).toEqual(['a', 'b', 'c', 'd', 'e', 'f']); + validateTable({ ...f([0,1,2], [3,4,5]), table }).run(); + }); + describe(`should assign partially-overlapping fields`, () => { + const lhs = generate.table([20], schema1); + const rhs = generate.table([20], partialOverlapWith1); + const table = lhs.table.assign(rhs.table); + const f = assignGeneratedTables(lhs, rhs); + expect(table.schema.fields.map((f) => f.name)).toEqual(['a', 'b', 'c', 'f']); + // eslint-disable-next-line no-sparse-arrays + validateTable({ ...f([ , , 2], [0,1,3]), table }).run(); + }); + describe(`should assign completely-overlapping fields`, () => { + const lhs = generate.table([20], schema2); + const rhs = generate.table([20], schema2); + const table = lhs.table.assign(rhs.table); + const f = assignGeneratedTables(lhs, rhs); + expect(table.schema.fields.map((f) => f.name)).toEqual(['d', 'e', 'f']); + // eslint-disable-next-line no-sparse-arrays + validateTable({ ...f([ , , ], [0,1,2]), table }).run(); + }); +}); + +function assignGeneratedTables(lhs: generate.GeneratedTable, rhs: generate.GeneratedTable) { + return function createAssignedTestData(lhsIndices: any[], rhsIndices: any[]) { + const pluckLhs = (xs: any[], ys: any[] = []) => lhsIndices.reduce((ys, i, j) => { + if (i !== undefined) { ys[i] = xs ? xs[j] : null; } + return ys; + }, ys); + const pluckRhs = (xs: any[], ys: any[] = []) => rhsIndices.reduce((ys, i, j) => { + if (i !== undefined) { ys[i] = xs ? xs[j] : null; } + return ys; + }, ys); + const cols = () => [...pluckLhs(lhs.cols(), pluckRhs(rhs.cols()))]; + const keys = () => [...pluckLhs(lhs.keys(), pluckRhs(rhs.keys()))]; + const rows = () => [...zip(lhs.rows(), rhs.rows())].map(([x, y]) => [...pluckLhs(x, pluckRhs(y))]); + const colBatches = [...zip(lhs.colBatches, rhs.colBatches)].map(([x, y]) => () => [...pluckLhs(x(), pluckRhs(y()))]); + const keyBatches = [...zip(lhs.keyBatches, rhs.keyBatches)].map(([x, y]) => () => [...pluckLhs(x(), pluckRhs(y()))]); + const rowBatches = [...zip(lhs.rowBatches, rhs.rowBatches)].map(([x, y]) => () => [...zip(x(), y())].map(([x, y]) => [...pluckLhs(x, pluckRhs(y))])); + return { cols, keys, rows, colBatches, keyBatches, rowBatches }; + }; +} diff --git a/src/arrow/js/test/unit/table/serialize-tests.ts b/src/arrow/js/test/unit/table/serialize-tests.ts new file mode 100644 index 000000000..5eb211763 --- /dev/null +++ b/src/arrow/js/test/unit/table/serialize-tests.ts @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import '../../jest-extensions'; +import * as generate from '../../generate-test-data'; +import { + Table, Schema, Field, DataType, Dictionary, Int32, Float32, Utf8, Null, Int32Vector +} from 'apache-arrow'; + +const toSchema = (...xs: [string, DataType][]) => new Schema(xs.map((x) => new Field(...x))); +const schema1 = toSchema(['a', new Int32()], ['b', new Float32()], ['c', new Dictionary(new Utf8(), new Int32())]); +const schema2 = toSchema(['d', new Int32()], ['e', new Float32()], ['f', new Utf8()]); +const nullSchema = new Schema([new Field('null', new Null())]); + +schema1.metadata.set('foo', 'bar'); + +function createTable<T extends { [key: string]: DataType } = any>(schema: Schema<T>, chunkLengths: number[]) { + return generate.table(chunkLengths, schema).table; +} + +describe('Table#serialize()', () => { + + test(`doesn't swap the order of buffers that share the same underlying ArrayBuffer but are in a different order`, () => { + const values = new Int32Array([0, 1, 2, 3, 4, 5, 6, 7]); + const expected = values.slice(); + const x = Int32Vector.from(values.subarray(4, 8)); // back + const y = Int32Vector.from(values.subarray(0, 4)); // front + const source = Table.new([x, y], ['x', 'y']); + const table = Table.from(source.serialize()); + expect(table.getColumn('x').toArray()).toEqual(expected.subarray(4, 8)); + expect(table.getColumn('y').toArray()).toEqual(expected.subarray(0, 4)); + }); + + test(`Table#empty round-trips through serialization`, () => { + const source = Table.empty(); + source.schema.metadata.set('foo', 'bar'); + expect(source).toHaveLength(0); + expect(source.numCols).toBe(0); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + + test(`Schema metadata round-trips through serialization`, () => { + const source = createTable(schema1, [20]); + expect(source).toHaveLength(20); + expect(source.numCols).toBe(3); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + + test(`Table#assign an empty Table to a Table with a zero-length Null column round-trips through serialization`, () => { + const table1 = new Table(nullSchema); + const table2 = Table.empty(); + const source = table1.assign(table2); + expect(source).toHaveLength(0); + expect(source.numCols).toBe(1); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + }); + + const chunkLengths = [] as number[]; + for (let i = -1; ++i < 3;) { + chunkLengths[i * 2] = (Math.random() * 100) | 0; + chunkLengths[i * 2 + 1] = 0; + const table = <T extends { [key: string]: DataType } = any>(schema: Schema<T>) => createTable(schema, chunkLengths); + test(`Table#select round-trips through serialization`, () => { + const source = table(schema1).select('a', 'c'); + expect(source.numCols).toBe(2); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + }); + test(`Table#selectAt round-trips through serialization`, () => { + const source = table(schema1).selectAt(0, 2); + expect(source.numCols).toBe(2); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + }); + test(`Table#assign round-trips through serialization`, () => { + const source = table(schema1).assign(table(schema2)); + expect(source.numCols).toBe(6); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#assign with an empty table round-trips through serialization`, () => { + const table1 = table(schema1); + const source = table1.assign(Table.empty()); + expect(source.numCols).toBe(table1.numCols); + expect(source).toHaveLength(table1.length); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#assign with a zero-length Null column round-trips through serialization`, () => { + const table1 = new Table(nullSchema); + const table2 = table(schema1); + const source = table1.assign(table2); + expect(source).toHaveLength(table2.length); + expect(source.numCols).toBe(4); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#assign with different lengths and number of chunks round-trips through serialization`, () => { + const table1 = table(schema1); + const table2 = createTable(schema2, [102, 4, 10, 97, 10, 2, 4]); + const source = table1.assign(table2); + expect(source.numCols).toBe(6); + expect(source).toHaveLength(Math.max(table1.length, table2.length)); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#select with Table#assign the result of Table#selectAt round-trips through serialization`, () => { + const table1 = table(schema1); + const table2 = table(schema2); + const source = table1.select('a', 'c').assign(table2.selectAt(2)); + expect(source.numCols).toBe(3); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#slice round-trips through serialization`, () => { + const table1 = table(schema1); + const length = table1.length; + const [begin, end] = [length * .25, length * .75].map((x) => x | 0); + const source = table1.slice(begin, end); + expect(source.numCols).toBe(3); + expect(source).toHaveLength(end - begin); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + test(`Table#concat of two slices round-trips through serialization`, () => { + const table1 = table(schema1); + const length = table1.length; + const [begin1, end1] = [length * .10, length * .20].map((x) => x | 0); + const [begin2, end2] = [length * .80, length * .90].map((x) => x | 0); + const slice1 = table1.slice(begin1, end1); + const slice2 = table1.slice(begin2, end2); + const source = slice1.concat(slice2); + expect(slice1).toHaveLength(end1 - begin1); + expect(slice2).toHaveLength(end2 - begin2); + expect(source).toHaveLength((end1 - begin1) + (end2 - begin2)); + [slice1, slice2, source].forEach((x) => expect(x.numCols).toBe(3)); + const result = Table.from(source.serialize()); + expect(result).toEqualTable(source); + expect(result.schema.metadata.get('foo')).toEqual('bar'); + }); + } +}); diff --git a/src/arrow/js/test/unit/utils-tests.ts b/src/arrow/js/test/unit/utils-tests.ts new file mode 100644 index 000000000..985bec7aa --- /dev/null +++ b/src/arrow/js/test/unit/utils-tests.ts @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { isTypedArray } from 'apache-arrow'; + +describe('isTypedArray', () => { + test('works for typed arrays', () => { + expect(isTypedArray(new Int8Array())).toBeTruthy(); + expect(isTypedArray(new Int32Array())).toBeTruthy(); + expect(isTypedArray(new BigInt64Array())).toBeTruthy(); + }); + + test('does not recognize arrays, buffers, or data views', () => { + expect(isTypedArray(new Array([1, 2, 3]))).toBeFalsy(); + expect(isTypedArray(new ArrayBuffer(10))).toBeFalsy(); + expect(isTypedArray(new DataView(new ArrayBuffer(10)))).toBeFalsy(); + }); +}); diff --git a/src/arrow/js/test/unit/utils.ts b/src/arrow/js/test/unit/utils.ts new file mode 100644 index 000000000..c57de487f --- /dev/null +++ b/src/arrow/js/test/unit/utils.ts @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +export function arange<T extends { length: number; [n: number]: number }>(arr: T, n = arr.length) { + for (let i = -1; ++i < n; arr[i] = i) { } + return arr; +} diff --git a/src/arrow/js/test/unit/vector/bool-vector-tests.ts b/src/arrow/js/test/unit/vector/bool-vector-tests.ts new file mode 100644 index 000000000..41c53da60 --- /dev/null +++ b/src/arrow/js/test/unit/vector/bool-vector-tests.ts @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { Data, Bool, Vector, BoolVector } from 'apache-arrow'; + +const newBoolVector = (length: number, data: Uint8Array) => Vector.new(Data.Bool(new Bool(), 0, length, 0, null, data)); + +describe(`BoolVector`, () => { + const values = [true, true, false, true, true, false, false, false]; + const n = values.length; + const vector = newBoolVector(n, new Uint8Array([27, 0, 0, 0, 0, 0, 0, 0])); + test(`gets expected values`, () => { + let i = -1; + while (++i < n) { + expect(vector.get(i)).toEqual(values[i]); + } + }); + test(`iterates expected values`, () => { + let i = -1; + for (let v of vector) { + expect(++i).toBeLessThan(n); + expect(v).toEqual(values[i]); + } + }); + test(`indexOf returns expected values`, () => { + for (let test_value of [true, false]) { + const expected = values.indexOf(test_value); + expect(vector.indexOf(test_value)).toEqual(expected); + } + }); + test(`indexOf returns -1 when value not found`, () => { + const v = newBoolVector(3, new Uint8Array([0xFF])); + expect(v.indexOf(false)).toEqual(-1); + }); + test(`can set values to true and false`, () => { + const v = newBoolVector(n, new Uint8Array([27, 0, 0, 0, 0, 0, 0, 0])); + const expected1 = [true, true, false, true, true, false, false, false]; + const expected2 = [true, true, true, true, true, false, false, false]; + const expected3 = [true, true, false, false, false, false, true, true]; + function validate(expected: boolean[]) { + for (let i = -1; ++i < n;) { + expect(v.get(i)).toEqual(expected[i]); + } + } + validate(expected1); + v.set(2, true); + validate(expected2); + v.set(2, false); + validate(expected1); + v.set(3, false); + v.set(4, false); + v.set(6, true); + v.set(7, true); + validate(expected3); + v.set(3, true); + v.set(4, true); + v.set(6, false); + v.set(7, false); + validate(expected1); + }); + test(`packs 0 values`, () => { + const expected = new Uint8Array(64); + expect(BoolVector.from([]).values).toEqual(expected); + }); + test(`packs 3 values`, () => { + const expected = new Uint8Array(64); + expected[0] = 5; + expect(BoolVector.from([ + true, false, true + ]).values).toEqual(expected); + }); + test(`packs 8 values`, () => { + const expected = new Uint8Array(64); + expected[0] = 27; + expect(BoolVector.from([ + true, true, false, true, true, false, false, false + ]).values).toEqual(expected); + }); + test(`packs 25 values`, () => { + const expected = new Uint8Array(64); + expected[0] = 27; + expected[1] = 216; + expect(BoolVector.from([ + true, true, false, true, true, false, false, false, + false, false, false, true, true, false, true, true, + false + ]).values).toEqual(expected); + }); + test(`from with boolean Array packs values`, () => { + const expected = new Uint8Array(64); + expected[0] = 5; + expect(BoolVector + .from([true, false, true]) + .slice().values + ).toEqual(expected); + }); +}); diff --git a/src/arrow/js/test/unit/vector/date-vector-tests.ts b/src/arrow/js/test/unit/vector/date-vector-tests.ts new file mode 100644 index 000000000..4658633ba --- /dev/null +++ b/src/arrow/js/test/unit/vector/date-vector-tests.ts @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { Table, DateDay, DateMillisecond } from 'apache-arrow'; + +describe(`DateVector`, () => { + it('returns days since the epoch as correct JS Dates', () => { + const table = Table.from(test_data); + const expectedMillis = expectedMillis32(); + const date32 = table.getColumnAt<DateDay>(0)!; + for (const date of date32) { + const millis = expectedMillis.shift(); + expect(date).toEqual(millis === null ? null : new Date(millis!)); + } + }); + it('returns millisecond longs since the epoch as correct JS Dates', () => { + const table = Table.from(test_data); + const expectedMillis = expectedMillis64(); + const date64 = table.getColumnAt<DateMillisecond>(1)!; + for (const date of date64) { + const millis = expectedMillis.shift(); + expect(date).toEqual(millis === null ? null : new Date(millis!)); + } + }); +}); + +const expectedMillis32 = () => [ + 165247430400000, 34582809600000, 232604524800000, null, + 199808812800000, 165646771200000, 209557238400000, null +]; + +const expectedMillis64 = () => [ + 27990830234011, -41278585914325, 12694624797111, + null, null, 10761360520213, null, 1394015437000 +]; + +const test_data = { + 'schema': { + 'fields': [ + { + 'name': 'f0', + 'type': { + 'name': 'date', + 'unit': 'DAY' + }, + 'nullable': true, + 'children': [] + }, + { + 'name': 'f1', + 'type': { + 'name': 'date', + 'unit': 'MILLISECOND' + }, + 'nullable': true, + 'children': [] + } + ] + }, + 'batches': [ + { + 'count': 8, + 'columns': [ + { + 'name': 'f0', + 'count': 8, + 'VALIDITY': [1, 1, 1, 0, 1, 1, 1, 0], + 'DATA': [1912586, 400264, 2692182, 2163746, 2312602, 1917208, 2425431] + }, + { + 'name': 'f1', + 'count': 8, + 'VALIDITY': [1, 1, 1, 0, 0, 1, 0, 1], + 'DATA': [ + 27990830234011, + -41278585914325, + 12694624797111, + -38604948562547, + -37802308043516, + 10761360520213, + -25129181633384, + 1394015437000 // <-- the tricky one + ] + } + ] + } + ] +}; diff --git a/src/arrow/js/test/unit/vector/numeric-vector-tests.ts b/src/arrow/js/test/unit/vector/numeric-vector-tests.ts new file mode 100644 index 000000000..61418c431 --- /dev/null +++ b/src/arrow/js/test/unit/vector/numeric-vector-tests.ts @@ -0,0 +1,616 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/* eslint-disable jest/no-identical-title */ + +import { + util, + Data, Vector, + Float, Float16, Float32, Float64, + Int, Int8, Int16, Int32, Int64, Uint8, Uint16, Uint32, Uint64, + FloatVector, Float16Vector, Float32Vector, Float64Vector, + IntVector, Int8Vector, Int16Vector, Int32Vector, Int64Vector, + Uint8Vector, Uint16Vector, Uint32Vector, Uint64Vector, +} from 'apache-arrow'; + +const { float64ToUint16, uint16ToFloat64 } = util; +import { VectorType as V } from 'apache-arrow/interfaces'; +import { TypedArray, TypedArrayConstructor } from 'apache-arrow/interfaces'; +import { BigIntArray, BigIntArrayConstructor } from 'apache-arrow/interfaces'; + +const { joinUint8Arrays, BN } = util; +const uint16ToFloat64Array = (b: ArrayBuffer) => new Float64Array([...new Uint16Array(b)].map(uint16ToFloat64)); +const randomBytes = (n: number) => new Uint16Array([ + ...Uint16Array.from([0, 65535]), + ...Uint16Array.from({ length: (n / 2) - 2 }, () => (Math.random() * 65536) | 0), +]).buffer; +const toBigNumsArray = (values: Int32Array | Uint32Array) => { + const array = new Array(values.length * 0.5); + for (let i = -1, n = values.length * 0.5; ++i < n;) { + array[i] = BN.new(values.subarray(i * 2, i * 2 + 2))[Symbol.toPrimitive](); + } + return array; +}; + +const testValueBuffers = Array.from({ length: 5 }, () => randomBytes(64)); +const testValuesBuffer = joinUint8Arrays(testValueBuffers.map((b) => new Uint8Array(b)))[0].buffer; + +const checkType = <T, R extends T>(Ctor: new (...args: any) => T, inst: R) => expect(inst).toBeInstanceOf(Ctor); +const valuesArray = <T extends TypedArray>(ArrayType: TypedArrayConstructor<T>) => [...valuesTyped<T>(ArrayType)]; +const valuesArray64 = <T extends TypedArray>(ArrayType: TypedArrayConstructor<T>) => { + const typed = valuesTyped<T>(ArrayType); + const array = new Array(typed.length * 0.5); + for (let i = -1, n = array.length; ++i < n;) { + // Interleave regular Arrays and TypedArrays to cover more surface area + array[i] = i % 2 === 0 + ? [...typed.subarray(i * 2, (i + 1) * 2)] + : typed.subarray(i * 2, (i + 1) * 2); + } + return array; +}; +const valuesTyped = <T extends TypedArray>(ArrayType: TypedArrayConstructor<T>) => new ArrayType(testValuesBuffer); +const bigIntValuesTyped = <T extends BigIntArray>(ArrayType: BigIntArrayConstructor<T>) => new ArrayType(testValuesBuffer); +const bigIntValuesArray = <T extends BigIntArray>(ArrayType: BigIntArrayConstructor<T>) => [...bigIntValuesTyped<T>(ArrayType)]; + +describe(`FloatVector`, () => { + + describe(`FloatVector.from infers the type from the input TypedArray`, () => { + + const u16s = valuesTyped(Uint16Array).map((x) => float64ToUint16(uint16ToFloat64(x))); + const f16s = valuesArray(Uint16Array).map(uint16ToFloat64); + const f32s = valuesTyped(Float32Array); + const f64s = valuesTyped(Float64Array); + const f16Vec = FloatVector.from(u16s); + const f32Vec = FloatVector.from(valuesTyped(Float32Array)); + const f64Vec = FloatVector.from(valuesTyped(Float64Array)); + + // test strong typing at compile-time + test(`return type is correct`, () => checkType(Float16Vector, f16Vec)); + test(`return type is correct`, () => checkType(Float32Vector, f32Vec)); + test(`return type is correct`, () => checkType(Float64Vector, f64Vec)); + test(`throws on bad input`, () => { + expect(() => FloatVector.from(<any> {})).toThrow('Unrecognized FloatVector input'); + }); + + testAndValidateVector(f16Vec, u16s, f16s); + testAndValidateVector(f32Vec, f32s); + testAndValidateVector(f64Vec, f64s); + }); + + describe(`FloatVector.from casts the input values to the correct float type`, () => { + + const u16s = valuesTyped(Uint16Array).map((x) => float64ToUint16(uint16ToFloat64(x))); + const f16s = valuesArray(Uint16Array).map(uint16ToFloat64); + const f16Vec_ = FloatVector.from(u16s); + + const f16Vec = Float16Vector.from(f16Vec_); + const f32Vec = Float32Vector.from(f16Vec_); + const f64Vec = Float64Vector.from(f16Vec_); + + // test strong typing at compile-time + test(`return type is correct`, () => checkType(Float16Vector, f16Vec)); + test(`return type is correct`, () => checkType(Float32Vector, f32Vec)); + test(`return type is correct`, () => checkType(Float64Vector, f64Vec)); + + testAndValidateVector(f16Vec, u16s, f16s); + testAndValidateVector(f32Vec, Float32Array.from(f16s)); + testAndValidateVector(f64Vec, Float64Array.from(f16s)); + }); + + describe(`Float16Vector`, () => { + testFloatVector(Float16, valuesArray(Uint16Array).map(uint16ToFloat64)); + describe(`Float16Vector.from accepts regular Arrays`, () => { + const u16s = valuesTyped(Uint16Array).map((x) => float64ToUint16(uint16ToFloat64(x))); + const f16s = valuesArray(Uint16Array).map(uint16ToFloat64); + const vector = Float16Vector.from(f16s); + test(`return type is correct`, () => checkType(Float16Vector, vector)); + testAndValidateVector(vector, u16s, f16s); + }); + describe(`Float16Vector.from accepts Uint16Arrays`, () => { + const u16s = valuesTyped(Uint16Array).map((x) => float64ToUint16(uint16ToFloat64(x))); + const f16s = valuesArray(Uint16Array).map(uint16ToFloat64); + const vector = Float16Vector.from(u16s); + test(`return type is correct`, () => checkType(Float16Vector, vector)); + testAndValidateVector(vector, u16s, f16s); + }); + }); + describe(`Float32Vector`, () => { + testFloatVector(Float32); + describe(`Float32Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Float32Array); + const vector = Float32Vector.from(values); + testAndValidateVector(vector, valuesTyped(Float32Array), values); + test(`return type is correct`, () => checkType(Float32Vector, vector)); + }); + }); + describe(`Float64Vector`, () => { + testFloatVector(Float64); + describe(`Float64Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Float64Array); + const vector = Float64Vector.from(values); + testAndValidateVector(vector, valuesTyped(Float64Array), values); + test(`return type is correct`, () => checkType(Float64Vector, vector)); + }); + }); +}); + +describe(`IntVector`, () => { + + describe(`IntVector.from infers the type from the input TypedArray`, () => { + + const i8s = valuesTyped(Int8Array); + const i16s = valuesTyped(Int16Array); + const i32s = valuesTyped(Int32Array); + const i64s = valuesTyped(Int32Array); + const u8s = valuesTyped(Uint8Array); + const u16s = valuesTyped(Uint16Array); + const u32s = valuesTyped(Uint32Array); + const u64s = valuesTyped(Uint32Array); + const i8Vec = IntVector.from(i8s); + const i16Vec = IntVector.from(i16s); + const i32Vec = IntVector.from(i32s); + const i64Vec = IntVector.from(i64s, true); + const u8Vec = IntVector.from(u8s); + const u16Vec = IntVector.from(u16s); + const u32Vec = IntVector.from(u32s); + const u64Vec = IntVector.from(u64s, true); + + // test strong typing at compile-time + test(`return type is correct`, () => checkType(Int8Vector, i8Vec)); + test(`return type is correct`, () => checkType(Int16Vector, i16Vec)); + test(`return type is correct`, () => checkType(Int32Vector, i32Vec)); + test(`return type is correct`, () => checkType(Int64Vector, i64Vec)); + test(`return type is correct`, () => checkType(Uint8Vector, u8Vec)); + test(`return type is correct`, () => checkType(Uint16Vector, u16Vec)); + test(`return type is correct`, () => checkType(Uint32Vector, u32Vec)); + test(`return type is correct`, () => checkType(Uint64Vector, u64Vec)); + test(`throws on bad input`, () => { + expect(() => IntVector.from(<any> {})).toThrow('Unrecognized IntVector input'); + }); + + const bigI64s = BigInt64Array.from(toBigNumsArray(i64s)); + const bigU64s = BigUint64Array.from(toBigNumsArray(u64s)); + + testAndValidateVector(i8Vec, i8s); + testAndValidateVector(i16Vec, i16s); + testAndValidateVector(i32Vec, i32s); + // This tests when values are represented as pairs of lo, hi + testAndValidateVector(i64Vec, i64s); + // This tests when values are represented as native JS bigints + testAndValidateVector(i64Vec, i64s, [...bigI64s]); + testAndValidateVector(u8Vec, u8s); + testAndValidateVector(u16Vec, u16s); + testAndValidateVector(u32Vec, u32s); + // This tests when values are represented as pairs of lo, hi + testAndValidateVector(u64Vec, u64s); + // This tests when values are represented as native JS bigints + testAndValidateVector(u64Vec, u64s, [...bigU64s]); + }); + + describe('IntVector.from casts the input values to the correct integer type', () => { + + const i8s = valuesTyped(Int8Array); + const i16s = valuesTyped(Int16Array); + const i32s = valuesTyped(Int32Array); + const i64s = valuesTyped(Int32Array); + const u8s = valuesTyped(Uint8Array); + const u16s = valuesTyped(Uint16Array); + const u32s = valuesTyped(Uint32Array); + const u64s = valuesTyped(Uint32Array); + const i8Vec_ = IntVector.from(i8s); + const i16Vec_ = IntVector.from(i16s); + const i32Vec_ = IntVector.from(i32s); + const i64Vec_ = IntVector.from(i64s, true); + const u8Vec_ = IntVector.from(u8s); + const u16Vec_ = IntVector.from(u16s); + const u32Vec_ = IntVector.from(u32s); + const u64Vec_ = IntVector.from(u64s, true); + + // Convert from a Vector of the opposite sign + const i8Vec = Int8Vector.from(u8Vec_); + const i16Vec = Int16Vector.from(u16Vec_); + const i32Vec = Int32Vector.from(u32Vec_); + const i64Vec = Int64Vector.from(u64Vec_); + const u8Vec = Uint8Vector.from(i8Vec_); + const u16Vec = Uint16Vector.from(i16Vec_); + const u32Vec = Uint32Vector.from(i32Vec_); + const u64Vec = Uint64Vector.from(i64Vec_); + + // test strong typing at compile-time + test(`return type is correct`, () => checkType(Int8Vector, i8Vec)); + test(`return type is correct`, () => checkType(Int16Vector, i16Vec)); + test(`return type is correct`, () => checkType(Int32Vector, i32Vec)); + test(`return type is correct`, () => checkType(Int64Vector, i64Vec)); + test(`return type is correct`, () => checkType(Uint8Vector, u8Vec)); + test(`return type is correct`, () => checkType(Uint16Vector, u16Vec)); + test(`return type is correct`, () => checkType(Uint32Vector, u32Vec)); + test(`return type is correct`, () => checkType(Uint64Vector, u64Vec)); + + const bigI64s = BigInt64Array.from(toBigNumsArray(u64s)); + const bigU64s = BigUint64Array.from(toBigNumsArray(i64s)); + + testAndValidateVector(i8Vec, Int8Array.from(u8s)); + testAndValidateVector(i16Vec, Int16Array.from(u16s)); + testAndValidateVector(i32Vec, Int32Array.from(u32s)); + // This tests when values are represented as pairs of lo, hi + testAndValidateVector(i64Vec, new Int32Array(bigI64s.buffer)); + // This tests when values are represented as native JS bigints + testAndValidateVector(i64Vec, new Int32Array(bigI64s.buffer), [...bigI64s]); + testAndValidateVector(u8Vec, Uint8Array.from(i8s)); + testAndValidateVector(u16Vec, Uint16Array.from(i16s)); + testAndValidateVector(u32Vec, Uint32Array.from(i32s)); + // This tests when values are represented as pairs of lo, hi + testAndValidateVector(u64Vec, new Uint32Array(bigU64s.buffer)); + // This tests when values are represented as native JS bigints + testAndValidateVector(u64Vec, new Uint32Array(bigU64s.buffer), [...bigU64s]); + }); + + describe(`Int8Vector`, () => { + testIntVector(Int8); + describe(`Int8Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Int8Array); + const vector = Int8Vector.from(values); + testAndValidateVector(vector, valuesTyped(Int8Array), values); + test(`return type is correct`, () => checkType(Int8Vector, vector)); + }); + }); + describe(`Int16Vector`, () => { + testIntVector(Int16); + describe(`Int16Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Int16Array); + const vector = Int16Vector.from(values); + testAndValidateVector(vector, valuesTyped(Int16Array), values); + test(`return type is correct`, () => checkType(Int16Vector, vector)); + }); + }); + describe(`Int32Vector`, () => { + testIntVector(Int32); + describe(`Int32Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Int32Array); + const vector = Int32Vector.from(values); + testAndValidateVector(vector, valuesTyped(Int32Array), values); + test(`return type is correct`, () => checkType(Int32Vector, vector)); + }); + }); + describe(`Int64Vector`, () => { + testIntVector(Int64); + testIntVector(Int64, bigIntValuesArray(BigInt64Array)); + describe(`Int64Vector.from accepts regular Arrays`, () => { + const values = valuesArray64(Int32Array); + const vector = Int64Vector.from(values); + testAndValidateVector(vector, valuesTyped(Int32Array), values); + testAndValidateVector(vector, valuesTyped(Int32Array), bigIntValuesArray(BigInt64Array)); + test(`return type is correct`, () => checkType(Int64Vector, vector)); + }); + }); + describe(`Uint8Vector`, () => { + testIntVector(Uint8); + describe(`Uint8Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Uint8Array); + const vector = Uint8Vector.from(values); + testAndValidateVector(vector, valuesTyped(Uint8Array), values); + test(`return type is correct`, () => checkType(Uint8Vector, vector)); + }); + }); + describe(`Uint16Vector`, () => { + testIntVector(Uint16); + describe(`Uint16Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Uint16Array); + const vector = Uint16Vector.from(values); + testAndValidateVector(vector, valuesTyped(Uint16Array), values); + test(`return type is correct`, () => checkType(Uint16Vector, vector)); + }); + }); + describe(`Uint32Vector`, () => { + testIntVector(Uint32); + describe(`Uint32Vector.from accepts regular Arrays`, () => { + const values = valuesArray(Uint32Array); + const vector = Uint32Vector.from(values); + testAndValidateVector(vector, valuesTyped(Uint32Array), values); + test(`return type is correct`, () => checkType(Uint32Vector, vector)); + }); + }); + describe(`Uint64Vector`, () => { + testIntVector(Uint64); + testIntVector(Uint64, bigIntValuesArray(BigUint64Array)); + describe(`Uint64Vector.from accepts regular Arrays`, () => { + const values = valuesArray64(Uint32Array); + const vector = Uint64Vector.from(values); + testAndValidateVector(vector, valuesTyped(Uint32Array), values); + testAndValidateVector(vector, valuesTyped(Uint32Array), bigIntValuesArray(BigUint64Array)); + test(`return type is correct`, () => checkType(Uint64Vector, vector)); + }); + }); +}); + +function testIntVector<T extends Int>(DataType: new () => T, values?: Array<any>) { + + const type = new DataType(); + const ArrayType = type.ArrayType; + const stride = type.bitWidth < 64 ? 1 : 2; + + const typed = valuesTyped(ArrayType); + const jsArray = values || [...typed]; + const vector = Vector.new(Data.Int(type, 0, typed.length / stride, 0, null, typed)); + const chunked = testValueBuffers.map((b) => new ArrayType(b)) + .map((b) => Vector.new(Data.Int(type, 0, b.length / stride, 0, null, b))) + .reduce((v: any, v2) => v.concat(v2)); + + const vectorBegin = (vector.length * .25) | 0; + const vectorEnd = (vector.length * .75) | 0; + const typedBegin = vectorBegin * (typed.length / vector.length); + const typedEnd = vectorEnd * (typed.length / vector.length); + const jsArrayBegin = vectorBegin * (jsArray.length / vector.length); + const jsArrayEnd = vectorEnd * (jsArray.length / vector.length); + + const combos = [[`vector`, vector], [`chunked`, chunked]] as [string, V<T>][]; + combos.forEach(([chunksType, vector]) => { + describe(chunksType, () => { + // test base case no slicing + describe(`base case no slicing`, () => { testAndValidateVector(vector, typed, jsArray); }); + // test slicing without args + describe(`slicing without args`, () => { testAndValidateVector(vector.slice(), typed.slice(), jsArray.slice()); }); + // test slicing the middle half + describe(`slice the middle half`, () => { + testAndValidateVector( + vector.slice(vectorBegin, vectorEnd), + typed.slice(typedBegin, typedEnd), + jsArray.slice(jsArrayBegin, jsArrayEnd) + ); + }); + // test splicing out the middle half + describe(`splicing out the middle half`, () => { + testAndValidateVector( + vector.slice(0, vectorBegin).concat(vector.slice(vectorEnd)), + new ArrayType([...typed.slice(0, typedBegin), ...typed.slice(typedEnd)]), + [...jsArray.slice(0, jsArrayBegin), ...jsArray.slice(jsArrayEnd)] + ); + }); + }); + }); +} + +function testFloatVector<T extends Float>(DataType: new () => T, values?: Array<any>) { + + const type = new DataType(); + const ArrayType = type.ArrayType; + + const typed = valuesTyped(ArrayType); + const jsArray = values || [...typed]; + const vector = Vector.new(Data.Float(type, 0, typed.length, 0, null, typed)); + const chunked = testValueBuffers.map((b) => new ArrayType(b)) + .map((b) => Vector.new(Data.Float(type, 0, b.length, 0, null, b))) + .reduce((v: any, v2) => v.concat(v2)); + + const begin = (vector.length * .25) | 0; + const end = (vector.length * .75) | 0; + const combos = [[`vector`, vector], [`chunked`, chunked]] as [string, V<T>][]; + + combos.forEach(([chunksType, vector]) => { + describe(chunksType, () => { + // test base case no slicing + describe(`base case no slicing`, () => { testAndValidateVector(vector, typed, jsArray); }); + // test slicing without args + describe(`slicing without args`, () => { testAndValidateVector(vector.slice(), typed.slice(), jsArray.slice()); }); + // test slicing the middle half + describe(`slice the middle half`, () => { + testAndValidateVector( + vector.slice(begin, end), + typed.slice(begin, end), + jsArray.slice(begin, end) + ); + }); + // test splicing out the middle half + describe(`splicing out the middle half`, () => { + testAndValidateVector( + vector.slice(0, begin).concat(vector.slice(end)), + new ArrayType([...typed.slice(0, begin), ...typed.slice(end)]), + [...jsArray.slice(0, begin), ...jsArray.slice(end)] + ); + }); + }); + }); +} + +function testAndValidateVector<T extends Int | Float>(vector: Vector<T>, typed: T['TArray'], values: any[] = [...typed]) { + gets_expected_values(vector, typed, values); + iterates_expected_values(vector, typed, values); + indexof_returns_expected_values(vector, typed, values); + slice_returns_a_typedarray(vector); + slices_the_entire_array(vector, typed); + slices_from_minus_20_to_length(vector, typed); + slices_from_0_to_minus_20(vector, typed); + slices_the_array_from_0_to_length_minus_20(vector, typed); + slices_the_array_from_0_to_length_plus_20(vector, typed); +} + +function gets_expected_values<T extends Int | Float>(vector: Vector<T>, typed: T['TArray'], values: any[] = [...typed]) { + test(`gets expected values`, () => { + expect.hasAssertions(); + let i = -1, n = vector.length; + let stride = vector.stride; + try { + if (stride === 1) { + while (++i < n) { + expect(vector.get(i)).toEqual(values[i]); + } + } else if (typeof values[0] === 'bigint') { + while (++i < n) { + const x: any = vector.get(i)!; + expect(0n + x).toEqual(values[i]); + } + } else { + const vector64 = vector as Vector<Int64 | Uint64>; + const i64 = (() => typed.subarray(stride * i, stride * (i + 1))); + while (++i < n) { + expect((vector64.get(i) as any).subarray(0, stride)).toEqual(i64()); + } + } + } catch (e) { throw new Error(`${i}: ${e}`); } + }); +} + +function iterates_expected_values<T extends Int | Float>(vector: Vector<T>, typed: T['TArray'], values: any[] = [...typed]) { + test(`iterates expected values`, () => { + expect.hasAssertions(); + let i = -1, n = vector.length; + let stride = vector.stride; + try { + if (stride === 1) { + for (let v of vector) { + expect(++i).toBeLessThan(n); + expect(v).toEqual(values[i]); + } + } else if (typeof values[0] === 'bigint') { + let x: any; + for (let v of vector) { + x = v; + expect(++i).toBeLessThan(n); + expect(0n + x).toEqual(values[i]); + } + } else { + const vector64 = vector as Vector<Int64 | Uint64>; + const i64 = (() => typed.subarray(stride * i, stride * (i + 1))); + for (let v of vector64) { + expect(++i).toBeLessThan(n); + expect((v as any).subarray(0, stride)).toEqual(i64()); + } + } + } catch (e) { throw new Error(`${i}: ${e}`); } + }); +} + +function indexof_returns_expected_values<T extends Int | Float>(vector: Vector<T>, typed: T['TArray'], values: any = [...typed]) { + test(`indexOf returns expected values`, () => { + + expect.hasAssertions(); + + const stride = vector.stride; + const BPE = vector.ArrayType.BYTES_PER_ELEMENT; + const isBigInt = typeof values[0] === 'bigint'; + const isInt64 = util.compareTypes(vector.type, new Int64()); + const isFloat16 = util.compareTypes(vector.type, new Float16()); + + // Create a few random values + let missing: any = new vector.ArrayType(randomBytes(8 * 2 * BPE)); + + // Special cases convert the values and/or missing to the + // representations that indexOf() expects to receive + + if (isFloat16) { + missing = uint16ToFloat64Array(missing); + } else if (isBigInt) { + const BigIntArray = isInt64 ? BigInt64Array : BigUint64Array; + missing = Array.from({ length: missing.length / stride }, + (_, i) => new BigIntArray(missing.buffer, BPE * stride * i, 1)[0]); + } else if (stride !== 1) { + values = Array.from({ length: typed.length / stride }, + (_, i) => typed.slice(stride * i, stride * (i + 1))); + missing = Array.from({ length: missing.length / stride }, + (_, i) => missing.slice(stride * i, stride * (i + 1))); + } + + const original = values.slice(); + // Combine with the expected values and shuffle the order + const shuffled = shuffle(values.concat([...missing])); + let i = -1, j: number, k: number, n = shuffled.length; + + try { + if (!isBigInt) { + while (++i < n) { + const search = shuffled[i]; + if (typeof search !== 'number' || !isNaN(search)) { + expect(vector.indexOf(search)).toEqual(original.indexOf(search)); + } else { + for (j = -1, k = original.length; ++j < k;) { + if (isNaN(original[j])) { break; } + } + expect(vector.indexOf(search)).toEqual(j < k ? j : -1); + } + } + } else { + // Distinguish the bigint comparisons to ensure the indexOf type signature accepts bigints + let shuffled64 = shuffled as bigint[]; + if (isInt64) { + let vector64 = (<unknown> vector) as Int64Vector; + while (++i < n) { + expect(vector64.indexOf(shuffled64[i])).toEqual(original.indexOf(shuffled64[i])); + } + } else { + let vector64 = (<unknown> vector) as Uint64Vector; + while (++i < n) { + expect(vector64.indexOf(shuffled64[i])).toEqual(original.indexOf(shuffled64[i])); + } + } + } + } catch (e) { throw new Error(`${i} (${shuffled[i]}): ${e}`); } + }); +} + +function slice_returns_a_typedarray<T extends Int | Float>(vector: Vector<T>) { + test(`slice returns a TypedArray`, () => { + expect.hasAssertions(); + expect(vector.slice().toArray()).toBeInstanceOf(vector.ArrayType); + }); +} + +function slices_the_entire_array<T extends Int | Float>(vector: Vector<T>, values: T['TArray']) { + test(`slices the entire array`, () => { + expect.hasAssertions(); + expect(vector.slice().toArray()).toEqual(values); + }); +} + +function slices_from_minus_20_to_length<T extends Int | Float>(vector: Vector<T>, values: T['TArray']) { + test(`slices from -20 to length`, () => { + expect.hasAssertions(); + expect(vector.slice(-20).toArray()).toEqual(values.slice(-(20 * vector.stride))); + }); +} + +function slices_from_0_to_minus_20<T extends Int | Float>(vector: Vector<T>, values: T['TArray']) { + test(`slices from 0 to -20`, () => { + expect.hasAssertions(); + expect(vector.slice(0, -20).toArray()).toEqual(values.slice(0, -(20 * vector.stride))); + }); +} + +function slices_the_array_from_0_to_length_minus_20 <T extends Int | Float>(vector: Vector<T>, values: T['TArray']) { + test(`slices the array from 0 to length - 20`, () => { + expect.hasAssertions(); + expect(vector.slice(0, vector.length - 20).toArray()).toEqual(values.slice(0, values.length - (20 * vector.stride))); + }); +} + +function slices_the_array_from_0_to_length_plus_20<T extends Int | Float>(vector: Vector<T>, values: T['TArray']) { + test(`slices the array from 0 to length + 20`, () => { + expect.hasAssertions(); + expect(vector.slice(0, vector.length + 20).toArray()).toEqual(values.slice(0, values.length + (20 * vector.stride))); + }); +} + +function shuffle(input: any[]) { + const result = input.slice(); + let j, tmp, i = result.length; + while (--i > 0) { + j = (Math.random() * (i + 1)) | 0; + tmp = result[i]; + result[i] = result[j]; + result[j] = tmp; + } + return result; +} diff --git a/src/arrow/js/test/unit/vector/vector-tests.ts b/src/arrow/js/test/unit/vector/vector-tests.ts new file mode 100644 index 000000000..60bff94f8 --- /dev/null +++ b/src/arrow/js/test/unit/vector/vector-tests.ts @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { + Int32, Dictionary, DateUnit, util, + Data, Vector, Utf8Vector, DateVector, DictionaryVector, +} from 'apache-arrow'; + +describe(`DateVector`, () => { + const extras = [ + new Date(2000, 0, 1), + new Date(1991, 5, 28, 12, 11, 10) + ]; + describe(`unit = MILLISECOND`, () => { + const values = [ + new Date(1989, 5, 22, 1, 2, 3), + new Date(1988, 3, 25, 4, 5, 6), + new Date(1987, 2, 24, 7, 8, 9), + new Date(2018, 4, 12, 17, 30, 0) + ]; + const vector = DateVector.from(values); + basicVectorTests(vector, values, extras); + }); + describe(`unit = DAY`, () => { + // Use UTC to ensure that dates are always at midnight + const values = [ + new Date(Date.UTC(1989, 5, 22)), + new Date(Date.UTC(1988, 3, 25)), + new Date(Date.UTC(1987, 2, 24)), + new Date(Date.UTC(2018, 4, 12)) + ]; + const vector = DateVector.from(values, DateUnit.DAY); + basicVectorTests(vector, values, extras); + }); +}); + +describe(`DictionaryVector`, () => { + + const dictionary = ['foo', 'bar', 'baz']; + const extras = ['abc', '123']; // values to search for that should NOT be found + const dictionary_vec = Utf8Vector.from(dictionary); + + const indices = Array.from({length: 50}, () => Math.random() * 3 | 0); + const validity = Array.from({ length: indices.length }, () => Math.random() > 0.2 ? true : false); + + describe(`index with nullCount == 0`, () => { + + const values = Array.from(indices).map((d) => dictionary[d]); + const vector = DictionaryVector.from(dictionary_vec, new Int32(), indices); + + basicVectorTests(vector, values, extras); + + describe(`sliced`, () => { + basicVectorTests(vector.slice(10, 20), values.slice(10,20), extras); + }); + }); + + describe(`index with nullCount > 0`, () => { + + const nullBitmap = util.packBools(validity); + const nullCount = validity.reduce((acc, d) => acc + (d ? 0 : 1), 0); + const values = Array.from(indices).map((d, i) => validity[i] ? dictionary[d] : null); + const type = new Dictionary(dictionary_vec.type, new Int32(), null, null); + const vector = Vector.new(Data.Dictionary(type, 0, indices.length, nullCount, nullBitmap, indices, dictionary_vec)); + + basicVectorTests(vector, values, ['abc', '123']); + describe(`sliced`, () => { + basicVectorTests(vector.slice(10, 20), values.slice(10,20), extras); + }); + }); +}); + +describe(`Utf8Vector`, () => { + const values = ['foo', 'bar', 'baz', 'foo bar', 'bar']; + const vector = Utf8Vector.from(values); + basicVectorTests(vector, values, ['abc', '123']); + describe(`sliced`, () => { + basicVectorTests(vector.slice(1,3), values.slice(1,3), ['foo', 'abc']); + }); +}); + +// Creates some basic tests for the given vector. +// Verifies that: +// - `get` and the native iterator return the same data as `values` +// - `indexOf` returns the same indices as `values` +function basicVectorTests(vector: Vector, values: any[], extras: any[]) { + + const n = values.length; + + test(`gets expected values`, () => { + let i = -1; + while (++i < n) { + expect(vector.get(i)).toEqual(values[i]); + } + }); + test(`iterates expected values`, () => { + expect.hasAssertions(); + let i = -1; + for (let v of vector) { + expect(++i).toBeLessThan(n); + expect(v).toEqual(values[i]); + } + }); + test(`indexOf returns expected values`, () => { + let testValues = values.concat(extras); + + for (const value of testValues) { + const actual = vector.indexOf(value); + const expected = values.indexOf(value); + expect(actual).toEqual(expected); + } + }); +} diff --git a/src/arrow/js/test/unit/visitor-tests.ts b/src/arrow/js/test/unit/visitor-tests.ts new file mode 100644 index 000000000..22b3e5ced --- /dev/null +++ b/src/arrow/js/test/unit/visitor-tests.ts @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import { Field } from 'apache-arrow'; +import { Visitor } from 'apache-arrow'; +import { + DataType, Dictionary, + Bool, Null, Utf8, Binary, Decimal, FixedSizeBinary, List, FixedSizeList, Map_, Struct, + Float, Float16, Float32, Float64, + Int, Uint8, Uint16, Uint32, Uint64, Int8, Int16, Int32, Int64, + Date_, DateDay, DateMillisecond, + Interval, IntervalDayTime, IntervalYearMonth, + Time, TimeSecond, TimeMillisecond, TimeMicrosecond, TimeNanosecond, + Timestamp, TimestampSecond, TimestampMillisecond, TimestampMicrosecond, TimestampNanosecond, + Union, DenseUnion, SparseUnion, +} from 'apache-arrow'; + +class BasicVisitor extends Visitor { + public type: DataType | undefined; + public visitNull <T extends Null> (type: T) { return (this.type = type); } + public visitBool <T extends Bool> (type: T) { return (this.type = type); } + public visitInt <T extends Int> (type: T) { return (this.type = type); } + public visitFloat <T extends Float> (type: T) { return (this.type = type); } + public visitUtf8 <T extends Utf8> (type: T) { return (this.type = type); } + public visitBinary <T extends Binary> (type: T) { return (this.type = type); } + public visitFixedSizeBinary <T extends FixedSizeBinary> (type: T) { return (this.type = type); } + public visitDate <T extends Date_> (type: T) { return (this.type = type); } + public visitTimestamp <T extends Timestamp> (type: T) { return (this.type = type); } + public visitTime <T extends Time> (type: T) { return (this.type = type); } + public visitDecimal <T extends Decimal> (type: T) { return (this.type = type); } + public visitList <T extends List> (type: T) { return (this.type = type); } + public visitStruct <T extends Struct> (type: T) { return (this.type = type); } + public visitUnion <T extends Union> (type: T) { return (this.type = type); } + public visitDictionary <T extends Dictionary> (type: T) { return (this.type = type); } + public visitInterval <T extends Interval> (type: T) { return (this.type = type); } + public visitFixedSizeList <T extends FixedSizeList> (type: T) { return (this.type = type); } + public visitMap <T extends Map_> (type: T) { return (this.type = type); } +} + +class FeatureVisitor extends Visitor { + public type: DataType | undefined; + public visitNull <T extends Null> (type: T) { return (this.type = type); } + public visitBool <T extends Bool> (type: T) { return (this.type = type); } + public visitInt8 <T extends Int8> (type: T) { return (this.type = type); } + public visitInt16 <T extends Int16> (type: T) { return (this.type = type); } + public visitInt32 <T extends Int32> (type: T) { return (this.type = type); } + public visitInt64 <T extends Int64> (type: T) { return (this.type = type); } + public visitUint8 <T extends Uint8> (type: T) { return (this.type = type); } + public visitUint16 <T extends Uint16> (type: T) { return (this.type = type); } + public visitUint32 <T extends Uint32> (type: T) { return (this.type = type); } + public visitUint64 <T extends Uint64> (type: T) { return (this.type = type); } + public visitFloat16 <T extends Float16> (type: T) { return (this.type = type); } + public visitFloat32 <T extends Float32> (type: T) { return (this.type = type); } + public visitFloat64 <T extends Float64> (type: T) { return (this.type = type); } + public visitUtf8 <T extends Utf8> (type: T) { return (this.type = type); } + public visitBinary <T extends Binary> (type: T) { return (this.type = type); } + public visitFixedSizeBinary <T extends FixedSizeBinary> (type: T) { return (this.type = type); } + public visitDateDay <T extends DateDay> (type: T) { return (this.type = type); } + public visitDateMillisecond <T extends DateMillisecond> (type: T) { return (this.type = type); } + public visitTimestampSecond <T extends TimestampSecond> (type: T) { return (this.type = type); } + public visitTimestampMillisecond <T extends TimestampMillisecond> (type: T) { return (this.type = type); } + public visitTimestampMicrosecond <T extends TimestampMicrosecond> (type: T) { return (this.type = type); } + public visitTimestampNanosecond <T extends TimestampNanosecond> (type: T) { return (this.type = type); } + public visitTimeSecond <T extends TimeSecond> (type: T) { return (this.type = type); } + public visitTimeMillisecond <T extends TimeMillisecond> (type: T) { return (this.type = type); } + public visitTimeMicrosecond <T extends TimeMicrosecond> (type: T) { return (this.type = type); } + public visitTimeNanosecond <T extends TimeNanosecond> (type: T) { return (this.type = type); } + public visitDecimal <T extends Decimal> (type: T) { return (this.type = type); } + public visitList <T extends List> (type: T) { return (this.type = type); } + public visitStruct <T extends Struct> (type: T) { return (this.type = type); } + public visitDenseUnion <T extends DenseUnion> (type: T) { return (this.type = type); } + public visitSparseUnion <T extends SparseUnion> (type: T) { return (this.type = type); } + public visitDictionary <T extends Dictionary> (type: T) { return (this.type = type); } + public visitIntervalDayTime <T extends IntervalDayTime> (type: T) { return (this.type = type); } + public visitIntervalYearMonth <T extends IntervalYearMonth> (type: T) { return (this.type = type); } + public visitFixedSizeList <T extends FixedSizeList> (type: T) { return (this.type = type); } + public visitMap <T extends Map_> (type: T) { return (this.type = type); } +} + +describe('Visitor', () => { + + describe('uses the base methods when no feature methods are implemented', () => { + test(`visits Null types`, () => validateBasicVisitor(new Null())); + test(`visits Bool types`, () => validateBasicVisitor(new Bool())); + test(`visits Int types`, () => validateBasicVisitor(new Int(true, 32))); + test(`visits Float types`, () => validateBasicVisitor(new Float(0))); + test(`visits Utf8 types`, () => validateBasicVisitor(new Utf8())); + test(`visits Binary types`, () => validateBasicVisitor(new Binary())); + test(`visits FixedSizeBinary types`, () => validateBasicVisitor(new FixedSizeBinary(128))); + test(`visits Date types`, () => validateBasicVisitor(new Date_(0))); + test(`visits Timestamp types`, () => validateBasicVisitor(new Timestamp(0, 'UTC'))); + test(`visits Time types`, () => validateBasicVisitor(new Time(0, 64))); + test(`visits Decimal types`, () => validateBasicVisitor(new Decimal(2, 9))); + test(`visits List types`, () => validateBasicVisitor(new List(null as any))); + test(`visits Struct types`, () => validateBasicVisitor(new Struct([] as any[]))); + test(`visits Union types`, () => validateBasicVisitor(new Union(0, [] as any[], [] as any[]))); + test(`visits Dictionary types`, () => validateBasicVisitor(new Dictionary(null as any, null as any))); + test(`visits Interval types`, () => validateBasicVisitor(new Interval(0))); + test(`visits FixedSizeList types`, () => validateBasicVisitor(new FixedSizeList(2, null as any))); + test(`visits Map types`, () => validateBasicVisitor(new Map_(new Field('', new Struct<{ key: Int; value: Int }>([] as any[]))))); + function validateBasicVisitor<T extends DataType>(type: T) { + const visitor = new BasicVisitor(); + const result = visitor.visit(type); + expect(result).toBe(type); + expect(visitor.type).toBe(type); + } + }); + + describe(`uses the feature methods instead of the base methods when they're implemented`, () => { + + test(`visits Null types`, () => validateFeatureVisitor(new Null())); + test(`visits Bool types`, () => validateFeatureVisitor(new Bool())); + test(`visits Int8 types`, () => validateFeatureVisitor(new Int8())); + test(`visits Int16 types`, () => validateFeatureVisitor(new Int16())); + test(`visits Int32 types`, () => validateFeatureVisitor(new Int32())); + test(`visits Int64 types`, () => validateFeatureVisitor(new Int64())); + test(`visits Uint8 types`, () => validateFeatureVisitor(new Uint8())); + test(`visits Uint16 types`, () => validateFeatureVisitor(new Uint16())); + test(`visits Uint32 types`, () => validateFeatureVisitor(new Uint32())); + test(`visits Uint64 types`, () => validateFeatureVisitor(new Uint64())); + test(`visits Float16 types`, () => validateFeatureVisitor(new Float16())); + test(`visits Float32 types`, () => validateFeatureVisitor(new Float32())); + test(`visits Float64 types`, () => validateFeatureVisitor(new Float64())); + test(`visits Utf8 types`, () => validateFeatureVisitor(new Utf8())); + test(`visits Binary types`, () => validateFeatureVisitor(new Binary())); + test(`visits FixedSizeBinary types`, () => validateFeatureVisitor(new FixedSizeBinary(128))); + test(`visits DateDay types`, () => validateFeatureVisitor(new DateDay())); + test(`visits DateMillisecond types`, () => validateFeatureVisitor(new DateMillisecond())); + test(`visits TimestampSecond types`, () => validateFeatureVisitor(new TimestampSecond())); + test(`visits TimestampMillisecond types`, () => validateFeatureVisitor(new TimestampMillisecond())); + test(`visits TimestampMicrosecond types`, () => validateFeatureVisitor(new TimestampMicrosecond())); + test(`visits TimestampNanosecond types`, () => validateFeatureVisitor(new TimestampNanosecond())); + test(`visits TimeSecond types`, () => validateFeatureVisitor(new TimeSecond())); + test(`visits TimeMillisecond types`, () => validateFeatureVisitor(new TimeMillisecond())); + test(`visits TimeMicrosecond types`, () => validateFeatureVisitor(new TimeMicrosecond())); + test(`visits TimeNanosecond types`, () => validateFeatureVisitor(new TimeNanosecond())); + test(`visits Decimal types`, () => validateFeatureVisitor(new Decimal(2, 9))); + test(`visits List types`, () => validateFeatureVisitor(new List(null as any))); + test(`visits Struct types`, () => validateFeatureVisitor(new Struct([] as any[]))); + test(`visits DenseUnion types`, () => validateFeatureVisitor(new DenseUnion([] as any[], [] as any[]))); + test(`visits SparseUnion types`, () => validateFeatureVisitor(new SparseUnion([] as any[], [] as any[]))); + test(`visits Dictionary types`, () => validateFeatureVisitor(new Dictionary(null as any, null as any))); + test(`visits IntervalDayTime types`, () => validateFeatureVisitor(new IntervalDayTime())); + test(`visits IntervalYearMonth types`, () => validateFeatureVisitor(new IntervalYearMonth())); + test(`visits FixedSizeList types`, () => validateFeatureVisitor(new FixedSizeList(2, null as any))); + test(`visits Map types`, () => validateFeatureVisitor(new Map_(new Field('', new Struct<{ key: Int; value: Int }>([] as any[]))))); + + function validateFeatureVisitor<T extends DataType>(type: T) { + const visitor = new FeatureVisitor(); + const result = visitor.visit(type); + expect(result).toBe(type); + expect(visitor.type).toBe(type); + } + }); +}); |