diff options
Diffstat (limited to 'src/tools/cephfs/shell/cephfs-shell')
-rwxr-xr-x | src/tools/cephfs/shell/cephfs-shell | 1854 |
1 files changed, 1854 insertions, 0 deletions
diff --git a/src/tools/cephfs/shell/cephfs-shell b/src/tools/cephfs/shell/cephfs-shell new file mode 100755 index 000000000..58884a275 --- /dev/null +++ b/src/tools/cephfs/shell/cephfs-shell @@ -0,0 +1,1854 @@ +#!/usr/bin/python3 +# coding = utf-8 + +import argparse +import os +import os.path +import sys +import cephfs as libcephfs +import shutil +import traceback +import colorama +import fnmatch +import math +import re +import shlex +import stat +import errno + +from distutils.version import LooseVersion + +from cmd2 import Cmd +from cmd2 import __version__ as cmd2_version +# XXX: In cmd2 versions < 1.0.1, we'll get SystemExit(2) instead of +# Cmd2ArgparseError +if LooseVersion(cmd2_version) >= LooseVersion("1.0.1"): + from cmd2.exceptions import Cmd2ArgparseError +else: + # HACK: so that we don't have check for version everywhere + # Cmd2ArgparseError is used. + class Cmd2ArgparseError: + pass + +if sys.version_info.major < 3: + raise RuntimeError("cephfs-shell is only compatible with python3") + +try: + from cmd2 import with_argparser +except ImportError: + def with_argparser(argparser): + import functools + + def argparser_decorator(func): + @functools.wraps(func) + def wrapper(thiz, cmdline): + if isinstance(cmdline, list): + arglist = cmdline + else: + # do not split if it's already a list + arglist = shlex.split(cmdline, posix=False) + # in case user quotes the command args + arglist = [arg.strip('\'""') for arg in arglist] + try: + args = argparser.parse_args(arglist) + except SystemExit: + shell.exit_code = 1 + # argparse exits at seeing bad arguments + return + else: + return func(thiz, args) + argparser.prog = func.__name__[3:] + if argparser.description is None and func.__doc__: + argparser.description = func.__doc__ + + return wrapper + + return argparser_decorator + + +cephfs = None # holds CephFS Python bindings +shell = None # holds instance of class CephFSShell +exit_codes = {'Misc': 1, + 'KeyboardInterrupt': 2, + errno.EPERM: 3, + errno.EACCES: 4, + errno.ENOENT: 5, + errno.EIO: 6, + errno.ENOSPC: 7, + errno.EEXIST: 8, + errno.ENODATA: 9, + errno.EINVAL: 10, + errno.EOPNOTSUPP: 11, + errno.ERANGE: 12, + errno.EWOULDBLOCK: 13, + errno.ENOTEMPTY: 14, + errno.ENOTDIR: 15, + errno.EDQUOT: 16, + errno.EPIPE: 17, + errno.ESHUTDOWN: 18, + errno.ECONNABORTED: 19, + errno.ECONNREFUSED: 20, + errno.ECONNRESET: 21, + errno.EINTR: 22, + errno.EISDIR: 23} + + +######################################################################### +# +# Following are methods are generically useful through class CephFSShell +# +####################################################################### + + +def poutput(s, end='\n'): + shell.poutput(s, end=end) + + +def perror(msg, **kwargs): + shell.perror(msg, **kwargs) + + +def set_exit_code_msg(errcode='Misc', msg=''): + """ + Set exit code and print error message + """ + if isinstance(msg, libcephfs.Error): + shell.exit_code = exit_codes[msg.get_error_code()] + else: + shell.exit_code = exit_codes[errcode] + if msg: + perror(msg) + + +def mode_notation(mode): + """ + """ + permission_bits = {'0': '---', + '1': '--x', + '2': '-w-', + '3': '-wx', + '4': 'r--', + '5': 'r-x', + '6': 'rw-', + '7': 'rwx'} + mode = str(oct(mode)) + notation = '-' + if mode[2] == '4': + notation = 'd' + elif mode[2:4] == '12': + notation = 'l' + for i in mode[-3:]: + notation += permission_bits[i] + return notation + + +def get_chunks(file_size): + chunk_start = 0 + chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size + while chunk_start + chunk_size < file_size: + yield chunk_start, chunk_size + chunk_start += chunk_size + final_chunk_size = file_size - chunk_start + yield chunk_start, final_chunk_size + + +def to_bytes(param): + # don't convert as follows as it can lead unusable results like converting + # [1, 2, 3, 4] to '[1, 2, 3, 4]' - + # str(param).encode('utf-8') + if isinstance(param, bytes): + return param + elif isinstance(param, str): + return bytes(param, encoding='utf-8') + elif isinstance(param, list): + return [i.encode('utf-8') if isinstance(i, str) else to_bytes(i) for + i in param] + elif isinstance(param, int) or isinstance(param, float): + return str(param).encode('utf-8') + elif param is None: + return None + + +def ls(path, opts=''): + # opts tries to be like /bin/ls opts + almost_all = 'A' in opts + try: + with cephfs.opendir(path) as d: + while True: + dent = cephfs.readdir(d) + if dent is None: + return + elif almost_all and dent.d_name in (b'.', b'..'): + continue + yield dent + except libcephfs.ObjectNotFound as e: + set_exit_code_msg(msg=e) + + +def glob(path, pattern): + paths = [] + parent_dir = os.path.dirname(path) + if parent_dir == b'': + parent_dir = b'/' + if path == b'/' or is_dir_exists(os.path.basename(path), parent_dir): + for i in ls(path, opts='A'): + if fnmatch.fnmatch(i.d_name, pattern): + paths.append(os.path.join(path, i.d_name)) + return paths + + +def locate_file(name, case_sensitive=True): + dir_list = sorted(set(dirwalk(cephfs.getcwd()))) + if not case_sensitive: + return [dname for dname in dir_list if name.lower() in dname.lower()] + else: + return [dname for dname in dir_list if name in dname] + + +def get_all_possible_paths(pattern): + complete_pattern = pattern[:] + paths = [] + is_rel_path = not os.path.isabs(pattern) + if is_rel_path: + dir_ = cephfs.getcwd() + else: + dir_ = b'/' + pattern = pattern[1:] + patterns = pattern.split(b'/') + paths.extend(glob(dir_, patterns[0])) + patterns.pop(0) + for pattern in patterns: + for path in paths: + paths.extend(glob(path, pattern)) + if is_rel_path: + complete_pattern = os.path.join(cephfs.getcwd(), complete_pattern) + return [path for path in paths if fnmatch.fnmatch(path, complete_pattern)] + + +suffixes = ['B', 'K', 'M', 'G', 'T', 'P'] + + +def humansize(nbytes): + i = 0 + while nbytes >= 1024 and i < len(suffixes) - 1: + nbytes /= 1024. + i += 1 + nbytes = math.ceil(nbytes) + f = ('%d' % nbytes).rstrip('.') + return '%s%s' % (f, suffixes[i]) + + +def style_listing(path, is_dir, is_symlink, ls_long=False): + if not (is_dir or is_symlink): + return path + pretty = colorama.Style.BRIGHT + if is_symlink: + pretty += colorama.Fore.CYAN + path + if ls_long: + # Add target path + pretty += ' -> ' + cephfs.readlink(path, size=255).decode('utf-8') + elif is_dir: + pretty += colorama.Fore.BLUE + path + '/' + pretty += colorama.Style.RESET_ALL + return pretty + + +def print_long(path, is_dir, is_symlink, human_readable): + info = cephfs.stat(path, follow_symlink=(not is_symlink)) + pretty = style_listing(os.path.basename(path.decode('utf-8')), is_dir, is_symlink, True) + if human_readable: + sizefmt = '\t {:10s}'.format(humansize(info.st_size)) + else: + sizefmt = '{:12d}'.format(info.st_size) + poutput(f'{mode_notation(info.st_mode)} {sizefmt} {info.st_uid} {info.st_gid} {info.st_mtime}' + f' {pretty}') + + +def word_len(word): + """ + Returns the word length, minus any color codes. + """ + if word[0] == '\x1b': + return len(word) - 9 + return len(word) + + +def is_dir_exists(path, dir_=b''): + path_to_stat = os.path.join(dir_, path) + try: + return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0) + except libcephfs.Error: + return False + + +def is_file_exists(path, dir_=b''): + try: + # if its not a directory, then its a file + return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0) + except libcephfs.Error: + return False + + +def print_list(words, termwidth=79): + if not words: + return + words = [word.decode('utf-8') if isinstance(word, bytes) else word for word in words] + width = max([word_len(word) for word in words]) + 2 + nwords = len(words) + ncols = max(1, (termwidth + 1) // (width + 1)) + nrows = (nwords + ncols - 1) // ncols + for row in range(nrows): + for i in range(row, nwords, nrows): + word = words[i] + print_width = width + if word[0] == '\x1b': + print_width = print_width + 10 + + poutput('%-*s' % (print_width, words[i]), + end='\n' if i + nrows >= nwords else '') + + +def copy_from_local(local_path, remote_path): + stdin = -1 + file_ = None + fd = None + convert_to_bytes = False + if local_path == b'-': + file_ = sys.stdin + convert_to_bytes = True + else: + try: + file_ = open(local_path, 'rb') + except PermissionError as e: + set_exit_code_msg(e.errno, 'error: no permission to read local file {}'.format( + local_path.decode('utf-8'))) + return + stdin = 1 + try: + fd = cephfs.open(remote_path, 'w', 0o666) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + return + progress = 0 + while True: + data = file_.read(65536) + if not data or len(data) == 0: + break + if convert_to_bytes: + data = to_bytes(data) + wrote = cephfs.write(fd, data, progress) + if wrote < 0: + break + progress += wrote + cephfs.close(fd) + if stdin > 0: + file_.close() + poutput('') + + +def copy_to_local(remote_path, local_path): + fd = None + if local_path != b'-': + local_dir = os.path.dirname(local_path) + dir_list = remote_path.rsplit(b'/', 1) + if not os.path.exists(local_dir): + os.makedirs(local_dir) + if len(dir_list) > 2 and dir_list[1] == b'': + return + fd = open(local_path, 'wb+') + file_ = cephfs.open(remote_path, 'r') + file_size = cephfs.stat(remote_path).st_size + if file_size <= 0: + return + progress = 0 + for chunk_start, chunk_size in get_chunks(file_size): + file_chunk = cephfs.read(file_, chunk_start, chunk_size) + progress += len(file_chunk) + if fd: + fd.write(file_chunk) + else: + poutput(file_chunk.decode('utf-8')) + cephfs.close(file_) + if fd: + fd.close() + + +def dirwalk(path): + """ + walk a directory tree, using a generator + """ + path = os.path.normpath(path) + for item in ls(path, opts='A'): + fullpath = os.path.join(path, item.d_name) + src_path = fullpath.rsplit(b'/', 1)[0] + + yield os.path.normpath(fullpath) + if is_dir_exists(item.d_name, src_path): + for x in dirwalk(fullpath): + yield x + + +################################################################## +# +# Following methods are implementation for CephFS Shell commands +# +################################################################# + +class CephFSShell(Cmd): + + def __init__(self): + super().__init__() + self.working_dir = cephfs.getcwd().decode('utf-8') + self.set_prompt() + self.interactive = False + self.umask = '2' + + def default(self, line): + self.exit_code = 127 + perror('Unrecognized command') + + def set_prompt(self): + self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX + + self.working_dir + colorama.Style.RESET_ALL + + '\033[01;33m>>>\033[00m ') + + def create_argparser(self, command): + try: + argparse_args = getattr(self, 'argparse_' + command) + except AttributeError: + set_exit_code_msg() + return None + doc_lines = getattr( + self, 'do_' + command).__doc__.expandtabs().splitlines() + if '' in doc_lines: + blank_idx = doc_lines.index('') + usage = doc_lines[:blank_idx] + description = doc_lines[blank_idx + 1:] + else: + usage = doc_lines + description = [] + parser = argparse.ArgumentParser( + prog=command, + usage='\n'.join(usage), + description='\n'.join(description), + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + for args, kwargs in argparse_args: + parser.add_argument(*args, **kwargs) + return parser + + def complete_filenames(self, text, line, begidx, endidx): + if not text: + completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir()) + for x in ls(b".", opts='A')] + else: + if text.count('/') > 0: + completions = [text.rsplit('/', 1)[0] + '/' + + x.d_name.decode('utf-8') + '/' + * int(x.is_dir()) for x in ls('/' + + text.rsplit('/', 1)[0], opts='A') + if x.d_name.decode('utf-8').startswith( + text.rsplit('/', 1)[1])] + else: + completions = [x.d_name.decode('utf-8') + '/' + * int(x.is_dir()) for x in ls(b".", opts='A') + if x.d_name.decode('utf-8').startswith(text)] + if len(completions) == 1 and completions[0][-1] == '/': + dir_, file_ = completions[0].rsplit('/', 1) + completions.extend([dir_ + '/' + x.d_name.decode('utf-8') + + '/' * int(x.is_dir()) for x in + ls('/' + dir_, opts='A') + if x.d_name.decode('utf-8').startswith(file_)]) + return self.delimiter_complete(text, line, begidx, endidx, completions, '/') + return completions + + def onecmd(self, line, **kwargs): + """ + Global error catcher + """ + try: + res = Cmd.onecmd(self, line, **kwargs) + if self.interactive: + self.set_prompt() + return res + except ConnectionError as e: + set_exit_code_msg(e.errno, f'***\n{e}') + except KeyboardInterrupt: + set_exit_code_msg('KeyboardInterrupt', 'Command aborted') + except (libcephfs.Error, Exception) as e: + if shell.debug: + traceback.print_exc(file=sys.stdout) + if isinstance(e, Cmd2ArgparseError): + # NOTE: In case of Cmd2ArgparseError the error message is + # already printed beforehand (plus Cmd2ArgparseError + # instances have empty error message), so let's just set the + # exit code. + set_exit_code_msg(msg=None) + else: + set_exit_code_msg(msg=f'{type(e).__name__}: {e}') + # In cmd2 versions < 1.1.0 we'll get SystemExit(2) instead of + # Cmd2ArgparseError + except SystemExit: + raise + + class path_to_bytes(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + values = to_bytes(values) + setattr(namespace, self.dest, values) + + # TODO: move the necessary contents from here to `class path_to_bytes`. + class get_list_of_bytes_path(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + values = to_bytes(values) + + if values == b'.': + values = cephfs.getcwd() + else: + for i in values: + if i == b'.': + values[values.index(i)] = cephfs.getcwd() + + setattr(namespace, self.dest, values) + + def complete_mkdir(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + class ModeAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + if nargs is not None and nargs != '?': + raise ValueError("more than one modes not allowed") + super().__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + o_mode = 0 + res = None + try: + o_mode = int(values, base=8) + except ValueError: + res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', values) + if res is None: + parser.error(f"invalid mode: {values}\n" + "mode must be a numeric octal literal\n" + "or ((u?g?o?)|(a?))(=)(r?w?x?)") + else: + # we are supporting only assignment of mode and not + or - + # as is generally available with the chmod command + # eg. + # >>> res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', 'go=') + # >>> res.groups() + # ('go', 'go', None, '=', '') + val = res.groups() + + if val[3] != '=': + parser.error("need assignment operator between user " + "and mode specifiers") + if val[4] == '': + parser.error(f"invalid mode: {values}\n" + "mode must be combination of: r | w | x") + users = '' + if val[2] is None: + users = val[1] + else: + users = val[2] + + t_mode = 0 + if users == 'a': + users = 'ugo' + + if 'r' in val[4]: + t_mode |= 4 + if 'w' in val[4]: + t_mode |= 2 + if 'x' in val[4]: + t_mode |= 1 + + if 'u' in users: + o_mode |= (t_mode << 6) + if 'g' in users: + o_mode |= (t_mode << 3) + if 'o' in users: + o_mode |= t_mode + + if o_mode < 0: + parser.error(f"invalid mode: {values}\n" + "mode cannot be negative") + if o_mode > 0o7777: + parser.error(f"invalid mode: {values}\n" + "mode cannot be greater than octal 07777") + + setattr(namespace, self.dest, str(oct(o_mode))) + + mkdir_parser = argparse.ArgumentParser( + description='Create the directory(ies), if they do not already exist.') + mkdir_parser.add_argument('dirs', type=str, + action=path_to_bytes, + metavar='DIR_NAME', + help='Name of new_directory.', + nargs='+') + mkdir_parser.add_argument('-m', '--mode', type=str, + action=ModeAction, + help='Sets the access mode for the new directory.') + mkdir_parser.add_argument('-p', '--parent', action='store_true', + help='Create parent directories as necessary. ' + 'When this option is specified, no error is' + 'reported if a directory already exists.') + + @with_argparser(mkdir_parser) + def do_mkdir(self, args): + """ + Create directory. + """ + for path in args.dirs: + if args.mode: + permission = int(args.mode, 8) + else: + permission = 0o777 + if args.parent: + cephfs.mkdirs(path, permission) + else: + try: + cephfs.mkdir(path, permission) + except libcephfs.Error as e: + set_exit_code_msg(e) + + def complete_put(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + put_parser = argparse.ArgumentParser( + description='Copy a file/directory to Ceph File System from Local File System.') + put_parser.add_argument('local_path', type=str, action=path_to_bytes, + help='Path of the file in the local system') + put_parser.add_argument('remote_path', type=str, action=path_to_bytes, + help='Path of the file in the remote system') + put_parser.add_argument('-f', '--force', action='store_true', + help='Overwrites the destination if it already exists.') + + @with_argparser(put_parser) + def do_put(self, args): + """ + Copy a local file/directory to CephFS. + """ + if args.local_path != b'-' and not os.path.isfile(args.local_path) \ + and not os.path.isdir(args.local_path): + set_exit_code_msg(errno.ENOENT, + msg=f"error: " + f"{args.local_path.decode('utf-8')}: " + f"No such file or directory") + return + + if (is_file_exists(args.remote_path) or is_dir_exists( + args.remote_path)) and not args.force: + set_exit_code_msg(msg=f"error: file/directory " + f"{args.remote_path.decode('utf-8')} " + f"exists, use --force to overwrite") + return + + root_src_dir = args.local_path + root_dst_dir = args.remote_path + if args.local_path == b'.' or args.local_path == b'./': + root_src_dir = os.getcwdb() + elif len(args.local_path.rsplit(b'/', 1)) < 2: + root_src_dir = os.path.join(os.getcwdb(), args.local_path) + else: + p = args.local_path.split(b'/') + if p[0] == b'.': + root_src_dir = os.getcwdb() + p.pop(0) + while len(p) > 0: + root_src_dir += b'/' + p.pop(0) + + if root_dst_dir == b'.': + if args.local_path != b'-': + root_dst_dir = root_src_dir.rsplit(b'/', 1)[1] + if root_dst_dir == b'': + root_dst_dir = root_src_dir.rsplit(b'/', 1)[0] + a = root_dst_dir.rsplit(b'/', 1) + if len(a) > 1: + root_dst_dir = a[1] + else: + root_dst_dir = a[0] + else: + set_exit_code_msg(errno.EINVAL, 'error: no filename specified ' + 'for destination') + return + + if root_dst_dir[-1] != b'/': + root_dst_dir += b'/' + + if args.local_path == b'-' or os.path.isfile(root_src_dir): + if args.local_path == b'-': + root_src_dir = b'-' + copy_from_local(root_src_dir, root_dst_dir) + else: + for src_dir, dirs, files in os.walk(root_src_dir): + if isinstance(src_dir, str): + src_dir = to_bytes(src_dir) + dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1) + dst_dir = re.sub(rb'\/+', b'/', cephfs.getcwd() + + dst_dir) + if args.force and dst_dir != b'/' and not is_dir_exists( + dst_dir[:-1]) and not locate_file(dst_dir): + try: + cephfs.mkdirs(dst_dir, 0o777) + except libcephfs.Error: + pass + if (not args.force) and dst_dir != b'/' and not is_dir_exists( + dst_dir) and not os.path.isfile(root_src_dir): + try: + cephfs.mkdirs(dst_dir, 0o777) + except libcephfs.Error: + # TODO: perhaps, set retval to 1? + pass + + for dir_ in dirs: + dir_name = os.path.join(dst_dir, dir_) + if not is_dir_exists(dir_name): + try: + cephfs.mkdirs(dir_name, 0o777) + except libcephfs.Error: + # TODO: perhaps, set retval to 1? + pass + + for file_ in files: + src_file = os.path.join(src_dir, file_) + dst_file = re.sub(rb'\/+', b'/', b'/' + dst_dir + b'/' + file_) + if (not args.force) and is_file_exists(dst_file): + return + copy_from_local(src_file, os.path.join(cephfs.getcwd(), + dst_file)) + + def complete_get(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + get_parser = argparse.ArgumentParser( + description='Copy a file from Ceph File System to Local Directory.') + get_parser.add_argument('remote_path', type=str, action=path_to_bytes, + help='Path of the file in the remote system') + get_parser.add_argument('local_path', type=str, action=path_to_bytes, + help='Path of the file in the local system') + get_parser.add_argument('-f', '--force', action='store_true', + help='Overwrites the destination if it already exists.') + + @with_argparser(get_parser) + def do_get(self, args): + """ + Copy a file/directory from CephFS to given path. + """ + if not is_file_exists(args.remote_path) and not \ + is_dir_exists(args.remote_path): + set_exit_code_msg(errno.ENOENT, "error: no file/directory" + " found at specified remote " + "path") + return + if (os.path.isfile(args.local_path) or os.path.isdir( + args.local_path)) and not args.force: + set_exit_code_msg(msg=f"error: file/directory " + f"{args.local_path.decode('utf-8')}" + f" already exists, use --force to " + f"overwrite") + return + root_src_dir = args.remote_path + root_dst_dir = args.local_path + fname = root_src_dir.rsplit(b'/', 1) + if args.local_path == b'.': + root_dst_dir = os.getcwdb() + if args.remote_path == b'.': + root_src_dir = cephfs.getcwd() + if args.local_path == b'-': + if args.remote_path == b'.' or args.remote_path == b'./': + set_exit_code_msg(errno.EINVAL, 'error: no remote file name specified') + return + copy_to_local(root_src_dir, b'-') + elif is_file_exists(args.remote_path): + copy_to_local(root_src_dir, root_dst_dir) + elif b'/' in root_src_dir and is_file_exists(fname[1], fname[0]): + copy_to_local(root_src_dir, root_dst_dir) + else: + files = list(reversed(sorted(dirwalk(root_src_dir)))) + for file_ in files: + dst_dirpath, dst_file = file_.rsplit(b'/', 1) + if dst_dirpath in files: + files.remove(dst_dirpath) + dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file) + dst_path = os.path.normpath(dst_path) + if is_dir_exists(file_): + try: + os.makedirs(dst_path) + except OSError: + pass + else: + copy_to_local(file_, dst_path) + + return 0 + + def complete_ln(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + ln_parser = argparse.ArgumentParser( + description='Add a hard link to an existing file or create a symbolic ' + 'link to an existing file or directory.') + ln_parser.add_argument('target', type=str, action=path_to_bytes, + help='File/Directory of which link is ' + 'to be created') + ln_parser.add_argument('link_name', type=str, action=path_to_bytes, + help='Link to target with the name link_name', + nargs='?') + ln_parser.add_argument('-s', '--symbolic', action='store_true', + help='Create symbolic link') + ln_parser.add_argument('-v', '--verbose', action='store_true', + help='Print name of each linked file') + ln_parser.add_argument('-f', '--force', action='store_true', + help='Force create link/symbolic link') + + @with_argparser(ln_parser) + def do_ln(self, args): + if not is_file_exists(args.target) \ + and not is_dir_exists(args.target): + set_exit_code_msg(errno.ENOENT, + msg=f"ln: failed to access " + f"'{args.target.decode('utf-8')}" + f"': No such file or directory") + return + + is_a_dir = False + if is_dir_exists(args.target): + is_a_dir = True + + target_last_char_slash = False + if args.target.decode('utf-8')[len(args.target) - 1] == '/': + target_last_char_slash = True + + link_name = '' + + if args.link_name is None: + if target_last_char_slash is True: + if is_dir_exists(args.target): + pass + else: + set_exit_code_msg(errno.ENOTDIR, + f"ln: failed to access " + f"'{args.target.decode('utf-8')}': " + f"Not a directory") + return + link_name = os.path.join(cephfs.getcwd(), + os.path.basename( + os.path.normpath(args.target))) + if (is_file_exists(link_name) or is_dir_exists( + link_name)) and not args.force: + set_exit_code_msg(errno.ENOENT, + msg=f"ln: failed to create link " + f"{link_name.decode('utf-8')}: " + f"exists") + return + else: + if is_dir_exists(args.link_name): + dest = args.link_name.decode('utf-8').rstrip('/') + dest_first_half = dest.encode('utf-8') + b'/' + if is_file_exists(args.target): + if target_last_char_slash is True: + set_exit_code_msg(errno.ENOTDIR, + "ln: failed to access " + f"'{args.target.decode('utf-8')}': " + "Not a directory") + return + dest_file = os.path.basename(os.path.normpath(args.target)) + link_name = dest_first_half + dest_file + + elif is_dir_exists(args.target): + dest_dir = os.path.basename(os.path.normpath(args.target)) + link_name = dest_first_half + dest_dir + + else: + # if the destination is not a file or a dir then: + # accept it as file so the end part of path cannot have + # a `/` succeeding it. + test_path = args.link_name.decode('utf-8') + if test_path[len(test_path) - 1] == '/': + set_exit_code_msg(errno.ENOENT, f"'{test_path}': " + f"No such file or " + f"directory") + return + else: + link_name = test_path.encode('utf-8') + + if args.force: + try: + cephfs.lstat(os.path.join(b'', link_name)) + if not is_a_dir or (is_a_dir and args.symbolic): + cephfs.unlink(link_name) + except libcephfs.ObjectNotFound: + pass + + try: + if args.symbolic: + cephfs.symlink(args.target, link_name) + else: + if is_a_dir: + set_exit_code_msg(errno.EPERM, + f"ln: {args.target.decode('utf-8')}: " + "hard link not allowed for directory") + return + cephfs.link(args.target, link_name) + except libcephfs.Error as e: + set_exit_code_msg(msg=str(e)) + return + + if args.verbose: + poutput(f"{link_name.decode('utf-8')} ->" + f" {args.target.decode('utf-8')}") + + def complete_ls(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + ls_parser = argparse.ArgumentParser( + description='Copy a file from Ceph File System from Local Directory.') + ls_parser.add_argument('-l', '--long', action='store_true', + help='Detailed list of items in the directory.') + ls_parser.add_argument('-r', '--reverse', action='store_true', + help='Reverse order of listing items in the directory.') + ls_parser.add_argument('-H', action='store_true', help='Human Readable') + ls_parser.add_argument('-a', '--all', action='store_true', + help='Do not Ignore entries starting with .') + ls_parser.add_argument('-S', action='store_true', help='Sort by file_size') + ls_parser.add_argument('paths', help='Name of Directories', + action=path_to_bytes, nargs='*', default=['.']) + + @with_argparser(ls_parser) + def do_ls(self, args): + """ + List all the files and directories in the current working directory + """ + paths = args.paths + for path in paths: + values = [] + items = [] + try: + if path.count(b'*') > 0: + all_items = get_all_possible_paths(path) + if len(all_items) == 0: + continue + path = all_items[0].rsplit(b'/', 1)[0] + if path == b'': + path = b'/' + dirs = [] + for i in all_items: + for item in ls(path): + d_name = item.d_name + if os.path.basename(i) == d_name: + if item.is_dir(): + dirs.append(os.path.join(path, d_name)) + else: + items.append(item) + if dirs: + paths.extend(dirs) + else: + poutput(path.decode('utf-8'), end=':\n') + items = sorted(items, key=lambda item: item.d_name) + else: + if path != b'' and path != cephfs.getcwd() and len(paths) > 1: + poutput(path.decode('utf-8'), end=':\n') + items = sorted(ls(path), key=lambda item: item.d_name) + if not args.all: + items = [i for i in items if not i.d_name.startswith(b'.')] + if args.S: + items = sorted(items, key=lambda item: cephfs.stat( + path + b'/' + item.d_name, follow_symlink=( + not item.is_symbol_file())).st_size) + if args.reverse: + items = reversed(items) + for item in items: + filepath = item.d_name + is_dir = item.is_dir() + is_sym_lnk = item.is_symbol_file() + try: + if args.long and args.H: + print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir, + is_sym_lnk, True) + elif args.long: + print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir, + is_sym_lnk, False) + elif is_sym_lnk or is_dir: + values.append(style_listing(filepath.decode('utf-8'), is_dir, + is_sym_lnk)) + else: + values.append(filepath) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + if not args.long: + print_list(values, shutil.get_terminal_size().columns) + if path != paths[-1]: + poutput('') + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + def complete_rmdir(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + rmdir_parser = argparse.ArgumentParser( + description='Remove the directory(ies), if they are empty.') + rmdir_parser.add_argument('paths', help='Directory Path(s)', nargs='+', + action=path_to_bytes) + rmdir_parser.add_argument('-p', '--parent', action='store_true', + help="remove directory and its ancestors; " + "e.g., 'rmdir -p a/b/c' is similar to " + "'rmdir a/b/c a/b a'") + + @with_argparser(rmdir_parser) + def do_rmdir(self, args): + self.do_rmdir_helper(args) + + def do_rmdir_helper(self, args): + """ + Remove a specific Directory + """ + is_pattern = False + paths = args.paths + for path in paths: + if path.count(b'*') > 0: + is_pattern = True + all_items = get_all_possible_paths(path) + if len(all_items) > 0: + path = all_items[0].rsplit(b'/', 1)[0] + if path == b'': + path = b'/' + dirs = [] + for i in all_items: + for item in ls(path): + d_name = item.d_name + if os.path.basename(i) == d_name: + if item.is_dir(): + dirs.append(os.path.join(path, d_name)) + paths.extend(dirs) + continue + else: + is_pattern = False + + if args.parent: + path = os.path.join(cephfs.getcwd(), path.rsplit(b'/')[0]) + files = list(sorted(set(dirwalk(path)), reverse=True)) + if not files: + path = b'.' + for filepath in files: + try: + cephfs.rmdir(os.path.normpath(filepath)) + except libcephfs.Error as e: + perror(e) + path = b'.' + break + else: + path = os.path.normpath(os.path.join(cephfs.getcwd(), path)) + if not is_pattern and path != os.path.normpath(b''): + try: + cephfs.rmdir(path) + except libcephfs.Error as e: + if e.get_error_code() == 2: + set_exit_code_msg(e.get_error_code(), + "rmdir: failed to remove " + f"{path.decode('utf-8')}: " + "No such file or directory") + elif e.get_error_code() == 20: + set_exit_code_msg(e.get_error_code(), + "rmdir: failed to remove " + f"{path.decode('utf-8')}: " + "Not a directory") + elif e.get_error_code() == 39: + set_exit_code_msg(e.get_error_code(), + "rmdir: failed to remove " + f"{path.decode('utf-8')}: " + "Directory not empty") + else: + set_exit_code_msg(msg=e) + + def complete_rm(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + rm_parser = argparse.ArgumentParser(description='Remove File.') + rm_parser.add_argument('paths', help='File Path.', nargs='+', + action=path_to_bytes) + + @with_argparser(rm_parser) + def do_rm(self, args): + """ + Remove a specific file + """ + file_paths = args.paths + for path in file_paths: + if path.count(b'*') > 0: + file_paths.extend([i for i in get_all_possible_paths( + path) if is_file_exists(i)]) + else: + try: + cephfs.unlink(path) + except libcephfs.Error as e: + # NOTE: perhaps we need a better msg here + if e.get_error_code() == 2: + set_exit_code_msg(e.get_error_code(), + "rm: failed to remove " + f"{path.decode('utf-8')}: " + "No such file or directory") + elif e.get_error_code() == 21: + set_exit_code_msg(e.get_error_code(), + "rm: failed to remove " + f"{path.decode('utf-8')}: " + "Is a directory") + else: + set_exit_code_msg(msg=e) + + def complete_mv(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + mv_parser = argparse.ArgumentParser(description='Move File.') + mv_parser.add_argument('src_path', type=str, action=path_to_bytes, + help='Source File Path.') + mv_parser.add_argument('dest_path', type=str, action=path_to_bytes, + help='Destination File Path.') + + @with_argparser(mv_parser) + def do_mv(self, args): + """ + Rename a file or Move a file from source path to the destination + """ + cephfs.rename(args.src_path, args.dest_path) + + def complete_cd(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + cd_parser = argparse.ArgumentParser(description='Change working directory') + cd_parser.add_argument('path', type=str, help='Name of the directory.', + action=path_to_bytes, nargs='?', default='/') + + @with_argparser(cd_parser) + def do_cd(self, args): + """ + Change working directory + """ + cephfs.chdir(args.path) + self.working_dir = cephfs.getcwd().decode('utf-8') + self.set_prompt() + + def do_cwd(self, arglist): + """ + Get current working directory. + """ + poutput(cephfs.getcwd().decode('utf-8')) + + def complete_chmod(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + chmod_parser = argparse.ArgumentParser(description='Change permission of a file/directory.') + chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode') + chmod_parser.add_argument('paths', type=str, action=path_to_bytes, + help='Path of the file/directory', nargs='+') + + @with_argparser(chmod_parser) + def do_chmod(self, args): + """ + Change permission of a file/directory + """ + for path in args.paths: + mode = int(args.mode, base=8) + try: + cephfs.chmod(path, mode) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + def complete_cat(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + cat_parser = argparse.ArgumentParser(description='') + cat_parser.add_argument('paths', help='Name of Files', action=path_to_bytes, + nargs='+') + + @with_argparser(cat_parser) + def do_cat(self, args): + """ + Print contents of a file + """ + for path in args.paths: + if is_file_exists(path): + copy_to_local(path, b'-') + else: + set_exit_code_msg(errno.ENOENT, '{}: no such file'.format( + path.decode('utf-8'))) + + umask_parser = argparse.ArgumentParser(description='Set umask value.') + umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction, + nargs='?', default='') + + @with_argparser(umask_parser) + def do_umask(self, args): + """ + Set Umask value. + """ + if args.mode == '': + poutput(self.umask.zfill(4)) + else: + mode = int(args.mode, 8) + self.umask = str(oct(cephfs.umask(mode))[2:]) + + def complete_write(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + write_parser = argparse.ArgumentParser(description='Writes data into a file') + write_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of File') + + @with_argparser(write_parser) + def do_write(self, args): + """ + Write data into a file. + """ + + copy_from_local(b'-', args.path) + + def complete_lcd(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + lcd_parser = argparse.ArgumentParser(description='') + lcd_parser.add_argument('path', type=str, action=path_to_bytes, help='Path') + + @with_argparser(lcd_parser) + def do_lcd(self, args): + """ + Moves into the given local directory + """ + try: + os.chdir(os.path.expanduser(args.path)) + except OSError as e: + set_exit_code_msg(e.errno, "Cannot change to " + f"{e.filename.decode('utf-8')}: {e.strerror}") + + def complete_lls(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + lls_parser = argparse.ArgumentParser( + description='List files in local system.') + lls_parser.add_argument('paths', help='Paths', action=path_to_bytes, + nargs='*') + + @with_argparser(lls_parser) + def do_lls(self, args): + """ + Lists all files and folders in the current local directory + """ + if not args.paths: + print_list(os.listdir(os.getcwdb())) + else: + for path in args.paths: + try: + items = os.listdir(path) + poutput("{}:".format(path.decode('utf-8'))) + print_list(items) + except OSError as e: + set_exit_code_msg(e.errno, f"{e.filename.decode('utf-8')}: " + f"{e.strerror}") + # Arguments to the with_argpaser decorator function are sticky. + # The items in args.path do not get overwritten in subsequent calls. + # The arguments remain in args.paths after the function exits and we + # need to clean it up to ensure the next call works as expected. + args.paths.clear() + + def do_lpwd(self, arglist): + """ + Prints the absolute path of the current local directory + """ + poutput(os.getcwd()) + + def complete_df(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + df_parser = argparse.ArgumentParser(description='Show information about\ + the amount of available disk space') + df_parser.add_argument('file', help='Name of the file', nargs='*', + default=['.'], action=path_to_bytes) + + @with_argparser(df_parser) + def do_df(self, arglist): + """ + Display the amount of available disk space for file systems + """ + header = True # Set to true for printing header only once + if b'.' == arglist.file[0]: + arglist.file = ls(b'.') + + for file in arglist.file: + if isinstance(file, libcephfs.DirEntry): + file = file.d_name + if file == b'.' or file == b'..': + continue + try: + statfs = cephfs.statfs(file) + stat = cephfs.stat(file) + block_size = (statfs['f_blocks'] * statfs['f_bsize']) // 1024 + available = block_size - stat.st_size + use = 0 + + if block_size > 0: + use = (stat.st_size * 100) // block_size + + if header: + header = False + poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format( + "1K-blocks", "Used", "Available", "Use%", + "Stored on")) + + poutput('{:d}\t{:18d}\t{:8d}\t{:10s} {}'.format(block_size, + stat.st_size, available, str(int(use)) + '%', + file.decode('utf-8'))) + except libcephfs.OSError as e: + set_exit_code_msg(e.get_error_code(), "could not statfs {}: {}".format( + file.decode('utf-8'), e.strerror)) + + locate_parser = argparse.ArgumentParser( + description='Find file within file system') + locate_parser.add_argument('name', help='name', type=str, + action=path_to_bytes) + locate_parser.add_argument('-c', '--count', action='store_true', + help='Count list of items located.') + locate_parser.add_argument( + '-i', '--ignorecase', action='store_true', help='Ignore case') + + @with_argparser(locate_parser) + def do_locate(self, args): + """ + Find a file within the File System + """ + if args.name.count(b'*') == 1: + if args.name[0] == b'*': + args.name += b'/' + elif args.name[-1] == '*': + args.name = b'/' + args.name + args.name = args.name.replace(b'*', b'') + if args.ignorecase: + locations = locate_file(args.name, False) + else: + locations = locate_file(args.name) + if args.count: + poutput(len(locations)) + else: + poutput((b'\n'.join(locations)).decode('utf-8')) + + def complete_du(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + du_parser = argparse.ArgumentParser( + description='Disk Usage of a Directory') + du_parser.add_argument('paths', type=str, action=get_list_of_bytes_path, + help='Name of the directory.', nargs='*', + default=[b'.']) + du_parser.add_argument('-r', action='store_true', + help='Recursive Disk usage of all directories.') + + @with_argparser(du_parser) + def do_du(self, args): + """ + Print disk usage of a given path(s). + """ + def print_disk_usage(files): + if isinstance(files, bytes): + files = (files, ) + + for f in files: + try: + st = cephfs.lstat(f) + + if stat.S_ISDIR(st.st_mode): + dusage = int(cephfs.getxattr(f, + 'ceph.dir.rbytes').decode('utf-8')) + else: + dusage = st.st_size + + # print path in local context + f = os.path.normpath(f) + if f[0] is ord('/'): + f = b'.' + f + poutput('{:10s} {}'.format(humansize(dusage), + f.decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + continue + + for path in args.paths: + if args.r: + print_disk_usage(sorted(set(dirwalk(path)).union({path}))) + else: + print_disk_usage(path) + + quota_parser = argparse.ArgumentParser( + description='Quota management for a Directory') + quota_parser.add_argument('op', choices=['get', 'set'], + help='Quota operation type.') + quota_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of the directory.') + quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?', + help='Max cumulative size of the data under ' + 'this directory.') + quota_parser.add_argument('--max_files', type=int, default=-1, nargs='?', + help='Total number of files under this ' + 'directory tree.') + + @with_argparser(quota_parser) + def do_quota(self, args): + """ + Quota management. + """ + if not is_dir_exists(args.path): + set_exit_code_msg(errno.ENOENT, 'error: no such directory {}'.format( + args.path.decode('utf-8'))) + return + + if args.op == 'set': + if (args.max_bytes == -1) and (args.max_files == -1): + set_exit_code_msg(errno.EINVAL, 'please specify either ' + '--max_bytes or --max_files or both') + return + + if args.max_bytes >= 0: + max_bytes = to_bytes(str(args.max_bytes)) + try: + cephfs.setxattr(args.path, 'ceph.quota.max_bytes', + max_bytes, os.XATTR_CREATE) + poutput('max_bytes set to %d' % args.max_bytes) + except libcephfs.Error as e: + cephfs.setxattr(args.path, 'ceph.quota.max_bytes', + max_bytes, os.XATTR_REPLACE) + set_exit_code_msg(e.get_error_code(), 'max_bytes reset to ' + f'{args.max_bytes}') + + if args.max_files >= 0: + max_files = to_bytes(str(args.max_files)) + try: + cephfs.setxattr(args.path, 'ceph.quota.max_files', + max_files, os.XATTR_CREATE) + poutput('max_files set to %d' % args.max_files) + except libcephfs.Error as e: + cephfs.setxattr(args.path, 'ceph.quota.max_files', + max_files, os.XATTR_REPLACE) + set_exit_code_msg(e.get_error_code(), 'max_files reset to ' + f'{args.max_files}') + elif args.op == 'get': + max_bytes = '0' + max_files = '0' + try: + max_bytes = cephfs.getxattr(args.path, 'ceph.quota.max_bytes') + poutput('max_bytes: {}'.format(max_bytes.decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(e.get_error_code(), 'max_bytes is not set') + + try: + max_files = cephfs.getxattr(args.path, 'ceph.quota.max_files') + poutput('max_files: {}'.format(max_files.decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(e.get_error_code(), 'max_files is not set') + + snap_parser = argparse.ArgumentParser(description='Snapshot Management') + snap_parser.add_argument('op', type=str, + help='Snapshot operation: create or delete') + snap_parser.add_argument('name', type=str, action=path_to_bytes, + help='Name of snapshot') + snap_parser.add_argument('dir', type=str, action=path_to_bytes, + help='Directory for which snapshot ' + 'needs to be created or deleted') + + @with_argparser(snap_parser) + def do_snap(self, args): + """ + Snapshot management for the volume + """ + # setting self.colors to None turns off colorizing and + # perror emits plain text + self.colors = None + + snapdir = '.snap' + conf_snapdir = cephfs.conf_get('client_snapdir') + if conf_snapdir is not None: + snapdir = conf_snapdir + snapdir = to_bytes(snapdir) + if args.op == 'create': + try: + if is_dir_exists(args.dir): + cephfs.mkdir(os.path.join(args.dir, snapdir, args.name), 0o755) + else: + set_exit_code_msg(errno.ENOENT, "'{}': no such directory".format( + args.dir.decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(e.get_error_code(), + "snapshot '{}' already exists".format( + args.name.decode('utf-8'))) + elif args.op == 'delete': + snap_dir = os.path.join(args.dir, snapdir, args.name) + try: + if is_dir_exists(snap_dir): + newargs = argparse.Namespace(paths=[snap_dir], parent=False) + self.do_rmdir_helper(newargs) + else: + set_exit_code_msg(errno.ENOENT, "'{}': no such snapshot".format( + args.name.decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(e.get_error_code(), "error while deleting " + "'{}'".format(snap_dir.decode('utf-8'))) + else: + set_exit_code_msg(errno.EINVAL, "snapshot can only be created or " + "deleted; check - help snap") + + def do_help(self, line): + """ + Get details about a command. + Usage: help <cmd> - for a specific command + help all - for all the commands + """ + if line == 'all': + for k in dir(self): + if k.startswith('do_'): + poutput('-' * 80) + super().do_help(k[3:]) + return + parser = self.create_argparser(line) + if parser: + parser.print_help() + else: + super().do_help(line) + + def complete_stat(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + stat_parser = argparse.ArgumentParser( + description='Display file or file system status') + stat_parser.add_argument('paths', type=str, help='file paths', + action=path_to_bytes, nargs='+') + + @with_argparser(stat_parser) + def do_stat(self, args): + """ + Display file or file system status + """ + for path in args.paths: + try: + stat = cephfs.stat(path) + atime = stat.st_atime.isoformat(' ') + mtime = stat.st_mtime.isoformat(' ') + ctime = stat.st_mtime.isoformat(' ') + + poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n" + "Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: " + "{:o}/{}\tUid: {:d}\tGid: {:d}\nAccess: {}\nModify: " + "{}\nChange: {}".format(path.decode('utf-8'), + stat.st_size, stat.st_blocks, + stat.st_blksize, stat.st_dev, + stat.st_ino, stat.st_nlink, + stat.st_mode, + mode_notation(stat.st_mode), + stat.st_uid, stat.st_gid, atime, + mtime, ctime)) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + setxattr_parser = argparse.ArgumentParser( + description='Set extended attribute for a file') + setxattr_parser.add_argument('path', type=str, action=path_to_bytes, help='Name of the file') + setxattr_parser.add_argument('name', type=str, help='Extended attribute name') + setxattr_parser.add_argument('value', type=str, help='Extended attribute value') + + @with_argparser(setxattr_parser) + def do_setxattr(self, args): + """ + Set extended attribute for a file + """ + val_bytes = to_bytes(args.value) + name_bytes = to_bytes(args.name) + try: + cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_CREATE) + poutput('{} is successfully set to {}'.format(args.name, args.value)) + except libcephfs.ObjectExists: + cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_REPLACE) + poutput('{} is successfully reset to {}'.format(args.name, args.value)) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + getxattr_parser = argparse.ArgumentParser( + description='Get extended attribute set for a file') + getxattr_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of the file') + getxattr_parser.add_argument('name', type=str, help='Extended attribute name') + + @with_argparser(getxattr_parser) + def do_getxattr(self, args): + """ + Get extended attribute for a file + """ + try: + poutput('{}'.format(cephfs.getxattr(args.path, + to_bytes(args.name)).decode('utf-8'))) + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + listxattr_parser = argparse.ArgumentParser( + description='List extended attributes set for a file') + listxattr_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of the file') + + @with_argparser(listxattr_parser) + def do_listxattr(self, args): + """ + List extended attributes for a file + """ + try: + size, xattr_list = cephfs.listxattr(args.path) + if size > 0: + poutput('{}'.format(xattr_list.replace(b'\x00', b' ').decode('utf-8'))) + else: + poutput('No extended attribute is set') + except libcephfs.Error as e: + set_exit_code_msg(msg=e) + + +####################################################### +# +# Following are methods that get cephfs-shell started. +# +##################################################### + +def setup_cephfs(args): + """ + Mounting a cephfs + """ + global cephfs + try: + cephfs = libcephfs.LibCephFS(conffile='') + cephfs.mount(filesystem_name=args.fs) + except libcephfs.ObjectNotFound as e: + print('couldn\'t find ceph configuration not found') + sys.exit(e.get_error_code()) + except libcephfs.Error as e: + print(e) + sys.exit(e.get_error_code()) + + +def str_to_bool(val): + """ + Return corresponding bool values for strings like 'true' or 'false'. + """ + if not isinstance(val, str): + return val + + val = val.replace('\n', '') + if val.lower() in ['true', 'yes']: + return True + elif val.lower() in ['false', 'no']: + return False + else: + return val + + +def read_shell_conf(shell, shell_conf_file): + import configparser + + sec = 'cephfs-shell' + opts = [] + if LooseVersion(cmd2_version) >= LooseVersion("0.10.0"): + for attr in shell.settables.keys(): + opts.append(attr) + else: + if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"): + # hardcoding options for 0.7.9 because - + # 1. we use cmd2 v0.7.9 with teuthology and + # 2. there's no way distinguish between a shell setting and shell + # object attribute until v0.10.0 + opts = ['abbrev', 'autorun_on_edit', 'colors', + 'continuation_prompt', 'debug', 'echo', 'editor', + 'feedback_to_output', 'locals_in_py', 'prompt', 'quiet', + 'timing'] + elif LooseVersion(cmd2_version) >= LooseVersion("0.9.23"): + opts.append('allow_style') + # no equivalent option was defined by cmd2. + else: + pass + + # default and only section in our conf file. + cp = configparser.ConfigParser(default_section=sec, strict=False) + cp.read(shell_conf_file) + for opt in opts: + if cp.has_option(sec, opt): + setattr(shell, opt, str_to_bool(cp.get(sec, opt))) + + +def get_shell_conffile_path(arg_conf=''): + conf_filename = 'cephfs-shell.conf' + env_var = 'CEPHFS_SHELL_CONF' + + arg_conf = '' if not arg_conf else arg_conf + home_dir_conf = os.path.expanduser('~/.' + conf_filename) + env_conf = os.environ[env_var] if env_var in os.environ else '' + + # here's the priority by which conf gets read. + for path in (arg_conf, env_conf, home_dir_conf): + if os.path.isfile(path): + return path + else: + return '' + + +def manage_args(): + main_parser = argparse.ArgumentParser(description='') + main_parser.add_argument('-b', '--batch', action='store', + help='Path to CephFS shell script/batch file' + 'containing CephFS shell commands', + type=str) + main_parser.add_argument('-c', '--config', action='store', + help='Path to Ceph configuration file.', + type=str) + main_parser.add_argument('-f', '--fs', action='store', + help='Name of filesystem to mount.', + type=str) + main_parser.add_argument('-t', '--test', action='store', + help='Test against transcript(s) in FILE', + nargs='+') + main_parser.add_argument('commands', nargs='*', help='Comma delimited ' + 'commands. The shell executes the given command ' + 'and quits immediately with the return value of ' + 'command. In case no commands are provided, the ' + 'shell is launched.', default=[]) + + args = main_parser.parse_args() + args.exe_and_quit = False # Execute and quit, don't launch the shell. + + if args.batch: + if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"): + args.commands = ['load ' + args.batch, ',quit'] + else: + args.commands = ['run_script ' + args.batch, ',quit'] + if args.test: + args.commands.extend(['-t,'] + [arg + ',' for arg in args.test]) + if not args.batch and len(args.commands) > 0: + args.exe_and_quit = True + + manage_sys_argv(args) + + return args + + +def manage_sys_argv(args): + exe = sys.argv[0] + sys.argv.clear() + sys.argv.append(exe) + sys.argv.extend([i.strip() for i in ' '.join(args.commands).split(',')]) + + setup_cephfs(args) + + +def execute_cmd_args(args): + """ + Launch a shell session if no arguments were passed, else just execute + the given argument as a shell command and exit the shell session + immediately at (last) command's termination with the (last) command's + return value. + """ + if not args.exe_and_quit: + return shell.cmdloop() + return execute_cmds_and_quit(args) + + +def execute_cmds_and_quit(args): + """ + Multiple commands might be passed separated by commas, feed onecmd() + one command at a time. + """ + # do_* methods triggered by cephfs-shell commands return None when they + # complete running successfully. Until 0.9.6, shell.onecmd() returned this + # value to indicate whether the execution of the commands should stop, but + # since 0.9.7 it returns the return value of do_* methods only if it's + # not None. When it is None it returns False instead of None. + if LooseVersion(cmd2_version) <= LooseVersion("0.9.6"): + stop_exec_val = None + else: + stop_exec_val = False + + args_to_onecmd = '' + if len(args.commands) <= 1: + args.commands = args.commands[0].split(' ') + for cmdarg in args.commands: + if ',' in cmdarg: + args_to_onecmd += ' ' + cmdarg[0:-1] + onecmd_retval = shell.onecmd(args_to_onecmd) + # if the current command failed, let's abort the execution of + # series of commands passed. + if onecmd_retval is not stop_exec_val: + return onecmd_retval + if shell.exit_code != 0: + return shell.exit_code + + args_to_onecmd = '' + continue + + args_to_onecmd += ' ' + cmdarg + return shell.onecmd(args_to_onecmd) + + +if __name__ == '__main__': + args = manage_args() + + shell = CephFSShell() + # TODO: perhaps, we should add an option to pass ceph.conf? + read_shell_conf(shell, get_shell_conffile_path(args.config)) + # XXX: setting shell.exit_code to zero so that in case there are no errors + # and exceptions, it is not set by any method or function of cephfs-shell + # and return values from shell.cmdloop() or shell.onecmd() is not an + # integer, we can treat it as the return value of cephfs-shell. + shell.exit_code = 0 + + retval = execute_cmd_args(args) + sys.exit(retval if retval else shell.exit_code) |