summaryrefslogtreecommitdiffstats
path: root/src/tools/cephfs/shell/cephfs-shell
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/cephfs/shell/cephfs-shell')
-rwxr-xr-xsrc/tools/cephfs/shell/cephfs-shell1854
1 files changed, 1854 insertions, 0 deletions
diff --git a/src/tools/cephfs/shell/cephfs-shell b/src/tools/cephfs/shell/cephfs-shell
new file mode 100755
index 000000000..58884a275
--- /dev/null
+++ b/src/tools/cephfs/shell/cephfs-shell
@@ -0,0 +1,1854 @@
+#!/usr/bin/python3
+# coding = utf-8
+
+import argparse
+import os
+import os.path
+import sys
+import cephfs as libcephfs
+import shutil
+import traceback
+import colorama
+import fnmatch
+import math
+import re
+import shlex
+import stat
+import errno
+
+from distutils.version import LooseVersion
+
+from cmd2 import Cmd
+from cmd2 import __version__ as cmd2_version
+# XXX: In cmd2 versions < 1.0.1, we'll get SystemExit(2) instead of
+# Cmd2ArgparseError
+if LooseVersion(cmd2_version) >= LooseVersion("1.0.1"):
+ from cmd2.exceptions import Cmd2ArgparseError
+else:
+ # HACK: so that we don't have check for version everywhere
+ # Cmd2ArgparseError is used.
+ class Cmd2ArgparseError:
+ pass
+
+if sys.version_info.major < 3:
+ raise RuntimeError("cephfs-shell is only compatible with python3")
+
+try:
+ from cmd2 import with_argparser
+except ImportError:
+ def with_argparser(argparser):
+ import functools
+
+ def argparser_decorator(func):
+ @functools.wraps(func)
+ def wrapper(thiz, cmdline):
+ if isinstance(cmdline, list):
+ arglist = cmdline
+ else:
+ # do not split if it's already a list
+ arglist = shlex.split(cmdline, posix=False)
+ # in case user quotes the command args
+ arglist = [arg.strip('\'""') for arg in arglist]
+ try:
+ args = argparser.parse_args(arglist)
+ except SystemExit:
+ shell.exit_code = 1
+ # argparse exits at seeing bad arguments
+ return
+ else:
+ return func(thiz, args)
+ argparser.prog = func.__name__[3:]
+ if argparser.description is None and func.__doc__:
+ argparser.description = func.__doc__
+
+ return wrapper
+
+ return argparser_decorator
+
+
+cephfs = None # holds CephFS Python bindings
+shell = None # holds instance of class CephFSShell
+exit_codes = {'Misc': 1,
+ 'KeyboardInterrupt': 2,
+ errno.EPERM: 3,
+ errno.EACCES: 4,
+ errno.ENOENT: 5,
+ errno.EIO: 6,
+ errno.ENOSPC: 7,
+ errno.EEXIST: 8,
+ errno.ENODATA: 9,
+ errno.EINVAL: 10,
+ errno.EOPNOTSUPP: 11,
+ errno.ERANGE: 12,
+ errno.EWOULDBLOCK: 13,
+ errno.ENOTEMPTY: 14,
+ errno.ENOTDIR: 15,
+ errno.EDQUOT: 16,
+ errno.EPIPE: 17,
+ errno.ESHUTDOWN: 18,
+ errno.ECONNABORTED: 19,
+ errno.ECONNREFUSED: 20,
+ errno.ECONNRESET: 21,
+ errno.EINTR: 22,
+ errno.EISDIR: 23}
+
+
+#########################################################################
+#
+# Following are methods are generically useful through class CephFSShell
+#
+#######################################################################
+
+
+def poutput(s, end='\n'):
+ shell.poutput(s, end=end)
+
+
+def perror(msg, **kwargs):
+ shell.perror(msg, **kwargs)
+
+
+def set_exit_code_msg(errcode='Misc', msg=''):
+ """
+ Set exit code and print error message
+ """
+ if isinstance(msg, libcephfs.Error):
+ shell.exit_code = exit_codes[msg.get_error_code()]
+ else:
+ shell.exit_code = exit_codes[errcode]
+ if msg:
+ perror(msg)
+
+
+def mode_notation(mode):
+ """
+ """
+ permission_bits = {'0': '---',
+ '1': '--x',
+ '2': '-w-',
+ '3': '-wx',
+ '4': 'r--',
+ '5': 'r-x',
+ '6': 'rw-',
+ '7': 'rwx'}
+ mode = str(oct(mode))
+ notation = '-'
+ if mode[2] == '4':
+ notation = 'd'
+ elif mode[2:4] == '12':
+ notation = 'l'
+ for i in mode[-3:]:
+ notation += permission_bits[i]
+ return notation
+
+
+def get_chunks(file_size):
+ chunk_start = 0
+ chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size
+ while chunk_start + chunk_size < file_size:
+ yield chunk_start, chunk_size
+ chunk_start += chunk_size
+ final_chunk_size = file_size - chunk_start
+ yield chunk_start, final_chunk_size
+
+
+def to_bytes(param):
+ # don't convert as follows as it can lead unusable results like converting
+ # [1, 2, 3, 4] to '[1, 2, 3, 4]' -
+ # str(param).encode('utf-8')
+ if isinstance(param, bytes):
+ return param
+ elif isinstance(param, str):
+ return bytes(param, encoding='utf-8')
+ elif isinstance(param, list):
+ return [i.encode('utf-8') if isinstance(i, str) else to_bytes(i) for
+ i in param]
+ elif isinstance(param, int) or isinstance(param, float):
+ return str(param).encode('utf-8')
+ elif param is None:
+ return None
+
+
+def ls(path, opts=''):
+ # opts tries to be like /bin/ls opts
+ almost_all = 'A' in opts
+ try:
+ with cephfs.opendir(path) as d:
+ while True:
+ dent = cephfs.readdir(d)
+ if dent is None:
+ return
+ elif almost_all and dent.d_name in (b'.', b'..'):
+ continue
+ yield dent
+ except libcephfs.ObjectNotFound as e:
+ set_exit_code_msg(msg=e)
+
+
+def glob(path, pattern):
+ paths = []
+ parent_dir = os.path.dirname(path)
+ if parent_dir == b'':
+ parent_dir = b'/'
+ if path == b'/' or is_dir_exists(os.path.basename(path), parent_dir):
+ for i in ls(path, opts='A'):
+ if fnmatch.fnmatch(i.d_name, pattern):
+ paths.append(os.path.join(path, i.d_name))
+ return paths
+
+
+def locate_file(name, case_sensitive=True):
+ dir_list = sorted(set(dirwalk(cephfs.getcwd())))
+ if not case_sensitive:
+ return [dname for dname in dir_list if name.lower() in dname.lower()]
+ else:
+ return [dname for dname in dir_list if name in dname]
+
+
+def get_all_possible_paths(pattern):
+ complete_pattern = pattern[:]
+ paths = []
+ is_rel_path = not os.path.isabs(pattern)
+ if is_rel_path:
+ dir_ = cephfs.getcwd()
+ else:
+ dir_ = b'/'
+ pattern = pattern[1:]
+ patterns = pattern.split(b'/')
+ paths.extend(glob(dir_, patterns[0]))
+ patterns.pop(0)
+ for pattern in patterns:
+ for path in paths:
+ paths.extend(glob(path, pattern))
+ if is_rel_path:
+ complete_pattern = os.path.join(cephfs.getcwd(), complete_pattern)
+ return [path for path in paths if fnmatch.fnmatch(path, complete_pattern)]
+
+
+suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
+
+
+def humansize(nbytes):
+ i = 0
+ while nbytes >= 1024 and i < len(suffixes) - 1:
+ nbytes /= 1024.
+ i += 1
+ nbytes = math.ceil(nbytes)
+ f = ('%d' % nbytes).rstrip('.')
+ return '%s%s' % (f, suffixes[i])
+
+
+def style_listing(path, is_dir, is_symlink, ls_long=False):
+ if not (is_dir or is_symlink):
+ return path
+ pretty = colorama.Style.BRIGHT
+ if is_symlink:
+ pretty += colorama.Fore.CYAN + path
+ if ls_long:
+ # Add target path
+ pretty += ' -> ' + cephfs.readlink(path, size=255).decode('utf-8')
+ elif is_dir:
+ pretty += colorama.Fore.BLUE + path + '/'
+ pretty += colorama.Style.RESET_ALL
+ return pretty
+
+
+def print_long(path, is_dir, is_symlink, human_readable):
+ info = cephfs.stat(path, follow_symlink=(not is_symlink))
+ pretty = style_listing(os.path.basename(path.decode('utf-8')), is_dir, is_symlink, True)
+ if human_readable:
+ sizefmt = '\t {:10s}'.format(humansize(info.st_size))
+ else:
+ sizefmt = '{:12d}'.format(info.st_size)
+ poutput(f'{mode_notation(info.st_mode)} {sizefmt} {info.st_uid} {info.st_gid} {info.st_mtime}'
+ f' {pretty}')
+
+
+def word_len(word):
+ """
+ Returns the word length, minus any color codes.
+ """
+ if word[0] == '\x1b':
+ return len(word) - 9
+ return len(word)
+
+
+def is_dir_exists(path, dir_=b''):
+ path_to_stat = os.path.join(dir_, path)
+ try:
+ return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0)
+ except libcephfs.Error:
+ return False
+
+
+def is_file_exists(path, dir_=b''):
+ try:
+ # if its not a directory, then its a file
+ return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0)
+ except libcephfs.Error:
+ return False
+
+
+def print_list(words, termwidth=79):
+ if not words:
+ return
+ words = [word.decode('utf-8') if isinstance(word, bytes) else word for word in words]
+ width = max([word_len(word) for word in words]) + 2
+ nwords = len(words)
+ ncols = max(1, (termwidth + 1) // (width + 1))
+ nrows = (nwords + ncols - 1) // ncols
+ for row in range(nrows):
+ for i in range(row, nwords, nrows):
+ word = words[i]
+ print_width = width
+ if word[0] == '\x1b':
+ print_width = print_width + 10
+
+ poutput('%-*s' % (print_width, words[i]),
+ end='\n' if i + nrows >= nwords else '')
+
+
+def copy_from_local(local_path, remote_path):
+ stdin = -1
+ file_ = None
+ fd = None
+ convert_to_bytes = False
+ if local_path == b'-':
+ file_ = sys.stdin
+ convert_to_bytes = True
+ else:
+ try:
+ file_ = open(local_path, 'rb')
+ except PermissionError as e:
+ set_exit_code_msg(e.errno, 'error: no permission to read local file {}'.format(
+ local_path.decode('utf-8')))
+ return
+ stdin = 1
+ try:
+ fd = cephfs.open(remote_path, 'w', 0o666)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ return
+ progress = 0
+ while True:
+ data = file_.read(65536)
+ if not data or len(data) == 0:
+ break
+ if convert_to_bytes:
+ data = to_bytes(data)
+ wrote = cephfs.write(fd, data, progress)
+ if wrote < 0:
+ break
+ progress += wrote
+ cephfs.close(fd)
+ if stdin > 0:
+ file_.close()
+ poutput('')
+
+
+def copy_to_local(remote_path, local_path):
+ fd = None
+ if local_path != b'-':
+ local_dir = os.path.dirname(local_path)
+ dir_list = remote_path.rsplit(b'/', 1)
+ if not os.path.exists(local_dir):
+ os.makedirs(local_dir)
+ if len(dir_list) > 2 and dir_list[1] == b'':
+ return
+ fd = open(local_path, 'wb+')
+ file_ = cephfs.open(remote_path, 'r')
+ file_size = cephfs.stat(remote_path).st_size
+ if file_size <= 0:
+ return
+ progress = 0
+ for chunk_start, chunk_size in get_chunks(file_size):
+ file_chunk = cephfs.read(file_, chunk_start, chunk_size)
+ progress += len(file_chunk)
+ if fd:
+ fd.write(file_chunk)
+ else:
+ poutput(file_chunk.decode('utf-8'))
+ cephfs.close(file_)
+ if fd:
+ fd.close()
+
+
+def dirwalk(path):
+ """
+ walk a directory tree, using a generator
+ """
+ path = os.path.normpath(path)
+ for item in ls(path, opts='A'):
+ fullpath = os.path.join(path, item.d_name)
+ src_path = fullpath.rsplit(b'/', 1)[0]
+
+ yield os.path.normpath(fullpath)
+ if is_dir_exists(item.d_name, src_path):
+ for x in dirwalk(fullpath):
+ yield x
+
+
+##################################################################
+#
+# Following methods are implementation for CephFS Shell commands
+#
+#################################################################
+
+class CephFSShell(Cmd):
+
+ def __init__(self):
+ super().__init__()
+ self.working_dir = cephfs.getcwd().decode('utf-8')
+ self.set_prompt()
+ self.interactive = False
+ self.umask = '2'
+
+ def default(self, line):
+ self.exit_code = 127
+ perror('Unrecognized command')
+
+ def set_prompt(self):
+ self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX
+ + self.working_dir + colorama.Style.RESET_ALL
+ + '\033[01;33m>>>\033[00m ')
+
+ def create_argparser(self, command):
+ try:
+ argparse_args = getattr(self, 'argparse_' + command)
+ except AttributeError:
+ set_exit_code_msg()
+ return None
+ doc_lines = getattr(
+ self, 'do_' + command).__doc__.expandtabs().splitlines()
+ if '' in doc_lines:
+ blank_idx = doc_lines.index('')
+ usage = doc_lines[:blank_idx]
+ description = doc_lines[blank_idx + 1:]
+ else:
+ usage = doc_lines
+ description = []
+ parser = argparse.ArgumentParser(
+ prog=command,
+ usage='\n'.join(usage),
+ description='\n'.join(description),
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
+ for args, kwargs in argparse_args:
+ parser.add_argument(*args, **kwargs)
+ return parser
+
+ def complete_filenames(self, text, line, begidx, endidx):
+ if not text:
+ completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir())
+ for x in ls(b".", opts='A')]
+ else:
+ if text.count('/') > 0:
+ completions = [text.rsplit('/', 1)[0] + '/'
+ + x.d_name.decode('utf-8') + '/'
+ * int(x.is_dir()) for x in ls('/'
+ + text.rsplit('/', 1)[0], opts='A')
+ if x.d_name.decode('utf-8').startswith(
+ text.rsplit('/', 1)[1])]
+ else:
+ completions = [x.d_name.decode('utf-8') + '/'
+ * int(x.is_dir()) for x in ls(b".", opts='A')
+ if x.d_name.decode('utf-8').startswith(text)]
+ if len(completions) == 1 and completions[0][-1] == '/':
+ dir_, file_ = completions[0].rsplit('/', 1)
+ completions.extend([dir_ + '/' + x.d_name.decode('utf-8')
+ + '/' * int(x.is_dir()) for x in
+ ls('/' + dir_, opts='A')
+ if x.d_name.decode('utf-8').startswith(file_)])
+ return self.delimiter_complete(text, line, begidx, endidx, completions, '/')
+ return completions
+
+ def onecmd(self, line, **kwargs):
+ """
+ Global error catcher
+ """
+ try:
+ res = Cmd.onecmd(self, line, **kwargs)
+ if self.interactive:
+ self.set_prompt()
+ return res
+ except ConnectionError as e:
+ set_exit_code_msg(e.errno, f'***\n{e}')
+ except KeyboardInterrupt:
+ set_exit_code_msg('KeyboardInterrupt', 'Command aborted')
+ except (libcephfs.Error, Exception) as e:
+ if shell.debug:
+ traceback.print_exc(file=sys.stdout)
+ if isinstance(e, Cmd2ArgparseError):
+ # NOTE: In case of Cmd2ArgparseError the error message is
+ # already printed beforehand (plus Cmd2ArgparseError
+ # instances have empty error message), so let's just set the
+ # exit code.
+ set_exit_code_msg(msg=None)
+ else:
+ set_exit_code_msg(msg=f'{type(e).__name__}: {e}')
+ # In cmd2 versions < 1.1.0 we'll get SystemExit(2) instead of
+ # Cmd2ArgparseError
+ except SystemExit:
+ raise
+
+ class path_to_bytes(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ values = to_bytes(values)
+ setattr(namespace, self.dest, values)
+
+ # TODO: move the necessary contents from here to `class path_to_bytes`.
+ class get_list_of_bytes_path(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ values = to_bytes(values)
+
+ if values == b'.':
+ values = cephfs.getcwd()
+ else:
+ for i in values:
+ if i == b'.':
+ values[values.index(i)] = cephfs.getcwd()
+
+ setattr(namespace, self.dest, values)
+
+ def complete_mkdir(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ class ModeAction(argparse.Action):
+ def __init__(self, option_strings, dest, nargs=None, **kwargs):
+ if nargs is not None and nargs != '?':
+ raise ValueError("more than one modes not allowed")
+ super().__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ o_mode = 0
+ res = None
+ try:
+ o_mode = int(values, base=8)
+ except ValueError:
+ res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', values)
+ if res is None:
+ parser.error(f"invalid mode: {values}\n"
+ "mode must be a numeric octal literal\n"
+ "or ((u?g?o?)|(a?))(=)(r?w?x?)")
+ else:
+ # we are supporting only assignment of mode and not + or -
+ # as is generally available with the chmod command
+ # eg.
+ # >>> res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', 'go=')
+ # >>> res.groups()
+ # ('go', 'go', None, '=', '')
+ val = res.groups()
+
+ if val[3] != '=':
+ parser.error("need assignment operator between user "
+ "and mode specifiers")
+ if val[4] == '':
+ parser.error(f"invalid mode: {values}\n"
+ "mode must be combination of: r | w | x")
+ users = ''
+ if val[2] is None:
+ users = val[1]
+ else:
+ users = val[2]
+
+ t_mode = 0
+ if users == 'a':
+ users = 'ugo'
+
+ if 'r' in val[4]:
+ t_mode |= 4
+ if 'w' in val[4]:
+ t_mode |= 2
+ if 'x' in val[4]:
+ t_mode |= 1
+
+ if 'u' in users:
+ o_mode |= (t_mode << 6)
+ if 'g' in users:
+ o_mode |= (t_mode << 3)
+ if 'o' in users:
+ o_mode |= t_mode
+
+ if o_mode < 0:
+ parser.error(f"invalid mode: {values}\n"
+ "mode cannot be negative")
+ if o_mode > 0o7777:
+ parser.error(f"invalid mode: {values}\n"
+ "mode cannot be greater than octal 07777")
+
+ setattr(namespace, self.dest, str(oct(o_mode)))
+
+ mkdir_parser = argparse.ArgumentParser(
+ description='Create the directory(ies), if they do not already exist.')
+ mkdir_parser.add_argument('dirs', type=str,
+ action=path_to_bytes,
+ metavar='DIR_NAME',
+ help='Name of new_directory.',
+ nargs='+')
+ mkdir_parser.add_argument('-m', '--mode', type=str,
+ action=ModeAction,
+ help='Sets the access mode for the new directory.')
+ mkdir_parser.add_argument('-p', '--parent', action='store_true',
+ help='Create parent directories as necessary. '
+ 'When this option is specified, no error is'
+ 'reported if a directory already exists.')
+
+ @with_argparser(mkdir_parser)
+ def do_mkdir(self, args):
+ """
+ Create directory.
+ """
+ for path in args.dirs:
+ if args.mode:
+ permission = int(args.mode, 8)
+ else:
+ permission = 0o777
+ if args.parent:
+ cephfs.mkdirs(path, permission)
+ else:
+ try:
+ cephfs.mkdir(path, permission)
+ except libcephfs.Error as e:
+ set_exit_code_msg(e)
+
+ def complete_put(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ put_parser = argparse.ArgumentParser(
+ description='Copy a file/directory to Ceph File System from Local File System.')
+ put_parser.add_argument('local_path', type=str, action=path_to_bytes,
+ help='Path of the file in the local system')
+ put_parser.add_argument('remote_path', type=str, action=path_to_bytes,
+ help='Path of the file in the remote system')
+ put_parser.add_argument('-f', '--force', action='store_true',
+ help='Overwrites the destination if it already exists.')
+
+ @with_argparser(put_parser)
+ def do_put(self, args):
+ """
+ Copy a local file/directory to CephFS.
+ """
+ if args.local_path != b'-' and not os.path.isfile(args.local_path) \
+ and not os.path.isdir(args.local_path):
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"error: "
+ f"{args.local_path.decode('utf-8')}: "
+ f"No such file or directory")
+ return
+
+ if (is_file_exists(args.remote_path) or is_dir_exists(
+ args.remote_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.remote_path.decode('utf-8')} "
+ f"exists, use --force to overwrite")
+ return
+
+ root_src_dir = args.local_path
+ root_dst_dir = args.remote_path
+ if args.local_path == b'.' or args.local_path == b'./':
+ root_src_dir = os.getcwdb()
+ elif len(args.local_path.rsplit(b'/', 1)) < 2:
+ root_src_dir = os.path.join(os.getcwdb(), args.local_path)
+ else:
+ p = args.local_path.split(b'/')
+ if p[0] == b'.':
+ root_src_dir = os.getcwdb()
+ p.pop(0)
+ while len(p) > 0:
+ root_src_dir += b'/' + p.pop(0)
+
+ if root_dst_dir == b'.':
+ if args.local_path != b'-':
+ root_dst_dir = root_src_dir.rsplit(b'/', 1)[1]
+ if root_dst_dir == b'':
+ root_dst_dir = root_src_dir.rsplit(b'/', 1)[0]
+ a = root_dst_dir.rsplit(b'/', 1)
+ if len(a) > 1:
+ root_dst_dir = a[1]
+ else:
+ root_dst_dir = a[0]
+ else:
+ set_exit_code_msg(errno.EINVAL, 'error: no filename specified '
+ 'for destination')
+ return
+
+ if root_dst_dir[-1] != b'/':
+ root_dst_dir += b'/'
+
+ if args.local_path == b'-' or os.path.isfile(root_src_dir):
+ if args.local_path == b'-':
+ root_src_dir = b'-'
+ copy_from_local(root_src_dir, root_dst_dir)
+ else:
+ for src_dir, dirs, files in os.walk(root_src_dir):
+ if isinstance(src_dir, str):
+ src_dir = to_bytes(src_dir)
+ dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1)
+ dst_dir = re.sub(rb'\/+', b'/', cephfs.getcwd()
+ + dst_dir)
+ if args.force and dst_dir != b'/' and not is_dir_exists(
+ dst_dir[:-1]) and not locate_file(dst_dir):
+ try:
+ cephfs.mkdirs(dst_dir, 0o777)
+ except libcephfs.Error:
+ pass
+ if (not args.force) and dst_dir != b'/' and not is_dir_exists(
+ dst_dir) and not os.path.isfile(root_src_dir):
+ try:
+ cephfs.mkdirs(dst_dir, 0o777)
+ except libcephfs.Error:
+ # TODO: perhaps, set retval to 1?
+ pass
+
+ for dir_ in dirs:
+ dir_name = os.path.join(dst_dir, dir_)
+ if not is_dir_exists(dir_name):
+ try:
+ cephfs.mkdirs(dir_name, 0o777)
+ except libcephfs.Error:
+ # TODO: perhaps, set retval to 1?
+ pass
+
+ for file_ in files:
+ src_file = os.path.join(src_dir, file_)
+ dst_file = re.sub(rb'\/+', b'/', b'/' + dst_dir + b'/' + file_)
+ if (not args.force) and is_file_exists(dst_file):
+ return
+ copy_from_local(src_file, os.path.join(cephfs.getcwd(),
+ dst_file))
+
+ def complete_get(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ get_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System to Local Directory.')
+ get_parser.add_argument('remote_path', type=str, action=path_to_bytes,
+ help='Path of the file in the remote system')
+ get_parser.add_argument('local_path', type=str, action=path_to_bytes,
+ help='Path of the file in the local system')
+ get_parser.add_argument('-f', '--force', action='store_true',
+ help='Overwrites the destination if it already exists.')
+
+ @with_argparser(get_parser)
+ def do_get(self, args):
+ """
+ Copy a file/directory from CephFS to given path.
+ """
+ if not is_file_exists(args.remote_path) and not \
+ is_dir_exists(args.remote_path):
+ set_exit_code_msg(errno.ENOENT, "error: no file/directory"
+ " found at specified remote "
+ "path")
+ return
+ if (os.path.isfile(args.local_path) or os.path.isdir(
+ args.local_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.local_path.decode('utf-8')}"
+ f" already exists, use --force to "
+ f"overwrite")
+ return
+ root_src_dir = args.remote_path
+ root_dst_dir = args.local_path
+ fname = root_src_dir.rsplit(b'/', 1)
+ if args.local_path == b'.':
+ root_dst_dir = os.getcwdb()
+ if args.remote_path == b'.':
+ root_src_dir = cephfs.getcwd()
+ if args.local_path == b'-':
+ if args.remote_path == b'.' or args.remote_path == b'./':
+ set_exit_code_msg(errno.EINVAL, 'error: no remote file name specified')
+ return
+ copy_to_local(root_src_dir, b'-')
+ elif is_file_exists(args.remote_path):
+ copy_to_local(root_src_dir, root_dst_dir)
+ elif b'/' in root_src_dir and is_file_exists(fname[1], fname[0]):
+ copy_to_local(root_src_dir, root_dst_dir)
+ else:
+ files = list(reversed(sorted(dirwalk(root_src_dir))))
+ for file_ in files:
+ dst_dirpath, dst_file = file_.rsplit(b'/', 1)
+ if dst_dirpath in files:
+ files.remove(dst_dirpath)
+ dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file)
+ dst_path = os.path.normpath(dst_path)
+ if is_dir_exists(file_):
+ try:
+ os.makedirs(dst_path)
+ except OSError:
+ pass
+ else:
+ copy_to_local(file_, dst_path)
+
+ return 0
+
+ def complete_ln(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ ln_parser = argparse.ArgumentParser(
+ description='Add a hard link to an existing file or create a symbolic '
+ 'link to an existing file or directory.')
+ ln_parser.add_argument('target', type=str, action=path_to_bytes,
+ help='File/Directory of which link is '
+ 'to be created')
+ ln_parser.add_argument('link_name', type=str, action=path_to_bytes,
+ help='Link to target with the name link_name',
+ nargs='?')
+ ln_parser.add_argument('-s', '--symbolic', action='store_true',
+ help='Create symbolic link')
+ ln_parser.add_argument('-v', '--verbose', action='store_true',
+ help='Print name of each linked file')
+ ln_parser.add_argument('-f', '--force', action='store_true',
+ help='Force create link/symbolic link')
+
+ @with_argparser(ln_parser)
+ def do_ln(self, args):
+ if not is_file_exists(args.target) \
+ and not is_dir_exists(args.target):
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"ln: failed to access "
+ f"'{args.target.decode('utf-8')}"
+ f"': No such file or directory")
+ return
+
+ is_a_dir = False
+ if is_dir_exists(args.target):
+ is_a_dir = True
+
+ target_last_char_slash = False
+ if args.target.decode('utf-8')[len(args.target) - 1] == '/':
+ target_last_char_slash = True
+
+ link_name = ''
+
+ if args.link_name is None:
+ if target_last_char_slash is True:
+ if is_dir_exists(args.target):
+ pass
+ else:
+ set_exit_code_msg(errno.ENOTDIR,
+ f"ln: failed to access "
+ f"'{args.target.decode('utf-8')}': "
+ f"Not a directory")
+ return
+ link_name = os.path.join(cephfs.getcwd(),
+ os.path.basename(
+ os.path.normpath(args.target)))
+ if (is_file_exists(link_name) or is_dir_exists(
+ link_name)) and not args.force:
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"ln: failed to create link "
+ f"{link_name.decode('utf-8')}: "
+ f"exists")
+ return
+ else:
+ if is_dir_exists(args.link_name):
+ dest = args.link_name.decode('utf-8').rstrip('/')
+ dest_first_half = dest.encode('utf-8') + b'/'
+ if is_file_exists(args.target):
+ if target_last_char_slash is True:
+ set_exit_code_msg(errno.ENOTDIR,
+ "ln: failed to access "
+ f"'{args.target.decode('utf-8')}': "
+ "Not a directory")
+ return
+ dest_file = os.path.basename(os.path.normpath(args.target))
+ link_name = dest_first_half + dest_file
+
+ elif is_dir_exists(args.target):
+ dest_dir = os.path.basename(os.path.normpath(args.target))
+ link_name = dest_first_half + dest_dir
+
+ else:
+ # if the destination is not a file or a dir then:
+ # accept it as file so the end part of path cannot have
+ # a `/` succeeding it.
+ test_path = args.link_name.decode('utf-8')
+ if test_path[len(test_path) - 1] == '/':
+ set_exit_code_msg(errno.ENOENT, f"'{test_path}': "
+ f"No such file or "
+ f"directory")
+ return
+ else:
+ link_name = test_path.encode('utf-8')
+
+ if args.force:
+ try:
+ cephfs.lstat(os.path.join(b'', link_name))
+ if not is_a_dir or (is_a_dir and args.symbolic):
+ cephfs.unlink(link_name)
+ except libcephfs.ObjectNotFound:
+ pass
+
+ try:
+ if args.symbolic:
+ cephfs.symlink(args.target, link_name)
+ else:
+ if is_a_dir:
+ set_exit_code_msg(errno.EPERM,
+ f"ln: {args.target.decode('utf-8')}: "
+ "hard link not allowed for directory")
+ return
+ cephfs.link(args.target, link_name)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=str(e))
+ return
+
+ if args.verbose:
+ poutput(f"{link_name.decode('utf-8')} ->"
+ f" {args.target.decode('utf-8')}")
+
+ def complete_ls(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ ls_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System from Local Directory.')
+ ls_parser.add_argument('-l', '--long', action='store_true',
+ help='Detailed list of items in the directory.')
+ ls_parser.add_argument('-r', '--reverse', action='store_true',
+ help='Reverse order of listing items in the directory.')
+ ls_parser.add_argument('-H', action='store_true', help='Human Readable')
+ ls_parser.add_argument('-a', '--all', action='store_true',
+ help='Do not Ignore entries starting with .')
+ ls_parser.add_argument('-S', action='store_true', help='Sort by file_size')
+ ls_parser.add_argument('paths', help='Name of Directories',
+ action=path_to_bytes, nargs='*', default=['.'])
+
+ @with_argparser(ls_parser)
+ def do_ls(self, args):
+ """
+ List all the files and directories in the current working directory
+ """
+ paths = args.paths
+ for path in paths:
+ values = []
+ items = []
+ try:
+ if path.count(b'*') > 0:
+ all_items = get_all_possible_paths(path)
+ if len(all_items) == 0:
+ continue
+ path = all_items[0].rsplit(b'/', 1)[0]
+ if path == b'':
+ path = b'/'
+ dirs = []
+ for i in all_items:
+ for item in ls(path):
+ d_name = item.d_name
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(path, d_name))
+ else:
+ items.append(item)
+ if dirs:
+ paths.extend(dirs)
+ else:
+ poutput(path.decode('utf-8'), end=':\n')
+ items = sorted(items, key=lambda item: item.d_name)
+ else:
+ if path != b'' and path != cephfs.getcwd() and len(paths) > 1:
+ poutput(path.decode('utf-8'), end=':\n')
+ items = sorted(ls(path), key=lambda item: item.d_name)
+ if not args.all:
+ items = [i for i in items if not i.d_name.startswith(b'.')]
+ if args.S:
+ items = sorted(items, key=lambda item: cephfs.stat(
+ path + b'/' + item.d_name, follow_symlink=(
+ not item.is_symbol_file())).st_size)
+ if args.reverse:
+ items = reversed(items)
+ for item in items:
+ filepath = item.d_name
+ is_dir = item.is_dir()
+ is_sym_lnk = item.is_symbol_file()
+ try:
+ if args.long and args.H:
+ print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
+ is_sym_lnk, True)
+ elif args.long:
+ print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
+ is_sym_lnk, False)
+ elif is_sym_lnk or is_dir:
+ values.append(style_listing(filepath.decode('utf-8'), is_dir,
+ is_sym_lnk))
+ else:
+ values.append(filepath)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ if not args.long:
+ print_list(values, shutil.get_terminal_size().columns)
+ if path != paths[-1]:
+ poutput('')
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ def complete_rmdir(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ rmdir_parser = argparse.ArgumentParser(
+ description='Remove the directory(ies), if they are empty.')
+ rmdir_parser.add_argument('paths', help='Directory Path(s)', nargs='+',
+ action=path_to_bytes)
+ rmdir_parser.add_argument('-p', '--parent', action='store_true',
+ help="remove directory and its ancestors; "
+ "e.g., 'rmdir -p a/b/c' is similar to "
+ "'rmdir a/b/c a/b a'")
+
+ @with_argparser(rmdir_parser)
+ def do_rmdir(self, args):
+ self.do_rmdir_helper(args)
+
+ def do_rmdir_helper(self, args):
+ """
+ Remove a specific Directory
+ """
+ is_pattern = False
+ paths = args.paths
+ for path in paths:
+ if path.count(b'*') > 0:
+ is_pattern = True
+ all_items = get_all_possible_paths(path)
+ if len(all_items) > 0:
+ path = all_items[0].rsplit(b'/', 1)[0]
+ if path == b'':
+ path = b'/'
+ dirs = []
+ for i in all_items:
+ for item in ls(path):
+ d_name = item.d_name
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(path, d_name))
+ paths.extend(dirs)
+ continue
+ else:
+ is_pattern = False
+
+ if args.parent:
+ path = os.path.join(cephfs.getcwd(), path.rsplit(b'/')[0])
+ files = list(sorted(set(dirwalk(path)), reverse=True))
+ if not files:
+ path = b'.'
+ for filepath in files:
+ try:
+ cephfs.rmdir(os.path.normpath(filepath))
+ except libcephfs.Error as e:
+ perror(e)
+ path = b'.'
+ break
+ else:
+ path = os.path.normpath(os.path.join(cephfs.getcwd(), path))
+ if not is_pattern and path != os.path.normpath(b''):
+ try:
+ cephfs.rmdir(path)
+ except libcephfs.Error as e:
+ if e.get_error_code() == 2:
+ set_exit_code_msg(e.get_error_code(),
+ "rmdir: failed to remove "
+ f"{path.decode('utf-8')}: "
+ "No such file or directory")
+ elif e.get_error_code() == 20:
+ set_exit_code_msg(e.get_error_code(),
+ "rmdir: failed to remove "
+ f"{path.decode('utf-8')}: "
+ "Not a directory")
+ elif e.get_error_code() == 39:
+ set_exit_code_msg(e.get_error_code(),
+ "rmdir: failed to remove "
+ f"{path.decode('utf-8')}: "
+ "Directory not empty")
+ else:
+ set_exit_code_msg(msg=e)
+
+ def complete_rm(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ rm_parser = argparse.ArgumentParser(description='Remove File.')
+ rm_parser.add_argument('paths', help='File Path.', nargs='+',
+ action=path_to_bytes)
+
+ @with_argparser(rm_parser)
+ def do_rm(self, args):
+ """
+ Remove a specific file
+ """
+ file_paths = args.paths
+ for path in file_paths:
+ if path.count(b'*') > 0:
+ file_paths.extend([i for i in get_all_possible_paths(
+ path) if is_file_exists(i)])
+ else:
+ try:
+ cephfs.unlink(path)
+ except libcephfs.Error as e:
+ # NOTE: perhaps we need a better msg here
+ if e.get_error_code() == 2:
+ set_exit_code_msg(e.get_error_code(),
+ "rm: failed to remove "
+ f"{path.decode('utf-8')}: "
+ "No such file or directory")
+ elif e.get_error_code() == 21:
+ set_exit_code_msg(e.get_error_code(),
+ "rm: failed to remove "
+ f"{path.decode('utf-8')}: "
+ "Is a directory")
+ else:
+ set_exit_code_msg(msg=e)
+
+ def complete_mv(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ mv_parser = argparse.ArgumentParser(description='Move File.')
+ mv_parser.add_argument('src_path', type=str, action=path_to_bytes,
+ help='Source File Path.')
+ mv_parser.add_argument('dest_path', type=str, action=path_to_bytes,
+ help='Destination File Path.')
+
+ @with_argparser(mv_parser)
+ def do_mv(self, args):
+ """
+ Rename a file or Move a file from source path to the destination
+ """
+ cephfs.rename(args.src_path, args.dest_path)
+
+ def complete_cd(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ cd_parser = argparse.ArgumentParser(description='Change working directory')
+ cd_parser.add_argument('path', type=str, help='Name of the directory.',
+ action=path_to_bytes, nargs='?', default='/')
+
+ @with_argparser(cd_parser)
+ def do_cd(self, args):
+ """
+ Change working directory
+ """
+ cephfs.chdir(args.path)
+ self.working_dir = cephfs.getcwd().decode('utf-8')
+ self.set_prompt()
+
+ def do_cwd(self, arglist):
+ """
+ Get current working directory.
+ """
+ poutput(cephfs.getcwd().decode('utf-8'))
+
+ def complete_chmod(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ chmod_parser = argparse.ArgumentParser(description='Change permission of a file/directory.')
+ chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode')
+ chmod_parser.add_argument('paths', type=str, action=path_to_bytes,
+ help='Path of the file/directory', nargs='+')
+
+ @with_argparser(chmod_parser)
+ def do_chmod(self, args):
+ """
+ Change permission of a file/directory
+ """
+ for path in args.paths:
+ mode = int(args.mode, base=8)
+ try:
+ cephfs.chmod(path, mode)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ def complete_cat(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ cat_parser = argparse.ArgumentParser(description='')
+ cat_parser.add_argument('paths', help='Name of Files', action=path_to_bytes,
+ nargs='+')
+
+ @with_argparser(cat_parser)
+ def do_cat(self, args):
+ """
+ Print contents of a file
+ """
+ for path in args.paths:
+ if is_file_exists(path):
+ copy_to_local(path, b'-')
+ else:
+ set_exit_code_msg(errno.ENOENT, '{}: no such file'.format(
+ path.decode('utf-8')))
+
+ umask_parser = argparse.ArgumentParser(description='Set umask value.')
+ umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction,
+ nargs='?', default='')
+
+ @with_argparser(umask_parser)
+ def do_umask(self, args):
+ """
+ Set Umask value.
+ """
+ if args.mode == '':
+ poutput(self.umask.zfill(4))
+ else:
+ mode = int(args.mode, 8)
+ self.umask = str(oct(cephfs.umask(mode))[2:])
+
+ def complete_write(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ write_parser = argparse.ArgumentParser(description='Writes data into a file')
+ write_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of File')
+
+ @with_argparser(write_parser)
+ def do_write(self, args):
+ """
+ Write data into a file.
+ """
+
+ copy_from_local(b'-', args.path)
+
+ def complete_lcd(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ lcd_parser = argparse.ArgumentParser(description='')
+ lcd_parser.add_argument('path', type=str, action=path_to_bytes, help='Path')
+
+ @with_argparser(lcd_parser)
+ def do_lcd(self, args):
+ """
+ Moves into the given local directory
+ """
+ try:
+ os.chdir(os.path.expanduser(args.path))
+ except OSError as e:
+ set_exit_code_msg(e.errno, "Cannot change to "
+ f"{e.filename.decode('utf-8')}: {e.strerror}")
+
+ def complete_lls(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ lls_parser = argparse.ArgumentParser(
+ description='List files in local system.')
+ lls_parser.add_argument('paths', help='Paths', action=path_to_bytes,
+ nargs='*')
+
+ @with_argparser(lls_parser)
+ def do_lls(self, args):
+ """
+ Lists all files and folders in the current local directory
+ """
+ if not args.paths:
+ print_list(os.listdir(os.getcwdb()))
+ else:
+ for path in args.paths:
+ try:
+ items = os.listdir(path)
+ poutput("{}:".format(path.decode('utf-8')))
+ print_list(items)
+ except OSError as e:
+ set_exit_code_msg(e.errno, f"{e.filename.decode('utf-8')}: "
+ f"{e.strerror}")
+ # Arguments to the with_argpaser decorator function are sticky.
+ # The items in args.path do not get overwritten in subsequent calls.
+ # The arguments remain in args.paths after the function exits and we
+ # need to clean it up to ensure the next call works as expected.
+ args.paths.clear()
+
+ def do_lpwd(self, arglist):
+ """
+ Prints the absolute path of the current local directory
+ """
+ poutput(os.getcwd())
+
+ def complete_df(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ df_parser = argparse.ArgumentParser(description='Show information about\
+ the amount of available disk space')
+ df_parser.add_argument('file', help='Name of the file', nargs='*',
+ default=['.'], action=path_to_bytes)
+
+ @with_argparser(df_parser)
+ def do_df(self, arglist):
+ """
+ Display the amount of available disk space for file systems
+ """
+ header = True # Set to true for printing header only once
+ if b'.' == arglist.file[0]:
+ arglist.file = ls(b'.')
+
+ for file in arglist.file:
+ if isinstance(file, libcephfs.DirEntry):
+ file = file.d_name
+ if file == b'.' or file == b'..':
+ continue
+ try:
+ statfs = cephfs.statfs(file)
+ stat = cephfs.stat(file)
+ block_size = (statfs['f_blocks'] * statfs['f_bsize']) // 1024
+ available = block_size - stat.st_size
+ use = 0
+
+ if block_size > 0:
+ use = (stat.st_size * 100) // block_size
+
+ if header:
+ header = False
+ poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format(
+ "1K-blocks", "Used", "Available", "Use%",
+ "Stored on"))
+
+ poutput('{:d}\t{:18d}\t{:8d}\t{:10s} {}'.format(block_size,
+ stat.st_size, available, str(int(use)) + '%',
+ file.decode('utf-8')))
+ except libcephfs.OSError as e:
+ set_exit_code_msg(e.get_error_code(), "could not statfs {}: {}".format(
+ file.decode('utf-8'), e.strerror))
+
+ locate_parser = argparse.ArgumentParser(
+ description='Find file within file system')
+ locate_parser.add_argument('name', help='name', type=str,
+ action=path_to_bytes)
+ locate_parser.add_argument('-c', '--count', action='store_true',
+ help='Count list of items located.')
+ locate_parser.add_argument(
+ '-i', '--ignorecase', action='store_true', help='Ignore case')
+
+ @with_argparser(locate_parser)
+ def do_locate(self, args):
+ """
+ Find a file within the File System
+ """
+ if args.name.count(b'*') == 1:
+ if args.name[0] == b'*':
+ args.name += b'/'
+ elif args.name[-1] == '*':
+ args.name = b'/' + args.name
+ args.name = args.name.replace(b'*', b'')
+ if args.ignorecase:
+ locations = locate_file(args.name, False)
+ else:
+ locations = locate_file(args.name)
+ if args.count:
+ poutput(len(locations))
+ else:
+ poutput((b'\n'.join(locations)).decode('utf-8'))
+
+ def complete_du(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ du_parser = argparse.ArgumentParser(
+ description='Disk Usage of a Directory')
+ du_parser.add_argument('paths', type=str, action=get_list_of_bytes_path,
+ help='Name of the directory.', nargs='*',
+ default=[b'.'])
+ du_parser.add_argument('-r', action='store_true',
+ help='Recursive Disk usage of all directories.')
+
+ @with_argparser(du_parser)
+ def do_du(self, args):
+ """
+ Print disk usage of a given path(s).
+ """
+ def print_disk_usage(files):
+ if isinstance(files, bytes):
+ files = (files, )
+
+ for f in files:
+ try:
+ st = cephfs.lstat(f)
+
+ if stat.S_ISDIR(st.st_mode):
+ dusage = int(cephfs.getxattr(f,
+ 'ceph.dir.rbytes').decode('utf-8'))
+ else:
+ dusage = st.st_size
+
+ # print path in local context
+ f = os.path.normpath(f)
+ if f[0] is ord('/'):
+ f = b'.' + f
+ poutput('{:10s} {}'.format(humansize(dusage),
+ f.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ continue
+
+ for path in args.paths:
+ if args.r:
+ print_disk_usage(sorted(set(dirwalk(path)).union({path})))
+ else:
+ print_disk_usage(path)
+
+ quota_parser = argparse.ArgumentParser(
+ description='Quota management for a Directory')
+ quota_parser.add_argument('op', choices=['get', 'set'],
+ help='Quota operation type.')
+ quota_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the directory.')
+ quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?',
+ help='Max cumulative size of the data under '
+ 'this directory.')
+ quota_parser.add_argument('--max_files', type=int, default=-1, nargs='?',
+ help='Total number of files under this '
+ 'directory tree.')
+
+ @with_argparser(quota_parser)
+ def do_quota(self, args):
+ """
+ Quota management.
+ """
+ if not is_dir_exists(args.path):
+ set_exit_code_msg(errno.ENOENT, 'error: no such directory {}'.format(
+ args.path.decode('utf-8')))
+ return
+
+ if args.op == 'set':
+ if (args.max_bytes == -1) and (args.max_files == -1):
+ set_exit_code_msg(errno.EINVAL, 'please specify either '
+ '--max_bytes or --max_files or both')
+ return
+
+ if args.max_bytes >= 0:
+ max_bytes = to_bytes(str(args.max_bytes))
+ try:
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
+ max_bytes, os.XATTR_CREATE)
+ poutput('max_bytes set to %d' % args.max_bytes)
+ except libcephfs.Error as e:
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
+ max_bytes, os.XATTR_REPLACE)
+ set_exit_code_msg(e.get_error_code(), 'max_bytes reset to '
+ f'{args.max_bytes}')
+
+ if args.max_files >= 0:
+ max_files = to_bytes(str(args.max_files))
+ try:
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
+ max_files, os.XATTR_CREATE)
+ poutput('max_files set to %d' % args.max_files)
+ except libcephfs.Error as e:
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
+ max_files, os.XATTR_REPLACE)
+ set_exit_code_msg(e.get_error_code(), 'max_files reset to '
+ f'{args.max_files}')
+ elif args.op == 'get':
+ max_bytes = '0'
+ max_files = '0'
+ try:
+ max_bytes = cephfs.getxattr(args.path, 'ceph.quota.max_bytes')
+ poutput('max_bytes: {}'.format(max_bytes.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), 'max_bytes is not set')
+
+ try:
+ max_files = cephfs.getxattr(args.path, 'ceph.quota.max_files')
+ poutput('max_files: {}'.format(max_files.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), 'max_files is not set')
+
+ snap_parser = argparse.ArgumentParser(description='Snapshot Management')
+ snap_parser.add_argument('op', type=str,
+ help='Snapshot operation: create or delete')
+ snap_parser.add_argument('name', type=str, action=path_to_bytes,
+ help='Name of snapshot')
+ snap_parser.add_argument('dir', type=str, action=path_to_bytes,
+ help='Directory for which snapshot '
+ 'needs to be created or deleted')
+
+ @with_argparser(snap_parser)
+ def do_snap(self, args):
+ """
+ Snapshot management for the volume
+ """
+ # setting self.colors to None turns off colorizing and
+ # perror emits plain text
+ self.colors = None
+
+ snapdir = '.snap'
+ conf_snapdir = cephfs.conf_get('client_snapdir')
+ if conf_snapdir is not None:
+ snapdir = conf_snapdir
+ snapdir = to_bytes(snapdir)
+ if args.op == 'create':
+ try:
+ if is_dir_exists(args.dir):
+ cephfs.mkdir(os.path.join(args.dir, snapdir, args.name), 0o755)
+ else:
+ set_exit_code_msg(errno.ENOENT, "'{}': no such directory".format(
+ args.dir.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(),
+ "snapshot '{}' already exists".format(
+ args.name.decode('utf-8')))
+ elif args.op == 'delete':
+ snap_dir = os.path.join(args.dir, snapdir, args.name)
+ try:
+ if is_dir_exists(snap_dir):
+ newargs = argparse.Namespace(paths=[snap_dir], parent=False)
+ self.do_rmdir_helper(newargs)
+ else:
+ set_exit_code_msg(errno.ENOENT, "'{}': no such snapshot".format(
+ args.name.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), "error while deleting "
+ "'{}'".format(snap_dir.decode('utf-8')))
+ else:
+ set_exit_code_msg(errno.EINVAL, "snapshot can only be created or "
+ "deleted; check - help snap")
+
+ def do_help(self, line):
+ """
+ Get details about a command.
+ Usage: help <cmd> - for a specific command
+ help all - for all the commands
+ """
+ if line == 'all':
+ for k in dir(self):
+ if k.startswith('do_'):
+ poutput('-' * 80)
+ super().do_help(k[3:])
+ return
+ parser = self.create_argparser(line)
+ if parser:
+ parser.print_help()
+ else:
+ super().do_help(line)
+
+ def complete_stat(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ stat_parser = argparse.ArgumentParser(
+ description='Display file or file system status')
+ stat_parser.add_argument('paths', type=str, help='file paths',
+ action=path_to_bytes, nargs='+')
+
+ @with_argparser(stat_parser)
+ def do_stat(self, args):
+ """
+ Display file or file system status
+ """
+ for path in args.paths:
+ try:
+ stat = cephfs.stat(path)
+ atime = stat.st_atime.isoformat(' ')
+ mtime = stat.st_mtime.isoformat(' ')
+ ctime = stat.st_mtime.isoformat(' ')
+
+ poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n"
+ "Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: "
+ "{:o}/{}\tUid: {:d}\tGid: {:d}\nAccess: {}\nModify: "
+ "{}\nChange: {}".format(path.decode('utf-8'),
+ stat.st_size, stat.st_blocks,
+ stat.st_blksize, stat.st_dev,
+ stat.st_ino, stat.st_nlink,
+ stat.st_mode,
+ mode_notation(stat.st_mode),
+ stat.st_uid, stat.st_gid, atime,
+ mtime, ctime))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ setxattr_parser = argparse.ArgumentParser(
+ description='Set extended attribute for a file')
+ setxattr_parser.add_argument('path', type=str, action=path_to_bytes, help='Name of the file')
+ setxattr_parser.add_argument('name', type=str, help='Extended attribute name')
+ setxattr_parser.add_argument('value', type=str, help='Extended attribute value')
+
+ @with_argparser(setxattr_parser)
+ def do_setxattr(self, args):
+ """
+ Set extended attribute for a file
+ """
+ val_bytes = to_bytes(args.value)
+ name_bytes = to_bytes(args.name)
+ try:
+ cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_CREATE)
+ poutput('{} is successfully set to {}'.format(args.name, args.value))
+ except libcephfs.ObjectExists:
+ cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_REPLACE)
+ poutput('{} is successfully reset to {}'.format(args.name, args.value))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ getxattr_parser = argparse.ArgumentParser(
+ description='Get extended attribute set for a file')
+ getxattr_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the file')
+ getxattr_parser.add_argument('name', type=str, help='Extended attribute name')
+
+ @with_argparser(getxattr_parser)
+ def do_getxattr(self, args):
+ """
+ Get extended attribute for a file
+ """
+ try:
+ poutput('{}'.format(cephfs.getxattr(args.path,
+ to_bytes(args.name)).decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ listxattr_parser = argparse.ArgumentParser(
+ description='List extended attributes set for a file')
+ listxattr_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the file')
+
+ @with_argparser(listxattr_parser)
+ def do_listxattr(self, args):
+ """
+ List extended attributes for a file
+ """
+ try:
+ size, xattr_list = cephfs.listxattr(args.path)
+ if size > 0:
+ poutput('{}'.format(xattr_list.replace(b'\x00', b' ').decode('utf-8')))
+ else:
+ poutput('No extended attribute is set')
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+
+#######################################################
+#
+# Following are methods that get cephfs-shell started.
+#
+#####################################################
+
+def setup_cephfs(args):
+ """
+ Mounting a cephfs
+ """
+ global cephfs
+ try:
+ cephfs = libcephfs.LibCephFS(conffile='')
+ cephfs.mount(filesystem_name=args.fs)
+ except libcephfs.ObjectNotFound as e:
+ print('couldn\'t find ceph configuration not found')
+ sys.exit(e.get_error_code())
+ except libcephfs.Error as e:
+ print(e)
+ sys.exit(e.get_error_code())
+
+
+def str_to_bool(val):
+ """
+ Return corresponding bool values for strings like 'true' or 'false'.
+ """
+ if not isinstance(val, str):
+ return val
+
+ val = val.replace('\n', '')
+ if val.lower() in ['true', 'yes']:
+ return True
+ elif val.lower() in ['false', 'no']:
+ return False
+ else:
+ return val
+
+
+def read_shell_conf(shell, shell_conf_file):
+ import configparser
+
+ sec = 'cephfs-shell'
+ opts = []
+ if LooseVersion(cmd2_version) >= LooseVersion("0.10.0"):
+ for attr in shell.settables.keys():
+ opts.append(attr)
+ else:
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
+ # hardcoding options for 0.7.9 because -
+ # 1. we use cmd2 v0.7.9 with teuthology and
+ # 2. there's no way distinguish between a shell setting and shell
+ # object attribute until v0.10.0
+ opts = ['abbrev', 'autorun_on_edit', 'colors',
+ 'continuation_prompt', 'debug', 'echo', 'editor',
+ 'feedback_to_output', 'locals_in_py', 'prompt', 'quiet',
+ 'timing']
+ elif LooseVersion(cmd2_version) >= LooseVersion("0.9.23"):
+ opts.append('allow_style')
+ # no equivalent option was defined by cmd2.
+ else:
+ pass
+
+ # default and only section in our conf file.
+ cp = configparser.ConfigParser(default_section=sec, strict=False)
+ cp.read(shell_conf_file)
+ for opt in opts:
+ if cp.has_option(sec, opt):
+ setattr(shell, opt, str_to_bool(cp.get(sec, opt)))
+
+
+def get_shell_conffile_path(arg_conf=''):
+ conf_filename = 'cephfs-shell.conf'
+ env_var = 'CEPHFS_SHELL_CONF'
+
+ arg_conf = '' if not arg_conf else arg_conf
+ home_dir_conf = os.path.expanduser('~/.' + conf_filename)
+ env_conf = os.environ[env_var] if env_var in os.environ else ''
+
+ # here's the priority by which conf gets read.
+ for path in (arg_conf, env_conf, home_dir_conf):
+ if os.path.isfile(path):
+ return path
+ else:
+ return ''
+
+
+def manage_args():
+ main_parser = argparse.ArgumentParser(description='')
+ main_parser.add_argument('-b', '--batch', action='store',
+ help='Path to CephFS shell script/batch file'
+ 'containing CephFS shell commands',
+ type=str)
+ main_parser.add_argument('-c', '--config', action='store',
+ help='Path to Ceph configuration file.',
+ type=str)
+ main_parser.add_argument('-f', '--fs', action='store',
+ help='Name of filesystem to mount.',
+ type=str)
+ main_parser.add_argument('-t', '--test', action='store',
+ help='Test against transcript(s) in FILE',
+ nargs='+')
+ main_parser.add_argument('commands', nargs='*', help='Comma delimited '
+ 'commands. The shell executes the given command '
+ 'and quits immediately with the return value of '
+ 'command. In case no commands are provided, the '
+ 'shell is launched.', default=[])
+
+ args = main_parser.parse_args()
+ args.exe_and_quit = False # Execute and quit, don't launch the shell.
+
+ if args.batch:
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
+ args.commands = ['load ' + args.batch, ',quit']
+ else:
+ args.commands = ['run_script ' + args.batch, ',quit']
+ if args.test:
+ args.commands.extend(['-t,'] + [arg + ',' for arg in args.test])
+ if not args.batch and len(args.commands) > 0:
+ args.exe_and_quit = True
+
+ manage_sys_argv(args)
+
+ return args
+
+
+def manage_sys_argv(args):
+ exe = sys.argv[0]
+ sys.argv.clear()
+ sys.argv.append(exe)
+ sys.argv.extend([i.strip() for i in ' '.join(args.commands).split(',')])
+
+ setup_cephfs(args)
+
+
+def execute_cmd_args(args):
+ """
+ Launch a shell session if no arguments were passed, else just execute
+ the given argument as a shell command and exit the shell session
+ immediately at (last) command's termination with the (last) command's
+ return value.
+ """
+ if not args.exe_and_quit:
+ return shell.cmdloop()
+ return execute_cmds_and_quit(args)
+
+
+def execute_cmds_and_quit(args):
+ """
+ Multiple commands might be passed separated by commas, feed onecmd()
+ one command at a time.
+ """
+ # do_* methods triggered by cephfs-shell commands return None when they
+ # complete running successfully. Until 0.9.6, shell.onecmd() returned this
+ # value to indicate whether the execution of the commands should stop, but
+ # since 0.9.7 it returns the return value of do_* methods only if it's
+ # not None. When it is None it returns False instead of None.
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.6"):
+ stop_exec_val = None
+ else:
+ stop_exec_val = False
+
+ args_to_onecmd = ''
+ if len(args.commands) <= 1:
+ args.commands = args.commands[0].split(' ')
+ for cmdarg in args.commands:
+ if ',' in cmdarg:
+ args_to_onecmd += ' ' + cmdarg[0:-1]
+ onecmd_retval = shell.onecmd(args_to_onecmd)
+ # if the current command failed, let's abort the execution of
+ # series of commands passed.
+ if onecmd_retval is not stop_exec_val:
+ return onecmd_retval
+ if shell.exit_code != 0:
+ return shell.exit_code
+
+ args_to_onecmd = ''
+ continue
+
+ args_to_onecmd += ' ' + cmdarg
+ return shell.onecmd(args_to_onecmd)
+
+
+if __name__ == '__main__':
+ args = manage_args()
+
+ shell = CephFSShell()
+ # TODO: perhaps, we should add an option to pass ceph.conf?
+ read_shell_conf(shell, get_shell_conffile_path(args.config))
+ # XXX: setting shell.exit_code to zero so that in case there are no errors
+ # and exceptions, it is not set by any method or function of cephfs-shell
+ # and return values from shell.cmdloop() or shell.onecmd() is not an
+ # integer, we can treat it as the return value of cephfs-shell.
+ shell.exit_code = 0
+
+ retval = execute_cmd_args(args)
+ sys.exit(retval if retval else shell.exit_code)