diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/tools/cephfs/cephfs-shell | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/tools/cephfs/cephfs-shell | 1295 |
1 files changed, 1295 insertions, 0 deletions
diff --git a/src/tools/cephfs/cephfs-shell b/src/tools/cephfs/cephfs-shell new file mode 100644 index 00000000..5db84b56 --- /dev/null +++ b/src/tools/cephfs/cephfs-shell @@ -0,0 +1,1295 @@ +#!/usr/bin/python3 +# coding = utf-8 + +import argparse +import os +import os.path +import sys +from cmd2 import Cmd +import cephfs as libcephfs +import shutil +import traceback +import colorama +import fnmatch +import math +import re +import shlex + +if sys.version_info.major < 3: + raise RuntimeError("cephfs-shell is only compatible with python3") + +try: + from cmd2 import with_argparser +except ImportError: + def with_argparser(argparser): + import functools + + def argparser_decorator(func): + @functools.wraps(func) + def wrapper(thiz, cmdline): + if isinstance(cmdline, list): + arglist = cmdline + else: + # do not split if it's already a list + arglist = shlex.split(cmdline, posix=False) + # in case user quotes the command args + arglist = [arg.strip('\'""') for arg in arglist] + try: + args = argparser.parse_args(arglist) + except SystemExit: + # argparse exits at seeing bad arguments + return + else: + return func(thiz, args) + argparser.prog = func.__name__[3:] + if argparser.description is None and func.__doc__: + argparser.description = func.__doc__ + + return wrapper + + return argparser_decorator + + +cephfs = None +shell = None + + +def poutput(s, end='\n'): + shell.poutput(s, end=end) + + +def setup_cephfs(config_file): + """ + Mouting a cephfs + """ + global cephfs + cephfs = libcephfs.LibCephFS(conffile=config_file) + cephfs.mount() + + +def mode_notation(mode): + """ + """ + permission_bits = {'0': '---', + '1': '--x', + '2': '-w-', + '3': '-wx', + '4': 'r--', + '5': 'r-x', + '6': 'rw-', + '7': 'rwx'} + mode = str(oct(mode)) + notation = '-' + if mode[2] == '4': + notation = 'd' + for i in mode[-3:]: + notation += permission_bits[i] + return notation + + +def get_chunks(file_size): + chunk_start = 0 + chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size + while chunk_start + chunk_size < file_size: + yield(chunk_start, chunk_size) + chunk_start += chunk_size + final_chunk_size = file_size - chunk_start + yield(chunk_start, final_chunk_size) + + +def to_bytes(string): + return bytes(string, encoding='utf-8') + +def ls(path, opts=''): + # opts tries to be like /bin/ls opts + almost_all = 'A' in opts + try: + with cephfs.opendir(path) as d: + while True: + dent = cephfs.readdir(d) + if dent is None: + return + elif almost_all and dent.d_name in (b'.', b'..'): + continue + yield dent + except cephfs.ObjectNotFound: + return [] + +def glob(path, pattern): + paths = [] + parent_dir = os.path.dirname(path) + if parent_dir == b'': + parent_dir = b'/' + if path == b'/' or is_dir_exists(os.path.basename(path), parent_dir): + for i in ls(path, opts='A'): + if fnmatch.fnmatch(i.d_name, pattern): + paths.append(os.path.join(path, i.d_name)) + return paths + + +def locate_file(name, case_sensitive=True): + dir_list = sorted(set(dirwalk(cephfs.getcwd()))) + if not case_sensitive: + return [dname for dname in dir_list if name.lower() in dname.lower()] + else: + return [dname for dname in dir_list if name in dname] + + +def get_all_possible_paths(pattern): + complete_pattern = pattern[:] + paths = [] + is_rel_path = not os.path.isabs(pattern) + if is_rel_path: + dir_ = cephfs.getcwd() + else: + dir_ = b'/' + pattern = pattern[1:] + patterns = pattern.split(b'/') + paths.extend(glob(dir_, patterns[0])) + patterns.pop(0) + for pattern in patterns: + for path in paths: + paths.extend(glob(path, pattern)) + return [path for path in paths if fnmatch.fnmatch(path, + os.path.join(cephfs.getcwd(), complete_pattern))] + + +suffixes = ['B', 'K', 'M', 'G', 'T', 'P'] + + +def humansize(nbytes): + i = 0 + while nbytes >= 1024 and i < len(suffixes)-1: + nbytes /= 1024. + i += 1 + nbytes = math.ceil(nbytes) + f = ('%d' % nbytes).rstrip('.') + return '%s%s' % (f, suffixes[i]) + + +def print_long(path, is_dir, human_readable): + info = cephfs.stat(path) + pretty = os.path.basename(path.decode('utf-8')) + if is_dir: + pretty = colorama.Style.BRIGHT + colorama.Fore.CYAN + pretty + '/' + colorama.Style.RESET_ALL + if human_readable: + poutput('{}\t{:10s} {} {} {} {}'.format( + mode_notation(info.st_mode), + humansize(info.st_size), info.st_uid, + info.st_gid, info.st_mtime, pretty, sep='\t')) + else: + poutput('{} {:12d} {} {} {} {}'.format( + mode_notation(info.st_mode), info.st_size, info.st_uid, + info.st_gid, info.st_mtime, pretty, sep='\t')) + + +def word_len(word): + """ + Returns the word length, minus any color codes. + """ + if word[0] == '\x1b': + return len(word) - 9 + return len(word) + + +def is_dir_exists(path, dir_=b''): + path_to_stat = os.path.join(dir_, path) + try: + return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0) + except libcephfs.Error: + return False + + +def is_file_exists(path, dir_=b''): + try: + # if its not a directory, then its a file + return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0) + except libcephfs.Error: + return False + + +def print_list(words, termwidth=79): + if not words: + return + words = [word.decode('utf-8') if isinstance(word, bytes) else word for word in words] + width = max([word_len(word) for word in words]) + 2 + nwords = len(words) + ncols = max(1, (termwidth + 1) // (width + 1)) + nrows = (nwords + ncols - 1) // ncols + for row in range(nrows): + for i in range(row, nwords, nrows): + word = words[i] + print_width = width + if word[0] == '\x1b': + print_width = print_width + 10 + + poutput('%-*s' % (print_width, words[i]), + end='\n' if i + nrows >= nwords else '') + + +def copy_from_local(local_path, remote_path): + stdin = -1 + file_ = None + fd = None + convert_to_bytes = False + if local_path == b'-': + file_ = sys.stdin + convert_to_bytes = True + else: + try: + file_ = open(local_path, 'rb') + except PermissionError: + perror('error: no permission to read local file {}'.format( + local_path.decode('utf-8')), end='\n', apply_style=True) + return + stdin = 1 + try: + fd = cephfs.open(remote_path, 'w', 0o666) + except libcephfs.Error: + perror('error: no permission to write remote file {}'.format( + remote_path.decode('utf-8')), end='\n', apply_style=True) + return + progress = 0 + while True: + data = file_.read(65536) + if not data or len(data) == 0: + break + if convert_to_bytes: + data = to_bytes(data) + wrote = cephfs.write(fd, data, progress) + if wrote < 0: + break + progress += wrote + cephfs.close(fd) + if stdin > 0: + file_.close() + poutput('') + + +def copy_to_local(remote_path, local_path): + fd = None + if local_path != b'-': + local_dir = os.path.dirname(local_path) + dir_list = remote_path.rsplit(b'/', 1) + if not os.path.exists(local_dir): + os.makedirs(local_dir) + if len(dir_list) > 2 and dir_list[1] == b'': + return + fd = open(local_path, 'wb+') + file_ = cephfs.open(remote_path, 'r') + file_size = cephfs.stat(remote_path).st_size + if file_size <= 0: + return + progress = 0 + for chunk_start, chunk_size in get_chunks(file_size): + file_chunk = cephfs.read(file_, chunk_start, chunk_size) + progress += len(file_chunk) + if fd: + fd.write(file_chunk) + else: + poutput(file_chunk.decode('utf-8')) + cephfs.close(file_) + if fd: + fd.close() + + +def dirwalk(path): + """ + walk a directory tree, using a generator + """ + path = os.path.normpath(path) + for item in ls(path, opts='A'): + fullpath = os.path.join(path, item.d_name) + src_path = fullpath.rsplit(b'/', 1)[0] + + yield os.path.normpath(fullpath) + if is_dir_exists(item.d_name, src_path): + for x in dirwalk(fullpath): + yield x + + +class CephFSShell(Cmd): + + def __init__(self): + super().__init__(use_ipython=False) + self.working_dir = cephfs.getcwd().decode('utf-8') + self.set_prompt() + self.interactive = False + self.umask = '2' + + def default(self, line): + self.poutput('Unrecognized command') + + def set_prompt(self): + self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX + + self.working_dir + colorama.Style.RESET_ALL + + '\033[01;33m>>>\033[00m ') + + def create_argparser(self, command): + try: + argparse_args = getattr(self, 'argparse_' + command) + except AttributeError: + return None + doc_lines = getattr( + self, 'do_' + command).__doc__.expandtabs().splitlines() + if ''in doc_lines: + blank_idx = doc_lines.index('') + usage = doc_lines[:blank_idx] + description = doc_lines[blank_idx + 1:] + else: + usage = doc_lines + description = [] + parser = argparse.ArgumentParser( + prog=command, + usage='\n'.join(usage), + description='\n'.join(description), + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + for args, kwargs in argparse_args: + parser.add_argument(*args, **kwargs) + return parser + + def complete_filenames(self, text, line, begidx, endidx): + if not text: + completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir()) + for x in ls(b".", opts='A')] + else: + if text.count('/') > 0: + completions = [text.rsplit('/', 1)[0] + '/' + + x.d_name.decode('utf-8') + '/' + * int(x.is_dir()) for x in ls('/' + + text.rsplit('/', 1)[0], opts='A') + if x.d_name.decode('utf-8').startswith( + text.rsplit('/', 1)[1])] + else: + completions = [x.d_name.decode('utf-8') + '/' + * int(x.is_dir()) for x in ls(b".", opts='A') + if x.d_name.decode('utf-8').startswith(text)] + if len(completions) == 1 and completions[0][-1] == '/': + dir_, file_ = completions[0].rsplit('/', 1) + completions.extend([dir_ + '/' + x.d_name.decode('utf-8') + + '/' * int(x.is_dir()) for x in + ls('/' + dir_, opts='A') + if x.d_name.decode('utf-8').startswith(file_)]) + return self.delimiter_complete(text, line, begidx, endidx, completions, '/') + return completions + + def onecmd(self, line): + """ + Global error catcher + """ + try: + res = Cmd.onecmd(self, line) + if self.interactive: + self.set_prompt() + return res + except ConnectionError as e: + self.poutput('***', e) + except KeyboardInterrupt: + self.poutput('Command aborted') + except Exception as e: + self.poutput(e) + traceback.print_exc(file=sys.stdout) + + class path_to_bytes(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + if isinstance(values, str): + values = to_bytes(values) + if isinstance(values, list): + values = list(map(to_bytes, values)) + setattr(namespace, self.dest, values) + + def complete_mkdir(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + class ModeAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + if nargs is not None and nargs != '?': + raise ValueError("more than one modes not allowed") + super().__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + o_mode = 0 + res = None + try: + o_mode = int(values, base=8) + except ValueError: + res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', values) + if res is None: + parser.error("invalid mode: %s\n" + "mode must be a numeric octal literal\n" + "or ((u?g?o?)|(a?))(=)(r?w?x?)" % + values) + else: + # we are supporting only assignment of mode and not + or - + # as is generally available with the chmod command + # eg. + # >>> res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', 'go=') + # >>> res.groups() + # ('go', 'go', None, '=', '') + val = res.groups() + + if val[3] != '=': + parser.error("need assignment operator between user " + "and mode specifiers") + if val[4] == '': + parser.error("invalid mode: %s\n" + "mode must be combination of: r | w | x" % + values) + users = '' + if val[2] is None: + users = val[1] + else: + users = val[2] + + t_mode = 0 + if users == 'a': + users = 'ugo' + + if 'r' in val[4]: + t_mode |= 4 + if 'w' in val[4]: + t_mode |= 2 + if 'x' in val[4]: + t_mode |= 1 + + if 'u' in users: + o_mode |= (t_mode << 6) + if 'g' in users: + o_mode |= (t_mode << 3) + if 'o' in users: + o_mode |= t_mode + + if o_mode < 0: + parser.error("invalid mode: %s\n" + "mode cannot be negative" % values) + if o_mode > 0o777: + parser.error("invalid mode: %s\n" + "mode cannot be greater than octal 0777" % values) + + setattr(namespace, self.dest, str(oct(o_mode))) + + mkdir_parser = argparse.ArgumentParser( + description='Create the directory(ies), if they do not already exist.') + mkdir_parser.add_argument('dirs', type=str, + action=path_to_bytes, + metavar='DIR_NAME', + help='Name of new_directory.', + nargs='+') + mkdir_parser.add_argument('-m', '--mode', type=str, + action=ModeAction, + help='Sets the access mode for the new directory.') + mkdir_parser.add_argument('-p', '--parent', action='store_true', + help='Create parent directories as necessary. \ +When this option is specified, no error is reported if a directory already \ +exists.') + + @with_argparser(mkdir_parser) + def do_mkdir(self, args): + """ + Create directory. + """ + for path in args.dirs: + if args.mode: + permission = int(args.mode, 8) + else: + permission = 0o777 + if args.parent: + cephfs.mkdirs(path, permission) + else: + try: + cephfs.mkdir(path, permission) + except libcephfs.Error: + self.poutput("directory missing in the path; " + "you may want to pass the -p argument") + return + + def complete_put(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + put_parser = argparse.ArgumentParser( + description='Copy a file/directory to Ceph File System from Local File System.') + put_parser.add_argument('local_path', type=str, action=path_to_bytes, + help='Path of the file in the local system') + put_parser.add_argument('remote_path', type=str, action=path_to_bytes, + help='Path of the file in the remote system.', + nargs='?', default='.') + put_parser.add_argument('-f', '--force', action='store_true', + help='Overwrites the destination if it already exists.') + + @with_argparser(put_parser) + def do_put(self, args): + """ + Copy a file to Ceph File System from Local Directory. + """ + root_src_dir = args.local_path + root_dst_dir = args.remote_path + if args.local_path == b'.' or args.local_path == b'./': + root_src_dir = os.getcwdb() + elif len(args.local_path.rsplit(b'/', 1)) < 2: + root_src_dir = os.path.join(os.getcwdb(), args.local_path) + else: + p = args.local_path.split(b'/') + if p[0] == b'.': + root_src_dir = os.getcwdb() + p.pop(0) + while len(p) > 0: + root_src_dir += b'/' + p.pop(0) + + if root_dst_dir == b'.': + if args.local_path != b'-': + root_dst_dir = root_src_dir.rsplit(b'/', 1)[1] + if root_dst_dir == b'': + root_dst_dir = root_src_dir.rsplit(b'/', 1)[0] + a = root_dst_dir.rsplit(b'/', 1) + if len(a) > 1: + root_dst_dir = a[1] + else: + root_dst_dir = a[0] + else: + self.poutput("error: no filename specified for destination") + return + + if root_dst_dir[-1] != b'/': + root_dst_dir += b'/' + + if args.local_path == b'-' or os.path.isfile(root_src_dir): + if not args.force: + if os.path.isfile(root_src_dir): + dst_file = root_dst_dir + if is_file_exists(dst_file): + self.perror('{}: file exists! use --force to overwrite'.format( + dst_file.decode('utf-8')), end='\n', + apply_style=True) + return + if args.local_path == b'-': + root_src_dir = b'-' + copy_from_local(root_src_dir, root_dst_dir) + else: + for src_dir, dirs, files in os.walk(root_src_dir): + if isinstance(src_dir, str): + src_dir = to_bytes(src_dir) + dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1) + dst_dir = re.sub(rb'\/+', b'/', cephfs.getcwd() + + dst_dir) + if args.force and dst_dir != b'/' and not is_dir_exists( + dst_dir[:-1]) and not locate_file(dst_dir): + try: + cephfs.mkdirs(dst_dir, 0o777) + except libcephfs.Error: + pass + if (not args.force) and dst_dir != b'/' and not is_dir_exists( + dst_dir) and not os.path.isfile(root_src_dir): + try: + cephfs.mkdirs(dst_dir, 0o777) + except libcephfs.Error: + pass + + for dir_ in dirs: + dir_name = os.path.join(dst_dir, dir_) + if not is_dir_exists(dir_name): + try: + cephfs.mkdirs(dir_name, 0o777) + except libcephfs.Error: + pass + + for file_ in files: + src_file = os.path.join(src_dir, file_) + dst_file = re.sub(rb'\/+', b'/', b'/' + dst_dir + b'/' + file_) + if (not args.force) and is_file_exists(dst_file): + return + copy_from_local(src_file, os.path.join(cephfs.getcwd(), + dst_file)) + + def complete_get(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + get_parser = argparse.ArgumentParser( + description='Copy a file from Ceph File System from Local Directory.') + get_parser.add_argument('remote_path', type=str, action=path_to_bytes, + help='Path of the file in the remote system') + get_parser.add_argument('local_path', type=str, action=path_to_bytes, + help='Path of the file in the local system', + nargs='?', default='.') + get_parser.add_argument('-f', '--force', action='store_true', + help='Overwrites the destination if it already exists.') + + @with_argparser(get_parser) + def do_get(self, args): + """ + Copy a file/directory from Ceph File System to Local Directory. + """ + root_src_dir = args.remote_path + root_dst_dir = args.local_path + fname = root_src_dir.rsplit(b'/', 1) + if args.local_path == b'.': + root_dst_dir = os.getcwdb() + if args.remote_path == b'.': + root_src_dir = cephfs.getcwd() + if args.local_path == b'-': + if args.remote_path == b'.' or args.remote_path == b'./': + self.perror('error: no remote file name specified', end='\n', + apply_style=True) + return + copy_to_local(root_src_dir, b'-') + elif is_file_exists(args.remote_path): + copy_to_local(root_src_dir, + root_dst_dir + b'/' + root_src_dir) + elif b'/'in root_src_dir and is_file_exists(fname[1], fname[0]): + copy_to_local(root_src_dir, root_dst_dir) + else: + files = list(reversed(sorted(dirwalk(root_src_dir)))) + if len(files) == 0: + try: + os.makedirs(root_dst_dir + b'/' + root_src_dir) + except OSError: + if args.force: + pass + else: + self.perror('{}: already exists! use --force to overwrite'.format( + root_src_dir.decode('utf-8')), end='\n', + apply_style=True) + return + + for file_ in files: + dst_dirpath, dst_file = file_.rsplit(b'/', 1) + if dst_dirpath in files: + files.remove(dst_dirpath) + dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file) + dst_path = os.path.normpath(dst_path) + if is_dir_exists(file_): + try: + os.makedirs(dst_path) + except OSError: + pass + else: + if not args.force: + try: + os.stat(dst_path) + self.perror('{}: file already exists! use --force to override'.format( + file_.decode('utf-8')), end='\n', + apply_style=True) + return + except OSError: + copy_to_local(file_, dst_path) + else: + copy_to_local(file_, dst_path) + + return 0 + + def complete_ls(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + ls_parser = argparse.ArgumentParser( + description='Copy a file from Ceph File System from Local Directory.') + ls_parser.add_argument('-l', '--long', action='store_true', + help='Detailed list of items in the directory.') + ls_parser.add_argument('-r', '--reverse', action='store_true', + help='Reverse order of listing items in the directory.') + ls_parser.add_argument('-H', action='store_true', help='Human Readable') + ls_parser.add_argument('-a', '--all', action='store_true', + help='Do not Ignore entries starting with .') + ls_parser.add_argument('-S', action='store_true', help='Sort by file_size') + ls_parser.add_argument('paths', help='Name of Directories', + action=path_to_bytes, nargs='*', default=['.']) + + @with_argparser(ls_parser) + def do_ls(self, args): + """ + List all the files and directories in the current working directory + """ + paths = args.paths + for path in paths: + values = [] + items = [] + if path.count(b'*') > 0: + all_items = get_all_possible_paths(path) + if len(all_items) == 0: + continue + path = all_items[0].rsplit(b'/', 1)[0] + if path == b'': + path = b'/' + dirs = [] + for i in all_items: + for item in ls(path): + d_name = item.d_name + if os.path.basename(i) == d_name: + if item.is_dir(): + dirs.append(os.path.join(path, d_name)) + else: + items.append(item) + if dirs: + paths.extend(dirs) + else: + self.poutput(path.decode('utf-8'), end=':\n') + items = sorted(items, key=lambda item: item.d_name) + else: + if path != b'' and path != cephfs.getcwd() and len(paths) > 1: + self.poutput(path.decode('utf-8'), end=':\n') + items = sorted(ls(path), + key=lambda item: item.d_name) + if not args.all: + items = [i for i in items if not i.d_name.startswith(b'.')] + + if args.S: + items = sorted(items, key=lambda item: cephfs.stat( + path + b'/' + item.d_name).st_size) + + if args.reverse: + items = reversed(items) + for item in items: + filepath = item.d_name + is_dir = item.is_dir() + + if args.long and args.H: + print_long(cephfs.getcwd() + + path + + b'/' + + filepath, + is_dir, True) + elif args.long: + print_long(cephfs.getcwd() + + path + + b'/' + + filepath, + is_dir, False) + elif is_dir: + values.append(colorama.Style.BRIGHT + + colorama.Fore.CYAN + + filepath.decode('utf-8') + + '/' + + colorama.Style.RESET_ALL) + else: + values.append(filepath) + if not args.long: + print_list(values, shutil.get_terminal_size().columns) + if path != paths[-1]: + self.poutput('') + + def complete_rmdir(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + rmdir_parser = argparse.ArgumentParser(description='Remove Directory.') + rmdir_parser.add_argument('paths', help='Directory Path.', nargs='+', + action=path_to_bytes) + rmdir_parser.add_argument('-p', '--parent', action='store_true', + help='Remove parent directories as necessary. \ +When this option is specified, no error is reported if a directory has any \ +sub-directories, files') + + @with_argparser(rmdir_parser) + def do_rmdir(self, args): + """ + Remove a specific Directory + """ + is_pattern = False + paths = args.paths + for path in paths: + if path.count(b'*') > 0: + is_pattern = True + all_items = get_all_possible_paths(path) + if len(all_items) > 0: + path = all_items[0].rsplit(b'/', 1)[0] + if path == b'': + path = b'/' + dirs = [] + for i in all_items: + for item in ls(path): + d_name = item.d_name + if os.path.basename(i) == d_name: + if item.is_dir(): + dirs.append(os.path.join(path, d_name)) + paths.extend(dirs) + continue + else: + is_pattern = False + path = os.path.normpath(os.path.join(cephfs.getcwd(), path)) + if args.parent: + files = reversed(sorted(set(dirwalk(path)))) + for filepath in files: + filepath = os.path.normpath(filepath) + if filepath[1:] != path: + try: + cephfs.rmdir(filepath) + except libcephfs.Error: + cephfs.unlink(filepath) + if not is_pattern and path != os.path.normpath(b''): + try: + cephfs.rmdir(path) + except libcephfs.Error: + self.perror('error: no such directory {} exists'.format( + path.decode('utf-8')), end='\n', + apply_style=True) + + def complete_rm(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + rm_parser = argparse.ArgumentParser(description='Remove File.') + rm_parser.add_argument('paths', help='File Path.', nargs='+', + action=path_to_bytes) + + @with_argparser(rm_parser) + def do_rm(self, args): + """ + Remove a specific file + """ + file_paths = args.paths + for path in file_paths: + if path.count(b'*') > 0: + file_paths.extend([i for i in get_all_possible_paths( + path) if is_file_exists(i)]) + else: + try: + cephfs.unlink(path) + except libcephfs.Error: + self.perror('{}: no such file'.format(path.decode('utf-8')), + end='\n', apply_style=True) + + def complete_mv(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + mv_parser = argparse.ArgumentParser(description='Move File.') + mv_parser.add_argument('src_path', type=str, action=path_to_bytes, + help='Source File Path.') + mv_parser.add_argument('dest_path', type=str, action=path_to_bytes, + help='Destination File Path.') + + @with_argparser(mv_parser) + def do_mv(self, args): + """ + Rename a file or Move a file from source path to the destination + """ + try: + cephfs.rename(args.src_path, args.dest_path) + except libcephfs.Error: + self.poutput("error: need a file name to move to") + + def complete_cd(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + cd_parser = argparse.ArgumentParser(description='Change working directory') + cd_parser.add_argument('path', type=str, help='Name of the directory.', + action=path_to_bytes, nargs='?', default='/') + + @with_argparser(cd_parser) + def do_cd(self, args): + """ + Change working directory + """ + try: + cephfs.chdir(args.path) + self.working_dir = cephfs.getcwd().decode('utf-8') + self.set_prompt() + except libcephfs.Error: + self.perror('{}: no such directory'.format(args.path.decode('utf-8')), + end='\n', apply_style=True) + + def do_cwd(self, arglist): + """ + Get current working directory. + """ + self.poutput(cephfs.getcwd().decode('utf-8')) + + def complete_chmod(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + chmod_parser = argparse.ArgumentParser(description='Create Directory.') + chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode') + chmod_parser.add_argument('paths', type=str, action=path_to_bytes, + help='Name of the file', nargs='+') + + @with_argparser(chmod_parser) + def do_chmod(self, args): + """ + Change permission of a file + """ + for path in args.paths: + mode = int(args.mode, base=8) + try: + cephfs.chmod(path, mode) + except libcephfs.Error: + self.perror('{}: no such file or directory'.format( + path.decode('utf-8')), end='\n', apply_style=True) + + def complete_cat(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + cat_parser = argparse.ArgumentParser(description='') + cat_parser.add_argument('paths', help='Name of Files', action=path_to_bytes, + nargs='+') + + @with_argparser(cat_parser) + def do_cat(self, args): + """ + Print contents of a file + """ + for path in args.paths: + if is_file_exists(path): + copy_to_local(path, b'-') + else: + self.perror('{}: no such file'.format(path.decode('utf-8')), + end='\n', apply_style=True) + + umask_parser = argparse.ArgumentParser(description='Set umask value.') + umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction, + nargs='?', default='') + + @with_argparser(umask_parser) + def do_umask(self, args): + """ + Set Umask value. + """ + if args.mode == '': + self.poutput(self.umask.zfill(4)) + else: + mode = int(args.mode, 8) + self.umask = str(oct(cephfs.umask(mode))[2:]) + + def complete_write(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + write_parser = argparse.ArgumentParser(description='Writes data into a file') + write_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of File') + + @with_argparser(write_parser) + def do_write(self, args): + """ + Write data into a file. + """ + + copy_from_local(b'-', args.path) + + def complete_lcd(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + lcd_parser = argparse.ArgumentParser(description='') + lcd_parser.add_argument('path', type=str, action=path_to_bytes, help='Path') + + @with_argparser(lcd_parser) + def do_lcd(self, args): + """ + Moves into the given local directory + """ + try: + os.chdir(os.path.expanduser(args.path)) + except OSError as e: + self.perror("Cannot change to {}: {}".format(e.filename, + e.strerror), False) + + def complete_lls(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + index_dict = {1: self.path_complete} + return self.index_based_complete(text, line, begidx, endidx, index_dict) + + lls_parser = argparse.ArgumentParser( + description='List files in local system.') + lls_parser.add_argument('paths', help='Paths', action=path_to_bytes, + nargs='*') + + @with_argparser(lls_parser) + def do_lls(self, args): + """ + Lists all files and folders in the current local directory + """ + if not args.paths: + print_list(os.listdir(os.getcwdb())) + else: + for path in args.paths: + try: + items = os.listdir(path) + self.poutput("{}:".format(path.decode('utf-8'))) + print_list(items) + except OSError as e: + self.perror("'{}': {}".format(e.filename, e.strerror), False) + # Arguments to the with_argpaser decorator function are sticky. + # The items in args.path do not get overwritten in subsequent calls. + # The arguments remain in args.paths after the function exits and we + # neeed to clean it up to ensure the next call works as expected. + args.paths.clear() + + def do_lpwd(self, arglist): + """ + Prints the absolute path of the current local directory + """ + self.poutput(os.getcwd()) + + def do_df(self, arglist): + """ + Display the amount of available disk space for file systems + """ + for index, i in enumerate(ls(b".", opts='A')): + if index == 0: + self.poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format( + "1K-blocks", "Used", "Available", "Use%", "Stored on")) + if not is_dir_exists(i.d_name): + statfs = cephfs.statfs(i.d_name) + stat = cephfs.stat(i.d_name) + block_size = statfs['f_blocks']*statfs['f_bsize'] // 1024 + available = block_size - stat.st_size + use = 0 + if block_size > 0: + use = (stat.st_size*100 // block_size) + self.poutput('{:25d}\t{:5d}\t{:10d}\t{:5s} {}'.format( + statfs['f_fsid'], stat.st_size, available, + str(int(use)) + '%', i.d_name.decode('utf-8'))) + + locate_parser = argparse.ArgumentParser( + description='Find file within file system') + locate_parser.add_argument('name', help='name', type=str, + action=path_to_bytes) + locate_parser.add_argument('-c', '--count', action='store_true', + help='Count list of items located.') + locate_parser.add_argument( + '-i', '--ignorecase', action='store_true', help='Ignore case') + + @with_argparser(locate_parser) + def do_locate(self, args): + """ + Find a file within the File System + """ + if args.name.count(b'*') == 1: + if args.name[0] == b'*': + args.name += b'/' + elif args.name[-1] == '*': + args.name = b'/' + args.name + args.name = args.name.replace(b'*', b'') + if args.ignorecase: + locations = locate_file(args.name, False) + else: + locations = locate_file(args.name) + if args.count: + self.poutput(len(locations)) + else: + self.poutput((b'\n'.join(locations)).decode('utf-8')) + + def complete_du(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + du_parser = argparse.ArgumentParser( + description='Disk Usage of a Directory') + du_parser.add_argument('dirs', type=str, action=path_to_bytes, + help='Name of the directory.', nargs='?', + default='.') + du_parser.add_argument('-r', action='store_true', + help='Recursive Disk usage of all directories.') + + @with_argparser(du_parser) + def do_du(self, args): + """ + Disk Usage of a Directory + """ + if args.dirs == b'': + args.dirs = cephfs.getcwd() + for dir_ in args.dirs: + if args.r: + for i in reversed(sorted(set(dirwalk(dir_)))): + i = os.path.normpath(i) + try: + xattr = cephfs.getxattr(i, 'ceph.dir.rbytes') + self.poutput('{:10s} {}'.format( + humansize(int(xattr.decode('utf-8'))), '.' + + i.decode('utf-8'))) + except libcephfs.Error: + continue + else: + dir_ = os.path.normpath(dir_) + self.poutput('{:10s} {}'.format(humansize(int(cephfs.getxattr( + dir_, 'ceph.dir.rbytes').decode('utf-8'))), '.' + + dir_.decode('utf-8'))) + + quota_parser = argparse.ArgumentParser( + description='Quota management for a Directory') + quota_parser.add_argument('op', choices=['get', 'set'], + help='Quota operation type.') + quota_parser.add_argument('path', type=str, action=path_to_bytes, + help='Name of the directory.') + quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?', + help='Max cumulative size of the data under ' + 'this directory.') + quota_parser.add_argument('--max_files', type=int, default=-1, nargs='?', + help='Total number of files under this ' + 'directory tree.') + + @with_argparser(quota_parser) + def do_quota(self, args): + """ + Quota management. + """ + if not is_dir_exists(args.path): + self.perror('error: no such directory {}'.format(args.path.decode('utf-8')), + end='\n', apply_style=True) + return + + if args.op == 'set': + if (args.max_bytes == -1) and (args.max_files == -1): + self.poutput('please specify either --max_bytes or ' + '--max_files or both') + return + + if args.max_bytes >= 0: + max_bytes = to_bytes(str(args.max_bytes)) + try: + cephfs.setxattr(args.path, 'ceph.quota.max_bytes', + max_bytes, len(max_bytes), + os.XATTR_CREATE) + self.poutput('max_bytes set to %d' % args.max_bytes) + except libcephfs.Error: + cephfs.setxattr(args.path, 'ceph.quota.max_bytes', + max_bytes, len(max_bytes), + os.XATTR_REPLACE) + self.poutput('max_bytes reset to %d' % args.max_bytes) + + if args.max_files >= 0: + max_files = to_bytes(str(args.max_files)) + try: + cephfs.setxattr(args.path, 'ceph.quota.max_files', + max_files, len(max_files), + os.XATTR_CREATE) + self.poutput('max_files set to %d' % args.max_files) + except libcephfs.Error: + cephfs.setxattr(args.path, 'ceph.quota.max_files', + max_files, len(max_files), + os.XATTR_REPLACE) + self.poutput('max_files reset to %d' % args.max_files) + elif args.op == 'get': + max_bytes = '0' + max_files = '0' + try: + max_bytes = cephfs.getxattr(args.path, + 'ceph.quota.max_bytes') + self.poutput('max_bytes: %s' % max_bytes) + except libcephfs.Error: + self.poutput('max_bytes is not set') + pass + + try: + max_files = cephfs.getxattr(args.path, + 'ceph.quota.max_files') + self.poutput('max_files: %s' % max_files) + except libcephfs.Error: + self.poutput('max_files is not set') + pass + + def do_help(self, line): + """ + Get details about a command. + Usage: help <cmd> - for a specific command + help all - for all the commands + """ + if line == 'all': + for k in dir(self): + if k.startswith('do_'): + self.poutput('-'*80) + super().do_help(k[3:]) + return + parser = self.create_argparser(line) + if parser: + parser.print_help() + else: + super().do_help(line) + + def complete_stat(self, text, line, begidx, endidx): + """ + auto complete of file name. + """ + return self.complete_filenames(text, line, begidx, endidx) + + stat_parser = argparse.ArgumentParser( + description='Display file or file system status') + stat_parser.add_argument('paths', type=str, help='file paths', + action=path_to_bytes, nargs='+') + + @with_argparser(stat_parser) + def do_stat(self, args): + """ + Display file or file system status + """ + for path in args.paths: + try: + stat = cephfs.stat(path) + atime = stat.st_atime.isoformat(' ') + mtime = stat.st_mtime.isoformat(' ') + ctime = stat.st_mtime.isoformat(' ') + + self.poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n\ +Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: {:o}/{}\tUid: {:d}\tGid: {:d}\n\ +Access: {}\nModify: {}\nChange: {}".format(path.decode('utf-8'), stat.st_size, + stat.st_blocks, stat.st_blksize, stat.st_dev, + stat.st_ino, stat.st_nlink, stat.st_mode, + mode_notation(stat.st_mode), stat.st_uid, + stat.st_gid, atime, mtime, ctime)) + except libcephfs.Error: + self.perror('{}: no such file or directory'.format(path.decode('utf-8')), + end='\n', apply_style=True) + + +if __name__ == '__main__': + config_file = '' + exe = sys.argv[0] + main_parser = argparse.ArgumentParser(description='') + main_parser.add_argument('-c', '--config', action='store', + help='Configuration file_path', type=str) + main_parser.add_argument( + '-b', '--batch', action='store', help='Batch File path.', type=str) + main_parser.add_argument('-t', '--test', action='store', + help='Test against transcript(s) in FILE', + nargs='+') + main_parser.add_argument('commands', nargs='*', + help='comma delimited commands', default=[]) + args = main_parser.parse_args() + if args.config: + config_file = args.config + if args.batch: + args.commands = ['load ' + args.batch, ',quit'] + if args.test: + args.commands.extend(['-t,'] + [arg+',' for arg in args.test]) + sys.argv.clear() + sys.argv.append(exe) + sys.argv.extend([i.strip() for i in ' '.join(args.commands).split(',')]) + setup_cephfs(config_file) + shell = CephFSShell() + shell.cmdloop() |