Diffstat (limited to 'src/arrow/python/pyarrow/table.pxi')
-rw-r--r--  src/arrow/python/pyarrow/table.pxi  2389
1 file changed, 2389 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/table.pxi b/src/arrow/python/pyarrow/table.pxi
new file mode 100644
index 000000000..8105ce482
--- /dev/null
+++ b/src/arrow/python/pyarrow/table.pxi
@@ -0,0 +1,2389 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+
+cdef class ChunkedArray(_PandasConvertible):
+ """
+ An array-like composed from a (possibly empty) collection of pyarrow.Arrays
+
+ Warnings
+ --------
+ Do not call this class's constructor directly.
+ """
+
+ def __cinit__(self):
+ self.chunked_array = NULL
+
+ def __init__(self):
+ raise TypeError("Do not call ChunkedArray's constructor directly, use "
+ "`chunked_array` function instead.")
+
+ cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
+ self.sp_chunked_array = chunked_array
+ self.chunked_array = chunked_array.get()
+
+ def __reduce__(self):
+ return chunked_array, (self.chunks, self.type)
+
+ @property
+ def data(self):
+ import warnings
+ warnings.warn("Calling .data on ChunkedArray is provided for "
+ "compatibility after Column was removed, simply drop "
+ "this attribute", FutureWarning)
+ return self
+
+ @property
+ def type(self):
+ return pyarrow_wrap_data_type(self.sp_chunked_array.get().type())
+
+ def length(self):
+ return self.chunked_array.length()
+
+ def __len__(self):
+ return self.length()
+
+ def __repr__(self):
+ type_format = object.__repr__(self)
+ return '{0}\n{1}'.format(type_format, str(self))
+
+ def to_string(self, *, int indent=0, int window=10,
+ c_bool skip_new_lines=False):
+ """
+ Render a "pretty-printed" string representation of the ChunkedArray
+
+ Parameters
+ ----------
+ indent : int
+ How much to indent right the content of the array,
+ by default ``0``.
+ window : int
+            How many items to preview at the beginning and end
+            of the array when the array is bigger than the window.
+            The remaining elements will be replaced with an ellipsis.
+ skip_new_lines : bool
+ If the array should be rendered as a single line of text
+ or if each element should be on its own line.
+            or if each element should be on its own line.
+        """
+ cdef:
+ c_string result
+ PrettyPrintOptions options
+
+ with nogil:
+ options = PrettyPrintOptions(indent, window)
+ options.skip_new_lines = skip_new_lines
+ check_status(
+ PrettyPrint(
+ deref(self.chunked_array),
+ options,
+ &result
+ )
+ )
+
+ return frombytes(result, safe=True)
+
+ def format(self, **kwargs):
+ import warnings
+ warnings.warn('ChunkedArray.format is deprecated, '
+ 'use ChunkedArray.to_string')
+ return self.to_string(**kwargs)
+
+ def __str__(self):
+ return self.to_string()
+
+ def validate(self, *, full=False):
+ """
+ Perform validation checks. An exception is raised if validation fails.
+
+ By default only cheap validation checks are run. Pass `full=True`
+ for thorough validation checks (potentially O(n)).
+
+ Parameters
+ ----------
+ full: bool, default False
+ If True, run expensive checks, otherwise cheap checks only.
+
+ Raises
+ ------
+ ArrowInvalid
+ """
+ if full:
+ with nogil:
+ check_status(self.sp_chunked_array.get().ValidateFull())
+ else:
+ with nogil:
+ check_status(self.sp_chunked_array.get().Validate())
+
+ @property
+ def null_count(self):
+ """
+ Number of null entries
+
+ Returns
+ -------
+ int
+ """
+ return self.chunked_array.null_count()
+
+ @property
+ def nbytes(self):
+ """
+ Total number of bytes consumed by the elements of the chunked array.
+ """
+ size = 0
+ for chunk in self.iterchunks():
+ size += chunk.nbytes
+ return size
+
+ def __sizeof__(self):
+ return super(ChunkedArray, self).__sizeof__() + self.nbytes
+
+ def __iter__(self):
+ for chunk in self.iterchunks():
+ for item in chunk:
+ yield item
+
+ def __getitem__(self, key):
+ """
+ Slice or return value at given index
+
+ Parameters
+ ----------
+ key : integer or slice
+ Slices with step not equal to 1 (or None) will produce a copy
+ rather than a zero-copy view
+
+ Returns
+ -------
+ value : Scalar (index) or ChunkedArray (slice)
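+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``; outputs are
+        indicative only):
+
+        >>> arr = pa.chunked_array([[1, 2], [3, 4]])
+        >>> arr[2].as_py()
+        3
+        >>> arr[1:3].to_pylist()
+        [2, 3]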
+ """
+ if isinstance(key, slice):
+ return _normalize_slice(self, key)
+
+ return self.getitem(_normalize_index(key, self.chunked_array.length()))
+
+ cdef getitem(self, int64_t index):
+ cdef int j
+
+ for j in range(self.num_chunks):
+ if index < self.chunked_array.chunk(j).get().length():
+ return self.chunk(j)[index]
+ else:
+ index -= self.chunked_array.chunk(j).get().length()
+
+ def is_null(self, *, nan_is_null=False):
+ """
+ Return boolean array indicating the null values.
+
+ Parameters
+ ----------
+ nan_is_null : bool (optional, default False)
+ Whether floating-point NaN values should also be considered null.
+
+ Returns
+ -------
+ array : boolean Array or ChunkedArray
+ """
+ options = _pc().NullOptions(nan_is_null=nan_is_null)
+ return _pc().call_function('is_null', [self], options)
+
+ def is_valid(self):
+ """
+ Return boolean array indicating the non-null values.
+ """
+ return _pc().is_valid(self)
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def fill_null(self, fill_value):
+ """
+ See pyarrow.compute.fill_null docstring for usage.
+ """
+ return _pc().fill_null(self, fill_value)
+
+ def equals(self, ChunkedArray other):
+ """
+ Return whether the contents of two chunked arrays are equal.
+
+ Parameters
+ ----------
+ other : pyarrow.ChunkedArray
+ Chunked array to compare against.
+
+ Returns
+ -------
+ are_equal : bool
+ """
+ if other is None:
+ return False
+
+ cdef:
+ CChunkedArray* this_arr = self.chunked_array
+ CChunkedArray* other_arr = other.chunked_array
+ c_bool result
+
+ with nogil:
+ result = this_arr.Equals(deref(other_arr))
+
+ return result
+
+ def _to_pandas(self, options, **kwargs):
+ return _array_like_to_pandas(self, options)
+
+ def to_numpy(self):
+ """
+ Return a NumPy copy of this array (experimental).
+
+ Returns
+ -------
+ array : numpy.ndarray
+ """
+ cdef:
+ PyObject* out
+ PandasOptions c_options
+ object values
+
+ if self.type.id == _Type_EXTENSION:
+ storage_array = chunked_array(
+ [chunk.storage for chunk in self.iterchunks()],
+ type=self.type.storage_type
+ )
+ return storage_array.to_numpy()
+
+ with nogil:
+ check_status(
+ ConvertChunkedArrayToPandas(
+ c_options,
+ self.sp_chunked_array,
+ self,
+ &out
+ )
+ )
+
+ # wrap_array_output uses pandas to convert to Categorical, here
+ # always convert to numpy array
+ values = PyObject_to_object(out)
+
+ if isinstance(values, dict):
+ values = np.take(values['dictionary'], values['indices'])
+
+ return values
+
+ def __array__(self, dtype=None):
+ values = self.to_numpy()
+ if dtype is None:
+ return values
+ return values.astype(dtype)
+
+ def cast(self, object target_type, safe=True):
+ """
+ Cast array values to another data type
+
+ See pyarrow.compute.cast for usage
+ """
+ return _pc().cast(self, target_type, safe=safe)
+
+ def dictionary_encode(self, null_encoding='mask'):
+ """
+ Compute dictionary-encoded representation of array
+
+ Returns
+ -------
+ pyarrow.ChunkedArray
+ Same chunking as the input, all chunks share a common dictionary.
+ """
+ options = _pc().DictionaryEncodeOptions(null_encoding)
+ return _pc().call_function('dictionary_encode', [self], options)
+
+ def flatten(self, MemoryPool memory_pool=None):
+ """
+        Flatten this ChunkedArray. If it has a struct type, it is
+        flattened into one chunked array per struct field.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Returns
+ -------
+ result : List[ChunkedArray]
+ """
+ cdef:
+ vector[shared_ptr[CChunkedArray]] flattened
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+ with nogil:
+ flattened = GetResultValue(self.chunked_array.Flatten(pool))
+
+ return [pyarrow_wrap_chunked_array(col) for col in flattened]
+
+ def combine_chunks(self, MemoryPool memory_pool=None):
+ """
+ Flatten this ChunkedArray into a single non-chunked array.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Returns
+ -------
+ result : Array
+ """
+ return concat_arrays(self.chunks)
+
+ def unique(self):
+ """
+ Compute distinct elements in array
+
+ Returns
+ -------
+ pyarrow.Array
+ """
+ return _pc().call_function('unique', [self])
+
+ def value_counts(self):
+ """
+ Compute counts of unique elements in array.
+
+ Returns
+ -------
+ An array of <input type "Values", int64_t "Counts"> structs
+ """
+ return _pc().call_function('value_counts', [self])
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this ChunkedArray
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of array to slice
+ length : int, default None
+            Length of slice (default is until end of the array starting
+            from offset)
+
+ Returns
+ -------
+ sliced : ChunkedArray
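+
+        Examples
+        --------
+        A minimal usage sketch (assumes ``import pyarrow as pa``):
+
+        >>> arr = pa.chunked_array([[1, 2, 3], [4, 5]])
+        >>> arr.slice(1, 3).to_pylist()
+        [2, 3, 4]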
+ """
+ cdef shared_ptr[CChunkedArray] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ offset = min(len(self), offset)
+ if length is None:
+ result = self.chunked_array.Slice(offset)
+ else:
+ result = self.chunked_array.Slice(offset, length)
+
+ return pyarrow_wrap_chunked_array(result)
+
+ def filter(self, mask, object null_selection_behavior="drop"):
+ """
+ Select values from a chunked array. See pyarrow.compute.filter for full
+ usage.
+ """
+ return _pc().filter(self, mask, null_selection_behavior)
+
+ def index(self, value, start=None, end=None, *, memory_pool=None):
+ """
+ Find the first index of a value.
+
+ See pyarrow.compute.index for full usage.
+ """
+ return _pc().index(self, value, start, end, memory_pool=memory_pool)
+
+ def take(self, object indices):
+ """
+ Select values from a chunked array. See pyarrow.compute.take for full
+ usage.
+ """
+ return _pc().take(self, indices)
+
+ def drop_null(self):
+ """
+ Remove missing values from a chunked array.
+ See pyarrow.compute.drop_null for full description.
+ """
+ return _pc().drop_null(self)
+
+ def unify_dictionaries(self, MemoryPool memory_pool=None):
+ """
+ Unify dictionaries across all chunks.
+
+ This method returns an equivalent chunked array, but where all
+ chunks share the same dictionary values. Dictionary indices are
+ transposed accordingly.
+
+ If there are no dictionaries in the chunked array, it is returned
+ unchanged.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Returns
+ -------
+ result : ChunkedArray
+ """
+ cdef:
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+ shared_ptr[CChunkedArray] c_result
+
+ with nogil:
+ c_result = GetResultValue(CDictionaryUnifier.UnifyChunkedArray(
+ self.sp_chunked_array, pool))
+
+ return pyarrow_wrap_chunked_array(c_result)
+
+ @property
+ def num_chunks(self):
+ """
+ Number of underlying chunks
+
+ Returns
+ -------
+ int
+ """
+ return self.chunked_array.num_chunks()
+
+ def chunk(self, i):
+ """
+ Select a chunk by its index
+
+ Parameters
+ ----------
+ i : int
+
+ Returns
+ -------
+ pyarrow.Array
+ """
+ if i >= self.num_chunks or i < 0:
+ raise IndexError('Chunk index out of range.')
+
+ return pyarrow_wrap_array(self.chunked_array.chunk(i))
+
+ @property
+ def chunks(self):
+ return list(self.iterchunks())
+
+ def iterchunks(self):
+ for i in range(self.num_chunks):
+ yield self.chunk(i)
+
+ def to_pylist(self):
+ """
+ Convert to a list of native Python objects.
+ """
+ result = []
+ for i in range(self.num_chunks):
+ result += self.chunk(i).to_pylist()
+ return result
+
+
+def chunked_array(arrays, type=None):
+ """
+ Construct chunked array from list of array-like objects
+
+ Parameters
+ ----------
+ arrays : Array, list of Array, or values coercible to arrays
+ Must all be the same data type. Can be empty only if type also passed.
+ type : DataType or string coercible to DataType
+
+ Returns
+ -------
+ ChunkedArray
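+
+    Examples
+    --------
+    A small usage sketch, assuming ``import pyarrow as pa``; outputs are
+    indicative only:
+
+    >>> pa.chunked_array([[1, 2], [3, 4]]).num_chunks
+    2
+    >>> pa.chunked_array([], type=pa.int64()).num_chunks
+    0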
+ """
+ cdef:
+ Array arr
+ vector[shared_ptr[CArray]] c_arrays
+ shared_ptr[CChunkedArray] sp_chunked_array
+
+ type = ensure_type(type, allow_none=True)
+
+ if isinstance(arrays, Array):
+ arrays = [arrays]
+
+ for x in arrays:
+ arr = x if isinstance(x, Array) else array(x, type=type)
+
+ if type is None:
+            # This allows more flexible chunked array construction: coerce
+            # subsequent arrays to the first inferred array type. It also
+            # spares the inference overhead after the first chunk.
+ type = arr.type
+ else:
+ if arr.type != type:
+ raise TypeError(
+ "All array chunks must have type {}".format(type)
+ )
+
+ c_arrays.push_back(arr.sp_array)
+
+ if c_arrays.size() == 0 and type is None:
+ raise ValueError("When passing an empty collection of arrays "
+ "you must also pass the data type")
+
+ sp_chunked_array.reset(
+ new CChunkedArray(c_arrays, pyarrow_unwrap_data_type(type))
+ )
+ with nogil:
+ check_status(sp_chunked_array.get().Validate())
+
+ return pyarrow_wrap_chunked_array(sp_chunked_array)
+
+
+cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema):
+ cdef:
+ Py_ssize_t K = len(arrays)
+ c_string c_name
+ shared_ptr[CDataType] c_type
+ shared_ptr[const CKeyValueMetadata] c_meta
+ vector[shared_ptr[CField]] c_fields
+
+ if metadata is not None:
+ c_meta = KeyValueMetadata(metadata).unwrap()
+
+ if K == 0:
+ if names is None or len(names) == 0:
+ schema.reset(new CSchema(c_fields, c_meta))
+ return arrays
+ else:
+ raise ValueError('Length of names ({}) does not match '
+ 'length of arrays ({})'.format(len(names), K))
+
+ c_fields.resize(K)
+
+ if names is None:
+ raise ValueError('Must pass names or schema when constructing '
+ 'Table or RecordBatch.')
+
+ if len(names) != K:
+ raise ValueError('Length of names ({}) does not match '
+ 'length of arrays ({})'.format(len(names), K))
+
+ converted_arrays = []
+ for i in range(K):
+ val = arrays[i]
+ if not isinstance(val, (Array, ChunkedArray)):
+ val = array(val)
+
+ c_type = (<DataType> val.type).sp_type
+
+ if names[i] is None:
+ c_name = b'None'
+ else:
+ c_name = tobytes(names[i])
+ c_fields[i].reset(new CField(c_name, c_type, True))
+ converted_arrays.append(val)
+
+ schema.reset(new CSchema(c_fields, c_meta))
+ return converted_arrays
+
+
+cdef _sanitize_arrays(arrays, names, schema, metadata,
+ shared_ptr[CSchema]* c_schema):
+ cdef Schema cy_schema
+ if schema is None:
+ converted_arrays = _schema_from_arrays(arrays, names, metadata,
+ c_schema)
+ else:
+ if names is not None:
+ raise ValueError('Cannot pass both schema and names')
+ if metadata is not None:
+ raise ValueError('Cannot pass both schema and metadata')
+ cy_schema = schema
+
+ if len(schema) != len(arrays):
+ raise ValueError('Schema and number of arrays unequal')
+
+ c_schema[0] = cy_schema.sp_schema
+ converted_arrays = []
+ for i, item in enumerate(arrays):
+ item = asarray(item, type=schema[i].type)
+ converted_arrays.append(item)
+ return converted_arrays
+
+
+cdef class RecordBatch(_PandasConvertible):
+ """
+    A batch of rows represented as a collection of equal-length Arrow arrays.
+
+ Warnings
+ --------
+ Do not call this class's constructor directly, use one of the
+ ``RecordBatch.from_*`` functions instead.
+ """
+
+ def __cinit__(self):
+ self.batch = NULL
+ self._schema = None
+
+ def __init__(self):
+ raise TypeError("Do not call RecordBatch's constructor directly, use "
+ "one of the `RecordBatch.from_*` functions instead.")
+
+ cdef void init(self, const shared_ptr[CRecordBatch]& batch):
+ self.sp_batch = batch
+ self.batch = batch.get()
+
+ @staticmethod
+ def from_pydict(mapping, schema=None, metadata=None):
+ """
+        Construct a RecordBatch from a mapping of strings to Arrays or
+        Python lists.
+
+ Parameters
+ ----------
+ mapping : dict or Mapping
+ A mapping of strings to Arrays or Python lists.
+ schema : Schema, default None
+ If not passed, will be inferred from the Mapping values.
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if inferred).
+
+ Returns
+ -------
+ RecordBatch
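+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``):
+
+        >>> pa.RecordBatch.from_pydict({'x': [1, 2]}).num_rows
+        2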
+ """
+
+ return _from_pydict(cls=RecordBatch,
+ mapping=mapping,
+ schema=schema,
+ metadata=metadata)
+
+ def __reduce__(self):
+ return _reconstruct_record_batch, (self.columns, self.schema)
+
+ def __len__(self):
+ return self.batch.num_rows()
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def to_string(self, show_metadata=False):
+ # Use less verbose schema output.
+ schema_as_string = self.schema.to_string(
+ show_field_metadata=show_metadata,
+ show_schema_metadata=show_metadata
+ )
+ return 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string)
+
+ def __repr__(self):
+ return self.to_string()
+
+ def validate(self, *, full=False):
+ """
+ Perform validation checks. An exception is raised if validation fails.
+
+ By default only cheap validation checks are run. Pass `full=True`
+ for thorough validation checks (potentially O(n)).
+
+ Parameters
+ ----------
+ full: bool, default False
+ If True, run expensive checks, otherwise cheap checks only.
+
+ Raises
+ ------
+ ArrowInvalid
+ """
+ if full:
+ with nogil:
+ check_status(self.batch.ValidateFull())
+ else:
+ with nogil:
+ check_status(self.batch.Validate())
+
+ def replace_schema_metadata(self, metadata=None):
+ """
+        Create shallow copy of record batch by replacing schema
+        key-value metadata with the indicated new metadata (which may be
+        None, in which case any existing metadata is deleted).
+
+ Parameters
+ ----------
+ metadata : dict, default None
+
+ Returns
+ -------
+ shallow_copy : RecordBatch
+ """
+ cdef:
+ shared_ptr[const CKeyValueMetadata] c_meta
+ shared_ptr[CRecordBatch] c_batch
+
+ metadata = ensure_metadata(metadata, allow_none=True)
+ c_meta = pyarrow_unwrap_metadata(metadata)
+ with nogil:
+ c_batch = self.batch.ReplaceSchemaMetadata(c_meta)
+
+ return pyarrow_wrap_batch(c_batch)
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns
+
+ Returns
+ -------
+ int
+ """
+ return self.batch.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows
+
+ Due to the definition of a RecordBatch, all columns have the same
+ number of rows.
+
+ Returns
+ -------
+ int
+ """
+ return len(self)
+
+ @property
+ def schema(self):
+ """
+ Schema of the RecordBatch and its columns
+
+ Returns
+ -------
+ pyarrow.Schema
+ """
+ if self._schema is None:
+ self._schema = pyarrow_wrap_schema(self.batch.schema())
+
+ return self._schema
+
+ def field(self, i):
+ """
+ Select a schema field by its column name or numeric index
+
+ Parameters
+ ----------
+ i : int or string
+ The index or name of the field to retrieve
+
+ Returns
+ -------
+ pyarrow.Field
+ """
+ return self.schema.field(i)
+
+ @property
+ def columns(self):
+ """
+ List of all columns in numerical order
+
+ Returns
+ -------
+ list of pa.Array
+ """
+ return [self.column(i) for i in range(self.num_columns)]
+
+ def _ensure_integer_index(self, i):
+ """
+ Ensure integer index (convert string column name to integer if needed).
+ """
+ if isinstance(i, (bytes, str)):
+ field_indices = self.schema.get_all_field_indices(i)
+
+ if len(field_indices) == 0:
+ raise KeyError(
+ "Field \"{}\" does not exist in record batch schema"
+ .format(i))
+ elif len(field_indices) > 1:
+ raise KeyError(
+ "Field \"{}\" exists {} times in record batch schema"
+ .format(i, len(field_indices)))
+ else:
+ return field_indices[0]
+ elif isinstance(i, int):
+ return i
+ else:
+ raise TypeError("Index must either be string or integer")
+
+ def column(self, i):
+ """
+ Select single column from record batch
+
+ Parameters
+ ----------
+ i : int or string
+ The index or name of the column to retrieve.
+
+ Returns
+ -------
+ column : pyarrow.Array
+ """
+ return self._column(self._ensure_integer_index(i))
+
+ def _column(self, int i):
+ """
+ Select single column from record batch by its numeric index.
+
+ Parameters
+ ----------
+ i : int
+ The index of the column to retrieve.
+
+ Returns
+ -------
+ column : pyarrow.Array
+ """
+ cdef int index = <int> _normalize_index(i, self.num_columns)
+ cdef Array result = pyarrow_wrap_array(self.batch.column(index))
+ result._name = self.schema[index].name
+ return result
+
+ @property
+ def nbytes(self):
+ """
+ Total number of bytes consumed by the elements of the record batch.
+ """
+ size = 0
+ for i in range(self.num_columns):
+ size += self.column(i).nbytes
+ return size
+
+ def __sizeof__(self):
+ return super(RecordBatch, self).__sizeof__() + self.nbytes
+
+ def __getitem__(self, key):
+ """
+ Slice or return column at given index or column name
+
+ Parameters
+ ----------
+ key : integer, str, or slice
+ Slices with step not equal to 1 (or None) will produce a copy
+ rather than a zero-copy view
+
+ Returns
+ -------
+ value : Array (index/column) or RecordBatch (slice)
+ """
+ if isinstance(key, slice):
+ return _normalize_slice(self, key)
+ else:
+ return self.column(key)
+
+ def serialize(self, memory_pool=None):
+ """
+ Write RecordBatch to Buffer as encapsulated IPC message.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ Uses default memory pool if not specified
+
+ Returns
+ -------
+ serialized : Buffer
+ """
+ cdef shared_ptr[CBuffer] buffer
+ cdef CIpcWriteOptions options = CIpcWriteOptions.Defaults()
+ options.memory_pool = maybe_unbox_memory_pool(memory_pool)
+
+ with nogil:
+ buffer = GetResultValue(
+ SerializeRecordBatch(deref(self.batch), options))
+ return pyarrow_wrap_buffer(buffer)
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this RecordBatch
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of record batch to slice
+ length : int, default None
+ Length of slice (default is until end of batch starting from
+ offset)
+
+ Returns
+ -------
+ sliced : RecordBatch
+ """
+ cdef shared_ptr[CRecordBatch] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ offset = min(len(self), offset)
+ if length is None:
+ result = self.batch.Slice(offset)
+ else:
+ result = self.batch.Slice(offset, length)
+
+ return pyarrow_wrap_batch(result)
+
+ def filter(self, mask, object null_selection_behavior="drop"):
+ """
+        Select records from a record batch. See pyarrow.compute.filter for
+        full usage.
+ usage.
+ """
+ return _pc().filter(self, mask, null_selection_behavior)
+
+ def equals(self, object other, bint check_metadata=False):
+ """
+ Check if contents of two record batches are equal.
+
+ Parameters
+ ----------
+ other : pyarrow.RecordBatch
+ RecordBatch to compare against.
+ check_metadata : bool, default False
+ Whether schema metadata equality should be checked as well.
+
+ Returns
+ -------
+ are_equal : bool
+ """
+ cdef:
+ CRecordBatch* this_batch = self.batch
+ shared_ptr[CRecordBatch] other_batch = pyarrow_unwrap_batch(other)
+ c_bool result
+
+ if not other_batch:
+ return False
+
+ with nogil:
+ result = this_batch.Equals(deref(other_batch), check_metadata)
+
+ return result
+
+ def take(self, object indices):
+ """
+ Select records from a RecordBatch. See pyarrow.compute.take for full
+ usage.
+ """
+ return _pc().take(self, indices)
+
+ def drop_null(self):
+ """
+ Remove missing values from a RecordBatch.
+ See pyarrow.compute.drop_null for full usage.
+ """
+ return _pc().drop_null(self)
+
+ def to_pydict(self):
+ """
+ Convert the RecordBatch to a dict or OrderedDict.
+
+ Returns
+ -------
+ dict
+ """
+ entries = []
+ for i in range(self.batch.num_columns()):
+ name = bytes(self.batch.column_name(i)).decode('utf8')
+ column = self[i].to_pylist()
+ entries.append((name, column))
+ return ordered_dict(entries)
+
+ def _to_pandas(self, options, **kwargs):
+ return Table.from_batches([self])._to_pandas(options, **kwargs)
+
+ @classmethod
+ def from_pandas(cls, df, Schema schema=None, preserve_index=None,
+ nthreads=None, columns=None):
+ """
+ Convert pandas.DataFrame to an Arrow RecordBatch
+
+ Parameters
+ ----------
+ df : pandas.DataFrame
+ schema : pyarrow.Schema, optional
+ The expected schema of the RecordBatch. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+ If passed, the output will have exactly this schema. Columns
+ specified in the schema that are not found in the DataFrame columns
+ or its index will raise an error. Additional columns or index
+ levels in the DataFrame which are not specified in the schema will
+ be ignored.
+ preserve_index : bool, optional
+ Whether to store the index as an additional column in the resulting
+ ``RecordBatch``. The default of None will store the index as a
+ column, except for RangeIndex which is stored as metadata only. Use
+ ``preserve_index=True`` to force it to be stored as a column.
+ nthreads : int, default None (may use up to system CPU count threads)
+ If greater than 1, convert columns to Arrow in parallel using
+ indicated number of threads
+ columns : list, optional
+ List of column to be converted. If None, use all columns.
+
+ Returns
+ -------
+ pyarrow.RecordBatch
+ """
+ from pyarrow.pandas_compat import dataframe_to_arrays
+ arrays, schema = dataframe_to_arrays(
+ df, schema, preserve_index, nthreads=nthreads, columns=columns
+ )
+ return cls.from_arrays(arrays, schema=schema)
+
+ @staticmethod
+ def from_arrays(list arrays, names=None, schema=None, metadata=None):
+ """
+ Construct a RecordBatch from multiple pyarrow.Arrays
+
+ Parameters
+ ----------
+ arrays : list of pyarrow.Array
+ One for each field in RecordBatch
+ names : list of str, optional
+ Names for the batch fields. If not passed, schema must be passed
+ schema : Schema, default None
+ Schema for the created batch. If not passed, names must be passed
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if inferred).
+
+ Returns
+ -------
+ pyarrow.RecordBatch
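+
+        Examples
+        --------
+        A minimal usage sketch (assumes ``import pyarrow as pa``):
+
+        >>> batch = pa.RecordBatch.from_arrays(
+        ...     [pa.array([1, 2]), pa.array(['a', 'b'])], names=['x', 'y'])
+        >>> batch.num_rows
+        2
+        >>> batch.num_columns
+        2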
+ """
+ cdef:
+ Array arr
+ shared_ptr[CSchema] c_schema
+ vector[shared_ptr[CArray]] c_arrays
+ int64_t num_rows
+
+ if len(arrays) > 0:
+ num_rows = len(arrays[0])
+ else:
+ num_rows = 0
+
+ if isinstance(names, Schema):
+ import warnings
+ warnings.warn("Schema passed to names= option, please "
+ "pass schema= explicitly. "
+ "Will raise exception in future", FutureWarning)
+ schema = names
+ names = None
+
+ converted_arrays = _sanitize_arrays(arrays, names, schema, metadata,
+ &c_schema)
+
+ c_arrays.reserve(len(arrays))
+ for arr in converted_arrays:
+ if len(arr) != num_rows:
+ raise ValueError('Arrays were not all the same length: '
+ '{0} vs {1}'.format(len(arr), num_rows))
+ c_arrays.push_back(arr.sp_array)
+
+ result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
+ c_arrays))
+ result.validate()
+ return result
+
+ @staticmethod
+ def from_struct_array(StructArray struct_array):
+ """
+ Construct a RecordBatch from a StructArray.
+
+ Each field in the StructArray will become a column in the resulting
+ ``RecordBatch``.
+
+ Parameters
+ ----------
+ struct_array : StructArray
+ Array to construct the record batch from.
+
+ Returns
+ -------
+ pyarrow.RecordBatch
+ """
+ cdef:
+ shared_ptr[CRecordBatch] c_record_batch
+ with nogil:
+ c_record_batch = GetResultValue(
+ CRecordBatch.FromStructArray(struct_array.sp_array))
+ return pyarrow_wrap_batch(c_record_batch)
+
+ def _export_to_c(self, uintptr_t out_ptr, uintptr_t out_schema_ptr=0):
+ """
+ Export to a C ArrowArray struct, given its pointer.
+
+ If a C ArrowSchema struct pointer is also given, the record batch
+ schema is exported to it at the same time.
+
+ Parameters
+ ----------
+ out_ptr: int
+ The raw pointer to a C ArrowArray struct.
+ out_schema_ptr: int (optional)
+ The raw pointer to a C ArrowSchema struct.
+
+ Be careful: if you don't pass the ArrowArray struct to a consumer,
+ array memory will leak. This is a low-level function intended for
+ expert users.
+ """
+ with nogil:
+ check_status(ExportRecordBatch(deref(self.sp_batch),
+ <ArrowArray*> out_ptr,
+ <ArrowSchema*> out_schema_ptr))
+
+ @staticmethod
+ def _import_from_c(uintptr_t in_ptr, schema):
+ """
+ Import RecordBatch from a C ArrowArray struct, given its pointer
+ and the imported schema.
+
+ Parameters
+ ----------
+ in_ptr: int
+ The raw pointer to a C ArrowArray struct.
+        schema : Schema or int
+ Either a Schema object, or the raw pointer to a C ArrowSchema
+ struct.
+
+ This is a low-level function intended for expert users.
+ """
+ cdef:
+ shared_ptr[CRecordBatch] c_batch
+
+ c_schema = pyarrow_unwrap_schema(schema)
+ if c_schema == nullptr:
+ # Not a Schema object, perhaps a raw ArrowSchema pointer
+ schema_ptr = <uintptr_t> schema
+ with nogil:
+ c_batch = GetResultValue(ImportRecordBatch(
+ <ArrowArray*> in_ptr, <ArrowSchema*> schema_ptr))
+ else:
+ with nogil:
+ c_batch = GetResultValue(ImportRecordBatch(
+ <ArrowArray*> in_ptr, c_schema))
+ return pyarrow_wrap_batch(c_batch)
+
+
+def _reconstruct_record_batch(columns, schema):
+ """
+ Internal: reconstruct RecordBatch from pickled components.
+ """
+ return RecordBatch.from_arrays(columns, schema=schema)
+
+
+def table_to_blocks(options, Table table, categories, extension_columns):
+ cdef:
+ PyObject* result_obj
+ shared_ptr[CTable] c_table
+ CMemoryPool* pool
+ PandasOptions c_options = _convert_pandas_options(options)
+
+ if categories is not None:
+ c_options.categorical_columns = {tobytes(cat) for cat in categories}
+ if extension_columns is not None:
+ c_options.extension_columns = {tobytes(col)
+ for col in extension_columns}
+
+    # ARROW-3789(wesm): Convert date/timestamp types to datetime64[ns]
+ c_options.coerce_temporal_nanoseconds = True
+
+ if c_options.self_destruct:
+ # Move the shared_ptr, table is now unsafe to use further
+ c_table = move(table.sp_table)
+ table.table = NULL
+ else:
+ c_table = table.sp_table
+
+ with nogil:
+ check_status(
+ libarrow.ConvertTableToPandas(c_options, move(c_table),
+ &result_obj)
+ )
+
+ return PyObject_to_object(result_obj)
+
+
+cdef class Table(_PandasConvertible):
+ """
+ A collection of top-level named, equal length Arrow arrays.
+
+    Warnings
+    --------
+ Do not call this class's constructor directly, use one of the ``from_*``
+ methods instead.
+ """
+
+ def __cinit__(self):
+ self.table = NULL
+
+ def __init__(self):
+ raise TypeError("Do not call Table's constructor directly, use one of "
+ "the `Table.from_*` functions instead.")
+
+ def to_string(self, *, show_metadata=False, preview_cols=0):
+ """
+ Return human-readable string representation of Table.
+
+ Parameters
+ ----------
+        show_metadata : bool, default False
+ Display Field-level and Schema-level KeyValueMetadata.
+ preview_cols : int, default 0
+ Display values of the columns for the first N columns.
+
+ Returns
+ -------
+ str
+ """
+ # Use less verbose schema output.
+ schema_as_string = self.schema.to_string(
+ show_field_metadata=show_metadata,
+ show_schema_metadata=show_metadata
+ )
+ title = 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string)
+ pieces = [title]
+ if preview_cols:
+ pieces.append('----')
+ for i in range(min(self.num_columns, preview_cols)):
+ pieces.append('{}: {}'.format(
+ self.field(i).name,
+ self.column(i).to_string(indent=0, skip_new_lines=True)
+ ))
+ if preview_cols < self.num_columns:
+ pieces.append('...')
+ return '\n'.join(pieces)
+
+ def __repr__(self):
+ if self.table == NULL:
+ raise ValueError("Table's internal pointer is NULL, do not use "
+ "any methods or attributes on this object")
+ return self.to_string(preview_cols=10)
+
+ cdef void init(self, const shared_ptr[CTable]& table):
+ self.sp_table = table
+ self.table = table.get()
+
+ def validate(self, *, full=False):
+ """
+ Perform validation checks. An exception is raised if validation fails.
+
+ By default only cheap validation checks are run. Pass `full=True`
+ for thorough validation checks (potentially O(n)).
+
+ Parameters
+ ----------
+ full: bool, default False
+ If True, run expensive checks, otherwise cheap checks only.
+
+ Raises
+ ------
+ ArrowInvalid
+ """
+ if full:
+ with nogil:
+ check_status(self.table.ValidateFull())
+ else:
+ with nogil:
+ check_status(self.table.Validate())
+
+ def __reduce__(self):
+ # Reduce the columns as ChunkedArrays to avoid serializing schema
+ # data twice
+ columns = [col for col in self.columns]
+ return _reconstruct_table, (columns, self.schema)
+
+ def __getitem__(self, key):
+ """
+ Slice or return column at given index or column name.
+
+ Parameters
+ ----------
+ key : integer, str, or slice
+ Slices with step not equal to 1 (or None) will produce a copy
+ rather than a zero-copy view.
+
+ Returns
+ -------
+ ChunkedArray (index/column) or Table (slice)
+ """
+ if isinstance(key, slice):
+ return _normalize_slice(self, key)
+ else:
+ return self.column(key)
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this Table.
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of table to slice.
+ length : int, default None
+ Length of slice (default is until end of table starting from
+ offset).
+
+ Returns
+ -------
+ Table
+ """
+ cdef shared_ptr[CTable] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ offset = min(len(self), offset)
+ if length is None:
+ result = self.table.Slice(offset)
+ else:
+ result = self.table.Slice(offset, length)
+
+ return pyarrow_wrap_table(result)
+
+ def filter(self, mask, object null_selection_behavior="drop"):
+ """
+ Select records from a Table. See :func:`pyarrow.compute.filter` for
+ full usage.
+ """
+ return _pc().filter(self, mask, null_selection_behavior)
+
+ def take(self, object indices):
+ """
+ Select records from a Table. See :func:`pyarrow.compute.take` for full
+ usage.
+ """
+ return _pc().take(self, indices)
+
+ def drop_null(self):
+ """
+ Remove missing values from a Table.
+ See :func:`pyarrow.compute.drop_null` for full usage.
+ """
+ return _pc().drop_null(self)
+
+ def select(self, object columns):
+ """
+ Select columns of the Table.
+
+ Returns a new Table with the specified columns, and metadata
+ preserved.
+
+ Parameters
+ ----------
+ columns : list-like
+ The column names or integer indices to select.
+
+ Returns
+ -------
+ Table
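+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``):
+
+        >>> t = pa.table({'a': [1, 2], 'b': ['x', 'y'], 'c': [3.0, 4.0]})
+        >>> t.select(['c', 'a']).column_names
+        ['c', 'a']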
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+ vector[int] c_indices
+
+ for idx in columns:
+ idx = self._ensure_integer_index(idx)
+ idx = _normalize_index(idx, self.num_columns)
+ c_indices.push_back(<int> idx)
+
+ with nogil:
+ c_table = GetResultValue(self.table.SelectColumns(move(c_indices)))
+
+ return pyarrow_wrap_table(c_table)
+
+ def replace_schema_metadata(self, metadata=None):
+ """
+        Create shallow copy of table by replacing schema
+        key-value metadata with the indicated new metadata (which may be
+        None, in which case any existing metadata is deleted).
+
+ Parameters
+ ----------
+ metadata : dict, default None
+
+ Returns
+ -------
+ Table
+ """
+ cdef:
+ shared_ptr[const CKeyValueMetadata] c_meta
+ shared_ptr[CTable] c_table
+
+ metadata = ensure_metadata(metadata, allow_none=True)
+ c_meta = pyarrow_unwrap_metadata(metadata)
+ with nogil:
+ c_table = self.table.ReplaceSchemaMetadata(c_meta)
+
+ return pyarrow_wrap_table(c_table)
+
+ def flatten(self, MemoryPool memory_pool=None):
+ """
+ Flatten this Table.
+
+ Each column with a struct type is flattened
+ into one column per struct field. Other columns are left unchanged.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Returns
+ -------
+ Table
+ """
+ cdef:
+ shared_ptr[CTable] flattened
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+ with nogil:
+ flattened = GetResultValue(self.table.Flatten(pool))
+
+ return pyarrow_wrap_table(flattened)
+
+ def combine_chunks(self, MemoryPool memory_pool=None):
+ """
+ Make a new table by combining the chunks this table has.
+
+ All the underlying chunks in the ChunkedArray of each column are
+ concatenated into zero or one chunk.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool.
+
+ Returns
+ -------
+ Table
+ """
+ cdef:
+ shared_ptr[CTable] combined
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+ with nogil:
+ combined = GetResultValue(self.table.CombineChunks(pool))
+
+ return pyarrow_wrap_table(combined)
+
+ def unify_dictionaries(self, MemoryPool memory_pool=None):
+ """
+ Unify dictionaries across all chunks.
+
+ This method returns an equivalent table, but where all chunks of
+ each column share the same dictionary values. Dictionary indices
+ are transposed accordingly.
+
+ Columns without dictionaries are returned unchanged.
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool
+
+ Returns
+ -------
+ Table
+ """
+ cdef:
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+ shared_ptr[CTable] c_result
+
+ with nogil:
+ c_result = GetResultValue(CDictionaryUnifier.UnifyTable(
+ deref(self.table), pool))
+
+ return pyarrow_wrap_table(c_result)
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def equals(self, Table other, bint check_metadata=False):
+ """
+ Check if contents of two tables are equal.
+
+ Parameters
+ ----------
+ other : pyarrow.Table
+ Table to compare against.
+ check_metadata : bool, default False
+ Whether schema metadata equality should be checked as well.
+
+ Returns
+ -------
+ bool
+ """
+ if other is None:
+ return False
+
+ cdef:
+ CTable* this_table = self.table
+ CTable* other_table = other.table
+ c_bool result
+
+ with nogil:
+ result = this_table.Equals(deref(other_table), check_metadata)
+
+ return result
+
+ def cast(self, Schema target_schema, bint safe=True):
+ """
+ Cast table values to another schema.
+
+ Parameters
+ ----------
+ target_schema : Schema
+ Schema to cast to, the names and order of fields must match.
+ safe : bool, default True
+ Check for overflows or other unsafe conversions.
+
+ Returns
+ -------
+ Table
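+
+        Examples
+        --------
+        A minimal usage sketch (assumes ``import pyarrow as pa``; output is
+        indicative only):
+
+        >>> t = pa.table({'a': [1, 2]})
+        >>> str(t.cast(pa.schema([('a', pa.float64())]))['a'].type)
+        'double'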
+ """
+ cdef:
+ ChunkedArray column, casted
+ Field field
+ list newcols = []
+
+ if self.schema.names != target_schema.names:
+ raise ValueError("Target schema's field names are not matching "
+ "the table's field names: {!r}, {!r}"
+ .format(self.schema.names, target_schema.names))
+
+ for column, field in zip(self.itercolumns(), target_schema):
+ casted = column.cast(field.type, safe=safe)
+ newcols.append(casted)
+
+ return Table.from_arrays(newcols, schema=target_schema)
+
+ @classmethod
+ def from_pandas(cls, df, Schema schema=None, preserve_index=None,
+ nthreads=None, columns=None, bint safe=True):
+ """
+ Convert pandas.DataFrame to an Arrow Table.
+
+ The column types in the resulting Arrow Table are inferred from the
+ dtypes of the pandas.Series in the DataFrame. In the case of non-object
+ Series, the NumPy dtype is translated to its Arrow equivalent. In the
+ case of `object`, we need to guess the datatype by looking at the
+ Python objects in this Series.
+
+ Be aware that Series of the `object` dtype don't carry enough
+ information to always lead to a meaningful Arrow type. In the case that
+ we cannot infer a type, e.g. because the DataFrame is of length 0 or
+ the Series only contains None/nan objects, the type is set to
+ null. This behavior can be avoided by constructing an explicit schema
+ and passing it to this function.
+
+ Parameters
+ ----------
+ df : pandas.DataFrame
+ schema : pyarrow.Schema, optional
+ The expected schema of the Arrow Table. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+ If passed, the output will have exactly this schema. Columns
+ specified in the schema that are not found in the DataFrame columns
+ or its index will raise an error. Additional columns or index
+ levels in the DataFrame which are not specified in the schema will
+ be ignored.
+ preserve_index : bool, optional
+ Whether to store the index as an additional column in the resulting
+ ``Table``. The default of None will store the index as a column,
+ except for RangeIndex which is stored as metadata only. Use
+ ``preserve_index=True`` to force it to be stored as a column.
+ nthreads : int, default None (may use up to system CPU count threads)
+ If greater than 1, convert columns to Arrow in parallel using
+ indicated number of threads.
+ columns : list, optional
+ List of column to be converted. If None, use all columns.
+ safe : bool, default True
+ Check for overflows or other unsafe conversions.
+
+ Returns
+ -------
+ Table
+
+ Examples
+ --------
+
+ >>> import pandas as pd
+ >>> import pyarrow as pa
+ >>> df = pd.DataFrame({
+ ... 'int': [1, 2],
+ ... 'str': ['a', 'b']
+ ... })
+ >>> pa.Table.from_pandas(df)
+ <pyarrow.lib.Table object at 0x7f05d1fb1b40>
+ """
+ from pyarrow.pandas_compat import dataframe_to_arrays
+ arrays, schema = dataframe_to_arrays(
+ df,
+ schema=schema,
+ preserve_index=preserve_index,
+ nthreads=nthreads,
+ columns=columns,
+ safe=safe
+ )
+ return cls.from_arrays(arrays, schema=schema)
+
+ @staticmethod
+ def from_arrays(arrays, names=None, schema=None, metadata=None):
+ """
+ Construct a Table from Arrow arrays.
+
+ Parameters
+ ----------
+ arrays : list of pyarrow.Array or pyarrow.ChunkedArray
+ Equal-length arrays that should form the table.
+ names : list of str, optional
+ Names for the table columns. If not passed, schema must be passed.
+ schema : Schema, default None
+ Schema for the created table. If not passed, names must be passed.
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if inferred).
+
+ Returns
+ -------
+ Table
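+
+        Examples
+        --------
+        A minimal usage sketch (assumes ``import pyarrow as pa``; a
+        ChunkedArray column is also accepted):
+
+        >>> t = pa.Table.from_arrays(
+        ...     [pa.array([1, 2]), pa.chunked_array([['a'], ['b']])],
+        ...     names=['x', 'y'])
+        >>> t.shape
+        (2, 2)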
+ """
+ cdef:
+ vector[shared_ptr[CChunkedArray]] columns
+ shared_ptr[CSchema] c_schema
+ int i, K = <int> len(arrays)
+
+ converted_arrays = _sanitize_arrays(arrays, names, schema, metadata,
+ &c_schema)
+
+ columns.reserve(K)
+ for item in converted_arrays:
+ if isinstance(item, Array):
+ columns.push_back(
+ make_shared[CChunkedArray](
+ (<Array> item).sp_array
+ )
+ )
+ elif isinstance(item, ChunkedArray):
+ columns.push_back((<ChunkedArray> item).sp_chunked_array)
+ else:
+ raise TypeError(type(item))
+
+ result = pyarrow_wrap_table(CTable.Make(c_schema, columns))
+ result.validate()
+ return result
+
+ @staticmethod
+ def from_pydict(mapping, schema=None, metadata=None):
+ """
+        Construct a Table from a mapping of strings to Arrays or
+        Python lists.
+
+ Parameters
+ ----------
+ mapping : dict or Mapping
+ A mapping of strings to Arrays or Python lists.
+ schema : Schema, default None
+ If not passed, will be inferred from the Mapping values.
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if inferred).
+
+ Returns
+ -------
+ Table
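+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``):
+
+        >>> t = pa.Table.from_pydict({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
+        >>> t.num_rows
+        3
+        >>> t.column_names
+        ['x', 'y']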
+ """
+
+ return _from_pydict(cls=Table,
+ mapping=mapping,
+ schema=schema,
+ metadata=metadata)
+
+ @staticmethod
+ def from_batches(batches, Schema schema=None):
+ """
+ Construct a Table from a sequence or iterator of Arrow RecordBatches.
+
+ Parameters
+ ----------
+ batches : sequence or iterator of RecordBatch
+ Sequence of RecordBatch to be converted, all schemas must be equal.
+ schema : Schema, default None
+ If not passed, will be inferred from the first RecordBatch.
+
+ Returns
+ -------
+ Table
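+
+        Examples
+        --------
+        A small usage sketch, assuming ``import pyarrow as pa``:
+
+        >>> batch = pa.record_batch([pa.array([1, 2])], names=['x'])
+        >>> pa.Table.from_batches([batch, batch]).num_rows
+        4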
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] c_batches
+ shared_ptr[CTable] c_table
+ shared_ptr[CSchema] c_schema
+ RecordBatch batch
+
+ for batch in batches:
+ c_batches.push_back(batch.sp_batch)
+
+ if schema is None:
+ if c_batches.size() == 0:
+ raise ValueError('Must pass schema, or at least '
+ 'one RecordBatch')
+ c_schema = c_batches[0].get().schema()
+ else:
+ c_schema = schema.sp_schema
+
+ with nogil:
+ c_table = GetResultValue(
+ CTable.FromRecordBatches(c_schema, move(c_batches)))
+
+ return pyarrow_wrap_table(c_table)
+
+ def to_batches(self, max_chunksize=None, **kwargs):
+ """
+ Convert Table to list of (contiguous) RecordBatch objects.
+
+ Parameters
+ ----------
+ max_chunksize : int, default None
+ Maximum size for RecordBatch chunks. Individual chunks may be
+ smaller depending on the chunk layout of individual columns.
+
+ Returns
+ -------
+ list of RecordBatch
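+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``; the exact
+        chunking depends on the table's layout):
+
+        >>> table = pa.table({'x': list(range(10))})
+        >>> [b.num_rows for b in table.to_batches(max_chunksize=4)]
+        [4, 4, 2]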
+ """
+ cdef:
+ unique_ptr[TableBatchReader] reader
+ int64_t c_max_chunksize
+ list result = []
+ shared_ptr[CRecordBatch] batch
+
+ reader.reset(new TableBatchReader(deref(self.table)))
+
+ if 'chunksize' in kwargs:
+ max_chunksize = kwargs['chunksize']
+ msg = ('The parameter chunksize is deprecated for '
+ 'pyarrow.Table.to_batches as of 0.15, please use '
+ 'the parameter max_chunksize instead')
+ warnings.warn(msg, FutureWarning)
+
+ if max_chunksize is not None:
+ c_max_chunksize = max_chunksize
+ reader.get().set_chunksize(c_max_chunksize)
+
+ while True:
+ with nogil:
+ check_status(reader.get().ReadNext(&batch))
+
+ if batch.get() == NULL:
+ break
+
+ result.append(pyarrow_wrap_batch(batch))
+
+ return result
+
+ def _to_pandas(self, options, categories=None, ignore_metadata=False,
+ types_mapper=None):
+ from pyarrow.pandas_compat import table_to_blockmanager
+ mgr = table_to_blockmanager(
+ options, self, categories,
+ ignore_metadata=ignore_metadata,
+ types_mapper=types_mapper)
+ return pandas_api.data_frame(mgr)
+
+ def to_pydict(self):
+ """
+ Convert the Table to a dict or OrderedDict.
+
+ Returns
+ -------
+ dict
+ """
+ cdef:
+ size_t i
+ size_t num_columns = self.table.num_columns()
+ list entries = []
+ ChunkedArray column
+
+ for i in range(num_columns):
+ column = self.column(i)
+ entries.append((self.field(i).name, column.to_pylist()))
+
+ return ordered_dict(entries)
+
+ @property
+ def schema(self):
+ """
+ Schema of the table and its columns.
+
+ Returns
+ -------
+ Schema
+ """
+ return pyarrow_wrap_schema(self.table.schema())
+
+ def field(self, i):
+ """
+ Select a schema field by its column name or numeric index.
+
+ Parameters
+ ----------
+ i : int or string
+ The index or name of the field to retrieve.
+
+ Returns
+ -------
+ Field
+ """
+ return self.schema.field(i)
+
+ def _ensure_integer_index(self, i):
+ """
+ Ensure integer index (convert string column name to integer if needed).
+ """
+ if isinstance(i, (bytes, str)):
+ field_indices = self.schema.get_all_field_indices(i)
+
+ if len(field_indices) == 0:
+ raise KeyError("Field \"{}\" does not exist in table schema"
+ .format(i))
+ elif len(field_indices) > 1:
+ raise KeyError("Field \"{}\" exists {} times in table schema"
+ .format(i, len(field_indices)))
+ else:
+ return field_indices[0]
+ elif isinstance(i, int):
+ return i
+ else:
+ raise TypeError("Index must either be string or integer")
+
+ def column(self, i):
+ """
+ Select a column by its column name, or numeric index.
+
+ Parameters
+ ----------
+ i : int or string
+ The index or name of the column to retrieve.
+
+ Returns
+ -------
+ ChunkedArray
+ """
+ return self._column(self._ensure_integer_index(i))
+
+ def _column(self, int i):
+ """
+ Select a column by its numeric index.
+
+ Parameters
+ ----------
+ i : int
+ The index of the column to retrieve.
+
+ Returns
+ -------
+ ChunkedArray
+ """
+ cdef int index = <int> _normalize_index(i, self.num_columns)
+ cdef ChunkedArray result = pyarrow_wrap_chunked_array(
+ self.table.column(index))
+ result._name = self.schema[index].name
+ return result
+
+ def itercolumns(self):
+ """
+ Iterator over all columns in their numerical order.
+
+ Yields
+ ------
+ ChunkedArray
+ """
+ for i in range(self.num_columns):
+ yield self._column(i)
+
+ @property
+ def columns(self):
+ """
+ List of all columns in numerical order.
+
+ Returns
+ -------
+ list of ChunkedArray
+ """
+ return [self._column(i) for i in range(self.num_columns)]
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns in this table.
+
+ Returns
+ -------
+ int
+ """
+ return self.table.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows in this table.
+
+ Due to the definition of a table, all columns have the same number of
+ rows.
+
+ Returns
+ -------
+ int
+ """
+ return self.table.num_rows()
+
+ def __len__(self):
+ return self.num_rows
+
+ @property
+ def shape(self):
+ """
+ Dimensions of the table: (#rows, #columns).
+
+ Returns
+ -------
+ (int, int)
+ Number of rows and number of columns.
+ """
+ return (self.num_rows, self.num_columns)
+
+ @property
+ def nbytes(self):
+ """
+ Total number of bytes consumed by the elements of the table.
+
+ Returns
+ -------
+ int
+ """
+ size = 0
+ for column in self.itercolumns():
+ size += column.nbytes
+ return size
+
+ def __sizeof__(self):
+ return super(Table, self).__sizeof__() + self.nbytes
+
+ def add_column(self, int i, field_, column):
+ """
+ Add column to Table at position.
+
+ A new table is returned with the column added, the original table
+ object is left unchanged.
+
+ Parameters
+ ----------
+ i : int
+ Index to place the column at.
+ field_ : str or Field
+ If a string is passed then the type is deduced from the column
+ data.
+ column : Array, list of Array, or values coercible to arrays
+ Column data.
+
+ Returns
+ -------
+ Table
+ New table with the passed column added.
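+
+        Examples
+        --------
+        A minimal usage sketch (assumes ``import pyarrow as pa``):
+
+        >>> t = pa.table({'a': [1, 2]})
+        >>> t.add_column(0, 'b', [['x', 'y']]).column_names
+        ['b', 'a']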
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+ Field c_field
+ ChunkedArray c_arr
+
+ if isinstance(column, ChunkedArray):
+ c_arr = column
+ else:
+ c_arr = chunked_array(column)
+
+ if isinstance(field_, Field):
+ c_field = field_
+ else:
+ c_field = field(field_, c_arr.type)
+
+ with nogil:
+ c_table = GetResultValue(self.table.AddColumn(
+ i, c_field.sp_field, c_arr.sp_chunked_array))
+
+ return pyarrow_wrap_table(c_table)
+
+ def append_column(self, field_, column):
+ """
+ Append column at end of columns.
+
+ Parameters
+ ----------
+ field_ : str or Field
+ If a string is passed then the type is deduced from the column
+ data.
+ column : Array, list of Array, or values coercible to arrays
+ Column data.
+
+ Returns
+ -------
+ Table
+ New table with the passed column added.
+ """
+ return self.add_column(self.num_columns, field_, column)
+
+ def remove_column(self, int i):
+ """
+ Create new Table with the indicated column removed.
+
+ Parameters
+ ----------
+ i : int
+ Index of column to remove.
+
+ Returns
+ -------
+ Table
+ New table without the column.
+ """
+ cdef shared_ptr[CTable] c_table
+
+ with nogil:
+ c_table = GetResultValue(self.table.RemoveColumn(i))
+
+ return pyarrow_wrap_table(c_table)
+
+ def set_column(self, int i, field_, column):
+ """
+ Replace column in Table at position.
+
+ Parameters
+ ----------
+ i : int
+ Index to place the column at.
+ field_ : str or Field
+ If a string is passed then the type is deduced from the column
+ data.
+ column : Array, list of Array, or values coercible to arrays
+ Column data.
+
+ Returns
+ -------
+ Table
+ New table with the passed column set.
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+ Field c_field
+ ChunkedArray c_arr
+
+ if isinstance(column, ChunkedArray):
+ c_arr = column
+ else:
+ c_arr = chunked_array(column)
+
+ if isinstance(field_, Field):
+ c_field = field_
+ else:
+ c_field = field(field_, c_arr.type)
+
+ with nogil:
+ c_table = GetResultValue(self.table.SetColumn(
+ i, c_field.sp_field, c_arr.sp_chunked_array))
+
+ return pyarrow_wrap_table(c_table)
+
+ @property
+ def column_names(self):
+ """
+ Names of the table's columns.
+
+ Returns
+ -------
+ list of str
+ """
+ names = self.table.ColumnNames()
+ return [frombytes(name) for name in names]
+
+ def rename_columns(self, names):
+ """
+ Create new table with columns renamed to provided names.
+
+ Parameters
+ ----------
+ names : list of str
+ List of new column names.
+
+ Returns
+ -------
+ Table
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+ vector[c_string] c_names
+
+ for name in names:
+ c_names.push_back(tobytes(name))
+
+ with nogil:
+ c_table = GetResultValue(self.table.RenameColumns(move(c_names)))
+
+ return pyarrow_wrap_table(c_table)
+
+ def drop(self, columns):
+ """
+ Drop one or more columns and return a new table.
+
+ Parameters
+ ----------
+ columns : list of str
+ List of field names referencing existing columns.
+
+ Raises
+ ------
+ KeyError
+            If any of the passed column names do not exist.
+
+ Returns
+ -------
+ Table
+ New table without the columns.
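+
+        Examples
+        --------
+        Illustrative sketch (assumes ``import pyarrow as pa``):
+
+        >>> t = pa.table({'a': [1], 'b': [2]})
+        >>> t.drop(['b']).column_names
+        ['a']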
+ """
+ indices = []
+ for col in columns:
+ idx = self.schema.get_field_index(col)
+ if idx == -1:
+ raise KeyError("Column {!r} not found".format(col))
+ indices.append(idx)
+
+ indices.sort()
+ indices.reverse()
+
+ table = self
+ for idx in indices:
+ table = table.remove_column(idx)
+
+ return table
+
+
+def _reconstruct_table(arrays, schema):
+ """
+ Internal: reconstruct pa.Table from pickled components.
+ """
+ return Table.from_arrays(arrays, schema=schema)
+
+
+def record_batch(data, names=None, schema=None, metadata=None):
+ """
+    Create a pyarrow.RecordBatch from a Python data structure or sequence
+    of arrays.
+
+ Parameters
+ ----------
+ data : pandas.DataFrame, list
+ A DataFrame or list of arrays or chunked arrays.
+ names : list, default None
+ Column names if list of arrays passed as data. Mutually exclusive with
+ 'schema' argument.
+ schema : Schema, default None
+ The expected schema of the RecordBatch. If not passed, will be inferred
+ from the data. Mutually exclusive with 'names' argument.
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if schema not passed).
+
+ Returns
+ -------
+ RecordBatch
+
+ See Also
+ --------
+ RecordBatch.from_arrays, RecordBatch.from_pandas, table
+ """
+ # accept schema as first argument for backwards compatibility / usability
+ if isinstance(names, Schema) and schema is None:
+ schema = names
+ names = None
+
+ if isinstance(data, (list, tuple)):
+ return RecordBatch.from_arrays(data, names=names, schema=schema,
+ metadata=metadata)
+ elif _pandas_api.is_data_frame(data):
+ return RecordBatch.from_pandas(data, schema=schema)
+ else:
+ raise TypeError("Expected pandas DataFrame or list of arrays")
+
+
+def table(data, names=None, schema=None, metadata=None, nthreads=None):
+ """
+ Create a pyarrow.Table from a Python data structure or sequence of arrays.
+
+ Parameters
+ ----------
+ data : pandas.DataFrame, dict, list
+ A DataFrame, mapping of strings to Arrays or Python lists, or list of
+ arrays or chunked arrays.
+ names : list, default None
+ Column names if list of arrays passed as data. Mutually exclusive with
+ 'schema' argument.
+ schema : Schema, default None
+ The expected schema of the Arrow Table. If not passed, will be inferred
+ from the data. Mutually exclusive with 'names' argument.
+ If passed, the output will have exactly this schema (raising an error
+ when columns are not found in the data and ignoring additional data not
+ specified in the schema, when data is a dict or DataFrame).
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if schema not passed).
+ nthreads : int, default None (may use up to system CPU count threads)
+ For pandas.DataFrame inputs: if greater than 1, convert columns to
+ Arrow in parallel using indicated number of threads.
+
+ Returns
+ -------
+ Table
+
+ See Also
+ --------
+ Table.from_arrays, Table.from_pandas, Table.from_pydict
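+
+    Examples
+    --------
+    A minimal usage sketch (assumes ``import pyarrow as pa``; outputs are
+    indicative only):
+
+    >>> pa.table({'x': [1, 2], 'y': ['a', 'b']}).num_columns
+    2
+    >>> pa.table([pa.array([1, 2])], names=['x']).column_names
+    ['x']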
+ """
+ # accept schema as first argument for backwards compatibility / usability
+ if isinstance(names, Schema) and schema is None:
+ schema = names
+ names = None
+
+ if isinstance(data, (list, tuple)):
+ return Table.from_arrays(data, names=names, schema=schema,
+ metadata=metadata)
+ elif isinstance(data, dict):
+ if names is not None:
+ raise ValueError(
+ "The 'names' argument is not valid when passing a dictionary")
+ return Table.from_pydict(data, schema=schema, metadata=metadata)
+ elif _pandas_api.is_data_frame(data):
+ if names is not None or metadata is not None:
+ raise ValueError(
+ "The 'names' and 'metadata' arguments are not valid when "
+ "passing a pandas DataFrame")
+ return Table.from_pandas(data, schema=schema, nthreads=nthreads)
+ else:
+ raise TypeError(
+ "Expected pandas DataFrame, python dictionary or list of arrays")
+
+
+def concat_tables(tables, c_bool promote=False, MemoryPool memory_pool=None):
+ """
+ Concatenate pyarrow.Table objects.
+
+ If promote==False, a zero-copy concatenation will be performed. The schemas
+ of all the Tables must be the same (except the metadata), otherwise an
+ exception will be raised. The result Table will share the metadata with the
+ first table.
+
+    If promote==True, any null type arrays will be cast to the type of other
+ arrays in the column of the same name. If a table is missing a particular
+ field, null values of the appropriate type will be generated to take the
+ place of the missing field. The new schema will share the metadata with the
+ first table. Each field in the new schema will share the metadata with the
+ first table which has the field defined. Note that type promotions may
+ involve additional allocations on the given ``memory_pool``.
+
+ Parameters
+ ----------
+ tables : iterable of pyarrow.Table objects
+ Pyarrow tables to concatenate into a single Table.
+ promote : bool, default False
+ If True, concatenate tables with null-filling and null type promotion.
+ memory_pool : MemoryPool, default None
+ For memory allocations, if required, otherwise use default pool.
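+
+    Examples
+    --------
+    A small usage sketch, assuming ``import pyarrow as pa``; outputs are
+    indicative only:
+
+    >>> t1 = pa.table({'a': [1, 2]})
+    >>> t2 = pa.table({'a': [3], 'b': ['x']})
+    >>> pa.concat_tables([t1, t1]).num_rows
+    4
+    >>> pa.concat_tables([t1, t2], promote=True).column_names
+    ['a', 'b']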
+ """
+ cdef:
+ vector[shared_ptr[CTable]] c_tables
+ shared_ptr[CTable] c_result_table
+ CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+ Table table
+ CConcatenateTablesOptions options = (
+ CConcatenateTablesOptions.Defaults())
+
+ for table in tables:
+ c_tables.push_back(table.sp_table)
+
+ with nogil:
+ options.unify_schemas = promote
+ c_result_table = GetResultValue(
+ ConcatenateTables(c_tables, options, pool))
+
+ return pyarrow_wrap_table(c_result_table)
+
+
+def _from_pydict(cls, mapping, schema, metadata):
+ """
+    Construct a Table or RecordBatch from a mapping of strings to Arrays or
+    Python lists.
+
+ Parameters
+ ----------
+ cls : Class Table/RecordBatch
+ mapping : dict or Mapping
+ A mapping of strings to Arrays or Python lists.
+ schema : Schema, default None
+ If not passed, will be inferred from the Mapping values.
+ metadata : dict or Mapping, default None
+ Optional metadata for the schema (if inferred).
+
+ Returns
+ -------
+ Table/RecordBatch
+ """
+
+ arrays = []
+ if schema is None:
+ names = []
+ for k, v in mapping.items():
+ names.append(k)
+ arrays.append(asarray(v))
+ return cls.from_arrays(arrays, names, metadata=metadata)
+ elif isinstance(schema, Schema):
+ for field in schema:
+ try:
+ v = mapping[field.name]
+ except KeyError:
+ try:
+ v = mapping[tobytes(field.name)]
+ except KeyError:
+ present = mapping.keys()
+ missing = [n for n in schema.names if n not in present]
+ raise KeyError(
+ "The passed mapping doesn't contain the "
+ "following field(s) of the schema: {}".
+ format(', '.join(missing))
+ )
+ arrays.append(asarray(v, type=field.type))
+ # Will raise if metadata is not None
+ return cls.from_arrays(arrays, schema=schema, metadata=metadata)
+ else:
+ raise TypeError('Schema must be an instance of pyarrow.Schema')