author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_compute.pyx
parent     Initial commit. (diff)
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_compute.pyx')
-rw-r--r--   src/arrow/python/pyarrow/_compute.pyx   1296
1 file changed, 1296 insertions(+), 0 deletions(-)
diff --git a/src/arrow/python/pyarrow/_compute.pyx b/src/arrow/python/pyarrow/_compute.pyx
new file mode 100644
index 000000000..d62c9c0ee
--- /dev/null
+++ b/src/arrow/python/pyarrow/_compute.pyx
@@ -0,0 +1,1296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from collections import namedtuple
+
+from pyarrow.lib import frombytes, tobytes, ordered_dict
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ scalar Function in a ScalarFunction object.
+ """
+ cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction)
+ func.init(sp_func)
+ return func
+
+
+cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ vector Function in a VectorFunction object.
+ """
+ cdef VectorFunction func = VectorFunction.__new__(VectorFunction)
+ func.init(sp_func)
+ return func
+
+
+cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ aggregate Function in a ScalarAggregateFunction object.
+ """
+ cdef ScalarAggregateFunction func = \
+ ScalarAggregateFunction.__new__(ScalarAggregateFunction)
+ func.init(sp_func)
+ return func
+
+
+cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ aggregate Function in a HashAggregateFunction object.
+ """
+ cdef HashAggregateFunction func = \
+ HashAggregateFunction.__new__(HashAggregateFunction)
+ func.init(sp_func)
+ return func
+
+
+cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ meta Function in a MetaFunction object.
+ """
+ cdef MetaFunction func = MetaFunction.__new__(MetaFunction)
+ func.init(sp_func)
+ return func
+
+
+cdef wrap_function(const shared_ptr[CFunction]& sp_func):
+ """
+ Wrap a C++ Function in a Function object.
+
+ This dispatches to specialized wrappers depending on the function kind.
+ """
+ if sp_func.get() == NULL:
+ raise ValueError("Function was NULL")
+
+ cdef FunctionKind c_kind = sp_func.get().kind()
+ if c_kind == FunctionKind_SCALAR:
+ return wrap_scalar_function(sp_func)
+ elif c_kind == FunctionKind_VECTOR:
+ return wrap_vector_function(sp_func)
+ elif c_kind == FunctionKind_SCALAR_AGGREGATE:
+ return wrap_scalar_aggregate_function(sp_func)
+ elif c_kind == FunctionKind_HASH_AGGREGATE:
+ return wrap_hash_aggregate_function(sp_func)
+ elif c_kind == FunctionKind_META:
+ return wrap_meta_function(sp_func)
+ else:
+ raise NotImplementedError("Unknown Function::Kind")
+
+
+cdef wrap_scalar_kernel(const CScalarKernel* c_kernel):
+ if c_kernel == NULL:
+ raise ValueError("Kernel was NULL")
+ cdef ScalarKernel kernel = ScalarKernel.__new__(ScalarKernel)
+ kernel.init(c_kernel)
+ return kernel
+
+
+cdef wrap_vector_kernel(const CVectorKernel* c_kernel):
+ if c_kernel == NULL:
+ raise ValueError("Kernel was NULL")
+ cdef VectorKernel kernel = VectorKernel.__new__(VectorKernel)
+ kernel.init(c_kernel)
+ return kernel
+
+
+cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel):
+ if c_kernel == NULL:
+ raise ValueError("Kernel was NULL")
+ cdef ScalarAggregateKernel kernel = \
+ ScalarAggregateKernel.__new__(ScalarAggregateKernel)
+ kernel.init(c_kernel)
+ return kernel
+
+
+cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel):
+ if c_kernel == NULL:
+ raise ValueError("Kernel was NULL")
+ cdef HashAggregateKernel kernel = \
+ HashAggregateKernel.__new__(HashAggregateKernel)
+ kernel.init(c_kernel)
+ return kernel
+
+
+cdef class Kernel(_Weakrefable):
+ """
+ A kernel object.
+
+ Kernels handle the execution of a Function for a certain signature.
+ """
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly"
+ .format(self.__class__.__name__))
+
+
+cdef class ScalarKernel(Kernel):
+ cdef const CScalarKernel* kernel
+
+ cdef void init(self, const CScalarKernel* kernel) except *:
+ self.kernel = kernel
+
+ def __repr__(self):
+ return ("ScalarKernel<{}>"
+ .format(frombytes(self.kernel.signature.get().ToString())))
+
+
+cdef class VectorKernel(Kernel):
+ cdef const CVectorKernel* kernel
+
+ cdef void init(self, const CVectorKernel* kernel) except *:
+ self.kernel = kernel
+
+ def __repr__(self):
+ return ("VectorKernel<{}>"
+ .format(frombytes(self.kernel.signature.get().ToString())))
+
+
+cdef class ScalarAggregateKernel(Kernel):
+ cdef const CScalarAggregateKernel* kernel
+
+ cdef void init(self, const CScalarAggregateKernel* kernel) except *:
+ self.kernel = kernel
+
+ def __repr__(self):
+ return ("ScalarAggregateKernel<{}>"
+ .format(frombytes(self.kernel.signature.get().ToString())))
+
+
+cdef class HashAggregateKernel(Kernel):
+ cdef const CHashAggregateKernel* kernel
+
+ cdef void init(self, const CHashAggregateKernel* kernel) except *:
+ self.kernel = kernel
+
+ def __repr__(self):
+ return ("HashAggregateKernel<{}>"
+ .format(frombytes(self.kernel.signature.get().ToString())))
+
+
+FunctionDoc = namedtuple(
+ "FunctionDoc",
+ ("summary", "description", "arg_names", "options_class"))
+
+
+cdef class Function(_Weakrefable):
+ """
+ A compute function.
+
+ A function implements a certain logical computation over a range of
+ possible input signatures. Each signature accepts a range of input
+ types and is implemented by a given Kernel.
+
+ Functions can be of different kinds:
+
+ * "scalar" functions apply an item-wise computation over all items
+ of their inputs. Each item in the output only depends on the values
+ of the inputs at the same position. Examples: addition, comparisons,
+ string predicates...
+
+ * "vector" functions apply a collection-wise computation, such that
+ each item in the output may depend on the values of several items
+ in each input. Examples: dictionary encoding, sorting, extracting
+ unique values...
+
+ * "scalar_aggregate" functions reduce the dimensionality of the inputs by
+ applying a reduction function. Examples: sum, min_max, mode...
+
+ * "hash_aggregate" functions apply a reduction function to an input
+ subdivided by grouping criteria. They may not be directly called.
+ Examples: hash_sum, hash_min_max...
+
+ * "meta" functions dispatch to other functions.
+ """
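+    # Illustrative usage sketch (comments only, not executed; `pa` and `pc`
+    # are assumed import aliases for pyarrow and pyarrow.compute):
+    #
+    #   func = pc.get_function("add")        # a scalar Function
+    #   func.kind                            # -> "scalar"
+    #   func.call([pa.array([1, 2, 3]), 1])  # -> [2, 3, 4]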
+
+ cdef:
+ shared_ptr[CFunction] sp_func
+ CFunction* base_func
+
+ _kind_map = {
+ FunctionKind_SCALAR: "scalar",
+ FunctionKind_VECTOR: "vector",
+ FunctionKind_SCALAR_AGGREGATE: "scalar_aggregate",
+ FunctionKind_HASH_AGGREGATE: "hash_aggregate",
+ FunctionKind_META: "meta",
+ }
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly"
+ .format(self.__class__.__name__))
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ self.sp_func = sp_func
+ self.base_func = sp_func.get()
+
+ def __repr__(self):
+ return ("arrow.compute.Function<name={}, kind={}, "
+ "arity={}, num_kernels={}>"
+ .format(self.name, self.kind, self.arity, self.num_kernels))
+
+ def __reduce__(self):
+ # Reduction uses the global registry
+ return get_function, (self.name,)
+
+ @property
+ def name(self):
+ """
+ The function name.
+ """
+ return frombytes(self.base_func.name())
+
+ @property
+ def arity(self):
+ """
+ The function arity.
+
+ If Ellipsis (i.e. `...`) is returned, the function takes a variable
+ number of arguments.
+ """
+ cdef CArity arity = self.base_func.arity()
+ if arity.is_varargs:
+ return ...
+ else:
+ return arity.num_args
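+    # Illustrative examples (comments only; function names assume the
+    # standard compute registry of this Arrow build):
+    #   get_function("add").arity               # -> 2
+    #   get_function("max_element_wise").arity  # -> Ellipsis (varargs)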
+
+ @property
+ def kind(self):
+ """
+ The function kind.
+ """
+ cdef FunctionKind c_kind = self.base_func.kind()
+ try:
+ return self._kind_map[c_kind]
+ except KeyError:
+ raise NotImplementedError("Unknown Function::Kind")
+
+ @property
+ def _doc(self):
+ """
+ The C++-like function documentation (for internal use).
+ """
+ cdef CFunctionDoc c_doc = self.base_func.doc()
+ return FunctionDoc(frombytes(c_doc.summary),
+ frombytes(c_doc.description),
+ [frombytes(s) for s in c_doc.arg_names],
+ frombytes(c_doc.options_class))
+
+ @property
+ def num_kernels(self):
+ """
+ The number of kernels implementing this function.
+ """
+ return self.base_func.num_kernels()
+
+ def call(self, args, FunctionOptions options=None,
+ MemoryPool memory_pool=None):
+ """
+ Call the function on the given arguments.
+ """
+ cdef:
+ const CFunctionOptions* c_options = NULL
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+ CExecContext c_exec_ctx = CExecContext(pool)
+ vector[CDatum] c_args
+ CDatum result
+
+ _pack_compute_args(args, &c_args)
+
+ if options is not None:
+ c_options = options.get_options()
+
+ with nogil:
+ result = GetResultValue(
+ self.base_func.Execute(c_args, c_options, &c_exec_ctx)
+ )
+
+ return wrap_datum(result)
+
+
+cdef class ScalarFunction(Function):
+ cdef const CScalarFunction* func
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ Function.init(self, sp_func)
+ self.func = <const CScalarFunction*> sp_func.get()
+
+ @property
+ def kernels(self):
+ """
+ The kernels implementing this function.
+ """
+ cdef vector[const CScalarKernel*] kernels = self.func.kernels()
+ return [wrap_scalar_kernel(k) for k in kernels]
+
+
+cdef class VectorFunction(Function):
+ cdef const CVectorFunction* func
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ Function.init(self, sp_func)
+ self.func = <const CVectorFunction*> sp_func.get()
+
+ @property
+ def kernels(self):
+ """
+ The kernels implementing this function.
+ """
+ cdef vector[const CVectorKernel*] kernels = self.func.kernels()
+ return [wrap_vector_kernel(k) for k in kernels]
+
+
+cdef class ScalarAggregateFunction(Function):
+ cdef const CScalarAggregateFunction* func
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ Function.init(self, sp_func)
+ self.func = <const CScalarAggregateFunction*> sp_func.get()
+
+ @property
+ def kernels(self):
+ """
+ The kernels implementing this function.
+ """
+ cdef vector[const CScalarAggregateKernel*] kernels = \
+ self.func.kernels()
+ return [wrap_scalar_aggregate_kernel(k) for k in kernels]
+
+
+cdef class HashAggregateFunction(Function):
+ cdef const CHashAggregateFunction* func
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ Function.init(self, sp_func)
+ self.func = <const CHashAggregateFunction*> sp_func.get()
+
+ @property
+ def kernels(self):
+ """
+ The kernels implementing this function.
+ """
+ cdef vector[const CHashAggregateKernel*] kernels = self.func.kernels()
+ return [wrap_hash_aggregate_kernel(k) for k in kernels]
+
+
+cdef class MetaFunction(Function):
+ cdef const CMetaFunction* func
+
+ cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+ Function.init(self, sp_func)
+ self.func = <const CMetaFunction*> sp_func.get()
+
+ # Since num_kernels is exposed, also expose a kernels property
+ @property
+ def kernels(self):
+ """
+ The kernels implementing this function.
+ """
+ return []
+
+
+cdef _pack_compute_args(object values, vector[CDatum]* out):
+ for val in values:
+ if isinstance(val, (list, np.ndarray)):
+ val = lib.asarray(val)
+
+ if isinstance(val, Array):
+ out.push_back(CDatum((<Array> val).sp_array))
+ continue
+ elif isinstance(val, ChunkedArray):
+ out.push_back(CDatum((<ChunkedArray> val).sp_chunked_array))
+ continue
+ elif isinstance(val, Scalar):
+ out.push_back(CDatum((<Scalar> val).unwrap()))
+ continue
+ elif isinstance(val, RecordBatch):
+ out.push_back(CDatum((<RecordBatch> val).sp_batch))
+ continue
+ elif isinstance(val, Table):
+ out.push_back(CDatum((<Table> val).sp_table))
+ continue
+ else:
+ # Is it a Python scalar?
+ try:
+ scal = lib.scalar(val)
+ except Exception:
+ # Raise dedicated error below
+ pass
+ else:
+ out.push_back(CDatum((<Scalar> scal).unwrap()))
+ continue
+
+ raise TypeError(f"Got unexpected argument type {type(val)} "
+ "for compute function")
+
+
+cdef class FunctionRegistry(_Weakrefable):
+ cdef CFunctionRegistry* registry
+
+ def __init__(self):
+ self.registry = GetFunctionRegistry()
+
+ def list_functions(self):
+ """
+ Return all function names in the registry.
+ """
+ cdef vector[c_string] names = self.registry.GetFunctionNames()
+ return [frombytes(name) for name in names]
+
+ def get_function(self, name):
+ """
+ Look up a function by name in the registry.
+
+ Parameters
+ ----------
+ name : str
+            The name of the function to look up.
+ """
+ cdef:
+ c_string c_name = tobytes(name)
+ shared_ptr[CFunction] func
+ with nogil:
+ func = GetResultValue(self.registry.GetFunction(c_name))
+ return wrap_function(func)
+
+
+cdef FunctionRegistry _global_func_registry = FunctionRegistry()
+
+
+def function_registry():
+ return _global_func_registry
+
+
+def get_function(name):
+ """
+ Get a function by name.
+
+ The function is looked up in the global registry
+ (as returned by `function_registry()`).
+
+ Parameters
+ ----------
+ name : str
+        The name of the function to look up.
+ """
+ return _global_func_registry.get_function(name)
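+# Illustrative sketch (comments only; `pa` is an assumed import alias for
+# pyarrow, and this module is assumed to be reached through the public
+# pyarrow.compute namespace, aliased as `pc`):
+#   fn = pc.get_function("min_max")
+#   fn   # -> arrow.compute.Function<name=min_max, kind=scalar_aggregate, ...>
+#   fn.call([pa.array([3, 1, 2])])   # -> a struct scalar such as {'min': 1, 'max': 3}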
+
+
+def list_functions():
+ """
+ Return all function names in the global registry.
+ """
+ return _global_func_registry.list_functions()
+
+
+def call_function(name, args, options=None, memory_pool=None):
+ """
+ Call a named function.
+
+ The function is looked up in the global registry
+ (as returned by `function_registry()`).
+
+ Parameters
+ ----------
+ name : str
+ The name of the function to call.
+ args : list
+ The arguments to the function.
+    options : FunctionOptions, optional
+        Options provided to the function.
+    memory_pool : MemoryPool, optional
+        Memory pool to use for allocations during function execution.
+ """
+ func = _global_func_registry.get_function(name)
+ return func.call(args, options=options, memory_pool=memory_pool)
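+# Illustrative usage sketch (comments only; `pa`/`pc` are assumed import
+# aliases for pyarrow and pyarrow.compute):
+#   pc.call_function("add", [pa.array([1, 2, 3]), 2])
+#   # -> [3, 4, 5]
+#   pc.call_function("split_pattern", [pa.array(["a-b", "c-d"])],
+#                    options=SplitPatternOptions(pattern="-"))
+#   # -> [["a", "b"], ["c", "d"]]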
+
+
+cdef class FunctionOptions(_Weakrefable):
+    __slots__ = ()  # avoid mistakenly creating attributes
+
+ cdef const CFunctionOptions* get_options(self) except NULL:
+ return self.wrapped.get()
+
+ cdef void init(self, unique_ptr[CFunctionOptions] options):
+ self.wrapped = move(options)
+
+ def serialize(self):
+ cdef:
+ CResult[shared_ptr[CBuffer]] res = self.get_options().Serialize()
+ shared_ptr[CBuffer] c_buf = GetResultValue(res)
+ return pyarrow_wrap_buffer(c_buf)
+
+ @staticmethod
+ def deserialize(buf):
+ """
+ Deserialize options for a function.
+
+ Parameters
+ ----------
+ buf : Buffer
+ The buffer containing the data to deserialize.
+ """
+ cdef:
+ shared_ptr[CBuffer] c_buf = pyarrow_unwrap_buffer(buf)
+ CResult[unique_ptr[CFunctionOptions]] maybe_options = \
+ DeserializeFunctionOptions(deref(c_buf))
+ unique_ptr[CFunctionOptions] c_options
+ c_options = move(GetResultValue(move(maybe_options)))
+ type_name = frombytes(c_options.get().options_type().type_name())
+ module = globals()
+ if type_name not in module:
+ raise ValueError(f'Cannot deserialize "{type_name}"')
+ klass = module[type_name]
+ options = klass.__new__(klass)
+ (<FunctionOptions> options).init(move(c_options))
+ return options
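+    # Hypothetical round-trip sketch (comments only, not executed here;
+    # `pa` is an assumed import alias for pyarrow):
+    #   buf = CastOptions.safe(pa.int32()).serialize()
+    #   opts = FunctionOptions.deserialize(buf)
+    #   assert opts == CastOptions.safe(pa.int32())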
+
+ def __repr__(self):
+ type_name = self.__class__.__name__
+ # Remove {} so we can use our own braces
+ string_repr = frombytes(self.get_options().ToString())[1:-1]
+ return f"{type_name}({string_repr})"
+
+ def __eq__(self, FunctionOptions other):
+ return self.get_options().Equals(deref(other.get_options()))
+
+
+def _raise_invalid_function_option(value, description, *,
+ exception_class=ValueError):
+ raise exception_class(f"\"{value}\" is not a valid {description}")
+
+
+# NOTE:
+# To properly expose the constructor signature of FunctionOptions
+# subclasses, we use a two-level inheritance:
+# 1. a C extension class that implements option validation and setting
+# (won't expose function signatures because of
+# https://github.com/cython/cython/issues/3873)
+# 2. a Python derived class that implements the constructor
+
+cdef class _CastOptions(FunctionOptions):
+ cdef CCastOptions* options
+
+ cdef void init(self, unique_ptr[CFunctionOptions] options):
+ FunctionOptions.init(self, move(options))
+ self.options = <CCastOptions*> self.wrapped.get()
+
+ def _set_options(self, DataType target_type, allow_int_overflow,
+ allow_time_truncate, allow_time_overflow,
+ allow_decimal_truncate, allow_float_truncate,
+ allow_invalid_utf8):
+ self.init(unique_ptr[CFunctionOptions](new CCastOptions()))
+ self._set_type(target_type)
+ if allow_int_overflow is not None:
+ self.allow_int_overflow = allow_int_overflow
+ if allow_time_truncate is not None:
+ self.allow_time_truncate = allow_time_truncate
+ if allow_time_overflow is not None:
+ self.allow_time_overflow = allow_time_overflow
+ if allow_decimal_truncate is not None:
+ self.allow_decimal_truncate = allow_decimal_truncate
+ if allow_float_truncate is not None:
+ self.allow_float_truncate = allow_float_truncate
+ if allow_invalid_utf8 is not None:
+ self.allow_invalid_utf8 = allow_invalid_utf8
+
+ def _set_type(self, target_type=None):
+ if target_type is not None:
+ deref(self.options).to_type = \
+ (<DataType> ensure_type(target_type)).sp_type
+
+ def _set_safe(self):
+ self.init(unique_ptr[CFunctionOptions](
+ new CCastOptions(CCastOptions.Safe())))
+
+ def _set_unsafe(self):
+ self.init(unique_ptr[CFunctionOptions](
+ new CCastOptions(CCastOptions.Unsafe())))
+
+ def is_safe(self):
+ return not (deref(self.options).allow_int_overflow or
+ deref(self.options).allow_time_truncate or
+ deref(self.options).allow_time_overflow or
+ deref(self.options).allow_decimal_truncate or
+ deref(self.options).allow_float_truncate or
+ deref(self.options).allow_invalid_utf8)
+
+ @property
+ def allow_int_overflow(self):
+ return deref(self.options).allow_int_overflow
+
+ @allow_int_overflow.setter
+ def allow_int_overflow(self, c_bool flag):
+ deref(self.options).allow_int_overflow = flag
+
+ @property
+ def allow_time_truncate(self):
+ return deref(self.options).allow_time_truncate
+
+ @allow_time_truncate.setter
+ def allow_time_truncate(self, c_bool flag):
+ deref(self.options).allow_time_truncate = flag
+
+ @property
+ def allow_time_overflow(self):
+ return deref(self.options).allow_time_overflow
+
+ @allow_time_overflow.setter
+ def allow_time_overflow(self, c_bool flag):
+ deref(self.options).allow_time_overflow = flag
+
+ @property
+ def allow_decimal_truncate(self):
+ return deref(self.options).allow_decimal_truncate
+
+ @allow_decimal_truncate.setter
+ def allow_decimal_truncate(self, c_bool flag):
+ deref(self.options).allow_decimal_truncate = flag
+
+ @property
+ def allow_float_truncate(self):
+ return deref(self.options).allow_float_truncate
+
+ @allow_float_truncate.setter
+ def allow_float_truncate(self, c_bool flag):
+ deref(self.options).allow_float_truncate = flag
+
+ @property
+ def allow_invalid_utf8(self):
+ return deref(self.options).allow_invalid_utf8
+
+ @allow_invalid_utf8.setter
+ def allow_invalid_utf8(self, c_bool flag):
+ deref(self.options).allow_invalid_utf8 = flag
+
+
+class CastOptions(_CastOptions):
+
+ def __init__(self, target_type=None, *, allow_int_overflow=None,
+ allow_time_truncate=None, allow_time_overflow=None,
+ allow_decimal_truncate=None, allow_float_truncate=None,
+ allow_invalid_utf8=None):
+ self._set_options(target_type, allow_int_overflow, allow_time_truncate,
+ allow_time_overflow, allow_decimal_truncate,
+ allow_float_truncate, allow_invalid_utf8)
+
+ @staticmethod
+ def safe(target_type=None):
+        """
+ Create a CastOptions for a safe cast.
+
+ Parameters
+ ----------
+ target_type : optional
+ Target cast type for the safe cast.
+ """
+ self = CastOptions()
+ self._set_safe()
+ self._set_type(target_type)
+ return self
+
+ @staticmethod
+ def unsafe(target_type=None):
+        """
+ Create a CastOptions for an unsafe cast.
+
+ Parameters
+ ----------
+ target_type : optional
+ Target cast type for the unsafe cast.
+ """
+ self = CastOptions()
+ self._set_unsafe()
+ self._set_type(target_type)
+ return self
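+# Illustrative sketch (comments only; `pa`/`pc` are assumed import aliases
+# for pyarrow and pyarrow.compute):
+#   safe = pc.CastOptions.safe(pa.int8())
+#   pc.call_function("cast", [pa.array([1.0, 2.5])], safe)    # raises: float truncation
+#   unsafe = pc.CastOptions.unsafe(pa.int8())
+#   pc.call_function("cast", [pa.array([1.0, 2.5])], unsafe)  # -> [1, 2]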
+
+
+cdef class _ElementWiseAggregateOptions(FunctionOptions):
+ def _set_options(self, skip_nulls):
+ self.wrapped.reset(new CElementWiseAggregateOptions(skip_nulls))
+
+
+class ElementWiseAggregateOptions(_ElementWiseAggregateOptions):
+ def __init__(self, *, skip_nulls=True):
+ self._set_options(skip_nulls)
+
+
+cdef CRoundMode unwrap_round_mode(round_mode) except *:
+ if round_mode == "down":
+ return CRoundMode_DOWN
+ elif round_mode == "up":
+ return CRoundMode_UP
+ elif round_mode == "towards_zero":
+ return CRoundMode_TOWARDS_ZERO
+ elif round_mode == "towards_infinity":
+ return CRoundMode_TOWARDS_INFINITY
+ elif round_mode == "half_down":
+ return CRoundMode_HALF_DOWN
+ elif round_mode == "half_up":
+ return CRoundMode_HALF_UP
+ elif round_mode == "half_towards_zero":
+ return CRoundMode_HALF_TOWARDS_ZERO
+ elif round_mode == "half_towards_infinity":
+ return CRoundMode_HALF_TOWARDS_INFINITY
+ elif round_mode == "half_to_even":
+ return CRoundMode_HALF_TO_EVEN
+ elif round_mode == "half_to_odd":
+ return CRoundMode_HALF_TO_ODD
+ _raise_invalid_function_option(round_mode, "round mode")
+
+
+cdef class _RoundOptions(FunctionOptions):
+ def _set_options(self, ndigits, round_mode):
+ self.wrapped.reset(
+ new CRoundOptions(ndigits, unwrap_round_mode(round_mode))
+ )
+
+
+class RoundOptions(_RoundOptions):
+ def __init__(self, ndigits=0, round_mode="half_to_even"):
+ self._set_options(ndigits, round_mode)
+
+
+cdef class _RoundToMultipleOptions(FunctionOptions):
+ def _set_options(self, multiple, round_mode):
+ self.wrapped.reset(
+ new CRoundToMultipleOptions(multiple,
+ unwrap_round_mode(round_mode))
+ )
+
+
+class RoundToMultipleOptions(_RoundToMultipleOptions):
+ def __init__(self, multiple=1.0, round_mode="half_to_even"):
+ self._set_options(multiple, round_mode)
+
+
+cdef class _JoinOptions(FunctionOptions):
+ _null_handling_map = {
+ "emit_null": CJoinNullHandlingBehavior_EMIT_NULL,
+ "skip": CJoinNullHandlingBehavior_SKIP,
+ "replace": CJoinNullHandlingBehavior_REPLACE,
+ }
+
+ def _set_options(self, null_handling, null_replacement):
+ try:
+ self.wrapped.reset(
+ new CJoinOptions(self._null_handling_map[null_handling],
+ tobytes(null_replacement))
+ )
+ except KeyError:
+ _raise_invalid_function_option(null_handling, "null handling")
+
+
+class JoinOptions(_JoinOptions):
+ def __init__(self, null_handling="emit_null", null_replacement=""):
+ self._set_options(null_handling, null_replacement)
+
+
+cdef class _MatchSubstringOptions(FunctionOptions):
+ def _set_options(self, pattern, ignore_case):
+ self.wrapped.reset(
+ new CMatchSubstringOptions(tobytes(pattern), ignore_case)
+ )
+
+
+class MatchSubstringOptions(_MatchSubstringOptions):
+ def __init__(self, pattern, *, ignore_case=False):
+ self._set_options(pattern, ignore_case)
+
+
+cdef class _PadOptions(FunctionOptions):
+ def _set_options(self, width, padding):
+ self.wrapped.reset(new CPadOptions(width, tobytes(padding)))
+
+
+class PadOptions(_PadOptions):
+ def __init__(self, width, padding=' '):
+ self._set_options(width, padding)
+
+
+cdef class _TrimOptions(FunctionOptions):
+ def _set_options(self, characters):
+ self.wrapped.reset(new CTrimOptions(tobytes(characters)))
+
+
+class TrimOptions(_TrimOptions):
+ def __init__(self, characters):
+        self._set_options(characters)
+
+
+cdef class _ReplaceSliceOptions(FunctionOptions):
+ def _set_options(self, start, stop, replacement):
+ self.wrapped.reset(
+ new CReplaceSliceOptions(start, stop, tobytes(replacement))
+ )
+
+
+class ReplaceSliceOptions(_ReplaceSliceOptions):
+ def __init__(self, start, stop, replacement):
+ self._set_options(start, stop, replacement)
+
+
+cdef class _ReplaceSubstringOptions(FunctionOptions):
+ def _set_options(self, pattern, replacement, max_replacements):
+ self.wrapped.reset(
+ new CReplaceSubstringOptions(tobytes(pattern),
+ tobytes(replacement),
+ max_replacements)
+ )
+
+
+class ReplaceSubstringOptions(_ReplaceSubstringOptions):
+ def __init__(self, pattern, replacement, *, max_replacements=-1):
+ self._set_options(pattern, replacement, max_replacements)
+
+
+cdef class _ExtractRegexOptions(FunctionOptions):
+ def _set_options(self, pattern):
+ self.wrapped.reset(new CExtractRegexOptions(tobytes(pattern)))
+
+
+class ExtractRegexOptions(_ExtractRegexOptions):
+ def __init__(self, pattern):
+ self._set_options(pattern)
+
+
+cdef class _SliceOptions(FunctionOptions):
+ def _set_options(self, start, stop, step):
+ self.wrapped.reset(new CSliceOptions(start, stop, step))
+
+
+class SliceOptions(_SliceOptions):
+ def __init__(self, start, stop=sys.maxsize, step=1):
+ self._set_options(start, stop, step)
+
+
+cdef class _FilterOptions(FunctionOptions):
+ _null_selection_map = {
+ "drop": CFilterNullSelectionBehavior_DROP,
+ "emit_null": CFilterNullSelectionBehavior_EMIT_NULL,
+ }
+
+ def _set_options(self, null_selection_behavior):
+ try:
+ self.wrapped.reset(
+ new CFilterOptions(
+ self._null_selection_map[null_selection_behavior]
+ )
+ )
+ except KeyError:
+ _raise_invalid_function_option(null_selection_behavior,
+ "null selection behavior")
+
+
+class FilterOptions(_FilterOptions):
+ def __init__(self, null_selection_behavior="drop"):
+ self._set_options(null_selection_behavior)
+
+
+cdef class _DictionaryEncodeOptions(FunctionOptions):
+ _null_encoding_map = {
+ "encode": CDictionaryEncodeNullEncodingBehavior_ENCODE,
+ "mask": CDictionaryEncodeNullEncodingBehavior_MASK,
+ }
+
+ def _set_options(self, null_encoding):
+ try:
+ self.wrapped.reset(
+ new CDictionaryEncodeOptions(
+ self._null_encoding_map[null_encoding]
+ )
+ )
+ except KeyError:
+ _raise_invalid_function_option(null_encoding, "null encoding")
+
+
+class DictionaryEncodeOptions(_DictionaryEncodeOptions):
+ def __init__(self, null_encoding="mask"):
+ self._set_options(null_encoding)
+
+
+cdef class _TakeOptions(FunctionOptions):
+ def _set_options(self, boundscheck):
+ self.wrapped.reset(new CTakeOptions(boundscheck))
+
+
+class TakeOptions(_TakeOptions):
+ def __init__(self, *, boundscheck=True):
+ self._set_options(boundscheck)
+
+
+cdef class _MakeStructOptions(FunctionOptions):
+ def _set_options(self, field_names, field_nullability, field_metadata):
+ cdef:
+ vector[c_string] c_field_names
+ vector[shared_ptr[const CKeyValueMetadata]] c_field_metadata
+ for name in field_names:
+ c_field_names.push_back(tobytes(name))
+ for metadata in field_metadata:
+ c_field_metadata.push_back(pyarrow_unwrap_metadata(metadata))
+ self.wrapped.reset(
+ new CMakeStructOptions(c_field_names, field_nullability,
+ c_field_metadata)
+ )
+
+
+class MakeStructOptions(_MakeStructOptions):
+ def __init__(self, field_names, *, field_nullability=None,
+ field_metadata=None):
+ if field_nullability is None:
+ field_nullability = [True] * len(field_names)
+ if field_metadata is None:
+ field_metadata = [None] * len(field_names)
+ self._set_options(field_names, field_nullability, field_metadata)
+
+
+cdef class _ScalarAggregateOptions(FunctionOptions):
+ def _set_options(self, skip_nulls, min_count):
+ self.wrapped.reset(new CScalarAggregateOptions(skip_nulls, min_count))
+
+
+class ScalarAggregateOptions(_ScalarAggregateOptions):
+ def __init__(self, *, skip_nulls=True, min_count=1):
+ self._set_options(skip_nulls, min_count)
+
+
+cdef class _CountOptions(FunctionOptions):
+ _mode_map = {
+ "only_valid": CCountMode_ONLY_VALID,
+ "only_null": CCountMode_ONLY_NULL,
+ "all": CCountMode_ALL,
+ }
+
+ def _set_options(self, mode):
+ try:
+ self.wrapped.reset(new CCountOptions(self._mode_map[mode]))
+ except KeyError:
+ _raise_invalid_function_option(mode, "count mode")
+
+
+class CountOptions(_CountOptions):
+ def __init__(self, mode="only_valid"):
+ self._set_options(mode)
+
+
+cdef class _IndexOptions(FunctionOptions):
+ def _set_options(self, scalar):
+ self.wrapped.reset(new CIndexOptions(pyarrow_unwrap_scalar(scalar)))
+
+
+class IndexOptions(_IndexOptions):
+ """
+ Options for the index kernel.
+
+ Parameters
+ ----------
+ value : Scalar
+ The value to search for.
+ """
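+    # Hypothetical usage sketch (comments only): the "index" aggregate
+    # returns the first position of `value`, or -1 if it is absent, e.g.
+    #   call_function("index", [pa.array(["a", "b", "c"])],
+    #                 IndexOptions(pa.scalar("b")))   # -> 1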
+
+ def __init__(self, value):
+ self._set_options(value)
+
+
+cdef class _ModeOptions(FunctionOptions):
+ def _set_options(self, n, skip_nulls, min_count):
+ self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count))
+
+
+class ModeOptions(_ModeOptions):
+ def __init__(self, n=1, *, skip_nulls=True, min_count=0):
+ self._set_options(n, skip_nulls, min_count)
+
+
+cdef class _SetLookupOptions(FunctionOptions):
+ def _set_options(self, value_set, c_bool skip_nulls):
+ cdef unique_ptr[CDatum] valset
+ if isinstance(value_set, Array):
+ valset.reset(new CDatum((<Array> value_set).sp_array))
+ elif isinstance(value_set, ChunkedArray):
+ valset.reset(
+ new CDatum((<ChunkedArray> value_set).sp_chunked_array)
+ )
+ elif isinstance(value_set, Scalar):
+ valset.reset(new CDatum((<Scalar> value_set).unwrap()))
+ else:
+ _raise_invalid_function_option(value_set, "value set",
+ exception_class=TypeError)
+
+ self.wrapped.reset(new CSetLookupOptions(deref(valset), skip_nulls))
+
+
+class SetLookupOptions(_SetLookupOptions):
+ def __init__(self, value_set, *, skip_nulls=False):
+ self._set_options(value_set, skip_nulls)
+
+
+cdef class _StrptimeOptions(FunctionOptions):
+ _unit_map = {
+ "s": TimeUnit_SECOND,
+ "ms": TimeUnit_MILLI,
+ "us": TimeUnit_MICRO,
+ "ns": TimeUnit_NANO,
+ }
+
+ def _set_options(self, format, unit):
+ try:
+ self.wrapped.reset(
+ new CStrptimeOptions(tobytes(format), self._unit_map[unit])
+ )
+ except KeyError:
+ _raise_invalid_function_option(unit, "time unit")
+
+
+class StrptimeOptions(_StrptimeOptions):
+ def __init__(self, format, unit):
+ self._set_options(format, unit)
+
+
+cdef class _StrftimeOptions(FunctionOptions):
+ def _set_options(self, format, locale):
+ self.wrapped.reset(
+ new CStrftimeOptions(tobytes(format), tobytes(locale))
+ )
+
+
+class StrftimeOptions(_StrftimeOptions):
+ def __init__(self, format="%Y-%m-%dT%H:%M:%S", locale="C"):
+ self._set_options(format, locale)
+
+
+cdef class _DayOfWeekOptions(FunctionOptions):
+ def _set_options(self, count_from_zero, week_start):
+ self.wrapped.reset(
+ new CDayOfWeekOptions(count_from_zero, week_start)
+ )
+
+
+class DayOfWeekOptions(_DayOfWeekOptions):
+ def __init__(self, *, count_from_zero=True, week_start=1):
+ self._set_options(count_from_zero, week_start)
+
+
+cdef class _WeekOptions(FunctionOptions):
+ def _set_options(self, week_starts_monday, count_from_zero,
+ first_week_is_fully_in_year):
+ self.wrapped.reset(
+ new CWeekOptions(week_starts_monday, count_from_zero,
+ first_week_is_fully_in_year)
+ )
+
+
+class WeekOptions(_WeekOptions):
+ def __init__(self, *, week_starts_monday=True, count_from_zero=False,
+ first_week_is_fully_in_year=False):
+ self._set_options(week_starts_monday,
+ count_from_zero, first_week_is_fully_in_year)
+
+
+cdef class _AssumeTimezoneOptions(FunctionOptions):
+ _ambiguous_map = {
+ "raise": CAssumeTimezoneAmbiguous_AMBIGUOUS_RAISE,
+ "earliest": CAssumeTimezoneAmbiguous_AMBIGUOUS_EARLIEST,
+ "latest": CAssumeTimezoneAmbiguous_AMBIGUOUS_LATEST,
+ }
+ _nonexistent_map = {
+ "raise": CAssumeTimezoneNonexistent_NONEXISTENT_RAISE,
+ "earliest": CAssumeTimezoneNonexistent_NONEXISTENT_EARLIEST,
+ "latest": CAssumeTimezoneNonexistent_NONEXISTENT_LATEST,
+ }
+
+ def _set_options(self, timezone, ambiguous, nonexistent):
+ if ambiguous not in self._ambiguous_map:
+ _raise_invalid_function_option(ambiguous,
+ "'ambiguous' timestamp handling")
+ if nonexistent not in self._nonexistent_map:
+ _raise_invalid_function_option(nonexistent,
+ "'nonexistent' timestamp handling")
+ self.wrapped.reset(
+ new CAssumeTimezoneOptions(tobytes(timezone),
+ self._ambiguous_map[ambiguous],
+ self._nonexistent_map[nonexistent])
+ )
+
+
+class AssumeTimezoneOptions(_AssumeTimezoneOptions):
+ def __init__(self, timezone, *, ambiguous="raise", nonexistent="raise"):
+ self._set_options(timezone, ambiguous, nonexistent)
+
+
+cdef class _NullOptions(FunctionOptions):
+ def _set_options(self, nan_is_null):
+ self.wrapped.reset(new CNullOptions(nan_is_null))
+
+
+class NullOptions(_NullOptions):
+ def __init__(self, *, nan_is_null=False):
+ self._set_options(nan_is_null)
+
+
+cdef class _VarianceOptions(FunctionOptions):
+ def _set_options(self, ddof, skip_nulls, min_count):
+ self.wrapped.reset(new CVarianceOptions(ddof, skip_nulls, min_count))
+
+
+class VarianceOptions(_VarianceOptions):
+ def __init__(self, *, ddof=0, skip_nulls=True, min_count=0):
+ self._set_options(ddof, skip_nulls, min_count)
+
+
+cdef class _SplitOptions(FunctionOptions):
+ def _set_options(self, max_splits, reverse):
+ self.wrapped.reset(new CSplitOptions(max_splits, reverse))
+
+
+class SplitOptions(_SplitOptions):
+ def __init__(self, *, max_splits=-1, reverse=False):
+ self._set_options(max_splits, reverse)
+
+
+cdef class _SplitPatternOptions(FunctionOptions):
+ def _set_options(self, pattern, max_splits, reverse):
+ self.wrapped.reset(
+ new CSplitPatternOptions(tobytes(pattern), max_splits, reverse)
+ )
+
+
+class SplitPatternOptions(_SplitPatternOptions):
+ def __init__(self, pattern, *, max_splits=-1, reverse=False):
+ self._set_options(pattern, max_splits, reverse)
+
+
+cdef CSortOrder unwrap_sort_order(order) except *:
+ if order == "ascending":
+ return CSortOrder_Ascending
+ elif order == "descending":
+ return CSortOrder_Descending
+ _raise_invalid_function_option(order, "sort order")
+
+
+cdef CNullPlacement unwrap_null_placement(null_placement) except *:
+ if null_placement == "at_start":
+ return CNullPlacement_AtStart
+ elif null_placement == "at_end":
+ return CNullPlacement_AtEnd
+ _raise_invalid_function_option(null_placement, "null placement")
+
+
+cdef class _PartitionNthOptions(FunctionOptions):
+ def _set_options(self, pivot, null_placement):
+ self.wrapped.reset(new CPartitionNthOptions(
+ pivot, unwrap_null_placement(null_placement)))
+
+
+class PartitionNthOptions(_PartitionNthOptions):
+ def __init__(self, pivot, *, null_placement="at_end"):
+ self._set_options(pivot, null_placement)
+
+
+cdef class _ArraySortOptions(FunctionOptions):
+ def _set_options(self, order, null_placement):
+ self.wrapped.reset(new CArraySortOptions(
+ unwrap_sort_order(order), unwrap_null_placement(null_placement)))
+
+
+class ArraySortOptions(_ArraySortOptions):
+ def __init__(self, order="ascending", *, null_placement="at_end"):
+ self._set_options(order, null_placement)
+
+
+cdef class _SortOptions(FunctionOptions):
+ def _set_options(self, sort_keys, null_placement):
+ cdef vector[CSortKey] c_sort_keys
+ for name, order in sort_keys:
+ c_sort_keys.push_back(
+ CSortKey(tobytes(name), unwrap_sort_order(order))
+ )
+ self.wrapped.reset(new CSortOptions(
+ c_sort_keys, unwrap_null_placement(null_placement)))
+
+
+class SortOptions(_SortOptions):
+ def __init__(self, sort_keys, *, null_placement="at_end"):
+ self._set_options(sort_keys, null_placement)
+
+
+cdef class _SelectKOptions(FunctionOptions):
+ def _set_options(self, k, sort_keys):
+ cdef vector[CSortKey] c_sort_keys
+ for name, order in sort_keys:
+ c_sort_keys.push_back(
+ CSortKey(tobytes(name), unwrap_sort_order(order))
+ )
+ self.wrapped.reset(new CSelectKOptions(k, c_sort_keys))
+
+
+class SelectKOptions(_SelectKOptions):
+ def __init__(self, k, sort_keys):
+ self._set_options(k, sort_keys)
+
+
+cdef class _QuantileOptions(FunctionOptions):
+ _interp_map = {
+ "linear": CQuantileInterp_LINEAR,
+ "lower": CQuantileInterp_LOWER,
+ "higher": CQuantileInterp_HIGHER,
+ "nearest": CQuantileInterp_NEAREST,
+ "midpoint": CQuantileInterp_MIDPOINT,
+ }
+
+ def _set_options(self, quantiles, interp, skip_nulls, min_count):
+ try:
+ self.wrapped.reset(
+ new CQuantileOptions(quantiles, self._interp_map[interp],
+ skip_nulls, min_count)
+ )
+ except KeyError:
+ _raise_invalid_function_option(interp, "quantile interpolation")
+
+
+class QuantileOptions(_QuantileOptions):
+ def __init__(self, q=0.5, *, interpolation="linear", skip_nulls=True,
+ min_count=0):
+ if not isinstance(q, (list, tuple, np.ndarray)):
+ q = [q]
+ self._set_options(q, interpolation, skip_nulls, min_count)
+
+
+cdef class _TDigestOptions(FunctionOptions):
+ def _set_options(self, quantiles, delta, buffer_size, skip_nulls,
+ min_count):
+ self.wrapped.reset(
+ new CTDigestOptions(quantiles, delta, buffer_size, skip_nulls,
+ min_count)
+ )
+
+
+class TDigestOptions(_TDigestOptions):
+ def __init__(self, q=0.5, *, delta=100, buffer_size=500, skip_nulls=True,
+ min_count=0):
+ if not isinstance(q, (list, tuple, np.ndarray)):
+ q = [q]
+ self._set_options(q, delta, buffer_size, skip_nulls, min_count)