Diffstat
-rw-r--r-- | src/arrow/python/pyarrow/hdfs.py | 240
1 file changed, 240 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/hdfs.py b/src/arrow/python/pyarrow/hdfs.py
new file mode 100644
index 000000000..56667bd5d
--- /dev/null
+++ b/src/arrow/python/pyarrow/hdfs.py
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+import posixpath
+import sys
+import warnings
+
+from pyarrow.util import implements, _DEPR_MSG
+from pyarrow.filesystem import FileSystem
+import pyarrow._hdfsio as _hdfsio
+
+
+class HadoopFileSystem(_hdfsio.HadoopFileSystem, FileSystem):
+    """
+    DEPRECATED: FileSystem interface for HDFS cluster.
+
+    See pyarrow.hdfs.connect for full connection details.
+
+    .. deprecated:: 2.0
+        ``pyarrow.hdfs.HadoopFileSystem`` is deprecated,
+        please use ``pyarrow.fs.HadoopFileSystem`` instead.
+    """
+
+    def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
+                 driver='libhdfs', extra_conf=None):
+        warnings.warn(
+            _DEPR_MSG.format(
+                "hdfs.HadoopFileSystem", "2.0.0", "fs.HadoopFileSystem"),
+            FutureWarning, stacklevel=2)
+        if driver == 'libhdfs':
+            _maybe_set_hadoop_classpath()
+
+        self._connect(host, port, user, kerb_ticket, extra_conf)
+
+    def __reduce__(self):
+        return (HadoopFileSystem, (self.host, self.port, self.user,
+                                   self.kerb_ticket, self.extra_conf))
+
+    def _isfilestore(self):
+        """
+        Return True if this is a Unix-style file store with directories.
+        """
+        return True
+
+    @implements(FileSystem.isdir)
+    def isdir(self, path):
+        return super().isdir(path)
+
+    @implements(FileSystem.isfile)
+    def isfile(self, path):
+        return super().isfile(path)
+
+    @implements(FileSystem.delete)
+    def delete(self, path, recursive=False):
+        return super().delete(path, recursive)
+
+    def mkdir(self, path, **kwargs):
+        """
+        Create directory in HDFS.
+
+        Parameters
+        ----------
+        path : str
+            Directory path to create, including any parent directories.
+
+        Notes
+        -----
+        libhdfs does not support create_parents=False, so it is ignored here.
+        """
+        return super().mkdir(path)
+
+    @implements(FileSystem.rename)
+    def rename(self, path, new_path):
+        return super().rename(path, new_path)
+
+    @implements(FileSystem.exists)
+    def exists(self, path):
+        return super().exists(path)
+
+    def ls(self, path, detail=False):
+        """
+        Retrieve directory contents and metadata, if requested.
+
+        Parameters
+        ----------
+        path : str
+            HDFS path to retrieve contents of.
+        detail : bool, default False
+            If False, only return list of paths.
+
+        Returns
+        -------
+        result : list of dicts (detail=True) or strings (detail=False)
+        """
+        return super().ls(path, detail)
+
+    def walk(self, top_path):
+        """
+        Directory tree generator for HDFS, like os.walk.
+
+        Parameters
+        ----------
+        top_path : str
+            Root directory for tree traversal.
+
+        Returns
+        -------
+        Generator yielding 3-tuples (dirpath, dirnames, filenames)
+        """
+        contents = self.ls(top_path, detail=True)
+
+        directories, files = _libhdfs_walk_files_dirs(top_path, contents)
+        yield top_path, directories, files
+        for dirname in directories:
+            yield from self.walk(self._path_join(top_path, dirname))
+
+
+def _maybe_set_hadoop_classpath():
+    import re
+
+    if re.search(r'hadoop-common[^/]+.jar', os.environ.get('CLASSPATH', '')):
+        return
+
+    if 'HADOOP_HOME' in os.environ:
+        if sys.platform != 'win32':
+            classpath = _derive_hadoop_classpath()
+        else:
+            hadoop_bin = '{}/bin/hadoop'.format(os.environ['HADOOP_HOME'])
+            classpath = _hadoop_classpath_glob(hadoop_bin)
+    else:
+        classpath = _hadoop_classpath_glob('hadoop')
+
+    os.environ['CLASSPATH'] = classpath.decode('utf-8')
+
+
+def _derive_hadoop_classpath():
+    import subprocess
+
+    find_args = ('find', '-L', os.environ['HADOOP_HOME'], '-name', '*.jar')
+    find = subprocess.Popen(find_args, stdout=subprocess.PIPE)
+    xargs_echo = subprocess.Popen(('xargs', 'echo'),
+                                  stdin=find.stdout,
+                                  stdout=subprocess.PIPE)
+    jars = subprocess.check_output(('tr', "' '", "':'"),
+                                   stdin=xargs_echo.stdout)
+    hadoop_conf = os.environ["HADOOP_CONF_DIR"] \
+        if "HADOOP_CONF_DIR" in os.environ \
+        else os.environ["HADOOP_HOME"] + "/etc/hadoop"
+    return (hadoop_conf + ":").encode("utf-8") + jars
+
+
+def _hadoop_classpath_glob(hadoop_bin):
+    import subprocess
+
+    hadoop_classpath_args = (hadoop_bin, 'classpath', '--glob')
+    return subprocess.check_output(hadoop_classpath_args)
+
+
+def _libhdfs_walk_files_dirs(top_path, contents):
+    files = []
+    directories = []
+    for c in contents:
+        scrubbed_name = posixpath.split(c['name'])[1]
+        if c['kind'] == 'file':
+            files.append(scrubbed_name)
+        else:
+            directories.append(scrubbed_name)
+
+    return directories, files
+
+
+def connect(host="default", port=0, user=None, kerb_ticket=None,
+            extra_conf=None):
+    """
+    DEPRECATED: Connect to an HDFS cluster.
+
+    All parameters are optional and should only be set if the defaults need
+    to be overridden.
+
+    Authentication should be automatic if the HDFS cluster uses Kerberos.
+    However, if a username is specified, then the ticket cache will likely
+    be required.
+
+    .. deprecated:: 2.0
+        ``pyarrow.hdfs.connect`` is deprecated,
+        please use ``pyarrow.fs.HadoopFileSystem`` instead.
+
+    Parameters
+    ----------
+    host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
+    port : NameNode's port. Set to 0 for default or logical (HA) nodes.
+    user : Username when connecting to HDFS; None implies login user.
+    kerb_ticket : Path to Kerberos ticket cache.
+    extra_conf : dict, default None
+        Extra key/value pairs for configuration; will override any
+        hdfs-site.xml properties.
+
+    Notes
+    -----
+    The first time you call this method, it will take longer than usual due
+    to JNI spin-up time.
+
+    Returns
+    -------
+    filesystem : HadoopFileSystem
+    """
+    warnings.warn(
+        _DEPR_MSG.format("hdfs.connect", "2.0.0", "fs.HadoopFileSystem"),
+        FutureWarning, stacklevel=2
+    )
+    return _connect(
+        host=host, port=port, user=user, kerb_ticket=kerb_ticket,
+        extra_conf=extra_conf
+    )
+
+
+def _connect(host="default", port=0, user=None, kerb_ticket=None,
+             extra_conf=None):
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        fs = HadoopFileSystem(host=host, port=port, user=user,
+                              kerb_ticket=kerb_ticket,
+                              extra_conf=extra_conf)
+    return fs
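
For reference, a minimal usage sketch of the module added above. It assumes a reachable HDFS NameNode plus a local Hadoop/libhdfs install, and the paths shown are illustrative:

import pyarrow.hdfs as hdfs

# connect() emits a FutureWarning since 2.0.0 (see the deprecation notes
# in the diff); host="default" picks up fs.defaultFS from core-site.xml.
fs = hdfs.connect(host="default", port=0)

print(fs.ls("/tmp", detail=False))       # plain child paths only
for dirpath, dirnames, filenames in fs.walk("/tmp"):
    print(dirpath, dirnames, filenames)  # os.walk-style 3-tuples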
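
Since the whole module is deprecated, new code should target the replacement API the warnings point to; a sketch, assuming the same default-NameNode setup:

from pyarrow import fs

# pyarrow.fs.HadoopFileSystem is the successor named in the deprecation
# messages above; "default" again defers to fs.defaultFS.
new_fs = fs.HadoopFileSystem("default")
infos = new_fs.get_file_info(fs.FileSelector("/tmp", recursive=False))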
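
The CLASSPATH bootstrap in _maybe_set_hadoop_classpath can also be reproduced by hand before connecting; a sketch of its fallback branch, assuming the hadoop executable is on PATH:

import os
import subprocess

# Mirror of the module's fallback: only populate CLASSPATH when no
# hadoop-common jar is already visible, via `hadoop classpath --glob`.
if 'hadoop-common' not in os.environ.get('CLASSPATH', ''):
    os.environ['CLASSPATH'] = subprocess.check_output(
        ('hadoop', 'classpath', '--glob')).decode('utf-8')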