diff options
Diffstat (limited to 'src/arrow/cpp/src/parquet/file_deserialize_test.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/file_deserialize_test.cc | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/parquet/file_deserialize_test.cc b/src/arrow/cpp/src/parquet/file_deserialize_test.cc new file mode 100644 index 000000000..d0d333256 --- /dev/null +++ b/src/arrow/cpp/src/parquet/file_deserialize_test.cc @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <cstdint> +#include <cstring> +#include <memory> + +#include "parquet/column_page.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/platform.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +#include "arrow/io/memory.h" +#include "arrow/status.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/compression.h" + +namespace parquet { + +using ::arrow::io::BufferReader; + +// Adds page statistics occupying a certain amount of bytes (for testing very +// large page headers) +template <typename H> +static inline void AddDummyStats(int stat_size, H& header, bool fill_all_stats = false) { + std::vector<uint8_t> stat_bytes(stat_size); + // Some non-zero value + std::fill(stat_bytes.begin(), stat_bytes.end(), 1); + header.statistics.__set_max( + std::string(reinterpret_cast<const char*>(stat_bytes.data()), stat_size)); + + if (fill_all_stats) { + header.statistics.__set_min( + std::string(reinterpret_cast<const char*>(stat_bytes.data()), stat_size)); + header.statistics.__set_null_count(42); + header.statistics.__set_distinct_count(1); + } + + header.__isset.statistics = true; +} + +template <typename H> +static inline void CheckStatistics(const H& expected, const EncodedStatistics& actual) { + if (expected.statistics.__isset.max) { + ASSERT_EQ(expected.statistics.max, actual.max()); + } + if (expected.statistics.__isset.min) { + ASSERT_EQ(expected.statistics.min, actual.min()); + } + if (expected.statistics.__isset.null_count) { + ASSERT_EQ(expected.statistics.null_count, actual.null_count); + } + if (expected.statistics.__isset.distinct_count) { + ASSERT_EQ(expected.statistics.distinct_count, actual.distinct_count); + } +} + +class TestPageSerde : public ::testing::Test { + public: + void SetUp() { + data_page_header_.encoding = format::Encoding::PLAIN; + data_page_header_.definition_level_encoding = format::Encoding::RLE; + data_page_header_.repetition_level_encoding = format::Encoding::RLE; + + ResetStream(); + } + + void InitSerializedPageReader(int64_t num_rows, + Compression::type codec = Compression::UNCOMPRESSED) { + EndStream(); + + auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_); + page_reader_ = PageReader::Open(stream, num_rows, codec); + } + + void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0, + int32_t compressed_size = 0) { + // Simplifying writing serialized data page headers which may or may not + // have meaningful data associated with them + + // Serialize the Page header + page_header_.__set_data_page_header(data_page_header_); + page_header_.uncompressed_page_size = uncompressed_size; + page_header_.compressed_page_size = compressed_size; + page_header_.type = format::PageType::DATA_PAGE; + + ThriftSerializer serializer; + ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); + } + + void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t uncompressed_size = 0, + int32_t compressed_size = 0) { + // Simplifying writing serialized data page V2 headers which may or may not + // have meaningful data associated with them + + // Serialize the Page header + page_header_.__set_data_page_header_v2(data_page_header_v2_); + page_header_.uncompressed_page_size = uncompressed_size; + page_header_.compressed_page_size = compressed_size; + page_header_.type = format::PageType::DATA_PAGE_V2; + + ThriftSerializer serializer; + ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); + } + + void ResetStream() { out_stream_ = CreateOutputStream(); } + + void EndStream() { PARQUET_ASSIGN_OR_THROW(out_buffer_, out_stream_->Finish()); } + + protected: + std::shared_ptr<::arrow::io::BufferOutputStream> out_stream_; + std::shared_ptr<Buffer> out_buffer_; + + std::unique_ptr<PageReader> page_reader_; + format::PageHeader page_header_; + format::DataPageHeader data_page_header_; + format::DataPageHeaderV2 data_page_header_v2_; +}; + +void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) { + ASSERT_EQ(PageType::DATA_PAGE, page->type()); + + const DataPageV1* data_page = static_cast<const DataPageV1*>(page); + ASSERT_EQ(expected.num_values, data_page->num_values()); + ASSERT_EQ(expected.encoding, data_page->encoding()); + ASSERT_EQ(expected.definition_level_encoding, data_page->definition_level_encoding()); + ASSERT_EQ(expected.repetition_level_encoding, data_page->repetition_level_encoding()); + CheckStatistics(expected, data_page->statistics()); +} + +// Overload for DataPageV2 tests. +void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page* page) { + ASSERT_EQ(PageType::DATA_PAGE_V2, page->type()); + + const DataPageV2* data_page = static_cast<const DataPageV2*>(page); + ASSERT_EQ(expected.num_values, data_page->num_values()); + ASSERT_EQ(expected.num_nulls, data_page->num_nulls()); + ASSERT_EQ(expected.num_rows, data_page->num_rows()); + ASSERT_EQ(expected.encoding, data_page->encoding()); + ASSERT_EQ(expected.definition_levels_byte_length, + data_page->definition_levels_byte_length()); + ASSERT_EQ(expected.repetition_levels_byte_length, + data_page->repetition_levels_byte_length()); + ASSERT_EQ(expected.is_compressed, data_page->is_compressed()); + CheckStatistics(expected, data_page->statistics()); +} + +TEST_F(TestPageSerde, DataPageV1) { + int stats_size = 512; + const int32_t num_rows = 4444; + AddDummyStats(stats_size, data_page_header_, /* fill_all_stats = */ true); + data_page_header_.num_values = num_rows; + + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader()); + InitSerializedPageReader(num_rows); + std::shared_ptr<Page> current_page = page_reader_->NextPage(); + ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get())); +} + +TEST_F(TestPageSerde, DataPageV2) { + int stats_size = 512; + const int32_t num_rows = 4444; + AddDummyStats(stats_size, data_page_header_v2_, /* fill_all_stats = */ true); + data_page_header_v2_.num_values = num_rows; + + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2()); + InitSerializedPageReader(num_rows); + std::shared_ptr<Page> current_page = page_reader_->NextPage(); + ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_v2_, current_page.get())); +} + +TEST_F(TestPageSerde, TestLargePageHeaders) { + int stats_size = 256 * 1024; // 256 KB + AddDummyStats(stats_size, data_page_header_); + + // Any number to verify metadata roundtrip + const int32_t num_rows = 4141; + data_page_header_.num_values = num_rows; + + int max_header_size = 512 * 1024; // 512 KB + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size)); + + ASSERT_OK_AND_ASSIGN(int64_t position, out_stream_->Tell()); + ASSERT_GE(max_header_size, position); + + // check header size is between 256 KB to 16 MB + ASSERT_LE(stats_size, position); + ASSERT_GE(kDefaultMaxPageHeaderSize, position); + + InitSerializedPageReader(num_rows); + std::shared_ptr<Page> current_page = page_reader_->NextPage(); + ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get())); +} + +TEST_F(TestPageSerde, TestFailLargePageHeaders) { + const int32_t num_rows = 1337; // dummy value + + int stats_size = 256 * 1024; // 256 KB + AddDummyStats(stats_size, data_page_header_); + + // Serialize the Page header + int max_header_size = 512 * 1024; // 512 KB + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(max_header_size)); + ASSERT_OK_AND_ASSIGN(int64_t position, out_stream_->Tell()); + ASSERT_GE(max_header_size, position); + + int smaller_max_size = 128 * 1024; + ASSERT_LE(smaller_max_size, position); + InitSerializedPageReader(num_rows); + + // Set the max page header size to 128 KB, which is less than the current + // header size + page_reader_->set_max_page_header_size(smaller_max_size); + ASSERT_THROW(page_reader_->NextPage(), ParquetException); +} + +TEST_F(TestPageSerde, Compression) { + std::vector<Compression::type> codec_types; + +#ifdef ARROW_WITH_SNAPPY + codec_types.push_back(Compression::SNAPPY); +#endif + +#ifdef ARROW_WITH_BROTLI + codec_types.push_back(Compression::BROTLI); +#endif + +#ifdef ARROW_WITH_GZIP + codec_types.push_back(Compression::GZIP); +#endif + +#ifdef ARROW_WITH_LZ4 + codec_types.push_back(Compression::LZ4); + codec_types.push_back(Compression::LZ4_HADOOP); +#endif + +#ifdef ARROW_WITH_ZSTD + codec_types.push_back(Compression::ZSTD); +#endif + + const int32_t num_rows = 32; // dummy value + data_page_header_.num_values = num_rows; + + const int num_pages = 10; + + std::vector<std::vector<uint8_t>> faux_data; + faux_data.resize(num_pages); + for (int i = 0; i < num_pages; ++i) { + // The pages keep getting larger + int page_size = (i + 1) * 64; + test::random_bytes(page_size, 0, &faux_data[i]); + } + for (auto codec_type : codec_types) { + auto codec = GetCodec(codec_type); + + std::vector<uint8_t> buffer; + for (int i = 0; i < num_pages; ++i) { + const uint8_t* data = faux_data[i].data(); + int data_size = static_cast<int>(faux_data[i].size()); + + int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); + buffer.resize(max_compressed_size); + + int64_t actual_size; + ASSERT_OK_AND_ASSIGN( + actual_size, codec->Compress(data_size, data, max_compressed_size, &buffer[0])); + + ASSERT_NO_FATAL_FAILURE( + WriteDataPageHeader(1024, data_size, static_cast<int32_t>(actual_size))); + ASSERT_OK(out_stream_->Write(buffer.data(), actual_size)); + } + + InitSerializedPageReader(num_rows * num_pages, codec_type); + + std::shared_ptr<Page> page; + const DataPageV1* data_page; + for (int i = 0; i < num_pages; ++i) { + int data_size = static_cast<int>(faux_data[i].size()); + page = page_reader_->NextPage(); + data_page = static_cast<const DataPageV1*>(page.get()); + ASSERT_EQ(data_size, data_page->size()); + ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size)); + } + + ResetStream(); + } +} // namespace parquet + +TEST_F(TestPageSerde, LZONotSupported) { + // Must await PARQUET-530 + int data_size = 1024; + std::vector<uint8_t> faux_data(data_size); + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(1024, data_size, data_size)); + ASSERT_OK(out_stream_->Write(faux_data.data(), data_size)); + ASSERT_THROW(InitSerializedPageReader(data_size, Compression::LZO), ParquetException); +} + +// ---------------------------------------------------------------------- +// File structure tests + +class TestParquetFileReader : public ::testing::Test { + public: + void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) { + reader_.reset(new ParquetFileReader()); + + auto reader = std::make_shared<BufferReader>(buffer); + + ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(reader)), + ParquetException); + } + + protected: + std::unique_ptr<ParquetFileReader> reader_; +}; + +TEST_F(TestParquetFileReader, InvalidHeader) { + const char* bad_header = "PAR2"; + + auto buffer = Buffer::Wrap(bad_header, strlen(bad_header)); + ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); +} + +TEST_F(TestParquetFileReader, InvalidFooter) { + // File is smaller than FOOTER_SIZE + const char* bad_file = "PAR1PAR"; + auto buffer = Buffer::Wrap(bad_file, strlen(bad_file)); + ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); + + // Magic number incorrect + const char* bad_file2 = "PAR1PAR2"; + buffer = Buffer::Wrap(bad_file2, strlen(bad_file2)); + ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); +} + +TEST_F(TestParquetFileReader, IncompleteMetadata) { + auto stream = CreateOutputStream(); + + const char* magic = "PAR1"; + + ASSERT_OK(stream->Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic))); + std::vector<uint8_t> bytes(10); + ASSERT_OK(stream->Write(bytes.data(), bytes.size())); + uint32_t metadata_len = 24; + ASSERT_OK( + stream->Write(reinterpret_cast<const uint8_t*>(&metadata_len), sizeof(uint32_t))); + ASSERT_OK(stream->Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic))); + + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); + ASSERT_NO_FATAL_FAILURE(AssertInvalidFileThrows(buffer)); +} + +} // namespace parquet |