diff options
Diffstat (limited to 'src/arrow/cpp/src/parquet/metadata.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/metadata.cc | 1797 |
1 files changed, 1797 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/parquet/metadata.cc b/src/arrow/cpp/src/parquet/metadata.cc new file mode 100644 index 000000000..0f99530dd --- /dev/null +++ b/src/arrow/cpp/src/parquet/metadata.cc @@ -0,0 +1,1797 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/metadata.h" + +#include <algorithm> +#include <cinttypes> +#include <ostream> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/io/memory.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" +#include "parquet/encryption/encryption_internal.h" +#include "parquet/encryption/internal_file_decryptor.h" +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/schema_internal.h" +#include "parquet/statistics.h" +#include "parquet/thrift_internal.h" + +namespace parquet { + +const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() { + static ApplicationVersion version("parquet-mr", 1, 8, 0); + return version; +} + +const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() { + static ApplicationVersion version("parquet-mr", 1, 2, 9); + return version; +} + +const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() { + static ApplicationVersion version("parquet-cpp", 1, 3, 0); + return version; +} + +const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() { + static ApplicationVersion version("parquet-mr", 1, 10, 0); + return version; +} + +std::string ParquetVersionToString(ParquetVersion::type ver) { + switch (ver) { + case ParquetVersion::PARQUET_1_0: + return "1.0"; + ARROW_SUPPRESS_DEPRECATION_WARNING + case ParquetVersion::PARQUET_2_0: + return "pseudo-2.0"; + ARROW_UNSUPPRESS_DEPRECATION_WARNING + case ParquetVersion::PARQUET_2_4: + return "2.4"; + case ParquetVersion::PARQUET_2_6: + return "2.6"; + } + + // This should be unreachable + return "UNKNOWN"; +} + +template <typename DType> +static std::shared_ptr<Statistics> MakeTypedColumnStats( + const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { + // If ColumnOrder is defined, return max_value and min_value + if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) { + return MakeStatistics<DType>( + descr, metadata.statistics.min_value, metadata.statistics.max_value, + metadata.num_values - metadata.statistics.null_count, + metadata.statistics.null_count, metadata.statistics.distinct_count, + metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value, + metadata.statistics.__isset.null_count, + metadata.statistics.__isset.distinct_count); + } + // Default behavior + return MakeStatistics<DType>( + descr, metadata.statistics.min, metadata.statistics.max, + metadata.num_values - metadata.statistics.null_count, + metadata.statistics.null_count, metadata.statistics.distinct_count, + metadata.statistics.__isset.max || metadata.statistics.__isset.min, + metadata.statistics.__isset.null_count, metadata.statistics.__isset.distinct_count); +} + +std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data, + const ColumnDescriptor* descr) { + switch (static_cast<Type::type>(meta_data.type)) { + case Type::BOOLEAN: + return MakeTypedColumnStats<BooleanType>(meta_data, descr); + case Type::INT32: + return MakeTypedColumnStats<Int32Type>(meta_data, descr); + case Type::INT64: + return MakeTypedColumnStats<Int64Type>(meta_data, descr); + case Type::INT96: + return MakeTypedColumnStats<Int96Type>(meta_data, descr); + case Type::DOUBLE: + return MakeTypedColumnStats<DoubleType>(meta_data, descr); + case Type::FLOAT: + return MakeTypedColumnStats<FloatType>(meta_data, descr); + case Type::BYTE_ARRAY: + return MakeTypedColumnStats<ByteArrayType>(meta_data, descr); + case Type::FIXED_LEN_BYTE_ARRAY: + return MakeTypedColumnStats<FLBAType>(meta_data, descr); + case Type::UNDEFINED: + break; + } + throw ParquetException("Can't decode page statistics for selected column type"); +} + +// MetaData Accessor + +// ColumnCryptoMetaData +class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl { + public: + explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata) + : crypto_metadata_(crypto_metadata) {} + + bool encrypted_with_footer_key() const { + return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY; + } + bool encrypted_with_column_key() const { + return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY; + } + std::shared_ptr<schema::ColumnPath> path_in_schema() const { + return std::make_shared<schema::ColumnPath>( + crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema); + } + const std::string& key_metadata() const { + return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.key_metadata; + } + + private: + const format::ColumnCryptoMetaData* crypto_metadata_; +}; + +std::unique_ptr<ColumnCryptoMetaData> ColumnCryptoMetaData::Make( + const uint8_t* metadata) { + return std::unique_ptr<ColumnCryptoMetaData>(new ColumnCryptoMetaData(metadata)); +} + +ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata) + : impl_(new ColumnCryptoMetaDataImpl( + reinterpret_cast<const format::ColumnCryptoMetaData*>(metadata))) {} + +ColumnCryptoMetaData::~ColumnCryptoMetaData() = default; + +std::shared_ptr<schema::ColumnPath> ColumnCryptoMetaData::path_in_schema() const { + return impl_->path_in_schema(); +} +bool ColumnCryptoMetaData::encrypted_with_footer_key() const { + return impl_->encrypted_with_footer_key(); +} +const std::string& ColumnCryptoMetaData::key_metadata() const { + return impl_->key_metadata(); +} + +// ColumnChunk metadata +class ColumnChunkMetaData::ColumnChunkMetaDataImpl { + public: + explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column, + const ColumnDescriptor* descr, + int16_t row_group_ordinal, int16_t column_ordinal, + const ApplicationVersion* writer_version, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : column_(column), descr_(descr), writer_version_(writer_version) { + column_metadata_ = &column->meta_data; + if (column->__isset.crypto_metadata) { // column metadata is encrypted + format::ColumnCryptoMetaData ccmd = column->crypto_metadata; + + if (ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY) { + if (file_decryptor != nullptr && file_decryptor->properties() != nullptr) { + // should decrypt metadata + std::shared_ptr<schema::ColumnPath> path = std::make_shared<schema::ColumnPath>( + ccmd.ENCRYPTION_WITH_COLUMN_KEY.path_in_schema); + std::string key_metadata = ccmd.ENCRYPTION_WITH_COLUMN_KEY.key_metadata; + + std::string aad_column_metadata = encryption::CreateModuleAad( + file_decryptor->file_aad(), encryption::kColumnMetaData, row_group_ordinal, + column_ordinal, static_cast<int16_t>(-1)); + auto decryptor = file_decryptor->GetColumnMetaDecryptor( + path->ToDotString(), key_metadata, aad_column_metadata); + auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size()); + DeserializeThriftMsg( + reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()), + &len, &decrypted_metadata_, decryptor); + column_metadata_ = &decrypted_metadata_; + } else { + throw ParquetException( + "Cannot decrypt ColumnMetadata." + " FileDecryption is not setup correctly"); + } + } + } + for (const auto& encoding : column_metadata_->encodings) { + encodings_.push_back(LoadEnumSafe(&encoding)); + } + for (const auto& encoding_stats : column_metadata_->encoding_stats) { + encoding_stats_.push_back({LoadEnumSafe(&encoding_stats.page_type), + LoadEnumSafe(&encoding_stats.encoding), + encoding_stats.count}); + } + possible_stats_ = nullptr; + } + + bool Equals(const ColumnChunkMetaDataImpl& other) const { + return *column_metadata_ == *other.column_metadata_; + } + + // column chunk + inline int64_t file_offset() const { return column_->file_offset; } + inline const std::string& file_path() const { return column_->file_path; } + + inline Type::type type() const { return LoadEnumSafe(&column_metadata_->type); } + + inline int64_t num_values() const { return column_metadata_->num_values; } + + std::shared_ptr<schema::ColumnPath> path_in_schema() { + return std::make_shared<schema::ColumnPath>(column_metadata_->path_in_schema); + } + + // Check if statistics are set and are valid + // 1) Must be set in the metadata + // 2) Statistics must not be corrupted + inline bool is_stats_set() const { + DCHECK(writer_version_ != nullptr); + // If the column statistics don't exist or column sort order is unknown + // we cannot use the column stats + if (!column_metadata_->__isset.statistics || + descr_->sort_order() == SortOrder::UNKNOWN) { + return false; + } + if (possible_stats_ == nullptr) { + possible_stats_ = MakeColumnStats(*column_metadata_, descr_); + } + EncodedStatistics encodedStatistics = possible_stats_->Encode(); + return writer_version_->HasCorrectStatistics(type(), encodedStatistics, + descr_->sort_order()); + } + + inline std::shared_ptr<Statistics> statistics() const { + return is_stats_set() ? possible_stats_ : nullptr; + } + + inline Compression::type compression() const { + return LoadEnumSafe(&column_metadata_->codec); + } + + const std::vector<Encoding::type>& encodings() const { return encodings_; } + + const std::vector<PageEncodingStats>& encoding_stats() const { return encoding_stats_; } + + inline bool has_dictionary_page() const { + return column_metadata_->__isset.dictionary_page_offset; + } + + inline int64_t dictionary_page_offset() const { + return column_metadata_->dictionary_page_offset; + } + + inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; } + + inline bool has_index_page() const { + return column_metadata_->__isset.index_page_offset; + } + + inline int64_t index_page_offset() const { return column_metadata_->index_page_offset; } + + inline int64_t total_compressed_size() const { + return column_metadata_->total_compressed_size; + } + + inline int64_t total_uncompressed_size() const { + return column_metadata_->total_uncompressed_size; + } + + inline std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const { + if (column_->__isset.crypto_metadata) { + return ColumnCryptoMetaData::Make( + reinterpret_cast<const uint8_t*>(&column_->crypto_metadata)); + } else { + return nullptr; + } + } + + private: + mutable std::shared_ptr<Statistics> possible_stats_; + std::vector<Encoding::type> encodings_; + std::vector<PageEncodingStats> encoding_stats_; + const format::ColumnChunk* column_; + const format::ColumnMetaData* column_metadata_; + format::ColumnMetaData decrypted_metadata_; + const ColumnDescriptor* descr_; + const ApplicationVersion* writer_version_; +}; + +std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make( + const void* metadata, const ColumnDescriptor* descr, + const ApplicationVersion* writer_version, int16_t row_group_ordinal, + int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) { + return std::unique_ptr<ColumnChunkMetaData>( + new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal, + writer_version, std::move(file_decryptor))); +} + +ColumnChunkMetaData::ColumnChunkMetaData( + const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal, + int16_t column_ordinal, const ApplicationVersion* writer_version, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : impl_{new ColumnChunkMetaDataImpl( + reinterpret_cast<const format::ColumnChunk*>(metadata), descr, + row_group_ordinal, column_ordinal, writer_version, std::move(file_decryptor))} { +} + +ColumnChunkMetaData::~ColumnChunkMetaData() = default; + +// column chunk +int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); } + +const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); } + +Type::type ColumnChunkMetaData::type() const { return impl_->type(); } + +int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); } + +std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const { + return impl_->path_in_schema(); +} + +std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const { + return impl_->statistics(); +} + +bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } + +bool ColumnChunkMetaData::has_dictionary_page() const { + return impl_->has_dictionary_page(); +} + +int64_t ColumnChunkMetaData::dictionary_page_offset() const { + return impl_->dictionary_page_offset(); +} + +int64_t ColumnChunkMetaData::data_page_offset() const { + return impl_->data_page_offset(); +} + +bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); } + +int64_t ColumnChunkMetaData::index_page_offset() const { + return impl_->index_page_offset(); +} + +Compression::type ColumnChunkMetaData::compression() const { + return impl_->compression(); +} + +bool ColumnChunkMetaData::can_decompress() const { + return ::arrow::util::Codec::IsAvailable(compression()); +} + +const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const { + return impl_->encodings(); +} + +const std::vector<PageEncodingStats>& ColumnChunkMetaData::encoding_stats() const { + return impl_->encoding_stats(); +} + +int64_t ColumnChunkMetaData::total_uncompressed_size() const { + return impl_->total_uncompressed_size(); +} + +int64_t ColumnChunkMetaData::total_compressed_size() const { + return impl_->total_compressed_size(); +} + +std::unique_ptr<ColumnCryptoMetaData> ColumnChunkMetaData::crypto_metadata() const { + return impl_->crypto_metadata(); +} + +bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const { + return impl_->Equals(*other.impl_); +} + +// row-group metadata +class RowGroupMetaData::RowGroupMetaDataImpl { + public: + explicit RowGroupMetaDataImpl(const format::RowGroup* row_group, + const SchemaDescriptor* schema, + const ApplicationVersion* writer_version, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : row_group_(row_group), + schema_(schema), + writer_version_(writer_version), + file_decryptor_(std::move(file_decryptor)) {} + + bool Equals(const RowGroupMetaDataImpl& other) const { + return *row_group_ == *other.row_group_; + } + + inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); } + + inline int64_t num_rows() const { return row_group_->num_rows; } + + inline int64_t total_byte_size() const { return row_group_->total_byte_size; } + + inline int64_t total_compressed_size() const { + return row_group_->total_compressed_size; + } + + inline int64_t file_offset() const { return row_group_->file_offset; } + + inline const SchemaDescriptor* schema() const { return schema_; } + + std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) { + if (i < num_columns()) { + return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i), + writer_version_, row_group_->ordinal, + static_cast<int16_t>(i), file_decryptor_); + } + throw ParquetException("The file only has ", num_columns(), + " columns, requested metadata for column: ", i); + } + + private: + const format::RowGroup* row_group_; + const SchemaDescriptor* schema_; + const ApplicationVersion* writer_version_; + std::shared_ptr<InternalFileDecryptor> file_decryptor_; +}; + +std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make( + const void* metadata, const SchemaDescriptor* schema, + const ApplicationVersion* writer_version, + std::shared_ptr<InternalFileDecryptor> file_decryptor) { + return std::unique_ptr<RowGroupMetaData>( + new RowGroupMetaData(metadata, schema, writer_version, std::move(file_decryptor))); +} + +RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema, + const ApplicationVersion* writer_version, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata), + schema, writer_version, std::move(file_decryptor))} { +} + +RowGroupMetaData::~RowGroupMetaData() = default; + +bool RowGroupMetaData::Equals(const RowGroupMetaData& other) const { + return impl_->Equals(*other.impl_); +} + +int RowGroupMetaData::num_columns() const { return impl_->num_columns(); } + +int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); } + +int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); } + +int64_t RowGroupMetaData::total_compressed_size() const { + return impl_->total_compressed_size(); +} + +int64_t RowGroupMetaData::file_offset() const { return impl_->file_offset(); } + +const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); } + +std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const { + return impl_->ColumnChunk(i); +} + +bool RowGroupMetaData::can_decompress() const { + int n_columns = num_columns(); + for (int i = 0; i < n_columns; i++) { + if (!ColumnChunk(i)->can_decompress()) { + return false; + } + } + return true; +} + +// file metadata +class FileMetaData::FileMetaDataImpl { + public: + FileMetaDataImpl() = default; + + explicit FileMetaDataImpl( + const void* metadata, uint32_t* metadata_len, + std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr) + : file_decryptor_(file_decryptor) { + metadata_.reset(new format::FileMetaData); + + auto footer_decryptor = + file_decryptor_ != nullptr ? file_decryptor->GetFooterDecryptor() : nullptr; + + DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len, + metadata_.get(), footer_decryptor); + metadata_len_ = *metadata_len; + + if (metadata_->__isset.created_by) { + writer_version_ = ApplicationVersion(metadata_->created_by); + } else { + writer_version_ = ApplicationVersion("unknown 0.0.0"); + } + + InitSchema(); + InitColumnOrders(); + InitKeyValueMetadata(); + } + + bool VerifySignature(const void* signature) { + // verify decryption properties are set + if (file_decryptor_ == nullptr) { + throw ParquetException("Decryption not set properly. cannot verify signature"); + } + // serialize the footer + uint8_t* serialized_data; + uint32_t serialized_len = metadata_len_; + ThriftSerializer serializer; + serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data); + + // encrypt with nonce + auto nonce = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature)); + auto tag = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature)) + + encryption::kNonceLength; + + std::string key = file_decryptor_->GetFooterKey(); + std::string aad = encryption::CreateFooterAad(file_decryptor_->file_aad()); + + auto aes_encryptor = encryption::AesEncryptor::Make( + file_decryptor_->algorithm(), static_cast<int>(key.size()), true, nullptr); + + std::shared_ptr<Buffer> encrypted_buffer = std::static_pointer_cast<ResizableBuffer>( + AllocateBuffer(file_decryptor_->pool(), + aes_encryptor->CiphertextSizeDelta() + serialized_len)); + uint32_t encrypted_len = aes_encryptor->SignedFooterEncrypt( + serialized_data, serialized_len, str2bytes(key), static_cast<int>(key.size()), + str2bytes(aad), static_cast<int>(aad.size()), nonce, + encrypted_buffer->mutable_data()); + // Delete AES encryptor object. It was created only to verify the footer signature. + aes_encryptor->WipeOut(); + delete aes_encryptor; + return 0 == + memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength, + tag, encryption::kGcmTagLength); + } + + inline uint32_t size() const { return metadata_len_; } + inline int num_columns() const { return schema_.num_columns(); } + inline int64_t num_rows() const { return metadata_->num_rows; } + inline int num_row_groups() const { + return static_cast<int>(metadata_->row_groups.size()); + } + inline int32_t version() const { return metadata_->version; } + inline const std::string& created_by() const { return metadata_->created_by; } + inline int num_schema_elements() const { + return static_cast<int>(metadata_->schema.size()); + } + + inline bool is_encryption_algorithm_set() const { + return metadata_->__isset.encryption_algorithm; + } + inline EncryptionAlgorithm encryption_algorithm() { + return FromThrift(metadata_->encryption_algorithm); + } + inline const std::string& footer_signing_key_metadata() { + return metadata_->footer_signing_key_metadata; + } + + const ApplicationVersion& writer_version() const { return writer_version_; } + + void WriteTo(::arrow::io::OutputStream* dst, + const std::shared_ptr<Encryptor>& encryptor) const { + ThriftSerializer serializer; + // Only in encrypted files with plaintext footers the + // encryption_algorithm is set in footer + if (is_encryption_algorithm_set()) { + uint8_t* serialized_data; + uint32_t serialized_len; + serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data); + + // encrypt the footer key + std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() + + serialized_len); + unsigned encrypted_len = + encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data()); + + // write unencrypted footer + PARQUET_THROW_NOT_OK(dst->Write(serialized_data, serialized_len)); + // Write signature (nonce and tag) + PARQUET_THROW_NOT_OK( + dst->Write(encrypted_data.data() + 4, encryption::kNonceLength)); + PARQUET_THROW_NOT_OK( + dst->Write(encrypted_data.data() + encrypted_len - encryption::kGcmTagLength, + encryption::kGcmTagLength)); + } else { // either plaintext file (when encryptor is null) + // or encrypted file with encrypted footer + serializer.Serialize(metadata_.get(), dst, encryptor); + } + } + + std::unique_ptr<RowGroupMetaData> RowGroup(int i) { + if (!(i < num_row_groups())) { + std::stringstream ss; + ss << "The file only has " << num_row_groups() + << " row groups, requested metadata for row group: " << i; + throw ParquetException(ss.str()); + } + return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_, + file_decryptor_); + } + + bool Equals(const FileMetaDataImpl& other) const { + return *metadata_ == *other.metadata_; + } + + const SchemaDescriptor* schema() const { return &schema_; } + + const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const { + return key_value_metadata_; + } + + void set_file_path(const std::string& path) { + for (format::RowGroup& row_group : metadata_->row_groups) { + for (format::ColumnChunk& chunk : row_group.columns) { + chunk.__set_file_path(path); + } + } + } + + format::RowGroup& row_group(int i) { + DCHECK_LT(i, num_row_groups()); + return metadata_->row_groups[i]; + } + + void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) { + if (!schema()->Equals(*other->schema())) { + throw ParquetException("AppendRowGroups requires equal schemas."); + } + + // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop + const int n = other->num_row_groups(); + metadata_->row_groups.reserve(metadata_->row_groups.size() + n); + for (int i = 0; i < n; i++) { + format::RowGroup other_rg = other->row_group(i); + metadata_->num_rows += other_rg.num_rows; + metadata_->row_groups.push_back(std::move(other_rg)); + } + } + + std::shared_ptr<FileMetaData> Subset(const std::vector<int>& row_groups) { + for (int i : row_groups) { + if (i < num_row_groups()) continue; + + throw ParquetException( + "The file only has ", num_row_groups(), + " row groups, but requested a subset including row group: ", i); + } + + std::shared_ptr<FileMetaData> out(new FileMetaData()); + out->impl_.reset(new FileMetaDataImpl()); + out->impl_->metadata_.reset(new format::FileMetaData()); + + auto metadata = out->impl_->metadata_.get(); + metadata->version = metadata_->version; + metadata->schema = metadata_->schema; + + metadata->row_groups.resize(row_groups.size()); + int i = 0; + for (int selected_index : row_groups) { + metadata->num_rows += row_group(selected_index).num_rows; + metadata->row_groups[i++] = row_group(selected_index); + } + + metadata->key_value_metadata = metadata_->key_value_metadata; + metadata->created_by = metadata_->created_by; + metadata->column_orders = metadata_->column_orders; + metadata->encryption_algorithm = metadata_->encryption_algorithm; + metadata->footer_signing_key_metadata = metadata_->footer_signing_key_metadata; + metadata->__isset = metadata_->__isset; + + out->impl_->schema_ = schema_; + out->impl_->writer_version_ = writer_version_; + out->impl_->key_value_metadata_ = key_value_metadata_; + out->impl_->file_decryptor_ = file_decryptor_; + + return out; + } + + void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor) { + file_decryptor_ = file_decryptor; + } + + private: + friend FileMetaDataBuilder; + uint32_t metadata_len_ = 0; + std::unique_ptr<format::FileMetaData> metadata_; + SchemaDescriptor schema_; + ApplicationVersion writer_version_; + std::shared_ptr<const KeyValueMetadata> key_value_metadata_; + std::shared_ptr<InternalFileDecryptor> file_decryptor_; + + void InitSchema() { + if (metadata_->schema.empty()) { + throw ParquetException("Empty file schema (no root)"); + } + schema_.Init(schema::Unflatten(&metadata_->schema[0], + static_cast<int>(metadata_->schema.size()))); + } + + void InitColumnOrders() { + // update ColumnOrder + std::vector<parquet::ColumnOrder> column_orders; + if (metadata_->__isset.column_orders) { + for (auto column_order : metadata_->column_orders) { + if (column_order.__isset.TYPE_ORDER) { + column_orders.push_back(ColumnOrder::type_defined_); + } else { + column_orders.push_back(ColumnOrder::undefined_); + } + } + } else { + column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_); + } + + schema_.updateColumnOrders(column_orders); + } + + void InitKeyValueMetadata() { + std::shared_ptr<KeyValueMetadata> metadata = nullptr; + if (metadata_->__isset.key_value_metadata) { + metadata = std::make_shared<KeyValueMetadata>(); + for (const auto& it : metadata_->key_value_metadata) { + metadata->Append(it.key, it.value); + } + } + key_value_metadata_ = std::move(metadata); + } +}; + +std::shared_ptr<FileMetaData> FileMetaData::Make( + const void* metadata, uint32_t* metadata_len, + std::shared_ptr<InternalFileDecryptor> file_decryptor) { + // This FileMetaData ctor is private, not compatible with std::make_shared + return std::shared_ptr<FileMetaData>( + new FileMetaData(metadata, metadata_len, file_decryptor)); +} + +FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : impl_{std::unique_ptr<FileMetaDataImpl>( + new FileMetaDataImpl(metadata, metadata_len, file_decryptor))} {} + +FileMetaData::FileMetaData() + : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {} + +FileMetaData::~FileMetaData() = default; + +bool FileMetaData::Equals(const FileMetaData& other) const { + return impl_->Equals(*other.impl_); +} + +std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const { + return impl_->RowGroup(i); +} + +bool FileMetaData::VerifySignature(const void* signature) { + return impl_->VerifySignature(signature); +} + +uint32_t FileMetaData::size() const { return impl_->size(); } + +int FileMetaData::num_columns() const { return impl_->num_columns(); } + +int64_t FileMetaData::num_rows() const { return impl_->num_rows(); } + +int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); } + +bool FileMetaData::can_decompress() const { + int n_row_groups = num_row_groups(); + for (int i = 0; i < n_row_groups; i++) { + if (!RowGroup(i)->can_decompress()) { + return false; + } + } + return true; +} + +bool FileMetaData::is_encryption_algorithm_set() const { + return impl_->is_encryption_algorithm_set(); +} + +EncryptionAlgorithm FileMetaData::encryption_algorithm() const { + return impl_->encryption_algorithm(); +} + +const std::string& FileMetaData::footer_signing_key_metadata() const { + return impl_->footer_signing_key_metadata(); +} + +void FileMetaData::set_file_decryptor( + std::shared_ptr<InternalFileDecryptor> file_decryptor) { + impl_->set_file_decryptor(file_decryptor); +} + +ParquetVersion::type FileMetaData::version() const { + switch (impl_->version()) { + case 1: + return ParquetVersion::PARQUET_1_0; + case 2: + return ParquetVersion::PARQUET_2_LATEST; + default: + // Improperly set version, assuming Parquet 1.0 + break; + } + return ParquetVersion::PARQUET_1_0; +} + +const ApplicationVersion& FileMetaData::writer_version() const { + return impl_->writer_version(); +} + +const std::string& FileMetaData::created_by() const { return impl_->created_by(); } + +int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); } + +const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); } + +const std::shared_ptr<const KeyValueMetadata>& FileMetaData::key_value_metadata() const { + return impl_->key_value_metadata(); +} + +void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); } + +void FileMetaData::AppendRowGroups(const FileMetaData& other) { + impl_->AppendRowGroups(other.impl_); +} + +std::shared_ptr<FileMetaData> FileMetaData::Subset( + const std::vector<int>& row_groups) const { + return impl_->Subset(row_groups); +} + +void FileMetaData::WriteTo(::arrow::io::OutputStream* dst, + const std::shared_ptr<Encryptor>& encryptor) const { + return impl_->WriteTo(dst, encryptor); +} + +class FileCryptoMetaData::FileCryptoMetaDataImpl { + public: + FileCryptoMetaDataImpl() = default; + + explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) { + metadata_.reset(new format::FileCryptoMetaData); + DeserializeThriftMsg(metadata, metadata_len, metadata_.get()); + metadata_len_ = *metadata_len; + } + + EncryptionAlgorithm encryption_algorithm() { + return FromThrift(metadata_->encryption_algorithm); + } + const std::string& key_metadata() { return metadata_->key_metadata; } + void WriteTo(::arrow::io::OutputStream* dst) const { + ThriftSerializer serializer; + serializer.Serialize(metadata_.get(), dst); + } + + private: + friend FileMetaDataBuilder; + std::unique_ptr<format::FileCryptoMetaData> metadata_; + uint32_t metadata_len_; +}; + +EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() const { + return impl_->encryption_algorithm(); +} + +const std::string& FileCryptoMetaData::key_metadata() const { + return impl_->key_metadata(); +} + +std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make( + const uint8_t* serialized_metadata, uint32_t* metadata_len) { + return std::shared_ptr<FileCryptoMetaData>( + new FileCryptoMetaData(serialized_metadata, metadata_len)); +} + +FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata, + uint32_t* metadata_len) + : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {} + +FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {} + +FileCryptoMetaData::~FileCryptoMetaData() = default; + +void FileCryptoMetaData::WriteTo(::arrow::io::OutputStream* dst) const { + impl_->WriteTo(dst); +} + +std::string FileMetaData::SerializeToString() const { + // We need to pass in an initial size. Since it will automatically + // increase the buffer size to hold the metadata, we just leave it 0. + PARQUET_ASSIGN_OR_THROW(auto serializer, ::arrow::io::BufferOutputStream::Create(0)); + WriteTo(serializer.get()); + PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, serializer->Finish()); + return metadata_buffer->ToString(); +} + +ApplicationVersion::ApplicationVersion(std::string application, int major, int minor, + int patch) + : application_(std::move(application)), version{major, minor, patch, "", "", ""} {} + +namespace { +// Parse the application version format and set parsed values to +// ApplicationVersion. +// +// The application version format must be compatible parquet-mr's +// one. See also: +// * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java +// * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java +// +// The application version format: +// "${APPLICATION_NAME}" +// "${APPLICATION_NAME} version ${VERSION}" +// "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})" +// +// Eg: +// parquet-cpp +// parquet-cpp version 1.5.0ab-xyz5.5.0+cd +// parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd) +// +// The VERSION format: +// "${MAJOR}" +// "${MAJOR}.${MINOR}" +// "${MAJOR}.${MINOR}.${PATCH}" +// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}" +// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}" +// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}" +// "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}" +// "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}" +// "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}" +// "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}" +// +// Eg: +// 1 +// 1.5 +// 1.5.0 +// 1.5.0ab +// 1.5.0ab-cdh5.5.0 +// 1.5.0ab-cdh5.5.0+cd +// 1.5.0ab+cd +// 1.5.0-cdh5.5.0 +// 1.5.0-cdh5.5.0+cd +// 1.5.0+cd +class ApplicationVersionParser { + public: + ApplicationVersionParser(const std::string& created_by, + ApplicationVersion& application_version) + : created_by_(created_by), + application_version_(application_version), + spaces_(" \t\v\r\n\f"), + digits_("0123456789") {} + + void Parse() { + application_version_.application_ = "unknown"; + application_version_.version = {0, 0, 0, "", "", ""}; + + if (!ParseApplicationName()) { + return; + } + if (!ParseVersion()) { + return; + } + if (!ParseBuildName()) { + return; + } + } + + private: + bool IsSpace(const std::string& string, const size_t& offset) { + auto target = ::arrow::util::string_view(string).substr(offset, 1); + return target.find_first_of(spaces_) != ::arrow::util::string_view::npos; + } + + void RemovePrecedingSpaces(const std::string& string, size_t& start, + const size_t& end) { + while (start < end && IsSpace(string, start)) { + ++start; + } + } + + void RemoveTrailingSpaces(const std::string& string, const size_t& start, size_t& end) { + while (start < (end - 1) && (end - 1) < string.size() && IsSpace(string, end - 1)) { + --end; + } + } + + bool ParseApplicationName() { + std::string version_mark(" version "); + auto version_mark_position = created_by_.find(version_mark); + size_t application_name_end; + // No VERSION and BUILD_NAME. + if (version_mark_position == std::string::npos) { + version_start_ = std::string::npos; + application_name_end = created_by_.size(); + } else { + version_start_ = version_mark_position + version_mark.size(); + application_name_end = version_mark_position; + } + + size_t application_name_start = 0; + RemovePrecedingSpaces(created_by_, application_name_start, application_name_end); + RemoveTrailingSpaces(created_by_, application_name_start, application_name_end); + application_version_.application_ = created_by_.substr( + application_name_start, application_name_end - application_name_start); + + return true; + } + + bool ParseVersion() { + // No VERSION. + if (version_start_ == std::string::npos) { + return false; + } + + RemovePrecedingSpaces(created_by_, version_start_, created_by_.size()); + version_end_ = created_by_.find(" (", version_start_); + // No BUILD_NAME. + if (version_end_ == std::string::npos) { + version_end_ = created_by_.size(); + } + RemoveTrailingSpaces(created_by_, version_start_, version_end_); + // No VERSION. + if (version_start_ == version_end_) { + return false; + } + version_string_ = created_by_.substr(version_start_, version_end_ - version_start_); + + if (!ParseVersionMajor()) { + return false; + } + if (!ParseVersionMinor()) { + return false; + } + if (!ParseVersionPatch()) { + return false; + } + if (!ParseVersionUnknown()) { + return false; + } + if (!ParseVersionPreRelease()) { + return false; + } + if (!ParseVersionBuildInfo()) { + return false; + } + + return true; + } + + bool ParseVersionMajor() { + size_t version_major_start = 0; + auto version_major_end = version_string_.find_first_not_of(digits_); + // MAJOR only. + if (version_major_end == std::string::npos) { + version_major_end = version_string_.size(); + version_parsing_position_ = version_major_end; + } else { + // No ".". + if (version_string_[version_major_end] != '.') { + return false; + } + // No MAJOR. + if (version_major_end == version_major_start) { + return false; + } + version_parsing_position_ = version_major_end + 1; // +1 is for '.'. + } + auto version_major_string = version_string_.substr( + version_major_start, version_major_end - version_major_start); + application_version_.version.major = atoi(version_major_string.c_str()); + return true; + } + + bool ParseVersionMinor() { + auto version_minor_start = version_parsing_position_; + auto version_minor_end = + version_string_.find_first_not_of(digits_, version_minor_start); + // MAJOR.MINOR only. + if (version_minor_end == std::string::npos) { + version_minor_end = version_string_.size(); + version_parsing_position_ = version_minor_end; + } else { + // No ".". + if (version_string_[version_minor_end] != '.') { + return false; + } + // No MINOR. + if (version_minor_end == version_minor_start) { + return false; + } + version_parsing_position_ = version_minor_end + 1; // +1 is for '.'. + } + auto version_minor_string = version_string_.substr( + version_minor_start, version_minor_end - version_minor_start); + application_version_.version.minor = atoi(version_minor_string.c_str()); + return true; + } + + bool ParseVersionPatch() { + auto version_patch_start = version_parsing_position_; + auto version_patch_end = + version_string_.find_first_not_of(digits_, version_patch_start); + // No UNKNOWN, PRE_RELEASE and BUILD_INFO. + if (version_patch_end == std::string::npos) { + version_patch_end = version_string_.size(); + } + // No PATCH. + if (version_patch_end == version_patch_start) { + return false; + } + auto version_patch_string = version_string_.substr( + version_patch_start, version_patch_end - version_patch_start); + application_version_.version.patch = atoi(version_patch_string.c_str()); + version_parsing_position_ = version_patch_end; + return true; + } + + bool ParseVersionUnknown() { + // No UNKNOWN. + if (version_parsing_position_ == version_string_.size()) { + return true; + } + auto version_unknown_start = version_parsing_position_; + auto version_unknown_end = version_string_.find_first_of("-+", version_unknown_start); + // No PRE_RELEASE and BUILD_INFO + if (version_unknown_end == std::string::npos) { + version_unknown_end = version_string_.size(); + } + application_version_.version.unknown = version_string_.substr( + version_unknown_start, version_unknown_end - version_unknown_start); + version_parsing_position_ = version_unknown_end; + return true; + } + + bool ParseVersionPreRelease() { + // No PRE_RELEASE. + if (version_parsing_position_ == version_string_.size() || + version_string_[version_parsing_position_] != '-') { + return true; + } + + auto version_pre_release_start = version_parsing_position_ + 1; // +1 is for '-'. + auto version_pre_release_end = + version_string_.find_first_of("+", version_pre_release_start); + // No BUILD_INFO + if (version_pre_release_end == std::string::npos) { + version_pre_release_end = version_string_.size(); + } + application_version_.version.pre_release = version_string_.substr( + version_pre_release_start, version_pre_release_end - version_pre_release_start); + version_parsing_position_ = version_pre_release_end; + return true; + } + + bool ParseVersionBuildInfo() { + // No BUILD_INFO. + if (version_parsing_position_ == version_string_.size() || + version_string_[version_parsing_position_] != '+') { + return true; + } + + auto version_build_info_start = version_parsing_position_ + 1; // +1 is for '+'. + application_version_.version.build_info = + version_string_.substr(version_build_info_start); + return true; + } + + bool ParseBuildName() { + std::string build_mark(" (build "); + auto build_mark_position = created_by_.find(build_mark, version_end_); + // No BUILD_NAME. + if (build_mark_position == std::string::npos) { + return false; + } + auto build_name_start = build_mark_position + build_mark.size(); + RemovePrecedingSpaces(created_by_, build_name_start, created_by_.size()); + auto build_name_end = created_by_.find_first_of(")", build_name_start); + // No end ")". + if (build_name_end == std::string::npos) { + return false; + } + RemoveTrailingSpaces(created_by_, build_name_start, build_name_end); + application_version_.build_ = + created_by_.substr(build_name_start, build_name_end - build_name_start); + + return true; + } + + const std::string& created_by_; + ApplicationVersion& application_version_; + + // For parsing. + std::string spaces_; + std::string digits_; + size_t version_parsing_position_; + size_t version_start_; + size_t version_end_; + std::string version_string_; +}; +} // namespace + +ApplicationVersion::ApplicationVersion(const std::string& created_by) { + ApplicationVersionParser parser(created_by, *this); + parser.Parse(); +} + +bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const { + if (application_ != other_version.application_) return false; + + if (version.major < other_version.version.major) return true; + if (version.major > other_version.version.major) return false; + DCHECK_EQ(version.major, other_version.version.major); + if (version.minor < other_version.version.minor) return true; + if (version.minor > other_version.version.minor) return false; + DCHECK_EQ(version.minor, other_version.version.minor); + return version.patch < other_version.version.patch; +} + +bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const { + return application_ == other_version.application_ && + version.major == other_version.version.major && + version.minor == other_version.version.minor && + version.patch == other_version.version.patch; +} + +// Reference: +// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java +// PARQUET-686 has more discussion on statistics +bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, + EncodedStatistics& statistics, + SortOrder::type sort_order) const { + // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed + // correctly for all types + if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) || + (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) { + // Only SIGNED are valid unless max and min are the same + // (in which case the sort order does not matter) + bool max_equals_min = statistics.has_min && statistics.has_max + ? statistics.min() == statistics.max() + : false; + if (SortOrder::SIGNED != sort_order && !max_equals_min) { + return false; + } + + // Statistics of other types are OK + if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { + return true; + } + } + // created_by is not populated, which could have been caused by + // parquet-mr during the same time as PARQUET-251, see PARQUET-297 + if (application_ == "unknown") { + return true; + } + + // Unknown sort order has incorrect stats + if (SortOrder::UNKNOWN == sort_order) { + return false; + } + + // PARQUET-251 + if (VersionLt(PARQUET_251_FIXED_VERSION())) { + return false; + } + + return true; +} + +// MetaData Builders +// row-group metadata +class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { + public: + explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, + const ColumnDescriptor* column) + : owned_column_chunk_(new format::ColumnChunk), + properties_(std::move(props)), + column_(column) { + Init(owned_column_chunk_.get()); + } + + explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, + const ColumnDescriptor* column, + format::ColumnChunk* column_chunk) + : properties_(std::move(props)), column_(column) { + Init(column_chunk); + } + + const void* contents() const { return column_chunk_; } + + // column chunk + void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } + + // column metadata + void SetStatistics(const EncodedStatistics& val) { + column_chunk_->meta_data.__set_statistics(ToThrift(val)); + } + + void Finish(int64_t num_values, int64_t dictionary_page_offset, + int64_t index_page_offset, int64_t data_page_offset, + int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, + bool dictionary_fallback, + const std::map<Encoding::type, int32_t>& dict_encoding_stats, + const std::map<Encoding::type, int32_t>& data_encoding_stats, + const std::shared_ptr<Encryptor>& encryptor) { + if (dictionary_page_offset > 0) { + column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); + column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size); + } else { + column_chunk_->__set_file_offset(data_page_offset + compressed_size); + } + column_chunk_->__isset.meta_data = true; + column_chunk_->meta_data.__set_num_values(num_values); + if (index_page_offset >= 0) { + column_chunk_->meta_data.__set_index_page_offset(index_page_offset); + } + column_chunk_->meta_data.__set_data_page_offset(data_page_offset); + column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); + column_chunk_->meta_data.__set_total_compressed_size(compressed_size); + + std::vector<format::Encoding::type> thrift_encodings; + if (has_dictionary) { + thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding())); + if (properties_->version() == ParquetVersion::PARQUET_1_0) { + thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); + } else { + thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding())); + } + } else { // Dictionary not enabled + thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path()))); + } + thrift_encodings.push_back(ToThrift(Encoding::RLE)); + // Only PLAIN encoding is supported for fallback in V1 + // TODO(majetideepak): Use user specified encoding for V2 + if (dictionary_fallback) { + thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); + } + column_chunk_->meta_data.__set_encodings(thrift_encodings); + std::vector<format::PageEncodingStats> thrift_encoding_stats; + // Add dictionary page encoding stats + for (const auto& entry : dict_encoding_stats) { + format::PageEncodingStats dict_enc_stat; + dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE); + dict_enc_stat.__set_encoding(ToThrift(entry.first)); + dict_enc_stat.__set_count(entry.second); + thrift_encoding_stats.push_back(dict_enc_stat); + } + // Add data page encoding stats + for (const auto& entry : data_encoding_stats) { + format::PageEncodingStats data_enc_stat; + data_enc_stat.__set_page_type(format::PageType::DATA_PAGE); + data_enc_stat.__set_encoding(ToThrift(entry.first)); + data_enc_stat.__set_count(entry.second); + thrift_encoding_stats.push_back(data_enc_stat); + } + column_chunk_->meta_data.__set_encoding_stats(thrift_encoding_stats); + + const auto& encrypt_md = + properties_->column_encryption_properties(column_->path()->ToDotString()); + // column is encrypted + if (encrypt_md != nullptr && encrypt_md->is_encrypted()) { + column_chunk_->__isset.crypto_metadata = true; + format::ColumnCryptoMetaData ccmd; + if (encrypt_md->is_encrypted_with_footer_key()) { + // encrypted with footer key + ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true; + ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey()); + } else { // encrypted with column key + format::EncryptionWithColumnKey eck; + eck.__set_key_metadata(encrypt_md->key_metadata()); + eck.__set_path_in_schema(column_->path()->ToDotVector()); + ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true; + ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck); + } + column_chunk_->__set_crypto_metadata(ccmd); + + bool encrypted_footer = + properties_->file_encryption_properties()->encrypted_footer(); + bool encrypt_metadata = + !encrypted_footer || !encrypt_md->is_encrypted_with_footer_key(); + if (encrypt_metadata) { + ThriftSerializer serializer; + // Serialize and encrypt ColumnMetadata separately + // Thrift-serialize the ColumnMetaData structure, + // encrypt it with the column key, and write to encrypted_column_metadata + uint8_t* serialized_data; + uint32_t serialized_len; + + serializer.SerializeToBuffer(&column_chunk_->meta_data, &serialized_len, + &serialized_data); + + std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() + + serialized_len); + unsigned encrypted_len = + encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data()); + + const char* temp = + const_cast<const char*>(reinterpret_cast<char*>(encrypted_data.data())); + std::string encrypted_column_metadata(temp, encrypted_len); + column_chunk_->__set_encrypted_column_metadata(encrypted_column_metadata); + + if (encrypted_footer) { + column_chunk_->__isset.meta_data = false; + } else { + // Keep redacted metadata version for old readers + column_chunk_->__isset.meta_data = true; + column_chunk_->meta_data.__isset.statistics = false; + column_chunk_->meta_data.__isset.encoding_stats = false; + } + } + } + } + + void WriteTo(::arrow::io::OutputStream* sink) { + ThriftSerializer serializer; + serializer.Serialize(column_chunk_, sink); + } + + const ColumnDescriptor* descr() const { return column_; } + int64_t total_compressed_size() const { + return column_chunk_->meta_data.total_compressed_size; + } + + private: + void Init(format::ColumnChunk* column_chunk) { + column_chunk_ = column_chunk; + + column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type())); + column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector()); + column_chunk_->meta_data.__set_codec( + ToThrift(properties_->compression(column_->path()))); + } + + format::ColumnChunk* column_chunk_; + std::unique_ptr<format::ColumnChunk> owned_column_chunk_; + const std::shared_ptr<WriterProperties> properties_; + const ColumnDescriptor* column_; +}; + +std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( + std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column, + void* contents) { + return std::unique_ptr<ColumnChunkMetaDataBuilder>( + new ColumnChunkMetaDataBuilder(std::move(props), column, contents)); +} + +std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make( + std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column) { + return std::unique_ptr<ColumnChunkMetaDataBuilder>( + new ColumnChunkMetaDataBuilder(std::move(props), column)); +} + +ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( + std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column) + : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( + new ColumnChunkMetaDataBuilderImpl(std::move(props), column))} {} + +ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( + std::shared_ptr<WriterProperties> props, const ColumnDescriptor* column, + void* contents) + : impl_{std::unique_ptr<ColumnChunkMetaDataBuilderImpl>( + new ColumnChunkMetaDataBuilderImpl( + std::move(props), column, + reinterpret_cast<format::ColumnChunk*>(contents)))} {} + +ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default; + +const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); } + +void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) { + impl_->set_file_path(path); +} + +void ColumnChunkMetaDataBuilder::Finish( + int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, + int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, + bool has_dictionary, bool dictionary_fallback, + const std::map<Encoding::type, int32_t>& dict_encoding_stats, + const std::map<Encoding::type, int32_t>& data_encoding_stats, + const std::shared_ptr<Encryptor>& encryptor) { + impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset, + compressed_size, uncompressed_size, has_dictionary, dictionary_fallback, + dict_encoding_stats, data_encoding_stats, encryptor); +} + +void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) { + impl_->WriteTo(sink); +} + +const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { + return impl_->descr(); +} + +void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { + impl_->SetStatistics(result); +} + +int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const { + return impl_->total_compressed_size(); +} + +class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { + public: + explicit RowGroupMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props, + const SchemaDescriptor* schema, void* contents) + : properties_(std::move(props)), schema_(schema), next_column_(0) { + row_group_ = reinterpret_cast<format::RowGroup*>(contents); + InitializeColumns(schema->num_columns()); + } + + ColumnChunkMetaDataBuilder* NextColumnChunk() { + if (!(next_column_ < num_columns())) { + std::stringstream ss; + ss << "The schema only has " << num_columns() + << " columns, requested metadata for column: " << next_column_; + throw ParquetException(ss.str()); + } + auto column = schema_->Column(next_column_); + auto column_builder = ColumnChunkMetaDataBuilder::Make( + properties_, column, &row_group_->columns[next_column_++]); + auto column_builder_ptr = column_builder.get(); + column_builders_.push_back(std::move(column_builder)); + return column_builder_ptr; + } + + int current_column() { return next_column_ - 1; } + + void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { + if (!(next_column_ == schema_->num_columns())) { + std::stringstream ss; + ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns() + << " columns are initialized"; + throw ParquetException(ss.str()); + } + + int64_t file_offset = 0; + int64_t total_compressed_size = 0; + for (int i = 0; i < schema_->num_columns(); i++) { + if (!(row_group_->columns[i].file_offset >= 0)) { + std::stringstream ss; + ss << "Column " << i << " is not complete."; + throw ParquetException(ss.str()); + } + if (i == 0) { + const format::ColumnMetaData& first_col = row_group_->columns[0].meta_data; + // As per spec, file_offset for the row group points to the first + // dictionary or data page of the column. + if (first_col.__isset.dictionary_page_offset && + first_col.dictionary_page_offset > 0) { + file_offset = first_col.dictionary_page_offset; + } else { + file_offset = first_col.data_page_offset; + } + } + // sometimes column metadata is encrypted and not available to read, + // so we must get total_compressed_size from column builder + total_compressed_size += column_builders_[i]->total_compressed_size(); + } + + row_group_->__set_file_offset(file_offset); + row_group_->__set_total_compressed_size(total_compressed_size); + row_group_->__set_total_byte_size(total_bytes_written); + row_group_->__set_ordinal(row_group_ordinal); + } + + void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; } + + int num_columns() { return static_cast<int>(row_group_->columns.size()); } + + int64_t num_rows() { return row_group_->num_rows; } + + private: + void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); } + + format::RowGroup* row_group_; + const std::shared_ptr<WriterProperties> properties_; + const SchemaDescriptor* schema_; + std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_; + int next_column_; +}; + +std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make( + std::shared_ptr<WriterProperties> props, const SchemaDescriptor* schema_, + void* contents) { + return std::unique_ptr<RowGroupMetaDataBuilder>( + new RowGroupMetaDataBuilder(std::move(props), schema_, contents)); +} + +RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr<WriterProperties> props, + const SchemaDescriptor* schema_, + void* contents) + : impl_{new RowGroupMetaDataBuilderImpl(std::move(props), schema_, contents)} {} + +RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default; + +ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { + return impl_->NextColumnChunk(); +} + +int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } + +int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } + +int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); } + +void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) { + impl_->set_num_rows(num_rows); +} + +void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written, + int16_t row_group_ordinal) { + impl_->Finish(total_bytes_written, row_group_ordinal); +} + +// file metadata +// TODO(PARQUET-595) Support key_value_metadata +class FileMetaDataBuilder::FileMetaDataBuilderImpl { + public: + explicit FileMetaDataBuilderImpl( + const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, + std::shared_ptr<const KeyValueMetadata> key_value_metadata) + : metadata_(new format::FileMetaData()), + properties_(std::move(props)), + schema_(schema), + key_value_metadata_(std::move(key_value_metadata)) { + if (properties_->file_encryption_properties() != nullptr && + properties_->file_encryption_properties()->encrypted_footer()) { + crypto_metadata_.reset(new format::FileCryptoMetaData()); + } + } + + RowGroupMetaDataBuilder* AppendRowGroup() { + row_groups_.emplace_back(); + current_row_group_builder_ = + RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back()); + return current_row_group_builder_.get(); + } + + std::unique_ptr<FileMetaData> Finish() { + int64_t total_rows = 0; + for (auto row_group : row_groups_) { + total_rows += row_group.num_rows; + } + metadata_->__set_num_rows(total_rows); + metadata_->__set_row_groups(row_groups_); + + if (key_value_metadata_) { + metadata_->key_value_metadata.clear(); + metadata_->key_value_metadata.reserve(key_value_metadata_->size()); + for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { + format::KeyValue kv_pair; + kv_pair.__set_key(key_value_metadata_->key(i)); + kv_pair.__set_value(key_value_metadata_->value(i)); + metadata_->key_value_metadata.push_back(kv_pair); + } + metadata_->__isset.key_value_metadata = true; + } + + int32_t file_version = 0; + switch (properties_->version()) { + case ParquetVersion::PARQUET_1_0: + file_version = 1; + break; + default: + file_version = 2; + break; + } + metadata_->__set_version(file_version); + metadata_->__set_created_by(properties_->created_by()); + + // Users cannot set the `ColumnOrder` since we donot not have user defined sort order + // in the spec yet. + // We always default to `TYPE_DEFINED_ORDER`. We can expose it in + // the API once we have user defined sort orders in the Parquet format. + // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType + format::TypeDefinedOrder type_defined_order; + format::ColumnOrder column_order; + column_order.__set_TYPE_ORDER(type_defined_order); + column_order.__isset.TYPE_ORDER = true; + metadata_->column_orders.resize(schema_->num_columns(), column_order); + metadata_->__isset.column_orders = true; + + // if plaintext footer, set footer signing algorithm + auto file_encryption_properties = properties_->file_encryption_properties(); + if (file_encryption_properties && !file_encryption_properties->encrypted_footer()) { + EncryptionAlgorithm signing_algorithm; + EncryptionAlgorithm algo = file_encryption_properties->algorithm(); + signing_algorithm.aad.aad_file_unique = algo.aad.aad_file_unique; + signing_algorithm.aad.supply_aad_prefix = algo.aad.supply_aad_prefix; + if (!algo.aad.supply_aad_prefix) { + signing_algorithm.aad.aad_prefix = algo.aad.aad_prefix; + } + signing_algorithm.algorithm = ParquetCipher::AES_GCM_V1; + + metadata_->__set_encryption_algorithm(ToThrift(signing_algorithm)); + const std::string& footer_signing_key_metadata = + file_encryption_properties->footer_key_metadata(); + if (footer_signing_key_metadata.size() > 0) { + metadata_->__set_footer_signing_key_metadata(footer_signing_key_metadata); + } + } + + ToParquet(static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()), + &metadata_->schema); + auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData()); + file_meta_data->impl_->metadata_ = std::move(metadata_); + file_meta_data->impl_->InitSchema(); + file_meta_data->impl_->InitKeyValueMetadata(); + return file_meta_data; + } + + std::unique_ptr<FileCryptoMetaData> BuildFileCryptoMetaData() { + if (crypto_metadata_ == nullptr) { + return nullptr; + } + + auto file_encryption_properties = properties_->file_encryption_properties(); + + crypto_metadata_->__set_encryption_algorithm( + ToThrift(file_encryption_properties->algorithm())); + std::string key_metadata = file_encryption_properties->footer_key_metadata(); + + if (!key_metadata.empty()) { + crypto_metadata_->__set_key_metadata(key_metadata); + } + + std::unique_ptr<FileCryptoMetaData> file_crypto_metadata = + std::unique_ptr<FileCryptoMetaData>(new FileCryptoMetaData()); + file_crypto_metadata->impl_->metadata_ = std::move(crypto_metadata_); + + return file_crypto_metadata; + } + + protected: + std::unique_ptr<format::FileMetaData> metadata_; + std::unique_ptr<format::FileCryptoMetaData> crypto_metadata_; + + private: + const std::shared_ptr<WriterProperties> properties_; + std::vector<format::RowGroup> row_groups_; + + std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_; + const SchemaDescriptor* schema_; + std::shared_ptr<const KeyValueMetadata> key_value_metadata_; +}; + +std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make( + const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, + std::shared_ptr<const KeyValueMetadata> key_value_metadata) { + return std::unique_ptr<FileMetaDataBuilder>( + new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata))); +} + +FileMetaDataBuilder::FileMetaDataBuilder( + const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props, + std::shared_ptr<const KeyValueMetadata> key_value_metadata) + : impl_{std::unique_ptr<FileMetaDataBuilderImpl>(new FileMetaDataBuilderImpl( + schema, std::move(props), std::move(key_value_metadata)))} {} + +FileMetaDataBuilder::~FileMetaDataBuilder() = default; + +RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { + return impl_->AppendRowGroup(); +} + +std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); } + +std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() { + return impl_->BuildFileCryptoMetaData(); +} + +} // namespace parquet |