# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from collections import namedtuple import warnings cpdef enum MetadataVersion: V1 = CMetadataVersion_V1 V2 = CMetadataVersion_V2 V3 = CMetadataVersion_V3 V4 = CMetadataVersion_V4 V5 = CMetadataVersion_V5 cdef object _wrap_metadata_version(CMetadataVersion version): return MetadataVersion( version) cdef CMetadataVersion _unwrap_metadata_version( MetadataVersion version) except *: if version == MetadataVersion.V1: return CMetadataVersion_V1 elif version == MetadataVersion.V2: return CMetadataVersion_V2 elif version == MetadataVersion.V3: return CMetadataVersion_V3 elif version == MetadataVersion.V4: return CMetadataVersion_V4 elif version == MetadataVersion.V5: return CMetadataVersion_V5 raise ValueError("Not a metadata version: " + repr(version)) _WriteStats = namedtuple( 'WriteStats', ('num_messages', 'num_record_batches', 'num_dictionary_batches', 'num_dictionary_deltas', 'num_replaced_dictionaries')) class WriteStats(_WriteStats): """IPC write statistics Parameters ---------- num_messages : number of messages. num_record_batches : number of record batches. num_dictionary_batches : number of dictionary batches. num_dictionary_deltas : delta of dictionaries. num_replaced_dictionaries : number of replaced dictionaries. """ __slots__ = () @staticmethod cdef _wrap_write_stats(CIpcWriteStats c): return WriteStats(c.num_messages, c.num_record_batches, c.num_dictionary_batches, c.num_dictionary_deltas, c.num_replaced_dictionaries) _ReadStats = namedtuple( 'ReadStats', ('num_messages', 'num_record_batches', 'num_dictionary_batches', 'num_dictionary_deltas', 'num_replaced_dictionaries')) class ReadStats(_ReadStats): """IPC read statistics Parameters ---------- num_messages : number of messages. num_record_batches : number of record batches. num_dictionary_batches : number of dictionary batches. num_dictionary_deltas : delta of dictionaries. num_replaced_dictionaries : number of replaced dictionaries. """ __slots__ = () @staticmethod cdef _wrap_read_stats(CIpcReadStats c): return ReadStats(c.num_messages, c.num_record_batches, c.num_dictionary_batches, c.num_dictionary_deltas, c.num_replaced_dictionaries) cdef class IpcWriteOptions(_Weakrefable): """ Serialization options for the IPC format. Parameters ---------- metadata_version : MetadataVersion, default MetadataVersion.V5 The metadata version to write. V5 is the current and latest, V4 is the pre-1.0 metadata version (with incompatible Union layout). allow_64bit : bool, default False If true, allow field lengths that don't fit in a signed 32-bit int. use_legacy_format : bool, default False Whether to use the pre-Arrow 0.15 IPC format. compression : str, Codec, or None compression codec to use for record batch buffers. If None then batch buffers will be uncompressed. Must be "lz4", "zstd" or None. To specify a compression_level use `pyarrow.Codec` use_threads : bool Whether to use the global CPU thread pool to parallelize any computational tasks like compression. emit_dictionary_deltas : bool Whether to emit dictionary deltas. Default is false for maximum stream compatibility. """ __slots__ = () # cdef block is in lib.pxd def __init__(self, *, metadata_version=MetadataVersion.V5, bint allow_64bit=False, use_legacy_format=False, compression=None, bint use_threads=True, bint emit_dictionary_deltas=False): self.c_options = CIpcWriteOptions.Defaults() self.allow_64bit = allow_64bit self.use_legacy_format = use_legacy_format self.metadata_version = metadata_version if compression is not None: self.compression = compression self.use_threads = use_threads self.emit_dictionary_deltas = emit_dictionary_deltas @property def allow_64bit(self): return self.c_options.allow_64bit @allow_64bit.setter def allow_64bit(self, bint value): self.c_options.allow_64bit = value @property def use_legacy_format(self): return self.c_options.write_legacy_ipc_format @use_legacy_format.setter def use_legacy_format(self, bint value): self.c_options.write_legacy_ipc_format = value @property def metadata_version(self): return _wrap_metadata_version(self.c_options.metadata_version) @metadata_version.setter def metadata_version(self, value): self.c_options.metadata_version = _unwrap_metadata_version(value) @property def compression(self): if self.c_options.codec == nullptr: return None else: return frombytes(self.c_options.codec.get().name()) @compression.setter def compression(self, value): if value is None: self.c_options.codec.reset() elif isinstance(value, str): self.c_options.codec = shared_ptr[CCodec](GetResultValue( CCodec.Create(_ensure_compression(value))).release()) elif isinstance(value, Codec): self.c_options.codec = (value).wrapped else: raise TypeError( "Property `compression` must be None, str, or pyarrow.Codec") @property def use_threads(self): return self.c_options.use_threads @use_threads.setter def use_threads(self, bint value): self.c_options.use_threads = value @property def emit_dictionary_deltas(self): return self.c_options.emit_dictionary_deltas @emit_dictionary_deltas.setter def emit_dictionary_deltas(self, bint value): self.c_options.emit_dictionary_deltas = value cdef class Message(_Weakrefable): """ Container for an Arrow IPC message with metadata and optional body """ def __cinit__(self): pass def __init__(self): raise TypeError("Do not call {}'s constructor directly, use " "`pyarrow.ipc.read_message` function instead." .format(self.__class__.__name__)) @property def type(self): return frombytes(FormatMessageType(self.message.get().type())) @property def metadata(self): return pyarrow_wrap_buffer(self.message.get().metadata()) @property def metadata_version(self): return _wrap_metadata_version(self.message.get().metadata_version()) @property def body(self): cdef shared_ptr[CBuffer] body = self.message.get().body() if body.get() == NULL: return None else: return pyarrow_wrap_buffer(body) def equals(self, Message other): """ Returns True if the message contents (metadata and body) are identical Parameters ---------- other : Message Returns ------- are_equal : bool """ cdef c_bool result with nogil: result = self.message.get().Equals(deref(other.message.get())) return result def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None): """ Write message to generic OutputStream Parameters ---------- sink : NativeFile alignment : int, default 8 Byte alignment for metadata and body memory_pool : MemoryPool, default None Uses default memory pool if not specified """ cdef: int64_t output_length = 0 COutputStream* out CIpcWriteOptions options options.alignment = alignment out = sink.get_output_stream().get() with nogil: check_status(self.message.get() .SerializeTo(out, options, &output_length)) def serialize(self, alignment=8, memory_pool=None): """ Write message as encapsulated IPC message Parameters ---------- alignment : int, default 8 Byte alignment for metadata and body memory_pool : MemoryPool, default None Uses default memory pool if not specified Returns ------- serialized : Buffer """ stream = BufferOutputStream(memory_pool) self.serialize_to(stream, alignment=alignment, memory_pool=memory_pool) return stream.getvalue() def __repr__(self): if self.message == nullptr: return """pyarrow.Message(uninitialized)""" metadata_len = self.metadata.size body = self.body body_len = 0 if body is None else body.size return """pyarrow.Message type: {0} metadata length: {1} body length: {2}""".format(self.type, metadata_len, body_len) cdef class MessageReader(_Weakrefable): """ Interface for reading Message objects from some source (like an InputStream) """ cdef: unique_ptr[CMessageReader] reader def __cinit__(self): pass def __init__(self): raise TypeError("Do not call {}'s constructor directly, use " "`pyarrow.ipc.MessageReader.open_stream` function " "instead.".format(self.__class__.__name__)) @staticmethod def open_stream(source): """ Open stream from source. Parameters ---------- source : a readable source, like an InputStream """ cdef: MessageReader result = MessageReader.__new__(MessageReader) shared_ptr[CInputStream] in_stream unique_ptr[CMessageReader] reader _get_input_stream(source, &in_stream) with nogil: reader = CMessageReader.Open(in_stream) result.reader.reset(reader.release()) return result def __iter__(self): while True: yield self.read_next_message() def read_next_message(self): """ Read next Message from the stream. Raises ------ StopIteration : at end of stream """ cdef Message result = Message.__new__(Message) with nogil: result.message = move(GetResultValue(self.reader.get() .ReadNextMessage())) if result.message.get() == NULL: raise StopIteration return result # ---------------------------------------------------------------------- # File and stream readers and writers cdef class _CRecordBatchWriter(_Weakrefable): """The base RecordBatchWriter wrapper. Provides common implementations of convenience methods. Should not be instantiated directly by user code. """ # cdef block is in lib.pxd def write(self, table_or_batch): """ Write RecordBatch or Table to stream. Parameters ---------- table_or_batch : {RecordBatch, Table} """ if isinstance(table_or_batch, RecordBatch): self.write_batch(table_or_batch) elif isinstance(table_or_batch, Table): self.write_table(table_or_batch) else: raise ValueError(type(table_or_batch)) def write_batch(self, RecordBatch batch): """ Write RecordBatch to stream. Parameters ---------- batch : RecordBatch """ with nogil: check_status(self.writer.get() .WriteRecordBatch(deref(batch.batch))) def write_table(self, Table table, max_chunksize=None, **kwargs): """ Write Table to stream in (contiguous) RecordBatch objects. Parameters ---------- table : Table max_chunksize : int, default None Maximum size for RecordBatch chunks. Individual chunks may be smaller depending on the chunk layout of individual columns. """ cdef: # max_chunksize must be > 0 to have any impact int64_t c_max_chunksize = -1 if 'chunksize' in kwargs: max_chunksize = kwargs['chunksize'] msg = ('The parameter chunksize is deprecated for the write_table ' 'methods as of 0.15, please use parameter ' 'max_chunksize instead') warnings.warn(msg, FutureWarning) if max_chunksize is not None: c_max_chunksize = max_chunksize with nogil: check_status(self.writer.get().WriteTable(table.table[0], c_max_chunksize)) def close(self): """ Close stream and write end-of-stream 0 marker. """ with nogil: check_status(self.writer.get().Close()) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() @property def stats(self): """ Current IPC write statistics. """ if not self.writer: raise ValueError("Operation on closed writer") return _wrap_write_stats(self.writer.get().stats()) cdef class _RecordBatchStreamWriter(_CRecordBatchWriter): cdef: CIpcWriteOptions options bint closed def __cinit__(self): pass def __dealloc__(self): pass @property def _use_legacy_format(self): # For testing (see test_ipc.py) return self.options.write_legacy_ipc_format @property def _metadata_version(self): # For testing (see test_ipc.py) return _wrap_metadata_version(self.options.metadata_version) def _open(self, sink, Schema schema not None, IpcWriteOptions options=IpcWriteOptions()): cdef: shared_ptr[COutputStream] c_sink self.options = options.c_options get_writer(sink, &c_sink) with nogil: self.writer = GetResultValue( MakeStreamWriter(c_sink, schema.sp_schema, self.options)) cdef _get_input_stream(object source, shared_ptr[CInputStream]* out): try: source = as_buffer(source) except TypeError: # Non-buffer-like pass get_input_stream(source, True, out) class _ReadPandasMixin: def read_pandas(self, **options): """ Read contents of stream to a pandas.DataFrame. Read all record batches as a pyarrow.Table then convert it to a pandas.DataFrame using Table.to_pandas. Parameters ---------- **options : arguments to forward to Table.to_pandas Returns ------- df : pandas.DataFrame """ table = self.read_all() return table.to_pandas(**options) cdef class RecordBatchReader(_Weakrefable): """Base class for reading stream of record batches. Provides common implementations of convenience methods. Should not be instantiated directly by user code. """ # cdef block is in lib.pxd def __iter__(self): while True: try: yield self.read_next_batch() except StopIteration: return @property def schema(self): """ Shared schema of the record batches in the stream. """ cdef shared_ptr[CSchema] c_schema with nogil: c_schema = self.reader.get().schema() return pyarrow_wrap_schema(c_schema) def get_next_batch(self): import warnings warnings.warn('Please use read_next_batch instead of ' 'get_next_batch', FutureWarning) return self.read_next_batch() def read_next_batch(self): """ Read next RecordBatch from the stream. Raises ------ StopIteration: At end of stream. """ cdef shared_ptr[CRecordBatch] batch with nogil: check_status(self.reader.get().ReadNext(&batch)) if batch.get() == NULL: raise StopIteration return pyarrow_wrap_batch(batch) def read_all(self): """ Read all record batches as a pyarrow.Table. """ cdef shared_ptr[CTable] table with nogil: check_status(self.reader.get().ReadAll(&table)) return pyarrow_wrap_table(table) read_pandas = _ReadPandasMixin.read_pandas def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): pass def _export_to_c(self, uintptr_t out_ptr): """ Export to a C ArrowArrayStream struct, given its pointer. Parameters ---------- out_ptr: int The raw pointer to a C ArrowArrayStream struct. Be careful: if you don't pass the ArrowArrayStream struct to a consumer, array memory will leak. This is a low-level function intended for expert users. """ with nogil: check_status(ExportRecordBatchReader( self.reader, out_ptr)) @staticmethod def _import_from_c(uintptr_t in_ptr): """ Import RecordBatchReader from a C ArrowArrayStream struct, given its pointer. Parameters ---------- in_ptr: int The raw pointer to a C ArrowArrayStream struct. This is a low-level function intended for expert users. """ cdef: shared_ptr[CRecordBatchReader] c_reader RecordBatchReader self with nogil: c_reader = GetResultValue(ImportRecordBatchReader( in_ptr)) self = RecordBatchReader.__new__(RecordBatchReader) self.reader = c_reader return self @staticmethod def from_batches(schema, batches): """ Create RecordBatchReader from an iterable of batches. Parameters ---------- schema : Schema The shared schema of the record batches batches : Iterable[RecordBatch] The batches that this reader will return. Returns ------- reader : RecordBatchReader """ cdef: shared_ptr[CSchema] c_schema shared_ptr[CRecordBatchReader] c_reader RecordBatchReader self c_schema = pyarrow_unwrap_schema(schema) c_reader = GetResultValue(CPyRecordBatchReader.Make( c_schema, batches)) self = RecordBatchReader.__new__(RecordBatchReader) self.reader = c_reader return self cdef class _RecordBatchStreamReader(RecordBatchReader): cdef: shared_ptr[CInputStream] in_stream CIpcReadOptions options CRecordBatchStreamReader* stream_reader def __cinit__(self): pass def _open(self, source): _get_input_stream(source, &self.in_stream) with nogil: self.reader = GetResultValue(CRecordBatchStreamReader.Open( self.in_stream, self.options)) self.stream_reader = self.reader.get() @property def stats(self): """ Current IPC read statistics. """ if not self.reader: raise ValueError("Operation on closed reader") return _wrap_read_stats(self.stream_reader.stats()) cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): def _open(self, sink, Schema schema not None, IpcWriteOptions options=IpcWriteOptions()): cdef: shared_ptr[COutputStream] c_sink self.options = options.c_options get_writer(sink, &c_sink) with nogil: self.writer = GetResultValue( MakeFileWriter(c_sink, schema.sp_schema, self.options)) cdef class _RecordBatchFileReader(_Weakrefable): cdef: shared_ptr[CRecordBatchFileReader] reader shared_ptr[CRandomAccessFile] file CIpcReadOptions options cdef readonly: Schema schema def __cinit__(self): pass def _open(self, source, footer_offset=None): try: source = as_buffer(source) except TypeError: pass get_reader(source, True, &self.file) cdef int64_t offset = 0 if footer_offset is not None: offset = footer_offset with nogil: if offset != 0: self.reader = GetResultValue( CRecordBatchFileReader.Open2(self.file.get(), offset, self.options)) else: self.reader = GetResultValue( CRecordBatchFileReader.Open(self.file.get(), self.options)) self.schema = pyarrow_wrap_schema(self.reader.get().schema()) @property def num_record_batches(self): return self.reader.get().num_record_batches() def get_batch(self, int i): cdef shared_ptr[CRecordBatch] batch if i < 0 or i >= self.num_record_batches: raise ValueError('Batch number {0} out of range'.format(i)) with nogil: batch = GetResultValue(self.reader.get().ReadRecordBatch(i)) return pyarrow_wrap_batch(batch) # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of # time has passed get_record_batch = get_batch def read_all(self): """ Read all record batches as a pyarrow.Table """ cdef: vector[shared_ptr[CRecordBatch]] batches shared_ptr[CTable] table int i, nbatches nbatches = self.num_record_batches batches.resize(nbatches) with nogil: for i in range(nbatches): batches[i] = GetResultValue(self.reader.get() .ReadRecordBatch(i)) table = GetResultValue( CTable.FromRecordBatches(self.schema.sp_schema, move(batches))) return pyarrow_wrap_table(table) read_pandas = _ReadPandasMixin.read_pandas def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): pass @property def stats(self): """ Current IPC read statistics. """ if not self.reader: raise ValueError("Operation on closed reader") return _wrap_read_stats(self.reader.get().stats()) def get_tensor_size(Tensor tensor): """ Return total size of serialized Tensor including metadata and padding. Parameters ---------- tensor : Tensor The tensor for which we want to known the size. """ cdef int64_t size with nogil: check_status(GetTensorSize(deref(tensor.tp), &size)) return size def get_record_batch_size(RecordBatch batch): """ Return total size of serialized RecordBatch including metadata and padding. Parameters ---------- batch : RecordBatch The recordbatch for which we want to know the size. """ cdef int64_t size with nogil: check_status(GetRecordBatchSize(deref(batch.batch), &size)) return size def write_tensor(Tensor tensor, NativeFile dest): """ Write pyarrow.Tensor to pyarrow.NativeFile object its current position. Parameters ---------- tensor : pyarrow.Tensor dest : pyarrow.NativeFile Returns ------- bytes_written : int Total number of bytes written to the file """ cdef: int32_t metadata_length int64_t body_length handle = dest.get_output_stream() with nogil: check_status( WriteTensor(deref(tensor.tp), handle.get(), &metadata_length, &body_length)) return metadata_length + body_length cdef NativeFile as_native_file(source): if not isinstance(source, NativeFile): if hasattr(source, 'read'): source = PythonFile(source) else: source = BufferReader(source) if not isinstance(source, NativeFile): raise ValueError('Unable to read message from object with type: {0}' .format(type(source))) return source def read_tensor(source): """Read pyarrow.Tensor from pyarrow.NativeFile object from current position. If the file source supports zero copy (e.g. a memory map), then this operation does not allocate any memory. This function not assume that the stream is aligned Parameters ---------- source : pyarrow.NativeFile Returns ------- tensor : Tensor """ cdef: shared_ptr[CTensor] sp_tensor CInputStream* c_stream NativeFile nf = as_native_file(source) c_stream = nf.get_input_stream().get() with nogil: sp_tensor = GetResultValue(ReadTensor(c_stream)) return pyarrow_wrap_tensor(sp_tensor) def read_message(source): """ Read length-prefixed message from file or buffer-like object Parameters ---------- source : pyarrow.NativeFile, file-like object, or buffer-like object Returns ------- message : Message """ cdef: Message result = Message.__new__(Message) CInputStream* c_stream cdef NativeFile nf = as_native_file(source) c_stream = nf.get_input_stream().get() with nogil: result.message = move( GetResultValue(ReadMessage(c_stream, c_default_memory_pool()))) if result.message == nullptr: raise EOFError("End of Arrow stream") return result def read_schema(obj, DictionaryMemo dictionary_memo=None): """ Read Schema from message or buffer Parameters ---------- obj : buffer or Message dictionary_memo : DictionaryMemo, optional Needed to be able to reconstruct dictionary-encoded fields with read_record_batch Returns ------- schema : Schema """ cdef: shared_ptr[CSchema] result shared_ptr[CRandomAccessFile] cpp_file CDictionaryMemo temp_memo CDictionaryMemo* arg_dict_memo if isinstance(obj, Message): raise NotImplementedError(type(obj)) get_reader(obj, True, &cpp_file) if dictionary_memo is not None: arg_dict_memo = dictionary_memo.memo else: arg_dict_memo = &temp_memo with nogil: result = GetResultValue(ReadSchema(cpp_file.get(), arg_dict_memo)) return pyarrow_wrap_schema(result) def read_record_batch(obj, Schema schema, DictionaryMemo dictionary_memo=None): """ Read RecordBatch from message, given a known schema. If reading data from a complete IPC stream, use ipc.open_stream instead Parameters ---------- obj : Message or Buffer-like schema : Schema dictionary_memo : DictionaryMemo, optional If message contains dictionaries, must pass a populated DictionaryMemo Returns ------- batch : RecordBatch """ cdef: shared_ptr[CRecordBatch] result Message message CDictionaryMemo temp_memo CDictionaryMemo* arg_dict_memo if isinstance(obj, Message): message = obj else: message = read_message(obj) if dictionary_memo is not None: arg_dict_memo = dictionary_memo.memo else: arg_dict_memo = &temp_memo with nogil: result = GetResultValue( ReadRecordBatch(deref(message.message.get()), schema.sp_schema, arg_dict_memo, CIpcReadOptions.Defaults())) return pyarrow_wrap_batch(result)