path: root/src/arrow/python/benchmarks/parquet.py
author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/benchmarks/parquet.py
parent    Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/benchmarks/parquet.py')
-rw-r--r--  src/arrow/python/benchmarks/parquet.py  156
1 file changed, 156 insertions(+), 0 deletions(-)
diff --git a/src/arrow/python/benchmarks/parquet.py b/src/arrow/python/benchmarks/parquet.py
new file mode 100644
index 000000000..3aeca425b
--- /dev/null
+++ b/src/arrow/python/benchmarks/parquet.py
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import shutil
+import tempfile
+
+from pandas.util.testing import rands
+import numpy as np
+import pandas as pd
+
+import pyarrow as pa
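+# pyarrow.parquet is an optional component; when it is missing, setup() raises
+# NotImplementedError so asv skips the benchmark instead of failing.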
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+
+class ParquetManifestCreation(object):
+ """Benchmark creating a parquet manifest."""
+
+ size = 10 ** 6
+ tmpdir = None
+
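+    # asv times the benchmark once per (num_partitions, num_threads) combination.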
+ param_names = ('num_partitions', 'num_threads')
+ params = [(10, 100, 1000), (1, 8)]
+
+ def setup(self, num_partitions, num_threads):
+ if pq is None:
+ raise NotImplementedError("Parquet support not enabled")
+
+ self.tmpdir = tempfile.mkdtemp('benchmark_parquet')
+ rnd = np.random.RandomState(42)
+ num1 = rnd.randint(0, num_partitions, size=self.size)
+ num2 = rnd.randint(0, 1000, size=self.size)
+ output_df = pd.DataFrame({'num1': num1, 'num2': num2})
+ output_table = pa.Table.from_pandas(output_df)
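+        # Partition the dataset on num1, producing one directory per partition value.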
+ pq.write_to_dataset(output_table, self.tmpdir, ['num1'])
+
+ def teardown(self, num_partitions, num_threads):
+ if self.tmpdir is not None:
+ shutil.rmtree(self.tmpdir)
+
+ def time_manifest_creation(self, num_partitions, num_threads):
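+        # ParquetManifest is pyarrow's internal scanner for partitioned dataset
+        # directories; metadata_nthreads parallelizes the directory walk.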
+ pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads)
+
+
+class ParquetWriteBinary(object):
+
+ def setup(self):
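+        # 1M rows x 10 columns of 50-character strings drawn from 100k uniques.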
+ nuniques = 100000
+ value_size = 50
+ length = 1000000
+ num_cols = 10
+
+ unique_values = np.array([rands(value_size) for
+ i in range(nuniques)], dtype='O')
+ values = unique_values[np.random.randint(0, nuniques, size=length)]
+ self.table = pa.table([pa.array(values) for i in range(num_cols)],
+ names=['f{}'.format(i) for i in range(num_cols)])
+ self.table_df = self.table.to_pandas()
+
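+    # All writers target an in-memory BufferOutputStream so disk I/O is
+    # excluded from the timings.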
+ def time_write_binary_table(self):
+ out = pa.BufferOutputStream()
+ pq.write_table(self.table, out)
+
+ def time_write_binary_table_uncompressed(self):
+ out = pa.BufferOutputStream()
+ pq.write_table(self.table, out, compression='none')
+
+ def time_write_binary_table_no_dictionary(self):
+ out = pa.BufferOutputStream()
+ pq.write_table(self.table, out, use_dictionary=False)
+
+ def time_convert_pandas_and_write_binary_table(self):
+ out = pa.BufferOutputStream()
+ pq.write_table(pa.table(self.table_df), out)
+
+
+def generate_dict_strings(string_size, nunique, length, random_order=True):
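+    # With random_order the dictionary indices are shuffled draws; otherwise each
+    # unique value repeats in one contiguous run, which favors dictionary encoding.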
+ uniques = np.array([rands(string_size) for i in range(nunique)], dtype='O')
+ if random_order:
+ indices = np.random.randint(0, nunique, size=length).astype('i4')
+ else:
+ indices = np.arange(nunique).astype('i4').repeat(length // nunique)
+ return pa.DictionaryArray.from_arrays(indices, uniques)
+
+
+def generate_dict_table(num_cols, string_size, nunique, length,
+ random_order=True):
+ data = generate_dict_strings(string_size, nunique, length,
+ random_order=random_order)
+ return pa.table([
+ data for i in range(num_cols)
+ ], names=['f{}'.format(i) for i in range(num_cols)])
+
+
+class ParquetWriteDictionaries(object):
+
+ param_names = ('nunique',)
+    params = [1000, 100000]
+
+ def setup(self, nunique):
+ self.num_cols = 10
+ self.value_size = 32
+ self.nunique = nunique
+ self.length = 10000000
+
+ self.table = generate_dict_table(self.num_cols, self.value_size,
+ self.nunique, self.length)
+ self.table_sequential = generate_dict_table(self.num_cols,
+ self.value_size,
+ self.nunique, self.length,
+ random_order=False)
+
+ def time_write_random_order(self, nunique):
+ pq.write_table(self.table, pa.BufferOutputStream())
+
+ def time_write_sequential(self, nunique):
+ pq.write_table(self.table_sequential, pa.BufferOutputStream())
+
+
+class ParquetManyColumns(object):
+
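+    # Hold the total cell count fixed so wider tables get proportionally fewer rows.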
+ total_cells = 10000000
+ param_names = ('num_cols',)
+ params = [100, 1000, 10000]
+
+ def setup(self, num_cols):
+ num_rows = self.total_cells // num_cols
+ self.table = pa.table({'c' + str(i): np.random.randn(num_rows)
+ for i in range(num_cols)})
+
+ out = pa.BufferOutputStream()
+ pq.write_table(self.table, out)
+ self.buf = out.getvalue()
+
+ def time_write(self, num_cols):
+ out = pa.BufferOutputStream()
+ pq.write_table(self.table, out)
+
+ def time_read(self, num_cols):
+ pq.read_table(self.buf)
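
These are airspeed velocity (asv) benchmark classes: asv instantiates each
class, calls setup(), and times every time_* method. A minimal sketch (not
part of the commit) of driving one benchmark by hand, assuming a pandas and
pyarrow combination old enough for the imports above to succeed:

    # Run one benchmark outside of asv; any class above works the same way.
    bench = ParquetWriteBinary()
    bench.setup()
    bench.time_write_binary_table()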