author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/dataset.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/dataset.py')
-rw-r--r-- | src/arrow/python/pyarrow/dataset.py | 881 |
1 file changed, 881 insertions(+), 0 deletions(-)
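The diff below vendors the public `pyarrow.dataset` module (Arrow's tabular dataset API) into the Ceph tree. For orientation, here is a minimal usage sketch of that API; the paths, column names, and partition layout are illustrative assumptions and are not part of this commit:

```python
# Illustrative only: assumes a pyarrow build with Parquet support and a
# hypothetical directory of Hive-partitioned Parquet files.
import pyarrow.dataset as ds

# Discover the files and the /year=2009/month=11/... partition structure.
taxi = ds.dataset("/data/taxi/", format="parquet",
                  partitioning=ds.partitioning(flavor="hive"))

# Scan with column projection and predicate pushdown.
table = taxi.to_table(columns=["fare_amount"],
                      filter=ds.field("year") == 2019)
```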
diff --git a/src/arrow/python/pyarrow/dataset.py b/src/arrow/python/pyarrow/dataset.py new file mode 100644 index 000000000..42515a9f4 --- /dev/null +++ b/src/arrow/python/pyarrow/dataset.py @@ -0,0 +1,881 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Dataset is currently unstable. APIs subject to change without notice.""" + +import pyarrow as pa +from pyarrow.util import _is_iterable, _stringify_path, _is_path_like + +from pyarrow._dataset import ( # noqa + CsvFileFormat, + CsvFragmentScanOptions, + Expression, + Dataset, + DatasetFactory, + DirectoryPartitioning, + FileFormat, + FileFragment, + FileSystemDataset, + FileSystemDatasetFactory, + FileSystemFactoryOptions, + FileWriteOptions, + Fragment, + HivePartitioning, + IpcFileFormat, + IpcFileWriteOptions, + InMemoryDataset, + ParquetDatasetFactory, + ParquetFactoryOptions, + ParquetFileFormat, + ParquetFileFragment, + ParquetFileWriteOptions, + ParquetFragmentScanOptions, + ParquetReadOptions, + Partitioning, + PartitioningFactory, + RowGroupInfo, + Scanner, + TaggedRecordBatch, + UnionDataset, + UnionDatasetFactory, + _get_partition_keys, + _filesystemdataset_write, +) + +_orc_available = False +_orc_msg = ( + "The pyarrow installation is not built with support for the ORC file " + "format." +) + +try: + from pyarrow._dataset_orc import OrcFileFormat + _orc_available = True +except ImportError: + pass + + +def __getattr__(name): + if name == "OrcFileFormat" and not _orc_available: + raise ImportError(_orc_msg) + + raise AttributeError( + "module 'pyarrow.dataset' has no attribute '{0}'".format(name) + ) + + +def field(name): + """Reference a named column of the dataset. + + Stores only the field's name. Type and other information is known only when + the expression is bound to a dataset having an explicit scheme. + + Parameters + ---------- + name : string + The name of the field the expression references to. + + Returns + ------- + field_expr : Expression + """ + return Expression._field(name) + + +def scalar(value): + """Expression representing a scalar value. + + Parameters + ---------- + value : bool, int, float or string + Python value of the scalar. Note that only a subset of types are + currently supported. + + Returns + ------- + scalar_expr : Expression + """ + return Expression._scalar(value) + + +def partitioning(schema=None, field_names=None, flavor=None, + dictionaries=None): + """ + Specify a partitioning scheme. + + The supported schemes include: + + - "DirectoryPartitioning": this scheme expects one segment in the file path + for each field in the specified schema (all fields are required to be + present). For example given schema<year:int16, month:int8> the path + "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11). 
+ - "HivePartitioning": a scheme for "/$key=$value/" nested directories as + found in Apache Hive. This is a multi-level, directory based partitioning + scheme. Data is partitioned by static values of a particular column in + the schema. Partition keys are represented in the form $key=$value in + directory names. Field order is ignored, as are missing or unrecognized + field names. + For example, given schema<year:int16, month:int8, day:int8>, a possible + path would be "/year=2009/month=11/day=15" (but the field order does not + need to match). + + Parameters + ---------- + schema : pyarrow.Schema, default None + The schema that describes the partitions present in the file path. + If not specified, and `field_names` and/or `flavor` are specified, + the schema will be inferred from the file path (and a + PartitioningFactory is returned). + field_names : list of str, default None + A list of strings (field names). If specified, the schema's types are + inferred from the file paths (only valid for DirectoryPartitioning). + flavor : str, default None + The default is DirectoryPartitioning. Specify ``flavor="hive"`` for + a HivePartitioning. + dictionaries : Dict[str, Array] + If the type of any field of `schema` is a dictionary type, the + corresponding entry of `dictionaries` must be an array containing + every value which may be taken by the corresponding column or an + error will be raised in parsing. Alternatively, pass `infer` to have + Arrow discover the dictionary values, in which case a + PartitioningFactory is returned. + + Returns + ------- + Partitioning or PartitioningFactory + + Examples + -------- + + Specify the Schema for paths like "/2009/June": + + >>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())])) + + or let the types be inferred by only specifying the field names: + + >>> partitioning(field_names=["year", "month"]) + + For paths like "/2009/June", the year will be inferred as int32 while month + will be inferred as string. + + Specify a Schema with dictionary encoding, providing dictionary values: + + >>> partitioning( + ... pa.schema([ + ... ("year", pa.int16()), + ... ("month", pa.dictionary(pa.int8(), pa.string())) + ... ]), + ... dictionaries={ + ... "month": pa.array(["January", "February", "March"]), + ... }) + + Alternatively, specify a Schema with dictionary encoding, but have Arrow + infer the dictionary values: + + >>> partitioning( + ... pa.schema([ + ... ("year", pa.int16()), + ... ("month", pa.dictionary(pa.int8(), pa.string())) + ... ]), + ... dictionaries="infer") + + Create a Hive scheme for a path like "/year=2009/month=11": + + >>> partitioning( + ... pa.schema([("year", pa.int16()), ("month", pa.int8())]), + ... 
flavor="hive") + + A Hive scheme can also be discovered from the directory structure (and + types will be inferred): + + >>> partitioning(flavor="hive") + + """ + if flavor is None: + # default flavor + if schema is not None: + if field_names is not None: + raise ValueError( + "Cannot specify both 'schema' and 'field_names'") + if dictionaries == 'infer': + return DirectoryPartitioning.discover(schema=schema) + return DirectoryPartitioning(schema, dictionaries) + elif field_names is not None: + if isinstance(field_names, list): + return DirectoryPartitioning.discover(field_names) + else: + raise ValueError( + "Expected list of field names, got {}".format( + type(field_names))) + else: + raise ValueError( + "For the default directory flavor, need to specify " + "a Schema or a list of field names") + elif flavor == 'hive': + if field_names is not None: + raise ValueError("Cannot specify 'field_names' for flavor 'hive'") + elif schema is not None: + if isinstance(schema, pa.Schema): + if dictionaries == 'infer': + return HivePartitioning.discover(schema=schema) + return HivePartitioning(schema, dictionaries) + else: + raise ValueError( + "Expected Schema for 'schema', got {}".format( + type(schema))) + else: + return HivePartitioning.discover() + else: + raise ValueError("Unsupported flavor") + + +def _ensure_partitioning(scheme): + """ + Validate input and return a Partitioning(Factory). + + It passes None through if no partitioning scheme is defined. + """ + if scheme is None: + pass + elif isinstance(scheme, str): + scheme = partitioning(flavor=scheme) + elif isinstance(scheme, list): + scheme = partitioning(field_names=scheme) + elif isinstance(scheme, (Partitioning, PartitioningFactory)): + pass + else: + ValueError("Expected Partitioning or PartitioningFactory, got {}" + .format(type(scheme))) + return scheme + + +def _ensure_format(obj): + if isinstance(obj, FileFormat): + return obj + elif obj == "parquet": + return ParquetFileFormat() + elif obj in {"ipc", "arrow", "feather"}: + return IpcFileFormat() + elif obj == "csv": + return CsvFileFormat() + elif obj == "orc": + if not _orc_available: + raise ValueError(_orc_msg) + return OrcFileFormat() + else: + raise ValueError("format '{}' is not supported".format(obj)) + + +def _ensure_multiple_sources(paths, filesystem=None): + """ + Treat a list of paths as files belonging to a single file system + + If the file system is local then also validates that all paths + are referencing existing *files* otherwise any non-file paths will be + silently skipped (for example on a remote filesystem). + + Parameters + ---------- + paths : list of path-like + Note that URIs are not allowed. + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. + + Returns + ------- + (FileSystem, list of str) + File system object and a list of normalized paths. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + IOError + If the file system is local and a referenced path is not available or + not a file. 
+ """ + from pyarrow.fs import ( + LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileType, + _ensure_filesystem + ) + + if filesystem is None: + # fall back to local file system as the default + filesystem = LocalFileSystem() + else: + # construct a filesystem if it is a valid URI + filesystem = _ensure_filesystem(filesystem) + + is_local = ( + isinstance(filesystem, (LocalFileSystem, _MockFileSystem)) or + (isinstance(filesystem, SubTreeFileSystem) and + isinstance(filesystem.base_fs, LocalFileSystem)) + ) + + # allow normalizing irregular paths such as Windows local paths + paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths] + + # validate that all of the paths are pointing to existing *files* + # possible improvement is to group the file_infos by type and raise for + # multiple paths per error category + if is_local: + for info in filesystem.get_file_info(paths): + file_type = info.type + if file_type == FileType.File: + continue + elif file_type == FileType.NotFound: + raise FileNotFoundError(info.path) + elif file_type == FileType.Directory: + raise IsADirectoryError( + 'Path {} points to a directory, but only file paths are ' + 'supported. To construct a nested or union dataset pass ' + 'a list of dataset objects instead.'.format(info.path) + ) + else: + raise IOError( + 'Path {} exists but its type is unknown (could be a ' + 'special file such as a Unix socket or character device, ' + 'or Windows NUL / CON / ...)'.format(info.path) + ) + + return filesystem, paths + + +def _ensure_single_source(path, filesystem=None): + """ + Treat path as either a recursively traversable directory or a single file. + + Parameters + ---------- + path : path-like + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. + + Returns + ------- + (FileSystem, list of str or fs.Selector) + File system object and either a single item list pointing to a file or + an fs.Selector object pointing to a directory. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + FileNotFoundError + If the referenced file or directory doesn't exist. + """ + from pyarrow.fs import FileType, FileSelector, _resolve_filesystem_and_path + + # at this point we already checked that `path` is a path-like + filesystem, path = _resolve_filesystem_and_path(path, filesystem) + + # ensure that the path is normalized before passing to dataset discovery + path = filesystem.normalize_path(path) + + # retrieve the file descriptor + file_info = filesystem.get_file_info(path) + + # depending on the path type either return with a recursive + # directory selector or as a list containing a single file + if file_info.type == FileType.Directory: + paths_or_selector = FileSelector(path, recursive=True) + elif file_info.type == FileType.File: + paths_or_selector = [path] + else: + raise FileNotFoundError(path) + + return filesystem, paths_or_selector + + +def _filesystem_dataset(source, schema=None, filesystem=None, + partitioning=None, format=None, + partition_base_dir=None, exclude_invalid_files=None, + selector_ignore_prefixes=None): + """ + Create a FileSystemDataset which can be used to build a Dataset. + + Parameters are documented in the dataset function. 
+ + Returns + ------- + FileSystemDataset + """ + format = _ensure_format(format or 'parquet') + partitioning = _ensure_partitioning(partitioning) + + if isinstance(source, (list, tuple)): + fs, paths_or_selector = _ensure_multiple_sources(source, filesystem) + else: + fs, paths_or_selector = _ensure_single_source(source, filesystem) + + options = FileSystemFactoryOptions( + partitioning=partitioning, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + selector_ignore_prefixes=selector_ignore_prefixes + ) + factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options) + + return factory.finish(schema) + + +def _in_memory_dataset(source, schema=None, **kwargs): + if any(v is not None for v in kwargs.values()): + raise ValueError( + "For in-memory datasets, you cannot pass any additional arguments") + return InMemoryDataset(source, schema) + + +def _union_dataset(children, schema=None, **kwargs): + if any(v is not None for v in kwargs.values()): + raise ValueError( + "When passing a list of Datasets, you cannot pass any additional " + "arguments" + ) + + if schema is None: + # unify the children datasets' schemas + schema = pa.unify_schemas([child.schema for child in children]) + + # create datasets with the requested schema + children = [child.replace_schema(schema) for child in children] + + return UnionDataset(schema, children) + + +def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None, + partitioning=None, partition_base_dir=None): + """ + Create a FileSystemDataset from a `_metadata` file created via + `pyarrrow.parquet.write_metadata`. + + Parameters + ---------- + metadata_path : path, + Path pointing to a single file parquet metadata file + schema : Schema, optional + Optionally provide the Schema for the Dataset, in which case it will + not be inferred from the source. + filesystem : FileSystem or URI string, default None + If a single path is given as source and filesystem is None, then the + filesystem will be inferred from the path. + If an URI string is passed, then a filesystem object is constructed + using the URI's optional path component as a directory prefix. See the + examples below. + Note that the URIs on Windows must follow 'file:///C:...' or + 'file:/C:...' patterns. + format : ParquetFileFormat + An instance of a ParquetFileFormat if special options needs to be + passed. + partitioning : Partitioning, PartitioningFactory, str, list of str + The partitioning scheme specified with the ``partitioning()`` + function. A flavor string can be used as shortcut, and with a list of + field names a DirectionaryPartitioning will be inferred. + partition_base_dir : str, optional + For the purposes of applying the partitioning, paths will be + stripped of the partition_base_dir. Files not matching the + partition_base_dir prefix will be skipped for partitioning discovery. + The ignored files will still be part of the Dataset, but will not + have partition information. 
+ + Returns + ------- + FileSystemDataset + """ + from pyarrow.fs import LocalFileSystem, _ensure_filesystem + + if format is None: + format = ParquetFileFormat() + elif not isinstance(format, ParquetFileFormat): + raise ValueError("format argument must be a ParquetFileFormat") + + if filesystem is None: + filesystem = LocalFileSystem() + else: + filesystem = _ensure_filesystem(filesystem) + + metadata_path = filesystem.normalize_path(_stringify_path(metadata_path)) + options = ParquetFactoryOptions( + partition_base_dir=partition_base_dir, + partitioning=_ensure_partitioning(partitioning) + ) + + factory = ParquetDatasetFactory( + metadata_path, filesystem, format, options=options) + return factory.finish(schema) + + +def dataset(source, schema=None, format=None, filesystem=None, + partitioning=None, partition_base_dir=None, + exclude_invalid_files=None, ignore_prefixes=None): + """ + Open a dataset. + + Datasets provides functionality to efficiently work with tabular, + potentially larger than memory and multi-file dataset. + + - A unified interface for different sources, like Parquet and Feather + - Discovery of sources (crawling directories, handle directory-based + partitioned datasets, basic schema normalization) + - Optimized reading with predicate pushdown (filtering rows), projection + (selecting columns), parallel reading or fine-grained managing of tasks. + + Note that this is the high-level API, to have more control over the dataset + construction use the low-level API classes (FileSystemDataset, + FilesystemDatasetFactory, etc.) + + Parameters + ---------- + source : path, list of paths, dataset, list of datasets, (list of) batches\ +or tables, iterable of batches, RecordBatchReader, or URI + Path pointing to a single file: + Open a FileSystemDataset from a single file. + Path pointing to a directory: + The directory gets discovered recursively according to a + partitioning scheme if given. + List of file paths: + Create a FileSystemDataset from explicitly given files. The files + must be located on the same filesystem given by the filesystem + parameter. + Note that in contrary of construction from a single file, passing + URIs as paths is not allowed. + List of datasets: + A nested UnionDataset gets constructed, it allows arbitrary + composition of other datasets. + Note that additional keyword arguments are not allowed. + (List of) batches or tables, iterable of batches, or RecordBatchReader: + Create an InMemoryDataset. If an iterable or empty list is given, + a schema must also be given. If an iterable or RecordBatchReader + is given, the resulting dataset can only be scanned once; further + attempts will raise an error. + schema : Schema, optional + Optionally provide the Schema for the Dataset, in which case it will + not be inferred from the source. + format : FileFormat or str + Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For + Feather, only version 2 files are supported. + filesystem : FileSystem or URI string, default None + If a single path is given as source and filesystem is None, then the + filesystem will be inferred from the path. + If an URI string is passed, then a filesystem object is constructed + using the URI's optional path component as a directory prefix. See the + examples below. + Note that the URIs on Windows must follow 'file:///C:...' or + 'file:/C:...' patterns. + partitioning : Partitioning, PartitioningFactory, str, list of str + The partitioning scheme specified with the ``partitioning()`` + function. 
A flavor string can be used as shortcut, and with a list of + field names a DirectionaryPartitioning will be inferred. + partition_base_dir : str, optional + For the purposes of applying the partitioning, paths will be + stripped of the partition_base_dir. Files not matching the + partition_base_dir prefix will be skipped for partitioning discovery. + The ignored files will still be part of the Dataset, but will not + have partition information. + exclude_invalid_files : bool, optional (default True) + If True, invalid files will be excluded (file format specific check). + This will incur IO for each files in a serial and single threaded + fashion. Disabling this feature will skip the IO, but unsupported + files may be present in the Dataset (resulting in an error at scan + time). + ignore_prefixes : list, optional + Files matching any of these prefixes will be ignored by the + discovery process. This is matched to the basename of a path. + By default this is ['.', '_']. + Note that discovery happens only if a directory is passed as source. + + Returns + ------- + dataset : Dataset + Either a FileSystemDataset or a UnionDataset depending on the source + parameter. + + Examples + -------- + Opening a single file: + + >>> dataset("path/to/file.parquet", format="parquet") + + Opening a single file with an explicit schema: + + >>> dataset("path/to/file.parquet", schema=myschema, format="parquet") + + Opening a dataset for a single directory: + + >>> dataset("path/to/nyc-taxi/", format="parquet") + >>> dataset("s3://mybucket/nyc-taxi/", format="parquet") + + Opening a dataset from a list of relatives local paths: + + >>> dataset([ + ... "part0/data.parquet", + ... "part1/data.parquet", + ... "part3/data.parquet", + ... ], format='parquet') + + With filesystem provided: + + >>> paths = [ + ... 'part0/data.parquet', + ... 'part1/data.parquet', + ... 'part3/data.parquet', + ... ] + >>> dataset(paths, filesystem='file:///directory/prefix, format='parquet') + + Which is equivalent with: + + >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem()) + >>> dataset(paths, filesystem=fs, format='parquet') + + With a remote filesystem URI: + + >>> paths = [ + ... 'nested/directory/part0/data.parquet', + ... 'nested/directory/part1/data.parquet', + ... 'nested/directory/part3/data.parquet', + ... ] + >>> dataset(paths, filesystem='s3://bucket/', format='parquet') + + Similarly to the local example, the directory prefix may be included in the + filesystem URI: + + >>> dataset(paths, filesystem='s3://bucket/nested/directory', + ... format='parquet') + + Construction of a nested dataset: + + >>> dataset([ + ... dataset("s3://old-taxi-data", format="parquet"), + ... dataset("local/path/to/data", format="ipc") + ... 
]) + """ + # collect the keyword arguments for later reuse + kwargs = dict( + schema=schema, + filesystem=filesystem, + partitioning=partitioning, + format=format, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + selector_ignore_prefixes=ignore_prefixes + ) + + if _is_path_like(source): + return _filesystem_dataset(source, **kwargs) + elif isinstance(source, (tuple, list)): + if all(_is_path_like(elem) for elem in source): + return _filesystem_dataset(source, **kwargs) + elif all(isinstance(elem, Dataset) for elem in source): + return _union_dataset(source, **kwargs) + elif all(isinstance(elem, (pa.RecordBatch, pa.Table)) + for elem in source): + return _in_memory_dataset(source, **kwargs) + else: + unique_types = set(type(elem).__name__ for elem in source) + type_names = ', '.join('{}'.format(t) for t in unique_types) + raise TypeError( + 'Expected a list of path-like or dataset objects, or a list ' + 'of batches or tables. The given list contains the following ' + 'types: {}'.format(type_names) + ) + elif isinstance(source, (pa.RecordBatch, pa.Table)): + return _in_memory_dataset(source, **kwargs) + else: + raise TypeError( + 'Expected a path-like, list of path-likes or a list of Datasets ' + 'instead of the given type: {}'.format(type(source).__name__) + ) + + +def _ensure_write_partitioning(part, schema, flavor): + if isinstance(part, PartitioningFactory): + raise ValueError("A PartitioningFactory cannot be used. " + "Did you call the partitioning function " + "without supplying a schema?") + + if isinstance(part, Partitioning) and flavor: + raise ValueError( + "Providing a partitioning_flavor with " + "a Partitioning object is not supported" + ) + elif isinstance(part, (tuple, list)): + # Name of fields were provided instead of a partitioning object. + # Create a partitioning factory with those field names. + part = partitioning( + schema=pa.schema([schema.field(f) for f in part]), + flavor=flavor + ) + elif part is None: + part = partitioning(pa.schema([]), flavor=flavor) + + if not isinstance(part, Partitioning): + raise ValueError( + "partitioning must be a Partitioning object or " + "a list of column names" + ) + + return part + + +def write_dataset(data, base_dir, basename_template=None, format=None, + partitioning=None, partitioning_flavor=None, schema=None, + filesystem=None, file_options=None, use_threads=True, + max_partitions=None, file_visitor=None, + existing_data_behavior='error'): + """ + Write a dataset to a given format and partitioning. + + Parameters + ---------- + data : Dataset, Table/RecordBatch, RecordBatchReader, list of + Table/RecordBatch, or iterable of RecordBatch + The data to write. This can be a Dataset instance or + in-memory Arrow data. If an iterable is given, the schema must + also be given. + base_dir : str + The root directory where to write the dataset. + basename_template : str, optional + A template string used to generate basenames of written data files. + The token '{i}' will be replaced with an automatically incremented + integer. If not specified, it defaults to + "part-{i}." + format.default_extname + format : FileFormat or str + The format in which to write the dataset. Currently supported: + "parquet", "ipc"/"feather". If a FileSystemDataset is being written + and `format` is not specified, it defaults to the same format as the + specified FileSystemDataset. When writing a Table or RecordBatch, this + keyword is required. 
+ partitioning : Partitioning or list[str], optional + The partitioning scheme specified with the ``partitioning()`` + function or a list of field names. When providing a list of + field names, you can use ``partitioning_flavor`` to drive which + partitioning type should be used. + partitioning_flavor : str, optional + One of the partitioning flavors supported by + ``pyarrow.dataset.partitioning``. If omitted will use the + default of ``partitioning()`` which is directory partitioning. + schema : Schema, optional + filesystem : FileSystem, optional + file_options : FileWriteOptions, optional + FileFormat specific write options, created using the + ``FileFormat.make_write_options()`` function. + use_threads : bool, default True + Write files in parallel. If enabled, then maximum parallelism will be + used determined by the number of available CPU cores. + max_partitions : int, default 1024 + Maximum number of partitions any batch may be written into. + file_visitor : Function + If set, this function will be called with a WrittenFile instance + for each file created during the call. This object will have both + a path attribute and a metadata attribute. + + The path attribute will be a string containing the path to + the created file. + + The metadata attribute will be the parquet metadata of the file. + This metadata will have the file path attribute set and can be used + to build a _metadata file. The metadata attribute will be None if + the format is not parquet. + + Example visitor which simple collects the filenames created:: + + visited_paths = [] + + def file_visitor(written_file): + visited_paths.append(written_file.path) + existing_data_behavior : 'error' | 'overwrite_or_ignore' | \ +'delete_matching' + Controls how the dataset will handle data that already exists in + the destination. The default behavior ('error') is to raise an error + if any data exists in the destination. + + 'overwrite_or_ignore' will ignore any existing data and will + overwrite files with the same name as an output file. Other + existing files will be ignored. This behavior, in combination + with a unique basename_template for each write, will allow for + an append workflow. + + 'delete_matching' is useful when you are writing a partitioned + dataset. The first time each partition directory is encountered + the entire directory will be deleted. This allows you to overwrite + old partitions completely. + """ + from pyarrow.fs import _resolve_filesystem_and_path + + if isinstance(data, (list, tuple)): + schema = schema or data[0].schema + data = InMemoryDataset(data, schema=schema) + elif isinstance(data, (pa.RecordBatch, pa.Table)): + schema = schema or data.schema + data = InMemoryDataset(data, schema=schema) + elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data): + data = Scanner.from_batches(data, schema=schema, use_async=True) + schema = None + elif not isinstance(data, (Dataset, Scanner)): + raise ValueError( + "Only Dataset, Scanner, Table/RecordBatch, RecordBatchReader, " + "a list of Tables/RecordBatches, or iterable of batches are " + "supported." 
+        )
+
+    if format is None and isinstance(data, FileSystemDataset):
+        format = data.format
+    else:
+        format = _ensure_format(format)
+
+    if file_options is None:
+        file_options = format.make_write_options()
+
+    if format != file_options.format:
+        raise TypeError("Supplied FileWriteOptions have format {}, "
+                        "which doesn't match supplied FileFormat {}".format(
+                            format, file_options))
+
+    if basename_template is None:
+        basename_template = "part-{i}." + format.default_extname
+
+    if max_partitions is None:
+        max_partitions = 1024
+
+    # at this point data is a Scanner or a Dataset, anything else
+    # was converted to one of those two. So we can grab the schema
+    # to build the partitioning object from Dataset.
+    if isinstance(data, Scanner):
+        partitioning_schema = data.dataset_schema
+    else:
+        partitioning_schema = data.schema
+    partitioning = _ensure_write_partitioning(partitioning,
+                                              schema=partitioning_schema,
+                                              flavor=partitioning_flavor)
+
+    filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)
+
+    if isinstance(data, Dataset):
+        scanner = data.scanner(use_threads=use_threads, use_async=True)
+    else:
+        # scanner was passed directly by the user, in which case a schema
+        # cannot be passed
+        if schema is not None:
+            raise ValueError("Cannot specify a schema when writing a Scanner")
+        scanner = data
+
+    _filesystemdataset_write(
+        scanner, base_dir, basename_template, filesystem, partitioning,
+        file_options, max_partitions, file_visitor, existing_data_behavior
+    )
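As a usage note on the module added above: `dataset()`, `partitioning()` and `write_dataset()` compose into a read, repartition, write round trip. The sketch below is illustrative only and not part of the commit; the input path and partition columns are assumptions, and it relies solely on functions defined in the file shown here.

```python
import pyarrow.dataset as ds

# Assumed input layout: /data/input/year=2009/month=11/part-0.parquet, ...
src = ds.dataset("/data/input/", format="parquet", partitioning="hive")

# Re-write the data, again Hive-partitioned, collecting the paths of the
# files produced via the file_visitor hook documented in write_dataset().
written = []
ds.write_dataset(
    src,
    "/data/output/",
    format="parquet",
    partitioning=["year", "month"],   # field names; types come from src.schema
    partitioning_flavor="hive",
    file_visitor=lambda f: written.append(f.path),
    existing_data_behavior="overwrite_or_ignore",
)
```

Passing field names plus `partitioning_flavor` lets `_ensure_write_partitioning` build the partitioning from the dataset schema; handing it the bare factory returned by `partitioning(flavor="hive")` would instead raise the "A PartitioningFactory cannot be used" error shown in the code above.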