# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # cython: language_level = 3 from cpython.datetime cimport datetime, PyDateTime_DateTime from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport PyDateTime_to_TimePoint from pyarrow.lib import _detect_compression, frombytes, tobytes from pyarrow.lib cimport * from pyarrow.util import _stringify_path from abc import ABC, abstractmethod from datetime import datetime, timezone import os import pathlib import sys import warnings cdef _init_ca_paths(): cdef CFileSystemGlobalOptions options import ssl paths = ssl.get_default_verify_paths() if paths.cafile: options.tls_ca_file_path = os.fsencode(paths.cafile) if paths.capath: options.tls_ca_dir_path = os.fsencode(paths.capath) check_status(CFileSystemsInitialize(options)) if sys.platform == 'linux': # ARROW-9261: On Linux, we may need to fixup the paths to TLS CA certs # (especially in manylinux packages) since the values hardcoded at # compile-time in libcurl may be wrong. _init_ca_paths() cdef inline c_string _path_as_bytes(path) except *: # handle only abstract paths, not bound to any filesystem like pathlib is, # so we only accept plain strings if not isinstance(path, (bytes, str)): raise TypeError('Path must be a string') # tobytes always uses utf-8, which is more or less ok, at least on Windows # since the C++ side then decodes from utf-8. On Unix, os.fsencode may be # better. return tobytes(path) cdef object _wrap_file_type(CFileType ty): return FileType( ty) cdef CFileType _unwrap_file_type(FileType ty) except *: if ty == FileType.Unknown: return CFileType_Unknown elif ty == FileType.NotFound: return CFileType_NotFound elif ty == FileType.File: return CFileType_File elif ty == FileType.Directory: return CFileType_Directory assert 0 cdef class FileInfo(_Weakrefable): """ FileSystem entry info. Parameters ---------- path : str The full path to the filesystem entry. type : FileType The type of the filesystem entry. mtime : datetime or float, default None If given, the modification time of the filesystem entry. If a float is given, it is the number of seconds since the Unix epoch. mtime_ns : int, default None If given, the modification time of the filesystem entry, in nanoseconds since the Unix epoch. `mtime` and `mtime_ns` are mutually exclusive. size : int, default None If given, the filesystem entry size in bytes. This should only be given if `type` is `FileType.File`. """ def __init__(self, path, FileType type=FileType.Unknown, *, mtime=None, mtime_ns=None, size=None): self.info.set_path(tobytes(path)) self.info.set_type(_unwrap_file_type(type)) if mtime is not None: if mtime_ns is not None: raise TypeError("Only one of mtime and mtime_ns " "can be given") if isinstance(mtime, datetime): self.info.set_mtime(PyDateTime_to_TimePoint( mtime)) else: self.info.set_mtime(TimePoint_from_s(mtime)) elif mtime_ns is not None: self.info.set_mtime(TimePoint_from_ns(mtime_ns)) if size is not None: self.info.set_size(size) @staticmethod cdef wrap(CFileInfo info): cdef FileInfo self = FileInfo.__new__(FileInfo) self.info = move(info) return self cdef inline CFileInfo unwrap(self) nogil: return self.info @staticmethod cdef CFileInfo unwrap_safe(obj): if not isinstance(obj, FileInfo): raise TypeError("Expected FileInfo instance, got {0}" .format(type(obj))) return ( obj).unwrap() def __repr__(self): def getvalue(attr): try: return getattr(self, attr) except ValueError: return '' s = '".format(self)) cdef class FileSystem(_Weakrefable): """ Abstract file system API. """ def __init__(self): raise TypeError("FileSystem is an abstract class, instantiate one of " "the subclasses instead: LocalFileSystem or " "SubTreeFileSystem") @staticmethod def from_uri(uri): """ Create a new FileSystem from URI or Path. Recognized URI schemes are "file", "mock", "s3fs", "hdfs" and "viewfs". In addition, the argument can be a pathlib.Path object, or a string describing an absolute local path. Parameters ---------- uri : string URI-based path, for example: file:///some/local/path. Returns ------- With (filesystem, path) tuple where path is the abstract path inside the FileSystem instance. """ cdef: c_string path CResult[shared_ptr[CFileSystem]] result if isinstance(uri, pathlib.Path): # Make absolute uri = uri.resolve().absolute() uri = _stringify_path(uri) result = CFileSystemFromUriOrPath(tobytes(uri), &path) return FileSystem.wrap(GetResultValue(result)), frombytes(path) cdef init(self, const shared_ptr[CFileSystem]& wrapped): self.wrapped = wrapped self.fs = wrapped.get() @staticmethod cdef wrap(const shared_ptr[CFileSystem]& sp): cdef FileSystem self typ = frombytes(sp.get().type_name()) if typ == 'local': self = LocalFileSystem.__new__(LocalFileSystem) elif typ == 'mock': self = _MockFileSystem.__new__(_MockFileSystem) elif typ == 'subtree': self = SubTreeFileSystem.__new__(SubTreeFileSystem) elif typ == 's3': from pyarrow._s3fs import S3FileSystem self = S3FileSystem.__new__(S3FileSystem) elif typ == 'hdfs': from pyarrow._hdfs import HadoopFileSystem self = HadoopFileSystem.__new__(HadoopFileSystem) elif typ.startswith('py::'): self = PyFileSystem.__new__(PyFileSystem) else: raise TypeError('Cannot wrap FileSystem pointer') self.init(sp) return self cdef inline shared_ptr[CFileSystem] unwrap(self) nogil: return self.wrapped def equals(self, FileSystem other): return self.fs.Equals(other.unwrap()) def __eq__(self, other): try: return self.equals(other) except TypeError: return NotImplemented @property def type_name(self): """ The filesystem's type name. """ return frombytes(self.fs.type_name()) def get_file_info(self, paths_or_selector): """ Get info for the given files. Any symlink is automatically dereferenced, recursively. A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound. An exception indicates a truly exceptional condition (low-level I/O error, etc.). Parameters ---------- paths_or_selector: FileSelector, path-like or list of path-likes Either a selector object, a path-like object or a list of path-like objects. The selector's base directory will not be part of the results, even if it exists. If it doesn't exist, use `allow_not_found`. Returns ------- FileInfo or list of FileInfo Single FileInfo object is returned for a single path, otherwise a list of FileInfo objects is returned. """ cdef: CFileInfo info c_string path vector[CFileInfo] infos vector[c_string] paths CFileSelector selector if isinstance(paths_or_selector, FileSelector): with nogil: selector = (paths_or_selector).selector infos = GetResultValue(self.fs.GetFileInfo(selector)) elif isinstance(paths_or_selector, (list, tuple)): paths = [_path_as_bytes(s) for s in paths_or_selector] with nogil: infos = GetResultValue(self.fs.GetFileInfo(paths)) elif isinstance(paths_or_selector, (bytes, str)): path =_path_as_bytes(paths_or_selector) with nogil: info = GetResultValue(self.fs.GetFileInfo(path)) return FileInfo.wrap(info) else: raise TypeError('Must pass either path(s) or a FileSelector') return [FileInfo.wrap(info) for info in infos] def create_dir(self, path, *, bint recursive=True): """ Create a directory and subdirectories. This function succeeds if the directory already exists. Parameters ---------- path : str The path of the new directory. recursive: bool, default True Create nested directories as well. """ cdef c_string directory = _path_as_bytes(path) with nogil: check_status(self.fs.CreateDir(directory, recursive=recursive)) def delete_dir(self, path): """Delete a directory and its contents, recursively. Parameters ---------- path : str The path of the directory to be deleted. """ cdef c_string directory = _path_as_bytes(path) with nogil: check_status(self.fs.DeleteDir(directory)) def delete_dir_contents(self, path, *, bint accept_root_dir=False): """Delete a directory's contents, recursively. Like delete_dir, but doesn't delete the directory itself. Parameters ---------- path : str The path of the directory to be deleted. accept_root_dir : boolean, default False Allow deleting the root directory's contents (if path is empty or "/") """ cdef c_string directory = _path_as_bytes(path) if accept_root_dir and directory.strip(b"/") == b"": with nogil: check_status(self.fs.DeleteRootDirContents()) else: with nogil: check_status(self.fs.DeleteDirContents(directory)) def move(self, src, dest): """ Move / rename a file or directory. If the destination exists: - if it is a non-empty directory, an error is returned - otherwise, if it has the same type as the source, it is replaced - otherwise, behavior is unspecified (implementation-dependent). Parameters ---------- src : str The path of the file or the directory to be moved. dest : str The destination path where the file or directory is moved to. """ cdef: c_string source = _path_as_bytes(src) c_string destination = _path_as_bytes(dest) with nogil: check_status(self.fs.Move(source, destination)) def copy_file(self, src, dest): """ Copy a file. If the destination exists and is a directory, an error is returned. Otherwise, it is replaced. Parameters ---------- src : str The path of the file to be copied from. dest : str The destination path where the file is copied to. """ cdef: c_string source = _path_as_bytes(src) c_string destination = _path_as_bytes(dest) with nogil: check_status(self.fs.CopyFile(source, destination)) def delete_file(self, path): """ Delete a file. Parameters ---------- path : str The path of the file to be deleted. """ cdef c_string file = _path_as_bytes(path) with nogil: check_status(self.fs.DeleteFile(file)) def _wrap_input_stream(self, stream, path, compression, buffer_size): if buffer_size is not None and buffer_size != 0: stream = BufferedInputStream(stream, buffer_size) if compression == 'detect': compression = _detect_compression(path) if compression is not None: stream = CompressedInputStream(stream, compression) return stream def _wrap_output_stream(self, stream, path, compression, buffer_size): if buffer_size is not None and buffer_size != 0: stream = BufferedOutputStream(stream, buffer_size) if compression == 'detect': compression = _detect_compression(path) if compression is not None: stream = CompressedOutputStream(stream, compression) return stream def open_input_file(self, path): """ Open an input file for random access reading. Parameters ---------- path : str The source to open for reading. Returns ------- stram : NativeFile """ cdef: c_string pathstr = _path_as_bytes(path) NativeFile stream = NativeFile() shared_ptr[CRandomAccessFile] in_handle with nogil: in_handle = GetResultValue(self.fs.OpenInputFile(pathstr)) stream.set_random_access_file(in_handle) stream.is_readable = True return stream def open_input_stream(self, path, compression='detect', buffer_size=None): """ Open an input stream for sequential reading. Parameters ---------- source : str The source to open for reading. compression : str optional, default 'detect' The compression algorithm to use for on-the-fly decompression. If "detect" and source is a file path, then compression will be chosen based on the file extension. If None, no compression will be applied. Otherwise, a well-known algorithm name must be supplied (e.g. "gzip"). buffer_size : int optional, default None If None or 0, no buffering will happen. Otherwise the size of the temporary read buffer. Returns ------- stream : NativeFile """ cdef: c_string pathstr = _path_as_bytes(path) NativeFile stream = NativeFile() shared_ptr[CInputStream] in_handle with nogil: in_handle = GetResultValue(self.fs.OpenInputStream(pathstr)) stream.set_input_stream(in_handle) stream.is_readable = True return self._wrap_input_stream( stream, path=path, compression=compression, buffer_size=buffer_size ) def open_output_stream(self, path, compression='detect', buffer_size=None, metadata=None): """ Open an output stream for sequential writing. If the target already exists, existing data is truncated. Parameters ---------- path : str The source to open for writing. compression : str optional, default 'detect' The compression algorithm to use for on-the-fly compression. If "detect" and source is a file path, then compression will be chosen based on the file extension. If None, no compression will be applied. Otherwise, a well-known algorithm name must be supplied (e.g. "gzip"). buffer_size : int optional, default None If None or 0, no buffering will happen. Otherwise the size of the temporary write buffer. metadata : dict optional, default None If not None, a mapping of string keys to string values. Some filesystems support storing metadata along the file (such as "Content-Type"). Unsupported metadata keys will be ignored. Returns ------- stream : NativeFile """ cdef: c_string pathstr = _path_as_bytes(path) NativeFile stream = NativeFile() shared_ptr[COutputStream] out_handle shared_ptr[const CKeyValueMetadata] c_metadata if metadata is not None: c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata)) with nogil: out_handle = GetResultValue( self.fs.OpenOutputStream(pathstr, c_metadata)) stream.set_output_stream(out_handle) stream.is_writable = True return self._wrap_output_stream( stream, path=path, compression=compression, buffer_size=buffer_size ) def open_append_stream(self, path, compression='detect', buffer_size=None, metadata=None): """ DEPRECATED: Open an output stream for appending. If the target doesn't exist, a new empty file is created. .. deprecated:: 6.0 Several filesystems don't support this functionality and it will be later removed. Parameters ---------- path : str The source to open for writing. compression : str optional, default 'detect' The compression algorithm to use for on-the-fly compression. If "detect" and source is a file path, then compression will be chosen based on the file extension. If None, no compression will be applied. Otherwise, a well-known algorithm name must be supplied (e.g. "gzip"). buffer_size : int optional, default None If None or 0, no buffering will happen. Otherwise the size of the temporary write buffer. metadata : dict optional, default None If not None, a mapping of string keys to string values. Some filesystems support storing metadata along the file (such as "Content-Type"). Unsupported metadata keys will be ignored. Returns ------- stream : NativeFile """ cdef: c_string pathstr = _path_as_bytes(path) NativeFile stream = NativeFile() shared_ptr[COutputStream] out_handle shared_ptr[const CKeyValueMetadata] c_metadata warnings.warn( "`open_append_stream` is deprecated as of 6.0.0; several " "filesystems don't support it and it will be later removed", FutureWarning) if metadata is not None: c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(metadata)) with nogil: out_handle = GetResultValue( self.fs.OpenAppendStream(pathstr, c_metadata)) stream.set_output_stream(out_handle) stream.is_writable = True return self._wrap_output_stream( stream, path=path, compression=compression, buffer_size=buffer_size ) def normalize_path(self, path): """ Normalize filesystem path. Parameters ---------- path : str The path to normalize Returns ------- normalized_path : str The normalized path """ cdef: c_string c_path = _path_as_bytes(path) c_string c_path_normalized c_path_normalized = GetResultValue(self.fs.NormalizePath(c_path)) return frombytes(c_path_normalized) cdef class LocalFileSystem(FileSystem): """ A FileSystem implementation accessing files on the local machine. Details such as symlinks are abstracted away (symlinks are always followed, except when deleting an entry). Parameters ---------- use_mmap : bool, default False Whether open_input_stream and open_input_file should return a mmap'ed file or a regular file. """ def __init__(self, *, use_mmap=False): cdef: CLocalFileSystemOptions opts shared_ptr[CLocalFileSystem] fs opts = CLocalFileSystemOptions.Defaults() opts.use_mmap = use_mmap fs = make_shared[CLocalFileSystem](opts) self.init( fs) cdef init(self, const shared_ptr[CFileSystem]& c_fs): FileSystem.init(self, c_fs) self.localfs = c_fs.get() @classmethod def _reconstruct(cls, kwargs): # __reduce__ doesn't allow passing named arguments directly to the # reconstructor, hence this wrapper. return cls(**kwargs) def __reduce__(self): cdef CLocalFileSystemOptions opts = self.localfs.options() return LocalFileSystem._reconstruct, (dict( use_mmap=opts.use_mmap),) cdef class SubTreeFileSystem(FileSystem): """ Delegates to another implementation after prepending a fixed base path. This is useful to expose a logical view of a subtree of a filesystem, for example a directory in a LocalFileSystem. Note, that this makes no security guarantee. For example, symlinks may allow to "escape" the subtree and access other parts of the underlying filesystem. Parameters ---------- base_path : str The root of the subtree. base_fs : FileSystem FileSystem object the operations delegated to. """ def __init__(self, base_path, FileSystem base_fs): cdef: c_string pathstr shared_ptr[CSubTreeFileSystem] wrapped pathstr = _path_as_bytes(base_path) wrapped = make_shared[CSubTreeFileSystem](pathstr, base_fs.wrapped) self.init( wrapped) cdef init(self, const shared_ptr[CFileSystem]& wrapped): FileSystem.init(self, wrapped) self.subtreefs = wrapped.get() def __repr__(self): return ("SubTreeFileSystem(base_path={}, base_fs={}" .format(self.base_path, self.base_fs)) def __reduce__(self): return SubTreeFileSystem, ( frombytes(self.subtreefs.base_path()), FileSystem.wrap(self.subtreefs.base_fs()) ) @property def base_path(self): return frombytes(self.subtreefs.base_path()) @property def base_fs(self): return FileSystem.wrap(self.subtreefs.base_fs()) cdef class _MockFileSystem(FileSystem): def __init__(self, datetime current_time=None): cdef shared_ptr[CMockFileSystem] wrapped current_time = current_time or datetime.now() wrapped = make_shared[CMockFileSystem]( PyDateTime_to_TimePoint( current_time) ) self.init( wrapped) cdef init(self, const shared_ptr[CFileSystem]& wrapped): FileSystem.init(self, wrapped) self.mockfs = wrapped.get() cdef class PyFileSystem(FileSystem): """ A FileSystem with behavior implemented in Python. Parameters ---------- handler : FileSystemHandler The handler object implementing custom filesystem behavior. """ def __init__(self, handler): cdef: CPyFileSystemVtable vtable shared_ptr[CPyFileSystem] wrapped if not isinstance(handler, FileSystemHandler): raise TypeError("Expected a FileSystemHandler instance, got {0}" .format(type(handler))) vtable.get_type_name = _cb_get_type_name vtable.equals = _cb_equals vtable.get_file_info = _cb_get_file_info vtable.get_file_info_vector = _cb_get_file_info_vector vtable.get_file_info_selector = _cb_get_file_info_selector vtable.create_dir = _cb_create_dir vtable.delete_dir = _cb_delete_dir vtable.delete_dir_contents = _cb_delete_dir_contents vtable.delete_root_dir_contents = _cb_delete_root_dir_contents vtable.delete_file = _cb_delete_file vtable.move = _cb_move vtable.copy_file = _cb_copy_file vtable.open_input_stream = _cb_open_input_stream vtable.open_input_file = _cb_open_input_file vtable.open_output_stream = _cb_open_output_stream vtable.open_append_stream = _cb_open_append_stream vtable.normalize_path = _cb_normalize_path wrapped = CPyFileSystem.Make(handler, move(vtable)) self.init( wrapped) cdef init(self, const shared_ptr[CFileSystem]& wrapped): FileSystem.init(self, wrapped) self.pyfs = wrapped.get() @property def handler(self): """ The filesystem's underlying handler. Returns ------- handler : FileSystemHandler """ return self.pyfs.handler() def __reduce__(self): return PyFileSystem, (self.handler,) class FileSystemHandler(ABC): """ An abstract class exposing methods to implement PyFileSystem's behavior. """ @abstractmethod def get_type_name(self): """ Implement PyFileSystem.type_name. """ @abstractmethod def get_file_info(self, paths): """ Implement PyFileSystem.get_file_info(paths). Parameters ---------- paths : paths for which we want to retrieve the info. """ @abstractmethod def get_file_info_selector(self, selector): """ Implement PyFileSystem.get_file_info(selector). Parameters ---------- selector : selector for which we want to retrieve the info. """ @abstractmethod def create_dir(self, path, recursive): """ Implement PyFileSystem.create_dir(...). Parameters ---------- path : path of the directory. recursive : if the parent directories should be created too. """ @abstractmethod def delete_dir(self, path): """ Implement PyFileSystem.delete_dir(...). Parameters ---------- path : path of the directory. """ @abstractmethod def delete_dir_contents(self, path): """ Implement PyFileSystem.delete_dir_contents(...). Parameters ---------- path : path of the directory. """ @abstractmethod def delete_root_dir_contents(self): """ Implement PyFileSystem.delete_dir_contents("/", accept_root_dir=True). """ @abstractmethod def delete_file(self, path): """ Implement PyFileSystem.delete_file(...). Parameters ---------- path : path of the file. """ @abstractmethod def move(self, src, dest): """ Implement PyFileSystem.move(...). Parameters ---------- src : path of what should be moved. dest : path of where it should be moved to. """ @abstractmethod def copy_file(self, src, dest): """ Implement PyFileSystem.copy_file(...). Parameters ---------- src : path of what should be copied. dest : path of where it should be copied to. """ @abstractmethod def open_input_stream(self, path): """ Implement PyFileSystem.open_input_stream(...). Parameters ---------- path : path of what should be opened. """ @abstractmethod def open_input_file(self, path): """ Implement PyFileSystem.open_input_file(...). Parameters ---------- path : path of what should be opened. """ @abstractmethod def open_output_stream(self, path, metadata): """ Implement PyFileSystem.open_output_stream(...). Parameters ---------- path : path of what should be opened. metadata : mapping of string keys to string values. Some filesystems support storing metadata along the file (such as "Content-Type"). """ @abstractmethod def open_append_stream(self, path, metadata): """ Implement PyFileSystem.open_append_stream(...). Parameters ---------- path : path of what should be opened. metadata : mapping of string keys to string values. Some filesystems support storing metadata along the file (such as "Content-Type"). """ @abstractmethod def normalize_path(self, path): """ Implement PyFileSystem.normalize_path(...). Parameters ---------- path : path of what should be normalized. """ # Callback definitions for CPyFileSystemVtable cdef void _cb_get_type_name(handler, c_string* out) except *: out[0] = tobytes("py::" + handler.get_type_name()) cdef c_bool _cb_equals(handler, const CFileSystem& c_other) except False: if c_other.type_name().startswith(b"py::"): return ( c_other).handler() == handler return False cdef void _cb_get_file_info(handler, const c_string& path, CFileInfo* out) except *: infos = handler.get_file_info([frombytes(path)]) if not isinstance(infos, list) or len(infos) != 1: raise TypeError("get_file_info should have returned a 1-element list") out[0] = FileInfo.unwrap_safe(infos[0]) cdef void _cb_get_file_info_vector(handler, const vector[c_string]& paths, vector[CFileInfo]* out) except *: py_paths = [frombytes(paths[i]) for i in range(len(paths))] infos = handler.get_file_info(py_paths) if not isinstance(infos, list): raise TypeError("get_file_info should have returned a list") out[0].clear() out[0].reserve(len(infos)) for info in infos: out[0].push_back(FileInfo.unwrap_safe(info)) cdef void _cb_get_file_info_selector(handler, const CFileSelector& selector, vector[CFileInfo]* out) except *: infos = handler.get_file_info_selector(FileSelector.wrap(selector)) if not isinstance(infos, list): raise TypeError("get_file_info_selector should have returned a list") out[0].clear() out[0].reserve(len(infos)) for info in infos: out[0].push_back(FileInfo.unwrap_safe(info)) cdef void _cb_create_dir(handler, const c_string& path, c_bool recursive) except *: handler.create_dir(frombytes(path), recursive) cdef void _cb_delete_dir(handler, const c_string& path) except *: handler.delete_dir(frombytes(path)) cdef void _cb_delete_dir_contents(handler, const c_string& path) except *: handler.delete_dir_contents(frombytes(path)) cdef void _cb_delete_root_dir_contents(handler) except *: handler.delete_root_dir_contents() cdef void _cb_delete_file(handler, const c_string& path) except *: handler.delete_file(frombytes(path)) cdef void _cb_move(handler, const c_string& src, const c_string& dest) except *: handler.move(frombytes(src), frombytes(dest)) cdef void _cb_copy_file(handler, const c_string& src, const c_string& dest) except *: handler.copy_file(frombytes(src), frombytes(dest)) cdef void _cb_open_input_stream(handler, const c_string& path, shared_ptr[CInputStream]* out) except *: stream = handler.open_input_stream(frombytes(path)) if not isinstance(stream, NativeFile): raise TypeError("open_input_stream should have returned " "a PyArrow file") out[0] = ( stream).get_input_stream() cdef void _cb_open_input_file(handler, const c_string& path, shared_ptr[CRandomAccessFile]* out) except *: stream = handler.open_input_file(frombytes(path)) if not isinstance(stream, NativeFile): raise TypeError("open_input_file should have returned " "a PyArrow file") out[0] = ( stream).get_random_access_file() cdef void _cb_open_output_stream( handler, const c_string& path, const shared_ptr[const CKeyValueMetadata]& metadata, shared_ptr[COutputStream]* out) except *: stream = handler.open_output_stream( frombytes(path), pyarrow_wrap_metadata(metadata)) if not isinstance(stream, NativeFile): raise TypeError("open_output_stream should have returned " "a PyArrow file") out[0] = ( stream).get_output_stream() cdef void _cb_open_append_stream( handler, const c_string& path, const shared_ptr[const CKeyValueMetadata]& metadata, shared_ptr[COutputStream]* out) except *: stream = handler.open_append_stream( frombytes(path), pyarrow_wrap_metadata(metadata)) if not isinstance(stream, NativeFile): raise TypeError("open_append_stream should have returned " "a PyArrow file") out[0] = ( stream).get_output_stream() cdef void _cb_normalize_path(handler, const c_string& path, c_string* out) except *: out[0] = tobytes(handler.normalize_path(frombytes(path))) def _copy_files(FileSystem source_fs, str source_path, FileSystem destination_fs, str destination_path, int64_t chunk_size, c_bool use_threads): # low-level helper exposed through pyarrow/fs.py::copy_files cdef: CFileLocator c_source vector[CFileLocator] c_sources CFileLocator c_destination vector[CFileLocator] c_destinations FileSystem fs CStatus c_status shared_ptr[CFileSystem] c_fs c_source.filesystem = source_fs.unwrap() c_source.path = tobytes(source_path) c_sources.push_back(c_source) c_destination.filesystem = destination_fs.unwrap() c_destination.path = tobytes(destination_path) c_destinations.push_back(c_destination) with nogil: check_status(CCopyFiles( c_sources, c_destinations, c_default_io_context(), chunk_size, use_threads, )) def _copy_files_selector(FileSystem source_fs, FileSelector source_sel, FileSystem destination_fs, str destination_base_dir, int64_t chunk_size, c_bool use_threads): # low-level helper exposed through pyarrow/fs.py::copy_files cdef c_string c_destination_base_dir = tobytes(destination_base_dir) with nogil: check_status(CCopyFilesWithSelector( source_fs.unwrap(), source_sel.unwrap(), destination_fs.unwrap(), c_destination_base_dir, c_default_io_context(), chunk_size, use_threads, ))