Diffstat (limited to 'src/arrow/cpp/src/parquet/reader_test.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/reader_test.cc | 810 |
1 file changed, 810 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/parquet/reader_test.cc b/src/arrow/cpp/src/parquet/reader_test.cc
new file mode 100644
index 000000000..2d13266df
--- /dev/null
+++ b/src/arrow/cpp/src/parquet/reader_test.cc
@@ -0,0 +1,810 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fcntl.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/io/file.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/metadata.h"
+#include "parquet/platform.h"
+#include "parquet/printer.h"
+#include "parquet/test_util.h"
+
+using arrow::internal::checked_pointer_cast;
+
+namespace parquet {
+using schema::GroupNode;
+using schema::PrimitiveNode;
+
+using ReadableFile = ::arrow::io::ReadableFile;
+
+std::string data_file(const char* file) {
+  std::string dir_string(test::get_data_dir());
+  std::stringstream ss;
+  ss << dir_string << "/" << file;
+  return ss.str();
+}
+
+std::string alltypes_plain() { return data_file("alltypes_plain.parquet"); }
+
+std::string nation_dict_truncated_data_page() {
+  return data_file("nation.dict-malformed.parquet");
+}
+
+// LZ4-compressed data files.
+// These files come in three flavours:
+// - legacy "LZ4" compression type, actually compressed with the block LZ4 codec
+//   (as emitted by some earlier versions of parquet-cpp)
+// - legacy "LZ4" compression type, actually compressed with the custom Hadoop LZ4 codec
+//   (as emitted by parquet-mr)
+// - "LZ4_RAW" compression type (added in Parquet format version 2.9.0)
+
+std::string hadoop_lz4_compressed() { return data_file("hadoop_lz4_compressed.parquet"); }
+
+std::string hadoop_lz4_compressed_larger() {
+  return data_file("hadoop_lz4_compressed_larger.parquet");
+}
+
+std::string non_hadoop_lz4_compressed() {
+  return data_file("non_hadoop_lz4_compressed.parquet");
+}
+
+std::string lz4_raw_compressed() { return data_file("lz4_raw_compressed.parquet"); }
+
+std::string lz4_raw_compressed_larger() {
+  return data_file("lz4_raw_compressed_larger.parquet");
+}
+
+// TODO: Assert on definition and repetition levels
+template <typename DType, typename ValueType>
+void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
+                        int64_t expected_levels_read,
+                        std::vector<ValueType>& expected_values,
+                        int64_t expected_values_read) {
+  std::vector<ValueType> values(batch_size);
+  int64_t values_read;
+
+  auto levels_read =
+      col->ReadBatch(batch_size, nullptr, nullptr, values.data(), &values_read);
+  ASSERT_EQ(expected_levels_read, levels_read);
+
+  ASSERT_EQ(expected_values, values);
+  ASSERT_EQ(expected_values_read, values_read);
+}
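+
+// Example usage (mirroring the LZ4 codec tests below): a 4-row INT64 column
+// with all values defined can be checked with
+//   std::vector<int64_t> expected = {1593604800, 1593604800, 1593604801, 1593604801};
+//   AssertColumnValues(col, /*batch_size=*/4, /*expected_levels_read=*/4,
+//                      expected, /*expected_values_read=*/4);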
+
+void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
+                           bool allow_uncompressed_mismatch = false) {
+  const int64_t total_byte_size = rg_metadata->total_byte_size();
+  const int64_t total_compressed_size = rg_metadata->total_compressed_size();
+
+  ASSERT_GE(total_byte_size, 0);
+  ASSERT_GE(total_compressed_size, 0);
+
+  int64_t total_column_byte_size = 0;
+  int64_t total_column_compressed_size = 0;
+  for (int i = 0; i < rg_metadata->num_columns(); ++i) {
+    total_column_byte_size += rg_metadata->ColumnChunk(i)->total_uncompressed_size();
+    total_column_compressed_size += rg_metadata->ColumnChunk(i)->total_compressed_size();
+  }
+
+  if (!allow_uncompressed_mismatch) {
+    ASSERT_EQ(total_byte_size, total_column_byte_size);
+  }
+  if (total_compressed_size != 0) {
+    ASSERT_EQ(total_compressed_size, total_column_compressed_size);
+  }
+}
+
+class TestAllTypesPlain : public ::testing::Test {
+ public:
+  void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
+
+  void TearDown() {}
+
+ protected:
+  std::unique_ptr<ParquetFileReader> reader_;
+};
+
+TEST_F(TestAllTypesPlain, NoopConstructDestruct) {}
+
+TEST_F(TestAllTypesPlain, RowGroupMetaData) {
+  auto group = reader_->RowGroup(0);
+  CheckRowGroupMetadata(group->metadata());
+}
+
+TEST_F(TestAllTypesPlain, TestBatchRead) {
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Reader> col =
+      std::dynamic_pointer_cast<Int32Reader>(group->Column(0));
+
+  int16_t def_levels[4];
+  int16_t rep_levels[4];
+  int32_t values[4];
+
+  // This file only has 8 rows
+  ASSERT_EQ(8, reader_->metadata()->num_rows());
+  // This file only has 1 row group
+  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
+  // Size of the metadata is 730 bytes
+  ASSERT_EQ(730, reader_->metadata()->size());
+  // This row group must have 8 rows
+  ASSERT_EQ(8, group->metadata()->num_rows());
+
+  ASSERT_TRUE(col->HasNext());
+  int64_t values_read;
+  auto levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read);
+  ASSERT_EQ(4, levels_read);
+  ASSERT_EQ(4, values_read);
+
+  // Now read past the end of the file
+  ASSERT_TRUE(col->HasNext());
+  levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read);
+  ASSERT_EQ(4, levels_read);
+  ASSERT_EQ(4, values_read);
+
+  ASSERT_FALSE(col->HasNext());
+}
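+
+// Note: ReadBatch() returns the number of levels read, while values_read
+// reports the number of values decoded; the two counts only match above
+// because every value in this column is defined (non-null).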
+
+TEST_F(TestAllTypesPlain, RowGroupColumnBoundchecking) {
+  // Part of PARQUET-1857
+  ASSERT_THROW(reader_->RowGroup(reader_->metadata()->num_row_groups()),
+               ParquetException);
+
+  auto row_group = reader_->RowGroup(0);
+  ASSERT_THROW(row_group->Column(row_group->metadata()->num_columns()), ParquetException);
+  ASSERT_THROW(row_group->GetColumnPageReader(row_group->metadata()->num_columns()),
+               ParquetException);
+}
+
+TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+  int32_t val;
+  bool is_null;
+  for (int i = 0; i < 8; ++i) {
+    ASSERT_TRUE(scanner->HasNext());
+    ASSERT_TRUE(scanner->NextValue(&val, &is_null));
+    ASSERT_FALSE(is_null);
+  }
+  ASSERT_FALSE(scanner->HasNext());
+  ASSERT_FALSE(scanner->NextValue(&val, &is_null));
+}
+
+TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+
+  ASSERT_EQ(128, scanner->batch_size());
+  scanner->SetBatchSize(1024);
+  ASSERT_EQ(1024, scanner->batch_size());
+}
+
+TEST_F(TestAllTypesPlain, DebugPrintWorks) {
+  std::stringstream ss;
+
+  std::list<int> columns;
+  ParquetFilePrinter printer(reader_.get());
+  printer.DebugPrint(ss, columns);
+
+  std::string result = ss.str();
+  ASSERT_GT(result.size(), 0);
+}
+
+TEST_F(TestAllTypesPlain, ColumnSelection) {
+  std::stringstream ss;
+
+  std::list<int> columns;
+  columns.push_back(5);
+  columns.push_back(0);
+  columns.push_back(10);
+  ParquetFilePrinter printer(reader_.get());
+  printer.DebugPrint(ss, columns);
+
+  std::string result = ss.str();
+  ASSERT_GT(result.size(), 0);
+}
+
+TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
+  std::stringstream ss;
+
+  std::list<int> columns;
+  columns.push_back(100);
+  ParquetFilePrinter printer1(reader_.get());
+  ASSERT_THROW(printer1.DebugPrint(ss, columns), ParquetException);
+
+  columns.clear();
+  columns.push_back(-1);
+  ParquetFilePrinter printer2(reader_.get());
+  ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
+}
+
+class TestLocalFile : public ::testing::Test {
+ public:
+  void SetUp() {
+    std::string dir_string(test::get_data_dir());
+
+    std::stringstream ss;
+    ss << dir_string << "/"
+       << "alltypes_plain.parquet";
+
+    PARQUET_ASSIGN_OR_THROW(handle, ReadableFile::Open(ss.str()));
+    fileno = handle->file_descriptor();
+  }
+
+  void TearDown() {}
+
+ protected:
+  int fileno;
+  std::shared_ptr<::arrow::io::ReadableFile> handle;
+};
+
+TEST_F(TestLocalFile, OpenWithMetadata) {
+  // PARQUET-808
+  std::stringstream ss;
+  std::shared_ptr<FileMetaData> metadata = ReadMetaData(handle);
+
+  auto reader = ParquetFileReader::Open(handle, default_reader_properties(), metadata);
+
+  // Compare pointers
+  ASSERT_EQ(metadata.get(), reader->metadata().get());
+
+  std::list<int> columns;
+  ParquetFilePrinter printer(reader.get());
+  printer.DebugPrint(ss, columns, true);
+
+  // Make sure OpenFile passes on the external metadata, too
+  auto reader2 = ParquetFileReader::OpenFile(alltypes_plain(), false,
+                                             default_reader_properties(), metadata);
+
+  // Compare pointers
+  ASSERT_EQ(metadata.get(), reader2->metadata().get());
+}
+
+TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
+  // PARQUET-816. Some files generated by older Parquet implementations may
+  // contain malformed data page metadata, and we can successfully decode them
+  // if we optimistically proceed to decoding even when there is not enough
+  // data available in the stream. We previously checked stream reads quite
+  // aggressively; such checks are absent from e.g. Impala's Parquet
+  // implementation.
+  auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false);
+  std::stringstream ss;
+
+  // empty list means print all
+  std::list<int> columns;
+  ParquetFilePrinter printer1(reader.get());
+  printer1.DebugPrint(ss, columns, true);
+
+  reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true);
+  std::stringstream ss2;
+  ParquetFilePrinter printer2(reader.get());
+  printer2.DebugPrint(ss2, columns, true);
+
+  // The memory-mapped read runs over the end of the column chunk and succeeds
+  // by accident
+  ASSERT_EQ(ss2.str(), ss.str());
+}
+
+TEST(TestDumpWithLocalFile, DumpOutput) {
+  std::string header_output = R"###(File Name: nested_lists.snappy.parquet
+Version: 1.0
+Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
+Total rows: 3
+Number of RowGroups: 1
+Number of Real Columns: 2
+Number of Columns: 2
+Number of Selected Columns: 2
+Column 0: a.list.element.list.element.list.element (BYTE_ARRAY / String / UTF8)
+Column 1: b (INT32)
+--- Row Group: 0 ---
+--- Total Bytes: 155 ---
+--- Total Compressed Bytes: 0 ---
+--- Rows: 3 ---
+Column 0
+  Values: 18  Statistics Not Set
+  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY
+  Uncompressed Size: 103, Compressed Size: 104
+Column 1
+  Values: 3, Null Values: 0, Distinct Values: 0
+  Max: 1, Min: 1
+  Compression: SNAPPY, Encodings: BIT_PACKED PLAIN_DICTIONARY
+  Uncompressed Size: 52, Compressed Size: 56
+)###";
+  std::string values_output = R"###(--- Values ---
+element |b |
+a |1 |
+b |1 |
+c |1 |
+NULL |
+d |
+a |
+b |
+c |
+d |
+NULL |
+e |
+a |
+b |
+c |
+d |
+e |
+NULL |
+f |
+
+)###";
+  std::string dump_output = R"###(--- Values ---
+Column 0
+  D:7 R:0 V:a
+  D:7 R:3 V:b
+  D:7 R:2 V:c
+  D:4 R:1 NULL
+  D:7 R:2 V:d
+  D:7 R:0 V:a
+  D:7 R:3 V:b
+  D:7 R:2 V:c
+  D:7 R:3 V:d
+  D:4 R:1 NULL
+  D:7 R:2 V:e
+  D:7 R:0 V:a
+  D:7 R:3 V:b
+  D:7 R:2 V:c
+  D:7 R:3 V:d
+  D:7 R:2 V:e
+  D:4 R:1 NULL
+  D:7 R:2 V:f
+Column 1
+  D:0 R:0 V:1
+  D:0 R:0 V:1
+  D:0 R:0 V:1
+)###";
+
+  // empty list means print all
+  std::list<int> columns;
+
+  std::stringstream ss_values, ss_dump;
+  const char* file = "nested_lists.snappy.parquet";
+  auto reader_props = default_reader_properties();
+  auto reader = ParquetFileReader::OpenFile(data_file(file), false, reader_props);
+  ParquetFilePrinter printer(reader.get());
+
+  printer.DebugPrint(ss_values, columns, true, false, false, file);
+  printer.DebugPrint(ss_dump, columns, true, true, false, file);
+
+  ASSERT_EQ(header_output + values_output, ss_values.str());
+  ASSERT_EQ(header_output + dump_output, ss_dump.str());
+}
+
+TEST(TestJSONWithLocalFile, JSONOutput) {
+  std::string json_output = R"###({
+  "FileName": "alltypes_plain.parquet",
+  "Version": "1.0",
+  "CreatedBy": "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)",
+  "TotalRows": "8",
+  "NumberOfRowGroups": "1",
+  "NumberOfRealColumns": "11",
+  "NumberOfColumns": "11",
+  "Columns": [
"ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "1", "Name": "bool_col", "PhysicalType": "BOOLEAN", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "2", "Name": "tinyint_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "3", "Name": "smallint_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "4", "Name": "int_col", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "5", "Name": "bigint_col", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "6", "Name": "float_col", "PhysicalType": "FLOAT", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "7", "Name": "double_col", "PhysicalType": "DOUBLE", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "8", "Name": "date_string_col", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "9", "Name": "string_col", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} }, + { "Id": "10", "Name": "timestamp_col", "PhysicalType": "INT96", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} } + ], + "RowGroups": [ + { + "Id": "0", "TotalBytes": "671", "TotalCompressedBytes": "0", "Rows": "8", + "ColumnChunks": [ + {"Id": "0", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "73", "CompressedSize": "73" }, + {"Id": "1", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "24", "CompressedSize": "24" }, + {"Id": "2", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, + {"Id": "3", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, + {"Id": "4", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, + {"Id": "5", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" }, + {"Id": "6", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" }, + {"Id": "7", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" }, + {"Id": "8", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "88", "CompressedSize": "88" }, + {"Id": "9", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "49", "CompressedSize": "49" }, + {"Id": "10", "Values": "8", "StatsSet": "False", + "Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "139", "CompressedSize": "139" } + ] + } + ] +} +)###"; + + std::stringstream ss; + // empty list means print all + std::list<int> columns; + + auto reader = + 
+
+TEST(TestFileReader, BufferedReadsWithDictionary) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // write one column
+  ::arrow::random::RandomArrayGenerator rag(0);
+  DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+  std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
+  const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto col_reader = std::static_pointer_cast<DoubleReader>(
+      row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
+  EXPECT_EQ(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY);
+
+  auto indices = ::arrow::internal::make_unique<int32_t[]>(num_rows);
+  const double* dict = nullptr;
+  int32_t dict_len = 0;
+  for (int row_index = 0; row_index < num_rows; ++row_index) {
+    const double* tmp_dict = nullptr;
+    int32_t tmp_dict_len = 0;
+    int64_t values_read = 0;
+    int64_t levels_read = col_reader->ReadBatchWithDictionary(
+        /*batch_size=*/1, /*def_levels=*/nullptr, /*rep_levels=*/nullptr,
+        indices.get() + row_index, &values_read, &tmp_dict, &tmp_dict_len);
+
+    if (tmp_dict != nullptr) {
+      EXPECT_EQ(values_read, 1);
+      dict = tmp_dict;
+      dict_len = tmp_dict_len;
+    } else {
+      EXPECT_EQ(values_read, 0);
+    }
+
+    ASSERT_EQ(1, levels_read);
+    ASSERT_EQ(1, values_read);
+  }
+
+  // Check the results
+  for (int row_index = 0; row_index < num_rows; ++row_index) {
+    EXPECT_LT(indices[row_index], dict_len);
+    EXPECT_EQ(dict[indices[row_index]], col_typed.Value(row_index));
+  }
+}
+
+TEST(TestFileReader, BufferedReads) {
+  // PARQUET-1636: Buffered reads were broken before introduction of
+  // RandomAccessFile::GetStream
+
+  const int num_columns = 10;
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  for (int i = 0; i < num_columns; ++i) {
+    fields.push_back(PrimitiveNode::Make("field" + std::to_string(i),
+                                         Repetition::REQUIRED, Type::DOUBLE,
+                                         ConvertedType::NONE));
+  }
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props =
+      WriterProperties::Builder().write_batch_size(64)->data_pagesize(128)->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  ::arrow::ArrayVector column_data;
+  ::arrow::random::RandomArrayGenerator rag(0);
+
+  // Scratch space for reads
+  ::arrow::BufferVector scratch_space;
+
+  // write columns
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+    std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
+    const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+    writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+    column_data.push_back(col);
+
+    // We use this later for reading back the columns
+    scratch_space.push_back(
+        AllocateBuffer(::arrow::default_memory_pool(), num_rows * sizeof(double)));
+  }
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  std::vector<std::shared_ptr<DoubleReader>> col_readers;
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    col_readers.push_back(
+        std::static_pointer_cast<DoubleReader>(row_group->Column(col_index)));
+  }
+
+  for (int row_index = 0; row_index < num_rows; ++row_index) {
+    for (int col_index = 0; col_index < num_columns; ++col_index) {
+      double* out =
+          reinterpret_cast<double*>(scratch_space[col_index]->mutable_data()) + row_index;
+      int64_t values_read = 0;
+      int64_t levels_read =
+          col_readers[col_index]->ReadBatch(1, nullptr, nullptr, out, &values_read);
+
+      ASSERT_EQ(1, levels_read);
+      ASSERT_EQ(1, values_read);
+    }
+  }
+
+  // Check the results
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    ASSERT_TRUE(
+        scratch_space[col_index]->Equals(*column_data[col_index]->data()->buffers[1]));
+  }
+}
+
+std::unique_ptr<ParquetFileReader> OpenBuffer(const std::string& contents) {
+  auto buffer = ::arrow::Buffer::FromString(contents);
+  return ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+}
+
+::arrow::Future<> OpenBufferAsync(const std::string& contents) {
+  auto buffer = ::arrow::Buffer::FromString(contents);
+  return ::arrow::Future<>(
+      ParquetFileReader::OpenAsync(std::make_shared<::arrow::io::BufferReader>(buffer)));
+}
+
+// https://github.com/google/googletest/pull/2904 not available in our version of
+// gtest/gmock
+#define EXPECT_THROW_THAT(callable, ex_type, property)   \
+  EXPECT_THROW(                                          \
+      try { (callable)(); } catch (const ex_type& err) { \
+        EXPECT_THAT(err, (property));                    \
+        throw;                                           \
+      },                                                 \
+      ex_type)
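+
+// (With a newer googletest that includes the pull request above, the same
+// check could presumably be written with the built-in matcher, e.g.
+//   EXPECT_THAT(callable, ::testing::ThrowsMessage<ex_type>(matcher));
+// but the bundled gtest/gmock predates it.)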
+
+TEST(TestFileReader, TestOpenErrors) {
+  EXPECT_THROW_THAT(
+      []() { OpenBuffer(""); }, ParquetInvalidOrCorruptedFileException,
+      ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+                          ::testing::HasSubstr("Parquet file size is 0 bytes")));
+  EXPECT_THROW_THAT(
+      []() { OpenBuffer("AAAAPAR0"); }, ParquetInvalidOrCorruptedFileException,
+      ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+                          ::testing::HasSubstr("Parquet magic bytes not found")));
+  EXPECT_THROW_THAT(
+      []() { OpenBuffer("APAR1"); }, ParquetInvalidOrCorruptedFileException,
+      ::testing::Property(
+          &ParquetInvalidOrCorruptedFileException::what,
+          ::testing::HasSubstr(
+              "Parquet file size is 5 bytes, smaller than the minimum file footer")));
+  EXPECT_THROW_THAT(
+      []() { OpenBuffer("\xFF\xFF\xFF\x0FPAR1"); },
+      ParquetInvalidOrCorruptedFileException,
+      ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+                          ::testing::HasSubstr("Parquet file size is 8 bytes, smaller "
+                                               "than the size reported by footer's")));
+  EXPECT_THROW_THAT(
+      []() { OpenBuffer(std::string("\x00\x00\x00\x00PAR1", 8)); }, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read")));
+
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("Parquet file size is 0 bytes"), OpenBufferAsync(""));
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("Parquet magic bytes not found"),
+      OpenBufferAsync("AAAAPAR0"));
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr(
+          "Parquet file size is 5 bytes, smaller than the minimum file footer"),
+      OpenBufferAsync("APAR1"));
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr(
+          "Parquet file size is 8 bytes, smaller than the size reported by footer's"),
+      OpenBufferAsync("\xFF\xFF\xFF\x0FPAR1"));
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      IOError, ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"),
+      OpenBufferAsync(std::string("\x00\x00\x00\x00PAR1", 8)));
+}
+
+#undef EXPECT_THROW_THAT
+
+#ifdef ARROW_WITH_LZ4
+struct TestCodecParam {
+  std::string name;
+  std::string small_data_file;
+  std::string larger_data_file;
+};
+
+void PrintTo(const TestCodecParam& p, std::ostream* os) { *os << p.name; }
+
+class TestCodec : public ::testing::TestWithParam<TestCodecParam> {
+ protected:
+  const std::string& GetSmallDataFile() { return GetParam().small_data_file; }
+
+  const std::string& GetLargerDataFile() { return GetParam().larger_data_file; }
+};
+
+TEST_P(TestCodec, SmallFileMetadataAndValues) {
+  std::unique_ptr<ParquetFileReader> reader_ =
+      ParquetFileReader::OpenFile(GetSmallDataFile());
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
+  const auto rg_metadata = group->metadata();
+
+  // This file only has 4 rows
+  ASSERT_EQ(4, reader_->metadata()->num_rows());
+  // This file only has 3 columns
+  ASSERT_EQ(3, reader_->metadata()->num_columns());
+  // This file only has 1 row group
+  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
+
+  // This row group must have 4 rows
+  ASSERT_EQ(4, rg_metadata->num_rows());
+
+  // Some parquet-cpp versions are susceptible to PARQUET-2008
+  const auto& app_ver = reader_->metadata()->writer_version();
+  const bool allow_uncompressed_mismatch =
+      (app_ver.application_ == "parquet-cpp" && app_ver.version.major == 1 &&
+       app_ver.version.minor == 5 && app_ver.version.patch == 1);
+
+  CheckRowGroupMetadata(rg_metadata, allow_uncompressed_mismatch);
+
+  // column 0, c0
+  auto col0 = checked_pointer_cast<Int64Reader>(group->Column(0));
+  std::vector<int64_t> expected_values = {1593604800, 1593604800, 1593604801, 1593604801};
+  AssertColumnValues(col0, 4, 4, expected_values, 4);
+
+  // column 1, c1
+  std::vector<ByteArray> expected_byte_arrays = {ByteArray("abc"), ByteArray("def"),
+                                                 ByteArray("abc"), ByteArray("def")};
+  auto col1 = checked_pointer_cast<ByteArrayReader>(group->Column(1));
+  AssertColumnValues(col1, 4, 4, expected_byte_arrays, 4);
+
+  // column 2, v11
+  std::vector<double> expected_double_values = {42.0, 7.7, 42.125, 7.7};
+  auto col2 = checked_pointer_cast<DoubleReader>(group->Column(2));
+  AssertColumnValues(col2, 4, 4, expected_double_values, 4);
+}
+
+TEST_P(TestCodec, LargeFileValues) {
+  // Test the codec with a larger data file, such that the data may have been
+  // compressed in several "frames" (ARROW-9177)
+  auto file_path = GetParam().larger_data_file;
+  if (file_path.empty()) {
+    GTEST_SKIP() << "Larger data file not available for this codec";
+  }
+  auto file = ParquetFileReader::OpenFile(file_path);
+  auto group = file->RowGroup(0);
+
+  const int64_t kNumRows = 10000;
+
+  ASSERT_EQ(kNumRows, file->metadata()->num_rows());
+  ASSERT_EQ(1, file->metadata()->num_columns());
+  ASSERT_EQ(1, file->metadata()->num_row_groups());
+  ASSERT_EQ(kNumRows, group->metadata()->num_rows());
+
+  // column 0 ("a")
+  auto col = checked_pointer_cast<ByteArrayReader>(group->Column(0));
+
+  std::vector<ByteArray> values(kNumRows);
+  int64_t values_read;
+  auto levels_read =
+      col->ReadBatch(kNumRows, nullptr, nullptr, values.data(), &values_read);
+  ASSERT_EQ(kNumRows, levels_read);
+  ASSERT_EQ(kNumRows, values_read);
+  ASSERT_EQ(values[0], ByteArray("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"));
+  ASSERT_EQ(values[1], ByteArray("e8fb9197-cb9f-4118-b67f-fbfa65f61843"));
+  ASSERT_EQ(values[kNumRows - 2], ByteArray("ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"));
+  ASSERT_EQ(values[kNumRows - 1], ByteArray("85440778-460a-41ac-aa2e-ac3ee41696bf"));
+}
+
+std::vector<TestCodecParam> test_codec_params{
+    {"LegacyLZ4Hadoop", hadoop_lz4_compressed(), hadoop_lz4_compressed_larger()},
+    {"LegacyLZ4NonHadoop", non_hadoop_lz4_compressed(), ""},
+    {"LZ4Raw", lz4_raw_compressed(), lz4_raw_compressed_larger()}};
+
+INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec, ::testing::ValuesIn(test_codec_params),
+                         testing::PrintToStringParamName());
+#endif  // ARROW_WITH_LZ4
+
+}  // namespace parquet