summaryrefslogtreecommitdiffstats
path: root/src/arrow/python/pyarrow/feather.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/python/pyarrow/feather.py')
-rw-r--r--src/arrow/python/pyarrow/feather.py265
1 files changed, 265 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/feather.py b/src/arrow/python/pyarrow/feather.py
new file mode 100644
index 000000000..2170a93c3
--- /dev/null
+++ b/src/arrow/python/pyarrow/feather.py
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+from pyarrow.pandas_compat import _pandas_api # noqa
+from pyarrow.lib import (Codec, Table, # noqa
+ concat_tables, schema)
+import pyarrow.lib as ext
+from pyarrow import _feather
+from pyarrow._feather import FeatherError # noqa: F401
+from pyarrow.vendored.version import Version
+
+
+def _check_pandas_version():
+ if _pandas_api.loose_version < Version('0.17.0'):
+ raise ImportError("feather requires pandas >= 0.17.0")
+
+
+class FeatherDataset:
+ """
+ Encapsulates details of reading a list of Feather files.
+
+ Parameters
+ ----------
+ path_or_paths : List[str]
+ A list of file names
+ validate_schema : bool, default True
+ Check that individual file schemas are all the same / compatible
+ """
+
+ def __init__(self, path_or_paths, validate_schema=True):
+ self.paths = path_or_paths
+ self.validate_schema = validate_schema
+
+ def read_table(self, columns=None):
+ """
+ Read multiple feather files as a single pyarrow.Table
+
+ Parameters
+ ----------
+ columns : List[str]
+ Names of columns to read from the file
+
+ Returns
+ -------
+ pyarrow.Table
+ Content of the file as a table (of columns)
+ """
+ _fil = read_table(self.paths[0], columns=columns)
+ self._tables = [_fil]
+ self.schema = _fil.schema
+
+ for path in self.paths[1:]:
+ table = read_table(path, columns=columns)
+ if self.validate_schema:
+ self.validate_schemas(path, table)
+ self._tables.append(table)
+ return concat_tables(self._tables)
+
+ def validate_schemas(self, piece, table):
+ if not self.schema.equals(table.schema):
+ raise ValueError('Schema in {!s} was different. \n'
+ '{!s}\n\nvs\n\n{!s}'
+ .format(piece, self.schema,
+ table.schema))
+
+ def read_pandas(self, columns=None, use_threads=True):
+ """
+ Read multiple Parquet files as a single pandas DataFrame
+
+ Parameters
+ ----------
+ columns : List[str]
+ Names of columns to read from the file
+ use_threads : bool, default True
+ Use multiple threads when converting to pandas
+
+ Returns
+ -------
+ pandas.DataFrame
+ Content of the file as a pandas DataFrame (of columns)
+ """
+ _check_pandas_version()
+ return self.read_table(columns=columns).to_pandas(
+ use_threads=use_threads)
+
+
+def check_chunked_overflow(name, col):
+ if col.num_chunks == 1:
+ return
+
+ if col.type in (ext.binary(), ext.string()):
+ raise ValueError("Column '{}' exceeds 2GB maximum capacity of "
+ "a Feather binary column. This restriction may be "
+ "lifted in the future".format(name))
+ else:
+ # TODO(wesm): Not sure when else this might be reached
+ raise ValueError("Column '{}' of type {} was chunked on conversion "
+ "to Arrow and cannot be currently written to "
+ "Feather format".format(name, str(col.type)))
+
+
+_FEATHER_SUPPORTED_CODECS = {'lz4', 'zstd', 'uncompressed'}
+
+
+def write_feather(df, dest, compression=None, compression_level=None,
+ chunksize=None, version=2):
+ """
+ Write a pandas.DataFrame to Feather format.
+
+ Parameters
+ ----------
+ df : pandas.DataFrame or pyarrow.Table
+ Data to write out as Feather format.
+ dest : str
+ Local destination path.
+ compression : string, default None
+ Can be one of {"zstd", "lz4", "uncompressed"}. The default of None uses
+ LZ4 for V2 files if it is available, otherwise uncompressed.
+ compression_level : int, default None
+ Use a compression level particular to the chosen compressor. If None
+ use the default compression level
+ chunksize : int, default None
+ For V2 files, the internal maximum size of Arrow RecordBatch chunks
+ when writing the Arrow IPC file format. None means use the default,
+ which is currently 64K
+ version : int, default 2
+ Feather file version. Version 2 is the current. Version 1 is the more
+ limited legacy format
+ """
+ if _pandas_api.have_pandas:
+ _check_pandas_version()
+ if (_pandas_api.has_sparse and
+ isinstance(df, _pandas_api.pd.SparseDataFrame)):
+ df = df.to_dense()
+
+ if _pandas_api.is_data_frame(df):
+ table = Table.from_pandas(df, preserve_index=False)
+
+ if version == 1:
+ # Version 1 does not chunking
+ for i, name in enumerate(table.schema.names):
+ col = table[i]
+ check_chunked_overflow(name, col)
+ else:
+ table = df
+
+ if version == 1:
+ if len(table.column_names) > len(set(table.column_names)):
+ raise ValueError("cannot serialize duplicate column names")
+
+ if compression is not None:
+ raise ValueError("Feather V1 files do not support compression "
+ "option")
+
+ if chunksize is not None:
+ raise ValueError("Feather V1 files do not support chunksize "
+ "option")
+ else:
+ if compression is None and Codec.is_available('lz4_frame'):
+ compression = 'lz4'
+ elif (compression is not None and
+ compression not in _FEATHER_SUPPORTED_CODECS):
+ raise ValueError('compression="{}" not supported, must be '
+ 'one of {}'.format(compression,
+ _FEATHER_SUPPORTED_CODECS))
+
+ try:
+ _feather.write_feather(table, dest, compression=compression,
+ compression_level=compression_level,
+ chunksize=chunksize, version=version)
+ except Exception:
+ if isinstance(dest, str):
+ try:
+ os.remove(dest)
+ except os.error:
+ pass
+ raise
+
+
+def read_feather(source, columns=None, use_threads=True, memory_map=True):
+ """
+ Read a pandas.DataFrame from Feather format. To read as pyarrow.Table use
+ feather.read_table.
+
+ Parameters
+ ----------
+ source : str file path, or file-like object
+ columns : sequence, optional
+ Only read a specific set of columns. If not provided, all columns are
+ read.
+ use_threads : bool, default True
+ Whether to parallelize reading using multiple threads. If false the
+ restriction is only used in the conversion to Pandas and not in the
+ reading from Feather format.
+ memory_map : boolean, default True
+ Use memory mapping when opening file on disk
+
+ Returns
+ -------
+ df : pandas.DataFrame
+ """
+ _check_pandas_version()
+ return (read_table(source, columns=columns, memory_map=memory_map)
+ .to_pandas(use_threads=use_threads))
+
+
+def read_table(source, columns=None, memory_map=True):
+ """
+ Read a pyarrow.Table from Feather format
+
+ Parameters
+ ----------
+ source : str file path, or file-like object
+ columns : sequence, optional
+ Only read a specific set of columns. If not provided, all columns are
+ read.
+ memory_map : boolean, default True
+ Use memory mapping when opening file on disk
+
+ Returns
+ -------
+ table : pyarrow.Table
+ """
+ reader = _feather.FeatherReader(source, use_memory_map=memory_map)
+
+ if columns is None:
+ return reader.read()
+
+ column_types = [type(column) for column in columns]
+ if all(map(lambda t: t == int, column_types)):
+ table = reader.read_indices(columns)
+ elif all(map(lambda t: t == str, column_types)):
+ table = reader.read_names(columns)
+ else:
+ column_type_names = [t.__name__ for t in column_types]
+ raise TypeError("Columns must be indices or names. "
+ "Got columns {} of types {}"
+ .format(columns, column_type_names))
+
+ # Feather v1 already respects the column selection
+ if reader.version < 3:
+ return table
+ # Feather v2 reads with sorted / deduplicated selection
+ elif sorted(set(columns)) == columns:
+ return table
+ else:
+ # follow exact order / selection of names
+ return table.select(columns)