Diffstat (limited to 'src/arrow/python/pyarrow/tests/test_fs.py')
-rw-r--r--  src/arrow/python/pyarrow/tests/test_fs.py  1714
1 files changed, 1714 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/tests/test_fs.py b/src/arrow/python/pyarrow/tests/test_fs.py
new file mode 100644
index 000000000..48bdae8a5
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/test_fs.py
@@ -0,0 +1,1714 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timezone, timedelta
+import gzip
+import os
+import pathlib
+import pickle
+import subprocess
+import sys
+import time
+import weakref
+
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.test_io import assert_file_not_found
+from pyarrow.tests.util import _filesystem_uri, ProxyHandler
+from pyarrow.vendored.version import Version
+
+from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem,
+ LocalFileSystem, SubTreeFileSystem, _MockFileSystem,
+ FileSystemHandler, PyFileSystem, FSSpecHandler,
+ copy_files)
+
+
+class DummyHandler(FileSystemHandler):
+ def __init__(self, value=42):
+ self._value = value
+
+ def __eq__(self, other):
+ if isinstance(other, FileSystemHandler):
+ return self._value == other._value
+ return NotImplemented
+
+ def __ne__(self, other):
+ if isinstance(other, FileSystemHandler):
+ return self._value != other._value
+ return NotImplemented
+
+ def get_type_name(self):
+ return "dummy"
+
+ def normalize_path(self, path):
+ return path
+
+ def get_file_info(self, paths):
+ info = []
+ for path in paths:
+ if "file" in path:
+ info.append(FileInfo(path, FileType.File))
+ elif "dir" in path:
+ info.append(FileInfo(path, FileType.Directory))
+ elif "notfound" in path:
+ info.append(FileInfo(path, FileType.NotFound))
+ elif "badtype" in path:
+ # Will raise when converting
+ info.append(object())
+ else:
+ raise IOError
+ return info
+
+ def get_file_info_selector(self, selector):
+ if selector.base_dir != "somedir":
+ if selector.allow_not_found:
+ return []
+ else:
+ raise FileNotFoundError(selector.base_dir)
+ infos = [
+ FileInfo("somedir/file1", FileType.File, size=123),
+ FileInfo("somedir/subdir1", FileType.Directory),
+ ]
+ if selector.recursive:
+ infos += [
+ FileInfo("somedir/subdir1/file2", FileType.File, size=456),
+ ]
+ return infos
+
+ def create_dir(self, path, recursive):
+ if path == "recursive":
+ assert recursive is True
+ elif path == "non-recursive":
+ assert recursive is False
+ else:
+ raise IOError
+
+ def delete_dir(self, path):
+ assert path == "delete_dir"
+
+ def delete_dir_contents(self, path):
+ if not path.strip("/"):
+ raise ValueError
+ assert path == "delete_dir_contents"
+
+ def delete_root_dir_contents(self):
+ pass
+
+ def delete_file(self, path):
+ assert path == "delete_file"
+
+ def move(self, src, dest):
+ assert src == "move_from"
+ assert dest == "move_to"
+
+ def copy_file(self, src, dest):
+ assert src == "copy_file_from"
+ assert dest == "copy_file_to"
+
+ def open_input_stream(self, path):
+ if "notfound" in path:
+ raise FileNotFoundError(path)
+ data = "{0}:input_stream".format(path).encode('utf8')
+ return pa.BufferReader(data)
+
+ def open_input_file(self, path):
+ if "notfound" in path:
+ raise FileNotFoundError(path)
+ data = "{0}:input_file".format(path).encode('utf8')
+ return pa.BufferReader(data)
+
+ def open_output_stream(self, path, metadata):
+ if "notfound" in path:
+ raise FileNotFoundError(path)
+ return pa.BufferOutputStream()
+
+ def open_append_stream(self, path, metadata):
+ if "notfound" in path:
+ raise FileNotFoundError(path)
+ return pa.BufferOutputStream()
+
+
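+# A minimal sketch (not wired into the fixtures below) of how PyFileSystem
+# delegates to a handler: DummyHandler's canned responses surface directly
+# through the generic FileSystem API.
+def test_dummy_handler_delegation_sketch():
+    fs = PyFileSystem(DummyHandler())
+    assert fs.type_name == "py::dummy"
+    [info] = fs.get_file_info(["some/file"])
+    assert info.type == FileType.File
+
+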
+@pytest.fixture
+def localfs(request, tempdir):
+ return dict(
+ fs=LocalFileSystem(),
+ pathfn=lambda p: (tempdir / p).as_posix(),
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def py_localfs(request, tempdir):
+ return dict(
+ fs=PyFileSystem(ProxyHandler(LocalFileSystem())),
+ pathfn=lambda p: (tempdir / p).as_posix(),
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def mockfs(request):
+ return dict(
+ fs=_MockFileSystem(),
+ pathfn=lambda p: p,
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def py_mockfs(request):
+ return dict(
+ fs=PyFileSystem(ProxyHandler(_MockFileSystem())),
+ pathfn=lambda p: p,
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def localfs_with_mmap(request, tempdir):
+ return dict(
+ fs=LocalFileSystem(use_mmap=True),
+ pathfn=lambda p: (tempdir / p).as_posix(),
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def subtree_localfs(request, tempdir, localfs):
+ return dict(
+ fs=SubTreeFileSystem(str(tempdir), localfs['fs']),
+ pathfn=lambda p: p,
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def s3fs(request, s3_server):
+ request.config.pyarrow.requires('s3')
+ from pyarrow.fs import S3FileSystem
+
+ host, port, access_key, secret_key = s3_server['connection']
+ bucket = 'pyarrow-filesystem/'
+
+ fs = S3FileSystem(
+ access_key=access_key,
+ secret_key=secret_key,
+ endpoint_override='{}:{}'.format(host, port),
+ scheme='http'
+ )
+ fs.create_dir(bucket)
+
+ yield dict(
+ fs=fs,
+ pathfn=bucket.__add__,
+ allow_move_dir=False,
+ allow_append_to_file=False,
+ )
+ fs.delete_dir(bucket)
+
+
+@pytest.fixture
+def subtree_s3fs(request, s3fs):
+ prefix = 'pyarrow-filesystem/prefix/'
+ return dict(
+ fs=SubTreeFileSystem(prefix, s3fs['fs']),
+ pathfn=prefix.__add__,
+ allow_move_dir=False,
+ allow_append_to_file=False,
+ )
+
+
+_minio_limited_policy = """{
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Action": [
+ "s3:ListAllMyBuckets",
+ "s3:PutObject",
+ "s3:GetObject",
+ "s3:ListBucket",
+ "s3:PutObjectTagging",
+ "s3:DeleteObject",
+ "s3:GetObjectVersion"
+ ],
+ "Resource": [
+ "arn:aws:s3:::*"
+ ]
+ }
+ ]
+}"""
+
+
+def _run_mc_command(mcdir, *args):
+ full_args = ['mc', '-C', mcdir] + list(args)
+ proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, encoding='utf-8')
+    # communicate() drains both pipes before waiting, avoiding the deadlock
+    # that wait() can hit once a pipe buffer fills up
+    stdout, stderr = proc.communicate(timeout=10)
+    cmd_str = ' '.join(full_args)
+    print(f'Cmd: {cmd_str}')
+    print(f'  Return: {proc.returncode}')
+    print(f'  Stdout: {stdout}')
+    print(f'  Stderr: {stderr}')
+    if proc.returncode != 0:
+        raise ChildProcessError("Could not run mc")
+
+
+def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
+ start = time.time()
+ while time.time() - start < 10:
+ try:
+ _run_mc_command(mcdir, 'alias', 'set', 'myminio',
+ f'http://{address}', access_key, secret_key)
+ return
+ except ChildProcessError:
+ time.sleep(1)
+ raise Exception("mc command could not connect to local minio")
+
+
+def _configure_limited_user(tmpdir, address, access_key, secret_key):
+ """
+ Attempts to use the mc command to configure the minio server
+ with a special user limited:limited123 which does not have
+ permission to create buckets. This mirrors some real life S3
+ configurations where users are given strict permissions.
+
+ Arrow S3 operations should still work in such a configuration
+ (e.g. see ARROW-13685)
+ """
+ try:
+ mcdir = os.path.join(tmpdir, 'mc')
+ os.mkdir(mcdir)
+ policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json')
+ with open(policy_path, mode='w') as policy_file:
+ policy_file.write(_minio_limited_policy)
+ # The s3_server fixture starts the minio process but
+ # it takes a few moments for the process to become available
+ _wait_for_minio_startup(mcdir, address, access_key, secret_key)
+        # These commands create a limited user with a specific
+        # policy and a sample bucket for that user to write to
+ _run_mc_command(mcdir, 'admin', 'policy', 'add',
+ 'myminio/', 'no-create-buckets', policy_path)
+ _run_mc_command(mcdir, 'admin', 'user', 'add',
+ 'myminio/', 'limited', 'limited123')
+ _run_mc_command(mcdir, 'admin', 'policy', 'set',
+ 'myminio', 'no-create-buckets', 'user=limited')
+ _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket')
+ return True
+ except FileNotFoundError:
+ # If mc is not found, skip these tests
+ return False
+
+
+@pytest.fixture(scope='session')
+def limited_s3_user(request, s3_server):
+ if sys.platform == 'win32':
+        # Can't rely on the FileNotFoundError check because
+        # Windows sometimes has an unrelated mc command
+        # that is not the minio mc
+ pytest.skip('The mc command is not installed on Windows')
+ request.config.pyarrow.requires('s3')
+ tempdir = s3_server['tempdir']
+ host, port, access_key, secret_key = s3_server['connection']
+ address = '{}:{}'.format(host, port)
+ if not _configure_limited_user(tempdir, address, access_key, secret_key):
+ pytest.skip('Could not locate mc command to configure limited user')
+
+
+@pytest.fixture
+def hdfs(request, hdfs_connection):
+ request.config.pyarrow.requires('hdfs')
+ if not pa.have_libhdfs():
+ pytest.skip('Cannot locate libhdfs')
+
+ from pyarrow.fs import HadoopFileSystem
+
+ host, port, user = hdfs_connection
+ fs = HadoopFileSystem(host, port=port, user=user)
+
+ return dict(
+ fs=fs,
+ pathfn=lambda p: p,
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def py_fsspec_localfs(request, tempdir):
+ fsspec = pytest.importorskip("fsspec")
+ fs = fsspec.filesystem('file')
+ return dict(
+ fs=PyFileSystem(FSSpecHandler(fs)),
+ pathfn=lambda p: (tempdir / p).as_posix(),
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def py_fsspec_memoryfs(request, tempdir):
+ fsspec = pytest.importorskip("fsspec", minversion="0.7.5")
+ if fsspec.__version__ == "0.8.5":
+ # see https://issues.apache.org/jira/browse/ARROW-10934
+ pytest.skip("Bug in fsspec 0.8.5 for in-memory filesystem")
+ fs = fsspec.filesystem('memory')
+ return dict(
+ fs=PyFileSystem(FSSpecHandler(fs)),
+ pathfn=lambda p: p,
+ allow_move_dir=True,
+ allow_append_to_file=True,
+ )
+
+
+@pytest.fixture
+def py_fsspec_s3fs(request, s3_server):
+ s3fs = pytest.importorskip("s3fs")
+ if (sys.version_info < (3, 7) and
+ Version(s3fs.__version__) >= Version("0.5")):
+ pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7")
+
+ host, port, access_key, secret_key = s3_server['connection']
+ bucket = 'pyarrow-filesystem/'
+
+ fs = s3fs.S3FileSystem(
+ key=access_key,
+ secret=secret_key,
+ client_kwargs=dict(endpoint_url='http://{}:{}'.format(host, port))
+ )
+ fs = PyFileSystem(FSSpecHandler(fs))
+ fs.create_dir(bucket)
+
+ yield dict(
+ fs=fs,
+ pathfn=bucket.__add__,
+ allow_move_dir=False,
+ allow_append_to_file=True,
+ )
+ fs.delete_dir(bucket)
+
+
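+# An illustrative sketch (assumes fsspec is installed): FSSpecHandler adapts
+# an arbitrary fsspec filesystem to the pyarrow FileSystem API, shown here
+# with fsspec's in-memory filesystem.
+def test_fsspec_handler_roundtrip_sketch():
+    fsspec = pytest.importorskip("fsspec", minversion="0.7.5")
+    fs = PyFileSystem(FSSpecHandler(fsspec.filesystem('memory')))
+    with fs.open_output_stream('sketch.txt') as f:
+        f.write(b'hello')
+    with fs.open_input_stream('sketch.txt') as f:
+        assert f.read() == b'hello'
+
+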
+@pytest.fixture(params=[
+ pytest.param(
+ pytest.lazy_fixture('localfs'),
+ id='LocalFileSystem()'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('localfs_with_mmap'),
+ id='LocalFileSystem(use_mmap=True)'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('subtree_localfs'),
+ id='SubTreeFileSystem(LocalFileSystem())'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('s3fs'),
+ id='S3FileSystem',
+ marks=pytest.mark.s3
+ ),
+ pytest.param(
+ pytest.lazy_fixture('hdfs'),
+ id='HadoopFileSystem',
+ marks=pytest.mark.hdfs
+ ),
+ pytest.param(
+ pytest.lazy_fixture('mockfs'),
+ id='_MockFileSystem()'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('py_localfs'),
+ id='PyFileSystem(ProxyHandler(LocalFileSystem()))'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('py_mockfs'),
+ id='PyFileSystem(ProxyHandler(_MockFileSystem()))'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('py_fsspec_localfs'),
+ id='PyFileSystem(FSSpecHandler(fsspec.LocalFileSystem()))'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('py_fsspec_memoryfs'),
+ id='PyFileSystem(FSSpecHandler(fsspec.filesystem("memory")))'
+ ),
+ pytest.param(
+ pytest.lazy_fixture('py_fsspec_s3fs'),
+ id='PyFileSystem(FSSpecHandler(s3fs.S3FileSystem()))',
+ marks=pytest.mark.s3
+ ),
+])
+def filesystem_config(request):
+ return request.param
+
+
+@pytest.fixture
+def fs(request, filesystem_config):
+ return filesystem_config['fs']
+
+
+@pytest.fixture
+def pathfn(request, filesystem_config):
+ return filesystem_config['pathfn']
+
+
+@pytest.fixture
+def allow_move_dir(request, filesystem_config):
+ return filesystem_config['allow_move_dir']
+
+
+@pytest.fixture
+def allow_append_to_file(request, filesystem_config):
+ return filesystem_config['allow_append_to_file']
+
+
+def check_mtime(file_info):
+ assert isinstance(file_info.mtime, datetime)
+ assert isinstance(file_info.mtime_ns, int)
+ assert file_info.mtime_ns >= 0
+ assert file_info.mtime_ns == pytest.approx(
+ file_info.mtime.timestamp() * 1e9)
+ # It's an aware UTC datetime
+ tzinfo = file_info.mtime.tzinfo
+ assert tzinfo is not None
+ assert tzinfo.utcoffset(None) == timedelta(0)
+
+
+def check_mtime_absent(file_info):
+ assert file_info.mtime is None
+ assert file_info.mtime_ns is None
+
+
+def check_mtime_or_absent(file_info):
+ if file_info.mtime is None:
+ check_mtime_absent(file_info)
+ else:
+ check_mtime(file_info)
+
+
+def skip_fsspec_s3fs(fs):
+ if fs.type_name == "py::fsspec+s3":
+ pytest.xfail(reason="Not working with fsspec's s3fs")
+
+
+@pytest.mark.s3
+def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user):
+ from pyarrow.fs import S3FileSystem
+
+ host, port, _, _ = s3_server['connection']
+
+ fs = S3FileSystem(
+ access_key='limited',
+ secret_key='limited123',
+ endpoint_override='{}:{}'.format(host, port),
+ scheme='http'
+ )
+ fs.create_dir('existing-bucket/test')
+
+
+def test_file_info_constructor():
+ dt = datetime.fromtimestamp(1568799826, timezone.utc)
+
+ info = FileInfo("foo/bar")
+ assert info.path == "foo/bar"
+ assert info.base_name == "bar"
+ assert info.type == FileType.Unknown
+ assert info.size is None
+ check_mtime_absent(info)
+
+ info = FileInfo("foo/baz.txt", type=FileType.File, size=123,
+ mtime=1568799826.5)
+ assert info.path == "foo/baz.txt"
+ assert info.base_name == "baz.txt"
+ assert info.type == FileType.File
+ assert info.size == 123
+ assert info.mtime_ns == 1568799826500000000
+ check_mtime(info)
+
+ info = FileInfo("foo", type=FileType.Directory, mtime=dt)
+ assert info.path == "foo"
+ assert info.base_name == "foo"
+ assert info.type == FileType.Directory
+ assert info.size is None
+ assert info.mtime == dt
+ assert info.mtime_ns == 1568799826000000000
+ check_mtime(info)
+
+
+def test_cannot_instantiate_base_filesystem():
+ with pytest.raises(TypeError):
+ FileSystem()
+
+
+def test_filesystem_equals():
+ fs0 = LocalFileSystem()
+ fs1 = LocalFileSystem()
+ fs2 = _MockFileSystem()
+
+ assert fs0.equals(fs0)
+ assert fs0.equals(fs1)
+ with pytest.raises(TypeError):
+ fs0.equals('string')
+ assert fs0 == fs0 == fs1
+ assert fs0 != 4
+
+ assert fs2 == fs2
+ assert fs2 != _MockFileSystem()
+
+ assert SubTreeFileSystem('/base', fs0) == SubTreeFileSystem('/base', fs0)
+ assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/base', fs2)
+ assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/other', fs0)
+
+
+def test_subtree_filesystem():
+ localfs = LocalFileSystem()
+
+ subfs = SubTreeFileSystem('/base', localfs)
+ assert subfs.base_path == '/base/'
+ assert subfs.base_fs == localfs
+ assert repr(subfs).startswith('SubTreeFileSystem(base_path=/base/, '
+ 'base_fs=<pyarrow._fs.LocalFileSystem')
+
+ subfs = SubTreeFileSystem('/another/base/', LocalFileSystem())
+ assert subfs.base_path == '/another/base/'
+ assert subfs.base_fs == localfs
+ assert repr(subfs).startswith('SubTreeFileSystem(base_path=/another/base/,'
+ ' base_fs=<pyarrow._fs.LocalFileSystem')
+
+
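+# A minimal sketch of what the subtree wrapper does: each call is forwarded
+# to the base filesystem with base_path prepended. _MockFileSystem keeps the
+# example free of real-filesystem side effects.
+def test_subtree_filesystem_forwarding_sketch():
+    base = _MockFileSystem()
+    subfs = SubTreeFileSystem('base', base)
+    subfs.create_dir('dir')
+    [info] = base.get_file_info(['base/dir'])
+    assert info.type == FileType.Directory
+
+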
+def test_filesystem_pickling(fs):
+ if fs.type_name.split('::')[-1] == 'mock':
+ pytest.xfail(reason='MockFileSystem is not serializable')
+
+ serialized = pickle.dumps(fs)
+ restored = pickle.loads(serialized)
+ assert isinstance(restored, FileSystem)
+ assert restored.equals(fs)
+
+
+def test_filesystem_is_functional_after_pickling(fs, pathfn):
+ if fs.type_name.split('::')[-1] == 'mock':
+ pytest.xfail(reason='MockFileSystem is not serializable')
+ skip_fsspec_s3fs(fs)
+
+ aaa = pathfn('a/aa/aaa/')
+ bb = pathfn('a/bb')
+ c = pathfn('c.txt')
+
+ fs.create_dir(aaa)
+ with fs.open_output_stream(bb):
+ pass # touch
+ with fs.open_output_stream(c) as fp:
+ fp.write(b'test')
+
+ restored = pickle.loads(pickle.dumps(fs))
+ aaa_info, bb_info, c_info = restored.get_file_info([aaa, bb, c])
+ assert aaa_info.type == FileType.Directory
+ assert bb_info.type == FileType.File
+ assert c_info.type == FileType.File
+
+
+def test_type_name():
+ fs = LocalFileSystem()
+ assert fs.type_name == "local"
+ fs = _MockFileSystem()
+ assert fs.type_name == "mock"
+
+
+def test_normalize_path(fs):
+ # Trivial path names (without separators) should generally be
+ # already normalized. Just a sanity check.
+ assert fs.normalize_path("foo") == "foo"
+
+
+def test_non_path_like_input_raises(fs):
+ class Path:
+ pass
+
+ invalid_paths = [1, 1.1, Path(), tuple(), {}, [], lambda: 1,
+ pathlib.Path()]
+ for path in invalid_paths:
+ with pytest.raises(TypeError):
+ fs.create_dir(path)
+
+
+def test_get_file_info(fs, pathfn):
+ aaa = pathfn('a/aa/aaa/')
+ bb = pathfn('a/bb')
+ c = pathfn('c.txt')
+ zzz = pathfn('zzz')
+
+ fs.create_dir(aaa)
+ with fs.open_output_stream(bb):
+ pass # touch
+ with fs.open_output_stream(c) as fp:
+ fp.write(b'test')
+
+ aaa_info, bb_info, c_info, zzz_info = fs.get_file_info([aaa, bb, c, zzz])
+
+ assert aaa_info.path == aaa
+ assert 'aaa' in repr(aaa_info)
+ assert aaa_info.extension == ''
+ if fs.type_name == "py::fsspec+s3":
+ # s3fs doesn't create empty directories
+ assert aaa_info.type == FileType.NotFound
+ else:
+ assert aaa_info.type == FileType.Directory
+ assert 'FileType.Directory' in repr(aaa_info)
+ assert aaa_info.size is None
+ check_mtime_or_absent(aaa_info)
+
+ assert bb_info.path == str(bb)
+ assert bb_info.base_name == 'bb'
+ assert bb_info.extension == ''
+ assert bb_info.type == FileType.File
+ assert 'FileType.File' in repr(bb_info)
+ assert bb_info.size == 0
+ if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
+ check_mtime(bb_info)
+
+ assert c_info.path == str(c)
+ assert c_info.base_name == 'c.txt'
+ assert c_info.extension == 'txt'
+ assert c_info.type == FileType.File
+ assert 'FileType.File' in repr(c_info)
+ assert c_info.size == 4
+ if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
+ check_mtime(c_info)
+
+ assert zzz_info.path == str(zzz)
+ assert zzz_info.base_name == 'zzz'
+ assert zzz_info.extension == ''
+ assert zzz_info.type == FileType.NotFound
+ assert zzz_info.size is None
+ assert zzz_info.mtime is None
+ assert 'FileType.NotFound' in repr(zzz_info)
+ check_mtime_absent(zzz_info)
+
+ # with single path
+ aaa_info2 = fs.get_file_info(aaa)
+ assert aaa_info.path == aaa_info2.path
+ assert aaa_info.type == aaa_info2.type
+
+
+def test_get_file_info_with_selector(fs, pathfn):
+ base_dir = pathfn('selector-dir/')
+ file_a = pathfn('selector-dir/test_file_a')
+ file_b = pathfn('selector-dir/test_file_b')
+ dir_a = pathfn('selector-dir/test_dir_a')
+ file_c = pathfn('selector-dir/test_dir_a/test_file_c')
+ dir_b = pathfn('selector-dir/test_dir_b')
+
+ try:
+ fs.create_dir(base_dir)
+ with fs.open_output_stream(file_a):
+ pass
+ with fs.open_output_stream(file_b):
+ pass
+ fs.create_dir(dir_a)
+ with fs.open_output_stream(file_c):
+ pass
+ fs.create_dir(dir_b)
+
+ # recursive selector
+ selector = FileSelector(base_dir, allow_not_found=False,
+ recursive=True)
+ assert selector.base_dir == base_dir
+
+ infos = fs.get_file_info(selector)
+ if fs.type_name == "py::fsspec+s3":
+ # s3fs only lists directories if they are not empty, but depending
+ # on the s3fs/fsspec version combo, it includes the base_dir
+ # (https://github.com/dask/s3fs/issues/393)
+ assert (len(infos) == 4) or (len(infos) == 5)
+ else:
+ assert len(infos) == 5
+
+ for info in infos:
+ if (info.path.endswith(file_a) or info.path.endswith(file_b) or
+ info.path.endswith(file_c)):
+ assert info.type == FileType.File
+ elif (info.path.rstrip("/").endswith(dir_a) or
+ info.path.rstrip("/").endswith(dir_b)):
+ assert info.type == FileType.Directory
+ elif (fs.type_name == "py::fsspec+s3" and
+ info.path.rstrip("/").endswith("selector-dir")):
+ # s3fs can include base dir, see above
+ assert info.type == FileType.Directory
+ else:
+ raise ValueError('unexpected path {}'.format(info.path))
+ check_mtime_or_absent(info)
+
+ # non-recursive selector -> not selecting the nested file_c
+ selector = FileSelector(base_dir, recursive=False)
+
+ infos = fs.get_file_info(selector)
+ if fs.type_name == "py::fsspec+s3":
+ # s3fs only lists directories if they are not empty
+ # + for s3fs 0.5.2 all directories are dropped because of buggy
+ # side-effect of previous find() call
+ # (https://github.com/dask/s3fs/issues/410)
+ assert (len(infos) == 3) or (len(infos) == 2)
+ else:
+ assert len(infos) == 4
+
+ finally:
+ fs.delete_dir(base_dir)
+
+
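+# A compact sketch of FileSelector semantics against _MockFileSystem:
+# recursive=True descends into subdirectories, and allow_not_found=True
+# turns a missing base_dir into an empty result instead of an error.
+def test_file_selector_semantics_sketch():
+    fs = _MockFileSystem()
+    fs.create_dir('top/sub')
+    with fs.open_output_stream('top/sub/file') as f:
+        f.write(b'x')
+    assert len(fs.get_file_info(FileSelector('top'))) == 1
+    assert len(fs.get_file_info(FileSelector('top', recursive=True))) == 2
+    assert fs.get_file_info(FileSelector('gone', allow_not_found=True)) == []
+
+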
+def test_create_dir(fs, pathfn):
+    # s3fs fails deleting a dir if it is empty
+ # (https://github.com/dask/s3fs/issues/317)
+ skip_fsspec_s3fs(fs)
+ d = pathfn('test-directory/')
+
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(d)
+
+ fs.create_dir(d)
+ fs.delete_dir(d)
+
+ d = pathfn('deeply/nested/test-directory/')
+ fs.create_dir(d, recursive=True)
+ fs.delete_dir(d)
+
+
+def test_delete_dir(fs, pathfn):
+ skip_fsspec_s3fs(fs)
+
+ d = pathfn('directory/')
+ nd = pathfn('directory/nested/')
+
+ fs.create_dir(nd)
+ fs.delete_dir(d)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(nd)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(d)
+
+
+def test_delete_dir_contents(fs, pathfn):
+ skip_fsspec_s3fs(fs)
+
+ d = pathfn('directory/')
+ nd = pathfn('directory/nested/')
+
+ fs.create_dir(nd)
+ fs.delete_dir_contents(d)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(nd)
+ fs.delete_dir(d)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(d)
+
+
+def _check_root_dir_contents(config):
+ fs = config['fs']
+ pathfn = config['pathfn']
+
+ d = pathfn('directory/')
+ nd = pathfn('directory/nested/')
+
+ fs.create_dir(nd)
+ with pytest.raises(pa.ArrowInvalid):
+ fs.delete_dir_contents("")
+ with pytest.raises(pa.ArrowInvalid):
+ fs.delete_dir_contents("/")
+ with pytest.raises(pa.ArrowInvalid):
+ fs.delete_dir_contents("//")
+
+ fs.delete_dir_contents("", accept_root_dir=True)
+ fs.delete_dir_contents("/", accept_root_dir=True)
+ fs.delete_dir_contents("//", accept_root_dir=True)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(d)
+
+
+def test_delete_root_dir_contents(mockfs, py_mockfs):
+ _check_root_dir_contents(mockfs)
+ _check_root_dir_contents(py_mockfs)
+
+
+def test_copy_file(fs, pathfn):
+ s = pathfn('test-copy-source-file')
+ t = pathfn('test-copy-target-file')
+
+ with fs.open_output_stream(s):
+ pass
+
+ fs.copy_file(s, t)
+ fs.delete_file(s)
+ fs.delete_file(t)
+
+
+def test_move_directory(fs, pathfn, allow_move_dir):
+ # move directory (doesn't work with S3)
+ s = pathfn('source-dir/')
+ t = pathfn('target-dir/')
+
+ fs.create_dir(s)
+
+ if allow_move_dir:
+ fs.move(s, t)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_dir(s)
+ fs.delete_dir(t)
+ else:
+ with pytest.raises(pa.ArrowIOError):
+ fs.move(s, t)
+
+
+def test_move_file(fs, pathfn):
+    # s3fs fails moving a file with recursive=True on latest 0.5 versions
+ # (https://github.com/dask/s3fs/issues/394)
+ skip_fsspec_s3fs(fs)
+
+ s = pathfn('test-move-source-file')
+ t = pathfn('test-move-target-file')
+
+ with fs.open_output_stream(s):
+ pass
+
+ fs.move(s, t)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_file(s)
+ fs.delete_file(t)
+
+
+def test_delete_file(fs, pathfn):
+ p = pathfn('test-delete-target-file')
+ with fs.open_output_stream(p):
+ pass
+
+ fs.delete_file(p)
+ with pytest.raises(pa.ArrowIOError):
+ fs.delete_file(p)
+
+ d = pathfn('test-delete-nested')
+ fs.create_dir(d)
+ f = pathfn('test-delete-nested/target-file')
+ with fs.open_output_stream(f) as s:
+ s.write(b'data')
+
+ fs.delete_dir(d)
+
+
+def identity(v):
+ return v
+
+
+@pytest.mark.gzip
+@pytest.mark.parametrize(
+ ('compression', 'buffer_size', 'compressor'),
+ [
+ (None, None, identity),
+ (None, 64, identity),
+ ('gzip', None, gzip.compress),
+ ('gzip', 256, gzip.compress),
+ ]
+)
+def test_open_input_stream(fs, pathfn, compression, buffer_size, compressor):
+ p = pathfn('open-input-stream')
+
+ data = b'some data for reading\n' * 512
+ with fs.open_output_stream(p) as s:
+ s.write(compressor(data))
+
+ with fs.open_input_stream(p, compression, buffer_size) as s:
+ result = s.read()
+
+ assert result == data
+
+
+def test_open_input_file(fs, pathfn):
+ p = pathfn('open-input-file')
+
+ data = b'some data' * 1024
+ with fs.open_output_stream(p) as s:
+ s.write(data)
+
+ read_from = len(b'some data') * 512
+ with fs.open_input_file(p) as f:
+ f.seek(read_from)
+ result = f.read()
+
+ assert result == data[read_from:]
+
+
+@pytest.mark.gzip
+@pytest.mark.parametrize(
+ ('compression', 'buffer_size', 'decompressor'),
+ [
+ (None, None, identity),
+ (None, 64, identity),
+ ('gzip', None, gzip.decompress),
+ ('gzip', 256, gzip.decompress),
+ ]
+)
+def test_open_output_stream(fs, pathfn, compression, buffer_size,
+ decompressor):
+ p = pathfn('open-output-stream')
+
+ data = b'some data for writing' * 1024
+ with fs.open_output_stream(p, compression, buffer_size) as f:
+ f.write(data)
+
+ with fs.open_input_stream(p, compression, buffer_size) as f:
+ assert f.read(len(data)) == data
+
+
+@pytest.mark.gzip
+@pytest.mark.parametrize(
+ ('compression', 'buffer_size', 'compressor', 'decompressor'),
+ [
+ (None, None, identity, identity),
+ (None, 64, identity, identity),
+ ('gzip', None, gzip.compress, gzip.decompress),
+ ('gzip', 256, gzip.compress, gzip.decompress),
+ ]
+)
+@pytest.mark.filterwarnings("ignore::FutureWarning")
+def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor,
+ decompressor, allow_append_to_file):
+ p = pathfn('open-append-stream')
+
+ initial = compressor(b'already existing')
+ with fs.open_output_stream(p) as s:
+ s.write(initial)
+
+ if allow_append_to_file:
+ with fs.open_append_stream(p, compression=compression,
+ buffer_size=buffer_size) as f:
+ f.write(b'\nnewly added')
+
+ with fs.open_input_stream(p) as f:
+ result = f.read()
+
+ result = decompressor(result)
+ assert result == b'already existing\nnewly added'
+ else:
+ with pytest.raises(pa.ArrowNotImplementedError):
+ fs.open_append_stream(p, compression=compression,
+ buffer_size=buffer_size)
+
+
+def test_open_output_stream_metadata(fs, pathfn):
+ p = pathfn('open-output-stream-metadata')
+ metadata = {'Content-Type': 'x-pyarrow/test'}
+
+ data = b'some data'
+ with fs.open_output_stream(p, metadata=metadata) as f:
+ f.write(data)
+
+ with fs.open_input_stream(p) as f:
+ assert f.read() == data
+ got_metadata = f.metadata()
+
+ if fs.type_name == 's3' or 'mock' in fs.type_name:
+ for k, v in metadata.items():
+ assert got_metadata[k] == v.encode()
+ else:
+ assert got_metadata == {}
+
+
+def test_localfs_options():
+ # LocalFileSystem instantiation
+ LocalFileSystem(use_mmap=False)
+
+ with pytest.raises(TypeError):
+ LocalFileSystem(xxx=False)
+
+
+def test_localfs_errors(localfs):
+ # Local filesystem errors should raise the right Python exceptions
+ # (e.g. FileNotFoundError)
+ fs = localfs['fs']
+ with assert_file_not_found():
+ fs.open_input_stream('/non/existent/file')
+ with assert_file_not_found():
+ fs.open_output_stream('/non/existent/file')
+ with assert_file_not_found():
+ fs.create_dir('/non/existent/dir', recursive=False)
+ with assert_file_not_found():
+ fs.delete_dir('/non/existent/dir')
+ with assert_file_not_found():
+ fs.delete_file('/non/existent/dir')
+ with assert_file_not_found():
+ fs.move('/non/existent', '/xxx')
+ with assert_file_not_found():
+ fs.copy_file('/non/existent', '/xxx')
+
+
+def test_localfs_file_info(localfs):
+ fs = localfs['fs']
+
+ file_path = pathlib.Path(__file__)
+ dir_path = file_path.parent
+ [file_info, dir_info] = fs.get_file_info([file_path.as_posix(),
+ dir_path.as_posix()])
+ assert file_info.size == file_path.stat().st_size
+ assert file_info.mtime_ns == file_path.stat().st_mtime_ns
+ check_mtime(file_info)
+ assert dir_info.mtime_ns == dir_path.stat().st_mtime_ns
+ check_mtime(dir_info)
+
+
+def test_mockfs_mtime_roundtrip(mockfs):
+ dt = datetime.fromtimestamp(1568799826, timezone.utc)
+ fs = _MockFileSystem(dt)
+
+ with fs.open_output_stream('foo'):
+ pass
+ [info] = fs.get_file_info(['foo'])
+ assert info.mtime == dt
+
+
+@pytest.mark.s3
+def test_s3_options():
+ from pyarrow.fs import S3FileSystem
+
+ fs = S3FileSystem(access_key='access', secret_key='secret',
+ session_token='token', region='us-east-2',
+ scheme='https', endpoint_override='localhost:8999')
+ assert isinstance(fs, S3FileSystem)
+ assert fs.region == 'us-east-2'
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = S3FileSystem(role_arn='role', session_name='session',
+ external_id='id', load_frequency=100)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = S3FileSystem(anonymous=True)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = S3FileSystem(background_writes=True,
+ default_metadata={"ACL": "authenticated-read",
+ "Content-Type": "text/plain"})
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ with pytest.raises(ValueError):
+ S3FileSystem(access_key='access')
+ with pytest.raises(ValueError):
+ S3FileSystem(secret_key='secret')
+ with pytest.raises(ValueError):
+ S3FileSystem(access_key='access', session_token='token')
+ with pytest.raises(ValueError):
+ S3FileSystem(secret_key='secret', session_token='token')
+ with pytest.raises(ValueError):
+ S3FileSystem(
+ access_key='access', secret_key='secret', role_arn='arn'
+ )
+ with pytest.raises(ValueError):
+ S3FileSystem(
+ access_key='access', secret_key='secret', anonymous=True
+ )
+ with pytest.raises(ValueError):
+ S3FileSystem(role_arn="arn", anonymous=True)
+ with pytest.raises(ValueError):
+ S3FileSystem(default_metadata=["foo", "bar"])
+
+
+@pytest.mark.s3
+def test_s3_proxy_options(monkeypatch):
+ from pyarrow.fs import S3FileSystem
+
+ # The following two are equivalent:
+ proxy_opts_1_dict = {'scheme': 'http', 'host': 'localhost', 'port': 8999}
+ proxy_opts_1_str = 'http://localhost:8999'
+ # The following two are equivalent:
+ proxy_opts_2_dict = {'scheme': 'https', 'host': 'localhost', 'port': 8080}
+ proxy_opts_2_str = 'https://localhost:8080'
+
+ # Check dict case for 'proxy_options'
+ fs = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ # Check str case for 'proxy_options'
+ fs = S3FileSystem(proxy_options=proxy_opts_1_str)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = S3FileSystem(proxy_options=proxy_opts_2_str)
+ assert isinstance(fs, S3FileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ # Check that two FSs using the same proxy_options dict are equal
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ # Check that two FSs using the same proxy_options str are equal
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ # Check that two FSs using equivalent proxy_options
+ # (one dict, one str) are equal
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ assert fs1 == fs2
+ assert pickle.loads(pickle.dumps(fs1)) == fs2
+ assert pickle.loads(pickle.dumps(fs2)) == fs1
+
+ # Check that two FSs using nonequivalent proxy_options are not equal
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ fs2 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ # Check that two FSs (one using proxy_options and the other not)
+ # are not equal
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict)
+ fs2 = S3FileSystem()
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_1_str)
+ fs2 = S3FileSystem()
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict)
+ fs2 = S3FileSystem()
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ fs1 = S3FileSystem(proxy_options=proxy_opts_2_str)
+ fs2 = S3FileSystem()
+ assert fs1 != fs2
+ assert pickle.loads(pickle.dumps(fs1)) != fs2
+ assert pickle.loads(pickle.dumps(fs2)) != fs1
+
+ # Only dict and str are supported
+ with pytest.raises(TypeError):
+ S3FileSystem(proxy_options=('http', 'localhost', 9090))
+ # Missing scheme
+ with pytest.raises(KeyError):
+ S3FileSystem(proxy_options={'host': 'localhost', 'port': 9090})
+ # Missing host
+ with pytest.raises(KeyError):
+ S3FileSystem(proxy_options={'scheme': 'https', 'port': 9090})
+ # Missing port
+ with pytest.raises(KeyError):
+ S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost'})
+ # Invalid proxy URI (invalid scheme htttps)
+ with pytest.raises(pa.ArrowInvalid):
+ S3FileSystem(proxy_options='htttps://localhost:9000')
+    # Invalid proxy_options dict (invalid scheme htttp)
+ with pytest.raises(pa.ArrowInvalid):
+ S3FileSystem(proxy_options={'scheme': 'htttp', 'host': 'localhost',
+ 'port': 8999})
+
+
+@pytest.mark.hdfs
+def test_hdfs_options(hdfs_connection):
+ from pyarrow.fs import HadoopFileSystem
+ if not pa.have_libhdfs():
+ pytest.skip('Cannot locate libhdfs')
+
+ host, port, user = hdfs_connection
+
+ replication = 2
+ buffer_size = 64*1024
+ default_block_size = 128*1024**2
+ uri = ('hdfs://{}:{}/?user={}&replication={}&buffer_size={}'
+ '&default_block_size={}')
+
+ hdfs1 = HadoopFileSystem(host, port, user='libhdfs',
+ replication=replication, buffer_size=buffer_size,
+ default_block_size=default_block_size)
+ hdfs2 = HadoopFileSystem.from_uri(uri.format(
+ host, port, 'libhdfs', replication, buffer_size, default_block_size
+ ))
+ hdfs3 = HadoopFileSystem.from_uri(uri.format(
+ host, port, 'me', replication, buffer_size, default_block_size
+ ))
+ hdfs4 = HadoopFileSystem.from_uri(uri.format(
+ host, port, 'me', replication + 1, buffer_size, default_block_size
+ ))
+ hdfs5 = HadoopFileSystem(host, port)
+ hdfs6 = HadoopFileSystem.from_uri('hdfs://{}:{}'.format(host, port))
+ hdfs7 = HadoopFileSystem(host, port, user='localuser')
+ hdfs8 = HadoopFileSystem(host, port, user='localuser',
+ kerb_ticket="cache_path")
+ hdfs9 = HadoopFileSystem(host, port, user='localuser',
+ kerb_ticket=pathlib.Path("cache_path"))
+ hdfs10 = HadoopFileSystem(host, port, user='localuser',
+ kerb_ticket="cache_path2")
+ hdfs11 = HadoopFileSystem(host, port, user='localuser',
+ kerb_ticket="cache_path",
+ extra_conf={'hdfs_token': 'abcd'})
+
+ assert hdfs1 == hdfs2
+ assert hdfs5 == hdfs6
+ assert hdfs6 != hdfs7
+ assert hdfs2 != hdfs3
+ assert hdfs3 != hdfs4
+ assert hdfs7 != hdfs5
+ assert hdfs7 != hdfs8
+ assert hdfs8 == hdfs9
+ assert hdfs10 != hdfs9
+ assert hdfs11 != hdfs8
+
+ with pytest.raises(TypeError):
+ HadoopFileSystem()
+ with pytest.raises(TypeError):
+ HadoopFileSystem.from_uri(3)
+
+ for fs in [hdfs1, hdfs2, hdfs3, hdfs4, hdfs5, hdfs6, hdfs7, hdfs8,
+ hdfs9, hdfs10, hdfs11]:
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ host, port, user = hdfs_connection
+
+ hdfs = HadoopFileSystem(host, port, user=user)
+ assert hdfs.get_file_info(FileSelector('/'))
+
+ hdfs = HadoopFileSystem.from_uri(
+ "hdfs://{}:{}/?user={}".format(host, port, user)
+ )
+ assert hdfs.get_file_info(FileSelector('/'))
+
+
+@pytest.mark.parametrize(('uri', 'expected_klass', 'expected_path'), [
+ # leading slashes are removed intentionally, because MockFileSystem doesn't
+ # have a distinction between relative and absolute paths
+ ('mock:', _MockFileSystem, ''),
+ ('mock:foo/bar', _MockFileSystem, 'foo/bar'),
+ ('mock:/foo/bar', _MockFileSystem, 'foo/bar'),
+ ('mock:///foo/bar', _MockFileSystem, 'foo/bar'),
+ ('file:/', LocalFileSystem, '/'),
+ ('file:///', LocalFileSystem, '/'),
+ ('file:/foo/bar', LocalFileSystem, '/foo/bar'),
+ ('file:///foo/bar', LocalFileSystem, '/foo/bar'),
+ ('/', LocalFileSystem, '/'),
+ ('/foo/bar', LocalFileSystem, '/foo/bar'),
+])
+def test_filesystem_from_uri(uri, expected_klass, expected_path):
+ fs, path = FileSystem.from_uri(uri)
+ assert isinstance(fs, expected_klass)
+ assert path == expected_path
+
+
+@pytest.mark.parametrize(
+ 'path',
+ ['', '/', 'foo/bar', '/foo/bar', __file__]
+)
+def test_filesystem_from_path_object(path):
+ p = pathlib.Path(path)
+ fs, path = FileSystem.from_uri(p)
+ assert isinstance(fs, LocalFileSystem)
+ assert path == p.resolve().absolute().as_posix()
+
+
+@pytest.mark.s3
+def test_filesystem_from_uri_s3(s3_server):
+ from pyarrow.fs import S3FileSystem
+
+ host, port, access_key, secret_key = s3_server['connection']
+
+ uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \
+ .format(access_key, secret_key, host, port)
+
+ fs, path = FileSystem.from_uri(uri)
+ assert isinstance(fs, S3FileSystem)
+ assert path == "mybucket/foo/bar"
+
+ fs.create_dir(path)
+ [info] = fs.get_file_info([path])
+ assert info.path == path
+ assert info.type == FileType.Directory
+
+
+def test_py_filesystem():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+ assert isinstance(fs, PyFileSystem)
+ assert fs.type_name == "py::dummy"
+ assert fs.handler is handler
+
+ with pytest.raises(TypeError):
+ PyFileSystem(None)
+
+
+def test_py_filesystem_equality():
+ handler1 = DummyHandler(1)
+ handler2 = DummyHandler(2)
+ handler3 = DummyHandler(2)
+ fs1 = PyFileSystem(handler1)
+ fs2 = PyFileSystem(handler1)
+ fs3 = PyFileSystem(handler2)
+ fs4 = PyFileSystem(handler3)
+
+ assert fs2 is not fs1
+ assert fs3 is not fs2
+ assert fs4 is not fs3
+ assert fs2 == fs1 # Same handler
+ assert fs3 != fs2 # Unequal handlers
+ assert fs4 == fs3 # Equal handlers
+
+ assert fs1 != LocalFileSystem()
+ assert fs1 != object()
+
+
+def test_py_filesystem_pickling():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+
+ serialized = pickle.dumps(fs)
+ restored = pickle.loads(serialized)
+ assert isinstance(restored, FileSystem)
+ assert restored == fs
+ assert restored.handler == handler
+ assert restored.type_name == "py::dummy"
+
+
+def test_py_filesystem_lifetime():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+ assert isinstance(fs, PyFileSystem)
+ wr = weakref.ref(handler)
+ handler = None
+ assert wr() is not None
+ fs = None
+ assert wr() is None
+
+ # Taking the .handler attribute doesn't wreck reference counts
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+ wr = weakref.ref(handler)
+ handler = None
+ assert wr() is fs.handler
+ assert wr() is not None
+ fs = None
+ assert wr() is None
+
+
+def test_py_filesystem_get_file_info():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+
+ [info] = fs.get_file_info(['some/dir'])
+ assert info.path == 'some/dir'
+ assert info.type == FileType.Directory
+
+ [info] = fs.get_file_info(['some/file'])
+ assert info.path == 'some/file'
+ assert info.type == FileType.File
+
+ [info] = fs.get_file_info(['notfound'])
+ assert info.path == 'notfound'
+ assert info.type == FileType.NotFound
+
+ with pytest.raises(TypeError):
+ fs.get_file_info(['badtype'])
+
+ with pytest.raises(IOError):
+ fs.get_file_info(['xxx'])
+
+
+def test_py_filesystem_get_file_info_selector():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+
+ selector = FileSelector(base_dir="somedir")
+ infos = fs.get_file_info(selector)
+ assert len(infos) == 2
+ assert infos[0].path == "somedir/file1"
+ assert infos[0].type == FileType.File
+ assert infos[0].size == 123
+ assert infos[1].path == "somedir/subdir1"
+ assert infos[1].type == FileType.Directory
+ assert infos[1].size is None
+
+ selector = FileSelector(base_dir="somedir", recursive=True)
+ infos = fs.get_file_info(selector)
+ assert len(infos) == 3
+ assert infos[0].path == "somedir/file1"
+ assert infos[1].path == "somedir/subdir1"
+ assert infos[2].path == "somedir/subdir1/file2"
+
+ selector = FileSelector(base_dir="notfound")
+ with pytest.raises(FileNotFoundError):
+ fs.get_file_info(selector)
+
+ selector = FileSelector(base_dir="notfound", allow_not_found=True)
+ assert fs.get_file_info(selector) == []
+
+
+def test_py_filesystem_ops():
+ handler = DummyHandler()
+ fs = PyFileSystem(handler)
+
+ fs.create_dir("recursive", recursive=True)
+ fs.create_dir("non-recursive", recursive=False)
+ with pytest.raises(IOError):
+ fs.create_dir("foobar")
+
+ fs.delete_dir("delete_dir")
+ fs.delete_dir_contents("delete_dir_contents")
+ for path in ("", "/", "//"):
+ with pytest.raises(ValueError):
+ fs.delete_dir_contents(path)
+ fs.delete_dir_contents(path, accept_root_dir=True)
+ fs.delete_file("delete_file")
+ fs.move("move_from", "move_to")
+ fs.copy_file("copy_file_from", "copy_file_to")
+
+
+def test_py_open_input_stream():
+ fs = PyFileSystem(DummyHandler())
+
+ with fs.open_input_stream("somefile") as f:
+ assert f.read() == b"somefile:input_stream"
+ with pytest.raises(FileNotFoundError):
+ fs.open_input_stream("notfound")
+
+
+def test_py_open_input_file():
+ fs = PyFileSystem(DummyHandler())
+
+ with fs.open_input_file("somefile") as f:
+ assert f.read() == b"somefile:input_file"
+ with pytest.raises(FileNotFoundError):
+ fs.open_input_file("notfound")
+
+
+def test_py_open_output_stream():
+ fs = PyFileSystem(DummyHandler())
+
+ with fs.open_output_stream("somefile") as f:
+ f.write(b"data")
+
+
+@pytest.mark.filterwarnings("ignore::FutureWarning")
+def test_py_open_append_stream():
+ fs = PyFileSystem(DummyHandler())
+
+ with fs.open_append_stream("somefile") as f:
+ f.write(b"data")
+
+
+@pytest.mark.s3
+def test_s3_real_aws():
+ # Exercise connection code with an AWS-backed S3 bucket.
+ # This is a minimal integration check for ARROW-9261 and similar issues.
+ from pyarrow.fs import S3FileSystem
+ default_region = (os.environ.get('PYARROW_TEST_S3_REGION') or
+ 'us-east-1')
+ fs = S3FileSystem(anonymous=True)
+ assert fs.region == default_region
+
+ fs = S3FileSystem(anonymous=True, region='us-east-2')
+ entries = fs.get_file_info(FileSelector('ursa-labs-taxi-data'))
+ assert len(entries) > 0
+ with fs.open_input_stream('ursa-labs-taxi-data/2019/06/data.parquet') as f:
+ md = f.metadata()
+ assert 'Content-Type' in md
+ assert md['Last-Modified'] == b'2020-01-17T16:26:28Z'
+ # For some reason, the header value is quoted
+ # (both with AWS and Minio)
+ assert md['ETag'] == b'"f1efd5d76cb82861e1542117bfa52b90-8"'
+
+
+@pytest.mark.s3
+def test_s3_real_aws_region_selection():
+ # Taken from a registry of open S3-hosted datasets
+ # at https://github.com/awslabs/open-data-registry
+ fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt')
+ assert fs.region == 'eu-west-1'
+ with fs.open_input_stream(path) as f:
+ assert b"Meteo-France Atmospheric models on AWS" in f.read(50)
+
+ # Passing an explicit region disables auto-selection
+ fs, path = FileSystem.from_uri(
+ 's3://mf-nwp-models/README.txt?region=us-east-2')
+ assert fs.region == 'us-east-2'
+ # Reading from the wrong region may still work for public buckets...
+
+ # Non-existent bucket (hopefully, otherwise need to fix this test)
+ with pytest.raises(IOError, match="Bucket '.*' not found"):
+ FileSystem.from_uri('s3://x-arrow-non-existent-bucket')
+ fs, path = FileSystem.from_uri(
+ 's3://x-arrow-non-existent-bucket?region=us-east-3')
+ assert fs.region == 'us-east-3'
+
+
+@pytest.mark.s3
+def test_copy_files(s3_connection, s3fs, tempdir):
+ fs = s3fs["fs"]
+ pathfn = s3fs["pathfn"]
+
+ # create test file on S3 filesystem
+ path = pathfn('c.txt')
+ with fs.open_output_stream(path) as f:
+ f.write(b'test')
+
+ # create URI for created file
+ host, port, access_key, secret_key = s3_connection
+ source_uri = (
+ f"s3://{access_key}:{secret_key}@{path}"
+ f"?scheme=http&endpoint_override={host}:{port}"
+ )
+ # copy from S3 URI to local file
+ local_path1 = str(tempdir / "c_copied1.txt")
+ copy_files(source_uri, local_path1)
+
+ localfs = LocalFileSystem()
+ with localfs.open_input_stream(local_path1) as f:
+ assert f.read() == b"test"
+
+ # copy from S3 path+filesystem to local file
+ local_path2 = str(tempdir / "c_copied2.txt")
+ copy_files(path, local_path2, source_filesystem=fs)
+ with localfs.open_input_stream(local_path2) as f:
+ assert f.read() == b"test"
+
+ # copy to local file with URI
+ local_path3 = str(tempdir / "c_copied3.txt")
+ destination_uri = _filesystem_uri(local_path3) # file://
+ copy_files(source_uri, destination_uri)
+
+ with localfs.open_input_stream(local_path3) as f:
+ assert f.read() == b"test"
+
+ # copy to local file with path+filesystem
+ local_path4 = str(tempdir / "c_copied4.txt")
+ copy_files(source_uri, local_path4, destination_filesystem=localfs)
+
+ with localfs.open_input_stream(local_path4) as f:
+ assert f.read() == b"test"
+
+ # copy with additional options
+ local_path5 = str(tempdir / "c_copied5.txt")
+ copy_files(source_uri, local_path5, chunk_size=1, use_threads=False)
+
+ with localfs.open_input_stream(local_path5) as f:
+ assert f.read() == b"test"
+
+
+def test_copy_files_directory(tempdir):
+ localfs = LocalFileSystem()
+
+ # create source directory with 2 files
+ source_dir = tempdir / "source"
+ source_dir.mkdir()
+ with localfs.open_output_stream(str(source_dir / "file1")) as f:
+ f.write(b'test1')
+ with localfs.open_output_stream(str(source_dir / "file2")) as f:
+ f.write(b'test2')
+
+ def check_copied_files(destination_dir):
+ with localfs.open_input_stream(str(destination_dir / "file1")) as f:
+ assert f.read() == b"test1"
+ with localfs.open_input_stream(str(destination_dir / "file2")) as f:
+ assert f.read() == b"test2"
+
+ # Copy directory with local file paths
+ destination_dir1 = tempdir / "destination1"
+ # TODO need to create?
+ destination_dir1.mkdir()
+ copy_files(str(source_dir), str(destination_dir1))
+ check_copied_files(destination_dir1)
+
+ # Copy directory with path+filesystem
+ destination_dir2 = tempdir / "destination2"
+ destination_dir2.mkdir()
+ copy_files(str(source_dir), str(destination_dir2),
+ source_filesystem=localfs, destination_filesystem=localfs)
+ check_copied_files(destination_dir2)
+
+ # Copy directory with URI
+ destination_dir3 = tempdir / "destination3"
+ destination_dir3.mkdir()
+ source_uri = _filesystem_uri(str(source_dir)) # file://
+ destination_uri = _filesystem_uri(str(destination_dir3))
+ copy_files(source_uri, destination_uri)
+ check_copied_files(destination_dir3)
+
+ # Copy directory with Path objects
+ destination_dir4 = tempdir / "destination4"
+ destination_dir4.mkdir()
+ copy_files(source_dir, destination_dir4)
+ check_copied_files(destination_dir4)
+
+ # copy with additional non-default options
+ destination_dir5 = tempdir / "destination5"
+ destination_dir5.mkdir()
+ copy_files(source_dir, destination_dir5, chunk_size=1, use_threads=False)
+ check_copied_files(destination_dir5)
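+
+
+# Sketch: copy_files also works entirely in memory between two
+# _MockFileSystem instances, exercising the copy logic without touching
+# disk or S3.
+def test_copy_files_between_mock_filesystems_sketch():
+    src_fs = _MockFileSystem()
+    dest_fs = _MockFileSystem()
+    with src_fs.open_output_stream('data.txt') as f:
+        f.write(b'payload')
+    copy_files('data.txt', 'copied.txt',
+               source_filesystem=src_fs, destination_filesystem=dest_fs)
+    with dest_fs.open_input_stream('copied.txt') as f:
+        assert f.read() == b'payload'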