diff options
Diffstat (limited to 'src/arrow/cpp/examples')
29 files changed, 4383 insertions, 0 deletions
diff --git a/src/arrow/cpp/examples/arrow/CMakeLists.txt b/src/arrow/cpp/examples/arrow/CMakeLists.txt new file mode 100644 index 000000000..ac758b92d --- /dev/null +++ b/src/arrow/cpp/examples/arrow/CMakeLists.txt @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ADD_ARROW_EXAMPLE(row_wise_conversion_example) + +if (ARROW_COMPUTE) + ADD_ARROW_EXAMPLE(compute_register_example) +endif() + +if (ARROW_COMPUTE AND ARROW_CSV) + ADD_ARROW_EXAMPLE(compute_and_write_csv_example) +endif() + +if (ARROW_PARQUET AND ARROW_DATASET) + if (ARROW_BUILD_SHARED) + set(DATASET_EXAMPLES_LINK_LIBS arrow_dataset_shared) + else() + set(DATASET_EXAMPLES_LINK_LIBS arrow_dataset_static) + endif() + + ADD_ARROW_EXAMPLE(dataset_parquet_scan_example + EXTRA_LINK_LIBS + ${DATASET_EXAMPLES_LINK_LIBS}) + add_dependencies(dataset_parquet_scan_example parquet) + + ADD_ARROW_EXAMPLE(dataset_documentation_example + EXTRA_LINK_LIBS + ${DATASET_EXAMPLES_LINK_LIBS}) + add_dependencies(dataset_documentation_example parquet) +endif() diff --git a/src/arrow/cpp/examples/arrow/compute_and_write_csv_example.cc b/src/arrow/cpp/examples/arrow/compute_and_write_csv_example.cc new file mode 100644 index 000000000..db3478759 --- /dev/null +++ b/src/arrow/cpp/examples/arrow/compute_and_write_csv_example.cc @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api_aggregate.h> +#include <arrow/csv/api.h> +#include <arrow/csv/writer.h> +#include <arrow/io/api.h> +#include <arrow/result.h> +#include <arrow/status.h> + +#include <iostream> +#include <vector> + +// Many operations in Apache Arrow operate on +// columns of data, and the columns of data are +// assembled into a table. In this example, we +// examine how to compare two arrays which are +// combined to form a table that is then written +// out to a CSV file. +// +// To run this example you can use +// ./compute_and_write_csv_example +// +// the program will write the files into +// compute_and_write_output.csv +// in the current directory + +arrow::Status RunMain(int argc, char** argv) { + // Make Arrays + arrow::NumericBuilder<arrow::Int64Type> int64_builder; + arrow::BooleanBuilder boolean_builder; + + // Make place for 8 values in total + ARROW_RETURN_NOT_OK(int64_builder.Resize(8)); + ARROW_RETURN_NOT_OK(boolean_builder.Resize(8)); + + // Bulk append the given values + std::vector<int64_t> int64_values = {1, 2, 3, 4, 5, 6, 7, 8}; + ARROW_RETURN_NOT_OK(int64_builder.AppendValues(int64_values)); + std::shared_ptr<arrow::Array> array_a; + ARROW_RETURN_NOT_OK(int64_builder.Finish(&array_a)); + int64_builder.Reset(); + int64_values = {2, 5, 1, 3, 6, 2, 7, 4}; + std::shared_ptr<arrow::Array> array_b; + ARROW_RETURN_NOT_OK(int64_builder.AppendValues(int64_values)); + ARROW_RETURN_NOT_OK(int64_builder.Finish(&array_b)); + + // Cast the arrays to their actual types + auto int64_array_a = std::static_pointer_cast<arrow::Int64Array>(array_a); + auto int64_array_b = std::static_pointer_cast<arrow::Int64Array>(array_b); + // Explicit comparison of values using a loop + for (int64_t i = 0; i < 8; i++) { + if ((!int64_array_a->IsNull(i)) && (!int64_array_b->IsNull(i))) { + bool comparison_result = int64_array_a->Value(i) > int64_array_b->Value(i); + boolean_builder.UnsafeAppend(comparison_result); + } else { + boolean_builder.UnsafeAppendNull(); + } + } + std::shared_ptr<arrow::Array> array_a_gt_b_self; + ARROW_RETURN_NOT_OK(boolean_builder.Finish(&array_a_gt_b_self)); + std::cout << "Array explicitly compared" << std::endl; + + // Explicit comparison of values using a compute function + ARROW_ASSIGN_OR_RAISE(arrow::Datum compared_datum, + arrow::compute::CallFunction("greater", {array_a, array_b})); + auto array_a_gt_b_compute = compared_datum.make_array(); + std::cout << "Arrays compared using a compute function" << std::endl; + + // Create a table for the output + auto schema = + arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), + arrow::field("a>b? (self written)", arrow::boolean()), + arrow::field("a>b? (arrow)", arrow::boolean())}); + std::shared_ptr<arrow::Table> my_table = arrow::Table::Make( + schema, {array_a, array_b, array_a_gt_b_self, array_a_gt_b_compute}); + + std::cout << "Table created" << std::endl; + + // Write table to CSV file + auto csv_filename = "compute_and_write_output.csv"; + ARROW_ASSIGN_OR_RAISE(auto outstream, arrow::io::FileOutputStream::Open(csv_filename)); + + std::cout << "Writing CSV file" << std::endl; + ARROW_RETURN_NOT_OK(arrow::csv::WriteCSV( + *my_table, arrow::csv::WriteOptions::Defaults(), outstream.get())); + + return arrow::Status::OK(); +} + +int main(int argc, char** argv) { + arrow::Status status = RunMain(argc, argv); + if (!status.ok()) { + std::cerr << status << std::endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/src/arrow/cpp/examples/arrow/compute_register_example.cc b/src/arrow/cpp/examples/arrow/compute_register_example.cc new file mode 100644 index 000000000..dd760bb60 --- /dev/null +++ b/src/arrow/cpp/examples/arrow/compute_register_example.cc @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/api.h> +#include <arrow/compute/exec/exec_plan.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/compute/exec/options.h> +#include <arrow/util/async_generator.h> +#include <arrow/util/future.h> + +#include <cstdlib> +#include <iostream> +#include <memory> + +// Demonstrate registering an Arrow compute function outside of the Arrow source tree + +namespace cp = ::arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +class ExampleFunctionOptionsType : public cp::FunctionOptionsType { + const char* type_name() const override { return "ExampleFunctionOptionsType"; } + std::string Stringify(const cp::FunctionOptions&) const override { + return "ExampleFunctionOptionsType"; + } + bool Compare(const cp::FunctionOptions&, const cp::FunctionOptions&) const override { + return true; + } + std::unique_ptr<cp::FunctionOptions> Copy(const cp::FunctionOptions&) const override; + // optional: support for serialization + // Result<std::shared_ptr<Buffer>> Serialize(const FunctionOptions&) const override; + // Result<std::unique_ptr<FunctionOptions>> Deserialize(const Buffer&) const override; +}; + +cp::FunctionOptionsType* GetExampleFunctionOptionsType() { + static ExampleFunctionOptionsType options_type; + return &options_type; +} + +class ExampleFunctionOptions : public cp::FunctionOptions { + public: + ExampleFunctionOptions() : cp::FunctionOptions(GetExampleFunctionOptionsType()) {} +}; + +std::unique_ptr<cp::FunctionOptions> ExampleFunctionOptionsType::Copy( + const cp::FunctionOptions&) const { + return std::unique_ptr<cp::FunctionOptions>(new ExampleFunctionOptions()); +} + +arrow::Status ExampleFunctionImpl(cp::KernelContext* ctx, const cp::ExecBatch& batch, + arrow::Datum* out) { + *out->mutable_array() = *batch[0].array(); + return arrow::Status::OK(); +} + +class ExampleNodeOptions : public cp::ExecNodeOptions {}; + +// a basic ExecNode which ignores all input batches +class ExampleNode : public cp::ExecNode { + public: + ExampleNode(ExecNode* input, const ExampleNodeOptions&) + : ExecNode(/*plan=*/input->plan(), /*inputs=*/{input}, + /*input_labels=*/{"ignored"}, + /*output_schema=*/input->output_schema(), /*num_outputs=*/1) {} + + const char* kind_name() const override { return "ExampleNode"; } + + arrow::Status StartProducing() override { + outputs_[0]->InputFinished(this, 0); + return arrow::Status::OK(); + } + + void ResumeProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } + void StopProducing() override { inputs_[0]->StopProducing(); } + + void InputReceived(ExecNode* input, cp::ExecBatch batch) override {} + void ErrorReceived(ExecNode* input, arrow::Status error) override {} + void InputFinished(ExecNode* input, int total_batches) override {} + + arrow::Future<> finished() override { return inputs_[0]->finished(); } +}; + +arrow::Result<cp::ExecNode*> ExampleExecNodeFactory(cp::ExecPlan* plan, + std::vector<cp::ExecNode*> inputs, + const cp::ExecNodeOptions& options) { + const auto& example_options = + arrow::internal::checked_cast<const ExampleNodeOptions&>(options); + + return plan->EmplaceNode<ExampleNode>(inputs[0], example_options); +} + +const cp::FunctionDoc func_doc{ + "Example function to demonstrate registering an out-of-tree function", + "", + {"x"}, + "ExampleFunctionOptions"}; + +int main(int argc, char** argv) { + const std::string name = "compute_register_example"; + auto func = std::make_shared<cp::ScalarFunction>(name, cp::Arity::Unary(), &func_doc); + ABORT_ON_FAILURE(func->AddKernel({cp::InputType::Array(arrow::int64())}, arrow::int64(), + ExampleFunctionImpl)); + + auto registry = cp::GetFunctionRegistry(); + ABORT_ON_FAILURE(registry->AddFunction(std::move(func))); + + arrow::Int64Builder builder(arrow::default_memory_pool()); + std::shared_ptr<arrow::Array> arr; + ABORT_ON_FAILURE(builder.Append(42)); + ABORT_ON_FAILURE(builder.Finish(&arr)); + auto options = std::make_shared<ExampleFunctionOptions>(); + auto maybe_result = cp::CallFunction(name, {arr}, options.get()); + ABORT_ON_FAILURE(maybe_result.status()); + + std::cout << maybe_result->make_array()->ToString() << std::endl; + + // Expression serialization will raise NotImplemented if an expression includes + // FunctionOptions for which serialization is not supported. + auto expr = cp::call(name, {}, options); + auto maybe_serialized = cp::Serialize(expr); + std::cerr << maybe_serialized.status().ToString() << std::endl; + + auto exec_registry = cp::default_exec_factory_registry(); + ABORT_ON_FAILURE( + exec_registry->AddFactory("compute_register_example", ExampleExecNodeFactory)); + + auto maybe_plan = cp::ExecPlan::Make(); + ABORT_ON_FAILURE(maybe_plan.status()); + auto plan = maybe_plan.ValueOrDie(); + + arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> source_gen, sink_gen; + ABORT_ON_FAILURE( + cp::Declaration::Sequence( + { + {"source", cp::SourceNodeOptions{arrow::schema({}), source_gen}}, + {"compute_register_example", ExampleNodeOptions{}}, + {"sink", cp::SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get()) + .status()); + + return EXIT_SUCCESS; +} diff --git a/src/arrow/cpp/examples/arrow/dataset_documentation_example.cc b/src/arrow/cpp/examples/arrow/dataset_documentation_example.cc new file mode 100644 index 000000000..1aac66d4a --- /dev/null +++ b/src/arrow/cpp/examples/arrow/dataset_documentation_example.cc @@ -0,0 +1,374 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example showcases various ways to work with Datasets. It's +// intended to be paired with the documentation. + +#include <arrow/api.h> +#include <arrow/compute/cast.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/dataset/dataset.h> +#include <arrow/dataset/discovery.h> +#include <arrow/dataset/file_base.h> +#include <arrow/dataset/file_ipc.h> +#include <arrow/dataset/file_parquet.h> +#include <arrow/dataset/scanner.h> +#include <arrow/filesystem/filesystem.h> +#include <arrow/ipc/writer.h> +#include <arrow/util/iterator.h> +#include <parquet/arrow/writer.h> + +#include <iostream> +#include <vector> + +namespace ds = arrow::dataset; +namespace fs = arrow::fs; +namespace cp = arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +// (Doc section: Reading Datasets) +// Generate some data for the rest of this example. +std::shared_ptr<arrow::Table> CreateTable() { + auto schema = + arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), + arrow::field("c", arrow::int64())}); + std::shared_ptr<arrow::Array> array_a; + std::shared_ptr<arrow::Array> array_b; + std::shared_ptr<arrow::Array> array_c; + arrow::NumericBuilder<arrow::Int64Type> builder; + ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + ABORT_ON_FAILURE(builder.Finish(&array_a)); + builder.Reset(); + ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); + ABORT_ON_FAILURE(builder.Finish(&array_b)); + builder.Reset(); + ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); + ABORT_ON_FAILURE(builder.Finish(&array_c)); + return arrow::Table::Make(schema, {array_a, array_b, array_c}); +} + +// Set up a dataset by writing two Parquet files. +std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem, + const std::string& root_path) { + auto base_path = root_path + "/parquet_dataset"; + ABORT_ON_FAILURE(filesystem->CreateDir(base_path)); + // Create an Arrow Table + auto table = CreateTable(); + // Write it into two Parquet files + auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie(); + ABORT_ON_FAILURE(parquet::arrow::WriteTable( + *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048)); + output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie(); + ABORT_ON_FAILURE(parquet::arrow::WriteTable( + *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048)); + return base_path; +} +// (Doc section: Reading Datasets) + +// (Doc section: Reading different file formats) +// Set up a dataset by writing two Feather files. +std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem, + const std::string& root_path) { + auto base_path = root_path + "/feather_dataset"; + ABORT_ON_FAILURE(filesystem->CreateDir(base_path)); + // Create an Arrow Table + auto table = CreateTable(); + // Write it into two Feather files + auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie(); + auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie(); + ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5))); + ABORT_ON_FAILURE(writer->Close()); + output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie(); + writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie(); + ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5))); + ABORT_ON_FAILURE(writer->Close()); + return base_path; +} +// (Doc section: Reading different file formats) + +// (Doc section: Reading and writing partitioned data) +// Set up a dataset by writing files with partitioning +std::string CreateExampleParquetHivePartitionedDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) { + auto base_path = root_path + "/parquet_dataset"; + ABORT_ON_FAILURE(filesystem->CreateDir(base_path)); + // Create an Arrow Table + auto schema = arrow::schema( + {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()), + arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())}); + std::vector<std::shared_ptr<arrow::Array>> arrays(4); + arrow::NumericBuilder<arrow::Int64Type> builder; + ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + ABORT_ON_FAILURE(builder.Finish(&arrays[0])); + builder.Reset(); + ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0})); + ABORT_ON_FAILURE(builder.Finish(&arrays[1])); + builder.Reset(); + ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2})); + ABORT_ON_FAILURE(builder.Finish(&arrays[2])); + arrow::StringBuilder string_builder; + ABORT_ON_FAILURE( + string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"})); + ABORT_ON_FAILURE(string_builder.Finish(&arrays[3])); + auto table = arrow::Table::Make(schema, arrays); + // Write it using Datasets + auto dataset = std::make_shared<ds::InMemoryDataset>(table); + auto scanner_builder = dataset->NewScan().ValueOrDie(); + auto scanner = scanner_builder->Finish().ValueOrDie(); + + // The partition schema determines which fields are part of the partitioning. + auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())}); + // We'll use Hive-style partitioning, which creates directories with "key=value" pairs. + auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema); + // We'll write Parquet files. + auto format = std::make_shared<ds::ParquetFileFormat>(); + ds::FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = format->DefaultWriteOptions(); + write_options.filesystem = filesystem; + write_options.base_dir = base_path; + write_options.partitioning = partitioning; + write_options.basename_template = "part{i}.parquet"; + ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner)); + return base_path; +} +// (Doc section: Reading and writing partitioned data) + +// (Doc section: Dataset discovery) +// Read the whole dataset with the given format, without partitioning. +std::shared_ptr<arrow::Table> ScanWholeDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + // Create a dataset by scanning the filesystem for files + fs::FileSelector selector; + selector.base_dir = base_dir; + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, + ds::FileSystemFactoryOptions()) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + // Print out the fragments + for (const auto& fragment : dataset->GetFragments().ValueOrDie()) { + std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; + } + // Read the entire dataset as a Table + auto scan_builder = dataset->NewScan().ValueOrDie(); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Dataset discovery) + +// (Doc section: Filtering data) +// Read a dataset, but select only column "b" and only rows where b < 4. +// +// This is useful when you only want a few columns from a dataset. Where possible, +// Datasets will push down the column selection such that less work is done. +std::shared_ptr<arrow::Table> FilterAndSelectDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + fs::FileSelector selector; + selector.base_dir = base_dir; + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, + ds::FileSystemFactoryOptions()) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + // Read specified columns with a row filter + auto scan_builder = dataset->NewScan().ValueOrDie(); + ABORT_ON_FAILURE(scan_builder->Project({"b"})); + ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4)))); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Filtering data) + +// (Doc section: Projecting columns) +// Read a dataset, but with column projection. +// +// This is useful to derive new columns from existing data. For example, here we +// demonstrate casting a column to a different type, and turning a numeric column into a +// boolean column based on a predicate. You could also rename columns or perform +// computations involving multiple columns. +std::shared_ptr<arrow::Table> ProjectDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + fs::FileSelector selector; + selector.base_dir = base_dir; + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, + ds::FileSystemFactoryOptions()) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + // Read specified columns with a row filter + auto scan_builder = dataset->NewScan().ValueOrDie(); + ABORT_ON_FAILURE(scan_builder->Project( + { + // Leave column "a" as-is. + cp::field_ref("a"), + // Cast column "b" to float32. + cp::call("cast", {cp::field_ref("b")}, + arrow::compute::CastOptions::Safe(arrow::float32())), + // Derive a boolean column from "c". + cp::equal(cp::field_ref("c"), cp::literal(1)), + }, + {"a_renamed", "b_as_float32", "c_1"})); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Projecting columns) + +// (Doc section: Projecting columns #2) +// Read a dataset, but with column projection. +// +// This time, we read all original columns plus one derived column. This simply combines +// the previous two examples: selecting a subset of columns by name, and deriving new +// columns with an expression. +std::shared_ptr<arrow::Table> SelectAndProjectDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + fs::FileSelector selector; + selector.base_dir = base_dir; + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, + ds::FileSystemFactoryOptions()) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + // Read specified columns with a row filter + auto scan_builder = dataset->NewScan().ValueOrDie(); + std::vector<std::string> names; + std::vector<cp::Expression> exprs; + // Read all the original columns. + for (const auto& field : dataset->schema()->fields()) { + names.push_back(field->name()); + exprs.push_back(cp::field_ref(field->name())); + } + // Also derive a new column. + names.emplace_back("b_large"); + exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1))); + ABORT_ON_FAILURE(scan_builder->Project(exprs, names)); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Projecting columns #2) + +// (Doc section: Reading and writing partitioned data #2) +// Read an entire dataset, but with partitioning information. +std::shared_ptr<arrow::Table> ScanPartitionedDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + fs::FileSelector selector; + selector.base_dir = base_dir; + selector.recursive = true; // Make sure to search subdirectories + ds::FileSystemFactoryOptions options; + // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition + // schema. + options.partitioning = ds::HivePartitioning::MakeFactory(); + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + // Print out the fragments + for (const auto& fragment : dataset->GetFragments().ValueOrDie()) { + std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl; + std::cout << "Partition expression: " + << (*fragment)->partition_expression().ToString() << std::endl; + } + auto scan_builder = dataset->NewScan().ValueOrDie(); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Reading and writing partitioned data #2) + +// (Doc section: Reading and writing partitioned data #3) +// Read an entire dataset, but with partitioning information. Also, filter the dataset on +// the partition values. +std::shared_ptr<arrow::Table> FilterPartitionedDataset( + const std::shared_ptr<fs::FileSystem>& filesystem, + const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) { + fs::FileSelector selector; + selector.base_dir = base_dir; + selector.recursive = true; + ds::FileSystemFactoryOptions options; + options.partitioning = ds::HivePartitioning::MakeFactory(); + auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options) + .ValueOrDie(); + auto dataset = factory->Finish().ValueOrDie(); + auto scan_builder = dataset->NewScan().ValueOrDie(); + // Filter based on the partition values. This will mean that we won't even read the + // files whose partition expressions don't match the filter. + ABORT_ON_FAILURE( + scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b")))); + auto scanner = scan_builder->Finish().ValueOrDie(); + return scanner->ToTable().ValueOrDie(); +} +// (Doc section: Reading and writing partitioned data #3) + +int main(int argc, char** argv) { + if (argc < 3) { + // Fake success for CI purposes. + return EXIT_SUCCESS; + } + + std::string uri = argv[1]; + std::string format_name = argv[2]; + std::string mode = argc > 3 ? argv[3] : "no_filter"; + std::string root_path; + auto fs = fs::FileSystemFromUri(uri, &root_path).ValueOrDie(); + + std::string base_path; + std::shared_ptr<ds::FileFormat> format; + if (format_name == "feather") { + format = std::make_shared<ds::IpcFileFormat>(); + base_path = CreateExampleFeatherDataset(fs, root_path); + } else if (format_name == "parquet") { + format = std::make_shared<ds::ParquetFileFormat>(); + base_path = CreateExampleParquetDataset(fs, root_path); + } else if (format_name == "parquet_hive") { + format = std::make_shared<ds::ParquetFileFormat>(); + base_path = CreateExampleParquetHivePartitionedDataset(fs, root_path); + } else { + std::cerr << "Unknown format: " << format_name << std::endl; + std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl; + return EXIT_FAILURE; + } + + std::shared_ptr<arrow::Table> table; + if (mode == "no_filter") { + table = ScanWholeDataset(fs, format, base_path); + } else if (mode == "filter") { + table = FilterAndSelectDataset(fs, format, base_path); + } else if (mode == "project") { + table = ProjectDataset(fs, format, base_path); + } else if (mode == "select_project") { + table = SelectAndProjectDataset(fs, format, base_path); + } else if (mode == "partitioned") { + table = ScanPartitionedDataset(fs, format, base_path); + } else if (mode == "filter_partitioned") { + table = FilterPartitionedDataset(fs, format, base_path); + } else { + std::cerr << "Unknown mode: " << mode << std::endl; + std::cerr + << "Supported modes: no_filter, filter, project, select_project, partitioned" + << std::endl; + return EXIT_FAILURE; + } + std::cout << "Read " << table->num_rows() << " rows" << std::endl; + std::cout << table->ToString() << std::endl; + return EXIT_SUCCESS; +} diff --git a/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc b/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc new file mode 100644 index 000000000..cd9b89fe3 --- /dev/null +++ b/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/dataset/dataset.h> +#include <arrow/dataset/discovery.h> +#include <arrow/dataset/file_base.h> +#include <arrow/dataset/file_parquet.h> +#include <arrow/dataset/scanner.h> +#include <arrow/filesystem/filesystem.h> +#include <arrow/filesystem/path_util.h> + +#include <cstdlib> +#include <iostream> + +using arrow::field; +using arrow::int16; +using arrow::Schema; +using arrow::Table; + +namespace fs = arrow::fs; + +namespace ds = arrow::dataset; + +namespace cp = arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +struct Configuration { + // Increase the ds::DataSet by repeating `repeat` times the ds::Dataset. + size_t repeat = 1; + + // Indicates if the Scanner::ToTable should consume in parallel. + bool use_threads = true; + + // Indicates to the Scan operator which columns are requested. This + // optimization avoid deserializing unneeded columns. + std::vector<std::string> projected_columns = {"pickup_at", "dropoff_at", + "total_amount"}; + + // Indicates the filter by which rows will be filtered. This optimization can + // make use of partition information and/or file metadata if possible. + cp::Expression filter = + cp::greater(cp::field_ref("total_amount"), cp::literal(1000.0f)); + + ds::InspectOptions inspect_options{}; + ds::FinishOptions finish_options{}; +} conf; + +std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri, + std::string* path) { + return fs::FileSystemFromUri(uri, path).ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromDirectory( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string dir) { + // Find all files under `path` + fs::FileSelector s; + s.base_dir = dir; + s.recursive = true; + + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + auto factory = ds::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie(); + + // Try to infer a common schema for all files. + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + auto child = factory->Finish(conf.finish_options).ValueOrDie(); + + ds::DatasetVector children{conf.repeat, child}; + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset.ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetParquetDatasetFromMetadata( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string metadata_path) { + ds::ParquetFactoryOptions options; + auto factory = + ds::ParquetDatasetFactory::Make(metadata_path, fs, format, options).ValueOrDie(); + return factory->Finish().ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromFile( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string file) { + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + auto factory = + ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie(); + + // Try to infer a common schema for all files. + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + auto child = factory->Finish(conf.finish_options).ValueOrDie(); + + ds::DatasetVector children; + children.resize(conf.repeat, child); + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset.ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromPath( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string path) { + auto info = fs->GetFileInfo(path).ValueOrDie(); + if (info.IsDirectory()) { + return GetDatasetFromDirectory(fs, format, path); + } + + auto dirname_basename = arrow::fs::internal::GetAbstractPathParent(path); + auto basename = dirname_basename.second; + + if (basename == "_metadata") { + return GetParquetDatasetFromMetadata(fs, format, path); + } + + return GetDatasetFromFile(fs, format, path); +} + +std::shared_ptr<ds::Scanner> GetScannerFromDataset(std::shared_ptr<ds::Dataset> dataset, + std::vector<std::string> columns, + cp::Expression filter, + bool use_threads) { + auto scanner_builder = dataset->NewScan().ValueOrDie(); + + if (!columns.empty()) { + ABORT_ON_FAILURE(scanner_builder->Project(columns)); + } + + ABORT_ON_FAILURE(scanner_builder->Filter(filter)); + + ABORT_ON_FAILURE(scanner_builder->UseThreads(use_threads)); + + return scanner_builder->Finish().ValueOrDie(); +} + +std::shared_ptr<Table> GetTableFromScanner(std::shared_ptr<ds::Scanner> scanner) { + return scanner->ToTable().ValueOrDie(); +} + +int main(int argc, char** argv) { + auto format = std::make_shared<ds::ParquetFileFormat>(); + + if (argc != 2) { + // Fake success for CI purposes. + return EXIT_SUCCESS; + } + + std::string path; + auto fs = GetFileSystemFromUri(argv[1], &path); + + auto dataset = GetDatasetFromPath(fs, format, path); + + auto scanner = GetScannerFromDataset(dataset, conf.projected_columns, conf.filter, + conf.use_threads); + + auto table = GetTableFromScanner(scanner); + std::cout << "Table size: " << table->num_rows() << "\n"; + + return EXIT_SUCCESS; +} diff --git a/src/arrow/cpp/examples/arrow/row_wise_conversion_example.cc b/src/arrow/cpp/examples/arrow/row_wise_conversion_example.cc new file mode 100644 index 000000000..1af1c5547 --- /dev/null +++ b/src/arrow/cpp/examples/arrow/row_wise_conversion_example.cc @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/result.h> + +#include <cstdint> +#include <iomanip> +#include <iostream> +#include <vector> + +using arrow::DoubleBuilder; +using arrow::Int64Builder; +using arrow::ListBuilder; + +// While we want to use columnar data structures to build efficient operations, we +// often receive data in a row-wise fashion from other systems. In the following, +// we want give a brief introduction into the classes provided by Apache Arrow by +// showing how to transform row-wise data into a columnar table. +// +// The table contains an id for a product, the number of components in the product +// and the cost of each component. +// +// The data in this example is stored in the following struct: +struct data_row { + int64_t id; + int64_t components; + std::vector<double> component_cost; +}; + +// Transforming a vector of structs into a columnar Table. +// +// The final representation should be an `arrow::Table` which in turn +// is made up of an `arrow::Schema` and a list of +// `arrow::ChunkedArray` instances. As the first step, we will iterate +// over the data and build up the arrays incrementally. For this +// task, we provide `arrow::ArrayBuilder` classes that help in the +// construction of the final `arrow::Array` instances. +// +// For each type, Arrow has a specially typed builder class. For the primitive +// values `id` and `components` we can use the `arrow::Int64Builder`. For the +// `component_cost` vector, we need to have two builders, a top-level +// `arrow::ListBuilder` that builds the array of offsets and a nested +// `arrow::DoubleBuilder` that constructs the underlying values array that +// is referenced by the offsets in the former array. +arrow::Result<std::shared_ptr<arrow::Table>> VectorToColumnarTable( + const std::vector<struct data_row>& rows) { + // The builders are more efficient using + // arrow::jemalloc::MemoryPool::default_pool() as this can increase the size of + // the underlying memory regions in-place. At the moment, arrow::jemalloc is only + // supported on Unix systems, not Windows. + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + Int64Builder id_builder(pool); + Int64Builder components_builder(pool); + ListBuilder component_cost_builder(pool, std::make_shared<DoubleBuilder>(pool)); + // The following builder is owned by component_cost_builder. + DoubleBuilder* component_item_cost_builder = + (static_cast<DoubleBuilder*>(component_cost_builder.value_builder())); + + // Now we can loop over our existing data and insert it into the builders. The + // `Append` calls here may fail (e.g. we cannot allocate enough additional memory). + // Thus we need to check their return values. For more information on these values, + // check the documentation about `arrow::Status`. + for (const data_row& row : rows) { + ARROW_RETURN_NOT_OK(id_builder.Append(row.id)); + ARROW_RETURN_NOT_OK(components_builder.Append(row.components)); + + // Indicate the start of a new list row. This will memorise the current + // offset in the values builder. + ARROW_RETURN_NOT_OK(component_cost_builder.Append()); + // Store the actual values. The same memory layout is + // used for the component cost data, in this case a vector of + // type double, as for the memory that Arrow uses to hold this + // data and will be created. + ARROW_RETURN_NOT_OK(component_item_cost_builder->AppendValues( + row.component_cost.data(), row.component_cost.size())); + } + + // At the end, we finalise the arrays, declare the (type) schema and combine them + // into a single `arrow::Table`: + std::shared_ptr<arrow::Array> id_array; + ARROW_RETURN_NOT_OK(id_builder.Finish(&id_array)); + std::shared_ptr<arrow::Array> components_array; + ARROW_RETURN_NOT_OK(components_builder.Finish(&components_array)); + // No need to invoke component_cost_builder.Finish because it is implied by + // the parent builder's Finish invocation. + std::shared_ptr<arrow::Array> component_cost_array; + ARROW_RETURN_NOT_OK(component_cost_builder.Finish(&component_cost_array)); + + std::vector<std::shared_ptr<arrow::Field>> schema_vector = { + arrow::field("id", arrow::int64()), arrow::field("components", arrow::int64()), + arrow::field("component_cost", arrow::list(arrow::float64()))}; + + auto schema = std::make_shared<arrow::Schema>(schema_vector); + + // The final `table` variable is the one we can then pass on to other functions + // that can consume Apache Arrow memory structures. This object has ownership of + // all referenced data, thus we don't have to care about undefined references once + // we leave the scope of the function building the table and its underlying arrays. + std::shared_ptr<arrow::Table> table = + arrow::Table::Make(schema, {id_array, components_array, component_cost_array}); + + return table; +} + +arrow::Result<std::vector<data_row>> ColumnarTableToVector( + const std::shared_ptr<arrow::Table>& table) { + // To convert an Arrow table back into the same row-wise representation as in the + // above section, we first will check that the table conforms to our expected + // schema and then will build up the vector of rows incrementally. + // + // For the check if the table is as expected, we can utilise solely its schema. + std::vector<std::shared_ptr<arrow::Field>> schema_vector = { + arrow::field("id", arrow::int64()), arrow::field("components", arrow::int64()), + arrow::field("component_cost", arrow::list(arrow::float64()))}; + auto expected_schema = std::make_shared<arrow::Schema>(schema_vector); + + if (!expected_schema->Equals(*table->schema())) { + // The table doesn't have the expected schema thus we cannot directly + // convert it to our target representation. + return arrow::Status::Invalid("Schemas are not matching!"); + } + + // As we have ensured that the table has the expected structure, we can unpack the + // underlying arrays. For the primitive columns `id` and `components` we can use the + // high level functions to get the values whereas for the nested column + // `component_costs` we need to access the C-pointer to the data to copy its + // contents into the resulting `std::vector<double>`. Here we need to be careful to + // also add the offset to the pointer. This offset is needed to enable zero-copy + // slicing operations. While this could be adjusted automatically for double + // arrays, this cannot be done for the accompanying bitmap as often the slicing + // border would be inside a byte. + + auto ids = std::static_pointer_cast<arrow::Int64Array>(table->column(0)->chunk(0)); + auto components = + std::static_pointer_cast<arrow::Int64Array>(table->column(1)->chunk(0)); + auto component_cost = + std::static_pointer_cast<arrow::ListArray>(table->column(2)->chunk(0)); + auto component_cost_values = + std::static_pointer_cast<arrow::DoubleArray>(component_cost->values()); + // To enable zero-copy slices, the native values pointer might need to account + // for this slicing offset. This is not needed for the higher level functions + // like Value(…) that already account for this offset internally. + const double* ccv_ptr = component_cost_values->raw_values(); + std::vector<data_row> rows; + for (int64_t i = 0; i < table->num_rows(); i++) { + // Another simplification in this example is that we assume that there are + // no null entries, e.g. each row is fill with valid values. + int64_t id = ids->Value(i); + int64_t component = components->Value(i); + const double* first = ccv_ptr + component_cost->value_offset(i); + const double* last = ccv_ptr + component_cost->value_offset(i + 1); + std::vector<double> components_vec(first, last); + rows.push_back({id, component, components_vec}); + } + + return rows; +} + +int main(int argc, char** argv) { + std::vector<data_row> rows = { + {1, 1, {10.0}}, {2, 3, {11.0, 12.0, 13.0}}, {3, 2, {15.0, 25.0}}}; + std::shared_ptr<arrow::Table> table; + std::vector<data_row> expected_rows; + + arrow::Result<std::shared_ptr<arrow::Table>> table_result = VectorToColumnarTable(rows); + table = std::move(table_result).ValueOrDie(); + + arrow::Result<std::vector<data_row>> expected_rows_result = + ColumnarTableToVector(table); + expected_rows = std::move(expected_rows_result).ValueOrDie(); + + assert(rows.size() == expected_rows.size()); + + // Print out contents of table, should get + // ID Components Component prices + // 1 1 10 + // 2 3 11 12 13 + // 3 2 15 25 + std::cout << std::left << std::setw(3) << "ID " << std::left << std::setw(11) + << "Components " << std::left << std::setw(15) << "Component prices " + << std::endl; + for (const auto& row : rows) { + std::cout << std::left << std::setw(3) << row.id << std::left << std::setw(11) + << row.components; + for (const auto& cost : row.component_cost) { + std::cout << std::left << std::setw(4) << cost; + } + std::cout << std::endl; + } + return EXIT_SUCCESS; +} diff --git a/src/arrow/cpp/examples/minimal_build/.gitignore b/src/arrow/cpp/examples/minimal_build/.gitignore new file mode 100644 index 000000000..c94f3ec42 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/.gitignore @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +test.arrow diff --git a/src/arrow/cpp/examples/minimal_build/CMakeLists.txt b/src/arrow/cpp/examples/minimal_build/CMakeLists.txt new file mode 100644 index 000000000..9fc20c70f --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/CMakeLists.txt @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 3.0) + +project(ArrowMinimalExample) + +option(ARROW_LINK_SHARED "Link to the Arrow shared library" ON) + +find_package(Arrow REQUIRED) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_BUILD_TYPE Release) + +message(STATUS "Arrow version: ${ARROW_VERSION}") +message(STATUS "Arrow SO version: ${ARROW_FULL_SO_VERSION}") + +add_executable(arrow_example example.cc) + +if (ARROW_LINK_SHARED) + target_link_libraries(arrow_example PRIVATE arrow_shared) +else() + set(THREADS_PREFER_PTHREAD_FLAG ON) + find_package(Threads REQUIRED) + target_link_libraries(arrow_example PRIVATE arrow_static Threads::Threads) +endif() diff --git a/src/arrow/cpp/examples/minimal_build/README.md b/src/arrow/cpp/examples/minimal_build/README.md new file mode 100644 index 000000000..9f889f6ad --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/README.md @@ -0,0 +1,88 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Minimal C++ build example + +This directory showcases a minimal build of Arrow C++ (in `build_arrow.sh`). +This minimal build is then used by an example third-party C++ project +using CMake logic to compile and link against the Arrow C++ library +(in `build_example.sh` and `CMakeLists.txt`). + +When run, the example executable reads a file named `test.csv`, +displays its parsed contents, and then saves them in Arrow IPC format in +a file named `test.arrow`. + +## Running the example + +You can run this simple example using [Docker Compose][docker-compose] +and the given `docker-compose.yml` and dockerfiles, which installs a +minimal Ubuntu image with a basic C++ toolchain. + +Just open a terminal in this directory and run the following commands: + +```bash +docker-compose run --rm minimal +``` + +Note that this example mounts two volumes inside the Docker image: +* `/arrow` points to the Arrow source tree +* `/io` points to this example directory + +## Statically-linked builds + +We've provided an example build configuration here with CMake to show how to +create a statically-linked executable with bundled dependencies. + +To run it on Linux, you can use the above Docker image: + +```bash +docker-compose run --rm static +``` + +On macOS, you can use the `run_static.sh` but you must set some environment +variables to point the script to your Arrow checkout, for example: + +```bash +export ARROW_DIR=path/to/arrow-clone +export EXAMPLE_DIR=$ARROW_DIR/cpp/examples/minimal_build +export ARROW_BUILD_DIR=$(pwd)/arrow-build +export EXAMPLE_BUILD_DIR=$(pwd)/example + +./run_static.sh +``` + +On Windows, you can run `run_static.bat` from the command prompt with Visual +Studio's command line tools enabled and CMake and ninja build in the path: + +``` +call run_static.bat +``` + +### Static linking against system libraries + +You can also use static libraries of Arrow's dependencies from the +system. To run this configuration, set +`ARROW_DEPENDENCY_SOURCE=SYSTEM` for `run_static.sh`. You can use +`docker-compose` for this too: + +```bash +docker-compose run --rm static-system-dependency +``` + +[docker-compose]: https://docs.docker.com/compose/ diff --git a/src/arrow/cpp/examples/minimal_build/build_arrow.sh b/src/arrow/cpp/examples/minimal_build/build_arrow.sh new file mode 100755 index 000000000..402c312e4 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/build_arrow.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -ex + +NPROC=$(nproc) + +mkdir -p $ARROW_BUILD_DIR +pushd $ARROW_BUILD_DIR + +# Enable the CSV reader as it's used by the example third-party build +cmake /arrow/cpp \ + -DARROW_CSV=ON \ + -DARROW_JEMALLOC=OFF \ + $ARROW_CMAKE_OPTIONS + +make -j$NPROC +make install + +popd diff --git a/src/arrow/cpp/examples/minimal_build/build_example.sh b/src/arrow/cpp/examples/minimal_build/build_example.sh new file mode 100755 index 000000000..a315755a5 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/build_example.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -ex + +mkdir -p $EXAMPLE_BUILD_DIR +pushd $EXAMPLE_BUILD_DIR + +cmake /io +make + +popd diff --git a/src/arrow/cpp/examples/minimal_build/docker-compose.yml b/src/arrow/cpp/examples/minimal_build/docker-compose.yml new file mode 100644 index 000000000..6e2dcef81 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/docker-compose.yml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.5' + +services: + minimal: + build: + context: . + dockerfile: minimal.dockerfile + volumes: + - ../../../:/arrow:delegated + - .:/io:delegated + command: + - "/io/run.sh" + + static: + build: + context: . + dockerfile: minimal.dockerfile + volumes: + - ../../../:/arrow:delegated + - .:/io:delegated + command: + - "/io/run_static.sh" + + static-system-dependency: + build: + context: . + dockerfile: system_dependency.dockerfile + environment: + ARROW_DEPENDENCY_SOURCE: "SYSTEM" + volumes: + - ../../../:/arrow:delegated + - .:/io:delegated + command: + - "/io/run_static.sh" diff --git a/src/arrow/cpp/examples/minimal_build/example.cc b/src/arrow/cpp/examples/minimal_build/example.cc new file mode 100644 index 000000000..9bfb9953e --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/example.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/csv/api.h> +#include <arrow/io/api.h> +#include <arrow/ipc/api.h> +#include <arrow/pretty_print.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <arrow/table.h> + +#include <iostream> + +using arrow::Status; + +namespace { + +Status RunMain(int argc, char** argv) { + const char* csv_filename = "test.csv"; + const char* arrow_filename = "test.arrow"; + + std::cerr << "* Reading CSV file '" << csv_filename << "' into table" << std::endl; + ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(csv_filename)); + ARROW_ASSIGN_OR_RAISE(auto csv_reader, arrow::csv::TableReader::Make( + arrow::io::default_io_context(), input_file, + arrow::csv::ReadOptions::Defaults(), + arrow::csv::ParseOptions::Defaults(), + arrow::csv::ConvertOptions::Defaults())); + ARROW_ASSIGN_OR_RAISE(auto table, csv_reader->Read()); + + std::cerr << "* Read table:" << std::endl; + ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, {}, &std::cerr)); + + std::cerr << "* Writing table into Arrow IPC file '" << arrow_filename << "'" + << std::endl; + ARROW_ASSIGN_OR_RAISE(auto output_file, + arrow::io::FileOutputStream::Open(arrow_filename)); + ARROW_ASSIGN_OR_RAISE(auto batch_writer, + arrow::ipc::MakeFileWriter(output_file, table->schema())); + ARROW_RETURN_NOT_OK(batch_writer->WriteTable(*table)); + ARROW_RETURN_NOT_OK(batch_writer->Close()); + + return Status::OK(); +} + +} // namespace + +int main(int argc, char** argv) { + Status st = RunMain(argc, argv); + if (!st.ok()) { + std::cerr << st << std::endl; + return 1; + } + return 0; +} diff --git a/src/arrow/cpp/examples/minimal_build/minimal.dockerfile b/src/arrow/cpp/examples/minimal_build/minimal.dockerfile new file mode 100644 index 000000000..9361fc5e8 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/minimal.dockerfile @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM ubuntu:focal + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update -y -q && \ + apt-get install -y -q --no-install-recommends \ + build-essential \ + cmake \ + pkg-config && \ + apt-get clean && rm -rf /var/lib/apt/lists* diff --git a/src/arrow/cpp/examples/minimal_build/run.sh b/src/arrow/cpp/examples/minimal_build/run.sh new file mode 100755 index 000000000..a76058b0b --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/run.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +cd /io + +export ARROW_BUILD_DIR=/build/arrow +export EXAMPLE_BUILD_DIR=/build/example + +echo +echo "==" +echo "== Building Arrow C++ library" +echo "==" +echo + +./build_arrow.sh + +echo +echo "==" +echo "== Building example project using Arrow C++ library" +echo "==" +echo + +./build_example.sh + +echo +echo "==" +echo "== Running example project" +echo "==" +echo + +${EXAMPLE_BUILD_DIR}/arrow_example diff --git a/src/arrow/cpp/examples/minimal_build/run_static.bat b/src/arrow/cpp/examples/minimal_build/run_static.bat new file mode 100644 index 000000000..bbc7ff8f7 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/run_static.bat @@ -0,0 +1,88 @@ +@rem Licensed to the Apache Software Foundation (ASF) under one +@rem or more contributor license agreements. See the NOTICE file +@rem distributed with this work for additional information +@rem regarding copyright ownership. The ASF licenses this file +@rem to you under the Apache License, Version 2.0 (the +@rem "License"); you may not use this file except in compliance +@rem with the License. You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, +@rem software distributed under the License is distributed on an +@rem "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@rem KIND, either express or implied. See the License for the +@rem specific language governing permissions and limitations +@rem under the License. + +@echo on + +@rem clean up prior attempts +if exist "arrow-build" rd arrow-build /s /q +if exist "dist" rd dist /s /q +if exist "example" rd example /s /q + +echo +echo "==" +echo "== Building Arrow C++ library" +echo "==" +echo + +set INSTALL_PREFIX=%cd%\dist + +mkdir arrow-build +pushd arrow-build + +@rem bzip2_ep fails with this method + +cmake ..\..\.. ^ + -GNinja ^ + -DCMAKE_INSTALL_PREFIX=%INSTALL_PREFIX% ^ + -DARROW_DEPENDENCY_SOURCE=BUNDLED ^ + -DARROW_BUILD_SHARED=OFF ^ + -DARROW_BUILD_STATIC=ON ^ + -DARROW_COMPUTE=ON ^ + -DARROW_CSV=ON ^ + -DARROW_DATASET=ON ^ + -DARROW_FILESYSTEM=ON ^ + -DARROW_HDFS=ON ^ + -DARROW_JSON=ON ^ + -DARROW_MIMALLOC=ON ^ + -DARROW_ORC=ON ^ + -DARROW_PARQUET=ON ^ + -DARROW_PLASMA=ON ^ + -DARROW_WITH_BROTLI=ON ^ + -DARROW_WITH_BZ2=OFF ^ + -DARROW_WITH_LZ4=ON ^ + -DARROW_WITH_SNAPPY=ON ^ + -DARROW_WITH_ZLIB=ON ^ + -DARROW_WITH_ZSTD=ON + +ninja install + +popd + +echo +echo "==" +echo "== Building example project using Arrow C++ library" +echo "==" +echo + +mkdir example +pushd example + +cmake .. ^ + -GNinja ^ + -DCMAKE_PREFIX_PATH="%INSTALL_PREFIX%" ^ + -DARROW_LINK_SHARED=OFF +ninja + +popd + +echo +echo "==" +echo "== Running example project" +echo "==" +echo + +call example\arrow_example.exe diff --git a/src/arrow/cpp/examples/minimal_build/run_static.sh b/src/arrow/cpp/examples/minimal_build/run_static.sh new file mode 100755 index 000000000..ff3bb8945 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/run_static.sh @@ -0,0 +1,121 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +: ${ARROW_DIR:=/arrow} +: ${EXAMPLE_DIR:=/io} +: ${ARROW_BUILD_DIR:=/build/arrow} +: ${EXAMPLE_BUILD_DIR:=/build/example} + +: ${ARROW_DEPENDENCY_SOURCE:=BUNDLED} + +echo +echo "==" +echo "== Building Arrow C++ library" +echo "==" +echo + +mkdir -p $ARROW_BUILD_DIR +pushd $ARROW_BUILD_DIR + +NPROC=$(nproc) + +cmake $ARROW_DIR/cpp \ + -DARROW_BUILD_SHARED=OFF \ + -DARROW_BUILD_STATIC=ON \ + -DARROW_COMPUTE=ON \ + -DARROW_CSV=ON \ + -DARROW_DATASET=ON \ + -DARROW_DEPENDENCY_SOURCE=${ARROW_DEPENDENCY_SOURCE} \ + -DARROW_DEPENDENCY_USE_SHARED=OFF \ + -DARROW_FILESYSTEM=ON \ + -DARROW_HDFS=ON \ + -DARROW_JEMALLOC=ON \ + -DARROW_JSON=ON \ + -DARROW_ORC=ON \ + -DARROW_PARQUET=ON \ + -DARROW_PLASMA=ON \ + -DARROW_WITH_BROTLI=ON \ + -DARROW_WITH_BZ2=ON \ + -DARROW_WITH_LZ4=ON \ + -DARROW_WITH_SNAPPY=ON \ + -DARROW_WITH_ZLIB=ON \ + -DARROW_WITH_ZSTD=ON \ + -DORC_SOURCE=BUNDLED \ + $ARROW_CMAKE_OPTIONS + +make -j$NPROC +make install + +popd + +echo +echo "==" +echo "== CMake:" +echo "== Building example project using Arrow C++ library" +echo "==" +echo + +rm -rf $EXAMPLE_BUILD_DIR +mkdir -p $EXAMPLE_BUILD_DIR +pushd $EXAMPLE_BUILD_DIR + +cmake $EXAMPLE_DIR -DARROW_LINK_SHARED=OFF +make + +popd + +echo +echo "==" +echo "== CMake:" +echo "== Running example project" +echo "==" +echo + +pushd $EXAMPLE_DIR + +$EXAMPLE_BUILD_DIR/arrow_example + +echo +echo "==" +echo "== pkg-config" +echo "== Building example project using Arrow C++ library" +echo "==" +echo + +rm -rf $EXAMPLE_BUILD_DIR +mkdir -p $EXAMPLE_BUILD_DIR +${CXX:-c++} \ + -o $EXAMPLE_BUILD_DIR/arrow_example \ + $EXAMPLE_DIR/example.cc \ + $(PKG_CONFIG_PATH=$ARROW_BUILD_DIR/lib/pkgconfig \ + pkg-config --cflags --libs --static arrow) + +popd + +echo +echo "==" +echo "== pkg-config:" +echo "== Running example project" +echo "==" +echo + +pushd $EXAMPLE_DIR + +$EXAMPLE_BUILD_DIR/arrow_example diff --git a/src/arrow/cpp/examples/minimal_build/system_dependency.dockerfile b/src/arrow/cpp/examples/minimal_build/system_dependency.dockerfile new file mode 100644 index 000000000..926fcaf6f --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/system_dependency.dockerfile @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM ubuntu:focal + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update -y -q && \ + apt-get install -y -q --no-install-recommends \ + build-essential \ + cmake \ + libboost-filesystem-dev \ + libboost-regex-dev \ + libboost-system-dev \ + libbrotli-dev \ + libbz2-dev \ + libgflags-dev \ + liblz4-dev \ + libprotobuf-dev \ + libprotoc-dev \ + libre2-dev \ + libsnappy-dev \ + libthrift-dev \ + libutf8proc-dev \ + libzstd-dev \ + pkg-config \ + protobuf-compiler \ + rapidjson-dev \ + zlib1g-dev && \ + apt-get clean && rm -rf /var/lib/apt/lists* diff --git a/src/arrow/cpp/examples/minimal_build/test.csv b/src/arrow/cpp/examples/minimal_build/test.csv new file mode 100644 index 000000000..ca2440852 --- /dev/null +++ b/src/arrow/cpp/examples/minimal_build/test.csv @@ -0,0 +1,3 @@ +Integers,Strings,Timestamps +1,Some,2018-11-13 17:11:10 +2,data,N/A diff --git a/src/arrow/cpp/examples/parquet/CMakeLists.txt b/src/arrow/cpp/examples/parquet/CMakeLists.txt new file mode 100644 index 000000000..2d16948ae --- /dev/null +++ b/src/arrow/cpp/examples/parquet/CMakeLists.txt @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_executable(parquet_low_level_example low_level_api/reader_writer.cc) +add_executable(parquet_low_level_example2 low_level_api/reader_writer2.cc) +add_executable(parquet_arrow_example parquet_arrow/reader_writer.cc) +add_executable(parquet_stream_api_example parquet_stream_api/stream_reader_writer.cc) +target_include_directories(parquet_low_level_example PRIVATE low_level_api/) +target_include_directories(parquet_low_level_example2 PRIVATE low_level_api/) + +# The variables in these files are for illustration purposes +set(PARQUET_EXAMPLES_WARNING_SUPPRESSIONS + low_level_api/reader_writer.cc + low_level_api/reader_writer2.cc) + +if (PARQUET_REQUIRE_ENCRYPTION) + add_executable(parquet_encryption_example low_level_api/encryption_reader_writer.cc) + add_executable(parquet_encryption_example_all_crypto_options low_level_api/encryption_reader_writer_all_crypto_options.cc) + target_include_directories(parquet_encryption_example PRIVATE low_level_api/) + target_include_directories(parquet_encryption_example_all_crypto_options PRIVATE low_level_api/) + + set(PARQUET_EXAMPLES_WARNING_SUPPRESSIONS + ${PARQUET_EXAMPLES_WARNING_SUPPRESSIONS} + low_level_api/encryption_reader_writer.cc + low_level_api/encryption_reader_writer_all_crypto_options.cc) + +endif() + +if(UNIX) + foreach(FILE ${PARQUET_EXAMPLES_WARNING_SUPPRESSIONS}) + set_property(SOURCE ${FILE} + APPEND_STRING + PROPERTY COMPILE_FLAGS "-Wno-unused-variable") + endforeach() +endif() + +# Prefer shared linkage but use static if shared build is deactivated +if (ARROW_BUILD_SHARED) + set(PARQUET_EXAMPLE_LINK_LIBS parquet_shared) +else() + set(PARQUET_EXAMPLE_LINK_LIBS parquet_static) +endif() + +target_link_libraries(parquet_arrow_example ${PARQUET_EXAMPLE_LINK_LIBS}) +target_link_libraries(parquet_low_level_example ${PARQUET_EXAMPLE_LINK_LIBS}) +target_link_libraries(parquet_low_level_example2 ${PARQUET_EXAMPLE_LINK_LIBS}) +target_link_libraries(parquet_stream_api_example ${PARQUET_EXAMPLE_LINK_LIBS}) + +if(PARQUET_REQUIRE_ENCRYPTION) + target_link_libraries(parquet_encryption_example ${PARQUET_EXAMPLE_LINK_LIBS}) + target_link_libraries(parquet_encryption_example_all_crypto_options ${PARQUET_EXAMPLE_LINK_LIBS}) +endif() + +add_dependencies(parquet + parquet_low_level_example + parquet_low_level_example2 + parquet_arrow_example + parquet_stream_api_example) + +if (PARQUET_REQUIRE_ENCRYPTION) + add_dependencies(parquet + parquet_encryption_example + parquet_encryption_example_all_crypto_options) +endif() diff --git a/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer.cc b/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer.cc new file mode 100644 index 000000000..75788b283 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer.cc @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <reader_writer.h> + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> + +/* + * This file contains sample for writing and reading encrypted Parquet file with + * basic encryption configuration. + * + * A detailed description of the Parquet Modular Encryption specification can be found + * here: + * https://github.com/apache/parquet-format/blob/encryption/Encryption.md + * + * The write sample creates a file with eight columns where two of the columns and the + * footer are encrypted. + * + * The read sample decrypts using key retriever that holds the keys of two encrypted + * columns and the footer key. + */ + +constexpr int NUM_ROWS_PER_ROW_GROUP = 500; +const char* PARQUET_FILENAME = "parquet_cpp_example.parquet.encrypted"; +const char* kFooterEncryptionKey = "0123456789012345"; // 128bit/16 +const char* kColumnEncryptionKey1 = "1234567890123450"; +const char* kColumnEncryptionKey2 = "1234567890123451"; + +int main(int argc, char** argv) { + /********************************************************************************** + PARQUET ENCRYPTION WRITER EXAMPLE + **********************************************************************************/ + + try { + // Create a local file output stream instance. + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = SetupSchema(); + + // Add encryption properties + // Encryption configuration: Encrypt two columns and the footer. + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols; + + parquet::SchemaDescriptor schema_desc; + schema_desc.Init(schema); + auto column_path1 = schema_desc.Column(5)->path()->ToDotString(); + auto column_path2 = schema_desc.Column(4)->path()->ToDotString(); + + parquet::ColumnEncryptionProperties::Builder encryption_col_builder0(column_path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder1(column_path2); + encryption_col_builder0.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder1.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols[column_path1] = encryption_col_builder0.build(); + encryption_cols[column_path2] = encryption_col_builder1.build(); + + parquet::FileEncryptionProperties::Builder file_encryption_builder( + kFooterEncryptionKey); + + parquet::WriterProperties::Builder builder; + // Add the current encryption configuration to WriterProperties. + builder.encryption(file_encryption_builder.footer_key_metadata("kf") + ->encrypted_columns(encryption_cols) + ->build()); + + // Add other writer properties + builder.compression(parquet::Compression::SNAPPY); + + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a RowGroup with a specific number of rows. + parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast<parquet::BoolWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + bool value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int32 column + parquet::Int32Writer* int32_writer = + static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + int32_t value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int64 column. Each row has repeats twice. + parquet::Int64Writer* int64_writer = + static_cast<parquet::Int64Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) { + int64_t value = i * 1000 * 1000; + value *= 1000 * 1000; + int16_t definition_level = 1; + int16_t repetition_level = 0; + if ((i % 2) == 0) { + repetition_level = 1; // start of a new record + } + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value); + } + + // Write the INT96 column. + parquet::Int96Writer* int96_writer = + static_cast<parquet::Int96Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::Int96 value; + value.value[0] = i; + value.value[1] = i + 1; + value.value[2] = i + 2; + int96_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Float column + parquet::FloatWriter* float_writer = + static_cast<parquet::FloatWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + float value = static_cast<float>(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Double column + parquet::DoubleWriter* double_writer = + static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + double value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the ByteArray column. Make every alternate values NULL + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::ByteArray value; + char hello[FIXED_LENGTH] = "parquet"; + hello[7] = static_cast<char>(static_cast<int>('0') + i / 100); + hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10); + hello[9] = static_cast<char>(static_cast<int>('0') + i % 10); + if (i % 2 == 0) { + int16_t definition_level = 1; + value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]); + value.len = FIXED_LENGTH; + ba_writer->WriteBatch(1, &definition_level, nullptr, &value); + } else { + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + } + + // Write the FixedLengthByteArray column + parquet::FixedLenByteArrayWriter* flba_writer = + static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::FixedLenByteArray value; + char v = static_cast<char>(i); + char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]); + + flba_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + /********************************************************************************** + PARQUET ENCRYPTION READER EXAMPLE + **********************************************************************************/ + + // Decryption configuration: Decrypt using key retriever callback that holds the keys + // of two encrypted columns and the footer key. + std::shared_ptr<parquet::StringKeyIdRetriever> string_kr1 = + std::make_shared<parquet::StringKeyIdRetriever>(); + string_kr1->PutKey("kf", kFooterEncryptionKey); + string_kr1->PutKey("kc1", kColumnEncryptionKey1); + string_kr1->PutKey("kc2", kColumnEncryptionKey2); + std::shared_ptr<parquet::DecryptionKeyRetriever> kr1 = + std::static_pointer_cast<parquet::StringKeyIdRetriever>(string_kr1); + + parquet::FileDecryptionProperties::Builder file_decryption_builder; + + try { + parquet::ReaderProperties reader_properties = parquet::default_reader_properties(); + + // Add the current decryption configuration to ReaderProperties. + reader_properties.file_decryption_properties( + file_decryption_builder.key_retriever(kr1)->build()); + + // Create a ParquetReader instance + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false, reader_properties); + + // Get the File MetaData + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + + // Get the number of RowGroups + int num_row_groups = file_metadata->num_row_groups(); + assert(num_row_groups == 1); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 8); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + int64_t values_read = 0; + int64_t rows_read = 0; + int16_t definition_level; + int16_t repetition_level; + int i; + std::shared_ptr<parquet::ColumnReader> column_reader; + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(0); + parquet::BoolReader* bool_reader = + static_cast<parquet::BoolReader*>(column_reader.get()); + + // Read all the rows in the column + i = 0; + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((i % 2) == 0) ? true : false; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Int32 column + column_reader = row_group_reader->Column(1); + parquet::Int32Reader* int32_reader = + static_cast<parquet::Int32Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == i); + i++; + } + + // Get the Column Reader for the Int64 column + column_reader = row_group_reader->Column(2); + parquet::Int64Reader* int64_reader = + static_cast<parquet::Int64Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int64_reader->HasNext()) { + int64_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, + &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + int64_t expected_value = i * 1000 * 1000; + expected_value *= 1000 * 1000; + assert(value == expected_value); + if ((i % 2) == 0) { + assert(repetition_level == 1); + } else { + assert(repetition_level == 0); + } + i++; + } + + // Get the Column Reader for the Int96 column + column_reader = row_group_reader->Column(3); + parquet::Int96Reader* int96_reader = + static_cast<parquet::Int96Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int96_reader->HasNext()) { + parquet::Int96 value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + parquet::Int96 expected_value; + expected_value.value[0] = i; + expected_value.value[1] = i + 1; + expected_value.value[2] = i + 2; + for (int j = 0; j < 3; j++) { + assert(value.value[j] == expected_value.value[j]); + } + ARROW_UNUSED(expected_value); // suppress compiler warning in release builds + i++; + } + + // Get the Column Reader for the Float column + column_reader = row_group_reader->Column(4); + parquet::FloatReader* float_reader = + static_cast<parquet::FloatReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast<float>(i) * 1.1f; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Double column + column_reader = row_group_reader->Column(5); + parquet::DoubleReader* double_reader = + static_cast<parquet::DoubleReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = i * 1.1111111; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the ByteArray column + column_reader = row_group_reader->Column(6); + parquet::ByteArrayReader* ba_reader = + static_cast<parquet::ByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (ba_reader->HasNext()) { + parquet::ByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + ARROW_UNUSED(rows_read); // suppress compiler warning in release builds + // Verify the value written + char expected_value[FIXED_LENGTH] = "parquet"; + expected_value[7] = static_cast<char>('0' + i / 100); + expected_value[8] = static_cast<char>('0' + (i / 10) % 10); + expected_value[9] = static_cast<char>('0' + i % 10); + if (i % 2 == 0) { // only alternate values exist + // There are no NULL values in the rows written + assert(values_read == 1); + assert(value.len == FIXED_LENGTH); + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + assert(definition_level == 1); + } else { + // There are NULL values in the rows written + assert(values_read == 0); + assert(definition_level == 0); + } + ARROW_UNUSED(expected_value); // suppress compiler warning in release builds + i++; + } + + // Get the Column Reader for the FixedLengthByteArray column + column_reader = row_group_reader->Column(7); + parquet::FixedLenByteArrayReader* flba_reader = + static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (flba_reader->HasNext()) { + parquet::FixedLenByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + char v = static_cast<char>(i); + char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + i++; + } + } + } catch (const std::exception& e) { + std::cerr << "Parquet read error: " << e.what() << std::endl; + } + + std::cout << "Parquet Writing and Reading Complete" << std::endl; + return 0; +} diff --git a/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc b/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc new file mode 100644 index 000000000..5b01e0284 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc @@ -0,0 +1,656 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/io/file.h> +#include <arrow/util/logging.h> +#include <dirent.h> +#include <parquet/api/reader.h> +#include <parquet/api/writer.h> + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> +#include <regex> +#include <sstream> + +/* + * This file contains samples for writing and reading encrypted Parquet files in different + * encryption and decryption configurations. + * Each sample section is dedicated to an independent configuration and shows its creation + * from beginning to end. + * The samples have the following goals: + * 1) Demonstrate usage of different options for data encryption and decryption. + * 2) Produce encrypted files for interoperability tests with other (eg parquet-mr) + * readers that support encryption. + * 3) Produce encrypted files with plaintext footer, for testing the ability of legacy + * readers to parse the footer and read unencrypted columns. + * 4) Perform interoperability tests with other (eg parquet-mr) writers, by reading + * encrypted files produced by these writers. + * + * Each write sample produces new independent parquet file, encrypted with a different + * encryption configuration as described below. + * The name of each file is in the form of: + * tester<encryption config number>.parquet.encrypted. + * + * The read sample creates a set of decryption configurations and then uses each of them + * to read all encrypted files in the input directory. + * + * The different encryption and decryption configurations are listed below. + * + * Usage: ./encryption-interop-tests <write/read> <path-to-directory-of-parquet-files> + * + * A detailed description of the Parquet Modular Encryption specification can be found + * here: + * https://github.com/apache/parquet-format/blob/encryption/Encryption.md + * + * The write sample creates files with four columns in the following + * encryption configurations: + * + * - Encryption configuration 1: Encrypt all columns and the footer with the same key. + * (uniform encryption) + * - Encryption configuration 2: Encrypt two columns and the footer, with different + * keys. + * - Encryption configuration 3: Encrypt two columns, with different keys. + * Don’t encrypt footer (to enable legacy readers) + * - plaintext footer mode. + * - Encryption configuration 4: Encrypt two columns and the footer, with different + * keys. Supply aad_prefix for file identity + * verification. + * - Encryption configuration 5: Encrypt two columns and the footer, with different + * keys. Supply aad_prefix, and call + * disable_aad_prefix_storage to prevent file + * identity storage in file metadata. + * - Encryption configuration 6: Encrypt two columns and the footer, with different + * keys. Use the alternative (AES_GCM_CTR_V1) algorithm. + * + * The read sample uses each of the following decryption configurations to read every + * encrypted files in the input directory: + * + * - Decryption configuration 1: Decrypt using key retriever that holds the keys of + * two encrypted columns and the footer key. + * - Decryption configuration 2: Decrypt using key retriever that holds the keys of + * two encrypted columns and the footer key. Supplies + * aad_prefix to verify file identity. + * - Decryption configuration 3: Decrypt using explicit column and footer keys + * (instead of key retrieval callback). + */ + +constexpr int NUM_ROWS_PER_ROW_GROUP = 500; + +const char* kFooterEncryptionKey = "0123456789012345"; // 128bit/16 +const char* kColumnEncryptionKey1 = "1234567890123450"; +const char* kColumnEncryptionKey2 = "1234567890123451"; +const char* fileName = "tester"; + +using FileClass = ::arrow::io::FileOutputStream; +using parquet::ConvertedType; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +void PrintDecryptionConfiguration(int configuration); +// Check that the decryption result is as expected. +void CheckResult(std::string file, int example_id, std::string exception_msg); +// Returns true if FileName ends with suffix. Otherwise returns false. +// Used to skip unencrypted parquet files. +bool FileNameEndsWith(std::string file_name, std::string suffix); + +std::vector<std::string> GetDirectoryFiles(const std::string& path) { + std::vector<std::string> files; + struct dirent* entry; + DIR* dir = opendir(path.c_str()); + + if (dir == NULL) { + exit(-1); + } + while ((entry = readdir(dir)) != NULL) { + files.push_back(std::string(entry->d_name)); + } + closedir(dir); + return files; +} + +static std::shared_ptr<GroupNode> SetupSchema() { + parquet::schema::NodeVector fields; + // Create a primitive node named 'boolean_field' with type:BOOLEAN, + // repetition:REQUIRED + fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED, + Type::BOOLEAN, ConvertedType::NONE)); + + // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED, + // logical type:TIME_MILLIS + fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32, + ConvertedType::TIME_MILLIS)); + + fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT, + ConvertedType::NONE)); + + fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE, + ConvertedType::NONE)); + + // Create a GroupNode named 'schema' using the primitive nodes defined above + // This GroupNode is the root node of the schema tree + return std::static_pointer_cast<GroupNode>( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); +} + +void InteropTestWriteEncryptedParquetFiles(std::string root_path) { + /********************************************************************************** + Creating a number of Encryption configurations + **********************************************************************************/ + + // This vector will hold various encryption configurations. + std::vector<std::shared_ptr<parquet::FileEncryptionProperties>> + vector_of_encryption_configurations; + + // Encryption configuration 1: Encrypt all columns and the footer with the same key. + // (uniform encryption) + parquet::FileEncryptionProperties::Builder file_encryption_builder_1( + kFooterEncryptionKey); + // Add to list of encryption configurations. + vector_of_encryption_configurations.push_back( + file_encryption_builder_1.footer_key_metadata("kf")->build()); + + // Encryption configuration 2: Encrypt two columns and the footer, with different keys. + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols2; + std::string path1 = "double_field"; + std::string path2 = "float_field"; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_20(path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_21(path2); + encryption_col_builder_20.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder_21.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols2[path1] = encryption_col_builder_20.build(); + encryption_cols2[path2] = encryption_col_builder_21.build(); + + parquet::FileEncryptionProperties::Builder file_encryption_builder_2( + kFooterEncryptionKey); + + vector_of_encryption_configurations.push_back( + file_encryption_builder_2.footer_key_metadata("kf") + ->encrypted_columns(encryption_cols2) + ->build()); + + // Encryption configuration 3: Encrypt two columns, with different keys. + // Don’t encrypt footer. + // (plaintext footer mode, readable by legacy readers) + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols3; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_30(path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_31(path2); + encryption_col_builder_30.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder_31.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols3[path1] = encryption_col_builder_30.build(); + encryption_cols3[path2] = encryption_col_builder_31.build(); + parquet::FileEncryptionProperties::Builder file_encryption_builder_3( + kFooterEncryptionKey); + + vector_of_encryption_configurations.push_back( + file_encryption_builder_3.footer_key_metadata("kf") + ->encrypted_columns(encryption_cols3) + ->set_plaintext_footer() + ->build()); + + // Encryption configuration 4: Encrypt two columns and the footer, with different keys. + // Use aad_prefix. + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols4; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_40(path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_41(path2); + encryption_col_builder_40.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder_41.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols4[path1] = encryption_col_builder_40.build(); + encryption_cols4[path2] = encryption_col_builder_41.build(); + parquet::FileEncryptionProperties::Builder file_encryption_builder_4( + kFooterEncryptionKey); + + vector_of_encryption_configurations.push_back( + file_encryption_builder_4.footer_key_metadata("kf") + ->encrypted_columns(encryption_cols4) + ->aad_prefix(fileName) + ->build()); + + // Encryption configuration 5: Encrypt two columns and the footer, with different keys. + // Use aad_prefix and disable_aad_prefix_storage. + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols5; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_50(path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_51(path2); + encryption_col_builder_50.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder_51.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols5[path1] = encryption_col_builder_50.build(); + encryption_cols5[path2] = encryption_col_builder_51.build(); + parquet::FileEncryptionProperties::Builder file_encryption_builder_5( + kFooterEncryptionKey); + + vector_of_encryption_configurations.push_back( + file_encryption_builder_5.encrypted_columns(encryption_cols5) + ->footer_key_metadata("kf") + ->aad_prefix(fileName) + ->disable_aad_prefix_storage() + ->build()); + + // Encryption configuration 6: Encrypt two columns and the footer, with different keys. + // Use AES_GCM_CTR_V1 algorithm. + std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> + encryption_cols6; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_60(path1); + parquet::ColumnEncryptionProperties::Builder encryption_col_builder_61(path2); + encryption_col_builder_60.key(kColumnEncryptionKey1)->key_id("kc1"); + encryption_col_builder_61.key(kColumnEncryptionKey2)->key_id("kc2"); + + encryption_cols6[path1] = encryption_col_builder_60.build(); + encryption_cols6[path2] = encryption_col_builder_61.build(); + parquet::FileEncryptionProperties::Builder file_encryption_builder_6( + kFooterEncryptionKey); + + vector_of_encryption_configurations.push_back( + file_encryption_builder_6.footer_key_metadata("kf") + ->encrypted_columns(encryption_cols6) + ->algorithm(parquet::ParquetCipher::AES_GCM_CTR_V1) + ->build()); + + /********************************************************************************** + PARQUET WRITER EXAMPLE + **********************************************************************************/ + + // Iterate over the encryption configurations and for each one write a parquet file. + for (unsigned example_id = 0; example_id < vector_of_encryption_configurations.size(); + ++example_id) { + std::stringstream ss; + ss << example_id + 1; + std::string test_number_string = ss.str(); + try { + // Create a local file output stream instance. + std::shared_ptr<FileClass> out_file; + std::string file = + root_path + fileName + std::string(test_number_string) + ".parquet.encrypted"; + std::cout << "Write " << file << std::endl; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(file)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = SetupSchema(); + + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + + // Add the current encryption configuration to WriterProperties. + builder.encryption(vector_of_encryption_configurations[example_id]); + + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a RowGroup with a specific number of rows. + parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast<parquet::BoolWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + bool value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int32 column + parquet::Int32Writer* int32_writer = + static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + int32_t value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Float column + parquet::FloatWriter* float_writer = + static_cast<parquet::FloatWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + float value = static_cast<float>(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Double column + parquet::DoubleWriter* double_writer = + static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + double value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &value); + } + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return; + } + } +} + +void InteropTestReadEncryptedParquetFiles(std::string root_path) { + std::vector<std::string> files_in_directory = GetDirectoryFiles(root_path); + + /********************************************************************************** + Creating a number of Decryption configurations + **********************************************************************************/ + + // This vector will hold various decryption configurations. + std::vector<std::shared_ptr<parquet::FileDecryptionProperties>> + vector_of_decryption_configurations; + + // Decryption configuration 1: Decrypt using key retriever callback that holds the keys + // of two encrypted columns and the footer key. + std::shared_ptr<parquet::StringKeyIdRetriever> string_kr1 = + std::make_shared<parquet::StringKeyIdRetriever>(); + string_kr1->PutKey("kf", kFooterEncryptionKey); + string_kr1->PutKey("kc1", kColumnEncryptionKey1); + string_kr1->PutKey("kc2", kColumnEncryptionKey2); + std::shared_ptr<parquet::DecryptionKeyRetriever> kr1 = + std::static_pointer_cast<parquet::StringKeyIdRetriever>(string_kr1); + + parquet::FileDecryptionProperties::Builder file_decryption_builder_1; + vector_of_decryption_configurations.push_back( + file_decryption_builder_1.key_retriever(kr1)->build()); + + // Decryption configuration 2: Decrypt using key retriever callback that holds the keys + // of two encrypted columns and the footer key. Supply aad_prefix. + std::shared_ptr<parquet::StringKeyIdRetriever> string_kr2 = + std::make_shared<parquet::StringKeyIdRetriever>(); + string_kr2->PutKey("kf", kFooterEncryptionKey); + string_kr2->PutKey("kc1", kColumnEncryptionKey1); + string_kr2->PutKey("kc2", kColumnEncryptionKey2); + std::shared_ptr<parquet::DecryptionKeyRetriever> kr2 = + std::static_pointer_cast<parquet::StringKeyIdRetriever>(string_kr2); + + parquet::FileDecryptionProperties::Builder file_decryption_builder_2; + vector_of_decryption_configurations.push_back( + file_decryption_builder_2.key_retriever(kr2)->aad_prefix(fileName)->build()); + + // Decryption configuration 3: Decrypt using explicit column and footer keys. + std::string path_double = "double_field"; + std::string path_float = "float_field"; + std::map<std::string, std::shared_ptr<parquet::ColumnDecryptionProperties>> + decryption_cols; + parquet::ColumnDecryptionProperties::Builder decryption_col_builder31(path_double); + parquet::ColumnDecryptionProperties::Builder decryption_col_builder32(path_float); + + decryption_cols[path_double] = + decryption_col_builder31.key(kColumnEncryptionKey1)->build(); + decryption_cols[path_float] = + decryption_col_builder32.key(kColumnEncryptionKey2)->build(); + + parquet::FileDecryptionProperties::Builder file_decryption_builder_3; + vector_of_decryption_configurations.push_back( + file_decryption_builder_3.footer_key(kFooterEncryptionKey) + ->column_keys(decryption_cols) + ->build()); + + /********************************************************************************** + PARQUET READER EXAMPLE + **********************************************************************************/ + + // Iterate over the decryption configurations and use each one to read every files + // in the input directory. + for (unsigned example_id = 0; example_id < vector_of_decryption_configurations.size(); + ++example_id) { + PrintDecryptionConfiguration(example_id + 1); + for (auto const& file : files_in_directory) { + std::string exception_msg = ""; + if (!FileNameEndsWith(file, "parquet.encrypted")) // Skip non encrypted files + continue; + try { + std::cout << "--> Read file " << file << std::endl; + + parquet::ReaderProperties reader_properties = + parquet::default_reader_properties(); + + // Add the current decryption configuration to ReaderProperties. + reader_properties.file_decryption_properties( + vector_of_decryption_configurations[example_id]->DeepClone()); + + // Create a ParquetReader instance + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(root_path + file, false, + reader_properties); + + // Get the File MetaData + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + + // Get the number of RowGroups + int num_row_groups = file_metadata->num_row_groups(); + assert(num_row_groups == 1); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 4); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + int64_t values_read = 0; + int64_t rows_read = 0; + int i; + std::shared_ptr<parquet::ColumnReader> column_reader; + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(0); + parquet::BoolReader* bool_reader = + static_cast<parquet::BoolReader*>(column_reader.get()); + + // Read all the rows in the column + i = 0; + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((i % 2) == 0) ? true : false; + assert(value == expected_value); + i++; + } + ARROW_UNUSED(rows_read); // suppress compiler warning in release builds + + // Get the Column Reader for the Int32 column + column_reader = row_group_reader->Column(1); + parquet::Int32Reader* int32_reader = + static_cast<parquet::Int32Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == i); + i++; + } + + // Get the Column Reader for the Float column + column_reader = row_group_reader->Column(2); + parquet::FloatReader* float_reader = + static_cast<parquet::FloatReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast<float>(i) * 1.1f; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Double column + column_reader = row_group_reader->Column(3); + parquet::DoubleReader* double_reader = + static_cast<parquet::DoubleReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = i * 1.1111111; + assert(value == expected_value); + i++; + } + } + } catch (const std::exception& e) { + exception_msg = e.what(); + } + CheckResult(file, example_id, exception_msg); + std::cout << "file [" << file << "] Parquet Reading Complete" << std::endl; + } + } +} + +void PrintDecryptionConfiguration(int configuration) { + std::cout << "\n\nDecryption configuration "; + if (configuration == 1) { + std::cout << "1: \n\nDecrypt using key retriever that holds" + " the keys of two encrypted columns and the footer key." + << std::endl; + } else if (configuration == 2) { + std::cout << "2: \n\nDecrypt using key retriever that holds" + " the keys of two encrypted columns and the footer key. Pass aad_prefix." + << std::endl; + } else if (configuration == 3) { + std::cout << "3: \n\nDecrypt using explicit column and footer keys." << std::endl; + } else { + std::cout << "Unknown configuration" << std::endl; + exit(-1); + } + std::cout << std::endl; +} + +// Check that the decryption result is as expected. +void CheckResult(std::string file, int example_id, std::string exception_msg) { + int encryption_configuration_number; + std::regex r("tester([0-9]+)\\.parquet.encrypted"); + std::smatch m; + std::regex_search(file, m, r); + if (m.size() == 0) { + std::cerr + << "Error: Error parsing filename to extract encryption configuration number. " + << std::endl; + } + std::string encryption_configuration_number_str = m.str(1); + encryption_configuration_number = atoi(encryption_configuration_number_str.c_str()); + if (encryption_configuration_number < 1 || encryption_configuration_number > 6) { + std::cerr << "Error: Unknown encryption configuration number. " << std::endl; + } + + int decryption_configuration_number = example_id + 1; + + // Encryption_configuration number five contains aad_prefix and + // disable_aad_prefix_storage. + // An exception is expected to be thrown if the file is not decrypted with aad_prefix. + if (encryption_configuration_number == 5) { + if (decryption_configuration_number == 1 || decryption_configuration_number == 3) { + std::size_t found = exception_msg.find("AAD"); + if (found == std::string::npos) + std::cout << "Error: Expecting AAD related exception."; + return; + } + } + // Decryption configuration number two contains aad_prefix. An exception is expected to + // be thrown if the file was not encrypted with the same aad_prefix. + if (decryption_configuration_number == 2) { + if (encryption_configuration_number != 5 && encryption_configuration_number != 4) { + std::size_t found = exception_msg.find("AAD"); + if (found == std::string::npos) { + std::cout << "Error: Expecting AAD related exception." << std::endl; + } + return; + } + } + if (!exception_msg.empty()) + std::cout << "Error: Unexpected exception was thrown." << exception_msg; +} + +bool FileNameEndsWith(std::string file_name, std::string suffix) { + std::string::size_type idx = file_name.find_first_of('.'); + + if (idx != std::string::npos) { + std::string extension = file_name.substr(idx + 1); + if (extension.compare(suffix) == 0) return true; + } + return false; +} + +int main(int argc, char** argv) { + enum Operation { write, read }; + std::string root_path; + Operation operation = write; + if (argc < 3) { + std::cout << "Usage: encryption-reader-writer-all-crypto-options <read/write> " + "<Path-to-parquet-files>" + << std::endl; + exit(1); + } + root_path = argv[1]; + if (root_path.compare("read") == 0) { + operation = read; + } + + root_path = argv[2]; + std::cout << "Root path is: " << root_path << std::endl; + + if (operation == write) { + InteropTestWriteEncryptedParquetFiles(root_path); + } else { + InteropTestReadEncryptedParquetFiles(root_path); + } + return 0; +} diff --git a/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.cc b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.cc new file mode 100644 index 000000000..09af32289 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.cc @@ -0,0 +1,413 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <reader_writer.h> + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> + +/* + * This example describes writing and reading Parquet Files in C++ and serves as a + * reference to the API. + * The file contains all the physical data types supported by Parquet. + * This example uses the RowGroupWriter API that supports writing RowGroups optimized for + * memory consumption. + **/ + +/* Parquet is a structured columnar file format + * Parquet File = "Parquet data" + "Parquet Metadata" + * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a + * columnar layout + * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their + * Columns + * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a + * complex (nested) type (internal nodes) + * For specific details, please refer the format here: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + **/ + +constexpr int NUM_ROWS_PER_ROW_GROUP = 500; +const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet"; + +int main(int argc, char** argv) { + /********************************************************************************** + PARQUET WRITER EXAMPLE + **********************************************************************************/ + // parquet::REQUIRED fields do not need definition and repetition level values + // parquet::OPTIONAL fields require only definition level values + // parquet::REPEATED fields require both definition and repetition level values + try { + // Create a local file output stream instance. + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = SetupSchema(); + + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a RowGroup with a specific number of rows. + parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast<parquet::BoolWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + bool value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int32 column + parquet::Int32Writer* int32_writer = + static_cast<parquet::Int32Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + int32_t value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int64 column. Each row has repeats twice. + parquet::Int64Writer* int64_writer = + static_cast<parquet::Int64Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) { + int64_t value = i * 1000 * 1000; + value *= 1000 * 1000; + int16_t definition_level = 1; + int16_t repetition_level = 0; + if ((i % 2) == 0) { + repetition_level = 1; // start of a new record + } + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value); + } + + // Write the INT96 column. + parquet::Int96Writer* int96_writer = + static_cast<parquet::Int96Writer*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::Int96 value; + value.value[0] = i; + value.value[1] = i + 1; + value.value[2] = i + 2; + int96_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Float column + parquet::FloatWriter* float_writer = + static_cast<parquet::FloatWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + float value = static_cast<float>(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Double column + parquet::DoubleWriter* double_writer = + static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + double value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the ByteArray column. Make every alternate values NULL + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::ByteArray value; + char hello[FIXED_LENGTH] = "parquet"; + hello[7] = static_cast<char>(static_cast<int>('0') + i / 100); + hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10); + hello[9] = static_cast<char>(static_cast<int>('0') + i % 10); + if (i % 2 == 0) { + int16_t definition_level = 1; + value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]); + value.len = FIXED_LENGTH; + ba_writer->WriteBatch(1, &definition_level, nullptr, &value); + } else { + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + } + + // Write the FixedLengthByteArray column + parquet::FixedLenByteArrayWriter* flba_writer = + static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::FixedLenByteArray value; + char v = static_cast<char>(i); + char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]); + + flba_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + /********************************************************************************** + PARQUET READER EXAMPLE + **********************************************************************************/ + + try { + // Create a ParquetReader instance + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false); + + // Get the File MetaData + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + + // Get the number of RowGroups + int num_row_groups = file_metadata->num_row_groups(); + assert(num_row_groups == 1); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 8); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + int64_t values_read = 0; + int64_t rows_read = 0; + int16_t definition_level; + int16_t repetition_level; + int i; + std::shared_ptr<parquet::ColumnReader> column_reader; + + ARROW_UNUSED(rows_read); // prevent warning in release build + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(0); + parquet::BoolReader* bool_reader = + static_cast<parquet::BoolReader*>(column_reader.get()); + + // Read all the rows in the column + i = 0; + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((i % 2) == 0) ? true : false; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Int32 column + column_reader = row_group_reader->Column(1); + parquet::Int32Reader* int32_reader = + static_cast<parquet::Int32Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == i); + i++; + } + + // Get the Column Reader for the Int64 column + column_reader = row_group_reader->Column(2); + parquet::Int64Reader* int64_reader = + static_cast<parquet::Int64Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int64_reader->HasNext()) { + int64_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, + &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + int64_t expected_value = i * 1000 * 1000; + expected_value *= 1000 * 1000; + assert(value == expected_value); + if ((i % 2) == 0) { + assert(repetition_level == 1); + } else { + assert(repetition_level == 0); + } + i++; + } + + // Get the Column Reader for the Int96 column + column_reader = row_group_reader->Column(3); + parquet::Int96Reader* int96_reader = + static_cast<parquet::Int96Reader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int96_reader->HasNext()) { + parquet::Int96 value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + parquet::Int96 expected_value; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value.value[0] = i; + expected_value.value[1] = i + 1; + expected_value.value[2] = i + 2; + for (int j = 0; j < 3; j++) { + assert(value.value[j] == expected_value.value[j]); + } + i++; + } + + // Get the Column Reader for the Float column + column_reader = row_group_reader->Column(4); + parquet::FloatReader* float_reader = + static_cast<parquet::FloatReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast<float>(i) * 1.1f; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Double column + column_reader = row_group_reader->Column(5); + parquet::DoubleReader* double_reader = + static_cast<parquet::DoubleReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = i * 1.1111111; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the ByteArray column + column_reader = row_group_reader->Column(6); + parquet::ByteArrayReader* ba_reader = + static_cast<parquet::ByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (ba_reader->HasNext()) { + parquet::ByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // Verify the value written + char expected_value[FIXED_LENGTH] = "parquet"; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value[7] = static_cast<char>('0' + i / 100); + expected_value[8] = static_cast<char>('0' + (i / 10) % 10); + expected_value[9] = static_cast<char>('0' + i % 10); + if (i % 2 == 0) { // only alternate values exist + // There are no NULL values in the rows written + assert(values_read == 1); + assert(value.len == FIXED_LENGTH); + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + assert(definition_level == 1); + } else { + // There are NULL values in the rows written + assert(values_read == 0); + assert(definition_level == 0); + } + i++; + } + + // Get the Column Reader for the FixedLengthByteArray column + column_reader = row_group_reader->Column(7); + parquet::FixedLenByteArrayReader* flba_reader = + static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + i = 0; + while (flba_reader->HasNext()) { + parquet::FixedLenByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + char v = static_cast<char>(i); + char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + i++; + } + } + } catch (const std::exception& e) { + std::cerr << "Parquet read error: " << e.what() << std::endl; + return -1; + } + + std::cout << "Parquet Writing and Reading Complete" << std::endl; + + return 0; +} diff --git a/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.h b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.h new file mode 100644 index 000000000..ed8e74653 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/io/file.h> +#include <arrow/util/logging.h> +#include <parquet/api/reader.h> +#include <parquet/api/writer.h> + +using parquet::ConvertedType; +using parquet::Repetition; +using parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +constexpr int FIXED_LENGTH = 10; + +static std::shared_ptr<GroupNode> SetupSchema() { + parquet::schema::NodeVector fields; + // Create a primitive node named 'boolean_field' with type:BOOLEAN, + // repetition:REQUIRED + fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED, + Type::BOOLEAN, ConvertedType::NONE)); + + // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED, + // logical type:TIME_MILLIS + fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32, + ConvertedType::TIME_MILLIS)); + + // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED + fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64, + ConvertedType::NONE)); + + fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96, + ConvertedType::NONE)); + + fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT, + ConvertedType::NONE)); + + fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE, + ConvertedType::NONE)); + + // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL + fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY, + ConvertedType::NONE)); + + // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY, + // repetition:REQUIRED, field_length = FIXED_LENGTH + fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, + FIXED_LENGTH)); + + // Create a GroupNode named 'schema' using the primitive nodes defined above + // This GroupNode is the root node of the schema tree + return std::static_pointer_cast<GroupNode>( + GroupNode::Make("schema", Repetition::REQUIRED, fields)); +} diff --git a/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc new file mode 100644 index 000000000..65dd5799e --- /dev/null +++ b/src/arrow/cpp/examples/parquet/low_level_api/reader_writer2.cc @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <reader_writer.h> + +#include <cassert> +#include <fstream> +#include <iostream> +#include <memory> + +/* + * This example describes writing and reading Parquet Files in C++ and serves as a + * reference to the API. + * The file contains all the physical data types supported by Parquet. + * This example uses the RowGroupWriter API that supports writing RowGroups based on a + * certain size. + **/ + +/* Parquet is a structured columnar file format + * Parquet File = "Parquet data" + "Parquet Metadata" + * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a + * columnar layout + * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their + * Columns + * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a + * complex (nested) type (internal nodes) + * For specific details, please refer the format here: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + **/ + +constexpr int NUM_ROWS = 2500000; +constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB +const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet"; + +int main(int argc, char** argv) { + /********************************************************************************** + PARQUET WRITER EXAMPLE + **********************************************************************************/ + // parquet::REQUIRED fields do not need definition and repetition level values + // parquet::OPTIONAL fields require only definition level values + // parquet::REPEATED fields require both definition and repetition level values + try { + // Create a local file output stream instance. + using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr<FileClass> out_file; + PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); + + // Setup the parquet schema + std::shared_ptr<GroupNode> schema = SetupSchema(); + + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr<parquet::WriterProperties> props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr<parquet::ParquetFileWriter> file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a BufferedRowGroup to keep the RowGroup open until a certain size + parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup(); + + int num_columns = file_writer->num_columns(); + std::vector<int64_t> buffered_values_estimate(num_columns, 0); + for (int i = 0; i < NUM_ROWS; i++) { + int64_t estimated_bytes = 0; + // Get the estimated size of the values that are not written to a page yet + for (int n = 0; n < num_columns; n++) { + estimated_bytes += buffered_values_estimate[n]; + } + + // We need to consider the compressed pages + // as well as the values that are not compressed yet + if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() + + estimated_bytes) > ROW_GROUP_SIZE) { + rg_writer->Close(); + std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0); + rg_writer = file_writer->AppendBufferedRowGroup(); + } + + int col_id = 0; + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast<parquet::BoolWriter*>(rg_writer->column(col_id)); + bool bool_value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value); + buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes(); + + // Write the Int32 column + col_id++; + parquet::Int32Writer* int32_writer = + static_cast<parquet::Int32Writer*>(rg_writer->column(col_id)); + int32_t int32_value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value); + buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes(); + + // Write the Int64 column. Each row has repeats twice. + col_id++; + parquet::Int64Writer* int64_writer = + static_cast<parquet::Int64Writer*>(rg_writer->column(col_id)); + int64_t int64_value1 = 2 * i; + int16_t definition_level = 1; + int16_t repetition_level = 0; + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1); + int64_t int64_value2 = (2 * i + 1); + repetition_level = 1; // start of a new record + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2); + buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes(); + + // Write the INT96 column. + col_id++; + parquet::Int96Writer* int96_writer = + static_cast<parquet::Int96Writer*>(rg_writer->column(col_id)); + parquet::Int96 int96_value; + int96_value.value[0] = i; + int96_value.value[1] = i + 1; + int96_value.value[2] = i + 2; + int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value); + buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes(); + + // Write the Float column + col_id++; + parquet::FloatWriter* float_writer = + static_cast<parquet::FloatWriter*>(rg_writer->column(col_id)); + float float_value = static_cast<float>(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &float_value); + buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes(); + + // Write the Double column + col_id++; + parquet::DoubleWriter* double_writer = + static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id)); + double double_value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &double_value); + buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes(); + + // Write the ByteArray column. Make every alternate values NULL + col_id++; + parquet::ByteArrayWriter* ba_writer = + static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id)); + parquet::ByteArray ba_value; + char hello[FIXED_LENGTH] = "parquet"; + hello[7] = static_cast<char>(static_cast<int>('0') + i / 100); + hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10); + hello[9] = static_cast<char>(static_cast<int>('0') + i % 10); + if (i % 2 == 0) { + int16_t definition_level = 1; + ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]); + ba_value.len = FIXED_LENGTH; + ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value); + } else { + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes(); + + // Write the FixedLengthByteArray column + col_id++; + parquet::FixedLenByteArrayWriter* flba_writer = + static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id)); + parquet::FixedLenByteArray flba_value; + char v = static_cast<char>(i); + char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]); + + flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value); + buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes(); + } + + // Close the RowGroupWriter + rg_writer->Close(); + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + /********************************************************************************** + PARQUET READER EXAMPLE + **********************************************************************************/ + + try { + // Create a ParquetReader instance + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false); + + // Get the File MetaData + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + + int num_row_groups = file_metadata->num_row_groups(); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 8); + + std::vector<int> col_row_counts(num_columns, 0); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE); + + int64_t values_read = 0; + int64_t rows_read = 0; + int16_t definition_level; + int16_t repetition_level; + std::shared_ptr<parquet::ColumnReader> column_reader; + int col_id = 0; + + ARROW_UNUSED(rows_read); // prevent warning in release build + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(col_id); + parquet::BoolReader* bool_reader = + static_cast<parquet::BoolReader*>(column_reader.get()); + + // Read all the rows in the column + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int32 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int32Reader* int32_reader = + static_cast<parquet::Int32Reader*>(column_reader.get()); + // Read all the rows in the column + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == col_row_counts[col_id]); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int64 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int64Reader* int64_reader = + static_cast<parquet::Int64Reader*>(column_reader.get()); + // Read all the rows in the column + while (int64_reader->HasNext()) { + int64_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, + &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + int64_t expected_value = col_row_counts[col_id]; + assert(value == expected_value); + if ((col_row_counts[col_id] % 2) == 0) { + assert(repetition_level == 0); + } else { + assert(repetition_level == 1); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Int96 column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::Int96Reader* int96_reader = + static_cast<parquet::Int96Reader*>(column_reader.get()); + // Read all the rows in the column + while (int96_reader->HasNext()) { + parquet::Int96 value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + parquet::Int96 expected_value; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value.value[0] = col_row_counts[col_id]; + expected_value.value[1] = col_row_counts[col_id] + 1; + expected_value.value[2] = col_row_counts[col_id] + 2; + for (int j = 0; j < 3; j++) { + assert(value.value[j] == expected_value.value[j]); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Float column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::FloatReader* float_reader = + static_cast<parquet::FloatReader*>(column_reader.get()); + // Read all the rows in the column + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the Double column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::DoubleReader* double_reader = + static_cast<parquet::DoubleReader*>(column_reader.get()); + // Read all the rows in the column + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = col_row_counts[col_id] * 1.1111111; + assert(value == expected_value); + col_row_counts[col_id]++; + } + + // Get the Column Reader for the ByteArray column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::ByteArrayReader* ba_reader = + static_cast<parquet::ByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + while (ba_reader->HasNext()) { + parquet::ByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // Verify the value written + char expected_value[FIXED_LENGTH] = "parquet"; + ARROW_UNUSED(expected_value); // prevent warning in release build + expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100); + expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10); + expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10); + if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist + // There are no NULL values in the rows written + assert(values_read == 1); + assert(value.len == FIXED_LENGTH); + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + assert(definition_level == 1); + } else { + // There are NULL values in the rows written + assert(values_read == 0); + assert(definition_level == 0); + } + col_row_counts[col_id]++; + } + + // Get the Column Reader for the FixedLengthByteArray column + col_id++; + column_reader = row_group_reader->Column(col_id); + parquet::FixedLenByteArrayReader* flba_reader = + static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get()); + // Read all the rows in the column + while (flba_reader->HasNext()) { + parquet::FixedLenByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + char v = static_cast<char>(col_row_counts[col_id]); + char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + col_row_counts[col_id]++; + } + } + } catch (const std::exception& e) { + std::cerr << "Parquet read error: " << e.what() << std::endl; + return -1; + } + + std::cout << "Parquet Writing and Reading Complete" << std::endl; + + return 0; +} diff --git a/src/arrow/cpp/examples/parquet/parquet_arrow/CMakeLists.txt b/src/arrow/cpp/examples/parquet/parquet_arrow/CMakeLists.txt new file mode 100644 index 000000000..43eb21957 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/parquet_arrow/CMakeLists.txt @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Require cmake that supports BYPRODUCTS in add_custom_command, ExternalProject_Add [1]. +cmake_minimum_required(VERSION 3.2.0) + +project(parquet_arrow_example) + +include(ExternalProject) +include(FindPkgConfig) +include(GNUInstallDirs) + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake_modules") + +# This ensures that things like gnu++11 get passed correctly +set(CMAKE_CXX_STANDARD 11) + +# We require a C++11 compliant compiler +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +# Look for installed packages the system +find_package(Arrow REQUIRED) +find_package(Parquet REQUIRED) + +include_directories(SYSTEM ${ARROW_INCLUDE_DIR} ${PARQUET_INCLUDE_DIR}) + +add_executable(parquet_arrow_example reader_writer.cc) +target_link_libraries(parquet_arrow_example ${PARQUET_SHARED_LIB} ${ARROW_SHARED_LIB}) diff --git a/src/arrow/cpp/examples/parquet/parquet_arrow/README.md b/src/arrow/cpp/examples/parquet/parquet_arrow/README.md new file mode 100644 index 000000000..e99819fd2 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/parquet_arrow/README.md @@ -0,0 +1,20 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +Using parquet-cpp with the arrow interface +========================================== + +This folder contains an example project that shows how to setup a CMake project +that consumes `parquet-cpp` as a library as well as how you can use the +`parquet/arrow` interface to reading and write Apache Parquet files. diff --git a/src/arrow/cpp/examples/parquet/parquet_arrow/reader_writer.cc b/src/arrow/cpp/examples/parquet/parquet_arrow/reader_writer.cc new file mode 100644 index 000000000..f5d96ec16 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/parquet_arrow/reader_writer.cc @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/io/api.h> +#include <parquet/arrow/reader.h> +#include <parquet/arrow/writer.h> +#include <parquet/exception.h> + +#include <iostream> + +// #0 Build dummy data to pass around +// To have some input data, we first create an Arrow Table that holds +// some data. +std::shared_ptr<arrow::Table> generate_table() { + arrow::Int64Builder i64builder; + PARQUET_THROW_NOT_OK(i64builder.AppendValues({1, 2, 3, 4, 5})); + std::shared_ptr<arrow::Array> i64array; + PARQUET_THROW_NOT_OK(i64builder.Finish(&i64array)); + + arrow::StringBuilder strbuilder; + PARQUET_THROW_NOT_OK(strbuilder.Append("some")); + PARQUET_THROW_NOT_OK(strbuilder.Append("string")); + PARQUET_THROW_NOT_OK(strbuilder.Append("content")); + PARQUET_THROW_NOT_OK(strbuilder.Append("in")); + PARQUET_THROW_NOT_OK(strbuilder.Append("rows")); + std::shared_ptr<arrow::Array> strarray; + PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); + + std::shared_ptr<arrow::Schema> schema = arrow::schema( + {arrow::field("int", arrow::int64()), arrow::field("str", arrow::utf8())}); + + return arrow::Table::Make(schema, {i64array, strarray}); +} + +// #1 Write out the data as a Parquet file +void write_parquet_file(const arrow::Table& table) { + std::shared_ptr<arrow::io::FileOutputStream> outfile; + PARQUET_ASSIGN_OR_THROW( + outfile, arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet")); + // The last argument to the function call is the size of the RowGroup in + // the parquet file. Normally you would choose this to be rather large but + // for the example, we use a small value to have multiple RowGroups. + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(table, arrow::default_memory_pool(), outfile, 3)); +} + +// #2: Fully read in the file +void read_whole_file() { + std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl; + std::shared_ptr<arrow::io::ReadableFile> infile; + PARQUET_ASSIGN_OR_THROW(infile, + arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", + arrow::default_memory_pool())); + + std::unique_ptr<parquet::arrow::FileReader> reader; + PARQUET_THROW_NOT_OK( + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::shared_ptr<arrow::Table> table; + PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); + std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns() + << " columns." << std::endl; +} + +// #3: Read only a single RowGroup of the parquet file +void read_single_rowgroup() { + std::cout << "Reading first RowGroup of parquet-arrow-example.parquet" << std::endl; + std::shared_ptr<arrow::io::ReadableFile> infile; + PARQUET_ASSIGN_OR_THROW(infile, + arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", + arrow::default_memory_pool())); + + std::unique_ptr<parquet::arrow::FileReader> reader; + PARQUET_THROW_NOT_OK( + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::shared_ptr<arrow::Table> table; + PARQUET_THROW_NOT_OK(reader->RowGroup(0)->ReadTable(&table)); + std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns() + << " columns." << std::endl; +} + +// #4: Read only a single column of the whole parquet file +void read_single_column() { + std::cout << "Reading first column of parquet-arrow-example.parquet" << std::endl; + std::shared_ptr<arrow::io::ReadableFile> infile; + PARQUET_ASSIGN_OR_THROW(infile, + arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", + arrow::default_memory_pool())); + + std::unique_ptr<parquet::arrow::FileReader> reader; + PARQUET_THROW_NOT_OK( + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::shared_ptr<arrow::ChunkedArray> array; + PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array)); + PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout)); + std::cout << std::endl; +} + +// #5: Read only a single column of a RowGroup (this is known as ColumnChunk) +// from the Parquet file. +void read_single_column_chunk() { + std::cout << "Reading first ColumnChunk of the first RowGroup of " + "parquet-arrow-example.parquet" + << std::endl; + std::shared_ptr<arrow::io::ReadableFile> infile; + PARQUET_ASSIGN_OR_THROW(infile, + arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", + arrow::default_memory_pool())); + + std::unique_ptr<parquet::arrow::FileReader> reader; + PARQUET_THROW_NOT_OK( + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::shared_ptr<arrow::ChunkedArray> array; + PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array)); + PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout)); + std::cout << std::endl; +} + +int main(int argc, char** argv) { + std::shared_ptr<arrow::Table> table = generate_table(); + write_parquet_file(*table); + read_whole_file(); + read_single_rowgroup(); + read_single_column(); + read_single_column_chunk(); +} diff --git a/src/arrow/cpp/examples/parquet/parquet_stream_api/stream_reader_writer.cc b/src/arrow/cpp/examples/parquet/parquet_stream_api/stream_reader_writer.cc new file mode 100644 index 000000000..64ab7af49 --- /dev/null +++ b/src/arrow/cpp/examples/parquet/parquet_stream_api/stream_reader_writer.cc @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cassert> +#include <chrono> +#include <cstdint> +#include <cstring> +#include <ctime> +#include <iomanip> +#include <iostream> +#include <utility> + +#include "arrow/io/file.h" +#include "parquet/exception.h" +#include "parquet/stream_reader.h" +#include "parquet/stream_writer.h" + +// This file gives an example of how to use the parquet::StreamWriter +// and parquet::StreamReader classes. +// It shows writing/reading of the supported types as well as how a +// user-defined type can be handled. + +template <typename T> +using optional = parquet::StreamReader::optional<T>; + +// Example of a user-defined type to be written to/read from Parquet +// using C++ input/output operators. +class UserTimestamp { + public: + UserTimestamp() = default; + + explicit UserTimestamp(const std::chrono::microseconds v) : ts_{v} {} + + bool operator==(const UserTimestamp& x) const { return ts_ == x.ts_; } + + void dump(std::ostream& os) const { + const auto t = static_cast<std::time_t>( + std::chrono::duration_cast<std::chrono::seconds>(ts_).count()); + os << std::put_time(std::gmtime(&t), "%Y%m%d-%H%M%S"); + } + + void dump(parquet::StreamWriter& os) const { os << ts_; } + + private: + std::chrono::microseconds ts_; +}; + +std::ostream& operator<<(std::ostream& os, const UserTimestamp& v) { + v.dump(os); + return os; +} + +parquet::StreamWriter& operator<<(parquet::StreamWriter& os, const UserTimestamp& v) { + v.dump(os); + return os; +} + +parquet::StreamReader& operator>>(parquet::StreamReader& os, UserTimestamp& v) { + std::chrono::microseconds ts; + + os >> ts; + v = UserTimestamp{ts}; + + return os; +} + +std::shared_ptr<parquet::schema::GroupNode> GetSchema() { + parquet::schema::NodeVector fields; + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "string_field", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY, + parquet::ConvertedType::UTF8)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "char_field", parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, 1)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "char[4]_field", parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, 4)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "int8_field", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_8)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "uint16_field", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::UINT_16)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "int32_field", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "uint64_field", parquet::Repetition::OPTIONAL, parquet::Type::INT64, + parquet::ConvertedType::UINT_64)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "double_field", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE)); + + // User defined timestamp type. + fields.push_back(parquet::schema::PrimitiveNode::Make( + "timestamp_field", parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MICROS)); + + fields.push_back(parquet::schema::PrimitiveNode::Make( + "chrono_milliseconds_field", parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MILLIS)); + + return std::static_pointer_cast<parquet::schema::GroupNode>( + parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); +} + +struct TestData { + static const int num_rows = 2000; + + static void init() { std::time(&ts_offset_); } + + static optional<std::string> GetOptString(const int i) { + if (i % 2 == 0) return {}; + return "Str #" + std::to_string(i); + } + static arrow::util::string_view GetStringView(const int i) { + static std::string string; + string = "StringView #" + std::to_string(i); + return arrow::util::string_view(string); + } + static const char* GetCharPtr(const int i) { + static std::string string; + string = "CharPtr #" + std::to_string(i); + return string.c_str(); + } + static char GetChar(const int i) { return i & 1 ? 'M' : 'F'; } + static int8_t GetInt8(const int i) { return static_cast<int8_t>((i % 256) - 128); } + static uint16_t GetUInt16(const int i) { return static_cast<uint16_t>(i); } + static int32_t GetInt32(const int i) { return 3 * i - 17; } + static optional<uint64_t> GetOptUInt64(const int i) { + if (i % 11 == 0) return {}; + return (1ull << 40) + i * i + 101; + } + static double GetDouble(const int i) { return 6.62607004e-34 * 3e8 * i; } + static UserTimestamp GetUserTimestamp(const int i) { + return UserTimestamp{std::chrono::microseconds{(ts_offset_ + 3 * i) * 1000000 + i}}; + } + static std::chrono::milliseconds GetChronoMilliseconds(const int i) { + return std::chrono::milliseconds{(ts_offset_ + 3 * i) * 1000ull + i}; + } + + static char char4_array[4]; + + private: + static std::time_t ts_offset_; +}; + +char TestData::char4_array[] = "XYZ"; +std::time_t TestData::ts_offset_; + +void WriteParquetFile() { + std::shared_ptr<arrow::io::FileOutputStream> outfile; + + PARQUET_ASSIGN_OR_THROW( + outfile, arrow::io::FileOutputStream::Open("parquet-stream-api-example.parquet")); + + parquet::WriterProperties::Builder builder; + +#if defined ARROW_WITH_BROTLI + builder.compression(parquet::Compression::BROTLI); +#elif defined ARROW_WITH_ZSTD + builder.compression(parquet::Compression::ZSTD); +#endif + + parquet::StreamWriter os{ + parquet::ParquetFileWriter::Open(outfile, GetSchema(), builder.build())}; + + os.SetMaxRowGroupSize(1000); + + for (auto i = 0; i < TestData::num_rows; ++i) { + // Output string using 3 different types: std::string, arrow::util::string_view and + // const char *. + switch (i % 3) { + case 0: + os << TestData::GetOptString(i); + break; + case 1: + os << TestData::GetStringView(i); + break; + case 2: + os << TestData::GetCharPtr(i); + break; + } + os << TestData::GetChar(i); + switch (i % 2) { + case 0: + os << TestData::char4_array; + break; + case 1: + os << parquet::StreamWriter::FixedStringView{TestData::GetCharPtr(i), 4}; + break; + } + os << TestData::GetInt8(i); + os << TestData::GetUInt16(i); + os << TestData::GetInt32(i); + os << TestData::GetOptUInt64(i); + os << TestData::GetDouble(i); + os << TestData::GetUserTimestamp(i); + os << TestData::GetChronoMilliseconds(i); + os << parquet::EndRow; + + if (i == TestData::num_rows / 2) { + os << parquet::EndRowGroup; + } + } + std::cout << "Parquet Stream Writing complete." << std::endl; +} + +void ReadParquetFile() { + std::shared_ptr<arrow::io::ReadableFile> infile; + + PARQUET_ASSIGN_OR_THROW( + infile, arrow::io::ReadableFile::Open("parquet-stream-api-example.parquet")); + + parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; + + optional<std::string> opt_string; + char ch; + char char_array[4]; + int8_t int8; + uint16_t uint16; + int32_t int32; + optional<uint64_t> opt_uint64; + double d; + UserTimestamp ts_user; + std::chrono::milliseconds ts_ms; + int i; + + for (i = 0; !os.eof(); ++i) { + os >> opt_string; + os >> ch; + os >> char_array; + os >> int8; + os >> uint16; + os >> int32; + os >> opt_uint64; + os >> d; + os >> ts_user; + os >> ts_ms; + os >> parquet::EndRow; + + if (0) { + // For debugging. + std::cout << "Row #" << i << std::endl; + + std::cout << "string["; + if (opt_string) { + std::cout << *opt_string; + } else { + std::cout << "N/A"; + } + std::cout << "] char[" << ch << "] charArray[" << char_array << "] int8[" + << int(int8) << "] uint16[" << uint16 << "] int32[" << int32; + std::cout << "] uint64["; + if (opt_uint64) { + std::cout << *opt_uint64; + } else { + std::cout << "N/A"; + } + std::cout << "] double[" << d << "] tsUser[" << ts_user << "] tsMs[" + << ts_ms.count() << "]" << std::endl; + } + // Check data. + switch (i % 3) { + case 0: + assert(opt_string == TestData::GetOptString(i)); + break; + case 1: + assert(*opt_string == TestData::GetStringView(i)); + break; + case 2: + assert(*opt_string == TestData::GetCharPtr(i)); + break; + } + assert(ch == TestData::GetChar(i)); + switch (i % 2) { + case 0: + assert(0 == std::memcmp(char_array, TestData::char4_array, sizeof(char_array))); + break; + case 1: + assert(0 == std::memcmp(char_array, TestData::GetCharPtr(i), sizeof(char_array))); + break; + } + assert(int8 == TestData::GetInt8(i)); + assert(uint16 == TestData::GetUInt16(i)); + assert(int32 == TestData::GetInt32(i)); + assert(opt_uint64 == TestData::GetOptUInt64(i)); + assert(std::abs(d - TestData::GetDouble(i)) < 1e-6); + assert(ts_user == TestData::GetUserTimestamp(i)); + assert(ts_ms == TestData::GetChronoMilliseconds(i)); + } + assert(TestData::num_rows == i); + + std::cout << "Parquet Stream Reading complete." << std::endl; +} + +int main() { + WriteParquetFile(); + ReadParquetFile(); + + return 0; +} |