.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. currentmodule:: pyarrow.dataset

.. _dataset:

Tabular Datasets
================

.. warning::

    The ``pyarrow.dataset`` module is experimental (specifically the classes),
    and a stable API is not yet guaranteed.

The ``pyarrow.dataset`` module provides functionality to efficiently work with
tabular, potentially larger than memory, and multi-file datasets. This includes:

* A unified interface that supports different sources and file formats
  (Parquet, ORC, Feather / Arrow IPC, and CSV files) and different file systems
  (local, cloud).
* Discovery of sources (crawling directories, handling directory-based
  partitioned datasets, basic schema normalization, ...).
* Optimized reading with predicate pushdown (filtering rows), projection
  (selecting and deriving columns), and optionally parallel reading.

Currently, only Parquet, ORC, Feather / Arrow IPC, and CSV files are
supported. The goal is to expand this in the future to other file formats and
data sources (e.g. database connections).

For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for
reading Parquet datasets: ``pyarrow.dataset``'s goal is similar but not specific
to the Parquet format and not tied to Python: the same datasets API is exposed
in the R bindings for Arrow. In addition, ``pyarrow.dataset`` offers improved
performance and new features (e.g. filtering within files rather than only on
partition keys).


Reading Datasets
----------------

.. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts:

For the examples below, let's create a small dataset consisting
of a directory with two parquet files:

.. ipython:: python

    import tempfile
    import pathlib
    import pyarrow as pa
    import pyarrow.parquet as pq
    import numpy as np

    base = pathlib.Path(tempfile.gettempdir())
    (base / "parquet_dataset").mkdir(exist_ok=True)

    # creating an Arrow Table
    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

    # writing it into two parquet files
    pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
    pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

Dataset discovery
~~~~~~~~~~~~~~~~~

A :class:`Dataset` object can be created with the :func:`dataset` function. We
can pass it the path to the directory containing the data files:

.. ipython:: python

    import pyarrow.dataset as ds
    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset

In addition to searching a base directory, :func:`dataset` accepts a path to a
single file or a list of file paths.
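
As a minimal sketch, the two files created above could also be listed
explicitly, which is equivalent to the directory-based discovery:

.. code-block:: python

    # equivalent to discovering the directory, but listing the files explicitly
    dataset = ds.dataset(
        [str(base / "parquet_dataset" / "data1.parquet"),
         str(base / "parquet_dataset" / "data2.parquet")],
        format="parquet",
    )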

Creating a :class:`Dataset` object does not begin reading the data itself. If
needed, it only crawls the directory to find all the files:

.. ipython:: python

    dataset.files

... and infers the dataset's schema (by default from the first file):

.. ipython:: python

    print(dataset.schema.to_string(show_field_metadata=False))

Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion
of it) into a pyarrow Table (note that depending on the size of your dataset
this can require a lot of memory, see below on filtering / iterative loading):

.. ipython:: python

    dataset.to_table()
    # converting to pandas to see the contents of the scanned table
    dataset.to_table().to_pandas()

Reading different file formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples use Parquet files as dataset sources, but the Dataset API
provides a consistent interface across multiple file formats and filesystems.
Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are
supported; more formats are planned in the future.

If we save the table as a Feather file instead of Parquet files:

.. ipython:: python

    import pyarrow.feather as feather

    feather.write_feather(table, base / "data.feather")

...then we can read the Feather file using the same functions, but specifying
``format="feather"``:

.. ipython:: python

    dataset = ds.dataset(base / "data.feather", format="feather")
    dataset.to_table().to_pandas().head()

Customizing file formats
~~~~~~~~~~~~~~~~~~~~~~~~

The format name as a string, like::

    ds.dataset(..., format="parquet")

is shorthand for a default-constructed :class:`ParquetFileFormat`::

    ds.dataset(..., format=ds.ParquetFileFormat())

The :class:`FileFormat` objects can be customized using keywords. For example::

    parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
    ds.dataset(..., format=parquet_format)

will configure column ``"a"`` to be dictionary encoded on scan.
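
Other formats can be customized in the same way. As a sketch (the file
``"data.csv"`` is hypothetical and not created in the examples above), the CSV
parse options could be adjusted through a :class:`CsvFileFormat`:

.. code-block:: python

    import pyarrow.csv

    # read semicolon-delimited CSV files instead of the default comma
    csv_format = ds.CsvFileFormat(
        parse_options=pyarrow.csv.ParseOptions(delimiter=";"))
    dataset = ds.dataset("data.csv", format=csv_format)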

Filtering data
--------------

To avoid reading all the data when only a subset is needed, the ``columns`` and
``filter`` keywords can be used.

The ``columns`` keyword can be used to only read the specified columns:

.. ipython:: python

    dataset = ds.dataset(base / "parquet_dataset", format="parquet")
    dataset.to_table(columns=['a', 'b']).to_pandas()

With the ``filter`` keyword, rows which do not match the filter predicate will
not be included in the returned table. The keyword expects a boolean
:class:`Expression` referencing at least one of the columns:

.. ipython:: python

    dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
    dataset.to_table(filter=ds.field('c') == 2).to_pandas()

The easiest way to construct those :class:`Expression` objects is by using the
:func:`field` helper function. Any column - not just partition columns - can be
referenced using the :func:`field` function (which creates a
:class:`FieldExpression`). Operator overloads are provided to compose filters,
including the comparisons (equal, greater/less than, etc.), set membership
testing, and boolean combinations (``&``, ``|``, ``~``):

.. ipython:: python

    ds.field('a') != 3
    ds.field('a').isin([1, 2, 3])
    (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)

Note that :class:`Expression` objects can **not** be combined by the Python
logical operators ``and``, ``or`` and ``not``.

Projecting columns
------------------

The ``columns`` keyword can be used to read a subset of the columns of the
dataset by passing it a list of column names. The keyword can also be used
for more complex projections in combination with expressions.

In this case, we pass it a dictionary with the keys being the resulting
column names and the values the expression that is used to construct the column
values:

.. ipython:: python

    projection = {
        "a_renamed": ds.field("a"),
        "b_as_float32": ds.field("b").cast("float32"),
        "c_1": ds.field("c") == 1,
    }
    dataset.to_table(columns=projection).to_pandas().head()

The dictionary also determines the column selection (only the keys in the
dictionary will be present as columns in the resulting table). If you want
to include a derived column in *addition* to the existing columns, you can
build up the dictionary from the dataset schema:

.. ipython:: python

    projection = {col: ds.field(col) for col in dataset.schema.names}
    projection.update({"b_large": ds.field("b") > 1})
    dataset.to_table(columns=projection).to_pandas().head()


Reading partitioned data
------------------------

Above, a dataset consisting of a flat directory with files was shown. However, a
dataset can exploit a nested directory structure defining a partitioned dataset,
where the sub-directory names hold information about which subset of the data is
stored in that directory.

For example, a dataset partitioned by year and month may look like this on disk:

.. code-block:: text

   dataset_name/
     year=2007/
       month=01/
          data0.parquet
          data1.parquet
          ...
       month=02/
          data0.parquet
          data1.parquet
          ...
       month=03/
       ...
     year=2008/
       month=01/
       ...
     ...

The above partitioning scheme uses "/key=value/" directory names, as found
in Apache Hive.

Let's create a small partitioned dataset. The :func:`~pyarrow.parquet.write_to_dataset`
function can write such hive-like partitioned datasets.

.. ipython:: python

    table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
                      'part': ['a'] * 5 + ['b'] * 5})
    pq.write_to_dataset(table, str(base / "parquet_dataset_partitioned"),
                        partition_cols=['part'])

The above created a directory with two subdirectories ("part=a" and "part=b"),
and the Parquet files written in those directories no longer include the "part"
column.

Reading this dataset with :func:`dataset`, we now specify that the dataset
should use a hive-like partitioning scheme with the ``partitioning`` keyword:

.. ipython:: python

    dataset = ds.dataset(str(base / "parquet_dataset_partitioned"), format="parquet",
                         partitioning="hive")
    dataset.files

Although the partition fields are not included in the actual Parquet files,
they will be added back to the resulting table when scanning this dataset:

.. ipython:: python

    dataset.to_table().to_pandas().head(3)

We can now filter on the partition keys, which avoids loading files
altogether if they do not match the filter:

.. ipython:: python

    dataset.to_table(filter=ds.field("part") == "b").to_pandas()


Different partitioning schemes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above example uses a hive-like directory scheme, such as
"/year=2009/month=11/day=15". We specified this by passing the
``partitioning="hive"`` keyword. In this case, the types of the partition keys
are inferred from the file paths.

It is also possible to explicitly define the schema of the partition keys
using the :func:`partitioning` function. For example:

.. code-block:: python

    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
        flavor="hive"
    )
    dataset = ds.dataset(..., partitioning=part)

"Directory partitioning" is also supported, where the segments in the file path
represent the values of the partition keys without including the name (the
field names are implied by each segment's index). For example, given field names
"year", "month", and "day", one path might be "/2019/11/15".

Since the names are not included in the file paths, these must be specified
when constructing a directory partitioning:

.. code-block:: python

    part = ds.partitioning(field_names=["year", "month", "day"])

Directory partitioning also supports providing a full schema rather than inferring
types from file paths.
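
For example, a minimal sketch mirroring the hive-flavored schema above, but
for directory partitioning (no ``flavor`` argument):

.. code-block:: python

    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())])
    )
    dataset = ds.dataset(..., partitioning=part)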

Reading from cloud storage
--------------------------

In addition to local files, pyarrow also supports reading from cloud storage.
Currently, :class:`HDFS <pyarrow.fs.HadoopFileSystem>` and
:class:`Amazon S3-compatible storage <pyarrow.fs.S3FileSystem>` are supported.

When passing a file URI, the file system will be inferred. For example,
specifying an S3 path:

.. code-block:: python

    dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])

Typically, you will want to customize the connection parameters, in which case
a file system object can be created and passed to the ``filesystem`` keyword:

.. code-block:: python

    from pyarrow import fs

    s3 = fs.S3FileSystem(region="us-east-2")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=s3,
                         partitioning=["year", "month"])

The currently available classes are :class:`~pyarrow.fs.S3FileSystem` and
:class:`~pyarrow.fs.HadoopFileSystem`. See the :ref:`filesystem` docs for more
details.


Reading from MinIO
------------------

In addition to cloud storage, pyarrow also supports reading from a
`MinIO <https://github.com/minio/minio>`_ object storage instance emulating S3
APIs. Paired with `toxiproxy <https://github.com/shopify/toxiproxy>`_, this is
useful for testing or benchmarking.

.. code-block:: python

    from pyarrow import fs

    # By default, MinIO will listen for unencrypted HTTP traffic.
    minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
    dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=minio,
                         partitioning=["year", "month"])


Working with Parquet Datasets
-----------------------------

While the Datasets API provides a unified interface to different file formats,
some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a ``_metadata`` file
with partitioned datasets which includes information about the schema and the
row group metadata of the full dataset. Using such a file makes creating the
Parquet Dataset more efficient, since it avoids inferring the schema and
crawling the directories for all Parquet files (this is especially the case
for filesystems where accessing files is expensive). The :func:`parquet_dataset`
function allows us to create a Dataset from a partitioned dataset with a
``_metadata`` file:

.. code-block:: python

    dataset = ds.parquet_dataset("/path/to/dir/_metadata")

By default, the constructed :class:`Dataset` object for Parquet datasets maps
each fragment to a single Parquet file. If you want fragments mapping to each
row group of a Parquet file, you can use the ``split_by_row_group()`` method of
the fragments:

.. code-block:: python

    fragments = list(dataset.get_fragments())
    fragments[0].split_by_row_group()

This method returns a list of new Fragments mapping to each row group of
the original Fragment (Parquet file). Both ``get_fragments()`` and
``split_by_row_group()`` accept an optional filter expression to get a
filtered list of fragments.
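
For instance, a sketch of restricting the fragments to a single partition
(assuming a hive-partitioned dataset with a ``part`` key, like the one created
earlier):

.. code-block:: python

    # only keep the fragments whose partition expression matches the filter
    fragments = list(dataset.get_fragments(filter=ds.field("part") == "b"))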

Manual specification of the Dataset
-----------------------------------

The :func:`dataset` function allows easy creation of a Dataset viewing a directory,
crawling all subdirectories for files and partitioning information. However,
sometimes discovery is not required and the dataset's files and partitions
are already known (for example, when this information is stored in metadata).
In this case it is possible to create a Dataset explicitly without any
automatic discovery or inference.

For the example here, we are going to use a dataset where the file names contain
additional partitioning information:

.. ipython:: python

    # creating a dummy dataset: directory with two files
    table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})
    (base / "parquet_dataset_manual").mkdir(exist_ok=True)
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")
    pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

To create a Dataset from a list of files, we need to specify the paths, schema,
format, filesystem, and partition expressions manually:

.. ipython:: python

    from pyarrow import fs

    schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

    dataset = ds.FileSystemDataset.from_paths(
        ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
        filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
        partitions=[ds.field('year') == 2018, ds.field('year') == 2019])

Since we specified the "partition expressions" for our files, this information
is materialized as columns when reading the data and can be used for filtering:

.. ipython:: python

    dataset.to_table().to_pandas()
    dataset.to_table(filter=ds.field('year') == 2019).to_pandas()

Another benefit of manually listing the files is that the order of the files
controls the order of the data. When performing an ordered read (or a read to
a table), the rows returned will match the order of the files given. This
only applies when the dataset is constructed with a list of files. There
are no order guarantees given when the files are instead discovered by scanning
a directory.

Iterative (out of core or streaming) reads
------------------------------------------

The previous examples have demonstrated how to read the data into a table using
:func:`~Dataset.to_table`. This is useful if the dataset is small or there is
only a small amount of data that needs to be read. The dataset API contains
additional methods to read and process large amounts of data in a streaming
fashion.

The easiest way to do this is to use the method :meth:`Dataset.to_batches`. This
method returns an iterator of record batches. For example, we can use this
method to calculate the average of a column without loading the entire column
into memory:

.. ipython:: python

    import pyarrow.compute as pc

    col2_sum = 0
    count = 0
    for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
        col2_sum += pc.sum(batch.column("col2")).as_py()
        count += batch.num_rows
    col2_mean = col2_sum / count

Customizing the batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~

An iterative read of a dataset is often called a "scan" of the dataset, and pyarrow
uses an object called a :class:`Scanner` to do this. A Scanner is created for you
automatically by the :func:`~Dataset.to_table` and :func:`~Dataset.to_batches` methods
of the dataset. Any arguments you pass to these methods will be passed on to the
Scanner constructor.

One of those parameters is the ``batch_size``. This controls the maximum size of the
batches returned by the scanner. Batches can still be smaller than the ``batch_size``
if the dataset consists of small files or those files themselves consist of small
row groups. For example, a Parquet file with 10,000 rows per row group will yield
batches with, at most, 10,000 rows unless the ``batch_size`` is set to a smaller value.

The default batch size is one million rows, which is typically a good default, but
you may want to customize it if you are reading a large number of columns.
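
For example, a sketch of scanning with smaller batches:

.. code-block:: python

    # request batches of at most 10,000 rows; the actual batches may be smaller
    for batch in dataset.to_batches(batch_size=10_000):
        ...  # process each batch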

Writing Datasets
----------------

The dataset API also simplifies writing data to a dataset using :func:`write_dataset`.
This can be useful when you want to partition your data or you need to write a large
amount of data. A basic dataset write is similar to writing a table, except that you
specify a directory instead of a filename.

.. ipython:: python

    base = pathlib.Path(tempfile.gettempdir())
    dataset_root = base / "sample_dataset"
    dataset_root.mkdir(exist_ok=True)

    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
    ds.write_dataset(table, dataset_root, format="parquet")

The above example will create a single file named part-0.parquet in our sample_dataset
directory.

.. warning::

    If you run the example again it will replace the existing part-0.parquet file.
    Appending files to an existing dataset requires specifying a new
    ``basename_template`` for each call to ``ds.write_dataset``
    to avoid overwriting existing files.
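
For example, a sketch of a second write that lands alongside the first file
instead of replacing it (the template name is only an illustration):

.. code-block:: python

    # "{i}" is replaced with an automatically incremented integer for each file
    # written; a different prefix avoids clobbering the existing part-0.parquet
    ds.write_dataset(table, dataset_root, format="parquet",
                     basename_template="batch2-part-{i}.parquet")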

Writing partitioned data
~~~~~~~~~~~~~~~~~~~~~~~~

A partitioning object can be used to specify how your output data should be partitioned.
This uses the same kind of partitioning objects we used for reading datasets. To write
our above data out to a partitioned directory we only need to specify how we want the
dataset to be partitioned. For example:

.. ipython:: python

    part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)

This will create two files. Half our data will be in the dataset_root/c=1 directory and
the other half will be in the dataset_root/c=2 directory.

Writing large amounts of data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples wrote data from a table. If you are writing a large amount of data
you may not be able to load everything into a single in-memory table. Fortunately, the
:func:`write_dataset` function also accepts an iterable of record batches. This makes it
really simple, for example, to repartition a large dataset without loading the entire
dataset into memory:

.. ipython:: python

    old_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor="hive"
    )
    new_part = ds.partitioning(
        pa.schema([("c", pa.int16())]), flavor=None
    )
    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
    new_root = base / "repartitioned_dataset"
    # A scanner can act as an iterator of record batches but you could also receive
    # data from the network (e.g. via flight), from your own scanning, or from any
    # other method that yields record batches. In addition, you can pass a dataset
    # into write_dataset directly but this method is useful if you want to customize
    # the scanner (e.g. to filter the input dataset or set a maximum batch size)
    scanner = input_dataset.scanner(use_async=True)

    ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)

After the above example runs, our data will be in the new_root/1 and new_root/2
directories. In this simple example we are not changing the structure of the data
(only the directory naming scheme), but you could also use this mechanism to change
which columns are used to partition the dataset. This is useful when you expect to
query your data in specific ways and you can utilize partitioning to reduce the
amount of data you need to read.

Customizing & inspecting written files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default the dataset API will create files named "part-i.format" where "i" is an
integer generated during the write and "format" is the file format specified in the
write_dataset call. For simple datasets it may be possible to know which files will
be created, but for larger or partitioned datasets it is not so easy. The
``file_visitor`` keyword can be used to supply a visitor that will be called as each
file is created:

.. ipython:: python

    def file_visitor(written_file):
        print(f"path={written_file.path}")
        print(f"metadata={written_file.metadata}")

.. ipython:: python

    ds.write_dataset(table, base / "dataset_visited", format="parquet", partitioning=part,
                     file_visitor=file_visitor)

This will allow you to collect the filenames that belong to the dataset and store them
elsewhere, which can be useful when you want to avoid scanning directories the next time
you need to read the data. It can also be used to generate the _metadata index file used
by other tools such as Dask or Spark to create an index of the dataset.

Configuring format-specific parameters during a write
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common options shared by all formats, there are also options that
are specific to a particular format. For example, to allow truncated timestamps while
writing Parquet files:

.. ipython:: python

    dataset_root = base / "sample_dataset2"
    dataset_root.mkdir(exist_ok=True)

    parquet_format = ds.ParquetFileFormat()
    write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
                     file_options=write_options)
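
As a quick sanity check (not part of the original example), a dataset written
with these options can be read back like any other hive-partitioned dataset:

.. code-block:: python

    # read the freshly written, hive-partitioned dataset back into a table
    ds.dataset(dataset_root, format="parquet",
               partitioning="hive").to_table().to_pandas()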