# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

import io
from textwrap import indent
import warnings

import numpy as np

from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Array, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table, NativeFile,
                          pyarrow_wrap_chunked_array,
                          pyarrow_wrap_schema,
                          pyarrow_wrap_table,
                          pyarrow_wrap_buffer,
                          pyarrow_wrap_batch,
                          get_reader, get_writer,
                          string_to_timeunit)

from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
                         _stringify_path, _datetime_from_int,
                         tobytes, frombytes)

cimport cpython as cp


cdef class Statistics(_Weakrefable):
    def __cinit__(self):
        pass

    def __repr__(self):
        return """{}
  has_min_max: {}
  min: {}
  max: {}
  null_count: {}
  distinct_count: {}
  num_values: {}
  physical_type: {}
  logical_type: {}
  converted_type (legacy): {}""".format(object.__repr__(self),
                                        self.has_min_max,
                                        self.min,
                                        self.max,
                                        self.null_count,
                                        self.distinct_count,
                                        self.num_values,
                                        self.physical_type,
                                        str(self.logical_type),
                                        self.converted_type)

    def to_dict(self):
        d = dict(
            has_min_max=self.has_min_max,
            min=self.min,
            max=self.max,
            null_count=self.null_count,
            distinct_count=self.distinct_count,
            num_values=self.num_values,
            physical_type=self.physical_type
        )
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, Statistics other):
        return self.statistics.get().Equals(deref(other.statistics.get()))

    @property
    def has_min_max(self):
        return self.statistics.get().HasMinMax()

    @property
    def has_null_count(self):
        return self.statistics.get().HasNullCount()

    @property
    def has_distinct_count(self):
        return self.statistics.get().HasDistinctCount()

    @property
    def min_raw(self):
        if self.has_min_max:
            return _cast_statistic_raw_min(self.statistics.get())
        else:
            return None

    @property
    def max_raw(self):
        if self.has_min_max:
            return _cast_statistic_raw_max(self.statistics.get())
        else:
            return None

    @property
    def min(self):
        if self.has_min_max:
            return _cast_statistic_min(self.statistics.get())
        else:
            return None

    @property
    def max(self):
        if self.has_min_max:
            return _cast_statistic_max(self.statistics.get())
        else:
            return None

    @property
    def null_count(self):
        return self.statistics.get().null_count()

    @property
    def distinct_count(self):
        return self.statistics.get().distinct_count()

    @property
    def num_values(self):
        return self.statistics.get().num_values()

    @property
    def physical_type(self):
        raw_physical_type = self.statistics.get().physical_type()
        return physical_type_name_from_enum(raw_physical_type)

    @property
    def logical_type(self):
        return wrap_logical_type(self.statistics.get().descr().logical_type())
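    # Note: ConvertedType is the legacy (pre-LogicalType) column annotation
    # from the Parquet format; it is exposed below alongside logical_type
    # for backwards compatibility.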
    @property
    def converted_type(self):
        raw_converted_type = self.statistics.get().descr().converted_type()
        return converted_type_name_from_enum(raw_converted_type)


cdef class ParquetLogicalType(_Weakrefable):
    cdef:
        shared_ptr[const CParquetLogicalType] type

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
        self.type = type

    def __str__(self):
        return frombytes(self.type.get().ToString(), safe=True)

    def to_json(self):
        return frombytes(self.type.get().ToJSON())

    @property
    def type(self):
        return logical_type_name_from_enum(self.type.get().type())


cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
    cdef ParquetLogicalType out = ParquetLogicalType()
    out.init(type)
    return out


cdef _cast_statistic_raw_min(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).min()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).min()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).min()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).min()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).min()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).min())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)


cdef _cast_statistic_raw_max(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).max()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).max()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).max()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).max()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).max()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).max())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)


cdef _cast_statistic_min(CStatistics* statistics):
    min_raw = _cast_statistic_raw_min(statistics)
    return _box_logical_type_value(min_raw, statistics.descr())


cdef _cast_statistic_max(CStatistics* statistics):
    max_raw = _cast_statistic_raw_max(statistics)
    return _box_logical_type_value(max_raw, statistics.descr())
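# Illustrative sketch (comment only, not part of this module's code): the
# statistics boxed above are what users see through the public
# pyarrow.parquet API, e.g.
#
#   import pyarrow.parquet as pq
#   md = pq.ParquetFile('example.parquet').metadata  # hypothetical file path
#   stats = md.row_group(0).column(0).statistics
#   print(stats.min, stats.max, stats.null_count)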
ValueError("Unsupported time unit") elif ltype.type() == ParquetLogicalType_INT: itype = ltype if not itype.is_signed() and itype.bit_width() == 32: return int(np.int32(value).view(np.uint32)) elif not itype.is_signed() and itype.bit_width() == 64: return int(np.int64(value).view(np.uint64)) else: return value else: # No logical boxing defined return value cdef _box_byte_array(ParquetByteArray val): return cp.PyBytes_FromStringAndSize( val.ptr, val.len) cdef _box_flba(ParquetFLBA val, uint32_t len): return cp.PyBytes_FromStringAndSize( val.ptr, len) cdef class ColumnChunkMetaData(_Weakrefable): def __cinit__(self): pass def __repr__(self): statistics = indent(repr(self.statistics), 4 * ' ') return """{0} file_offset: {1} file_path: {2} physical_type: {3} num_values: {4} path_in_schema: {5} is_stats_set: {6} statistics: {7} compression: {8} encodings: {9} has_dictionary_page: {10} dictionary_page_offset: {11} data_page_offset: {12} total_compressed_size: {13} total_uncompressed_size: {14}""".format(object.__repr__(self), self.file_offset, self.file_path, self.physical_type, self.num_values, self.path_in_schema, self.is_stats_set, statistics, self.compression, self.encodings, self.has_dictionary_page, self.dictionary_page_offset, self.data_page_offset, self.total_compressed_size, self.total_uncompressed_size) def to_dict(self): statistics = self.statistics.to_dict() if self.is_stats_set else None d = dict( file_offset=self.file_offset, file_path=self.file_path, physical_type=self.physical_type, num_values=self.num_values, path_in_schema=self.path_in_schema, is_stats_set=self.is_stats_set, statistics=statistics, compression=self.compression, encodings=self.encodings, has_dictionary_page=self.has_dictionary_page, dictionary_page_offset=self.dictionary_page_offset, data_page_offset=self.data_page_offset, total_compressed_size=self.total_compressed_size, total_uncompressed_size=self.total_uncompressed_size ) return d def __eq__(self, other): try: return self.equals(other) except TypeError: return NotImplemented def equals(self, ColumnChunkMetaData other): return self.metadata.Equals(deref(other.metadata)) @property def file_offset(self): return self.metadata.file_offset() @property def file_path(self): return frombytes(self.metadata.file_path()) @property def physical_type(self): return physical_type_name_from_enum(self.metadata.type()) @property def num_values(self): return self.metadata.num_values() @property def path_in_schema(self): path = self.metadata.path_in_schema().get().ToDotString() return frombytes(path) @property def is_stats_set(self): return self.metadata.is_stats_set() @property def statistics(self): if not self.metadata.is_stats_set(): return None statistics = Statistics() statistics.init(self.metadata.statistics(), self) return statistics @property def compression(self): return compression_name_from_enum(self.metadata.compression()) @property def encodings(self): return tuple(map(encoding_name_from_enum, self.metadata.encodings())) @property def has_dictionary_page(self): return bool(self.metadata.has_dictionary_page()) @property def dictionary_page_offset(self): if self.has_dictionary_page: return self.metadata.dictionary_page_offset() else: return None @property def data_page_offset(self): return self.metadata.data_page_offset() @property def has_index_page(self): raise NotImplementedError('not supported in parquet-cpp') @property def index_page_offset(self): raise NotImplementedError("parquet-cpp doesn't return valid values") @property def total_compressed_size(self): return 
    @property
    def total_compressed_size(self):
        return self.metadata.total_compressed_size()

    @property
    def total_uncompressed_size(self):
        return self.metadata.total_uncompressed_size()


cdef class RowGroupMetaData(_Weakrefable):
    def __cinit__(self, FileMetaData parent, int index):
        if index < 0 or index >= parent.num_row_groups:
            raise IndexError('{0} out of bounds'.format(index))
        self.up_metadata = parent._metadata.RowGroup(index)
        self.metadata = self.up_metadata.get()
        self.parent = parent
        self.index = index

    def __reduce__(self):
        return RowGroupMetaData, (self.parent, self.index)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, RowGroupMetaData other):
        return self.metadata.Equals(deref(other.metadata))

    def column(self, int i):
        if i < 0 or i >= self.num_columns:
            raise IndexError('{0} out of bounds'.format(i))
        chunk = ColumnChunkMetaData()
        chunk.init(self, i)
        return chunk

    def __repr__(self):
        return """{0}
  num_columns: {1}
  num_rows: {2}
  total_byte_size: {3}""".format(object.__repr__(self),
                                 self.num_columns,
                                 self.num_rows,
                                 self.total_byte_size)

    def to_dict(self):
        columns = []
        d = dict(
            num_columns=self.num_columns,
            num_rows=self.num_rows,
            total_byte_size=self.total_byte_size,
            columns=columns,
        )
        for i in range(self.num_columns):
            columns.append(self.column(i).to_dict())
        return d

    @property
    def num_columns(self):
        return self.metadata.num_columns()

    @property
    def num_rows(self):
        return self.metadata.num_rows()

    @property
    def total_byte_size(self):
        return self.metadata.total_byte_size()


def _reconstruct_filemetadata(Buffer serialized):
    cdef:
        FileMetaData metadata = FileMetaData.__new__(FileMetaData)
        CBuffer *buffer = serialized.buffer.get()
        uint32_t metadata_len = <uint32_t> buffer.size()

    metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))

    return metadata


cdef class FileMetaData(_Weakrefable):
    def __cinit__(self):
        pass

    def __reduce__(self):
        cdef:
            NativeFile sink = BufferOutputStream()
            COutputStream* c_sink = sink.get_output_stream().get()
        with nogil:
            self._metadata.WriteTo(c_sink)

        cdef Buffer buffer = sink.getvalue()
        return _reconstruct_filemetadata, (buffer,)

    def __repr__(self):
        return """{0}
  created_by: {1}
  num_columns: {2}
  num_rows: {3}
  num_row_groups: {4}
  format_version: {5}
  serialized_size: {6}""".format(object.__repr__(self),
                                 self.created_by, self.num_columns,
                                 self.num_rows, self.num_row_groups,
                                 self.format_version,
                                 self.serialized_size)

    def to_dict(self):
        row_groups = []
        d = dict(
            created_by=self.created_by,
            num_columns=self.num_columns,
            num_rows=self.num_rows,
            num_row_groups=self.num_row_groups,
            row_groups=row_groups,
            format_version=self.format_version,
            serialized_size=self.serialized_size
        )
        for i in range(self.num_row_groups):
            row_groups.append(self.row_group(i).to_dict())
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, FileMetaData other):
        return self._metadata.Equals(deref(other._metadata))

    @property
    def schema(self):
        if self._schema is None:
            self._schema = ParquetSchema(self)
        return self._schema

    @property
    def serialized_size(self):
        return self._metadata.size()

    @property
    def num_columns(self):
        return self._metadata.num_columns()

    @property
    def num_rows(self):
        return self._metadata.num_rows()

    @property
    def num_row_groups(self):
        return self._metadata.num_row_groups()
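    # format_version reports the Parquet format version as a string:
    # '1.0', '2.4' or '2.6'; files written with the deprecated logical
    # version "2.0" are reported as 'pseudo-2.0'.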
    @property
    def format_version(self):
        cdef ParquetVersion version = self._metadata.version()
        if version == ParquetVersion_V1:
            return '1.0'
        elif version == ParquetVersion_V2_0:
            return 'pseudo-2.0'
        elif version == ParquetVersion_V2_4:
            return '2.4'
        elif version == ParquetVersion_V2_6:
            return '2.6'
        else:
            warnings.warn('Unrecognized file version, assuming 1.0: {}'
                          .format(version))
            return '1.0'

    @property
    def created_by(self):
        return frombytes(self._metadata.created_by())

    @property
    def metadata(self):
        cdef:
            unordered_map[c_string, c_string] metadata
            const CKeyValueMetadata* underlying_metadata
        underlying_metadata = self._metadata.key_value_metadata().get()
        if underlying_metadata != NULL:
            underlying_metadata.ToUnorderedMap(&metadata)
            return metadata
        else:
            return None

    def row_group(self, int i):
        return RowGroupMetaData(self, i)

    def set_file_path(self, path):
        """
        Modify the file_path field of each ColumnChunk in the
        FileMetaData to be a particular value
        """
        cdef:
            c_string c_path = tobytes(path)
        self._metadata.set_file_path(c_path)

    def append_row_groups(self, FileMetaData other):
        """
        Append row groups of other FileMetaData object
        """
        cdef shared_ptr[CFileMetaData] c_metadata

        c_metadata = other.sp_metadata
        self._metadata.AppendRowGroups(deref(c_metadata))

    def write_metadata_file(self, where):
        """
        Write the metadata object to a metadata-only file
        """
        cdef:
            shared_ptr[COutputStream] sink
            c_string c_where

        try:
            where = _stringify_path(where)
        except TypeError:
            get_writer(where, &sink)
        else:
            c_where = tobytes(where)
            with nogil:
                sink = GetResultValue(FileOutputStream.Open(c_where))

        with nogil:
            check_status(
                WriteMetaDataFile(deref(self._metadata), sink.get()))


cdef class ParquetSchema(_Weakrefable):
    def __cinit__(self, FileMetaData container):
        self.parent = container
        self.schema = container._metadata.schema()

    def __repr__(self):
        return "{0}\n{1}".format(
            object.__repr__(self),
            frombytes(self.schema.ToString(), safe=True))

    def __reduce__(self):
        return ParquetSchema, (self.parent,)

    def __len__(self):
        return self.schema.num_columns()

    def __getitem__(self, i):
        return self.column(i)

    @property
    def names(self):
        return [self[i].name for i in range(len(self))]

    def to_arrow_schema(self):
        """
        Convert Parquet schema to effective Arrow schema

        Returns
        -------
        schema : pyarrow.Schema
        """
        cdef shared_ptr[CSchema] sp_arrow_schema

        with nogil:
            check_status(FromParquetSchema(
                self.schema, default_arrow_reader_properties(),
                self.parent._metadata.key_value_metadata(),
                &sp_arrow_schema))

        return pyarrow_wrap_schema(sp_arrow_schema)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, ParquetSchema other):
        """
        Returns True if the Parquet schemas are equal
        """
        return self.schema.Equals(deref(other.schema))

    def column(self, i):
        if i < 0 or i >= len(self):
            raise IndexError('{0} out of bounds'.format(i))

        return ColumnSchema(self, i)


cdef class ColumnSchema(_Weakrefable):
    cdef:
        int index
        ParquetSchema parent
        const ColumnDescriptor* descr

    def __cinit__(self, ParquetSchema schema, int index):
        self.parent = schema
        self.index = index  # for pickling support
        self.descr = schema.schema.Column(index)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def __reduce__(self):
        return ColumnSchema, (self.parent, self.index)

    def equals(self, ColumnSchema other):
        """
        Returns True if the column schemas are equal
        """
        return self.descr.Equals(deref(other.descr))
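    # __repr__ expands the legacy converted type for DECIMAL
    # (precision/scale) and FIXED_LEN_BYTE_ARRAY (length) columns.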
    def __repr__(self):
        physical_type = self.physical_type
        converted_type = self.converted_type
        if converted_type == 'DECIMAL':
            converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
                                                        self.scale)
        elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
            converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
                              .format(self.length))

        return """<ParquetColumnSchema>
  name: {0}
  path: {1}
  max_definition_level: {2}
  max_repetition_level: {3}
  physical_type: {4}
  logical_type: {5}
  converted_type (legacy): {6}""".format(self.name, self.path,
                                         self.max_definition_level,
                                         self.max_repetition_level,
                                         physical_type,
                                         str(self.logical_type),
                                         converted_type)

    @property
    def name(self):
        return frombytes(self.descr.name())

    @property
    def path(self):
        return frombytes(self.descr.path().get().ToDotString())

    @property
    def max_definition_level(self):
        return self.descr.max_definition_level()

    @property
    def max_repetition_level(self):
        return self.descr.max_repetition_level()

    @property
    def physical_type(self):
        return physical_type_name_from_enum(self.descr.physical_type())

    @property
    def logical_type(self):
        return wrap_logical_type(self.descr.logical_type())

    @property
    def converted_type(self):
        return converted_type_name_from_enum(self.descr.converted_type())

    # FIXED_LEN_BYTE_ARRAY attribute
    @property
    def length(self):
        return self.descr.type_length()

    # Decimal attributes
    @property
    def precision(self):
        return self.descr.type_precision()

    @property
    def scale(self):
        return self.descr.type_scale()


cdef physical_type_name_from_enum(ParquetType type_):
    return {
        ParquetType_BOOLEAN: 'BOOLEAN',
        ParquetType_INT32: 'INT32',
        ParquetType_INT64: 'INT64',
        ParquetType_INT96: 'INT96',
        ParquetType_FLOAT: 'FLOAT',
        ParquetType_DOUBLE: 'DOUBLE',
        ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
        ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
    }.get(type_, 'UNKNOWN')


cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
    return {
        ParquetLogicalType_UNDEFINED: 'UNDEFINED',
        ParquetLogicalType_STRING: 'STRING',
        ParquetLogicalType_MAP: 'MAP',
        ParquetLogicalType_LIST: 'LIST',
        ParquetLogicalType_ENUM: 'ENUM',
        ParquetLogicalType_DECIMAL: 'DECIMAL',
        ParquetLogicalType_DATE: 'DATE',
        ParquetLogicalType_TIME: 'TIME',
        ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
        ParquetLogicalType_INT: 'INT',
        ParquetLogicalType_JSON: 'JSON',
        ParquetLogicalType_BSON: 'BSON',
        ParquetLogicalType_UUID: 'UUID',
        ParquetLogicalType_NONE: 'NONE',
    }.get(type_, 'UNKNOWN')


cdef converted_type_name_from_enum(ParquetConvertedType type_):
    return {
        ParquetConvertedType_NONE: 'NONE',
        ParquetConvertedType_UTF8: 'UTF8',
        ParquetConvertedType_MAP: 'MAP',
        ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
        ParquetConvertedType_LIST: 'LIST',
        ParquetConvertedType_ENUM: 'ENUM',
        ParquetConvertedType_DECIMAL: 'DECIMAL',
        ParquetConvertedType_DATE: 'DATE',
        ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
        ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
        ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
        ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
        ParquetConvertedType_UINT_8: 'UINT_8',
        ParquetConvertedType_UINT_16: 'UINT_16',
        ParquetConvertedType_UINT_32: 'UINT_32',
        ParquetConvertedType_UINT_64: 'UINT_64',
        ParquetConvertedType_INT_8: 'INT_8',
        ParquetConvertedType_INT_16: 'INT_16',
        ParquetConvertedType_INT_32: 'INT_32',
        ParquetConvertedType_INT_64: 'INT_64',
        ParquetConvertedType_JSON: 'JSON',
        ParquetConvertedType_BSON: 'BSON',
        ParquetConvertedType_INTERVAL: 'INTERVAL',
    }.get(type_, 'UNKNOWN')
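# Unrecognized enum values in the mapping helpers above and below fall back
# to the string 'UNKNOWN' rather than raising.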
cdef encoding_name_from_enum(ParquetEncoding encoding_):
    return {
        ParquetEncoding_PLAIN: 'PLAIN',
        ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
        ParquetEncoding_RLE: 'RLE',
        ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
        ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
        ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
        ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
        ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
        ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
    }.get(encoding_, 'UNKNOWN')


cdef compression_name_from_enum(ParquetCompression compression_):
    return {
        ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
        ParquetCompression_SNAPPY: 'SNAPPY',
        ParquetCompression_GZIP: 'GZIP',
        ParquetCompression_LZO: 'LZO',
        ParquetCompression_BROTLI: 'BROTLI',
        ParquetCompression_LZ4: 'LZ4',
        ParquetCompression_ZSTD: 'ZSTD',
    }.get(compression_, 'UNKNOWN')


cdef int check_compression_name(name) except -1:
    if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
                            'ZSTD'}:
        raise ArrowException("Unsupported compression: " + name)
    return 0


cdef ParquetCompression compression_from_name(name):
    name = name.upper()
    if name == 'SNAPPY':
        return ParquetCompression_SNAPPY
    elif name == 'GZIP':
        return ParquetCompression_GZIP
    elif name == 'LZO':
        return ParquetCompression_LZO
    elif name == 'BROTLI':
        return ParquetCompression_BROTLI
    elif name == 'LZ4':
        return ParquetCompression_LZ4
    elif name == 'ZSTD':
        return ParquetCompression_ZSTD
    else:
        return ParquetCompression_UNCOMPRESSED


cdef class ParquetReader(_Weakrefable):
    cdef:
        object source
        CMemoryPool* pool
        unique_ptr[FileReader] reader
        FileMetaData _metadata

    cdef public:
        _column_idx_map

    def __cinit__(self, MemoryPool memory_pool=None):
        self.pool = maybe_unbox_memory_pool(memory_pool)
        self._metadata = None

    def open(self, object source not None, bint use_memory_map=True,
             read_dictionary=None, FileMetaData metadata=None,
             int buffer_size=0, bint pre_buffer=False,
             coerce_int96_timestamp_unit=None):
        cdef:
            shared_ptr[CRandomAccessFile] rd_handle
            shared_ptr[CFileMetaData] c_metadata
            CReaderProperties properties = default_reader_properties()
            ArrowReaderProperties arrow_props = (
                default_arrow_reader_properties())
            c_string path
            FileReaderBuilder builder
            TimeUnit int96_timestamp_unit_code

        if metadata is not None:
            c_metadata = metadata.sp_metadata

        if buffer_size > 0:
            properties.enable_buffered_stream()
            properties.set_buffer_size(buffer_size)
        elif buffer_size == 0:
            properties.disable_buffered_stream()
        else:
            raise ValueError('Buffer size must be larger than zero')

        arrow_props.set_pre_buffer(pre_buffer)

        if coerce_int96_timestamp_unit is None:
            # use the default defined in default_arrow_reader_properties()
            pass
        else:
            arrow_props.set_coerce_int96_timestamp_unit(
                string_to_timeunit(coerce_int96_timestamp_unit))

        self.source = source

        get_reader(source, use_memory_map, &rd_handle)
        with nogil:
            check_status(builder.Open(rd_handle, properties, c_metadata))

        # Set up metadata
        with nogil:
            c_metadata = builder.raw_reader().metadata()
        self._metadata = result = FileMetaData()
        result.init(c_metadata)

        if read_dictionary is not None:
            self._set_read_dictionary(read_dictionary, &arrow_props)

        with nogil:
            check_status(builder.memory_pool(self.pool)
                         .properties(arrow_props)
                         .Build(&self.reader))

    cdef _set_read_dictionary(self, read_dictionary,
                              ArrowReaderProperties* props):
        for column in read_dictionary:
            if not isinstance(column, int):
                column = self.column_name_idx(column)
            props.set_read_dictionary(column, True)

    @property
    def column_paths(self):
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            vector[c_string] path
            int i = 0

        paths = []
        for i in range(0, metadata.num_columns()):
            path = (metadata.schema().Column(i)
                    .path().get().ToDotVector())
            paths.append([frombytes(x) for x in path])

        return paths

    @property
    def metadata(self):
        return self._metadata
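    # schema_arrow reconstructs the Arrow schema from the file's Parquet
    # schema plus any key-value metadata stored by the writer.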
    @property
    def schema_arrow(self):
        cdef shared_ptr[CSchema] out
        with nogil:
            check_status(self.reader.get().GetSchema(&out))
        return pyarrow_wrap_schema(out)

    @property
    def num_row_groups(self):
        return self.reader.get().num_row_groups()

    def set_use_threads(self, bint use_threads):
        self.reader.get().set_use_threads(use_threads)

    def set_batch_size(self, int64_t batch_size):
        self.reader.get().set_batch_size(batch_size)

    def iter_batches(self, int64_t batch_size, row_groups,
                     column_indices=None, bint use_threads=True):
        cdef:
            vector[int] c_row_groups
            vector[int] c_column_indices
            shared_ptr[CRecordBatch] record_batch
            shared_ptr[TableBatchReader] batch_reader
            unique_ptr[CRecordBatchReader] recordbatchreader

        self.set_batch_size(batch_size)

        if use_threads:
            self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups, c_column_indices, &recordbatchreader
                    )
                )
        else:
            with nogil:
                check_status(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups, &recordbatchreader
                    )
                )

        while True:
            with nogil:
                check_status(
                    recordbatchreader.get().ReadNext(&record_batch)
                )

            if record_batch.get() == NULL:
                break

            yield pyarrow_wrap_batch(record_batch)

    def read_row_group(self, int i, column_indices=None,
                       bint use_threads=True):
        return self.read_row_groups([i], column_indices, use_threads)

    def read_row_groups(self, row_groups not None, column_indices=None,
                        bint use_threads=True):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_row_groups
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, c_column_indices,
                                            &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, &ctable))
        return pyarrow_wrap_table(ctable)

    def read_all(self, column_indices=None, bint use_threads=True):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadTable(c_column_indices, &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadTable(&ctable))
        return pyarrow_wrap_table(ctable)

    def scan_contents(self, column_indices=None, batch_size=65536):
        cdef:
            vector[int] c_column_indices
            int32_t c_batch_size
            int64_t c_num_rows

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

        c_batch_size = batch_size

        with nogil:
            check_status(self.reader.get()
                         .ScanContents(c_column_indices, c_batch_size,
                                       &c_num_rows))

        return c_num_rows
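    # Illustrative sketch (comment only): the readers above back the public
    # pyarrow.parquet.ParquetFile API, e.g.
    #
    #   import pyarrow.parquet as pq
    #   pf = pq.ParquetFile('example.parquet')      # hypothetical path
    #   for batch in pf.iter_batches(batch_size=1024, columns=['x']):
    #       ...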
    def column_name_idx(self, column_name):
        """
        Find the matching index of a column in the schema.

        Parameters
        ----------
        column_name : str
            Name of the column; nesting levels are separated by ".".

        Returns
        -------
        column_idx : int
            Integer index of the position of the column.
        """
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            int i = 0

        if self._column_idx_map is None:
            self._column_idx_map = {}
            for i in range(0, metadata.num_columns()):
                col_bytes = tobytes(metadata.schema().Column(i)
                                    .path().get().ToDotString())
                self._column_idx_map[col_bytes] = i

        return self._column_idx_map[tobytes(column_name)]

    def read_column(self, int column_index):
        cdef shared_ptr[CChunkedArray] out
        with nogil:
            check_status(self.reader.get()
                         .ReadColumn(column_index, &out))
        return pyarrow_wrap_chunked_array(out)

    def read_schema_field(self, int field_index):
        cdef shared_ptr[CChunkedArray] out
        with nogil:
            check_status(self.reader.get()
                         .ReadSchemaField(field_index, &out))
        return pyarrow_wrap_chunked_array(out)


cdef shared_ptr[WriterProperties] _create_writer_properties(
        use_dictionary=None,
        compression=None,
        version=None,
        write_statistics=None,
        data_page_size=None,
        compression_level=None,
        use_byte_stream_split=False,
        data_page_version=None) except *:
    """General writer properties"""
    cdef:
        shared_ptr[WriterProperties] properties
        WriterProperties.Builder props

    # data_page_version
    if data_page_version is not None:
        if data_page_version == "1.0":
            props.data_page_version(ParquetDataPageVersion_V1)
        elif data_page_version == "2.0":
            props.data_page_version(ParquetDataPageVersion_V2)
        else:
            raise ValueError("Unsupported Parquet data page version: {0}"
                             .format(data_page_version))

    # version
    if version is not None:
        if version == "1.0":
            props.version(ParquetVersion_V1)
        elif version in ("2.0", "pseudo-2.0"):
            warnings.warn(
                "Parquet format '2.0' pseudo version is deprecated, use "
                "'2.4' or '2.6' for fine-grained feature selection",
                FutureWarning, stacklevel=2)
            props.version(ParquetVersion_V2_0)
        elif version == "2.4":
            props.version(ParquetVersion_V2_4)
        elif version == "2.6":
            props.version(ParquetVersion_V2_6)
        else:
            raise ValueError("Unsupported Parquet format version: {0}"
                             .format(version))

    # compression
    if isinstance(compression, str):
        check_compression_name(compression)
        props.compression(compression_from_name(compression))
    elif compression is not None:
        for column, codec in compression.items():
            check_compression_name(codec)
            props.compression(tobytes(column), compression_from_name(codec))

    if isinstance(compression_level, int):
        props.compression_level(compression_level)
    elif compression_level is not None:
        for column, level in compression_level.items():
            props.compression_level(tobytes(column), level)

    # use_dictionary
    if isinstance(use_dictionary, bool):
        if use_dictionary:
            props.enable_dictionary()
        else:
            props.disable_dictionary()
    elif use_dictionary is not None:
        # Deactivate dictionary encoding by default
        props.disable_dictionary()
        for column in use_dictionary:
            props.enable_dictionary(tobytes(column))

    # write_statistics
    if isinstance(write_statistics, bool):
        if write_statistics:
            props.enable_statistics()
        else:
            props.disable_statistics()
    elif write_statistics is not None:
        # Deactivate statistics by default and enable for specified columns
        props.disable_statistics()
        for column in write_statistics:
            props.enable_statistics(tobytes(column))

    # use_byte_stream_split
    if isinstance(use_byte_stream_split, bool):
        if use_byte_stream_split:
            props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
    elif use_byte_stream_split is not None:
        for column in use_byte_stream_split:
            props.encoding(tobytes(column),
                           ParquetEncoding_BYTE_STREAM_SPLIT)
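    # data_page_size (target data page size in bytes)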
    if data_page_size is not None:
        props.data_pagesize(data_page_size)

    properties = props.build()

    return properties


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
        use_deprecated_int96_timestamps=False,
        coerce_timestamps=None,
        allow_truncated_timestamps=False,
        writer_engine_version=None,
        use_compliant_nested_type=False) except *:
    """Arrow writer properties"""
    cdef:
        shared_ptr[ArrowWriterProperties] arrow_properties
        ArrowWriterProperties.Builder arrow_props

    # Store the original Arrow schema so things like dictionary types can
    # be automatically reconstructed
    arrow_props.store_schema()

    # int96 support
    if use_deprecated_int96_timestamps:
        arrow_props.enable_deprecated_int96_timestamps()
    else:
        arrow_props.disable_deprecated_int96_timestamps()

    # coerce_timestamps
    if coerce_timestamps == 'ms':
        arrow_props.coerce_timestamps(TimeUnit_MILLI)
    elif coerce_timestamps == 'us':
        arrow_props.coerce_timestamps(TimeUnit_MICRO)
    elif coerce_timestamps is not None:
        raise ValueError('Invalid value for coerce_timestamps: {0}'
                         .format(coerce_timestamps))

    # allow_truncated_timestamps
    if allow_truncated_timestamps:
        arrow_props.allow_truncated_timestamps()
    else:
        arrow_props.disallow_truncated_timestamps()

    # use_compliant_nested_type
    if use_compliant_nested_type:
        arrow_props.enable_compliant_nested_types()
    else:
        arrow_props.disable_compliant_nested_types()

    # writer_engine_version
    if writer_engine_version == "V1":
        warnings.warn("V1 parquet writer engine is a no-op. Use V2.")
        arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
    elif writer_engine_version != "V2":
        raise ValueError("Unsupported Writer Engine Version: {0}"
                         .format(writer_engine_version))

    arrow_properties = arrow_props.build()

    return arrow_properties


cdef class ParquetWriter(_Weakrefable):
    cdef:
        unique_ptr[FileWriter] writer
        shared_ptr[COutputStream] sink
        bint own_sink

    cdef readonly:
        object use_dictionary
        object use_deprecated_int96_timestamps
        object use_byte_stream_split
        object coerce_timestamps
        object allow_truncated_timestamps
        object compression
        object compression_level
        object data_page_version
        object use_compliant_nested_type
        object version
        object write_statistics
        object writer_engine_version
        int row_group_size
        int64_t data_page_size

    def __cinit__(self, where, Schema schema, use_dictionary=None,
                  compression=None, version=None,
                  write_statistics=None,
                  MemoryPool memory_pool=None,
                  use_deprecated_int96_timestamps=False,
                  coerce_timestamps=None,
                  data_page_size=None,
                  allow_truncated_timestamps=False,
                  compression_level=None,
                  use_byte_stream_split=False,
                  writer_engine_version=None,
                  data_page_version=None,
                  use_compliant_nested_type=False):
        cdef:
            shared_ptr[WriterProperties] properties
            shared_ptr[ArrowWriterProperties] arrow_properties
            c_string c_where
            CMemoryPool* pool

        try:
            where = _stringify_path(where)
        except TypeError:
            get_writer(where, &self.sink)
            self.own_sink = False
        else:
            c_where = tobytes(where)
            with nogil:
                self.sink = GetResultValue(FileOutputStream.Open(c_where))
            self.own_sink = True

        properties = _create_writer_properties(
            use_dictionary=use_dictionary,
            compression=compression,
            version=version,
            write_statistics=write_statistics,
            data_page_size=data_page_size,
            compression_level=compression_level,
            use_byte_stream_split=use_byte_stream_split,
            data_page_version=data_page_version
        )
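        # Arrow-specific writer options (timestamp handling, engine version,
        # nested type layout)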
        arrow_properties = _create_arrow_writer_properties(
            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
            coerce_timestamps=coerce_timestamps,
            allow_truncated_timestamps=allow_truncated_timestamps,
            writer_engine_version=writer_engine_version,
            use_compliant_nested_type=use_compliant_nested_type
        )

        pool = maybe_unbox_memory_pool(memory_pool)
        with nogil:
            check_status(
                FileWriter.Open(deref(schema.schema), pool,
                                self.sink, properties, arrow_properties,
                                &self.writer))

    def close(self):
        with nogil:
            check_status(self.writer.get().Close())
            if self.own_sink:
                check_status(self.sink.get().Close())

    def write_table(self, Table table, row_group_size=None):
        cdef:
            CTable* ctable = table.table
            int64_t c_row_group_size

        if row_group_size is None or row_group_size == -1:
            c_row_group_size = ctable.num_rows()
        elif row_group_size == 0:
            raise ValueError('Row group size cannot be 0')
        else:
            c_row_group_size = row_group_size

        with nogil:
            check_status(self.writer.get()
                         .WriteTable(deref(ctable), c_row_group_size))

    @property
    def metadata(self):
        cdef:
            shared_ptr[CFileMetaData] metadata
            FileMetaData result
        with nogil:
            metadata = self.writer.get().metadata()
        if metadata:
            result = FileMetaData()
            result.init(metadata)
            return result
        raise RuntimeError(
            'file metadata is only available after writer close')
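# Illustrative sketch (comment only): ParquetWriter is the low-level binding
# behind pyarrow.parquet.write_table / pyarrow.parquet.ParquetWriter, e.g.
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#   table = pa.table({'x': [1, 2, 3]})
#   pq.write_table(table, 'example.parquet',     # hypothetical path
#                  version='2.6', compression='zstd')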