#! /usr/bin/env node

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// @ts-check

// Reads an Arrow IPC message stream -- from the file named by the first
// command-line argument, or from stdin when no argument is given -- and prints,
// for every record batch and dictionary batch, the byte offset/length of the
// message body, the decoded row count, and the offset/length of each buffer
// region listed in the message header. Any error is printed to stderr and the
// process exits with status 1.

const fs = require('fs');
const path = require('path');

// ARROW_JS_DEBUG=src makes the script require `../index.ts` instead of `../index`.
const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '';
const { RecordBatch, AsyncMessageReader } = require(`../index${extension}`);
// NOTE(review): unlike the index import above, this path does not honor
// ARROW_JS_DEBUG, so in 'src' mode RecordBatch and VectorLoader may come from
// different builds -- confirm this is intended.
const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`);

(async () => {
    const readable = process.argv.length < 3
        ? process.stdin
        : fs.createReadStream(path.resolve(process.argv[2]));
    const reader = new AsyncMessageReader(readable);

    // Set by the schema message; batch bodies cannot be decoded without it.
    let schema;
    let recordBatchIndex = 0;
    let dictionaryBatchIndex = 0;

    for await (const message of reader) {
        let bufferRegions = [];

        if (message.isSchema()) {
            schema = message.header();
            // Schema messages have no buffer regions to report.
            continue;
        } else if (message.isRecordBatch()) {
            // Fail with a clear message instead of a TypeError on `schema.fields`.
            if (schema == null) {
                throw new Error('record batch encountered before schema message');
            }
            const header = message.header();
            bufferRegions = header.buffers;
            const body = await reader.readMessageBody(message.bodyLength);
            const recordBatch = loadRecordBatch(schema, header, body);
            console.log(`record batch ${++recordBatchIndex}: ${JSON.stringify({
                offset: body.byteOffset,
                length: body.byteLength,
                numRows: recordBatch.length,
            })}`);
        } else if (message.isDictionaryBatch()) {
            // Fail with a clear message instead of a TypeError on `schema.dictionaries`.
            if (schema == null) {
                throw new Error('dictionary batch encountered before schema message');
            }
            const header = message.header();
            bufferRegions = header.data.buffers;
            const type = schema.dictionaries.get(header.id);
            // Without a known type, VectorLoader would be asked to visit `undefined`.
            if (type == null) {
                throw new Error(`dictionary batch references unknown dictionary id ${header.id}`);
            }
            const body = await reader.readMessageBody(message.bodyLength);
            const recordBatch = loadDictionaryBatch(header.data, body, type);
            console.log(`dictionary batch ${++dictionaryBatchIndex}: ${JSON.stringify({
                offset: body.byteOffset,
                length: body.byteLength,
                numRows: recordBatch.length,
                dictionaryId: header.id,
            })}`);
        }

        bufferRegions.forEach(({ offset, length }, i) => {
            console.log(`\tbuffer ${i + 1}: { offset: ${offset}, length: ${length} }`);
        });
    }

    await reader.return();
})().catch((e) => {
    console.error(e);
    process.exit(1);
});

/**
 * Decodes a record-batch message body into a RecordBatch with
 * `header.length` rows, one column per entry in `schema.fields`.
 *
 * @param schema - schema taken from the stream's schema message
 * @param header - record-batch header supplying `length`, `nodes` and `buffers`
 * @param body - message body bytes returned by `readMessageBody`
 * @returns the decoded RecordBatch
 */
function loadRecordBatch(schema, header, body) {
    // NOTE(review): an empty Map is passed as VectorLoader's last argument
    // (presumably the dictionary lookup) -- confirm dictionary-encoded columns
    // decode as intended here.
    const children = new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields);
    return new RecordBatch(schema, header.length, children);
}

/**
 * Decodes a dictionary-batch's data into a RecordBatch built by
 * `RecordBatch.new` from the single vector visited for `dictionaryType`.
 *
 * @param header - the dictionary batch's `data` header (`nodes` and `buffers`)
 * @param body - message body bytes returned by `readMessageBody`
 * @param dictionaryType - value type registered for this dictionary id in the schema
 * @returns the decoded RecordBatch
 */
function loadDictionaryBatch(header, body, dictionaryType) {
    const children = new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]);
    return RecordBatch.new(children);
}