diff options
Diffstat (limited to 'src/arrow/python/pyarrow/filesystem.py')
-rw-r--r-- | src/arrow/python/pyarrow/filesystem.py | 511 |
1 files changed, 511 insertions, 0 deletions
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


import os
import posixpath
import sys
import urllib.parse
import warnings

from os.path import join as pjoin

import pyarrow as pa
from pyarrow.util import implements, _stringify_path, _is_path_like, _DEPR_MSG


# Deprecation message shared by every entry point that creates or hands out
# a legacy LocalFileSystem.
_FS_DEPR_MSG = _DEPR_MSG.format(
    "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem"
)


class FileSystem:
    """
    Abstract filesystem interface.

    Most methods raise NotImplementedError here and are meant to be
    overridden. ``cat``, ``disk_usage``, ``rm``, ``mv`` and ``read_parquet``
    are implemented on top of the other methods.
    """

    def cat(self, path):
        """
        Return contents of file as a bytes object.

        Parameters
        ----------
        path : str
            File path to read content from.

        Returns
        -------
        contents : bytes
        """
        with self.open(path, 'rb') as f:
            return f.read()

    def ls(self, path):
        """
        Return list of file paths.

        Parameters
        ----------
        path : str
            Directory to list contents from.
        """
        raise NotImplementedError

    def delete(self, path, recursive=False):
        """
        Delete the indicated file or directory.

        Parameters
        ----------
        path : str
            Path to delete.
        recursive : bool, default False
            If True, also delete child paths for directories.
        """
        raise NotImplementedError

    def disk_usage(self, path):
        """
        Compute bytes used by all contents under indicated path in file tree.

        Parameters
        ----------
        path : str
            Can be a file path or directory.

        Returns
        -------
        usage : int
        """
        path = _stringify_path(path)
        path_info = self.stat(path)
        if path_info['kind'] == 'file':
            return path_info['size']

        # ``walk`` is not defined on this base class; subclasses that want
        # disk_usage for directories have to provide it.
        total = 0
        for root, _directories, files in self.walk(path):
            for child_path in files:
                abspath = self._path_join(root, child_path)
                total += self.stat(abspath)['size']

        return total

    def _path_join(self, *args):
        # Join path components with this filesystem's own separator.
        return self.pathsep.join(args)

    def stat(self, path):
        """
        Information about a filesystem entry.

        Returns
        -------
        stat : dict
        """
        raise NotImplementedError('FileSystem.stat')

    def rm(self, path, recursive=False):
        """
        Alias for FileSystem.delete.
        """
        return self.delete(path, recursive=recursive)

    def mv(self, path, new_path):
        """
        Alias for FileSystem.rename.
        """
        return self.rename(path, new_path)

    def rename(self, path, new_path):
        """
        Rename file, like UNIX mv command.

        Parameters
        ----------
        path : str
            Path to alter.
        new_path : str
            Path to move to.
        """
        raise NotImplementedError('FileSystem.rename')

    def mkdir(self, path, create_parents=True):
        """
        Create a directory.

        Parameters
        ----------
        path : str
            Path to the directory.
        create_parents : bool, default True
            If the parent directories don't exist, create them as well.
        """
        raise NotImplementedError

    def exists(self, path):
        """
        Return True if path exists.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def isdir(self, path):
        """
        Return True if path is a directory.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def isfile(self, path):
        """
        Return True if path is a file.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def _isfilestore(self):
        """
        Returns True if this FileSystem is a unix-style file store with
        directories.
        """
        raise NotImplementedError

    def read_parquet(self, path, columns=None, metadata=None, schema=None,
                     use_threads=True, use_pandas_metadata=False):
        """
        Read Parquet data from path in file system. Can read from a single file
        or a directory of files.

        Parameters
        ----------
        path : str
            Single file path or directory
        columns : List[str], optional
            Subset of columns to read.
        metadata : pyarrow.parquet.FileMetaData
            Known metadata to validate files against.
        schema : pyarrow.parquet.Schema
            Known schema to validate files against. Alternative to metadata
            argument.
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        table : pyarrow.Table
        """
        # Imported lazily, inside the method, rather than at module import.
        from pyarrow.parquet import ParquetDataset
        dataset = ParquetDataset(path, schema=schema, metadata=metadata,
                                 filesystem=self)
        return dataset.read(columns=columns, use_threads=use_threads,
                            use_pandas_metadata=use_pandas_metadata)

    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        raise NotImplementedError

    @property
    def pathsep(self):
        # Separator used by ``_path_join``; subclasses may override.
        return '/'


class LocalFileSystem(FileSystem):
    """
    FileSystem implementation backed by the ``os`` module.

    Instantiating this class emits a FutureWarning carrying the module's
    deprecation message.
    """

    # Lazily created shared instance, see ``_get_instance``.
    _instance = None

    def __init__(self):
        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
        super().__init__()

    @classmethod
    def _get_instance(cls):
        # Internal accessor: build the shared instance on first use while
        # silencing the deprecation warning raised by ``__init__``.
        if cls._instance is None:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                cls._instance = LocalFileSystem()
        return cls._instance

    @classmethod
    def get_instance(cls):
        """
        Return the shared LocalFileSystem instance.

        Emits the deprecation FutureWarning on every call.
        """
        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
        return cls._get_instance()

    @implements(FileSystem.ls)
    def ls(self, path):
        path = _stringify_path(path)
        return sorted(pjoin(path, x) for x in os.listdir(path))

    @implements(FileSystem.mkdir)
    def mkdir(self, path, create_parents=True):
        path = _stringify_path(path)
        if create_parents:
            os.makedirs(path)
        else:
            os.mkdir(path)

    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _stringify_path(path)
        return os.path.isdir(path)

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _stringify_path(path)
        return os.path.isfile(path)

    @implements(FileSystem._isfilestore)
    def _isfilestore(self):
        return True

    @implements(FileSystem.exists)
    def exists(self, path):
        path = _stringify_path(path)
        return os.path.exists(path)

    @implements(FileSystem.open)
    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        path = _stringify_path(path)
        return open(path, mode=mode)

    @property
    def pathsep(self):
        return os.path.sep

    def walk(self, path):
        """
        Directory tree generator, see os.walk.
        """
        path = _stringify_path(path)
        return os.walk(path)


class DaskFileSystem(FileSystem):
    """
    Wraps a Dask-style filesystem implementation such as s3fs, gcsfs, etc.

    Calls are forwarded to the wrapped object stored in ``self.fs``.
    Constructing an instance emits a FutureWarning.
    """

    def __init__(self, fs):
        warnings.warn(
            "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated "
            "as of pyarrow 3.0.0, and will be removed in a future version.",
            FutureWarning, stacklevel=2)
        self.fs = fs

    @implements(FileSystem.isdir)
    def isdir(self, path):
        raise NotImplementedError("Unsupported file system API")

    @implements(FileSystem.isfile)
    def isfile(self, path):
        raise NotImplementedError("Unsupported file system API")

    @implements(FileSystem._isfilestore)
    def _isfilestore(self):
        """
        Object Stores like S3 and GCSFS are based on key lookups, not true
        file-paths.
        """
        return False

    @implements(FileSystem.delete)
    def delete(self, path, recursive=False):
        path = _stringify_path(path)
        return self.fs.rm(path, recursive=recursive)

    @implements(FileSystem.exists)
    def exists(self, path):
        path = _stringify_path(path)
        return self.fs.exists(path)

    @implements(FileSystem.mkdir)
    def mkdir(self, path, create_parents=True):
        path = _stringify_path(path)
        if create_parents:
            return self.fs.mkdirs(path)
        else:
            return self.fs.mkdir(path)

    @implements(FileSystem.open)
    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        path = _stringify_path(path)
        return self.fs.open(path, mode=mode)

    def ls(self, path, detail=False):
        """
        List ``path`` through the wrapped filesystem.

        Parameters
        ----------
        path : str
            Directory to list contents from.
        detail : bool, default False
            Forwarded unchanged to the wrapped filesystem's ``ls``.
        """
        path = _stringify_path(path)
        return self.fs.ls(path, detail=detail)

    def walk(self, path):
        """
        Directory tree generator, like os.walk.
        """
        path = _stringify_path(path)
        return self.fs.walk(path)


class S3FSWrapper(DaskFileSystem):
    """
    DaskFileSystem specialisation for s3fs.

    Paths may carry a leading ``s3://`` scheme, which is stripped before
    they are handed to the wrapped filesystem.
    """

    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _sanitize_s3(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
        except OSError:
            return False
        # A listing that contains only the path itself denotes a file.
        return not (len(contents) == 1 and contents[0] == path)

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _sanitize_s3(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
        except OSError:
            return False
        return len(contents) == 1 and contents[0] == path

    def walk(self, path, refresh=False):
        """
        Directory tree generator, like os.walk.

        Generator version of what is in s3fs, which yields a flattened list of
        files.

        Parameters
        ----------
        path : str
            Directory to walk; a leading ``s3://`` is stripped.
        refresh : bool, default False
            Forwarded unchanged to the wrapped filesystem's ``_ls``.

        Yields
        ------
        (root, directories, files) : tuple
            ``root`` is the directory being listed; ``directories`` and
            ``files`` are sorted lists of the base names found in it.
        """
        root = _sanitize_s3(_stringify_path(path))
        directory_keys = set()
        file_keys = set()

        # Use a dedicated name for each listed key so that ``root`` keeps
        # pointing at the directory being walked.
        for entry in self.fs._ls(root, refresh=refresh):
            key = entry['Key']
            storage_class = entry['StorageClass']
            if storage_class == 'DIRECTORY':
                directory_keys.add(key)
            elif storage_class != 'BUCKET':
                file_keys.add(key)

        # s3fs creates duplicate 'DIRECTORY' entries
        files = sorted(posixpath.split(key)[1] for key in file_keys
                       if key not in directory_keys)
        # Keep the full key next to each base name: the base name is what
        # gets reported, the full key is what must be listed when recursing.
        named_directories = sorted((posixpath.split(key)[1], key)
                                   for key in directory_keys)
        directories = [name for name, _ in named_directories]

        yield root, directories, files

        for _, directory_key in named_directories:
            yield from self.walk(directory_key, refresh=refresh)


def _sanitize_s3(path):
    """
    Strip a leading ``s3://`` scheme from ``path``, if present.

    Only the prefix is removed; any later ``s3://`` inside the path is kept.
    """
    prefix = 's3://'
    if path.startswith(prefix):
        return path[len(prefix):]
    return path


def _ensure_filesystem(fs):
    """
    Return ``fs`` if it is a usable legacy filesystem object.

    Accepted are instances of (subclasses of) FileSystem and, when the
    ``fsspec`` module has already been imported, instances of
    ``fsspec.AbstractFileSystem``.

    Raises
    ------
    OSError
        If ``fs`` is neither of the above.
    """
    fs_type = type(fs)

    # If the arrow filesystem was subclassed, assume it supports the full
    # interface and return it
    if issubclass(fs_type, FileSystem):
        return fs

    # Only consult fsspec if somebody already imported it; never import it
    # from here.
    if "fsspec" in sys.modules:
        fsspec = sys.modules["fsspec"]
        if isinstance(fs, fsspec.AbstractFileSystem):
            # for recent fsspec versions that stop inheriting from
            # pyarrow.filesystem.FileSystem, still allow fsspec
            # filesystems (which should be compatible with our legacy fs)
            return fs

    raise OSError('Unrecognized filesystem: {}'.format(fs_type))


def resolve_filesystem_and_path(where, filesystem=None):
    """
    Return filesystem from path which could be an HDFS URI, a local URI,
    or a plain filesystem path.

    Parameters
    ----------
    where : path-like or file-like
        Location to resolve. Non path-like objects are returned unchanged.
    filesystem : FileSystem, optional
        Filesystem to use instead of inferring one from ``where``.

    Returns
    -------
    (filesystem, path) : tuple
        The filesystem to use and the path (or the original object when
        ``where`` is not path-like) to open with it.

    Raises
    ------
    ValueError
        If ``filesystem`` is given but ``where`` is not path-like.
    TypeError
        If a non-string path-like is combined with a filesystem that is not
        a LocalFileSystem.
    OSError
        If ``filesystem`` is not a recognized filesystem object.
    """
    if not _is_path_like(where):
        if filesystem is not None:
            raise ValueError("filesystem passed but where is file-like, so"
                             " there is nothing to open with filesystem.")
        return filesystem, where

    if filesystem is not None:
        filesystem = _ensure_filesystem(filesystem)
        if isinstance(filesystem, LocalFileSystem):
            path = _stringify_path(where)
        elif not isinstance(where, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        else:
            path = where
        return filesystem, path

    path = _stringify_path(where)

    parsed_uri = urllib.parse.urlparse(path)
    if parsed_uri.scheme in ('hdfs', 'viewfs'):
        # Input is hdfs URI such as hdfs://host:port/myfile.parquet
        netloc_split = parsed_uri.netloc.split(':')
        host = netloc_split[0]
        if host == '':
            host = 'default'
        else:
            host = parsed_uri.scheme + "://" + host
        # Port stays 0 unless the netloc is exactly "host:<digits>".
        port = 0
        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
            port = int(netloc_split[1])
        fs = pa.hdfs._connect(host=host, port=port)
        fs_path = parsed_uri.path
    elif parsed_uri.scheme == 'file':
        # Input is local URI such as file:///home/user/myfile.parquet
        fs = LocalFileSystem._get_instance()
        fs_path = parsed_uri.path
    else:
        # Input is local path such as /home/user/myfile.parquet
        fs = LocalFileSystem._get_instance()
        fs_path = path

    return fs, fs_path