diff options
Diffstat (limited to 'src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc')
-rw-r--r-- | src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc b/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc new file mode 100644 index 000000000..cd9b89fe3 --- /dev/null +++ b/src/arrow/cpp/examples/arrow/dataset_parquet_scan_example.cc @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <arrow/compute/exec/expression.h> +#include <arrow/dataset/dataset.h> +#include <arrow/dataset/discovery.h> +#include <arrow/dataset/file_base.h> +#include <arrow/dataset/file_parquet.h> +#include <arrow/dataset/scanner.h> +#include <arrow/filesystem/filesystem.h> +#include <arrow/filesystem/path_util.h> + +#include <cstdlib> +#include <iostream> + +using arrow::field; +using arrow::int16; +using arrow::Schema; +using arrow::Table; + +namespace fs = arrow::fs; + +namespace ds = arrow::dataset; + +namespace cp = arrow::compute; + +#define ABORT_ON_FAILURE(expr) \ + do { \ + arrow::Status status_ = (expr); \ + if (!status_.ok()) { \ + std::cerr << status_.message() << std::endl; \ + abort(); \ + } \ + } while (0); + +struct Configuration { + // Increase the ds::DataSet by repeating `repeat` times the ds::Dataset. + size_t repeat = 1; + + // Indicates if the Scanner::ToTable should consume in parallel. + bool use_threads = true; + + // Indicates to the Scan operator which columns are requested. This + // optimization avoid deserializing unneeded columns. + std::vector<std::string> projected_columns = {"pickup_at", "dropoff_at", + "total_amount"}; + + // Indicates the filter by which rows will be filtered. This optimization can + // make use of partition information and/or file metadata if possible. + cp::Expression filter = + cp::greater(cp::field_ref("total_amount"), cp::literal(1000.0f)); + + ds::InspectOptions inspect_options{}; + ds::FinishOptions finish_options{}; +} conf; + +std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri, + std::string* path) { + return fs::FileSystemFromUri(uri, path).ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromDirectory( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string dir) { + // Find all files under `path` + fs::FileSelector s; + s.base_dir = dir; + s.recursive = true; + + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + auto factory = ds::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie(); + + // Try to infer a common schema for all files. + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + auto child = factory->Finish(conf.finish_options).ValueOrDie(); + + ds::DatasetVector children{conf.repeat, child}; + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset.ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetParquetDatasetFromMetadata( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string metadata_path) { + ds::ParquetFactoryOptions options; + auto factory = + ds::ParquetDatasetFactory::Make(metadata_path, fs, format, options).ValueOrDie(); + return factory->Finish().ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromFile( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string file) { + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + auto factory = + ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie(); + + // Try to infer a common schema for all files. + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + auto child = factory->Finish(conf.finish_options).ValueOrDie(); + + ds::DatasetVector children; + children.resize(conf.repeat, child); + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset.ValueOrDie(); +} + +std::shared_ptr<ds::Dataset> GetDatasetFromPath( + std::shared_ptr<fs::FileSystem> fs, std::shared_ptr<ds::ParquetFileFormat> format, + std::string path) { + auto info = fs->GetFileInfo(path).ValueOrDie(); + if (info.IsDirectory()) { + return GetDatasetFromDirectory(fs, format, path); + } + + auto dirname_basename = arrow::fs::internal::GetAbstractPathParent(path); + auto basename = dirname_basename.second; + + if (basename == "_metadata") { + return GetParquetDatasetFromMetadata(fs, format, path); + } + + return GetDatasetFromFile(fs, format, path); +} + +std::shared_ptr<ds::Scanner> GetScannerFromDataset(std::shared_ptr<ds::Dataset> dataset, + std::vector<std::string> columns, + cp::Expression filter, + bool use_threads) { + auto scanner_builder = dataset->NewScan().ValueOrDie(); + + if (!columns.empty()) { + ABORT_ON_FAILURE(scanner_builder->Project(columns)); + } + + ABORT_ON_FAILURE(scanner_builder->Filter(filter)); + + ABORT_ON_FAILURE(scanner_builder->UseThreads(use_threads)); + + return scanner_builder->Finish().ValueOrDie(); +} + +std::shared_ptr<Table> GetTableFromScanner(std::shared_ptr<ds::Scanner> scanner) { + return scanner->ToTable().ValueOrDie(); +} + +int main(int argc, char** argv) { + auto format = std::make_shared<ds::ParquetFileFormat>(); + + if (argc != 2) { + // Fake success for CI purposes. + return EXIT_SUCCESS; + } + + std::string path; + auto fs = GetFileSystemFromUri(argv[1], &path); + + auto dataset = GetDatasetFromPath(fs, format, path); + + auto scanner = GetScannerFromDataset(dataset, conf.projected_columns, conf.filter, + conf.use_threads); + + auto table = GetTableFromScanner(scanner); + std::cout << "Table size: " << table->num_rows() << "\n"; + + return EXIT_SUCCESS; +} |