# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ FileSystem abstraction to interact with various local and remote filesystems. """ from pyarrow.util import _is_path_like, _stringify_path from pyarrow._fs import ( # noqa FileSelector, FileType, FileInfo, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileSystemHandler, PyFileSystem, _copy_files, _copy_files_selector, ) # For backward compatibility. FileStats = FileInfo _not_imported = [] try: from pyarrow._hdfs import HadoopFileSystem # noqa except ImportError: _not_imported.append("HadoopFileSystem") try: from pyarrow._s3fs import ( # noqa S3FileSystem, S3LogLevel, initialize_s3, finalize_s3) except ImportError: _not_imported.append("S3FileSystem") else: initialize_s3() def __getattr__(name): if name in _not_imported: raise ImportError( "The pyarrow installation is not built with support for " "'{0}'".format(name) ) raise AttributeError( "module 'pyarrow.fs' has no attribute '{0}'".format(name) ) def _filesystem_from_str(uri): # instantiate the file system from an uri, if the uri has a path # component then it will be treated as a path prefix filesystem, prefix = FileSystem.from_uri(uri) prefix = filesystem.normalize_path(prefix) if prefix: # validate that the prefix is pointing to a directory prefix_info = filesystem.get_file_info([prefix])[0] if prefix_info.type != FileType.Directory: raise ValueError( "The path component of the filesystem URI must point to a " "directory but it has a type: `{}`. The path component " "is `{}` and the given filesystem URI is `{}`".format( prefix_info.type.name, prefix_info.path, uri ) ) filesystem = SubTreeFileSystem(prefix, filesystem) return filesystem def _ensure_filesystem( filesystem, use_mmap=False, allow_legacy_filesystem=False ): if isinstance(filesystem, FileSystem): return filesystem elif isinstance(filesystem, str): if use_mmap: raise ValueError( "Specifying to use memory mapping not supported for " "filesytem specified as an URI string" ) return _filesystem_from_str(filesystem) # handle fsspec-compatible filesystems try: import fsspec except ImportError: pass else: if isinstance(filesystem, fsspec.AbstractFileSystem): if type(filesystem).__name__ == 'LocalFileSystem': # In case its a simple LocalFileSystem, use native arrow one return LocalFileSystem(use_mmap=use_mmap) return PyFileSystem(FSSpecHandler(filesystem)) # map old filesystems to new ones import pyarrow.filesystem as legacyfs if isinstance(filesystem, legacyfs.LocalFileSystem): return LocalFileSystem(use_mmap=use_mmap) # TODO handle HDFS? if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem): return filesystem raise TypeError( "Unrecognized filesystem: {}. `filesystem` argument must be a " "FileSystem instance or a valid file system URI'".format( type(filesystem)) ) def _resolve_filesystem_and_path( path, filesystem=None, allow_legacy_filesystem=False ): """ Return filesystem/path from path which could be an URI or a plain filesystem path. """ if not _is_path_like(path): if filesystem is not None: raise ValueError( "'filesystem' passed but the specified path is file-like, so" " there is nothing to open with 'filesystem'." ) return filesystem, path if filesystem is not None: filesystem = _ensure_filesystem( filesystem, allow_legacy_filesystem=allow_legacy_filesystem ) if isinstance(filesystem, LocalFileSystem): path = _stringify_path(path) elif not isinstance(path, str): raise TypeError( "Expected string path; path-like objects are only allowed " "with a local filesystem" ) if not allow_legacy_filesystem: path = filesystem.normalize_path(path) return filesystem, path path = _stringify_path(path) # if filesystem is not given, try to automatically determine one # first check if the file exists as a local (relative) file path # if not then try to parse the path as an URI filesystem = LocalFileSystem() try: file_info = filesystem.get_file_info(path) except OSError: file_info = None exists_locally = False else: exists_locally = (file_info.type != FileType.NotFound) # if the file or directory doesn't exists locally, then assume that # the path is an URI describing the file system as well if not exists_locally: try: filesystem, path = FileSystem.from_uri(path) except ValueError as e: # neither an URI nor a locally existing path, so assume that # local path was given and propagate a nicer file not found error # instead of a more confusing scheme parsing error if "empty scheme" not in str(e): raise else: path = filesystem.normalize_path(path) return filesystem, path def copy_files(source, destination, source_filesystem=None, destination_filesystem=None, *, chunk_size=1024*1024, use_threads=True): """ Copy files between FileSystems. This functions allows you to recursively copy directories of files from one file system to another, such as from S3 to your local machine. Parameters ---------- source : string Source file path or URI to a single file or directory. If a directory, files will be copied recursively from this path. destination : string Destination file path or URI. If `source` is a file, `destination` is also interpreted as the destination file (not directory). Directories will be created as necessary. source_filesystem : FileSystem, optional Source filesystem, needs to be specified if `source` is not a URI, otherwise inferred. destination_filesystem : FileSystem, optional Destination filesystem, needs to be specified if `destination` is not a URI, otherwise inferred. chunk_size : int, default 1MB The maximum size of block to read before flushing to the destination file. A larger chunk_size will use more memory while copying but may help accommodate high latency FileSystems. use_threads : bool, default True Whether to use multiple threads to accelerate copying. Examples -------- Copy an S3 bucket's files to a local directory: >>> copy_files("s3://your-bucket-name", "local-directory") Using a FileSystem object: >>> copy_files("your-bucket-name", "local-directory", ... source_filesystem=S3FileSystem(...)) """ source_fs, source_path = _resolve_filesystem_and_path( source, source_filesystem ) destination_fs, destination_path = _resolve_filesystem_and_path( destination, destination_filesystem ) file_info = source_fs.get_file_info(source_path) if file_info.type == FileType.Directory: source_sel = FileSelector(source_path, recursive=True) _copy_files_selector(source_fs, source_sel, destination_fs, destination_path, chunk_size, use_threads) else: _copy_files(source_fs, source_path, destination_fs, destination_path, chunk_size, use_threads) class FSSpecHandler(FileSystemHandler): """ Handler for fsspec-based Python filesystems. https://filesystem-spec.readthedocs.io/en/latest/index.html Parameters ---------- fs : The FSSpec-compliant filesystem instance. Examples -------- >>> PyFileSystem(FSSpecHandler(fsspec_fs)) """ def __init__(self, fs): self.fs = fs def __eq__(self, other): if isinstance(other, FSSpecHandler): return self.fs == other.fs return NotImplemented def __ne__(self, other): if isinstance(other, FSSpecHandler): return self.fs != other.fs return NotImplemented def get_type_name(self): protocol = self.fs.protocol if isinstance(protocol, list): protocol = protocol[0] return "fsspec+{0}".format(protocol) def normalize_path(self, path): return path @staticmethod def _create_file_info(path, info): size = info["size"] if info["type"] == "file": ftype = FileType.File elif info["type"] == "directory": ftype = FileType.Directory # some fsspec filesystems include a file size for directories size = None else: ftype = FileType.Unknown return FileInfo(path, ftype, size=size, mtime=info.get("mtime", None)) def get_file_info(self, paths): infos = [] for path in paths: try: info = self.fs.info(path) except FileNotFoundError: infos.append(FileInfo(path, FileType.NotFound)) else: infos.append(self._create_file_info(path, info)) return infos def get_file_info_selector(self, selector): if not self.fs.isdir(selector.base_dir): if self.fs.exists(selector.base_dir): raise NotADirectoryError(selector.base_dir) else: if selector.allow_not_found: return [] else: raise FileNotFoundError(selector.base_dir) if selector.recursive: maxdepth = None else: maxdepth = 1 infos = [] selected_files = self.fs.find( selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True ) for path, info in selected_files.items(): infos.append(self._create_file_info(path, info)) return infos def create_dir(self, path, recursive): # mkdir also raises FileNotFoundError when base directory is not found try: self.fs.mkdir(path, create_parents=recursive) except FileExistsError: pass def delete_dir(self, path): self.fs.rm(path, recursive=True) def _delete_dir_contents(self, path): for subpath in self.fs.listdir(path, detail=False): if self.fs.isdir(subpath): self.fs.rm(subpath, recursive=True) elif self.fs.isfile(subpath): self.fs.rm(subpath) def delete_dir_contents(self, path): if path.strip("/") == "": raise ValueError( "delete_dir_contents called on path '", path, "'") self._delete_dir_contents(path) def delete_root_dir_contents(self): self._delete_dir_contents("/") def delete_file(self, path): # fs.rm correctly raises IsADirectoryError when `path` is a directory # instead of a file and `recursive` is not set to True if not self.fs.exists(path): raise FileNotFoundError(path) self.fs.rm(path) def move(self, src, dest): self.fs.mv(src, dest, recursive=True) def copy_file(self, src, dest): # fs.copy correctly raises IsADirectoryError when `src` is a directory # instead of a file self.fs.copy(src, dest) # TODO can we read/pass metadata (e.g. Content-Type) in the methods below? def open_input_stream(self, path): from pyarrow import PythonFile if not self.fs.isfile(path): raise FileNotFoundError(path) return PythonFile(self.fs.open(path, mode="rb"), mode="r") def open_input_file(self, path): from pyarrow import PythonFile if not self.fs.isfile(path): raise FileNotFoundError(path) return PythonFile(self.fs.open(path, mode="rb"), mode="r") def open_output_stream(self, path, metadata): from pyarrow import PythonFile return PythonFile(self.fs.open(path, mode="wb"), mode="w") def open_append_stream(self, path, metadata): from pyarrow import PythonFile return PythonFile(self.fs.open(path, mode="ab"), mode="w")