summary | refs | log | tree | commit | diff | stats
path: root/misc/arrow-parquet
diff options
context:
space:
mode:
Diffstat (limited to 'misc/arrow-parquet')
-rw-r--r-- misc/arrow-parquet/arrow-test.cpp | 155
-rwxr-xr-x misc/arrow-parquet/dump-parquet.py | 52
-rwxr-xr-x misc/arrow-parquet/gen-parquet-test-files.py | 92
3 files changed, 299 insertions, 0 deletions
diff --git a/misc/arrow-parquet/arrow-test.cpp b/misc/arrow-parquet/arrow-test.cpp
new file mode 100644
index 0000000..f29fa9b
--- /dev/null
+++ b/misc/arrow-parquet/arrow-test.cpp
@@ -0,0 +1,155 @@
+#include <arrow/io/file.h>
+#include <parquet/stream_reader.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <sstream>
+
+using std::cout;
+using std::cerr;
+using std::endl;
+
+// Dumps the metadata, schema and row values of the parquet file given as the
+// first argument.  Returns EXIT_FAILURE when the argument is missing or when an
+// exception escapes while reading the file, EXIT_SUCCESS otherwise.
+int main(int argc, char** argv) try
+{
+    if (argc < 2)
+    {
+        cerr << "usage: " << (argc > 0 ? argv[0] : "arrow-test") << " <parquet-file>" << endl;
+        return EXIT_FAILURE;
+    }
+
+    const char* filepath = argv[1];
+    std::shared_ptr<arrow::io::ReadableFile> infile;
+    PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filepath));
+
+    auto file_reader = parquet::ParquetFileReader::Open(infile);
+    auto file_md = file_reader->metadata();
+    const parquet::FileMetaData& r = *file_md;
+
+    cout << "num-columns: " << r.num_columns() << endl;
+    cout << "num-rows: " << r.num_rows() << endl;
+    cout << "num-row-groups: " << r.num_row_groups() << endl;
+    cout << "num-schema-elements: " << r.num_schema_elements() << endl;
+    cout << "can-decompress: " << r.can_decompress() << endl;
+
+    for (int i = 0; i < r.num_row_groups(); ++i)
+    {
+        cout << "row-group " << i << ":" << endl;
+        auto rg = r.RowGroup(i);
+        cout << " num rows: " << rg->num_rows() << endl;
+        cout << " total byte size: " << rg->total_byte_size() << endl;
+        cout << " total compressed size: " << rg->total_compressed_size() << endl;
+        cout << " file offset: " << rg->file_offset() << endl;
+        cout << " num columns: " << rg->num_columns() << endl;
+
+        for (int j = 0; j < rg->num_columns(); ++j)
+        {
+            cout << " column chunk " << j << ":" << endl;
+            auto cc = rg->ColumnChunk(j);
+            cout << " file path: " << cc->file_path() << endl;
+            cout << " num values: " << cc->num_values() << endl;
+            cout << " type: " << cc->type() << endl;
+            cout << " data page offset: " << std::dec << cc->data_page_offset() << endl;
+            cout << " has dictionary page: " << cc->has_dictionary_page() << endl;
+            cout << " compression: " << cc->compression() << endl;
+            if (cc->has_dictionary_page())
+                cout << " dictionary page offset: " << cc->dictionary_page_offset() << endl;
+            cout << " has index page: " << cc->has_index_page() << endl;
+        }
+    }
+
+    cout << "schema:" << endl;
+    const parquet::SchemaDescriptor* schema = r.schema();
+    cout << " name: " << schema->name() << endl;
+    cout << " num-columns: " << schema->num_columns() << endl;
+
+    std::vector<const parquet::ColumnDescriptor*> column_types;
+    column_types.reserve(schema->num_columns());
+
+    for (int i = 0; i < schema->num_columns(); ++i)
+    {
+        cout << "column " << i << ":" << endl;
+        const parquet::ColumnDescriptor* col_desc = schema->Column(i);
+        column_types.push_back(col_desc);
+
+        cout << " name: " << col_desc->name() << endl;
+        cout << " physical type: " << col_desc->physical_type() << endl;
+        cout << " converted type: " << col_desc->converted_type() << endl;
+        cout << " type length: " << col_desc->type_length() << endl;
+    }
+
+    // file_reader is moved into the stream; r is still read below via file_md.
+    parquet::StreamReader stream{std::move(file_reader)};
+
+    if (stream.eof())
+        return EXIT_SUCCESS;
+
+    cout << "row values:" << endl;
+
+    // print column labels
+    for (const parquet::ColumnDescriptor* col : column_types)
+        cout << col->name() << ' ';
+    cout << endl;
+
+    // The counter is 64-bit so it is compared against num_rows() without narrowing.
+    for (int64_t i = 0; i < r.num_rows(); ++i)
+    {
+        for (const parquet::ColumnDescriptor* col : column_types)
+        {
+            // Only the physical/converted type pairs below are handled so far.
+            switch (col->physical_type())
+            {
+                case parquet::Type::BYTE_ARRAY:
+                {
+                    if (col->converted_type() != parquet::ConvertedType::UTF8)
+                        throw std::runtime_error("WIP: unhandled converted type for BYTE_ARRAY");
+
+                    std::string v;
+                    stream >> v;
+                    cout << v << ' ';
+                    break;
+                }
+                case parquet::Type::INT64:
+                {
+                    if (col->converted_type() != parquet::ConvertedType::NONE)
+                        throw std::runtime_error("WIP: unhandled converted type for INT64");
+
+                    int64_t v;
+                    stream >> v;
+                    cout << v << ' ';
+                    break;
+                }
+                case parquet::Type::BOOLEAN:
+                {
+                    if (col->converted_type() != parquet::ConvertedType::NONE)
+                        throw std::runtime_error("WIP: unhandled converted type for BOOLEAN");
+
+                    bool v;
+                    stream >> v;
+                    cout << v << ' ';
+                    break;
+                }
+                default:
+                {
+                    std::ostringstream os;
+                    os << "WIP: not handled type: physical=" << col->physical_type() << "; converted=" << col->converted_type();
+                    throw std::runtime_error(os.str());
+                }
+            }
+        }
+
+        stream >> parquet::EndRow;
+        cout << endl;
+    }
+
+    return EXIT_SUCCESS;
+}
+catch (const std::exception& e)
+{
+    // Report open/read/WIP failures instead of letting them escape main uncaught.
+    cerr << "error: " << e.what() << endl;
+    return EXIT_FAILURE;
+}
diff --git a/misc/arrow-parquet/dump-parquet.py b/misc/arrow-parquet/dump-parquet.py
new file mode 100755
index 0000000..742f37e
--- /dev/null
+++ b/misc/arrow-parquet/dump-parquet.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python3
+########################################################################
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+########################################################################
+
+import argparse
+from pathlib import Path
+
+import pyarrow.parquet as pq
+
+
+def main():
+    """Dump a parquet file's metadata, schema and first values of row group 0."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument("path", type=Path)
+    parser.add_argument("--num", "-n", type=int, default=10, help="Number of rows to print.")
+    args = parser.parse_args()
+
+    parquet = pq.ParquetFile(args.path)
+    print(f"num-row-groups: {parquet.metadata.num_row_groups}")
+    print(f"num-rows: {parquet.metadata.num_rows}")
+    print(f"num-columns: {parquet.metadata.num_columns}")
+    print("schema:")
+    for i in range(len(parquet.metadata.schema.names)):
+        col = parquet.metadata.schema.column(i)
+        print(f" column {i}:")
+        for attr_name in dir(col):
+            if attr_name.startswith("_"):
+                continue
+            attr_value = getattr(col, attr_name)
+            if not callable(attr_value):  # show data attributes only, not methods
+                print(f" {attr_name}: {attr_value}")
+
+    table = parquet.read_row_group(0) if parquet.metadata.num_row_groups else []  # no row group: no data
+    for icol, (name, chunked_array) in enumerate(zip(parquet.metadata.schema.names, table)):
+        print(f"column {icol}:")
+        print(f" name: {name}")
+        print(f" type: {chunked_array.type}")
+        print(f" num-chunks: {chunked_array.num_chunks}")
+        print(" data:")
+        for i, v in enumerate(chunked_array.chunks[0]):
+            if i == args.num:  # stop once the requested number of rows is printed
+                break
+            print(f" - {v}")
+
+
+if __name__ == "__main__":  # run only when executed as a script
+    main()
+
diff --git a/misc/arrow-parquet/gen-parquet-test-files.py b/misc/arrow-parquet/gen-parquet-test-files.py
new file mode 100755
index 0000000..a066536
--- /dev/null
+++ b/misc/arrow-parquet/gen-parquet-test-files.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+########################################################################
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+########################################################################
+
+import pandas as pd
+from pathlib import Path
+
+
+def gen_str(pos):  # return the sample word stored at index pos of gen_str.values
+    return gen_str.values[pos]
+
+
+gen_str.values = (  # pool of 30 eight-letter sample words returned by gen_str()
+ "ubergeek",
+ "thwarter",
+ "ironfist",
+ "turkoman",
+ "mesozoan",
+ "seatsale",
+ "hardtack",
+ "phyllary",
+ "hydriads",
+ "stranger",
+ "cistuses",
+ "capelets",
+ "headband",
+ "dudesses",
+ "aminases",
+ "eggwhite",
+ "boxscore",
+ "upsurges",
+ "blowlamp",
+ "dionysia",
+ "rejecter",
+ "keratome",
+ "diasters",
+ "juddocks",
+ "gownsman",
+ "sweepsaw",
+ "chuckeys",
+ "partyers",
+ "weredogs",
+ "exabytes",
+)
+
+
+def main():
+    """Generate the basic parquet test files under <repo>/test/parquet/basic."""
+    data = {"float64 with nan": [1.2, 3.4, None, None, 5.6]}
+
+    df = pd.DataFrame(data=data)
+    df["float64 with nan"] = df["float64 with nan"].astype("float64")
+
+    print(df)
+    print(df.dtypes)
+
+    outdir = Path(__file__).resolve().parents[2] / "test" / "parquet" / "basic"  # independent of the cwd
+    outdir.mkdir(parents=True, exist_ok=True)
+    outpath = outdir / "float-with-non.parquet"
+    df.to_parquet(outpath, engine="pyarrow", compression=None)
+
+    row_size = 10
+    data = {
+        "int32": list(range(row_size)),
+        "int64": [v * 10 + v for v in range(row_size)],
+        "float32": [-v for v in range(row_size)],
+        "float64": [-v - 21 for v in range(row_size)],
+        "boolean": [(v & 0x01) != 0 for v in range(row_size)],
+        "string": [gen_str(pos) for pos in range(row_size)],
+    }
+    df = pd.DataFrame(data=data)
+    df["int32"] = df["int32"].astype("int32")  # pin exact widths; pandas would default to 64-bit
+    df["int64"] = df["int64"].astype("int64")
+    df["float32"] = df["float32"].astype("float32")
+    df["float64"] = df["float64"].astype("float64")
+
+    print(df)
+    print(df.dtypes)
+
+    df.to_parquet(outdir / "basic-nocomp.parquet", engine="pyarrow", compression=None)
+    for comp in ("gzip", "snappy", "zstd"):  # same frame, one file per codec
+        df.to_parquet(outdir / f"basic-{comp}.parquet", engine="pyarrow", compression=comp)
+
+
+if __name__ == "__main__":  # run only when executed as a script
+    main()
+