Diffstat (limited to 'src/arrow/python/pyarrow/_dataset.pyx')
-rw-r--r--  src/arrow/python/pyarrow/_dataset.pyx  3408
1 files changed, 3408 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/_dataset.pyx b/src/arrow/python/pyarrow/_dataset.pyx
new file mode 100644
index 000000000..459c3b8fb
--- /dev/null
+++ b/src/arrow/python/pyarrow/_dataset.pyx
@@ -0,0 +1,3408 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+"""Dataset is currently unstable. APIs subject to change without notice."""
+
+from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE
+from cython.operator cimport dereference as deref
+
+import collections
+import os
+import warnings
+
+import pyarrow as pa
+from pyarrow.lib cimport *
+from pyarrow.lib import ArrowTypeError, frombytes, tobytes
+from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
+from pyarrow._csv cimport (
+ ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
+from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
+
+from pyarrow._parquet cimport (
+ _create_writer_properties, _create_arrow_writer_properties,
+ FileMetaData, RowGroupMetaData, ColumnChunkMetaData
+)
+
+
+def _forbid_instantiation(klass, subclasses_instead=True):
+ msg = '{} is an abstract class thus cannot be initialized.'.format(
+ klass.__name__
+ )
+ if subclasses_instead:
+        subclasses = [cls.__name__ for cls in klass.__subclasses__()]
+ msg += ' Use one of the subclasses instead: {}'.format(
+ ', '.join(subclasses)
+ )
+ raise TypeError(msg)
+
+
+_orc_fileformat = None
+_orc_imported = False
+
+
+def _get_orc_fileformat():
+ """
+ Import OrcFileFormat on first usage (to avoid circular import issue
+ when `pyarrow._dataset_orc` would be imported first)
+ """
+ global _orc_fileformat
+ global _orc_imported
+ if not _orc_imported:
+ try:
+ from pyarrow._dataset_orc import OrcFileFormat
+ _orc_fileformat = OrcFileFormat
+        except ImportError:
+ _orc_fileformat = None
+ finally:
+ _orc_imported = True
+ return _orc_fileformat
+
+
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
+
+ cdef:
+ CFileSource c_source
+ shared_ptr[CFileSystem] c_filesystem
+ c_string c_path
+ shared_ptr[CRandomAccessFile] c_file
+ shared_ptr[CBuffer] c_buffer
+
+ if isinstance(file, Buffer):
+ c_buffer = pyarrow_unwrap_buffer(file)
+ c_source = CFileSource(move(c_buffer))
+
+ elif _is_path_like(file):
+ if filesystem is None:
+ raise ValueError("cannot construct a FileSource from "
+ "a path without a FileSystem")
+ c_filesystem = filesystem.unwrap()
+ c_path = tobytes(_stringify_path(file))
+ c_source = CFileSource(move(c_path), move(c_filesystem))
+
+ elif hasattr(file, 'read'):
+ # Optimistically hope this is file-like
+ c_file = get_native_file(file, False).get_random_access_file()
+ c_source = CFileSource(move(c_file))
+
+ else:
+ raise TypeError("cannot construct a FileSource "
+ "from " + str(file))
+
+ return c_source
+
+
+cdef CSegmentEncoding _get_segment_encoding(str segment_encoding):
+ if segment_encoding == "none":
+ return CSegmentEncodingNone
+ elif segment_encoding == "uri":
+ return CSegmentEncodingUri
+ raise ValueError(f"Unknown segment encoding: {segment_encoding}")
+
+
+cdef class Expression(_Weakrefable):
+ """
+ A logical expression to be evaluated against some input.
+
+ To create an expression:
+
+ - Use the factory function ``pyarrow.dataset.scalar()`` to create a
+ scalar (not necessary when combined, see example below).
+ - Use the factory function ``pyarrow.dataset.field()`` to reference
+ a field (column in table).
+ - Compare fields and scalars with ``<``, ``<=``, ``==``, ``>=``, ``>``.
+ - Combine expressions using python operators ``&`` (logical and),
+ ``|`` (logical or) and ``~`` (logical not).
+ Note: python keywords ``and``, ``or`` and ``not`` cannot be used
+ to combine expressions.
+ - Check whether the expression is contained in a list of values with
+ the ``pyarrow.dataset.Expression.isin()`` member function.
+
+ Examples
+ --------
+
+ >>> import pyarrow.dataset as ds
+ >>> (ds.field("a") < ds.scalar(3)) | (ds.field("b") > 7)
+ <pyarrow.dataset.Expression ((a < 3:int64) or (b > 7:int64))>
+ >>> ds.field('a') != 3
+ <pyarrow.dataset.Expression (a != 3)>
+ >>> ds.field('a').isin([1, 2, 3])
+ <pyarrow.dataset.Expression (a is in [
+ 1,
+ 2,
+ 3
+ ])>
+ """
+ cdef:
+ CExpression expr
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const CExpression& sp):
+ self.expr = sp
+
+ @staticmethod
+ cdef wrap(const CExpression& sp):
+ cdef Expression self = Expression.__new__(Expression)
+ self.init(sp)
+ return self
+
+ cdef inline CExpression unwrap(self):
+ return self.expr
+
+ def equals(self, Expression other):
+ return self.expr.Equals(other.unwrap())
+
+ def __str__(self):
+ return frombytes(self.expr.ToString())
+
+ def __repr__(self):
+ return "<pyarrow.dataset.{0} {1}>".format(
+ self.__class__.__name__, str(self)
+ )
+
+ @staticmethod
+ def _deserialize(Buffer buffer not None):
+ return Expression.wrap(GetResultValue(CDeserializeExpression(
+ pyarrow_unwrap_buffer(buffer))))
+
+ def __reduce__(self):
+ buffer = pyarrow_wrap_buffer(GetResultValue(
+ CSerializeExpression(self.expr)))
+ return Expression._deserialize, (buffer,)
+
+ @staticmethod
+ cdef Expression _expr_or_scalar(object expr):
+ if isinstance(expr, Expression):
+ return (<Expression> expr)
+ return (<Expression> Expression._scalar(expr))
+
+ @staticmethod
+ cdef Expression _call(str function_name, list arguments,
+ shared_ptr[CFunctionOptions] options=(
+ <shared_ptr[CFunctionOptions]> nullptr)):
+ cdef:
+ vector[CExpression] c_arguments
+
+ for argument in arguments:
+ c_arguments.push_back((<Expression> argument).expr)
+
+ return Expression.wrap(CMakeCallExpression(tobytes(function_name),
+ move(c_arguments), options))
+
+ def __richcmp__(self, other, int op):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call({
+ Py_EQ: "equal",
+ Py_NE: "not_equal",
+ Py_GT: "greater",
+ Py_GE: "greater_equal",
+ Py_LT: "less",
+ Py_LE: "less_equal",
+ }[op], [self, other])
+
+ def __bool__(self):
+ raise ValueError(
+ "An Expression cannot be evaluated to python True or False. "
+ "If you are using the 'and', 'or' or 'not' operators, use '&', "
+ "'|' or '~' instead."
+ )
+
+ def __invert__(self):
+ return Expression._call("invert", [self])
+
+ def __and__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("and_kleene", [self, other])
+
+ def __or__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("or_kleene", [self, other])
+
+ def __add__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("add_checked", [self, other])
+
+ def __mul__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("multiply_checked", [self, other])
+
+ def __sub__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("subtract_checked", [self, other])
+
+ def __truediv__(Expression self, other):
+ other = Expression._expr_or_scalar(other)
+ return Expression._call("divide_checked", [self, other])
+
+ def is_valid(self):
+ """Checks whether the expression is not-null (valid)"""
+ return Expression._call("is_valid", [self])
+
+ def is_null(self, bint nan_is_null=False):
+ """Checks whether the expression is null"""
+ cdef:
+ shared_ptr[CFunctionOptions] c_options
+
+ c_options.reset(new CNullOptions(nan_is_null))
+ return Expression._call("is_null", [self], c_options)
+
+ def cast(self, type, bint safe=True):
+ """Explicitly change the expression's data type"""
+ cdef shared_ptr[CCastOptions] c_options
+ c_options.reset(new CCastOptions(safe))
+ c_options.get().to_type = pyarrow_unwrap_data_type(ensure_type(type))
+ return Expression._call("cast", [self],
+ <shared_ptr[CFunctionOptions]> c_options)
+
+ def isin(self, values):
+ """Checks whether the expression is contained in values"""
+ cdef:
+ shared_ptr[CFunctionOptions] c_options
+ CDatum c_values
+
+ if not isinstance(values, pa.Array):
+ values = pa.array(values)
+
+ c_values = CDatum(pyarrow_unwrap_array(values))
+ c_options.reset(new CSetLookupOptions(c_values, True))
+ return Expression._call("is_in", [self], c_options)
+
+ @staticmethod
+ def _field(str name not None):
+ return Expression.wrap(CMakeFieldExpression(tobytes(name)))
+
+ @staticmethod
+ def _scalar(value):
+ cdef:
+ Scalar scalar
+
+ if isinstance(value, Scalar):
+ scalar = value
+ else:
+ scalar = pa.scalar(value)
+
+ return Expression.wrap(CMakeScalarExpression(scalar.unwrap()))
+
+
+_deserialize = Expression._deserialize
+cdef Expression _true = Expression._scalar(True)
+
+
+cdef class Dataset(_Weakrefable):
+ """
+ Collection of data fragments and potentially child datasets.
+
+ Arrow Datasets allow you to query against data that has been split across
+ multiple files. This sharding of data may indicate partitioning, which
+ can accelerate queries that only touch some partitions (files).
+ """
+
+ cdef:
+ shared_ptr[CDataset] wrapped
+ CDataset* dataset
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CDataset]& sp):
+ self.wrapped = sp
+ self.dataset = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CDataset]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'union': UnionDataset,
+ 'filesystem': FileSystemDataset,
+ 'in-memory': InMemoryDataset,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef Dataset self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ cdef shared_ptr[CDataset] unwrap(self) nogil:
+ return self.wrapped
+
+ @property
+ def partition_expression(self):
+ """
+ An Expression which evaluates to true for all data viewed by this
+ Dataset.
+ """
+ return Expression.wrap(self.dataset.partition_expression())
+
+ def replace_schema(self, Schema schema not None):
+ """
+ Return a copy of this Dataset with a different schema.
+
+ The copy will view the same Fragments. If the new schema is not
+ compatible with the original dataset's schema then an error will
+ be raised.
+ """
+ cdef shared_ptr[CDataset] copy = GetResultValue(
+ self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)))
+ return Dataset.wrap(move(copy))
+
+ def get_fragments(self, Expression filter=None):
+ """Returns an iterator over the fragments in this dataset.
+
+ Parameters
+ ----------
+ filter : Expression, default None
+ Return fragments matching the optional filter, either using the
+ partition_expression or internal information like Parquet's
+ statistics.
+
+ Returns
+ -------
+ fragments : iterator of Fragment
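+
+        Examples
+        --------
+        A minimal sketch; the dataset path and the partition field "year"
+        below are illustrative:
+
+        >>> import pyarrow.dataset as ds
+        >>> dataset = ds.dataset("path/to/dataset", partitioning="hive")
+        >>> for fragment in dataset.get_fragments(ds.field("year") == 2021):
+        ...     print(fragment.path)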
+ """
+ cdef:
+ CExpression c_filter
+ CFragmentIterator c_iterator
+
+ if filter is None:
+ c_fragments = move(GetResultValue(self.dataset.GetFragments()))
+ else:
+ c_filter = _bind(filter, self.schema)
+ c_fragments = move(GetResultValue(
+ self.dataset.GetFragments(c_filter)))
+
+ for maybe_fragment in c_fragments:
+ yield Fragment.wrap(GetResultValue(move(maybe_fragment)))
+
+ def scanner(self, **kwargs):
+ """Builds a scan operation against the dataset.
+
+ Data is not loaded immediately. Instead, this produces a Scanner,
+ which exposes further operations (e.g. loading all data as a
+ table, counting rows).
+
+ Parameters
+ ----------
+ columns : list of str, default None
+ The columns to project. This can be a list of column names to
+ include (order and duplicates will be preserved), or a dictionary
+ with {new_column_name: expression} values for more advanced
+ projections.
+ The columns will be passed down to Datasets and corresponding data
+ fragments to avoid loading, copying, and deserializing columns
+ that will not be required further down the compute chain.
+ By default all of the available columns are projected. Raises
+ an exception if any of the referenced column names does not exist
+ in the dataset's Schema.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ If possible the predicate will be pushed down to exploit the
+ partition information or internal metadata found in the data
+ source, e.g. Parquet statistics. Otherwise filters the loaded
+ RecordBatches before yielding them.
+ batch_size : int, default 1M
+ The maximum row count for scanned record batches. If scanned
+            record batches are overflowing memory then this value can be
+            lowered to reduce their memory footprint.
+ use_threads : bool, default True
+            If enabled, maximum parallelism will be used, as determined by
+            the number of available CPU cores.
+ use_async : bool, default False
+ If enabled, an async scanner will be used that should offer
+ better performance with high-latency/highly-parallel filesystems
+ (e.g. S3)
+
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ fragment_scan_options : FragmentScanOptions, default None
+ Options specific to a particular scan and fragment type, which
+ can change between different scans of the same dataset.
+
+ Returns
+ -------
+ scanner : Scanner
+
+ Examples
+ --------
+ >>> import pyarrow.dataset as ds
+ >>> dataset = ds.dataset("path/to/dataset")
+
+ Selecting a subset of the columns:
+
+ >>> dataset.scanner(columns=["A", "B"]).to_table()
+
+ Projecting selected columns using an expression:
+
+ >>> dataset.scanner(columns={
+ ... "A_int": ds.field("A").cast("int64"),
+ ... }).to_table()
+
+ Filtering rows while scanning:
+
+ >>> dataset.scanner(filter=ds.field("A") > 0).to_table()
+ """
+ return Scanner.from_dataset(self, **kwargs)
+
+ def to_batches(self, **kwargs):
+ """Read the dataset as materialized record batches.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ record_batches : iterator of RecordBatch
+ """
+ return self.scanner(**kwargs).to_batches()
+
+ def to_table(self, **kwargs):
+ """Read the dataset to an arrow table.
+
+ Note that this method reads all the selected data from the dataset
+ into memory.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table instance
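+
+        Examples
+        --------
+        A minimal sketch; the dataset path and column names are illustrative:
+
+        >>> import pyarrow.dataset as ds
+        >>> dataset = ds.dataset("path/to/dataset")
+        >>> table = dataset.to_table(columns=["A"], filter=ds.field("A") > 0)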
+ """
+ return self.scanner(**kwargs).to_table()
+
+ def take(self, object indices, **kwargs):
+ """Select rows of data by index.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table instance
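+
+        Examples
+        --------
+        A minimal sketch; the dataset path and column name are illustrative:
+
+        >>> import pyarrow.dataset as ds
+        >>> dataset = ds.dataset("path/to/dataset")
+        >>> table = dataset.take([0, 2, 4], columns=["A"])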
+ """
+ return self.scanner(**kwargs).take(indices)
+
+ def head(self, int num_rows, **kwargs):
+ """Load the first N rows of the dataset.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table instance
+ """
+ return self.scanner(**kwargs).head(num_rows)
+
+ def count_rows(self, **kwargs):
+ """Count rows matching the scanner filter.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ count : int
+ """
+ return self.scanner(**kwargs).count_rows()
+
+ @property
+ def schema(self):
+ """The common schema of the full Dataset"""
+ return pyarrow_wrap_schema(self.dataset.schema())
+
+
+cdef class InMemoryDataset(Dataset):
+ """
+ A Dataset wrapping in-memory data.
+
+ Parameters
+ ----------
+    source : RecordBatch, Table, list, or iterable
+        The data for this dataset. Can be a RecordBatch, Table, list of
+        RecordBatch/Table, iterable of RecordBatch, or a RecordBatchReader.
+        If an iterable is provided, the schema must also be provided.
+ schema : Schema, optional
+ Only required if passing an iterable as the source.
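+
+    Examples
+    --------
+    A minimal sketch, assuming the class is exposed as
+    ``pyarrow.dataset.InMemoryDataset``:
+
+    >>> import pyarrow as pa
+    >>> import pyarrow.dataset as ds
+    >>> table = pa.table({"year": [2020, 2021], "n_legs": [2, 4]})
+    >>> dataset = ds.InMemoryDataset(table)
+    >>> dataset.to_table().num_rows
+    2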
+ """
+
+ cdef:
+ CInMemoryDataset* in_memory_dataset
+
+ def __init__(self, source, Schema schema=None):
+ cdef:
+ RecordBatchReader reader
+ shared_ptr[CInMemoryDataset] in_memory_dataset
+
+ if isinstance(source, (pa.RecordBatch, pa.Table)):
+ source = [source]
+
+ if isinstance(source, (list, tuple)):
+ batches = []
+ for item in source:
+ if isinstance(item, pa.RecordBatch):
+ batches.append(item)
+ elif isinstance(item, pa.Table):
+ batches.extend(item.to_batches())
+ else:
+ raise TypeError(
+ 'Expected a list of tables or batches. The given list '
+ 'contains a ' + type(item).__name__)
+ if schema is None:
+ schema = item.schema
+ elif not schema.equals(item.schema):
+ raise ArrowTypeError(
+ f'Item has schema\n{item.schema}\nwhich does not '
+ f'match expected schema\n{schema}')
+ if not batches and schema is None:
+ raise ValueError('Must provide schema to construct in-memory '
+ 'dataset from an empty list')
+ table = pa.Table.from_batches(batches, schema=schema)
+ in_memory_dataset = make_shared[CInMemoryDataset](
+ pyarrow_unwrap_table(table))
+ else:
+ raise TypeError(
+ 'Expected a table, batch, or list of tables/batches '
+ 'instead of the given type: ' +
+ type(source).__name__
+ )
+
+ self.init(<shared_ptr[CDataset]> in_memory_dataset)
+
+ cdef void init(self, const shared_ptr[CDataset]& sp):
+ Dataset.init(self, sp)
+ self.in_memory_dataset = <CInMemoryDataset*> sp.get()
+
+
+cdef class UnionDataset(Dataset):
+ """
+ A Dataset wrapping child datasets.
+
+ Children's schemas must agree with the provided schema.
+
+ Parameters
+ ----------
+ schema : Schema
+ A known schema to conform to.
+ children : list of Dataset
+ One or more input children
+ """
+
+ cdef:
+ CUnionDataset* union_dataset
+
+ def __init__(self, Schema schema not None, children):
+ cdef:
+ Dataset child
+ CDatasetVector c_children
+ shared_ptr[CUnionDataset] union_dataset
+
+ for child in children:
+ c_children.push_back(child.wrapped)
+
+ union_dataset = GetResultValue(CUnionDataset.Make(
+ pyarrow_unwrap_schema(schema), move(c_children)))
+ self.init(<shared_ptr[CDataset]> union_dataset)
+
+ cdef void init(self, const shared_ptr[CDataset]& sp):
+ Dataset.init(self, sp)
+ self.union_dataset = <CUnionDataset*> sp.get()
+
+ def __reduce__(self):
+ return UnionDataset, (self.schema, self.children)
+
+ @property
+ def children(self):
+ cdef CDatasetVector children = self.union_dataset.children()
+ return [Dataset.wrap(children[i]) for i in range(children.size())]
+
+
+cdef class FileSystemDataset(Dataset):
+ """
+ A Dataset of file fragments.
+
+    A FileSystemDataset is composed of one or more FileFragments.
+
+ Parameters
+ ----------
+    fragments : list[Fragment]
+ List of fragments to consume.
+ schema : Schema
+ The top-level schema of the Dataset.
+ format : FileFormat
+ File format of the fragments, currently only ParquetFileFormat,
+ IpcFileFormat, and CsvFileFormat are supported.
+ filesystem : FileSystem
+ FileSystem of the fragments.
+ root_partition : Expression, optional
+        The top-level partition of the Dataset.
+ """
+
+ cdef:
+ CFileSystemDataset* filesystem_dataset
+
+ def __init__(self, fragments, Schema schema, FileFormat format,
+ FileSystem filesystem=None, root_partition=None):
+ cdef:
+ FileFragment fragment=None
+ vector[shared_ptr[CFileFragment]] c_fragments
+ CResult[shared_ptr[CDataset]] result
+ shared_ptr[CFileSystem] c_filesystem
+
+ if root_partition is None:
+ root_partition = _true
+ elif not isinstance(root_partition, Expression):
+ raise TypeError(
+ "Argument 'root_partition' has incorrect type (expected "
+ "Epression, got {0})".format(type(root_partition))
+ )
+
+ for fragment in fragments:
+ c_fragments.push_back(
+ static_pointer_cast[CFileFragment, CFragment](
+ fragment.unwrap()))
+
+ if filesystem is None:
+ filesystem = fragment.filesystem
+
+ if filesystem is not None:
+ c_filesystem = filesystem.unwrap()
+
+ result = CFileSystemDataset.Make(
+ pyarrow_unwrap_schema(schema),
+ (<Expression> root_partition).unwrap(),
+ format.unwrap(),
+ c_filesystem,
+ c_fragments
+ )
+ self.init(GetResultValue(result))
+
+ @property
+ def filesystem(self):
+ return FileSystem.wrap(self.filesystem_dataset.filesystem())
+
+ @property
+ def partitioning(self):
+ """
+ The partitioning of the Dataset source, if discovered.
+
+ If the FileSystemDataset is created using the ``dataset()`` factory
+ function with a partitioning specified, this will return the
+ finalized Partitioning object from the dataset discovery. In all
+ other cases, this returns None.
+ """
+ c_partitioning = self.filesystem_dataset.partitioning()
+ if c_partitioning.get() == nullptr:
+ return None
+ try:
+ return Partitioning.wrap(c_partitioning)
+ except TypeError:
+ # e.g. type_name "default"
+ return None
+
+ cdef void init(self, const shared_ptr[CDataset]& sp):
+ Dataset.init(self, sp)
+ self.filesystem_dataset = <CFileSystemDataset*> sp.get()
+
+ def __reduce__(self):
+ return FileSystemDataset, (
+ list(self.get_fragments()),
+ self.schema,
+ self.format,
+ self.filesystem,
+ self.partition_expression
+ )
+
+ @classmethod
+ def from_paths(cls, paths, schema=None, format=None,
+ filesystem=None, partitions=None, root_partition=None):
+ """A Dataset created from a list of paths on a particular filesystem.
+
+ Parameters
+ ----------
+ paths : list of str
+ List of file paths to create the fragments from.
+ schema : Schema
+            The top-level schema of the Dataset.
+ format : FileFormat
+ File format to create fragments from, currently only
+ ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+ filesystem : FileSystem
+ The filesystem which files are from.
+ partitions : List[Expression], optional
+ Attach additional partition information for the file paths.
+ root_partition : Expression, optional
+            The top-level partition of the Dataset.
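+
+        Examples
+        --------
+        A minimal sketch; the file paths are illustrative and assumed to
+        exist on the given filesystem:
+
+        >>> import pyarrow as pa
+        >>> import pyarrow.dataset as ds
+        >>> from pyarrow import fs
+        >>> dataset = ds.FileSystemDataset.from_paths(
+        ...     ["data/part-0.parquet", "data/part-1.parquet"],
+        ...     schema=pa.schema([("f1", pa.int64())]),
+        ...     format=ds.ParquetFileFormat(),
+        ...     filesystem=fs.LocalFileSystem())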
+ """
+ cdef:
+ FileFragment fragment
+
+ if root_partition is None:
+ root_partition = _true
+
+ for arg, class_, name in [
+ (schema, Schema, 'schema'),
+ (format, FileFormat, 'format'),
+ (filesystem, FileSystem, 'filesystem'),
+ (root_partition, Expression, 'root_partition')
+ ]:
+ if not isinstance(arg, class_):
+ raise TypeError(
+ "Argument '{0}' has incorrect type (expected {1}, "
+ "got {2})".format(name, class_.__name__, type(arg))
+ )
+
+ partitions = partitions or [_true] * len(paths)
+
+ if len(paths) != len(partitions):
+ raise ValueError(
+ 'The number of files resulting from paths_or_selector '
+ 'must be equal to the number of partitions.'
+ )
+
+ fragments = [
+ format.make_fragment(path, filesystem, partitions[i])
+ for i, path in enumerate(paths)
+ ]
+ return FileSystemDataset(fragments, schema, format,
+ filesystem, root_partition)
+
+ @property
+ def files(self):
+ """List of the files"""
+ cdef vector[c_string] files = self.filesystem_dataset.files()
+ return [frombytes(f) for f in files]
+
+ @property
+ def format(self):
+ """The FileFormat of this source."""
+ return FileFormat.wrap(self.filesystem_dataset.format())
+
+
+cdef CExpression _bind(Expression filter, Schema schema) except *:
+ assert schema is not None
+
+ if filter is None:
+ return _true.unwrap()
+
+ return GetResultValue(filter.unwrap().Bind(
+ deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef class FileWriteOptions(_Weakrefable):
+
+ cdef:
+ shared_ptr[CFileWriteOptions] wrapped
+ CFileWriteOptions* c_options
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
+ self.wrapped = sp
+ self.c_options = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFileWriteOptions]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'csv': CsvFileWriteOptions,
+ 'ipc': IpcFileWriteOptions,
+ 'parquet': ParquetFileWriteOptions,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef FileWriteOptions self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ @property
+ def format(self):
+ return FileFormat.wrap(self.c_options.format())
+
+ cdef inline shared_ptr[CFileWriteOptions] unwrap(self):
+ return self.wrapped
+
+
+cdef class FileFormat(_Weakrefable):
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CFileFormat]& sp):
+ self.wrapped = sp
+ self.format = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFileFormat]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'ipc': IpcFileFormat,
+ 'csv': CsvFileFormat,
+ 'parquet': ParquetFileFormat,
+ 'orc': _get_orc_fileformat(),
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef FileFormat self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CFileFormat] unwrap(self):
+ return self.wrapped
+
+ def inspect(self, file, filesystem=None):
+ """Infer the schema of a file."""
+ c_source = _make_file_source(file, filesystem)
+ c_schema = GetResultValue(self.format.Inspect(c_source))
+ return pyarrow_wrap_schema(move(c_schema))
+
+ def make_fragment(self, file, filesystem=None,
+ Expression partition_expression=None):
+ """
+ Make a FileFragment of this FileFormat. The filter may not reference
+ fields absent from the provided schema. If no schema is provided then
+ one will be inferred.
+ """
+ if partition_expression is None:
+ partition_expression = _true
+
+ c_source = _make_file_source(file, filesystem)
+ c_fragment = <shared_ptr[CFragment]> GetResultValue(
+ self.format.MakeFragment(move(c_source),
+ partition_expression.unwrap(),
+ <shared_ptr[CSchema]>nullptr))
+ return Fragment.wrap(move(c_fragment))
+
+ def make_write_options(self):
+ return FileWriteOptions.wrap(self.format.DefaultWriteOptions())
+
+ @property
+ def default_extname(self):
+ return frombytes(self.format.type_name())
+
+ @property
+ def default_fragment_scan_options(self):
+ return FragmentScanOptions.wrap(
+ self.wrapped.get().default_fragment_scan_options)
+
+ @default_fragment_scan_options.setter
+ def default_fragment_scan_options(self, FragmentScanOptions options):
+ if options is None:
+ self.wrapped.get().default_fragment_scan_options =\
+ <shared_ptr[CFragmentScanOptions]>nullptr
+ else:
+ self._set_default_fragment_scan_options(options)
+
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ raise ValueError(f"Cannot set fragment scan options for "
+ f"'{options.type_name}' on {self.__class__.__name__}")
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+
+cdef class Fragment(_Weakrefable):
+ """Fragment of data from a Dataset."""
+
+ cdef:
+ shared_ptr[CFragment] wrapped
+ CFragment* fragment
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CFragment]& sp):
+ self.wrapped = sp
+ self.fragment = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFragment]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ # IpcFileFormat and CsvFileFormat do not have corresponding
+ # subclasses of FileFragment
+ 'ipc': FileFragment,
+ 'csv': FileFragment,
+ 'parquet': ParquetFileFragment,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ class_ = Fragment
+
+ cdef Fragment self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CFragment] unwrap(self):
+ return self.wrapped
+
+ @property
+ def physical_schema(self):
+ """Return the physical schema of this Fragment. This schema can be
+ different from the dataset read schema."""
+ cdef:
+ CResult[shared_ptr[CSchema]] maybe_schema
+ with nogil:
+ maybe_schema = self.fragment.ReadPhysicalSchema()
+ return pyarrow_wrap_schema(GetResultValue(maybe_schema))
+
+ @property
+ def partition_expression(self):
+ """An Expression which evaluates to true for all data viewed by this
+ Fragment.
+ """
+ return Expression.wrap(self.fragment.partition_expression())
+
+ def scanner(self, Schema schema=None, **kwargs):
+ """Builds a scan operation against the dataset.
+
+ Data is not loaded immediately. Instead, this produces a Scanner,
+ which exposes further operations (e.g. loading all data as a
+ table, counting rows).
+
+ Parameters
+ ----------
+ schema : Schema
+ Schema to use for scanning. This is used to unify a Fragment to
+            its Dataset's schema. If not specified this will use the
+ Fragment's physical schema which might differ for each Fragment.
+ columns : list of str, default None
+ The columns to project. This can be a list of column names to
+ include (order and duplicates will be preserved), or a dictionary
+ with {new_column_name: expression} values for more advanced
+ projections.
+ The columns will be passed down to Datasets and corresponding data
+ fragments to avoid loading, copying, and deserializing columns
+ that will not be required further down the compute chain.
+ By default all of the available columns are projected. Raises
+ an exception if any of the referenced column names does not exist
+ in the dataset's Schema.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ If possible the predicate will be pushed down to exploit the
+ partition information or internal metadata found in the data
+ source, e.g. Parquet statistics. Otherwise filters the loaded
+ RecordBatches before yielding them.
+ batch_size : int, default 1M
+ The maximum row count for scanned record batches. If scanned
+            record batches are overflowing memory then this value can be
+            lowered to reduce their memory footprint.
+ use_threads : bool, default True
+            If enabled, maximum parallelism will be used, as determined by
+            the number of available CPU cores.
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ fragment_scan_options : FragmentScanOptions, default None
+ Options specific to a particular scan and fragment type, which
+ can change between different scans of the same dataset.
+
+ Returns
+ -------
+ scanner : Scanner
+
+ """
+ return Scanner.from_fragment(self, schema=schema, **kwargs)
+
+ def to_batches(self, Schema schema=None, **kwargs):
+ """Read the fragment as materialized record batches.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ record_batches : iterator of RecordBatch
+ """
+ return self.scanner(schema=schema, **kwargs).to_batches()
+
+ def to_table(self, Schema schema=None, **kwargs):
+ """Convert this Fragment into a Table.
+
+ Use this convenience utility with care. This will serially materialize
+ the Scan result in memory before creating the Table.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table
+ """
+ return self.scanner(schema=schema, **kwargs).to_table()
+
+ def take(self, object indices, **kwargs):
+ """Select rows of data by index.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table instance
+ """
+ return self.scanner(**kwargs).take(indices)
+
+ def head(self, int num_rows, **kwargs):
+ """Load the first N rows of the fragment.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ table : Table instance
+ """
+ return self.scanner(**kwargs).head(num_rows)
+
+ def count_rows(self, **kwargs):
+ """Count rows matching the scanner filter.
+
+ See scanner method parameters documentation.
+
+ Returns
+ -------
+ count : int
+ """
+ return self.scanner(**kwargs).count_rows()
+
+
+cdef class FileFragment(Fragment):
+ """A Fragment representing a data file."""
+
+ cdef:
+ CFileFragment* file_fragment
+
+ cdef void init(self, const shared_ptr[CFragment]& sp):
+ Fragment.init(self, sp)
+ self.file_fragment = <CFileFragment*> sp.get()
+
+ def __repr__(self):
+ type_name = frombytes(self.fragment.type_name())
+ if type_name != "parquet":
+ typ = f" type={type_name}"
+ else:
+ # parquet has a subclass -> type embedded in class name
+ typ = ""
+ partition_dict = _get_partition_keys(self.partition_expression)
+ partition = ", ".join(
+ [f"{key}={val}" for key, val in partition_dict.items()]
+ )
+ if partition:
+ partition = f" partition=[{partition}]"
+ return "<pyarrow.dataset.{0}{1} path={2}{3}>".format(
+ self.__class__.__name__, typ, self.path, partition
+ )
+
+ def __reduce__(self):
+ buffer = self.buffer
+ return self.format.make_fragment, (
+ self.path if buffer is None else buffer,
+ self.filesystem,
+ self.partition_expression
+ )
+
+ @property
+ def path(self):
+ """
+ The path of the data file viewed by this fragment, if it views a
+ file. If instead it views a buffer, this will be "<Buffer>".
+ """
+ return frombytes(self.file_fragment.source().path())
+
+ @property
+ def filesystem(self):
+ """
+ The FileSystem containing the data file viewed by this fragment, if
+ it views a file. If instead it views a buffer, this will be None.
+ """
+ cdef:
+ shared_ptr[CFileSystem] c_fs
+ c_fs = self.file_fragment.source().filesystem()
+
+ if c_fs.get() == nullptr:
+ return None
+
+ return FileSystem.wrap(c_fs)
+
+ @property
+ def buffer(self):
+ """
+ The buffer viewed by this fragment, if it views a buffer. If
+ instead it views a file, this will be None.
+ """
+ cdef:
+ shared_ptr[CBuffer] c_buffer
+ c_buffer = self.file_fragment.source().buffer()
+
+ if c_buffer.get() == nullptr:
+ return None
+
+ return pyarrow_wrap_buffer(c_buffer)
+
+ @property
+ def format(self):
+ """
+ The format of the data file viewed by this fragment.
+ """
+ return FileFormat.wrap(self.file_fragment.format())
+
+
+class RowGroupInfo:
+ """
+ A wrapper class for RowGroup information
+
+ Parameters
+ ----------
+    id : integer
+        The group id.
+    metadata : RowGroupMetaData
+        The rowgroup metadata.
+    schema : Schema
+        Schema of the rows.
+ """
+
+ def __init__(self, id, metadata, schema):
+ self.id = id
+ self.metadata = metadata
+ self.schema = schema
+
+ @property
+ def num_rows(self):
+ return self.metadata.num_rows
+
+ @property
+ def total_byte_size(self):
+ return self.metadata.total_byte_size
+
+ @property
+ def statistics(self):
+ def name_stats(i):
+ col = self.metadata.column(i)
+
+ stats = col.statistics
+ if stats is None or not stats.has_min_max:
+ return None, None
+
+ name = col.path_in_schema
+ field_index = self.schema.get_field_index(name)
+ if field_index < 0:
+ return None, None
+
+ typ = self.schema.field(field_index).type
+ return col.path_in_schema, {
+ 'min': pa.scalar(stats.min, type=typ).as_py(),
+ 'max': pa.scalar(stats.max, type=typ).as_py()
+ }
+
+ return {
+ name: stats for name, stats
+ in map(name_stats, range(self.metadata.num_columns))
+ if stats is not None
+ }
+
+ def __repr__(self):
+ return "RowGroupInfo({})".format(self.id)
+
+ def __eq__(self, other):
+ if isinstance(other, int):
+ return self.id == other
+ if not isinstance(other, RowGroupInfo):
+ return False
+ return self.id == other.id
+
+
+cdef class FragmentScanOptions(_Weakrefable):
+ """Scan options specific to a particular fragment and scan operation."""
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ self.wrapped = sp
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFragmentScanOptions]& sp):
+ if not sp:
+ return None
+
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'csv': CsvFragmentScanOptions,
+ 'parquet': ParquetFragmentScanOptions,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef FragmentScanOptions self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ @property
+ def type_name(self):
+ return frombytes(self.wrapped.get().type_name())
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+
+cdef class ParquetFileFragment(FileFragment):
+ """A Fragment representing a parquet file."""
+
+ cdef:
+ CParquetFileFragment* parquet_file_fragment
+
+ cdef void init(self, const shared_ptr[CFragment]& sp):
+ FileFragment.init(self, sp)
+ self.parquet_file_fragment = <CParquetFileFragment*> sp.get()
+
+ def __reduce__(self):
+ buffer = self.buffer
+ row_groups = [row_group.id for row_group in self.row_groups]
+ return self.format.make_fragment, (
+ self.path if buffer is None else buffer,
+ self.filesystem,
+ self.partition_expression,
+ row_groups
+ )
+
+ def ensure_complete_metadata(self):
+ """
+ Ensure that all metadata (statistics, physical schema, ...) have
+ been read and cached in this fragment.
+ """
+ check_status(self.parquet_file_fragment.EnsureCompleteMetadata())
+
+ @property
+ def row_groups(self):
+ metadata = self.metadata
+ cdef vector[int] row_groups = self.parquet_file_fragment.row_groups()
+ return [RowGroupInfo(i, metadata.row_group(i), self.physical_schema)
+ for i in row_groups]
+
+ @property
+ def metadata(self):
+ self.ensure_complete_metadata()
+ cdef FileMetaData metadata = FileMetaData()
+ metadata.init(self.parquet_file_fragment.metadata())
+ return metadata
+
+ @property
+ def num_row_groups(self):
+ """
+ Return the number of row groups viewed by this fragment (not the
+ number of row groups in the origin file).
+ """
+ self.ensure_complete_metadata()
+ return self.parquet_file_fragment.row_groups().size()
+
+ def split_by_row_group(self, Expression filter=None,
+ Schema schema=None):
+ """
+ Split the fragment into multiple fragments.
+
+ Yield a Fragment wrapping each row group in this ParquetFileFragment.
+        Row groups whose metadata contradicts the optional filter will be
+        excluded.
+
+ Parameters
+ ----------
+ filter : Expression, default None
+ Only include the row groups which satisfy this predicate (using
+ the Parquet RowGroup statistics).
+ schema : Schema, default None
+ Schema to use when filtering row groups. Defaults to the
+            Fragment's physical schema.
+
+ Returns
+ -------
+ A list of Fragments
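+
+        Examples
+        --------
+        A minimal sketch, assuming ``fragment`` is an existing
+        ParquetFileFragment and "A" is one of its columns:
+
+        >>> import pyarrow.dataset as ds
+        >>> row_group_fragments = fragment.split_by_row_group(
+        ...     filter=ds.field("A") > 0)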
+ """
+ cdef:
+ vector[shared_ptr[CFragment]] c_fragments
+ CExpression c_filter
+ shared_ptr[CFragment] c_fragment
+
+ schema = schema or self.physical_schema
+ c_filter = _bind(filter, schema)
+ with nogil:
+ c_fragments = move(GetResultValue(
+ self.parquet_file_fragment.SplitByRowGroup(move(c_filter))))
+
+ return [Fragment.wrap(c_fragment) for c_fragment in c_fragments]
+
+ def subset(self, Expression filter=None, Schema schema=None,
+ object row_group_ids=None):
+ """
+ Create a subset of the fragment (viewing a subset of the row groups).
+
+ Subset can be specified by either a filter predicate (with optional
+ schema) or by a list of row group IDs. Note that when using a filter,
+ the resulting fragment can be empty (viewing no row groups).
+
+ Parameters
+ ----------
+ filter : Expression, default None
+ Only include the row groups which satisfy this predicate (using
+ the Parquet RowGroup statistics).
+ schema : Schema, default None
+ Schema to use when filtering row groups. Defaults to the
+            Fragment's physical schema.
+ row_group_ids : list of ints
+ The row group IDs to include in the subset. Can only be specified
+ if `filter` is None.
+
+ Returns
+ -------
+ ParquetFileFragment
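+
+        Examples
+        --------
+        A minimal sketch, assuming ``fragment`` is an existing
+        ParquetFileFragment with at least two row groups:
+
+        >>> subset_fragment = fragment.subset(row_group_ids=[0, 1])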
+ """
+ cdef:
+ CExpression c_filter
+ vector[int] c_row_group_ids
+ shared_ptr[CFragment] c_fragment
+
+ if filter is not None and row_group_ids is not None:
+ raise ValueError(
+ "Cannot specify both 'filter' and 'row_group_ids'."
+ )
+
+ if filter is not None:
+ schema = schema or self.physical_schema
+ c_filter = _bind(filter, schema)
+ with nogil:
+ c_fragment = move(GetResultValue(
+ self.parquet_file_fragment.SubsetWithFilter(
+ move(c_filter))))
+ elif row_group_ids is not None:
+ c_row_group_ids = [
+ <int> row_group for row_group in sorted(set(row_group_ids))
+ ]
+ with nogil:
+ c_fragment = move(GetResultValue(
+ self.parquet_file_fragment.SubsetWithIds(
+ move(c_row_group_ids))))
+ else:
+ raise ValueError(
+ "Need to specify one of 'filter' or 'row_group_ids'"
+ )
+
+ return Fragment.wrap(c_fragment)
+
+
+cdef class ParquetReadOptions(_Weakrefable):
+ """
+ Parquet format specific options for reading.
+
+ Parameters
+ ----------
+ dictionary_columns : list of string, default None
+ Names of columns which should be dictionary encoded as
+ they are read.
+ coerce_int96_timestamp_unit : str, default None.
+ Cast timestamps that are stored in INT96 format to a particular
+ resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
+        and therefore INT96 timestamps will be inferred as timestamps
+ in nanoseconds.
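+
+    Examples
+    --------
+    A minimal sketch; the dictionary column name is illustrative:
+
+    >>> import pyarrow.dataset as ds
+    >>> options = ds.ParquetReadOptions(
+    ...     dictionary_columns={"part"},
+    ...     coerce_int96_timestamp_unit="ms")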
+ """
+
+ cdef public:
+ set dictionary_columns
+ TimeUnit _coerce_int96_timestamp_unit
+
+ # Also see _PARQUET_READ_OPTIONS
+ def __init__(self, dictionary_columns=None,
+ coerce_int96_timestamp_unit=None):
+ self.dictionary_columns = set(dictionary_columns or set())
+ self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit
+
+ @property
+ def coerce_int96_timestamp_unit(self):
+ return timeunit_to_string(self._coerce_int96_timestamp_unit)
+
+ @coerce_int96_timestamp_unit.setter
+ def coerce_int96_timestamp_unit(self, unit):
+ if unit is not None:
+ self._coerce_int96_timestamp_unit = string_to_timeunit(unit)
+ else:
+ self._coerce_int96_timestamp_unit = TimeUnit_NANO
+
+ def equals(self, ParquetReadOptions other):
+ return (self.dictionary_columns == other.dictionary_columns and
+ self.coerce_int96_timestamp_unit ==
+ other.coerce_int96_timestamp_unit)
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+ def __repr__(self):
+ return (
+ f"<ParquetReadOptions"
+ f" dictionary_columns={self.dictionary_columns}"
+ f" coerce_int96_timestamp_unit={self.coerce_int96_timestamp_unit}>"
+ )
+
+
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+ cdef:
+ CParquetFileWriteOptions* parquet_options
+ object _properties
+
+ def update(self, **kwargs):
+ arrow_fields = {
+ "use_deprecated_int96_timestamps",
+ "coerce_timestamps",
+ "allow_truncated_timestamps",
+ }
+
+ setters = set()
+ for name, value in kwargs.items():
+ if name not in self._properties:
+ raise TypeError("unexpected parquet write option: " + name)
+ self._properties[name] = value
+ if name in arrow_fields:
+ setters.add(self._set_arrow_properties)
+ else:
+ setters.add(self._set_properties)
+
+ for setter in setters:
+ setter()
+
+ def _set_properties(self):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ opts.writer_properties = _create_writer_properties(
+ use_dictionary=self._properties["use_dictionary"],
+ compression=self._properties["compression"],
+ version=self._properties["version"],
+ write_statistics=self._properties["write_statistics"],
+ data_page_size=self._properties["data_page_size"],
+ compression_level=self._properties["compression_level"],
+ use_byte_stream_split=(
+ self._properties["use_byte_stream_split"]
+ ),
+ data_page_version=self._properties["data_page_version"],
+ )
+
+ def _set_arrow_properties(self):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ opts.arrow_writer_properties = _create_arrow_writer_properties(
+ use_deprecated_int96_timestamps=(
+ self._properties["use_deprecated_int96_timestamps"]
+ ),
+ coerce_timestamps=self._properties["coerce_timestamps"],
+ allow_truncated_timestamps=(
+ self._properties["allow_truncated_timestamps"]
+ ),
+ writer_engine_version="V2",
+ use_compliant_nested_type=(
+ self._properties["use_compliant_nested_type"]
+ )
+ )
+
+ cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
+ FileWriteOptions.init(self, sp)
+ self.parquet_options = <CParquetFileWriteOptions*> sp.get()
+ self._properties = dict(
+ use_dictionary=True,
+ compression="snappy",
+ version="1.0",
+ write_statistics=None,
+ data_page_size=None,
+ compression_level=None,
+ use_byte_stream_split=False,
+ data_page_version="1.0",
+ use_deprecated_int96_timestamps=False,
+ coerce_timestamps=None,
+ allow_truncated_timestamps=False,
+ use_compliant_nested_type=False,
+ )
+ self._set_properties()
+ self._set_arrow_properties()
+
+
+cdef set _PARQUET_READ_OPTIONS = {
+ 'dictionary_columns', 'coerce_int96_timestamp_unit'
+}
+
+
+cdef class ParquetFileFormat(FileFormat):
+ """
+ FileFormat for Parquet
+
+ Parameters
+ ----------
+ read_options : ParquetReadOptions
+ Read options for the file.
+ default_fragment_scan_options : ParquetFragmentScanOptions
+ Scan Options for the file.
+ **kwargs : dict
+ Additional options for read option or scan option.
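+
+    Examples
+    --------
+    A minimal sketch; the keyword arguments shown are forwarded to
+    ParquetReadOptions and ParquetFragmentScanOptions respectively, and the
+    column name is illustrative:
+
+    >>> import pyarrow.dataset as ds
+    >>> parquet_format = ds.ParquetFileFormat(
+    ...     dictionary_columns={"part"}, pre_buffer=True)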
+ """
+
+ cdef:
+ CParquetFileFormat* parquet_format
+
+ def __init__(self, read_options=None,
+ default_fragment_scan_options=None, **kwargs):
+ cdef:
+ shared_ptr[CParquetFileFormat] wrapped
+ CParquetFileFormatReaderOptions* options
+
+ # Read/scan options
+ read_options_args = {option: kwargs[option] for option in kwargs
+ if option in _PARQUET_READ_OPTIONS}
+ scan_args = {option: kwargs[option] for option in kwargs
+ if option not in _PARQUET_READ_OPTIONS}
+ if read_options and read_options_args:
+ duplicates = ', '.join(sorted(read_options_args))
+ raise ValueError(f'If `read_options` is given, '
+ f'cannot specify {duplicates}')
+ if default_fragment_scan_options and scan_args:
+ duplicates = ', '.join(sorted(scan_args))
+ raise ValueError(f'If `default_fragment_scan_options` is given, '
+ f'cannot specify {duplicates}')
+
+ if read_options is None:
+ read_options = ParquetReadOptions(**read_options_args)
+ elif isinstance(read_options, dict):
+ # For backwards compatibility
+ duplicates = []
+ for option, value in read_options.items():
+ if option in _PARQUET_READ_OPTIONS:
+ read_options_args[option] = value
+ else:
+ duplicates.append(option)
+ scan_args[option] = value
+ if duplicates:
+ duplicates = ", ".join(duplicates)
+ warnings.warn(f'The scan options {duplicates} should be '
+ 'specified directly as keyword arguments')
+ read_options = ParquetReadOptions(**read_options_args)
+ elif not isinstance(read_options, ParquetReadOptions):
+ raise TypeError('`read_options` must be either a dictionary or an '
+ 'instance of ParquetReadOptions')
+
+ if default_fragment_scan_options is None:
+ default_fragment_scan_options = ParquetFragmentScanOptions(
+ **scan_args)
+ elif isinstance(default_fragment_scan_options, dict):
+ default_fragment_scan_options = ParquetFragmentScanOptions(
+ **default_fragment_scan_options)
+ elif not isinstance(default_fragment_scan_options,
+ ParquetFragmentScanOptions):
+ raise TypeError('`default_fragment_scan_options` must be either a '
+ 'dictionary or an instance of '
+ 'ParquetFragmentScanOptions')
+
+ wrapped = make_shared[CParquetFileFormat]()
+ options = &(wrapped.get().reader_options)
+ if read_options.dictionary_columns is not None:
+ for column in read_options.dictionary_columns:
+ options.dict_columns.insert(tobytes(column))
+ options.coerce_int96_timestamp_unit = \
+ read_options._coerce_int96_timestamp_unit
+
+ self.init(<shared_ptr[CFileFormat]> wrapped)
+ self.default_fragment_scan_options = default_fragment_scan_options
+
+ cdef void init(self, const shared_ptr[CFileFormat]& sp):
+ FileFormat.init(self, sp)
+ self.parquet_format = <CParquetFileFormat*> sp.get()
+
+ @property
+ def read_options(self):
+ cdef CParquetFileFormatReaderOptions* options
+ options = &self.parquet_format.reader_options
+ parquet_read_options = ParquetReadOptions(
+ dictionary_columns={frombytes(col)
+ for col in options.dict_columns},
+ )
+ # Read options getter/setter works with strings so setting
+ # the private property which uses the C Type
+ parquet_read_options._coerce_int96_timestamp_unit = \
+ options.coerce_int96_timestamp_unit
+ return parquet_read_options
+
+ def make_write_options(self, **kwargs):
+ opts = FileFormat.make_write_options(self)
+ (<ParquetFileWriteOptions> opts).update(**kwargs)
+ return opts
+
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ if options.type_name == 'parquet':
+ self.parquet_format.default_fragment_scan_options = options.wrapped
+ else:
+ super()._set_default_fragment_scan_options(options)
+
+ def equals(self, ParquetFileFormat other):
+ return (
+ self.read_options.equals(other.read_options) and
+ self.default_fragment_scan_options ==
+ other.default_fragment_scan_options
+ )
+
+ def __reduce__(self):
+ return ParquetFileFormat, (self.read_options,
+ self.default_fragment_scan_options)
+
+ def __repr__(self):
+ return f"<ParquetFileFormat read_options={self.read_options}>"
+
+ def make_fragment(self, file, filesystem=None,
+ Expression partition_expression=None, row_groups=None):
+ cdef:
+ vector[int] c_row_groups
+
+ if partition_expression is None:
+ partition_expression = _true
+
+ if row_groups is None:
+ return super().make_fragment(file, filesystem,
+ partition_expression)
+
+ c_source = _make_file_source(file, filesystem)
+ c_row_groups = [<int> row_group for row_group in set(row_groups)]
+
+ c_fragment = <shared_ptr[CFragment]> GetResultValue(
+ self.parquet_format.MakeFragment(move(c_source),
+ partition_expression.unwrap(),
+ <shared_ptr[CSchema]>nullptr,
+ move(c_row_groups)))
+ return Fragment.wrap(move(c_fragment))
+
+
+cdef class ParquetFragmentScanOptions(FragmentScanOptions):
+ """
+ Scan-specific options for Parquet fragments.
+
+ Parameters
+ ----------
+ use_buffered_stream : bool, default False
+ Read files through buffered input streams rather than loading entire
+ row groups at once. This may be enabled to reduce memory overhead.
+ Disabled by default.
+ buffer_size : int, default 8192
+ Size of buffered stream, if enabled. Default is 8KB.
+ pre_buffer : bool, default False
+ If enabled, pre-buffer the raw Parquet data instead of issuing one
+ read per column chunk. This can improve performance on high-latency
+ filesystems.
+ enable_parallel_column_conversion : bool, default False
+ EXPERIMENTAL: Parallelize conversion across columns. This option is
+ ignored if a scan is already parallelized across input files to avoid
+ thread contention. This option will be removed after support is added
+ for simultaneous parallelization across files and columns.
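+
+    Examples
+    --------
+    A minimal sketch of passing scan options to a ParquetFileFormat:
+
+    >>> import pyarrow.dataset as ds
+    >>> scan_options = ds.ParquetFragmentScanOptions(
+    ...     use_buffered_stream=True, buffer_size=64 * 1024)
+    >>> parquet_format = ds.ParquetFileFormat(
+    ...     default_fragment_scan_options=scan_options)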
+ """
+
+ cdef:
+ CParquetFragmentScanOptions* parquet_options
+
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, bint use_buffered_stream=False,
+ buffer_size=8192,
+ bint pre_buffer=False,
+ bint enable_parallel_column_conversion=False):
+ self.init(shared_ptr[CFragmentScanOptions](
+ new CParquetFragmentScanOptions()))
+ self.use_buffered_stream = use_buffered_stream
+ self.buffer_size = buffer_size
+ self.pre_buffer = pre_buffer
+ self.enable_parallel_column_conversion = \
+ enable_parallel_column_conversion
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ FragmentScanOptions.init(self, sp)
+ self.parquet_options = <CParquetFragmentScanOptions*> sp.get()
+
+ cdef CReaderProperties* reader_properties(self):
+ return self.parquet_options.reader_properties.get()
+
+ cdef ArrowReaderProperties* arrow_reader_properties(self):
+ return self.parquet_options.arrow_reader_properties.get()
+
+ @property
+ def use_buffered_stream(self):
+ return self.reader_properties().is_buffered_stream_enabled()
+
+ @use_buffered_stream.setter
+ def use_buffered_stream(self, bint use_buffered_stream):
+ if use_buffered_stream:
+ self.reader_properties().enable_buffered_stream()
+ else:
+ self.reader_properties().disable_buffered_stream()
+
+ @property
+ def buffer_size(self):
+ return self.reader_properties().buffer_size()
+
+ @buffer_size.setter
+ def buffer_size(self, buffer_size):
+ if buffer_size <= 0:
+ raise ValueError("Buffer size must be larger than zero")
+ self.reader_properties().set_buffer_size(buffer_size)
+
+ @property
+ def pre_buffer(self):
+ return self.arrow_reader_properties().pre_buffer()
+
+ @pre_buffer.setter
+ def pre_buffer(self, bint pre_buffer):
+ self.arrow_reader_properties().set_pre_buffer(pre_buffer)
+
+ @property
+ def enable_parallel_column_conversion(self):
+ return self.parquet_options.enable_parallel_column_conversion
+
+ @enable_parallel_column_conversion.setter
+ def enable_parallel_column_conversion(
+ self, bint enable_parallel_column_conversion):
+ self.parquet_options.enable_parallel_column_conversion = \
+ enable_parallel_column_conversion
+
+ def equals(self, ParquetFragmentScanOptions other):
+ return (
+ self.use_buffered_stream == other.use_buffered_stream and
+ self.buffer_size == other.buffer_size and
+ self.pre_buffer == other.pre_buffer and
+ self.enable_parallel_column_conversion ==
+ other.enable_parallel_column_conversion
+ )
+
+ def __reduce__(self):
+ return ParquetFragmentScanOptions, (
+ self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+ self.enable_parallel_column_conversion
+ )
+
+
+cdef class IpcFileWriteOptions(FileWriteOptions):
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+
+cdef class IpcFileFormat(FileFormat):
+
+ def __init__(self):
+ self.init(shared_ptr[CFileFormat](new CIpcFileFormat()))
+
+ def equals(self, IpcFileFormat other):
+ return True
+
+ @property
+ def default_extname(self):
+ return "feather"
+
+ def __reduce__(self):
+ return IpcFileFormat, tuple()
+
+
+cdef class CsvFileFormat(FileFormat):
+ """
+ FileFormat for CSV files.
+
+ Parameters
+ ----------
+ parse_options : ParseOptions
+ Options regarding CSV parsing.
+ convert_options : ConvertOptions
+ Options regarding value conversion.
+ read_options : ReadOptions
+ General read options.
+ default_fragment_scan_options : CsvFragmentScanOptions
+        Default options for scanning fragments.
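+
+    Examples
+    --------
+    A minimal sketch; the delimiter is illustrative:
+
+    >>> import pyarrow.dataset as ds
+    >>> from pyarrow import csv
+    >>> csv_format = ds.CsvFileFormat(
+    ...     parse_options=csv.ParseOptions(delimiter=";"))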
+ """
+ cdef:
+ CCsvFileFormat* csv_format
+
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, ParseOptions parse_options=None,
+ default_fragment_scan_options=None,
+ ConvertOptions convert_options=None,
+ ReadOptions read_options=None):
+ self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
+ if parse_options is not None:
+ self.parse_options = parse_options
+ if convert_options is not None or read_options is not None:
+ if default_fragment_scan_options:
+ raise ValueError('If `default_fragment_scan_options` is '
+ 'given, cannot specify convert_options '
+ 'or read_options')
+ self.default_fragment_scan_options = CsvFragmentScanOptions(
+ convert_options=convert_options, read_options=read_options)
+ elif isinstance(default_fragment_scan_options, dict):
+ self.default_fragment_scan_options = CsvFragmentScanOptions(
+ **default_fragment_scan_options)
+ elif isinstance(default_fragment_scan_options, CsvFragmentScanOptions):
+ self.default_fragment_scan_options = default_fragment_scan_options
+ elif default_fragment_scan_options is not None:
+ raise TypeError('`default_fragment_scan_options` must be either '
+ 'a dictionary or an instance of '
+ 'CsvFragmentScanOptions')
+
+ cdef void init(self, const shared_ptr[CFileFormat]& sp):
+ FileFormat.init(self, sp)
+ self.csv_format = <CCsvFileFormat*> sp.get()
+
+ def make_write_options(self, **kwargs):
+ cdef CsvFileWriteOptions opts = \
+ <CsvFileWriteOptions> FileFormat.make_write_options(self)
+ opts.write_options = WriteOptions(**kwargs)
+ return opts
+
+ @property
+ def parse_options(self):
+ return ParseOptions.wrap(self.csv_format.parse_options)
+
+ @parse_options.setter
+ def parse_options(self, ParseOptions parse_options not None):
+ self.csv_format.parse_options = deref(parse_options.options)
+
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ if options.type_name == 'csv':
+ self.csv_format.default_fragment_scan_options = options.wrapped
+ else:
+ super()._set_default_fragment_scan_options(options)
+
+ def equals(self, CsvFileFormat other):
+ return (
+ self.parse_options.equals(other.parse_options) and
+ self.default_fragment_scan_options ==
+ other.default_fragment_scan_options)
+
+ def __reduce__(self):
+ return CsvFileFormat, (self.parse_options,
+ self.default_fragment_scan_options)
+
+ def __repr__(self):
+ return f"<CsvFileFormat parse_options={self.parse_options}>"
+
+
+cdef class CsvFragmentScanOptions(FragmentScanOptions):
+ """
+ Scan-specific options for CSV fragments.
+
+ Parameters
+ ----------
+ convert_options : ConvertOptions
+ Options regarding value conversion.
+ read_options : ReadOptions
+ General read options.
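+
+    Examples
+    --------
+    A minimal sketch of passing scan options to a CsvFileFormat:
+
+    >>> import pyarrow.dataset as ds
+    >>> from pyarrow import csv
+    >>> scan_options = ds.CsvFragmentScanOptions(
+    ...     convert_options=csv.ConvertOptions(strings_can_be_null=True))
+    >>> csv_format = ds.CsvFileFormat(
+    ...     default_fragment_scan_options=scan_options)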
+ """
+
+ cdef:
+ CCsvFragmentScanOptions* csv_options
+
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, ConvertOptions convert_options=None,
+ ReadOptions read_options=None):
+ self.init(shared_ptr[CFragmentScanOptions](
+ new CCsvFragmentScanOptions()))
+ if convert_options is not None:
+ self.convert_options = convert_options
+ if read_options is not None:
+ self.read_options = read_options
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ FragmentScanOptions.init(self, sp)
+ self.csv_options = <CCsvFragmentScanOptions*> sp.get()
+
+ @property
+ def convert_options(self):
+ return ConvertOptions.wrap(self.csv_options.convert_options)
+
+ @convert_options.setter
+ def convert_options(self, ConvertOptions convert_options not None):
+ self.csv_options.convert_options = deref(convert_options.options)
+
+ @property
+ def read_options(self):
+ return ReadOptions.wrap(self.csv_options.read_options)
+
+ @read_options.setter
+ def read_options(self, ReadOptions read_options not None):
+ self.csv_options.read_options = deref(read_options.options)
+
+ def equals(self, CsvFragmentScanOptions other):
+ return (
+ other and
+ self.convert_options.equals(other.convert_options) and
+ self.read_options.equals(other.read_options))
+
+ def __reduce__(self):
+ return CsvFragmentScanOptions, (self.convert_options,
+ self.read_options)
+
+
+cdef class CsvFileWriteOptions(FileWriteOptions):
+ cdef:
+ CCsvFileWriteOptions* csv_options
+ object _properties
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ @property
+ def write_options(self):
+ return WriteOptions.wrap(deref(self.csv_options.write_options))
+
+ @write_options.setter
+ def write_options(self, WriteOptions write_options not None):
+ self.csv_options.write_options.reset(
+ new CCSVWriteOptions(deref(write_options.options)))
+
+ cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
+ FileWriteOptions.init(self, sp)
+ self.csv_options = <CCsvFileWriteOptions*> sp.get()
+
+
+cdef class Partitioning(_Weakrefable):
+
+ cdef:
+ shared_ptr[CPartitioning] wrapped
+ CPartitioning* partitioning
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef init(self, const shared_ptr[CPartitioning]& sp):
+ self.wrapped = sp
+ self.partitioning = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CPartitioning]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'directory': DirectoryPartitioning,
+ 'hive': HivePartitioning,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef Partitioning self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CPartitioning] unwrap(self):
+ return self.wrapped
+
+ def parse(self, path):
+ cdef CResult[CExpression] result
+ result = self.partitioning.Parse(tobytes(path))
+ return Expression.wrap(GetResultValue(result))
+
+ @property
+ def schema(self):
+ """The arrow Schema attached to the partitioning."""
+ return pyarrow_wrap_schema(self.partitioning.schema())
+
+
+cdef class PartitioningFactory(_Weakrefable):
+
+ cdef:
+ shared_ptr[CPartitioningFactory] wrapped
+ CPartitioningFactory* factory
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef init(self, const shared_ptr[CPartitioningFactory]& sp):
+ self.wrapped = sp
+ self.factory = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CPartitioningFactory]& sp):
+ cdef PartitioningFactory self = PartitioningFactory.__new__(
+ PartitioningFactory
+ )
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CPartitioningFactory] unwrap(self):
+ return self.wrapped
+
+ @property
+ def type_name(self):
+ return frombytes(self.factory.type_name())
+
+
+cdef vector[shared_ptr[CArray]] _partitioning_dictionaries(
+ Schema schema, dictionaries) except *:
+ cdef:
+ vector[shared_ptr[CArray]] c_dictionaries
+
+ dictionaries = dictionaries or {}
+
+ for field in schema:
+ dictionary = dictionaries.get(field.name)
+
+ if (isinstance(field.type, pa.DictionaryType) and
+ dictionary is not None):
+ c_dictionaries.push_back(pyarrow_unwrap_array(dictionary))
+ else:
+ c_dictionaries.push_back(<shared_ptr[CArray]> nullptr)
+
+ return c_dictionaries
+
+
+cdef class DirectoryPartitioning(Partitioning):
+ """
+ A Partitioning based on a specified Schema.
+
+ The DirectoryPartitioning expects one segment in the file path for each
+ field in the schema (all fields are required to be present).
+    For example, given schema<year:int16, month:int8>, the path "/2009/11"
+    would be parsed to ("year"_ == 2009 and "month"_ == 11).
+
+ Parameters
+ ----------
+ schema : Schema
+ The schema that describes the partitions present in the file path.
+ dictionaries : Dict[str, Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing.
+ segment_encoding : str, default "uri"
+ After splitting paths into segments, decode the segments. Valid
+ values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+ Returns
+ -------
+ DirectoryPartitioning
+
+ Examples
+ --------
+    >>> import pyarrow as pa
+    >>> from pyarrow.dataset import DirectoryPartitioning
+    >>> partitioning = DirectoryPartitioning(
+ ... pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+ >>> print(partitioning.parse("/2009/11"))
+ ((year == 2009:int16) and (month == 11:int8))
+ """
+
+ cdef:
+ CDirectoryPartitioning* directory_partitioning
+
+ def __init__(self, Schema schema not None, dictionaries=None,
+ segment_encoding="uri"):
+ cdef:
+ shared_ptr[CDirectoryPartitioning] c_partitioning
+ CKeyValuePartitioningOptions c_options
+
+ c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+ c_partitioning = make_shared[CDirectoryPartitioning](
+ pyarrow_unwrap_schema(schema),
+ _partitioning_dictionaries(schema, dictionaries),
+ c_options,
+ )
+ self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+ cdef init(self, const shared_ptr[CPartitioning]& sp):
+ Partitioning.init(self, sp)
+ self.directory_partitioning = <CDirectoryPartitioning*> sp.get()
+
+ @staticmethod
+ def discover(field_names=None, infer_dictionary=False,
+ max_partition_dictionary_size=0,
+ schema=None, segment_encoding="uri"):
+ """
+ Discover a DirectoryPartitioning.
+
+ Parameters
+ ----------
+ field_names : list of str
+ The names to associate with the values from the subdirectory names.
+ If schema is given, will be populated from the schema.
+ infer_dictionary : bool, default False
+ When inferring a schema for partition fields, yield dictionary
+ encoded types instead of plain types. This can be more efficient
+ when materializing virtual columns, and Expressions parsed by the
+ finished Partitioning will include dictionaries of all unique
+ inspected values for each field.
+ max_partition_dictionary_size : int, default 0
+ Synonymous with infer_dictionary for backwards compatibility with
+ 1.0: setting this to -1 or None is equivalent to passing
+ infer_dictionary=True.
+ schema : Schema, default None
+ Use this schema instead of inferring a schema from partition
+ values. Partition values will be validated against this schema
+ before accumulation into the Partitioning's dictionary.
+ segment_encoding : str, default "uri"
+ After splitting paths into segments, decode the segments. Valid
+ values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+ Returns
+ -------
+ PartitioningFactory
+ To be used in the FileSystemFactoryOptions.
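+
+        Examples
+        --------
+        A minimal sketch; the field names used here are illustrative. The
+        returned factory can then be set as the partitioning of
+        FileSystemFactoryOptions or passed to a dataset discovery call.
+
+        >>> from pyarrow.dataset import DirectoryPartitioning
+        >>> factory = DirectoryPartitioning.discover(["year", "month"])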
+ """
+ cdef:
+ CPartitioningFactoryOptions c_options
+ vector[c_string] c_field_names
+
+ if max_partition_dictionary_size in {-1, None}:
+ infer_dictionary = True
+ elif max_partition_dictionary_size != 0:
+ raise NotImplementedError("max_partition_dictionary_size must be "
+ "0, -1, or None")
+
+ if infer_dictionary:
+ c_options.infer_dictionary = True
+
+ if schema:
+ c_options.schema = pyarrow_unwrap_schema(schema)
+ c_field_names = [tobytes(f.name) for f in schema]
+ elif not field_names:
+ raise ValueError(
+ "Neither field_names nor schema was passed; "
+ "cannot infer field_names")
+ else:
+ c_field_names = [tobytes(s) for s in field_names]
+
+ c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
+ return PartitioningFactory.wrap(
+ CDirectoryPartitioning.MakeFactory(c_field_names, c_options))
+
+ @property
+ def dictionaries(self):
+ """
+ The unique values for each partition field, if available.
+
+ Those values are only available if the Partitioning object was
+ created through dataset discovery from a PartitioningFactory, or
+ if the dictionaries were manually specified in the constructor.
+ If not available, this returns None.
+ """
+ cdef vector[shared_ptr[CArray]] c_arrays
+ c_arrays = self.directory_partitioning.dictionaries()
+ res = []
+ for arr in c_arrays:
+ if arr.get() == nullptr:
+ # Partitioning object has not been created through
+ # inspected Factory
+ return None
+ res.append(pyarrow_wrap_array(arr))
+ return res
+
+
+cdef class HivePartitioning(Partitioning):
+ """
+ A Partitioning for "/$key=$value/" nested directories as found in
+ Apache Hive.
+
+ Multi-level, directory based partitioning scheme originating from
+ Apache Hive with all data files stored in the leaf directories. Data is
+ partitioned by static values of a particular column in the schema.
+ Partition keys are represented in the form $key=$value in directory names.
+ Field order is ignored, as are missing or unrecognized field names.
+
+ For example, given schema<year:int16, month:int8, day:int8>, a possible
+ path would be "/year=2009/month=11/day=15".
+
+ Parameters
+ ----------
+ schema : Schema
+ The schema that describes the partitions present in the file path.
+ dictionaries : Dict[str, Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing.
+ null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
+        If any field is None then this fallback will be used as a label.
+ segment_encoding : str, default "uri"
+ After splitting paths into segments, decode the segments. Valid
+ values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+ Returns
+ -------
+ HivePartitioning
+
+ Examples
+ --------
+    >>> import pyarrow as pa
+    >>> from pyarrow.dataset import HivePartitioning
+ >>> partitioning = HivePartitioning(
+ ... pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+ >>> print(partitioning.parse("/year=2009/month=11"))
+ ((year == 2009:int16) and (month == 11:int8))
+
+ """
+
+ cdef:
+ CHivePartitioning* hive_partitioning
+
+ def __init__(self,
+ Schema schema not None,
+ dictionaries=None,
+ null_fallback="__HIVE_DEFAULT_PARTITION__",
+ segment_encoding="uri"):
+
+ cdef:
+ shared_ptr[CHivePartitioning] c_partitioning
+ CHivePartitioningOptions c_options
+
+ c_options.null_fallback = tobytes(null_fallback)
+ c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
+ c_partitioning = make_shared[CHivePartitioning](
+ pyarrow_unwrap_schema(schema),
+ _partitioning_dictionaries(schema, dictionaries),
+ c_options,
+ )
+ self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+ cdef init(self, const shared_ptr[CPartitioning]& sp):
+ Partitioning.init(self, sp)
+ self.hive_partitioning = <CHivePartitioning*> sp.get()
+
+ @staticmethod
+ def discover(infer_dictionary=False,
+ max_partition_dictionary_size=0,
+ null_fallback="__HIVE_DEFAULT_PARTITION__",
+ schema=None,
+ segment_encoding="uri"):
+ """
+ Discover a HivePartitioning.
+
+ Parameters
+ ----------
+ infer_dictionary : bool, default False
+ When inferring a schema for partition fields, yield dictionary
+ encoded types instead of plain. This can be more efficient when
+ materializing virtual columns, and Expressions parsed by the
+ finished Partitioning will include dictionaries of all unique
+ inspected values for each field.
+ max_partition_dictionary_size : int, default 0
+ Synonymous with infer_dictionary for backwards compatibility with
+ 1.0: setting this to -1 or None is equivalent to passing
+ infer_dictionary=True.
+ null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
+            When inferring a schema for partition fields, this value will be
+            replaced by null. The default is set to __HIVE_DEFAULT_PARTITION__
+            for compatibility with Spark.
+ schema : Schema, default None
+ Use this schema instead of inferring a schema from partition
+ values. Partition values will be validated against this schema
+ before accumulation into the Partitioning's dictionary.
+ segment_encoding : str, default "uri"
+ After splitting paths into segments, decode the segments. Valid
+ values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+ Returns
+ -------
+ PartitioningFactory
+ To be used in the FileSystemFactoryOptions.
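+
+        Examples
+        --------
+        A minimal sketch; whether dictionary encoding of partition values
+        is wanted depends on the use case:
+
+        >>> from pyarrow.dataset import HivePartitioning
+        >>> factory = HivePartitioning.discover(infer_dictionary=True)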
+ """
+ cdef:
+ CHivePartitioningFactoryOptions c_options
+
+ if max_partition_dictionary_size in {-1, None}:
+ infer_dictionary = True
+ elif max_partition_dictionary_size != 0:
+ raise NotImplementedError("max_partition_dictionary_size must be "
+ "0, -1, or None")
+
+ if infer_dictionary:
+ c_options.infer_dictionary = True
+
+ c_options.null_fallback = tobytes(null_fallback)
+
+ if schema:
+ c_options.schema = pyarrow_unwrap_schema(schema)
+
+ c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
+ return PartitioningFactory.wrap(
+ CHivePartitioning.MakeFactory(c_options))
+
+ @property
+ def dictionaries(self):
+ """
+ The unique values for each partition field, if available.
+
+ Those values are only available if the Partitioning object was
+ created through dataset discovery from a PartitioningFactory, or
+ if the dictionaries were manually specified in the constructor.
+ If not available, this returns None.
+ """
+ cdef vector[shared_ptr[CArray]] c_arrays
+ c_arrays = self.hive_partitioning.dictionaries()
+ res = []
+ for arr in c_arrays:
+ if arr.get() == nullptr:
+ # Partitioning object has not been created through
+ # inspected Factory
+ return None
+ res.append(pyarrow_wrap_array(arr))
+ return res
+
+
+cdef class DatasetFactory(_Weakrefable):
+ """
+ DatasetFactory is used to create a Dataset, inspect the Schema
+ of the fragments contained in it, and declare a partitioning.
+ """
+
+ cdef:
+ shared_ptr[CDatasetFactory] wrapped
+ CDatasetFactory* factory
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef init(self, const shared_ptr[CDatasetFactory]& sp):
+ self.wrapped = sp
+ self.factory = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CDatasetFactory]& sp):
+ cdef DatasetFactory self = \
+ DatasetFactory.__new__(DatasetFactory)
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CDatasetFactory] unwrap(self) nogil:
+ return self.wrapped
+
+ @property
+ def root_partition(self):
+ return Expression.wrap(self.factory.root_partition())
+
+ @root_partition.setter
+ def root_partition(self, Expression expr):
+ check_status(self.factory.SetRootPartition(expr.unwrap()))
+
+ def inspect_schemas(self):
+ cdef CResult[vector[shared_ptr[CSchema]]] result
+ cdef CInspectOptions options
+ with nogil:
+ result = self.factory.InspectSchemas(options)
+
+ schemas = []
+ for s in GetResultValue(result):
+ schemas.append(pyarrow_wrap_schema(s))
+ return schemas
+
+ def inspect(self):
+ """
+ Inspect all data fragments and return a common Schema.
+
+ Returns
+ -------
+ Schema
+ """
+ cdef:
+ CInspectOptions options
+ CResult[shared_ptr[CSchema]] result
+ with nogil:
+ result = self.factory.Inspect(options)
+ return pyarrow_wrap_schema(GetResultValue(result))
+
+ def finish(self, Schema schema=None):
+ """
+ Create a Dataset using the inspected schema or an explicit schema
+ (if given).
+
+ Parameters
+ ----------
+ schema : Schema, default None
+ The schema to conform the source to. If None, the inspected
+ schema is used.
+
+ Returns
+ -------
+ Dataset
+ """
+ cdef:
+ shared_ptr[CSchema] sp_schema
+ CResult[shared_ptr[CDataset]] result
+
+ if schema is not None:
+ sp_schema = pyarrow_unwrap_schema(schema)
+ with nogil:
+ result = self.factory.FinishWithSchema(sp_schema)
+ else:
+ with nogil:
+ result = self.factory.Finish()
+
+ return Dataset.wrap(GetResultValue(result))
+
+
+cdef class FileSystemFactoryOptions(_Weakrefable):
+ """
+ Influences the discovery of filesystem paths.
+
+ Parameters
+ ----------
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
+ partitioning : Partitioning/PartitioningFactory, optional
+ Apply the Partitioning to every discovered Fragment. See Partitioning or
+ PartitioningFactory documentation.
+ exclude_invalid_files : bool, optional (default True)
+ If True, invalid files will be excluded (file format specific check).
+        This will incur IO for each file in a serial and single-threaded
+ fashion. Disabling this feature will skip the IO, but unsupported
+ files may be present in the Dataset (resulting in an error at scan
+ time).
+ selector_ignore_prefixes : list, optional
+ When discovering from a Selector (and not from an explicit file list),
+ ignore files and directories matching any of these prefixes.
+ By default this is ['.', '_'].
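+
+    Examples
+    --------
+    A minimal sketch; the base directory shown is a placeholder:
+
+    >>> from pyarrow.dataset import (FileSystemFactoryOptions,
+    ...                              HivePartitioning)
+    >>> options = FileSystemFactoryOptions(
+    ...     partition_base_dir="/data/mytable",
+    ...     partitioning=HivePartitioning.discover())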
+ """
+
+ cdef:
+ CFileSystemFactoryOptions options
+
+    __slots__ = ()  # avoid mistakenly creating attributes
+
+ def __init__(self, partition_base_dir=None, partitioning=None,
+ exclude_invalid_files=None,
+ list selector_ignore_prefixes=None):
+ if isinstance(partitioning, PartitioningFactory):
+ self.partitioning_factory = partitioning
+ elif isinstance(partitioning, Partitioning):
+ self.partitioning = partitioning
+
+ if partition_base_dir is not None:
+ self.partition_base_dir = partition_base_dir
+ if exclude_invalid_files is not None:
+ self.exclude_invalid_files = exclude_invalid_files
+ if selector_ignore_prefixes is not None:
+ self.selector_ignore_prefixes = selector_ignore_prefixes
+
+ cdef inline CFileSystemFactoryOptions unwrap(self):
+ return self.options
+
+ @property
+ def partitioning(self):
+ """Partitioning to apply to discovered files.
+
+ NOTE: setting this property will overwrite partitioning_factory.
+ """
+ c_partitioning = self.options.partitioning.partitioning()
+ if c_partitioning.get() == nullptr:
+ return None
+ return Partitioning.wrap(c_partitioning)
+
+ @partitioning.setter
+ def partitioning(self, Partitioning value):
+ self.options.partitioning = (<Partitioning> value).unwrap()
+
+ @property
+ def partitioning_factory(self):
+ """PartitioningFactory to apply to discovered files and
+ discover a Partitioning.
+
+ NOTE: setting this property will overwrite partitioning.
+ """
+ c_factory = self.options.partitioning.factory()
+ if c_factory.get() == nullptr:
+ return None
+ return PartitioningFactory.wrap(c_factory)
+
+ @partitioning_factory.setter
+ def partitioning_factory(self, PartitioningFactory value):
+ self.options.partitioning = (<PartitioningFactory> value).unwrap()
+
+ @property
+ def partition_base_dir(self):
+ """
+ Base directory to strip paths before applying the partitioning.
+ """
+ return frombytes(self.options.partition_base_dir)
+
+ @partition_base_dir.setter
+ def partition_base_dir(self, value):
+ self.options.partition_base_dir = tobytes(value)
+
+ @property
+ def exclude_invalid_files(self):
+ """Whether to exclude invalid files."""
+ return self.options.exclude_invalid_files
+
+ @exclude_invalid_files.setter
+ def exclude_invalid_files(self, bint value):
+ self.options.exclude_invalid_files = value
+
+ @property
+ def selector_ignore_prefixes(self):
+ """
+ List of prefixes. Files matching one of those prefixes will be
+ ignored by the discovery process.
+ """
+ return [frombytes(p) for p in self.options.selector_ignore_prefixes]
+
+ @selector_ignore_prefixes.setter
+ def selector_ignore_prefixes(self, values):
+ self.options.selector_ignore_prefixes = [tobytes(v) for v in values]
+
+
+cdef class FileSystemDatasetFactory(DatasetFactory):
+ """
+ Create a DatasetFactory from a list of paths with schema inspection.
+
+ Parameters
+ ----------
+ filesystem : pyarrow.fs.FileSystem
+ Filesystem to discover.
+ paths_or_selector : pyarrow.fs.Selector or list of path-likes
+ Either a Selector object or a list of path-like objects.
+ format : FileFormat
+ Currently only ParquetFileFormat and IpcFileFormat are supported.
+ options : FileSystemFactoryOptions, optional
+ Various flags influencing the discovery of filesystem paths.
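+
+    Examples
+    --------
+    A minimal sketch; the local path below is a placeholder and must point
+    at an existing Parquet file for the example to run:
+
+    >>> from pyarrow import fs
+    >>> from pyarrow.dataset import (FileSystemDatasetFactory,
+    ...                              ParquetFileFormat)
+    >>> factory = FileSystemDatasetFactory(
+    ...     fs.LocalFileSystem(), ["/data/mytable/part-0.parquet"],
+    ...     ParquetFileFormat())
+    >>> dataset = factory.finish()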
+ """
+
+ cdef:
+ CFileSystemDatasetFactory* filesystem_factory
+
+ def __init__(self, FileSystem filesystem not None, paths_or_selector,
+ FileFormat format not None,
+ FileSystemFactoryOptions options=None):
+ cdef:
+ vector[c_string] paths
+ CFileSelector c_selector
+ CResult[shared_ptr[CDatasetFactory]] result
+ shared_ptr[CFileSystem] c_filesystem
+ shared_ptr[CFileFormat] c_format
+ CFileSystemFactoryOptions c_options
+
+ options = options or FileSystemFactoryOptions()
+ c_options = options.unwrap()
+ c_filesystem = filesystem.unwrap()
+ c_format = format.unwrap()
+
+ if isinstance(paths_or_selector, FileSelector):
+ with nogil:
+ c_selector = (<FileSelector> paths_or_selector).selector
+ result = CFileSystemDatasetFactory.MakeFromSelector(
+ c_filesystem,
+ c_selector,
+ c_format,
+ c_options
+ )
+ elif isinstance(paths_or_selector, (list, tuple)):
+ paths = [tobytes(s) for s in paths_or_selector]
+ with nogil:
+ result = CFileSystemDatasetFactory.MakeFromPaths(
+ c_filesystem,
+ paths,
+ c_format,
+ c_options
+ )
+ else:
+ raise TypeError('Must pass either paths or a FileSelector, but '
+ 'passed {}'.format(type(paths_or_selector)))
+
+ self.init(GetResultValue(result))
+
+ cdef init(self, shared_ptr[CDatasetFactory]& sp):
+ DatasetFactory.init(self, sp)
+ self.filesystem_factory = <CFileSystemDatasetFactory*> sp.get()
+
+
+cdef class UnionDatasetFactory(DatasetFactory):
+ """
+ Provides a way to inspect/discover a Dataset's expected schema before
+ materialization.
+
+ Parameters
+ ----------
+ factories : list of DatasetFactory
+ """
+
+ cdef:
+ CUnionDatasetFactory* union_factory
+
+ def __init__(self, list factories):
+ cdef:
+ DatasetFactory factory
+ vector[shared_ptr[CDatasetFactory]] c_factories
+ for factory in factories:
+ c_factories.push_back(factory.unwrap())
+ self.init(GetResultValue(CUnionDatasetFactory.Make(c_factories)))
+
+ cdef init(self, const shared_ptr[CDatasetFactory]& sp):
+ DatasetFactory.init(self, sp)
+ self.union_factory = <CUnionDatasetFactory*> sp.get()
+
+
+cdef class ParquetFactoryOptions(_Weakrefable):
+ """
+    Influences the discovery of a parquet dataset.
+
+ Parameters
+ ----------
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
+ partitioning : Partitioning, PartitioningFactory, optional
+ The partitioning scheme applied to fragments, see ``Partitioning``.
+ validate_column_chunk_paths : bool, default False
+ Assert that all ColumnChunk paths are consistent. The parquet spec
+ allows for ColumnChunk data to be stored in multiple files, but
+ ParquetDatasetFactory supports only a single file with all ColumnChunk
+ data. If this flag is set construction of a ParquetDatasetFactory will
+ raise an error if ColumnChunk data is not resident in a single file.
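+
+    Examples
+    --------
+    A minimal sketch combining a discovered Hive partitioning with strict
+    ColumnChunk path validation:
+
+    >>> from pyarrow.dataset import ParquetFactoryOptions, HivePartitioning
+    >>> options = ParquetFactoryOptions(
+    ...     partitioning=HivePartitioning.discover(),
+    ...     validate_column_chunk_paths=True)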
+ """
+
+ cdef:
+ CParquetFactoryOptions options
+
+    __slots__ = ()  # avoid mistakenly creating attributes
+
+ def __init__(self, partition_base_dir=None, partitioning=None,
+ validate_column_chunk_paths=False):
+ if isinstance(partitioning, PartitioningFactory):
+ self.partitioning_factory = partitioning
+ elif isinstance(partitioning, Partitioning):
+ self.partitioning = partitioning
+
+ if partition_base_dir is not None:
+ self.partition_base_dir = partition_base_dir
+
+ self.options.validate_column_chunk_paths = validate_column_chunk_paths
+
+ cdef inline CParquetFactoryOptions unwrap(self):
+ return self.options
+
+ @property
+ def partitioning(self):
+ """Partitioning to apply to discovered files.
+
+ NOTE: setting this property will overwrite partitioning_factory.
+ """
+ c_partitioning = self.options.partitioning.partitioning()
+ if c_partitioning.get() == nullptr:
+ return None
+ return Partitioning.wrap(c_partitioning)
+
+ @partitioning.setter
+ def partitioning(self, Partitioning value):
+ self.options.partitioning = (<Partitioning> value).unwrap()
+
+ @property
+ def partitioning_factory(self):
+ """PartitioningFactory to apply to discovered files and
+ discover a Partitioning.
+
+ NOTE: setting this property will overwrite partitioning.
+ """
+ c_factory = self.options.partitioning.factory()
+ if c_factory.get() == nullptr:
+ return None
+ return PartitioningFactory.wrap(c_factory)
+
+ @partitioning_factory.setter
+ def partitioning_factory(self, PartitioningFactory value):
+ self.options.partitioning = (<PartitioningFactory> value).unwrap()
+
+ @property
+ def partition_base_dir(self):
+ """
+ Base directory to strip paths before applying the partitioning.
+ """
+ return frombytes(self.options.partition_base_dir)
+
+ @partition_base_dir.setter
+ def partition_base_dir(self, value):
+ self.options.partition_base_dir = tobytes(value)
+
+ @property
+ def validate_column_chunk_paths(self):
+ """
+        Whether to assert that all ColumnChunk paths are consistent.
+ """
+ return self.options.validate_column_chunk_paths
+
+ @validate_column_chunk_paths.setter
+ def validate_column_chunk_paths(self, value):
+ self.options.validate_column_chunk_paths = value
+
+
+cdef class ParquetDatasetFactory(DatasetFactory):
+ """
+ Create a ParquetDatasetFactory from a Parquet `_metadata` file.
+
+ Parameters
+ ----------
+ metadata_path : str
+ Path to the `_metadata` parquet metadata-only file generated with
+ `pyarrow.parquet.write_metadata`.
+ filesystem : pyarrow.fs.FileSystem
+ Filesystem to read the metadata_path from, and subsequent parquet
+ files.
+ format : ParquetFileFormat
+ Parquet format options.
+ options : ParquetFactoryOptions, optional
+ Various flags influencing the discovery of filesystem paths.
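+
+    Examples
+    --------
+    A minimal sketch; the `_metadata` path below is a placeholder and must
+    exist for the example to run:
+
+    >>> from pyarrow import fs
+    >>> from pyarrow.dataset import ParquetDatasetFactory, ParquetFileFormat
+    >>> factory = ParquetDatasetFactory(
+    ...     "/data/mytable/_metadata", fs.LocalFileSystem(),
+    ...     ParquetFileFormat())
+    >>> dataset = factory.finish()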
+ """
+
+ cdef:
+ CParquetDatasetFactory* parquet_factory
+
+ def __init__(self, metadata_path, FileSystem filesystem not None,
+ FileFormat format not None,
+ ParquetFactoryOptions options=None):
+ cdef:
+ c_string path
+ shared_ptr[CFileSystem] c_filesystem
+ shared_ptr[CParquetFileFormat] c_format
+ CResult[shared_ptr[CDatasetFactory]] result
+ CParquetFactoryOptions c_options
+
+ c_path = tobytes(metadata_path)
+ c_filesystem = filesystem.unwrap()
+ c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
+ format.unwrap())
+ options = options or ParquetFactoryOptions()
+ c_options = options.unwrap()
+
+ result = CParquetDatasetFactory.MakeFromMetaDataPath(
+ c_path, c_filesystem, c_format, c_options)
+ self.init(GetResultValue(result))
+
+ cdef init(self, shared_ptr[CDatasetFactory]& sp):
+ DatasetFactory.init(self, sp)
+ self.parquet_factory = <CParquetDatasetFactory*> sp.get()
+
+
+cdef class RecordBatchIterator(_Weakrefable):
+ """An iterator over a sequence of record batches."""
+ cdef:
+ # An object that must be kept alive with the iterator.
+ object iterator_owner
+ # Iterator is a non-POD type and Cython uses offsetof, leading
+ # to a compiler warning unless wrapped like so
+ shared_ptr[CRecordBatchIterator] iterator
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__, subclasses_instead=False)
+
+ @staticmethod
+ cdef wrap(object owner, CRecordBatchIterator iterator):
+ cdef RecordBatchIterator self = \
+ RecordBatchIterator.__new__(RecordBatchIterator)
+ self.iterator_owner = owner
+ self.iterator = make_shared[CRecordBatchIterator](move(iterator))
+ return self
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ cdef shared_ptr[CRecordBatch] record_batch
+ with nogil:
+ record_batch = GetResultValue(move(self.iterator.get().Next()))
+ if record_batch == NULL:
+ raise StopIteration
+ return pyarrow_wrap_batch(record_batch)
+
+
+class TaggedRecordBatch(collections.namedtuple(
+ "TaggedRecordBatch", ["record_batch", "fragment"])):
+ """
+ A combination of a record batch and the fragment it came from.
+
+ Parameters
+ ----------
+    record_batch : RecordBatch
+        The record batch.
+    fragment : Fragment
+        The fragment the record batch came from.
+ """
+
+
+cdef class TaggedRecordBatchIterator(_Weakrefable):
+ """An iterator over a sequence of record batches with fragments."""
+ cdef:
+ object iterator_owner
+ shared_ptr[CTaggedRecordBatchIterator] iterator
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__, subclasses_instead=False)
+
+ @staticmethod
+ cdef wrap(object owner, CTaggedRecordBatchIterator iterator):
+ cdef TaggedRecordBatchIterator self = \
+ TaggedRecordBatchIterator.__new__(TaggedRecordBatchIterator)
+ self.iterator_owner = owner
+ self.iterator = make_shared[CTaggedRecordBatchIterator](
+ move(iterator))
+ return self
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ cdef CTaggedRecordBatch batch
+ with nogil:
+ batch = GetResultValue(move(self.iterator.get().Next()))
+ if batch.record_batch == NULL:
+ raise StopIteration
+ return TaggedRecordBatch(
+ record_batch=pyarrow_wrap_batch(batch.record_batch),
+ fragment=Fragment.wrap(batch.fragment))
+
+
+_DEFAULT_BATCH_SIZE = 2**20
+
+
+cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
+ object columns=None, Expression filter=None,
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ bint use_threads=True, bint use_async=False,
+ MemoryPool memory_pool=None,
+ FragmentScanOptions fragment_scan_options=None)\
+ except *:
+ cdef:
+ CScannerBuilder *builder
+ vector[CExpression] c_exprs
+
+ builder = ptr.get()
+
+ check_status(builder.Filter(_bind(
+ filter, pyarrow_wrap_schema(builder.schema()))))
+
+ if columns is not None:
+ if isinstance(columns, dict):
+ for expr in columns.values():
+ if not isinstance(expr, Expression):
+ raise TypeError(
+ "Expected an Expression for a 'column' dictionary "
+ "value, got {} instead".format(type(expr))
+ )
+ c_exprs.push_back((<Expression> expr).unwrap())
+
+ check_status(
+ builder.Project(c_exprs, [tobytes(c) for c in columns.keys()])
+ )
+ elif isinstance(columns, list):
+ check_status(builder.ProjectColumns([tobytes(c) for c in columns]))
+ else:
+ raise ValueError(
+ "Expected a list or a dict for 'columns', "
+ "got {} instead.".format(type(columns))
+ )
+
+ check_status(builder.BatchSize(batch_size))
+ check_status(builder.UseThreads(use_threads))
+ check_status(builder.UseAsync(use_async))
+ if memory_pool:
+ check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool)))
+ if fragment_scan_options:
+ check_status(
+ builder.FragmentScanOptions(fragment_scan_options.wrapped))
+
+
+cdef class Scanner(_Weakrefable):
+ """A materialized scan operation with context and options bound.
+
+ A scanner is the class that glues the scan tasks, data fragments and data
+ sources together.
+
+ Parameters
+ ----------
+ dataset : Dataset
+ Dataset to scan.
+ columns : list of str or dict, default None
+ The columns to project. This can be a list of column names to include
+ (order and duplicates will be preserved), or a dictionary with
+ {new_column_name: expression} values for more advanced projections.
+ The columns will be passed down to Datasets and corresponding data
+ fragments to avoid loading, copying, and deserializing columns
+ that will not be required further down the compute chain.
+ By default all of the available columns are projected. Raises
+ an exception if any of the referenced column names does not exist
+ in the dataset's Schema.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ If possible the predicate will be pushed down to exploit the
+ partition information or internal metadata found in the data
+ source, e.g. Parquet statistics. Otherwise filters the loaded
+ RecordBatches before yielding them.
+ batch_size : int, default 1M
+        The maximum row count for scanned record batches. If scanned
+        record batches are overflowing memory then this value can be
+        lowered to reduce their size.
+ use_threads : bool, default True
+        If enabled, then maximum parallelism will be used, as determined
+        by the number of available CPU cores.
+    use_async : bool, default False
+        If enabled, an async scanner will be used that should offer
+        better performance with high-latency/highly-parallel filesystems
+        (e.g. S3).
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ """
+
+ cdef:
+ shared_ptr[CScanner] wrapped
+ CScanner* scanner
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CScanner]& sp):
+ self.wrapped = sp
+ self.scanner = sp.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CScanner]& sp):
+ cdef Scanner self = Scanner.__new__(Scanner)
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CScanner] unwrap(self):
+ return self.wrapped
+
+ @staticmethod
+ def from_dataset(Dataset dataset not None,
+ bint use_threads=True, bint use_async=False,
+ MemoryPool memory_pool=None,
+ object columns=None, Expression filter=None,
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ FragmentScanOptions fragment_scan_options=None):
+ """
+ Create Scanner from Dataset,
+ refer to Scanner class doc for additional details on Scanner.
+
+ Parameters
+ ----------
+ dataset : Dataset
+ Dataset to scan.
+ columns : list of str or dict, default None
+ The columns to project.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ batch_size : int, default 1M
+ The maximum row count for scanned record batches.
+ use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
+        use_async : bool, default False
+            If enabled, an async scanner will be used that should offer
+            better performance with high-latency/highly-parallel filesystems
+            (e.g. S3).
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ fragment_scan_options : FragmentScanOptions
+ The fragment scan options.
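+
+        Examples
+        --------
+        A minimal sketch; the dataset path and the column/field names are
+        placeholders:
+
+        >>> import pyarrow.dataset as ds
+        >>> dataset = ds.dataset("/data/mytable", format="parquet")
+        >>> scanner = ds.Scanner.from_dataset(
+        ...     dataset, columns=["year"],
+        ...     filter=ds.field("year") > 2010)
+        >>> table = scanner.to_table()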
+ """
+ cdef:
+ shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
+ shared_ptr[CScannerBuilder] builder
+ shared_ptr[CScanner] scanner
+
+ builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
+ _populate_builder(builder, columns=columns, filter=filter,
+ batch_size=batch_size, use_threads=use_threads,
+ use_async=use_async, memory_pool=memory_pool,
+ fragment_scan_options=fragment_scan_options)
+
+ scanner = GetResultValue(builder.get().Finish())
+ return Scanner.wrap(scanner)
+
+ @staticmethod
+ def from_fragment(Fragment fragment not None, Schema schema=None,
+ bint use_threads=True, bint use_async=False,
+ MemoryPool memory_pool=None,
+ object columns=None, Expression filter=None,
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ FragmentScanOptions fragment_scan_options=None):
+ """
+ Create Scanner from Fragment,
+ refer to Scanner class doc for additional details on Scanner.
+
+ Parameters
+ ----------
+ fragment : Fragment
+ fragment to scan.
+ schema : Schema
+ The schema of the fragment.
+ columns : list of str or dict, default None
+ The columns to project.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ batch_size : int, default 1M
+ The maximum row count for scanned record batches.
+ use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
+        use_async : bool, default False
+            If enabled, an async scanner will be used that should offer
+            better performance with high-latency/highly-parallel filesystems
+            (e.g. S3).
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ fragment_scan_options : FragmentScanOptions
+ The fragment scan options.
+ """
+ cdef:
+ shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
+ shared_ptr[CScannerBuilder] builder
+ shared_ptr[CScanner] scanner
+
+ schema = schema or fragment.physical_schema
+
+ builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema),
+ fragment.unwrap(), options)
+ _populate_builder(builder, columns=columns, filter=filter,
+ batch_size=batch_size, use_threads=use_threads,
+ use_async=use_async, memory_pool=memory_pool,
+ fragment_scan_options=fragment_scan_options)
+
+ scanner = GetResultValue(builder.get().Finish())
+ return Scanner.wrap(scanner)
+
+ @staticmethod
+ def from_batches(source, Schema schema=None, bint use_threads=True,
+ bint use_async=False,
+ MemoryPool memory_pool=None, object columns=None,
+ Expression filter=None,
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ FragmentScanOptions fragment_scan_options=None):
+ """
+ Create a Scanner from an iterator of batches.
+
+ This creates a scanner which can be used only once. It is
+ intended to support writing a dataset (which takes a scanner)
+ from a source which can be read only once (e.g. a
+ RecordBatchReader or generator).
+
+ Parameters
+ ----------
+ source : Iterator
+ The iterator of Batches.
+ schema : Schema
+ The schema of the batches.
+ columns : list of str or dict, default None
+ The columns to project.
+ filter : Expression, default None
+ Scan will return only the rows matching the filter.
+ batch_size : int, default 1M
+ The maximum row count for scanned record batches.
+ use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
+        use_async : bool, default False
+            If enabled, an async scanner will be used that should offer
+            better performance with high-latency/highly-parallel filesystems
+            (e.g. S3).
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required. If not specified, uses the
+ default pool.
+ fragment_scan_options : FragmentScanOptions
+ The fragment scan options.
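+
+        Examples
+        --------
+        A minimal, self-contained sketch using an in-memory record batch:
+
+        >>> import pyarrow as pa
+        >>> import pyarrow.dataset as ds
+        >>> batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])
+        >>> scanner = ds.Scanner.from_batches([batch], schema=batch.schema)
+        >>> table = scanner.to_table()  # consumes the one-shot scanner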
+ """
+ cdef:
+ shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
+ shared_ptr[CScannerBuilder] builder
+ shared_ptr[CScanner] scanner
+ RecordBatchReader reader
+ if isinstance(source, pa.ipc.RecordBatchReader):
+ if schema:
+ raise ValueError('Cannot specify a schema when providing '
+ 'a RecordBatchReader')
+ reader = source
+ elif _is_iterable(source):
+ if schema is None:
+ raise ValueError('Must provide schema to construct scanner '
+ 'from an iterable')
+ reader = pa.ipc.RecordBatchReader.from_batches(schema, source)
+ else:
+ raise TypeError('Expected a RecordBatchReader or an iterable of '
+ 'batches instead of the given type: ' +
+ type(source).__name__)
+ builder = CScannerBuilder.FromRecordBatchReader(reader.reader)
+ _populate_builder(builder, columns=columns, filter=filter,
+ batch_size=batch_size, use_threads=use_threads,
+ use_async=use_async, memory_pool=memory_pool,
+ fragment_scan_options=fragment_scan_options)
+ scanner = GetResultValue(builder.get().Finish())
+ return Scanner.wrap(scanner)
+
+ @property
+ def dataset_schema(self):
+ """The schema with which batches will be read from fragments."""
+ return pyarrow_wrap_schema(
+ self.scanner.options().get().dataset_schema)
+
+ @property
+ def projected_schema(self):
+ """The materialized schema of the data, accounting for projections.
+
+ This is the schema of any data returned from the scanner.
+ """
+ return pyarrow_wrap_schema(
+ self.scanner.options().get().projected_schema)
+
+ def to_batches(self):
+ """Consume a Scanner in record batches.
+
+ Returns
+ -------
+ record_batches : iterator of RecordBatch
+ """
+ def _iterator(batch_iter):
+ for batch in batch_iter:
+ yield batch.record_batch
+ # Don't make ourselves a generator so errors are raised immediately
+ return _iterator(self.scan_batches())
+
+ def scan_batches(self):
+ """Consume a Scanner in record batches with corresponding fragments.
+
+ Returns
+ -------
+ record_batches : iterator of TaggedRecordBatch
+ """
+ cdef CTaggedRecordBatchIterator iterator
+ with nogil:
+ iterator = move(GetResultValue(self.scanner.ScanBatches()))
+ # Don't make ourselves a generator so errors are raised immediately
+ return TaggedRecordBatchIterator.wrap(self, move(iterator))
+
+ def to_table(self):
+ """Convert a Scanner into a Table.
+
+ Use this convenience utility with care. This will serially materialize
+ the Scan result in memory before creating the Table.
+
+ Returns
+ -------
+ table : Table
+ """
+ cdef CResult[shared_ptr[CTable]] result
+
+ with nogil:
+ result = self.scanner.ToTable()
+
+ return pyarrow_wrap_table(GetResultValue(result))
+
+ def take(self, object indices):
+ """Select rows of data by index.
+
+ Will only consume as many batches of the underlying dataset as
+ needed. Otherwise, this is equivalent to
+ ``to_table().take(indices)``.
+
+ Returns
+ -------
+ table : Table
+ """
+ cdef CResult[shared_ptr[CTable]] result
+ cdef shared_ptr[CArray] c_indices = pyarrow_unwrap_array(indices)
+ with nogil:
+ result = self.scanner.TakeRows(deref(c_indices))
+ return pyarrow_wrap_table(GetResultValue(result))
+
+ def head(self, int num_rows):
+ """Load the first N rows of the dataset.
+
+ Returns
+ -------
+ table : Table instance
+ """
+ cdef CResult[shared_ptr[CTable]] result
+ with nogil:
+ result = self.scanner.Head(num_rows)
+ return pyarrow_wrap_table(GetResultValue(result))
+
+ def count_rows(self):
+ """Count rows matching the scanner filter.
+
+ Returns
+ -------
+ count : int
+ """
+ cdef CResult[int64_t] result
+ with nogil:
+ result = self.scanner.CountRows()
+ return GetResultValue(result)
+
+ def to_reader(self):
+ """Consume this scanner as a RecordBatchReader."""
+ cdef RecordBatchReader reader
+ reader = RecordBatchReader.__new__(RecordBatchReader)
+ reader.reader = GetResultValue(self.scanner.ToRecordBatchReader())
+ return reader
+
+
+def _get_partition_keys(Expression partition_expression):
+ """
+ Extract partition keys (equality constraints between a field and a scalar)
+ from an expression as a dict mapping the field's name to its value.
+
+ NB: All expressions yielded by a HivePartitioning or DirectoryPartitioning
+ will be conjunctions of equality conditions and are accessible through this
+ function. Other subexpressions will be ignored.
+
+ For example, an expression of
+ <pyarrow.dataset.Expression ((part == A:string) and (year == 2016:int32))>
+ is converted to {'part': 'A', 'year': 2016}
+ """
+ cdef:
+ CExpression expr = partition_expression.unwrap()
+ pair[CFieldRef, CDatum] ref_val
+
+ out = {}
+ for ref_val in GetResultValue(CExtractKnownFieldValues(expr)).map:
+ assert ref_val.first.name() != nullptr
+ assert ref_val.second.kind() == DatumType_SCALAR
+ val = pyarrow_wrap_scalar(ref_val.second.scalar())
+ out[frombytes(deref(ref_val.first.name()))] = val.as_py()
+ return out
+
+
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+ """
+ Metadata information about files written as
+ part of a dataset write operation
+ """
+
+ """The full path to the created file"""
+ cdef public str path
+ """
+ If the file is a parquet file this will contain the parquet metadata.
+ This metadata will have the file path attribute set to the path of
+ the written file.
+ """
+ cdef public object metadata
+
+ def __init__(self, path, metadata):
+ self.path = path
+ self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+ dict visit_args,
+ CFileWriter* file_writer):
+ cdef:
+ str path
+ str base_dir
+ WrittenFile written_file
+ FileMetaData parquet_metadata
+ CParquetFileWriter* parquet_file_writer
+
+ parquet_metadata = None
+ path = frombytes(deref(file_writer).destination().path)
+ if deref(deref(file_writer).format()).type_name() == b"parquet":
+ parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
+ with nogil:
+ metadata = deref(
+ deref(parquet_file_writer).parquet_writer()).metadata()
+ if metadata:
+ base_dir = frombytes(visit_args['base_dir'])
+ parquet_metadata = FileMetaData()
+ parquet_metadata.init(metadata)
+ parquet_metadata.set_file_path(os.path.relpath(path, base_dir))
+ written_file = WrittenFile(path, parquet_metadata)
+ visit_args['file_visitor'](written_file)
+
+
+def _filesystemdataset_write(
+ Scanner data not None,
+ object base_dir not None,
+ str basename_template not None,
+ FileSystem filesystem not None,
+ Partitioning partitioning not None,
+ FileWriteOptions file_options not None,
+ int max_partitions,
+ object file_visitor,
+ str existing_data_behavior not None
+):
+ """
+ CFileSystemDataset.Write wrapper
+ """
+ cdef:
+ CFileSystemDatasetWriteOptions c_options
+ shared_ptr[CScanner] c_scanner
+ vector[shared_ptr[CRecordBatch]] c_batches
+ dict visit_args
+
+ c_options.file_write_options = file_options.unwrap()
+ c_options.filesystem = filesystem.unwrap()
+ c_options.base_dir = tobytes(_stringify_path(base_dir))
+ c_options.partitioning = partitioning.unwrap()
+ c_options.max_partitions = max_partitions
+ c_options.basename_template = tobytes(basename_template)
+ if existing_data_behavior == 'error':
+ c_options.existing_data_behavior = ExistingDataBehavior_ERROR
+ elif existing_data_behavior == 'overwrite_or_ignore':
+ c_options.existing_data_behavior =\
+ ExistingDataBehavior_OVERWRITE_OR_IGNORE
+ elif existing_data_behavior == 'delete_matching':
+ c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
+ else:
+ raise ValueError(
+ ("existing_data_behavior must be one of 'error', ",
+ "'overwrite_or_ignore' or 'delete_matching'")
+ )
+
+ if file_visitor is not None:
+ visit_args = {'base_dir': c_options.base_dir,
+ 'file_visitor': file_visitor}
+ # Need to use post_finish because parquet metadata is not available
+ # until after Finish has been called
+ c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
+ &_filesystemdataset_write_visitor, visit_args)
+
+ c_scanner = data.unwrap()
+ with nogil:
+ check_status(CFileSystemDataset.Write(c_options, c_scanner))