Diffstat (limited to 'src/arrow/cpp/src/parquet/column_reader.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/column_reader.cc | 1808 |
1 files changed, 1808 insertions, 0 deletions
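The new file implements the Parquet column-level read path: Thrift page deserialization (SerializedPageReader), repetition/definition level decoding (LevelDecoder), and value materialization (TypedColumnReader and RecordReader). For orientation, a minimal sketch of driving the value-reader API added here; the setup is hypothetical (in practice the descriptor and PageReader come from a row group of a parquet::ParquetFileReader), and it assumes only the public interfaces declared in parquet/column_reader.h:

// Sketch only, not part of the patch.
#include <cstdint>
#include <memory>
#include <vector>
#include "parquet/column_reader.h"

void ReadInt32Column(const parquet::ColumnDescriptor* descr,
                     std::unique_ptr<parquet::PageReader> pager) {
  // ColumnReader::Make dispatches on the column's physical type
  // (see the factory at the bottom of this file).
  std::shared_ptr<parquet::ColumnReader> reader =
      parquet::ColumnReader::Make(descr, std::move(pager));
  auto* int32_reader = static_cast<parquet::Int32Reader*>(reader.get());

  std::vector<int32_t> values(1024);
  std::vector<int16_t> def_levels(1024), rep_levels(1024);
  while (int32_reader->HasNext()) {
    int64_t values_read = 0;
    // ReadBatch returns the number of levels consumed; values_read
    // counts the non-null values decoded into `values`.
    int32_reader->ReadBatch(static_cast<int64_t>(values.size()),
                            def_levels.data(), rep_levels.data(),
                            values.data(), &values_read);
  }
}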
diff --git a/src/arrow/cpp/src/parquet/column_reader.cc b/src/arrow/cpp/src/parquet/column_reader.cc new file mode 100644 index 000000000..c7ad78c10 --- /dev/null +++ b/src/arrow/cpp/src/parquet/column_reader.cc @@ -0,0 +1,1808 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/column_reader.h" + +#include <algorithm> +#include <cstdint> +#include <cstring> +#include <exception> +#include <iostream> +#include <memory> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_dict.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/chunked_array.h" +#include "arrow/type.h" +#include "arrow/util/bit_stream_utils.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/compression.h" +#include "arrow/util/int_util_internal.h" +#include "arrow/util/logging.h" +#include "arrow/util/rle_encoding.h" +#include "parquet/column_page.h" +#include "parquet/encoding.h" +#include "parquet/encryption/encryption_internal.h" +#include "parquet/encryption/internal_file_decryptor.h" +#include "parquet/level_comparison.h" +#include "parquet/level_conversion.h" +#include "parquet/properties.h" +#include "parquet/statistics.h" +#include "parquet/thrift_internal.h" // IWYU pragma: keep +// Required after "arrow/util/int_util_internal.h" (for OPTIONAL) +#include "parquet/windows_compatibility.h" + +using arrow::MemoryPool; +using arrow::internal::AddWithOverflow; +using arrow::internal::checked_cast; +using arrow::internal::MultiplyWithOverflow; + +namespace BitUtil = arrow::BitUtil; + +namespace parquet { +namespace { +inline bool HasSpacedValues(const ColumnDescriptor* descr) { + if (descr->max_repetition_level() > 0) { + // repeated+flat case + return !descr->schema_node()->is_required(); + } else { + // non-repeated+nested case + // Find if a node forces nulls in the lowest level along the hierarchy + const schema::Node* node = descr->schema_node().get(); + while (node) { + if (node->is_optional()) { + return true; + } + node = node->parent(); + } + return false; + } +} +} // namespace + +LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} + +LevelDecoder::~LevelDecoder() {} + +int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, + int num_buffered_values, const uint8_t* data, + int32_t data_size) { + max_level_ = max_level; + int32_t num_bytes = 0; + encoding_ = encoding; + num_values_remaining_ = num_buffered_values; + bit_width_ = BitUtil::Log2(max_level + 1); + switch (encoding) { + case Encoding::RLE: { + if (data_size < 4) { + throw ParquetException("Received invalid levels (corrupt data page?)"); + } + num_bytes = 
::arrow::util::SafeLoadAs<int32_t>(data); + if (num_bytes < 0 || num_bytes > data_size - 4) { + throw ParquetException("Received invalid number of bytes (corrupt data page?)"); + } + const uint8_t* decoder_data = data + 4; + if (!rle_decoder_) { + rle_decoder_.reset( + new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); + } else { + rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); + } + return 4 + num_bytes; + } + case Encoding::BIT_PACKED: { + int num_bits = 0; + if (MultiplyWithOverflow(num_buffered_values, bit_width_, &num_bits)) { + throw ParquetException( + "Number of buffered values too large (corrupt data page?)"); + } + num_bytes = static_cast<int32_t>(BitUtil::BytesForBits(num_bits)); + if (num_bytes < 0 || num_bytes > data_size - 4) { + throw ParquetException("Received invalid number of bytes (corrupt data page?)"); + } + if (!bit_packed_decoder_) { + bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); + } else { + bit_packed_decoder_->Reset(data, num_bytes); + } + return num_bytes; + } + default: + throw ParquetException("Unknown encoding type for levels."); + } + return -1; +} + +void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level, + int num_buffered_values, const uint8_t* data) { + max_level_ = max_level; + // Repetition and definition levels always use RLE encoding + // in the DataPageV2 format. + if (num_bytes < 0) { + throw ParquetException("Invalid page header (corrupt data page?)"); + } + encoding_ = Encoding::RLE; + num_values_remaining_ = num_buffered_values; + bit_width_ = BitUtil::Log2(max_level + 1); + + if (!rle_decoder_) { + rle_decoder_.reset(new ::arrow::util::RleDecoder(data, num_bytes, bit_width_)); + } else { + rle_decoder_->Reset(data, num_bytes, bit_width_); + } +} + +int LevelDecoder::Decode(int batch_size, int16_t* levels) { + int num_decoded = 0; + + int num_values = std::min(num_values_remaining_, batch_size); + if (encoding_ == Encoding::RLE) { + num_decoded = rle_decoder_->GetBatch(levels, num_values); + } else { + num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); + } + if (num_decoded > 0) { + internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); + if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { + std::stringstream ss; + ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max + << " out of range. 
Max Level: " << max_level_; + throw ParquetException(ss.str()); + } + } + num_values_remaining_ -= num_decoded; + return num_decoded; +} + +ReaderProperties default_reader_properties() { + static ReaderProperties default_reader_properties; + return default_reader_properties; +} + +namespace { + +// Extracts encoded statistics from V1 and V2 data page headers +template <typename H> +EncodedStatistics ExtractStatsFromHeader(const H& header) { + EncodedStatistics page_statistics; + if (!header.__isset.statistics) { + return page_statistics; + } + const format::Statistics& stats = header.statistics; + if (stats.__isset.max) { + page_statistics.set_max(stats.max); + } + if (stats.__isset.min) { + page_statistics.set_min(stats.min); + } + if (stats.__isset.null_count) { + page_statistics.set_null_count(stats.null_count); + } + if (stats.__isset.distinct_count) { + page_statistics.set_distinct_count(stats.distinct_count); + } + return page_statistics; +} + +// ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet file + +// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift format::PageHeader indicating the type of each page +// and the page metadata. +class SerializedPageReader : public PageReader { + public: + SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows, + Compression::type codec, ::arrow::MemoryPool* pool, + const CryptoContext* crypto_ctx) + : stream_(std::move(stream)), + decompression_buffer_(AllocateBuffer(pool, 0)), + page_ordinal_(0), + seen_num_rows_(0), + total_num_rows_(total_num_rows), + decryption_buffer_(AllocateBuffer(pool, 0)) { + if (crypto_ctx != nullptr) { + crypto_ctx_ = *crypto_ctx; + InitDecryption(); + } + max_page_header_size_ = kDefaultMaxPageHeaderSize; + decompressor_ = GetCodec(codec); + } + + // Implement the PageReader interface + std::shared_ptr<Page> NextPage() override; + + void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; } + + private: + void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type, + const std::string& page_aad); + + void InitDecryption(); + + std::shared_ptr<Buffer> DecompressIfNeeded(std::shared_ptr<Buffer> page_buffer, + int compressed_len, int uncompressed_len, + int levels_byte_len = 0); + + std::shared_ptr<ArrowInputStream> stream_; + + format::PageHeader current_page_header_; + std::shared_ptr<Page> current_page_; + + // Compression codec to use. + std::unique_ptr<::arrow::util::Codec> decompressor_; + std::shared_ptr<ResizableBuffer> decompression_buffer_; + + // The fields below are used for calculation of AAD (additional authenticated data) + // suffix which is part of the Parquet Modular Encryption. + // The AAD suffix for a parquet module is built internally by + // concatenating different parts, some of which include + // the row group ordinal, column ordinal and page ordinal. + // Please refer to the encryption specification for more details: + // https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data + + // The ordinal fields in the context below are used for AAD suffix calculation. 
+ CryptoContext crypto_ctx_; + int16_t page_ordinal_; // page ordinal does not count the dictionary page + + // Maximum allowed page header size + uint32_t max_page_header_size_; + + // Number of rows read in data pages so far + int64_t seen_num_rows_; + + // Number of rows in all the data pages + int64_t total_num_rows_; + + // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page + // header in a single column respectively. + // While calculating AAD for different pages in a single column, the page AAD is + // updated with only the page ordinal. + std::string data_page_aad_; + std::string data_page_header_aad_; + // Encryption + std::shared_ptr<ResizableBuffer> decryption_buffer_; +}; + +void SerializedPageReader::InitDecryption() { + // Prepare the AAD for quick update later. + if (crypto_ctx_.data_decryptor != nullptr) { + DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty()); + data_page_aad_ = encryption::CreateModuleAad( + crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage, + crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal); + } + if (crypto_ctx_.meta_decryptor != nullptr) { + DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty()); + data_page_header_aad_ = encryption::CreateModuleAad( + crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader, + crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal); + } +} + +void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, + int8_t module_type, + const std::string& page_aad) { + DCHECK(decryptor != nullptr); + if (crypto_ctx_.start_decrypt_with_dictionary_page) { + std::string aad = encryption::CreateModuleAad( + decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal, + crypto_ctx_.column_ordinal, kNonPageOrdinal); + decryptor->UpdateAad(aad); + } else { + encryption::QuickUpdatePageAad(page_aad, page_ordinal_); + decryptor->UpdateAad(page_aad); + } +} + +std::shared_ptr<Page> SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + + while (seen_num_rows_ < total_num_rows_) { + uint32_t header_size = 0; + uint32_t allowed_page_size = kDefaultPageHeaderSize; + + // Page headers can be very large because of page statistics + // We try to deserialize a larger buffer progressively + // until a maximum allowed header limit + while (true) { + PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size)); + if (view.size() == 0) { + return std::shared_ptr<Page>(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + header_size = static_cast<uint32_t>(view.size()); + try { + if (crypto_ctx_.meta_decryptor != nullptr) { + UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader, + data_page_header_aad_); + } + DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size, + &current_page_header_, crypto_ctx_.meta_decryptor); + break; + } catch (std::exception& e) { + // Failed to deserialize. 
Double the allowed page header size and try again + std::stringstream ss; + ss << e.what(); + allowed_page_size *= 2; + if (allowed_page_size > max_page_header_size_) { + ss << "Deserializing page header failed.\n"; + throw ParquetException(ss.str()); + } + } + } + // Advance the stream offset + PARQUET_THROW_NOT_OK(stream_->Advance(header_size)); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + if (compressed_len < 0 || uncompressed_len < 0) { + throw ParquetException("Invalid page header"); + } + + if (crypto_ctx_.data_decryptor != nullptr) { + UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage, + data_page_aad_); + } + + // Read the compressed data page. + PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len)); + if (page_buffer->size() != compressed_len) { + std::stringstream ss; + ss << "Page was smaller (" << page_buffer->size() << ") than expected (" + << compressed_len << ")"; + ParquetException::EofException(ss.str()); + } + + // Decrypt it if we need to + if (crypto_ctx_.data_decryptor != nullptr) { + PARQUET_THROW_NOT_OK(decryption_buffer_->Resize( + compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), false)); + compressed_len = crypto_ctx_.data_decryptor->Decrypt( + page_buffer->data(), compressed_len, decryption_buffer_->mutable_data()); + + page_buffer = decryption_buffer_; + } + + const PageType::type page_type = LoadEnumSafe(&current_page_header_.type); + + if (page_type == PageType::DICTIONARY_PAGE) { + crypto_ctx_.start_decrypt_with_dictionary_page = false; + const format::DictionaryPageHeader& dict_header = + current_page_header_.dictionary_page_header; + + bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; + if (dict_header.num_values < 0) { + throw ParquetException("Invalid page header (negative number of values)"); + } + + // Uncompress if needed + page_buffer = + DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len); + + return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values, + LoadEnumSafe(&dict_header.encoding), + is_sorted); + } else if (page_type == PageType::DATA_PAGE) { + ++page_ordinal_; + const format::DataPageHeader& header = current_page_header_.data_page_header; + + if (header.num_values < 0) { + throw ParquetException("Invalid page header (negative number of values)"); + } + EncodedStatistics page_statistics = ExtractStatsFromHeader(header); + seen_num_rows_ += header.num_values; + + // Uncompress if needed + page_buffer = + DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len); + + return std::make_shared<DataPageV1>(page_buffer, header.num_values, + LoadEnumSafe(&header.encoding), + LoadEnumSafe(&header.definition_level_encoding), + LoadEnumSafe(&header.repetition_level_encoding), + uncompressed_len, page_statistics); + } else if (page_type == PageType::DATA_PAGE_V2) { + ++page_ordinal_; + const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; + + if (header.num_values < 0) { + throw ParquetException("Invalid page header (negative number of values)"); + } + if (header.definition_levels_byte_length < 0 || + header.repetition_levels_byte_length < 0) { + throw ParquetException("Invalid page header (negative levels byte length)"); + } + bool is_compressed = header.__isset.is_compressed ? 
header.is_compressed : false; + EncodedStatistics page_statistics = ExtractStatsFromHeader(header); + seen_num_rows_ += header.num_values; + + // Uncompress if needed + int levels_byte_len; + if (AddWithOverflow(header.definition_levels_byte_length, + header.repetition_levels_byte_length, &levels_byte_len)) { + throw ParquetException("Levels size too large (corrupt file?)"); + } + // DecompressIfNeeded doesn't take `is_compressed` into account as + // it's page type-agnostic. + if (is_compressed) { + page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len, + uncompressed_len, levels_byte_len); + } + + return std::make_shared<DataPageV2>( + page_buffer, header.num_values, header.num_nulls, header.num_rows, + LoadEnumSafe(&header.encoding), header.definition_levels_byte_length, + header.repetition_levels_byte_length, uncompressed_len, is_compressed, + page_statistics); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return std::shared_ptr<Page>(nullptr); +} + +std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded( + std::shared_ptr<Buffer> page_buffer, int compressed_len, int uncompressed_len, + int levels_byte_len) { + if (decompressor_ == nullptr) { + return page_buffer; + } + if (compressed_len < levels_byte_len || uncompressed_len < levels_byte_len) { + throw ParquetException("Invalid page header"); + } + + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { + PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); + } + + if (levels_byte_len > 0) { + // First copy the levels as-is + uint8_t* decompressed = decompression_buffer_->mutable_data(); + memcpy(decompressed, page_buffer->data(), levels_byte_len); + } + + // Decompress the values + PARQUET_THROW_NOT_OK(decompressor_->Decompress( + compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len, + uncompressed_len - levels_byte_len, + decompression_buffer_->mutable_data() + levels_byte_len)); + + return decompression_buffer_; +} + +} // namespace + +std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream, + int64_t total_num_rows, + Compression::type codec, + ::arrow::MemoryPool* pool, + const CryptoContext* ctx) { + return std::unique_ptr<PageReader>( + new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx)); +} + +namespace { + +// ---------------------------------------------------------------------- +// Impl base class for TypedColumnReader and RecordReader + +// PLAIN_DICTIONARY is deprecated, but was historically used as a dictionary +// index encoding. 
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; +} + +template <typename DType> +class ColumnReaderImplBase { + public: + using T = typename DType::c_type; + + ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : descr_(descr), + max_def_level_(descr->max_definition_level()), + max_rep_level_(descr->max_repetition_level()), + num_buffered_values_(0), + num_decoded_values_(0), + pool_(pool), + current_decoder_(nullptr), + current_encoding_(Encoding::UNKNOWN) {} + + virtual ~ColumnReaderImplBase() = default; + + protected: + // Read up to batch_size values from the current data page into the + // pre-allocated memory T* + // + // @returns: the number of values read into the out buffer + int64_t ReadValues(int64_t batch_size, T* out) { + int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size)); + return num_decoded; + } + + // Read up to batch_size values from the current data page into the + // pre-allocated memory T*, leaving spaces for null entries according + // to the def_levels. + // + // @returns: the number of values read into the out buffer + int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, + uint8_t* valid_bits, int64_t valid_bits_offset) { + return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), + static_cast<int>(null_count), valid_bits, + valid_bits_offset); + } + + // Read multiple definition levels into preallocated memory + // + // Returns the number of decoded definition levels + int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { + if (max_def_level_ == 0) { + return 0; + } + return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels); + } + + bool HasNextInternal() { + // Either there is no data page available yet, or the data page has been + // exhausted + if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { + if (!ReadNewPage() || num_buffered_values_ == 0) { + return false; + } + } + return true; + } + + // Read multiple repetition levels into preallocated memory + // Returns the number of decoded repetition levels + int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { + if (max_rep_level_ == 0) { + return 0; + } + return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels); + } + + // Advance to the next data page + bool ReadNewPage() { + // Loop until we find the next data page. + while (true) { + current_page_ = pager_->NextPage(); + if (!current_page_) { + // EOS + return false; + } + + if (current_page_->type() == PageType::DICTIONARY_PAGE) { + ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get())); + continue; + } else if (current_page_->type() == PageType::DATA_PAGE) { + const auto page = std::static_pointer_cast<DataPageV1>(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + *page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast<DataPageV2>(current_page_); + int64_t levels_byte_size = InitializeLevelDecodersV2(*page); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. 
+ continue; + } + } + return true; + } + + void ConfigureDictionary(const DictionaryPage* page) { + int encoding = static_cast<int>(page->encoding()); + if (page->encoding() == Encoding::PLAIN_DICTIONARY || + page->encoding() == Encoding::PLAIN) { + encoding = static_cast<int>(Encoding::RLE_DICTIONARY); + } + + auto it = decoders_.find(encoding); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); + } + + if (page->encoding() == Encoding::PLAIN_DICTIONARY || + page->encoding() == Encoding::PLAIN) { + auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_); + dictionary->SetData(page->num_values(), page->data(), page->size()); + + // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // DictionaryPage buffer is no longer required after this step + // + // TODO(wesm): investigate whether this all-or-nothing decoding of the + // dictionary makes sense and whether performance can be improved + + std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_); + decoder->SetDict(dictionary.get()); + decoders_[encoding] = + std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release())); + } else { + ParquetException::NYI("only plain dictionary encoding has been implemented"); + } + + new_dictionary_ = true; + current_decoder_ = decoders_[encoding].get(); + DCHECK(current_decoder_); + } + + // Initialize repetition and definition level decoders on the next data page. + + // If the data page includes repetition and definition levels, we + // initialize the level decoders and return the number of encoded level bytes. + // The return value helps determine the number of bytes in the encoded data. + int64_t InitializeLevelDecoders(const DataPage& page, + Encoding::type repetition_level_encoding, + Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page.data(); + int32_t levels_byte_size = 0; + int32_t max_size = page.size(); + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. + // Init repetition levels + if (max_rep_level_ > 0) { + int32_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, max_rep_level_, + static_cast<int>(num_buffered_values_), buffer, max_size); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + max_size -= rep_levels_bytes; + } + // TODO figure a way to set max_def_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (max_def_level_ > 0) { + int32_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, max_def_level_, + static_cast<int>(num_buffered_values_), buffer, max_size); + levels_byte_size += def_levels_bytes; + max_size -= def_levels_bytes; + } + + return levels_byte_size; + } + + int64_t InitializeLevelDecodersV2(const DataPageV2& page) { + // Read a data page. 
+ num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + const uint8_t* buffer = page.data(); + + const int64_t total_levels_length = + static_cast<int64_t>(page.repetition_levels_byte_length()) + + page.definition_levels_byte_length(); + + if (total_levels_length > page.size()) { + throw ParquetException("Data page too small for levels (corrupt header?)"); + } + + if (max_rep_level_ > 0) { + repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(), + max_rep_level_, + static_cast<int>(num_buffered_values_), buffer); + buffer += page.repetition_levels_byte_length(); + } + + if (max_def_level_ > 0) { + definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(), + max_def_level_, + static_cast<int>(num_buffered_values_), buffer); + } + + return total_levels_length; + } + + // Get a decoder object for this page or create a new decoder if this is the + // first page with this encoding. + void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) { + const uint8_t* buffer = page.data() + levels_byte_size; + const int64_t data_size = page.size() - levels_byte_size; + + if (data_size < 0) { + throw ParquetException("Page smaller than size of encoded levels"); + } + + Encoding::type encoding = page.encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast<int>(encoding)); + if (it != decoders_.end()) { + DCHECK(it->second.get() != nullptr); + if (encoding == Encoding::RLE_DICTIONARY) { + DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); + } + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast<int>(encoding)] = std::move(decoder); + break; + } + case Encoding::BYTE_STREAM_SPLIT: { + auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast<int>(encoding)] = std::move(decoder); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: { + auto decoder = MakeTypedDecoder<DType>(Encoding::DELTA_BINARY_PACKED, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast<int>(encoding)] = std::move(decoder); + break; + } + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_encoding_ = encoding; + current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer, + static_cast<int>(data_size)); + } + + const ColumnDescriptor* descr_; + const int16_t max_def_level_; + const int16_t max_rep_level_; + + std::unique_ptr<PageReader> pager_; + std::shared_ptr<Page> current_page_; + + // Not set if full schema for this field has no optional or repeated elements + LevelDecoder definition_level_decoder_; + + // Not set for flat schemas. + LevelDecoder repetition_level_decoder_; + + // The total number of values stored in the data page. This is the maximum of + // the number of encoded definition levels or encoded values. For + // non-repeated, required columns, this is equal to the number of encoded + // values. 
For repeated or optional values, there may be fewer data values + // than levels, and this tells you how many encoded levels there are in that + // case. + int64_t num_buffered_values_; + + // The number of values from the current data page that have been decoded + // into memory + int64_t num_decoded_values_; + + ::arrow::MemoryPool* pool_; + + using DecoderType = TypedDecoder<DType>; + DecoderType* current_decoder_; + Encoding::type current_encoding_; + + /// Flag to signal when a new dictionary has been set, for the benefit of + /// DictionaryRecordReader + bool new_dictionary_; + + // The exposed encoding + ExposedEncoding exposed_encoding_ = ExposedEncoding::NO_ENCODING; + + // Map of encoding type to the respective decoder object. For example, a + // column chunk's data pages may include both dictionary-encoded and + // plain-encoded data. + std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_; + + void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } +}; + +// ---------------------------------------------------------------------- +// TypedColumnReader implementations + +template <typename DType> +class TypedColumnReaderImpl : public TypedColumnReader<DType>, + public ColumnReaderImplBase<DType> { + public: + using T = typename DType::c_type; + + TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, + ::arrow::MemoryPool* pool) + : ColumnReaderImplBase<DType>(descr, pool) { + this->pager_ = std::move(pager); + } + + bool HasNext() override { return this->HasNextInternal(); } + + int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, + T* values, int64_t* values_read) override; + + int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, + T* values, uint8_t* valid_bits, int64_t valid_bits_offset, + int64_t* levels_read, int64_t* values_read, + int64_t* null_count) override; + + int64_t Skip(int64_t num_rows_to_skip) override; + + Type::type type() const override { return this->descr_->physical_type(); } + + const ColumnDescriptor* descr() const override { return this->descr_; } + + ExposedEncoding GetExposedEncoding() override { return this->exposed_encoding_; }; + + int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels, + int16_t* rep_levels, int32_t* indices, + int64_t* indices_read, const T** dict, + int32_t* dict_len) override; + + protected: + void SetExposedEncoding(ExposedEncoding encoding) override { + this->exposed_encoding_ = encoding; + } + + private: + // Read dictionary indices. Similar to ReadValues but decode data to dictionary indices. + // This function is called only by ReadBatchWithDictionary(). + int64_t ReadDictionaryIndices(int64_t indices_to_read, int32_t* indices) { + auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_); + return decoder->DecodeIndices(static_cast<int>(indices_to_read), indices); + } + + // Get dictionary. The dictionary should have been set by SetDict(). The dictionary is + // owned by the internal decoder and is destroyed when the reader is destroyed. This + // function is called only by ReadBatchWithDictionary() after dictionary is configured. + void GetDictionary(const T** dictionary, int32_t* dictionary_length) { + auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_); + decoder->GetDictionary(dictionary, dictionary_length); + } + + // Read definition and repetition levels. Also return the number of definition levels + // and number of values to read. 
This function is called before reading values. + void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, + int64_t* num_def_levels, int64_t* values_to_read) { + batch_size = + std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + + // If the field is required and non-repeated, there are no definition levels + if (this->max_def_level_ > 0 && def_levels != nullptr) { + *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + // TODO(wesm): this tallying of values-to-decode can be performed with better + // cache-efficiency if fused with the level decoding. + for (int64_t i = 0; i < *num_def_levels; ++i) { + if (def_levels[i] == this->max_def_level_) { + ++(*values_to_read); + } + } + } else { + // Required field, read all values + *values_to_read = batch_size; + } + + // Not present for non-repeated fields + if (this->max_rep_level_ > 0 && rep_levels != nullptr) { + int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); + if (def_levels != nullptr && *num_def_levels != num_rep_levels) { + throw ParquetException("Number of decoded rep / def levels did not match"); + } + } + } +}; + +template <typename DType> +int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( + int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int32_t* indices, + int64_t* indices_read, const T** dict, int32_t* dict_len) { + bool has_dict_output = dict != nullptr && dict_len != nullptr; + // Similar logic as ReadValues to get pages. + if (!HasNext()) { + *indices_read = 0; + if (has_dict_output) { + *dict = nullptr; + *dict_len = 0; + } + return 0; + } + + // Verify the current data page is dictionary encoded. + if (this->current_encoding_ != Encoding::RLE_DICTIONARY) { + std::stringstream ss; + ss << "Data page is not dictionary encoded. Encoding: " + << EncodingToString(this->current_encoding_); + throw ParquetException(ss.str()); + } + + // Get dictionary pointer and length. + if (has_dict_output) { + GetDictionary(dict, dict_len); + } + + // Similar logic as ReadValues to get def levels and rep levels. + int64_t num_def_levels = 0; + int64_t indices_to_read = 0; + ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &indices_to_read); + + // Read dictionary indices. 
+ *indices_read = ReadDictionaryIndices(indices_to_read, indices); + int64_t total_indices = std::max(num_def_levels, *indices_read); + this->ConsumeBufferedValues(total_indices); + + return total_indices; +} + +template <typename DType> +int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels, + int16_t* rep_levels, T* values, + int64_t* values_read) { + // HasNext invokes ReadNewPage + if (!HasNext()) { + *values_read = 0; + return 0; + } + + // TODO(wesm): keep reading data pages until batch_size is reached, or the + // row group is finished + int64_t num_def_levels = 0; + int64_t values_to_read = 0; + ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); + + *values_read = this->ReadValues(values_to_read, values); + int64_t total_values = std::max(num_def_levels, *values_read); + this->ConsumeBufferedValues(total_values); + + return total_values; +} + +template <typename DType> +int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced( + int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, + uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, + int64_t* values_read, int64_t* null_count_out) { + // HasNext invokes ReadNewPage + if (!HasNext()) { + *levels_read = 0; + *values_read = 0; + *null_count_out = 0; + return 0; + } + + int64_t total_values; + // TODO(wesm): keep reading data pages until batch_size is reached, or the + // row group is finished + batch_size = + std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + + // If the field is required and non-repeated, there are no definition levels + if (this->max_def_level_ > 0) { + int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + + // Not present for non-repeated fields + if (this->max_rep_level_ > 0) { + int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); + if (num_def_levels != num_rep_levels) { + throw ParquetException("Number of decoded rep / def levels did not match"); + } + } + + const bool has_spaced_values = HasSpacedValues(this->descr_); + int64_t null_count = 0; + if (!has_spaced_values) { + int values_to_read = 0; + for (int64_t i = 0; i < num_def_levels; ++i) { + if (def_levels[i] == this->max_def_level_) { + ++values_to_read; + } + } + total_values = this->ReadValues(values_to_read, values); + ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset, + /*length=*/total_values, + /*bits_are_set=*/true); + *values_read = total_values; + } else { + internal::LevelInfo info; + info.repeated_ancestor_def_level = this->max_def_level_ - 1; + info.def_level = this->max_def_level_; + info.rep_level = this->max_rep_level_; + internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = num_def_levels; + validity_io.valid_bits = valid_bits; + validity_io.valid_bits_offset = valid_bits_offset; + validity_io.null_count = null_count; + validity_io.values_read = *values_read; + + internal::DefLevelsToBitmap(def_levels, num_def_levels, info, &validity_io); + null_count = validity_io.null_count; + *values_read = validity_io.values_read; + + total_values = + this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count), + valid_bits, valid_bits_offset); + } + *levels_read = num_def_levels; + *null_count_out = null_count; + + } else { + // Required field, read all values + total_values = this->ReadValues(batch_size, values); + ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset, + /*length=*/total_values, + 
/*bits_are_set=*/true); + *null_count_out = 0; + *values_read = total_values; + *levels_read = total_values; + } + + this->ConsumeBufferedValues(*levels_read); + return total_values; +} + +template <typename DType> +int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) { + int64_t rows_to_skip = num_rows_to_skip; + while (HasNext() && rows_to_skip > 0) { + // If the number of rows to skip is more than the number of undecoded values, skip the + // Page. + if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { + rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; + this->num_decoded_values_ = this->num_buffered_values_; + } else { + // We need to read this Page + // Jump to the right offset in the Page + int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint + int64_t values_read = 0; + + // This will be enough scratch space to accommodate 16-bit levels or any + // value type + int value_size = type_traits<DType::type_num>::value_byte_size; + std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer( + this->pool_, batch_size * std::max<int>(sizeof(int16_t), value_size)); + + do { + batch_size = std::min(batch_size, rows_to_skip); + values_read = + ReadBatch(static_cast<int>(batch_size), + reinterpret_cast<int16_t*>(scratch->mutable_data()), + reinterpret_cast<int16_t*>(scratch->mutable_data()), + reinterpret_cast<T*>(scratch->mutable_data()), &values_read); + rows_to_skip -= values_read; + } while (values_read > 0 && rows_to_skip > 0); + } + } + return num_rows_to_skip - rows_to_skip; +} + +} // namespace + +// ---------------------------------------------------------------------- +// Dynamic column reader constructor + +std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, + std::unique_ptr<PageReader> pager, + MemoryPool* pool) { + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager), + pool); + case Type::INT32: + return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager), + pool); + case Type::INT64: + return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager), + pool); + case Type::INT96: + return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager), + pool); + case Type::FLOAT: + return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager), + pool); + case Type::DOUBLE: + return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager), + pool); + case Type::BYTE_ARRAY: + return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>( + descr, std::move(pager), pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager), + pool); + default: + ParquetException::NYI("type reader not implemented"); + } + // Unreachable code, but suppress compiler warning + return std::shared_ptr<ColumnReader>(nullptr); +} + +// ---------------------------------------------------------------------- +// RecordReader + +namespace internal { +namespace { + +// The minimum number of repetition/definition levels to decode at a time, for +// better vectorized performance when doing many smaller record reads +constexpr int64_t kMinLevelBatchSize = 1024; + +template <typename DType> +class TypedRecordReader : public ColumnReaderImplBase<DType>, + virtual public RecordReader { + public: + using T = typename DType::c_type; + using BASE = 
ColumnReaderImplBase<DType>; + TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool) + : BASE(descr, pool) { + leaf_info_ = leaf_info; + nullable_values_ = leaf_info.HasNullableValues(); + at_record_start_ = true; + records_read_ = 0; + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + levels_written_ = 0; + levels_position_ = 0; + levels_capacity_ = 0; + uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY); + + if (uses_values_) { + values_ = AllocateBuffer(pool); + } + valid_bits_ = AllocateBuffer(pool); + def_levels_ = AllocateBuffer(pool); + rep_levels_ = AllocateBuffer(pool); + Reset(); + } + + int64_t available_values_current_page() const { + return this->num_buffered_values_ - this->num_decoded_values_; + } + + // Compute the values capacity in bytes for the given number of elements + int64_t bytes_for_values(int64_t nitems) const { + int64_t type_size = GetTypeByteSize(this->descr_->physical_type()); + int64_t bytes_for_values = -1; + if (MultiplyWithOverflow(nitems, type_size, &bytes_for_values)) { + throw ParquetException("Total size of items too large"); + } + return bytes_for_values; + } + + int64_t ReadRecords(int64_t num_records) override { + // Delimit records, then read values at the end + int64_t records_read = 0; + + if (levels_position_ < levels_written_) { + records_read += ReadRecordData(num_records); + } + + int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); + + // If we are in the middle of a record, we continue until reaching the + // desired number of records or the end of the current record if we've found + // enough records + while (!at_record_start_ || records_read < num_records) { + // Is there more data to read in this row group? + if (!this->HasNextInternal()) { + if (!at_record_start_) { + // We ended the row group while inside a record that we haven't seen + // the end of yet. 
So increment the record count for the last record in + // the row group + ++records_read; + at_record_start_ = true; + } + break; + } + + /// We perform multiple batch reads until we either exhaust the row group + /// or observe the desired number of records + int64_t batch_size = std::min(level_batch_size, available_values_current_page()); + + // No more data in column + if (batch_size == 0) { + break; + } + + if (this->max_def_level_ > 0) { + ReserveLevels(batch_size); + + int16_t* def_levels = this->def_levels() + levels_written_; + int16_t* rep_levels = this->rep_levels() + levels_written_; + + // Not present for non-repeated fields + int64_t levels_read = 0; + if (this->max_rep_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + throw ParquetException("Number of decoded rep / def levels did not match"); + } + } else if (this->max_def_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + } + + // Exhausted column chunk + if (levels_read == 0) { + break; + } + + levels_written_ += levels_read; + records_read += ReadRecordData(num_records - records_read); + } else { + // No repetition or definition levels + batch_size = std::min(num_records - records_read, batch_size); + records_read += ReadRecordData(batch_size); + } + } + + return records_read; + } + + // We may outwardly have the appearance of having exhausted a column chunk + // when in fact we are in the middle of processing the last batch + bool has_values_to_process() const { return levels_position_ < levels_written_; } + + std::shared_ptr<ResizableBuffer> ReleaseValues() override { + if (uses_values_) { + auto result = values_; + PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); + values_ = AllocateBuffer(this->pool_); + values_capacity_ = 0; + return result; + } else { + return nullptr; + } + } + + std::shared_ptr<ResizableBuffer> ReleaseIsValid() override { + if (leaf_info_.HasNullableValues()) { + auto result = valid_bits_; + PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true)); + valid_bits_ = AllocateBuffer(this->pool_); + return result; + } else { + return nullptr; + } + } + + // Process written repetition/definition levels to reach the end of + // records. Process no more levels than necessary to delimit the indicated + // number of logical records. Updates internal state of RecordReader + // + // \return Number of records delimited + int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { + int64_t values_to_read = 0; + int64_t records_read = 0; + + const int16_t* def_levels = this->def_levels() + levels_position_; + const int16_t* rep_levels = this->rep_levels() + levels_position_; + + DCHECK_GT(this->max_rep_level_, 0); + + // Count logical records and number of values to read + while (levels_position_ < levels_written_) { + const int16_t rep_level = *rep_levels++; + if (rep_level == 0) { + // If at_record_start_ is true, we are seeing the start of a record + // for the second time, such as after repeated calls to + // DelimitRecords. In this case we must continue until we find + // another record start or exhausting the ColumnChunk + if (!at_record_start_) { + // We've reached the end of a record; increment the record count. + ++records_read; + if (records_read == num_records) { + // We've found the number of records we were looking for. 
Set + // at_record_start_ to true and break + at_record_start_ = true; + break; + } + } + } + // We have decided to consume the level at this position; therefore we + // must advance until we find another record boundary + at_record_start_ = false; + + const int16_t def_level = *def_levels++; + if (def_level == this->max_def_level_) { + ++values_to_read; + } + ++levels_position_; + } + *values_seen = values_to_read; + return records_read; + } + + void Reserve(int64_t capacity) override { + ReserveLevels(capacity); + ReserveValues(capacity); + } + + int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) { + if (extra_size < 0) { + throw ParquetException("Negative size (corrupt file?)"); + } + int64_t target_size = -1; + if (AddWithOverflow(size, extra_size, &target_size)) { + throw ParquetException("Allocation size too large (corrupt file?)"); + } + if (target_size >= (1LL << 62)) { + throw ParquetException("Allocation size too large (corrupt file?)"); + } + if (capacity >= target_size) { + return capacity; + } + return BitUtil::NextPower2(target_size); + } + + void ReserveLevels(int64_t extra_levels) { + if (this->max_def_level_ > 0) { + const int64_t new_levels_capacity = + UpdateCapacity(levels_capacity_, levels_written_, extra_levels); + if (new_levels_capacity > levels_capacity_) { + constexpr auto kItemSize = static_cast<int64_t>(sizeof(int16_t)); + int64_t capacity_in_bytes = -1; + if (MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) { + throw ParquetException("Allocation size too large (corrupt file?)"); + } + PARQUET_THROW_NOT_OK(def_levels_->Resize(capacity_in_bytes, false)); + if (this->max_rep_level_ > 0) { + PARQUET_THROW_NOT_OK(rep_levels_->Resize(capacity_in_bytes, false)); + } + levels_capacity_ = new_levels_capacity; + } + } + } + + void ReserveValues(int64_t extra_values) { + const int64_t new_values_capacity = + UpdateCapacity(values_capacity_, values_written_, extra_values); + if (new_values_capacity > values_capacity_) { + // XXX(wesm): A hack to avoid memory allocation when reading directly + // into builder classes + if (uses_values_) { + PARQUET_THROW_NOT_OK( + values_->Resize(bytes_for_values(new_values_capacity), false)); + } + values_capacity_ = new_values_capacity; + } + if (leaf_info_.HasNullableValues()) { + int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); + if (valid_bits_->size() < valid_bytes_new) { + int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + + // Avoid valgrind warnings + memset(valid_bits_->mutable_data() + valid_bytes_old, 0, + valid_bytes_new - valid_bytes_old); + } + } + } + + void Reset() override { + ResetValues(); + + if (levels_written_ > 0) { + const int64_t levels_remaining = levels_written_ - levels_position_; + // Shift remaining levels to beginning of buffer and trim to only the number + // of decoded levels remaining + int16_t* def_data = def_levels(); + int16_t* rep_data = rep_levels(); + + std::copy(def_data + levels_position_, def_data + levels_written_, def_data); + PARQUET_THROW_NOT_OK( + def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); + + if (this->max_rep_level_ > 0) { + std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); + PARQUET_THROW_NOT_OK( + rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); + } + + levels_written_ -= levels_position_; + levels_position_ = 0; + levels_capacity_ = levels_remaining; + } + + records_read_ 
= 0; + + // Call Finish on the binary builders to reset them + } + + void SetPageReader(std::unique_ptr<PageReader> reader) override { + at_record_start_ = true; + this->pager_ = std::move(reader); + ResetDecoders(); + } + + bool HasMoreData() const override { return this->pager_ != nullptr; } + + // Dictionary decoders must be reset when advancing row groups + void ResetDecoders() { this->decoders_.clear(); } + + virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { + uint8_t* valid_bits = valid_bits_->mutable_data(); + const int64_t valid_bits_offset = values_written_; + + int64_t num_decoded = this->current_decoder_->DecodeSpaced( + ValuesHead<T>(), static_cast<int>(values_with_nulls), + static_cast<int>(null_count), valid_bits, valid_bits_offset); + DCHECK_EQ(num_decoded, values_with_nulls); + } + + virtual void ReadValuesDense(int64_t values_to_read) { + int64_t num_decoded = + this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read)); + DCHECK_EQ(num_decoded, values_to_read); + } + + // Return number of logical records read + int64_t ReadRecordData(int64_t num_records) { + // Conservative upper bound + const int64_t possible_num_values = + std::max(num_records, levels_written_ - levels_position_); + ReserveValues(possible_num_values); + + const int64_t start_levels_position = levels_position_; + + int64_t values_to_read = 0; + int64_t records_read = 0; + if (this->max_rep_level_ > 0) { + records_read = DelimitRecords(num_records, &values_to_read); + } else if (this->max_def_level_ > 0) { + // No repetition levels, skip delimiting logic. Each level represents a + // null or not null entry + records_read = std::min(levels_written_ - levels_position_, num_records); + + // This is advanced by DelimitRecords, which we skipped + levels_position_ += records_read; + } else { + records_read = values_to_read = num_records; + } + + int64_t null_count = 0; + if (leaf_info_.HasNullableValues()) { + ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = levels_position_ - start_levels_position; + validity_io.valid_bits = valid_bits_->mutable_data(); + validity_io.valid_bits_offset = values_written_; + + DefLevelsToBitmap(def_levels() + start_levels_position, + levels_position_ - start_levels_position, leaf_info_, + &validity_io); + values_to_read = validity_io.values_read - validity_io.null_count; + null_count = validity_io.null_count; + DCHECK_GE(values_to_read, 0); + ReadValuesSpaced(validity_io.values_read, null_count); + } else { + DCHECK_GE(values_to_read, 0); + ReadValuesDense(values_to_read); + } + if (this->leaf_info_.def_level > 0) { + // Optional, repeated, or some mix thereof + this->ConsumeBufferedValues(levels_position_ - start_levels_position); + } else { + // Flat, non-repeated + this->ConsumeBufferedValues(values_to_read); + } + // Total values, including null spaces, if any + values_written_ += values_to_read + null_count; + null_count_ += null_count; + + return records_read; + } + + void DebugPrintState() override { + const int16_t* def_levels = this->def_levels(); + const int16_t* rep_levels = this->rep_levels(); + const int64_t total_levels_read = levels_position_; + + const T* vals = reinterpret_cast<const T*>(this->values()); + + std::cout << "def levels: "; + for (int64_t i = 0; i < total_levels_read; ++i) { + std::cout << def_levels[i] << " "; + } + std::cout << std::endl; + + std::cout << "rep levels: "; + for (int64_t i = 0; i < total_levels_read; ++i) { + std::cout << rep_levels[i] << " "; + } + 
std::cout << std::endl; + + std::cout << "values: "; + for (int64_t i = 0; i < this->values_written(); ++i) { + std::cout << vals[i] << " "; + } + std::cout << std::endl; + } + + void ResetValues() { + if (values_written_ > 0) { + // Resize to 0, but do not shrink to fit + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + } + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + } + } + + protected: + template <typename T> + T* ValuesHead() { + return reinterpret_cast<T*>(values_->mutable_data()) + values_written_; + } + LevelInfo leaf_info_; +}; + +class FLBARecordReader : public TypedRecordReader<FLBAType>, + virtual public BinaryRecordReader { + public: + FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader<FLBAType>(descr, leaf_info, pool), builder_(nullptr) { + DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); + int byte_width = descr_->type_length(); + std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); + builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_)); + } + + ::arrow::ArrayVector GetBuilderChunks() override { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); + return ::arrow::ArrayVector({chunk}); + } + + void ReadValuesDense(int64_t values_to_read) override { + auto values = ValuesHead<FLBA>(); + int64_t num_decoded = + this->current_decoder_->Decode(values, static_cast<int>(values_to_read)); + DCHECK_EQ(num_decoded, values_to_read); + + for (int64_t i = 0; i < num_decoded; i++) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + uint8_t* valid_bits = valid_bits_->mutable_data(); + const int64_t valid_bits_offset = values_written_; + auto values = ValuesHead<FLBA>(); + + int64_t num_decoded = this->current_decoder_->DecodeSpaced( + values, static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits, valid_bits_offset); + DCHECK_EQ(num_decoded, values_to_read); + + for (int64_t i = 0; i < num_decoded; i++) { + if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } else { + PARQUET_THROW_NOT_OK(builder_->AppendNull()); + } + } + ResetValues(); + } + + private: + std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; +}; + +class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>, + virtual public BinaryRecordReader { + public: + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) { + DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); + accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); + } + + ::arrow::ArrayVector GetBuilderChunks() override { + ::arrow::ArrayVector result = accumulator_.chunks; + if (result.size() == 0 || accumulator_.builder->length() > 0) { + std::shared_ptr<::arrow::Array> last_chunk; + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + result.push_back(std::move(last_chunk)); + } + accumulator_.chunks = {}; + return result; + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( + static_cast<int>(values_to_read), &accumulator_); + DCHECK_EQ(num_decoded, 
values_to_read); + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = this->current_decoder_->DecodeArrow( + static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits_->mutable_data(), values_written_, &accumulator_); + DCHECK_EQ(num_decoded, values_to_read - null_count); + ResetValues(); + } + + private: + // Helper data structure for accumulating builder chunks + typename EncodingTraits<ByteArrayType>::Accumulator accumulator_; +}; + +class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>, + virtual public DictionaryRecordReader { + public: + ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool), builder_(pool) { + this->read_dictionary_ = true; + } + + std::shared_ptr<::arrow::ChunkedArray> GetResult() override { + FlushBuilder(); + std::vector<std::shared_ptr<::arrow::Array>> result; + std::swap(result, result_chunks_); + return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type()); + } + + void FlushBuilder() { + if (builder_.length() > 0) { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_.Finish(&chunk)); + result_chunks_.emplace_back(std::move(chunk)); + + // Also clears the dictionary memo table + builder_.Reset(); + } + } + + void MaybeWriteNewDictionary() { + if (this->new_dictionary_) { + /// If there is a new dictionary, we may need to flush the builder, then + /// insert the new dictionary values + FlushBuilder(); + builder_.ResetFull(); + auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); + decoder->InsertDictionary(&builder_); + this->new_dictionary_ = false; + } + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = 0; + if (current_encoding_ == Encoding::RLE_DICTIONARY) { + MaybeWriteNewDictionary(); + auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); + num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_); + } else { + num_decoded = this->current_decoder_->DecodeArrowNonNull( + static_cast<int>(values_to_read), &builder_); + + /// Flush values since they have been copied into the builder + ResetValues(); + } + DCHECK_EQ(num_decoded, values_to_read); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = 0; + if (current_encoding_ == Encoding::RLE_DICTIONARY) { + MaybeWriteNewDictionary(); + auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); + num_decoded = decoder->DecodeIndicesSpaced( + static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits_->mutable_data(), values_written_, &builder_); + } else { + num_decoded = this->current_decoder_->DecodeArrow( + static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits_->mutable_data(), values_written_, &builder_); + + /// Flush values since they have been copied into the builder + ResetValues(); + } + DCHECK_EQ(num_decoded, values_to_read - null_count); + } + + private: + using BinaryDictDecoder = DictDecoder<ByteArrayType>; + + ::arrow::BinaryDictionary32Builder builder_; + std::vector<std::shared_ptr<::arrow::Array>> result_chunks_; +}; + +// TODO(wesm): Implement these to some satisfaction +template <> +void TypedRecordReader<Int96Type>::DebugPrintState() {} + +template <> +void TypedRecordReader<ByteArrayType>::DebugPrintState() 
{} + +template <> +void TypedRecordReader<FLBAType>::DebugPrintState() {} + +std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr, + LevelInfo leaf_info, + ::arrow::MemoryPool* pool, + bool read_dictionary) { + if (read_dictionary) { + return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool); + } else { + return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info, pool); + } +} + +} // namespace + +std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, + LevelInfo leaf_info, MemoryPool* pool, + const bool read_dictionary) { + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_shared<TypedRecordReader<BooleanType>>(descr, leaf_info, pool); + case Type::INT32: + return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info, pool); + case Type::INT64: + return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info, pool); + case Type::INT96: + return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info, pool); + case Type::FLOAT: + return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info, pool); + case Type::DOUBLE: + return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool); + case Type::BYTE_ARRAY: + return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared<FLBARecordReader>(descr, leaf_info, pool); + default: { + // PARQUET-1481: This can occur if the file is corrupt + std::stringstream ss; + ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type()); + throw ParquetException(ss.str()); + } + } + // Unreachable code, but suppress compiler warning + return nullptr; +} + +} // namespace internal +} // namespace parquet
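Similarly, a hedged sketch of the record-oriented path exposed by RecordReader::Make above. The LevelInfo values are assumptions for a flat, nullable leaf column (one optional level, no repetition), not something taken from the patch itself:

// Sketch only, not part of the patch.
#include <cstdint>
#include <memory>
#include "arrow/memory_pool.h"
#include "parquet/column_reader.h"

void ReadNullableLeaf(const parquet::ColumnDescriptor* descr,
                      std::unique_ptr<parquet::PageReader> pager) {
  parquet::internal::LevelInfo leaf_info;
  leaf_info.def_level = 1;  // hypothetical: a single optional level
  leaf_info.rep_level = 0;  // flat (non-repeated) column
  leaf_info.repeated_ancestor_def_level = 0;

  std::shared_ptr<parquet::internal::RecordReader> reader =
      parquet::internal::RecordReader::Make(descr, leaf_info,
                                            ::arrow::default_memory_pool());
  reader->SetPageReader(std::move(pager));

  int64_t records = 0;
  do {
    // Delimits up to 4096 logical records, buffering levels and values
    // (ReadRecordData / DelimitRecords above do the actual work).
    records = reader->ReadRecords(4096);
    // reader->values(), reader->def_levels() and reader->null_count()
    // expose the buffered batch; Reset() recycles buffers for the next call.
    reader->Reset();
  } while (records > 0);
}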