author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_hdfs.pyx
parent     Initial commit. (diff)
download   ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_hdfs.pyx')
-rw-r--r--   src/arrow/python/pyarrow/_hdfs.pyx   149
1 file changed, 149 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/_hdfs.pyx b/src/arrow/python/pyarrow/_hdfs.pyx
new file mode 100644
index 000000000..7a3b974be
--- /dev/null
+++ b/src/arrow/python/pyarrow/_hdfs.pyx
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.lib cimport check_status
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_fs cimport *
+from pyarrow._fs cimport FileSystem
+
+from pyarrow.lib import frombytes, tobytes
+from pyarrow.util import _stringify_path
+
+
+cdef class HadoopFileSystem(FileSystem):
+    """
+    HDFS-backed FileSystem implementation.
+
+    Parameters
+    ----------
+    host : str
+        HDFS host to connect to. Set to "default" for fs.defaultFS from
+        core-site.xml.
+    port : int, default 8020
+        HDFS port to connect to. Set to 0 for default or logical (HA) nodes.
+    user : str, default None
+        Username when connecting to HDFS; None implies login user.
+    replication : int, default 3
+        Number of copies each block will have.
+    buffer_size : int, default 0
+        If 0, no buffering will happen; otherwise, the size of the temporary
+        read and write buffer.
+    default_block_size : int, default None
+        None means the default configuration for HDFS; a typical block size
+        is 128 MB.
+    kerb_ticket : string or path, default None
+        If not None, the path to the Kerberos ticket cache.
+    extra_conf : dict, default None
+        Extra key/value pairs for configuration; will override any
+        hdfs-site.xml properties.
+    """
+
+    cdef:
+        CHadoopFileSystem* hdfs
+
+    def __init__(self, str host, int port=8020, *, str user=None,
+                 int replication=3, int buffer_size=0,
+                 default_block_size=None, kerb_ticket=None,
+                 extra_conf=None):
+        cdef:
+            CHdfsOptions options
+            shared_ptr[CHadoopFileSystem] wrapped
+
+        if not host.startswith(('hdfs://', 'viewfs://')) and host != "default":
+            # TODO(kszucs): do more sanitization
+            host = 'hdfs://{}'.format(host)
+
+        options.ConfigureEndPoint(tobytes(host), int(port))
+        options.ConfigureReplication(replication)
+        options.ConfigureBufferSize(buffer_size)
+
+        if user is not None:
+            options.ConfigureUser(tobytes(user))
+        if default_block_size is not None:
+            options.ConfigureBlockSize(default_block_size)
+        if kerb_ticket is not None:
+            options.ConfigureKerberosTicketCachePath(
+                tobytes(_stringify_path(kerb_ticket)))
+        if extra_conf is not None:
+            for k, v in extra_conf.items():
+                options.ConfigureExtraConf(tobytes(k), tobytes(v))
+
+        with nogil:
+            wrapped = GetResultValue(CHadoopFileSystem.Make(options))
+        self.init(<shared_ptr[CFileSystem]> wrapped)
+
+    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+        FileSystem.init(self, wrapped)
+        self.hdfs = <CHadoopFileSystem*> wrapped.get()
+
+    @staticmethod
+    def from_uri(uri):
+        """
+        Instantiate a HadoopFileSystem object from a URI string.
+
+        The following two calls are equivalent:
+
+        * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
+&replication=1')``
+        * ``HadoopFileSystem('localhost', port=8020, user='test', \
+replication=1)``
+
+        Parameters
+        ----------
+        uri : str
+            A string URI describing the connection to HDFS.
+            To change user, replication, buffer_size or
+            default_block_size, pass the values as query parts.
+
+        Returns
+        -------
+        HadoopFileSystem
+        """
+        cdef:
+            HadoopFileSystem self = HadoopFileSystem.__new__(HadoopFileSystem)
+            shared_ptr[CHadoopFileSystem] wrapped
+            CHdfsOptions options
+
+        options = GetResultValue(CHdfsOptions.FromUriString(tobytes(uri)))
+        with nogil:
+            wrapped = GetResultValue(CHadoopFileSystem.Make(options))
+
+        self.init(<shared_ptr[CFileSystem]> wrapped)
+        return self
+
+    @classmethod
+    def _reconstruct(cls, kwargs):
+        return cls(**kwargs)
+
+    def __reduce__(self):
+        cdef CHdfsOptions opts = self.hdfs.options()
+        return (
+            HadoopFileSystem._reconstruct, (dict(
+                host=frombytes(opts.connection_config.host),
+                port=opts.connection_config.port,
+                user=frombytes(opts.connection_config.user),
+                replication=opts.replication,
+                buffer_size=opts.buffer_size,
+                default_block_size=opts.default_block_size,
+                kerb_ticket=frombytes(opts.connection_config.kerb_ticket),
+                extra_conf={frombytes(k): frombytes(v)
+                            for k, v in opts.connection_config.extra_conf},
+            ),)
+        )