Diffstat (limited to 'src/arrow/python/pyarrow/io.pxi')
-rw-r--r-- | src/arrow/python/pyarrow/io.pxi | 2137 |
1 file changed, 2137 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/io.pxi b/src/arrow/python/pyarrow/io.pxi new file mode 100644 index 000000000..f6c2b4219 --- /dev/null +++ b/src/arrow/python/pyarrow/io.pxi @@ -0,0 +1,2137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Cython wrappers for IO interfaces defined in arrow::io and messaging in +# arrow::ipc + +from libc.stdlib cimport malloc, free + +import codecs +import re +import sys +import threading +import time +import warnings +from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation +from queue import Queue, Empty as QueueEmpty + +from pyarrow.util import _is_path_like, _stringify_path + + +# 64K +DEFAULT_BUFFER_SIZE = 2 ** 16 + + +# To let us get a PyObject* and avoid Cython auto-ref-counting +cdef extern from "Python.h": + PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"( + char *v, Py_ssize_t len) except NULL + + +def io_thread_count(): + """ + Return the number of threads to use for I/O operations. + + Many operations, such as scanning a dataset, will implicitly make + use of this pool. The number of threads is set to a fixed value at + startup. It can be modified at runtime by calling + :func:`set_io_thread_count()`. + + See Also + -------- + set_io_thread_count : Modify the size of this pool. + cpu_count : The analogous function for the CPU thread pool. + """ + return GetIOThreadPoolCapacity() + + +def set_io_thread_count(int count): + """ + Set the number of threads to use for I/O operations. + + Many operations, such as scanning a dataset, will implicitly make + use of this pool. + + Parameters + ---------- + count : int + The max number of threads that may be used for I/O. + Must be positive. + + See Also + -------- + io_thread_count : Get the size of this pool. + set_cpu_count : The analogous function for the CPU thread pool. + """ + if count < 1: + raise ValueError("IO thread count must be strictly positive") + check_status(SetIOThreadPoolCapacity(count)) + + +cdef class NativeFile(_Weakrefable): + """ + The base class for all Arrow streams. + + Streams are either readable, writable, or both. + They optionally support seeking. + + While this class exposes methods to read or write data from Python, the + primary intent of using an Arrow stream is to pass it to other Arrow + facilities that will make use of it, such as Arrow IPC routines. + + Be aware that there are subtle differences with regular Python files, + e.g. destroying a writable Arrow stream without closing it explicitly + will not flush any pending data.
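+
+    Examples
+    --------
+    A small sketch using ``BufferReader`` (one concrete ``NativeFile``
+    defined below in this module); any readable Arrow stream behaves
+    the same way:
+
+    >>> import pyarrow as pa
+    >>> f = pa.BufferReader(b'some data')
+    >>> f.mode
+    'rb'
+    >>> f.read(4)
+    b'some'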
+ """ + + def __cinit__(self): + self.own_file = False + self.is_readable = False + self.is_writable = False + self.is_seekable = False + + def __dealloc__(self): + if self.own_file: + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + self.close() + + @property + def mode(self): + """ + The file mode. Currently instances of NativeFile may support: + + * rb: binary read + * wb: binary write + * rb+: binary read and write + """ + # Emulate built-in file modes + if self.is_readable and self.is_writable: + return 'rb+' + elif self.is_readable: + return 'rb' + elif self.is_writable: + return 'wb' + else: + raise ValueError('File object is malformed, has no mode') + + def readable(self): + self._assert_open() + return self.is_readable + + def writable(self): + self._assert_open() + return self.is_writable + + def seekable(self): + self._assert_open() + return self.is_seekable + + def isatty(self): + self._assert_open() + return False + + def fileno(self): + """ + NOT IMPLEMENTED + """ + raise UnsupportedOperation() + + @property + def closed(self): + if self.is_readable: + return self.input_stream.get().closed() + elif self.is_writable: + return self.output_stream.get().closed() + else: + return True + + def close(self): + if not self.closed: + with nogil: + if self.is_readable: + check_status(self.input_stream.get().Close()) + else: + check_status(self.output_stream.get().Close()) + + cdef set_random_access_file(self, shared_ptr[CRandomAccessFile] handle): + self.input_stream = <shared_ptr[CInputStream]> handle + self.random_access = handle + self.is_seekable = True + + cdef set_input_stream(self, shared_ptr[CInputStream] handle): + self.input_stream = handle + self.random_access.reset() + self.is_seekable = False + + cdef set_output_stream(self, shared_ptr[COutputStream] handle): + self.output_stream = handle + + cdef shared_ptr[CRandomAccessFile] get_random_access_file(self) except *: + self._assert_readable() + self._assert_seekable() + return self.random_access + + cdef shared_ptr[CInputStream] get_input_stream(self) except *: + self._assert_readable() + return self.input_stream + + cdef shared_ptr[COutputStream] get_output_stream(self) except *: + self._assert_writable() + return self.output_stream + + def _assert_open(self): + if self.closed: + raise ValueError("I/O operation on closed file") + + def _assert_readable(self): + self._assert_open() + if not self.is_readable: + # XXX UnsupportedOperation + raise IOError("only valid on readable files") + + def _assert_writable(self): + self._assert_open() + if not self.is_writable: + raise IOError("only valid on writable files") + + def _assert_seekable(self): + self._assert_open() + if not self.is_seekable: + raise IOError("only valid on seekable files") + + def size(self): + """ + Return file size + """ + cdef int64_t size + + handle = self.get_random_access_file() + with nogil: + size = GetResultValue(handle.get().GetSize()) + + return size + + def metadata(self): + """ + Return file metadata + """ + cdef: + shared_ptr[const CKeyValueMetadata] c_metadata + + handle = self.get_input_stream() + with nogil: + c_metadata = GetResultValue(handle.get().ReadMetadata()) + + metadata = {} + if c_metadata.get() != nullptr: + for i in range(c_metadata.get().size()): + metadata[frombytes(c_metadata.get().key(i))] = \ + c_metadata.get().value(i) + return metadata + + def tell(self): + """ + Return current stream position + """ + cdef int64_t position + + if self.is_readable: + rd_handle = 
self.get_random_access_file() + with nogil: + position = GetResultValue(rd_handle.get().Tell()) + else: + wr_handle = self.get_output_stream() + with nogil: + position = GetResultValue(wr_handle.get().Tell()) + + return position + + def seek(self, int64_t position, int whence=0): + """ + Change current file stream position + + Parameters + ---------- + position : int + Byte offset, interpreted relative to value of whence argument + whence : int, default 0 + Point of reference for seek offset + + Notes + ----- + Values of whence: + * 0 -- start of stream (the default); offset should be zero or positive + * 1 -- current stream position; offset may be negative + * 2 -- end of stream; offset is usually negative + + Returns + ------- + new_position : the new absolute stream position + """ + cdef int64_t offset + handle = self.get_random_access_file() + + with nogil: + if whence == 0: + offset = position + elif whence == 1: + offset = GetResultValue(handle.get().Tell()) + offset = offset + position + elif whence == 2: + offset = GetResultValue(handle.get().GetSize()) + offset = offset + position + else: + with gil: + raise ValueError("Invalid value of whence: {0}" + .format(whence)) + check_status(handle.get().Seek(offset)) + + return self.tell() + + def flush(self): + """ + Flush the stream, if applicable. + + An error is raised if the stream is closed. + """ + self._assert_open() + # For IOBase compatibility, flush() on an input stream is a no-op + if self.is_writable: + handle = self.get_output_stream() + with nogil: + check_status(handle.get().Flush()) + + def write(self, data): + """ + Write data from any object implementing the buffer protocol (bytes, + bytearray, ndarray, pyarrow.Buffer) + + Parameters + ---------- + data : bytes-like object or exporter of buffer protocol + + Returns + ------- + nbytes : number of bytes written + """ + self._assert_writable() + handle = self.get_output_stream() + + cdef shared_ptr[CBuffer] buf = as_c_buffer(data) + + with nogil: + check_status(handle.get().WriteBuffer(buf)) + return buf.get().size() + + def read(self, nbytes=None): + """ + Read indicated number of bytes from file, or read all remaining bytes + if no argument passed + + Parameters + ---------- + nbytes : int, default None + + Returns + ------- + data : bytes + """ + cdef: + int64_t c_nbytes + int64_t bytes_read = 0 + PyObject* obj + + if nbytes is None: + if not self.is_seekable: + # Cannot get file size => read chunkwise + bs = 16384 + chunks = [] + while True: + chunk = self.read(bs) + if not chunk: + break + chunks.append(chunk) + return b"".join(chunks) + + c_nbytes = self.size() - self.tell() + else: + c_nbytes = nbytes + + handle = self.get_input_stream() + + # Allocate empty write space + obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes) + + cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj) + with nogil: + bytes_read = GetResultValue(handle.get().Read(c_nbytes, buf)) + + if bytes_read < c_nbytes: + cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read) + + return PyObject_to_object(obj) + + def read_at(self, nbytes, offset): + """ + Read indicated number of bytes at offset from the file + + Parameters + ---------- + nbytes : int + offset : int + + Returns + ------- + data : bytes + """ + cdef: + int64_t c_nbytes + int64_t c_offset + int64_t bytes_read = 0 + PyObject* obj + + c_nbytes = nbytes + + c_offset = offset + + handle = self.get_random_access_file() + + # Allocate empty write space + obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes) + + cdef uint8_t* buf = 
<uint8_t*> cp.PyBytes_AS_STRING(<object> obj) + with nogil: + bytes_read = GetResultValue(handle.get(). + ReadAt(c_offset, c_nbytes, buf)) + + if bytes_read < c_nbytes: + cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read) + + return PyObject_to_object(obj) + + def read1(self, nbytes=None): + """Read and return up to n bytes. + + Alias for read, needed to match the IOBase interface.""" + return self.read(nbytes=nbytes) + + def readall(self): + return self.read() + + def readinto(self, b): + """ + Read into the supplied buffer + + Parameters + ---------- + b : any python object supporting the buffer interface + + Returns + ------- + number of bytes read + """ + + cdef: + int64_t bytes_read + uint8_t* buf + Buffer py_buf + int64_t buf_len + + handle = self.get_input_stream() + + py_buf = py_buffer(b) + buf_len = py_buf.size + buf = py_buf.buffer.get().mutable_data() + + with nogil: + bytes_read = GetResultValue(handle.get().Read(buf_len, buf)) + + return bytes_read + + def readline(self, size=None): + """NOT IMPLEMENTED. Read and return a line of bytes from the file. + + If size is specified, read at most size bytes. + + Line terminator is always b"\\n". + """ + + raise UnsupportedOperation() + + def readlines(self, hint=None): + """NOT IMPLEMENTED. Read lines of the file + + Parameters + ---------- + hint : int + Maximum number of bytes read until we stop. + """ + + raise UnsupportedOperation() + + def __iter__(self): + self._assert_readable() + return self + + def __next__(self): + line = self.readline() + if not line: + raise StopIteration + return line + + def read_buffer(self, nbytes=None): + cdef: + int64_t c_nbytes + int64_t bytes_read = 0 + shared_ptr[CBuffer] output + + handle = self.get_input_stream() + + if nbytes is None: + if not self.is_seekable: + # Cannot get file size => read chunkwise + return py_buffer(self.read()) + c_nbytes = self.size() - self.tell() + else: + c_nbytes = nbytes + + with nogil: + output = GetResultValue(handle.get().ReadBuffer(c_nbytes)) + + return pyarrow_wrap_buffer(output) + + def truncate(self): + """ + NOT IMPLEMENTED + """ + raise UnsupportedOperation() + + def writelines(self, lines): + self._assert_writable() + + for line in lines: + self.write(line) + + def download(self, stream_or_path, buffer_size=None): + """ + Read file completely to local path (rather than reading completely into + memory). First seeks to the beginning of the file.
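+
+    Examples
+    --------
+    A sketch; ``local_copy.bin`` is a hypothetical destination path:
+
+    >>> import pyarrow as pa
+    >>> f = pa.BufferReader(b'example payload')
+    >>> f.download('local_copy.bin')  # doctest: +SKIP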
+ """ + cdef: + int64_t bytes_read = 0 + uint8_t* buf + + handle = self.get_input_stream() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + write_queue = Queue(50) + + if not hasattr(stream_or_path, 'read'): + stream = open(stream_or_path, 'wb') + + def cleanup(): + stream.close() + else: + stream = stream_or_path + + def cleanup(): + pass + + done = False + exc_info = None + + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + stream.write(buf) + except Exception as e: + exc_info = sys.exc_info() + finally: + cleanup() + + self.seek(0) + + writer_thread = threading.Thread(target=bg_write) + + # This isn't ideal -- PyBytes_FromStringAndSize copies the data from + # the passed buffer, so it's hard for us to avoid doubling the memory + buf = <uint8_t*> malloc(buffer_size) + if buf == NULL: + raise MemoryError("Failed to allocate {0} bytes" + .format(buffer_size)) + + writer_thread.start() + + cdef int64_t total_bytes = 0 + cdef int32_t c_buffer_size = buffer_size + + try: + while True: + with nogil: + bytes_read = GetResultValue( + handle.get().Read(c_buffer_size, buf)) + + total_bytes += bytes_read + + # EOF + if bytes_read == 0: + break + + pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf, + bytes_read) + + if writer_thread.is_alive(): + while write_queue.full(): + time.sleep(0.01) + else: + break + + write_queue.put_nowait(pybuf) + finally: + free(buf) + done = True + + writer_thread.join() + if exc_info is not None: + raise exc_info[0], exc_info[1], exc_info[2] + + def upload(self, stream, buffer_size=None): + """ + Pipe file-like object to file + """ + write_queue = Queue(50) + self._assert_writable() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + done = False + exc_info = None + + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + + self.write(buf) + + except Exception as e: + exc_info = sys.exc_info() + + writer_thread = threading.Thread(target=bg_write) + writer_thread.start() + + try: + while True: + buf = stream.read(buffer_size) + if not buf: + break + + if writer_thread.is_alive(): + while write_queue.full(): + time.sleep(0.01) + else: + break + + write_queue.put_nowait(buf) + finally: + done = True + + writer_thread.join() + if exc_info is not None: + raise exc_info[0], exc_info[1], exc_info[2] + +BufferedIOBase.register(NativeFile) + +# ---------------------------------------------------------------------- +# Python file-like objects + + +cdef class PythonFile(NativeFile): + """ + A stream backed by a Python file object. + + This class allows using Python file objects with arbitrary Arrow + functions, including functions written in another language than Python. + + As a downside, there is a non-zero redirection cost in translating + Arrow stream calls to Python method calls. Furthermore, Python's + Global Interpreter Lock may limit parallelism in some situations. + """ + cdef: + object handle + + def __cinit__(self, handle, mode=None): + self.handle = handle + + if mode is None: + try: + inferred_mode = handle.mode + except AttributeError: + # Not all file-like objects have a mode attribute + # (e.g. 
BytesIO) + try: + inferred_mode = 'w' if handle.writable() else 'r' + except AttributeError: + raise ValueError("could not infer open mode for file-like " + "object %r, please pass it explicitly" + % (handle,)) + else: + inferred_mode = mode + + if inferred_mode.startswith('w'): + kind = 'w' + elif inferred_mode.startswith('r'): + kind = 'r' + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + # If mode was given, check it matches the given file + if mode is not None: + if isinstance(handle, IOBase): + # Python 3 IO object + if kind == 'r': + if not handle.readable(): + raise TypeError("readable file expected") + else: + if not handle.writable(): + raise TypeError("writable file expected") + # (other duck-typed file-like objects are possible) + + # If possible, check the file is a binary file + if isinstance(handle, TextIOBase): + raise TypeError("binary file expected, got text file") + + if kind == 'r': + self.set_random_access_file( + shared_ptr[CRandomAccessFile](new PyReadableFile(handle))) + self.is_readable = True + else: + self.set_output_stream( + shared_ptr[COutputStream](new PyOutputStream(handle))) + self.is_writable = True + + def truncate(self, pos=None): + self.handle.truncate(pos) + + def readline(self, size=None): + return self.handle.readline(size) + + def readlines(self, hint=None): + return self.handle.readlines(hint) + + +cdef class MemoryMappedFile(NativeFile): + """ + A stream that represents a memory-mapped file. + + Supports 'r', 'r+', 'w' modes. + """ + cdef: + shared_ptr[CMemoryMappedFile] handle + object path + + @staticmethod + def create(path, size): + """ + Create a MemoryMappedFile + + Parameters + ---------- + path : str + Where to create the file. + size : int + Size of the memory mapped file. + """ + cdef: + shared_ptr[CMemoryMappedFile] handle + c_string c_path = encode_file_path(path) + int64_t c_size = size + + with nogil: + handle = GetResultValue(CMemoryMappedFile.Create(c_path, c_size)) + + cdef MemoryMappedFile result = MemoryMappedFile() + result.path = path + result.is_readable = True + result.is_writable = True + result.set_output_stream(<shared_ptr[COutputStream]> handle) + result.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle) + result.handle = handle + + return result + + def _open(self, path, mode='r'): + self.path = path + + cdef: + FileMode c_mode + shared_ptr[CMemoryMappedFile] handle + c_string c_path = encode_file_path(path) + + if mode in ('r', 'rb'): + c_mode = FileMode_READ + self.is_readable = True + elif mode in ('w', 'wb'): + c_mode = FileMode_WRITE + self.is_writable = True + elif mode in ('r+', 'r+b', 'rb+'): + c_mode = FileMode_READWRITE + self.is_readable = True + self.is_writable = True + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + with nogil: + handle = GetResultValue(CMemoryMappedFile.Open(c_path, c_mode)) + + self.set_output_stream(<shared_ptr[COutputStream]> handle) + self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle) + self.handle = handle + + def resize(self, new_size): + """ + Resize the map and underlying file. + + Parameters + ---------- + new_size : new size in bytes + """ + check_status(self.handle.get().Resize(new_size)) + + def fileno(self): + self._assert_open() + return self.handle.get().file_descriptor() + + +def memory_map(path, mode='r'): + """ + Open memory map at file path. Size of the memory map cannot change. 
+ + Parameters + ---------- + path : str + mode : {'r', 'r+', 'w'}, default 'r' + Whether the file is opened for reading ('r'), writing ('w') + or both ('r+'). + + Returns + ------- + mmap : MemoryMappedFile + """ + _check_is_file(path) + + cdef MemoryMappedFile mmap = MemoryMappedFile() + mmap._open(path, mode) + return mmap + + +cdef _check_is_file(path): + if os.path.isdir(path): + raise IOError("Expected file path, but {0} is a directory" + .format(path)) + + +def create_memory_map(path, size): + """ + Create a file of the given size and memory-map it. + + Parameters + ---------- + path : str + The file path to create, on the local filesystem. + size : int + The file size to create. + + Returns + ------- + mmap : MemoryMappedFile + """ + return MemoryMappedFile.create(path, size) + + +cdef class OSFile(NativeFile): + """ + A stream backed by a regular file descriptor. + """ + cdef: + object path + + def __cinit__(self, path, mode='r', MemoryPool memory_pool=None): + _check_is_file(path) + self.path = path + + cdef: + FileMode c_mode + shared_ptr[Readable] handle + c_string c_path = encode_file_path(path) + + if mode in ('r', 'rb'): + self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool)) + elif mode in ('w', 'wb'): + self._open_writable(c_path) + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + cdef _open_readable(self, c_string path, CMemoryPool* pool): + cdef shared_ptr[ReadableFile] handle + + with nogil: + handle = GetResultValue(ReadableFile.Open(path, pool)) + + self.is_readable = True + self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle) + + cdef _open_writable(self, c_string path): + with nogil: + self.output_stream = GetResultValue(FileOutputStream.Open(path)) + self.is_writable = True + + def fileno(self): + self._assert_open() + return self.handle.file_descriptor() + + +cdef class FixedSizeBufferWriter(NativeFile): + """ + A stream writing to an Arrow buffer. + """ + + def __cinit__(self, Buffer buffer): + self.output_stream.reset(new CFixedSizeBufferWriter(buffer.buffer)) + self.is_writable = True + + def set_memcopy_threads(self, int num_threads): + cdef CFixedSizeBufferWriter* writer = \ + <CFixedSizeBufferWriter*> self.output_stream.get() + writer.set_memcopy_threads(num_threads) + + def set_memcopy_blocksize(self, int64_t blocksize): + cdef CFixedSizeBufferWriter* writer = \ + <CFixedSizeBufferWriter*> self.output_stream.get() + writer.set_memcopy_blocksize(blocksize) + + def set_memcopy_threshold(self, int64_t threshold): + cdef CFixedSizeBufferWriter* writer = \ + <CFixedSizeBufferWriter*> self.output_stream.get() + writer.set_memcopy_threshold(threshold) + + +# ---------------------------------------------------------------------- +# Arrow buffers + + +cdef class Buffer(_Weakrefable): + """ + The base class for all Arrow buffers. + + A buffer represents a contiguous memory area. Many buffers will own + their memory, though not all of them do. + """ + + def __cinit__(self): + pass + + def __init__(self): + raise TypeError("Do not call Buffer's constructor directly, use " + "`pyarrow.py_buffer` function instead.") + + cdef void init(self, const shared_ptr[CBuffer]& buffer): + self.buffer = buffer + self.shape[0] = self.size + self.strides[0] = <Py_ssize_t>(1) + + def __len__(self): + return self.size + + @property + def size(self): + """ + The buffer size in bytes. + """ + return self.buffer.get().size() + + @property + def address(self): + """ + The buffer's address, as an integer.
+ + The returned address may point to CPU or device memory. + Use `is_cpu()` to disambiguate. + """ + return self.buffer.get().address() + + def hex(self): + """ + Compute hexadecimal representation of the buffer. + + Returns + ------- + : bytes + """ + return self.buffer.get().ToHexString() + + @property + def is_mutable(self): + """ + Whether the buffer is mutable. + """ + return self.buffer.get().is_mutable() + + @property + def is_cpu(self): + """ + Whether the buffer is CPU-accessible. + """ + return self.buffer.get().is_cpu() + + @property + def parent(self): + cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent() + + if parent_buf.get() == NULL: + return None + else: + return pyarrow_wrap_buffer(parent_buf) + + def __getitem__(self, key): + if PySlice_Check(key): + if (key.step or 1) != 1: + raise IndexError('only slices with step 1 supported') + return _normalize_slice(self, key) + + return self.getitem(_normalize_index(key, self.size)) + + cdef getitem(self, int64_t i): + return self.buffer.get().data()[i] + + def slice(self, offset=0, length=None): + """ + Slice this buffer. Memory is not copied. + + You can also use the Python slice notation ``buffer[start:stop]``. + + Parameters + ---------- + offset : int, default 0 + Offset from start of buffer to slice. + length : int, default None + Length of slice (default is until end of Buffer starting from + offset). + + Returns + ------- + sliced : Buffer + A logical view over this buffer. + """ + cdef shared_ptr[CBuffer] result + + if offset < 0: + raise IndexError('Offset must be non-negative') + + if length is None: + result = SliceBuffer(self.buffer, offset) + else: + result = SliceBuffer(self.buffer, offset, max(length, 0)) + + return pyarrow_wrap_buffer(result) + + def equals(self, Buffer other): + """ + Determine if two buffers contain exactly the same data. + + Parameters + ---------- + other : Buffer + + Returns + ------- + are_equal : True if buffer contents and size are equal + """ + cdef c_bool result = False + with nogil: + result = self.buffer.get().Equals(deref(other.buffer.get())) + return result + + def __eq__(self, other): + if isinstance(other, Buffer): + return self.equals(other) + else: + return self.equals(py_buffer(other)) + + def __reduce_ex__(self, protocol): + if protocol >= 5: + return py_buffer, (builtin_pickle.PickleBuffer(self),) + else: + return py_buffer, (self.to_pybytes(),) + + def to_pybytes(self): + """ + Return this buffer as a Python bytes object. Memory is copied. 
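+
+    Examples
+    --------
+    A small round-trip through ``py_buffer`` (defined in this module):
+
+    >>> import pyarrow as pa
+    >>> pa.py_buffer(b'abc').to_pybytes()
+    b'abc'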
+ """ + return cp.PyBytes_FromStringAndSize( + <const char*>self.buffer.get().data(), + self.buffer.get().size()) + + def __getbuffer__(self, cp.Py_buffer* buffer, int flags): + if self.buffer.get().is_mutable(): + buffer.readonly = 0 + else: + if flags & cp.PyBUF_WRITABLE: + raise BufferError("Writable buffer requested but Arrow " + "buffer was not mutable") + buffer.readonly = 1 + buffer.buf = <char *>self.buffer.get().data() + buffer.format = 'b' + buffer.internal = NULL + buffer.itemsize = 1 + buffer.len = self.size + buffer.ndim = 1 + buffer.obj = self + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = NULL + + def __getsegcount__(self, Py_ssize_t *len_out): + if len_out != NULL: + len_out[0] = <Py_ssize_t>self.size + return 1 + + def __getreadbuffer__(self, Py_ssize_t idx, void **p): + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + if p != NULL: + p[0] = <void*> self.buffer.get().data() + return self.size + + def __getwritebuffer__(self, Py_ssize_t idx, void **p): + if not self.buffer.get().is_mutable(): + raise SystemError("trying to write an immutable buffer") + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + if p != NULL: + p[0] = <void*> self.buffer.get().data() + return self.size + + +cdef class ResizableBuffer(Buffer): + """ + A base class for buffers that can be resized. + """ + + cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer): + self.init(<shared_ptr[CBuffer]> buffer) + + def resize(self, int64_t new_size, shrink_to_fit=False): + """ + Resize buffer to indicated size. + + Parameters + ---------- + new_size : int + New size of buffer (padding may be added internally). + shrink_to_fit : bool, default False + If this is true, the buffer is shrunk when new_size is less + than the current size. + If this is false, the buffer is never shrunk. + """ + cdef c_bool c_shrink_to_fit = shrink_to_fit + with nogil: + check_status((<CResizableBuffer*> self.buffer.get()) + .Resize(new_size, c_shrink_to_fit)) + + +cdef shared_ptr[CResizableBuffer] _allocate_buffer(CMemoryPool* pool) except *: + with nogil: + return to_shared(GetResultValue(AllocateResizableBuffer(0, pool))) + + +def allocate_buffer(int64_t size, MemoryPool memory_pool=None, + resizable=False): + """ + Allocate a mutable buffer. + + Parameters + ---------- + size : int + Number of bytes to allocate (plus internal padding) + memory_pool : MemoryPool, optional + The pool to allocate memory from. + If not given, the default memory pool is used. + resizable : bool, default False + If true, the returned buffer is resizable. 
+ + Returns + ------- + buffer : Buffer or ResizableBuffer + """ + cdef: + CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool) + shared_ptr[CResizableBuffer] c_rz_buffer + shared_ptr[CBuffer] c_buffer + + if resizable: + with nogil: + c_rz_buffer = to_shared(GetResultValue( + AllocateResizableBuffer(size, cpool))) + return pyarrow_wrap_resizable_buffer(c_rz_buffer) + else: + with nogil: + c_buffer = to_shared(GetResultValue(AllocateBuffer(size, cpool))) + return pyarrow_wrap_buffer(c_buffer) + + +cdef class BufferOutputStream(NativeFile): + + cdef: + shared_ptr[CResizableBuffer] buffer + + def __cinit__(self, MemoryPool memory_pool=None): + self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool)) + self.output_stream.reset(new CBufferOutputStream( + <shared_ptr[CResizableBuffer]> self.buffer)) + self.is_writable = True + + def getvalue(self): + """ + Finalize output stream and return result as pyarrow.Buffer. + + Returns + ------- + value : Buffer + """ + with nogil: + check_status(self.output_stream.get().Close()) + return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer) + + +cdef class MockOutputStream(NativeFile): + + def __cinit__(self): + self.output_stream.reset(new CMockOutputStream()) + self.is_writable = True + + def size(self): + handle = <CMockOutputStream*> self.output_stream.get() + return handle.GetExtentBytesWritten() + + +cdef class BufferReader(NativeFile): + """ + Zero-copy reader from objects convertible to Arrow buffer. + + Parameters + ---------- + obj : Python bytes or pyarrow.Buffer + """ + cdef: + Buffer buffer + + def __cinit__(self, object obj): + self.buffer = as_buffer(obj) + self.set_random_access_file(shared_ptr[CRandomAccessFile]( + new CBufferReader(self.buffer.buffer))) + self.is_readable = True + + +cdef class CompressedInputStream(NativeFile): + """ + An input stream wrapper which decompresses data on the fly. + + Parameters + ---------- + stream : string, path, pa.NativeFile, or file-like object + Input stream object to wrap with the compression. + compression : str + The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd"). + """ + + def __init__(self, object stream, str compression not None): + cdef: + NativeFile nf + Codec codec = Codec(compression) + shared_ptr[CInputStream] c_reader + shared_ptr[CCompressedInputStream] compressed_stream + nf = get_native_file(stream, False) + c_reader = nf.get_input_stream() + compressed_stream = GetResultValue( + CCompressedInputStream.Make(codec.unwrap(), c_reader) + ) + self.set_input_stream(<shared_ptr[CInputStream]> compressed_stream) + self.is_readable = True + + +cdef class CompressedOutputStream(NativeFile): + """ + An output stream wrapper which compresses data on the fly. + + Parameters + ---------- + stream : string, path, pa.NativeFile, or file-like object + Output stream object to wrap with the compression. + compression : str + The compression type ("bz2", "brotli", "gzip", "lz4" or "zstd").
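+
+    Examples
+    --------
+    A sketch that gzip-compresses into an in-memory sink, assuming the
+    gzip codec was built in:
+
+    >>> import pyarrow as pa
+    >>> sink = pa.BufferOutputStream()
+    >>> with pa.CompressedOutputStream(sink, 'gzip') as out:
+    ...     n = out.write(b'hello')
+    >>> compressed = sink.getvalue()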
+ """ + + def __init__(self, object stream, str compression not None): + cdef: + Codec codec = Codec(compression) + shared_ptr[COutputStream] c_writer + shared_ptr[CCompressedOutputStream] compressed_stream + get_writer(stream, &c_writer) + compressed_stream = GetResultValue( + CCompressedOutputStream.Make(codec.unwrap(), c_writer) + ) + self.set_output_stream(<shared_ptr[COutputStream]> compressed_stream) + self.is_writable = True + + +ctypedef CBufferedInputStream* _CBufferedInputStreamPtr +ctypedef CBufferedOutputStream* _CBufferedOutputStreamPtr +ctypedef CRandomAccessFile* _RandomAccessFilePtr + + +cdef class BufferedInputStream(NativeFile): + """ + An input stream that performs buffered reads from + an unbuffered input stream, which can mitigate the overhead + of many small reads in some cases. + + Parameters + ---------- + stream : NativeFile + The input stream to wrap with the buffer + buffer_size : int + Size of the temporary read buffer. + memory_pool : MemoryPool + The memory pool used to allocate the buffer. + """ + + def __init__(self, NativeFile stream, int buffer_size, + MemoryPool memory_pool=None): + cdef shared_ptr[CBufferedInputStream] buffered_stream + + if buffer_size <= 0: + raise ValueError('Buffer size must be larger than zero') + buffered_stream = GetResultValue(CBufferedInputStream.Create( + buffer_size, maybe_unbox_memory_pool(memory_pool), + stream.get_input_stream())) + + self.set_input_stream(<shared_ptr[CInputStream]> buffered_stream) + self.is_readable = True + + def detach(self): + """ + Release the raw InputStream. + Further operations on this stream are invalid. + + Returns + ------- + raw : NativeFile + The underlying raw input stream + """ + cdef: + shared_ptr[CInputStream] c_raw + _CBufferedInputStreamPtr buffered + NativeFile raw + + buffered = dynamic_cast[_CBufferedInputStreamPtr]( + self.input_stream.get()) + assert buffered != nullptr + + with nogil: + c_raw = GetResultValue(buffered.Detach()) + + raw = NativeFile() + raw.is_readable = True + # Find out whether the raw stream is a RandomAccessFile + # or a mere InputStream. This helps us support seek() etc. + # selectively. + if dynamic_cast[_RandomAccessFilePtr](c_raw.get()) != nullptr: + raw.set_random_access_file( + static_pointer_cast[CRandomAccessFile, CInputStream](c_raw)) + else: + raw.set_input_stream(c_raw) + return raw + + +cdef class BufferedOutputStream(NativeFile): + """ + An output stream that performs buffered reads from + an unbuffered output stream, which can mitigate the overhead + of many small writes in some cases. + + Parameters + ---------- + stream : NativeFile + The writable output stream to wrap with the buffer + buffer_size : int + Size of the buffer that should be added. + memory_pool : MemoryPool + The memory pool used to allocate the buffer. + """ + + def __init__(self, NativeFile stream, int buffer_size, + MemoryPool memory_pool=None): + cdef shared_ptr[CBufferedOutputStream] buffered_stream + + if buffer_size <= 0: + raise ValueError('Buffer size must be larger than zero') + buffered_stream = GetResultValue(CBufferedOutputStream.Create( + buffer_size, maybe_unbox_memory_pool(memory_pool), + stream.get_output_stream())) + + self.set_output_stream(<shared_ptr[COutputStream]> buffered_stream) + self.is_writable = True + + def detach(self): + """ + Flush any buffered writes and release the raw OutputStream. + Further operations on this stream are invalid. + + Returns + ------- + raw : NativeFile + The underlying raw output stream. 
+ """ + cdef: + shared_ptr[COutputStream] c_raw + _CBufferedOutputStreamPtr buffered + NativeFile raw + + buffered = dynamic_cast[_CBufferedOutputStreamPtr]( + self.output_stream.get()) + assert buffered != nullptr + + with nogil: + c_raw = GetResultValue(buffered.Detach()) + + raw = NativeFile() + raw.is_writable = True + raw.set_output_stream(c_raw) + return raw + + +cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src, + shared_ptr[CBuffer]* dest) except *: + py_dest = transform_func(pyarrow_wrap_buffer(src)) + dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest)) + + +cdef class TransformInputStream(NativeFile): + """ + Transform an input stream. + + Parameters + ---------- + stream : NativeFile + The stream to transform. + transform_func : callable + The transformation to apply. + """ + + def __init__(self, NativeFile stream, transform_func): + self.set_input_stream(TransformInputStream.make_native( + stream.get_input_stream(), transform_func)) + self.is_readable = True + + @staticmethod + cdef shared_ptr[CInputStream] make_native( + shared_ptr[CInputStream] stream, transform_func) except *: + cdef: + shared_ptr[CInputStream] transform_stream + CTransformInputStreamVTable vtable + + vtable.transform = _cb_transform + return MakeTransformInputStream(stream, move(vtable), + transform_func) + + +class Transcoder: + + def __init__(self, decoder, encoder): + self._decoder = decoder + self._encoder = encoder + + def __call__(self, buf): + final = len(buf) == 0 + return self._encoder.encode(self._decoder.decode(buf, final), final) + + +def transcoding_input_stream(stream, src_encoding, dest_encoding): + """ + Add a transcoding transformation to the stream. + Incoming data will be decoded according to ``src_encoding`` and + then re-encoded according to ``dest_encoding``. + + Parameters + ---------- + stream : NativeFile + The stream to which the transformation should be applied. + src_encoding : str + The codec to use when reading data data. + dest_encoding : str + The codec to use for emitted data. + """ + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup(dest_encoding) + if src_codec.name == dest_codec.name: + # Avoid losing performance on no-op transcoding + # (encoding errors won't be detected) + return stream + return TransformInputStream(stream, + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + +cdef shared_ptr[CInputStream] native_transcoding_input_stream( + shared_ptr[CInputStream] stream, src_encoding, + dest_encoding) except *: + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup(dest_encoding) + if src_codec.name == dest_codec.name: + # Avoid losing performance on no-op transcoding + # (encoding errors won't be detected) + return stream + return TransformInputStream.make_native( + stream, Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + +def py_buffer(object obj): + """ + Construct an Arrow buffer from a Python bytes-like or buffer-like object + + Parameters + ---------- + obj : object + the object from which the buffer should be constructed. + """ + cdef shared_ptr[CBuffer] buf + buf = GetResultValue(PyBuffer.FromPyObject(obj)) + return pyarrow_wrap_buffer(buf) + + +def foreign_buffer(address, size, base=None): + """ + Construct an Arrow buffer with the given *address* and *size*. + + The buffer will be optionally backed by the Python *base* object, if given. 
+ The *base* object will be kept alive as long as this buffer is alive, + including across language boundaries (for example if the buffer is + referenced by C++ code). + + Parameters + ---------- + address : int + The starting address of the buffer. The address can + refer to both device or host memory but it must be + accessible from device after mapping it with + `get_device_address` method. + size : int + The size of device buffer in bytes. + base : {None, object} + Object that owns the referenced memory. + """ + cdef: + intptr_t c_addr = address + int64_t c_size = size + shared_ptr[CBuffer] buf + + check_status(PyForeignBuffer.Make(<uint8_t*> c_addr, c_size, + base, &buf)) + return pyarrow_wrap_buffer(buf) + + +def as_buffer(object o): + if isinstance(o, Buffer): + return o + return py_buffer(o) + + +cdef shared_ptr[CBuffer] as_c_buffer(object o) except *: + cdef shared_ptr[CBuffer] buf + if isinstance(o, Buffer): + buf = (<Buffer> o).buffer + if buf == nullptr: + raise ValueError("got null buffer") + else: + buf = GetResultValue(PyBuffer.FromPyObject(o)) + return buf + + +cdef NativeFile get_native_file(object source, c_bool use_memory_map): + try: + source_path = _stringify_path(source) + except TypeError: + if isinstance(source, Buffer): + source = BufferReader(source) + elif not isinstance(source, NativeFile) and hasattr(source, 'read'): + # Optimistically hope this is file-like + source = PythonFile(source, mode='r') + else: + if use_memory_map: + source = memory_map(source_path, mode='r') + else: + source = OSFile(source_path, mode='r') + + return source + + +cdef get_reader(object source, c_bool use_memory_map, + shared_ptr[CRandomAccessFile]* reader): + cdef NativeFile nf + + nf = get_native_file(source, use_memory_map) + reader[0] = nf.get_random_access_file() + + +cdef get_input_stream(object source, c_bool use_memory_map, + shared_ptr[CInputStream]* out): + """ + Like get_reader(), but can automatically decompress, and returns + an InputStream. 
+ """ + cdef: + NativeFile nf + Codec codec + shared_ptr[CInputStream] input_stream + + try: + codec = Codec.detect(source) + except TypeError: + codec = None + + nf = get_native_file(source, use_memory_map) + input_stream = nf.get_input_stream() + + # codec is None if compression can't be detected + if codec is not None: + input_stream = <shared_ptr[CInputStream]> GetResultValue( + CCompressedInputStream.Make(codec.unwrap(), input_stream) + ) + + out[0] = input_stream + + +cdef get_writer(object source, shared_ptr[COutputStream]* writer): + cdef NativeFile nf + + try: + source_path = _stringify_path(source) + except TypeError: + if not isinstance(source, NativeFile) and hasattr(source, 'write'): + # Optimistically hope this is file-like + source = PythonFile(source, mode='w') + else: + source = OSFile(source_path, mode='w') + + if isinstance(source, NativeFile): + nf = source + writer[0] = nf.get_output_stream() + else: + raise TypeError('Unable to read from object of type: {0}' + .format(type(source))) + + +# --------------------------------------------------------------------- + + +def _detect_compression(path): + if isinstance(path, str): + if path.endswith('.bz2'): + return 'bz2' + elif path.endswith('.gz'): + return 'gzip' + elif path.endswith('.lz4'): + return 'lz4' + elif path.endswith('.zst'): + return 'zstd' + + +cdef CCompressionType _ensure_compression(str name) except *: + uppercase = name.upper() + if uppercase == 'BZ2': + return CCompressionType_BZ2 + elif uppercase == 'GZIP': + return CCompressionType_GZIP + elif uppercase == 'BROTLI': + return CCompressionType_BROTLI + elif uppercase == 'LZ4' or uppercase == 'LZ4_FRAME': + return CCompressionType_LZ4_FRAME + elif uppercase == 'LZ4_RAW': + return CCompressionType_LZ4 + elif uppercase == 'SNAPPY': + return CCompressionType_SNAPPY + elif uppercase == 'ZSTD': + return CCompressionType_ZSTD + else: + raise ValueError('Invalid value for compression: {!r}'.format(name)) + + +cdef class Codec(_Weakrefable): + """ + Compression codec. + + Parameters + ---------- + compression : str + Type of compression codec to initialize, valid values are: 'gzip', + 'bz2', 'brotli', 'lz4' (or 'lz4_frame'), 'lz4_raw', 'zstd' and + 'snappy'. + compression_level : int, None + Optional parameter specifying how aggressively to compress. The + possible ranges and effect of this parameter depend on the specific + codec chosen. Higher values compress more but typically use more + resources (CPU/RAM). Some codecs support negative values. + + gzip + The compression_level maps to the memlevel parameter of + deflateInit2. Higher levels use more RAM but are faster + and should have higher compression ratios. + + bz2 + The compression level maps to the blockSize100k parameter of + the BZ2_bzCompressInit function. Higher levels use more RAM + but are faster and should have higher compression ratios. + + brotli + The compression level maps to the BROTLI_PARAM_QUALITY + parameter. Higher values are slower and should have higher + compression ratios. + + lz4/lz4_frame/lz4_raw + The compression level parameter is not supported and must + be None + + zstd + The compression level maps to the compressionLevel parameter + of ZSTD_initCStream. Negative values are supported. Higher + values are slower and should have higher compression ratios. + + snappy + The compression level parameter is not supported and must + be None + + + Raises + ------ + ValueError + If invalid compression value is passed. 
+ """ + + def __init__(self, str compression not None, compression_level=None): + cdef CCompressionType typ = _ensure_compression(compression) + if compression_level is not None: + self.wrapped = shared_ptr[CCodec](move(GetResultValue( + CCodec.CreateWithLevel(typ, compression_level)))) + else: + self.wrapped = shared_ptr[CCodec](move(GetResultValue( + CCodec.Create(typ)))) + + cdef inline CCodec* unwrap(self) nogil: + return self.wrapped.get() + + @staticmethod + def detect(path): + """ + Detect and instantiate compression codec based on file extension. + + Parameters + ---------- + path : str, path-like + File-path to detect compression from. + + Raises + ------ + TypeError + If the passed value is not path-like. + ValueError + If the compression can't be detected from the path. + + Returns + ------- + Codec + """ + return Codec(_detect_compression(_stringify_path(path))) + + @staticmethod + def is_available(str compression not None): + """ + Returns whether the compression support has been built and enabled. + + Parameters + ---------- + compression : str + Type of compression codec, + refer to Codec docstring for a list of supported ones. + + Returns + ------- + bool + """ + cdef CCompressionType typ = _ensure_compression(compression) + return CCodec.IsAvailable(typ) + + @staticmethod + def supports_compression_level(str compression not None): + """ + Returns true if the compression level parameter is supported + for the given codec. + + Parameters + ---------- + compression : str + Type of compression codec, + refer to Codec docstring for a list of supported ones. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return CCodec.SupportsCompressionLevel(typ) + + @staticmethod + def default_compression_level(str compression not None): + """ + Returns the compression level that Arrow will use for the codec if + None is specified. + + Parameters + ---------- + compression : str + Type of compression codec, + refer to Codec docstring for a list of supported ones. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.DefaultCompressionLevel(typ)) + + @staticmethod + def minimum_compression_level(str compression not None): + """ + Returns the smallest valid value for the compression level + + Parameters + ---------- + compression : str + Type of compression codec, + refer to Codec docstring for a list of supported ones. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.MinimumCompressionLevel(typ)) + + @staticmethod + def maximum_compression_level(str compression not None): + """ + Returns the largest valid value for the compression level + + Parameters + ---------- + compression : str + Type of compression codec, + refer to Codec docstring for a list of supported ones. + """ + cdef CCompressionType typ = _ensure_compression(compression) + return GetResultValue(CCodec.MaximumCompressionLevel(typ)) + + @property + def name(self): + """Returns the name of the codec""" + return frombytes(self.unwrap().name()) + + @property + def compression_level(self): + """Returns the compression level parameter of the codec""" + return frombytes(self.unwrap().compression_level()) + + def compress(self, object buf, asbytes=False, memory_pool=None): + """ + Compress data from buffer-like object. 
+ + Parameters + ---------- + buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol + asbytes : bool, default False + Return result as Python bytes object, otherwise Buffer + memory_pool : MemoryPool, default None + Memory pool to use for buffer allocations, if any + + Returns + ------- + compressed : pyarrow.Buffer or bytes (if asbytes=True) + """ + cdef: + shared_ptr[CBuffer] owned_buf + CBuffer* c_buf + PyObject* pyobj + ResizableBuffer out_buf + int64_t max_output_size + int64_t output_length + uint8_t* output_buffer = NULL + + owned_buf = as_c_buffer(buf) + c_buf = owned_buf.get() + + max_output_size = self.wrapped.get().MaxCompressedLen( + c_buf.size(), c_buf.data() + ) + + if asbytes: + pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size) + output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj) + else: + out_buf = allocate_buffer( + max_output_size, memory_pool=memory_pool, resizable=True + ) + output_buffer = out_buf.buffer.get().mutable_data() + + with nogil: + output_length = GetResultValue( + self.unwrap().Compress( + c_buf.size(), + c_buf.data(), + max_output_size, + output_buffer + ) + ) + + if asbytes: + cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length) + return PyObject_to_object(pyobj) + else: + out_buf.resize(output_length) + return out_buf + + def decompress(self, object buf, decompressed_size=None, asbytes=False, + memory_pool=None): + """ + Decompress data from buffer-like object. + + Parameters + ---------- + buf : pyarrow.Buffer, bytes, or memoryview-compatible object + decompressed_size : int64_t, default None + Size of the decompressed result. Required here: a ValueError is + raised if it is not specified. + asbytes : bool, default False + Return result as Python bytes object, otherwise Buffer + memory_pool : MemoryPool, default None + Memory pool to use for buffer allocations, if any. + + Returns + ------- + uncompressed : pyarrow.Buffer or bytes (if asbytes=True) + """ + cdef: + shared_ptr[CBuffer] owned_buf + CBuffer* c_buf + Buffer out_buf + int64_t output_size + uint8_t* output_buffer = NULL + + owned_buf = as_c_buffer(buf) + c_buf = owned_buf.get() + + if decompressed_size is None: + raise ValueError( + "Must pass decompressed_size for {} codec".format(self) + ) + + output_size = decompressed_size + + if asbytes: + pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size) + output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf) + else: + out_buf = allocate_buffer(output_size, memory_pool=memory_pool) + output_buffer = out_buf.buffer.get().mutable_data() + + with nogil: + GetResultValue( + self.unwrap().Decompress( + c_buf.size(), + c_buf.data(), + output_size, + output_buffer + ) + ) + + return pybuf if asbytes else out_buf + + +def compress(object buf, codec='lz4', asbytes=False, memory_pool=None): + """ + Compress data from buffer-like object. + + Parameters + ---------- + buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol + codec : str, default 'lz4' + Compression codec. + Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'} + asbytes : bool, default False + Return result as Python bytes object, otherwise Buffer. + memory_pool : MemoryPool, default None + Memory pool to use for buffer allocations, if any.
+ + Returns + ------- + compressed : pyarrow.Buffer or bytes (if asbytes=True) + """ + cdef Codec coder = Codec(codec) + return coder.compress(buf, asbytes=asbytes, memory_pool=memory_pool) + + +def decompress(object buf, decompressed_size=None, codec='lz4', + asbytes=False, memory_pool=None): + """ + Decompress data from buffer-like object. + + Parameters + ---------- + buf : pyarrow.Buffer, bytes, or memoryview-compatible object + Input object to decompress data from. + decompressed_size : int64_t, default None + Size of the decompressed result. Required here: a ValueError is + raised if it is not specified. + codec : str, default 'lz4' + Compression codec. + Supported types: {'brotli', 'gzip', 'lz4', 'lz4_raw', 'snappy', 'zstd'} + asbytes : bool, default False + Return result as Python bytes object, otherwise Buffer. + memory_pool : MemoryPool, default None + Memory pool to use for buffer allocations, if any. + + Returns + ------- + uncompressed : pyarrow.Buffer or bytes (if asbytes=True) + """ + cdef Codec decoder = Codec(codec) + return decoder.decompress(buf, asbytes=asbytes, memory_pool=memory_pool, + decompressed_size=decompressed_size) + + +def input_stream(source, compression='detect', buffer_size=None): + """ + Create an Arrow input stream. + + Parameters + ---------- + source : str, Path, buffer, file-like object, ... + The source to open for reading. + compression : str, optional, default 'detect' + The compression algorithm to use for on-the-fly decompression. + If "detect" and source is a file path, then compression will be + chosen based on the file extension. + If None, no compression will be applied. + Otherwise, a well-known algorithm name must be supplied (e.g. "gzip"). + buffer_size : int, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary read buffer. + """ + cdef NativeFile stream + + try: + source_path = _stringify_path(source) + except TypeError: + source_path = None + + if isinstance(source, NativeFile): + stream = source + elif source_path is not None: + stream = OSFile(source_path, 'r') + elif isinstance(source, (Buffer, memoryview)): + stream = BufferReader(as_buffer(source)) + elif (hasattr(source, 'read') and + hasattr(source, 'close') and + hasattr(source, 'closed')): + stream = PythonFile(source, 'r') + else: + raise TypeError("pa.input_stream() called with instance of '{}'" + .format(source.__class__)) + + if compression == 'detect': + # detect for OSFile too + compression = _detect_compression(source_path) + + if buffer_size is not None and buffer_size != 0: + stream = BufferedInputStream(stream, buffer_size) + + if compression is not None: + stream = CompressedInputStream(stream, compression) + + return stream + + +def output_stream(source, compression='detect', buffer_size=None): + """ + Create an Arrow output stream. + + Parameters + ---------- + source : str, Path, buffer, file-like object, ... + The source to open for writing. + compression : str, optional, default 'detect' + The compression algorithm to use for on-the-fly compression. + If "detect" and source is a file path, then compression will be + chosen based on the file extension. + If None, no compression will be applied. + Otherwise, a well-known algorithm name must be supplied (e.g. "gzip"). + buffer_size : int, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary write buffer.
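+
+    Examples
+    --------
+    A sketch; ``data.gz`` is a hypothetical path whose ``.gz`` suffix
+    makes ``'detect'`` choose gzip compression:
+
+    >>> import pyarrow as pa
+    >>> with pa.output_stream('data.gz') as f:  # doctest: +SKIP
+    ...     n = f.write(b'some data')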
+ """ + cdef NativeFile stream + + try: + source_path = _stringify_path(source) + except TypeError: + source_path = None + + if isinstance(source, NativeFile): + stream = source + elif source_path is not None: + stream = OSFile(source_path, 'w') + elif isinstance(source, (Buffer, memoryview)): + stream = FixedSizeBufferWriter(as_buffer(source)) + elif (hasattr(source, 'write') and + hasattr(source, 'close') and + hasattr(source, 'closed')): + stream = PythonFile(source, 'w') + else: + raise TypeError("pa.output_stream() called with instance of '{}'" + .format(source.__class__)) + + if compression == 'detect': + compression = _detect_compression(source_path) + + if buffer_size is not None and buffer_size != 0: + stream = BufferedOutputStream(stream, buffer_size) + + if compression is not None: + stream = CompressedOutputStream(stream, compression) + + return stream |