diff options
Diffstat (limited to 'misc/arrow-parquet')
-rw-r--r-- | misc/arrow-parquet/arrow-test.cpp | 155 | ||||
-rwxr-xr-x | misc/arrow-parquet/dump-parquet.py | 52 | ||||
-rwxr-xr-x | misc/arrow-parquet/gen-parquet-test-files.py | 92 |
3 files changed, 299 insertions, 0 deletions
diff --git a/misc/arrow-parquet/arrow-test.cpp b/misc/arrow-parquet/arrow-test.cpp new file mode 100644 index 0000000..f29fa9b --- /dev/null +++ b/misc/arrow-parquet/arrow-test.cpp @@ -0,0 +1,155 @@ +#include <arrow/io/file.h> +#include <parquet/stream_reader.h> + +#include <iostream> +#include <memory> +#include <string> +#include <sstream> + +using std::cout; +using std::cerr; +using std::endl; + +int main(int argc, char** argv) +{ + if (argc < 2) + return EXIT_FAILURE; + + const char* filepath = argv[1]; + std::shared_ptr<arrow::io::ReadableFile> infile; + + PARQUET_ASSIGN_OR_THROW( + infile, + arrow::io::ReadableFile::Open(filepath)); + + auto file_reader = parquet::ParquetFileReader::Open(infile); + auto file_md = file_reader->metadata(); + const parquet::FileMetaData& r = *file_md; + + cout << "num-columns: " << r.num_columns() << endl; + cout << "num-rows: " << r.num_rows() << endl; + cout << "num-row-groups: " << r.num_row_groups() << endl; + cout << "num-schema-elements: " << r.num_schema_elements() << endl; + cout << "can-decompress: " << r.can_decompress() << endl; + + for (int i = 0; i < r.num_row_groups(); ++i) + { + cout << "row-group " << i << ":" << endl; + auto rg = r.RowGroup(i); + cout << " num rows: " << rg->num_rows() << endl; + cout << " total byte size: " << rg->total_byte_size() << endl; + cout << " total compressed size: " << rg->total_compressed_size() << endl; + cout << " file offset: " << rg->file_offset() << endl; + cout << " num columns: " << rg->num_columns() << endl; + + for (int j = 0; j < rg->num_columns(); ++j) + { + cout << " column chunk " << j << ":" << endl; + auto cc = rg->ColumnChunk(j); + cout << " file path: " << cc->file_path() << endl; + cout << " num values: " << cc->num_values() << endl; + cout << " type: " << cc->type() << endl; + cout << " data page offset: " << std::dec << cc->data_page_offset() << endl; + cout << " has dictionary page: " << cc->has_dictionary_page() << endl; + cout << " compression: " << 
cc->compression() << endl; + if (cc->has_dictionary_page()) + cout << " dictionary page offset: " << cc->dictionary_page_offset() << endl; + cout << " has index page: " << cc->has_index_page() << endl; + } + } + + cout << "schema:" << endl; + const parquet::SchemaDescriptor* p = r.schema(); + cout << " name: " << p->name() << endl; + cout << " num-columns: " << p->num_columns() << endl; + + std::vector<const parquet::ColumnDescriptor*> column_types; + column_types.reserve(p->num_columns()); + + for (int i = 0; i < p->num_columns(); ++i) + { + cout << "column " << i << ":" << endl; + const parquet::ColumnDescriptor* col_desc = p->Column(i); + column_types.push_back(col_desc); + + cout << " name: " << col_desc->name() << endl; + cout << " physical type: " << col_desc->physical_type() << endl; + cout << " converted type: " << col_desc->converted_type() << endl; + cout << " type length: " << col_desc->type_length() << endl; + } + + parquet::StreamReader stream{std::move(file_reader)}; + + if (stream.eof()) + return EXIT_SUCCESS; + + cout << "row values:" << endl; + + // print column labels + for (const parquet::ColumnDescriptor* p : column_types) + cout << p->name() << ' '; + cout << endl; + + for (int i = 0; i < r.num_rows(); ++i) + { + for (const parquet::ColumnDescriptor* p : column_types) + { + switch (p->physical_type()) + { + case parquet::Type::BYTE_ARRAY: + { + switch (p->converted_type()) + { + case parquet::ConvertedType::UTF8: + { + std::string v; + stream >> v; + cout << v << ' '; + break; + } + default: + throw std::runtime_error("WIP: unhandled converted type for BYTE_ARRAY"); + } + break; + } + case parquet::Type::INT64: + { + switch (p->converted_type()) + { + case parquet::ConvertedType::NONE: + { + int64_t v; + stream >> v; + cout << v << ' '; + break; + } + default: + throw std::runtime_error("WIP: unhandled converted type for INT64"); + } + break; + } + case parquet::Type::BOOLEAN: + { + if (p->converted_type() != parquet::ConvertedType::NONE) + 
throw std::runtime_error("WIP: unhandled covnerted type for BOOLEAN"); + + bool v; + stream >> v; + cout << v << ' '; + break; + } + default: + { + std::ostringstream os; + os << "WIP: not handled type: physical=" << p->physical_type() << "; converted=" << p->converted_type(); + throw std::runtime_error(os.str()); + } + } + } + + stream >> parquet::EndRow; + cout << endl; + } + + return EXIT_SUCCESS; +} diff --git a/misc/arrow-parquet/dump-parquet.py b/misc/arrow-parquet/dump-parquet.py new file mode 100755 index 0000000..742f37e --- /dev/null +++ b/misc/arrow-parquet/dump-parquet.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +######################################################################## +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +######################################################################## + +import argparse +from pathlib import Path + +import pyarrow.parquet as pq + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("path", type=Path) + parser.add_argument("--num", "-n", type=int, default=10, help="Number of rows to print.") + args = parser.parse_args() + + parquet = pq.ParquetFile(args.path) + print(f"num-row-groups: {parquet.metadata.num_row_groups}") + print(f"num-rows: {parquet.metadata.num_rows}") + print(f"num-columns: {parquet.metadata.num_columns}") + print("schema:") + for i, name in enumerate(parquet.metadata.schema.names): + col = parquet.metadata.schema.column(i) + print(f" column {i}:") + for attr_name in dir(col): + if attr_name.startswith("_"): + continue + attr_value = getattr(col, attr_name) + if callable(attr_value): + continue + print(f" {attr_name}: {attr_value}") + + for icol, (name, chunked_array) in enumerate(zip(parquet.metadata.schema.names, parquet.read_row_group(0))): + print(f"column {icol}:") + print(f" name: {name}") + print(f" 
#!/usr/bin/env python3
########################################################################
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
########################################################################
"""Generate small Parquet files used as test fixtures.

Writes a float column containing NaNs, plus a "basic" table of the
primitive column types, once uncompressed and once per compression codec.
"""

import pandas as pd
from pathlib import Path


def gen_str(pos):
    """Return the fixed test word at index ``pos`` (0-29)."""
    return gen_str.values[pos]


# Fixed vocabulary of 30 eight-letter words, attached to the function so the
# lookup table travels with it.
gen_str.values = (
    "ubergeek",
    "thwarter",
    "ironfist",
    "turkoman",
    "mesozoan",
    "seatsale",
    "hardtack",
    "phyllary",
    "hydriads",
    "stranger",
    "cistuses",
    "capelets",
    "headband",
    "dudesses",
    "aminases",
    "eggwhite",
    "boxscore",
    "upsurges",
    "blowlamp",
    "dionysia",
    "rejecter",
    "keratome",
    "diasters",
    "juddocks",
    "gownsman",
    "sweepsaw",
    "chuckeys",
    "partyers",
    "weredogs",
    "exabytes",
)


def main():
    # A float64 column with missing values (pandas stores None as NaN).
    data = {
        "float64 with nan": [1.2, 3.4, None, None, 5.6]
    }

    df = pd.DataFrame(data=data)
    df["float64 with nan"] = df["float64 with nan"].astype("float64")

    print(df)
    print(df.dtypes)

    outdir = Path("../../test/parquet/basic")
    # Robustness fix: to_parquet fails if the target directory is missing.
    outdir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): "non" looks like a typo for "nan", but tests may refer to
    # this exact file name -- confirm before renaming.
    outpath = outdir / "float-with-non.parquet"
    df.to_parquet(outpath, engine="pyarrow", compression=None)

    # One column per primitive type, 10 rows each.
    row_size = 10
    data = {
        "int32": list(range(row_size)),
        "int64": [v * 10 + v for v in range(row_size)],
        "float32": [-v for v in range(row_size)],
        "float64": [-v - 21 for v in range(row_size)],
        "boolean": [(v & 0x01) != 0 for v in range(row_size)],
        "string": [gen_str(pos) for pos in range(row_size)],
    }
    df = pd.DataFrame(data=data)
    # Narrow the defaults (int64/float64) to the exact widths we test.
    df["int32"] = df["int32"].astype("int32")
    df["int64"] = df["int64"].astype("int64")
    df["float32"] = df["float32"].astype("float32")
    df["float64"] = df["float64"].astype("float64")

    print(df)
    print(df.dtypes)

    # Uncompressed, then once per codec (the f-prefix on the nocomp name
    # was needless and has been dropped).
    df.to_parquet(outdir / "basic-nocomp.parquet", engine="pyarrow", compression=None)
    for comp in ("gzip", "snappy", "zstd"):
        df.to_parquet(outdir / f"basic-{comp}.parquet", engine="pyarrow", compression=comp)


if __name__ == "__main__":
    main()