# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # Arrow file and stream reader/writer classes, and other messaging tools import os import pyarrow as pa from pyarrow.lib import (IpcWriteOptions, ReadStats, WriteStats, # noqa Message, MessageReader, RecordBatchReader, _ReadPandasMixin, MetadataVersion, read_message, read_record_batch, read_schema, read_tensor, write_tensor, get_record_batch_size, get_tensor_size) import pyarrow.lib as lib class RecordBatchStreamReader(lib._RecordBatchStreamReader): """ Reader for the Arrow streaming binary format. Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object. """ def __init__(self, source): self._open(source) _ipc_writer_class_doc = """\ Parameters ---------- sink : str, pyarrow.NativeFile, or file-like Python object Either a file path, or a writable file object. schema : pyarrow.Schema The Arrow schema for data to be written to the file. options : pyarrow.ipc.IpcWriteOptions Options for IPC serialization. If None, default values will be used: the legacy format will not be used unless overridden by setting the environment variable ARROW_PRE_0_15_IPC_FORMAT=1, and the V5 metadata version will be used unless overridden by setting the environment variable ARROW_PRE_1_0_METADATA_VERSION=1. use_legacy_format : bool, default None Deprecated in favor of setting options. Cannot be provided with options. If None, False will be used unless this default is overridden by setting the environment variable ARROW_PRE_0_15_IPC_FORMAT=1""" class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): __doc__ = """Writer for the Arrow streaming binary format {}""".format(_ipc_writer_class_doc) def __init__(self, sink, schema, *, use_legacy_format=None, options=None): options = _get_legacy_format_default(use_legacy_format, options) self._open(sink, schema, options=options) class RecordBatchFileReader(lib._RecordBatchFileReader): """ Class for reading Arrow record batch data from the Arrow binary file format Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object footer_offset : int, default None If the file is embedded in some larger file, this is the byte offset to the very end of the file data """ def __init__(self, source, footer_offset=None): self._open(source, footer_offset=footer_offset) class RecordBatchFileWriter(lib._RecordBatchFileWriter): __doc__ = """Writer to create the Arrow binary file format {}""".format(_ipc_writer_class_doc) def __init__(self, sink, schema, *, use_legacy_format=None, options=None): options = _get_legacy_format_default(use_legacy_format, options) self._open(sink, schema, options=options) def _get_legacy_format_default(use_legacy_format, options): if use_legacy_format is not None and options is not None: raise ValueError( "Can provide at most one of options and use_legacy_format") elif options: if not isinstance(options, IpcWriteOptions): raise TypeError("expected IpcWriteOptions, got {}" .format(type(options))) return options metadata_version = MetadataVersion.V5 if use_legacy_format is None: use_legacy_format = \ bool(int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0'))) if bool(int(os.environ.get('ARROW_PRE_1_0_METADATA_VERSION', '0'))): metadata_version = MetadataVersion.V4 return IpcWriteOptions(use_legacy_format=use_legacy_format, metadata_version=metadata_version) def new_stream(sink, schema, *, use_legacy_format=None, options=None): return RecordBatchStreamWriter(sink, schema, use_legacy_format=use_legacy_format, options=options) new_stream.__doc__ = """\ Create an Arrow columnar IPC stream writer instance {}""".format(_ipc_writer_class_doc) def open_stream(source): """ Create reader for Arrow streaming format. Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object. Returns ------- reader : RecordBatchStreamReader """ return RecordBatchStreamReader(source) def new_file(sink, schema, *, use_legacy_format=None, options=None): return RecordBatchFileWriter(sink, schema, use_legacy_format=use_legacy_format, options=options) new_file.__doc__ = """\ Create an Arrow columnar IPC file writer instance {}""".format(_ipc_writer_class_doc) def open_file(source, footer_offset=None): """ Create reader for Arrow file format. Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object. footer_offset : int, default None If the file is embedded in some larger file, this is the byte offset to the very end of the file data. Returns ------- reader : RecordBatchFileReader """ return RecordBatchFileReader(source, footer_offset=footer_offset) def serialize_pandas(df, *, nthreads=None, preserve_index=None): """ Serialize a pandas DataFrame into a buffer protocol compatible object. Parameters ---------- df : pandas.DataFrame nthreads : int, default None Number of threads to use for conversion to Arrow, default all CPUs. preserve_index : bool, default None The default of None will store the index as a column, except for RangeIndex which is stored as metadata only. If True, always preserve the pandas index data as a column. If False, no index information is saved and the result will have a default RangeIndex. Returns ------- buf : buffer An object compatible with the buffer protocol. """ batch = pa.RecordBatch.from_pandas(df, nthreads=nthreads, preserve_index=preserve_index) sink = pa.BufferOutputStream() with pa.RecordBatchStreamWriter(sink, batch.schema) as writer: writer.write_batch(batch) return sink.getvalue() def deserialize_pandas(buf, *, use_threads=True): """Deserialize a buffer protocol compatible object into a pandas DataFrame. Parameters ---------- buf : buffer An object compatible with the buffer protocol. use_threads : bool, default True Whether to parallelize the conversion using multiple threads. Returns ------- df : pandas.DataFrame """ buffer_reader = pa.BufferReader(buf) with pa.RecordBatchStreamReader(buffer_reader) as reader: table = reader.read_all() return table.to_pandas(use_threads=use_threads)