commit     e6918187568dbd01842d8d1d2c808ce16a894239
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/tests/parquet
parent     Initial commit.
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/tests/parquet')
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/common.py                     |  177
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/conftest.py                   |   87
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_basic.py                 |  631
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py |  115
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_data_types.py            |  529
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_dataset.py               | 1661
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_datetime.py              |  440
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_metadata.py              |  524
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_pandas.py                |  687
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py          |  276
-rw-r--r--  src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py        |  278
11 files changed, 5405 insertions(+), 0 deletions(-)
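
For orientation before the diff body: nearly every test added here funnels through the helpers defined in common.py (_write_table, _read_table, _check_roundtrip), which reduce to a write/read/compare cycle against an in-memory buffer. The following standalone sketch is not part of the commit; it only assumes an installed pyarrow and illustrates that cycle:

    import io

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Build a small table and write it to an in-memory Parquet buffer.
    table = pa.table({"ints": [1, 2, 3], "strs": ["a", "b", "c"]})
    buf = io.BytesIO()
    pq.write_table(table, buf)

    # Read it back, validate the result, and compare with the original,
    # mirroring what _read_table()/_check_roundtrip() below do.
    buf.seek(0)
    result = pq.read_table(buf)
    result.validate(full=True)
    assert result.equals(table)
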
diff --git a/src/arrow/python/pyarrow/tests/parquet/common.py b/src/arrow/python/pyarrow/tests/parquet/common.py
new file mode 100644
index 000000000..90bfb55d1
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/common.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    platform_int_info = np.iinfo('int_')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, platform_int_info.min),
+                             min(iinfo.max, platform_int_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    import pandas as pd
+
+    np.random.seed(seed)
+    df = pd.DataFrame({
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': [util.rands(10) for i in range(size)],
+        'all_none': [None] * size,
+        'all_none_category': [None] * size
+    })
+
+    # TODO(PARQUET-1015)
+    # df['all_none_category'] = df['all_none_category'].astype('category')
+    return df
+
+
+def make_sample_file(table_or_df):
+    import pyarrow.parquet as pq
+
+    if isinstance(table_or_df, pa.Table):
+        a_table = table_or_df
+    else:
+        a_table = pa.Table.from_pandas(table_or_df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, compression='SNAPPY', version='2.6',
+                 coerce_timestamps='ms')
+
+    buf.seek(0)
+    return pq.ParquetFile(buf)
+
+
+def alltypes_sample(size=10000, seed=0, categorical=False):
+    import pandas as pd
+
+    np.random.seed(seed)
+    arrays = {
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        # TODO(wesm): Test other timestamp resolutions now that arrow supports
+        # them
+        'datetime': np.arange("2016-01-01T00:00:00.001", size,
+                              dtype='datetime64[ms]'),
+        'str': pd.Series([str(x) for x in range(size)]),
+        'empty_str': [''] * size,
+        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+        'null': [None] * size,
+        'null_list': [None] * 2 + [[None] * (x % 4) for x in range(size - 2)],
+    }
+    if categorical:
+        arrays['str_category'] = arrays['str'].astype('category')
+    return pd.DataFrame(arrays)
diff --git a/src/arrow/python/pyarrow/tests/parquet/conftest.py b/src/arrow/python/pyarrow/tests/parquet/conftest.py
new file mode 100644
index 000000000..1e75493cd
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/conftest.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from pyarrow.util import guid
+
+
+@pytest.fixture(scope='module')
+def datadir(base_datadir):
+    return base_datadir / 'parquet'
+
+
+@pytest.fixture
+def s3_bucket(s3_server):
+    boto3 = pytest.importorskip('boto3')
+    botocore = pytest.importorskip('botocore')
+
+    host, port, access_key, secret_key = s3_server['connection']
+    s3 = boto3.resource(
+        's3',
+        endpoint_url='http://{}:{}'.format(host, port),
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        config=botocore.client.Config(signature_version='s3v4'),
+        region_name='us-east-1'
+    )
+    bucket = s3.Bucket('test-s3fs')
+    try:
+        bucket.create()
+    except Exception:
+        # we get BucketAlreadyOwnedByYou error with fsspec handler
+        pass
+    return 'test-s3fs'
+
+
+@pytest.fixture
+def s3_example_s3fs(s3_server, s3_bucket):
+    s3fs = pytest.importorskip('s3fs')
+
+    host, port, access_key, secret_key = s3_server['connection']
+    fs = s3fs.S3FileSystem(
+        key=access_key,
+        secret=secret_key,
+        client_kwargs={
+            'endpoint_url': 'http://{}:{}'.format(host, port)
+        }
+    )
+
+    test_path = '{}/{}'.format(s3_bucket, guid())
+
+    fs.mkdir(test_path)
+    yield fs, test_path
+    try:
+        fs.rm(test_path, recursive=True)
+    except FileNotFoundError:
+        pass
+
+
+@pytest.fixture
+def s3_example_fs(s3_server):
+    from pyarrow.fs import FileSystem
+
+    host, port, access_key, secret_key = s3_server['connection']
+    uri = (
+        "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+        .format(access_key, secret_key, host, port)
+    )
+    fs, path = FileSystem.from_uri(uri)
+
+    fs.create_dir("mybucket")
+
+    yield fs, uri, path
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_basic.py b/src/arrow/python/pyarrow/tests/parquet/test_basic.py
new file mode 100644
index 000000000..cf1aaa21f
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_basic.py
@@ -0,0 +1,631 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import OrderedDict
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem, FileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import _read_table, _write_table
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+    from pyarrow.tests.pandas_examples import dataframe_with_lists
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+def test_parquet_invalid_version(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
+        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
+    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
+                       "version"):
+        _write_table(table, tempdir / 'test_version.parquet',
+                     data_page_version="2.2")
+
+
+@parametrize_legacy_dataset
+def test_set_data_page_size(use_legacy_dataset):
+    arr = pa.array([1, 2, 3] * 100000)
+    t = pa.Table.from_arrays([arr], names=['f0'])
+
+    # 128K, 512K
+    page_sizes = [2 << 16, 2 << 18]
+    for target_page_size in page_sizes:
+        _check_roundtrip(t, data_page_size=target_page_size,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_chunked_table_write(use_legacy_dataset):
+    # ARROW-232
+    tables = []
+    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
+    tables.append(pa.Table.from_batches([batch] * 3))
+    df, _ = dataframe_with_lists()
+    batch = pa.RecordBatch.from_pandas(df)
+    tables.append(pa.Table.from_batches([batch] * 3))
+
+    for data_page_version in ['1.0', '2.0']:
+        for use_dictionary in [True, False]:
+            for table in tables:
+                _check_roundtrip(
+                    table, version='2.6',
+                    use_legacy_dataset=use_legacy_dataset,
+                    data_page_version=data_page_version,
+                    use_dictionary=use_dictionary)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_memory_map(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
+                     version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.6')
+    table_read = pq.read_pandas(filename, memory_map=True,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_enable_buffered_stream(tempdir, use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    table = pa.Table.from_pandas(df)
+    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
+                     version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+    filename = str(tempdir / 'tmp_file')
+    with open(filename, 'wb') as f:
+        _write_table(table, f, version='2.6')
+    table_read = pq.read_pandas(filename, buffer_size=4096,
+                                use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_special_chars_filename(tempdir, use_legacy_dataset):
+    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
+    filename = "foo # bar"
+    path = tempdir / filename
+    assert not path.exists()
+    _write_table(table, str(path))
+    assert path.exists()
+    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
+    assert table_read.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_invalid_source(use_legacy_dataset):
+    # Test that we provide a helpful error message pointing out
+    # that None wasn't expected when trying to open None as a Parquet file.
+    #
+    # Depending on use_legacy_dataset the message changes slightly
+    # but in both cases it should point out that None wasn't expected.
+    with pytest.raises(TypeError, match="None"):
+        pq.read_table(None, use_legacy_dataset=use_legacy_dataset)
+
+    with pytest.raises(TypeError, match="None"):
+        pq.ParquetFile(None)
+
+
+@pytest.mark.slow
+def test_file_with_over_int16_max_row_groups():
+    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
+    # limit on the number of row groups, but this limit only impacts files
+    # with encrypted row group metadata because of the int16 row group
+    # ordinal used in the Parquet Thrift metadata. Unencrypted files are not
+    # impacted, so this test checks that it works (even if it isn't a good
+    # idea)
+    t = pa.table([list(range(40000))], names=['f0'])
+    _check_roundtrip(t, row_group_size=1)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_empty_table_roundtrip(use_legacy_dataset):
+    df = alltypes_sample(size=10)
+
+    # Create a non-empty table to infer the types correctly, then slice to 0
+    table = pa.Table.from_pandas(df)
+    table = pa.Table.from_arrays(
+        [col.chunk(0)[:0] for col in table.itercolumns()],
+        names=table.schema.names)
+
+    assert table.schema.field('null').type == pa.null()
+    assert table.schema.field('null_list').type == pa.list_(pa.null())
+    _check_roundtrip(
+        table, version='2.6', use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_empty_table_no_columns(use_legacy_dataset):
+    df = pd.DataFrame()
+    empty = pa.Table.from_pandas(df, preserve_index=False)
+    _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
+    # Bug report in ARROW-3792
+    cols = OrderedDict(
+        int32=pa.int32(),
+        list_string=pa.list_(pa.string())
+    )
+    data = [[], [OrderedDict(int32=1, list_string=('G',)), ]]
+
+    # This produces a table with a column like
+    # <Column name='list_string' type=ListType(list<item: string>)>
+    # [
+    #   [],
+    #   [
+    #     [
+    #       "G"
+    #     ]
+    #   ]
+    # ]
+    #
+    # Each column is a ChunkedArray with 2 elements
+    my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten()
+                 for batch in data]
+    my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
+                  for batch in my_arrays]
+    tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
+    _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multiple_path_types(tempdir, use_legacy_dataset):
+    # Test compatibility with PEP 519 path-like objects
+    path = tempdir / 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+    # Test compatibility with plain string paths
+    path = str(tempdir) + 'zzz.parquet'
+    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
+    _write_table(df, path)
+    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@parametrize_legacy_dataset
+def test_fspath(tempdir, use_legacy_dataset):
+    # ARROW-12472 support __fspath__ objects without using str()
+    path = tempdir / "test.parquet"
+    table = pa.table({"a": [1, 2, 3]})
+    _write_table(table, path)
+
+    fs_protocol_obj = util.FSProtocolClass(path)
+
+    result = _read_table(
+        fs_protocol_obj, use_legacy_dataset=use_legacy_dataset
+    )
+    assert result.equals(table)
+
+    # combined with non-local filesystem raises
+    with pytest.raises(TypeError):
+        _read_table(fs_protocol_obj, filesystem=FileSystem())
+
+
+@pytest.mark.dataset
+@parametrize_legacy_dataset
+@pytest.mark.parametrize("filesystem", [
+    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+])
+def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
+    # reading and writing from relative paths
+    table = pa.table({"a": [1, 2, 3]})
+
+    # reading
+    pq.write_table(table, str(tempdir / "data.parquet"))
+    with util.change_cwd(tempdir):
+        result = pq.read_table("data.parquet", filesystem=filesystem,
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # writing
+    with util.change_cwd(tempdir):
+        pq.write_table(table, "data2.parquet", filesystem=filesystem)
+    result = pq.read_table(tempdir / "data2.parquet")
+    assert result.equals(table)
+
+
+def test_read_non_existing_file():
+    # ensure we have a proper error message
+    with pytest.raises(FileNotFoundError):
+        pq.read_table('i-am-not-existing.parquet')
+
+
+def test_file_error_python_exception():
+    class BogusFile(io.BytesIO):
+        def read(self, *args):
+            raise ZeroDivisionError("zorglub")
+
+        def seek(self, *args):
+            raise ZeroDivisionError("zorglub")
+
+    # ensure the Python exception is restored
+    with pytest.raises(ZeroDivisionError, match="zorglub"):
+        pq.read_table(BogusFile(b""))
+
+
+@parametrize_legacy_dataset
+def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
+    # reading from a buffer from python's open()
+    table = pa.table({"a": [1, 2, 3]})
+    pq.write_table(table, str(tempdir / "data.parquet"))
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    with open(str(tempdir / "data.parquet"), "rb") as f:
+        result = pq.read_table(pa.PythonFile(f),
+                               use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@parametrize_legacy_dataset
+def test_byte_stream_split(use_legacy_dataset):
+    # This is only a smoke test.
+    arr_float = pa.array(list(map(float, range(100))))
+    arr_int = pa.array(list(map(int, range(100))))
+    data_float = [arr_float, arr_float]
+    table = pa.Table.from_arrays(data_float, names=['a', 'b'])
+
+    # Check with byte_stream_split for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=False, use_byte_stream_split=True)
+
+    # Check with byte_stream_split for column 'b' and dictionary
+    # for column 'a'.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a'],
+                     use_byte_stream_split=['b'])
+
+    # Check with a collision for both columns.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     use_dictionary=['a', 'b'],
+                     use_byte_stream_split=['a', 'b'])
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
+                                       names=['a', 'b'])
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=['b'],
+                     use_byte_stream_split=['a'])
+
+    # Try to use the wrong data type with the byte_stream_split encoding.
+    # This should throw an exception.
+    table = pa.Table.from_arrays([arr_int], names=['tmp'])
+    with pytest.raises(IOError):
+        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
+                         use_dictionary=False,
+                         use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_compression_level(use_legacy_dataset):
+    arr = pa.array(list(map(int, range(1000))))
+    data = [arr, arr]
+    table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+    # Check one compression level.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=1,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check another one to make sure that compression_level=1 does not
+    # coincide with the default one in Arrow.
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level=5,
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression per column
+    _check_roundtrip(table, expected=table,
+                     compression={'a': "gzip", 'b': "snappy"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that the user can provide a compression level per column
+    _check_roundtrip(table, expected=table, compression="gzip",
+                     compression_level={'a': 2, 'b': 3},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check that specifying a compression level for a codec which does not
+    # allow specifying one results in an error.
+    # Uncompressed, snappy, lz4 and lzo do not support specifying a
+    # compression level.
+    # GZIP (zlib) allows for specifying a compression level but as of
+    # version 1.2.11 the valid range is [-1, 9].
+    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
+                            ("None", 444), ("lzo", 14)]
+    buf = io.BytesIO()
+    for (codec, level) in invalid_combinations:
+        with pytest.raises((ValueError, OSError)):
+            _write_table(table, buf, compression=codec,
+                         compression_level=level)
+
+
+def test_sanitized_spark_field_names():
+    a0 = pa.array([0, 1, 2, 3, 4])
+    name = 'prohib; ,\t{}'
+    table = pa.Table.from_arrays([a0], [name])
+
+    result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})
+
+    expected_name = 'prohib______'
+    assert result.schema[0].name == expected_name
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_multithreaded_read(use_legacy_dataset):
+    df = alltypes_sample(size=10000)
+
+    table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(table, buf, compression='SNAPPY', version='2.6')
+
+    buf.seek(0)
+    table1 = _read_table(
+        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)
+
+    buf.seek(0)
+    table2 = _read_table(
+        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)
+
+    assert table1.equals(table2)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_min_chunksize(use_legacy_dataset):
+    data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
+    table = pa.Table.from_pandas(data.reset_index())
+
+    buf = io.BytesIO()
+    _write_table(table, buf, chunk_size=-1)
+
+    buf.seek(0)
+    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+    assert result.equals(table)
+
+    with pytest.raises(ValueError):
+        _write_table(table, buf, chunk_size=0)
+
+
+@pytest.mark.pandas
+def test_write_error_deletes_incomplete_file(tempdir):
+    # ARROW-1285
+    df = pd.DataFrame({'a': list('abc'),
+                       'b': list(range(1, 4)),
+                       'c': np.arange(3, 6).astype('u1'),
+                       'd': np.arange(4.0, 7.0, dtype='float64'),
+                       'e': [True, False, True],
+                       'f': pd.Categorical(list('abc')),
+                       'g': pd.date_range('20130101', periods=3),
+                       'h': pd.date_range('20130101', periods=3,
+                                          tz='US/Eastern'),
+                       'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+    pdf = pa.Table.from_pandas(df)
+
+    filename = tempdir / 'tmp_file'
+    try:
+        _write_table(pdf, filename)
+    except pa.ArrowException:
+        pass
+
+    assert not filename.exists()
+
+
+@parametrize_legacy_dataset
+def test_read_non_existent_file(tempdir, use_legacy_dataset):
+    path = 'non-existent-file.parquet'
+    try:
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+    except Exception as e:
+        assert path in e.args[0]
+
+
+@parametrize_legacy_dataset
+def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
+    with pytest.warns(None) as record:
+        pq.read_table(datadir / 'v0.7.1.parquet',
+                      use_legacy_dataset=use_legacy_dataset)
+
+    assert len(record) == 0
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_zlib_compression_bug(use_legacy_dataset):
+    # ARROW-3514: "zlib deflate failed, output buffer too small"
+    table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
+    f = io.BytesIO()
+    pq.write_table(table, f, compression='gzip')
+
+    f.seek(0)
+    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
+
+
+@parametrize_legacy_dataset
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
+    path = str(tempdir / "test.parquet")
+    # TODO(dataset) with datasets API it raises OSError instead
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 0 bytes'):
+        with open(path, 'wb') as f:
+            pass
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+    with pytest.raises((pa.ArrowInvalid, OSError),
+                       match='size is 4 bytes'):
+        with open(path, 'wb') as f:
+            f.write(b'ffff')
+        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.pandas
+@pytest.mark.fastparquet
+@pytest.mark.filterwarnings("ignore:RangeIndex:FutureWarning")
+@pytest.mark.filterwarnings("ignore:tostring:DeprecationWarning:fastparquet")
+def test_fastparquet_cross_compatibility(tempdir):
+    fp = pytest.importorskip('fastparquet')
+
+    df = pd.DataFrame(
+        {
+            "a": list("abc"),
+            "b": list(range(1, 4)),
+            "c": np.arange(4.0, 7.0, dtype="float64"),
+            "d": [True, False, True],
+            "e": pd.date_range("20130101", periods=3),
+            "f": pd.Categorical(["a", "b", "a"]),
+            # fastparquet writes list as BYTE_ARRAY JSON, so no roundtrip
+            # "g": [[1, 2], None, [1, 2, 3]],
+        }
+    )
+    table = pa.table(df)
+
+    # Arrow -> fastparquet
+    file_arrow = str(tempdir / "cross_compat_arrow.parquet")
+    pq.write_table(table, file_arrow, compression=None)
+
+    fp_file = fp.ParquetFile(file_arrow)
+    df_fp = fp_file.to_pandas()
+    tm.assert_frame_equal(df, df_fp)
+
+    # Fastparquet -> arrow
+    file_fastparquet = str(tempdir / "cross_compat_fastparquet.parquet")
+    fp.write(file_fastparquet, df)
+
+    table_fp = pq.read_pandas(file_fastparquet)
+    # for fastparquet written file, categoricals comes back as strings
+    # (no arrow schema in parquet metadata)
+    df['f'] = df['f'].astype(object)
+    tm.assert_frame_equal(table_fp.to_pandas(), df)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('array_factory', [
+    lambda: pa.array([0, None] * 10),
+    lambda: pa.array([0, None] * 10).dictionary_encode(),
+    lambda: pa.array(["", None] * 10),
+    lambda: pa.array(["", None] * 10).dictionary_encode(),
+])
+@pytest.mark.parametrize('use_dictionary', [False, True])
+@pytest.mark.parametrize('read_dictionary', [False, True])
+def test_buffer_contents(
+        array_factory, use_dictionary, read_dictionary, use_legacy_dataset
+):
+    # Test that null values are deterministically initialized to zero
+    # after a roundtrip through Parquet.
+    # See ARROW-8006 and ARROW-8011.
+    orig_table = pa.Table.from_pydict({"col": array_factory()})
+    bio = io.BytesIO()
+    pq.write_table(orig_table, bio, use_dictionary=True)
+    bio.seek(0)
+    read_dictionary = ['col'] if read_dictionary else None
+    table = pq.read_table(bio, use_threads=False,
+                          read_dictionary=read_dictionary,
+                          use_legacy_dataset=use_legacy_dataset)
+
+    for col in table.columns:
+        [chunk] = col.chunks
+        buf = chunk.buffers()[1]
+        assert buf.to_pybytes() == buf.size * b"\0"
+
+
+def test_parquet_compression_roundtrip(tempdir):
+    # ARROW-10480: ensure even with nonstandard Parquet file naming
+    # conventions, writing and then reading a file works. In
+    # particular, ensure that we don't automatically double-compress
+    # the stream due to auto-detecting the extension in the filename
+    table = pa.table([pa.array(range(4))], names=["ints"])
+    path = tempdir / "arrow-10480.pyarrow.gz"
+    pq.write_table(table, path, compression="GZIP")
+    result = pq.read_table(path)
+    assert result.equals(table)
+
+
+def test_empty_row_groups(tempdir):
+    # ARROW-3020
+    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
+
+    path = tempdir / 'empty_row_groups.parquet'
+
+    num_groups = 3
+    with pq.ParquetWriter(path, table.schema) as writer:
+        for i in range(num_groups):
+            writer.write_table(table)
+
+    reader = pq.ParquetFile(path)
+    assert reader.metadata.num_row_groups == num_groups
+
+    for i in range(num_groups):
+        assert reader.read_row_group(i).equals(table)
+
+
+def test_reads_over_batch(tempdir):
+    data = [None] * (1 << 20)
+    data.append([1])
+    # Large list<int64> with mostly nones and one final
+    # value. This should force batched reads when
+    # reading back.
+    table = pa.Table.from_arrays([data], ['column'])
+
+    path = tempdir / 'arrow-11607.parquet'
+    pq.write_table(table, path)
+    table2 = pq.read_table(path)
+    assert table == table2
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py
new file mode 100644
index 000000000..91bc08df6
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_compliant_nested_type.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import parametrize_legacy_dataset
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table,
+                                              _check_roundtrip)
+except ImportError:
+    pq = None
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+    from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
+except ImportError:
+    pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+# Tests for ARROW-11497
+_test_data_simple = [
+    {'items': [1, 2]},
+    {'items': [0]},
+]
+
+_test_data_complex = [
+    {'items': [{'name': 'elem1', 'value': '1'},
+               {'name': 'elem2', 'value': '2'}]},
+    {'items': [{'name': 'elem1', 'value': '0'}]},
+]
+
+parametrize_test_data = pytest.mark.parametrize(
+    "test_data", [_test_data_simple, _test_data_complex])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_enable(tempdir,
+                                            use_legacy_dataset, test_data):
+    # prepare dataframe for testing
+    df = pd.DataFrame(data=test_data)
+    # verify that we can read/write pandas df with new flag
+    _roundtrip_pandas_dataframe(df,
+                                write_kwargs={
+                                    'use_compliant_nested_type': True},
+                                use_legacy_dataset=use_legacy_dataset)
+
+    # Write to a parquet file with compliant nested type
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+    with pq.ParquetWriter(path, table.schema,
+                          use_compliant_nested_type=True,
+                          version='2.6') as writer:
+        writer.write_table(table)
+    # Read back as a table
+    new_table = _read_table(path)
+    # Validate that "items" columns compliant to Parquet nested format
+    # Should be like this: list<element: struct<name: string, value: string>>
+    assert isinstance(new_table.schema.types[0], pa.ListType)
+    assert new_table.schema.types[0].value_field.name == 'element'
+
+    # Verify that the new table can be read/written correctly
+    _check_roundtrip(new_table,
+                     use_legacy_dataset=use_legacy_dataset,
+                     use_compliant_nested_type=True)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_disable(tempdir,
+                                             use_legacy_dataset, test_data):
+    # prepare dataframe for testing
+    df = pd.DataFrame(data=test_data)
+    # verify that we can read/write with new flag disabled (default behaviour)
+    _roundtrip_pandas_dataframe(df, write_kwargs={},
+                                use_legacy_dataset=use_legacy_dataset)
+
+    # Write to a parquet file while disabling compliant nested type
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+    with pq.ParquetWriter(path, table.schema, version='2.6') as writer:
+        writer.write_table(table)
+    new_table = _read_table(path)
+
+    # Validate that "items" columns is not compliant to Parquet nested format
+    # Should be like this: list<item: struct<name: string, value: string>>
+    assert isinstance(new_table.schema.types[0], pa.ListType)
+    assert new_table.schema.types[0].value_field.name == 'item'
+
+    # Verify that the new table can be read/written correctly
+    _check_roundtrip(new_table,
+                     use_legacy_dataset=use_legacy_dataset,
+                     use_compliant_nested_type=False)
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_data_types.py b/src/arrow/python/pyarrow/tests/parquet/test_data_types.py
new file mode 100644
index 000000000..1e2660006
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_data_types.py
@@ -0,0 +1,529 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import decimal
+import io
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (_check_roundtrip,
+                                          parametrize_legacy_dataset)
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import _read_table, _write_table
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
+                                               dataframe_with_lists)
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+# General roundtrip of data types
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('chunk_size', [None, 1000])
+def test_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
+    df = alltypes_sample(size=10000, categorical=True)
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df)
+    assert arrow_table.schema.pandas_metadata is not None
+
+    _write_table(arrow_table, filename, version='2.6',
+                 coerce_timestamps='ms', chunk_size=chunk_size)
+    table_read = pq.read_pandas(
+        filename, use_legacy_dataset=use_legacy_dataset)
+    assert table_read.schema.pandas_metadata is not None
+
+    read_metadata = table_read.schema.metadata
+    assert arrow_table.schema.metadata == read_metadata
+
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'str': [str(x) for x in range(size)],
+        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+        'empty_str': [''] * size
+    })
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df)
+    _write_table(arrow_table, filename, version='1.0')
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+
+    # We pass uint32_t as int64_t if we write Parquet version 1.0
+    df['uint32'] = df['uint32'].values.astype(np.int64)
+
+    tm.assert_frame_equal(df, df_read)
+
+
+# Dictionary
+# -----------------------------------------------------------------------------
+
+
+def _simple_table_write_read(table, use_legacy_dataset):
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    contents = bio.getvalue()
+    return pq.read_table(
+        pa.BufferReader(contents), use_legacy_dataset=use_legacy_dataset
+    )
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_direct_read_dictionary(use_legacy_dataset):
+    # ARROW-3325
+    repeats = 10
+    nunique = 5
+
+    data = [
+        [util.rands(10) for i in range(nunique)] * repeats,
+
+    ]
+    table = pa.table(data, names=['f0'])
+
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    contents = bio.getvalue()
+
+    result = pq.read_table(pa.BufferReader(contents),
+                           read_dictionary=['f0'],
+                           use_legacy_dataset=use_legacy_dataset)
+
+    # Compute dictionary-encoded subfield
+    expected = pa.table([table[0].dictionary_encode()], names=['f0'])
+    assert result.equals(expected)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_direct_read_dictionary_subfield(use_legacy_dataset):
+    repeats = 10
+    nunique = 5
+
+    data = [
+        [[util.rands(10)] for i in range(nunique)] * repeats,
+    ]
+    table = pa.table(data, names=['f0'])
+
+    bio = pa.BufferOutputStream()
+    pq.write_table(table, bio)
+    contents = bio.getvalue()
+    result = pq.read_table(pa.BufferReader(contents),
+                           read_dictionary=['f0.list.item'],
+                           use_legacy_dataset=use_legacy_dataset)
+
+    arr = pa.array(data[0])
+    values_as_dict = arr.values.dictionary_encode()
+
+    inner_indices = values_as_dict.indices.cast('int32')
+    new_values = pa.DictionaryArray.from_arrays(inner_indices,
+                                                values_as_dict.dictionary)
+
+    offsets = pa.array(range(51), type='int32')
+    expected_arr = pa.ListArray.from_arrays(offsets, new_values)
+    expected = pa.table([expected_arr], names=['f0'])
+
+    assert result.equals(expected)
+    assert result[0].num_chunks == 1
+
+
+@parametrize_legacy_dataset
+def test_dictionary_array_automatically_read(use_legacy_dataset):
+    # ARROW-3246
+
+    # Make a large dictionary, a little over 4MB of data
+    dict_length = 4000
+    dict_values = pa.array([('x' * 1000 + '_{}'.format(i))
+                            for i in range(dict_length)])
+
+    num_chunks = 10
+    chunk_size = 100
+    chunks = []
+    for i in range(num_chunks):
+        indices = np.random.randint(0, dict_length,
+                                    size=chunk_size).astype(np.int32)
+        chunks.append(pa.DictionaryArray.from_arrays(pa.array(indices),
+                                                     dict_values))
+
+    table = pa.table([pa.chunked_array(chunks)], names=['f0'])
+    result = _simple_table_write_read(table, use_legacy_dataset)
+
+    assert result.equals(table)
+
+    # The only key in the metadata was the Arrow schema key
+    assert result.schema.metadata is None
+
+
+# Decimal
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
+    num_values = 10
+
+    columns = {}
+    for precision in range(1, 39):
+        for scale in range(0, precision + 1):
+            with util.random_seed(0):
+                random_decimal_values = [
+                    util.randdecimal(precision, scale)
+                    for _ in range(num_values)
+                ]
+            column_name = ('dec_precision_{:d}_scale_{:d}'
+                           .format(precision, scale))
+            columns[column_name] = random_decimal_values
+
+    expected = pd.DataFrame(columns)
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    table = pa.Table.from_pandas(expected)
+    _write_table(table, string_filename)
+    result_table = _read_table(
+        string_filename, use_legacy_dataset=use_legacy_dataset)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.xfail(
+    raises=OSError, reason='Parquet does not support negative scale'
+)
+def test_decimal_roundtrip_negative_scale(tempdir):
+    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
+    filename = tempdir / 'decimals.parquet'
+    string_filename = str(filename)
+    t = pa.Table.from_pandas(expected)
+    _write_table(t, string_filename)
+    result_table = _read_table(string_filename)
+    result = result_table.to_pandas()
+    tm.assert_frame_equal(result, expected)
+
+
+# List types
+# -----------------------------------------------------------------------------
+
+
+@parametrize_legacy_dataset
+@pytest.mark.parametrize('dtype', [int, float])
+def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
+    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
+    data = [pa.array(list(map(dtype, range(5))))]
+    table = pa.Table.from_arrays(data, names=['a'])
+    _write_table(table, filename)
+    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
+    for i in range(table.num_columns):
+        col_written = table[i]
+        col_read = table_read[i]
+        assert table.field(i).name == table_read.field(i).name
+        assert col_read.num_chunks == 1
+        data_written = col_written.chunk(0)
+        data_read = col_read.chunk(0)
+        assert data_written.equals(data_read)
+
+
+@parametrize_legacy_dataset
+def test_empty_lists_table_roundtrip(use_legacy_dataset):
+    # ARROW-2744: Shouldn't crash when writing an array of empty lists
+    arr = pa.array([[], []], type=pa.list_(pa.int32()))
+    table = pa.Table.from_arrays([arr], ["A"])
+    _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
+    # Reproduce failure in ARROW-5630
+    typ = pa.list_(pa.field("item", pa.float32(), False))
+    num_rows = 10000
+    t = pa.table([
+        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
+                  (num_rows // 10)), type=typ)
+    ], ['a'])
+    _check_roundtrip(
+        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)
+
+
+@parametrize_legacy_dataset
+def test_nested_list_struct_multiple_batches_roundtrip(
+        tempdir, use_legacy_dataset
+):
+    # Reproduce failure in ARROW-11024
+    data = [[{'x': 'abc', 'y': 'abc'}]]*100 + [[{'x': 'abc', 'y': 'gcb'}]]*100
+    table = pa.table([pa.array(data)], names=['column'])
+    _check_roundtrip(
+        table, row_group_size=20, use_legacy_dataset=use_legacy_dataset)
+
+    # Reproduce failure in ARROW-11069 (plain non-nested structs with strings)
+    data = pa.array(
+        [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}]*10
+    )
+    table = pa.table({'column': data})
+    _check_roundtrip(
+        table, row_group_size=10, use_legacy_dataset=use_legacy_dataset)
+
+
+def test_writing_empty_lists():
+    # ARROW-2591: [Python] Segmentation fault issue in pq.write_table
+    arr1 = pa.array([[], []], pa.list_(pa.int32()))
+    table = pa.Table.from_arrays([arr1], ['list(int32)'])
+    _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+def test_column_of_arrays(tempdir):
+    df, schema = dataframe_with_arrays()
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@pytest.mark.pandas
+def test_column_of_lists(tempdir):
+    df, schema = dataframe_with_lists(parquet_compatible=True)
+
+    filename = tempdir / 'pandas_roundtrip.parquet'
+    arrow_table = pa.Table.from_pandas(df, schema=schema)
+    _write_table(arrow_table, filename, version='2.6')
+    table_read = _read_table(filename)
+    df_read = table_read.to_pandas()
+
+    tm.assert_frame_equal(df, df_read)
+
+
+def test_large_list_records():
+    # This was fixed in PARQUET-1100
+
+    list_lengths = np.random.randint(0, 500, size=50)
+    list_lengths[::10] = 0
+
+    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
+                   if i % 8 else None
+                   for i, x in enumerate(list_lengths)]
+
+    a1 = pa.array(list_values)
+
+    table = pa.Table.from_arrays([a1], ['int_lists'])
+    _check_roundtrip(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
+    # ARROW-1684
+    df = pd.DataFrame({
+        'a': [[1, 2, 3], None, [4, 5], []],
+        'b': [[1.], None, None, [6., 7.]],
+    })
+
+    path = str(tempdir / 'nested_convenience.parquet')
+
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    _write_table(table, path)
+
+    read = pq.read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df[['a']])
+
+    read = pq.read_table(
+        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
+    tm.assert_frame_equal(read.to_pandas(), df)
+
+
+# Binary
+# -----------------------------------------------------------------------------
+
+
+def test_fixed_size_binary():
+    t0 = pa.binary(10)
+    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
+    a0 = pa.array(data, type=t0)
+
+    table = pa.Table.from_arrays([a0],
+                                 ['binary[10]'])
+    _check_roundtrip(table)
+
+
+# Large types
+# -----------------------------------------------------------------------------
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_large_table_int32_overflow():
+    size = np.iinfo('int32').max + 1
+
+    arr = np.ones(size, dtype='uint8')
+
+    parr = pa.array(arr, type=pa.uint8())
+
+    table = pa.Table.from_arrays([parr], names=['one'])
+    f = io.BytesIO()
+    _write_table(table, f)
+
+
+def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
+    stream = pa.BufferOutputStream()
+    _write_table(table, stream, **write_kwargs)
+    buf = stream.getvalue()
+    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_byte_array_exactly_2gb(use_legacy_dataset):
+    # Test edge case reported in ARROW-3762
+    val = b'x' * (1 << 10)
+
+    base = pa.array([val] * ((1 << 21) - 1))
+    cases = [
+        [b'x' * 1023],  # 2^31 - 1
+        [b'x' * 1024],  # 2^31
+        [b'x' * 1025]   # 2^31 + 1
+    ]
+    for case in cases:
+        values = pa.chunked_array([base, pa.array(case)])
+        t = pa.table([values], names=['f0'])
+        result = _simple_table_roundtrip(
+            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
+        assert t.equals(result)
+
+
+@pytest.mark.slow
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_binary_array_overflow_to_chunked(use_legacy_dataset):
+    # ARROW-3762
+
+    # 2^31 + 1 bytes
+    values = [b'x'] + [
+        b'x' * (1 << 20)
+    ] * 2 * (1 << 10)
+    df = pd.DataFrame({'byte_col': values})
+
+    tbl = pa.Table.from_pandas(df, preserve_index=False)
+    read_tbl = _simple_table_roundtrip(
+        tbl, use_legacy_dataset=use_legacy_dataset)
+
+    col0_data = read_tbl[0]
+    assert isinstance(col0_data, pa.ChunkedArray)
+
+    # Split up into 2GB chunks
+    assert col0_data.num_chunks == 2
+
+    assert tbl.equals(read_tbl)
+
+
+@pytest.mark.slow
+@pytest.mark.pandas
+@pytest.mark.large_memory
+@parametrize_legacy_dataset
+def test_list_of_binary_large_cell(use_legacy_dataset):
+    # ARROW-4688
+    data = []
+
+    # TODO(wesm): handle chunked children
+    # 2^31 - 1 bytes in a single cell
+    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])
+
+    # A little under 2GB in cell each containing approximately 10MB each
+    data.extend([[b'x' * 1000000] * 10] * 214)
+
+    arr = pa.array(data)
+    table = pa.Table.from_arrays([arr], ['chunky_cells'])
+    read_table = _simple_table_roundtrip(
+        table, use_legacy_dataset=use_legacy_dataset)
+    assert table.equals(read_table)
+
+
+def test_large_binary():
+    data = [b'foo', b'bar'] * 50
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_large_binary_huge():
+    s = b'xy' * 997
+    data = [s] * ((1 << 33) // len(s))
+    for type in [pa.large_binary(), pa.large_string()]:
+        arr = pa.array(data, type=type)
+        table = pa.Table.from_arrays([arr], names=['strs'])
+        for use_dictionary in [False, True]:
+            _check_roundtrip(table, use_dictionary=use_dictionary)
+        del arr, table
+
+
+@pytest.mark.large_memory
+def test_large_binary_overflow():
+    s = b'x' * (1 << 31)
+    arr = pa.array([s], type=pa.large_binary())
+    table = pa.Table.from_arrays([arr], names=['strs'])
+    for use_dictionary in [False, True]:
+        writer = pa.BufferOutputStream()
+        with pytest.raises(
+                pa.ArrowInvalid,
+                match="Parquet cannot store strings with size 2GB or more"):
+            _write_table(table, writer, use_dictionary=use_dictionary)
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_dataset.py b/src/arrow/python/pyarrow/tests/parquet/test_dataset.py
new file mode 100644
index 000000000..82f7e5814
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_dataset.py
@@ -0,0 +1,1661 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow import fs
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+from pyarrow.tests.parquet.common import (
+    parametrize_legacy_dataset, parametrize_legacy_dataset_fixed,
+    parametrize_legacy_dataset_not_supported)
+from pyarrow.util import guid
+from pyarrow.vendored.version import Version
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (
+        _read_table, _test_dataframe, _write_table)
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+except ImportError:
+    pd = tm = None
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_parquet_piece_read(tempdir):
+    df = _test_dataframe(1000)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.6')
+
+    with pytest.warns(DeprecationWarning):
+        piece1 = pq.ParquetDatasetPiece(path)
+
+    result = piece1.read()
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_parquet_piece_open_and_get_metadata(tempdir):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df)
+
+    path = tempdir / 'parquet_piece_read.parquet'
+    _write_table(table, path, version='2.6')
+
+    with pytest.warns(DeprecationWarning):
+        piece = pq.ParquetDatasetPiece(path)
+    table1 = piece.read()
+    assert isinstance(table1, pa.Table)
+    meta1 = piece.get_metadata()
+    assert isinstance(meta1, pq.FileMetaData)
+
+    assert table.equals(table1)
+
+
+@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning")
+def test_parquet_piece_basics():
+    path = '/baz.parq'
+
+    piece1 = pq.ParquetDatasetPiece(path)
+    piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+    piece3 = pq.ParquetDatasetPiece(
+        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+    assert str(piece1) == path
+    assert str(piece2) == '/baz.parq | row_group=1'
+    assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+    assert piece1 == piece1
+    assert piece2 == piece2
+    assert piece3 == piece3
+    assert piece1 != piece3
+
+
+def test_partition_set_dictionary_type():
+    set1 = pq.PartitionSet('key1', ['foo', 'bar', 'baz'])
+    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+    assert isinstance(set1.dictionary, pa.StringArray)
+    assert isinstance(set2.dictionary, pa.IntegerArray)
+
+    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+    with pytest.raises(TypeError):
+        set3.dictionary
+
+
+@parametrize_legacy_dataset_fixed
+def test_filesystem_uri(tempdir, use_legacy_dataset):
+    table = pa.table({"a": [1, 2, 3]})
+
+    directory = tempdir / "data_dir"
+    directory.mkdir()
+    path = directory / "data.parquet"
+    pq.write_table(table, str(path))
+
+    # filesystem object
+    result = pq.read_table(
+        path, filesystem=fs.LocalFileSystem(),
+        use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+    # filesystem URI
+    result = pq.read_table(
+        "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir),
+        use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(table)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_directory(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)
+
+
+@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
+@pytest.mark.pandas
+def test_create_parquet_dataset_multi_threaded(tempdir):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    _partition_test_for_filesystem(fs, base_path)
+
+    manifest = pq.ParquetManifest(base_path, filesystem=fs,
+                                  metadata_nthreads=1)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+    assert len(dataset.pieces) > 0
+    partitions = dataset.partitions
+    assert len(partitions.partition_names) > 0
+    assert partitions.partition_names == manifest.partitions.partition_names
+    assert len(partitions.levels) == len(manifest.partitions.levels)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):
+    # ARROW-3861 - do not include partition columns in resulting table when
+    # `columns` keyword was passed without those columns
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+    _partition_test_for_filesystem(fs, base_path)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read(columns=["values"])
+    if use_legacy_dataset:
+        # ParquetDataset implementation always includes the partition columns
+        # automatically, and we can't easily "fix" this since dask relies on
+        # this behaviour (ARROW-8644)
+        assert result.column_names == ["values", "foo", "bar"]
+    else:
+        assert result.column_names == ["values"]
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_filters_equivalency(tempdir, use_legacy_dataset):
+    fs = LocalFileSystem._get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1]
+    string_keys = ['a', 'b', 'c']
+    boolean_keys = [True, False]
+    partition_spec = [
+        ['integer', integer_keys],
+        ['string', string_keys],
+        ['boolean', boolean_keys]
+    ]
+
+    df = pd.DataFrame({
+        'integer': np.array(integer_keys, dtype='i4').repeat(15),
+        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
+        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
+                           3),
+    }, columns=['integer', 'string', 'boolean'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    # Old filters syntax:
+    #  integer == 1 AND string != b AND boolean == True
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs,
+        filters=[('integer', '=', 1), ('string', '!=', 'b'),
+                 ('boolean', '==', 'True')],
+        use_legacy_dataset=use_legacy_dataset,
+    )
+    table = dataset.read()
+    result_df = (table.to_pandas().reset_index(drop=True))
+
+    assert 0 not in result_df['integer'].values
+    assert 'b' not in result_df['string'].values
+    assert False not in result_df['boolean'].values
+
+    # filters in disjunctive normal form:
+    #  (integer == 1 AND string != b AND boolean == True) OR
+    #  (integer == 2 AND boolean == False)
+    # TODO(ARROW-3388): boolean columns are reconstructed as string
+    filters = [
+        [
+            ('integer', '=', 1),
+            ('string', '!=', 'b'),
+            ('boolean', '==', 'True')
+        ],
+        [('integer', '=', 0), ('boolean', '==', 'False')]
+    ]
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, filters=filters,
+        use_legacy_dataset=use_legacy_dataset)
+    table = dataset.read()
+    result_df = table.to_pandas().reset_index(drop=True)
+
+    # Check that all rows in the DF fulfill the filter
+    # Pandas 0.23.x has problems with indexing constant memoryviews in
+    # categoricals. Thus we need to make an explicit copy here with np.array.
+ df_filter_1 = (np.array(result_df['integer']) == 1) \ + & (np.array(result_df['string']) != 'b') \ + & (np.array(result_df['boolean']) == 'True') + df_filter_2 = (np.array(result_df['integer']) == 0) \ + & (np.array(result_df['boolean']) == 'False') + assert df_filter_1.sum() > 0 + assert df_filter_2.sum() > 0 + assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum()) + + if use_legacy_dataset: + # Check for \0 in predicate values. Until they are correctly + # implemented in ARROW-3391, they would otherwise lead to weird + # results with the current code. + with pytest.raises(NotImplementedError): + filters = [[('string', '==', b'1\0a')]] + pq.ParquetDataset(base_path, filesystem=fs, filters=filters) + with pytest.raises(NotImplementedError): + filters = [[('string', '==', '1\0a')]] + pq.ParquetDataset(base_path, filesystem=fs, filters=filters) + else: + for filters in [[[('string', '==', b'1\0a')]], + [[('string', '==', '1\0a')]]]: + dataset = pq.ParquetDataset( + base_path, filesystem=fs, filters=filters, + use_legacy_dataset=False) + assert dataset.read().num_rows == 0 + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset): + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1, 2, 3, 4] + partition_spec = [ + ['integers', integer_keys], + ] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'integers': np.array(integer_keys, dtype='i4'), + }, columns=['index', 'integers']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + filters=[ + ('integers', '<', 4), + ('integers', '>', 1), + ], + use_legacy_dataset=use_legacy_dataset + ) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + result_list = [x for x in map(int, result_df['integers'].values)] + assert result_list == [2, 3] + + +@pytest.mark.pandas +@parametrize_legacy_dataset +@pytest.mark.xfail( + # different error with use_legacy_datasets because result_df is no longer + # categorical + raises=(TypeError, AssertionError), + reason='Loss of type information in creation of categoricals.' 
+) +def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset): + fs = LocalFileSystem._get_instance() + base_path = tempdir + + date_keys = [ + datetime.date(2018, 4, 9), + datetime.date(2018, 4, 10), + datetime.date(2018, 4, 11), + datetime.date(2018, 4, 12), + datetime.date(2018, 4, 13) + ] + partition_spec = [ + ['dates', date_keys] + ] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'dates': np.array(date_keys, dtype='datetime64'), + }, columns=['index', 'dates']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + filters=[ + ('dates', '<', "2018-04-12"), + ('dates', '>', "2018-04-10") + ], + use_legacy_dataset=use_legacy_dataset + ) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + expected = pd.Categorical( + np.array([datetime.date(2018, 4, 11)], dtype='datetime64'), + categories=np.array(date_keys, dtype='datetime64')) + + assert result_df['dates'].values == expected + + +@pytest.mark.pandas +@pytest.mark.dataset +def test_filters_inclusive_datetime(tempdir): + # ARROW-11480 + path = tempdir / 'timestamps.parquet' + + pd.DataFrame({ + "dates": pd.date_range("2020-01-01", periods=10, freq="D"), + "id": range(10) + }).to_parquet(path, use_deprecated_int96_timestamps=True) + + table = pq.read_table(path, filters=[ + ("dates", "<=", datetime.datetime(2020, 1, 5)) + ]) + + assert table.column('id').to_pylist() == [0, 1, 2, 3, 4] + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_filters_inclusive_integer(tempdir, use_legacy_dataset): + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1, 2, 3, 4] + partition_spec = [ + ['integers', integer_keys], + ] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'integers': np.array(integer_keys, dtype='i4'), + }, columns=['index', 'integers']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + filters=[ + ('integers', '<=', 3), + ('integers', '>=', 2), + ], + use_legacy_dataset=use_legacy_dataset + ) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + result_list = [int(x) for x in map(int, result_df['integers'].values)] + assert result_list == [2, 3] + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_filters_inclusive_set(tempdir, use_legacy_dataset): + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1] + string_keys = ['a', 'b', 'c'] + boolean_keys = [True, False] + partition_spec = [ + ['integer', integer_keys], + ['string', string_keys], + ['boolean', boolean_keys] + ] + + df = pd.DataFrame({ + 'integer': np.array(integer_keys, dtype='i4').repeat(15), + 'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2), + 'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5), + 3), + }, columns=['integer', 'string', 'boolean']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + filters=[('string', 'in', 'ab')], + use_legacy_dataset=use_legacy_dataset + ) + table = dataset.read() + result_df = (table.to_pandas().reset_index(drop=True)) + + assert 'a' in result_df['string'].values + assert 'b' in result_df['string'].values + assert 'c' not in result_df['string'].values + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, + 
filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')), + ('boolean', 'not in', {False})], + use_legacy_dataset=use_legacy_dataset + ) + table = dataset.read() + result_df = (table.to_pandas().reset_index(drop=True)) + + assert 0 not in result_df['integer'].values + assert 'c' not in result_df['string'].values + assert False not in result_df['boolean'].values + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_filters_invalid_pred_op(tempdir, use_legacy_dataset): + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1, 2, 3, 4] + partition_spec = [ + ['integers', integer_keys], + ] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'integers': np.array(integer_keys, dtype='i4'), + }, columns=['index', 'integers']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + with pytest.raises(TypeError): + pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', 'in', 3), ], + use_legacy_dataset=use_legacy_dataset) + + with pytest.raises(ValueError): + pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', '=<', 3), ], + use_legacy_dataset=use_legacy_dataset) + + if use_legacy_dataset: + with pytest.raises(ValueError): + pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', 'in', set()), ], + use_legacy_dataset=use_legacy_dataset) + else: + # Dataset API returns empty table instead + dataset = pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', 'in', set()), ], + use_legacy_dataset=use_legacy_dataset) + assert dataset.read().num_rows == 0 + + if use_legacy_dataset: + with pytest.raises(ValueError): + pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', '!=', {3})], + use_legacy_dataset=use_legacy_dataset) + else: + dataset = pq.ParquetDataset(base_path, + filesystem=fs, + filters=[('integers', '!=', {3})], + use_legacy_dataset=use_legacy_dataset) + with pytest.raises(NotImplementedError): + assert dataset.read().num_rows == 0 + + +@pytest.mark.pandas +@parametrize_legacy_dataset_fixed +def test_filters_invalid_column(tempdir, use_legacy_dataset): + # ARROW-5572 - raise error on invalid name in filter specification + # works with new dataset / xfail with legacy implementation + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1, 2, 3, 4] + partition_spec = [['integers', integer_keys]] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'integers': np.array(integer_keys, dtype='i4'), + }, columns=['index', 'integers']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + msg = r"No match for FieldRef.Name\(non_existent_column\)" + with pytest.raises(ValueError, match=msg): + pq.ParquetDataset(base_path, filesystem=fs, + filters=[('non_existent_column', '<', 3), ], + use_legacy_dataset=use_legacy_dataset).read() + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_filters_read_table(tempdir, use_legacy_dataset): + # test that filters keyword is passed through in read_table + fs = LocalFileSystem._get_instance() + base_path = tempdir + + integer_keys = [0, 1, 2, 3, 4] + partition_spec = [ + ['integers', integer_keys], + ] + N = 5 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'integers': np.array(integer_keys, dtype='i4'), + }, columns=['index', 'integers']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + table = pq.read_table( + base_path, filesystem=fs, filters=[('integers', '<', 3)], + use_legacy_dataset=use_legacy_dataset) + assert 
table.num_rows == 3 + + table = pq.read_table( + base_path, filesystem=fs, filters=[[('integers', '<', 3)]], + use_legacy_dataset=use_legacy_dataset) + assert table.num_rows == 3 + + table = pq.read_pandas( + base_path, filters=[('integers', '<', 3)], + use_legacy_dataset=use_legacy_dataset) + assert table.num_rows == 3 + + +@pytest.mark.pandas +@parametrize_legacy_dataset_fixed +def test_partition_keys_with_underscores(tempdir, use_legacy_dataset): + # ARROW-5666 - partition field values with underscores preserve underscores + # xfail with legacy dataset -> they get interpreted as integers + fs = LocalFileSystem._get_instance() + base_path = tempdir + + string_keys = ["2019_2", "2019_3"] + partition_spec = [ + ['year_week', string_keys], + ] + N = 2 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'year_week': np.array(string_keys, dtype='object'), + }, columns=['index', 'year_week']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, use_legacy_dataset=use_legacy_dataset) + result = dataset.read() + assert result.column("year_week").to_pylist() == string_keys + + +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_read_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, path = s3_example_s3fs + path = path + "/test.parquet" + table = pa.table({"a": [1, 2, 3]}) + _write_table(table, path, filesystem=fs) + + result = _read_table( + path, filesystem=fs, use_legacy_dataset=use_legacy_dataset + ) + assert result.equals(table) + + +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, directory = s3_example_s3fs + path = directory + "/test.parquet" + table = pa.table({"a": [1, 2, 3]}) + _write_table(table, path, filesystem=fs) + + result = _read_table( + directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset + ) + assert result.equals(table) + + +@pytest.mark.pandas +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_read_partitioned_directory_s3fs_wrapper( + s3_example_s3fs, use_legacy_dataset +): + import s3fs + + from pyarrow.filesystem import S3FSWrapper + + if Version(s3fs.__version__) >= Version("0.5"): + pytest.skip("S3FSWrapper no longer working for s3fs 0.5+") + + fs, path = s3_example_s3fs + with pytest.warns(FutureWarning): + wrapper = S3FSWrapper(fs) + _partition_test_for_filesystem(wrapper, path) + + # Check that we can auto-wrap + dataset = pq.ParquetDataset( + path, filesystem=fs, use_legacy_dataset=use_legacy_dataset + ) + dataset.read() + + +@pytest.mark.pandas +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, path = s3_example_s3fs + _partition_test_for_filesystem( + fs, path, use_legacy_dataset=use_legacy_dataset + ) + + +def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True): + foo_keys = [0, 1] + bar_keys = ['a', 'b', 'c'] + partition_spec = [ + ['foo', foo_keys], + ['bar', bar_keys] + ] + N = 30 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'foo': np.array(foo_keys, dtype='i4').repeat(15), + 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2), + 'values': np.random.randn(N) + }, columns=['index', 'foo', 'bar', 'values']) + + _generate_partition_directories(fs, base_path, partition_spec, df) + + dataset = pq.ParquetDataset( + base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + 
expected_df = (df.sort_values(by='index') + .reset_index(drop=True) + .reindex(columns=result_df.columns)) + + expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys) + expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys) + + assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all() + + tm.assert_frame_equal(result_df, expected_df) + + +def _generate_partition_directories(fs, base_dir, partition_spec, df): + # partition_spec : list of lists, e.g. [['foo', [0, 1, 2], + # ['bar', ['a', 'b', 'c']] + # part_table : a pyarrow.Table to write to each partition + DEPTH = len(partition_spec) + + pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/")) + + def _visit_level(base_dir, level, part_keys): + name, values = partition_spec[level] + for value in values: + this_part_keys = part_keys + [(name, value)] + + level_dir = pathsep.join([ + str(base_dir), + '{}={}'.format(name, value) + ]) + fs.mkdir(level_dir) + + if level == DEPTH - 1: + # Generate example data + file_path = pathsep.join([level_dir, guid()]) + filtered_df = _filter_partition(df, this_part_keys) + part_table = pa.Table.from_pandas(filtered_df) + with fs.open(file_path, 'wb') as f: + _write_table(part_table, f) + assert fs.exists(file_path) + + file_success = pathsep.join([level_dir, '_SUCCESS']) + with fs.open(file_success, 'wb') as f: + pass + else: + _visit_level(level_dir, level + 1, this_part_keys) + file_success = pathsep.join([level_dir, '_SUCCESS']) + with fs.open(file_success, 'wb') as f: + pass + + _visit_level(base_dir, 0, []) + + +def _test_read_common_metadata_files(fs, base_path): + import pandas as pd + + import pyarrow.parquet as pq + + N = 100 + df = pd.DataFrame({ + 'index': np.arange(N), + 'values': np.random.randn(N) + }, columns=['index', 'values']) + + base_path = str(base_path) + data_path = os.path.join(base_path, 'data.parquet') + + table = pa.Table.from_pandas(df) + + with fs.open(data_path, 'wb') as f: + _write_table(table, f) + + metadata_path = os.path.join(base_path, '_common_metadata') + with fs.open(metadata_path, 'wb') as f: + pq.write_metadata(table.schema, f) + + dataset = pq.ParquetDataset(base_path, filesystem=fs) + assert dataset.common_metadata_path == str(metadata_path) + + with fs.open(data_path) as f: + common_schema = pq.read_metadata(f).schema + assert dataset.schema.equals(common_schema) + + # handle list of one directory + dataset2 = pq.ParquetDataset([base_path], filesystem=fs) + assert dataset2.schema.equals(dataset.schema) + + +@pytest.mark.pandas +def test_read_common_metadata_files(tempdir): + fs = LocalFileSystem._get_instance() + _test_read_common_metadata_files(fs, tempdir) + + +@pytest.mark.pandas +def test_read_metadata_files(tempdir): + fs = LocalFileSystem._get_instance() + + N = 100 + df = pd.DataFrame({ + 'index': np.arange(N), + 'values': np.random.randn(N) + }, columns=['index', 'values']) + + data_path = tempdir / 'data.parquet' + + table = pa.Table.from_pandas(df) + + with fs.open(data_path, 'wb') as f: + _write_table(table, f) + + metadata_path = tempdir / '_metadata' + with fs.open(metadata_path, 'wb') as f: + pq.write_metadata(table.schema, f) + + dataset = pq.ParquetDataset(tempdir, filesystem=fs) + assert dataset.metadata_path == str(metadata_path) + + with fs.open(data_path) as f: + metadata_schema = pq.read_metadata(f).schema + assert dataset.schema.equals(metadata_schema) + + +def _filter_partition(df, part_keys): + predicate = np.ones(len(df), dtype=bool) + + to_drop = [] + for name, value in part_keys: + 
to_drop.append(name)
+
+        # to avoid pandas warning
+        if isinstance(value, (datetime.date, datetime.datetime)):
+            value = pd.Timestamp(value)
+
+        predicate &= df[name] == value
+
+    return df[predicate].drop(to_drop, axis=1)
+
+
+@parametrize_legacy_dataset
+@pytest.mark.pandas
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
+    # ARROW-4076 apply filter before schema validation
+    # to avoid checking unneeded schemas
+
+    # create a partitioned dataset with mismatching schemas, which would
+    # otherwise raise if all schemas were validated first
+    dir1 = tempdir / 'A=0'
+    dir1.mkdir()
+    table1 = pa.Table.from_pandas(pd.DataFrame({'B': [1, 2, 3]}))
+    pq.write_table(table1, dir1 / 'data.parquet')
+
+    dir2 = tempdir / 'A=1'
+    dir2.mkdir()
+    table2 = pa.Table.from_pandas(pd.DataFrame({'B': ['a', 'b', 'c']}))
+    pq.write_table(table2, dir2 / 'data.parquet')
+
+    # read single file using filter
+    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
+                          use_legacy_dataset=use_legacy_dataset)
+    assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
+    nfiles = 10
+    size = 5
+
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+
+        path = dirpath / '{}.parquet'.format(i)
+
+        table = pa.Table.from_pandas(df)
+        _write_table(table, path)
+
+        test_data.append(table)
+        paths.append(path)
+
+    # Write a _SUCCESS.crc file
+    (dirpath / '_SUCCESS.crc').touch()
+
+    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
+        dataset = pq.ParquetDataset(
+            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
+        return dataset.read(columns=columns, use_threads=use_threads)
+
+    result = read_multiple_files(paths)
+    expected = pa.concat_tables(test_data)
+
+    assert result.equals(expected)
+
+    # Read with provided metadata
+    # TODO(dataset) specifying metadata not yet supported
+    metadata = pq.read_metadata(paths[0])
+    if use_legacy_dataset:
+        result2 = read_multiple_files(paths, metadata=metadata)
+        assert result2.equals(expected)
+
+        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
+        assert result3.equals(expected)
+    else:
+        with pytest.raises(ValueError, match="no longer supported"):
+            pq.read_table(paths, metadata=metadata, use_legacy_dataset=False)
+
+    # Read column subset
+    to_read = [0, 2, 6, result.num_columns - 1]
+
+    col_names = [result.field(i).name for i in to_read]
+    out = pq.read_table(
+        dirpath, columns=col_names, use_legacy_dataset=use_legacy_dataset
+    )
+    expected = pa.Table.from_arrays([result.column(i) for i in to_read],
+                                    names=col_names,
+                                    metadata=result.schema.metadata)
+    assert out.equals(expected)
+
+    # Read with multiple threads
+    pq.read_table(
+        dirpath, use_threads=True, use_legacy_dataset=use_legacy_dataset
+    )
+
+    # Test failure modes with non-uniform metadata
+    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+    bad_apple_path = tempdir / '{}.parquet'.format(guid())
+
+    t = pa.Table.from_pandas(bad_apple)
+    _write_table(t, bad_apple_path)
+
+    if not use_legacy_dataset:
+        # TODO(dataset) Dataset API skips bad files
+        return
+
+    bad_meta = pq.read_metadata(bad_apple_path)
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths + [bad_apple_path])
+
+    with pytest.raises(ValueError):
+        read_multiple_files(paths, 
metadata=bad_meta) + + mixed_paths = [bad_apple_path, paths[0]] + + with pytest.raises(ValueError): + read_multiple_files(mixed_paths, schema=bad_meta.schema) + + with pytest.raises(ValueError): + read_multiple_files(mixed_paths) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_read_pandas(tempdir, use_legacy_dataset): + nfiles = 5 + size = 5 + + dirpath = tempdir / guid() + dirpath.mkdir() + + test_data = [] + frames = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(size, seed=i) + df.index = np.arange(i * size, (i + 1) * size) + df.index.name = 'index' + + path = dirpath / '{}.parquet'.format(i) + + table = pa.Table.from_pandas(df) + _write_table(table, path) + test_data.append(table) + frames.append(df) + paths.append(path) + + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + columns = ['uint8', 'strings'] + result = dataset.read_pandas(columns=columns).to_pandas() + expected = pd.concat([x[columns] for x in frames]) + + tm.assert_frame_equal(result, expected) + + # also be able to pass the columns as a set (ARROW-12314) + result = dataset.read_pandas(columns=set(columns)).to_pandas() + assert result.shape == expected.shape + # column order can be different because of using a set + tm.assert_frame_equal(result.reindex(columns=expected.columns), expected) + + +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_memory_map(tempdir, use_legacy_dataset): + # ARROW-2627: Check that we can use ParquetDataset with memory-mapping + dirpath = tempdir / guid() + dirpath.mkdir() + + df = _test_dataframe(10, seed=0) + path = dirpath / '{}.parquet'.format(0) + table = pa.Table.from_pandas(df) + _write_table(table, path, version='2.6') + + dataset = pq.ParquetDataset( + dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) + if use_legacy_dataset: + assert dataset.pieces[0].read().equals(table) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + df = _test_dataframe(10, seed=0) + path = dirpath / '{}.parquet'.format(0) + table = pa.Table.from_pandas(df) + _write_table(table, path, version='2.6') + + with pytest.raises(ValueError): + pq.ParquetDataset( + dirpath, buffer_size=-64, + use_legacy_dataset=use_legacy_dataset) + + for buffer_size in [128, 1024]: + dataset = pq.ParquetDataset( + dirpath, buffer_size=buffer_size, + use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + df = _test_dataframe(10, seed=0) + path = dirpath / '{}.parquet'.format(0) + table = pa.Table.from_pandas(df) + _write_table(table, path, version='2.6') + + for pre_buffer in (True, False): + dataset = pq.ParquetDataset( + dirpath, pre_buffer=pre_buffer, + use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) + actual = pq.read_table(dirpath, pre_buffer=pre_buffer, + use_legacy_dataset=use_legacy_dataset) + assert actual.equals(table) + + +def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5): + test_data = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(file_nrows, seed=i) + path = base_path / '{}.parquet'.format(i) + + test_data.append(_write_table(df, 
path)) + paths.append(path) + return paths + + +def _assert_dataset_paths(dataset, paths, use_legacy_dataset): + if use_legacy_dataset: + assert set(map(str, paths)) == {x.path for x in dataset._pieces} + else: + paths = [str(path.as_posix()) for path in paths] + assert set(paths) == set(dataset._dataset.files) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +@pytest.mark.parametrize('dir_prefix', ['_', '.']) +def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + paths = _make_example_multifile_dataset(dirpath, nfiles=10, + file_nrows=5) + + # private directory + (dirpath / '{}staging'.format(dir_prefix)).mkdir() + + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + paths = _make_example_multifile_dataset(dirpath, nfiles=10, + file_nrows=5) + + with (dirpath / '.DS_Store').open('wb') as f: + f.write(b'gibberish') + + with (dirpath / '.private').open('wb') as f: + f.write(b'gibberish') + + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + paths = _make_example_multifile_dataset(dirpath, nfiles=10, + file_nrows=5) + + with (dirpath / '_committed_123').open('wb') as f: + f.write(b'abcd') + + with (dirpath / '_started_321').open('wb') as f: + f.write(b'abcd') + + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +@pytest.mark.parametrize('dir_prefix', ['_', '.']) +def test_ignore_no_private_directories_in_base_path( + tempdir, dir_prefix, use_legacy_dataset +): + # ARROW-8427 - don't ignore explicitly listed files if parent directory + # is a private directory + dirpath = tempdir / "{0}data".format(dir_prefix) / guid() + dirpath.mkdir(parents=True) + + paths = _make_example_multifile_dataset(dirpath, nfiles=10, + file_nrows=5) + + dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset) + _assert_dataset_paths(dataset, paths, use_legacy_dataset) + + # ARROW-9644 - don't ignore full directory with underscore in base path + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + _assert_dataset_paths(dataset, paths, use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset_fixed +def test_ignore_custom_prefixes(tempdir, use_legacy_dataset): + # ARROW-9573 - allow override of default ignore_prefixes + part = ["xxx"] * 3 + ["yyy"] * 3 + table = pa.table([ + pa.array(range(len(part))), + pa.array(part).dictionary_encode(), + ], names=['index', '_part']) + + # TODO use_legacy_dataset ARROW-10247 + pq.write_to_dataset(table, str(tempdir), partition_cols=['_part']) + + private_duplicate = tempdir / '_private_duplicate' + private_duplicate.mkdir() + pq.write_to_dataset(table, str(private_duplicate), + partition_cols=['_part']) + + read = pq.read_table( + tempdir, use_legacy_dataset=use_legacy_dataset, + ignore_prefixes=['_private']) + + assert read.equals(table) + + +@parametrize_legacy_dataset_fixed +def 
test_empty_directory(tempdir, use_legacy_dataset): + # ARROW-5310 - reading empty directory + # fails with legacy implementation + empty_dir = tempdir / 'dataset' + empty_dir.mkdir() + + dataset = pq.ParquetDataset( + empty_dir, use_legacy_dataset=use_legacy_dataset) + result = dataset.read() + assert result.num_rows == 0 + assert result.num_columns == 0 + + +def _test_write_to_dataset_with_partitions(base_path, + use_legacy_dataset=True, + filesystem=None, + schema=None, + index_name=None): + import pandas as pd + import pandas.testing as tm + + import pyarrow.parquet as pq + + # ARROW-1400 + output_df = pd.DataFrame({'group1': list('aaabbbbccc'), + 'group2': list('eefeffgeee'), + 'num': list(range(10)), + 'nan': [np.nan] * 10, + 'date': np.arange('2017-01-01', '2017-01-11', + dtype='datetime64[D]')}) + cols = output_df.columns.tolist() + partition_by = ['group1', 'group2'] + output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False, + preserve_index=False) + pq.write_to_dataset(output_table, base_path, partition_by, + filesystem=filesystem, + use_legacy_dataset=use_legacy_dataset) + + metadata_path = os.path.join(str(base_path), '_common_metadata') + + if filesystem is not None: + with filesystem.open(metadata_path, 'wb') as f: + pq.write_metadata(output_table.schema, f) + else: + pq.write_metadata(output_table.schema, metadata_path) + + # ARROW-2891: Ensure the output_schema is preserved when writing a + # partitioned dataset + dataset = pq.ParquetDataset(base_path, + filesystem=filesystem, + validate_schema=True, + use_legacy_dataset=use_legacy_dataset) + # ARROW-2209: Ensure the dataset schema also includes the partition columns + if use_legacy_dataset: + dataset_cols = set(dataset.schema.to_arrow_schema().names) + else: + # NB schema property is an arrow and not parquet schema + dataset_cols = set(dataset.schema.names) + + assert dataset_cols == set(output_table.schema.names) + + input_table = dataset.read() + input_df = input_table.to_pandas() + + # Read data back in and compare with original DataFrame + # Partitioned columns added to the end of the DataFrame when read + input_df_cols = input_df.columns.tolist() + assert partition_by == input_df_cols[-1 * len(partition_by):] + + input_df = input_df[cols] + # Partitioned columns become 'categorical' dtypes + for col in partition_by: + output_df[col] = output_df[col].astype('category') + tm.assert_frame_equal(output_df, input_df) + + +def _test_write_to_dataset_no_partitions(base_path, + use_legacy_dataset=True, + filesystem=None): + import pandas as pd + + import pyarrow.parquet as pq + + # ARROW-1400 + output_df = pd.DataFrame({'group1': list('aaabbbbccc'), + 'group2': list('eefeffgeee'), + 'num': list(range(10)), + 'date': np.arange('2017-01-01', '2017-01-11', + dtype='datetime64[D]')}) + cols = output_df.columns.tolist() + output_table = pa.Table.from_pandas(output_df) + + if filesystem is None: + filesystem = LocalFileSystem._get_instance() + + # Without partitions, append files to root_path + n = 5 + for i in range(n): + pq.write_to_dataset(output_table, base_path, + filesystem=filesystem) + output_files = [file for file in filesystem.ls(str(base_path)) + if file.endswith(".parquet")] + assert len(output_files) == n + + # Deduplicated incoming DataFrame should match + # original outgoing Dataframe + input_table = pq.ParquetDataset( + base_path, filesystem=filesystem, + use_legacy_dataset=use_legacy_dataset + ).read() + input_df = input_table.to_pandas() + input_df = input_df.drop_duplicates() + input_df = 
input_df[cols] + assert output_df.equals(input_df) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset): + _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_with_partitions_and_schema( + tempdir, use_legacy_dataset +): + schema = pa.schema([pa.field('group1', type=pa.string()), + pa.field('group2', type=pa.string()), + pa.field('num', type=pa.int64()), + pa.field('nan', type=pa.int32()), + pa.field('date', type=pa.timestamp(unit='us'))]) + _test_write_to_dataset_with_partitions( + str(tempdir), use_legacy_dataset, schema=schema) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_with_partitions_and_index_name( + tempdir, use_legacy_dataset +): + _test_write_to_dataset_with_partitions( + str(tempdir), use_legacy_dataset, index_name='index_name') + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset): + _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset): + _test_write_to_dataset_with_partitions( + tempdir / "test1", use_legacy_dataset) + _test_write_to_dataset_no_partitions( + tempdir / "test2", use_legacy_dataset) + + +@pytest.mark.pandas +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_write_to_dataset_pathlib_nonlocal( + tempdir, s3_example_s3fs, use_legacy_dataset +): + # pathlib paths are only accepted for local files + fs, _ = s3_example_s3fs + + with pytest.raises(TypeError, match="path-like objects are only allowed"): + _test_write_to_dataset_with_partitions( + tempdir / "test1", use_legacy_dataset, filesystem=fs) + + with pytest.raises(TypeError, match="path-like objects are only allowed"): + _test_write_to_dataset_no_partitions( + tempdir / "test2", use_legacy_dataset, filesystem=fs) + + +@pytest.mark.pandas +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_write_to_dataset_with_partitions_s3fs( + s3_example_s3fs, use_legacy_dataset +): + fs, path = s3_example_s3fs + + _test_write_to_dataset_with_partitions( + path, use_legacy_dataset, filesystem=fs) + + +@pytest.mark.pandas +@pytest.mark.s3 +@parametrize_legacy_dataset +def test_write_to_dataset_no_partitions_s3fs( + s3_example_s3fs, use_legacy_dataset +): + fs, path = s3_example_s3fs + + _test_write_to_dataset_no_partitions( + path, use_legacy_dataset, filesystem=fs) + + +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") +@pytest.mark.pandas +@parametrize_legacy_dataset_not_supported +def test_write_to_dataset_with_partitions_and_custom_filenames( + tempdir, use_legacy_dataset +): + output_df = pd.DataFrame({'group1': list('aaabbbbccc'), + 'group2': list('eefeffgeee'), + 'num': list(range(10)), + 'nan': [np.nan] * 10, + 'date': np.arange('2017-01-01', '2017-01-11', + dtype='datetime64[D]')}) + partition_by = ['group1', 'group2'] + output_table = pa.Table.from_pandas(output_df) + path = str(tempdir) + + def partition_filename_callback(keys): + return "{}-{}.parquet".format(*keys) + + pq.write_to_dataset(output_table, path, + partition_by, partition_filename_callback, + use_legacy_dataset=use_legacy_dataset) + + dataset = pq.ParquetDataset(path) + + # ARROW-3538: Ensure partition filenames match the given pattern + # defined in the local function partition_filename_callback + 
expected_basenames = [ + 'a-e.parquet', 'a-f.parquet', + 'b-e.parquet', 'b-f.parquet', + 'b-g.parquet', 'c-e.parquet' + ] + output_basenames = [os.path.basename(p.path) for p in dataset.pieces] + + assert sorted(expected_basenames) == sorted(output_basenames) + + +@pytest.mark.dataset +@pytest.mark.pandas +def test_write_to_dataset_filesystem(tempdir): + df = pd.DataFrame({'A': [1, 2, 3]}) + table = pa.Table.from_pandas(df) + path = str(tempdir) + + pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem()) + result = pq.read_table(path) + assert result.equals(table) + + +# TODO(dataset) support pickling +def _make_dataset_for_pickling(tempdir, N=100): + path = tempdir / 'data.parquet' + fs = LocalFileSystem._get_instance() + + df = pd.DataFrame({ + 'index': np.arange(N), + 'values': np.random.randn(N) + }, columns=['index', 'values']) + table = pa.Table.from_pandas(df) + + num_groups = 3 + with pq.ParquetWriter(path, table.schema) as writer: + for i in range(num_groups): + writer.write_table(table) + + reader = pq.ParquetFile(path) + assert reader.metadata.num_row_groups == num_groups + + metadata_path = tempdir / '_metadata' + with fs.open(metadata_path, 'wb') as f: + pq.write_metadata(table.schema, f) + + dataset = pq.ParquetDataset(tempdir, filesystem=fs) + assert dataset.metadata_path == str(metadata_path) + + return dataset + + +def _assert_dataset_is_picklable(dataset, pickler): + def is_pickleable(obj): + return obj == pickler.loads(pickler.dumps(obj)) + + assert is_pickleable(dataset) + assert is_pickleable(dataset.metadata) + assert is_pickleable(dataset.metadata.schema) + assert len(dataset.metadata.schema) + for column in dataset.metadata.schema: + assert is_pickleable(column) + + for piece in dataset._pieces: + assert is_pickleable(piece) + metadata = piece.get_metadata() + assert metadata.num_row_groups + for i in range(metadata.num_row_groups): + assert is_pickleable(metadata.row_group(i)) + + +@pytest.mark.pandas +def test_builtin_pickle_dataset(tempdir, datadir): + import pickle + dataset = _make_dataset_for_pickling(tempdir) + _assert_dataset_is_picklable(dataset, pickler=pickle) + + +@pytest.mark.pandas +def test_cloudpickle_dataset(tempdir, datadir): + cp = pytest.importorskip('cloudpickle') + dataset = _make_dataset_for_pickling(tempdir) + _assert_dataset_is_picklable(dataset, pickler=cp) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_partitioned_dataset(tempdir, use_legacy_dataset): + # ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset + # to a Parquet file + path = tempdir / "ARROW-3208" + df = pd.DataFrame({ + 'one': [-1, 10, 2.5, 100, 1000, 1, 29.2], + 'two': [-1, 10, 2, 100, 1000, 1, 11], + 'three': [0, 0, 0, 0, 0, 0, 0] + }) + table = pa.Table.from_pandas(df) + pq.write_to_dataset(table, root_path=str(path), + partition_cols=['one', 'two']) + table = pq.ParquetDataset( + path, use_legacy_dataset=use_legacy_dataset).read() + pq.write_table(table, path / "output.parquet") + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_read_dictionary(tempdir, use_legacy_dataset): + path = tempdir / "ARROW-3325-dataset" + t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) + t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) + # TODO pass use_legacy_dataset (need to fix unique names) + pq.write_to_dataset(t1, root_path=str(path)) + pq.write_to_dataset(t2, root_path=str(path)) + + result = pq.ParquetDataset( + path, read_dictionary=['f0'], + 
use_legacy_dataset=use_legacy_dataset).read() + + # The order of the chunks is non-deterministic + ex_chunks = [t1[0].chunk(0).dictionary_encode(), + t2[0].chunk(0).dictionary_encode()] + + assert result[0].num_chunks == 2 + c0, c1 = result[0].chunk(0), result[0].chunk(1) + if c0.equals(ex_chunks[0]): + assert c1.equals(ex_chunks[1]) + else: + assert c0.equals(ex_chunks[1]) + assert c1.equals(ex_chunks[0]) + + +@pytest.mark.dataset +def test_dataset_unsupported_keywords(): + + with pytest.raises(ValueError, match="not yet supported with the new"): + pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([])) + + with pytest.raises(ValueError, match="not yet supported with the new"): + pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([])) + + with pytest.raises(ValueError, match="not yet supported with the new"): + pq.ParquetDataset("", use_legacy_dataset=False, validate_schema=False) + + with pytest.raises(ValueError, match="not yet supported with the new"): + pq.ParquetDataset("", use_legacy_dataset=False, split_row_groups=True) + + with pytest.raises(ValueError, match="not yet supported with the new"): + pq.ParquetDataset("", use_legacy_dataset=False, metadata_nthreads=4) + + with pytest.raises(ValueError, match="no longer supported"): + pq.read_table("", use_legacy_dataset=False, metadata=pa.schema([])) + + +@pytest.mark.dataset +def test_dataset_partitioning(tempdir): + import pyarrow.dataset as ds + + # create small dataset with directory partitioning + root_path = tempdir / "test_partitioning" + (root_path / "2012" / "10" / "01").mkdir(parents=True) + + table = pa.table({'a': [1, 2, 3]}) + pq.write_table( + table, str(root_path / "2012" / "10" / "01" / "data.parquet")) + + # This works with new dataset API + + # read_table + part = ds.partitioning(field_names=["year", "month", "day"]) + result = pq.read_table( + str(root_path), partitioning=part, use_legacy_dataset=False) + assert result.column_names == ["a", "year", "month", "day"] + + result = pq.ParquetDataset( + str(root_path), partitioning=part, use_legacy_dataset=False).read() + assert result.column_names == ["a", "year", "month", "day"] + + # This raises an error for legacy dataset + with pytest.raises(ValueError): + pq.read_table( + str(root_path), partitioning=part, use_legacy_dataset=True) + + with pytest.raises(ValueError): + pq.ParquetDataset( + str(root_path), partitioning=part, use_legacy_dataset=True) + + +@pytest.mark.dataset +def test_parquet_dataset_new_filesystem(tempdir): + # Ensure we can pass new FileSystem object to ParquetDataset + # (use new implementation automatically without specifying + # use_legacy_dataset=False) + table = pa.table({'a': [1, 2, 3]}) + pq.write_table(table, tempdir / 'data.parquet') + # don't use simple LocalFileSystem (as that gets mapped to legacy one) + filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem()) + dataset = pq.ParquetDataset('.', filesystem=filesystem) + result = dataset.read() + assert result.equals(table) + + +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") +def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir): + # ARROW-10462 ensure that on Windows we properly use posix-style paths + # as used by fsspec + fsspec = pytest.importorskip("fsspec") + filesystem = fsspec.filesystem('file') + table = pa.table({'a': [1, 2, 3]}) + pq.write_table(table, tempdir / 'data.parquet') + + # pass a posix-style path (using "/" also on Windows) + path = str(tempdir).replace("\\", "/") + dataset = 
pq.ParquetDataset(path, filesystem=filesystem) + # ensure the piece path is also posix-style + expected = path + "/data.parquet" + assert dataset.pieces[0].path == expected + + +@pytest.mark.dataset +def test_parquet_dataset_deprecated_properties(tempdir): + table = pa.table({'a': [1, 2, 3]}) + path = tempdir / 'data.parquet' + pq.write_table(table, path) + dataset = pq.ParquetDataset(path) + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): + dataset.pieces + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"): + dataset.partitions + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"): + dataset.memory_map + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"): + dataset.read_dictionary + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"): + dataset.buffer_size + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"): + dataset.fs + + dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False) + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): + dataset2.pieces diff --git a/src/arrow/python/pyarrow/tests/parquet/test_datetime.py b/src/arrow/python/pyarrow/tests/parquet/test_datetime.py new file mode 100644 index 000000000..f39f1c762 --- /dev/null +++ b/src/arrow/python/pyarrow/tests/parquet/test_datetime.py @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
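+
+# A minimal sketch of the write/read round-trip pattern the tests in this
+# module exercise; the shared helpers (_check_roundtrip and friends) live in
+# pyarrow.tests.parquet.common, and the file name below is hypothetical:
+#
+#   import pyarrow as pa
+#   import pyarrow.parquet as pq
+#
+#   table = pa.table({"ts": pa.array([0, 1000, 2000], pa.timestamp("ns"))})
+#   pq.write_table(table, "example.parquet", coerce_timestamps="us")
+#   assert pq.read_table("example.parquet")["ts"].type == pa.timestamp("us")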
+ +import datetime +import io + +import numpy as np +import pytest + +import pyarrow as pa +from pyarrow.tests.parquet.common import ( + _check_roundtrip, parametrize_legacy_dataset) + +try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import _read_table, _write_table +except ImportError: + pq = None + + +try: + import pandas as pd + import pandas.testing as tm + + from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe +except ImportError: + pd = tm = None + + +pytestmark = pytest.mark.parquet + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_datetime_tz(use_legacy_dataset): + s = pd.Series([datetime.datetime(2017, 9, 6)]) + s = s.dt.tz_localize('utc') + + s.index = s + + # Both a column and an index to hit both use cases + df = pd.DataFrame({'tz_aware': s, + 'tz_eastern': s.dt.tz_convert('US/Eastern')}, + index=s) + + f = io.BytesIO() + + arrow_table = pa.Table.from_pandas(df) + + _write_table(arrow_table, f, coerce_timestamps='ms') + f.seek(0) + + table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset) + + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_datetime_timezone_tzinfo(use_legacy_dataset): + value = datetime.datetime(2018, 1, 1, 1, 23, 45, + tzinfo=datetime.timezone.utc) + df = pd.DataFrame({'foo': [value]}) + + _roundtrip_pandas_dataframe( + df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset) + + +@pytest.mark.pandas +def test_coerce_timestamps(tempdir): + from collections import OrderedDict + + # ARROW-622 + arrays = OrderedDict() + fields = [pa.field('datetime64', + pa.list_(pa.timestamp('ms')))] + arrays['datetime64'] = [ + np.array(['2007-07-13T01:23:34.123456789', + None, + '2010-08-13T05:46:57.437699912'], + dtype='datetime64[ms]'), + None, + None, + np.array(['2007-07-13T02', + None, + '2010-08-13T05:46:57.437699912'], + dtype='datetime64[ms]'), + ] + + df = pd.DataFrame(arrays) + schema = pa.schema(fields) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df, schema=schema) + + _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us') + table_read = _read_table(filename) + df_read = table_read.to_pandas() + + df_expected = df.copy() + for i, x in enumerate(df_expected['datetime64']): + if isinstance(x, np.ndarray): + df_expected['datetime64'][i] = x.astype('M8[us]') + + tm.assert_frame_equal(df_expected, df_read) + + with pytest.raises(ValueError): + _write_table(arrow_table, filename, version='2.6', + coerce_timestamps='unknown') + + +@pytest.mark.pandas +def test_coerce_timestamps_truncated(tempdir): + """ + ARROW-2555: Test that we can truncate timestamps when coercing if + explicitly allowed. 
+ """ + dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1, + second=1, microsecond=1) + dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1, + second=1) + + fields_us = [pa.field('datetime64', pa.timestamp('us'))] + arrays_us = {'datetime64': [dt_us, dt_ms]} + + df_us = pd.DataFrame(arrays_us) + schema_us = pa.schema(fields_us) + + filename = tempdir / 'pandas_truncated.parquet' + table_us = pa.Table.from_pandas(df_us, schema=schema_us) + + _write_table(table_us, filename, version='2.6', coerce_timestamps='ms', + allow_truncated_timestamps=True) + table_ms = _read_table(filename) + df_ms = table_ms.to_pandas() + + arrays_expected = {'datetime64': [dt_ms, dt_ms]} + df_expected = pd.DataFrame(arrays_expected) + tm.assert_frame_equal(df_expected, df_ms) + + +@pytest.mark.pandas +def test_date_time_types(tempdir): + t1 = pa.date32() + data1 = np.array([17259, 17260, 17261], dtype='int32') + a1 = pa.array(data1, type=t1) + + t2 = pa.date64() + data2 = data1.astype('int64') * 86400000 + a2 = pa.array(data2, type=t2) + + t3 = pa.timestamp('us') + start = pd.Timestamp('2001-01-01').value / 1000 + data3 = np.array([start, start + 1, start + 2], dtype='int64') + a3 = pa.array(data3, type=t3) + + t4 = pa.time32('ms') + data4 = np.arange(3, dtype='i4') + a4 = pa.array(data4, type=t4) + + t5 = pa.time64('us') + a5 = pa.array(data4.astype('int64'), type=t5) + + t6 = pa.time32('s') + a6 = pa.array(data4, type=t6) + + ex_t6 = pa.time32('ms') + ex_a6 = pa.array(data4 * 1000, type=ex_t6) + + t7 = pa.timestamp('ns') + start = pd.Timestamp('2001-01-01').value + data7 = np.array([start, start + 1000, start + 2000], + dtype='int64') + a7 = pa.array(data7, type=t7) + + table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7], + ['date32', 'date64', 'timestamp[us]', + 'time32[s]', 'time64[us]', + 'time32_from64[s]', + 'timestamp[ns]']) + + # date64 as date32 + # time32[s] to time32[ms] + expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7], + ['date32', 'date64', 'timestamp[us]', + 'time32[s]', 'time64[us]', + 'time32_from64[s]', + 'timestamp[ns]']) + + _check_roundtrip(table, expected=expected, version='2.6') + + t0 = pa.timestamp('ms') + data0 = np.arange(4, dtype='int64') + a0 = pa.array(data0, type=t0) + + t1 = pa.timestamp('us') + data1 = np.arange(4, dtype='int64') + a1 = pa.array(data1, type=t1) + + t2 = pa.timestamp('ns') + data2 = np.arange(4, dtype='int64') + a2 = pa.array(data2, type=t2) + + table = pa.Table.from_arrays([a0, a1, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + expected = pa.Table.from_arrays([a0, a1, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + + # int64 for all timestamps supported by default + filename = tempdir / 'int64_timestamps.parquet' + _write_table(table, filename, version='2.6') + parquet_schema = pq.ParquetFile(filename).schema + for i in range(3): + assert parquet_schema.column(i).physical_type == 'INT64' + read_table = _read_table(filename) + assert read_table.equals(expected) + + t0_ns = pa.timestamp('ns') + data0_ns = np.array(data0 * 1000000, dtype='int64') + a0_ns = pa.array(data0_ns, type=t0_ns) + + t1_ns = pa.timestamp('ns') + data1_ns = np.array(data1 * 1000, dtype='int64') + a1_ns = pa.array(data1_ns, type=t1_ns) + + expected = pa.Table.from_arrays([a0_ns, a1_ns, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + + # int96 nanosecond timestamps produced upon request + filename = tempdir / 'explicit_int96_timestamps.parquet' + _write_table(table, filename, version='2.6', + use_deprecated_int96_timestamps=True) + parquet_schema = 
pq.ParquetFile(filename).schema
+    for i in range(3):
+        assert parquet_schema.column(i).physical_type == 'INT96'
+    read_table = _read_table(filename)
+    assert read_table.equals(expected)
+
+    # int96 nanosecond timestamps implied by flavor 'spark'
+    filename = tempdir / 'spark_int96_timestamps.parquet'
+    _write_table(table, filename, version='2.6',
+                 flavor='spark')
+    parquet_schema = pq.ParquetFile(filename).schema
+    for i in range(3):
+        assert parquet_schema.column(i).physical_type == 'INT96'
+    read_table = _read_table(filename)
+    assert read_table.equals(expected)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
+def test_coerce_int96_timestamp_unit(unit):
+    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000
+
+    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
+    d_ms = d_s * 1000
+    d_us = d_ms * 1000
+    d_ns = d_us * 1000
+
+    a_s = pa.array(d_s, type=pa.timestamp('s'))
+    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
+    a_us = pa.array(d_us, type=pa.timestamp('us'))
+    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
+
+    arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
+    names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
+    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
+
+    # With Int96 storage, the timestamps can be coerced to any requested
+    # unit on read, regardless of the Parquet format version
+    expected = pa.Table.from_arrays([arrays.get(unit)]*4, names)
+    read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
+    _check_roundtrip(table, expected,
+                     read_table_kwargs=read_table_kwargs,
+                     use_deprecated_int96_timestamps=True)
+    _check_roundtrip(table, expected, version='2.6',
+                     read_table_kwargs=read_table_kwargs,
+                     use_deprecated_int96_timestamps=True)
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
+def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):
+
+    def get_table(pq_reader_method, filename, **kwargs):
+        if pq_reader_method == "ParquetFile":
+            return pq.ParquetFile(filename, **kwargs).read()
+        elif pq_reader_method == "read_table":
+            return pq.read_table(filename, **kwargs)
+
+    # Recreating the initial JIRA issue referenced in ARROW-12096
+    oob_dts = [
+        datetime.datetime(1000, 1, 1),
+        datetime.datetime(2000, 1, 1),
+        datetime.datetime(3000, 1, 1)
+    ]
+    df = pd.DataFrame({"a": oob_dts})
+    table = pa.table(df)
+
+    filename = tempdir / "test_round_trip_overflow.parquet"
+    pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
+                   version="1.0")
+
+    # with the default resolution of ns, we get wrong values for INT96
+    # that are out of bounds for the nanosecond range
+    tab_error = get_table(pq_reader_method, filename)
+    assert tab_error["a"].to_pylist() != oob_dts
+
+    # avoid this overflow by specifying the resolution to use for INT96 values
+    tab_correct = get_table(
+        pq_reader_method, filename, coerce_int96_timestamp_unit="s"
+    )
+    df_correct = tab_correct.to_pandas(timestamp_as_object=True)
+    tm.assert_frame_equal(df, df_correct)
+
+
+def test_timestamp_restore_timezone():
+    # ARROW-5888, restore timezone from serialized metadata
+    ty = pa.timestamp('ms', tz='America/New_York')
+    arr = pa.array([1, 2, 3], type=ty)
+    t = pa.table([arr], names=['f0'])
+    _check_roundtrip(t)
+
+
+def test_timestamp_restore_timezone_nanosecond():
+    # ARROW-9634, also restore timezone for nanosecond data that gets stored
+    # as microseconds in the parquet file
+    ty = pa.timestamp('ns', tz='America/New_York')
+    arr = pa.array([1000, 2000, 3000], type=ty)
+    table = pa.table([arr], names=['f0'])
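+    # (With the default writer settings these ns values are stored as us in
+    # the file, which is why the expected round-trip table below is the us
+    # cast of the original array.)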
+
+    ty_us = pa.timestamp('us', tz='America/New_York')
+    expected = pa.table([arr.cast(ty_us)], names=['f0'])
+    _check_roundtrip(table, expected=expected)
+
+
+@pytest.mark.pandas
+def test_list_of_datetime_time_roundtrip():
+    # ARROW-4135
+    times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
+                            '11:30', '12:00'])
+    df = pd.DataFrame({'time': [times.time]})
+    _roundtrip_pandas_dataframe(df, write_kwargs={})
+
+
+@pytest.mark.pandas
+def test_parquet_version_timestamp_differences():
+    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000
+
+    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
+    d_ms = d_s * 1000
+    d_us = d_ms * 1000
+    d_ns = d_us * 1000
+
+    a_s = pa.array(d_s, type=pa.timestamp('s'))
+    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
+    a_us = pa.array(d_us, type=pa.timestamp('us'))
+    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
+
+    names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
+    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
+
+    # Using Parquet version 1.0, seconds should be coerced to milliseconds
+    # and nanoseconds should be coerced to microseconds by default
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
+    _check_roundtrip(table, expected)
+
+    # Using Parquet version 2.6, seconds should be coerced to milliseconds
+    # and nanoseconds should be retained by default
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
+    _check_roundtrip(table, expected, version='2.6')
+
+    # Using Parquet version 1.0, coercing to milliseconds or microseconds
+    # is allowed
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
+    _check_roundtrip(table, expected, coerce_timestamps='ms')
+
+    # Using Parquet version 2.6, coercing to milliseconds or microseconds
+    # is allowed
+    expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
+    _check_roundtrip(table, expected, version='2.6', coerce_timestamps='us')
+
+    # TODO: after pyarrow allows coerce_timestamps='ns', tests like the
+    # following should pass ...
+
+    # Using Parquet version 1.0, coercing to nanoseconds is not allowed
+    # expected = None
+    # with pytest.raises(NotImplementedError):
+    #     _roundtrip_table(table, coerce_timestamps='ns')
+
+    # Using Parquet version 2.6, coercing to nanoseconds is allowed
+    # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+    # _check_roundtrip(table, expected, version='2.6', coerce_timestamps='ns')
+
+    # For either Parquet version, coercing to nanoseconds is allowed
+    # if Int96 storage is used
+    expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+    _check_roundtrip(table, expected,
+                     use_deprecated_int96_timestamps=True)
+    _check_roundtrip(table, expected, version='2.6',
+                     use_deprecated_int96_timestamps=True)
+
+
+@pytest.mark.pandas
+def test_noncoerced_nanoseconds_written_without_exception(tempdir):
+    # ARROW-1957: the Parquet version 2.6 writer preserves Arrow
+    # nanosecond timestamps by default
+    n = 9
+    df = pd.DataFrame({'x': range(n)},
+                      index=pd.date_range('2017-01-01', freq='1n', periods=n))
+    tb = pa.Table.from_pandas(df)
+
+    filename = tempdir / 'written.parquet'
+    try:
+        pq.write_table(tb, filename, version='2.6')
+    except Exception:
+        pass
+    assert filename.exists()
+
+    recovered_table = pq.read_table(filename)
+    assert tb.equals(recovered_table)
+
+    # Loss of data through coercion (without explicit override) is still
+    # an error
+    filename = tempdir / 'not_written.parquet'
+    with pytest.raises(ValueError):
+        pq.write_table(tb, filename, coerce_timestamps='ms', version='2.6')
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_metadata.py b/src/arrow/python/pyarrow/tests/parquet/test_metadata.py
new file mode 100644
index 000000000..9fa2f6394
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_metadata.py
@@ -0,0 +1,524 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
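+
+# A minimal sketch of the metadata access pattern the tests below exercise;
+# the file name is hypothetical:
+#
+#   import pyarrow as pa
+#   import pyarrow.parquet as pq
+#
+#   pq.write_table(pa.table({"a": [1, 2, 3]}), "example.parquet")
+#   meta = pq.read_metadata("example.parquet")  # pq.FileMetaData
+#   rg = meta.row_group(0)                      # pq.RowGroupMetaData
+#   col = rg.column(0)                          # pq.ColumnChunkMetaData
+#   assert col.statistics.min == 1 and col.statistics.max == 3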
+
+import datetime
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import _write_table
+except ImportError:
+    pq = None
+
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+    from pyarrow.tests.parquet.common import alltypes_sample
+except ImportError:
+    pd = tm = None
+
+
+pytestmark = pytest.mark.parquet
+
+
+@pytest.mark.pandas
+def test_parquet_metadata_api():
+    df = alltypes_sample(size=10000)
+    df = df.reindex(columns=sorted(df.columns))
+    df.index = np.random.randint(0, 1000000, size=len(df))
+
+    fileh = make_sample_file(df)
+    ncols = len(df.columns)
+
+    # Series of sniff tests
+    meta = fileh.metadata
+    repr(meta)
+    assert meta.num_rows == len(df)
+    assert meta.num_columns == ncols + 1  # +1 for index
+    assert meta.num_row_groups == 1
+    assert meta.format_version == '2.6'
+    assert 'parquet-cpp' in meta.created_by
+    assert isinstance(meta.serialized_size, int)
+    assert isinstance(meta.metadata, dict)
+
+    # Schema
+    schema = fileh.schema
+    assert meta.schema is schema
+    assert len(schema) == ncols + 1  # +1 for index
+    repr(schema)
+
+    col = schema[0]
+    repr(col)
+    assert col.name == df.columns[0]
+    assert col.max_definition_level == 1
+    assert col.max_repetition_level == 0
+
+    assert col.physical_type == 'BOOLEAN'
+    assert col.converted_type == 'NONE'
+
+    with pytest.raises(IndexError):
+        schema[ncols + 1]  # +1 for index
+
+    with pytest.raises(IndexError):
+        schema[-1]
+
+    # Row group
+    for rg in range(meta.num_row_groups):
+        rg_meta = meta.row_group(rg)
+        assert isinstance(rg_meta, pq.RowGroupMetaData)
+        repr(rg_meta)
+
+        for col in range(rg_meta.num_columns):
+            col_meta = rg_meta.column(col)
+            assert isinstance(col_meta, pq.ColumnChunkMetaData)
+            repr(col_meta)
+
+    with pytest.raises(IndexError):
+        meta.row_group(-1)
+
+    with pytest.raises(IndexError):
+        meta.row_group(meta.num_row_groups + 1)
+
+    rg_meta = meta.row_group(0)
+    assert rg_meta.num_rows == len(df)
+    assert rg_meta.num_columns == ncols + 1  # +1 for index
+    assert rg_meta.total_byte_size > 0
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(-1)
+
+    with pytest.raises(IndexError):
+        col_meta = rg_meta.column(ncols + 2)
+
+    col_meta = rg_meta.column(0)
+    assert col_meta.file_offset > 0
+    assert col_meta.file_path == ''  # created from BytesIO
+    assert col_meta.physical_type == 'BOOLEAN'
+    assert col_meta.num_values == 10000
+    assert col_meta.path_in_schema == 'bool'
+    assert col_meta.is_stats_set is True
+    assert isinstance(col_meta.statistics, pq.Statistics)
+    assert col_meta.compression == 'SNAPPY'
+    assert col_meta.encodings == ('PLAIN', 'RLE')
+    assert col_meta.has_dictionary_page is False
+    assert col_meta.dictionary_page_offset is None
+    assert col_meta.data_page_offset > 0
+    assert col_meta.total_compressed_size > 0
+    assert col_meta.total_uncompressed_size > 0
+    with pytest.raises(NotImplementedError):
+        col_meta.has_index_page
+    with pytest.raises(NotImplementedError):
+        col_meta.index_page_offset
+
+
+def test_parquet_metadata_lifetime(tempdir):
+    # ARROW-6642 - ensure that chained access keeps parent objects alive
+    table = pa.table({'a': [1, 2, 3]})
+    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
+    parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet')
+    parquet_file.metadata.row_group(0).column(0).statistics
+
+
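The chained accessors tested above follow a fixed hierarchy. A short sketch of that traversal, using an illustrative in-memory file:

```python
import pyarrow as pa
import pyarrow.parquet as pq

sink = pa.BufferOutputStream()
pq.write_table(pa.table({'x': [1, 2, 3]}), sink)

meta = pq.read_metadata(pa.BufferReader(sink.getvalue()))  # FileMetaData
print(meta.num_rows, meta.num_row_groups, meta.format_version)

rg = meta.row_group(0)        # RowGroupMetaData
col = rg.column(0)            # ColumnChunkMetaData
stats = col.statistics        # Statistics, or None when not written
if stats is not None and stats.has_min_max:
    print(col.path_in_schema, stats.min, stats.max, stats.null_count)
```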
+@pytest.mark.pandas +@pytest.mark.parametrize( + ( + 'data', + 'type', + 'physical_type', + 'min_value', + 'max_value', + 'null_count', + 'num_values', + 'distinct_count' + ), + [ + ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0), + ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0), + ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0), + ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0), + ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0), + ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0), + ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0), + ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0), + ( + [-1.1, 2.2, 2.3, None, 4.4], pa.float32(), + 'FLOAT', -1.1, 4.4, 1, 4, 0 + ), + ( + [-1.1, 2.2, 2.3, None, 4.4], pa.float64(), + 'DOUBLE', -1.1, 4.4, 1, 4, 0 + ), + ( + ['', 'b', chr(1000), None, 'aaa'], pa.binary(), + 'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0 + ), + ( + [True, False, False, True, True], pa.bool_(), + 'BOOLEAN', False, True, 0, 5, 0 + ), + ( + [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(), + 'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0 + ), + ] +) +def test_parquet_column_statistics_api(data, type, physical_type, min_value, + max_value, null_count, num_values, + distinct_count): + df = pd.DataFrame({'data': data}) + schema = pa.schema([pa.field('data', type)]) + table = pa.Table.from_pandas(df, schema=schema, safe=False) + fileh = make_sample_file(table) + + meta = fileh.metadata + + rg_meta = meta.row_group(0) + col_meta = rg_meta.column(0) + + stat = col_meta.statistics + assert stat.has_min_max + assert _close(type, stat.min, min_value) + assert _close(type, stat.max, max_value) + assert stat.null_count == null_count + assert stat.num_values == num_values + # TODO(kszucs) until parquet-cpp API doesn't expose HasDistinctCount + # method, missing distinct_count is represented as zero instead of None + assert stat.distinct_count == distinct_count + assert stat.physical_type == physical_type + + +def _close(type, left, right): + if type == pa.float32(): + return abs(left - right) < 1E-7 + elif type == pa.float64(): + return abs(left - right) < 1E-13 + else: + return left == right + + +# ARROW-6339 +@pytest.mark.pandas +def test_parquet_raise_on_unset_statistics(): + df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")}) + meta = make_sample_file(pa.Table.from_pandas(df)).metadata + + assert not meta.row_group(0).column(0).statistics.has_min_max + assert meta.row_group(0).column(0).statistics.max is None + + +def test_statistics_convert_logical_types(tempdir): + # ARROW-5166, ARROW-4139 + + # (min, max, type) + cases = [(10, 11164359321221007157, pa.uint64()), + (10, 4294967295, pa.uint32()), + ("ähnlich", "öffentlich", pa.utf8()), + (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000), + pa.time32('ms')), + (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000), + pa.time64('us')), + (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000), + datetime.datetime(2019, 6, 25, 0, 0, 0, 1000), + pa.timestamp('ms')), + (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000), + datetime.datetime(2019, 6, 25, 0, 0, 0, 1000), + pa.timestamp('us'))] + + for i, (min_val, max_val, typ) in enumerate(cases): + t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)], + ['col']) + path = str(tempdir / ('example{}.parquet'.format(i))) + pq.write_table(t, path, version='2.6') + pf = pq.ParquetFile(path) + stats = pf.metadata.row_group(0).column(0).statistics + assert stats.min == min_val + 
assert stats.max == max_val + + +def test_parquet_write_disable_statistics(tempdir): + table = pa.Table.from_pydict( + OrderedDict([ + ('a', pa.array([1, 2, 3])), + ('b', pa.array(['a', 'b', 'c'])) + ]) + ) + _write_table(table, tempdir / 'data.parquet') + meta = pq.read_metadata(tempdir / 'data.parquet') + for col in [0, 1]: + cc = meta.row_group(0).column(col) + assert cc.is_stats_set is True + assert cc.statistics is not None + + _write_table(table, tempdir / 'data2.parquet', write_statistics=False) + meta = pq.read_metadata(tempdir / 'data2.parquet') + for col in [0, 1]: + cc = meta.row_group(0).column(col) + assert cc.is_stats_set is False + assert cc.statistics is None + + _write_table(table, tempdir / 'data3.parquet', write_statistics=['a']) + meta = pq.read_metadata(tempdir / 'data3.parquet') + cc_a = meta.row_group(0).column(0) + cc_b = meta.row_group(0).column(1) + assert cc_a.is_stats_set is True + assert cc_b.is_stats_set is False + assert cc_a.statistics is not None + assert cc_b.statistics is None + + +def test_field_id_metadata(): + # ARROW-7080 + field_id = b'PARQUET:field_id' + inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'}) + middle = pa.field('middle', pa.struct( + [inner]), metadata={field_id: b'101'}) + fields = [ + pa.field('basic', pa.int32(), metadata={ + b'other': b'abc', field_id: b'1'}), + pa.field( + 'list', + pa.list_(pa.field('list-inner', pa.int32(), + metadata={field_id: b'10'})), + metadata={field_id: b'11'}), + pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}), + pa.field('no-metadata', pa.int32()), + pa.field('non-integral-field-id', pa.int32(), + metadata={field_id: b'xyz'}), + pa.field('negative-field-id', pa.int32(), + metadata={field_id: b'-1000'}) + ] + arrs = [[] for _ in fields] + table = pa.table(arrs, schema=pa.schema(fields)) + + bio = pa.BufferOutputStream() + pq.write_table(table, bio) + contents = bio.getvalue() + + pf = pq.ParquetFile(pa.BufferReader(contents)) + schema = pf.schema_arrow + + assert schema[0].metadata[field_id] == b'1' + assert schema[0].metadata[b'other'] == b'abc' + + list_field = schema[1] + assert list_field.metadata[field_id] == b'11' + + list_item_field = list_field.type.value_field + assert list_item_field.metadata[field_id] == b'10' + + struct_field = schema[2] + assert struct_field.metadata[field_id] == b'102' + + struct_middle_field = struct_field.type[0] + assert struct_middle_field.metadata[field_id] == b'101' + + struct_inner_field = struct_middle_field.type[0] + assert struct_inner_field.metadata[field_id] == b'100' + + assert schema[3].metadata is None + # Invalid input is passed through (ok) but does not + # have field_id in parquet (not tested) + assert schema[4].metadata[field_id] == b'xyz' + assert schema[5].metadata[field_id] == b'-1000' + + +@pytest.mark.pandas +def test_multi_dataset_metadata(tempdir): + filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"] + metapath = str(tempdir / "_metadata") + + # create a test dataset + df = pd.DataFrame({ + 'one': [1, 2, 3], + 'two': [-1, -2, -3], + 'three': [[1, 2], [2, 3], [3, 4]], + }) + table = pa.Table.from_pandas(df) + + # write dataset twice and collect/merge metadata + _meta = None + for filename in filenames: + meta = [] + pq.write_table(table, str(tempdir / filename), + metadata_collector=meta) + meta[0].set_file_path(filename) + if _meta is None: + _meta = meta[0] + else: + _meta.append_row_groups(meta[0]) + + # Write merged metadata-only file + with open(metapath, "wb") as f: + _meta.write_metadata_file(f) + 
+ # Read back the metadata + meta = pq.read_metadata(metapath) + md = meta.to_dict() + _md = _meta.to_dict() + for key in _md: + if key != 'serialized_size': + assert _md[key] == md[key] + assert _md['num_columns'] == 3 + assert _md['num_rows'] == 6 + assert _md['num_row_groups'] == 2 + assert _md['serialized_size'] == 0 + assert md['serialized_size'] > 0 + + +def test_write_metadata(tempdir): + path = str(tempdir / "metadata") + schema = pa.schema([("a", "int64"), ("b", "float64")]) + + # write a pyarrow schema + pq.write_metadata(schema, path) + parquet_meta = pq.read_metadata(path) + schema_as_arrow = parquet_meta.schema.to_arrow_schema() + assert schema_as_arrow.equals(schema) + + # ARROW-8980: Check that the ARROW:schema metadata key was removed + if schema_as_arrow.metadata: + assert b'ARROW:schema' not in schema_as_arrow.metadata + + # pass through writer keyword arguments + for version in ["1.0", "2.0", "2.4", "2.6"]: + pq.write_metadata(schema, path, version=version) + parquet_meta = pq.read_metadata(path) + # The version is stored as a single integer in the Parquet metadata, + # so it cannot correctly express dotted format versions + expected_version = "1.0" if version == "1.0" else "2.6" + assert parquet_meta.format_version == expected_version + + # metadata_collector: list of FileMetaData objects + table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema) + pq.write_table(table, tempdir / "data.parquet") + parquet_meta = pq.read_metadata(str(tempdir / "data.parquet")) + pq.write_metadata( + schema, path, metadata_collector=[parquet_meta, parquet_meta] + ) + parquet_meta_mult = pq.read_metadata(path) + assert parquet_meta_mult.num_row_groups == 2 + + # append metadata with different schema raises an error + with pytest.raises(RuntimeError, match="requires equal schemas"): + pq.write_metadata( + pa.schema([("a", "int32"), ("b", "null")]), + path, metadata_collector=[parquet_meta, parquet_meta] + ) + + +def test_table_large_metadata(): + # ARROW-8694 + my_schema = pa.schema([pa.field('f0', 'double')], + metadata={'large': 'x' * 10000000}) + + table = pa.table([np.arange(10)], schema=my_schema) + _check_roundtrip(table) + + +@pytest.mark.pandas +def test_compare_schemas(): + df = alltypes_sample(size=10000) + + fileh = make_sample_file(df) + fileh2 = make_sample_file(df) + fileh3 = make_sample_file(df[df.columns[::2]]) + + # ParquetSchema + assert isinstance(fileh.schema, pq.ParquetSchema) + assert fileh.schema.equals(fileh.schema) + assert fileh.schema == fileh.schema + assert fileh.schema.equals(fileh2.schema) + assert fileh.schema == fileh2.schema + assert fileh.schema != 'arbitrary object' + assert not fileh.schema.equals(fileh3.schema) + assert fileh.schema != fileh3.schema + + # ColumnSchema + assert isinstance(fileh.schema[0], pq.ColumnSchema) + assert fileh.schema[0].equals(fileh.schema[0]) + assert fileh.schema[0] == fileh.schema[0] + assert not fileh.schema[0].equals(fileh.schema[1]) + assert fileh.schema[0] != fileh.schema[1] + assert fileh.schema[0] != 'arbitrary object' + + +@pytest.mark.pandas +def test_read_schema(tempdir): + N = 100 + df = pd.DataFrame({ + 'index': np.arange(N), + 'values': np.random.randn(N) + }, columns=['index', 'values']) + + data_path = tempdir / 'test.parquet' + + table = pa.Table.from_pandas(df) + _write_table(table, data_path) + + read1 = pq.read_schema(data_path) + read2 = pq.read_schema(data_path, memory_map=True) + assert table.schema.equals(read1) + assert table.schema.equals(read2) + + assert table.schema.metadata[b'pandas'] == 
read1.metadata[b'pandas']
+
+
+def test_parquet_metadata_empty_to_dict(tempdir):
+    # https://issues.apache.org/jira/browse/ARROW-10146
+    table = pa.table({"a": pa.array([], type="int64")})
+    pq.write_table(table, tempdir / "data.parquet")
+    metadata = pq.read_metadata(tempdir / "data.parquet")
+    # ensure this doesn't error / statistics set to None
+    metadata_dict = metadata.to_dict()
+    assert len(metadata_dict["row_groups"]) == 1
+    assert len(metadata_dict["row_groups"][0]["columns"]) == 1
+    assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None
+
+
+@pytest.mark.slow
+@pytest.mark.large_memory
+def test_metadata_exceeds_message_size():
+    # ARROW-13655: Thrift may enforce a default message size that limits
+    # the size of Parquet metadata that can be written.
+    NCOLS = 1000
+    NREPEATS = 4000
+
+    table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)})
+
+    with pa.BufferOutputStream() as out:
+        pq.write_table(table, out)
+        buf = out.getvalue()
+
+    original_metadata = pq.read_metadata(pa.BufferReader(buf))
+    metadata = pq.read_metadata(pa.BufferReader(buf))
+    for i in range(NREPEATS):
+        metadata.append_row_groups(original_metadata)
+
+    with pa.BufferOutputStream() as out:
+        metadata.write_metadata_file(out)
+        buf = out.getvalue()
+
+    metadata = pq.read_metadata(pa.BufferReader(buf))
diff --git a/src/arrow/python/pyarrow/tests/parquet/test_pandas.py b/src/arrow/python/pyarrow/tests/parquet/test_pandas.py
new file mode 100644
index 000000000..5fc1b7060
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/parquet/test_pandas.py
@@ -0,0 +1,687 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
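Several of the tests that follow inspect the b'pandas' key that Table.from_pandas() embeds in the file metadata. A sketch of where that key lives, with an illustrative frame (assumes pandas is installed):

```python
import json

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'a': [1, 2, 3]})
sink = pa.BufferOutputStream()
pq.write_table(pa.Table.from_pandas(df), sink)

# The file's key/value metadata carries the pandas schema as JSON.
key_value_meta = pq.read_metadata(pa.BufferReader(sink.getvalue())).metadata
pandas_meta = json.loads(key_value_meta[b'pandas'].decode('utf8'))
print(pandas_meta['index_columns'])  # a RangeIndex is stored as a dict
```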
+ +import io +import json + +import numpy as np +import pytest + +import pyarrow as pa +from pyarrow.fs import LocalFileSystem, SubTreeFileSystem +from pyarrow.tests.parquet.common import ( + parametrize_legacy_dataset, parametrize_legacy_dataset_not_supported) +from pyarrow.util import guid +from pyarrow.vendored.version import Version + +try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import (_read_table, _test_dataframe, + _write_table) +except ImportError: + pq = None + + +try: + import pandas as pd + import pandas.testing as tm + + from pyarrow.tests.parquet.common import (_roundtrip_pandas_dataframe, + alltypes_sample) +except ImportError: + pd = tm = None + + +pytestmark = pytest.mark.parquet + + +@pytest.mark.pandas +def test_pandas_parquet_custom_metadata(tempdir): + df = alltypes_sample(size=10000) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + assert b'pandas' in arrow_table.schema.metadata + + _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms') + + metadata = pq.read_metadata(filename).metadata + assert b'pandas' in metadata + + js = json.loads(metadata[b'pandas'].decode('utf8')) + assert js['index_columns'] == [{'kind': 'range', + 'name': None, + 'start': 0, 'stop': 10000, + 'step': 1}] + + +@pytest.mark.pandas +def test_merging_parquet_tables_with_different_pandas_metadata(tempdir): + # ARROW-3728: Merging Parquet Files - Pandas Meta in Schema Mismatch + schema = pa.schema([ + pa.field('int', pa.int16()), + pa.field('float', pa.float32()), + pa.field('string', pa.string()) + ]) + df1 = pd.DataFrame({ + 'int': np.arange(3, dtype=np.uint8), + 'float': np.arange(3, dtype=np.float32), + 'string': ['ABBA', 'EDDA', 'ACDC'] + }) + df2 = pd.DataFrame({ + 'int': [4, 5], + 'float': [1.1, None], + 'string': [None, None] + }) + table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False) + table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False) + + assert not table1.schema.equals(table2.schema, check_metadata=True) + assert table1.schema.equals(table2.schema) + + writer = pq.ParquetWriter(tempdir / 'merged.parquet', schema=schema) + writer.write_table(table1) + writer.write_table(table2) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_column_multiindex(tempdir, use_legacy_dataset): + df = alltypes_sample(size=10) + df.columns = pd.MultiIndex.from_tuples( + list(zip(df.columns, df.columns[::-1])), + names=['level_1', 'level_2'] + ) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + assert arrow_table.schema.pandas_metadata is not None + + _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms') + + table_read = pq.read_pandas( + filename, use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written( + tempdir, use_legacy_dataset +): + df = alltypes_sample(size=10000) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + js = arrow_table.schema.pandas_metadata + assert not js['index_columns'] + # ARROW-2170 + # While index_columns should be empty, columns needs to be filled still. 
+ assert js['columns'] + + _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms') + table_read = pq.read_pandas( + filename, use_legacy_dataset=use_legacy_dataset) + + js = table_read.schema.pandas_metadata + assert not js['index_columns'] + + read_metadata = table_read.schema.metadata + assert arrow_table.schema.metadata == read_metadata + + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + +# TODO(dataset) duplicate column selection actually gives duplicate columns now +@pytest.mark.pandas +@parametrize_legacy_dataset_not_supported +def test_pandas_column_selection(tempdir, use_legacy_dataset): + size = 10000 + np.random.seed(0) + df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16) + }) + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + _write_table(arrow_table, filename) + table_read = _read_table( + filename, columns=['uint8'], use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + + tm.assert_frame_equal(df[['uint8']], df_read) + + # ARROW-4267: Selection of duplicate columns still leads to these columns + # being read uniquely. + table_read = _read_table( + filename, columns=['uint8', 'uint8'], + use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + + tm.assert_frame_equal(df[['uint8']], df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset): + df = _test_dataframe(10000) + arrow_table = pa.Table.from_pandas(df) + imos = pa.BufferOutputStream() + _write_table(arrow_table, imos, version='2.6') + buf = imos.getvalue() + reader = pa.BufferReader(buf) + df_read = _read_table( + reader, use_legacy_dataset=use_legacy_dataset).to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_read_pandas_column_subset(tempdir, use_legacy_dataset): + df = _test_dataframe(10000) + arrow_table = pa.Table.from_pandas(df) + imos = pa.BufferOutputStream() + _write_table(arrow_table, imos, version='2.6') + buf = imos.getvalue() + reader = pa.BufferReader(buf) + df_read = pq.read_pandas( + reader, columns=['strings', 'uint8'], + use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(df[['strings', 'uint8']], df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset): + df = _test_dataframe(0) + arrow_table = pa.Table.from_pandas(df) + imos = pa.BufferOutputStream() + _write_table(arrow_table, imos, version='2.6') + buf = imos.getvalue() + reader = pa.BufferReader(buf) + df_read = _read_table( + reader, use_legacy_dataset=use_legacy_dataset).to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +def test_pandas_can_write_nested_data(tempdir): + data = { + "agg_col": [ + {"page_type": 1}, + {"record_type": 1}, + {"non_consecutive_home": 0}, + ], + "uid_first": "1001" + } + df = pd.DataFrame(data=data) + arrow_table = pa.Table.from_pandas(df) + imos = pa.BufferOutputStream() + # This succeeds under V2 + _write_table(arrow_table, imos) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset): + filename = tempdir / 'pandas_pyfile_roundtrip.parquet' + size = 5 + df = pd.DataFrame({ + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, 
dtype=np.float64), + 'bool': np.random.randn(size) > 0, + 'strings': ['foo', 'bar', None, 'baz', 'qux'] + }) + + arrow_table = pa.Table.from_pandas(df) + + with filename.open('wb') as f: + _write_table(arrow_table, f, version="1.0") + + data = io.BytesIO(filename.read_bytes()) + + table_read = _read_table(data, use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset): + size = 10000 + np.random.seed(0) + df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0 + }) + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + + for use_dictionary in [True, False]: + _write_table(arrow_table, filename, version='2.6', + use_dictionary=use_dictionary) + table_read = _read_table( + filename, use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + for write_statistics in [True, False]: + _write_table(arrow_table, filename, version='2.6', + write_statistics=write_statistics) + table_read = _read_table(filename, + use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']: + if (compression != 'NONE' and + not pa.lib.Codec.is_available(compression)): + continue + _write_table(arrow_table, filename, version='2.6', + compression=compression) + table_read = _read_table( + filename, use_legacy_dataset=use_legacy_dataset) + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + +@pytest.mark.pandas +def test_spark_flavor_preserves_pandas_metadata(): + df = _test_dataframe(size=100) + df.index = np.arange(0, 10 * len(df), 10) + df.index.name = 'foo' + + result = _roundtrip_pandas_dataframe(df, {'version': '2.0', + 'flavor': 'spark'}) + tm.assert_frame_equal(result, df) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_index_column_name_duplicate(tempdir, use_legacy_dataset): + data = { + 'close': { + pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998, + pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998, + }, + 'time': { + pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp( + '2017-06-30 01:31:00' + ), + pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp( + '2017-06-30 01:32:00' + ), + } + } + path = str(tempdir / 'data.parquet') + dfx = pd.DataFrame(data).set_index('time', drop=False) + tdfx = pa.Table.from_pandas(dfx) + _write_table(tdfx, path) + arrow_table = _read_table(path, use_legacy_dataset=use_legacy_dataset) + result_df = arrow_table.to_pandas() + tm.assert_frame_equal(result_df, dfx) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_multiindex_duplicate_values(tempdir, use_legacy_dataset): + num_rows = 3 + numbers = list(range(num_rows)) + index = pd.MultiIndex.from_arrays( + [['foo', 'foo', 'bar'], numbers], + names=['foobar', 'some_numbers'], + ) + + df = pd.DataFrame({'numbers': numbers}, index=index) + 
table = pa.Table.from_pandas(df) + + filename = tempdir / 'dup_multi_index_levels.parquet' + + _write_table(table, filename) + result_table = _read_table(filename, use_legacy_dataset=use_legacy_dataset) + assert table.equals(result_table) + + result_df = result_table.to_pandas() + tm.assert_frame_equal(result_df, df) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_backwards_compatible_index_naming(datadir, use_legacy_dataset): + expected_string = b"""\ +carat cut color clarity depth table price x y z + 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43 + 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31 + 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31 + 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63 + 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75 + 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48 + 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47 + 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53 + 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49 + 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39""" + expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}', + index_col=None, header=0, engine='python') + table = _read_table( + datadir / 'v0.7.1.parquet', use_legacy_dataset=use_legacy_dataset) + result = table.to_pandas() + tm.assert_frame_equal(result, expected) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_backwards_compatible_index_multi_level_named( + datadir, use_legacy_dataset +): + expected_string = b"""\ +carat cut color clarity depth table price x y z + 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43 + 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31 + 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31 + 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63 + 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75 + 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48 + 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47 + 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53 + 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49 + 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39""" + expected = pd.read_csv( + io.BytesIO(expected_string), sep=r'\s{2,}', + index_col=['cut', 'color', 'clarity'], + header=0, engine='python' + ).sort_index() + + table = _read_table(datadir / 'v0.7.1.all-named-index.parquet', + use_legacy_dataset=use_legacy_dataset) + result = table.to_pandas() + tm.assert_frame_equal(result, expected) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_backwards_compatible_index_multi_level_some_named( + datadir, use_legacy_dataset +): + expected_string = b"""\ +carat cut color clarity depth table price x y z + 0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43 + 0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31 + 0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31 + 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63 + 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75 + 0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48 + 0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47 + 0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53 + 0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49 + 0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39""" + expected = pd.read_csv( + io.BytesIO(expected_string), + sep=r'\s{2,}', index_col=['cut', 'color', 'clarity'], + header=0, engine='python' + ).sort_index() + expected.index = expected.index.set_names(['cut', None, 'clarity']) + + table = _read_table(datadir / 'v0.7.1.some-named-index.parquet', + use_legacy_dataset=use_legacy_dataset) + result = table.to_pandas() + tm.assert_frame_equal(result, expected) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def 
test_backwards_compatible_column_metadata_handling( + datadir, use_legacy_dataset +): + expected = pd.DataFrame( + {'a': [1, 2, 3], 'b': [.1, .2, .3], + 'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')}) + expected.index = pd.MultiIndex.from_arrays( + [['a', 'b', 'c'], + pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')], + names=['index', None]) + + path = datadir / 'v0.7.1.column-metadata-handling.parquet' + table = _read_table(path, use_legacy_dataset=use_legacy_dataset) + result = table.to_pandas() + tm.assert_frame_equal(result, expected) + + table = _read_table( + path, columns=['a'], use_legacy_dataset=use_legacy_dataset) + result = table.to_pandas() + tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True)) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_categorical_index_survives_roundtrip(use_legacy_dataset): + # ARROW-3652, addressed by ARROW-3246 + df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2']) + df['c1'] = df['c1'].astype('category') + df = df.set_index(['c1']) + + table = pa.Table.from_pandas(df) + bos = pa.BufferOutputStream() + pq.write_table(table, bos) + ref_df = pq.read_pandas( + bos.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas() + assert isinstance(ref_df.index, pd.CategoricalIndex) + assert ref_df.index.equals(df.index) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_categorical_order_survives_roundtrip(use_legacy_dataset): + # ARROW-6302 + df = pd.DataFrame({"a": pd.Categorical( + ["a", "b", "c", "a"], categories=["b", "c", "d"], ordered=True)}) + + table = pa.Table.from_pandas(df) + bos = pa.BufferOutputStream() + pq.write_table(table, bos) + + contents = bos.getvalue() + result = pq.read_pandas( + contents, use_legacy_dataset=use_legacy_dataset).to_pandas() + + tm.assert_frame_equal(result, df) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_categorical_na_type_row_groups(use_legacy_dataset): + # ARROW-5085 + df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100}) + df_category = df.astype({"col": "category", "int": "category"}) + table = pa.Table.from_pandas(df) + table_cat = pa.Table.from_pandas(df_category) + buf = pa.BufferOutputStream() + + # it works + pq.write_table(table_cat, buf, version='2.6', chunk_size=10) + result = pq.read_table( + buf.getvalue(), use_legacy_dataset=use_legacy_dataset) + + # Result is non-categorical + assert result[0].equals(table[0]) + assert result[1].equals(table[1]) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_pandas_categorical_roundtrip(use_legacy_dataset): + # ARROW-5480, this was enabled by ARROW-3246 + + # Have one of the categories unobserved and include a null (-1) + codes = np.array([2, 0, 0, 2, 0, -1, 2], dtype='int32') + categories = ['foo', 'bar', 'baz'] + df = pd.DataFrame({'x': pd.Categorical.from_codes( + codes, categories=categories)}) + + buf = pa.BufferOutputStream() + pq.write_table(pa.table(df), buf) + + result = pq.read_table( + buf.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas() + assert result.x.dtype == 'category' + assert (result.x.cat.categories == categories).all() + tm.assert_frame_equal(result, df) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_pandas_preserve_extensiondtypes( + tempdir, use_legacy_dataset +): + # ARROW-8251 - preserve pandas extension dtypes in roundtrip + if Version(pd.__version__) < Version("1.0.0"): + pytest.skip("__arrow_array__ added to pandas in 1.0.0") + + df = 
pd.DataFrame({'part': 'a', "col": [1, 2, 3]}) + df['col'] = df['col'].astype("Int64") + table = pa.table(df) + + pq.write_to_dataset( + table, str(tempdir / "case1"), partition_cols=['part'], + use_legacy_dataset=use_legacy_dataset + ) + result = pq.read_table( + str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result[["col"]], df[["col"]]) + + pq.write_to_dataset( + table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset + ) + result = pq.read_table( + str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result[["col"]], df[["col"]]) + + pq.write_table(table, str(tempdir / "data.parquet")) + result = pq.read_table( + str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result[["col"]], df[["col"]]) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_pandas_preserve_index(tempdir, use_legacy_dataset): + # ARROW-8251 - preserve pandas index in roundtrip + + df = pd.DataFrame({'part': ['a', 'a', 'b'], "col": [1, 2, 3]}) + df.index = pd.Index(['a', 'b', 'c'], name="idx") + table = pa.table(df) + df_cat = df[["col", "part"]].copy() + df_cat["part"] = df_cat["part"].astype("category") + + pq.write_to_dataset( + table, str(tempdir / "case1"), partition_cols=['part'], + use_legacy_dataset=use_legacy_dataset + ) + result = pq.read_table( + str(tempdir / "case1"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result, df_cat) + + pq.write_to_dataset( + table, str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset + ) + result = pq.read_table( + str(tempdir / "case2"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result, df) + + pq.write_table(table, str(tempdir / "data.parquet")) + result = pq.read_table( + str(tempdir / "data.parquet"), use_legacy_dataset=use_legacy_dataset + ).to_pandas() + tm.assert_frame_equal(result, df) + + +@pytest.mark.pandas +@pytest.mark.parametrize('preserve_index', [True, False, None]) +def test_dataset_read_pandas_common_metadata(tempdir, preserve_index): + # ARROW-1103 + nfiles = 5 + size = 5 + + dirpath = tempdir / guid() + dirpath.mkdir() + + test_data = [] + frames = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(size, seed=i) + df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index') + + path = dirpath / '{}.parquet'.format(i) + + table = pa.Table.from_pandas(df, preserve_index=preserve_index) + + # Obliterate metadata + table = table.replace_schema_metadata(None) + assert table.schema.metadata is None + + _write_table(table, path) + test_data.append(table) + frames.append(df) + paths.append(path) + + # Write _metadata common file + table_for_metadata = pa.Table.from_pandas( + df, preserve_index=preserve_index + ) + pq.write_metadata(table_for_metadata.schema, dirpath / '_metadata') + + dataset = pq.ParquetDataset(dirpath) + columns = ['uint8', 'strings'] + result = dataset.read_pandas(columns=columns).to_pandas() + expected = pd.concat([x[columns] for x in frames]) + expected.index.name = ( + df.index.name if preserve_index is not False else None) + tm.assert_frame_equal(result, expected) + + +@pytest.mark.pandas +def test_read_pandas_passthrough_keywords(tempdir): + # ARROW-11464 - previously not all keywords were passed through (such as + # the filesystem keyword) + df = pd.DataFrame({'a': [1, 2, 3]}) + + filename = tempdir / 'data.parquet' + _write_table(df, 
filename) + + result = pq.read_pandas( + 'data.parquet', + filesystem=SubTreeFileSystem(str(tempdir), LocalFileSystem()) + ) + assert result.equals(pa.table(df)) diff --git a/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py b/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py new file mode 100644 index 000000000..3a4d4aaa2 --- /dev/null +++ b/src/arrow/python/pyarrow/tests/parquet/test_parquet_file.py @@ -0,0 +1,276 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import io +import os + +import pytest + +import pyarrow as pa + +try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import _write_table +except ImportError: + pq = None + +try: + import pandas as pd + import pandas.testing as tm + + from pyarrow.tests.parquet.common import alltypes_sample +except ImportError: + pd = tm = None + +pytestmark = pytest.mark.parquet + + +@pytest.mark.pandas +def test_pass_separate_metadata(): + # ARROW-471 + df = alltypes_sample(size=10000) + + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, compression='snappy', version='2.6') + + buf.seek(0) + metadata = pq.read_metadata(buf) + + buf.seek(0) + + fileh = pq.ParquetFile(buf, metadata=metadata) + + tm.assert_frame_equal(df, fileh.read().to_pandas()) + + +@pytest.mark.pandas +def test_read_single_row_group(): + # ARROW-471 + N, K = 10000, 4 + df = alltypes_sample(size=N) + + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + + pf = pq.ParquetFile(buf) + + assert pf.num_row_groups == K + + row_groups = [pf.read_row_group(i) for i in range(K)] + result = pa.concat_tables(row_groups) + tm.assert_frame_equal(df, result.to_pandas()) + + +@pytest.mark.pandas +def test_read_single_row_group_with_column_subset(): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + pf = pq.ParquetFile(buf) + + cols = list(df.columns[:2]) + row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)] + result = pa.concat_tables(row_groups) + tm.assert_frame_equal(df[cols], result.to_pandas()) + + # ARROW-4267: Selection of duplicate columns still leads to these columns + # being read uniquely. 
+ row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)] + result = pa.concat_tables(row_groups) + tm.assert_frame_equal(df[cols], result.to_pandas()) + + +@pytest.mark.pandas +def test_read_multiple_row_groups(): + N, K = 10000, 4 + df = alltypes_sample(size=N) + + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + + pf = pq.ParquetFile(buf) + + assert pf.num_row_groups == K + + result = pf.read_row_groups(range(K)) + tm.assert_frame_equal(df, result.to_pandas()) + + +@pytest.mark.pandas +def test_read_multiple_row_groups_with_column_subset(): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + pf = pq.ParquetFile(buf) + + cols = list(df.columns[:2]) + result = pf.read_row_groups(range(K), columns=cols) + tm.assert_frame_equal(df[cols], result.to_pandas()) + + # ARROW-4267: Selection of duplicate columns still leads to these columns + # being read uniquely. + result = pf.read_row_groups(range(K), columns=cols + cols) + tm.assert_frame_equal(df[cols], result.to_pandas()) + + +@pytest.mark.pandas +def test_scan_contents(): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + pf = pq.ParquetFile(buf) + + assert pf.scan_contents() == 10000 + assert pf.scan_contents(df.columns[:4]) == 10000 + + +def test_parquet_file_pass_directory_instead_of_file(tempdir): + # ARROW-7208 + path = tempdir / 'directory' + os.mkdir(str(path)) + + with pytest.raises(IOError, match="Expected file path"): + pq.ParquetFile(path) + + +def test_read_column_invalid_index(): + table = pa.table([pa.array([4, 5]), pa.array(["foo", "bar"])], + names=['ints', 'strs']) + bio = pa.BufferOutputStream() + pq.write_table(table, bio) + f = pq.ParquetFile(bio.getvalue()) + assert f.reader.read_column(0).to_pylist() == [4, 5] + assert f.reader.read_column(1).to_pylist() == ["foo", "bar"] + for index in (-1, 2): + with pytest.raises((ValueError, IndexError)): + f.reader.read_column(index) + + +@pytest.mark.pandas +@pytest.mark.parametrize('batch_size', [300, 1000, 1300]) +def test_iter_batches_columns_reader(tempdir, batch_size): + total_size = 3000 + chunk_size = 1000 + # TODO: Add categorical support + df = alltypes_sample(size=total_size) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + _write_table(arrow_table, filename, version='2.6', + coerce_timestamps='ms', chunk_size=chunk_size) + + file_ = pq.ParquetFile(filename) + for columns in [df.columns[:10], df.columns[10:]]: + batches = file_.iter_batches(batch_size=batch_size, columns=columns) + batch_starts = range(0, total_size+batch_size, batch_size) + for batch, start in zip(batches, batch_starts): + end = min(total_size, start + batch_size) + tm.assert_frame_equal( + batch.to_pandas(), + df.iloc[start:end, :].loc[:, columns].reset_index(drop=True) + ) + + +@pytest.mark.pandas +@pytest.mark.parametrize('chunk_size', [1000]) +def test_iter_batches_reader(tempdir, chunk_size): + df = alltypes_sample(size=10000, categorical=True) + + filename = tempdir / 'pandas_roundtrip.parquet' + arrow_table = pa.Table.from_pandas(df) + assert arrow_table.schema.pandas_metadata is not None + + 
_write_table(arrow_table, filename, version='2.6', + coerce_timestamps='ms', chunk_size=chunk_size) + + file_ = pq.ParquetFile(filename) + + def get_all_batches(f): + for row_group in range(f.num_row_groups): + batches = f.iter_batches( + batch_size=900, + row_groups=[row_group], + ) + + for batch in batches: + yield batch + + batches = list(get_all_batches(file_)) + batch_no = 0 + + for i in range(file_.num_row_groups): + tm.assert_frame_equal( + batches[batch_no].to_pandas(), + file_.read_row_groups([i]).to_pandas().head(900) + ) + + batch_no += 1 + + tm.assert_frame_equal( + batches[batch_no].to_pandas().reset_index(drop=True), + file_.read_row_groups([i]).to_pandas().iloc[900:].reset_index( + drop=True + ) + ) + + batch_no += 1 + + +@pytest.mark.pandas +@pytest.mark.parametrize('pre_buffer', [False, True]) +def test_pre_buffer(pre_buffer): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.6') + + buf.seek(0) + pf = pq.ParquetFile(buf, pre_buffer=pre_buffer) + assert pf.read().num_rows == N diff --git a/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py b/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py new file mode 100644 index 000000000..9be7634c8 --- /dev/null +++ b/src/arrow/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -0,0 +1,278 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
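The writer tests that follow all build on the incremental ParquetWriter pattern. A minimal sketch, with an illustrative schema and batches:

```python
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([('i', pa.int64())])
sink = pa.BufferOutputStream()

# Each write_table() call appends one or more row groups; the footer is
# only written when the writer closes (here, on context-manager exit).
with pq.ParquetWriter(sink, schema, version='2.6') as writer:
    for start in (0, 10, 20):
        writer.write_table(pa.table({'i': list(range(start, start + 10))}))

assert pq.read_table(pa.BufferReader(sink.getvalue())).num_rows == 30
```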
+ +import pytest + +import pyarrow as pa +from pyarrow import fs +from pyarrow.filesystem import FileSystem, LocalFileSystem +from pyarrow.tests.parquet.common import parametrize_legacy_dataset + +try: + import pyarrow.parquet as pq + from pyarrow.tests.parquet.common import _read_table, _test_dataframe +except ImportError: + pq = None + + +try: + import pandas as pd + import pandas.testing as tm + +except ImportError: + pd = tm = None + +pytestmark = pytest.mark.parquet + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_parquet_incremental_file_build(tempdir, use_legacy_dataset): + df = _test_dataframe(100) + df['unique_id'] = 0 + + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + out = pa.BufferOutputStream() + + writer = pq.ParquetWriter(out, arrow_table.schema, version='2.6') + + frames = [] + for i in range(10): + df['unique_id'] = i + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + writer.write_table(arrow_table) + + frames.append(df.copy()) + + writer.close() + + buf = out.getvalue() + result = _read_table( + pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset) + + expected = pd.concat(frames, ignore_index=True) + tm.assert_frame_equal(result.to_pandas(), expected) + + +def test_validate_schema_write_table(tempdir): + # ARROW-2926 + simple_fields = [ + pa.field('POS', pa.uint32()), + pa.field('desc', pa.string()) + ] + + simple_schema = pa.schema(simple_fields) + + # simple_table schema does not match simple_schema + simple_from_array = [pa.array([1]), pa.array(['bla'])] + simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc']) + + path = tempdir / 'simple_validate_schema.parquet' + + with pq.ParquetWriter(path, simple_schema, + version='2.6', + compression='snappy', flavor='spark') as w: + with pytest.raises(ValueError): + w.write_table(simple_table) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_parquet_writer_context_obj(tempdir, use_legacy_dataset): + df = _test_dataframe(100) + df['unique_id'] = 0 + + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + out = pa.BufferOutputStream() + + with pq.ParquetWriter(out, arrow_table.schema, version='2.6') as writer: + + frames = [] + for i in range(10): + df['unique_id'] = i + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + writer.write_table(arrow_table) + + frames.append(df.copy()) + + buf = out.getvalue() + result = _read_table( + pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset) + + expected = pd.concat(frames, ignore_index=True) + tm.assert_frame_equal(result.to_pandas(), expected) + + +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_parquet_writer_context_obj_with_exception( + tempdir, use_legacy_dataset +): + df = _test_dataframe(100) + df['unique_id'] = 0 + + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + out = pa.BufferOutputStream() + error_text = 'Artificial Error' + + try: + with pq.ParquetWriter(out, + arrow_table.schema, + version='2.6') as writer: + + frames = [] + for i in range(10): + df['unique_id'] = i + arrow_table = pa.Table.from_pandas(df, preserve_index=False) + writer.write_table(arrow_table) + frames.append(df.copy()) + if i == 5: + raise ValueError(error_text) + except Exception as e: + assert str(e) == error_text + + buf = out.getvalue() + result = _read_table( + pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset) + + expected = pd.concat(frames, ignore_index=True) + tm.assert_frame_equal(result.to_pandas(), expected) + + +@pytest.mark.pandas 
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem._get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=filesystem, version='2.6'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.6'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(uri).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(uri, table.schema, version='2.6') as writer:
+        writer.write_table(table)
+
+    result = _read_table(path, filesystem=fs).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3fs(s3_example_s3fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, directory = s3_example_s3fs
+    path = directory + "/test.parquet"
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.6'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path, filesystem=fs).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    filesystem = fs.LocalFileSystem()
+
+    # Should raise ValueError when filesystem is passed with file-like object
+    with pytest.raises(ValueError, match="specified path is file-like"):
+        pq.ParquetWriter(
+            pa.BufferOutputStream(), table.schema, filesystem=filesystem
+        )
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
+    out = pa.BufferOutputStream()
+
+    class CustomFS(FileSystem):
+        def __init__(self):
+            self.path = None
+            self.mode = None
+
+        def open(self, path, mode='rb'):
+            self.path = path
+            self.mode = mode
+            return out
+
+    fs = CustomFS()
+    fname = 'expected_fname.parquet'
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \
+            as writer:
+        writer.write_table(table)
+
+    assert fs.path == fname
+    assert fs.mode == 'wb'
+    assert out.closed
+
+    buf = out.getvalue()
+    table_read = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df_read, df)
+
+    # Should raise ValueError when filesystem is passed with file-like object
+    with pytest.raises(ValueError) as err_info:
+        pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs)
+    expected_msg = ("filesystem passed but where is file-like, so"
+                    " there is nothing to open with filesystem.")
+    assert str(err_info.value) == expected_msg
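For completeness, a sketch of the happy path the filesystem tests above cover, passing an explicit new-style filesystem together with a string path (the temporary location is illustrative):

```python
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

table = pa.table({'a': [1, 2, 3]})
local = fs.LocalFileSystem()
path = tempfile.mkdtemp() + '/fs_example.parquet'

# The writer resolves the string path against the given filesystem.
with pq.ParquetWriter(path, table.schema,
                      filesystem=local, version='2.6') as writer:
    writer.write_table(table)

assert pq.read_table(path).num_rows == 3
# Combining a filesystem with a file-like object instead of a path is
# the error case asserted in the tests above.
```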