summaryrefslogtreecommitdiffstats
path: root/src/arrow/cpp/src/parquet/arrow/reader.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/cpp/src/parquet/arrow/reader.cc')
-rw-r--r--src/arrow/cpp/src/parquet/arrow/reader.cc1305
1 files changed, 1305 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/parquet/arrow/reader.cc b/src/arrow/cpp/src/parquet/arrow/reader.cc
new file mode 100644
index 000000000..1c2331864
--- /dev/null
+++ b/src/arrow/cpp/src/parquet/arrow/reader.cc
@@ -0,0 +1,1305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader.h"
+
+#include <algorithm>
+#include <cstring>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/extension_type.h"
+#include "arrow/io/memory.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/parallel.h"
+#include "arrow/util/range.h"
+#include "parquet/arrow/reader_internal.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+using arrow::Array;
+using arrow::ArrayData;
+using arrow::BooleanArray;
+using arrow::ChunkedArray;
+using arrow::DataType;
+using arrow::ExtensionType;
+using arrow::Field;
+using arrow::Future;
+using arrow::Int32Array;
+using arrow::ListArray;
+using arrow::MemoryPool;
+using arrow::RecordBatchReader;
+using arrow::ResizableBuffer;
+using arrow::Status;
+using arrow::StructArray;
+using arrow::Table;
+using arrow::TimestampArray;
+
+using arrow::internal::checked_cast;
+using arrow::internal::Iota;
+
+// Help reduce verbosity
+using ParquetReader = parquet::ParquetFileReader;
+
+using parquet::internal::RecordReader;
+
+namespace BitUtil = arrow::BitUtil;
+
+namespace parquet {
+namespace arrow {
+namespace {
+
+::arrow::Result<std::shared_ptr<ArrayData>> ChunksToSingle(const ChunkedArray& chunked) {
+ switch (chunked.num_chunks()) {
+ case 0: {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> array,
+ ::arrow::MakeArrayOfNull(chunked.type(), 0));
+ return array->data();
+ }
+ case 1:
+ return chunked.chunk(0)->data();
+ default:
+ // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
+ // this is not yet implemented
+ return Status::NotImplemented(
+ "Nested data conversions not implemented for chunked array outputs");
+ }
+}
+
+} // namespace
+
+class ColumnReaderImpl : public ColumnReader {
+ public:
+ virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
+ virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
+ virtual const std::shared_ptr<Field> field() = 0;
+
+ ::arrow::Status NextBatch(int64_t batch_size,
+ std::shared_ptr<::arrow::ChunkedArray>* out) final {
+ RETURN_NOT_OK(LoadBatch(batch_size));
+ RETURN_NOT_OK(BuildArray(batch_size, out));
+ for (int x = 0; x < (*out)->num_chunks(); x++) {
+ RETURN_NOT_OK((*out)->chunk(x)->Validate());
+ }
+ return Status::OK();
+ }
+
+ virtual ::arrow::Status LoadBatch(int64_t num_records) = 0;
+
+ virtual ::arrow::Status BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
+ virtual bool IsOrHasRepeatedChild() const = 0;
+};
+
+namespace {
+
+std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
+ const std::vector<int>& values) {
+ std::shared_ptr<std::unordered_set<int>> result(new std::unordered_set<int>());
+ result->insert(values.begin(), values.end());
+ return result;
+}
+
+// Forward declaration
+Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& context,
+ std::unique_ptr<ColumnReaderImpl>* out);
+
+// ----------------------------------------------------------------------
+// FileReaderImpl forward declaration
+
+class FileReaderImpl : public FileReader {
+ public:
+ FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+ ArrowReaderProperties properties)
+ : pool_(pool),
+ reader_(std::move(reader)),
+ reader_properties_(std::move(properties)) {}
+
+ Status Init() {
+ return SchemaManifest::Make(reader_->metadata()->schema(),
+ reader_->metadata()->key_value_metadata(),
+ reader_properties_, &manifest_);
+ }
+
+ FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
+ return [row_groups](int i, ParquetFileReader* reader) {
+ return new FileColumnIterator(i, reader, row_groups);
+ };
+ }
+
+ FileColumnIteratorFactory AllRowGroupsFactory() {
+ return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups()));
+ }
+
+ Status BoundsCheckColumn(int column) {
+ if (column < 0 || column >= this->num_columns()) {
+ return Status::Invalid("Column index out of bounds (got ", column,
+ ", should be "
+ "between 0 and ",
+ this->num_columns() - 1, ")");
+ }
+ return Status::OK();
+ }
+
+ Status BoundsCheckRowGroup(int row_group) {
+ // row group indices check
+ if (row_group < 0 || row_group >= num_row_groups()) {
+ return Status::Invalid("Some index in row_group_indices is ", row_group,
+ ", which is either < 0 or >= num_row_groups(",
+ num_row_groups(), ")");
+ }
+ return Status::OK();
+ }
+
+ Status BoundsCheck(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) {
+ for (int i : row_groups) {
+ RETURN_NOT_OK(BoundsCheckRowGroup(i));
+ }
+ for (int i : column_indices) {
+ RETURN_NOT_OK(BoundsCheckColumn(i));
+ }
+ return Status::OK();
+ }
+
+ std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
+
+ Status ReadTable(const std::vector<int>& indices,
+ std::shared_ptr<Table>* out) override {
+ return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
+ }
+
+ Status GetFieldReader(int i,
+ const std::shared_ptr<std::unordered_set<int>>& included_leaves,
+ const std::vector<int>& row_groups,
+ std::unique_ptr<ColumnReaderImpl>* out) {
+ auto ctx = std::make_shared<ReaderContext>();
+ ctx->reader = reader_.get();
+ ctx->pool = pool_;
+ ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
+ ctx->filter_leaves = true;
+ ctx->included_leaves = included_leaves;
+ return GetReader(manifest_.schema_fields[i], ctx, out);
+ }
+
+ Status GetFieldReaders(const std::vector<int>& column_indices,
+ const std::vector<int>& row_groups,
+ std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
+ std::shared_ptr<::arrow::Schema>* out_schema) {
+ // We only need to read schema fields which have columns indicated
+ // in the indices vector
+ ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
+ manifest_.GetFieldIndices(column_indices));
+
+ auto included_leaves = VectorToSharedSet(column_indices);
+
+ out->resize(field_indices.size());
+ ::arrow::FieldVector out_fields(field_indices.size());
+ for (size_t i = 0; i < out->size(); ++i) {
+ std::unique_ptr<ColumnReaderImpl> reader;
+ RETURN_NOT_OK(
+ GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));
+
+ out_fields[i] = reader->field();
+ out->at(i) = std::move(reader);
+ }
+
+ *out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
+ return Status::OK();
+ }
+
+ Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+ std::unique_ptr<ColumnReader>* out);
+
+ Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
+ return GetColumn(i, AllRowGroupsFactory(), out);
+ }
+
+ Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
+ return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
+ reader_->metadata()->key_value_metadata(), out);
+ }
+
+ Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
+ auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
+ std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());
+
+ std::unique_ptr<ColumnReaderImpl> reader;
+ RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));
+
+ return ReadColumn(i, row_groups, reader.get(), out);
+ }
+
+ Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
+ std::shared_ptr<ChunkedArray>* out) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ // TODO(wesm): This calculation doesn't make much sense when we have repeated
+ // schema nodes
+ int64_t records_to_read = 0;
+ for (auto row_group : row_groups) {
+ // Can throw exception
+ records_to_read +=
+ reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
+ }
+ return reader->NextBatch(records_to_read, out);
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
+ Status ReadColumn(int i, const std::vector<int>& row_groups,
+ std::shared_ptr<ChunkedArray>* out) {
+ std::unique_ptr<ColumnReader> flat_column_reader;
+ RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
+ return ReadColumn(i, row_groups, flat_column_reader.get(), out);
+ }
+
+ Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
+ return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
+ }
+
+ Status ReadTable(std::shared_ptr<Table>* table) override {
+ return ReadTable(Iota(reader_->metadata()->num_columns()), table);
+ }
+
+ Status ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& indices,
+ std::shared_ptr<Table>* table) override;
+
+ // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
+ // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
+ // alive in async contexts.
+ Future<std::shared_ptr<Table>> DecodeRowGroups(
+ std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);
+
+ Status ReadRowGroups(const std::vector<int>& row_groups,
+ std::shared_ptr<Table>* table) override {
+ return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
+ }
+
+ Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
+ std::shared_ptr<Table>* out) override {
+ return ReadRowGroups({row_group_index}, column_indices, out);
+ }
+
+ Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
+ return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
+ }
+
+ Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ const std::vector<int>& column_indices,
+ std::unique_ptr<RecordBatchReader>* out) override;
+
+ Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ std::unique_ptr<RecordBatchReader>* out) override {
+ return GetRecordBatchReader(row_group_indices,
+ Iota(reader_->metadata()->num_columns()), out);
+ }
+
+ ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+ GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+ const std::vector<int> row_group_indices,
+ const std::vector<int> column_indices,
+ ::arrow::internal::Executor* cpu_executor,
+ int row_group_readahead) override;
+
+ int num_columns() const { return reader_->metadata()->num_columns(); }
+
+ ParquetFileReader* parquet_reader() const override { return reader_.get(); }
+
+ int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }
+
+ void set_use_threads(bool use_threads) override {
+ reader_properties_.set_use_threads(use_threads);
+ }
+
+ void set_batch_size(int64_t batch_size) override {
+ reader_properties_.set_batch_size(batch_size);
+ }
+
+ const ArrowReaderProperties& properties() const override { return reader_properties_; }
+
+ const SchemaManifest& manifest() const override { return manifest_; }
+
+ Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
+ int64_t* num_rows) override {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ *num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
+ return Status::OK();
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
+ MemoryPool* pool_;
+ std::unique_ptr<ParquetFileReader> reader_;
+ ArrowReaderProperties reader_properties_;
+
+ SchemaManifest manifest_;
+};
+
+class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
+ public:
+ RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches,
+ std::shared_ptr<::arrow::Schema> schema)
+ : batches_(std::move(batches)), schema_(std::move(schema)) {}
+
+ ~RowGroupRecordBatchReader() override {}
+
+ Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
+ return batches_.Next().Value(out);
+ }
+
+ std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
+
+ private:
+ ::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_;
+ std::shared_ptr<::arrow::Schema> schema_;
+};
+
+class ColumnChunkReaderImpl : public ColumnChunkReader {
+ public:
+ ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
+ : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
+
+ Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
+ return impl_->ReadColumn(column_index_, {row_group_index_}, out);
+ }
+
+ private:
+ FileReaderImpl* impl_;
+ int column_index_;
+ int row_group_index_;
+};
+
+class RowGroupReaderImpl : public RowGroupReader {
+ public:
+ RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
+ : impl_(impl), row_group_index_(row_group_index) {}
+
+ std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
+ return std::shared_ptr<ColumnChunkReader>(
+ new ColumnChunkReaderImpl(impl_, row_group_index_, column_index));
+ }
+
+ Status ReadTable(const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out) override {
+ return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+ }
+
+ Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
+ return impl_->ReadRowGroup(row_group_index_, out);
+ }
+
+ private:
+ FileReaderImpl* impl_;
+ int row_group_index_;
+};
+
+// ----------------------------------------------------------------------
+// Column reader implementations
+
+// Leaf reader is for primitive arrays and primitive children of nested arrays
+class LeafReader : public ColumnReaderImpl {
+ public:
+ LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
+ std::unique_ptr<FileColumnIterator> input,
+ ::parquet::internal::LevelInfo leaf_info)
+ : ctx_(std::move(ctx)),
+ field_(std::move(field)),
+ input_(std::move(input)),
+ descr_(input_->descr()) {
+ record_reader_ = RecordReader::Make(
+ descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
+ NextRowGroup();
+ }
+
+ Status GetDefLevels(const int16_t** data, int64_t* length) final {
+ *data = record_reader_->def_levels();
+ *length = record_reader_->levels_position();
+ return Status::OK();
+ }
+
+ Status GetRepLevels(const int16_t** data, int64_t* length) final {
+ *data = record_reader_->rep_levels();
+ *length = record_reader_->levels_position();
+ return Status::OK();
+ }
+
+ bool IsOrHasRepeatedChild() const final { return false; }
+
+ Status LoadBatch(int64_t records_to_read) final {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ out_ = nullptr;
+ record_reader_->Reset();
+ // Pre-allocation gives much better performance for flat columns
+ record_reader_->Reserve(records_to_read);
+ while (records_to_read > 0) {
+ if (!record_reader_->HasMoreData()) {
+ break;
+ }
+ int64_t records_read = record_reader_->ReadRecords(records_to_read);
+ records_to_read -= records_read;
+ if (records_read == 0) {
+ NextRowGroup();
+ }
+ }
+ RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
+ ctx_->pool, &out_));
+ return Status::OK();
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
+ ::arrow::Status BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<::arrow::ChunkedArray>* out) final {
+ *out = out_;
+ return Status::OK();
+ }
+
+ const std::shared_ptr<Field> field() override { return field_; }
+
+ private:
+ std::shared_ptr<ChunkedArray> out_;
+ void NextRowGroup() {
+ std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+ record_reader_->SetPageReader(std::move(page_reader));
+ }
+
+ std::shared_ptr<ReaderContext> ctx_;
+ std::shared_ptr<Field> field_;
+ std::unique_ptr<FileColumnIterator> input_;
+ const ColumnDescriptor* descr_;
+ std::shared_ptr<RecordReader> record_reader_;
+};
+
+// Column reader for extension arrays
+class ExtensionReader : public ColumnReaderImpl {
+ public:
+ ExtensionReader(std::shared_ptr<Field> field,
+ std::unique_ptr<ColumnReaderImpl> storage_reader)
+ : field_(std::move(field)), storage_reader_(std::move(storage_reader)) {}
+
+ Status GetDefLevels(const int16_t** data, int64_t* length) override {
+ return storage_reader_->GetDefLevels(data, length);
+ }
+
+ Status GetRepLevels(const int16_t** data, int64_t* length) override {
+ return storage_reader_->GetRepLevels(data, length);
+ }
+
+ Status LoadBatch(int64_t number_of_records) final {
+ return storage_reader_->LoadBatch(number_of_records);
+ }
+
+ Status BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<ChunkedArray>* out) override {
+ std::shared_ptr<ChunkedArray> storage;
+ RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage));
+ *out = ExtensionType::WrapArray(field_->type(), storage);
+ return Status::OK();
+ }
+
+ bool IsOrHasRepeatedChild() const final {
+ return storage_reader_->IsOrHasRepeatedChild();
+ }
+
+ const std::shared_ptr<Field> field() override { return field_; }
+
+ private:
+ std::shared_ptr<Field> field_;
+ std::unique_ptr<ColumnReaderImpl> storage_reader_;
+};
+
+template <typename IndexType>
+class ListReader : public ColumnReaderImpl {
+ public:
+ ListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
+ ::parquet::internal::LevelInfo level_info,
+ std::unique_ptr<ColumnReaderImpl> child_reader)
+ : ctx_(std::move(ctx)),
+ field_(std::move(field)),
+ level_info_(level_info),
+ item_reader_(std::move(child_reader)) {}
+
+ Status GetDefLevels(const int16_t** data, int64_t* length) override {
+ return item_reader_->GetDefLevels(data, length);
+ }
+
+ Status GetRepLevels(const int16_t** data, int64_t* length) override {
+ return item_reader_->GetRepLevels(data, length);
+ }
+
+ bool IsOrHasRepeatedChild() const final { return true; }
+
+ Status LoadBatch(int64_t number_of_records) final {
+ return item_reader_->LoadBatch(number_of_records);
+ }
+
+ virtual ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
+ std::shared_ptr<ArrayData> data) {
+ if (field_->type()->id() == ::arrow::Type::MAP) {
+ // Error out if data is not map-compliant instead of aborting in MakeArray below
+ RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data->child_data));
+ }
+ std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+ return std::make_shared<ChunkedArray>(result);
+ }
+
+ Status BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<ChunkedArray>* out) override {
+ const int16_t* def_levels;
+ const int16_t* rep_levels;
+ int64_t num_levels;
+ RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
+ RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
+
+ std::shared_ptr<ResizableBuffer> validity_buffer;
+ ::parquet::internal::ValidityBitmapInputOutput validity_io;
+ validity_io.values_read_upper_bound = length_upper_bound;
+ if (field_->nullable()) {
+ ARROW_ASSIGN_OR_RAISE(
+ validity_buffer,
+ AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
+ validity_io.valid_bits = validity_buffer->mutable_data();
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ std::shared_ptr<ResizableBuffer> offsets_buffer,
+ AllocateResizableBuffer(
+ sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1),
+ ctx_->pool));
+ // Ensure zero initialization in case we have reached a zero length list (and
+ // because first entry is always zero).
+ IndexType* offset_data = reinterpret_cast<IndexType*>(offsets_buffer->mutable_data());
+ offset_data[0] = 0;
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+ level_info_, &validity_io, offset_data);
+ END_PARQUET_CATCH_EXCEPTIONS
+
+ RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out));
+
+ // Resize to actual number of elements returned.
+ RETURN_NOT_OK(
+ offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType)));
+ if (validity_buffer != nullptr) {
+ RETURN_NOT_OK(
+ validity_buffer->Resize(BitUtil::BytesForBits(validity_io.values_read)));
+ validity_buffer->ZeroPadding();
+ }
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+ std::vector<std::shared_ptr<Buffer>> buffers{
+ validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer};
+ auto data = std::make_shared<ArrayData>(
+ field_->type(),
+ /*length=*/validity_io.values_read, std::move(buffers),
+ std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count);
+
+ ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data)));
+ return Status::OK();
+ }
+
+ const std::shared_ptr<Field> field() override { return field_; }
+
+ private:
+ std::shared_ptr<ReaderContext> ctx_;
+ std::shared_ptr<Field> field_;
+ ::parquet::internal::LevelInfo level_info_;
+ std::unique_ptr<ColumnReaderImpl> item_reader_;
+};
+
+class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> {
+ public:
+ FixedSizeListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
+ ::parquet::internal::LevelInfo level_info,
+ std::unique_ptr<ColumnReaderImpl> child_reader)
+ : ListReader(std::move(ctx), std::move(field), level_info,
+ std::move(child_reader)) {}
+ ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray(
+ std::shared_ptr<ArrayData> data) final {
+ DCHECK_EQ(data->buffers.size(), 2);
+ DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST);
+ const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type());
+ const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
+ for (int x = 1; x <= data->length; x++) {
+ int32_t size = offsets[x] - offsets[x - 1];
+ if (size != type.list_size()) {
+ return Status::Invalid("Expected all lists to be of size=", type.list_size(),
+ " but index ", x, " had size=", size);
+ }
+ }
+ data->buffers.resize(1);
+ std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+ return std::make_shared<ChunkedArray>(result);
+ }
+};
+
+class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
+ public:
+ explicit StructReader(std::shared_ptr<ReaderContext> ctx,
+ std::shared_ptr<Field> filtered_field,
+ ::parquet::internal::LevelInfo level_info,
+ std::vector<std::unique_ptr<ColumnReaderImpl>> children)
+ : ctx_(std::move(ctx)),
+ filtered_field_(std::move(filtered_field)),
+ level_info_(level_info),
+ children_(std::move(children)) {
+ // There could be a mix of children some might be repeated some might not be.
+ // If possible use one that isn't since that will be guaranteed to have the least
+ // number of levels to reconstruct a nullable bitmap.
+ auto result = std::find_if(children_.begin(), children_.end(),
+ [](const std::unique_ptr<ColumnReaderImpl>& child) {
+ return !child->IsOrHasRepeatedChild();
+ });
+ if (result != children_.end()) {
+ def_rep_level_child_ = result->get();
+ has_repeated_child_ = false;
+ } else if (!children_.empty()) {
+ def_rep_level_child_ = children_.front().get();
+ has_repeated_child_ = true;
+ }
+ }
+
+ bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+ Status LoadBatch(int64_t records_to_read) override {
+ for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+ RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+ }
+ return Status::OK();
+ }
+ Status BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<ChunkedArray>* out) override;
+ Status GetDefLevels(const int16_t** data, int64_t* length) override;
+ Status GetRepLevels(const int16_t** data, int64_t* length) override;
+ const std::shared_ptr<Field> field() override { return filtered_field_; }
+
+ private:
+ const std::shared_ptr<ReaderContext> ctx_;
+ const std::shared_ptr<Field> filtered_field_;
+ const ::parquet::internal::LevelInfo level_info_;
+ const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+ ColumnReaderImpl* def_rep_level_child_ = nullptr;
+ bool has_repeated_child_;
+};
+
+Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
+ *data = nullptr;
+ if (children_.size() == 0) {
+ *length = 0;
+ return Status::Invalid("StructReader had no children");
+ }
+
+ // This method should only be called when this struct or one of its parents
+ // are optional/repeated or it has a repeated child.
+ // Meaning all children must have rep/def levels associated
+ // with them.
+ RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
+ return Status::OK();
+}
+
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
+ *data = nullptr;
+ if (children_.size() == 0) {
+ *length = 0;
+ return Status::Invalid("StructReader had no childre");
+ }
+
+ // This method should only be called when this struct or one of its parents
+ // are optional/repeated or it has repeated child.
+ // Meaning all children must have rep/def levels associated
+ // with them.
+ RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
+ return Status::OK();
+}
+
+Status StructReader::BuildArray(int64_t length_upper_bound,
+ std::shared_ptr<ChunkedArray>* out) {
+ std::vector<std::shared_ptr<ArrayData>> children_array_data;
+ std::shared_ptr<ResizableBuffer> null_bitmap;
+
+ ::parquet::internal::ValidityBitmapInputOutput validity_io;
+ validity_io.values_read_upper_bound = length_upper_bound;
+ // This simplifies accounting below.
+ validity_io.values_read = length_upper_bound;
+
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ const int16_t* def_levels;
+ const int16_t* rep_levels;
+ int64_t num_levels;
+
+ if (has_repeated_child_) {
+ ARROW_ASSIGN_OR_RAISE(
+ null_bitmap,
+ AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
+ validity_io.valid_bits = null_bitmap->mutable_data();
+ RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+ RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
+ DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io);
+ } else if (filtered_field_->nullable()) {
+ ARROW_ASSIGN_OR_RAISE(
+ null_bitmap,
+ AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool));
+ validity_io.valid_bits = null_bitmap->mutable_data();
+ RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+ DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
+ }
+
+ // Ensure all values are initialized.
+ if (null_bitmap) {
+ RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read)));
+ null_bitmap->ZeroPadding();
+ }
+
+ END_PARQUET_CATCH_EXCEPTIONS
+ // Gather children arrays and def levels
+ for (auto& child : children_) {
+ std::shared_ptr<ChunkedArray> field;
+ RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field));
+ children_array_data.push_back(std::move(array_data));
+ }
+
+ if (!filtered_field_->nullable() && !has_repeated_child_) {
+ validity_io.values_read = children_array_data.front()->length;
+ }
+
+ std::vector<std::shared_ptr<Buffer>> buffers{validity_io.null_count > 0 ? null_bitmap
+ : nullptr};
+ auto data =
+ std::make_shared<ArrayData>(filtered_field_->type(),
+ /*length=*/validity_io.values_read, std::move(buffers),
+ std::move(children_array_data));
+ std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+
+ *out = std::make_shared<ChunkedArray>(result);
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// File reader implementation
+
+Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
+ const std::shared_ptr<ReaderContext>& ctx,
+ std::unique_ptr<ColumnReaderImpl>* out) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+
+ auto type_id = arrow_field->type()->id();
+
+ if (type_id == ::arrow::Type::EXTENSION) {
+ auto storage_field = arrow_field->WithType(
+ checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
+ RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
+ out->reset(new ExtensionReader(arrow_field, std::move(*out)));
+ return Status::OK();
+ }
+
+ if (field.children.size() == 0) {
+ if (!field.is_leaf()) {
+ return Status::Invalid("Parquet non-leaf node has no children");
+ }
+ if (!ctx->IncludesLeaf(field.column_index)) {
+ *out = nullptr;
+ return Status::OK();
+ }
+ std::unique_ptr<FileColumnIterator> input(
+ ctx->iterator_factory(field.column_index, ctx->reader));
+ out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info));
+ } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP ||
+ type_id == ::arrow::Type::FIXED_SIZE_LIST ||
+ type_id == ::arrow::Type::LARGE_LIST) {
+ auto list_field = arrow_field;
+ auto child = &field.children[0];
+ std::unique_ptr<ColumnReaderImpl> child_reader;
+ RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
+ if (child_reader == nullptr) {
+ *out = nullptr;
+ return Status::OK();
+ }
+
+ // These two types might not be equal if there column pruning occurred.
+ // further down the stack.
+ const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type();
+ // This should really never happen but was raised as a question on the code
+ // review, this should be pretty cheap check so leave it in.
+ if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) {
+ return Status::Invalid("expected exactly one child field for: ",
+ list_field->ToString());
+ }
+ const DataType& schema_child_type = *(list_field->type()->field(0)->type());
+ if (type_id == ::arrow::Type::MAP) {
+ if (reader_child_type->num_fields() != 2 ||
+ !reader_child_type->field(0)->type()->Equals(
+ *schema_child_type.field(0)->type())) {
+ // This case applies if either key or value are completed filtered
+ // out so we can take the type as is or the key was partially
+ // so keeping it as a map no longer makes sence.
+ list_field = list_field->WithType(::arrow::list(child_reader->field()));
+ } else if (!reader_child_type->field(1)->type()->Equals(
+ *schema_child_type.field(1)->type())) {
+ list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
+ reader_child_type->field(
+ 0), // field 0 is unchanged baed on previous if statement
+ reader_child_type->field(1)));
+ }
+ // Map types are list<struct<key, value>> so use ListReader
+ // for reconstruction.
+ out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
+ std::move(child_reader)));
+ } else if (type_id == ::arrow::Type::LIST) {
+ if (!reader_child_type->Equals(schema_child_type)) {
+ list_field = list_field->WithType(::arrow::list(reader_child_type));
+ }
+
+ out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
+ std::move(child_reader)));
+ } else if (type_id == ::arrow::Type::LARGE_LIST) {
+ if (!reader_child_type->Equals(schema_child_type)) {
+ list_field = list_field->WithType(::arrow::large_list(reader_child_type));
+ }
+
+ out->reset(new ListReader<int64_t>(ctx, list_field, field.level_info,
+ std::move(child_reader)));
+ } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
+ if (!reader_child_type->Equals(schema_child_type)) {
+ auto& fixed_list_type =
+ checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type());
+ int32_t list_size = fixed_list_type.list_size();
+ list_field =
+ list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size));
+ }
+
+ out->reset(new FixedSizeListReader(ctx, list_field, field.level_info,
+ std::move(child_reader)));
+ } else {
+ return Status::UnknownError("Unknown list type: ", field.field->ToString());
+ }
+ } else if (type_id == ::arrow::Type::STRUCT) {
+ std::vector<std::shared_ptr<Field>> child_fields;
+ int arrow_field_idx = 0;
+ std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
+ for (const auto& child : field.children) {
+ std::unique_ptr<ColumnReaderImpl> child_reader;
+ RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
+ if (!child_reader) {
+ arrow_field_idx++;
+ // If all children were pruned, then we do not try to read this field
+ continue;
+ }
+ std::shared_ptr<::arrow::Field> child_field = child.field;
+ const DataType& reader_child_type = *child_reader->field()->type();
+ const DataType& schema_child_type =
+ *arrow_field->type()->field(arrow_field_idx++)->type();
+ // These might not be equal if column pruning occurred.
+ if (!schema_child_type.Equals(reader_child_type)) {
+ child_field = child_field->WithType(child_reader->field()->type());
+ }
+ child_fields.push_back(child_field);
+ child_readers.emplace_back(std::move(child_reader));
+ }
+ if (child_fields.size() == 0) {
+ *out = nullptr;
+ return Status::OK();
+ }
+ auto filtered_field =
+ ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
+ arrow_field->nullable(), arrow_field->metadata());
+ out->reset(new StructReader(ctx, filtered_field, field.level_info,
+ std::move(child_readers)));
+ } else {
+ return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
+ }
+ return Status::OK();
+
+ END_PARQUET_CATCH_EXCEPTIONS
+}
+
+Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx,
+ std::unique_ptr<ColumnReaderImpl>* out) {
+ return GetReader(field, field.field, ctx, out);
+}
+
+} // namespace
+
+Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::unique_ptr<RecordBatchReader>* out) {
+ RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+
+ if (reader_properties_.pre_buffer()) {
+ // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
+ reader_properties_.cache_options());
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
+ std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
+ std::shared_ptr<::arrow::Schema> batch_schema;
+ RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));
+
+ if (readers.empty()) {
+ // Just generate all batches right now; they're cheap since they have no columns.
+ int64_t batch_size = properties().batch_size();
+ auto max_sized_batch =
+ ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});
+
+ ::arrow::RecordBatchVector batches;
+
+ for (int row_group : row_groups) {
+ int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+
+ batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);
+
+ if (int64_t trailing_rows = num_rows % batch_size) {
+ batches.push_back(max_sized_batch->Slice(0, trailing_rows));
+ }
+ }
+
+ *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
+ ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));
+
+ return Status::OK();
+ }
+
+ int64_t num_rows = 0;
+ for (int row_group : row_groups) {
+ num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+ }
+
+ using ::arrow::RecordBatchIterator;
+
+ // NB: This lambda will be invoked outside the scope of this call to
+ // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
+ // `this` is a non-owning pointer so we are relying on the parent FileReader outliving
+ // this RecordBatchReader.
+ ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
+ [readers, batch_schema, num_rows,
+ this]() mutable -> ::arrow::Result<RecordBatchIterator> {
+ ::arrow::ChunkedArrayVector columns(readers.size());
+
+ // don't reserve more rows than necessary
+ int64_t batch_size = std::min(properties().batch_size(), num_rows);
+ num_rows -= batch_size;
+
+ RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
+ reader_properties_.use_threads(), static_cast<int>(readers.size()),
+ [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));
+
+ for (const auto& column : columns) {
+ if (column == nullptr || column->length() == 0) {
+ return ::arrow::IterationTraits<RecordBatchIterator>::End();
+ }
+ }
+
+ auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
+ auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+
+ // NB: explicitly preserve table so that table_reader doesn't outlive it
+ return ::arrow::MakeFunctionIterator(
+ [table, table_reader] { return table_reader->Next(); });
+ });
+
+ *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
+ ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));
+
+ return Status::OK();
+}
+
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+ using RecordBatchGenerator =
+ ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+ explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+ ::arrow::internal::Executor* cpu_executor,
+ std::vector<int> row_groups, std::vector<int> column_indices)
+ : arrow_reader_(std::move(arrow_reader)),
+ cpu_executor_(cpu_executor),
+ row_groups_(std::move(row_groups)),
+ column_indices_(std::move(column_indices)),
+ index_(0) {}
+
+ ::arrow::Future<RecordBatchGenerator> operator()() {
+ if (index_ >= row_groups_.size()) {
+ return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+ }
+ int row_group = row_groups_[index_++];
+ std::vector<int> column_indices = column_indices_;
+ auto reader = arrow_reader_;
+ if (!reader->properties().pre_buffer()) {
+ return SubmitRead(cpu_executor_, reader, row_group, column_indices);
+ }
+ auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+ if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
+ return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
+ return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
+ });
+ }
+
+ private:
+ // Synchronous fallback for when pre-buffer isn't enabled.
+ //
+ // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
+ // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
+ // async I/O without forcing readahead.
+ static ::arrow::Future<RecordBatchGenerator> SubmitRead(
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+ const int row_group, const std::vector<int>& column_indices) {
+ if (!cpu_executor) {
+ return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
+ }
+ // If we have an executor, then force transfer (even if I/O was complete)
+ return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+ row_group, column_indices));
+ }
+
+ static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+ const int row_group, const std::vector<int>& column_indices) {
+ // Skips bound checks/pre-buffering, since we've done that already
+ const int64_t batch_size = self->properties().batch_size();
+ return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+ .Then([batch_size](const std::shared_ptr<Table>& table)
+ -> ::arrow::Result<RecordBatchGenerator> {
+ ::arrow::TableBatchReader table_reader(*table);
+ table_reader.set_chunksize(batch_size);
+ ::arrow::RecordBatchVector batches;
+ RETURN_NOT_OK(table_reader.ReadAll(&batches));
+ return ::arrow::MakeVectorGenerator(std::move(batches));
+ });
+ }
+
+ std::shared_ptr<FileReaderImpl> arrow_reader_;
+ ::arrow::internal::Executor* cpu_executor_;
+ std::vector<int> row_groups_;
+ std::vector<int> column_indices_;
+ size_t index_;
+};
+
+::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+ const std::vector<int> row_group_indices,
+ const std::vector<int> column_indices,
+ ::arrow::internal::Executor* cpu_executor,
+ int row_group_readahead) {
+ RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
+ if (reader_properties_.pre_buffer()) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
+ reader_properties_.cache_options());
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+ ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
+ RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
+ cpu_executor, row_group_indices, column_indices);
+ if (row_group_readahead > 0) {
+ row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator),
+ row_group_readahead);
+ }
+ return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
+}
+
+Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+ std::unique_ptr<ColumnReader>* out) {
+ RETURN_NOT_OK(BoundsCheckColumn(i));
+ auto ctx = std::make_shared<ReaderContext>();
+ ctx->reader = reader_.get();
+ ctx->pool = pool_;
+ ctx->iterator_factory = iterator_factory;
+ ctx->filter_leaves = false;
+ std::unique_ptr<ColumnReaderImpl> result;
+ RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
+ out->reset(result.release());
+ return Status::OK();
+}
+
+Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<Table>* out) {
+ RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+
+ // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
+ if (reader_properties_.pre_buffer()) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ parquet_reader()->PreBuffer(row_groups, column_indices,
+ reader_properties_.io_context(),
+ reader_properties_.cache_options());
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
+ auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
+ /*cpu_executor=*/nullptr);
+ ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
+ return Status::OK();
+}
+
+Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
+ std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
+ // `self` is used solely to keep `this` alive in an async context - but we use this
+ // in a sync context too so use `this` over `self`
+ std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
+ std::shared_ptr<::arrow::Schema> result_schema;
+ RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
+ // OptionalParallelForAsync requires an executor
+ if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
+
+ auto read_column = [row_groups, self, this](size_t i,
+ std::shared_ptr<ColumnReaderImpl> reader)
+ -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
+ std::shared_ptr<::arrow::ChunkedArray> column;
+ RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
+ return column;
+ };
+ auto make_table = [result_schema, row_groups, self,
+ this](const ::arrow::ChunkedArrayVector& columns)
+ -> ::arrow::Result<std::shared_ptr<Table>> {
+ int64_t num_rows = 0;
+ if (!columns.empty()) {
+ num_rows = columns[0]->length();
+ } else {
+ for (int i : row_groups) {
+ num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ }
+ }
+ auto table = Table::Make(std::move(result_schema), columns, num_rows);
+ RETURN_NOT_OK(table->Validate());
+ return table;
+ };
+ return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
+ std::move(readers), read_column,
+ cpu_executor)
+ .Then(std::move(make_table));
+}
+
+std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
+ return std::make_shared<RowGroupReaderImpl>(this, row_group_index);
+}
+
+// ----------------------------------------------------------------------
+// Public factory functions
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ std::shared_ptr<RecordBatchReader>* out) {
+ std::unique_ptr<RecordBatchReader> tmp;
+ ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
+ out->reset(tmp.release());
+ return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<RecordBatchReader>* out) {
+ std::unique_ptr<RecordBatchReader> tmp;
+ ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
+ out->reset(tmp.release());
+ return Status::OK();
+}
+
+Status FileReader::Make(::arrow::MemoryPool* pool,
+ std::unique_ptr<ParquetFileReader> reader,
+ const ArrowReaderProperties& properties,
+ std::unique_ptr<FileReader>* out) {
+ out->reset(new FileReaderImpl(pool, std::move(reader), properties));
+ return static_cast<FileReaderImpl*>(out->get())->Init();
+}
+
+Status FileReader::Make(::arrow::MemoryPool* pool,
+ std::unique_ptr<ParquetFileReader> reader,
+ std::unique_ptr<FileReader>* out) {
+ return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
+}
+
+FileReaderBuilder::FileReaderBuilder()
+ : pool_(::arrow::default_memory_pool()),
+ properties_(default_arrow_reader_properties()) {}
+
+Status FileReaderBuilder::Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
+ const ReaderProperties& properties,
+ std::shared_ptr<FileMetaData> metadata) {
+ PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(std::move(file), properties,
+ std::move(metadata)));
+ return Status::OK();
+}
+
+FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
+ pool_ = pool;
+ return this;
+}
+
+FileReaderBuilder* FileReaderBuilder::properties(
+ const ArrowReaderProperties& arg_properties) {
+ properties_ = arg_properties;
+ return this;
+}
+
+Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
+ return FileReader::Make(pool_, std::move(raw_reader_), properties_, out);
+}
+
+Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
+ std::unique_ptr<FileReader>* reader) {
+ FileReaderBuilder builder;
+ RETURN_NOT_OK(builder.Open(std::move(file)));
+ return builder.memory_pool(pool)->Build(reader);
+}
+
+namespace internal {
+
+Status FuzzReader(std::unique_ptr<FileReader> reader) {
+ auto st = Status::OK();
+ for (int i = 0; i < reader->num_row_groups(); ++i) {
+ std::shared_ptr<Table> table;
+ auto row_group_status = reader->ReadRowGroup(i, &table);
+ if (row_group_status.ok()) {
+ row_group_status &= table->ValidateFull();
+ }
+ st &= row_group_status;
+ }
+ return st;
+}
+
+Status FuzzReader(const uint8_t* data, int64_t size) {
+ auto buffer = std::make_shared<::arrow::Buffer>(data, size);
+ auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
+ FileReaderBuilder builder;
+ RETURN_NOT_OK(builder.Open(std::move(file)));
+
+ std::unique_ptr<FileReader> reader;
+ RETURN_NOT_OK(builder.Build(&reader));
+ return FuzzReader(std::move(reader));
+}
+
+} // namespace internal
+
+} // namespace arrow
+} // namespace parquet