summaryrefslogtreecommitdiffstats
path: root/src/arrow/python/pyarrow/_fs.pyx
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_fs.pyx
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_fs.pyx')
-rw-r--r--src/arrow/python/pyarrow/_fs.pyx1233
1 files changed, 1233 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/_fs.pyx b/src/arrow/python/pyarrow/_fs.pyx
new file mode 100644
index 000000000..34a0ef55b
--- /dev/null
+++ b/src/arrow/python/pyarrow/_fs.pyx
@@ -0,0 +1,1233 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from cpython.datetime cimport datetime, PyDateTime_DateTime
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport PyDateTime_to_TimePoint
+from pyarrow.lib import _detect_compression, frombytes, tobytes
+from pyarrow.lib cimport *
+from pyarrow.util import _stringify_path
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timezone
+import os
+import pathlib
+import sys
+import warnings
+
+
+cdef _init_ca_paths():
+ cdef CFileSystemGlobalOptions options
+
+ import ssl
+ paths = ssl.get_default_verify_paths()
+ if paths.cafile:
+ options.tls_ca_file_path = os.fsencode(paths.cafile)
+ if paths.capath:
+ options.tls_ca_dir_path = os.fsencode(paths.capath)
+ check_status(CFileSystemsInitialize(options))
+
+
+if sys.platform == 'linux':
+ # ARROW-9261: On Linux, we may need to fixup the paths to TLS CA certs
+ # (especially in manylinux packages) since the values hardcoded at
+ # compile-time in libcurl may be wrong.
+ _init_ca_paths()
+
+
+cdef inline c_string _path_as_bytes(path) except *:
+ # handle only abstract paths, not bound to any filesystem like pathlib is,
+ # so we only accept plain strings
+ if not isinstance(path, (bytes, str)):
+ raise TypeError('Path must be a string')
+ # tobytes always uses utf-8, which is more or less ok, at least on Windows
+ # since the C++ side then decodes from utf-8. On Unix, os.fsencode may be
+ # better.
+ return tobytes(path)
+
+
+cdef object _wrap_file_type(CFileType ty):
+ return FileType(<int8_t> ty)
+
+
+cdef CFileType _unwrap_file_type(FileType ty) except *:
+ if ty == FileType.Unknown:
+ return CFileType_Unknown
+ elif ty == FileType.NotFound:
+ return CFileType_NotFound
+ elif ty == FileType.File:
+ return CFileType_File
+ elif ty == FileType.Directory:
+ return CFileType_Directory
+ assert 0
+
+
+cdef class FileInfo(_Weakrefable):
+ """
+ FileSystem entry info.
+
+ Parameters
+ ----------
+ path : str
+ The full path to the filesystem entry.
+ type : FileType
+ The type of the filesystem entry.
+ mtime : datetime or float, default None
+ If given, the modification time of the filesystem entry.
+ If a float is given, it is the number of seconds since the
+ Unix epoch.
+ mtime_ns : int, default None
+ If given, the modification time of the filesystem entry,
+ in nanoseconds since the Unix epoch.
+ `mtime` and `mtime_ns` are mutually exclusive.
+ size : int, default None
+ If given, the filesystem entry size in bytes. This should only
+ be given if `type` is `FileType.File`.
+
+ """
+
+ def __init__(self, path, FileType type=FileType.Unknown, *,
+ mtime=None, mtime_ns=None, size=None):
+ self.info.set_path(tobytes(path))
+ self.info.set_type(_unwrap_file_type(type))
+ if mtime is not None:
+ if mtime_ns is not None:
+ raise TypeError("Only one of mtime and mtime_ns "
+ "can be given")
+ if isinstance(mtime, datetime):
+ self.info.set_mtime(PyDateTime_to_TimePoint(
+ <PyDateTime_DateTime*> mtime))
+ else:
+ self.info.set_mtime(TimePoint_from_s(mtime))
+ elif mtime_ns is not None:
+ self.info.set_mtime(TimePoint_from_ns(mtime_ns))
+ if size is not None:
+ self.info.set_size(size)
+
+ @staticmethod
+ cdef wrap(CFileInfo info):
+ cdef FileInfo self = FileInfo.__new__(FileInfo)
+ self.info = move(info)
+ return self
+
+ cdef inline CFileInfo unwrap(self) nogil:
+ return self.info
+
+ @staticmethod
+ cdef CFileInfo unwrap_safe(obj):
+ if not isinstance(obj, FileInfo):
+ raise TypeError("Expected FileInfo instance, got {0}"
+ .format(type(obj)))
+ return (<FileInfo> obj).unwrap()
+
+ def __repr__(self):
+ def getvalue(attr):
+ try:
+ return getattr(self, attr)
+ except ValueError:
+ return ''
+
+ s = '<FileInfo for {!r}: type={}'.format(self.path, str(self.type))
+ if self.is_file:
+ s += ', size={}'.format(self.size)
+ s += '>'
+ return s
+
+ @property
+ def type(self):
+ """
+ Type of the file.
+
+ The returned enum values can be the following:
+
+ - FileType.NotFound: target does not exist
+ - FileType.Unknown: target exists but its type is unknown (could be a
+ special file such as a Unix socket or character device, or
+ Windows NUL / CON / ...)
+ - FileType.File: target is a regular file
+ - FileType.Directory: target is a regular directory
+
+ Returns
+ -------
+ type : FileType
+ """
+ return _wrap_file_type(self.info.type())
+
+ @property
+ def is_file(self):
+ """
+ """
+ return self.type == FileType.File
+
+ @property
+ def path(self):
+ """
+ The full file path in the filesystem.
+ """
+ return frombytes(self.info.path())
+
+ @property
+ def base_name(self):
+ """
+ The file base name.
+
+ Component after the last directory separator.
+ """
+ return frombytes(self.info.base_name())
+
+ @property
+ def size(self):
+ """
+ The size in bytes, if available.
+
+ Only regular files are guaranteed to have a size.
+
+ Returns
+ -------
+ size : int or None
+ """
+ cdef int64_t size
+ size = self.info.size()
+ return (size if size != -1 else None)
+
+ @property
+ def extension(self):
+ """
+ The file extension.
+ """
+ return frombytes(self.info.extension())
+
+ @property
+ def mtime(self):
+ """
+ The time of last modification, if available.
+
+ Returns
+ -------
+ mtime : datetime.datetime or None
+ """
+ cdef int64_t nanoseconds
+ nanoseconds = TimePoint_to_ns(self.info.mtime())
+ return (datetime.fromtimestamp(nanoseconds / 1.0e9, timezone.utc)
+ if nanoseconds != -1 else None)
+
+ @property
+ def mtime_ns(self):
+ """
+ The time of last modification, if available, expressed in nanoseconds
+ since the Unix epoch.
+
+ Returns
+ -------
+ mtime_ns : int or None
+ """
+ cdef int64_t nanoseconds
+ nanoseconds = TimePoint_to_ns(self.info.mtime())
+ return (nanoseconds if nanoseconds != -1 else None)
+
+
+cdef class FileSelector(_Weakrefable):
+ """
+ File and directory selector.
+
+ It contains a set of options that describes how to search for files and
+ directories.
+
+ Parameters
+ ----------
+ base_dir : str
+ The directory in which to select files. Relative paths also work, use
+ '.' for the current directory and '..' for the parent.
+ allow_not_found : bool, default False
+ The behavior if `base_dir` doesn't exist in the filesystem.
+ If false, an error is returned.
+ If true, an empty selection is returned.
+ recursive : bool, default False
+ Whether to recurse into subdirectories.
+ """
+
+ def __init__(self, base_dir, bint allow_not_found=False,
+ bint recursive=False):
+ self.base_dir = base_dir
+ self.recursive = recursive
+ self.allow_not_found = allow_not_found
+
+ @staticmethod
+ cdef FileSelector wrap(CFileSelector wrapped):
+ cdef FileSelector self = FileSelector.__new__(FileSelector)
+ self.selector = move(wrapped)
+ return self
+
+ cdef inline CFileSelector unwrap(self) nogil:
+ return self.selector
+
+ @property
+ def base_dir(self):
+ return frombytes(self.selector.base_dir)
+
+ @base_dir.setter
+ def base_dir(self, base_dir):
+ self.selector.base_dir = _path_as_bytes(base_dir)
+
+ @property
+ def allow_not_found(self):
+ return self.selector.allow_not_found
+
+ @allow_not_found.setter
+ def allow_not_found(self, bint allow_not_found):
+ self.selector.allow_not_found = allow_not_found
+
+ @property
+ def recursive(self):
+ return self.selector.recursive
+
+ @recursive.setter
+ def recursive(self, bint recursive):
+ self.selector.recursive = recursive
+
+ def __repr__(self):
+ return ("<FileSelector base_dir={0.base_dir!r} "
+ "recursive={0.recursive}>".format(self))
+
+
+cdef class FileSystem(_Weakrefable):
+ """
+ Abstract file system API.
+ """
+
+ def __init__(self):
+ raise TypeError("FileSystem is an abstract class, instantiate one of "
+ "the subclasses instead: LocalFileSystem or "
+ "SubTreeFileSystem")
+
+ @staticmethod
+ def from_uri(uri):
+ """
+ Create a new FileSystem from URI or Path.
+
+ Recognized URI schemes are "file", "mock", "s3fs", "hdfs" and "viewfs".
+ In addition, the argument can be a pathlib.Path object, or a string
+ describing an absolute local path.
+
+ Parameters
+ ----------
+ uri : string
+ URI-based path, for example: file:///some/local/path.
+
+ Returns
+ -------
+ With (filesystem, path) tuple where path is the abstract path inside
+ the FileSystem instance.
+ """
+ cdef:
+ c_string path
+ CResult[shared_ptr[CFileSystem]] result
+
+ if isinstance(uri, pathlib.Path):
+ # Make absolute
+ uri = uri.resolve().absolute()
+ uri = _stringify_path(uri)
+ result = CFileSystemFromUriOrPath(tobytes(uri), &path)
+ return FileSystem.wrap(GetResultValue(result)), frombytes(path)
+
+ cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+ self.wrapped = wrapped
+ self.fs = wrapped.get()
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFileSystem]& sp):
+ cdef FileSystem self
+
+ typ = frombytes(sp.get().type_name())
+ if typ == 'local':
+ self = LocalFileSystem.__new__(LocalFileSystem)
+ elif typ == 'mock':
+ self = _MockFileSystem.__new__(_MockFileSystem)
+ elif typ == 'subtree':
+ self = SubTreeFileSystem.__new__(SubTreeFileSystem)
+ elif typ == 's3':
+ from pyarrow._s3fs import S3FileSystem
+ self = S3FileSystem.__new__(S3FileSystem)
+ elif typ == 'hdfs':
+ from pyarrow._hdfs import HadoopFileSystem
+ self = HadoopFileSystem.__new__(HadoopFileSystem)
+ elif typ.startswith('py::'):
+ self = PyFileSystem.__new__(PyFileSystem)
+ else:
+ raise TypeError('Cannot wrap FileSystem pointer')
+
+ self.init(sp)
+ return self
+
+ cdef inline shared_ptr[CFileSystem] unwrap(self) nogil:
+ return self.wrapped
+
+ def equals(self, FileSystem other):
+ return self.fs.Equals(other.unwrap())
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ @property
+ def type_name(self):
+ """
+ The filesystem's type name.
+ """
+ return frombytes(self.fs.type_name())
+
+ def get_file_info(self, paths_or_selector):
+ """
+ Get info for the given files.
+
+ Any symlink is automatically dereferenced, recursively. A non-existing
+ or unreachable file returns a FileStat object and has a FileType of
+ value NotFound. An exception indicates a truly exceptional condition
+ (low-level I/O error, etc.).
+
+ Parameters
+ ----------
+ paths_or_selector: FileSelector, path-like or list of path-likes
+ Either a selector object, a path-like object or a list of
+ path-like objects. The selector's base directory will not be
+ part of the results, even if it exists. If it doesn't exist,
+ use `allow_not_found`.
+
+ Returns
+ -------
+ FileInfo or list of FileInfo
+ Single FileInfo object is returned for a single path, otherwise
+ a list of FileInfo objects is returned.
+ """
+ cdef:
+ CFileInfo info
+ c_string path
+ vector[CFileInfo] infos
+ vector[c_string] paths
+ CFileSelector selector
+
+ if isinstance(paths_or_selector, FileSelector):
+ with nogil:
+ selector = (<FileSelector>paths_or_selector).selector
+ infos = GetResultValue(self.fs.GetFileInfo(selector))
+ elif isinstance(paths_or_selector, (list, tuple)):
+ paths = [_path_as_bytes(s) for s in paths_or_selector]
+ with nogil:
+ infos = GetResultValue(self.fs.GetFileInfo(paths))
+ elif isinstance(paths_or_selector, (bytes, str)):
+ path =_path_as_bytes(paths_or_selector)
+ with nogil:
+ info = GetResultValue(self.fs.GetFileInfo(path))
+ return FileInfo.wrap(info)
+ else:
+ raise TypeError('Must pass either path(s) or a FileSelector')
+
+ return [FileInfo.wrap(info) for info in infos]
+
+ def create_dir(self, path, *, bint recursive=True):
+ """
+ Create a directory and subdirectories.
+
+ This function succeeds if the directory already exists.
+
+ Parameters
+ ----------
+ path : str
+ The path of the new directory.
+ recursive: bool, default True
+ Create nested directories as well.
+ """
+ cdef c_string directory = _path_as_bytes(path)
+ with nogil:
+ check_status(self.fs.CreateDir(directory, recursive=recursive))
+
+ def delete_dir(self, path):
+ """Delete a directory and its contents, recursively.
+
+ Parameters
+ ----------
+ path : str
+ The path of the directory to be deleted.
+ """
+ cdef c_string directory = _path_as_bytes(path)
+ with nogil:
+ check_status(self.fs.DeleteDir(directory))
+
+ def delete_dir_contents(self, path, *, bint accept_root_dir=False):
+ """Delete a directory's contents, recursively.
+
+ Like delete_dir, but doesn't delete the directory itself.
+
+ Parameters
+ ----------
+ path : str
+ The path of the directory to be deleted.
+ accept_root_dir : boolean, default False
+ Allow deleting the root directory's contents
+ (if path is empty or "/")
+ """
+ cdef c_string directory = _path_as_bytes(path)
+ if accept_root_dir and directory.strip(b"/") == b"":
+ with nogil:
+ check_status(self.fs.DeleteRootDirContents())
+ else:
+ with nogil:
+ check_status(self.fs.DeleteDirContents(directory))
+
+ def move(self, src, dest):
+ """
+ Move / rename a file or directory.
+
+ If the destination exists:
+ - if it is a non-empty directory, an error is returned
+ - otherwise, if it has the same type as the source, it is replaced
+ - otherwise, behavior is unspecified (implementation-dependent).
+
+ Parameters
+ ----------
+ src : str
+ The path of the file or the directory to be moved.
+ dest : str
+ The destination path where the file or directory is moved to.
+ """
+ cdef:
+ c_string source = _path_as_bytes(src)
+ c_string destination = _path_as_bytes(dest)
+ with nogil:
+ check_status(self.fs.Move(source, destination))
+
+ def copy_file(self, src, dest):
+ """
+ Copy a file.
+
+ If the destination exists and is a directory, an error is returned.
+ Otherwise, it is replaced.
+
+ Parameters
+ ----------
+ src : str
+ The path of the file to be copied from.
+ dest : str
+ The destination path where the file is copied to.
+ """
+ cdef:
+ c_string source = _path_as_bytes(src)
+ c_string destination = _path_as_bytes(dest)
+ with nogil:
+ check_status(self.fs.CopyFile(source, destination))
+
+ def delete_file(self, path):
+ """
+ Delete a file.
+
+ Parameters
+ ----------
+ path : str
+ The path of the file to be deleted.
+ """
+ cdef c_string file = _path_as_bytes(path)
+ with nogil:
+ check_status(self.fs.DeleteFile(file))
+
+ def _wrap_input_stream(self, stream, path, compression, buffer_size):
+ if buffer_size is not None and buffer_size != 0:
+ stream = BufferedInputStream(stream, buffer_size)
+ if compression == 'detect':
+ compression = _detect_compression(path)
+ if compression is not None:
+ stream = CompressedInputStream(stream, compression)
+ return stream
+
+ def _wrap_output_stream(self, stream, path, compression, buffer_size):
+ if buffer_size is not None and buffer_size != 0:
+ stream = BufferedOutputStream(stream, buffer_size)
+ if compression == 'detect':
+ compression = _detect_compression(path)
+ if compression is not None:
+ stream = CompressedOutputStream(stream, compression)
+ return stream
+
+ def open_input_file(self, path):
+ """
+ Open an input file for random access reading.
+
+ Parameters
+ ----------
+ path : str
+ The source to open for reading.
+
+ Returns
+ -------
+ stram : NativeFile
+ """
+ cdef:
+ c_string pathstr = _path_as_bytes(path)
+ NativeFile stream = NativeFile()
+ shared_ptr[CRandomAccessFile] in_handle
+
+ with nogil:
+ in_handle = GetResultValue(self.fs.OpenInputFile(pathstr))
+
+ stream.set_random_access_file(in_handle)
+ stream.is_readable = True
+ return stream
+
+ def open_input_stream(self, path, compression='detect', buffer_size=None):
+ """
+ Open an input stream for sequential reading.
+
+ Parameters
+ ----------
+ source : str
+ The source to open for reading.
+ compression : str optional, default 'detect'
+ The compression algorithm to use for on-the-fly decompression.
+ If "detect" and source is a file path, then compression will be
+ chosen based on the file extension.
+ If None, no compression will be applied. Otherwise, a well-known
+ algorithm name must be supplied (e.g. "gzip").
+ buffer_size : int optional, default None
+ If None or 0, no buffering will happen. Otherwise the size of the
+ temporary read buffer.
+
+ Returns
+ -------
+ stream : NativeFile
+ """
+ cdef:
+ c_string pathstr = _path_as_bytes(path)
+ NativeFile stream = NativeFile()
+ shared_ptr[CInputStream] in_handle
+
+ with nogil:
+ in_handle = GetResultValue(self.fs.OpenInputStream(pathstr))
+
+ stream.set_input_stream(in_handle)
+ stream.is_readable = True
+
+ return self._wrap_input_stream(
+ stream, path=path, compression=compression, buffer_size=buffer_size
+ )
+
+ def open_output_stream(self, path, compression='detect',
+ buffer_size=None, metadata=None):
+ """
+ Open an output stream for sequential writing.
+
+ If the target already exists, existing data is truncated.
+
+ Parameters
+ ----------
+ path : str
+ The source to open for writing.
+ compression : str optional, default 'detect'
+ The compression algorithm to use for on-the-fly compression.
+ If "detect" and source is a file path, then compression will be
+ chosen based on the file extension.
+ If None, no compression will be applied. Otherwise, a well-known
+ algorithm name must be supplied (e.g. "gzip").
+ buffer_size : int optional, default None
+ If None or 0, no buffering will happen. Otherwise the size of the
+ temporary write buffer.
+ metadata : dict optional, default None
+ If not None, a mapping of string keys to string values.
+ Some filesystems support storing metadata along the file
+ (such as "Content-Type").
+ Unsupported metadata keys will be ignored.
+
+ Returns
+ -------
+ stream : NativeFile
+ """
+ cdef:
+ c_string pathstr = _path_as_bytes(path)
+ NativeFile stream = NativeFile()
+ shared_ptr[COutputStream] out_handle
+ shared_ptr[const CKeyValueMetadata] c_metadata
+
+ if metadata is not None:
+ c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata))
+
+ with nogil:
+ out_handle = GetResultValue(
+ self.fs.OpenOutputStream(pathstr, c_metadata))
+
+ stream.set_output_stream(out_handle)
+ stream.is_writable = True
+
+ return self._wrap_output_stream(
+ stream, path=path, compression=compression, buffer_size=buffer_size
+ )
+
+ def open_append_stream(self, path, compression='detect',
+ buffer_size=None, metadata=None):
+ """
+ DEPRECATED: Open an output stream for appending.
+
+ If the target doesn't exist, a new empty file is created.
+
+ .. deprecated:: 6.0
+ Several filesystems don't support this functionality
+ and it will be later removed.
+
+ Parameters
+ ----------
+ path : str
+ The source to open for writing.
+ compression : str optional, default 'detect'
+ The compression algorithm to use for on-the-fly compression.
+ If "detect" and source is a file path, then compression will be
+ chosen based on the file extension.
+ If None, no compression will be applied. Otherwise, a well-known
+ algorithm name must be supplied (e.g. "gzip").
+ buffer_size : int optional, default None
+ If None or 0, no buffering will happen. Otherwise the size of the
+ temporary write buffer.
+ metadata : dict optional, default None
+ If not None, a mapping of string keys to string values.
+ Some filesystems support storing metadata along the file
+ (such as "Content-Type").
+ Unsupported metadata keys will be ignored.
+
+ Returns
+ -------
+ stream : NativeFile
+ """
+ cdef:
+ c_string pathstr = _path_as_bytes(path)
+ NativeFile stream = NativeFile()
+ shared_ptr[COutputStream] out_handle
+ shared_ptr[const CKeyValueMetadata] c_metadata
+
+ warnings.warn(
+ "`open_append_stream` is deprecated as of 6.0.0; several "
+ "filesystems don't support it and it will be later removed",
+ FutureWarning)
+
+ if metadata is not None:
+ c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata))
+
+ with nogil:
+ out_handle = GetResultValue(
+ self.fs.OpenAppendStream(pathstr, c_metadata))
+
+ stream.set_output_stream(out_handle)
+ stream.is_writable = True
+
+ return self._wrap_output_stream(
+ stream, path=path, compression=compression, buffer_size=buffer_size
+ )
+
+ def normalize_path(self, path):
+ """
+ Normalize filesystem path.
+
+ Parameters
+ ----------
+ path : str
+ The path to normalize
+
+ Returns
+ -------
+ normalized_path : str
+ The normalized path
+ """
+ cdef:
+ c_string c_path = _path_as_bytes(path)
+ c_string c_path_normalized
+
+ c_path_normalized = GetResultValue(self.fs.NormalizePath(c_path))
+ return frombytes(c_path_normalized)
+
+
+cdef class LocalFileSystem(FileSystem):
+ """
+ A FileSystem implementation accessing files on the local machine.
+
+ Details such as symlinks are abstracted away (symlinks are always followed,
+ except when deleting an entry).
+
+ Parameters
+ ----------
+ use_mmap : bool, default False
+ Whether open_input_stream and open_input_file should return
+ a mmap'ed file or a regular file.
+ """
+
+ def __init__(self, *, use_mmap=False):
+ cdef:
+ CLocalFileSystemOptions opts
+ shared_ptr[CLocalFileSystem] fs
+
+ opts = CLocalFileSystemOptions.Defaults()
+ opts.use_mmap = use_mmap
+
+ fs = make_shared[CLocalFileSystem](opts)
+ self.init(<shared_ptr[CFileSystem]> fs)
+
+ cdef init(self, const shared_ptr[CFileSystem]& c_fs):
+ FileSystem.init(self, c_fs)
+ self.localfs = <CLocalFileSystem*> c_fs.get()
+
+ @classmethod
+ def _reconstruct(cls, kwargs):
+ # __reduce__ doesn't allow passing named arguments directly to the
+ # reconstructor, hence this wrapper.
+ return cls(**kwargs)
+
+ def __reduce__(self):
+ cdef CLocalFileSystemOptions opts = self.localfs.options()
+ return LocalFileSystem._reconstruct, (dict(
+ use_mmap=opts.use_mmap),)
+
+
+cdef class SubTreeFileSystem(FileSystem):
+ """
+ Delegates to another implementation after prepending a fixed base path.
+
+ This is useful to expose a logical view of a subtree of a filesystem,
+ for example a directory in a LocalFileSystem.
+
+ Note, that this makes no security guarantee. For example, symlinks may
+ allow to "escape" the subtree and access other parts of the underlying
+ filesystem.
+
+ Parameters
+ ----------
+ base_path : str
+ The root of the subtree.
+ base_fs : FileSystem
+ FileSystem object the operations delegated to.
+ """
+
+ def __init__(self, base_path, FileSystem base_fs):
+ cdef:
+ c_string pathstr
+ shared_ptr[CSubTreeFileSystem] wrapped
+
+ pathstr = _path_as_bytes(base_path)
+ wrapped = make_shared[CSubTreeFileSystem](pathstr, base_fs.wrapped)
+
+ self.init(<shared_ptr[CFileSystem]> wrapped)
+
+ cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+ FileSystem.init(self, wrapped)
+ self.subtreefs = <CSubTreeFileSystem*> wrapped.get()
+
+ def __repr__(self):
+ return ("SubTreeFileSystem(base_path={}, base_fs={}"
+ .format(self.base_path, self.base_fs))
+
+ def __reduce__(self):
+ return SubTreeFileSystem, (
+ frombytes(self.subtreefs.base_path()),
+ FileSystem.wrap(self.subtreefs.base_fs())
+ )
+
+ @property
+ def base_path(self):
+ return frombytes(self.subtreefs.base_path())
+
+ @property
+ def base_fs(self):
+ return FileSystem.wrap(self.subtreefs.base_fs())
+
+
+cdef class _MockFileSystem(FileSystem):
+
+ def __init__(self, datetime current_time=None):
+ cdef shared_ptr[CMockFileSystem] wrapped
+
+ current_time = current_time or datetime.now()
+ wrapped = make_shared[CMockFileSystem](
+ PyDateTime_to_TimePoint(<PyDateTime_DateTime*> current_time)
+ )
+
+ self.init(<shared_ptr[CFileSystem]> wrapped)
+
+ cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+ FileSystem.init(self, wrapped)
+ self.mockfs = <CMockFileSystem*> wrapped.get()
+
+
+cdef class PyFileSystem(FileSystem):
+ """
+ A FileSystem with behavior implemented in Python.
+
+ Parameters
+ ----------
+ handler : FileSystemHandler
+ The handler object implementing custom filesystem behavior.
+ """
+
+ def __init__(self, handler):
+ cdef:
+ CPyFileSystemVtable vtable
+ shared_ptr[CPyFileSystem] wrapped
+
+ if not isinstance(handler, FileSystemHandler):
+ raise TypeError("Expected a FileSystemHandler instance, got {0}"
+ .format(type(handler)))
+
+ vtable.get_type_name = _cb_get_type_name
+ vtable.equals = _cb_equals
+ vtable.get_file_info = _cb_get_file_info
+ vtable.get_file_info_vector = _cb_get_file_info_vector
+ vtable.get_file_info_selector = _cb_get_file_info_selector
+ vtable.create_dir = _cb_create_dir
+ vtable.delete_dir = _cb_delete_dir
+ vtable.delete_dir_contents = _cb_delete_dir_contents
+ vtable.delete_root_dir_contents = _cb_delete_root_dir_contents
+ vtable.delete_file = _cb_delete_file
+ vtable.move = _cb_move
+ vtable.copy_file = _cb_copy_file
+ vtable.open_input_stream = _cb_open_input_stream
+ vtable.open_input_file = _cb_open_input_file
+ vtable.open_output_stream = _cb_open_output_stream
+ vtable.open_append_stream = _cb_open_append_stream
+ vtable.normalize_path = _cb_normalize_path
+
+ wrapped = CPyFileSystem.Make(handler, move(vtable))
+ self.init(<shared_ptr[CFileSystem]> wrapped)
+
+ cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+ FileSystem.init(self, wrapped)
+ self.pyfs = <CPyFileSystem*> wrapped.get()
+
+ @property
+ def handler(self):
+ """
+ The filesystem's underlying handler.
+
+ Returns
+ -------
+ handler : FileSystemHandler
+ """
+ return <object> self.pyfs.handler()
+
+ def __reduce__(self):
+ return PyFileSystem, (self.handler,)
+
+
+class FileSystemHandler(ABC):
+ """
+ An abstract class exposing methods to implement PyFileSystem's behavior.
+ """
+
+ @abstractmethod
+ def get_type_name(self):
+ """
+ Implement PyFileSystem.type_name.
+ """
+
+ @abstractmethod
+ def get_file_info(self, paths):
+ """
+ Implement PyFileSystem.get_file_info(paths).
+
+ Parameters
+ ----------
+ paths : paths for which we want to retrieve the info.
+ """
+
+ @abstractmethod
+ def get_file_info_selector(self, selector):
+ """
+ Implement PyFileSystem.get_file_info(selector).
+
+ Parameters
+ ----------
+ selector : selector for which we want to retrieve the info.
+ """
+
+ @abstractmethod
+ def create_dir(self, path, recursive):
+ """
+ Implement PyFileSystem.create_dir(...).
+
+ Parameters
+ ----------
+ path : path of the directory.
+ recursive : if the parent directories should be created too.
+ """
+
+ @abstractmethod
+ def delete_dir(self, path):
+ """
+ Implement PyFileSystem.delete_dir(...).
+
+ Parameters
+ ----------
+ path : path of the directory.
+ """
+
+ @abstractmethod
+ def delete_dir_contents(self, path):
+ """
+ Implement PyFileSystem.delete_dir_contents(...).
+
+ Parameters
+ ----------
+ path : path of the directory.
+ """
+
+ @abstractmethod
+ def delete_root_dir_contents(self):
+ """
+ Implement PyFileSystem.delete_dir_contents("/", accept_root_dir=True).
+ """
+
+ @abstractmethod
+ def delete_file(self, path):
+ """
+ Implement PyFileSystem.delete_file(...).
+
+ Parameters
+ ----------
+ path : path of the file.
+ """
+
+ @abstractmethod
+ def move(self, src, dest):
+ """
+ Implement PyFileSystem.move(...).
+
+ Parameters
+ ----------
+ src : path of what should be moved.
+ dest : path of where it should be moved to.
+ """
+
+ @abstractmethod
+ def copy_file(self, src, dest):
+ """
+ Implement PyFileSystem.copy_file(...).
+
+ Parameters
+ ----------
+ src : path of what should be copied.
+ dest : path of where it should be copied to.
+ """
+
+ @abstractmethod
+ def open_input_stream(self, path):
+ """
+ Implement PyFileSystem.open_input_stream(...).
+
+ Parameters
+ ----------
+ path : path of what should be opened.
+ """
+
+ @abstractmethod
+ def open_input_file(self, path):
+ """
+ Implement PyFileSystem.open_input_file(...).
+
+ Parameters
+ ----------
+ path : path of what should be opened.
+ """
+
+ @abstractmethod
+ def open_output_stream(self, path, metadata):
+ """
+ Implement PyFileSystem.open_output_stream(...).
+
+ Parameters
+ ----------
+ path : path of what should be opened.
+ metadata : mapping of string keys to string values.
+ Some filesystems support storing metadata along the file
+ (such as "Content-Type").
+ """
+
+ @abstractmethod
+ def open_append_stream(self, path, metadata):
+ """
+ Implement PyFileSystem.open_append_stream(...).
+
+ Parameters
+ ----------
+ path : path of what should be opened.
+ metadata : mapping of string keys to string values.
+ Some filesystems support storing metadata along the file
+ (such as "Content-Type").
+ """
+
+ @abstractmethod
+ def normalize_path(self, path):
+ """
+ Implement PyFileSystem.normalize_path(...).
+
+ Parameters
+ ----------
+ path : path of what should be normalized.
+ """
+
+# Callback definitions for CPyFileSystemVtable
+
+
+cdef void _cb_get_type_name(handler, c_string* out) except *:
+ out[0] = tobytes("py::" + handler.get_type_name())
+
+cdef c_bool _cb_equals(handler, const CFileSystem& c_other) except False:
+ if c_other.type_name().startswith(b"py::"):
+ return <object> (<const CPyFileSystem&> c_other).handler() == handler
+
+ return False
+
+cdef void _cb_get_file_info(handler, const c_string& path,
+ CFileInfo* out) except *:
+ infos = handler.get_file_info([frombytes(path)])
+ if not isinstance(infos, list) or len(infos) != 1:
+ raise TypeError("get_file_info should have returned a 1-element list")
+ out[0] = FileInfo.unwrap_safe(infos[0])
+
+cdef void _cb_get_file_info_vector(handler, const vector[c_string]& paths,
+ vector[CFileInfo]* out) except *:
+ py_paths = [frombytes(paths[i]) for i in range(len(paths))]
+ infos = handler.get_file_info(py_paths)
+ if not isinstance(infos, list):
+ raise TypeError("get_file_info should have returned a list")
+ out[0].clear()
+ out[0].reserve(len(infos))
+ for info in infos:
+ out[0].push_back(FileInfo.unwrap_safe(info))
+
+cdef void _cb_get_file_info_selector(handler, const CFileSelector& selector,
+ vector[CFileInfo]* out) except *:
+ infos = handler.get_file_info_selector(FileSelector.wrap(selector))
+ if not isinstance(infos, list):
+ raise TypeError("get_file_info_selector should have returned a list")
+ out[0].clear()
+ out[0].reserve(len(infos))
+ for info in infos:
+ out[0].push_back(FileInfo.unwrap_safe(info))
+
+cdef void _cb_create_dir(handler, const c_string& path,
+ c_bool recursive) except *:
+ handler.create_dir(frombytes(path), recursive)
+
+cdef void _cb_delete_dir(handler, const c_string& path) except *:
+ handler.delete_dir(frombytes(path))
+
+cdef void _cb_delete_dir_contents(handler, const c_string& path) except *:
+ handler.delete_dir_contents(frombytes(path))
+
+cdef void _cb_delete_root_dir_contents(handler) except *:
+ handler.delete_root_dir_contents()
+
+cdef void _cb_delete_file(handler, const c_string& path) except *:
+ handler.delete_file(frombytes(path))
+
+cdef void _cb_move(handler, const c_string& src,
+ const c_string& dest) except *:
+ handler.move(frombytes(src), frombytes(dest))
+
+cdef void _cb_copy_file(handler, const c_string& src,
+ const c_string& dest) except *:
+ handler.copy_file(frombytes(src), frombytes(dest))
+
+cdef void _cb_open_input_stream(handler, const c_string& path,
+ shared_ptr[CInputStream]* out) except *:
+ stream = handler.open_input_stream(frombytes(path))
+ if not isinstance(stream, NativeFile):
+ raise TypeError("open_input_stream should have returned "
+ "a PyArrow file")
+ out[0] = (<NativeFile> stream).get_input_stream()
+
+cdef void _cb_open_input_file(handler, const c_string& path,
+ shared_ptr[CRandomAccessFile]* out) except *:
+ stream = handler.open_input_file(frombytes(path))
+ if not isinstance(stream, NativeFile):
+ raise TypeError("open_input_file should have returned "
+ "a PyArrow file")
+ out[0] = (<NativeFile> stream).get_random_access_file()
+
+cdef void _cb_open_output_stream(
+ handler, const c_string& path,
+ const shared_ptr[const CKeyValueMetadata]& metadata,
+ shared_ptr[COutputStream]* out) except *:
+ stream = handler.open_output_stream(
+ frombytes(path), pyarrow_wrap_metadata(metadata))
+ if not isinstance(stream, NativeFile):
+ raise TypeError("open_output_stream should have returned "
+ "a PyArrow file")
+ out[0] = (<NativeFile> stream).get_output_stream()
+
+cdef void _cb_open_append_stream(
+ handler, const c_string& path,
+ const shared_ptr[const CKeyValueMetadata]& metadata,
+ shared_ptr[COutputStream]* out) except *:
+ stream = handler.open_append_stream(
+ frombytes(path), pyarrow_wrap_metadata(metadata))
+ if not isinstance(stream, NativeFile):
+ raise TypeError("open_append_stream should have returned "
+ "a PyArrow file")
+ out[0] = (<NativeFile> stream).get_output_stream()
+
+cdef void _cb_normalize_path(handler, const c_string& path,
+ c_string* out) except *:
+ out[0] = tobytes(handler.normalize_path(frombytes(path)))
+
+
+def _copy_files(FileSystem source_fs, str source_path,
+ FileSystem destination_fs, str destination_path,
+ int64_t chunk_size, c_bool use_threads):
+ # low-level helper exposed through pyarrow/fs.py::copy_files
+ cdef:
+ CFileLocator c_source
+ vector[CFileLocator] c_sources
+ CFileLocator c_destination
+ vector[CFileLocator] c_destinations
+ FileSystem fs
+ CStatus c_status
+ shared_ptr[CFileSystem] c_fs
+
+ c_source.filesystem = source_fs.unwrap()
+ c_source.path = tobytes(source_path)
+ c_sources.push_back(c_source)
+
+ c_destination.filesystem = destination_fs.unwrap()
+ c_destination.path = tobytes(destination_path)
+ c_destinations.push_back(c_destination)
+
+ with nogil:
+ check_status(CCopyFiles(
+ c_sources, c_destinations,
+ c_default_io_context(), chunk_size, use_threads,
+ ))
+
+
+def _copy_files_selector(FileSystem source_fs, FileSelector source_sel,
+ FileSystem destination_fs, str destination_base_dir,
+ int64_t chunk_size, c_bool use_threads):
+ # low-level helper exposed through pyarrow/fs.py::copy_files
+ cdef c_string c_destination_base_dir = tobytes(destination_base_dir)
+
+ with nogil:
+ check_status(CCopyFilesWithSelector(
+ source_fs.unwrap(), source_sel.unwrap(),
+ destination_fs.unwrap(), c_destination_base_dir,
+ c_default_io_context(), chunk_size, use_threads,
+ ))