From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/python/benchmarks/parquet.py | 156 +++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 src/arrow/python/benchmarks/parquet.py (limited to 'src/arrow/python/benchmarks/parquet.py') diff --git a/src/arrow/python/benchmarks/parquet.py b/src/arrow/python/benchmarks/parquet.py new file mode 100644 index 000000000..3aeca425b --- /dev/null +++ b/src/arrow/python/benchmarks/parquet.py @@ -0,0 +1,156 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import shutil +import tempfile + +from pandas.util.testing import rands +import numpy as np +import pandas as pd + +import pyarrow as pa +try: + import pyarrow.parquet as pq +except ImportError: + pq = None + + +class ParquetManifestCreation(object): + """Benchmark creating a parquet manifest.""" + + size = 10 ** 6 + tmpdir = None + + param_names = ('num_partitions', 'num_threads') + params = [(10, 100, 1000), (1, 8)] + + def setup(self, num_partitions, num_threads): + if pq is None: + raise NotImplementedError("Parquet support not enabled") + + self.tmpdir = tempfile.mkdtemp('benchmark_parquet') + rnd = np.random.RandomState(42) + num1 = rnd.randint(0, num_partitions, size=self.size) + num2 = rnd.randint(0, 1000, size=self.size) + output_df = pd.DataFrame({'num1': num1, 'num2': num2}) + output_table = pa.Table.from_pandas(output_df) + pq.write_to_dataset(output_table, self.tmpdir, ['num1']) + + def teardown(self, num_partitions, num_threads): + if self.tmpdir is not None: + shutil.rmtree(self.tmpdir) + + def time_manifest_creation(self, num_partitions, num_threads): + pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads) + + +class ParquetWriteBinary(object): + + def setup(self): + nuniques = 100000 + value_size = 50 + length = 1000000 + num_cols = 10 + + unique_values = np.array([rands(value_size) for + i in range(nuniques)], dtype='O') + values = unique_values[np.random.randint(0, nuniques, size=length)] + self.table = pa.table([pa.array(values) for i in range(num_cols)], + names=['f{}'.format(i) for i in range(num_cols)]) + self.table_df = self.table.to_pandas() + + def time_write_binary_table(self): + out = pa.BufferOutputStream() + pq.write_table(self.table, out) + + def time_write_binary_table_uncompressed(self): + out = pa.BufferOutputStream() + pq.write_table(self.table, out, compression='none') + + def time_write_binary_table_no_dictionary(self): + out = pa.BufferOutputStream() + pq.write_table(self.table, out, use_dictionary=False) + + def time_convert_pandas_and_write_binary_table(self): + out = pa.BufferOutputStream() + pq.write_table(pa.table(self.table_df), out) + + +def generate_dict_strings(string_size, nunique, length, random_order=True): + uniques = np.array([rands(string_size) for i in range(nunique)], dtype='O') + if random_order: + indices = np.random.randint(0, nunique, size=length).astype('i4') + else: + indices = np.arange(nunique).astype('i4').repeat(length // nunique) + return pa.DictionaryArray.from_arrays(indices, uniques) + + +def generate_dict_table(num_cols, string_size, nunique, length, + random_order=True): + data = generate_dict_strings(string_size, nunique, length, + random_order=random_order) + return pa.table([ + data for i in range(num_cols) + ], names=['f{}'.format(i) for i in range(num_cols)]) + + +class ParquetWriteDictionaries(object): + + param_names = ('nunique',) + params = [(1000), (100000)] + + def setup(self, nunique): + self.num_cols = 10 + self.value_size = 32 + self.nunique = nunique + self.length = 10000000 + + self.table = generate_dict_table(self.num_cols, self.value_size, + self.nunique, self.length) + self.table_sequential = generate_dict_table(self.num_cols, + self.value_size, + self.nunique, self.length, + random_order=False) + + def time_write_random_order(self, nunique): + pq.write_table(self.table, pa.BufferOutputStream()) + + def time_write_sequential(self, nunique): + pq.write_table(self.table_sequential, pa.BufferOutputStream()) + + +class ParquetManyColumns(object): + + total_cells = 10000000 + param_names = ('num_cols',) + params = [100, 1000, 10000] + + def setup(self, num_cols): + num_rows = self.total_cells // num_cols + self.table = pa.table({'c' + str(i): np.random.randn(num_rows) + for i in range(num_cols)}) + + out = pa.BufferOutputStream() + pq.write_table(self.table, out) + self.buf = out.getvalue() + + def time_write(self, num_cols): + out = pa.BufferOutputStream() + pq.write_table(self.table, out) + + def time_read(self, num_cols): + pq.read_table(self.buf) -- cgit v1.2.3