path: root/src/arrow/python/pyarrow/_csv.pyx
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_csv.pyx
parentInitial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_csv.pyx')
-rw-r--r--    src/arrow/python/pyarrow/_csv.pyx    1077
1 file changed, 1077 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/_csv.pyx b/src/arrow/python/pyarrow/_csv.pyx
new file mode 100644
index 000000000..19ade4324
--- /dev/null
+++ b/src/arrow/python/pyarrow/_csv.pyx
@@ -0,0 +1,1077 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: language_level = 3
+
+from cython.operator cimport dereference as deref
+
+import codecs
+from collections.abc import Mapping
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
+ RecordBatchReader, ensure_type,
+ maybe_unbox_memory_pool, get_input_stream,
+ get_writer, native_transcoding_input_stream,
+ pyarrow_unwrap_batch, pyarrow_unwrap_schema,
+ pyarrow_unwrap_table, pyarrow_wrap_schema,
+ pyarrow_wrap_table, pyarrow_wrap_data_type,
+ pyarrow_unwrap_data_type, Table, RecordBatch,
+ StopToken, _CRecordBatchWriter)
+from pyarrow.lib import frombytes, tobytes, SignalStopHandler
+from pyarrow.util import _stringify_path
+
+
+cdef unsigned char _single_char(s) except 0:
+ val = ord(s)
+ if val == 0 or val > 127:
+ raise ValueError("Expecting an ASCII character")
+ return <unsigned char> val
+
+
+cdef class ReadOptions(_Weakrefable):
+ """
+ Options for reading CSV files.
+
+ Parameters
+ ----------
+ use_threads : bool, optional (default True)
+ Whether to use multiple threads to accelerate reading
+ block_size : int, optional
+        How many bytes to process at a time from the input stream.
+ This will determine multi-threading granularity as well as
+ the size of individual record batches or table chunks.
+        The minimum valid value for block size is 1.
+ skip_rows : int, optional (default 0)
+ The number of rows to skip before the column names (if any)
+ and the CSV data.
+ skip_rows_after_names : int, optional (default 0)
+ The number of rows to skip after the column names.
+ This number can be larger than the number of rows in one
+ block, and empty rows are counted.
+ The order of application is as follows:
+ - `skip_rows` is applied (if non-zero);
+        - column names are read (unless `column_names` is set);
+ - `skip_rows_after_names` is applied (if non-zero).
+ column_names : list, optional
+ The column names of the target table. If empty, fall back on
+ `autogenerate_column_names`.
+ autogenerate_column_names : bool, optional (default False)
+ Whether to autogenerate column names if `column_names` is empty.
+ If true, column names will be of the form "f0", "f1"...
+ If false, column names will be read from the first CSV row
+ after `skip_rows`.
+ encoding : str, optional (default 'utf8')
+        The character encoding of the CSV data. Columns that cannot
+        be decoded using this encoding can still be read as Binary.
+ """
+
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ # __init__() is not called when unpickling, initialize storage here
+    def __cinit__(self, *args, **kwargs):
+ self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))
+
+ def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
+ column_names=None, autogenerate_column_names=None,
+ encoding='utf8', skip_rows_after_names=None):
+ if use_threads is not None:
+ self.use_threads = use_threads
+ if block_size is not None:
+ self.block_size = block_size
+ if skip_rows is not None:
+ self.skip_rows = skip_rows
+ if column_names is not None:
+ self.column_names = column_names
+ if autogenerate_column_names is not None:
+            self.autogenerate_column_names = autogenerate_column_names
+ # Python-specific option
+ self.encoding = encoding
+ if skip_rows_after_names is not None:
+ self.skip_rows_after_names = skip_rows_after_names
+
+ @property
+ def use_threads(self):
+ """
+ Whether to use multiple threads to accelerate reading.
+ """
+ return deref(self.options).use_threads
+
+ @use_threads.setter
+ def use_threads(self, value):
+ deref(self.options).use_threads = value
+
+ @property
+ def block_size(self):
+ """
+        How many bytes to process at a time from the input stream.
+ This will determine multi-threading granularity as well as
+ the size of individual record batches or table chunks.
+ """
+ return deref(self.options).block_size
+
+ @block_size.setter
+ def block_size(self, value):
+ deref(self.options).block_size = value
+
+ @property
+ def skip_rows(self):
+ """
+ The number of rows to skip before the column names (if any)
+ and the CSV data.
+        See `skip_rows_after_names` for a description of how the two interact.
+ """
+ return deref(self.options).skip_rows
+
+ @skip_rows.setter
+ def skip_rows(self, value):
+ deref(self.options).skip_rows = value
+
+ @property
+ def column_names(self):
+ """
+ The column names of the target table. If empty, fall back on
+ `autogenerate_column_names`.
+ """
+ return [frombytes(s) for s in deref(self.options).column_names]
+
+ @column_names.setter
+ def column_names(self, value):
+ deref(self.options).column_names.clear()
+ for item in value:
+ deref(self.options).column_names.push_back(tobytes(item))
+
+ @property
+ def autogenerate_column_names(self):
+ """
+ Whether to autogenerate column names if `column_names` is empty.
+ If true, column names will be of the form "f0", "f1"...
+ If false, column names will be read from the first CSV row
+ after `skip_rows`.
+ """
+ return deref(self.options).autogenerate_column_names
+
+ @autogenerate_column_names.setter
+ def autogenerate_column_names(self, value):
+ deref(self.options).autogenerate_column_names = value
+
+ @property
+ def skip_rows_after_names(self):
+ """
+ The number of rows to skip after the column names.
+ This number can be larger than the number of rows in one
+ block, and empty rows are counted.
+ The order of application is as follows:
+ - `skip_rows` is applied (if non-zero);
+        - column names are read (unless `column_names` is set);
+ - `skip_rows_after_names` is applied (if non-zero).
+ """
+ return deref(self.options).skip_rows_after_names
+
+ @skip_rows_after_names.setter
+ def skip_rows_after_names(self, value):
+ deref(self.options).skip_rows_after_names = value
+
+ def validate(self):
+ check_status(deref(self.options).Validate())
+
+ def equals(self, ReadOptions other):
+ return (
+ self.use_threads == other.use_threads and
+ self.block_size == other.block_size and
+ self.skip_rows == other.skip_rows and
+ self.column_names == other.column_names and
+ self.autogenerate_column_names ==
+ other.autogenerate_column_names and
+ self.encoding == other.encoding and
+ self.skip_rows_after_names == other.skip_rows_after_names
+ )
+
+ @staticmethod
+ cdef ReadOptions wrap(CCSVReadOptions options):
+ out = ReadOptions()
+ out.options.reset(new CCSVReadOptions(move(options)))
+ out.encoding = 'utf8' # No way to know this
+ return out
+
+ def __getstate__(self):
+ return (self.use_threads, self.block_size, self.skip_rows,
+ self.column_names, self.autogenerate_column_names,
+ self.encoding, self.skip_rows_after_names)
+
+ def __setstate__(self, state):
+ (self.use_threads, self.block_size, self.skip_rows,
+ self.column_names, self.autogenerate_column_names,
+ self.encoding, self.skip_rows_after_names) = state
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
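+# Illustrative usage sketch: building a ReadOptions and passing it to
+# pyarrow.csv.read_csv(); the path "example.csv" is a hypothetical placeholder.
+#
+#   import pyarrow.csv as csv
+#
+#   opts = csv.ReadOptions(column_names=["x", "y"],
+#                          skip_rows_after_names=1,
+#                          block_size=1 << 20)
+#   table = csv.read_csv("example.csv", read_options=opts)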
+
+cdef class ParseOptions(_Weakrefable):
+ """
+ Options for parsing CSV files.
+
+ Parameters
+ ----------
+ delimiter : 1-character string, optional (default ',')
+ The character delimiting individual cells in the CSV data.
+ quote_char : 1-character string or False, optional (default '"')
+ The character used optionally for quoting CSV values
+ (False if quoting is not allowed).
+ double_quote : bool, optional (default True)
+ Whether two quotes in a quoted CSV value denote a single quote
+ in the data.
+ escape_char : 1-character string or False, optional (default False)
+ The character used optionally for escaping special characters
+ (False if escaping is not allowed).
+ newlines_in_values : bool, optional (default False)
+ Whether newline characters are allowed in CSV values.
+ Setting this to True reduces the performance of multi-threaded
+ CSV reading.
+ ignore_empty_lines : bool, optional (default True)
+ Whether empty lines are ignored in CSV input.
+ If False, an empty line is interpreted as containing a single empty
+ value (assuming a one-column CSV file).
+ """
+ __slots__ = ()
+
+    def __cinit__(self, *args, **kwargs):
+ self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults()))
+
+ def __init__(self, *, delimiter=None, quote_char=None, double_quote=None,
+ escape_char=None, newlines_in_values=None,
+ ignore_empty_lines=None):
+ if delimiter is not None:
+ self.delimiter = delimiter
+ if quote_char is not None:
+ self.quote_char = quote_char
+ if double_quote is not None:
+ self.double_quote = double_quote
+ if escape_char is not None:
+ self.escape_char = escape_char
+ if newlines_in_values is not None:
+ self.newlines_in_values = newlines_in_values
+ if ignore_empty_lines is not None:
+ self.ignore_empty_lines = ignore_empty_lines
+
+ @property
+ def delimiter(self):
+ """
+ The character delimiting individual cells in the CSV data.
+ """
+ return chr(deref(self.options).delimiter)
+
+ @delimiter.setter
+ def delimiter(self, value):
+ deref(self.options).delimiter = _single_char(value)
+
+ @property
+ def quote_char(self):
+ """
+ The character used optionally for quoting CSV values
+ (False if quoting is not allowed).
+ """
+ if deref(self.options).quoting:
+ return chr(deref(self.options).quote_char)
+ else:
+ return False
+
+ @quote_char.setter
+ def quote_char(self, value):
+ if value is False:
+ deref(self.options).quoting = False
+ else:
+ deref(self.options).quote_char = _single_char(value)
+ deref(self.options).quoting = True
+
+ @property
+ def double_quote(self):
+ """
+ Whether two quotes in a quoted CSV value denote a single quote
+ in the data.
+ """
+ return deref(self.options).double_quote
+
+ @double_quote.setter
+ def double_quote(self, value):
+ deref(self.options).double_quote = value
+
+ @property
+ def escape_char(self):
+ """
+ The character used optionally for escaping special characters
+ (False if escaping is not allowed).
+ """
+ if deref(self.options).escaping:
+ return chr(deref(self.options).escape_char)
+ else:
+ return False
+
+ @escape_char.setter
+ def escape_char(self, value):
+ if value is False:
+ deref(self.options).escaping = False
+ else:
+ deref(self.options).escape_char = _single_char(value)
+ deref(self.options).escaping = True
+
+ @property
+ def newlines_in_values(self):
+ """
+ Whether newline characters are allowed in CSV values.
+ Setting this to True reduces the performance of multi-threaded
+ CSV reading.
+ """
+ return deref(self.options).newlines_in_values
+
+ @newlines_in_values.setter
+ def newlines_in_values(self, value):
+ deref(self.options).newlines_in_values = value
+
+ @property
+ def ignore_empty_lines(self):
+ """
+ Whether empty lines are ignored in CSV input.
+ If False, an empty line is interpreted as containing a single empty
+ value (assuming a one-column CSV file).
+ """
+ return deref(self.options).ignore_empty_lines
+
+ @ignore_empty_lines.setter
+ def ignore_empty_lines(self, value):
+ deref(self.options).ignore_empty_lines = value
+
+ def validate(self):
+ check_status(deref(self.options).Validate())
+
+ def equals(self, ParseOptions other):
+ return (
+ self.delimiter == other.delimiter and
+ self.quote_char == other.quote_char and
+ self.double_quote == other.double_quote and
+ self.escape_char == other.escape_char and
+ self.newlines_in_values == other.newlines_in_values and
+ self.ignore_empty_lines == other.ignore_empty_lines
+ )
+
+ @staticmethod
+ cdef ParseOptions wrap(CCSVParseOptions options):
+ out = ParseOptions()
+ out.options.reset(new CCSVParseOptions(move(options)))
+ return out
+
+ def __getstate__(self):
+ return (self.delimiter, self.quote_char, self.double_quote,
+ self.escape_char, self.newlines_in_values,
+ self.ignore_empty_lines)
+
+ def __setstate__(self, state):
+ (self.delimiter, self.quote_char, self.double_quote,
+ self.escape_char, self.newlines_in_values,
+ self.ignore_empty_lines) = state
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
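+# Illustrative usage sketch: a ParseOptions for semicolon-delimited data with
+# single-quote quoting; "example.csv" is a hypothetical placeholder path.
+#
+#   opts = csv.ParseOptions(delimiter=";", quote_char="'",
+#                           ignore_empty_lines=False)
+#   table = csv.read_csv("example.csv", parse_options=opts)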
+
+cdef class _ISO8601(_Weakrefable):
+ """
+ A special object indicating ISO-8601 parsing.
+ """
+ __slots__ = ()
+
+ def __str__(self):
+ return 'ISO8601'
+
+ def __eq__(self, other):
+ return isinstance(other, _ISO8601)
+
+
+ISO8601 = _ISO8601()
+
+
+cdef class ConvertOptions(_Weakrefable):
+ """
+ Options for converting CSV data.
+
+ Parameters
+ ----------
+ check_utf8 : bool, optional (default True)
+ Whether to check UTF8 validity of string columns.
+ column_types : pa.Schema or dict, optional
+ Explicitly map column names to column types. Passing this argument
+ disables type inference on the defined columns.
+ null_values : list, optional
+ A sequence of strings that denote nulls in the data
+ (defaults are appropriate in most cases). Note that by default,
+ string columns are not checked for null values. To enable
+ null checking for those, specify ``strings_can_be_null=True``.
+ true_values : list, optional
+ A sequence of strings that denote true booleans in the data
+ (defaults are appropriate in most cases).
+ false_values : list, optional
+ A sequence of strings that denote false booleans in the data
+ (defaults are appropriate in most cases).
+ decimal_point : 1-character string, optional (default '.')
+ The character used as decimal point in floating-point and decimal
+ data.
+ timestamp_parsers : list, optional
+ A sequence of strptime()-compatible format strings, tried in order
+ when attempting to infer or convert timestamp values (the special
+ value ISO8601() can also be given). By default, a fast built-in
+ ISO-8601 parser is used.
+ strings_can_be_null : bool, optional (default False)
+ Whether string / binary columns can have null values.
+ If true, then strings in null_values are considered null for
+ string columns.
+ If false, then all strings are valid string values.
+ quoted_strings_can_be_null : bool, optional (default True)
+ Whether quoted values can be null.
+ If true, then strings in "null_values" are also considered null
+ when they appear quoted in the CSV file. Otherwise, quoted values
+ are never considered null.
+ auto_dict_encode : bool, optional (default False)
+ Whether to try to automatically dict-encode string / binary data.
+ If true, then when type inference detects a string or binary column,
+        it is dict-encoded up to `auto_dict_max_cardinality` distinct values
+ (per chunk), after which it switches to regular encoding.
+ This setting is ignored for non-inferred columns (those in
+ `column_types`).
+ auto_dict_max_cardinality : int, optional
+ The maximum dictionary cardinality for `auto_dict_encode`.
+ This value is per chunk.
+ include_columns : list, optional
+ The names of columns to include in the Table.
+ If empty, the Table will include all columns from the CSV file.
+ If not empty, only these columns will be included, in this order.
+ include_missing_columns : bool, optional (default False)
+ If false, columns in `include_columns` but not in the CSV file will
+ error out.
+ If true, columns in `include_columns` but not in the CSV file will
+ produce a column of nulls (whose type is selected using
+ `column_types`, or null by default).
+ This option is ignored if `include_columns` is empty.
+ """
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+    def __cinit__(self, *args, **kwargs):
+ self.options.reset(
+ new CCSVConvertOptions(CCSVConvertOptions.Defaults()))
+
+ def __init__(self, *, check_utf8=None, column_types=None, null_values=None,
+ true_values=None, false_values=None, decimal_point=None,
+ strings_can_be_null=None, quoted_strings_can_be_null=None,
+ include_columns=None, include_missing_columns=None,
+ auto_dict_encode=None, auto_dict_max_cardinality=None,
+ timestamp_parsers=None):
+ if check_utf8 is not None:
+ self.check_utf8 = check_utf8
+ if column_types is not None:
+ self.column_types = column_types
+ if null_values is not None:
+ self.null_values = null_values
+ if true_values is not None:
+ self.true_values = true_values
+ if false_values is not None:
+ self.false_values = false_values
+ if decimal_point is not None:
+ self.decimal_point = decimal_point
+ if strings_can_be_null is not None:
+ self.strings_can_be_null = strings_can_be_null
+ if quoted_strings_can_be_null is not None:
+ self.quoted_strings_can_be_null = quoted_strings_can_be_null
+ if include_columns is not None:
+ self.include_columns = include_columns
+ if include_missing_columns is not None:
+ self.include_missing_columns = include_missing_columns
+ if auto_dict_encode is not None:
+ self.auto_dict_encode = auto_dict_encode
+ if auto_dict_max_cardinality is not None:
+ self.auto_dict_max_cardinality = auto_dict_max_cardinality
+ if timestamp_parsers is not None:
+ self.timestamp_parsers = timestamp_parsers
+
+ @property
+ def check_utf8(self):
+ """
+ Whether to check UTF8 validity of string columns.
+ """
+ return deref(self.options).check_utf8
+
+ @check_utf8.setter
+ def check_utf8(self, value):
+ deref(self.options).check_utf8 = value
+
+ @property
+ def strings_can_be_null(self):
+ """
+ Whether string / binary columns can have null values.
+ """
+ return deref(self.options).strings_can_be_null
+
+ @strings_can_be_null.setter
+ def strings_can_be_null(self, value):
+ deref(self.options).strings_can_be_null = value
+
+ @property
+ def quoted_strings_can_be_null(self):
+ """
+ Whether quoted values can be null.
+ """
+ return deref(self.options).quoted_strings_can_be_null
+
+ @quoted_strings_can_be_null.setter
+ def quoted_strings_can_be_null(self, value):
+ deref(self.options).quoted_strings_can_be_null = value
+
+ @property
+ def column_types(self):
+ """
+ Explicitly map column names to column types.
+ """
+ d = {frombytes(item.first): pyarrow_wrap_data_type(item.second)
+ for item in deref(self.options).column_types}
+ return d
+
+ @column_types.setter
+ def column_types(self, value):
+ cdef:
+ shared_ptr[CDataType] typ
+
+ if isinstance(value, Mapping):
+ value = value.items()
+
+ deref(self.options).column_types.clear()
+ for item in value:
+ if isinstance(item, Field):
+ k = item.name
+ v = item.type
+ else:
+ k, v = item
+ typ = pyarrow_unwrap_data_type(ensure_type(v))
+ assert typ != NULL
+ deref(self.options).column_types[tobytes(k)] = typ
+
+ @property
+ def null_values(self):
+ """
+ A sequence of strings that denote nulls in the data.
+ """
+ return [frombytes(x) for x in deref(self.options).null_values]
+
+ @null_values.setter
+ def null_values(self, value):
+ deref(self.options).null_values = [tobytes(x) for x in value]
+
+ @property
+ def true_values(self):
+ """
+ A sequence of strings that denote true booleans in the data.
+ """
+ return [frombytes(x) for x in deref(self.options).true_values]
+
+ @true_values.setter
+ def true_values(self, value):
+ deref(self.options).true_values = [tobytes(x) for x in value]
+
+ @property
+ def false_values(self):
+ """
+ A sequence of strings that denote false booleans in the data.
+ """
+ return [frombytes(x) for x in deref(self.options).false_values]
+
+ @false_values.setter
+ def false_values(self, value):
+ deref(self.options).false_values = [tobytes(x) for x in value]
+
+ @property
+ def decimal_point(self):
+ """
+ The character used as decimal point in floating-point and decimal
+ data.
+ """
+ return chr(deref(self.options).decimal_point)
+
+ @decimal_point.setter
+ def decimal_point(self, value):
+ deref(self.options).decimal_point = _single_char(value)
+
+ @property
+ def auto_dict_encode(self):
+ """
+ Whether to try to automatically dict-encode string / binary data.
+ """
+ return deref(self.options).auto_dict_encode
+
+ @auto_dict_encode.setter
+ def auto_dict_encode(self, value):
+ deref(self.options).auto_dict_encode = value
+
+ @property
+ def auto_dict_max_cardinality(self):
+ """
+ The maximum dictionary cardinality for `auto_dict_encode`.
+
+ This value is per chunk.
+ """
+ return deref(self.options).auto_dict_max_cardinality
+
+ @auto_dict_max_cardinality.setter
+ def auto_dict_max_cardinality(self, value):
+ deref(self.options).auto_dict_max_cardinality = value
+
+ @property
+ def include_columns(self):
+ """
+ The names of columns to include in the Table.
+
+ If empty, the Table will include all columns from the CSV file.
+ If not empty, only these columns will be included, in this order.
+ """
+ return [frombytes(s) for s in deref(self.options).include_columns]
+
+ @include_columns.setter
+ def include_columns(self, value):
+ deref(self.options).include_columns.clear()
+ for item in value:
+ deref(self.options).include_columns.push_back(tobytes(item))
+
+ @property
+ def include_missing_columns(self):
+ """
+ If false, columns in `include_columns` but not in the CSV file will
+ error out.
+ If true, columns in `include_columns` but not in the CSV file will
+ produce a null column (whose type is selected using `column_types`,
+ or null by default).
+ This option is ignored if `include_columns` is empty.
+ """
+ return deref(self.options).include_missing_columns
+
+ @include_missing_columns.setter
+ def include_missing_columns(self, value):
+ deref(self.options).include_missing_columns = value
+
+ @property
+ def timestamp_parsers(self):
+ """
+ A sequence of strptime()-compatible format strings, tried in order
+ when attempting to infer or convert timestamp values (the special
+ value ISO8601() can also be given). By default, a fast built-in
+ ISO-8601 parser is used.
+ """
+ cdef:
+ shared_ptr[CTimestampParser] c_parser
+ c_string kind
+
+ parsers = []
+ for c_parser in deref(self.options).timestamp_parsers:
+ kind = deref(c_parser).kind()
+ if kind == b'strptime':
+ parsers.append(frombytes(deref(c_parser).format()))
+ else:
+ assert kind == b'iso8601'
+ parsers.append(ISO8601)
+
+ return parsers
+
+ @timestamp_parsers.setter
+ def timestamp_parsers(self, value):
+ cdef:
+ vector[shared_ptr[CTimestampParser]] c_parsers
+
+ for v in value:
+ if isinstance(v, str):
+ c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v)))
+ elif v == ISO8601:
+ c_parsers.push_back(CTimestampParser.MakeISO8601())
+ else:
+ raise TypeError("Expected list of str or ISO8601 objects")
+
+ deref(self.options).timestamp_parsers = move(c_parsers)
+
+ @staticmethod
+ cdef ConvertOptions wrap(CCSVConvertOptions options):
+ out = ConvertOptions()
+ out.options.reset(new CCSVConvertOptions(move(options)))
+ return out
+
+ def validate(self):
+ check_status(deref(self.options).Validate())
+
+ def equals(self, ConvertOptions other):
+ return (
+ self.check_utf8 == other.check_utf8 and
+ self.column_types == other.column_types and
+ self.null_values == other.null_values and
+ self.true_values == other.true_values and
+ self.false_values == other.false_values and
+ self.decimal_point == other.decimal_point and
+ self.timestamp_parsers == other.timestamp_parsers and
+ self.strings_can_be_null == other.strings_can_be_null and
+ self.quoted_strings_can_be_null ==
+ other.quoted_strings_can_be_null and
+ self.auto_dict_encode == other.auto_dict_encode and
+ self.auto_dict_max_cardinality ==
+ other.auto_dict_max_cardinality and
+ self.include_columns == other.include_columns and
+ self.include_missing_columns == other.include_missing_columns
+ )
+
+ def __getstate__(self):
+ return (self.check_utf8, self.column_types, self.null_values,
+ self.true_values, self.false_values, self.decimal_point,
+ self.timestamp_parsers, self.strings_can_be_null,
+ self.quoted_strings_can_be_null, self.auto_dict_encode,
+ self.auto_dict_max_cardinality, self.include_columns,
+ self.include_missing_columns)
+
+ def __setstate__(self, state):
+ (self.check_utf8, self.column_types, self.null_values,
+ self.true_values, self.false_values, self.decimal_point,
+ self.timestamp_parsers, self.strings_can_be_null,
+ self.quoted_strings_can_be_null, self.auto_dict_encode,
+ self.auto_dict_max_cardinality, self.include_columns,
+ self.include_missing_columns) = state
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
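+# Illustrative usage sketch: a ConvertOptions with explicit column types,
+# extra null markers and a fallback timestamp format; "example.csv" is a
+# hypothetical placeholder path.
+#
+#   import pyarrow as pa
+#
+#   opts = csv.ConvertOptions(
+#       column_types={"id": pa.int64(), "name": pa.string()},
+#       null_values=["NA", ""],
+#       strings_can_be_null=True,
+#       timestamp_parsers=[csv.ISO8601, "%d/%m/%Y"],
+#   )
+#   table = csv.read_csv("example.csv", convert_options=opts)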
+
+cdef _get_reader(input_file, ReadOptions read_options,
+ shared_ptr[CInputStream]* out):
+ use_memory_map = False
+ get_input_stream(input_file, use_memory_map, out)
+ if read_options is not None:
+ out[0] = native_transcoding_input_stream(out[0],
+ read_options.encoding,
+ 'utf8')
+
+
+cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
+ if read_options is None:
+ out[0] = CCSVReadOptions.Defaults()
+ else:
+ out[0] = deref(read_options.options)
+
+
+cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out):
+ if parse_options is None:
+ out[0] = CCSVParseOptions.Defaults()
+ else:
+ out[0] = deref(parse_options.options)
+
+
+cdef _get_convert_options(ConvertOptions convert_options,
+ CCSVConvertOptions* out):
+ if convert_options is None:
+ out[0] = CCSVConvertOptions.Defaults()
+ else:
+ out[0] = deref(convert_options.options)
+
+
+cdef class CSVStreamingReader(RecordBatchReader):
+ """An object that reads record batches incrementally from a CSV file.
+
+ Should not be instantiated directly by user code.
+ """
+ cdef readonly:
+ Schema schema
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly, "
+ "use pyarrow.csv.open_csv() instead."
+ .format(self.__class__.__name__))
+
+ # Note about cancellation: we cannot create a SignalStopHandler
+ # by default here, as several CSVStreamingReader instances may be
+ # created (including by the same thread). Handling cancellation
+ # would require having the user pass the SignalStopHandler.
+ # (in addition to solving ARROW-11853)
+
+ cdef _open(self, shared_ptr[CInputStream] stream,
+ CCSVReadOptions c_read_options,
+ CCSVParseOptions c_parse_options,
+ CCSVConvertOptions c_convert_options,
+ MemoryPool memory_pool):
+ cdef:
+ shared_ptr[CSchema] c_schema
+ CIOContext io_context
+
+ io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))
+
+ with nogil:
+ self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
+ CCSVStreamingReader.Make(
+ io_context, stream,
+ move(c_read_options), move(c_parse_options),
+ move(c_convert_options)))
+ c_schema = self.reader.get().schema()
+
+ self.schema = pyarrow_wrap_schema(c_schema)
+
+
+def read_csv(input_file, read_options=None, parse_options=None,
+ convert_options=None, MemoryPool memory_pool=None):
+ """
+ Read a Table from a stream of CSV data.
+
+ Parameters
+ ----------
+ input_file : string, path or file-like object
+ The location of CSV data. If a string or path, and if it ends
+ with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
+ the data is automatically decompressed when reading.
+ read_options : pyarrow.csv.ReadOptions, optional
+ Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
+ for defaults)
+ parse_options : pyarrow.csv.ParseOptions, optional
+ Options for the CSV parser
+ (see pyarrow.csv.ParseOptions constructor for defaults)
+ convert_options : pyarrow.csv.ConvertOptions, optional
+ Options for converting CSV data
+ (see pyarrow.csv.ConvertOptions constructor for defaults)
+ memory_pool : MemoryPool, optional
+ Pool to allocate Table memory from
+
+ Returns
+ -------
+ :class:`pyarrow.Table`
+        Contents of the CSV file as an in-memory table.
+ """
+ cdef:
+ shared_ptr[CInputStream] stream
+ CCSVReadOptions c_read_options
+ CCSVParseOptions c_parse_options
+ CCSVConvertOptions c_convert_options
+ CIOContext io_context
+ shared_ptr[CCSVReader] reader
+ shared_ptr[CTable] table
+
+ _get_reader(input_file, read_options, &stream)
+ _get_read_options(read_options, &c_read_options)
+ _get_parse_options(parse_options, &c_parse_options)
+ _get_convert_options(convert_options, &c_convert_options)
+
+ with SignalStopHandler() as stop_handler:
+ io_context = CIOContext(
+ maybe_unbox_memory_pool(memory_pool),
+ (<StopToken> stop_handler.stop_token).stop_token)
+ reader = GetResultValue(CCSVReader.Make(
+ io_context, stream,
+ c_read_options, c_parse_options, c_convert_options))
+
+ with nogil:
+ table = GetResultValue(reader.get().Read())
+
+ return pyarrow_wrap_table(table)
+
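+# Illustrative usage sketch: reading a gzip-compressed CSV; decompression is
+# inferred from the ".gz" extension. "data.csv.gz" is a hypothetical path.
+#
+#   table = csv.read_csv("data.csv.gz")
+#   print(table.num_rows, table.schema)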
+
+def open_csv(input_file, read_options=None, parse_options=None,
+ convert_options=None, MemoryPool memory_pool=None):
+ """
+ Open a streaming reader of CSV data.
+
+ Reading using this function is always single-threaded.
+
+ Parameters
+ ----------
+ input_file : string, path or file-like object
+ The location of CSV data. If a string or path, and if it ends
+ with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
+ the data is automatically decompressed when reading.
+ read_options : pyarrow.csv.ReadOptions, optional
+ Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
+ for defaults)
+ parse_options : pyarrow.csv.ParseOptions, optional
+ Options for the CSV parser
+ (see pyarrow.csv.ParseOptions constructor for defaults)
+ convert_options : pyarrow.csv.ConvertOptions, optional
+ Options for converting CSV data
+ (see pyarrow.csv.ConvertOptions constructor for defaults)
+ memory_pool : MemoryPool, optional
+ Pool to allocate Table memory from
+
+ Returns
+ -------
+ :class:`pyarrow.csv.CSVStreamingReader`
+ """
+ cdef:
+ shared_ptr[CInputStream] stream
+ CCSVReadOptions c_read_options
+ CCSVParseOptions c_parse_options
+ CCSVConvertOptions c_convert_options
+ CSVStreamingReader reader
+
+ _get_reader(input_file, read_options, &stream)
+ _get_read_options(read_options, &c_read_options)
+ _get_parse_options(parse_options, &c_parse_options)
+ _get_convert_options(convert_options, &c_convert_options)
+
+ reader = CSVStreamingReader.__new__(CSVStreamingReader)
+ reader._open(stream, move(c_read_options), move(c_parse_options),
+ move(c_convert_options), memory_pool)
+ return reader
+
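+# Illustrative usage sketch: streaming a large CSV batch by batch instead of
+# materializing the whole table; "large.csv" is a hypothetical path and
+# process() is a hypothetical user function.
+#
+#   reader = csv.open_csv("large.csv")
+#   for batch in reader:    # each item is a pyarrow.RecordBatch
+#       process(batch)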
+
+cdef class WriteOptions(_Weakrefable):
+ """
+ Options for writing CSV files.
+
+ Parameters
+ ----------
+ include_header : bool, optional (default True)
+ Whether to write an initial header line with column names
+ batch_size : int, optional (default 1024)
+ How many rows to process together when converting and writing
+ CSV data
+ """
+
+    # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, *, include_header=None, batch_size=None):
+ self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults()))
+ if include_header is not None:
+ self.include_header = include_header
+ if batch_size is not None:
+ self.batch_size = batch_size
+
+ @property
+ def include_header(self):
+ """
+ Whether to write an initial header line with column names.
+ """
+ return deref(self.options).include_header
+
+ @include_header.setter
+ def include_header(self, value):
+ deref(self.options).include_header = value
+
+ @property
+ def batch_size(self):
+ """
+ How many rows to process together when converting and writing
+ CSV data.
+ """
+ return deref(self.options).batch_size
+
+ @batch_size.setter
+ def batch_size(self, value):
+ deref(self.options).batch_size = value
+
+ @staticmethod
+ cdef WriteOptions wrap(CCSVWriteOptions options):
+ out = WriteOptions()
+ out.options.reset(new CCSVWriteOptions(move(options)))
+ return out
+
+ def validate(self):
+ check_status(self.options.get().Validate())
+
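+# Illustrative usage sketch: a WriteOptions that omits the header row and
+# converts 4096 rows at a time.
+#
+#   opts = csv.WriteOptions(include_header=False, batch_size=4096)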
+
+cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
+ if write_options is None:
+ out[0] = CCSVWriteOptions.Defaults()
+ else:
+ out[0] = deref(write_options.options)
+
+
+def write_csv(data, output_file, write_options=None,
+ MemoryPool memory_pool=None):
+ """
+ Write record batch or table to a CSV file.
+
+ Parameters
+ ----------
+ data : pyarrow.RecordBatch or pyarrow.Table
+ The data to write.
+ output_file : string, path, pyarrow.NativeFile, or file-like object
+ The location where to write the CSV data.
+ write_options : pyarrow.csv.WriteOptions
+ Options to configure writing the CSV data.
+ memory_pool : MemoryPool, optional
+ Pool for temporary allocations.
+ """
+ cdef:
+ shared_ptr[COutputStream] stream
+ CCSVWriteOptions c_write_options
+ CMemoryPool* c_memory_pool
+ CRecordBatch* batch
+ CTable* table
+ _get_write_options(write_options, &c_write_options)
+
+ get_writer(output_file, &stream)
+ c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+ c_write_options.io_context = CIOContext(c_memory_pool)
+ if isinstance(data, RecordBatch):
+ batch = pyarrow_unwrap_batch(data).get()
+ with nogil:
+ check_status(WriteCSV(deref(batch), c_write_options, stream.get()))
+ elif isinstance(data, Table):
+ table = pyarrow_unwrap_table(data).get()
+ with nogil:
+ check_status(WriteCSV(deref(table), c_write_options, stream.get()))
+ else:
+ raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
+
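+# Illustrative usage sketch: writing an in-memory table to "out.csv"
+# (a hypothetical path).
+#
+#   import pyarrow as pa
+#
+#   table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
+#   csv.write_csv(table, "out.csv", csv.WriteOptions(include_header=True))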
+
+cdef class CSVWriter(_CRecordBatchWriter):
+ """
+ Writer to create a CSV file.
+
+ Parameters
+ ----------
+ sink : str, path, pyarrow.OutputStream or file-like object
+ The location where to write the CSV data.
+ schema : pyarrow.Schema
+ The schema of the data to be written.
+ write_options : pyarrow.csv.WriteOptions
+ Options to configure writing the CSV data.
+ memory_pool : MemoryPool, optional
+ Pool for temporary allocations.
+ """
+
+ def __init__(self, sink, Schema schema, *,
+ WriteOptions write_options=None, MemoryPool memory_pool=None):
+ cdef:
+ shared_ptr[COutputStream] c_stream
+ shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+ CCSVWriteOptions c_write_options
+ CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+ _get_write_options(write_options, &c_write_options)
+ c_write_options.io_context = CIOContext(c_memory_pool)
+ get_writer(sink, &c_stream)
+ with nogil:
+ self.writer = GetResultValue(MakeCSVWriter(
+ c_stream, c_schema, c_write_options))
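+
+
+# Illustrative usage sketch: writing incrementally with CSVWriter rather than
+# a single write_csv() call; "out.csv" is a hypothetical path.
+#
+#   schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
+#   writer = csv.CSVWriter("out.csv", schema)
+#   writer.write_table(pa.table({"a": [1], "b": ["x"]}))
+#   writer.close()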