author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/tests/util.py
parent     Initial commit. (diff)
download   ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip

Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/tests/util.py')
-rw-r--r--  src/arrow/python/pyarrow/tests/util.py  331
1 file changed, 331 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/tests/util.py b/src/arrow/python/pyarrow/tests/util.py
new file mode 100644
index 000000000..281de69e3
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/util.py
@@ -0,0 +1,331 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Utility functions for testing
+"""
+
+import contextlib
+import decimal
+import gc
+import numpy as np
+import os
+import random
+import signal
+import string
+import subprocess
+import sys
+
+import pytest
+
+import pyarrow as pa
+import pyarrow.fs
+
+
+def randsign():
+    """Randomly choose either 1 or -1.
+
+    Returns
+    -------
+    sign : int
+    """
+    return random.choice((-1, 1))
+
+
+@contextlib.contextmanager
+def random_seed(seed):
+    """Set the random seed inside of a context manager.
+
+    Parameters
+    ----------
+    seed : int
+        The seed to set
+
+    Notes
+    -----
+    This function is useful when you want to set a random seed but not affect
+    the random state of other functions using the random module.
+    """
+    original_state = random.getstate()
+    random.seed(seed)
+    try:
+        yield
+    finally:
+        random.setstate(original_state)
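# A minimal usage sketch of random_seed() above (illustrative, assuming the
# helpers in this module are in scope): the same seed yields the same draws,
# and the caller's random state is restored on exit.
with random_seed(42):
    first = [random.randint(0, 9) for _ in range(5)]
with random_seed(42):
    second = [random.randint(0, 9) for _ in range(5)]
assert first == second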
+
+
+def randdecimal(precision, scale):
+    """Generate a random decimal value with specified precision and scale.
+
+    Parameters
+    ----------
+    precision : int
+        The maximum number of digits to generate. Must be an integer between 1
+        and 38 inclusive.
+    scale : int
+        The maximum number of digits following the decimal point. Must be an
+        integer greater than or equal to 0.
+
+    Returns
+    -------
+    decimal_value : decimal.Decimal
+        A random decimal.Decimal object with the specified precision and scale.
+    """
+    assert 1 <= precision <= 38, 'precision must be between 1 and 38 inclusive'
+    if scale < 0:
+        raise ValueError(
+            'randdecimal does not yet support generating decimals with '
+            'negative scale'
+        )
+    max_whole_value = 10 ** (precision - scale) - 1
+    whole = random.randint(-max_whole_value, max_whole_value)
+
+    if not scale:
+        return decimal.Decimal(whole)
+
+    max_fractional_value = 10 ** scale - 1
+    fractional = random.randint(0, max_fractional_value)
+
+    return decimal.Decimal(
+        '{}.{}'.format(whole, str(fractional).rjust(scale, '0'))
+    )
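# Illustrative check of randdecimal(); the precision/scale values below are
# arbitrary. The whole part fits in precision - scale digits, so values stay
# below 10**7 in magnitude.
value = randdecimal(precision=10, scale=3)
assert isinstance(value, decimal.Decimal)
assert abs(value) < decimal.Decimal(10) ** 7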
+
+
+def random_ascii(length):
+    return bytes(np.random.randint(65, 123, size=length, dtype='i1'))
+
+
+def rands(nchars):
+    """
+    Generate one random string.
+    """
+    RANDS_CHARS = np.array(
+        list(string.ascii_letters + string.digits), dtype=(np.str_, 1))
+    return "".join(np.random.choice(RANDS_CHARS, nchars))
+
+
+def make_dataframe():
+    import pandas as pd
+
+    N = 30
+    df = pd.DataFrame(
+        {col: np.random.randn(N) for col in string.ascii_uppercase[:4]},
+        index=pd.Index([rands(10) for _ in range(N)])
+    )
+    return df
+
+
+def memory_leak_check(f, metric='rss', threshold=1 << 17, iterations=10,
+                      check_interval=1):
+    """
+    Execute the function and try to detect a clear memory leak either internal
+    to Arrow or caused by a reference counting problem in the Python binding
+    implementation. Raise an exception if a leak is detected.
+
+    Parameters
+    ----------
+    f : callable
+        Function to invoke on each iteration
+    metric : {'rss', 'vms', 'shared'}, default 'rss'
+        Attribute of psutil.Process.memory_info to use for determining current
+        memory use
+    threshold : int, default 128K
+        Threshold in number of bytes to consider a leak
+    iterations : int, default 10
+        Total number of invocations of f
+    check_interval : int, default 1
+        Number of invocations of f in between each memory use check
+    """
+    import psutil
+    proc = psutil.Process()
+
+    def _get_use():
+        gc.collect()
+        return getattr(proc.memory_info(), metric)
+
+    baseline_use = _get_use()
+
+    def _leak_check():
+        current_use = _get_use()
+        if current_use - baseline_use > threshold:
+            raise Exception("Memory leak detected. "
+                            "Departure from baseline {} after {} iterations"
+                            .format(current_use - baseline_use, i))
+
+    for i in range(iterations):
+        f()
+        if i % check_interval == 0:
+            _leak_check()
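# Hedged example of memory_leak_check(); the workload and threshold here are
# made up for illustration (requires psutil, like memory_leak_check itself).
def _allocate_and_release():
    buf = pa.allocate_buffer(1 << 16)
    del buf

memory_leak_check(_allocate_and_release, metric='rss', threshold=1 << 20,
                  iterations=50, check_interval=10)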
+
+
+def get_modified_env_with_pythonpath():
+    # Prepend pyarrow root directory to PYTHONPATH
+    env = os.environ.copy()
+    existing_pythonpath = env.get('PYTHONPATH', '')
+
+    module_path = os.path.abspath(
+        os.path.dirname(os.path.dirname(pa.__file__)))
+
+    if existing_pythonpath:
+        new_pythonpath = os.pathsep.join((module_path, existing_pythonpath))
+    else:
+        new_pythonpath = module_path
+    env['PYTHONPATH'] = new_pythonpath
+    return env
+
+
+def invoke_script(script_name, *args):
+    subprocess_env = get_modified_env_with_pythonpath()
+
+    dir_path = os.path.dirname(os.path.realpath(__file__))
+    python_file = os.path.join(dir_path, script_name)
+
+    cmd = [sys.executable, python_file]
+    cmd.extend(args)
+
+    subprocess.check_call(cmd, env=subprocess_env)
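# Illustrative sketch of get_modified_env_with_pythonpath(): the returned
# environment lets a child interpreter import this pyarrow build, which is
# also what invoke_script() relies on.
env = get_modified_env_with_pythonpath()
subprocess.check_call(
    [sys.executable, '-c', 'import pyarrow; print(pyarrow.__version__)'],
    env=env)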
+
+
+@contextlib.contextmanager
+def changed_environ(name, value):
+    """
+    Temporarily set environment variable *name* to *value*.
+    """
+    orig_value = os.environ.get(name)
+    os.environ[name] = value
+    try:
+        yield
+    finally:
+        if orig_value is None:
+            del os.environ[name]
+        else:
+            os.environ[name] = orig_value
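# Minimal sketch of changed_environ(); the variable name is arbitrary and is
# assumed to be unset beforehand.
with changed_environ('ARROW_TEST_EXAMPLE_VAR', '1'):
    assert os.environ['ARROW_TEST_EXAMPLE_VAR'] == '1'
assert 'ARROW_TEST_EXAMPLE_VAR' not in os.environ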
+
+
+@contextlib.contextmanager
+def change_cwd(path):
+    curdir = os.getcwd()
+    os.chdir(str(path))
+    try:
+        yield
+    finally:
+        os.chdir(curdir)
+
+
+@contextlib.contextmanager
+def disabled_gc():
+    gc.disable()
+    try:
+        yield
+    finally:
+        gc.enable()
+
+
+def _filesystem_uri(path):
+    # URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns.
+    if os.name == 'nt':
+        uri = 'file:///{}'.format(path)
+    else:
+        uri = 'file://{}'.format(path)
+    return uri
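# Hedged illustration of _filesystem_uri(); the path is made up, and the exact
# URI shape ('file:///C:/...' vs 'file:///tmp/...') depends on the platform.
uri = _filesystem_uri(os.path.join(os.getcwd(), 'example.parquet'))
assert uri.startswith('file://')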
+
+
+class FSProtocolClass:
+    def __init__(self, path):
+        self._path = path
+
+    def __fspath__(self):
+        return str(self._path)
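# FSProtocolClass implements __fspath__, so os.fspath() and path-accepting
# APIs can consume it; the path below is illustrative.
assert os.fspath(FSProtocolClass('/tmp/example')) == '/tmp/example'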
+
+
+class ProxyHandler(pyarrow.fs.FileSystemHandler):
+    """
+    A filesystem handler that proxies to an underlying filesystem. Useful
+    for wrapping an existing filesystem while overriding parts of its behavior.
+    """
+
+    def __init__(self, fs):
+        self._fs = fs
+
+    def __eq__(self, other):
+        if isinstance(other, ProxyHandler):
+            return self._fs == other._fs
+        return NotImplemented
+
+    def __ne__(self, other):
+        if isinstance(other, ProxyHandler):
+            return self._fs != other._fs
+        return NotImplemented
+
+    def get_type_name(self):
+        return "proxy::" + self._fs.type_name
+
+    def normalize_path(self, path):
+        return self._fs.normalize_path(path)
+
+    def get_file_info(self, paths):
+        return self._fs.get_file_info(paths)
+
+    def get_file_info_selector(self, selector):
+        return self._fs.get_file_info(selector)
+
+    def create_dir(self, path, recursive):
+        return self._fs.create_dir(path, recursive=recursive)
+
+    def delete_dir(self, path):
+        return self._fs.delete_dir(path)
+
+    def delete_dir_contents(self, path):
+        return self._fs.delete_dir_contents(path)
+
+    def delete_root_dir_contents(self):
+        return self._fs.delete_dir_contents("", accept_root_dir=True)
+
+    def delete_file(self, path):
+        return self._fs.delete_file(path)
+
+    def move(self, src, dest):
+        return self._fs.move(src, dest)
+
+    def copy_file(self, src, dest):
+        return self._fs.copy_file(src, dest)
+
+    def open_input_stream(self, path):
+        return self._fs.open_input_stream(path)
+
+    def open_input_file(self, path):
+        return self._fs.open_input_file(path)
+
+    def open_output_stream(self, path, metadata):
+        return self._fs.open_output_stream(path, metadata=metadata)
+
+    def open_append_stream(self, path, metadata):
+        return self._fs.open_append_stream(path, metadata=metadata)
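# Hedged sketch of ProxyHandler in use: wrap a LocalFileSystem in a
# pyarrow.fs.PyFileSystem and round-trip a small file through the proxy.
# The temporary directory and file name are illustrative.
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    proxied = pyarrow.fs.PyFileSystem(ProxyHandler(pyarrow.fs.LocalFileSystem()))
    target = os.path.join(tmp, 'data.bin')
    with proxied.open_output_stream(target) as out:
        out.write(b'hello')
    assert proxied.get_file_info(target).size == 5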
+
+
+def get_raise_signal():
+    if sys.version_info >= (3, 8):
+        return signal.raise_signal
+    elif os.name == 'nt':
+        # On Windows, os.kill() doesn't actually send a signal,
+        # it just terminates the process with the given exit code.
+        pytest.skip("test requires Python 3.8+ on Windows")
+    else:
+        # On Unix, emulate raise_signal() with os.kill().
+        def raise_signal(signum):
+            os.kill(os.getpid(), signum)
+        return raise_signal
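# Illustrative sketch of get_raise_signal(): with the default SIGINT handler
# installed, raising SIGINT in the current process surfaces as
# KeyboardInterrupt, which the caller can catch.
raise_signal = get_raise_signal()
try:
    raise_signal(signal.SIGINT)
except KeyboardInterrupt:
    pass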