summaryrefslogtreecommitdiffstats
path: root/src/arrow/python/pyarrow/tests/parquet
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/python/pyarrow/tests/parquet')
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/common.py177
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/conftest.py87
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_basic.py631
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py115
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_data_types.py529
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_dataset.py1661
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_datetime.py440
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_metadata.py524
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_pandas.py687
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py276
-rw-r--r--src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py278
11 files changed, 5405 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/tests/parquet/common.py b/src/arrow/python/pyarrow/tests/parquet/common.py
new file mode 100644
index 000000000..90bfb55d1
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/common.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+ "use_legacy_dataset",
+ [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+ "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+ "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+ pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+ # So we see the ImportError somewhere
+ import pyarrow.parquet as pq
+ from pyarrow.pandas_compat import _pandas_api
+
+ if _pandas_api.is_data_frame(table):
+ table = pa.Table.from_pandas(table)
+
+ pq.write_table(table, path, **kwargs)
+ return table
+
+
+def _read_table(*args, **kwargs):
+ import pyarrow.parquet as pq
+
+ table = pq.read_table(*args, **kwargs)
+ table.validate(full=True)
+ return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+ write_table_kwargs=None, use_legacy_dataset=True):
+ read_table_kwargs = read_table_kwargs or {}
+ write_table_kwargs = write_table_kwargs or {}
+
+ writer = pa.BufferOutputStream()
+ _write_table(table, writer, **write_table_kwargs)
+ reader = pa.BufferReader(writer.getvalue())
+ return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+ **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+ use_legacy_dataset=True, **write_table_kwargs):
+ if expected is None:
+ expected = table
+
+ read_table_kwargs = read_table_kwargs or {}
+
+ # intentionally check twice
+ result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+ write_table_kwargs=write_table_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(expected)
+ result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+ write_table_kwargs=write_table_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+ table = pa.Table.from_pandas(df)
+ result = _roundtrip_table(
+ table, write_table_kwargs=write_kwargs,
+ use_legacy_dataset=use_legacy_dataset)
+ return result.to_pandas()
+
+
+def _random_integers(size, dtype):
+ # We do not generate integers outside the int64 range
+ platform_int_info = np.iinfo('int_')
+ iinfo = np.iinfo(dtype)
+ return np.random.randint(max(iinfo.min, platform_int_info.min),
+ min(iinfo.max, platform_int_info.max),
+ size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+ import pandas as pd
+
+ np.random.seed(seed)
+ df = pd.DataFrame({
+ 'uint8': _random_integers(size, np.uint8),
+ 'uint16': _random_integers(size, np.uint16),
+ 'uint32': _random_integers(size, np.uint32),
+ 'uint64': _random_integers(size, np.uint64),
+ 'int8': _random_integers(size, np.int8),
+ 'int16': _random_integers(size, np.int16),
+ 'int32': _random_integers(size, np.int32),
+ 'int64': _random_integers(size, np.int64),
+ 'float32': np.random.randn(size).astype(np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ 'strings': [util.rands(10) for i in range(size)],
+ 'all_none': [None] * size,
+ 'all_none_category': [None] * size
+ })
+
+ # TODO(PARQUET-1015)
+ # df['all_none_category'] = df['all_none_category'].astype('category')
+ return df
+
+
+def make_sample_file(table_or_df):
+ import pyarrow.parquet as pq
+
+ if isinstance(table_or_df, pa.Table):
+ a_table = table_or_df
+ else:
+ a_table = pa.Table.from_pandas(table_or_df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, compression='SNAPPY', version='2.6',
+ coerce_timestamps='ms')
+
+ buf.seek(0)
+ return pq.ParquetFile(buf)
+
+
+def alltypes_sample(size=10000, seed=0, categorical=False):
+ import pandas as pd
+
+ np.random.seed(seed)
+ arrays = {
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ # TODO(wesm): Test other timestamp resolutions now that arrow supports
+ # them
+ 'datetime': np.arange("2016-01-01T00:00:00.001", size,
+ dtype='datetime64[ms]'),
+ 'str': pd.Series([str(x) for x in range(size)]),
+ 'empty_str': [''] * size,
+ 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+ 'null': [None] * size,
+ 'null_list': [None] * 2 + [[None] * (x % 4) for x in range(size - 2)],
+ }
+ if categorical:
+ arrays['str_category'] = arrays['str'].astype('category')
+ return pd.DataFrame(arrays)
diff --git a/src/arrow/python/pyarrow/tests/parquet/conftest.py b/src/arrow/python/pyarrow/tests/parquet/conftest.py
new file mode 100644
index 000000000..1e75493cd
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/conftest.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from pyarrow.util import guid
+
+
+@pytest.fixture(scope='module')
+def datadir(base_datadir):
+ return base_datadir / 'parquet'
+
+
+@pytest.fixture
+def s3_bucket(s3_server):
+ boto3 = pytest.importorskip('boto3')
+ botocore = pytest.importorskip('botocore')
+
+ host, port, access_key, secret_key = s3_server['connection']
+ s3 = boto3.resource(
+ 's3',
+ endpoint_url='http://{}:{}'.format(host, port),
+ aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ config=botocore.client.Config(signature_version='s3v4'),
+ region_name='us-east-1'
+ )
+ bucket = s3.Bucket('test-s3fs')
+ try:
+ bucket.create()
+ except Exception:
+ # we get BucketAlreadyOwnedByYou error with fsspec handler
+ pass
+ return 'test-s3fs'
+
+
+@pytest.fixture
+def s3_example_s3fs(s3_server, s3_bucket):
+ s3fs = pytest.importorskip('s3fs')
+
+ host, port, access_key, secret_key = s3_server['connection']
+ fs = s3fs.S3FileSystem(
+ key=access_key,
+ secret=secret_key,
+ client_kwargs={
+ 'endpoint_url': 'http://{}:{}'.format(host, port)
+ }
+ )
+
+ test_path = '{}/{}'.format(s3_bucket, guid())
+
+ fs.mkdir(test_path)
+ yield fs, test_path
+ try:
+ fs.rm(test_path, recursive=True)
+ except FileNotFoundError:
+ pass
+
+
+@pytest.fixture
+def s3_example_fs(s3_server):
+ from pyarrow.fs import FileSystem
+
+ host, port, access_key, secret_key = s3_server['connection']
+ uri = (
+ "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+ .format(access_key, secret_key, host, port)
+ )
+ fs, path = FileSystem.from_uri(uri)
+
+ fs.create_dir("mybucket")
+
+ yield fs, uri, path
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_basic.py b/src/arrow/python/pyarrow/tests/parquet/test_basic.py
new file mode 100644
index 000000000..cf1aaa21f
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_basic.py
@@ -0,0 +1,631 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import OrderedDict
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem, FileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+ parametrize_legacy_dataset)
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _read_table, _write_table
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.pandas_examples import dataframe_with_lists
+ from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+ pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+def test_parquet_invalid_version(tempdir):
+ table = pa.table({'a': [1, 2, 3]})
+ with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+ _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+ with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+ "version"):
+ _write_table(table, tempdir / 'test_version.parquet',
+ data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+ arr = pa.array([1, 2, 3] * 100000)
+ t = pa.Table.from_arrays([arr], names=['f0'])
+
+ # 128K, 512K
+ page_sizes = [2 << 16, 2 << 18]
+ for target_page_size in page_sizes:
+ _check_roundtrip(t, data_page_size=target_page_size,
+ use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+ # ARROW-232
+ tables = []
+ batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+ tables.append(pa.Table.from_batches([batch] * 3))
+ df, _ = dataframe_with_lists()
+ batch = pa.RecordBatch.from_pandas(df)
+ tables.append(pa.Table.from_batches([batch] * 3))
+
+ for data_page_version in ['1.0', '2.0']:
+ for use_dictionary in [True, False]:
+ for table in tables:
+ _check_roundtrip(
+ table, version='2.6',
+ use_legacy_dataset=use_legacy_dataset,
+ data_page_version=data_page_version,
+ use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+ df = alltypes_sample(size=10)
+
+ table = pa.Table.from_pandas(df)
+ _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+ version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+ filename = str(tempdir / 'tmp_file')
+ with open(filename, 'wb') as f:
+ _write_table(table, f, version='2.6')
+ table_read = pq.read_pandas(filename, memory_map=True,
+ use_legacy_dataset=use_legacy_dataset)
+ assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+ df = alltypes_sample(size=10)
+
+ table = pa.Table.from_pandas(df)
+ _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+ version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+ filename = str(tempdir / 'tmp_file')
+ with open(filename, 'wb') as f:
+ _write_table(table, f, version='2.6')
+ table_read = pq.read_pandas(filename, buffer_size=4096,
+ use_legacy_dataset=use_legacy_dataset)
+ assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+ table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+ filename = "foo # bar"
+ path = tempdir / filename
+ assert not path.exists()
+ _write_table(table, str(path))
+ assert path.exists()
+ table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+ assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_invalid_source(use_legacy_dataset):
+ # Test that we provide an helpful error message pointing out
+ # that None wasn't expected when trying to open a Parquet None file.
+ #
+ # Depending on use_legacy_dataset the message changes slightly
+ # but in both cases it should point out that None wasn't expected.
+ with pytest.raises(TypeError, match="None"):
+ pq.read_table(None, use_legacy_dataset=use_legacy_dataset)
+
+ with pytest.raises(TypeError, match="None"):
+ pq.ParquetFile(None)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+ # PARQUET-1857: Parquet encryption support introduced a INT16_MAX upper
+ # limit on the number of row groups, but this limit only impacts files with
+ # encrypted row group metadata because of the int16 row group ordinal used
+ # in the Parquet Thrift metadata. Unencrypted files are not impacted, so
+ # this test checks that it works (even if it isn't a good idea)
+ t = pa.table([list(range(40000))], names=['f0'])
+ _check_roundtrip(t, row_group_size=1)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_empty_table_roundtrip(use_legacy_dataset):
+ df = alltypes_sample(size=10)
+
+ # Create a non-empty table to infer the types correctly, then slice to 0
+ table = pa.Table.from_pandas(df)
+ table = pa.Table.from_arrays(
+ [col.chunk(0)[:0] for col in table.itercolumns()],
+ names=table.schema.names)
+
+ assert table.schema.field('null').type == pa.null()
+ assert table.schema.field('null_list').type == pa.list_(pa.null())
+ _check_roundtrip(
+ table, version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_empty_table_no_columns(use_legacy_dataset):
+ df = pd.DataFrame()
+ empty = pa.Table.from_pandas(df, preserve_index=False)
+ _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
+ # Bug report in ARROW-3792
+ cols = OrderedDict(
+ int32=pa.int32(),
+ list_string=pa.list_(pa.string())
+ )
+ data = [[], [OrderedDict(int32=1, list_string=('G',)), ]]
+
+ # This produces a table with a column like
+ # <Column name='list_string' type=ListType(list<item: string>)>
+ # [
+ # [],
+ # [
+ # [
+ # "G"
+ # ]
+ # ]
+ # ]
+ #
+ # Each column is a ChunkedArray with 2 elements
+ my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten()
+ for batch in data]
+ my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
+ for batch in my_arrays]
+ tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
+ _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+ # Test compatibility with PEP 519 path-like objects
+ path = tempdir / 'zzz.parquet'
+ df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+ _write_table(df, path)
+ table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+ # Test compatibility with plain string paths
+ path = str(tempdir) + 'zzz.parquet'
+ df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+ _write_table(df, path)
+ table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@parametrize_legacy_dataset
+def test_fspath(tempdir, use_legacy_dataset):
+ # ARROW-12472 support __fspath__ objects without using str()
+ path = tempdir / "test.parquet"
+ table = pa.table({"a": [1, 2, 3]})
+ _write_table(table, path)
+
+ fs_protocol_obj = util.FSProtocolClass(path)
+
+ result = _read_table(
+ fs_protocol_obj, use_legacy_dataset=use_legacy_dataset
+ )
+ assert result.equals(table)
+
+ # combined with non-local filesystem raises
+ with pytest.raises(TypeError):
+ _read_table(fs_protocol_obj, filesystem=FileSystem())
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+ None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+ # reading and writing from relative paths
+ table = pa.table({"a": [1, 2, 3]})
+
+ # reading
+ pq.write_table(table, str(tempdir / "data.parquet"))
+ with util.change_cwd(tempdir):
+ result = pq.read_table("data.parquet", filesystem=filesystem,
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(table)
+
+ # writing
+ with util.change_cwd(tempdir):
+ pq.write_table(table, "data2.parquet", filesystem=filesystem)
+ result = pq.read_table(tempdir / "data2.parquet")
+ assert result.equals(table)
+
+
+def test_read_non_existing_file():
+ # ensure we have a proper error message
+ with pytest.raises(FileNotFoundError):
+ pq.read_table('i-am-not-existing.parquet')
+
+
+def test_file_error_python_exception():
+ class BogusFile(io.BytesIO):
+ def read(self, *args):
+ raise ZeroDivisionError("zorglub")
+
+ def seek(self, *args):
+ raise ZeroDivisionError("zorglub")
+
+ # ensure the Python exception is restored
+ with pytest.raises(ZeroDivisionError, match="zorglub"):
+ pq.read_table(BogusFile(b""))
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+ # reading from a buffer from python's open()
+ table = pa.table({"a": [1, 2, 3]})
+ pq.write_table(table, str(tempdir / "data.parquet"))
+
+ with open(str(tempdir / "data.parquet"), "rb") as f:
+ result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(table)
+
+ with open(str(tempdir / "data.parquet"), "rb") as f:
+ result = pq.read_table(pa.PythonFile(f),
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+ # This is only a smoke test.
+ arr_float = pa.array(list(map(float, range(100))))
+ arr_int = pa.array(list(map(int, range(100))))
+ data_float = [arr_float, arr_float]
+ table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+ # Check with byte_stream_split for both columns.
+ _check_roundtrip(table, expected=table, compression="gzip",
+ use_dictionary=False, use_byte_stream_split=True)
+
+ # Check with byte_stream_split for column 'b' and dictionary
+ # for column 'a'.
+ _check_roundtrip(table, expected=table, compression="gzip",
+ use_dictionary=['a'],
+ use_byte_stream_split=['b'])
+
+ # Check with a collision for both columns.
+ _check_roundtrip(table, expected=table, compression="gzip",
+ use_dictionary=['a', 'b'],
+ use_byte_stream_split=['a', 'b'])
+
+ # Check with mixed column types.
+ mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+ names=['a', 'b'])
+ _check_roundtrip(mixed_table, expected=mixed_table,
+ use_dictionary=['b'],
+ use_byte_stream_split=['a'])
+
+ # Try to use the wrong data type with the byte_stream_split encoding.
+ # This should throw an exception.
+ table = pa.Table.from_arrays([arr_int], names=['tmp'])
+ with pytest.raises(IOError):
+ _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+ use_dictionary=False,
+ use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+ arr = pa.array(list(map(int, range(1000))))
+ data = [arr, arr]
+ table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+ # Check one compression level.
+ _check_roundtrip(table, expected=table, compression="gzip",
+ compression_level=1,
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check another one to make sure that compression_level=1 does not
+ # coincide with the default one in Arrow.
+ _check_roundtrip(table, expected=table, compression="gzip",
+ compression_level=5,
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check that the user can provide a compression per column
+ _check_roundtrip(table, expected=table,
+ compression={'a': "gzip", 'b': "snappy"},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check that the user can provide a compression level per column
+ _check_roundtrip(table, expected=table, compression="gzip",
+ compression_level={'a': 2, 'b': 3},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check that specifying a compression level for a codec which does allow
+ # specifying one, results into an error.
+ # Uncompressed, snappy, lz4 and lzo do not support specifying a compression
+ # level.
+ # GZIP (zlib) allows for specifying a compression level but as of up
+ # to version 1.2.11 the valid range is [-1, 9].
+ invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+ ("None", 444), ("lzo", 14)]
+ buf = io.BytesIO()
+ for (codec, level) in invalid_combinations:
+ with pytest.raises((ValueError, OSError)):
+ _write_table(table, buf, compression=codec,
+ compression_level=level)
+
+
+def test_sanitized_spark_field_names():
+ a0 = pa.array([0, 1, 2, 3, 4])
+ name = 'prohib; ,\t{}'
+ table = pa.Table.from_arrays([a0], [name])
+
+ result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})
+
+ expected_name = 'prohib______'
+ assert result.schema[0].name == expected_name
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multithreaded_read(use_legacy_dataset):
+ df = alltypes_sample(size=10000)
+
+ table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(table, buf, compression='SNAPPY', version='2.6')
+
+ buf.seek(0)
+ table1 = _read_table(
+ buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
+
+ buf.seek(0)
+ table2 = _read_table(
+ buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
+
+ assert table1.equals(table2)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_min_chunksize(use_legacy_dataset):
+ data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
+ table = pa.Table.from_pandas(data.reset_index())
+
+ buf = io.BytesIO()
+ _write_table(table, buf, chunk_size=-1)
+
+ buf.seek(0)
+ result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+ assert result.equals(table)
+
+ with pytest.raises(ValueError):
+ _write_table(table, buf, chunk_size=0)
+
+
+@pytest.mark.pandas
+def test_write_error_deletes_incomplete_file(tempdir):
+ # ARROW-1285
+ df = pd.DataFrame({'a': list('abc'),
+ 'b': list(range(1, 4)),
+ 'c': np.arange(3, 6).astype('u1'),
+ 'd': np.arange(4.0, 7.0, dtype='float64'),
+ 'e': [True, False, True],
+ 'f': pd.Categorical(list('abc')),
+ 'g': pd.date_range('20130101', periods=3),
+ 'h': pd.date_range('20130101', periods=3,
+ tz='US/Eastern'),
+ 'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+ pdf = pa.Table.from_pandas(df)
+
+ filename = tempdir / 'tmp_file'
+ try:
+ _write_table(pdf, filename)
+ except pa.ArrowException:
+ pass
+
+ assert not filename.exists()
+
+
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
+ path = 'non-existent-file.parquet'
+ try:
+ pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+ except Exception as e:
+ assert path in e.args[0]
+
+
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
+ with pytest.warns(None) as record:
+ pq.read_table(datadir / 'v0.7.1.parquet',
+ use_legacy_dataset=use_legacy_dataset)
+
+ assert len(record) == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_zlib_compression_bug(use_legacy_dataset):
+ # ARROW-3514: "zlib deflate failed, output buffer too small"
+ table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
+ f = io.BytesIO()
+ pq.write_table(table, f, compression='gzip')
+
+ f.seek(0)
+ roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+ tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
+
+
+@parametrize_legacy_dataset
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
+ path = str(tempdir / "test.parquet")
+ # TODO(dataset) with datasets API it raises OSError instead
+ with pytest.raises((pa.ArrowInvalid, OSError),
+ match='size is 0 bytes'):
+ with open(path, 'wb') as f:
+ pass
+ pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+ with pytest.raises((pa.ArrowInvalid, OSError),
+ match='size is 4 bytes'):
+ with open(path, 'wb') as f:
+ f.write(b'ffff')
+ pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@pytest.mark.fastparquet
+@pytest.mark.filterwarnings("ignore:RangeIndex:FutureWarning")
+@pytest.mark.filterwarnings("ignore:tostring:DeprecationWarning:fastparquet")
+def test_fastparquet_cross_compatibility(tempdir):
+ fp = pytest.importorskip('fastparquet')
+
+ df = pd.DataFrame(
+ {
+ "a": list("abc"),
+ "b": list(range(1, 4)),
+ "c": np.arange(4.0, 7.0, dtype="float64"),
+ "d": [True, False, True],
+ "e": pd.date_range("20130101", periods=3),
+ "f": pd.Categorical(["a", "b", "a"]),
+ # fastparquet writes list as BYTE_ARRAY JSON, so no roundtrip
+ # "g": [[1, 2], None, [1, 2, 3]],
+ }
+ )
+ table = pa.table(df)
+
+ # Arrow -> fastparquet
+ file_arrow = str(tempdir / "cross_compat_arrow.parquet")
+ pq.write_table(table, file_arrow, compression=None)
+
+ fp_file = fp.ParquetFile(file_arrow)
+ df_fp = fp_file.to_pandas()
+ tm.assert_frame_equal(df, df_fp)
+
+ # Fastparquet -> arrow
+ file_fastparquet = str(tempdir / "cross_compat_fastparquet.parquet")
+ fp.write(file_fastparquet, df)
+
+ table_fp = pq.read_pandas(file_fastparquet)
+ # for fastparquet written file, categoricals comes back as strings
+ # (no arrow schema in parquet metadata)
+ df['f'] = df['f'].astype(object)
+ tm.assert_frame_equal(table_fp.to_pandas(), df)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('array_factory', [
+ lambda: pa.array([0, None] * 10),
+ lambda: pa.array([0, None] * 10).dictionary_encode(),
+ lambda: pa.array(["", None] * 10),
+ lambda: pa.array(["", None] * 10).dictionary_encode(),
+])
+@pytest.mark.parametrize('use_dictionary', [False, True])
+@pytest.mark.parametrize('read_dictionary', [False, True])
+def test_buffer_contents(
+ array_factory, use_dictionary, read_dictionary, use_legacy_dataset
+):
+ # Test that null values are deterministically initialized to zero
+ # after a roundtrip through Parquet.
+ # See ARROW-8006 and ARROW-8011.
+ orig_table = pa.Table.from_pydict({"col": array_factory()})
+ bio = io.BytesIO()
+ pq.write_table(orig_table, bio, use_dictionary=True)
+ bio.seek(0)
+ read_dictionary = ['col'] if read_dictionary else None
+ table = pq.read_table(bio, use_threads=False,
+ read_dictionary=read_dictionary,
+ use_legacy_dataset=use_legacy_dataset)
+
+ for col in table.columns:
+ [chunk] = col.chunks
+ buf = chunk.buffers()[1]
+ assert buf.to_pybytes() == buf.size * b"\0"
+
+
+def test_parquet_compression_roundtrip(tempdir):
+ # ARROW-10480: ensure even with nonstandard Parquet file naming
+ # conventions, writing and then reading a file works. In
+ # particular, ensure that we don't automatically double-compress
+ # the stream due to auto-detecting the extension in the filename
+ table = pa.table([pa.array(range(4))], names=["ints"])
+ path = tempdir / "arrow-10480.pyarrow.gz"
+ pq.write_table(table, path, compression="GZIP")
+ result = pq.read_table(path)
+ assert result.equals(table)
+
+
+def test_empty_row_groups(tempdir):
+ # ARROW-3020
+ table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
+
+ path = tempdir / 'empty_row_groups.parquet'
+
+ num_groups = 3
+ with pq.ParquetWriter(path, table.schema) as writer:
+ for i in range(num_groups):
+ writer.write_table(table)
+
+ reader = pq.ParquetFile(path)
+ assert reader.metadata.num_row_groups == num_groups
+
+ for i in range(num_groups):
+ assert reader.read_row_group(i).equals(table)
+
+
+def test_reads_over_batch(tempdir):
+ data = [None] * (1 << 20)
+ data.append([1])
+ # Large list<int64> with mostly nones and one final
+ # value. This should force batched reads when
+ # reading back.
+ table = pa.Table.from_arrays([data], ['column'])
+
+ path = tempdir / 'arrow-11607.parquet'
+ pq.write_table(table, path)
+ table2 = pq.read_table(path)
+ assert table == table2
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py
new file mode 100644
index 000000000..91bc08df6
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import parametrize_legacy_dataset
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import (_read_table,
+ _check_roundtrip)
+except ImportError:
+ pq = None
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
+except ImportError:
+ pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+# Tests for ARROW-11497
+_test_data_simple = [
+ {'items': [1, 2]},
+ {'items': [0]},
+]
+
+_test_data_complex = [
+ {'items': [{'name': 'elem1', 'value': '1'},
+ {'name': 'elem2', 'value': '2'}]},
+ {'items': [{'name': 'elem1', 'value': '0'}]},
+]
+
+parametrize_test_data = pytest.mark.parametrize(
+ "test_data", [_test_data_simple, _test_data_complex])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_enable(tempdir,
+ use_legacy_dataset, test_data):
+ # prepare dataframe for testing
+ df = pd.DataFrame(data=test_data)
+ # verify that we can read/write pandas df with new flag
+ _roundtrip_pandas_dataframe(df,
+ write_kwargs={
+ 'use_compliant_nested_type': True},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Write to a parquet file with compliant nested type
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+ with pq.ParquetWriter(path, table.schema,
+ use_compliant_nested_type=True,
+ version='2.6') as writer:
+ writer.write_table(table)
+ # Read back as a table
+ new_table = _read_table(path)
+ # Validate that "items" columns compliant to Parquet nested format
+ # Should be like this: list<element: struct<name: string, value: string>>
+ assert isinstance(new_table.schema.types[0], pa.ListType)
+ assert new_table.schema.types[0].value_field.name == 'element'
+
+ # Verify that the new table can be read/written correctly
+ _check_roundtrip(new_table,
+ use_legacy_dataset=use_legacy_dataset,
+ use_compliant_nested_type=True)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_disable(tempdir,
+ use_legacy_dataset, test_data):
+ # prepare dataframe for testing
+ df = pd.DataFrame(data=test_data)
+ # verify that we can read/write with new flag disabled (default behaviour)
+ _roundtrip_pandas_dataframe(df, write_kwargs={},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Write to a parquet file while disabling compliant nested type
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+ with pq.ParquetWriter(path, table.schema, version='2.6') as writer:
+ writer.write_table(table)
+ new_table = _read_table(path)
+
+ # Validate that "items" columns is not compliant to Parquet nested format
+ # Should be like this: list<item: struct<name: string, value: string>>
+ assert isinstance(new_table.schema.types[0], pa.ListType)
+ assert new_table.schema.types[0].value_field.name == 'item'
+
+ # Verify that the new table can be read/written correctly
+ _check_roundtrip(new_table,
+ use_legacy_dataset=use_legacy_dataset,
+ use_compliant_nested_type=False)
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_data_types.py b/src/arrow/python/pyarrow/tests/parquet/test_data_types.py
new file mode 100644
index 000000000..1e2660006
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_data_types.py
@@ -0,0 +1,529 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip,
+ parametrize_legacy_dataset)
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _read_table, _write_table
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+ dataframe_with_lists)
+ from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+ pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+# General roundtrip of data types
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('chunk_size', [None, 1000])
+def test_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
+ df = alltypes_sample(size=10000, categorical=True)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ assert arrow_table.schema.pandas_metadata is not None
+
+ _write_table(arrow_table, filename, version='2.6',
+ coerce_timestamps='ms', chunk_size=chunk_size)
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset)
+ assert table_read.schema.pandas_metadata is not None
+
+ read_metadata = table_read.schema.metadata
+ assert arrow_table.schema.metadata == read_metadata
+
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
+ size = 10000
+ np.random.seed(0)
+ df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ 'str': [str(x) for x in range(size)],
+ 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+ 'empty_str': [''] * size
+ })
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ _write_table(arrow_table, filename, version='1.0')
+ table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+
+ # We pass uint32_t as int64_t if we write Parquet version 1.0
+ df['uint32'] = df['uint32'].values.astype(np.int64)
+
+ tm.assert_frame_equal(df, df_read)
+
+
+# Dictionary
+# -----------------------------------------------------------------------------
+
+
+def _simple_table_write_read(table, use_legacy_dataset):
+ bio = pa.BufferOutputStream()
+ pq.write_table(table, bio)
+ contents = bio.getvalue()
+ return pq.read_table(
+ pa.BufferReader(contents), use_legacy_dataset=use_legacy_dataset
+ )
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_direct_read_dictionary(use_legacy_dataset):
+ # ARROW-3325
+ repeats = 10
+ nunique = 5
+
+ data = [
+ [util.rands(10) for i in range(nunique)] * repeats,
+
+ ]
+ table = pa.table(data, names=['f0'])
+
+ bio = pa.BufferOutputStream()
+ pq.write_table(table, bio)
+ contents = bio.getvalue()
+
+ result = pq.read_table(pa.BufferReader(contents),
+ read_dictionary=['f0'],
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Compute dictionary-encoded subfield
+ expected = pa.table([table[0].dictionary_encode()], names=['f0'])
+ assert result.equals(expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_direct_read_dictionary_subfield(use_legacy_dataset):
+ repeats = 10
+ nunique = 5
+
+ data = [
+ [[util.rands(10)] for i in range(nunique)] * repeats,
+ ]
+ table = pa.table(data, names=['f0'])
+
+ bio = pa.BufferOutputStream()
+ pq.write_table(table, bio)
+ contents = bio.getvalue()
+ result = pq.read_table(pa.BufferReader(contents),
+ read_dictionary=['f0.list.item'],
+ use_legacy_dataset=use_legacy_dataset)
+
+ arr = pa.array(data[0])
+ values_as_dict = arr.values.dictionary_encode()
+
+ inner_indices = values_as_dict.indices.cast('int32')
+ new_values = pa.DictionaryArray.from_arrays(inner_indices,
+ values_as_dict.dictionary)
+
+ offsets = pa.array(range(51), type='int32')
+ expected_arr = pa.ListArray.from_arrays(offsets, new_values)
+ expected = pa.table([expected_arr], names=['f0'])
+
+ assert result.equals(expected)
+ assert result[0].num_chunks == 1
+
+
+@parametrize_legacy_dataset
+def test_dictionary_array_automatically_read(use_legacy_dataset):
+ # ARROW-3246
+
+ # Make a large dictionary, a little over 4MB of data
+ dict_length = 4000
+ dict_values = pa.array([('x' * 1000 + '_{}'.format(i))
+ for i in range(dict_length)])
+
+ num_chunks = 10
+ chunk_size = 100
+ chunks = []
+ for i in range(num_chunks):
+ indices = np.random.randint(0, dict_length,
+ size=chunk_size).astype(np.int32)
+ chunks.append(pa.DictionaryArray.from_arrays(pa.array(indices),
+ dict_values))
+
+ table = pa.table([pa.chunked_array(chunks)], names=['f0'])
+ result = _simple_table_write_read(table, use_legacy_dataset)
+
+ assert result.equals(table)
+
+ # The only key in the metadata was the Arrow schema key
+ assert result.schema.metadata is None
+
+
+# Decimal
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
+ num_values = 10
+
+ columns = {}
+ for precision in range(1, 39):
+ for scale in range(0, precision + 1):
+ with util.random_seed(0):
+ random_decimal_values = [
+ util.randdecimal(precision, scale)
+ for _ in range(num_values)
+ ]
+ column_name = ('dec_precision_{:d}_scale_{:d}'
+ .format(precision, scale))
+ columns[column_name] = random_decimal_values
+
+ expected = pd.DataFrame(columns)
+ filename = tempdir / 'decimals.parquet'
+ string_filename = str(filename)
+ table = pa.Table.from_pandas(expected)
+ _write_table(table, string_filename)
+ result_table = _read_table(
+ string_filename, use_legacy_dataset=use_legacy_dataset)
+ result = result_table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.xfail(
+ raises=OSError, reason='Parquet does not support negative scale'
+)
+def test_decimal_roundtrip_negative_scale(tempdir):
+ expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
+ filename = tempdir / 'decimals.parquet'
+ string_filename = str(filename)
+ t = pa.Table.from_pandas(expected)
+ _write_table(t, string_filename)
+ result_table = _read_table(string_filename)
+ result = result_table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+
+# List types
+# -----------------------------------------------------------------------------
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+ filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+ data = [pa.array(list(map(dtype, range(5))))]
+ table = pa.Table.from_arrays(data, names=['a'])
+ _write_table(table, filename)
+ table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+ for i in range(table.num_columns):
+ col_written = table[i]
+ col_read = table_read[i]
+ assert table.field(i).name == table_read.field(i).name
+ assert col_read.num_chunks == 1
+ data_written = col_written.chunk(0)
+ data_read = col_read.chunk(0)
+ assert data_written.equals(data_read)
+
+
+@parametrize_legacy_dataset
+def test_empty_lists_table_roundtrip(use_legacy_dataset):
+ # ARROW-2744: Shouldn't crash when writing an array of empty lists
+ arr = pa.array([[], []], type=pa.list_(pa.int32()))
+ table = pa.Table.from_arrays([arr], ["A"])
+ _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+ # Reproduce failure in ARROW-5630
+ typ = pa.list_(pa.field("item", pa.float32(), False))
+ num_rows = 10000
+ t = pa.table([
+ pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+ (num_rows // 10)), type=typ)
+ ], ['a'])
+ _check_roundtrip(
+ t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_struct_multiple_batches_roundtrip(
+ tempdir, use_legacy_dataset
+):
+ # Reproduce failure in ARROW-11024
+ data = [[{'x': 'abc', 'y': 'abc'}]]*100 + [[{'x': 'abc', 'y': 'gcb'}]]*100
+ table = pa.table([pa.array(data)], names=['column'])
+ _check_roundtrip(
+ table, row_group_size=20, use_legacy_dataset=use_legacy_dataset)
+
+ # Reproduce failure in ARROW-11069 (plain non-nested structs with strings)
+ data = pa.array(
+ [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}]*10
+ )
+ table = pa.table({'column': data})
+ _check_roundtrip(
+ table, row_group_size=10, use_legacy_dataset=use_legacy_dataset)
+
+
+def test_writing_empty_lists():
+ # ARROW-2591: [Python] Segmentation fault issue in pq.write_table
+ arr1 = pa.array([[], []], pa.list_(pa.int32()))
+ table = pa.Table.from_arrays([arr1], ['list(int32)'])
+ _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+def test_column_of_arrays(tempdir):
+ df, schema = dataframe_with_arrays()
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df, schema=schema)
+ _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
+ table_read = _read_table(filename)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_column_of_lists(tempdir):
+ df, schema = dataframe_with_lists(parquet_compatible=True)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df, schema=schema)
+ _write_table(arrow_table, filename, version='2.6')
+ table_read = _read_table(filename)
+ df_read = table_read.to_pandas()
+
+ tm.assert_frame_equal(df, df_read)
+
+
+def test_large_list_records():
+ # This was fixed in PARQUET-1100
+
+ list_lengths = np.random.randint(0, 500, size=50)
+ list_lengths[::10] = 0
+
+ list_values = [list(map(int, np.random.randint(0, 100, size=x)))
+ if i % 8 else None
+ for i, x in enumerate(list_lengths)]
+
+ a1 = pa.array(list_values)
+
+ table = pa.Table.from_arrays([a1], ['int_lists'])
+ _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
+ # ARROW-1684
+ df = pd.DataFrame({
+ 'a': [[1, 2, 3], None, [4, 5], []],
+ 'b': [[1.], None, None, [6., 7.]],
+ })
+
+ path = str(tempdir / 'nested_convenience.parquet')
+
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ _write_table(table, path)
+
+ read = pq.read_table(
+ path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
+ tm.assert_frame_equal(read.to_pandas(), df[['a']])
+
+ read = pq.read_table(
+ path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
+ tm.assert_frame_equal(read.to_pandas(), df)
+
+
+# Binary
+# -----------------------------------------------------------------------------
+
+
+def test_fixed_size_binary():
+ t0 = pa.binary(10)
+ data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
+ a0 = pa.array(data, type=t0)
+
+ table = pa.Table.from_arrays([a0],
+ ['binary[10]'])
+ _check_roundtrip(table)
+
+
+# Large types
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_large_table_int32_overflow():
+ size = np.iinfo('int32').max + 1
+
+ arr = np.ones(size, dtype='uint8')
+
+ parr = pa.array(arr, type=pa.uint8())
+
+ table = pa.Table.from_arrays([parr], names=['one'])
+ f = io.BytesIO()
+ _write_table(table, f)
+
+
+def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
+ stream = pa.BufferOutputStream()
+ _write_table(table, stream, **write_kwargs)
+ buf = stream.getvalue()
+ return _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_byte_array_exactly_2gb(use_legacy_dataset):
+ # Test edge case reported in ARROW-3762
+ val = b'x' * (1 << 10)
+
+ base = pa.array([val] * ((1 << 21) - 1))
+ cases = [
+ [b'x' * 1023], # 2^31 - 1
+ [b'x' * 1024], # 2^31
+ [b'x' * 1025] # 2^31 + 1
+ ]
+ for case in cases:
+ values = pa.chunked_array([base, pa.array(case)])
+ t = pa.table([values], names=['f0'])
+ result = _simple_table_roundtrip(
+ t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
+ assert t.equals(result)
+
+
+@pytest.mark.slow
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_binary_array_overflow_to_chunked(use_legacy_dataset):
+ # ARROW-3762
+
+ # 2^31 + 1 bytes
+ values = [b'x'] + [
+ b'x' * (1 << 20)
+ ] * 2 * (1 << 10)
+ df = pd.DataFrame({'byte_col': values})
+
+ tbl = pa.Table.from_pandas(df, preserve_index=False)
+ read_tbl = _simple_table_roundtrip(
+ tbl, use_legacy_dataset=use_legacy_dataset)
+
+ col0_data = read_tbl[0]
+ assert isinstance(col0_data, pa.ChunkedArray)
+
+ # Split up into 2GB chunks
+ assert col0_data.num_chunks == 2
+
+ assert tbl.equals(read_tbl)
+
+
+@pytest.mark.slow
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_list_of_binary_large_cell(use_legacy_dataset):
+ # ARROW-4688
+ data = []
+
+ # TODO(wesm): handle chunked children
+ # 2^31 - 1 bytes in a single cell
+ # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])
+
+ # A little under 2GB in cell each containing approximately 10MB each
+ data.extend([[b'x' * 1000000] * 10] * 214)
+
+ arr = pa.array(data)
+ table = pa.Table.from_arrays([arr], ['chunky_cells'])
+ read_table = _simple_table_roundtrip(
+ table, use_legacy_dataset=use_legacy_dataset)
+ assert table.equals(read_table)
+
+
+def test_large_binary():
+ data = [b'foo', b'bar'] * 50
+ for type in [pa.large_binary(), pa.large_string()]:
+ arr = pa.array(data, type=type)
+ table = pa.Table.from_arrays([arr], names=['strs'])
+ for use_dictionary in [False, True]:
+ _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_large_binary_huge():
+ s = b'xy' * 997
+ data = [s] * ((1 << 33) // len(s))
+ for type in [pa.large_binary(), pa.large_string()]:
+ arr = pa.array(data, type=type)
+ table = pa.Table.from_arrays([arr], names=['strs'])
+ for use_dictionary in [False, True]:
+ _check_roundtrip(table, use_dictionary=use_dictionary)
+ del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+ s = b'x' * (1 << 31)
+ arr = pa.array([s], type=pa.large_binary())
+ table = pa.Table.from_arrays([arr], names=['strs'])
+ for use_dictionary in [False, True]:
+ writer = pa.BufferOutputStream()
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="Parquet cannot store strings with size 2GB or more"):
+ _write_table(table, writer, use_dictionary=use_dictionary)
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_dataset.py b/src/arrow/python/pyarrow/tests/parquet/test_dataset.py
new file mode 100644
index 000000000..82f7e5814
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_dataset.py
@@ -0,0 +1,1661 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (
+ parametrize_legacy_dataset, parametrize_legacy_dataset_fixed,
+ parametrize_legacy_dataset_not_supported)
+from pyarrow.util import guid
+from pyarrow.vendored.version import Version
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import (
+ _read_table, _test_dataframe, _write_table)
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+except ImportError:
+ pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_parquet_piece_read(tempdir):
+ df = _test_dataframe(1000)
+ table = pa.Table.from_pandas(df)
+
+ path = tempdir / 'parquet_piece_read.parquet'
+ _write_table(table, path, version='2.6')
+
+ with pytest.warns(DeprecationWarning):
+ piece1 = pq.ParquetDatasetPiece(path)
+
+ result = piece1.read()
+ assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_parquet_piece_open_and_get_metadata(tempdir):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df)
+
+ path = tempdir / 'parquet_piece_read.parquet'
+ _write_table(table, path, version='2.6')
+
+ with pytest.warns(DeprecationWarning):
+ piece = pq.ParquetDatasetPiece(path)
+ table1 = piece.read()
+ assert isinstance(table1, pa.Table)
+ meta1 = piece.get_metadata()
+ assert isinstance(meta1, pq.FileMetaData)
+
+ assert table.equals(table1)
+
+
+@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning")
+def test_parquet_piece_basics():
+ path = '/baz.parq'
+
+ piece1 = pq.ParquetDatasetPiece(path)
+ piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+ piece3 = pq.ParquetDatasetPiece(
+ path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+ assert str(piece1) == path
+ assert str(piece2) == '/baz.parq | row_group=1'
+ assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+ assert piece1 == piece1
+ assert piece2 == piece2
+ assert piece3 == piece3
+ assert piece1 != piece3
+
+
+def test_partition_set_dictionary_type():
+ set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
+ set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+ assert isinstance(set1.dictionary, pa.StringArray)
+ assert isinstance(set2.dictionary, pa.IntegerArray)
+
+ set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+ with pytest.raises(TypeError):
+ set3.dictionary
+
+
+@parametrize_legacy_dataset_fixed
+def test_filesystem_uri(tempdir, use_legacy_dataset):
+ table = pa.table({"a": [1, 2, 3]})
+
+ directory = tempdir / "data_dir"
+ directory.mkdir()
+ path = directory / "data.parquet"
+ pq.write_table(table, str(path))
+
+ # filesystem object
+ result = pq.read_table(
+ path, filesystem=fs.LocalFileSystem(),
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(table)
+
+ # filesystem URI
+ result = pq.read_table(
+ "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir),
+ use_legacy_dataset=use_legacy_dataset)
+ assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
+
+
+@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
+@pytest.mark.pandas
+def test_create_parquet_dataset_multi_threaded(tempdir):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ _partition_test_for_filesystem(fs, base_path)
+
+ manifest = pq.ParquetManifest(base_path, filesystem=fs,
+ metadata_nthreads=1)
+ dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+ assert len(dataset.pieces) > 0
+ partitions = dataset.partitions
+ assert len(partitions.partition_names) > 0
+ assert partitions.partition_names == manifest.partitions.partition_names
+ assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
+ # ARROW-3861 - do not include partition columns in resulting table when
+ # `columns` keyword was passed without those columns
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+ _partition_test_for_filesystem(fs, base_path)
+
+ dataset = pq.ParquetDataset(
+ base_path, use_legacy_dataset=use_legacy_dataset)
+ result = dataset.read(columns=["values"])
+ if use_legacy_dataset:
+ # ParquetDataset implementation always includes the partition columns
+ # automatically, and we can't easily "fix" this since dask relies on
+ # this behaviour (ARROW-8644)
+ assert result.column_names == ["values", "foo", "bar"]
+ else:
+ assert result.column_names == ["values"]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_equivalency(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1]
+ string_keys = ['a', 'b', 'c']
+ boolean_keys = [True, False]
+ partition_spec = [
+ ['integer', integer_keys],
+ ['string', string_keys],
+ ['boolean', boolean_keys]
+ ]
+
+ df = pd.DataFrame({
+ 'integer': np.array(integer_keys, dtype='i4').repeat(15),
+ 'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+ 'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+ 3),
+ }, columns=['integer', 'string', 'boolean'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ # Old filters syntax:
+ # integer == 1 AND string != b AND boolean == True
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[('integer', '=', 1), ('string', '!=', 'b'),
+ ('boolean', '==', 'True')],
+ use_legacy_dataset=use_legacy_dataset,
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas().reset_index(drop=True))
+
+ assert 0 not in result_df['integer'].values
+ assert 'b' not in result_df['string'].values
+ assert False not in result_df['boolean'].values
+
+ # filters in disjunctive normal form:
+ # (integer == 1 AND string != b AND boolean == True) OR
+ # (integer == 2 AND boolean == False)
+ # TODO(ARROW-3388): boolean columns are reconstructed as string
+ filters = [
+ [
+ ('integer', '=', 1),
+ ('string', '!=', 'b'),
+ ('boolean', '==', 'True')
+ ],
+ [('integer', '=', 0), ('boolean', '==', 'False')]
+ ]
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, filters=filters,
+ use_legacy_dataset=use_legacy_dataset)
+ table = dataset.read()
+ result_df = table.to_pandas().reset_index(drop=True)
+
+ # Check that all rows in the DF fulfill the filter
+ # Pandas 0.23.x has problems with indexing constant memoryviews in
+ # categoricals. Thus we need to make an explicit copy here with np.array.
+ df_filter_1 = (np.array(result_df['integer']) == 1) \
+ & (np.array(result_df['string']) != 'b') \
+ & (np.array(result_df['boolean']) == 'True')
+ df_filter_2 = (np.array(result_df['integer']) == 0) \
+ & (np.array(result_df['boolean']) == 'False')
+ assert df_filter_1.sum() > 0
+ assert df_filter_2.sum() > 0
+ assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
+
+ if use_legacy_dataset:
+ # Check for \0 in predicate values. Until they are correctly
+ # implemented in ARROW-3391, they would otherwise lead to weird
+ # results with the current code.
+ with pytest.raises(NotImplementedError):
+ filters = [[('string', '==', b'1\0a')]]
+ pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ with pytest.raises(NotImplementedError):
+ filters = [[('string', '==', '1\0a')]]
+ pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+ else:
+ for filters in [[[('string', '==', b'1\0a')]],
+ [[('string', '==', '1\0a')]]]:
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, filters=filters,
+ use_legacy_dataset=False)
+ assert dataset.read().num_rows == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[
+ ('integers', '<', 4),
+ ('integers', '>', 1),
+ ],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ result_list = [x for x in map(int, result_df['integers'].values)]
+ assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.xfail(
+ # different error with use_legacy_datasets because result_df is no longer
+ # categorical
+ raises=(TypeError, AssertionError),
+ reason='Loss of type information in creation of categoricals.'
+)
+def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ date_keys = [
+ datetime.date(2018, 4, 9),
+ datetime.date(2018, 4, 10),
+ datetime.date(2018, 4, 11),
+ datetime.date(2018, 4, 12),
+ datetime.date(2018, 4, 13)
+ ]
+ partition_spec = [
+ ['dates', date_keys]
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'dates': np.array(date_keys, dtype='datetime64'),
+ }, columns=['index', 'dates'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[
+ ('dates', '<', "2018-04-12"),
+ ('dates', '>', "2018-04-10")
+ ],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ expected = pd.Categorical(
+ np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
+ categories=np.array(date_keys, dtype='datetime64'))
+
+ assert result_df['dates'].values == expected
+
+
+@pytest.mark.pandas
+@pytest.mark.dataset
+def test_filters_inclusive_datetime(tempdir):
+ # ARROW-11480
+ path = tempdir / 'timestamps.parquet'
+
+ pd.DataFrame({
+ "dates": pd.date_range("2020-01-01", periods=10, freq="D"),
+ "id": range(10)
+ }).to_parquet(path, use_deprecated_int96_timestamps=True)
+
+ table = pq.read_table(path, filters=[
+ ("dates", "<=", datetime.datetime(2020, 1, 5))
+ ])
+
+ assert table.column('id').to_pylist() == [0, 1, 2, 3, 4]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[
+ ('integers', '<=', 3),
+ ('integers', '>=', 2),
+ ],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ result_list = [int(x) for x in map(int, result_df['integers'].values)]
+ assert result_list == [2, 3]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_inclusive_set(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1]
+ string_keys = ['a', 'b', 'c']
+ boolean_keys = [True, False]
+ partition_spec = [
+ ['integer', integer_keys],
+ ['string', string_keys],
+ ['boolean', boolean_keys]
+ ]
+
+ df = pd.DataFrame({
+ 'integer': np.array(integer_keys, dtype='i4').repeat(15),
+ 'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+ 'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+ 3),
+ }, columns=['integer', 'string', 'boolean'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[('string', 'in', 'ab')],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas().reset_index(drop=True))
+
+ assert 'a' in result_df['string'].values
+ assert 'b' in result_df['string'].values
+ assert 'c' not in result_df['string'].values
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs,
+ filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
+ ('boolean', 'not in', {False})],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ table = dataset.read()
+ result_df = (table.to_pandas().reset_index(drop=True))
+
+ assert 0 not in result_df['integer'].values
+ assert 'c' not in result_df['string'].values
+ assert False not in result_df['boolean'].values
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ with pytest.raises(TypeError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', 'in', 3), ],
+ use_legacy_dataset=use_legacy_dataset)
+
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', '=<', 3), ],
+ use_legacy_dataset=use_legacy_dataset)
+
+ if use_legacy_dataset:
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', 'in', set()), ],
+ use_legacy_dataset=use_legacy_dataset)
+ else:
+ # Dataset API returns empty table instead
+ dataset = pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', 'in', set()), ],
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().num_rows == 0
+
+ if use_legacy_dataset:
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', '!=', {3})],
+ use_legacy_dataset=use_legacy_dataset)
+ else:
+ dataset = pq.ParquetDataset(base_path,
+ filesystem=fs,
+ filters=[('integers', '!=', {3})],
+ use_legacy_dataset=use_legacy_dataset)
+ with pytest.raises(NotImplementedError):
+ assert dataset.read().num_rows == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_filters_invalid_column(tempdir, use_legacy_dataset):
+ # ARROW-5572 - raise error on invalid name in filter specification
+ # works with new dataset / xfail with legacy implementation
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [['integers', integer_keys]]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ msg = r"No match for FieldRef.Name\(non_existent_column\)"
+ with pytest.raises(ValueError, match=msg):
+ pq.ParquetDataset(base_path, filesystem=fs,
+ filters=[('non_existent_column', '<', 3), ],
+ use_legacy_dataset=use_legacy_dataset).read()
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_read_table(tempdir, use_legacy_dataset):
+ # test that filters keyword is passed through in read_table
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ integer_keys = [0, 1, 2, 3, 4]
+ partition_spec = [
+ ['integers', integer_keys],
+ ]
+ N = 5
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'integers': np.array(integer_keys, dtype='i4'),
+ }, columns=['index', 'integers'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ table = pq.read_table(
+ base_path, filesystem=fs, filters=[('integers', '<', 3)],
+ use_legacy_dataset=use_legacy_dataset)
+ assert table.num_rows == 3
+
+ table = pq.read_table(
+ base_path, filesystem=fs, filters=[[('integers', '<', 3)]],
+ use_legacy_dataset=use_legacy_dataset)
+ assert table.num_rows == 3
+
+ table = pq.read_pandas(
+ base_path, filters=[('integers', '<', 3)],
+ use_legacy_dataset=use_legacy_dataset)
+ assert table.num_rows == 3
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
+ # ARROW-5666 - partition field values with underscores preserve underscores
+ # xfail with legacy dataset -> they get interpreted as integers
+ fs = LocalFileSystem._get_instance()
+ base_path = tempdir
+
+ string_keys = ["2019_2", "2019_3"]
+ partition_spec = [
+ ['year_week', string_keys],
+ ]
+ N = 2
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'year_week': np.array(string_keys, dtype='object'),
+ }, columns=['index', 'year_week'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, use_legacy_dataset=use_legacy_dataset)
+ result = dataset.read()
+ assert result.column("year_week").to_pylist() == string_keys
+
+
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
+ fs, path = s3_example_s3fs
+ path = path + "/test.parquet"
+ table = pa.table({"a": [1, 2, 3]})
+ _write_table(table, path, filesystem=fs)
+
+ result = _read_table(
+ path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+ )
+ assert result.equals(table)
+
+
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+ fs, directory = s3_example_s3fs
+ path = directory + "/test.parquet"
+ table = pa.table({"a": [1, 2, 3]})
+ _write_table(table, path, filesystem=fs)
+
+ result = _read_table(
+ directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+ )
+ assert result.equals(table)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs_wrapper(
+ s3_example_s3fs, use_legacy_dataset
+):
+ import s3fs
+
+ from pyarrow.filesystem import S3FSWrapper
+
+ if Version(s3fs.__version__) >= Version("0.5"):
+ pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")
+
+ fs, path = s3_example_s3fs
+ with pytest.warns(FutureWarning):
+ wrapper = S3FSWrapper(fs)
+ _partition_test_for_filesystem(wrapper, path)
+
+ # Check that we can auto-wrap
+ dataset = pq.ParquetDataset(
+ path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
+ )
+ dataset.read()
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
+ fs, path = s3_example_s3fs
+ _partition_test_for_filesystem(
+ fs, path, use_legacy_dataset=use_legacy_dataset
+ )
+
+
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
+ foo_keys = [0, 1]
+ bar_keys = ['a', 'b', 'c']
+ partition_spec = [
+ ['foo', foo_keys],
+ ['bar', bar_keys]
+ ]
+ N = 30
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'foo': np.array(foo_keys, dtype='i4').repeat(15),
+ 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'foo', 'bar', 'values'])
+
+ _generate_partition_directories(fs, base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(
+ base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ expected_df = (df.sort_values(by='index')
+ .reset_index(drop=True)
+ .reindex(columns=result_df.columns))
+
+ expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+ expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+
+ assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
+
+ tm.assert_frame_equal(result_df, expected_df)
+
+
+def _generate_partition_directories(fs, base_dir, partition_spec, df):
+ # partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
+ # ['bar', ['a', 'b', 'c']]
+ # part_table : a pyarrow.Table to write to each partition
+ DEPTH = len(partition_spec)
+
+ pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))
+
+ def _visit_level(base_dir, level, part_keys):
+ name, values = partition_spec[level]
+ for value in values:
+ this_part_keys = part_keys + [(name, value)]
+
+ level_dir = pathsep.join([
+ str(base_dir),
+ '{}={}'.format(name, value)
+ ])
+ fs.mkdir(level_dir)
+
+ if level == DEPTH - 1:
+ # Generate example data
+ file_path = pathsep.join([level_dir, guid()])
+ filtered_df = _filter_partition(df, this_part_keys)
+ part_table = pa.Table.from_pandas(filtered_df)
+ with fs.open(file_path, 'wb') as f:
+ _write_table(part_table, f)
+ assert fs.exists(file_path)
+
+ file_success = pathsep.join([level_dir, '_SUCCESS'])
+ with fs.open(file_success, 'wb') as f:
+ pass
+ else:
+ _visit_level(level_dir, level + 1, this_part_keys)
+ file_success = pathsep.join([level_dir, '_SUCCESS'])
+ with fs.open(file_success, 'wb') as f:
+ pass
+
+ _visit_level(base_dir, 0, [])
+
+
+def _test_read_common_metadata_files(fs, base_path):
+ import pandas as pd
+
+ import pyarrow.parquet as pq
+
+ N = 100
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+
+ base_path = str(base_path)
+ data_path = os.path.join(base_path, 'data.parquet')
+
+ table = pa.Table.from_pandas(df)
+
+ with fs.open(data_path, 'wb') as f:
+ _write_table(table, f)
+
+ metadata_path = os.path.join(base_path, '_common_metadata')
+ with fs.open(metadata_path, 'wb') as f:
+ pq.write_metadata(table.schema, f)
+
+ dataset = pq.ParquetDataset(base_path, filesystem=fs)
+ assert dataset.common_metadata_path == str(metadata_path)
+
+ with fs.open(data_path) as f:
+ common_schema = pq.read_metadata(f).schema
+ assert dataset.schema.equals(common_schema)
+
+ # handle list of one directory
+ dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
+ assert dataset2.schema.equals(dataset.schema)
+
+
+@pytest.mark.pandas
+def test_read_common_metadata_files(tempdir):
+ fs = LocalFileSystem._get_instance()
+ _test_read_common_metadata_files(fs, tempdir)
+
+
+@pytest.mark.pandas
+def test_read_metadata_files(tempdir):
+ fs = LocalFileSystem._get_instance()
+
+ N = 100
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+
+ data_path = tempdir / 'data.parquet'
+
+ table = pa.Table.from_pandas(df)
+
+ with fs.open(data_path, 'wb') as f:
+ _write_table(table, f)
+
+ metadata_path = tempdir / '_metadata'
+ with fs.open(metadata_path, 'wb') as f:
+ pq.write_metadata(table.schema, f)
+
+ dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+ assert dataset.metadata_path == str(metadata_path)
+
+ with fs.open(data_path) as f:
+ metadata_schema = pq.read_metadata(f).schema
+ assert dataset.schema.equals(metadata_schema)
+
+
+def _filter_partition(df, part_keys):
+ predicate = np.ones(len(df), dtype=bool)
+
+ to_drop = []
+ for name, value in part_keys:
+ to_drop.append(name)
+
+ # to avoid pandas warning
+ if isinstance(value, (datetime.date, datetime.datetime)):
+ value = pd.Timestamp(value)
+
+ predicate &= df[name] == value
+
+ return df[predicate].drop(to_drop, axis=1)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.pandas
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
+ # ARROW-4076 apply filter before schema validation
+ # to avoid checking unneeded schemas
+
+ # create partitioned dataset with mismatching schemas which would
+ # otherwise raise if first validation all schemas
+ dir1 = tempdir / 'A=0'
+ dir1.mkdir()
+ table1 = pa.Table.from_pandas(pd.DataFrame({'B': [1, 2, 3]}))
+ pq.write_table(table1, dir1 / 'data.parquet')
+
+ dir2 = tempdir / 'A=1'
+ dir2.mkdir()
+ table2 = pa.Table.from_pandas(pd.DataFrame({'B': ['a', 'b', 'c']}))
+ pq.write_table(table2, dir2 / 'data.parquet')
+
+ # read single file using filter
+ table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
+ use_legacy_dataset=use_legacy_dataset)
+ assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
+ nfiles = 10
+ size = 5
+
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ test_data = []
+ paths = []
+ for i in range(nfiles):
+ df = _test_dataframe(size, seed=i)
+
+ # Hack so that we don't have a dtype cast in v1 files
+ df['uint32'] = df['uint32'].astype(np.int64)
+
+ path = dirpath / '{}.parquet'.format(i)
+
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path)
+
+ test_data.append(table)
+ paths.append(path)
+
+ # Write a _SUCCESS.crc file
+ (dirpath / '_SUCCESS.crc').touch()
+
+ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
+ dataset = pq.ParquetDataset(
+ paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
+ return dataset.read(columns=columns, use_threads=use_threads)
+
+ result = read_multiple_files(paths)
+ expected = pa.concat_tables(test_data)
+
+ assert result.equals(expected)
+
+ # Read with provided metadata
+ # TODO(dataset) specifying metadata not yet supported
+ metadata = pq.read_metadata(paths[0])
+ if use_legacy_dataset:
+ result2 = read_multiple_files(paths, metadata=metadata)
+ assert result2.equals(expected)
+
+ result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
+ assert result3.equals(expected)
+ else:
+ with pytest.raises(ValueError, match="no longer supported"):
+ pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)
+
+ # Read column subset
+ to_read = [0, 2, 6, result.num_columns - 1]
+
+ col_names = [result.field(i).name for i in to_read]
+ out = pq.read_table(
+ dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
+ )
+ expected = pa.Table.from_arrays([result.column(i) for i in to_read],
+ names=col_names,
+ metadata=result.schema.metadata)
+ assert out.equals(expected)
+
+ # Read with multiple threads
+ pq.read_table(
+ dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
+ )
+
+ # Test failure modes with non-uniform metadata
+ bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+ bad_apple_path = tempdir / '{}.parquet'.format(guid())
+
+ t = pa.Table.from_pandas(bad_apple)
+ _write_table(t, bad_apple_path)
+
+ if not use_legacy_dataset:
+ # TODO(dataset) Dataset API skips bad files
+ return
+
+ bad_meta = pq.read_metadata(bad_apple_path)
+
+ with pytest.raises(ValueError):
+ read_multiple_files(paths + [bad_apple_path])
+
+ with pytest.raises(ValueError):
+ read_multiple_files(paths, metadata=bad_meta)
+
+ mixed_paths = [bad_apple_path, paths[0]]
+
+ with pytest.raises(ValueError):
+ read_multiple_files(mixed_paths, schema=bad_meta.schema)
+
+ with pytest.raises(ValueError):
+ read_multiple_files(mixed_paths)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_read_pandas(tempdir, use_legacy_dataset):
+ nfiles = 5
+ size = 5
+
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ test_data = []
+ frames = []
+ paths = []
+ for i in range(nfiles):
+ df = _test_dataframe(size, seed=i)
+ df.index = np.arange(i * size, (i + 1) * size)
+ df.index.name = 'index'
+
+ path = dirpath / '{}.parquet'.format(i)
+
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path)
+ test_data.append(table)
+ frames.append(df)
+ paths.append(path)
+
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+ columns = ['uint8', 'strings']
+ result = dataset.read_pandas(columns=columns).to_pandas()
+ expected = pd.concat([x[columns] for x in frames])
+
+ tm.assert_frame_equal(result, expected)
+
+ # also be able to pass the columns as a set (ARROW-12314)
+ result = dataset.read_pandas(columns=set(columns)).to_pandas()
+ assert result.shape == expected.shape
+ # column order can be different because of using a set
+ tm.assert_frame_equal(result.reindex(columns=expected.columns), expected)
+
+
+@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_memory_map(tempdir, use_legacy_dataset):
+ # ARROW-2627: Check that we can use ParquetDataset with memory-mapping
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ df = _test_dataframe(10, seed=0)
+ path = dirpath / '{}.parquet'.format(0)
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path, version='2.6')
+
+ dataset = pq.ParquetDataset(
+ dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
+ if use_legacy_dataset:
+ assert dataset.pieces[0].read().equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ df = _test_dataframe(10, seed=0)
+ path = dirpath / '{}.parquet'.format(0)
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path, version='2.6')
+
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(
+ dirpath, buffer_size=-64,
+ use_legacy_dataset=use_legacy_dataset)
+
+ for buffer_size in [128, 1024]:
+ dataset = pq.ParquetDataset(
+ dirpath, buffer_size=buffer_size,
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ df = _test_dataframe(10, seed=0)
+ path = dirpath / '{}.parquet'.format(0)
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path, version='2.6')
+
+ for pre_buffer in (True, False):
+ dataset = pq.ParquetDataset(
+ dirpath, pre_buffer=pre_buffer,
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
+ actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
+ use_legacy_dataset=use_legacy_dataset)
+ assert actual.equals(table)
+
+
+def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
+ test_data = []
+ paths = []
+ for i in range(nfiles):
+ df = _test_dataframe(file_nrows, seed=i)
+ path = base_path / '{}.parquet'.format(i)
+
+ test_data.append(_write_table(df, path))
+ paths.append(path)
+ return paths
+
+
+def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
+ if use_legacy_dataset:
+ assert set(map(str, paths)) == {x.path for x in dataset._pieces}
+ else:
+ paths = [str(path.as_posix()) for path in paths]
+ assert set(paths) == set(dataset._dataset.files)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+ file_nrows=5)
+
+ # private directory
+ (dirpath / '{}staging'.format(dir_prefix)).mkdir()
+
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+ file_nrows=5)
+
+ with (dirpath / '.DS_Store').open('wb') as f:
+ f.write(b'gibberish')
+
+ with (dirpath / '.private').open('wb') as f:
+ f.write(b'gibberish')
+
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+ file_nrows=5)
+
+ with (dirpath / '_committed_123').open('wb') as f:
+ f.write(b'abcd')
+
+ with (dirpath / '_started_321').open('wb') as f:
+ f.write(b'abcd')
+
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dir_prefix', ['_', '.'])
+def test_ignore_no_private_directories_in_base_path(
+ tempdir, dir_prefix, use_legacy_dataset
+):
+ # ARROW-8427 - don't ignore explicitly listed files if parent directory
+ # is a private directory
+ dirpath = tempdir / "{0}data".format(dir_prefix) / guid()
+ dirpath.mkdir(parents=True)
+
+ paths = _make_example_multifile_dataset(dirpath, nfiles=10,
+ file_nrows=5)
+
+ dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+ # ARROW-9644 - don't ignore full directory with underscore in base path
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_ignore_custom_prefixes(tempdir, use_legacy_dataset):
+ # ARROW-9573 - allow override of default ignore_prefixes
+ part = ["xxx"] * 3 + ["yyy"] * 3
+ table = pa.table([
+ pa.array(range(len(part))),
+ pa.array(part).dictionary_encode(),
+ ], names=['index', '_part'])
+
+ # TODO use_legacy_dataset ARROW-10247
+ pq.write_to_dataset(table, str(tempdir), partition_cols=['_part'])
+
+ private_duplicate = tempdir / '_private_duplicate'
+ private_duplicate.mkdir()
+ pq.write_to_dataset(table, str(private_duplicate),
+ partition_cols=['_part'])
+
+ read = pq.read_table(
+ tempdir, use_legacy_dataset=use_legacy_dataset,
+ ignore_prefixes=['_private'])
+
+ assert read.equals(table)
+
+
+@parametrize_legacy_dataset_fixed
+def test_empty_directory(tempdir, use_legacy_dataset):
+ # ARROW-5310 - reading empty directory
+ # fails with legacy implementation
+ empty_dir = tempdir / 'dataset'
+ empty_dir.mkdir()
+
+ dataset = pq.ParquetDataset(
+ empty_dir, use_legacy_dataset=use_legacy_dataset)
+ result = dataset.read()
+ assert result.num_rows == 0
+ assert result.num_columns == 0
+
+
+def _test_write_to_dataset_with_partitions(base_path,
+ use_legacy_dataset=True,
+ filesystem=None,
+ schema=None,
+ index_name=None):
+ import pandas as pd
+ import pandas.testing as tm
+
+ import pyarrow.parquet as pq
+
+ # ARROW-1400
+ output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
+ 'group2': list('eefeffgeee'),
+ 'num': list(range(10)),
+ 'nan': [np.nan] * 10,
+ 'date': np.arange('2017-01-01', '2017-01-11',
+ dtype='datetime64[D]')})
+ cols = output_df.columns.tolist()
+ partition_by = ['group1', 'group2']
+ output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
+ preserve_index=False)
+ pq.write_to_dataset(output_table, base_path, partition_by,
+ filesystem=filesystem,
+ use_legacy_dataset=use_legacy_dataset)
+
+ metadata_path = os.path.join(str(base_path), '_common_metadata')
+
+ if filesystem is not None:
+ with filesystem.open(metadata_path, 'wb') as f:
+ pq.write_metadata(output_table.schema, f)
+ else:
+ pq.write_metadata(output_table.schema, metadata_path)
+
+ # ARROW-2891: Ensure the output_schema is preserved when writing a
+ # partitioned dataset
+ dataset = pq.ParquetDataset(base_path,
+ filesystem=filesystem,
+ validate_schema=True,
+ use_legacy_dataset=use_legacy_dataset)
+ # ARROW-2209: Ensure the dataset schema also includes the partition columns
+ if use_legacy_dataset:
+ dataset_cols = set(dataset.schema.to_arrow_schema().names)
+ else:
+ # NB schema property is an arrow and not parquet schema
+ dataset_cols = set(dataset.schema.names)
+
+ assert dataset_cols == set(output_table.schema.names)
+
+ input_table = dataset.read()
+ input_df = input_table.to_pandas()
+
+ # Read data back in and compare with original DataFrame
+ # Partitioned columns added to the end of the DataFrame when read
+ input_df_cols = input_df.columns.tolist()
+ assert partition_by == input_df_cols[-1 * len(partition_by):]
+
+ input_df = input_df[cols]
+ # Partitioned columns become 'categorical' dtypes
+ for col in partition_by:
+ output_df[col] = output_df[col].astype('category')
+ tm.assert_frame_equal(output_df, input_df)
+
+
+def _test_write_to_dataset_no_partitions(base_path,
+ use_legacy_dataset=True,
+ filesystem=None):
+ import pandas as pd
+
+ import pyarrow.parquet as pq
+
+ # ARROW-1400
+ output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
+ 'group2': list('eefeffgeee'),
+ 'num': list(range(10)),
+ 'date': np.arange('2017-01-01', '2017-01-11',
+ dtype='datetime64[D]')})
+ cols = output_df.columns.tolist()
+ output_table = pa.Table.from_pandas(output_df)
+
+ if filesystem is None:
+ filesystem = LocalFileSystem._get_instance()
+
+ # Without partitions, append files to root_path
+ n = 5
+ for i in range(n):
+ pq.write_to_dataset(output_table, base_path,
+ filesystem=filesystem)
+ output_files = [file for file in filesystem.ls(str(base_path))
+ if file.endswith(".parquet")]
+ assert len(output_files) == n
+
+ # Deduplicated incoming DataFrame should match
+ # original outgoing Dataframe
+ input_table = pq.ParquetDataset(
+ base_path, filesystem=filesystem,
+ use_legacy_dataset=use_legacy_dataset
+ ).read()
+ input_df = input_table.to_pandas()
+ input_df = input_df.drop_duplicates()
+ input_df = input_df[cols]
+ assert output_df.equals(input_df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+ _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+ tempdir, use_legacy_dataset
+):
+ schema = pa.schema([pa.field('group1', type=pa.string()),
+ pa.field('group2', type=pa.string()),
+ pa.field('num', type=pa.int64()),
+ pa.field('nan', type=pa.int32()),
+ pa.field('date', type=pa.timestamp(unit='us'))])
+ _test_write_to_dataset_with_partitions(
+ str(tempdir), use_legacy_dataset, schema=schema)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+ tempdir, use_legacy_dataset
+):
+ _test_write_to_dataset_with_partitions(
+ str(tempdir), use_legacy_dataset, index_name='index_name')
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
+ _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
+ _test_write_to_dataset_with_partitions(
+ tempdir / "test1", use_legacy_dataset)
+ _test_write_to_dataset_no_partitions(
+ tempdir / "test2", use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_write_to_dataset_pathlib_nonlocal(
+ tempdir, s3_example_s3fs, use_legacy_dataset
+):
+ # pathlib paths are only accepted for local files
+ fs, _ = s3_example_s3fs
+
+ with pytest.raises(TypeError, match="path-like objects are only allowed"):
+ _test_write_to_dataset_with_partitions(
+ tempdir / "test1", use_legacy_dataset, filesystem=fs)
+
+ with pytest.raises(TypeError, match="path-like objects are only allowed"):
+ _test_write_to_dataset_no_partitions(
+ tempdir / "test2", use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_s3fs(
+ s3_example_s3fs, use_legacy_dataset
+):
+ fs, path = s3_example_s3fs
+
+ _test_write_to_dataset_with_partitions(
+ path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions_s3fs(
+ s3_example_s3fs, use_legacy_dataset
+):
+ fs, path = s3_example_s3fs
+
+ _test_write_to_dataset_no_partitions(
+ path, use_legacy_dataset, filesystem=fs)
+
+
+@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
+@pytest.mark.pandas
+@parametrize_legacy_dataset_not_supported
+def test_write_to_dataset_with_partitions_and_custom_filenames(
+ tempdir, use_legacy_dataset
+):
+ output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
+ 'group2': list('eefeffgeee'),
+ 'num': list(range(10)),
+ 'nan': [np.nan] * 10,
+ 'date': np.arange('2017-01-01', '2017-01-11',
+ dtype='datetime64[D]')})
+ partition_by = ['group1', 'group2']
+ output_table = pa.Table.from_pandas(output_df)
+ path = str(tempdir)
+
+ def partition_filename_callback(keys):
+ return "{}-{}.parquet".format(*keys)
+
+ pq.write_to_dataset(output_table, path,
+ partition_by, partition_filename_callback,
+ use_legacy_dataset=use_legacy_dataset)
+
+ dataset = pq.ParquetDataset(path)
+
+ # ARROW-3538: Ensure partition filenames match the given pattern
+ # defined in the local function partition_filename_callback
+ expected_basenames = [
+ 'a-e.parquet', 'a-f.parquet',
+ 'b-e.parquet', 'b-f.parquet',
+ 'b-g.parquet', 'c-e.parquet'
+ ]
+ output_basenames = [os.path.basename(p.path) for p in dataset.pieces]
+
+ assert sorted(expected_basenames) == sorted(output_basenames)
+
+
+@pytest.mark.dataset
+@pytest.mark.pandas
+def test_write_to_dataset_filesystem(tempdir):
+ df = pd.DataFrame({'A': [1, 2, 3]})
+ table = pa.Table.from_pandas(df)
+ path = str(tempdir)
+
+ pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem())
+ result = pq.read_table(path)
+ assert result.equals(table)
+
+
+# TODO(dataset) support pickling
+def _make_dataset_for_pickling(tempdir, N=100):
+ path = tempdir / 'data.parquet'
+ fs = LocalFileSystem._get_instance()
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+ table = pa.Table.from_pandas(df)
+
+ num_groups = 3
+ with pq.ParquetWriter(path, table.schema) as writer:
+ for i in range(num_groups):
+ writer.write_table(table)
+
+ reader = pq.ParquetFile(path)
+ assert reader.metadata.num_row_groups == num_groups
+
+ metadata_path = tempdir / '_metadata'
+ with fs.open(metadata_path, 'wb') as f:
+ pq.write_metadata(table.schema, f)
+
+ dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+ assert dataset.metadata_path == str(metadata_path)
+
+ return dataset
+
+
+def _assert_dataset_is_picklable(dataset, pickler):
+ def is_pickleable(obj):
+ return obj == pickler.loads(pickler.dumps(obj))
+
+ assert is_pickleable(dataset)
+ assert is_pickleable(dataset.metadata)
+ assert is_pickleable(dataset.metadata.schema)
+ assert len(dataset.metadata.schema)
+ for column in dataset.metadata.schema:
+ assert is_pickleable(column)
+
+ for piece in dataset._pieces:
+ assert is_pickleable(piece)
+ metadata = piece.get_metadata()
+ assert metadata.num_row_groups
+ for i in range(metadata.num_row_groups):
+ assert is_pickleable(metadata.row_group(i))
+
+
+@pytest.mark.pandas
+def test_builtin_pickle_dataset(tempdir, datadir):
+ import pickle
+ dataset = _make_dataset_for_pickling(tempdir)
+ _assert_dataset_is_picklable(dataset, pickler=pickle)
+
+
+@pytest.mark.pandas
+def test_cloudpickle_dataset(tempdir, datadir):
+ cp = pytest.importorskip('cloudpickle')
+ dataset = _make_dataset_for_pickling(tempdir)
+ _assert_dataset_is_picklable(dataset, pickler=cp)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_partitioned_dataset(tempdir, use_legacy_dataset):
+ # ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
+ # to a Parquet file
+ path = tempdir / "ARROW-3208"
+ df = pd.DataFrame({
+ 'one': [-1, 10, 2.5, 100, 1000, 1, 29.2],
+ 'two': [-1, 10, 2, 100, 1000, 1, 11],
+ 'three': [0, 0, 0, 0, 0, 0, 0]
+ })
+ table = pa.Table.from_pandas(df)
+ pq.write_to_dataset(table, root_path=str(path),
+ partition_cols=['one', 'two'])
+ table = pq.ParquetDataset(
+ path, use_legacy_dataset=use_legacy_dataset).read()
+ pq.write_table(table, path / "output.parquet")
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
+ path = tempdir / "ARROW-3325-dataset"
+ t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
+ t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
+ # TODO pass use_legacy_dataset (need to fix unique names)
+ pq.write_to_dataset(t1, root_path=str(path))
+ pq.write_to_dataset(t2, root_path=str(path))
+
+ result = pq.ParquetDataset(
+ path, read_dictionary=['f0'],
+ use_legacy_dataset=use_legacy_dataset).read()
+
+ # The order of the chunks is non-deterministic
+ ex_chunks = [t1[0].chunk(0).dictionary_encode(),
+ t2[0].chunk(0).dictionary_encode()]
+
+ assert result[0].num_chunks == 2
+ c0, c1 = result[0].chunk(0), result[0].chunk(1)
+ if c0.equals(ex_chunks[0]):
+ assert c1.equals(ex_chunks[1])
+ else:
+ assert c0.equals(ex_chunks[1])
+ assert c1.equals(ex_chunks[0])
+
+
+@pytest.mark.dataset
+def test_dataset_unsupported_keywords():
+
+ with pytest.raises(ValueError, match="not yet supported with the new"):
+ pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))
+
+ with pytest.raises(ValueError, match="not yet supported with the new"):
+ pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+ with pytest.raises(ValueError, match="not yet supported with the new"):
+ pq.ParquetDataset("", use_legacy_dataset=False, validate_schema=False)
+
+ with pytest.raises(ValueError, match="not yet supported with the new"):
+ pq.ParquetDataset("", use_legacy_dataset=False, split_row_groups=True)
+
+ with pytest.raises(ValueError, match="not yet supported with the new"):
+ pq.ParquetDataset("", use_legacy_dataset=False, metadata_nthreads=4)
+
+ with pytest.raises(ValueError, match="no longer supported"):
+ pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([]))
+
+
+@pytest.mark.dataset
+def test_dataset_partitioning(tempdir):
+ import pyarrow.dataset as ds
+
+ # create small dataset with directory partitioning
+ root_path = tempdir / "test_partitioning"
+ (root_path / "2012" / "10" / "01").mkdir(parents=True)
+
+ table = pa.table({'a': [1, 2, 3]})
+ pq.write_table(
+ table, str(root_path / "2012" / "10" / "01" / "data.parquet"))
+
+ # This works with new dataset API
+
+ # read_table
+ part = ds.partitioning(field_names=["year", "month", "day"])
+ result = pq.read_table(
+ str(root_path), partitioning=part, use_legacy_dataset=False)
+ assert result.column_names == ["a", "year", "month", "day"]
+
+ result = pq.ParquetDataset(
+ str(root_path), partitioning=part, use_legacy_dataset=False).read()
+ assert result.column_names == ["a", "year", "month", "day"]
+
+ # This raises an error for legacy dataset
+ with pytest.raises(ValueError):
+ pq.read_table(
+ str(root_path), partitioning=part, use_legacy_dataset=True)
+
+ with pytest.raises(ValueError):
+ pq.ParquetDataset(
+ str(root_path), partitioning=part, use_legacy_dataset=True)
+
+
+@pytest.mark.dataset
+def test_parquet_dataset_new_filesystem(tempdir):
+ # Ensure we can pass new FileSystem object to ParquetDataset
+ # (use new implementation automatically without specifying
+ # use_legacy_dataset=False)
+ table = pa.table({'a': [1, 2, 3]})
+ pq.write_table(table, tempdir / 'data.parquet')
+ # don't use simple LocalFileSystem (as that gets mapped to legacy one)
+ filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem())
+ dataset = pq.ParquetDataset('.', filesystem=filesystem)
+ result = dataset.read()
+ assert result.equals(table)
+
+
+@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
+def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir):
+ # ARROW-10462 ensure that on Windows we properly use posix-style paths
+ # as used by fsspec
+ fsspec = pytest.importorskip("fsspec")
+ filesystem = fsspec.filesystem('file')
+ table = pa.table({'a': [1, 2, 3]})
+ pq.write_table(table, tempdir / 'data.parquet')
+
+ # pass a posix-style path (using "/" also on Windows)
+ path = str(tempdir).replace("\\", "/")
+ dataset = pq.ParquetDataset(path, filesystem=filesystem)
+ # ensure the piece path is also posix-style
+ expected = path + "/data.parquet"
+ assert dataset.pieces[0].path == expected
+
+
+@pytest.mark.dataset
+def test_parquet_dataset_deprecated_properties(tempdir):
+ table = pa.table({'a': [1, 2, 3]})
+ path = tempdir / 'data.parquet'
+ pq.write_table(table, path)
+ dataset = pq.ParquetDataset(path)
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
+ dataset.pieces
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"):
+ dataset.partitions
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"):
+ dataset.memory_map
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"):
+ dataset.read_dictionary
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"):
+ dataset.buffer_size
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
+ dataset.fs
+
+ dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)
+
+ with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
+ dataset2.pieces
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_datetime.py b/src/arrow/python/pyarrow/tests/parquet/test_datetime.py
new file mode 100644
index 000000000..f39f1c762
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_datetime.py
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import (
+ _check_roundtrip, parametrize_legacy_dataset)
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _read_table, _write_table
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
+except ImportError:
+ pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_datetime_tz(use_legacy_dataset):
+ s = pd.Series([datetime.datetime(2017, 9, 6)])
+ s = s.dt.tz_localize('utc')
+
+ s.index = s
+
+ # Both a column and an index to hit both use cases
+ df = pd.DataFrame({'tz_aware': s,
+ 'tz_eastern': s.dt.tz_convert('US/Eastern')},
+ index=s)
+
+ f = io.BytesIO()
+
+ arrow_table = pa.Table.from_pandas(df)
+
+ _write_table(arrow_table, f, coerce_timestamps='ms')
+ f.seek(0)
+
+ table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset)
+
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_datetime_timezone_tzinfo(use_legacy_dataset):
+ value = datetime.datetime(2018, 1, 1, 1, 23, 45,
+ tzinfo=datetime.timezone.utc)
+ df = pd.DataFrame({'foo': [value]})
+
+ _roundtrip_pandas_dataframe(
+ df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+def test_coerce_timestamps(tempdir):
+ from collections import OrderedDict
+
+ # ARROW-622
+ arrays = OrderedDict()
+ fields = [pa.field('datetime64',
+ pa.list_(pa.timestamp('ms')))]
+ arrays['datetime64'] = [
+ np.array(['2007-07-13T01:23:34.123456789',
+ None,
+ '2010-08-13T05:46:57.437699912'],
+ dtype='datetime64[ms]'),
+ None,
+ None,
+ np.array(['2007-07-13T02',
+ None,
+ '2010-08-13T05:46:57.437699912'],
+ dtype='datetime64[ms]'),
+ ]
+
+ df = pd.DataFrame(arrays)
+ schema = pa.schema(fields)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df, schema=schema)
+
+ _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us')
+ table_read = _read_table(filename)
+ df_read = table_read.to_pandas()
+
+ df_expected = df.copy()
+ for i, x in enumerate(df_expected['datetime64']):
+ if isinstance(x, np.ndarray):
+ df_expected['datetime64'][i] = x.astype('M8[us]')
+
+ tm.assert_frame_equal(df_expected, df_read)
+
+ with pytest.raises(ValueError):
+ _write_table(arrow_table, filename, version='2.6',
+ coerce_timestamps='unknown')
+
+
+@pytest.mark.pandas
+def test_coerce_timestamps_truncated(tempdir):
+ """
+ ARROW-2555: Test that we can truncate timestamps when coercing if
+ explicitly allowed.
+ """
+ dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+ second=1, microsecond=1)
+ dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+ second=1)
+
+ fields_us = [pa.field('datetime64', pa.timestamp('us'))]
+ arrays_us = {'datetime64': [dt_us, dt_ms]}
+
+ df_us = pd.DataFrame(arrays_us)
+ schema_us = pa.schema(fields_us)
+
+ filename = tempdir / 'pandas_truncated.parquet'
+ table_us = pa.Table.from_pandas(df_us, schema=schema_us)
+
+ _write_table(table_us, filename, version='2.6', coerce_timestamps='ms',
+ allow_truncated_timestamps=True)
+ table_ms = _read_table(filename)
+ df_ms = table_ms.to_pandas()
+
+ arrays_expected = {'datetime64': [dt_ms, dt_ms]}
+ df_expected = pd.DataFrame(arrays_expected)
+ tm.assert_frame_equal(df_expected, df_ms)
+
+
+@pytest.mark.pandas
+def test_date_time_types(tempdir):
+ t1 = pa.date32()
+ data1 = np.array([17259, 17260, 17261], dtype='int32')
+ a1 = pa.array(data1, type=t1)
+
+ t2 = pa.date64()
+ data2 = data1.astype('int64') * 86400000
+ a2 = pa.array(data2, type=t2)
+
+ t3 = pa.timestamp('us')
+ start = pd.Timestamp('2001-01-01').value / 1000
+ data3 = np.array([start, start + 1, start + 2], dtype='int64')
+ a3 = pa.array(data3, type=t3)
+
+ t4 = pa.time32('ms')
+ data4 = np.arange(3, dtype='i4')
+ a4 = pa.array(data4, type=t4)
+
+ t5 = pa.time64('us')
+ a5 = pa.array(data4.astype('int64'), type=t5)
+
+ t6 = pa.time32('s')
+ a6 = pa.array(data4, type=t6)
+
+ ex_t6 = pa.time32('ms')
+ ex_a6 = pa.array(data4 * 1000, type=ex_t6)
+
+ t7 = pa.timestamp('ns')
+ start = pd.Timestamp('2001-01-01').value
+ data7 = np.array([start, start + 1000, start + 2000],
+ dtype='int64')
+ a7 = pa.array(data7, type=t7)
+
+ table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
+ ['date32', 'date64', 'timestamp[us]',
+ 'time32[s]', 'time64[us]',
+ 'time32_from64[s]',
+ 'timestamp[ns]'])
+
+ # date64 as date32
+ # time32[s] to time32[ms]
+ expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
+ ['date32', 'date64', 'timestamp[us]',
+ 'time32[s]', 'time64[us]',
+ 'time32_from64[s]',
+ 'timestamp[ns]'])
+
+ _check_roundtrip(table, expected=expected, version='2.6')
+
+ t0 = pa.timestamp('ms')
+ data0 = np.arange(4, dtype='int64')
+ a0 = pa.array(data0, type=t0)
+
+ t1 = pa.timestamp('us')
+ data1 = np.arange(4, dtype='int64')
+ a1 = pa.array(data1, type=t1)
+
+ t2 = pa.timestamp('ns')
+ data2 = np.arange(4, dtype='int64')
+ a2 = pa.array(data2, type=t2)
+
+ table = pa.Table.from_arrays([a0, a1, a2],
+ ['ts[ms]', 'ts[us]', 'ts[ns]'])
+ expected = pa.Table.from_arrays([a0, a1, a2],
+ ['ts[ms]', 'ts[us]', 'ts[ns]'])
+
+ # int64 for all timestamps supported by default
+ filename = tempdir / 'int64_timestamps.parquet'
+ _write_table(table, filename, version='2.6')
+ parquet_schema = pq.ParquetFile(filename).schema
+ for i in range(3):
+ assert parquet_schema.column(i).physical_type == 'INT64'
+ read_table = _read_table(filename)
+ assert read_table.equals(expected)
+
+ t0_ns = pa.timestamp('ns')
+ data0_ns = np.array(data0 * 1000000, dtype='int64')
+ a0_ns = pa.array(data0_ns, type=t0_ns)
+
+ t1_ns = pa.timestamp('ns')
+ data1_ns = np.array(data1 * 1000, dtype='int64')
+ a1_ns = pa.array(data1_ns, type=t1_ns)
+
+ expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
+ ['ts[ms]', 'ts[us]', 'ts[ns]'])
+
+ # int96 nanosecond timestamps produced upon request
+ filename = tempdir / 'explicit_int96_timestamps.parquet'
+ _write_table(table, filename, version='2.6',
+ use_deprecated_int96_timestamps=True)
+ parquet_schema = pq.ParquetFile(filename).schema
+ for i in range(3):
+ assert parquet_schema.column(i).physical_type == 'INT96'
+ read_table = _read_table(filename)
+ assert read_table.equals(expected)
+
+ # int96 nanosecond timestamps implied by flavor 'spark'
+ filename = tempdir / 'spark_int96_timestamps.parquet'
+ _write_table(table, filename, version='2.6',
+ flavor='spark')
+ parquet_schema = pq.ParquetFile(filename).schema
+ for i in range(3):
+ assert parquet_schema.column(i).physical_type == 'INT96'
+ read_table = _read_table(filename)
+ assert read_table.equals(expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
+def test_coerce_int96_timestamp_unit(unit):
+ i_s = pd.Timestamp('2010-01-01').value / 1000000000 # := 1262304000
+
+ d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
+ d_ms = d_s * 1000
+ d_us = d_ms * 1000
+ d_ns = d_us * 1000
+
+ a_s = pa.array(d_s, type=pa.timestamp('s'))
+ a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
+ a_us = pa.array(d_us, type=pa.timestamp('us'))
+ a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
+
+ arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
+ names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
+ table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
+
+ # For either Parquet version, coercing to nanoseconds is allowed
+ # if Int96 storage is used
+ expected = pa.Table.from_arrays([arrays.get(unit)]*4, names)
+ read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
+ _check_roundtrip(table, expected,
+ read_table_kwargs=read_table_kwargs,
+ use_deprecated_int96_timestamps=True)
+ _check_roundtrip(table, expected, version='2.6',
+ read_table_kwargs=read_table_kwargs,
+ use_deprecated_int96_timestamps=True)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
+def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):
+
+ def get_table(pq_reader_method, filename, **kwargs):
+ if pq_reader_method == "ParquetFile":
+ return pq.ParquetFile(filename, **kwargs).read()
+ elif pq_reader_method == "read_table":
+ return pq.read_table(filename, **kwargs)
+
+ # Recreating the initial JIRA issue referrenced in ARROW-12096
+ oob_dts = [
+ datetime.datetime(1000, 1, 1),
+ datetime.datetime(2000, 1, 1),
+ datetime.datetime(3000, 1, 1)
+ ]
+ df = pd.DataFrame({"a": oob_dts})
+ table = pa.table(df)
+
+ filename = tempdir / "test_round_trip_overflow.parquet"
+ pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
+ version="1.0")
+
+ # with the default resolution of ns, we get wrong values for INT96
+ # that are out of bounds for nanosecond range
+ tab_error = get_table(pq_reader_method, filename)
+ assert tab_error["a"].to_pylist() != oob_dts
+
+ # avoid this overflow by specifying the resolution to use for INT96 values
+ tab_correct = get_table(
+ pq_reader_method, filename, coerce_int96_timestamp_unit="s"
+ )
+ df_correct = tab_correct.to_pandas(timestamp_as_object=True)
+ tm.assert_frame_equal(df, df_correct)
+
+
+def test_timestamp_restore_timezone():
+ # ARROW-5888, restore timezone from serialized metadata
+ ty = pa.timestamp('ms', tz='America/New_York')
+ arr = pa.array([1, 2, 3], type=ty)
+ t = pa.table([arr], names=['f0'])
+ _check_roundtrip(t)
+
+
+def test_timestamp_restore_timezone_nanosecond():
+ # ARROW-9634, also restore timezone for nanosecond data that get stored
+ # as microseconds in the parquet file
+ ty = pa.timestamp('ns', tz='America/New_York')
+ arr = pa.array([1000, 2000, 3000], type=ty)
+ table = pa.table([arr], names=['f0'])
+ ty_us = pa.timestamp('us', tz='America/New_York')
+ expected = pa.table([arr.cast(ty_us)], names=['f0'])
+ _check_roundtrip(table, expected=expected)
+
+
+@pytest.mark.pandas
+def test_list_of_datetime_time_roundtrip():
+ # ARROW-4135
+ times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
+ '11:30', '12:00'])
+ df = pd.DataFrame({'time': [times.time]})
+ _roundtrip_pandas_dataframe(df, write_kwargs={})
+
+
+@pytest.mark.pandas
+def test_parquet_version_timestamp_differences():
+ i_s = pd.Timestamp('2010-01-01').value / 1000000000 # := 1262304000
+
+ d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
+ d_ms = d_s * 1000
+ d_us = d_ms * 1000
+ d_ns = d_us * 1000
+
+ a_s = pa.array(d_s, type=pa.timestamp('s'))
+ a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
+ a_us = pa.array(d_us, type=pa.timestamp('us'))
+ a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
+
+ names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
+ table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
+
+ # Using Parquet version 1.0, seconds should be coerced to milliseconds
+ # and nanoseconds should be coerced to microseconds by default
+ expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
+ _check_roundtrip(table, expected)
+
+ # Using Parquet version 2.0, seconds should be coerced to milliseconds
+ # and nanoseconds should be retained by default
+ expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
+ _check_roundtrip(table, expected, version='2.6')
+
+ # Using Parquet version 1.0, coercing to milliseconds or microseconds
+ # is allowed
+ expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
+ _check_roundtrip(table, expected, coerce_timestamps='ms')
+
+ # Using Parquet version 2.0, coercing to milliseconds or microseconds
+ # is allowed
+ expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
+ _check_roundtrip(table, expected, version='2.6', coerce_timestamps='us')
+
+ # TODO: after pyarrow allows coerce_timestamps='ns', tests like the
+ # following should pass ...
+
+ # Using Parquet version 1.0, coercing to nanoseconds is not allowed
+ # expected = None
+ # with pytest.raises(NotImplementedError):
+ # _roundtrip_table(table, coerce_timestamps='ns')
+
+ # Using Parquet version 2.0, coercing to nanoseconds is allowed
+ # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+ # _check_roundtrip(table, expected, version='2.6', coerce_timestamps='ns')
+
+ # For either Parquet version, coercing to nanoseconds is allowed
+ # if Int96 storage is used
+ expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+ _check_roundtrip(table, expected,
+ use_deprecated_int96_timestamps=True)
+ _check_roundtrip(table, expected, version='2.6',
+ use_deprecated_int96_timestamps=True)
+
+
+@pytest.mark.pandas
+def test_noncoerced_nanoseconds_written_without_exception(tempdir):
+ # ARROW-1957: the Parquet version 2.0 writer preserves Arrow
+ # nanosecond timestamps by default
+ n = 9
+ df = pd.DataFrame({'x': range(n)},
+ index=pd.date_range('2017-01-01', freq='1n', periods=n))
+ tb = pa.Table.from_pandas(df)
+
+ filename = tempdir / 'written.parquet'
+ try:
+ pq.write_table(tb, filename, version='2.6')
+ except Exception:
+ pass
+ assert filename.exists()
+
+ recovered_table = pq.read_table(filename)
+ assert tb.equals(recovered_table)
+
+ # Loss of data through coercion (without explicit override) still an error
+ filename = tempdir / 'not_written.parquet'
+ with pytest.raises(ValueError):
+ pq.write_table(tb, filename, coerce_timestamps='ms', version='2.6')
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_metadata.py b/src/arrow/python/pyarrow/tests/parquet/test_metadata.py
new file mode 100644
index 000000000..9fa2f6394
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_metadata.py
@@ -0,0 +1,524 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _write_table
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+ pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_parquet_metadata_api():
+ df = alltypes_sample(size=10000)
+ df = df.reindex(columns=sorted(df.columns))
+ df.index = np.random.randint(0, 1000000, size=len(df))
+
+ fileh = make_sample_file(df)
+ ncols = len(df.columns)
+
+ # Series of sniff tests
+ meta = fileh.metadata
+ repr(meta)
+ assert meta.num_rows == len(df)
+ assert meta.num_columns == ncols + 1 # +1 for index
+ assert meta.num_row_groups == 1
+ assert meta.format_version == '2.6'
+ assert 'parquet-cpp' in meta.created_by
+ assert isinstance(meta.serialized_size, int)
+ assert isinstance(meta.metadata, dict)
+
+ # Schema
+ schema = fileh.schema
+ assert meta.schema is schema
+ assert len(schema) == ncols + 1 # +1 for index
+ repr(schema)
+
+ col = schema[0]
+ repr(col)
+ assert col.name == df.columns[0]
+ assert col.max_definition_level == 1
+ assert col.max_repetition_level == 0
+ assert col.max_repetition_level == 0
+
+ assert col.physical_type == 'BOOLEAN'
+ assert col.converted_type == 'NONE'
+
+ with pytest.raises(IndexError):
+ schema[ncols + 1] # +1 for index
+
+ with pytest.raises(IndexError):
+ schema[-1]
+
+ # Row group
+ for rg in range(meta.num_row_groups):
+ rg_meta = meta.row_group(rg)
+ assert isinstance(rg_meta, pq.RowGroupMetaData)
+ repr(rg_meta)
+
+ for col in range(rg_meta.num_columns):
+ col_meta = rg_meta.column(col)
+ assert isinstance(col_meta, pq.ColumnChunkMetaData)
+ repr(col_meta)
+
+ with pytest.raises(IndexError):
+ meta.row_group(-1)
+
+ with pytest.raises(IndexError):
+ meta.row_group(meta.num_row_groups + 1)
+
+ rg_meta = meta.row_group(0)
+ assert rg_meta.num_rows == len(df)
+ assert rg_meta.num_columns == ncols + 1 # +1 for index
+ assert rg_meta.total_byte_size > 0
+
+ with pytest.raises(IndexError):
+ col_meta = rg_meta.column(-1)
+
+ with pytest.raises(IndexError):
+ col_meta = rg_meta.column(ncols + 2)
+
+ col_meta = rg_meta.column(0)
+ assert col_meta.file_offset > 0
+ assert col_meta.file_path == '' # created from BytesIO
+ assert col_meta.physical_type == 'BOOLEAN'
+ assert col_meta.num_values == 10000
+ assert col_meta.path_in_schema == 'bool'
+ assert col_meta.is_stats_set is True
+ assert isinstance(col_meta.statistics, pq.Statistics)
+ assert col_meta.compression == 'SNAPPY'
+ assert col_meta.encodings == ('PLAIN', 'RLE')
+ assert col_meta.has_dictionary_page is False
+ assert col_meta.dictionary_page_offset is None
+ assert col_meta.data_page_offset > 0
+ assert col_meta.total_compressed_size > 0
+ assert col_meta.total_uncompressed_size > 0
+ with pytest.raises(NotImplementedError):
+ col_meta.has_index_page
+ with pytest.raises(NotImplementedError):
+ col_meta.index_page_offset
+
+
+def test_parquet_metadata_lifetime(tempdir):
+ # ARROW-6642 - ensure that chained access keeps parent objects alive
+ table = pa.table({'a': [1, 2, 3]})
+ pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
+ parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet')
+ parquet_file.metadata.row_group(0).column(0).statistics
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize(
+ (
+ 'data',
+ 'type',
+ 'physical_type',
+ 'min_value',
+ 'max_value',
+ 'null_count',
+ 'num_values',
+ 'distinct_count'
+ ),
+ [
+ ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
+ ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
+ ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
+ ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
+ ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
+ ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
+ ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
+ ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
+ (
+ [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
+ 'FLOAT', -1.1, 4.4, 1, 4, 0
+ ),
+ (
+ [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
+ 'DOUBLE', -1.1, 4.4, 1, 4, 0
+ ),
+ (
+ ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
+ 'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0
+ ),
+ (
+ [True, False, False, True, True], pa.bool_(),
+ 'BOOLEAN', False, True, 0, 5, 0
+ ),
+ (
+ [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
+ 'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
+ ),
+ ]
+)
+def test_parquet_column_statistics_api(data, type, physical_type, min_value,
+ max_value, null_count, num_values,
+ distinct_count):
+ df = pd.DataFrame({'data': data})
+ schema = pa.schema([pa.field('data', type)])
+ table = pa.Table.from_pandas(df, schema=schema, safe=False)
+ fileh = make_sample_file(table)
+
+ meta = fileh.metadata
+
+ rg_meta = meta.row_group(0)
+ col_meta = rg_meta.column(0)
+
+ stat = col_meta.statistics
+ assert stat.has_min_max
+ assert _close(type, stat.min, min_value)
+ assert _close(type, stat.max, max_value)
+ assert stat.null_count == null_count
+ assert stat.num_values == num_values
+ # TODO(kszucs) until parquet-cpp API doesn't expose HasDistinctCount
+ # method, missing distinct_count is represented as zero instead of None
+ assert stat.distinct_count == distinct_count
+ assert stat.physical_type == physical_type
+
+
+def _close(type, left, right):
+ if type == pa.float32():
+ return abs(left - right) < 1E-7
+ elif type == pa.float64():
+ return abs(left - right) < 1E-13
+ else:
+ return left == right
+
+
+# ARROW-6339
+@pytest.mark.pandas
+def test_parquet_raise_on_unset_statistics():
+ df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
+ meta = make_sample_file(pa.Table.from_pandas(df)).metadata
+
+ assert not meta.row_group(0).column(0).statistics.has_min_max
+ assert meta.row_group(0).column(0).statistics.max is None
+
+
+def test_statistics_convert_logical_types(tempdir):
+ # ARROW-5166, ARROW-4139
+
+ # (min, max, type)
+ cases = [(10, 11164359321221007157, pa.uint64()),
+ (10, 4294967295, pa.uint32()),
+ ("ähnlich", "öffentlich", pa.utf8()),
+ (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+ pa.time32('ms')),
+ (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
+ pa.time64('us')),
+ (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+ datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+ pa.timestamp('ms')),
+ (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
+ datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
+ pa.timestamp('us'))]
+
+ for i, (min_val, max_val, typ) in enumerate(cases):
+ t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
+ ['col'])
+ path = str(tempdir / ('example{}.parquet'.format(i)))
+ pq.write_table(t, path, version='2.6')
+ pf = pq.ParquetFile(path)
+ stats = pf.metadata.row_group(0).column(0).statistics
+ assert stats.min == min_val
+ assert stats.max == max_val
+
+
+def test_parquet_write_disable_statistics(tempdir):
+ table = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c']))
+ ])
+ )
+ _write_table(table, tempdir / 'data.parquet')
+ meta = pq.read_metadata(tempdir / 'data.parquet')
+ for col in [0, 1]:
+ cc = meta.row_group(0).column(col)
+ assert cc.is_stats_set is True
+ assert cc.statistics is not None
+
+ _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
+ meta = pq.read_metadata(tempdir / 'data2.parquet')
+ for col in [0, 1]:
+ cc = meta.row_group(0).column(col)
+ assert cc.is_stats_set is False
+ assert cc.statistics is None
+
+ _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
+ meta = pq.read_metadata(tempdir / 'data3.parquet')
+ cc_a = meta.row_group(0).column(0)
+ cc_b = meta.row_group(0).column(1)
+ assert cc_a.is_stats_set is True
+ assert cc_b.is_stats_set is False
+ assert cc_a.statistics is not None
+ assert cc_b.statistics is None
+
+
+def test_field_id_metadata():
+ # ARROW-7080
+ field_id = b'PARQUET:field_id'
+ inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'})
+ middle = pa.field('middle', pa.struct(
+ [inner]), metadata={field_id: b'101'})
+ fields = [
+ pa.field('basic', pa.int32(), metadata={
+ b'other': b'abc', field_id: b'1'}),
+ pa.field(
+ 'list',
+ pa.list_(pa.field('list-inner', pa.int32(),
+ metadata={field_id: b'10'})),
+ metadata={field_id: b'11'}),
+ pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}),
+ pa.field('no-metadata', pa.int32()),
+ pa.field('non-integral-field-id', pa.int32(),
+ metadata={field_id: b'xyz'}),
+ pa.field('negative-field-id', pa.int32(),
+ metadata={field_id: b'-1000'})
+ ]
+ arrs = [[] for _ in fields]
+ table = pa.table(arrs, schema=pa.schema(fields))
+
+ bio = pa.BufferOutputStream()
+ pq.write_table(table, bio)
+ contents = bio.getvalue()
+
+ pf = pq.ParquetFile(pa.BufferReader(contents))
+ schema = pf.schema_arrow
+
+ assert schema[0].metadata[field_id] == b'1'
+ assert schema[0].metadata[b'other'] == b'abc'
+
+ list_field = schema[1]
+ assert list_field.metadata[field_id] == b'11'
+
+ list_item_field = list_field.type.value_field
+ assert list_item_field.metadata[field_id] == b'10'
+
+ struct_field = schema[2]
+ assert struct_field.metadata[field_id] == b'102'
+
+ struct_middle_field = struct_field.type[0]
+ assert struct_middle_field.metadata[field_id] == b'101'
+
+ struct_inner_field = struct_middle_field.type[0]
+ assert struct_inner_field.metadata[field_id] == b'100'
+
+ assert schema[3].metadata is None
+ # Invalid input is passed through (ok) but does not
+ # have field_id in parquet (not tested)
+ assert schema[4].metadata[field_id] == b'xyz'
+ assert schema[5].metadata[field_id] == b'-1000'
+
+
+@pytest.mark.pandas
+def test_multi_dataset_metadata(tempdir):
+ filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
+ metapath = str(tempdir / "_metadata")
+
+ # create a test dataset
+ df = pd.DataFrame({
+ 'one': [1, 2, 3],
+ 'two': [-1, -2, -3],
+ 'three': [[1, 2], [2, 3], [3, 4]],
+ })
+ table = pa.Table.from_pandas(df)
+
+ # write dataset twice and collect/merge metadata
+ _meta = None
+ for filename in filenames:
+ meta = []
+ pq.write_table(table, str(tempdir / filename),
+ metadata_collector=meta)
+ meta[0].set_file_path(filename)
+ if _meta is None:
+ _meta = meta[0]
+ else:
+ _meta.append_row_groups(meta[0])
+
+ # Write merged metadata-only file
+ with open(metapath, "wb") as f:
+ _meta.write_metadata_file(f)
+
+ # Read back the metadata
+ meta = pq.read_metadata(metapath)
+ md = meta.to_dict()
+ _md = _meta.to_dict()
+ for key in _md:
+ if key != 'serialized_size':
+ assert _md[key] == md[key]
+ assert _md['num_columns'] == 3
+ assert _md['num_rows'] == 6
+ assert _md['num_row_groups'] == 2
+ assert _md['serialized_size'] == 0
+ assert md['serialized_size'] > 0
+
+
+def test_write_metadata(tempdir):
+ path = str(tempdir / "metadata")
+ schema = pa.schema([("a", "int64"), ("b", "float64")])
+
+ # write a pyarrow schema
+ pq.write_metadata(schema, path)
+ parquet_meta = pq.read_metadata(path)
+ schema_as_arrow = parquet_meta.schema.to_arrow_schema()
+ assert schema_as_arrow.equals(schema)
+
+ # ARROW-8980: Check that the ARROW:schema metadata key was removed
+ if schema_as_arrow.metadata:
+ assert b'ARROW:schema' not in schema_as_arrow.metadata
+
+ # pass through writer keyword arguments
+ for version in ["1.0", "2.0", "2.4", "2.6"]:
+ pq.write_metadata(schema, path, version=version)
+ parquet_meta = pq.read_metadata(path)
+ # The version is stored as a single integer in the Parquet metadata,
+ # so it cannot correctly express dotted format versions
+ expected_version = "1.0" if version == "1.0" else "2.6"
+ assert parquet_meta.format_version == expected_version
+
+ # metadata_collector: list of FileMetaData objects
+ table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema)
+ pq.write_table(table, tempdir / "data.parquet")
+ parquet_meta = pq.read_metadata(str(tempdir / "data.parquet"))
+ pq.write_metadata(
+ schema, path, metadata_collector=[parquet_meta, parquet_meta]
+ )
+ parquet_meta_mult = pq.read_metadata(path)
+ assert parquet_meta_mult.num_row_groups == 2
+
+ # append metadata with different schema raises an error
+ with pytest.raises(RuntimeError, match="requires equal schemas"):
+ pq.write_metadata(
+ pa.schema([("a", "int32"), ("b", "null")]),
+ path, metadata_collector=[parquet_meta, parquet_meta]
+ )
+
+
+def test_table_large_metadata():
+ # ARROW-8694
+ my_schema = pa.schema([pa.field('f0', 'double')],
+ metadata={'large': 'x' * 10000000})
+
+ table = pa.table([np.arange(10)], schema=my_schema)
+ _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+def test_compare_schemas():
+ df = alltypes_sample(size=10000)
+
+ fileh = make_sample_file(df)
+ fileh2 = make_sample_file(df)
+ fileh3 = make_sample_file(df[df.columns[::2]])
+
+ # ParquetSchema
+ assert isinstance(fileh.schema, pq.ParquetSchema)
+ assert fileh.schema.equals(fileh.schema)
+ assert fileh.schema == fileh.schema
+ assert fileh.schema.equals(fileh2.schema)
+ assert fileh.schema == fileh2.schema
+ assert fileh.schema != 'arbitrary object'
+ assert not fileh.schema.equals(fileh3.schema)
+ assert fileh.schema != fileh3.schema
+
+ # ColumnSchema
+ assert isinstance(fileh.schema[0], pq.ColumnSchema)
+ assert fileh.schema[0].equals(fileh.schema[0])
+ assert fileh.schema[0] == fileh.schema[0]
+ assert not fileh.schema[0].equals(fileh.schema[1])
+ assert fileh.schema[0] != fileh.schema[1]
+ assert fileh.schema[0] != 'arbitrary object'
+
+
+@pytest.mark.pandas
+def test_read_schema(tempdir):
+ N = 100
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+
+ data_path = tempdir / 'test.parquet'
+
+ table = pa.Table.from_pandas(df)
+ _write_table(table, data_path)
+
+ read1 = pq.read_schema(data_path)
+ read2 = pq.read_schema(data_path, memory_map=True)
+ assert table.schema.equals(read1)
+ assert table.schema.equals(read2)
+
+ assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']
+
+
+def test_parquet_metadata_empty_to_dict(tempdir):
+ # https://issues.apache.org/jira/browse/ARROW-10146
+ table = pa.table({"a": pa.array([], type="int64")})
+ pq.write_table(table, tempdir / "data.parquet")
+ metadata = pq.read_metadata(tempdir / "data.parquet")
+ # ensure this doesn't error / statistics set to None
+ metadata_dict = metadata.to_dict()
+ assert len(metadata_dict["row_groups"]) == 1
+ assert len(metadata_dict["row_groups"][0]["columns"]) == 1
+ assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_metadata_exceeds_message_size():
+ # ARROW-13655: Thrift may enable a defaut message size that limits
+ # the size of Parquet metadata that can be written.
+ NCOLS = 1000
+ NREPEATS = 4000
+
+ table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)})
+
+ with pa.BufferOutputStream() as out:
+ pq.write_table(table, out)
+ buf = out.getvalue()
+
+ original_metadata = pq.read_metadata(pa.BufferReader(buf))
+ metadata = pq.read_metadata(pa.BufferReader(buf))
+ for i in range(NREPEATS):
+ metadata.append_row_groups(original_metadata)
+
+ with pa.BufferOutputStream() as out:
+ metadata.write_metadata_file(out)
+ buf = out.getvalue()
+
+ metadata = pq.read_metadata(pa.BufferReader(buf))
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_pandas.py b/src/arrow/python/pyarrow/tests/parquet/test_pandas.py
new file mode 100644
index 000000000..5fc1b7060
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_pandas.py
@@ -0,0 +1,687 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import json
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.fs import LocalFileSystem, SubTreeFileSystem
+from pyarrow.tests.parquet.common import (
+ parametrize_legacy_dataset, parametrize_legacy_dataset_not_supported)
+from pyarrow.util import guid
+from pyarrow.vendored.version import Version
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
+ _write_table)
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.parquet.common import (_roundtrip_pandas_dataframe,
+ alltypes_sample)
+except ImportError:
+ pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_pandas_parquet_custom_metadata(tempdir):
+ df = alltypes_sample(size=10000)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ assert b'pandas' in arrow_table.schema.metadata
+
+ _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
+
+ metadata = pq.read_metadata(filename).metadata
+ assert b'pandas' in metadata
+
+ js = json.loads(metadata[b'pandas'].decode('utf8'))
+ assert js['index_columns'] == [{'kind': 'range',
+ 'name': None,
+ 'start': 0, 'stop': 10000,
+ 'step': 1}]
+
+
+@pytest.mark.pandas
+def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
+ # ARROW-3728: Merging Parquet Files - Pandas Meta in Schema Mismatch
+ schema = pa.schema([
+ pa.field('int', pa.int16()),
+ pa.field('float', pa.float32()),
+ pa.field('string', pa.string())
+ ])
+ df1 = pd.DataFrame({
+ 'int': np.arange(3, dtype=np.uint8),
+ 'float': np.arange(3, dtype=np.float32),
+ 'string': ['ABBA', 'EDDA', 'ACDC']
+ })
+ df2 = pd.DataFrame({
+ 'int': [4, 5],
+ 'float': [1.1, None],
+ 'string': [None, None]
+ })
+ table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False)
+ table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False)
+
+ assert not table1.schema.equals(table2.schema, check_metadata=True)
+ assert table1.schema.equals(table2.schema)
+
+ writer = pq.ParquetWriter(tempdir / 'merged.parquet', schema=schema)
+ writer.write_table(table1)
+ writer.write_table(table2)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_column_multiindex(tempdir, use_legacy_dataset):
+ df = alltypes_sample(size=10)
+ df.columns = pd.MultiIndex.from_tuples(
+ list(zip(df.columns, df.columns[::-1])),
+ names=['level_1', 'level_2']
+ )
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ assert arrow_table.schema.pandas_metadata is not None
+
+ _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
+
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(
+ tempdir, use_legacy_dataset
+):
+ df = alltypes_sample(size=10000)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ js = arrow_table.schema.pandas_metadata
+ assert not js['index_columns']
+ # ARROW-2170
+ # While index_columns should be empty, columns needs to be filled still.
+ assert js['columns']
+
+ _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
+ table_read = pq.read_pandas(
+ filename, use_legacy_dataset=use_legacy_dataset)
+
+ js = table_read.schema.pandas_metadata
+ assert not js['index_columns']
+
+ read_metadata = table_read.schema.metadata
+ assert arrow_table.schema.metadata == read_metadata
+
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+# TODO(dataset) duplicate column selection actually gives duplicate columns now
+@pytest.mark.pandas
+@parametrize_legacy_dataset_not_supported
+def test_pandas_column_selection(tempdir, use_legacy_dataset):
+ size = 10000
+ np.random.seed(0)
+ df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16)
+ })
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ _write_table(arrow_table, filename)
+ table_read = _read_table(
+ filename, columns=['uint8'], use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+
+ tm.assert_frame_equal(df[['uint8']], df_read)
+
+ # ARROW-4267: Selection of duplicate columns still leads to these columns
+ # being read uniquely.
+ table_read = _read_table(
+ filename, columns=['uint8', 'uint8'],
+ use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+
+ tm.assert_frame_equal(df[['uint8']], df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset):
+ df = _test_dataframe(10000)
+ arrow_table = pa.Table.from_pandas(df)
+ imos = pa.BufferOutputStream()
+ _write_table(arrow_table, imos, version='2.6')
+ buf = imos.getvalue()
+ reader = pa.BufferReader(buf)
+ df_read = _read_table(
+ reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_pandas_column_subset(tempdir, use_legacy_dataset):
+ df = _test_dataframe(10000)
+ arrow_table = pa.Table.from_pandas(df)
+ imos = pa.BufferOutputStream()
+ _write_table(arrow_table, imos, version='2.6')
+ buf = imos.getvalue()
+ reader = pa.BufferReader(buf)
+ df_read = pq.read_pandas(
+ reader, columns=['strings', 'uint8'],
+ use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset):
+ df = _test_dataframe(0)
+ arrow_table = pa.Table.from_pandas(df)
+ imos = pa.BufferOutputStream()
+ _write_table(arrow_table, imos, version='2.6')
+ buf = imos.getvalue()
+ reader = pa.BufferReader(buf)
+ df_read = _read_table(
+ reader, use_legacy_dataset=use_legacy_dataset).to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_pandas_can_write_nested_data(tempdir):
+ data = {
+ "agg_col": [
+ {"page_type": 1},
+ {"record_type": 1},
+ {"non_consecutive_home": 0},
+ ],
+ "uid_first": "1001"
+ }
+ df = pd.DataFrame(data=data)
+ arrow_table = pa.Table.from_pandas(df)
+ imos = pa.BufferOutputStream()
+ # This succeeds under V2
+ _write_table(arrow_table, imos)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
+ filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
+ size = 5
+ df = pd.DataFrame({
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ 'strings': ['foo', 'bar', None, 'baz', 'qux']
+ })
+
+ arrow_table = pa.Table.from_pandas(df)
+
+ with filename.open('wb') as f:
+ _write_table(arrow_table, f, version="1.0")
+
+ data = io.BytesIO(filename.read_bytes())
+
+ table_read = _read_table(data, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset):
+ size = 10000
+ np.random.seed(0)
+ df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0
+ })
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+
+ for use_dictionary in [True, False]:
+ _write_table(arrow_table, filename, version='2.6',
+ use_dictionary=use_dictionary)
+ table_read = _read_table(
+ filename, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+ for write_statistics in [True, False]:
+ _write_table(arrow_table, filename, version='2.6',
+ write_statistics=write_statistics)
+ table_read = _read_table(filename,
+ use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+ for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
+ if (compression != 'NONE' and
+ not pa.lib.Codec.is_available(compression)):
+ continue
+ _write_table(arrow_table, filename, version='2.6',
+ compression=compression)
+ table_read = _read_table(
+ filename, use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_spark_flavor_preserves_pandas_metadata():
+ df = _test_dataframe(size=100)
+ df.index = np.arange(0, 10 * len(df), 10)
+ df.index.name = 'foo'
+
+ result = _roundtrip_pandas_dataframe(df, {'version': '2.0',
+ 'flavor': 'spark'})
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_index_column_name_duplicate(tempdir, use_legacy_dataset):
+ data = {
+ 'close': {
+ pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
+ pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998,
+ },
+ 'time': {
+ pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp(
+ '2017-06-30 01:31:00'
+ ),
+ pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp(
+ '2017-06-30 01:32:00'
+ ),
+ }
+ }
+ path = str(tempdir / 'data.parquet')
+ dfx = pd.DataFrame(data).set_index('time', drop=False)
+ tdfx = pa.Table.from_pandas(dfx)
+ _write_table(tdfx, path)
+ arrow_table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+ result_df = arrow_table.to_pandas()
+ tm.assert_frame_equal(result_df, dfx)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiindex_duplicate_values(tempdir, use_legacy_dataset):
+ num_rows = 3
+ numbers = list(range(num_rows))
+ index = pd.MultiIndex.from_arrays(
+ [['foo', 'foo', 'bar'], numbers],
+ names=['foobar', 'some_numbers'],
+ )
+
+ df = pd.DataFrame({'numbers': numbers}, index=index)
+ table = pa.Table.from_pandas(df)
+
+ filename = tempdir / 'dup_multi_index_levels.parquet'
+
+ _write_table(table, filename)
+ result_table = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+ assert table.equals(result_table)
+
+ result_df = result_table.to_pandas()
+ tm.assert_frame_equal(result_df, df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_naming(datadir, use_legacy_dataset):
+ expected_string = b"""\
+carat cut color clarity depth table price x y z
+ 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
+ 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
+ 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
+ 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
+ 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
+ 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
+ 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
+ 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
+ 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
+ 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
+ expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}',
+ index_col=None, header=0, engine='python')
+ table = _read_table(
+ datadir / 'v0.7.1.parquet', use_legacy_dataset=use_legacy_dataset)
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_named(
+ datadir, use_legacy_dataset
+):
+ expected_string = b"""\
+carat cut color clarity depth table price x y z
+ 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
+ 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
+ 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
+ 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
+ 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
+ 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
+ 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
+ 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
+ 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
+ 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
+ expected = pd.read_csv(
+ io.BytesIO(expected_string), sep=r'\s{2,}',
+ index_col=['cut', 'color', 'clarity'],
+ header=0, engine='python'
+ ).sort_index()
+
+ table = _read_table(datadir / 'v0.7.1.all-named-index.parquet',
+ use_legacy_dataset=use_legacy_dataset)
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_some_named(
+ datadir, use_legacy_dataset
+):
+ expected_string = b"""\
+carat cut color clarity depth table price x y z
+ 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
+ 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
+ 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
+ 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
+ 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
+ 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
+ 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
+ 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
+ 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
+ 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
+ expected = pd.read_csv(
+ io.BytesIO(expected_string),
+ sep=r'\s{2,}', index_col=['cut', 'color', 'clarity'],
+ header=0, engine='python'
+ ).sort_index()
+ expected.index = expected.index.set_names(['cut', None, 'clarity'])
+
+ table = _read_table(datadir / 'v0.7.1.some-named-index.parquet',
+ use_legacy_dataset=use_legacy_dataset)
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_backwards_compatible_column_metadata_handling(
+ datadir, use_legacy_dataset
+):
+ expected = pd.DataFrame(
+ {'a': [1, 2, 3], 'b': [.1, .2, .3],
+ 'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
+ expected.index = pd.MultiIndex.from_arrays(
+ [['a', 'b', 'c'],
+ pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')],
+ names=['index', None])
+
+ path = datadir / 'v0.7.1.column-metadata-handling.parquet'
+ table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, expected)
+
+ table = _read_table(
+ path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_categorical_index_survives_roundtrip(use_legacy_dataset):
+ # ARROW-3652, addressed by ARROW-3246
+ df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
+ df['c1'] = df['c1'].astype('category')
+ df = df.set_index(['c1'])
+
+ table = pa.Table.from_pandas(df)
+ bos = pa.BufferOutputStream()
+ pq.write_table(table, bos)
+ ref_df = pq.read_pandas(
+ bos.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
+ assert isinstance(ref_df.index, pd.CategoricalIndex)
+ assert ref_df.index.equals(df.index)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_categorical_order_survives_roundtrip(use_legacy_dataset):
+ # ARROW-6302
+ df = pd.DataFrame({"a": pd.Categorical(
+ ["a", "b", "c", "a"], categories=["b", "c", "d"], ordered=True)})
+
+ table = pa.Table.from_pandas(df)
+ bos = pa.BufferOutputStream()
+ pq.write_table(table, bos)
+
+ contents = bos.getvalue()
+ result = pq.read_pandas(
+ contents, use_legacy_dataset=use_legacy_dataset).to_pandas()
+
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
+ # ARROW-5085
+ df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
+ df_category = df.astype({"col": "category", "int": "category"})
+ table = pa.Table.from_pandas(df)
+ table_cat = pa.Table.from_pandas(df_category)
+ buf = pa.BufferOutputStream()
+
+ # it works
+ pq.write_table(table_cat, buf, version='2.6', chunk_size=10)
+ result = pq.read_table(
+ buf.getvalue(), use_legacy_dataset=use_legacy_dataset)
+
+ # Result is non-categorical
+ assert result[0].equals(table[0])
+ assert result[1].equals(table[1])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_pandas_categorical_roundtrip(use_legacy_dataset):
+ # ARROW-5480, this was enabled by ARROW-3246
+
+ # Have one of the categories unobserved and include a null (-1)
+ codes = np.array([2, 0, 0, 2, 0, -1, 2], dtype='int32')
+ categories = ['foo', 'bar', 'baz']
+ df = pd.DataFrame({'x': pd.Categorical.from_codes(
+ codes, categories=categories)})
+
+ buf = pa.BufferOutputStream()
+ pq.write_table(pa.table(df), buf)
+
+ result = pq.read_table(
+ buf.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
+ assert result.x.dtype == 'category'
+ assert (result.x.cat.categories == categories).all()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_extensiondtypes(
+ tempdir, use_legacy_dataset
+):
+ # ARROW-8251 - preserve pandas extension dtypes in roundtrip
+ if Version(pd.__version__) < Version("1.0.0"):
+ pytest.skip("__arrow_array__ added to pandas in 1.0.0")
+
+ df = pd.DataFrame({'part': 'a', "col": [1, 2, 3]})
+ df['col'] = df['col'].astype("Int64")
+ table = pa.table(df)
+
+ pq.write_to_dataset(
+ table, str(tempdir / "case1"), partition_cols=['part'],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ result = pq.read_table(
+ str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+ pq.write_to_dataset(
+ table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+ )
+ result = pq.read_table(
+ str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+ pq.write_table(table, str(tempdir / "data.parquet"))
+ result = pq.read_table(
+ str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result[["col"]], df[["col"]])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_write_to_dataset_pandas_preserve_index(tempdir, use_legacy_dataset):
+ # ARROW-8251 - preserve pandas index in roundtrip
+
+ df = pd.DataFrame({'part': ['a', 'a', 'b'], "col": [1, 2, 3]})
+ df.index = pd.Index(['a', 'b', 'c'], name="idx")
+ table = pa.table(df)
+ df_cat = df[["col", "part"]].copy()
+ df_cat["part"] = df_cat["part"].astype("category")
+
+ pq.write_to_dataset(
+ table, str(tempdir / "case1"), partition_cols=['part'],
+ use_legacy_dataset=use_legacy_dataset
+ )
+ result = pq.read_table(
+ str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result, df_cat)
+
+ pq.write_to_dataset(
+ table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+ )
+ result = pq.read_table(
+ str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+ pq.write_table(table, str(tempdir / "data.parquet"))
+ result = pq.read_table(
+ str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset
+ ).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('preserve_index', [True, False, None])
+def test_dataset_read_pandas_common_metadata(tempdir, preserve_index):
+ # ARROW-1103
+ nfiles = 5
+ size = 5
+
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ test_data = []
+ frames = []
+ paths = []
+ for i in range(nfiles):
+ df = _test_dataframe(size, seed=i)
+ df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index')
+
+ path = dirpath / '{}.parquet'.format(i)
+
+ table = pa.Table.from_pandas(df, preserve_index=preserve_index)
+
+ # Obliterate metadata
+ table = table.replace_schema_metadata(None)
+ assert table.schema.metadata is None
+
+ _write_table(table, path)
+ test_data.append(table)
+ frames.append(df)
+ paths.append(path)
+
+ # Write _metadata common file
+ table_for_metadata = pa.Table.from_pandas(
+ df, preserve_index=preserve_index
+ )
+ pq.write_metadata(table_for_metadata.schema, dirpath / '_metadata')
+
+ dataset = pq.ParquetDataset(dirpath)
+ columns = ['uint8', 'strings']
+ result = dataset.read_pandas(columns=columns).to_pandas()
+ expected = pd.concat([x[columns] for x in frames])
+ expected.index.name = (
+ df.index.name if preserve_index is not False else None)
+ tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+def test_read_pandas_passthrough_keywords(tempdir):
+ # ARROW-11464 - previously not all keywords were passed through (such as
+ # the filesystem keyword)
+ df = pd.DataFrame({'a': [1, 2, 3]})
+
+ filename = tempdir / 'data.parquet'
+ _write_table(df, filename)
+
+ result = pq.read_pandas(
+ 'data.parquet',
+ filesystem=SubTreeFileSystem(str(tempdir), LocalFileSystem())
+ )
+ assert result.equals(pa.table(df))
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py b/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py
new file mode 100644
index 000000000..3a4d4aaa2
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import pytest
+
+import pyarrow as pa
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _write_table
+except ImportError:
+ pq = None
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+ from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+ pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_pass_separate_metadata():
+ # ARROW-471
+ df = alltypes_sample(size=10000)
+
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, compression='snappy', version='2.6')
+
+ buf.seek(0)
+ metadata = pq.read_metadata(buf)
+
+ buf.seek(0)
+
+ fileh = pq.ParquetFile(buf, metadata=metadata)
+
+ tm.assert_frame_equal(df, fileh.read().to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_single_row_group():
+ # ARROW-471
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+
+ pf = pq.ParquetFile(buf)
+
+ assert pf.num_row_groups == K
+
+ row_groups = [pf.read_row_group(i) for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_single_row_group_with_column_subset():
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf)
+
+ cols = list(df.columns[:2])
+ row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
+ # ARROW-4267: Selection of duplicate columns still leads to these columns
+ # being read uniquely.
+ row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups():
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+
+ pf = pq.ParquetFile(buf)
+
+ assert pf.num_row_groups == K
+
+ result = pf.read_row_groups(range(K))
+ tm.assert_frame_equal(df, result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_read_multiple_row_groups_with_column_subset():
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf)
+
+ cols = list(df.columns[:2])
+ result = pf.read_row_groups(range(K), columns=cols)
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
+ # ARROW-4267: Selection of duplicate columns still leads to these columns
+ # being read uniquely.
+ result = pf.read_row_groups(range(K), columns=cols + cols)
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@pytest.mark.pandas
+def test_scan_contents():
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf)
+
+ assert pf.scan_contents() == 10000
+ assert pf.scan_contents(df.columns[:4]) == 10000
+
+
+def test_parquet_file_pass_directory_instead_of_file(tempdir):
+ # ARROW-7208
+ path = tempdir / 'directory'
+ os.mkdir(str(path))
+
+ with pytest.raises(IOError, match="Expected file path"):
+ pq.ParquetFile(path)
+
+
+def test_read_column_invalid_index():
+ table = pa.table([pa.array([4, 5]), pa.array(["foo", "bar"])],
+ names=['ints', 'strs'])
+ bio = pa.BufferOutputStream()
+ pq.write_table(table, bio)
+ f = pq.ParquetFile(bio.getvalue())
+ assert f.reader.read_column(0).to_pylist() == [4, 5]
+ assert f.reader.read_column(1).to_pylist() == ["foo", "bar"]
+ for index in (-1, 2):
+ with pytest.raises((ValueError, IndexError)):
+ f.reader.read_column(index)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('batch_size', [300, 1000, 1300])
+def test_iter_batches_columns_reader(tempdir, batch_size):
+ total_size = 3000
+ chunk_size = 1000
+ # TODO: Add categorical support
+ df = alltypes_sample(size=total_size)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ _write_table(arrow_table, filename, version='2.6',
+ coerce_timestamps='ms', chunk_size=chunk_size)
+
+ file_ = pq.ParquetFile(filename)
+ for columns in [df.columns[:10], df.columns[10:]]:
+ batches = file_.iter_batches(batch_size=batch_size, columns=columns)
+ batch_starts = range(0, total_size+batch_size, batch_size)
+ for batch, start in zip(batches, batch_starts):
+ end = min(total_size, start + batch_size)
+ tm.assert_frame_equal(
+ batch.to_pandas(),
+ df.iloc[start:end, :].loc[:, columns].reset_index(drop=True)
+ )
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('chunk_size', [1000])
+def test_iter_batches_reader(tempdir, chunk_size):
+ df = alltypes_sample(size=10000, categorical=True)
+
+ filename = tempdir / 'pandas_roundtrip.parquet'
+ arrow_table = pa.Table.from_pandas(df)
+ assert arrow_table.schema.pandas_metadata is not None
+
+ _write_table(arrow_table, filename, version='2.6',
+ coerce_timestamps='ms', chunk_size=chunk_size)
+
+ file_ = pq.ParquetFile(filename)
+
+ def get_all_batches(f):
+ for row_group in range(f.num_row_groups):
+ batches = f.iter_batches(
+ batch_size=900,
+ row_groups=[row_group],
+ )
+
+ for batch in batches:
+ yield batch
+
+ batches = list(get_all_batches(file_))
+ batch_no = 0
+
+ for i in range(file_.num_row_groups):
+ tm.assert_frame_equal(
+ batches[batch_no].to_pandas(),
+ file_.read_row_groups([i]).to_pandas().head(900)
+ )
+
+ batch_no += 1
+
+ tm.assert_frame_equal(
+ batches[batch_no].to_pandas().reset_index(drop=True),
+ file_.read_row_groups([i]).to_pandas().iloc[900:].reset_index(
+ drop=True
+ )
+ )
+
+ batch_no += 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pre_buffer', [False, True])
+def test_pre_buffer(pre_buffer):
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.6')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
+ assert pf.read().num_rows == N
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py b/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py
new file mode 100644
index 000000000..9be7634c8
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -0,0 +1,278 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import FileSystem, LocalFileSystem
+from pyarrow.tests.parquet.common import parametrize_legacy_dataset
+
+try:
+ import pyarrow.parquet as pq
+ from pyarrow.tests.parquet.common import _read_table, _test_dataframe
+except ImportError:
+ pq = None
+
+
+try:
+ import pandas as pd
+ import pandas.testing as tm
+
+except ImportError:
+ pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
+ df = _test_dataframe(100)
+ df['unique_id'] = 0
+
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ out = pa.BufferOutputStream()
+
+ writer = pq.ParquetWriter(out, arrow_table.schema, version='2.6')
+
+ frames = []
+ for i in range(10):
+ df['unique_id'] = i
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ writer.write_table(arrow_table)
+
+ frames.append(df.copy())
+
+ writer.close()
+
+ buf = out.getvalue()
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+ expected = pd.concat(frames, ignore_index=True)
+ tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+def test_validate_schema_write_table(tempdir):
+ # ARROW-2926
+ simple_fields = [
+ pa.field('POS', pa.uint32()),
+ pa.field('desc', pa.string())
+ ]
+
+ simple_schema = pa.schema(simple_fields)
+
+ # simple_table schema does not match simple_schema
+ simple_from_array = [pa.array([1]), pa.array(['bla'])]
+ simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc'])
+
+ path = tempdir / 'simple_validate_schema.parquet'
+
+ with pq.ParquetWriter(path, simple_schema,
+ version='2.6',
+ compression='snappy', flavor='spark') as w:
+ with pytest.raises(ValueError):
+ w.write_table(simple_table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
+ df = _test_dataframe(100)
+ df['unique_id'] = 0
+
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ out = pa.BufferOutputStream()
+
+ with pq.ParquetWriter(out, arrow_table.schema, version='2.6') as writer:
+
+ frames = []
+ for i in range(10):
+ df['unique_id'] = i
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ writer.write_table(arrow_table)
+
+ frames.append(df.copy())
+
+ buf = out.getvalue()
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+ expected = pd.concat(frames, ignore_index=True)
+ tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_context_obj_with_exception(
+ tempdir, use_legacy_dataset
+):
+ df = _test_dataframe(100)
+ df['unique_id'] = 0
+
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ out = pa.BufferOutputStream()
+ error_text = 'Artificial Error'
+
+ try:
+ with pq.ParquetWriter(out,
+ arrow_table.schema,
+ version='2.6') as writer:
+
+ frames = []
+ for i in range(10):
+ df['unique_id'] = i
+ arrow_table = pa.Table.from_pandas(df, preserve_index=False)
+ writer.write_table(arrow_table)
+ frames.append(df.copy())
+ if i == 5:
+ raise ValueError(error_text)
+ except Exception as e:
+ assert str(e) == error_text
+
+ buf = out.getvalue()
+ result = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+
+ expected = pd.concat(frames, ignore_index=True)
+ tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+ None,
+ LocalFileSystem._get_instance(),
+ fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=filesystem, version='2.6'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(path).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, uri, path = s3_example_fs
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=fs, version='2.6'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(uri).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, uri, path = s3_example_fs
+
+ with pq.ParquetWriter(uri, table.schema, version='2.6') as writer:
+ writer.write_table(table)
+
+ result = _read_table(path, filesystem=fs).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3fs(s3_example_s3fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, directory = s3_example_s3fs
+ path = directory + "/test.parquet"
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=fs, version='2.6'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(path, filesystem=fs).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ filesystem = fs.LocalFileSystem()
+
+ # Should raise ValueError when filesystem is passed with file-like object
+ with pytest.raises(ValueError, match="specified path is file-like"):
+ pq.ParquetWriter(
+ pa.BufferOutputStream(), table.schema, filesystem=filesystem
+ )
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
+ out = pa.BufferOutputStream()
+
+ class CustomFS(FileSystem):
+ def __init__(self):
+ self.path = None
+ self.mode = None
+
+ def open(self, path, mode='rb'):
+ self.path = path
+ self.mode = mode
+ return out
+
+ fs = CustomFS()
+ fname = 'expected_fname.parquet'
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \
+ as writer:
+ writer.write_table(table)
+
+ assert fs.path == fname
+ assert fs.mode == 'wb'
+ assert out.closed
+
+ buf = out.getvalue()
+ table_read = _read_table(
+ pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+ df_read = table_read.to_pandas()
+ tm.assert_frame_equal(df_read, df)
+
+ # Should raise ValueError when filesystem is passed with file-like object
+ with pytest.raises(ValueError) as err_info:
+ pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs)
+ expected_msg = ("filesystem passed but where is file-like, so"
+ " there is nothing to open with filesystem.")
+ assert str(err_info) == expected_msg