Diffstat (limited to 'src/arrow/cpp/src/parquet/arrow/reader.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/arrow/reader.cc | 1305
1 file changed, 1305 insertions(+), 0 deletions(-)
diff --git a/src/arrow/cpp/src/parquet/arrow/reader.cc b/src/arrow/cpp/src/parquet/arrow/reader.cc new file mode 100644 index 000000000..1c2331864 --- /dev/null +++ b/src/arrow/cpp/src/parquet/arrow/reader.cc @@ -0,0 +1,1305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/arrow/reader.h" + +#include <algorithm> +#include <cstring> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/extension_type.h" +#include "arrow/io/memory.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" +#include "arrow/type.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/future.h" +#include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/parallel.h" +#include "arrow/util/range.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" +#include "parquet/schema.h" + +using arrow::Array; +using arrow::ArrayData; +using arrow::BooleanArray; +using arrow::ChunkedArray; +using arrow::DataType; +using arrow::ExtensionType; +using arrow::Field; +using arrow::Future; +using arrow::Int32Array; +using arrow::ListArray; +using arrow::MemoryPool; +using arrow::RecordBatchReader; +using arrow::ResizableBuffer; +using arrow::Status; +using arrow::StructArray; +using arrow::Table; +using arrow::TimestampArray; + +using arrow::internal::checked_cast; +using arrow::internal::Iota; + +// Help reduce verbosity +using ParquetReader = parquet::ParquetFileReader; + +using parquet::internal::RecordReader; + +namespace BitUtil = arrow::BitUtil; + +namespace parquet { +namespace arrow { +namespace { + +::arrow::Result<std::shared_ptr<ArrayData>> ChunksToSingle(const ChunkedArray& chunked) { + switch (chunked.num_chunks()) { + case 0: { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> array, + ::arrow::MakeArrayOfNull(chunked.type(), 0)); + return array->data(); + } + case 1: + return chunked.chunk(0)->data(); + default: + // ARROW-3762(wesm): If item reader yields a chunked array, we reject as + // this is not yet implemented + return Status::NotImplemented( + "Nested data conversions not implemented for chunked array outputs"); + } +} + +} // namespace + +class ColumnReaderImpl : public ColumnReader { + public: + virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0; + virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0; + virtual const std::shared_ptr<Field> field() = 0; + + ::arrow::Status NextBatch(int64_t batch_size, + std::shared_ptr<::arrow::ChunkedArray>* out) 
final { + RETURN_NOT_OK(LoadBatch(batch_size)); + RETURN_NOT_OK(BuildArray(batch_size, out)); + for (int x = 0; x < (*out)->num_chunks(); x++) { + RETURN_NOT_OK((*out)->chunk(x)->Validate()); + } + return Status::OK(); + } + + virtual ::arrow::Status LoadBatch(int64_t num_records) = 0; + + virtual ::arrow::Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<::arrow::ChunkedArray>* out) = 0; + virtual bool IsOrHasRepeatedChild() const = 0; +}; + +namespace { + +std::shared_ptr<std::unordered_set<int>> VectorToSharedSet( + const std::vector<int>& values) { + std::shared_ptr<std::unordered_set<int>> result(new std::unordered_set<int>()); + result->insert(values.begin(), values.end()); + return result; +} + +// Forward declaration +Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& context, + std::unique_ptr<ColumnReaderImpl>* out); + +// ---------------------------------------------------------------------- +// FileReaderImpl forward declaration + +class FileReaderImpl : public FileReader { + public: + FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader, + ArrowReaderProperties properties) + : pool_(pool), + reader_(std::move(reader)), + reader_properties_(std::move(properties)) {} + + Status Init() { + return SchemaManifest::Make(reader_->metadata()->schema(), + reader_->metadata()->key_value_metadata(), + reader_properties_, &manifest_); + } + + FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) { + return [row_groups](int i, ParquetFileReader* reader) { + return new FileColumnIterator(i, reader, row_groups); + }; + } + + FileColumnIteratorFactory AllRowGroupsFactory() { + return SomeRowGroupsFactory(Iota(reader_->metadata()->num_row_groups())); + } + + Status BoundsCheckColumn(int column) { + if (column < 0 || column >= this->num_columns()) { + return Status::Invalid("Column index out of bounds (got ", column, + ", should be " + "between 0 and ", + this->num_columns() - 1, ")"); + } + return Status::OK(); + } + + Status BoundsCheckRowGroup(int row_group) { + // row group indices check + if (row_group < 0 || row_group >= num_row_groups()) { + return Status::Invalid("Some index in row_group_indices is ", row_group, + ", which is either < 0 or >= num_row_groups(", + num_row_groups(), ")"); + } + return Status::OK(); + } + + Status BoundsCheck(const std::vector<int>& row_groups, + const std::vector<int>& column_indices) { + for (int i : row_groups) { + RETURN_NOT_OK(BoundsCheckRowGroup(i)); + } + for (int i : column_indices) { + RETURN_NOT_OK(BoundsCheckColumn(i)); + } + return Status::OK(); + } + + std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override; + + Status ReadTable(const std::vector<int>& indices, + std::shared_ptr<Table>* out) override { + return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); + } + + Status GetFieldReader(int i, + const std::shared_ptr<std::unordered_set<int>>& included_leaves, + const std::vector<int>& row_groups, + std::unique_ptr<ColumnReaderImpl>* out) { + auto ctx = std::make_shared<ReaderContext>(); + ctx->reader = reader_.get(); + ctx->pool = pool_; + ctx->iterator_factory = SomeRowGroupsFactory(row_groups); + ctx->filter_leaves = true; + ctx->included_leaves = included_leaves; + return GetReader(manifest_.schema_fields[i], ctx, out); + } + + Status GetFieldReaders(const std::vector<int>& column_indices, + const std::vector<int>& row_groups, + std::vector<std::shared_ptr<ColumnReaderImpl>>* out, + 
std::shared_ptr<::arrow::Schema>* out_schema) { + // We only need to read schema fields which have columns indicated + // in the indices vector + ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices, + manifest_.GetFieldIndices(column_indices)); + + auto included_leaves = VectorToSharedSet(column_indices); + + out->resize(field_indices.size()); + ::arrow::FieldVector out_fields(field_indices.size()); + for (size_t i = 0; i < out->size(); ++i) { + std::unique_ptr<ColumnReaderImpl> reader; + RETURN_NOT_OK( + GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); + + out_fields[i] = reader->field(); + out->at(i) = std::move(reader); + } + + *out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata); + return Status::OK(); + } + + Status GetColumn(int i, FileColumnIteratorFactory iterator_factory, + std::unique_ptr<ColumnReader>* out); + + Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override { + return GetColumn(i, AllRowGroupsFactory(), out); + } + + Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override { + return FromParquetSchema(reader_->metadata()->schema(), reader_properties_, + reader_->metadata()->key_value_metadata(), out); + } + + Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override { + auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns())); + std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups()); + + std::unique_ptr<ColumnReaderImpl> reader; + RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader)); + + return ReadColumn(i, row_groups, reader.get(), out); + } + + Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader, + std::shared_ptr<ChunkedArray>* out) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + // TODO(wesm): This calculation doesn't make much sense when we have repeated + // schema nodes + int64_t records_to_read = 0; + for (auto row_group : row_groups) { + // Can throw exception + records_to_read += + reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values(); + } + return reader->NextBatch(records_to_read, out); + END_PARQUET_CATCH_EXCEPTIONS + } + + Status ReadColumn(int i, const std::vector<int>& row_groups, + std::shared_ptr<ChunkedArray>* out) { + std::unique_ptr<ColumnReader> flat_column_reader; + RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader)); + return ReadColumn(i, row_groups, flat_column_reader.get(), out); + } + + Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override { + return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out); + } + + Status ReadTable(std::shared_ptr<Table>* table) override { + return ReadTable(Iota(reader_->metadata()->num_columns()), table); + } + + Status ReadRowGroups(const std::vector<int>& row_groups, + const std::vector<int>& indices, + std::shared_ptr<Table>* table) override; + + // Helper method used by ReadRowGroups - read the given row groups/columns, skipping + // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader + // alive in async contexts. 
+ Future<std::shared_ptr<Table>> DecodeRowGroups( + std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups, + const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor); + + Status ReadRowGroups(const std::vector<int>& row_groups, + std::shared_ptr<Table>* table) override { + return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table); + } + + Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices, + std::shared_ptr<Table>* out) override { + return ReadRowGroups({row_group_index}, column_indices, out); + } + + Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override { + return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); + } + + Status GetRecordBatchReader(const std::vector<int>& row_group_indices, + const std::vector<int>& column_indices, + std::unique_ptr<RecordBatchReader>* out) override; + + Status GetRecordBatchReader(const std::vector<int>& row_group_indices, + std::unique_ptr<RecordBatchReader>* out) override { + return GetRecordBatchReader(row_group_indices, + Iota(reader_->metadata()->num_columns()), out); + } + + ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>> + GetRecordBatchGenerator(std::shared_ptr<FileReader> reader, + const std::vector<int> row_group_indices, + const std::vector<int> column_indices, + ::arrow::internal::Executor* cpu_executor, + int row_group_readahead) override; + + int num_columns() const { return reader_->metadata()->num_columns(); } + + ParquetFileReader* parquet_reader() const override { return reader_.get(); } + + int num_row_groups() const override { return reader_->metadata()->num_row_groups(); } + + void set_use_threads(bool use_threads) override { + reader_properties_.set_use_threads(use_threads); + } + + void set_batch_size(int64_t batch_size) override { + reader_properties_.set_batch_size(batch_size); + } + + const ArrowReaderProperties& properties() const override { return reader_properties_; } + + const SchemaManifest& manifest() const override { return manifest_; } + + Status ScanContents(std::vector<int> columns, const int32_t column_batch_size, + int64_t* num_rows) override { + BEGIN_PARQUET_CATCH_EXCEPTIONS + *num_rows = ScanFileContents(columns, column_batch_size, reader_.get()); + return Status::OK(); + END_PARQUET_CATCH_EXCEPTIONS + } + + MemoryPool* pool_; + std::unique_ptr<ParquetFileReader> reader_; + ArrowReaderProperties reader_properties_; + + SchemaManifest manifest_; +}; + +class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader { + public: + RowGroupRecordBatchReader(::arrow::RecordBatchIterator batches, + std::shared_ptr<::arrow::Schema> schema) + : batches_(std::move(batches)), schema_(std::move(schema)) {} + + ~RowGroupRecordBatchReader() override {} + + Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override { + return batches_.Next().Value(out); + } + + std::shared_ptr<::arrow::Schema> schema() const override { return schema_; } + + private: + ::arrow::Iterator<std::shared_ptr<::arrow::RecordBatch>> batches_; + std::shared_ptr<::arrow::Schema> schema_; +}; + +class ColumnChunkReaderImpl : public ColumnChunkReader { + public: + ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index) + : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {} + + Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override { + return impl_->ReadColumn(column_index_, {row_group_index_}, out); + } + + private: + 
FileReaderImpl* impl_; + int column_index_; + int row_group_index_; +}; + +class RowGroupReaderImpl : public RowGroupReader { + public: + RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index) + : impl_(impl), row_group_index_(row_group_index) {} + + std::shared_ptr<ColumnChunkReader> Column(int column_index) override { + return std::shared_ptr<ColumnChunkReader>( + new ColumnChunkReaderImpl(impl_, row_group_index_, column_index)); + } + + Status ReadTable(const std::vector<int>& column_indices, + std::shared_ptr<::arrow::Table>* out) override { + return impl_->ReadRowGroup(row_group_index_, column_indices, out); + } + + Status ReadTable(std::shared_ptr<::arrow::Table>* out) override { + return impl_->ReadRowGroup(row_group_index_, out); + } + + private: + FileReaderImpl* impl_; + int row_group_index_; +}; + +// ---------------------------------------------------------------------- +// Column reader implementations + +// Leaf reader is for primitive arrays and primitive children of nested arrays +class LeafReader : public ColumnReaderImpl { + public: + LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field, + std::unique_ptr<FileColumnIterator> input, + ::parquet::internal::LevelInfo leaf_info) + : ctx_(std::move(ctx)), + field_(std::move(field)), + input_(std::move(input)), + descr_(input_->descr()) { + record_reader_ = RecordReader::Make( + descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); + NextRowGroup(); + } + + Status GetDefLevels(const int16_t** data, int64_t* length) final { + *data = record_reader_->def_levels(); + *length = record_reader_->levels_position(); + return Status::OK(); + } + + Status GetRepLevels(const int16_t** data, int64_t* length) final { + *data = record_reader_->rep_levels(); + *length = record_reader_->levels_position(); + return Status::OK(); + } + + bool IsOrHasRepeatedChild() const final { return false; } + + Status LoadBatch(int64_t records_to_read) final { + BEGIN_PARQUET_CATCH_EXCEPTIONS + out_ = nullptr; + record_reader_->Reset(); + // Pre-allocation gives much better performance for flat columns + record_reader_->Reserve(records_to_read); + while (records_to_read > 0) { + if (!record_reader_->HasMoreData()) { + break; + } + int64_t records_read = record_reader_->ReadRecords(records_to_read); + records_to_read -= records_read; + if (records_read == 0) { + NextRowGroup(); + } + } + RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_, + ctx_->pool, &out_)); + return Status::OK(); + END_PARQUET_CATCH_EXCEPTIONS + } + + ::arrow::Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<::arrow::ChunkedArray>* out) final { + *out = out_; + return Status::OK(); + } + + const std::shared_ptr<Field> field() override { return field_; } + + private: + std::shared_ptr<ChunkedArray> out_; + void NextRowGroup() { + std::unique_ptr<PageReader> page_reader = input_->NextChunk(); + record_reader_->SetPageReader(std::move(page_reader)); + } + + std::shared_ptr<ReaderContext> ctx_; + std::shared_ptr<Field> field_; + std::unique_ptr<FileColumnIterator> input_; + const ColumnDescriptor* descr_; + std::shared_ptr<RecordReader> record_reader_; +}; + +// Column reader for extension arrays +class ExtensionReader : public ColumnReaderImpl { + public: + ExtensionReader(std::shared_ptr<Field> field, + std::unique_ptr<ColumnReaderImpl> storage_reader) + : field_(std::move(field)), storage_reader_(std::move(storage_reader)) {} + + Status GetDefLevels(const int16_t** data, int64_t* length) 
override { + return storage_reader_->GetDefLevels(data, length); + } + + Status GetRepLevels(const int16_t** data, int64_t* length) override { + return storage_reader_->GetRepLevels(data, length); + } + + Status LoadBatch(int64_t number_of_records) final { + return storage_reader_->LoadBatch(number_of_records); + } + + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<ChunkedArray>* out) override { + std::shared_ptr<ChunkedArray> storage; + RETURN_NOT_OK(storage_reader_->BuildArray(length_upper_bound, &storage)); + *out = ExtensionType::WrapArray(field_->type(), storage); + return Status::OK(); + } + + bool IsOrHasRepeatedChild() const final { + return storage_reader_->IsOrHasRepeatedChild(); + } + + const std::shared_ptr<Field> field() override { return field_; } + + private: + std::shared_ptr<Field> field_; + std::unique_ptr<ColumnReaderImpl> storage_reader_; +}; + +template <typename IndexType> +class ListReader : public ColumnReaderImpl { + public: + ListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field, + ::parquet::internal::LevelInfo level_info, + std::unique_ptr<ColumnReaderImpl> child_reader) + : ctx_(std::move(ctx)), + field_(std::move(field)), + level_info_(level_info), + item_reader_(std::move(child_reader)) {} + + Status GetDefLevels(const int16_t** data, int64_t* length) override { + return item_reader_->GetDefLevels(data, length); + } + + Status GetRepLevels(const int16_t** data, int64_t* length) override { + return item_reader_->GetRepLevels(data, length); + } + + bool IsOrHasRepeatedChild() const final { return true; } + + Status LoadBatch(int64_t number_of_records) final { + return item_reader_->LoadBatch(number_of_records); + } + + virtual ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray( + std::shared_ptr<ArrayData> data) { + if (field_->type()->id() == ::arrow::Type::MAP) { + // Error out if data is not map-compliant instead of aborting in MakeArray below + RETURN_NOT_OK(::arrow::MapArray::ValidateChildData(data->child_data)); + } + std::shared_ptr<Array> result = ::arrow::MakeArray(data); + return std::make_shared<ChunkedArray>(result); + } + + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<ChunkedArray>* out) override { + const int16_t* def_levels; + const int16_t* rep_levels; + int64_t num_levels; + RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels)); + RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels)); + + std::shared_ptr<ResizableBuffer> validity_buffer; + ::parquet::internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = length_upper_bound; + if (field_->nullable()) { + ARROW_ASSIGN_OR_RAISE( + validity_buffer, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = validity_buffer->mutable_data(); + } + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr<ResizableBuffer> offsets_buffer, + AllocateResizableBuffer( + sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1), + ctx_->pool)); + // Ensure zero initialization in case we have reached a zero length list (and + // because first entry is always zero). 
+ IndexType* offset_data = reinterpret_cast<IndexType*>(offsets_buffer->mutable_data()); + offset_data[0] = 0; + BEGIN_PARQUET_CATCH_EXCEPTIONS + ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels, + level_info_, &validity_io, offset_data); + END_PARQUET_CATCH_EXCEPTIONS + + RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out)); + + // Resize to actual number of elements returned. + RETURN_NOT_OK( + offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType))); + if (validity_buffer != nullptr) { + RETURN_NOT_OK( + validity_buffer->Resize(BitUtil::BytesForBits(validity_io.values_read))); + validity_buffer->ZeroPadding(); + } + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out)); + + std::vector<std::shared_ptr<Buffer>> buffers{ + validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer}; + auto data = std::make_shared<ArrayData>( + field_->type(), + /*length=*/validity_io.values_read, std::move(buffers), + std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count); + + ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data))); + return Status::OK(); + } + + const std::shared_ptr<Field> field() override { return field_; } + + private: + std::shared_ptr<ReaderContext> ctx_; + std::shared_ptr<Field> field_; + ::parquet::internal::LevelInfo level_info_; + std::unique_ptr<ColumnReaderImpl> item_reader_; +}; + +class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> { + public: + FixedSizeListReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field, + ::parquet::internal::LevelInfo level_info, + std::unique_ptr<ColumnReaderImpl> child_reader) + : ListReader(std::move(ctx), std::move(field), level_info, + std::move(child_reader)) {} + ::arrow::Result<std::shared_ptr<ChunkedArray>> AssembleArray( + std::shared_ptr<ArrayData> data) final { + DCHECK_EQ(data->buffers.size(), 2); + DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST); + const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type()); + const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data()); + for (int x = 1; x <= data->length; x++) { + int32_t size = offsets[x] - offsets[x - 1]; + if (size != type.list_size()) { + return Status::Invalid("Expected all lists to be of size=", type.list_size(), + " but index ", x, " had size=", size); + } + } + data->buffers.resize(1); + std::shared_ptr<Array> result = ::arrow::MakeArray(data); + return std::make_shared<ChunkedArray>(result); + } +}; + +class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl { + public: + explicit StructReader(std::shared_ptr<ReaderContext> ctx, + std::shared_ptr<Field> filtered_field, + ::parquet::internal::LevelInfo level_info, + std::vector<std::unique_ptr<ColumnReaderImpl>> children) + : ctx_(std::move(ctx)), + filtered_field_(std::move(filtered_field)), + level_info_(level_info), + children_(std::move(children)) { + // There could be a mix of children: some might be repeated, some might not be. + // If possible, use one that isn't, since it is guaranteed to have the fewest + // levels needed to reconstruct a validity bitmap. 
+ auto result = std::find_if(children_.begin(), children_.end(), + [](const std::unique_ptr<ColumnReaderImpl>& child) { + return !child->IsOrHasRepeatedChild(); + }); + if (result != children_.end()) { + def_rep_level_child_ = result->get(); + has_repeated_child_ = false; + } else if (!children_.empty()) { + def_rep_level_child_ = children_.front().get(); + has_repeated_child_ = true; + } + } + + bool IsOrHasRepeatedChild() const final { return has_repeated_child_; } + + Status LoadBatch(int64_t records_to_read) override { + for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) { + RETURN_NOT_OK(reader->LoadBatch(records_to_read)); + } + return Status::OK(); + } + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<ChunkedArray>* out) override; + Status GetDefLevels(const int16_t** data, int64_t* length) override; + Status GetRepLevels(const int16_t** data, int64_t* length) override; + const std::shared_ptr<Field> field() override { return filtered_field_; } + + private: + const std::shared_ptr<ReaderContext> ctx_; + const std::shared_ptr<Field> filtered_field_; + const ::parquet::internal::LevelInfo level_info_; + const std::vector<std::unique_ptr<ColumnReaderImpl>> children_; + ColumnReaderImpl* def_rep_level_child_ = nullptr; + bool has_repeated_child_; +}; + +Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) { + *data = nullptr; + if (children_.size() == 0) { + *length = 0; + return Status::Invalid("StructReader had no children"); + } + + // This method should only be called when this struct or one of its parents + // is optional/repeated or it has a repeated child, meaning all children + // must have rep/def levels associated with them. + RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length)); + return Status::OK(); +} + +Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) { + *data = nullptr; + if (children_.size() == 0) { + *length = 0; + return Status::Invalid("StructReader had no children"); + } + + // This method should only be called when this struct or one of its parents + // is optional/repeated or it has a repeated child, meaning all children + // must have rep/def levels associated with them. + RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length)); + return Status::OK(); +} + +Status StructReader::BuildArray(int64_t length_upper_bound, + std::shared_ptr<ChunkedArray>* out) { + std::vector<std::shared_ptr<ArrayData>> children_array_data; + std::shared_ptr<ResizableBuffer> null_bitmap; + + ::parquet::internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = length_upper_bound; + // This simplifies accounting below. 
+ validity_io.values_read = length_upper_bound; + + BEGIN_PARQUET_CATCH_EXCEPTIONS + const int16_t* def_levels; + const int16_t* rep_levels; + int64_t num_levels; + + if (has_repeated_child_) { + ARROW_ASSIGN_OR_RAISE( + null_bitmap, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = null_bitmap->mutable_data(); + RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels)); + RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels)); + DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io); + } else if (filtered_field_->nullable()) { + ARROW_ASSIGN_OR_RAISE( + null_bitmap, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = null_bitmap->mutable_data(); + RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels)); + DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io); + } + + // Ensure all values are initialized. + if (null_bitmap) { + RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read))); + null_bitmap->ZeroPadding(); + } + + END_PARQUET_CATCH_EXCEPTIONS + // Gather children arrays and def levels + for (auto& child : children_) { + std::shared_ptr<ChunkedArray> field; + RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field)); + children_array_data.push_back(std::move(array_data)); + } + + if (!filtered_field_->nullable() && !has_repeated_child_) { + validity_io.values_read = children_array_data.front()->length; + } + + std::vector<std::shared_ptr<Buffer>> buffers{validity_io.null_count > 0 ? null_bitmap + : nullptr}; + auto data = + std::make_shared<ArrayData>(filtered_field_->type(), + /*length=*/validity_io.values_read, std::move(buffers), + std::move(children_array_data)); + std::shared_ptr<Array> result = ::arrow::MakeArray(data); + + *out = std::make_shared<ChunkedArray>(result); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// File reader implementation + +Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field, + const std::shared_ptr<ReaderContext>& ctx, + std::unique_ptr<ColumnReaderImpl>* out) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + + auto type_id = arrow_field->type()->id(); + + if (type_id == ::arrow::Type::EXTENSION) { + auto storage_field = arrow_field->WithType( + checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type()); + RETURN_NOT_OK(GetReader(field, storage_field, ctx, out)); + out->reset(new ExtensionReader(arrow_field, std::move(*out))); + return Status::OK(); + } + + if (field.children.size() == 0) { + if (!field.is_leaf()) { + return Status::Invalid("Parquet non-leaf node has no children"); + } + if (!ctx->IncludesLeaf(field.column_index)) { + *out = nullptr; + return Status::OK(); + } + std::unique_ptr<FileColumnIterator> input( + ctx->iterator_factory(field.column_index, ctx->reader)); + out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info)); + } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP || + type_id == ::arrow::Type::FIXED_SIZE_LIST || + type_id == ::arrow::Type::LARGE_LIST) { + auto list_field = arrow_field; + auto child = &field.children[0]; + std::unique_ptr<ColumnReaderImpl> child_reader; + RETURN_NOT_OK(GetReader(*child, ctx, &child_reader)); + if (child_reader == nullptr) { + *out = nullptr; + return Status::OK(); + } + + // These two 
types might not be equal if column pruning occurred + // further down the stack. + const std::shared_ptr<DataType> reader_child_type = child_reader->field()->type(); + // This should really never happen, but it was raised as a question during + // code review; the check is cheap, so leave it in. + if (ARROW_PREDICT_FALSE(list_field->type()->num_fields() != 1)) { + return Status::Invalid("expected exactly one child field for: ", + list_field->ToString()); + } + const DataType& schema_child_type = *(list_field->type()->field(0)->type()); + if (type_id == ::arrow::Type::MAP) { + if (reader_child_type->num_fields() != 2 || + !reader_child_type->field(0)->type()->Equals( + *schema_child_type.field(0)->type())) { + // This case applies if either the key or the value was completely + // filtered out (so we can take the type as is), or the key was + // partially filtered, so keeping it as a map no longer makes sense. + list_field = list_field->WithType(::arrow::list(child_reader->field())); + } else if (!reader_child_type->field(1)->type()->Equals( + *schema_child_type.field(1)->type())) { + list_field = list_field->WithType(std::make_shared<::arrow::MapType>( + reader_child_type->field( + 0), // field 0 is unchanged based on the previous if statement + reader_child_type->field(1))); + } + // Map types are list<struct<key, value>> so use ListReader + // for reconstruction. + out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info, + std::move(child_reader))); + } else if (type_id == ::arrow::Type::LIST) { + if (!reader_child_type->Equals(schema_child_type)) { + list_field = list_field->WithType(::arrow::list(reader_child_type)); + } + + out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info, + std::move(child_reader))); + } else if (type_id == ::arrow::Type::LARGE_LIST) { + if (!reader_child_type->Equals(schema_child_type)) { + list_field = list_field->WithType(::arrow::large_list(reader_child_type)); + } + + out->reset(new ListReader<int64_t>(ctx, list_field, field.level_info, + std::move(child_reader))); + } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) { + if (!reader_child_type->Equals(schema_child_type)) { + auto& fixed_list_type = + checked_cast<const ::arrow::FixedSizeListType&>(*list_field->type()); + int32_t list_size = fixed_list_type.list_size(); + list_field = + list_field->WithType(::arrow::fixed_size_list(reader_child_type, list_size)); + } + + out->reset(new FixedSizeListReader(ctx, list_field, field.level_info, + std::move(child_reader))); + } else { + return Status::UnknownError("Unknown list type: ", field.field->ToString()); + } + } else if (type_id == ::arrow::Type::STRUCT) { + std::vector<std::shared_ptr<Field>> child_fields; + int arrow_field_idx = 0; + std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers; + for (const auto& child : field.children) { + std::unique_ptr<ColumnReaderImpl> child_reader; + RETURN_NOT_OK(GetReader(child, ctx, &child_reader)); + if (!child_reader) { + arrow_field_idx++; + // If all children were pruned, then we do not try to read this field + continue; + } + std::shared_ptr<::arrow::Field> child_field = child.field; + const DataType& reader_child_type = *child_reader->field()->type(); + const DataType& schema_child_type = + *arrow_field->type()->field(arrow_field_idx++)->type(); + // These might not be equal if column pruning occurred. 
+ if (!schema_child_type.Equals(reader_child_type)) { + child_field = child_field->WithType(child_reader->field()->type()); + } + child_fields.push_back(child_field); + child_readers.emplace_back(std::move(child_reader)); + } + if (child_fields.size() == 0) { + *out = nullptr; + return Status::OK(); + } + auto filtered_field = + ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields), + arrow_field->nullable(), arrow_field->metadata()); + out->reset(new StructReader(ctx, filtered_field, field.level_info, + std::move(child_readers))); + } else { + return Status::Invalid("Unsupported nested type: ", arrow_field->ToString()); + } + return Status::OK(); + + END_PARQUET_CATCH_EXCEPTIONS +} + +Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx, + std::unique_ptr<ColumnReaderImpl>* out) { + return GetReader(field, field.field, ctx, out); +} + +} // namespace + +Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups, + const std::vector<int>& column_indices, + std::unique_ptr<RecordBatchReader>* out) { + RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); + + if (reader_properties_.pre_buffer()) { + // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled + BEGIN_PARQUET_CATCH_EXCEPTIONS + reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), + reader_properties_.cache_options()); + END_PARQUET_CATCH_EXCEPTIONS + } + + std::vector<std::shared_ptr<ColumnReaderImpl>> readers; + std::shared_ptr<::arrow::Schema> batch_schema; + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); + + if (readers.empty()) { + // Just generate all batches right now; they're cheap since they have no columns. + int64_t batch_size = properties().batch_size(); + auto max_sized_batch = + ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{}); + + ::arrow::RecordBatchVector batches; + + for (int row_group : row_groups) { + int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows(); + + batches.insert(batches.end(), num_rows / batch_size, max_sized_batch); + + if (int64_t trailing_rows = num_rows % batch_size) { + batches.push_back(max_sized_batch->Slice(0, trailing_rows)); + } + } + + *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>( + ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema)); + + return Status::OK(); + } + + int64_t num_rows = 0; + for (int row_group : row_groups) { + num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows(); + } + + using ::arrow::RecordBatchIterator; + + // NB: This lambda will be invoked outside the scope of this call to + // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value. + // `this` is a non-owning pointer so we are relying on the parent FileReader outliving + // this RecordBatchReader. 
+ ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator( + [readers, batch_schema, num_rows, + this]() mutable -> ::arrow::Result<RecordBatchIterator> { + ::arrow::ChunkedArrayVector columns(readers.size()); + + // don't reserve more rows than necessary + int64_t batch_size = std::min(properties().batch_size(), num_rows); + num_rows -= batch_size; + + RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( + reader_properties_.use_threads(), static_cast<int>(readers.size()), + [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); })); + + for (const auto& column : columns) { + if (column == nullptr || column->length() == 0) { + return ::arrow::IterationTraits<RecordBatchIterator>::End(); + } + } + + auto table = ::arrow::Table::Make(batch_schema, std::move(columns)); + auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table); + + // NB: explicitly preserve table so that table_reader doesn't outlive it + return ::arrow::MakeFunctionIterator( + [table, table_reader] { return table_reader->Next(); }); + }); + + *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>( + ::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema)); + + return Status::OK(); +} + +/// Given a file reader and a list of row groups, this is a generator of record +/// batch generators (where each sub-generator is the contents of a single row group). +class RowGroupGenerator { + public: + using RecordBatchGenerator = + ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>; + + explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader, + ::arrow::internal::Executor* cpu_executor, + std::vector<int> row_groups, std::vector<int> column_indices) + : arrow_reader_(std::move(arrow_reader)), + cpu_executor_(cpu_executor), + row_groups_(std::move(row_groups)), + column_indices_(std::move(column_indices)), + index_(0) {} + + ::arrow::Future<RecordBatchGenerator> operator()() { + if (index_ >= row_groups_.size()) { + return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>(); + } + int row_group = row_groups_[index_++]; + std::vector<int> column_indices = column_indices_; + auto reader = arrow_reader_; + if (!reader->properties().pre_buffer()) { + return SubmitRead(cpu_executor_, reader, row_group, column_indices); + } + auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); + if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); + return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> { + return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + }); + } + + private: + // Synchronous fallback for when pre-buffer isn't enabled. + // + // Making the Parquet reader truly asynchronous requires heavy refactoring, so the + // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for + // async I/O without forcing readahead. 
+ static ::arrow::Future<RecordBatchGenerator> SubmitRead( + ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self, + const int row_group, const std::vector<int>& column_indices) { + if (!cpu_executor) { + return ReadOneRowGroup(cpu_executor, self, row_group, column_indices); + } + // If we have an executor, then force transfer (even if I/O was complete) + return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self, + row_group, column_indices)); + } + + static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup( + ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self, + const int row_group, const std::vector<int>& column_indices) { + // Skips bound checks/pre-buffering, since we've done that already + const int64_t batch_size = self->properties().batch_size(); + return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor) + .Then([batch_size](const std::shared_ptr<Table>& table) + -> ::arrow::Result<RecordBatchGenerator> { + ::arrow::TableBatchReader table_reader(*table); + table_reader.set_chunksize(batch_size); + ::arrow::RecordBatchVector batches; + RETURN_NOT_OK(table_reader.ReadAll(&batches)); + return ::arrow::MakeVectorGenerator(std::move(batches)); + }); + } + + std::shared_ptr<FileReaderImpl> arrow_reader_; + ::arrow::internal::Executor* cpu_executor_; + std::vector<int> row_groups_; + std::vector<int> column_indices_; + size_t index_; +}; + +::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>> +FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader, + const std::vector<int> row_group_indices, + const std::vector<int> column_indices, + ::arrow::internal::Executor* cpu_executor, + int row_group_readahead) { + RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices)); + if (reader_properties_.pre_buffer()) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), + reader_properties_.cache_options()); + END_PARQUET_CATCH_EXCEPTIONS + } + ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator = + RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader), + cpu_executor, row_group_indices, column_indices); + if (row_group_readahead > 0) { + row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator), + row_group_readahead); + } + return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); +} + +Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, + std::unique_ptr<ColumnReader>* out) { + RETURN_NOT_OK(BoundsCheckColumn(i)); + auto ctx = std::make_shared<ReaderContext>(); + ctx->reader = reader_.get(); + ctx->pool = pool_; + ctx->iterator_factory = iterator_factory; + ctx->filter_leaves = false; + std::unique_ptr<ColumnReaderImpl> result; + RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result)); + out->reset(result.release()); + return Status::OK(); +} + +Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups, + const std::vector<int>& column_indices, + std::shared_ptr<Table>* out) { + RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); + + // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled + if (reader_properties_.pre_buffer()) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + parquet_reader()->PreBuffer(row_groups, column_indices, + reader_properties_.io_context(), + reader_properties_.cache_options()); + 
END_PARQUET_CATCH_EXCEPTIONS + } + + auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices, + /*cpu_executor=*/nullptr); + ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult()); + return Status::OK(); +} + +Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups( + std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups, + const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) { + // `self` is used solely to keep `this` alive in an async context - but we use this + // in a sync context too so use `this` over `self` + std::vector<std::shared_ptr<ColumnReaderImpl>> readers; + std::shared_ptr<::arrow::Schema> result_schema; + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); + // OptionalParallelForAsync requires an executor + if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); + + auto read_column = [row_groups, self, this](size_t i, + std::shared_ptr<ColumnReaderImpl> reader) + -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> { + std::shared_ptr<::arrow::ChunkedArray> column; + RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column)); + return column; + }; + auto make_table = [result_schema, row_groups, self, + this](const ::arrow::ChunkedArrayVector& columns) + -> ::arrow::Result<std::shared_ptr<Table>> { + int64_t num_rows = 0; + if (!columns.empty()) { + num_rows = columns[0]->length(); + } else { + for (int i : row_groups) { + num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows(); + } + } + auto table = Table::Make(std::move(result_schema), columns, num_rows); + RETURN_NOT_OK(table->Validate()); + return table; + }; + return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(), + std::move(readers), read_column, + cpu_executor) + .Then(std::move(make_table)); +} + +std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) { + return std::make_shared<RowGroupReaderImpl>(this, row_group_index); +} + +// ---------------------------------------------------------------------- +// Public factory functions + +Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices, + std::shared_ptr<RecordBatchReader>* out) { + std::unique_ptr<RecordBatchReader> tmp; + ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp)); + out->reset(tmp.release()); + return Status::OK(); +} + +Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices, + const std::vector<int>& column_indices, + std::shared_ptr<RecordBatchReader>* out) { + std::unique_ptr<RecordBatchReader> tmp; + ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp)); + out->reset(tmp.release()); + return Status::OK(); +} + +Status FileReader::Make(::arrow::MemoryPool* pool, + std::unique_ptr<ParquetFileReader> reader, + const ArrowReaderProperties& properties, + std::unique_ptr<FileReader>* out) { + out->reset(new FileReaderImpl(pool, std::move(reader), properties)); + return static_cast<FileReaderImpl*>(out->get())->Init(); +} + +Status FileReader::Make(::arrow::MemoryPool* pool, + std::unique_ptr<ParquetFileReader> reader, + std::unique_ptr<FileReader>* out) { + return Make(pool, std::move(reader), default_arrow_reader_properties(), out); +} + +FileReaderBuilder::FileReaderBuilder() + : pool_(::arrow::default_memory_pool()), + properties_(default_arrow_reader_properties()) {} + +Status FileReaderBuilder::Open(std::shared_ptr<::arrow::io::RandomAccessFile> 
file, + const ReaderProperties& properties, + std::shared_ptr<FileMetaData> metadata) { + PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(std::move(file), properties, + std::move(metadata))); + return Status::OK(); +} + +FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) { + pool_ = pool; + return this; +} + +FileReaderBuilder* FileReaderBuilder::properties( + const ArrowReaderProperties& arg_properties) { + properties_ = arg_properties; + return this; +} + +Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) { + return FileReader::Make(pool_, std::move(raw_reader_), properties_, out); +} + +Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool, + std::unique_ptr<FileReader>* reader) { + FileReaderBuilder builder; + RETURN_NOT_OK(builder.Open(std::move(file))); + return builder.memory_pool(pool)->Build(reader); +} + +namespace internal { + +Status FuzzReader(std::unique_ptr<FileReader> reader) { + auto st = Status::OK(); + for (int i = 0; i < reader->num_row_groups(); ++i) { + std::shared_ptr<Table> table; + auto row_group_status = reader->ReadRowGroup(i, &table); + if (row_group_status.ok()) { + row_group_status &= table->ValidateFull(); + } + st &= row_group_status; + } + return st; +} + +Status FuzzReader(const uint8_t* data, int64_t size) { + auto buffer = std::make_shared<::arrow::Buffer>(data, size); + auto file = std::make_shared<::arrow::io::BufferReader>(buffer); + FileReaderBuilder builder; + RETURN_NOT_OK(builder.Open(std::move(file))); + + std::unique_ptr<FileReader> reader; + RETURN_NOT_OK(builder.Build(&reader)); + return FuzzReader(std::move(reader)); +} + +} // namespace internal + +} // namespace arrow +} // namespace parquet
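
For context, a minimal sketch (not part of the diff) of how a consumer might drive the synchronous API added above: open a file, build a FileReader, and decode everything into an ::arrow::Table. The path "example.parquet" and the wrapper function ReadEntireFile are placeholders; the entry points used (OpenFile, FileReader::ReadTable) are the ones defined in this file.

#include <memory>
#include "arrow/io/file.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"

::arrow::Status ReadEntireFile() {
  // "example.parquet" is a placeholder path.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        ::arrow::io::ReadableFile::Open("example.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(
      std::move(input), ::arrow::default_memory_pool(), &reader));
  std::shared_ptr<::arrow::Table> table;
  // Reads all row groups and all columns in one call.
  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
  return table->ValidateFull();
}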
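
Similarly, a sketch of the streaming path through FileReader::GetRecordBatchReader, which yields RecordBatches of at most batch_size rows instead of materializing a whole Table. StreamBatches is a hypothetical wrapper and the knob values are arbitrary; set_batch_size and set_use_threads are the properties exposed by FileReader above.

#include "arrow/record_batch.h"  // plus the includes from the previous sketch

::arrow::Status StreamBatches(parquet::arrow::FileReader* reader) {
  reader->set_use_threads(true);      // decode leaf columns in parallel
  reader->set_batch_size(64 * 1024);  // max rows per emitted RecordBatch
  std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
  // Read only row group 0; an overload taking column indices also exists.
  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({0}, &batch_reader));
  std::shared_ptr<::arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // ... consume batch ...
  }
  return ::arrow::Status::OK();
}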