diff options
Diffstat (limited to 'src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc')
-rw-r--r-- | src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc new file mode 100644 index 000000000..65dd5799e --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <reader_writer.h> + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> + +/* + * This example describes writing and reading Parquet Files in C++ and serves as a + * reference to the API. + * The file contains all the physical data types supported by Parquet. + * This example uses the RowGroupWriter API that supports writing RowGroups based on a + * certain size. + **/ + +/* Parquet is a structured columnar file format + * Parquet File = "Parquet data" + "Parquet Metadata" + * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a + * columnar layout + * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their + * Columns + * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a + * complex (nested) type (internal nodes) + * For specific details, please refer the format here: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + **/ + +constexpr int NUM_ROWS = 2500000; +constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB +const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet"; + +int main(int argc, char** argv) { + /********************************************************************************** + PARQUET WRITER EXAMPLE + **********************************************************************************/ + // parquet::REQUIRED fields do not need definition and repetition level values + // parquet::OPTIONAL fields require only definition level values + // parquet::REPEATED fields require both definition and repetition level values + try { + // Create a local file output stream instance. + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = SetupSchema(); + + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a BufferedRowGroup to keep the RowGroup open until a certain size + parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup(); + + int num_columns = file_writer->num_columns(); + std::vector<int64_t> buffered_values_estimate(num_columns, 0); + for (int i = 0; i < NUM_ROWS; i++) { + int64_t estimated_bytes = 0; + // Get the estimated size of the values that are not written to a page yet + for (int n = 0; n < num_columns; n++) { + estimated_bytes += buffered_values_estimate[n]; + } + + // We need to consider the compressed pages + // as well as the values that are not compressed yet + if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() + + estimated_bytes) > ROW_GROUP_SIZE) { + rg_writer->Close(); + std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0); + rg_writer = file_writer->AppendBufferedRowGroup(); + } + + int col_id = 0; + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast<parquet::BoolWriter*>(rg_writer->column(col_id)); + bool bool_value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value); + buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes(); + + // Write the Int32 column + col_id++; + parquet::Int32Writer* int32_writer = + static_cast<parquet::Int32Writer*>(rg_writer->column(col_id)); + int32_t int32_value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value); + buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes(); + + // Write the Int64 column. Each row has repeats twice. + col_id++; + parquet::Int64Writer* int64_writer = + static_cast<parquet::Int64Writer*>(rg_writer->column(col_id)); + int64_t int64_value1 = 2 * i; + int16_t definition_level = 1; + int16_t repetition_level = 0; + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1); + int64_t int64_value2 = (2 * i + 1); + repetition_level = 1; // start of a new record + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2); + buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes(); + + // Write the INT96 column. + col_id++; + parquet::Int96Writer* int96_writer = + static_cast<parquet::Int96Writer*>(rg_writer->column(col_id)); + parquet::Int96 int96_value; + int96_value.value[0] = i; + int96_value.value[1] = i + 1; + int96_value.value[2] = i + 2; + int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value); + buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes(); + + // Write the Float column + col_id++; + parquet::FloatWriter* float_writer = + static_cast<parquet::FloatWriter*>(rg_writer->column(col_id)); + float float_value = static_cast<float>(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &float_value); + buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes(); + + // Write the Double column + col_id++; + parquet::DoubleWriter* double_writer = + static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id)); + double double_value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &double_value); + buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes(); + + // Write the ByteArray column. Make every alternate values NULL + col_id++; + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + parquet::ByteArray ba_value; + char hello[FIXED_LENGTH] = "parquet"; + hello[7] = static_cast<char>(static_cast<int>('0') + i / 100); + hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10); + hello[9] = static_cast<char>(static_cast<int>('0') + i % 10); + if (i % 2 == 0) { + int16_t definition_level = 1; + ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]); + ba_value.len = FIXED_LENGTH; + ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value); + } else { + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + + // Write the FixedLengthByteArray column + col_id++; + parquet::FixedLenByteArrayWriter* flba_writer = + static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id)); + parquet::FixedLenByteArray flba_value; + char v = static_cast<char>(i); + char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]); + + flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value); + buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes(); + } + + // Close the RowGroupWriter + rg_writer->Close(); + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + /********************************************************************************** + PARQUET READER EXAMPLE + **********************************************************************************/ + + try { + // Create a ParquetReader instance + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false); + + // Get the File MetaData + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + + int num_row_groups = file_metadata->num_row_groups(); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 8); + + std::vector<int> col_row_counts(num_columns, 0); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE); + + int64_t values_read = 0; + int64_t rows_read = 0; + int16_t definition_level; + int16_t repetition_level; + std::shared_ptr<parquet::ColumnReader> column_reader; + int col_id = 0; + + ARROW_UNUSED(rows_read); // prevent warning in release build + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(col_id); + parquet::BoolReader* bool_reader = + static_cast<parquet::BoolReader*>(column_reader.get()); + + // Read all the rows in the column + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int32 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int32Reader* int32_reader = + static_cast<parquet::Int32Reader*>(column_reader.get()); + // Read all the rows in the column + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == col_row_counts[col_id]); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int64 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int64Reader* int64_reader = + static_cast<parquet::Int64Reader*>(column_reader.get()); + // Read all the rows in the column + while (int64_reader->HasNext()) { + int64_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, + &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + int64_t expected_value = col_row_counts[col_id]; + assert(value == expected_value); + if ((col_row_counts[col_id] % 2) == 0) { + assert(repetition_level == 0); + } else { + assert(repetition_level == 1); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int96 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int96Reader* int96_reader = + static_cast<parquet::Int96Reader*>(column_reader.get()); + // Read all the rows in the column + while (int96_reader->HasNext()) { + parquet::Int96 value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + parquet::Int96 expected_value; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value.value[0] = col_row_counts[col_id]; + expected_value.value[1] = col_row_counts[col_id] + 1; + expected_value.value[2] = col_row_counts[col_id] + 2; + for (int j = 0; j < 3; j++) { + assert(value.value[j] == expected_value.value[j]); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Float column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::FloatReader* float_reader = + static_cast<parquet::FloatReader*>(column_reader.get()); + // Read all the rows in the column + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Double column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::DoubleReader* double_reader = + static_cast<parquet::DoubleReader*>(column_reader.get()); + // Read all the rows in the column + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = col_row_counts[col_id] * 1.1111111; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the ByteArray column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::ByteArrayReader* ba_reader = + static_cast<parquet::ByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + while (ba_reader->HasNext()) { + parquet::ByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // Verify the value written + char expected_value[FIXED_LENGTH] = "parquet"; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100); + expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10); + expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10); + if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist + // There are no NULL values in the rows written + assert(values_read == 1); + assert(value.len == FIXED_LENGTH); + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + assert(definition_level == 1); + } else { + // There are NULL values in the rows written + assert(values_read == 0); + assert(definition_level == 0); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the FixedLengthByteArray column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::FixedLenByteArrayReader* flba_reader = + static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + while (flba_reader->HasNext()) { + parquet::FixedLenByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + char v = static_cast<char>(col_row_counts[col_id]); + char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + col_row_counts[col_id]++; + } + } + } catch (const std::exception& e) { + std::cerr << "Parquet read error: " << e.what() << std::endl; + return -1; + } + + std::cout << "Parquet Writing and Reading Complete" << std::endl; + + return 0; +} |