diff options
Diffstat (limited to 'src/arrow/cpp/src/parquet/column_scanner_test.cc')
-rw-r--r-- | src/arrow/cpp/src/parquet/column_scanner_test.cc | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/parquet/column_scanner_test.cc b/src/arrow/cpp/src/parquet/column_scanner_test.cc new file mode 100644 index 000000000..f6d162e3d --- /dev/null +++ b/src/arrow/cpp/src/parquet/column_scanner_test.cc @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <algorithm> +#include <cstdint> +#include <cstdlib> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/testing/gtest_compat.h" + +#include "parquet/column_page.h" +#include "parquet/column_scanner.h" +#include "parquet/schema.h" +#include "parquet/test_util.h" +#include "parquet/types.h" + +namespace parquet { + +using schema::NodePtr; + +namespace test { + +template <typename Type> +class TestFlatScanner : public ::testing::Test { + public: + using c_type = typename Type::c_type; + + void InitScanner(const ColumnDescriptor* d) { + std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_)); + scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager))); + } + + void CheckResults(int batch_size, const ColumnDescriptor* d) { + TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get()); + c_type val; + bool is_null = false; + int16_t def_level; + int16_t rep_level; + int j = 0; + scanner->SetBatchSize(batch_size); + for (int i = 0; i < num_levels_; i++) { + ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j; + if (!is_null) { + ASSERT_EQ(values_[j], val) << i << "V" << j; + j++; + } + if (d->max_definition_level() > 0) { + ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j; + } + if (d->max_repetition_level() > 0) { + ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j; + } + } + ASSERT_EQ(num_values_, j); + ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null)); + } + + void Clear() { + pages_.clear(); + values_.clear(); + def_levels_.clear(); + rep_levels_.clear(); + } + + void Execute(int num_pages, int levels_per_page, int batch_size, + const ColumnDescriptor* d, Encoding::type encoding) { + num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, + values_, data_buffer_, pages_, encoding); + num_levels_ = num_pages * levels_per_page; + InitScanner(d); + CheckResults(batch_size, d); + Clear(); + } + + void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1, + std::shared_ptr<ColumnDescriptor>& d2, + std::shared_ptr<ColumnDescriptor>& d3, int length) { + NodePtr type; + type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num, + ConvertedType::NONE, length); + d1.reset(new ColumnDescriptor(type, 0, 0)); + type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num, + ConvertedType::NONE, length); + d2.reset(new ColumnDescriptor(type, 4, 0)); + type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num, + ConvertedType::NONE, length); + d3.reset(new ColumnDescriptor(type, 4, 2)); + } + + void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length, + Encoding::type encoding = Encoding::PLAIN) { + std::shared_ptr<ColumnDescriptor> d1; + std::shared_ptr<ColumnDescriptor> d2; + std::shared_ptr<ColumnDescriptor> d3; + InitDescriptors(d1, d2, d3, type_length); + // evaluate REQUIRED pages + Execute(num_pages, num_levels, batch_size, d1.get(), encoding); + // evaluate OPTIONAL pages + Execute(num_pages, num_levels, batch_size, d2.get(), encoding); + // evaluate REPEATED pages + Execute(num_pages, num_levels, batch_size, d3.get(), encoding); + } + + protected: + int num_levels_; + int num_values_; + std::vector<std::shared_ptr<Page>> pages_; + std::shared_ptr<Scanner> scanner_; + std::vector<c_type> values_; + std::vector<int16_t> def_levels_; + std::vector<int16_t> rep_levels_; + std::vector<uint8_t> data_buffer_; // For BA and FLBA +}; + +static int num_levels_per_page = 100; +static int num_pages = 20; +static int batch_size = 32; + +typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, + ByteArrayType> + TestTypes; + +using TestBooleanFlatScanner = TestFlatScanner<BooleanType>; +using TestFLBAFlatScanner = TestFlatScanner<FLBAType>; + +TYPED_TEST_SUITE(TestFlatScanner, TestTypes); + +TYPED_TEST(TestFlatScanner, TestPlainScanner) { + ASSERT_NO_FATAL_FAILURE( + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN)); +} + +TYPED_TEST(TestFlatScanner, TestDictScanner) { + ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, + Encoding::RLE_DICTIONARY)); +} + +TEST_F(TestBooleanFlatScanner, TestPlainScanner) { + ASSERT_NO_FATAL_FAILURE( + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0)); +} + +TEST_F(TestFLBAFlatScanner, TestPlainScanner) { + ASSERT_NO_FATAL_FAILURE( + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH)); +} + +TEST_F(TestFLBAFlatScanner, TestDictScanner) { + ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, + FLBA_LENGTH, Encoding::RLE_DICTIONARY)); +} + +TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) { + ASSERT_NO_FATAL_FAILURE(this->ExecuteAll(num_pages, num_levels_per_page, batch_size, + FLBA_LENGTH, Encoding::PLAIN_DICTIONARY)); +} + +// PARQUET 502 +TEST_F(TestFLBAFlatScanner, TestSmallBatch) { + NodePtr type = + schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2); + const ColumnDescriptor d(type, 0, 0); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, pages_); + num_levels_ = 1 * 100; + InitScanner(&d); + ASSERT_NO_FATAL_FAILURE(CheckResults(1, &d)); +} + +TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) { + NodePtr type = + schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, + ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2); + const ColumnDescriptor d(type, 4, 0); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, pages_); + num_levels_ = 1 * 100; + InitScanner(&d); + TypedScanner<FLBAType>* scanner = + reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get()); + ASSERT_EQ(10, scanner->descr()->type_precision()); + ASSERT_EQ(2, scanner->descr()->type_scale()); + ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length()); +} + +TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) { + NodePtr type = + schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, + ConvertedType::DECIMAL, FLBA_LENGTH, 10, 2); + const ColumnDescriptor d(type, 4, 0); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, pages_); + num_levels_ = 1 * 100; + InitScanner(&d); + TypedScanner<FLBAType>* scanner = + reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get()); + scanner->SetBatchSize(batch_size); + std::stringstream ss_fail; + for (int i = 0; i < num_levels_; i++) { + std::stringstream ss; + scanner->PrintNext(ss, 17); + std::string result = ss.str(); + ASSERT_LE(17, result.size()) << i; + } + ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException); +} + +} // namespace test +} // namespace parquet |