# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


import os
import posixpath
import sys
import urllib.parse
import warnings

from os.path import join as pjoin

import pyarrow as pa
from pyarrow.util import implements, _stringify_path, _is_path_like, _DEPR_MSG


_FS_DEPR_MSG = _DEPR_MSG.format(
    "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem"
)


class FileSystem:
    """
    Abstract filesystem interface.

    Subclasses override the methods that raise NotImplementedError here.
    ``disk_usage`` additionally relies on a ``walk(path)`` method yielding
    ``(root, directories, files)`` tuples, which this base class does not
    define.
    """

    def cat(self, path):
        """
        Return contents of file as a bytes object.

        Parameters
        ----------
        path : str
            File path to read content from.

        Returns
        -------
        contents : bytes
        """
        with self.open(path, 'rb') as f:
            return f.read()

    def ls(self, path):
        """
        Return list of file paths.

        Parameters
        ----------
        path : str
            Directory to list contents from.
        """
        raise NotImplementedError

    def delete(self, path, recursive=False):
        """
        Delete the indicated file or directory.

        Parameters
        ----------
        path : str
            Path to delete.
        recursive : bool, default False
            If True, also delete child paths for directories.
        """
        raise NotImplementedError

    def disk_usage(self, path):
        """
        Compute bytes used by all contents under indicated path in file tree.

        Parameters
        ----------
        path : str
            Can be a file path or directory.

        Returns
        -------
        usage : int
        """
        path = _stringify_path(path)
        path_info = self.stat(path)
        if path_info['kind'] == 'file':
            return path_info['size']

        # Not a file: sum the size of every file found below ``path``.
        total = 0
        for root, _directories, files in self.walk(path):
            for child_path in files:
                abspath = self._path_join(root, child_path)
                total += self.stat(abspath)['size']

        return total

    def _path_join(self, *args):
        """
        Join path components using this filesystem's ``pathsep``.
        """
        return self.pathsep.join(args)

    def stat(self, path):
        """
        Information about a filesystem entry.

        Returns
        -------
        stat : dict
            ``disk_usage`` reads the ``'kind'`` and ``'size'`` keys of this
            dictionary.
        """
        raise NotImplementedError('FileSystem.stat')

    def rm(self, path, recursive=False):
        """
        Alias for FileSystem.delete.
        """
        return self.delete(path, recursive=recursive)

    def mv(self, path, new_path):
        """
        Alias for FileSystem.rename.
        """
        return self.rename(path, new_path)

    def rename(self, path, new_path):
        """
        Rename file, like UNIX mv command.

        Parameters
        ----------
        path : str
            Path to alter.
        new_path : str
            Path to move to.
        """
        raise NotImplementedError('FileSystem.rename')

    def mkdir(self, path, create_parents=True):
        """
        Create a directory.

        Parameters
        ----------
        path : str
            Path to the directory.
        create_parents : bool, default True
            If the parent directories don't exist create them as well.
        """
        raise NotImplementedError

    def exists(self, path):
        """
        Return True if path exists.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def isdir(self, path):
        """
        Return True if path is a directory.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def isfile(self, path):
        """
        Return True if path is a file.

        Parameters
        ----------
        path : str
            Path to check.
        """
        raise NotImplementedError

    def _isfilestore(self):
        """
        Returns True if this FileSystem is a unix-style file store with
        directories.
        """
        raise NotImplementedError

    def read_parquet(self, path, columns=None, metadata=None, schema=None,
                     use_threads=True, use_pandas_metadata=False):
        """
        Read Parquet data from path in file system. Can read from a single
        file or a directory of files.

        Parameters
        ----------
        path : str
            Single file path or directory
        columns : List[str], optional
            Subset of columns to read.
        metadata : pyarrow.parquet.FileMetaData
            Known metadata to validate files against.
        schema : pyarrow.parquet.Schema
            Known schema to validate files against. Alternative to metadata
            argument.
        use_threads : bool, default True
            Perform multi-threaded column reads.
        use_pandas_metadata : bool, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded.

        Returns
        -------
        table : pyarrow.Table
        """
        # Imported lazily, only when Parquet data is actually read.
        from pyarrow.parquet import ParquetDataset
        dataset = ParquetDataset(path, schema=schema, metadata=metadata,
                                 filesystem=self)
        return dataset.read(columns=columns, use_threads=use_threads,
                            use_pandas_metadata=use_pandas_metadata)

    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        raise NotImplementedError

    @property
    def pathsep(self):
        """
        Separator placed between path components by ``_path_join``.
        """
        return '/'


class LocalFileSystem(FileSystem):
    """
    FileSystem implementation backed by the ``os`` module.

    Instantiating it directly, or calling ``get_instance``, emits a
    FutureWarning carrying the module's deprecation message.
    """

    # Lazily created shared instance, see ``_get_instance``.
    _instance = None

    def __init__(self):
        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
        super().__init__()

    @classmethod
    def _get_instance(cls):
        """
        Return the shared instance, creating it on first use.

        Warnings are silenced while the instance is constructed so that
        internal callers do not trigger the deprecation warning.
        """
        if cls._instance is None:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                cls._instance = LocalFileSystem()
        return cls._instance

    @classmethod
    def get_instance(cls):
        """
        Emit the deprecation warning and return the shared instance.
        """
        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
        return cls._get_instance()

    @implements(FileSystem.ls)
    def ls(self, path):
        path = _stringify_path(path)
        # Entries are returned joined to ``path``, in sorted order.
        return sorted(pjoin(path, x) for x in os.listdir(path))

    @implements(FileSystem.mkdir)
    def mkdir(self, path, create_parents=True):
        path = _stringify_path(path)
        if create_parents:
            os.makedirs(path)
        else:
            os.mkdir(path)

    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _stringify_path(path)
        return os.path.isdir(path)

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _stringify_path(path)
        return os.path.isfile(path)

    @implements(FileSystem._isfilestore)
    def _isfilestore(self):
        return True

    @implements(FileSystem.exists)
    def exists(self, path):
        path = _stringify_path(path)
        return os.path.exists(path)

    @implements(FileSystem.open)
    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        path = _stringify_path(path)
        return open(path, mode=mode)

    @property
    def pathsep(self):
        """
        Path separator of the local platform (``os.path.sep``).
        """
        return os.path.sep

    def walk(self, path):
        """
        Directory tree generator, see os.walk.
        """
        path = _stringify_path(path)
        return os.walk(path)


class DaskFileSystem(FileSystem):
    """
    Wraps a Dask-style filesystem implementation like s3fs, gcsfs, etc.

    Most calls are forwarded to the wrapped object stored in ``self.fs``.
    Constructing an instance emits a FutureWarning.
    """

    def __init__(self, fs):
        warnings.warn(
            "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated "
            "as of pyarrow 3.0.0, and will be removed in a future version.",
            FutureWarning, stacklevel=2)
        self.fs = fs

    @implements(FileSystem.isdir)
    def isdir(self, path):
        raise NotImplementedError("Unsupported file system API")

    @implements(FileSystem.isfile)
    def isfile(self, path):
        raise NotImplementedError("Unsupported file system API")

    @implements(FileSystem._isfilestore)
    def _isfilestore(self):
        """
        Object Stores like S3 and GCSFS are based on key lookups, not true
        file-paths.
        """
        return False

    @implements(FileSystem.delete)
    def delete(self, path, recursive=False):
        path = _stringify_path(path)
        return self.fs.rm(path, recursive=recursive)

    @implements(FileSystem.exists)
    def exists(self, path):
        path = _stringify_path(path)
        return self.fs.exists(path)

    @implements(FileSystem.mkdir)
    def mkdir(self, path, create_parents=True):
        path = _stringify_path(path)
        if create_parents:
            return self.fs.mkdirs(path)
        else:
            return self.fs.mkdir(path)

    @implements(FileSystem.open)
    def open(self, path, mode='rb'):
        """
        Open file for reading or writing.
        """
        path = _stringify_path(path)
        return self.fs.open(path, mode=mode)

    def ls(self, path, detail=False):
        """
        List ``path`` by forwarding to the wrapped filesystem's ``ls``.

        Parameters
        ----------
        path : str
            Location to list.
        detail : bool, default False
            Passed through unchanged to the wrapped filesystem.
        """
        path = _stringify_path(path)
        return self.fs.ls(path, detail=detail)

    def walk(self, path):
        """
        Directory tree generator, like os.walk.
        """
        path = _stringify_path(path)
        return self.fs.walk(path)


class S3FSWrapper(DaskFileSystem):
    """
    DaskFileSystem specialization adding isdir, isfile and walk on top of
    the wrapped filesystem's listing calls.
    """

    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _sanitize_s3(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
        except OSError:
            return False
        # A listing made of just the path itself is treated as a file;
        # any other successful listing is treated as a directory.
        return not (len(contents) == 1 and contents[0] == path)

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _sanitize_s3(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
        except OSError:
            return False
        return len(contents) == 1 and contents[0] == path

    def walk(self, path, refresh=False):
        """
        Directory tree generator, like os.walk.

        Generator version of what is in s3fs, which yields a flattened list of
        files.

        Parameters
        ----------
        path : str
            Location to start from; a leading ``s3://`` is removed.
        refresh : bool, default False
            Passed through unchanged to the wrapped filesystem's ``_ls``.

        Yields
        ------
        (root, directories, files) : tuple
            ``root`` is the location that was listed; ``directories`` and
            ``files`` are sorted lists of the last path component of each
            entry found directly under it.
        """
        # Keep the walked location in its own name: the listing loop must
        # not overwrite it, since it is what gets yielded as the root.
        root = _sanitize_s3(_stringify_path(path))
        directory_keys = set()
        file_keys = set()

        for entry in self.fs._ls(root, refresh=refresh):
            key = entry['Key']
            storage_class = entry['StorageClass']
            if storage_class == 'DIRECTORY':
                directory_keys.add(key)
            elif storage_class != 'BUCKET':
                file_keys.add(key)

        # s3fs creates duplicate 'DIRECTORY' entries
        files = sorted(posixpath.split(key)[1] for key in file_keys
                       if key not in directory_keys)
        # Order the keys by their last component so that recursion visits
        # directories in the same order as the names that are yielded.
        ordered_directory_keys = sorted(
            directory_keys, key=lambda key: posixpath.split(key)[1])
        directories = [posixpath.split(key)[1]
                       for key in ordered_directory_keys]

        yield root, directories, files

        # Recurse with the listed keys rather than the bare names
        # (presumably the keys are full paths, given that their last
        # component is extracted above -- confirm against s3fs), so nested
        # levels are listed at the right location.
        for directory_key in ordered_directory_keys:
            yield from self.walk(directory_key, refresh=refresh)


def _sanitize_s3(path):
    """
    Return ``path`` without a leading ``s3://`` scheme.

    Only the prefix is removed; any later occurrence of ``s3://`` inside
    the path is kept as-is.
    """
    prefix = 's3://'
    if path.startswith(prefix):
        return path[len(prefix):]
    return path


def _ensure_filesystem(fs):
    """
    Return ``fs`` if it is an acceptable filesystem object.

    Accepted are instances of FileSystem subclasses and, when the
    ``fsspec`` module has already been imported, instances of
    ``fsspec.AbstractFileSystem``.

    Raises
    ------
    OSError
        If ``fs`` is neither of the above.
    """
    fs_type = type(fs)

    # If the arrow filesystem was subclassed, assume it supports the full
    # interface and return it
    if issubclass(fs_type, FileSystem):
        return fs

    # fsspec is only consulted if something else imported it already.
    if "fsspec" in sys.modules:
        fsspec = sys.modules["fsspec"]
        if isinstance(fs, fsspec.AbstractFileSystem):
            # for recent fsspec versions that stop inheriting from
            # pyarrow.filesystem.FileSystem, still allow fsspec
            # filesystems (which should be compatible with our legacy fs)
            return fs

    raise OSError('Unrecognized filesystem: {}'.format(fs_type))


def resolve_filesystem_and_path(where, filesystem=None):
    """
    Return filesystem from path which could be an HDFS URI, a local URI,
    or a plain filesystem path.

    Parameters
    ----------
    where : path-like or file-like object
        Location to resolve.
    filesystem : FileSystem, optional
        Filesystem to use instead of inferring one from ``where``.

    Returns
    -------
    (filesystem, path) : tuple
        When ``where`` is not path-like it is returned unchanged together
        with ``filesystem`` (which must be None in that case).

    Raises
    ------
    ValueError
        If ``filesystem`` is given but ``where`` is not path-like.
    TypeError
        If ``filesystem`` is not a LocalFileSystem and ``where`` is a
        path-like object other than a str.
    OSError
        If ``filesystem`` is not a recognized filesystem object.
    """
    if not _is_path_like(where):
        if filesystem is not None:
            raise ValueError("filesystem passed but where is file-like, so"
                             " there is nothing to open with filesystem.")
        return filesystem, where

    if filesystem is not None:
        filesystem = _ensure_filesystem(filesystem)
        if isinstance(filesystem, LocalFileSystem):
            path = _stringify_path(where)
        elif not isinstance(where, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        else:
            path = where
        return filesystem, path

    path = _stringify_path(where)

    parsed_uri = urllib.parse.urlparse(path)
    if parsed_uri.scheme in ('hdfs', 'viewfs'):
        # Input is hdfs URI such as hdfs://host:port/myfile.parquet
        netloc_split = parsed_uri.netloc.split(':')
        host = netloc_split[0]
        if host == '':
            host = 'default'
        else:
            host = parsed_uri.scheme + "://" + host
        # The port stays 0 unless the netloc is exactly "host:<digits>".
        port = 0
        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
            port = int(netloc_split[1])
        fs = pa.hdfs._connect(host=host, port=port)
        fs_path = parsed_uri.path
    elif parsed_uri.scheme == 'file':
        # Input is local URI such as file:///home/user/myfile.parquet
        fs = LocalFileSystem._get_instance()
        fs_path = parsed_uri.path
    else:
        # Input is local path such as /home/user/myfile.parquet
        fs = LocalFileSystem._get_instance()
        fs_path = path

    return fs, fs_path