diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/volumes/fs/operations | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs/operations')
22 files changed, 3858 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/operations/__init__.py b/src/pybind/mgr/volumes/fs/operations/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/__init__.py diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py new file mode 100644 index 000000000..158e21c26 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/access.py @@ -0,0 +1,139 @@ +import errno +import json +from typing import List + +def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True): + caps_list = [] # type: List[str] + for k, v in existing_caps['caps'].items(): + if k == 'mds' or k == 'osd': + continue + elif k == 'mon': + if not authorize and v == 'allow r': + continue + caps_list.extend((k,v)) + + if mds_cap_str: + caps_list.extend(('mds', mds_cap_str)) + if osd_cap_str: + caps_list.extend(('osd', osd_cap_str)) + + if authorize and 'mon' not in caps_list: + caps_list.extend(('mon', 'allow r')) + + return caps_list + + +def allow_access(mgr, client_entity, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap, existing_caps): + if existing_caps is None: + ret, out, err = mgr.mon_command({ + "prefix": "auth get-or-create", + "entity": client_entity, + "caps": ['mds', want_mds_cap, 'osd', want_osd_cap, 'mon', 'allow r'], + "format": "json"}) + else: + cap = existing_caps[0] + + def cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, + want_osd_cap, unwanted_mds_cap, unwanted_osd_cap): + + if not orig_mds_caps: + return want_mds_cap, want_osd_cap + + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + if want_mds_cap in mds_cap_tokens: + return orig_mds_caps, orig_osd_caps + + if unwanted_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(unwanted_mds_cap) + osd_cap_tokens.remove(unwanted_osd_cap) + + mds_cap_tokens.append(want_mds_cap) + osd_cap_tokens.append(want_osd_cap) + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + + mds_cap_str, osd_cap_str = cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap) + + caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str) + mgr.mon_command( + { + "prefix": "auth caps", + 'entity': client_entity, + 'caps': caps_list + }) + ret, out, err = mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + + # Result expected like this: + # [ + # { + # "entity": "client.foobar", + # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", + # "caps": { + # "mds": "allow *", + # "mon": "allow *" + # } + # } + # ] + + caps = json.loads(out) + assert len(caps) == 1 + assert caps[0]['entity'] == client_entity + return caps[0]['key'] + +def deny_access(mgr, client_entity, want_mds_caps, want_osd_caps): + ret, out, err = mgr.mon_command({ + "prefix": "auth get", + "entity": client_entity, + "format": "json", + }) + + if ret == -errno.ENOENT: + # Already gone, great. + return + + def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps): + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps): + if want_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(want_mds_cap) + osd_cap_tokens.remove(want_osd_cap) + break + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + cap = json.loads(out)[0] + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps, + want_mds_caps, want_osd_caps) + + caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False) + if not caps_list: + mgr.mon_command( + { + 'prefix': 'auth rm', + 'entity': client_entity + }) + else: + mgr.mon_command( + { + "prefix": "auth caps", + 'entity': client_entity, + 'caps': caps_list + }) diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py new file mode 100644 index 000000000..a2b31f858 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py @@ -0,0 +1,98 @@ +import os +import uuid +import stat +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .index import Index +from ..exception import IndexException, VolumeException +from ..fs_util import list_one_entry_at_a_time + +log = logging.getLogger(__name__) + +class CloneIndex(Index): + SUB_GROUP_NAME = "clone" + PATH_MAX = 4096 + + @property + def path(self): + return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8')) + + def _track(self, sink_path): + tracking_id = str(uuid.uuid4()) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path)) + + self.fs.symlink(sink_path, source_path) + return tracking_id + + def track(self, sink_path): + try: + return self._track(sink_path) + except (VolumeException, cephfs.Error) as e: + if isinstance(e, cephfs.Error): + e = IndexException(-e.args[0], e.args[1]) + elif isinstance(e, VolumeException): + e = IndexException(e.errno, e.error_str) + raise e + + def untrack(self, tracking_id): + log.info("untracking {0}".format(tracking_id)) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + try: + self.fs.unlink(source_path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + + def get_oldest_clone_entry(self, exclude=[]): + min_ctime_entry = None + exclude_tracking_ids = [v[0] for v in exclude] + log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids)) + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode): + if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime: + min_ctime_entry = (dname, st) + if min_ctime_entry: + try: + linklen = min_ctime_entry[1].st_size + sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX) + return (min_ctime_entry[0], sink_path[:linklen]) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + return None + + def find_clone_entry_index(self, sink_path): + try: + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if stat.S_ISLNK(st.st_mode): + target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX) + if sink_path == target_path[:st.st_size]: + return dname + return None + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +def create_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.mkdirs(clone_index.path, 0o700) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +@contextmanager +def open_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.stat(clone_index.path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + yield clone_index diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py new file mode 100644 index 000000000..c91969278 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/group.py @@ -0,0 +1,301 @@ +import os +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .snapshot_util import mksnap, rmsnap +from .pin_util import pin +from .template import GroupTemplate +from ..fs_util import listdir, listsnaps, get_ancestor_xattr, create_base_dir, has_subdir +from ..exception import VolumeException + +log = logging.getLogger(__name__) + +class Group(GroupTemplate): + # Reserved subvolume group name which we use in paths for subvolumes + # that are not assigned to a group (i.e. created with group=None) + NO_GROUP_NAME = "_nogroup" + + def __init__(self, fs, vol_spec, groupname): + if groupname == Group.NO_GROUP_NAME: + raise VolumeException(-errno.EPERM, "Operation not permitted for group '{0}' as it is an internal group.".format(groupname)) + if groupname in vol_spec.INTERNAL_DIRS: + raise VolumeException(-errno.EINVAL, "'{0}' is an internal directory and not a valid group name.".format(groupname)) + self.fs = fs + self.user_id = None + self.group_id = None + self.vol_spec = vol_spec + self.groupname = groupname if groupname else Group.NO_GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + @property + def group_name(self): + return self.groupname + + @property + def uid(self): + return self.user_id + + @uid.setter + def uid(self, val): + self.user_id = val + + @property + def gid(self): + return self.group_id + + @gid.setter + def gid(self, val): + self.group_id = val + + def is_default_group(self): + return self.groupname == Group.NO_GROUP_NAME + + def list_subvolumes(self): + try: + return listdir(self.fs, self.path) + except VolumeException as ve: + # listing a default group when it's not yet created + if ve.errno == -errno.ENOENT and self.is_default_group(): + return [] + raise + + def has_subvolumes(self): + try: + return has_subdir(self.fs, self.path) + except VolumeException as ve: + # listing a default group when it's not yet created + if ve.errno == -errno.ENOENT and self.is_default_group(): + return False + raise + + def pin(self, pin_type, pin_setting): + return pin(self.fs, self.path, pin_type, pin_setting) + + def create_snapshot(self, snapname): + snappath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + mksnap(self.fs, snappath) + + def remove_snapshot(self, snapname): + snappath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + rmsnap(self.fs, snappath) + + def list_snapshots(self): + try: + dirpath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return [] + raise + + def info(self): + st = self.fs.statx(self.path, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE + | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE + | cephfs.CEPH_STATX_ATIME | cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME, + cephfs.AT_SYMLINK_NOFOLLOW) + usedbytes = st["size"] + try: + nsize = int(self.fs.getxattr(self.path, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + nsize = 0 + + try: + data_pool = self.fs.getxattr(self.path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + return {'uid': int(st["uid"]), + 'gid': int(st["gid"]), + 'atime': str(st["atime"]), + 'mtime': str(st["mtime"]), + 'ctime': str(st["ctime"]), + 'mode': int(st["mode"]), + 'data_pool': data_pool, + 'created_at': str(st["btime"]), + 'bytes_quota': "infinite" if nsize == 0 else nsize, + 'bytes_used': int(usedbytes), + 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)} + + def resize(self, newsize, noshrink): + try: + newsize = int(newsize) + if newsize <= 0: + raise VolumeException(-errno.EINVAL, "Invalid subvolume group size") + except ValueError: + newsize = newsize.lower() + if not (newsize == "inf" or newsize == "infinite"): + raise (VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize))) + newsize = 0 + noshrink = False + + try: + maxbytes = int(self.fs.getxattr(self.path, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + maxbytes = 0 + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + group_stat = self.fs.stat(self.path) + if newsize > 0 and newsize < group_stat.st_size: + if noshrink: + raise VolumeException(-errno.EINVAL, "Can't resize the subvolume group. The new size" + " '{0}' would be lesser than the current used size '{1}'" + .format(newsize, group_stat.st_size)) + + if not newsize == maxbytes: + try: + self.fs.setxattr(self.path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0) + except cephfs.Error as e: + raise (VolumeException(-e.args[0], + "Cannot set new size for the subvolume group. '{0}'".format(e.args[1]))) + return newsize, group_stat.st_size + +def set_group_attrs(fs, path, attrs): + # set subvolume group attrs + # set size + quota = attrs.get("quota") + if quota is not None: + try: + fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # set pool layout + pool = attrs.get("data_pool") + if not pool: + pool = get_ancestor_xattr(fs, path, "ceph.dir.layout.pool") + try: + fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, + "Invalid pool layout '{0}'. It must be a valid data pool".format(pool)) + + # set uid/gid + uid = attrs.get("uid") + if uid is None: + uid = 0 + else: + try: + uid = int(uid) + if uid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid UID") + + gid = attrs.get("gid") + if gid is None: + gid = 0 + else: + try: + gid = int(gid) + if gid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid GID") + fs.chown(path, uid, gid) + + # set mode + mode = attrs.get("mode", None) + if mode is not None: + fs.lchmod(path, mode) + +def create_group(fs, vol_spec, groupname, size, pool, mode, uid, gid): + """ + create a subvolume group. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :param size: In bytes, or None for no size limit + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + group = Group(fs, vol_spec, groupname) + path = group.path + vol_spec_base_dir = group.vol_spec.base_dir.encode('utf-8') + + # create vol_spec base directory with default mode(0o755) if it doesn't exist + create_base_dir(fs, vol_spec_base_dir, vol_spec.DEFAULT_MODE) + fs.mkdir(path, mode) + try: + attrs = { + 'uid': uid, + 'gid': gid, + 'data_pool': pool, + 'quota': size + } + set_group_attrs(fs, path, attrs) + except (cephfs.Error, VolumeException) as e: + try: + # cleanup group path on best effort basis + log.debug("cleaning up subvolume group path: {0}".format(path)) + fs.rmdir(path) + except cephfs.Error as ce: + log.debug("failed to clean up subvolume group {0} with path: {1} ({2})".format(groupname, path, ce)) + if isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + +def remove_group(fs, vol_spec, groupname): + """ + remove a subvolume group. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :return: None + """ + group = Group(fs, vol_spec, groupname) + try: + fs.rmdir(group.path) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname)) + raise VolumeException(-e.args[0], e.args[1]) + +@contextmanager +def open_group(fs, vol_spec, groupname): + """ + open a subvolume group. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :return: yields a group object (subclass of GroupTemplate) + """ + group = Group(fs, vol_spec, groupname) + try: + st = fs.stat(group.path) + group.uid = int(st.st_uid) + group.gid = int(st.st_gid) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + if not group.is_default_group(): + raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname)) + else: + raise VolumeException(-e.args[0], e.args[1]) + yield group + +@contextmanager +def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname): + if groupname == c_groupname: + yield c_group + else: + with open_group(fs, vol_spec, groupname) as group: + yield group diff --git a/src/pybind/mgr/volumes/fs/operations/index.py b/src/pybind/mgr/volumes/fs/operations/index.py new file mode 100644 index 000000000..0e4296d75 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/index.py @@ -0,0 +1,23 @@ +import errno +import os + +from ..exception import VolumeException +from .template import GroupTemplate + +class Index(GroupTemplate): + GROUP_NAME = "_index" + + def __init__(self, fs, vol_spec): + self.fs = fs + self.vol_spec = vol_spec + self.groupname = Index.GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + def track(self, *args): + raise VolumeException(-errno.EINVAL, "operation not supported.") + + def untrack(self, tracking_id): + raise VolumeException(-errno.EINVAL, "operation not supported.") diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py new file mode 100644 index 000000000..7ef6923e1 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/lock.py @@ -0,0 +1,43 @@ +from contextlib import contextmanager +import logging +from threading import Lock +from typing import Dict + +log = logging.getLogger(__name__) + +# singleton design pattern taken from http://www.aleax.it/5ep.html + +class GlobalLock(object): + """ + Global lock to serialize operations in mgr/volumes. This lock + is currently held when accessing (opening) a volume to perform + group/subvolume operations. Since this is a big lock, it's rather + inefficient -- but right now it's ok since mgr/volumes does not + expect concurrent operations via its APIs. + + As and when features get added (such as clone, where mgr/volumes + would maintain subvolume states in the filesystem), there might + be a need to allow concurrent operations. In that case it would + be nice to implement an efficient path based locking mechanism. + + See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf + """ + _shared_state = { + 'lock' : Lock(), + 'init' : False + } # type: Dict + + def __init__(self): + with self._shared_state['lock']: + if not self._shared_state['init']: + self._shared_state['init'] = True + # share this state among all instances + self.__dict__ = self._shared_state + + @contextmanager + def lock_op(self): + log.debug("entering global lock") + with self._shared_state['lock']: + log.debug("acquired global lock") + yield + log.debug("exited global lock") diff --git a/src/pybind/mgr/volumes/fs/operations/pin_util.py b/src/pybind/mgr/volumes/fs/operations/pin_util.py new file mode 100644 index 000000000..9ea79e546 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/pin_util.py @@ -0,0 +1,34 @@ +import os +import errno + +import cephfs + +from ..exception import VolumeException +from distutils.util import strtobool + +_pin_value = { + "export": lambda x: int(x), + "distributed": lambda x: int(strtobool(x)), + "random": lambda x: float(x), +} +_pin_xattr = { + "export": "ceph.dir.pin", + "distributed": "ceph.dir.pin.distributed", + "random": "ceph.dir.pin.random", +} + +def pin(fs, path, pin_type, pin_setting): + """ + Set a pin on a directory. + """ + assert pin_type in _pin_xattr + + try: + pin_setting = _pin_value[pin_type](pin_setting) + except ValueError as e: + raise VolumeException(-errno.EINVAL, f"pin value wrong type: {pin_setting}") + + try: + fs.setxattr(path, _pin_xattr[pin_type], str(pin_setting).encode('utf-8'), 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) diff --git a/src/pybind/mgr/volumes/fs/operations/rankevicter.py b/src/pybind/mgr/volumes/fs/operations/rankevicter.py new file mode 100644 index 000000000..5b945c389 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/rankevicter.py @@ -0,0 +1,114 @@ +import errno +import json +import logging +import threading +import time + +from .volume import get_mds_map +from ..exception import ClusterTimeout, ClusterError + +log = logging.getLogger(__name__) + +class RankEvicter(threading.Thread): + """ + Thread for evicting client(s) from a particular MDS daemon instance. + + This is more complex than simply sending a command, because we have to + handle cases where MDS daemons might not be fully up yet, and/or might + be transiently unresponsive to commands. + """ + class GidGone(Exception): + pass + + POLL_PERIOD = 5 + + def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout): + """ + :param client_spec: list of strings, used as filter arguments to "session evict" + pass ["id=123"] to evict a single client with session id 123. + """ + self.volname = volname + self.rank = rank + self.gid = gid + self._mds_map = mds_map + self._client_spec = client_spec + self._fs = fs + self._ready_timeout = ready_timeout + self._ready_waited = 0 + self.mgr = mgr + + self.success = False + self.exception = None + + super(RankEvicter, self).__init__() + + def _ready_to_evict(self): + if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: + log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( + self._client_spec, self.rank, self.gid + )) + raise RankEvicter.GidGone() + + info = self._mds_map['info']["gid_{0}".format(self.gid)] + log.debug("_ready_to_evict: state={0}".format(info['state'])) + return info['state'] in ["up:active", "up:clientreplay"] + + def _wait_for_ready(self): + """ + Wait for that MDS rank to reach an active or clientreplay state, and + not be laggy. + """ + while not self._ready_to_evict(): + if self._ready_waited > self._ready_timeout: + raise ClusterTimeout() + + time.sleep(self.POLL_PERIOD) + self._ready_waited += self.POLL_PERIOD + self._mds_map = get_mds_map(self.mgr, self.volname) + + def _evict(self): + """ + Run the eviction procedure. Return true on success, false on errors. + """ + + # Wait til the MDS is believed by the mon to be available for commands + try: + self._wait_for_ready() + except self.GidGone: + return True + + # Then send it an evict + ret = -errno.ETIMEDOUT + while ret == -errno.ETIMEDOUT: + log.debug("mds_command: {0}, {1}".format( + "%s" % self.gid, ["session", "evict"] + self._client_spec + )) + ret, outb, outs = self._fs.mds_command( + "%s" % self.gid, + json.dumps({ + "prefix": "session evict", + "filters": self._client_spec + }), "") + log.debug("mds_command: complete {0} {1}".format(ret, outs)) + + # If we get a clean response, great, it's gone from that rank. + if ret == 0: + return True + elif ret == -errno.ETIMEDOUT: + # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) + self._mds_map = get_mds_map(self.mgr, self.volname) + try: + self._wait_for_ready() + except self.GidGone: + return True + else: + raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) + + def run(self): + try: + self._evict() + except Exception as e: + self.success = False + self.exception = e + else: + self.success = True diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py new file mode 100644 index 000000000..a9543654e --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/resolver.py @@ -0,0 +1,26 @@ +import os + +from .group import Group + +def splitall(path): + if path == "/": + return ["/"] + s = os.path.split(path) + return splitall(s[0]) + [s[1]] + +def resolve(vol_spec, path): + parts = splitall(path) + if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix: + return None + groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2] + subvolname = parts[3] + return (groupname, subvolname) + +def resolve_trash(vol_spec, path): + parts = splitall(path) + if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \ + parts[4] != '.trash': + return None + groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2] + subvolname = parts[3] + return (groupname, subvolname) diff --git a/src/pybind/mgr/volumes/fs/operations/snapshot_util.py b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py new file mode 100644 index 000000000..2223c58e5 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py @@ -0,0 +1,30 @@ +import os +import errno + +import cephfs + +from ..exception import VolumeException + +def mksnap(fs, snappath): + """ + Create a snapshot, or do nothing if it already exists. + """ + try: + # snap create does not accept mode -- use default + fs.mkdir(snappath, 0o755) + except cephfs.ObjectExists: + return + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def rmsnap(fs, snappath): + """ + Remove a snapshot + """ + try: + fs.stat(snappath) + fs.rmdir(snappath) + except cephfs.ObjectNotFound: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(os.path.basename(snappath))) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) diff --git a/src/pybind/mgr/volumes/fs/operations/subvolume.py b/src/pybind/mgr/volumes/fs/operations/subvolume.py new file mode 100644 index 000000000..dc36477b5 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/subvolume.py @@ -0,0 +1,74 @@ +import os +import errno +from contextlib import contextmanager + +from ..exception import VolumeException +from .template import SubvolumeOpType + +from .versions import loaded_subvolumes + +def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid): + """ + create a subvolume (create a subvolume with the max known version). + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param size: In bytes, or None for no size limit + :param isolate_nspace: If true, use separate RADOS namespace for this subvolume + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname) + subvolume.create(size, isolate_nspace, pool, mode, uid, gid) + +def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname): + """ + create a cloned subvolume. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the clone + :param subvolname: clone subvolume nam + :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored + :param source_volume: source subvolumes volume name + :param source_subvolume: source (parent) subvolume object + :param snapname: source subvolume snapshot + :return None + """ + subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname) + subvolume.create_clone(pool, source_volume, source_subvolume, snapname) + +def remove_subvol(mgr, fs, vol_spec, group, subvolname, force=False, retainsnaps=False): + """ + remove a subvolume. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param subvolname: subvolume name + :param force: force remove subvolumes + :return: None + """ + op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE + with open_subvol(mgr, fs, vol_spec, group, subvolname, op_type) as subvolume: + subvolume.remove(retainsnaps) + +@contextmanager +def open_subvol(mgr, fs, vol_spec, group, subvolname, op_type): + """ + open a subvolume. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param subvolname: subvolume name + :param op_type: operation type for which subvolume is being opened + :return: yields a subvolume object (subclass of SubvolumeTemplate) + """ + subvolume = loaded_subvolumes.get_subvolume_object(mgr, fs, vol_spec, group, subvolname) + subvolume.open(op_type) + yield subvolume diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py new file mode 100644 index 000000000..eb55bd743 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/template.py @@ -0,0 +1,191 @@ +import errno + +from enum import Enum, unique + +from ..exception import VolumeException + +class GroupTemplate(object): + def list_subvolumes(self): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_snapshot(self, snapname): + """ + create a subvolume group snapshot. + + :param: group snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove_snapshot(self, snapname): + """ + remove a subvolume group snapshot. + + :param: group snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def list_snapshots(self): + """ + list all subvolume group snapshots. + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + +@unique +class SubvolumeOpType(Enum): + CREATE = 'create' + REMOVE = 'rm' + REMOVE_FORCE = 'rm-force' + PIN = 'pin' + LIST = 'ls' + GETPATH = 'getpath' + INFO = 'info' + RESIZE = 'resize' + SNAP_CREATE = 'snap-create' + SNAP_REMOVE = 'snap-rm' + SNAP_LIST = 'snap-ls' + SNAP_INFO = 'snap-info' + SNAP_PROTECT = 'snap-protect' + SNAP_UNPROTECT = 'snap-unprotect' + CLONE_SOURCE = 'clone-source' + CLONE_CREATE = 'clone-create' + CLONE_STATUS = 'clone-status' + CLONE_CANCEL = 'clone-cancel' + CLONE_INTERNAL = 'clone_internal' + ALLOW_ACCESS = 'allow-access' + DENY_ACCESS = 'deny-access' + AUTH_LIST = 'auth-list' + EVICT = 'evict' + USER_METADATA_SET = 'user-metadata-set' + USER_METADATA_GET = 'user-metadata-get' + USER_METADATA_LIST = 'user-metadata-ls' + USER_METADATA_REMOVE = 'user-metadata-rm' + SNAP_METADATA_SET = 'snap-metadata-set' + SNAP_METADATA_GET = 'snap-metadata-get' + SNAP_METADATA_LIST = 'snap-metadata-ls' + SNAP_METADATA_REMOVE = 'snap-metadata-rm' + +class SubvolumeTemplate(object): + VERSION = None # type: int + + @staticmethod + def version(): + return SubvolumeTemplate.VERSION + + def open(self, op_type): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def status(self): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + """ + set up metadata, pools and auth for a subvolume. + + This function is idempotent. It is safe to call this again + for an already-created subvolume, even if it is in use. + + :param size: In bytes, or None for no size limit + :param isolate_nspace: If true, use separate RADOS namespace for this subvolume + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + """ + prepare a subvolume to be cloned. + + :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored + :param source_volname: source volume of snapshot + :param source_subvolume: source subvolume of snapshot + :param snapname: snapshot name to be cloned from + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove(self): + """ + make a subvolume inaccessible to guests. + + This function is idempotent. It is safe to call this again + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def resize(self, newsize, nshrink): + """ + resize a subvolume + + :param newsize: new size In bytes (or inf/infinite) + :return: new quota size and used bytes as a tuple + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def pin(self, pin_type, pin_setting): + """ + pin a subvolume + + :param pin_type: type of pin + :param pin_setting: setting for pin + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_snapshot(self, snapname): + """ + snapshot a subvolume. + + :param: subvolume snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove_snapshot(self, snapname): + """ + remove a subvolume snapshot. + + :param: subvolume snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def list_snapshots(self): + """ + list all subvolume snapshots. + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def attach_snapshot(self, snapname, tgt_subvolume): + """ + attach a snapshot to a target cloned subvolume. the target subvolume + should be an empty subvolume (type "clone") in "pending" state. + + :param: snapname: snapshot to attach to a clone + :param: tgt_subvolume: target clone subvolume + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def detach_snapshot(self, snapname, tgt_subvolume): + """ + detach a snapshot from a target cloned subvolume. the target subvolume + should either be in "failed" or "completed" state. + + :param: snapname: snapshot to detach from a clone + :param: tgt_subvolume: target clone subvolume + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") diff --git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py new file mode 100644 index 000000000..66f1d71cf --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/trash.py @@ -0,0 +1,145 @@ +import os +import uuid +import logging +from contextlib import contextmanager + +import cephfs + +from .template import GroupTemplate +from ..fs_util import listdir +from ..exception import VolumeException + +log = logging.getLogger(__name__) + +class Trash(GroupTemplate): + GROUP_NAME = "_deleting" + + def __init__(self, fs, vol_spec): + self.fs = fs + self.vol_spec = vol_spec + self.groupname = Trash.GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + @property + def unique_trash_path(self): + """ + return a unique trash directory entry path + """ + return os.path.join(self.path, str(uuid.uuid4()).encode('utf-8')) + + def _get_single_dir_entry(self, exclude_list=[]): + exclude_list.extend((b".", b"..")) + try: + with self.fs.opendir(self.path) as d: + entry = self.fs.readdir(d) + while entry: + if entry.d_name not in exclude_list: + return entry.d_name + entry = self.fs.readdir(d) + return None + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def get_trash_entry(self, exclude_list): + """ + get a trash entry excluding entries provided. + + :praram exclude_list: entries to exclude + :return: trash entry + """ + return self._get_single_dir_entry(exclude_list) + + def purge(self, trashpath, should_cancel): + """ + purge a trash entry. + + :praram trash_entry: the trash entry to purge + :praram should_cancel: callback to check if the purge should be aborted + :return: None + """ + def rmtree(root_path): + log.debug("rmtree {0}".format(root_path)) + try: + with self.fs.opendir(root_path) as dir_handle: + d = self.fs.readdir(dir_handle) + while d and not should_cancel(): + if d.d_name not in (b".", b".."): + d_full = os.path.join(root_path, d.d_name) + if d.is_dir(): + rmtree(d_full) + else: + self.fs.unlink(d_full) + d = self.fs.readdir(dir_handle) + except cephfs.ObjectNotFound: + return + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + # remove the directory only if we were not asked to cancel + # (else we would fail to remove this anyway) + if not should_cancel(): + self.fs.rmdir(root_path) + + # catch any unlink errors + try: + rmtree(trashpath) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def dump(self, path): + """ + move an filesystem entity to trash can. + + :praram path: the filesystem path to be moved + :return: None + """ + try: + self.fs.rename(path, self.unique_trash_path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def link(self, path, bname): + pth = os.path.join(self.path, bname) + try: + self.fs.symlink(path, pth) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def delink(self, bname): + pth = os.path.join(self.path, bname) + try: + self.fs.unlink(pth) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def create_trashcan(fs, vol_spec): + """ + create a trash can. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :return: None + """ + trashcan = Trash(fs, vol_spec) + try: + fs.mkdirs(trashcan.path, 0o700) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +@contextmanager +def open_trashcan(fs, vol_spec): + """ + open a trash can. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :return: yields a trash can object (subclass of GroupTemplate) + """ + trashcan = Trash(fs, vol_spec) + try: + fs.stat(trashcan.path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + yield trashcan diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py new file mode 100644 index 000000000..544afa165 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py @@ -0,0 +1,112 @@ +import errno +import logging +import importlib + +import cephfs + +from .subvolume_base import SubvolumeBase +from .subvolume_attrs import SubvolumeTypes +from .subvolume_v1 import SubvolumeV1 +from .subvolume_v2 import SubvolumeV2 +from .metadata_manager import MetadataManager +from .op_sm import SubvolumeOpSm +from ..template import SubvolumeOpType +from ...exception import MetadataMgrException, OpSmException, VolumeException + +log = logging.getLogger(__name__) + +class SubvolumeLoader(object): + INVALID_VERSION = -1 + + SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2'] + + def __init__(self): + self.max_version = SubvolumeLoader.INVALID_VERSION + self.versions = {} + + def _load_module(self, mod_cls): + mod_name, cls_name = mod_cls.split('.') + mod = importlib.import_module('.versions.{0}'.format(mod_name), package='volumes.fs.operations') + return getattr(mod, cls_name) + + def _load_supported_versions(self): + for mod_cls in SubvolumeLoader.SUPPORTED_MODULES: + cls = self._load_module(mod_cls) + log.info("loaded v{0} subvolume".format(cls.version())) + if self.max_version is not None or cls.version() > self.max_version: + self.max_version = cls.version() + self.versions[cls.version()] = cls + if self.max_version == SubvolumeLoader.INVALID_VERSION: + raise VolumeException(-errno.EINVAL, "no subvolume version available") + log.info("max subvolume version is v{0}".format(self.max_version)) + + def _get_subvolume_version(self, version): + try: + return self.versions[version] + except KeyError: + raise VolumeException(-errno.EINVAL, "subvolume class v{0} does not exist".format(version)) + + def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname): + return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname) + + def upgrade_to_v2_subvolume(self, subvolume): + # legacy mode subvolumes cannot be upgraded to v2 + if subvolume.legacy_mode: + return + + version = int(subvolume.metadata_mgr.get_global_option('version')) + if version >= SubvolumeV2.version(): + return + + v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname) + try: + v1_subvolume.open(SubvolumeOpType.SNAP_LIST) + except VolumeException as ve: + # if volume is not ready for snapshot listing, do not upgrade at present + if ve.errno == -errno.EAGAIN: + return + raise + + # v1 subvolumes with snapshots cannot be upgraded to v2 + if v1_subvolume.list_snapshots(): + return + + subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version()) + subvolume.metadata_mgr.flush() + + def upgrade_legacy_subvolume(self, fs, subvolume): + assert subvolume.legacy_mode + try: + fs.mkdirs(subvolume.legacy_dir, 0o700) + except cephfs.Error as e: + raise VolumeException(-e.args[0], "error accessing subvolume") + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + qpath = subvolume.base_path.decode('utf-8') + # legacy is only upgradable to v1 + subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state) + + def get_subvolume_object(self, mgr, fs, vol_spec, group, subvolname, upgrade=True): + subvolume = SubvolumeBase(mgr, fs, vol_spec, group, subvolname) + try: + subvolume.discover() + self.upgrade_to_v2_subvolume(subvolume) + version = int(subvolume.metadata_mgr.get_global_option('version')) + subvolume_version_object = self._get_subvolume_version(version)(mgr, fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode) + subvolume_version_object.metadata_mgr.refresh() + subvolume_version_object.clean_stale_snapshot_metadata() + return subvolume_version_object + except MetadataMgrException as me: + if me.errno == -errno.ENOENT and upgrade: + self.upgrade_legacy_subvolume(fs, subvolume) + return self.get_subvolume_object(mgr, fs, vol_spec, group, subvolname, upgrade=False) + else: + # log the actual error and generalize error string returned to user + log.error("error accessing subvolume metadata for '{0}' ({1})".format(subvolname, me)) + raise VolumeException(-errno.EINVAL, "error accessing subvolume metadata") + +loaded_subvolumes = SubvolumeLoader() +loaded_subvolumes._load_supported_versions() diff --git a/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py new file mode 100644 index 000000000..259dcd0e0 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py @@ -0,0 +1,208 @@ +from contextlib import contextmanager +import os +import fcntl +import json +import logging +import struct +import uuid + +import cephfs + +from ..group import Group + +log = logging.getLogger(__name__) + +class AuthMetadataError(Exception): + pass + +class AuthMetadataManager(object): + + # Current version + version = 6 + + # Filename extensions for meta files. + META_FILE_EXT = ".meta" + DEFAULT_VOL_PREFIX = "/volumes" + + def __init__(self, fs): + self.fs = fs + self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0] + self.volume_prefix = self.DEFAULT_VOL_PREFIX + + def _to_bytes(self, param): + ''' + Helper method that returns byte representation of the given parameter. + ''' + if isinstance(param, str): + return param.encode('utf-8') + elif param is None: + return param + else: + return str(param).encode('utf-8') + + def _subvolume_metadata_path(self, group_name, subvol_name): + return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( + group_name if group_name != Group.NO_GROUP_NAME else "", + subvol_name, + self.META_FILE_EXT)) + + def _check_compat_version(self, compat_version): + if self.version < compat_version: + msg = ("The current version of AuthMetadataManager, version {0} " + "does not support the required feature. Need version {1} " + "or greater".format(self.version, compat_version) + ) + log.error(msg) + raise AuthMetadataError(msg) + + def _metadata_get(self, path): + """ + Return a deserialized JSON object, or None + """ + fd = self.fs.open(path, "r") + # TODO iterate instead of assuming file < 4MB + read_bytes = self.fs.read(fd, 0, 4096 * 1024) + self.fs.close(fd) + if read_bytes: + return json.loads(read_bytes.decode()) + else: + return None + + def _metadata_set(self, path, data): + serialized = json.dumps(data) + fd = self.fs.open(path, "w") + try: + self.fs.write(fd, self._to_bytes(serialized), 0) + self.fs.fsync(fd, 0) + finally: + self.fs.close(fd) + + def _lock(self, path): + @contextmanager + def fn(): + while(1): + fd = self.fs.open(path, os.O_CREAT, 0o755) + self.fs.flock(fd, fcntl.LOCK_EX, self._id) + + # The locked file will be cleaned up sometime. It could be + # unlinked by consumer e.g., an another manila-share service + # instance, before lock was applied on it. Perform checks to + # ensure that this does not happen. + try: + statbuf = self.fs.stat(path) + except cephfs.ObjectNotFound: + self.fs.close(fd) + continue + + fstatbuf = self.fs.fstat(fd) + if statbuf.st_ino == fstatbuf.st_ino: + break + + try: + yield + finally: + self.fs.flock(fd, fcntl.LOCK_UN, self._id) + self.fs.close(fd) + + return fn() + + def _auth_metadata_path(self, auth_id): + return os.path.join(self.volume_prefix, "${0}{1}".format( + auth_id, self.META_FILE_EXT)) + + def auth_lock(self, auth_id): + return self._lock(self._auth_metadata_path(auth_id)) + + def auth_metadata_get(self, auth_id): + """ + Call me with the metadata locked! + + Check whether a auth metadata structure can be decoded by the current + version of AuthMetadataManager. + + Return auth metadata that the current version of AuthMetadataManager + can decode. + """ + auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) + + if auth_metadata: + self._check_compat_version(auth_metadata['compat_version']) + + return auth_metadata + + def auth_metadata_set(self, auth_id, data): + """ + Call me with the metadata locked! + + Fsync the auth metadata. + + Add two version attributes to the auth metadata, + 'compat_version', the minimum AuthMetadataManager version that can + decode the metadata, and 'version', the AuthMetadataManager version + that encoded the metadata. + """ + data['compat_version'] = 6 + data['version'] = self.version + return self._metadata_set(self._auth_metadata_path(auth_id), data) + + def create_subvolume_metadata_file(self, group_name, subvol_name): + """ + Create a subvolume metadata file, if it does not already exist, to store + data about auth ids having access to the subvolume + """ + fd = self.fs.open(self._subvolume_metadata_path(group_name, subvol_name), + os.O_CREAT, 0o755) + self.fs.close(fd) + + def delete_subvolume_metadata_file(self, group_name, subvol_name): + vol_meta_path = self._subvolume_metadata_path(group_name, subvol_name) + try: + self.fs.unlink(vol_meta_path) + except cephfs.ObjectNotFound: + pass + + def subvol_metadata_lock(self, group_name, subvol_name): + """ + Return a ContextManager which locks the authorization metadata for + a particular subvolume, and persists a flag to the metadata indicating + that it is currently locked, so that we can detect dirty situations + during recovery. + + This lock isn't just to make access to the metadata safe: it's also + designed to be used over the two-step process of checking the + metadata and then responding to an authorization request, to + ensure that at the point we respond the metadata hasn't changed + in the background. It's key to how we avoid security holes + resulting from races during that problem , + """ + return self._lock(self._subvolume_metadata_path(group_name, subvol_name)) + + def subvol_metadata_get(self, group_name, subvol_name): + """ + Call me with the metadata locked! + + Check whether a subvolume metadata structure can be decoded by the current + version of AuthMetadataManager. + + Return a subvolume_metadata structure that the current version of + AuthMetadataManager can decode. + """ + subvolume_metadata = self._metadata_get(self._subvolume_metadata_path(group_name, subvol_name)) + + if subvolume_metadata: + self._check_compat_version(subvolume_metadata['compat_version']) + + return subvolume_metadata + + def subvol_metadata_set(self, group_name, subvol_name, data): + """ + Call me with the metadata locked! + + Add two version attributes to the subvolume metadata, + 'compat_version', the minimum AuthMetadataManager version that can + decode the metadata and 'version', the AuthMetadataManager version + that encoded the metadata. + """ + data['compat_version'] = 1 + data['version'] = self.version + return self._metadata_set(self._subvolume_metadata_path(group_name, subvol_name), data) diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py new file mode 100644 index 000000000..718735d91 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py @@ -0,0 +1,200 @@ +import os +import errno +import logging +import sys +import threading +import configparser +import re + +import cephfs + +from ...exception import MetadataMgrException + +log = logging.getLogger(__name__) + +# _lock needs to be shared across all instances of MetadataManager. +# that is why we have a file level instance +_lock = threading.Lock() + + +def _conf_reader(fs, fd, offset=0, length=4096): + while True: + buf = fs.read(fd, offset, length) + offset += len(buf) + if not buf: + return + yield buf.decode('utf-8') + + +class _ConfigWriter: + def __init__(self, fs, fd): + self._fs = fs + self._fd = fd + self._wrote = 0 + + def write(self, value): + buf = value.encode('utf-8') + wrote = self._fs.write(self._fd, buf, -1) + self._wrote += wrote + return wrote + + def fsync(self): + self._fs.fsync(self._fd, 0) + + @property + def wrote(self): + return self._wrote + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + self._fs.close(self._fd) + + +class MetadataManager(object): + GLOBAL_SECTION = "GLOBAL" + USER_METADATA_SECTION = "USER_METADATA" + GLOBAL_META_KEY_VERSION = "version" + GLOBAL_META_KEY_TYPE = "type" + GLOBAL_META_KEY_PATH = "path" + GLOBAL_META_KEY_STATE = "state" + + CLONE_FAILURE_SECTION = "CLONE_FAILURE" + CLONE_FAILURE_META_KEY_ERRNO = "errno" + CLONE_FAILURE_META_KEY_ERROR_MSG = "error_msg" + + def __init__(self, fs, config_path, mode): + self.fs = fs + self.mode = mode + self.config_path = config_path + self.config = configparser.ConfigParser() + + def refresh(self): + fd = None + try: + log.debug("opening config {0}".format(self.config_path)) + with _lock: + fd = self.fs.open(self.config_path, os.O_RDONLY) + cfg = ''.join(_conf_reader(self.fs, fd)) + self.config.read_string(cfg, source=self.config_path) + except UnicodeDecodeError: + raise MetadataMgrException(-errno.EINVAL, + "failed to decode, erroneous metadata config '{0}'".format(self.config_path)) + except cephfs.ObjectNotFound: + raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path)) + except cephfs.Error as e: + raise MetadataMgrException(-e.args[0], e.args[1]) + except configparser.Error: + raise MetadataMgrException(-errno.EINVAL, "failed to parse, erroneous metadata config " + "'{0}'".format(self.config_path)) + finally: + if fd is not None: + self.fs.close(fd) + + def flush(self): + # cull empty sections + for section in list(self.config.sections()): + if len(self.config.items(section)) == 0: + self.config.remove_section(section) + + try: + with _lock: + tmp_config_path = self.config_path + b'.tmp' + fd = self.fs.open(tmp_config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode) + with _ConfigWriter(self.fs, fd) as cfg_writer: + self.config.write(cfg_writer) + cfg_writer.fsync() + self.fs.rename(tmp_config_path, self.config_path) + log.info(f"wrote {cfg_writer.wrote} bytes to config {tmp_config_path}") + log.info(f"Renamed {tmp_config_path} to config {self.config_path}") + except cephfs.Error as e: + raise MetadataMgrException(-e.args[0], e.args[1]) + + def init(self, version, typ, path, state): + # you may init just once before refresh (helps to overwrite conf) + if self.config.has_section(MetadataManager.GLOBAL_SECTION): + raise MetadataMgrException(-errno.EINVAL, "init called on an existing config") + + self.add_section(MetadataManager.GLOBAL_SECTION) + self.update_section_multi( + MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version), + MetadataManager.GLOBAL_META_KEY_TYPE : str(typ), + MetadataManager.GLOBAL_META_KEY_PATH : str(path), + MetadataManager.GLOBAL_META_KEY_STATE : str(state) + }) + + def add_section(self, section): + try: + self.config.add_section(section) + except configparser.DuplicateSectionError: + return + except: + raise MetadataMgrException(-errno.EINVAL, "error adding section to config") + + def remove_option(self, section, key): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + return self.config.remove_option(section, key) + + def remove_section(self, section): + self.config.remove_section(section) + + def update_section(self, section, key, value): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + self.config.set(section, key, str(value)) + + def update_section_multi(self, section, dct): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + for key,value in dct.items(): + self.config.set(section, key, str(value)) + + def update_global_section(self, key, value): + self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value)) + + def get_option(self, section, key): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + if not self.config.has_option(section, key): + raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section)) + return self.config.get(section, key) + + def get_global_option(self, key): + return self.get_option(MetadataManager.GLOBAL_SECTION, key) + + def list_all_options_from_section(self, section): + metadata_dict = {} + if self.config.has_section(section): + options = self.config.options(section) + for option in options: + metadata_dict[option] = self.config.get(section,option) + return metadata_dict + + def list_all_keys_with_specified_values_from_section(self, section, value): + keys = [] + if self.config.has_section(section): + options = self.config.options(section) + for option in options: + if (value == self.config.get(section, option)) : + keys.append(option) + return keys + + def section_has_item(self, section, item): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + return item in [v[1] for v in self.config.items(section)] + + def has_snap_metadata_section(self): + sections = self.config.sections() + r = re.compile('SNAP_METADATA_.*') + for section in sections: + if r.match(section): + return True + return False + + def list_snaps_with_metadata(self): + sections = self.config.sections() + r = re.compile('SNAP_METADATA_.*') + return [section[len("SNAP_METADATA_"):] for section in sections if r.match(section)] diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py new file mode 100644 index 000000000..1142600cb --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py @@ -0,0 +1,114 @@ +import errno + +from typing import Dict + +from ...exception import OpSmException +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions + +class TransitionKey(object): + def __init__(self, subvol_type, state, action_type): + self.transition_key = [subvol_type, state, action_type] + + def __hash__(self): + return hash(tuple(self.transition_key)) + + def __eq__(self, other): + return self.transition_key == other.transition_key + + def __neq__(self, other): + return not(self == other) + +class SubvolumeOpSm(object): + transition_table = {} # type: Dict + + @staticmethod + def is_complete_state(state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_COMPLETE + + @staticmethod + def is_failed_state(state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED + + @staticmethod + def is_init_state(stm_type, state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeOpSm.get_init_state(stm_type) + + @staticmethod + def get_init_state(stm_type): + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE)] + if not init_state: + raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type)) + return init_state + + @staticmethod + def transition(stm_type, current_state, action): + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + if not isinstance(current_state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state)) + if not isinstance(action, SubvolumeActions): + raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action)) + + transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)] + if not transition: + raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type)) + + return transition + +SubvolumeOpSm.transition_table = { + # state transitions for state machine type TYPE_NORMAL + TransitionKey(SubvolumeTypes.TYPE_NORMAL, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE, + + TransitionKey(SubvolumeTypes.TYPE_NORMAL, + SubvolumeStates.STATE_COMPLETE, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + # state transitions for state machine type TYPE_CLONE + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_COMPLETE, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_CANCELED, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_FAILED, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, +} diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py new file mode 100644 index 000000000..ec7138cbd --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py @@ -0,0 +1,61 @@ +import errno +from enum import Enum, unique + +from ...exception import VolumeException + +@unique +class SubvolumeTypes(Enum): + TYPE_NORMAL = "subvolume" + TYPE_CLONE = "clone" + + @staticmethod + def from_value(value): + if value == "subvolume": + return SubvolumeTypes.TYPE_NORMAL + if value == "clone": + return SubvolumeTypes.TYPE_CLONE + + raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value)) + +@unique +class SubvolumeStates(Enum): + STATE_INIT = 'init' + STATE_PENDING = 'pending' + STATE_INPROGRESS = 'in-progress' + STATE_FAILED = 'failed' + STATE_COMPLETE = 'complete' + STATE_CANCELED = 'canceled' + STATE_RETAINED = 'snapshot-retained' + + @staticmethod + def from_value(value): + if value == "init": + return SubvolumeStates.STATE_INIT + if value == "pending": + return SubvolumeStates.STATE_PENDING + if value == "in-progress": + return SubvolumeStates.STATE_INPROGRESS + if value == "failed": + return SubvolumeStates.STATE_FAILED + if value == "complete": + return SubvolumeStates.STATE_COMPLETE + if value == "canceled": + return SubvolumeStates.STATE_CANCELED + if value == "snapshot-retained": + return SubvolumeStates.STATE_RETAINED + + raise VolumeException(-errno.EINVAL, "invalid state '{0}'".format(value)) + +@unique +class SubvolumeActions(Enum): + ACTION_NONE = 0 + ACTION_SUCCESS = 1 + ACTION_FAILED = 2 + ACTION_CANCELLED = 3 + ACTION_RETAINED = 4 + +@unique +class SubvolumeFeatures(Enum): + FEATURE_SNAPSHOT_CLONE = "snapshot-clone" + FEATURE_SNAPSHOT_RETENTION = "snapshot-retention" + FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect" diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py new file mode 100644 index 000000000..72fc45a42 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py @@ -0,0 +1,449 @@ +import os +import stat +import uuid +import errno +import logging +import hashlib +from typing import Dict, Union +from pathlib import Path + +import cephfs + +from ..pin_util import pin +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates +from .metadata_manager import MetadataManager +from ..trash import create_trashcan, open_trashcan +from ...fs_util import get_ancestor_xattr +from ...exception import MetadataMgrException, VolumeException +from .op_sm import SubvolumeOpSm +from .auth_metadata import AuthMetadataManager +from .subvolume_attrs import SubvolumeStates + +log = logging.getLogger(__name__) + +class SubvolumeBase(object): + LEGACY_CONF_DIR = "_legacy" + + def __init__(self, mgr, fs, vol_spec, group, subvolname, legacy=False): + self.mgr = mgr + self.fs = fs + self.auth_mdata_mgr = AuthMetadataManager(fs) + self.cmode = None + self.user_id = None + self.group_id = None + self.vol_spec = vol_spec + self.group = group + self.subvolname = subvolname + self.legacy_mode = legacy + self.load_config() + + @property + def uid(self): + return self.user_id + + @uid.setter + def uid(self, val): + self.user_id = val + + @property + def gid(self): + return self.group_id + + @gid.setter + def gid(self, val): + self.group_id = val + + @property + def mode(self): + return self.cmode + + @mode.setter + def mode(self, val): + self.cmode = val + + @property + def base_path(self): + return os.path.join(self.group.path, self.subvolname.encode('utf-8')) + + @property + def config_path(self): + return os.path.join(self.base_path, b".meta") + + @property + def legacy_dir(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), SubvolumeBase.LEGACY_CONF_DIR.encode('utf-8')) + + @property + def legacy_config_path(self): + try: + m = hashlib.md5(self.base_path) + except ValueError: + try: + m = hashlib.md5(self.base_path, usedforsecurity=False) # type: ignore + except TypeError: + raise VolumeException(-errno.EINVAL, + "require python's hashlib library to support usedforsecurity flag in FIPS enabled systems") + + meta_config = "{0}.meta".format(m.hexdigest()) + return os.path.join(self.legacy_dir, meta_config.encode('utf-8')) + + @property + def namespace(self): + return "{0}{1}".format(self.vol_spec.fs_namespace, self.subvolname) + + @property + def group_name(self): + return self.group.group_name + + @property + def subvol_name(self): + return self.subvolname + + @property + def legacy_mode(self): + return self.legacy + + @legacy_mode.setter + def legacy_mode(self, mode): + self.legacy = mode + + @property + def path(self): + """ Path to subvolume data directory """ + raise NotImplementedError + + @property + def features(self): + """ List of features supported by the subvolume, containing items from SubvolumeFeatures """ + raise NotImplementedError + + @property + def state(self): + """ Subvolume state, one of SubvolumeStates """ + return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) + + @property + def subvol_type(self): + return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)) + + @property + def purgeable(self): + """ Boolean declaring if subvolume can be purged """ + raise NotImplementedError + + def clean_stale_snapshot_metadata(self): + """ Clean up stale snapshot metadata """ + raise NotImplementedError + + def load_config(self): + try: + self.fs.stat(self.legacy_config_path) + self.legacy_mode = True + except cephfs.Error as e: + pass + + log.debug("loading config " + "'{0}' [mode: {1}]".format(self.subvolname, "legacy" + if self.legacy_mode else "new")) + if self.legacy_mode: + self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640) + else: + self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640) + + def get_attrs(self, pathname): + # get subvolume attributes + attrs = {} # type: Dict[str, Union[int, str, None]] + stx = self.fs.statx(pathname, + cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE, + cephfs.AT_SYMLINK_NOFOLLOW) + + attrs["uid"] = int(stx["uid"]) + attrs["gid"] = int(stx["gid"]) + attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"])) + + try: + attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.NoData: + attrs["data_pool"] = None + + try: + attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + attrs["pool_namespace"] = None + + try: + attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + attrs["quota"] = None + + return attrs + + def set_attrs(self, path, attrs): + # set subvolume attributes + # set size + quota = attrs.get("quota") + if quota is not None: + try: + self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # set pool layout + data_pool = attrs.get("data_pool") + if data_pool is not None: + try: + self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, + "invalid pool layout '{0}' -- need a valid data pool".format(data_pool)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # isolate namespace + xattr_key = xattr_val = None + pool_namespace = attrs.get("pool_namespace") + if pool_namespace is not None: + # enforce security isolation, use separate namespace for this subvolume + xattr_key = 'ceph.dir.layout.pool_namespace' + xattr_val = pool_namespace + elif not data_pool: + # If subvolume's namespace layout is not set, then the subvolume's pool + # layout remains unset and will undesirably change with ancestor's + # pool layout changes. + xattr_key = 'ceph.dir.layout.pool' + xattr_val = None + try: + self.fs.getxattr(path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.NoData as e: + xattr_val = get_ancestor_xattr(self.fs, os.path.split(path)[0], "ceph.dir.layout.pool") + if xattr_key and xattr_val: + try: + self.fs.setxattr(path, xattr_key, xattr_val.encode('utf-8'), 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # set uid/gid + uid = attrs.get("uid") + if uid is None: + uid = self.group.uid + else: + try: + if uid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid UID") + + gid = attrs.get("gid") + if gid is None: + gid = self.group.gid + else: + try: + if gid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid GID") + + if uid is not None and gid is not None: + self.fs.chown(path, uid, gid) + + # set mode + mode = attrs.get("mode", None) + if mode is not None: + self.fs.lchmod(path, mode) + + def _resize(self, path, newsize, noshrink): + try: + newsize = int(newsize) + if newsize <= 0: + raise VolumeException(-errno.EINVAL, "Invalid subvolume size") + except ValueError: + newsize = newsize.lower() + if not (newsize == "inf" or newsize == "infinite"): + raise VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize)) + newsize = 0 + noshrink = False + + try: + maxbytes = int(self.fs.getxattr(path, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + maxbytes = 0 + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + subvolstat = self.fs.stat(path) + if newsize > 0 and newsize < subvolstat.st_size: + if noshrink: + raise VolumeException(-errno.EINVAL, "Can't resize the subvolume. The new size '{0}' would be lesser than the current " + "used size '{1}'".format(newsize, subvolstat.st_size)) + + if not newsize == maxbytes: + try: + self.fs.setxattr(path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], "Cannot set new size for the subvolume. '{0}'".format(e.args[1])) + return newsize, subvolstat.st_size + + def pin(self, pin_type, pin_setting): + return pin(self.fs, self.base_path, pin_type, pin_setting) + + def init_config(self, version, subvolume_type, subvolume_path, subvolume_state): + self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value) + self.metadata_mgr.flush() + + def discover(self): + log.debug("discovering subvolume '{0}' [mode: {1}]".format(self.subvolname, "legacy" if self.legacy_mode else "new")) + try: + self.fs.stat(self.base_path) + self.metadata_mgr.refresh() + log.debug("loaded subvolume '{0}'".format(self.subvolname)) + subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH) + # subvolume with retained snapshots has empty path, don't mistake it for + # fabricated metadata. + if (not self.legacy_mode and self.state != SubvolumeStates.STATE_RETAINED and + self.base_path.decode('utf-8') != str(Path(subvolpath).parent)): + raise MetadataMgrException(-errno.ENOENT, 'fabricated .meta') + except MetadataMgrException as me: + if me.errno in (-errno.ENOENT, -errno.EINVAL) and not self.legacy_mode: + log.warn("subvolume '{0}', {1}, " + "assuming legacy_mode".format(self.subvolname, me.error_str)) + self.legacy_mode = True + self.load_config() + self.discover() + else: + raise + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname)) + + def _trash_dir(self, path): + create_trashcan(self.fs, self.vol_spec) + with open_trashcan(self.fs, self.vol_spec) as trashcan: + trashcan.dump(path) + log.info("subvolume path '{0}' moved to trashcan".format(path)) + + def _link_dir(self, path, bname): + create_trashcan(self.fs, self.vol_spec) + with open_trashcan(self.fs, self.vol_spec) as trashcan: + trashcan.link(path, bname) + log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname)) + + def trash_base_dir(self): + if self.legacy_mode: + self.fs.unlink(self.legacy_config_path) + self._trash_dir(self.base_path) + + def create_base_dir(self, mode): + try: + self.fs.mkdirs(self.base_path, mode) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def info (self): + subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH) + etype = self.subvol_type + st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE | + cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | + cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME, + cephfs.AT_SYMLINK_NOFOLLOW) + usedbytes = st["size"] + try: + nsize = int(self.fs.getxattr(subvolpath, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + nsize = 0 + + try: + data_pool = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool').decode('utf-8') + pool_namespace = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]), + 'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]), + 'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]), + 'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes), + 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0), + 'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value} + + def set_user_metadata(self, keyname, value): + try: + self.metadata_mgr.add_section(MetadataManager.USER_METADATA_SECTION) + self.metadata_mgr.update_section(MetadataManager.USER_METADATA_SECTION, keyname, str(value)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + log.error(f"Failed to set user metadata key={keyname} value={value} on subvolume={self.subvol_name} " + f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-me.args[0], me.args[1]) + + def get_user_metadata(self, keyname): + try: + value = self.metadata_mgr.get_option(MetadataManager.USER_METADATA_SECTION, keyname) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname)) + raise VolumeException(-me.args[0], me.args[1]) + return value + + def list_user_metadata(self): + return self.metadata_mgr.list_all_options_from_section(MetadataManager.USER_METADATA_SECTION) + + def remove_user_metadata(self, keyname): + try: + ret = self.metadata_mgr.remove_option(MetadataManager.USER_METADATA_SECTION, keyname) + if not ret: + raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume metadata does not exist") + log.error(f"Failed to remove user metadata key={keyname} on subvolume={self.subvol_name} " + f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-me.args[0], me.args[1]) + + def get_snap_section_name(self, snapname): + section = "SNAP_METADATA" + "_" + snapname; + return section; + + def set_snapshot_metadata(self, snapname, keyname, value): + try: + section = self.get_snap_section_name(snapname) + self.metadata_mgr.add_section(section) + self.metadata_mgr.update_section(section, keyname, str(value)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + log.error(f"Failed to set snapshot metadata key={keyname} value={value} on snap={snapname} " + f"subvolume={self.subvol_name} group={self.group_name} " + f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-me.args[0], me.args[1]) + + def get_snapshot_metadata(self, snapname, keyname): + try: + value = self.metadata_mgr.get_option(self.get_snap_section_name(snapname), keyname) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname)) + log.error(f"Failed to get snapshot metadata key={keyname} on snap={snapname} " + f"subvolume={self.subvol_name} group={self.group_name} " + f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-me.args[0], me.args[1]) + return value + + def list_snapshot_metadata(self, snapname): + return self.metadata_mgr.list_all_options_from_section(self.get_snap_section_name(snapname)) + + def remove_snapshot_metadata(self, snapname, keyname): + try: + ret = self.metadata_mgr.remove_option(self.get_snap_section_name(snapname), keyname) + if not ret: + raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "snapshot metadata not does not exist") + log.error(f"Failed to remove snapshot metadata key={keyname} on snap={snapname} " + f"subvolume={self.subvol_name} group={self.group_name} " + f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-me.args[0], me.args[1]) diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py new file mode 100644 index 000000000..b5a10dd6c --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py @@ -0,0 +1,904 @@ +import os +import sys +import stat +import uuid +import errno +import logging +import json +from datetime import datetime +from typing import Any, List, Dict +from pathlib import Path + +import cephfs + +from .metadata_manager import MetadataManager +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures +from .op_sm import SubvolumeOpSm +from .subvolume_base import SubvolumeBase +from ..template import SubvolumeTemplate +from ..snapshot_util import mksnap, rmsnap +from ..access import allow_access, deny_access +from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError +from ...fs_util import listsnaps, is_inherited_snap, create_base_dir +from ..template import SubvolumeOpType +from ..group import Group +from ..rankevicter import RankEvicter +from ..volume import get_mds_map + +from ..clone_index import open_clone_index, create_clone_index + +log = logging.getLogger(__name__) + +class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): + """ + Version 1 subvolumes creates a subvolume with path as follows, + volumes/<group-name>/<subvolume-name>/<uuid>/ + + - The directory under which user data resides is <uuid> + - Snapshots of the subvolume are taken within the <uuid> directory + - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing, + - global information about the subvolume (version, path, type, state) + - snapshots attached to an ongoing clone operation + - clone snapshot source if subvolume is a clone of a snapshot + - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under + /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid> + component in the path. + """ + VERSION = 1 + + @staticmethod + def version(): + return SubvolumeV1.VERSION + + @property + def path(self): + try: + # no need to stat the path -- open() does that + return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8') + except MetadataMgrException as me: + raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") + + @property + def features(self): + return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value] + + def mark_subvolume(self): + # set subvolume attr, on subvolume root, marking it as a CephFS subvolume + # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes + try: + # MDS treats this as a noop for already marked subvolume + self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def snapshot_base_path(self): + """ Base path for all snapshots """ + return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + + def snapshot_path(self, snapname): + """ Path to a specific snapshot named 'snapname' """ + return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8')) + + def snapshot_data_path(self, snapname): + """ Path to user data directory within a subvolume snapshot named 'snapname' """ + return self.snapshot_path(snapname) + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # create group directory with default mode(0o755) if it doesn't exist. + create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE) + # create directory and set attributes + self.fs.mkdirs(subvol_path, mode) + self.mark_subvolume() + attrs = { + 'uid': uid, + 'gid': gid, + 'data_pool': pool, + 'pool_namespace': self.namespace if isolate_nspace else None, + 'quota': size + } + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata + qpath = subvol_path.decode('utf-8') + self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state) + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def add_clone_source(self, volname, subvolume, snapname, flush=False): + self.metadata_mgr.add_section("source") + self.metadata_mgr.update_section("source", "volume", volname) + if not subvolume.group.is_default_group(): + self.metadata_mgr.update_section("source", "group", subvolume.group_name) + self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name) + self.metadata_mgr.update_section("source", "snapshot", snapname) + if flush: + self.metadata_mgr.flush() + + def remove_clone_source(self, flush=False): + self.metadata_mgr.remove_section("source") + if flush: + self.metadata_mgr.flush() + + def add_clone_failure(self, errno, error_msg): + try: + self.metadata_mgr.add_section(MetadataManager.CLONE_FAILURE_SECTION) + self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION, + MetadataManager.CLONE_FAILURE_META_KEY_ERRNO, errno) + self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION, + MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG, error_msg) + self.metadata_mgr.flush() + except MetadataMgrException as me: + log.error(f"Failed to add clone failure status clone={self.subvol_name} group={self.group_name} " + f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + subvolume_type = SubvolumeTypes.TYPE_CLONE + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "clone failed: internal error") + + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # source snapshot attrs are used to create clone subvolume. + # attributes of subvolume's content though, are synced during the cloning process. + attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname)) + + # The source of the clone may have exceeded its quota limit as + # CephFS quotas are imprecise. Cloning such a source may fail if + # the quota on the destination is set before starting the clone + # copy. So always set the quota on destination after cloning is + # successful. + attrs["quota"] = None + + # override snapshot pool setting, if one is provided for the clone + if pool is not None: + attrs["data_pool"] = pool + attrs["pool_namespace"] = None + + # create directory and set attributes + self.fs.mkdirs(subvol_path, attrs.get("mode")) + self.mark_subvolume() + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata and clone source + qpath = subvol_path.decode('utf-8') + self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value) + self.add_clone_source(source_volname, source_subvolume, snapname) + self.metadata_mgr.flush() + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) + try: + self.metadata_mgr.refresh() + + etype = self.subvol_type + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate): + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + + subvol_path = self.path + log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) + st = self.fs.stat(subvol_path) + # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark + self.mark_subvolume() + + self.uid = int(st.st_uid) + self.gid = int(st.st_gid) + self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(me.args[0], me.args[1]) + except cephfs.ObjectNotFound: + log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname)) + raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def _recover_auth_meta(self, auth_id, auth_meta): + """ + Call me after locking the auth meta file. + """ + remove_subvolumes = [] + + for subvol, subvol_data in auth_meta['subvolumes'].items(): + if not subvol_data['dirty']: + continue + + (group_name, subvol_name) = subvol.split('/') + group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME + access_level = subvol_data['access_level'] + + with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name) + + # No SVMeta update indicates that there was no auth update + # in Ceph either. So it's safe to remove corresponding + # partial update in AMeta. + if not subvol_meta or auth_id not in subvol_meta['auths']: + remove_subvolumes.append(subvol) + continue + + want_auth = { + 'access_level': access_level, + 'dirty': False, + } + # SVMeta update looks clean. Ceph auth update must have been + # clean. Update the dirty flag and continue + if subvol_meta['auths'][auth_id] == want_auth: + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + continue + + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + self._authorize_subvolume(auth_id, access_level, existing_caps) + + # Recovered from partial auth updates for the auth ID's access + # to a subvolume. + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + for subvol in remove_subvolumes: + del auth_meta['subvolumes'][subvol] + + if not auth_meta['subvolumes']: + # Clean up auth meta file + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + # Recovered from all partial auth updates for the auth ID. + auth_meta['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False): + """ + Get-or-create a Ceph auth identity for `auth_id` and grant them access + to + :param auth_id: + :param access_level: + :param tenant_id: Optionally provide a stringizable object to + restrict any created cephx IDs to other callers + passing the same tenant ID. + :allow_existing_id: Optionally authorize existing auth-ids not + created by ceph_volume_client. + :return: + """ + + with self.auth_mdata_mgr.auth_lock(auth_id): + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + # Existing meta, or None, to be updated + auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id) + + # subvolume data to be inserted + group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None + group_subvol_id = "{0}/{1}".format(group_name, self.subvolname) + subvolume = { + group_subvol_id : { + # The access level at which the auth_id is authorized to + # access the volume. + 'access_level': access_level, + 'dirty': True, + } + } + + if auth_meta is None: + if not allow_existing_id and existing_caps is not None: + msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id) + log.error(msg) + raise VolumeException(-errno.EPERM, msg) + + # non-existent auth IDs + sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( + auth_id, tenant_id + )) + log.debug("Authorize: no existing meta") + auth_meta = { + 'dirty': True, + 'tenant_id': str(tenant_id) if tenant_id else None, + 'subvolumes': subvolume + } + else: + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + # Disallow tenants to share auth IDs + if str(auth_meta['tenant_id']) != str(tenant_id): + msg = "auth ID: {0} is already in use".format(auth_id) + log.error(msg) + raise VolumeException(-errno.EPERM, msg) + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Authorize: existing tenant {tenant}".format( + tenant=auth_meta['tenant_id'] + )) + auth_meta['dirty'] = True + auth_meta['subvolumes'].update(subvolume) + + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + key = self._authorize_subvolume(auth_id, access_level, existing_caps) + + auth_meta['dirty'] = False + auth_meta['subvolumes'][group_subvol_id]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + if tenant_id: + return key + else: + # Caller wasn't multi-tenant aware: be safe and don't give + # them a key + return "" + + def _authorize_subvolume(self, auth_id, access_level, existing_caps): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + + auth = { + auth_id: { + 'access_level': access_level, + 'dirty': True, + } + } + + if subvol_meta is None: + subvol_meta = { + 'auths': auth + } + else: + subvol_meta['auths'].update(auth) + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + key = self._authorize(auth_id, access_level, existing_caps) + + subvol_meta['auths'][auth_id]['dirty'] = False + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + return key + + def _authorize(self, auth_id, access_level, existing_caps): + subvol_path = self.path + log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path)) + + # First I need to work out what the data pool is for this share: + # read the layout + try: + pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + try: + namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + namespace = None + + # Now construct auth capabilities that give the guest just enough + # permissions to access the share + client_entity = "client.{0}".format(auth_id) + want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8')) + want_osd_cap = "allow {0} pool={1}{2}".format( + access_level, pool, " namespace={0}".format(namespace) if namespace else "") + + # Construct auth caps that if present might conflict with the desired + # auth caps. + unwanted_access_level = 'r' if access_level == 'rw' else 'rw' + unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8')) + unwanted_osd_cap = "allow {0} pool={1}{2}".format( + unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "") + + return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap, existing_caps) + + def deauthorize(self, auth_id): + with self.auth_mdata_mgr.auth_lock(auth_id): + # Existing meta, or None, to be updated + auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id) + + if auth_meta is None: + msg = "auth ID: {0} doesn't exist".format(auth_id) + log.error(msg) + raise VolumeException(-errno.ENOENT, msg) + + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None + group_subvol_id = "{0}/{1}".format(group_name, self.subvolname) + if (auth_meta is None) or (not auth_meta['subvolumes']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + # Clean up the auth meta file of an auth ID + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + if group_subvol_id not in auth_meta['subvolumes']: + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + return + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + auth_meta['dirty'] = True + auth_meta['subvolumes'][group_subvol_id]['dirty'] = True + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + self._deauthorize_subvolume(auth_id) + + # Filter out the volume we're deauthorizing + del auth_meta['subvolumes'][group_subvol_id] + + # Clean up auth meta file + if not auth_meta['subvolumes']: + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + auth_meta['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + def _deauthorize_subvolume(self, auth_id): + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + + if (subvol_meta is None) or (auth_id not in subvol_meta['auths']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + return + + subvol_meta['auths'][auth_id]['dirty'] = True + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + self._deauthorize(auth_id) + + # Remove the auth_id from the metadata *after* removing it + # from ceph, so that if we crashed here, we would actually + # recreate the auth ID during recovery (i.e. end up with + # a consistent state). + + # Filter out the auth we're removing + del subvol_meta['auths'][auth_id] + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + def _deauthorize(self, auth_id): + """ + The volume must still exist. + """ + client_entity = "client.{0}".format(auth_id) + subvol_path = self.path + try: + pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + try: + namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + namespace = None + + # The auth_id might have read-only or read-write mount access for the + # subvolume path. + access_levels = ('r', 'rw') + want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8')) + for access_level in access_levels] + want_osd_caps = ['allow {0} pool={1}{2}'.format( + access_level, pool_name, " namespace={0}".format(namespace) if namespace else "") + for access_level in access_levels] + deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps) + + def authorized_list(self): + """ + Expose a list of auth IDs that have access to a subvolume. + + return: a list of (auth_id, access_level) tuples, where + the access_level can be 'r' , or 'rw'. + None if no auth ID is given access to the subvolume. + """ + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + auths = [] # type: List[Dict[str,str]] + if not meta or not meta['auths']: + return auths + + for auth, auth_data in meta['auths'].items(): + # Skip partial auth updates. + if not auth_data['dirty']: + auths.append({auth: auth_data['access_level']}) + + return auths + + def evict(self, volname, auth_id, timeout=30): + """ + Evict all clients based on the authorization ID and the subvolume path mounted. + Assumes that the authorization key has been revoked prior to calling this function. + + This operation can throw an exception if the mon cluster is unresponsive, or + any individual MDS daemon is unresponsive for longer than the timeout passed in. + """ + + client_spec = ["auth_name={0}".format(auth_id), ] + client_spec.append("client_metadata.root={0}". + format(self.path.decode('utf-8'))) + + log.info("evict clients with {0}".format(', '.join(client_spec))) + + mds_map = get_mds_map(self.mgr, volname) + if not mds_map: + raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname)) + + up = {} + for name, gid in mds_map['up'].items(): + # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" + assert name.startswith("mds_") + up[int(name[4:])] = gid + + # For all MDS ranks held by a daemon + # Do the parallelism in python instead of using "tell mds.*", because + # the latter doesn't give us per-mds output + threads = [] + for rank, gid in up.items(): + thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + log.info("evict: joined all") + + for t in threads: + if not t.success: + msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". + format(', '.join(client_spec), t.rank, t.gid, t.exception) + ) + log.error(msg) + raise EvictionError(msg) + + def _get_clone_source(self): + try: + clone_source = { + 'volume' : self.metadata_mgr.get_option("source", "volume"), + 'subvolume': self.metadata_mgr.get_option("source", "subvolume"), + 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"), + } + + try: + clone_source["group"] = self.metadata_mgr.get_option("source", "group") + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + pass + else: + raise + except MetadataMgrException as me: + raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") + return clone_source + + def _get_clone_failure(self): + clone_failure = { + 'errno' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERRNO), + 'error_msg' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG), + } + return clone_failure + + @property + def status(self): + state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) + subvolume_type = self.subvol_type + subvolume_status = { + 'state' : state.value + } + if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: + subvolume_status["source"] = self._get_clone_source() + if SubvolumeOpSm.is_failed_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: + try: + subvolume_status["failure"] = self._get_clone_failure() + except MetadataMgrException: + pass + + return subvolume_status + + @property + def state(self): + return super(SubvolumeV1, self).state + + @state.setter + def state(self, val): + state = val[0].value + flush = val[1] + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state) + if flush: + self.metadata_mgr.flush() + + def remove(self, retainsnaps=False): + if retainsnaps: + raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname)) + if self.list_snapshots(): + raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname)) + self.trash_base_dir() + + def resize(self, newsize, noshrink): + subvol_path = self.path + return self._resize(subvol_path, newsize, noshrink) + + def create_snapshot(self, snapname): + try: + group_snapshot_path = os.path.join(self.group.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + self.fs.stat(group_snapshot_path) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + snappath = self.snapshot_path(snapname) + mksnap(self.fs, snappath) + else: + raise VolumeException(-e.args[0], e.args[1]) + else: + raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same") + + def has_pending_clones(self, snapname): + try: + return self.metadata_mgr.section_has_item('clone snaps', snapname) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + return False + raise + + def get_pending_clones(self, snapname): + pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any] + pending_track_id_list = [] + pending_clone_list = [] + index_path = "" + orphan_clones_count = 0 + + try: + if self.has_pending_clones(snapname): + pending_track_id_list = self.metadata_mgr.list_all_keys_with_specified_values_from_section('clone snaps', snapname) + else: + return pending_clones_info + except MetadataMgrException as me: + if me.errno != -errno.ENOENT: + raise VolumeException(-me.args[0], me.args[1]) + + try: + with open_clone_index(self.fs, self.vol_spec) as index: + index_path = index.path.decode('utf-8') + except IndexException as e: + log.warning("failed to open clone index '{0}' for snapshot '{1}'".format(e, snapname)) + raise VolumeException(-errno.EINVAL, "failed to open clone index") + + for track_id in pending_track_id_list: + try: + link_path = self.fs.readlink(os.path.join(index_path, track_id), 4096) + except cephfs.Error as e: + if e.errno != errno.ENOENT: + raise VolumeException(-e.args[0], e.args[1]) + else: + try: + # If clone is completed between 'list_all_keys_with_specified_values_from_section' + # and readlink(track_id_path) call then readlink will fail with error ENOENT (2) + # Hence we double check whether track_id is exist in .meta file or not. + value = self.metadata_mgr.get_option('clone snaps', track_id) + # Edge case scenario. + # If track_id for clone exist but path /volumes/_index/clone/{track_id} not found + # then clone is orphan. + orphan_clones_count += 1 + continue + except MetadataMgrException as me: + if me.errno != -errno.ENOENT: + raise VolumeException(-me.args[0], me.args[1]) + + path = Path(link_path.decode('utf-8')) + clone_name = os.path.basename(link_path).decode('utf-8') + group_name = os.path.basename(path.parent.absolute()) + details = {"name": clone_name} # type: Dict[str, str] + if group_name != Group.NO_GROUP_NAME: + details["target_group"] = group_name + pending_clone_list.append(details) + + if len(pending_clone_list) != 0: + pending_clones_info["has_pending_clones"] = "yes" + pending_clones_info["pending_clones"] = pending_clone_list + else: + pending_clones_info["has_pending_clones"] = "no" + + if orphan_clones_count > 0: + pending_clones_info["orphan_clones_count"] = orphan_clones_count + + return pending_clones_info + + def remove_snapshot(self, snapname, force=False): + if self.has_pending_clones(snapname): + raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname)) + snappath = self.snapshot_path(snapname) + try: + self.metadata_mgr.remove_section(self.get_snap_section_name(snapname)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + if force: + log.info(f"Allowing snapshot removal on failure of it's metadata removal with force on " + f"snap={snapname} subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, " + f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + pass + else: + log.error(f"Failed to remove snapshot metadata on snap={snapname} subvol={self.subvol_name} " + f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + raise VolumeException(-errno.EAGAIN, + f"failed to remove snapshot metadata on snap={snapname} reason={me.args[0]} {me.args[1]}") + rmsnap(self.fs, snappath) + + def snapshot_info(self, snapname): + if is_inherited_snap(snapname): + raise VolumeException(-errno.EINVAL, + "snapshot name '{0}' is invalid".format(snapname)) + snappath = self.snapshot_data_path(snapname) + snap_info = {} + try: + snap_attrs = {'created_at':'ceph.snap.btime', + 'data_pool':'ceph.dir.layout.pool'} + for key, val in snap_attrs.items(): + snap_info[key] = self.fs.getxattr(snappath, val) + pending_clones_info = self.get_pending_clones(snapname) + info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))), + 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any] + info_dict.update(pending_clones_info); + return info_dict + except cephfs.Error as e: + if e.errno == errno.ENOENT: + raise VolumeException(-errno.ENOENT, + "snapshot '{0}' does not exist".format(snapname)) + raise VolumeException(-e.args[0], e.args[1]) + + def list_snapshots(self): + try: + dirpath = self.snapshot_base_path() + return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return [] + raise + + def clean_stale_snapshot_metadata(self): + """ Clean up stale snapshot metadata """ + if self.metadata_mgr.has_snap_metadata_section(): + snap_list = self.list_snapshots() + snaps_with_metadata_list = self.metadata_mgr.list_snaps_with_metadata() + for snap_with_metadata in snaps_with_metadata_list: + if snap_with_metadata.encode('utf-8') not in snap_list: + try: + self.metadata_mgr.remove_section(self.get_snap_section_name(snap_with_metadata)) + self.metadata_mgr.flush() + except MetadataMgrException as me: + log.error(f"Failed to remove stale snap metadata on snap={snap_with_metadata} " + f"subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, " + f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}") + pass + + def _add_snap_clone(self, track_id, snapname): + self.metadata_mgr.add_section("clone snaps") + self.metadata_mgr.update_section("clone snaps", track_id, snapname) + self.metadata_mgr.flush() + + def _remove_snap_clone(self, track_id): + self.metadata_mgr.remove_option("clone snaps", track_id) + self.metadata_mgr.flush() + + def attach_snapshot(self, snapname, tgt_subvolume): + if not snapname.encode('utf-8') in self.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + try: + create_clone_index(self.fs, self.vol_spec) + with open_clone_index(self.fs, self.vol_spec) as index: + track_idx = index.track(tgt_subvolume.base_path) + self._add_snap_clone(track_idx, snapname) + except (IndexException, MetadataMgrException) as e: + log.warning("error creating clone index: {0}".format(e)) + raise VolumeException(-errno.EINVAL, "error cloning subvolume") + + def detach_snapshot(self, snapname, track_id): + try: + with open_clone_index(self.fs, self.vol_spec) as index: + index.untrack(track_id) + self._remove_snap_clone(track_id) + except (IndexException, MetadataMgrException) as e: + log.warning("error delining snapshot from clone: {0}".format(e)) + raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone") diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py new file mode 100644 index 000000000..03085d049 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py @@ -0,0 +1,394 @@ +import os +import stat +import uuid +import errno +import logging + +import cephfs + +from .metadata_manager import MetadataManager +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures +from .op_sm import SubvolumeOpSm +from .subvolume_v1 import SubvolumeV1 +from ..template import SubvolumeTemplate +from ...exception import OpSmException, VolumeException, MetadataMgrException +from ...fs_util import listdir, create_base_dir +from ..template import SubvolumeOpType + +log = logging.getLogger(__name__) + +class SubvolumeV2(SubvolumeV1): + """ + Version 2 subvolumes creates a subvolume with path as follows, + volumes/<group-name>/<subvolume-name>/<uuid>/ + + The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots + of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory, + rather than under the <uuid> directory, as is the case of V1 subvolumes. + + - The directory under which user data resides is <uuid> + - Snapshots of the subvolume are taken within the <subvolume-name> directory + - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar + to V1 subvolumes + - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining + the rest of the subvolume and its meta file. + - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of + older incarnations of the same subvolume. + - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the + snapshot retention feature + """ + VERSION = 2 + + @staticmethod + def version(): + return SubvolumeV2.VERSION + + @property + def features(self): + return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, + SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value, + SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value] + + @property + def retained(self): + try: + self.metadata_mgr.refresh() + if self.state == SubvolumeStates.STATE_RETAINED: + return True + return False + except MetadataMgrException as me: + if me.errno != -errno.ENOENT: + raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname)) + return False + + @property + def purgeable(self): + if not self.retained or self.list_snapshots() or self.has_pending_purges: + return False + return True + + @property + def has_pending_purges(self): + try: + return not listdir(self.fs, self.trash_dir) == [] + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return False + raise + + @property + def trash_dir(self): + return os.path.join(self.base_path, b".trash") + + def create_trashcan(self): + """per subvolume trash directory""" + try: + self.fs.stat(self.trash_dir) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + try: + self.fs.mkdir(self.trash_dir, 0o700) + except cephfs.Error as ce: + raise VolumeException(-ce.args[0], ce.args[1]) + else: + raise VolumeException(-e.args[0], e.args[1]) + + def mark_subvolume(self): + # set subvolume attr, on subvolume root, marking it as a CephFS subvolume + # subvolume root is where snapshots would be taken, and hence is the base_path for v2 subvolumes + try: + # MDS treats this as a noop for already marked subvolume + self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + @staticmethod + def is_valid_uuid(uuid_str): + try: + uuid.UUID(uuid_str) + return True + except ValueError: + return False + + def snapshot_base_path(self): + return os.path.join(self.base_path, self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + + def snapshot_data_path(self, snapname): + snap_base_path = self.snapshot_path(snapname) + uuid_str = None + try: + with self.fs.opendir(snap_base_path) as dir_handle: + d = self.fs.readdir(dir_handle) + while d: + if d.d_name not in (b".", b".."): + d_full_path = os.path.join(snap_base_path, d.d_name) + stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW) + if stat.S_ISDIR(stx.get('mode')): + if self.is_valid_uuid(d.d_name.decode('utf-8')): + uuid_str = d.d_name + d = self.fs.readdir(dir_handle) + except cephfs.Error as e: + if e.errno == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + raise VolumeException(-e.args[0], e.args[1]) + + if not uuid_str: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + + return os.path.join(snap_base_path, uuid_str) + + def _remove_on_failure(self, subvol_path, retained): + if retained: + log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path)) + try: + self.fs.rmdir(subvol_path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + else: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove(internal_cleanup=True) + + def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state): + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value) + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath) + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value) + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + + retained = self.retained + if retained and self.has_pending_purges: + raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress") + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # create group directory with default mode(0o755) if it doesn't exist. + create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE) + self.fs.mkdirs(subvol_path, mode) + self.mark_subvolume() + attrs = { + 'uid': uid, + 'gid': gid, + 'data_pool': pool, + 'pool_namespace': self.namespace if isolate_nspace else None, + 'quota': size + } + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata + qpath = subvol_path.decode('utf-8') + if retained: + self._set_incarnation_metadata(subvolume_type, qpath, initial_state) + self.metadata_mgr.flush() + else: + self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state) + + # Create the subvolume metadata file which manages auth-ids if it doesn't exist + self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname) + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + self._remove_on_failure(subvol_path, retained) + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + subvolume_type = SubvolumeTypes.TYPE_CLONE + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "clone failed: internal error") + + retained = self.retained + if retained and self.has_pending_purges: + raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress") + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # source snapshot attrs are used to create clone subvolume + # attributes of subvolume's content though, are synced during the cloning process. + attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname)) + + # The source of the clone may have exceeded its quota limit as + # CephFS quotas are imprecise. Cloning such a source may fail if + # the quota on the destination is set before starting the clone + # copy. So always set the quota on destination after cloning is + # successful. + attrs["quota"] = None + + # override snapshot pool setting, if one is provided for the clone + if pool is not None: + attrs["data_pool"] = pool + attrs["pool_namespace"] = None + + # create directory and set attributes + self.fs.mkdirs(subvol_path, attrs.get("mode")) + self.mark_subvolume() + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata and clone source + qpath = subvol_path.decode('utf-8') + if retained: + self._set_incarnation_metadata(subvolume_type, qpath, initial_state) + else: + self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value) + self.add_clone_source(source_volname, source_subvolume, snapname) + self.metadata_mgr.flush() + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + self._remove_on_failure(subvol_path, retained) + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + if vol_state == SubvolumeStates.STATE_RETAINED: + return { + SubvolumeOpType.REMOVE, + SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.LIST, + SubvolumeOpType.INFO, + SubvolumeOpType.SNAP_REMOVE, + SubvolumeOpType.SNAP_LIST, + SubvolumeOpType.SNAP_INFO, + SubvolumeOpType.SNAP_PROTECT, + SubvolumeOpType.SNAP_UNPROTECT, + SubvolumeOpType.CLONE_SOURCE + } + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL, + SubvolumeOpType.CLONE_SOURCE} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) + try: + self.metadata_mgr.refresh() + # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark + self.mark_subvolume() + + etype = self.subvol_type + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format( + self.subvolname)) + + if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED: + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + + if estate != SubvolumeStates.STATE_RETAINED: + subvol_path = self.path + log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) + st = self.fs.stat(subvol_path) + + self.uid = int(st.st_uid) + self.gid = int(st.st_gid) + self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(me.args[0], me.args[1]) + except cephfs.ObjectNotFound: + log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname)) + raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def trash_incarnation_dir(self): + """rename subvolume (uuid component) to trash""" + self.create_trashcan() + try: + bname = os.path.basename(self.path) + tpath = os.path.join(self.trash_dir, bname) + log.debug("trash: {0} -> {1}".format(self.path, tpath)) + self.fs.rename(self.path, tpath) + self._link_dir(tpath, bname) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + @staticmethod + def safe_to_remove_subvolume_clone(subvol_state): + # Both the STATE_FAILED and STATE_CANCELED are handled by 'handle_clone_failed' in the state + # machine which removes the entry from the index. Hence, it's safe to removed clone with + # force option for both. + acceptable_rm_clone_states = [SubvolumeStates.STATE_COMPLETE, SubvolumeStates.STATE_CANCELED, + SubvolumeStates.STATE_FAILED, SubvolumeStates.STATE_RETAINED] + if subvol_state not in acceptable_rm_clone_states: + return False + return True + + def remove(self, retainsnaps=False, internal_cleanup=False): + if self.list_snapshots(): + if not retainsnaps: + raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname)) + else: + if not internal_cleanup and not self.safe_to_remove_subvolume_clone(self.state): + raise VolumeException(-errno.EAGAIN, + "{0} clone in-progress -- please cancel the clone and retry".format(self.subvolname)) + if not self.has_pending_purges: + self.trash_base_dir() + # Delete the volume meta file, if it's not already deleted + self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname) + return + if self.state != SubvolumeStates.STATE_RETAINED: + self.trash_incarnation_dir() + self.metadata_mgr.remove_section(MetadataManager.USER_METADATA_SECTION) + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "") + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value) + self.metadata_mgr.flush() + # Delete the volume meta file, if it's not already deleted + self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname) + + def info(self): + if self.state != SubvolumeStates.STATE_RETAINED: + return super(SubvolumeV2, self).info() + + return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value} + + def remove_snapshot(self, snapname, force=False): + super(SubvolumeV2, self).remove_snapshot(snapname, force) + if self.purgeable: + self.trash_base_dir() + # tickle the volume purge job to purge this entry, using ESTALE + raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname)) + # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py new file mode 100644 index 000000000..a79aa55e1 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/volume.py @@ -0,0 +1,198 @@ +import errno +import logging +import os + +from typing import List + +from contextlib import contextmanager + +import orchestrator + +from .lock import GlobalLock +from ..exception import VolumeException +from ..fs_util import create_pool, remove_pool, create_filesystem, \ + remove_filesystem, create_mds, volume_exists +from .trash import Trash +from mgr_util import open_filesystem, CephfsConnectionException + +log = logging.getLogger(__name__) + +def gen_pool_names(volname): + """ + return metadata and data pool name (from a filesystem/volume name) as a tuple + """ + return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname) + +def get_mds_map(mgr, volname): + """ + return mdsmap for a volname + """ + mds_map = None + fs_map = mgr.get("fs_map") + for f in fs_map['filesystems']: + if volname == f['mdsmap']['fs_name']: + return f['mdsmap'] + return mds_map + +def get_pool_names(mgr, volname): + """ + return metadata and data pools (list) names of volume as a tuple + """ + fs_map = mgr.get("fs_map") + metadata_pool_id = None + data_pool_ids = [] # type: List[int] + for f in fs_map['filesystems']: + if volname == f['mdsmap']['fs_name']: + metadata_pool_id = f['mdsmap']['metadata_pool'] + data_pool_ids = f['mdsmap']['data_pools'] + break + if metadata_pool_id is None: + return None, None + + osdmap = mgr.get("osd_map") + pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']]) + metadata_pool = pools[metadata_pool_id] + data_pools = [pools[id] for id in data_pool_ids] + return metadata_pool, data_pools + +def get_pool_ids(mgr, volname): + """ + return metadata and data pools (list) id of volume as a tuple + """ + fs_map = mgr.get("fs_map") + metadata_pool_id = None + data_pool_ids = [] # type: List[int] + for f in fs_map['filesystems']: + if volname == f['mdsmap']['fs_name']: + metadata_pool_id = f['mdsmap']['metadata_pool'] + data_pool_ids = f['mdsmap']['data_pools'] + break + if metadata_pool_id is None: + return None, None + return metadata_pool_id, data_pool_ids + +def create_volume(mgr, volname, placement): + """ + create volume (pool, filesystem and mds) + """ + metadata_pool, data_pool = gen_pool_names(volname) + # create pools + r, outb, outs = create_pool(mgr, metadata_pool) + if r != 0: + return r, outb, outs + r, outb, outs = create_pool(mgr, data_pool) + if r != 0: + #cleanup + remove_pool(mgr, metadata_pool) + return r, outb, outs + # create filesystem + r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool) + if r != 0: + log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs)) + #cleanup + remove_pool(mgr, data_pool) + remove_pool(mgr, metadata_pool) + return r, outb, outs + return create_mds(mgr, volname, placement) + + +def delete_volume(mgr, volname, metadata_pool, data_pools): + """ + delete the given module (tear down mds, remove filesystem, remove pools) + """ + # Tear down MDS daemons + try: + completion = mgr.remove_service('mds.' + volname) + orchestrator.raise_if_exception(completion) + except (ImportError, orchestrator.OrchestratorError): + log.warning("OrchestratorError, not tearing down MDS daemons") + except Exception as e: + # Don't let detailed orchestrator exceptions (python backtraces) + # bubble out to the user + log.exception("Failed to tear down MDS daemons") + return -errno.EINVAL, "", str(e) + + # In case orchestrator didn't tear down MDS daemons cleanly, or + # there was no orchestrator, we force the daemons down. + if volume_exists(mgr, volname): + r, outb, outs = remove_filesystem(mgr, volname) + if r != 0: + return r, outb, outs + else: + err = "Filesystem not found for volume '{0}'".format(volname) + log.warning(err) + return -errno.ENOENT, "", err + r, outb, outs = remove_pool(mgr, metadata_pool) + if r != 0: + return r, outb, outs + + for data_pool in data_pools: + r, outb, outs = remove_pool(mgr, data_pool) + if r != 0: + return r, outb, outs + result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools)) + return r, result_str, "" + + +def list_volumes(mgr): + """ + list all filesystem volumes. + + :param: None + :return: None + """ + result = [] + fs_map = mgr.get("fs_map") + for f in fs_map['filesystems']: + result.append({'name': f['mdsmap']['fs_name']}) + return result + + +def get_pending_subvol_deletions_count(path): + """ + Get the number of pending subvolumes deletions. + """ + trashdir = os.path.join(path, Trash.GROUP_NAME) + try: + num_pending_subvol_del = len(os.listdir(trashdir)) + except OSError as e: + if e.errno == errno.ENOENT: + num_pending_subvol_del = 0 + + return {'pending_subvolume_deletions': num_pending_subvol_del} + + +@contextmanager +def open_volume(vc, volname): + """ + open a volume for exclusive access. This API is to be used as a contextr + manager. + + :param vc: volume client instance + :param volname: volume name + :return: yields a volume handle (ceph filesystem handle) + """ + g_lock = GlobalLock() + with g_lock.lock_op(): + try: + with open_filesystem(vc, volname) as fs_handle: + yield fs_handle + except CephfsConnectionException as ce: + raise VolumeException(ce.errno, ce.error_str) + + +@contextmanager +def open_volume_lockless(vc, volname): + """ + open a volume with shared access. This API is to be used as a context + manager. + + :param vc: volume client instance + :param volname: volume name + :return: yields a volume handle (ceph filesystem handle) + """ + try: + with open_filesystem(vc, volname) as fs_handle: + yield fs_handle + except CephfsConnectionException as ce: + raise VolumeException(ce.errno, ce.error_str) |