author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/dataset.py
parent     Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/dataset.py')
-rw-r--r--  src/arrow/python/pyarrow/dataset.py  881
1 file changed, 881 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/dataset.py b/src/arrow/python/pyarrow/dataset.py
new file mode 100644
index 000000000..42515a9f4
--- /dev/null
+++ b/src/arrow/python/pyarrow/dataset.py
@@ -0,0 +1,881 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Dataset is currently unstable. APIs subject to change without notice."""
+
+import pyarrow as pa
+from pyarrow.util import _is_iterable, _stringify_path, _is_path_like
+
+from pyarrow._dataset import ( # noqa
+ CsvFileFormat,
+ CsvFragmentScanOptions,
+ Expression,
+ Dataset,
+ DatasetFactory,
+ DirectoryPartitioning,
+ FileFormat,
+ FileFragment,
+ FileSystemDataset,
+ FileSystemDatasetFactory,
+ FileSystemFactoryOptions,
+ FileWriteOptions,
+ Fragment,
+ HivePartitioning,
+ IpcFileFormat,
+ IpcFileWriteOptions,
+ InMemoryDataset,
+ ParquetDatasetFactory,
+ ParquetFactoryOptions,
+ ParquetFileFormat,
+ ParquetFileFragment,
+ ParquetFileWriteOptions,
+ ParquetFragmentScanOptions,
+ ParquetReadOptions,
+ Partitioning,
+ PartitioningFactory,
+ RowGroupInfo,
+ Scanner,
+ TaggedRecordBatch,
+ UnionDataset,
+ UnionDatasetFactory,
+ _get_partition_keys,
+ _filesystemdataset_write,
+)
+
+_orc_available = False
+_orc_msg = (
+ "The pyarrow installation is not built with support for the ORC file "
+ "format."
+)
+
+try:
+ from pyarrow._dataset_orc import OrcFileFormat
+ _orc_available = True
+except ImportError:
+ pass
+
+
+def __getattr__(name):
+ if name == "OrcFileFormat" and not _orc_available:
+ raise ImportError(_orc_msg)
+
+ raise AttributeError(
+ "module 'pyarrow.dataset' has no attribute '{0}'".format(name)
+ )
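+
+# Illustrative sketch of the fallback above, assuming a pyarrow build
+# without ORC support (whether this applies depends on how pyarrow was
+# compiled):
+#
+#     import pyarrow.dataset as ds
+#     ds.OrcFileFormat    # raises ImportError with _orc_msg
+#     ds.DoesNotExist     # raises AttributeError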
+
+
+def field(name):
+ """Reference a named column of the dataset.
+
+ Stores only the field's name. Type and other information is known only when
+    the expression is bound to a dataset having an explicit schema.
+
+ Parameters
+ ----------
+ name : string
+        The name of the field the expression references.
+
+ Returns
+ -------
+ field_expr : Expression
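+
+    Examples
+    --------
+    A minimal sketch of building filter expressions; the column names "a"
+    and "b" below are only placeholders for fields of the dataset that the
+    expression will eventually be bound to:
+
+    >>> field("a") > 3
+    >>> field("b").isin(["x", "y"])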
+ """
+ return Expression._field(name)
+
+
+def scalar(value):
+ """Expression representing a scalar value.
+
+ Parameters
+ ----------
+ value : bool, int, float or string
+ Python value of the scalar. Note that only a subset of types are
+ currently supported.
+
+ Returns
+ -------
+ scalar_expr : Expression
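+
+    Examples
+    --------
+    A minimal sketch combining a scalar with a field reference (the column
+    name "a" is only a placeholder):
+
+    >>> field("a") == scalar(5)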
+ """
+ return Expression._scalar(value)
+
+
+def partitioning(schema=None, field_names=None, flavor=None,
+ dictionaries=None):
+ """
+ Specify a partitioning scheme.
+
+ The supported schemes include:
+
+ - "DirectoryPartitioning": this scheme expects one segment in the file path
+ for each field in the specified schema (all fields are required to be
+      present). For example, given schema<year:int16, month:int8>, the path
+ "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
+ - "HivePartitioning": a scheme for "/$key=$value/" nested directories as
+ found in Apache Hive. This is a multi-level, directory based partitioning
+ scheme. Data is partitioned by static values of a particular column in
+ the schema. Partition keys are represented in the form $key=$value in
+ directory names. Field order is ignored, as are missing or unrecognized
+ field names.
+ For example, given schema<year:int16, month:int8, day:int8>, a possible
+ path would be "/year=2009/month=11/day=15" (but the field order does not
+ need to match).
+
+ Parameters
+ ----------
+ schema : pyarrow.Schema, default None
+ The schema that describes the partitions present in the file path.
+ If not specified, and `field_names` and/or `flavor` are specified,
+ the schema will be inferred from the file path (and a
+ PartitioningFactory is returned).
+ field_names : list of str, default None
+ A list of strings (field names). If specified, the schema's types are
+ inferred from the file paths (only valid for DirectoryPartitioning).
+ flavor : str, default None
+ The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
+ a HivePartitioning.
+ dictionaries : Dict[str, Array]
+ If the type of any field of `schema` is a dictionary type, the
+ corresponding entry of `dictionaries` must be an array containing
+ every value which may be taken by the corresponding column or an
+ error will be raised in parsing. Alternatively, pass `infer` to have
+ Arrow discover the dictionary values, in which case a
+ PartitioningFactory is returned.
+
+ Returns
+ -------
+ Partitioning or PartitioningFactory
+
+ Examples
+ --------
+
+ Specify the Schema for paths like "/2009/June":
+
+ >>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))
+
+ or let the types be inferred by only specifying the field names:
+
+ >>> partitioning(field_names=["year", "month"])
+
+ For paths like "/2009/June", the year will be inferred as int32 while month
+ will be inferred as string.
+
+ Specify a Schema with dictionary encoding, providing dictionary values:
+
+ >>> partitioning(
+ ... pa.schema([
+ ... ("year", pa.int16()),
+ ... ("month", pa.dictionary(pa.int8(), pa.string()))
+ ... ]),
+ ... dictionaries={
+ ... "month": pa.array(["January", "February", "March"]),
+ ... })
+
+ Alternatively, specify a Schema with dictionary encoding, but have Arrow
+ infer the dictionary values:
+
+ >>> partitioning(
+ ... pa.schema([
+ ... ("year", pa.int16()),
+ ... ("month", pa.dictionary(pa.int8(), pa.string()))
+ ... ]),
+ ... dictionaries="infer")
+
+ Create a Hive scheme for a path like "/year=2009/month=11":
+
+ >>> partitioning(
+ ... pa.schema([("year", pa.int16()), ("month", pa.int8())]),
+ ... flavor="hive")
+
+ A Hive scheme can also be discovered from the directory structure (and
+ types will be inferred):
+
+ >>> partitioning(flavor="hive")
+
+ """
+ if flavor is None:
+ # default flavor
+ if schema is not None:
+ if field_names is not None:
+ raise ValueError(
+ "Cannot specify both 'schema' and 'field_names'")
+ if dictionaries == 'infer':
+ return DirectoryPartitioning.discover(schema=schema)
+ return DirectoryPartitioning(schema, dictionaries)
+ elif field_names is not None:
+ if isinstance(field_names, list):
+ return DirectoryPartitioning.discover(field_names)
+ else:
+ raise ValueError(
+ "Expected list of field names, got {}".format(
+ type(field_names)))
+ else:
+ raise ValueError(
+ "For the default directory flavor, need to specify "
+ "a Schema or a list of field names")
+ elif flavor == 'hive':
+ if field_names is not None:
+ raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
+ elif schema is not None:
+ if isinstance(schema, pa.Schema):
+ if dictionaries == 'infer':
+ return HivePartitioning.discover(schema=schema)
+ return HivePartitioning(schema, dictionaries)
+ else:
+ raise ValueError(
+ "Expected Schema for 'schema', got {}".format(
+ type(schema)))
+ else:
+ return HivePartitioning.discover()
+ else:
+ raise ValueError("Unsupported flavor")
+
+
+def _ensure_partitioning(scheme):
+ """
+ Validate input and return a Partitioning(Factory).
+
+ It passes None through if no partitioning scheme is defined.
+ """
+ if scheme is None:
+ pass
+ elif isinstance(scheme, str):
+ scheme = partitioning(flavor=scheme)
+ elif isinstance(scheme, list):
+ scheme = partitioning(field_names=scheme)
+ elif isinstance(scheme, (Partitioning, PartitioningFactory)):
+ pass
+ else:
+        raise ValueError(
+            "Expected Partitioning or PartitioningFactory, got {}"
+            .format(type(scheme)))
+ return scheme
+
+
+def _ensure_format(obj):
+ if isinstance(obj, FileFormat):
+ return obj
+ elif obj == "parquet":
+ return ParquetFileFormat()
+ elif obj in {"ipc", "arrow", "feather"}:
+ return IpcFileFormat()
+ elif obj == "csv":
+ return CsvFileFormat()
+ elif obj == "orc":
+ if not _orc_available:
+ raise ValueError(_orc_msg)
+ return OrcFileFormat()
+ else:
+ raise ValueError("format '{}' is not supported".format(obj))
+
+
+def _ensure_multiple_sources(paths, filesystem=None):
+ """
+ Treat a list of paths as files belonging to a single file system
+
+    If the file system is local, this also validates that all paths
+    reference existing *files*; otherwise any non-file paths would be
+    silently skipped (for example on a remote filesystem).
+
+ Parameters
+ ----------
+ paths : list of path-like
+ Note that URIs are not allowed.
+ filesystem : FileSystem or str, optional
+        If a URI is passed, then its path component will act as a prefix for
+ the file paths.
+
+ Returns
+ -------
+ (FileSystem, list of str)
+ File system object and a list of normalized paths.
+
+ Raises
+ ------
+ TypeError
+        If the passed filesystem has the wrong type.
+ IOError
+ If the file system is local and a referenced path is not available or
+ not a file.
+ """
+ from pyarrow.fs import (
+ LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileType,
+ _ensure_filesystem
+ )
+
+ if filesystem is None:
+ # fall back to local file system as the default
+ filesystem = LocalFileSystem()
+ else:
+ # construct a filesystem if it is a valid URI
+ filesystem = _ensure_filesystem(filesystem)
+
+ is_local = (
+ isinstance(filesystem, (LocalFileSystem, _MockFileSystem)) or
+ (isinstance(filesystem, SubTreeFileSystem) and
+ isinstance(filesystem.base_fs, LocalFileSystem))
+ )
+
+ # allow normalizing irregular paths such as Windows local paths
+ paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]
+
+ # validate that all of the paths are pointing to existing *files*
+ # possible improvement is to group the file_infos by type and raise for
+ # multiple paths per error category
+ if is_local:
+ for info in filesystem.get_file_info(paths):
+ file_type = info.type
+ if file_type == FileType.File:
+ continue
+ elif file_type == FileType.NotFound:
+ raise FileNotFoundError(info.path)
+ elif file_type == FileType.Directory:
+ raise IsADirectoryError(
+ 'Path {} points to a directory, but only file paths are '
+ 'supported. To construct a nested or union dataset pass '
+ 'a list of dataset objects instead.'.format(info.path)
+ )
+ else:
+ raise IOError(
+ 'Path {} exists but its type is unknown (could be a '
+ 'special file such as a Unix socket or character device, '
+ 'or Windows NUL / CON / ...)'.format(info.path)
+ )
+
+ return filesystem, paths
+
+
+def _ensure_single_source(path, filesystem=None):
+ """
+ Treat path as either a recursively traversable directory or a single file.
+
+ Parameters
+ ----------
+ path : path-like
+ filesystem : FileSystem or str, optional
+        If a URI is passed, then its path component will act as a prefix for
+ the file paths.
+
+ Returns
+ -------
+ (FileSystem, list of str or fs.Selector)
+ File system object and either a single item list pointing to a file or
+ an fs.Selector object pointing to a directory.
+
+ Raises
+ ------
+ TypeError
+        If the passed filesystem has the wrong type.
+ FileNotFoundError
+ If the referenced file or directory doesn't exist.
+ """
+ from pyarrow.fs import FileType, FileSelector, _resolve_filesystem_and_path
+
+ # at this point we already checked that `path` is a path-like
+ filesystem, path = _resolve_filesystem_and_path(path, filesystem)
+
+ # ensure that the path is normalized before passing to dataset discovery
+ path = filesystem.normalize_path(path)
+
+ # retrieve the file descriptor
+ file_info = filesystem.get_file_info(path)
+
+ # depending on the path type either return with a recursive
+ # directory selector or as a list containing a single file
+ if file_info.type == FileType.Directory:
+ paths_or_selector = FileSelector(path, recursive=True)
+ elif file_info.type == FileType.File:
+ paths_or_selector = [path]
+ else:
+ raise FileNotFoundError(path)
+
+ return filesystem, paths_or_selector
+
+
+def _filesystem_dataset(source, schema=None, filesystem=None,
+ partitioning=None, format=None,
+ partition_base_dir=None, exclude_invalid_files=None,
+ selector_ignore_prefixes=None):
+ """
+ Create a FileSystemDataset which can be used to build a Dataset.
+
+ Parameters are documented in the dataset function.
+
+ Returns
+ -------
+ FileSystemDataset
+ """
+ format = _ensure_format(format or 'parquet')
+ partitioning = _ensure_partitioning(partitioning)
+
+ if isinstance(source, (list, tuple)):
+ fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
+ else:
+ fs, paths_or_selector = _ensure_single_source(source, filesystem)
+
+ options = FileSystemFactoryOptions(
+ partitioning=partitioning,
+ partition_base_dir=partition_base_dir,
+ exclude_invalid_files=exclude_invalid_files,
+ selector_ignore_prefixes=selector_ignore_prefixes
+ )
+ factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
+
+ return factory.finish(schema)
+
+
+def _in_memory_dataset(source, schema=None, **kwargs):
+ if any(v is not None for v in kwargs.values()):
+ raise ValueError(
+ "For in-memory datasets, you cannot pass any additional arguments")
+ return InMemoryDataset(source, schema)
+
+
+def _union_dataset(children, schema=None, **kwargs):
+ if any(v is not None for v in kwargs.values()):
+ raise ValueError(
+ "When passing a list of Datasets, you cannot pass any additional "
+ "arguments"
+ )
+
+ if schema is None:
+ # unify the children datasets' schemas
+ schema = pa.unify_schemas([child.schema for child in children])
+
+ # create datasets with the requested schema
+ children = [child.replace_schema(schema) for child in children]
+
+ return UnionDataset(schema, children)
+
+
+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
+ partitioning=None, partition_base_dir=None):
+ """
+ Create a FileSystemDataset from a `_metadata` file created via
+    `pyarrow.parquet.write_metadata`.
+
+ Parameters
+ ----------
+    metadata_path : path
+        Path pointing to a single Parquet metadata file.
+ schema : Schema, optional
+ Optionally provide the Schema for the Dataset, in which case it will
+ not be inferred from the source.
+ filesystem : FileSystem or URI string, default None
+ If a single path is given as source and filesystem is None, then the
+ filesystem will be inferred from the path.
+        If a URI string is passed, then a filesystem object is constructed
+ using the URI's optional path component as a directory prefix. See the
+ examples below.
+ Note that the URIs on Windows must follow 'file:///C:...' or
+ 'file:/C:...' patterns.
+ format : ParquetFileFormat
+        An instance of a ParquetFileFormat if special options need to be
+ passed.
+ partitioning : Partitioning, PartitioningFactory, str, list of str
+ The partitioning scheme specified with the ``partitioning()``
+        function. A flavor string can be used as a shortcut, and with a list
+        of field names a DirectoryPartitioning will be inferred.
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
+
+ Returns
+ -------
+ FileSystemDataset
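+
+    Examples
+    --------
+    A minimal sketch, assuming a ``_metadata`` file was previously written
+    with ``pyarrow.parquet.write_metadata`` (the path below is hypothetical):
+
+    >>> dataset = parquet_dataset("/path/to/dataset/_metadata")
+    >>> table = dataset.to_table()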
+ """
+ from pyarrow.fs import LocalFileSystem, _ensure_filesystem
+
+ if format is None:
+ format = ParquetFileFormat()
+ elif not isinstance(format, ParquetFileFormat):
+ raise ValueError("format argument must be a ParquetFileFormat")
+
+ if filesystem is None:
+ filesystem = LocalFileSystem()
+ else:
+ filesystem = _ensure_filesystem(filesystem)
+
+ metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
+ options = ParquetFactoryOptions(
+ partition_base_dir=partition_base_dir,
+ partitioning=_ensure_partitioning(partitioning)
+ )
+
+ factory = ParquetDatasetFactory(
+ metadata_path, filesystem, format, options=options)
+ return factory.finish(schema)
+
+
+def dataset(source, schema=None, format=None, filesystem=None,
+ partitioning=None, partition_base_dir=None,
+ exclude_invalid_files=None, ignore_prefixes=None):
+ """
+ Open a dataset.
+
+    Datasets provide functionality to efficiently work with tabular,
+    potentially larger-than-memory, multi-file datasets.
+
+ - A unified interface for different sources, like Parquet and Feather
+    - Discovery of sources (crawling directories, handling directory-based
+ partitioned datasets, basic schema normalization)
+ - Optimized reading with predicate pushdown (filtering rows), projection
+ (selecting columns), parallel reading or fine-grained managing of tasks.
+
+    Note that this is the high-level API; to have more control over the
+    dataset construction, use the low-level API classes (FileSystemDataset,
+    FileSystemDatasetFactory, etc.)
+
+ Parameters
+ ----------
+ source : path, list of paths, dataset, list of datasets, (list of) batches\
+or tables, iterable of batches, RecordBatchReader, or URI
+ Path pointing to a single file:
+ Open a FileSystemDataset from a single file.
+ Path pointing to a directory:
+ The directory gets discovered recursively according to a
+ partitioning scheme if given.
+ List of file paths:
+ Create a FileSystemDataset from explicitly given files. The files
+ must be located on the same filesystem given by the filesystem
+ parameter.
+            Note that, in contrast to construction from a single file,
+            passing URIs as paths is not allowed.
+ List of datasets:
+            A nested UnionDataset gets constructed; it allows arbitrary
+            composition of other datasets.
+ Note that additional keyword arguments are not allowed.
+ (List of) batches or tables, iterable of batches, or RecordBatchReader:
+ Create an InMemoryDataset. If an iterable or empty list is given,
+ a schema must also be given. If an iterable or RecordBatchReader
+ is given, the resulting dataset can only be scanned once; further
+ attempts will raise an error.
+ schema : Schema, optional
+ Optionally provide the Schema for the Dataset, in which case it will
+ not be inferred from the source.
+ format : FileFormat or str
+ Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For
+ Feather, only version 2 files are supported.
+ filesystem : FileSystem or URI string, default None
+ If a single path is given as source and filesystem is None, then the
+ filesystem will be inferred from the path.
+        If a URI string is passed, then a filesystem object is constructed
+ using the URI's optional path component as a directory prefix. See the
+ examples below.
+ Note that the URIs on Windows must follow 'file:///C:...' or
+ 'file:/C:...' patterns.
+ partitioning : Partitioning, PartitioningFactory, str, list of str
+ The partitioning scheme specified with the ``partitioning()``
+        function. A flavor string can be used as a shortcut, and with a list
+        of field names a DirectoryPartitioning will be inferred.
+ partition_base_dir : str, optional
+ For the purposes of applying the partitioning, paths will be
+ stripped of the partition_base_dir. Files not matching the
+ partition_base_dir prefix will be skipped for partitioning discovery.
+ The ignored files will still be part of the Dataset, but will not
+ have partition information.
+ exclude_invalid_files : bool, optional (default True)
+ If True, invalid files will be excluded (file format specific check).
+        This will incur IO for each file in a serial and single-threaded
+ fashion. Disabling this feature will skip the IO, but unsupported
+ files may be present in the Dataset (resulting in an error at scan
+ time).
+ ignore_prefixes : list, optional
+ Files matching any of these prefixes will be ignored by the
+ discovery process. This is matched to the basename of a path.
+ By default this is ['.', '_'].
+ Note that discovery happens only if a directory is passed as source.
+
+ Returns
+ -------
+ dataset : Dataset
+ Either a FileSystemDataset or a UnionDataset depending on the source
+ parameter.
+
+ Examples
+ --------
+ Opening a single file:
+
+ >>> dataset("path/to/file.parquet", format="parquet")
+
+ Opening a single file with an explicit schema:
+
+ >>> dataset("path/to/file.parquet", schema=myschema, format="parquet")
+
+ Opening a dataset for a single directory:
+
+ >>> dataset("path/to/nyc-taxi/", format="parquet")
+ >>> dataset("s3://mybucket/nyc-taxi/", format="parquet")
+
+    Opening a dataset from a list of relative local paths:
+
+ >>> dataset([
+ ... "part0/data.parquet",
+ ... "part1/data.parquet",
+ ... "part3/data.parquet",
+ ... ], format='parquet')
+
+ With filesystem provided:
+
+ >>> paths = [
+ ... 'part0/data.parquet',
+ ... 'part1/data.parquet',
+ ... 'part3/data.parquet',
+ ... ]
+    >>> dataset(paths, filesystem='file:///directory/prefix',
+    ...         format='parquet')
+
+    Which is equivalent to:
+
+ >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem())
+ >>> dataset(paths, filesystem=fs, format='parquet')
+
+ With a remote filesystem URI:
+
+ >>> paths = [
+ ... 'nested/directory/part0/data.parquet',
+ ... 'nested/directory/part1/data.parquet',
+ ... 'nested/directory/part3/data.parquet',
+ ... ]
+ >>> dataset(paths, filesystem='s3://bucket/', format='parquet')
+
+ Similarly to the local example, the directory prefix may be included in the
+ filesystem URI:
+
+ >>> dataset(paths, filesystem='s3://bucket/nested/directory',
+ ... format='parquet')
+
+ Construction of a nested dataset:
+
+ >>> dataset([
+ ... dataset("s3://old-taxi-data", format="parquet"),
+ ... dataset("local/path/to/data", format="ipc")
+ ... ])
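+
+    Construction from in-memory data (a minimal sketch; the table contents
+    are only illustrative):
+
+    >>> table = pa.table({"a": [1, 2, 3]})
+    >>> dataset(table)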
+ """
+ # collect the keyword arguments for later reuse
+ kwargs = dict(
+ schema=schema,
+ filesystem=filesystem,
+ partitioning=partitioning,
+ format=format,
+ partition_base_dir=partition_base_dir,
+ exclude_invalid_files=exclude_invalid_files,
+ selector_ignore_prefixes=ignore_prefixes
+ )
+
+ if _is_path_like(source):
+ return _filesystem_dataset(source, **kwargs)
+ elif isinstance(source, (tuple, list)):
+ if all(_is_path_like(elem) for elem in source):
+ return _filesystem_dataset(source, **kwargs)
+ elif all(isinstance(elem, Dataset) for elem in source):
+ return _union_dataset(source, **kwargs)
+ elif all(isinstance(elem, (pa.RecordBatch, pa.Table))
+ for elem in source):
+ return _in_memory_dataset(source, **kwargs)
+ else:
+ unique_types = set(type(elem).__name__ for elem in source)
+ type_names = ', '.join('{}'.format(t) for t in unique_types)
+ raise TypeError(
+ 'Expected a list of path-like or dataset objects, or a list '
+ 'of batches or tables. The given list contains the following '
+ 'types: {}'.format(type_names)
+ )
+ elif isinstance(source, (pa.RecordBatch, pa.Table)):
+ return _in_memory_dataset(source, **kwargs)
+ else:
+ raise TypeError(
+ 'Expected a path-like, list of path-likes or a list of Datasets '
+ 'instead of the given type: {}'.format(type(source).__name__)
+ )
+
+
+def _ensure_write_partitioning(part, schema, flavor):
+ if isinstance(part, PartitioningFactory):
+ raise ValueError("A PartitioningFactory cannot be used. "
+ "Did you call the partitioning function "
+ "without supplying a schema?")
+
+ if isinstance(part, Partitioning) and flavor:
+ raise ValueError(
+ "Providing a partitioning_flavor with "
+ "a Partitioning object is not supported"
+ )
+ elif isinstance(part, (tuple, list)):
+        # Names of fields were provided instead of a partitioning object.
+ # Create a partitioning factory with those field names.
+ part = partitioning(
+ schema=pa.schema([schema.field(f) for f in part]),
+ flavor=flavor
+ )
+ elif part is None:
+ part = partitioning(pa.schema([]), flavor=flavor)
+
+ if not isinstance(part, Partitioning):
+ raise ValueError(
+ "partitioning must be a Partitioning object or "
+ "a list of column names"
+ )
+
+ return part
+
+
+def write_dataset(data, base_dir, basename_template=None, format=None,
+ partitioning=None, partitioning_flavor=None, schema=None,
+ filesystem=None, file_options=None, use_threads=True,
+ max_partitions=None, file_visitor=None,
+ existing_data_behavior='error'):
+ """
+ Write a dataset to a given format and partitioning.
+
+ Parameters
+ ----------
+ data : Dataset, Table/RecordBatch, RecordBatchReader, list of
+ Table/RecordBatch, or iterable of RecordBatch
+ The data to write. This can be a Dataset instance or
+ in-memory Arrow data. If an iterable is given, the schema must
+ also be given.
+ base_dir : str
+ The root directory where to write the dataset.
+ basename_template : str, optional
+ A template string used to generate basenames of written data files.
+ The token '{i}' will be replaced with an automatically incremented
+ integer. If not specified, it defaults to
+ "part-{i}." + format.default_extname
+ format : FileFormat or str
+ The format in which to write the dataset. Currently supported:
+ "parquet", "ipc"/"feather". If a FileSystemDataset is being written
+ and `format` is not specified, it defaults to the same format as the
+ specified FileSystemDataset. When writing a Table or RecordBatch, this
+ keyword is required.
+ partitioning : Partitioning or list[str], optional
+ The partitioning scheme specified with the ``partitioning()``
+ function or a list of field names. When providing a list of
+ field names, you can use ``partitioning_flavor`` to drive which
+ partitioning type should be used.
+ partitioning_flavor : str, optional
+ One of the partitioning flavors supported by
+        ``pyarrow.dataset.partitioning``. If omitted, the default of
+        ``partitioning()``, which is directory partitioning, will be used.
+ schema : Schema, optional
+ filesystem : FileSystem, optional
+ file_options : FileWriteOptions, optional
+ FileFormat specific write options, created using the
+ ``FileFormat.make_write_options()`` function.
+ use_threads : bool, default True
+        Write files in parallel. If enabled, the maximum parallelism
+        determined by the number of available CPU cores will be used.
+ max_partitions : int, default 1024
+ Maximum number of partitions any batch may be written into.
+ file_visitor : Function
+ If set, this function will be called with a WrittenFile instance
+ for each file created during the call. This object will have both
+ a path attribute and a metadata attribute.
+
+ The path attribute will be a string containing the path to
+ the created file.
+
+ The metadata attribute will be the parquet metadata of the file.
+ This metadata will have the file path attribute set and can be used
+ to build a _metadata file. The metadata attribute will be None if
+ the format is not parquet.
+
+        Example visitor which simply collects the filenames created::
+
+ visited_paths = []
+
+ def file_visitor(written_file):
+ visited_paths.append(written_file.path)
+ existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
+'delete_matching'
+ Controls how the dataset will handle data that already exists in
+ the destination. The default behavior ('error') is to raise an error
+ if any data exists in the destination.
+
+ 'overwrite_or_ignore' will ignore any existing data and will
+ overwrite files with the same name as an output file. Other
+ existing files will be ignored. This behavior, in combination
+ with a unique basename_template for each write, will allow for
+ an append workflow.
+
+ 'delete_matching' is useful when you are writing a partitioned
+ dataset. The first time each partition directory is encountered
+ the entire directory will be deleted. This allows you to overwrite
+ old partitions completely.
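+
+    Examples
+    --------
+    A minimal sketch writing an in-memory table as a Hive-partitioned
+    Parquet dataset (the table and output path are only illustrative):
+
+    >>> table = pa.table({"year": [2020, 2021], "n": [1, 2]})
+    >>> write_dataset(table, "/tmp/example_dataset", format="parquet",
+    ...               partitioning=["year"], partitioning_flavor="hive")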
+ """
+ from pyarrow.fs import _resolve_filesystem_and_path
+
+ if isinstance(data, (list, tuple)):
+ schema = schema or data[0].schema
+ data = InMemoryDataset(data, schema=schema)
+ elif isinstance(data, (pa.RecordBatch, pa.Table)):
+ schema = schema or data.schema
+ data = InMemoryDataset(data, schema=schema)
+ elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data):
+ data = Scanner.from_batches(data, schema=schema, use_async=True)
+ schema = None
+ elif not isinstance(data, (Dataset, Scanner)):
+ raise ValueError(
+ "Only Dataset, Scanner, Table/RecordBatch, RecordBatchReader, "
+ "a list of Tables/RecordBatches, or iterable of batches are "
+ "supported."
+ )
+
+ if format is None and isinstance(data, FileSystemDataset):
+ format = data.format
+ else:
+ format = _ensure_format(format)
+
+ if file_options is None:
+ file_options = format.make_write_options()
+
+ if format != file_options.format:
+        raise TypeError("Supplied FileWriteOptions have format {}, "
+                        "which doesn't match supplied FileFormat {}".format(
+                            file_options.format, format))
+
+ if basename_template is None:
+ basename_template = "part-{i}." + format.default_extname
+
+ if max_partitions is None:
+ max_partitions = 1024
+
+    # At this point data is a Scanner or a Dataset; anything else was
+    # converted to one of those two. So we can grab the schema needed to
+    # build the partitioning object.
+ if isinstance(data, Scanner):
+ partitioning_schema = data.dataset_schema
+ else:
+ partitioning_schema = data.schema
+ partitioning = _ensure_write_partitioning(partitioning,
+ schema=partitioning_schema,
+ flavor=partitioning_flavor)
+
+ filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)
+
+ if isinstance(data, Dataset):
+ scanner = data.scanner(use_threads=use_threads, use_async=True)
+ else:
+ # scanner was passed directly by the user, in which case a schema
+ # cannot be passed
+ if schema is not None:
+ raise ValueError("Cannot specify a schema when writing a Scanner")
+ scanner = data
+
+ _filesystemdataset_write(
+ scanner, base_dir, basename_template, filesystem, partitioning,
+ file_options, max_partitions, file_visitor, existing_data_behavior
+ )