summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/operations
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/volumes/fs/operations/__init__.py0
-rw-r--r--src/pybind/mgr/volumes/fs/operations/access.py139
-rw-r--r--src/pybind/mgr/volumes/fs/operations/clone_index.py98
-rw-r--r--src/pybind/mgr/volumes/fs/operations/group.py301
-rw-r--r--src/pybind/mgr/volumes/fs/operations/index.py23
-rw-r--r--src/pybind/mgr/volumes/fs/operations/lock.py43
-rw-r--r--src/pybind/mgr/volumes/fs/operations/pin_util.py34
-rw-r--r--src/pybind/mgr/volumes/fs/operations/rankevicter.py114
-rw-r--r--src/pybind/mgr/volumes/fs/operations/resolver.py26
-rw-r--r--src/pybind/mgr/volumes/fs/operations/snapshot_util.py30
-rw-r--r--src/pybind/mgr/volumes/fs/operations/subvolume.py74
-rw-r--r--src/pybind/mgr/volumes/fs/operations/template.py191
-rw-r--r--src/pybind/mgr/volumes/fs/operations/trash.py145
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/__init__.py112
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py208
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py200
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/op_sm.py114
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py61
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py449
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py904
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py394
-rw-r--r--src/pybind/mgr/volumes/fs/operations/volume.py198
22 files changed, 3858 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/operations/__init__.py b/src/pybind/mgr/volumes/fs/operations/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/__init__.py
diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py
new file mode 100644
index 000000000..158e21c26
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/access.py
@@ -0,0 +1,139 @@
+import errno
+import json
+from typing import List
+
+def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True):
+ caps_list = [] # type: List[str]
+ for k, v in existing_caps['caps'].items():
+ if k == 'mds' or k == 'osd':
+ continue
+ elif k == 'mon':
+ if not authorize and v == 'allow r':
+ continue
+ caps_list.extend((k,v))
+
+ if mds_cap_str:
+ caps_list.extend(('mds', mds_cap_str))
+ if osd_cap_str:
+ caps_list.extend(('osd', osd_cap_str))
+
+ if authorize and 'mon' not in caps_list:
+ caps_list.extend(('mon', 'allow r'))
+
+ return caps_list
+
+
+def allow_access(mgr, client_entity, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap, existing_caps):
+ if existing_caps is None:
+ ret, out, err = mgr.mon_command({
+ "prefix": "auth get-or-create",
+ "entity": client_entity,
+ "caps": ['mds', want_mds_cap, 'osd', want_osd_cap, 'mon', 'allow r'],
+ "format": "json"})
+ else:
+ cap = existing_caps[0]
+
+ def cap_update(
+ orig_mds_caps, orig_osd_caps, want_mds_cap,
+ want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
+
+ if not orig_mds_caps:
+ return want_mds_cap, want_osd_cap
+
+ mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+ osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+ if want_mds_cap in mds_cap_tokens:
+ return orig_mds_caps, orig_osd_caps
+
+ if unwanted_mds_cap in mds_cap_tokens:
+ mds_cap_tokens.remove(unwanted_mds_cap)
+ osd_cap_tokens.remove(unwanted_osd_cap)
+
+ mds_cap_tokens.append(want_mds_cap)
+ osd_cap_tokens.append(want_osd_cap)
+
+ return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
+ orig_mds_caps = cap['caps'].get('mds', "")
+ orig_osd_caps = cap['caps'].get('osd', "")
+
+ mds_cap_str, osd_cap_str = cap_update(
+ orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap)
+
+ caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
+ mgr.mon_command(
+ {
+ "prefix": "auth caps",
+ 'entity': client_entity,
+ 'caps': caps_list
+ })
+ ret, out, err = mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+
+ # Result expected like this:
+ # [
+ # {
+ # "entity": "client.foobar",
+ # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
+ # "caps": {
+ # "mds": "allow *",
+ # "mon": "allow *"
+ # }
+ # }
+ # ]
+
+ caps = json.loads(out)
+ assert len(caps) == 1
+ assert caps[0]['entity'] == client_entity
+ return caps[0]['key']
+
+def deny_access(mgr, client_entity, want_mds_caps, want_osd_caps):
+ ret, out, err = mgr.mon_command({
+ "prefix": "auth get",
+ "entity": client_entity,
+ "format": "json",
+ })
+
+ if ret == -errno.ENOENT:
+ # Already gone, great.
+ return
+
+ def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
+ mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+ osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+ for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
+ if want_mds_cap in mds_cap_tokens:
+ mds_cap_tokens.remove(want_mds_cap)
+ osd_cap_tokens.remove(want_osd_cap)
+ break
+
+ return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
+ cap = json.loads(out)[0]
+ orig_mds_caps = cap['caps'].get('mds', "")
+ orig_osd_caps = cap['caps'].get('osd', "")
+ mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
+ want_mds_caps, want_osd_caps)
+
+ caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
+ if not caps_list:
+ mgr.mon_command(
+ {
+ 'prefix': 'auth rm',
+ 'entity': client_entity
+ })
+ else:
+ mgr.mon_command(
+ {
+ "prefix": "auth caps",
+ 'entity': client_entity,
+ 'caps': caps_list
+ })
diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py
new file mode 100644
index 000000000..a2b31f858
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py
@@ -0,0 +1,98 @@
+import os
+import uuid
+import stat
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .index import Index
+from ..exception import IndexException, VolumeException
+from ..fs_util import list_one_entry_at_a_time
+
+log = logging.getLogger(__name__)
+
+class CloneIndex(Index):
+ SUB_GROUP_NAME = "clone"
+ PATH_MAX = 4096
+
+ @property
+ def path(self):
+ return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8'))
+
+ def _track(self, sink_path):
+ tracking_id = str(uuid.uuid4())
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path))
+
+ self.fs.symlink(sink_path, source_path)
+ return tracking_id
+
+ def track(self, sink_path):
+ try:
+ return self._track(sink_path)
+ except (VolumeException, cephfs.Error) as e:
+ if isinstance(e, cephfs.Error):
+ e = IndexException(-e.args[0], e.args[1])
+ elif isinstance(e, VolumeException):
+ e = IndexException(e.errno, e.error_str)
+ raise e
+
+ def untrack(self, tracking_id):
+ log.info("untracking {0}".format(tracking_id))
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ try:
+ self.fs.unlink(source_path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+ def get_oldest_clone_entry(self, exclude=[]):
+ min_ctime_entry = None
+ exclude_tracking_ids = [v[0] for v in exclude]
+ log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids))
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode):
+ if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime:
+ min_ctime_entry = (dname, st)
+ if min_ctime_entry:
+ try:
+ linklen = min_ctime_entry[1].st_size
+ sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+ return (min_ctime_entry[0], sink_path[:linklen])
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ return None
+
+ def find_clone_entry_index(self, sink_path):
+ try:
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if stat.S_ISLNK(st.st_mode):
+ target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+ if sink_path == target_path[:st.st_size]:
+ return dname
+ return None
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+def create_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.mkdirs(clone_index.path, 0o700)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.stat(clone_index.path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ yield clone_index
diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py
new file mode 100644
index 000000000..c91969278
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/group.py
@@ -0,0 +1,301 @@
+import os
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .snapshot_util import mksnap, rmsnap
+from .pin_util import pin
+from .template import GroupTemplate
+from ..fs_util import listdir, listsnaps, get_ancestor_xattr, create_base_dir, has_subdir
+from ..exception import VolumeException
+
+log = logging.getLogger(__name__)
+
+class Group(GroupTemplate):
+ # Reserved subvolume group name which we use in paths for subvolumes
+ # that are not assigned to a group (i.e. created with group=None)
+ NO_GROUP_NAME = "_nogroup"
+
+ def __init__(self, fs, vol_spec, groupname):
+ if groupname == Group.NO_GROUP_NAME:
+ raise VolumeException(-errno.EPERM, "Operation not permitted for group '{0}' as it is an internal group.".format(groupname))
+ if groupname in vol_spec.INTERNAL_DIRS:
+ raise VolumeException(-errno.EINVAL, "'{0}' is an internal directory and not a valid group name.".format(groupname))
+ self.fs = fs
+ self.user_id = None
+ self.group_id = None
+ self.vol_spec = vol_spec
+ self.groupname = groupname if groupname else Group.NO_GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ @property
+ def group_name(self):
+ return self.groupname
+
+ @property
+ def uid(self):
+ return self.user_id
+
+ @uid.setter
+ def uid(self, val):
+ self.user_id = val
+
+ @property
+ def gid(self):
+ return self.group_id
+
+ @gid.setter
+ def gid(self, val):
+ self.group_id = val
+
+ def is_default_group(self):
+ return self.groupname == Group.NO_GROUP_NAME
+
+ def list_subvolumes(self):
+ try:
+ return listdir(self.fs, self.path)
+ except VolumeException as ve:
+ # listing a default group when it's not yet created
+ if ve.errno == -errno.ENOENT and self.is_default_group():
+ return []
+ raise
+
+ def has_subvolumes(self):
+ try:
+ return has_subdir(self.fs, self.path)
+ except VolumeException as ve:
+ # listing a default group when it's not yet created
+ if ve.errno == -errno.ENOENT and self.is_default_group():
+ return False
+ raise
+
+ def pin(self, pin_type, pin_setting):
+ return pin(self.fs, self.path, pin_type, pin_setting)
+
+ def create_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ mksnap(self.fs, snappath)
+
+ def remove_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ rmsnap(self.fs, snappath)
+
+ def list_snapshots(self):
+ try:
+ dirpath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+ return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise
+
+ def info(self):
+ st = self.fs.statx(self.path, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE
+ | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE
+ | cephfs.CEPH_STATX_ATIME | cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ usedbytes = st["size"]
+ try:
+ nsize = int(self.fs.getxattr(self.path, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ nsize = 0
+
+ try:
+ data_pool = self.fs.getxattr(self.path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ return {'uid': int(st["uid"]),
+ 'gid': int(st["gid"]),
+ 'atime': str(st["atime"]),
+ 'mtime': str(st["mtime"]),
+ 'ctime': str(st["ctime"]),
+ 'mode': int(st["mode"]),
+ 'data_pool': data_pool,
+ 'created_at': str(st["btime"]),
+ 'bytes_quota': "infinite" if nsize == 0 else nsize,
+ 'bytes_used': int(usedbytes),
+ 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}
+
+ def resize(self, newsize, noshrink):
+ try:
+ newsize = int(newsize)
+ if newsize <= 0:
+ raise VolumeException(-errno.EINVAL, "Invalid subvolume group size")
+ except ValueError:
+ newsize = newsize.lower()
+ if not (newsize == "inf" or newsize == "infinite"):
+ raise (VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize)))
+ newsize = 0
+ noshrink = False
+
+ try:
+ maxbytes = int(self.fs.getxattr(self.path, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ maxbytes = 0
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ group_stat = self.fs.stat(self.path)
+ if newsize > 0 and newsize < group_stat.st_size:
+ if noshrink:
+ raise VolumeException(-errno.EINVAL, "Can't resize the subvolume group. The new size"
+ " '{0}' would be lesser than the current used size '{1}'"
+ .format(newsize, group_stat.st_size))
+
+ if not newsize == maxbytes:
+ try:
+ self.fs.setxattr(self.path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise (VolumeException(-e.args[0],
+ "Cannot set new size for the subvolume group. '{0}'".format(e.args[1])))
+ return newsize, group_stat.st_size
+
+def set_group_attrs(fs, path, attrs):
+ # set subvolume group attrs
+ # set size
+ quota = attrs.get("quota")
+ if quota is not None:
+ try:
+ fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set pool layout
+ pool = attrs.get("data_pool")
+ if not pool:
+ pool = get_ancestor_xattr(fs, path, "ceph.dir.layout.pool")
+ try:
+ fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL,
+ "Invalid pool layout '{0}'. It must be a valid data pool".format(pool))
+
+ # set uid/gid
+ uid = attrs.get("uid")
+ if uid is None:
+ uid = 0
+ else:
+ try:
+ uid = int(uid)
+ if uid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid UID")
+
+ gid = attrs.get("gid")
+ if gid is None:
+ gid = 0
+ else:
+ try:
+ gid = int(gid)
+ if gid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid GID")
+ fs.chown(path, uid, gid)
+
+ # set mode
+ mode = attrs.get("mode", None)
+ if mode is not None:
+ fs.lchmod(path, mode)
+
+def create_group(fs, vol_spec, groupname, size, pool, mode, uid, gid):
+ """
+ create a subvolume group.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :param size: In bytes, or None for no size limit
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ group = Group(fs, vol_spec, groupname)
+ path = group.path
+ vol_spec_base_dir = group.vol_spec.base_dir.encode('utf-8')
+
+ # create vol_spec base directory with default mode(0o755) if it doesn't exist
+ create_base_dir(fs, vol_spec_base_dir, vol_spec.DEFAULT_MODE)
+ fs.mkdir(path, mode)
+ try:
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'data_pool': pool,
+ 'quota': size
+ }
+ set_group_attrs(fs, path, attrs)
+ except (cephfs.Error, VolumeException) as e:
+ try:
+ # cleanup group path on best effort basis
+ log.debug("cleaning up subvolume group path: {0}".format(path))
+ fs.rmdir(path)
+ except cephfs.Error as ce:
+ log.debug("failed to clean up subvolume group {0} with path: {1} ({2})".format(groupname, path, ce))
+ if isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+def remove_group(fs, vol_spec, groupname):
+ """
+ remove a subvolume group.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :return: None
+ """
+ group = Group(fs, vol_spec, groupname)
+ try:
+ fs.rmdir(group.path)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_group(fs, vol_spec, groupname):
+ """
+ open a subvolume group. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :return: yields a group object (subclass of GroupTemplate)
+ """
+ group = Group(fs, vol_spec, groupname)
+ try:
+ st = fs.stat(group.path)
+ group.uid = int(st.st_uid)
+ group.gid = int(st.st_gid)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ if not group.is_default_group():
+ raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname))
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+ yield group
+
+@contextmanager
+def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname):
+ if groupname == c_groupname:
+ yield c_group
+ else:
+ with open_group(fs, vol_spec, groupname) as group:
+ yield group
diff --git a/src/pybind/mgr/volumes/fs/operations/index.py b/src/pybind/mgr/volumes/fs/operations/index.py
new file mode 100644
index 000000000..0e4296d75
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/index.py
@@ -0,0 +1,23 @@
+import errno
+import os
+
+from ..exception import VolumeException
+from .template import GroupTemplate
+
+class Index(GroupTemplate):
+ GROUP_NAME = "_index"
+
+ def __init__(self, fs, vol_spec):
+ self.fs = fs
+ self.vol_spec = vol_spec
+ self.groupname = Index.GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ def track(self, *args):
+ raise VolumeException(-errno.EINVAL, "operation not supported.")
+
+ def untrack(self, tracking_id):
+ raise VolumeException(-errno.EINVAL, "operation not supported.")
diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py
new file mode 100644
index 000000000..7ef6923e1
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/lock.py
@@ -0,0 +1,43 @@
+from contextlib import contextmanager
+import logging
+from threading import Lock
+from typing import Dict
+
+log = logging.getLogger(__name__)
+
+# singleton design pattern taken from http://www.aleax.it/5ep.html
+
+class GlobalLock(object):
+ """
+ Global lock to serialize operations in mgr/volumes. This lock
+ is currently held when accessing (opening) a volume to perform
+ group/subvolume operations. Since this is a big lock, it's rather
+ inefficient -- but right now it's ok since mgr/volumes does not
+ expect concurrent operations via its APIs.
+
+ As and when features get added (such as clone, where mgr/volumes
+ would maintain subvolume states in the filesystem), there might
+ be a need to allow concurrent operations. In that case it would
+ be nice to implement an efficient path based locking mechanism.
+
+ See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf
+ """
+ _shared_state = {
+ 'lock' : Lock(),
+ 'init' : False
+ } # type: Dict
+
+ def __init__(self):
+ with self._shared_state['lock']:
+ if not self._shared_state['init']:
+ self._shared_state['init'] = True
+ # share this state among all instances
+ self.__dict__ = self._shared_state
+
+ @contextmanager
+ def lock_op(self):
+ log.debug("entering global lock")
+ with self._shared_state['lock']:
+ log.debug("acquired global lock")
+ yield
+ log.debug("exited global lock")
diff --git a/src/pybind/mgr/volumes/fs/operations/pin_util.py b/src/pybind/mgr/volumes/fs/operations/pin_util.py
new file mode 100644
index 000000000..9ea79e546
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/pin_util.py
@@ -0,0 +1,34 @@
+import os
+import errno
+
+import cephfs
+
+from ..exception import VolumeException
+from distutils.util import strtobool
+
+_pin_value = {
+ "export": lambda x: int(x),
+ "distributed": lambda x: int(strtobool(x)),
+ "random": lambda x: float(x),
+}
+_pin_xattr = {
+ "export": "ceph.dir.pin",
+ "distributed": "ceph.dir.pin.distributed",
+ "random": "ceph.dir.pin.random",
+}
+
+def pin(fs, path, pin_type, pin_setting):
+ """
+ Set a pin on a directory.
+ """
+ assert pin_type in _pin_xattr
+
+ try:
+ pin_setting = _pin_value[pin_type](pin_setting)
+ except ValueError as e:
+ raise VolumeException(-errno.EINVAL, f"pin value wrong type: {pin_setting}")
+
+ try:
+ fs.setxattr(path, _pin_xattr[pin_type], str(pin_setting).encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
diff --git a/src/pybind/mgr/volumes/fs/operations/rankevicter.py b/src/pybind/mgr/volumes/fs/operations/rankevicter.py
new file mode 100644
index 000000000..5b945c389
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/rankevicter.py
@@ -0,0 +1,114 @@
+import errno
+import json
+import logging
+import threading
+import time
+
+from .volume import get_mds_map
+from ..exception import ClusterTimeout, ClusterError
+
+log = logging.getLogger(__name__)
+
+class RankEvicter(threading.Thread):
+ """
+ Thread for evicting client(s) from a particular MDS daemon instance.
+
+ This is more complex than simply sending a command, because we have to
+ handle cases where MDS daemons might not be fully up yet, and/or might
+ be transiently unresponsive to commands.
+ """
+ class GidGone(Exception):
+ pass
+
+ POLL_PERIOD = 5
+
+ def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout):
+ """
+ :param client_spec: list of strings, used as filter arguments to "session evict"
+ pass ["id=123"] to evict a single client with session id 123.
+ """
+ self.volname = volname
+ self.rank = rank
+ self.gid = gid
+ self._mds_map = mds_map
+ self._client_spec = client_spec
+ self._fs = fs
+ self._ready_timeout = ready_timeout
+ self._ready_waited = 0
+ self.mgr = mgr
+
+ self.success = False
+ self.exception = None
+
+ super(RankEvicter, self).__init__()
+
+ def _ready_to_evict(self):
+ if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
+ log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
+ self._client_spec, self.rank, self.gid
+ ))
+ raise RankEvicter.GidGone()
+
+ info = self._mds_map['info']["gid_{0}".format(self.gid)]
+ log.debug("_ready_to_evict: state={0}".format(info['state']))
+ return info['state'] in ["up:active", "up:clientreplay"]
+
+ def _wait_for_ready(self):
+ """
+ Wait for that MDS rank to reach an active or clientreplay state, and
+ not be laggy.
+ """
+ while not self._ready_to_evict():
+ if self._ready_waited > self._ready_timeout:
+ raise ClusterTimeout()
+
+ time.sleep(self.POLL_PERIOD)
+ self._ready_waited += self.POLL_PERIOD
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+
+ def _evict(self):
+ """
+ Run the eviction procedure. Return true on success, false on errors.
+ """
+
+ # Wait til the MDS is believed by the mon to be available for commands
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+
+ # Then send it an evict
+ ret = -errno.ETIMEDOUT
+ while ret == -errno.ETIMEDOUT:
+ log.debug("mds_command: {0}, {1}".format(
+ "%s" % self.gid, ["session", "evict"] + self._client_spec
+ ))
+ ret, outb, outs = self._fs.mds_command(
+ "%s" % self.gid,
+ json.dumps({
+ "prefix": "session evict",
+ "filters": self._client_spec
+ }), "")
+ log.debug("mds_command: complete {0} {1}".format(ret, outs))
+
+ # If we get a clean response, great, it's gone from that rank.
+ if ret == 0:
+ return True
+ elif ret == -errno.ETIMEDOUT:
+ # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+ else:
+ raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
+
+ def run(self):
+ try:
+ self._evict()
+ except Exception as e:
+ self.success = False
+ self.exception = e
+ else:
+ self.success = True
diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py
new file mode 100644
index 000000000..a9543654e
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/resolver.py
@@ -0,0 +1,26 @@
+import os
+
+from .group import Group
+
+def splitall(path):
+ if path == "/":
+ return ["/"]
+ s = os.path.split(path)
+ return splitall(s[0]) + [s[1]]
+
+def resolve(vol_spec, path):
+ parts = splitall(path)
+ if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix:
+ return None
+ groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+ subvolname = parts[3]
+ return (groupname, subvolname)
+
+def resolve_trash(vol_spec, path):
+ parts = splitall(path)
+ if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \
+ parts[4] != '.trash':
+ return None
+ groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+ subvolname = parts[3]
+ return (groupname, subvolname)
diff --git a/src/pybind/mgr/volumes/fs/operations/snapshot_util.py b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py
new file mode 100644
index 000000000..2223c58e5
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py
@@ -0,0 +1,30 @@
+import os
+import errno
+
+import cephfs
+
+from ..exception import VolumeException
+
+def mksnap(fs, snappath):
+ """
+ Create a snapshot, or do nothing if it already exists.
+ """
+ try:
+ # snap create does not accept mode -- use default
+ fs.mkdir(snappath, 0o755)
+ except cephfs.ObjectExists:
+ return
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+def rmsnap(fs, snappath):
+ """
+ Remove a snapshot
+ """
+ try:
+ fs.stat(snappath)
+ fs.rmdir(snappath)
+ except cephfs.ObjectNotFound:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(os.path.basename(snappath)))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
diff --git a/src/pybind/mgr/volumes/fs/operations/subvolume.py b/src/pybind/mgr/volumes/fs/operations/subvolume.py
new file mode 100644
index 000000000..dc36477b5
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/subvolume.py
@@ -0,0 +1,74 @@
+import os
+import errno
+from contextlib import contextmanager
+
+from ..exception import VolumeException
+from .template import SubvolumeOpType
+
+from .versions import loaded_subvolumes
+
+def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid):
+ """
+ create a subvolume (create a subvolume with the max known version).
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param size: In bytes, or None for no size limit
+ :param isolate_nspace: If true, use separate RADOS namespace for this subvolume
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
+ subvolume.create(size, isolate_nspace, pool, mode, uid, gid)
+
+def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname):
+ """
+ create a cloned subvolume.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the clone
+ :param subvolname: clone subvolume nam
+ :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored
+ :param source_volume: source subvolumes volume name
+ :param source_subvolume: source (parent) subvolume object
+ :param snapname: source subvolume snapshot
+ :return None
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
+ subvolume.create_clone(pool, source_volume, source_subvolume, snapname)
+
+def remove_subvol(mgr, fs, vol_spec, group, subvolname, force=False, retainsnaps=False):
+ """
+ remove a subvolume.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param subvolname: subvolume name
+ :param force: force remove subvolumes
+ :return: None
+ """
+ op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
+ with open_subvol(mgr, fs, vol_spec, group, subvolname, op_type) as subvolume:
+ subvolume.remove(retainsnaps)
+
+@contextmanager
+def open_subvol(mgr, fs, vol_spec, group, subvolname, op_type):
+ """
+ open a subvolume. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param subvolname: subvolume name
+ :param op_type: operation type for which subvolume is being opened
+ :return: yields a subvolume object (subclass of SubvolumeTemplate)
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object(mgr, fs, vol_spec, group, subvolname)
+ subvolume.open(op_type)
+ yield subvolume
diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py
new file mode 100644
index 000000000..eb55bd743
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/template.py
@@ -0,0 +1,191 @@
+import errno
+
+from enum import Enum, unique
+
+from ..exception import VolumeException
+
+class GroupTemplate(object):
+ def list_subvolumes(self):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_snapshot(self, snapname):
+ """
+ create a subvolume group snapshot.
+
+ :param: group snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove_snapshot(self, snapname):
+ """
+ remove a subvolume group snapshot.
+
+ :param: group snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def list_snapshots(self):
+ """
+ list all subvolume group snapshots.
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+@unique
+class SubvolumeOpType(Enum):
+ CREATE = 'create'
+ REMOVE = 'rm'
+ REMOVE_FORCE = 'rm-force'
+ PIN = 'pin'
+ LIST = 'ls'
+ GETPATH = 'getpath'
+ INFO = 'info'
+ RESIZE = 'resize'
+ SNAP_CREATE = 'snap-create'
+ SNAP_REMOVE = 'snap-rm'
+ SNAP_LIST = 'snap-ls'
+ SNAP_INFO = 'snap-info'
+ SNAP_PROTECT = 'snap-protect'
+ SNAP_UNPROTECT = 'snap-unprotect'
+ CLONE_SOURCE = 'clone-source'
+ CLONE_CREATE = 'clone-create'
+ CLONE_STATUS = 'clone-status'
+ CLONE_CANCEL = 'clone-cancel'
+ CLONE_INTERNAL = 'clone_internal'
+ ALLOW_ACCESS = 'allow-access'
+ DENY_ACCESS = 'deny-access'
+ AUTH_LIST = 'auth-list'
+ EVICT = 'evict'
+ USER_METADATA_SET = 'user-metadata-set'
+ USER_METADATA_GET = 'user-metadata-get'
+ USER_METADATA_LIST = 'user-metadata-ls'
+ USER_METADATA_REMOVE = 'user-metadata-rm'
+ SNAP_METADATA_SET = 'snap-metadata-set'
+ SNAP_METADATA_GET = 'snap-metadata-get'
+ SNAP_METADATA_LIST = 'snap-metadata-ls'
+ SNAP_METADATA_REMOVE = 'snap-metadata-rm'
+
+class SubvolumeTemplate(object):
+ VERSION = None # type: int
+
+ @staticmethod
+ def version():
+ return SubvolumeTemplate.VERSION
+
+ def open(self, op_type):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def status(self):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ """
+ set up metadata, pools and auth for a subvolume.
+
+ This function is idempotent. It is safe to call this again
+ for an already-created subvolume, even if it is in use.
+
+ :param size: In bytes, or None for no size limit
+ :param isolate_nspace: If true, use separate RADOS namespace for this subvolume
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ """
+ prepare a subvolume to be cloned.
+
+ :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored
+ :param source_volname: source volume of snapshot
+ :param source_subvolume: source subvolume of snapshot
+ :param snapname: snapshot name to be cloned from
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove(self):
+ """
+ make a subvolume inaccessible to guests.
+
+ This function is idempotent. It is safe to call this again
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def resize(self, newsize, nshrink):
+ """
+ resize a subvolume
+
+ :param newsize: new size In bytes (or inf/infinite)
+ :return: new quota size and used bytes as a tuple
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def pin(self, pin_type, pin_setting):
+ """
+ pin a subvolume
+
+ :param pin_type: type of pin
+ :param pin_setting: setting for pin
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_snapshot(self, snapname):
+ """
+ snapshot a subvolume.
+
+ :param: subvolume snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove_snapshot(self, snapname):
+ """
+ remove a subvolume snapshot.
+
+ :param: subvolume snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def list_snapshots(self):
+ """
+ list all subvolume snapshots.
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def attach_snapshot(self, snapname, tgt_subvolume):
+ """
+ attach a snapshot to a target cloned subvolume. the target subvolume
+ should be an empty subvolume (type "clone") in "pending" state.
+
+ :param: snapname: snapshot to attach to a clone
+ :param: tgt_subvolume: target clone subvolume
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def detach_snapshot(self, snapname, tgt_subvolume):
+ """
+ detach a snapshot from a target cloned subvolume. the target subvolume
+ should either be in "failed" or "completed" state.
+
+ :param: snapname: snapshot to detach from a clone
+ :param: tgt_subvolume: target clone subvolume
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
diff --git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py
new file mode 100644
index 000000000..66f1d71cf
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/trash.py
@@ -0,0 +1,145 @@
+import os
+import uuid
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .template import GroupTemplate
+from ..fs_util import listdir
+from ..exception import VolumeException
+
+log = logging.getLogger(__name__)
+
+class Trash(GroupTemplate):
+ GROUP_NAME = "_deleting"
+
+ def __init__(self, fs, vol_spec):
+ self.fs = fs
+ self.vol_spec = vol_spec
+ self.groupname = Trash.GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ @property
+ def unique_trash_path(self):
+ """
+ return a unique trash directory entry path
+ """
+ return os.path.join(self.path, str(uuid.uuid4()).encode('utf-8'))
+
+ def _get_single_dir_entry(self, exclude_list=[]):
+ exclude_list.extend((b".", b".."))
+ try:
+ with self.fs.opendir(self.path) as d:
+ entry = self.fs.readdir(d)
+ while entry:
+ if entry.d_name not in exclude_list:
+ return entry.d_name
+ entry = self.fs.readdir(d)
+ return None
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def get_trash_entry(self, exclude_list):
+ """
+ get a trash entry excluding entries provided.
+
+ :praram exclude_list: entries to exclude
+ :return: trash entry
+ """
+ return self._get_single_dir_entry(exclude_list)
+
+ def purge(self, trashpath, should_cancel):
+ """
+ purge a trash entry.
+
+ :praram trash_entry: the trash entry to purge
+ :praram should_cancel: callback to check if the purge should be aborted
+ :return: None
+ """
+ def rmtree(root_path):
+ log.debug("rmtree {0}".format(root_path))
+ try:
+ with self.fs.opendir(root_path) as dir_handle:
+ d = self.fs.readdir(dir_handle)
+ while d and not should_cancel():
+ if d.d_name not in (b".", b".."):
+ d_full = os.path.join(root_path, d.d_name)
+ if d.is_dir():
+ rmtree(d_full)
+ else:
+ self.fs.unlink(d_full)
+ d = self.fs.readdir(dir_handle)
+ except cephfs.ObjectNotFound:
+ return
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ # remove the directory only if we were not asked to cancel
+ # (else we would fail to remove this anyway)
+ if not should_cancel():
+ self.fs.rmdir(root_path)
+
+ # catch any unlink errors
+ try:
+ rmtree(trashpath)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def dump(self, path):
+ """
+ move an filesystem entity to trash can.
+
+ :praram path: the filesystem path to be moved
+ :return: None
+ """
+ try:
+ self.fs.rename(path, self.unique_trash_path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def link(self, path, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.symlink(path, pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def delink(self, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.unlink(pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+def create_trashcan(fs, vol_spec):
+ """
+ create a trash can.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :return: None
+ """
+ trashcan = Trash(fs, vol_spec)
+ try:
+ fs.mkdirs(trashcan.path, 0o700)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_trashcan(fs, vol_spec):
+ """
+ open a trash can. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :return: yields a trash can object (subclass of GroupTemplate)
+ """
+ trashcan = Trash(fs, vol_spec)
+ try:
+ fs.stat(trashcan.path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ yield trashcan
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
new file mode 100644
index 000000000..544afa165
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
@@ -0,0 +1,112 @@
+import errno
+import logging
+import importlib
+
+import cephfs
+
+from .subvolume_base import SubvolumeBase
+from .subvolume_attrs import SubvolumeTypes
+from .subvolume_v1 import SubvolumeV1
+from .subvolume_v2 import SubvolumeV2
+from .metadata_manager import MetadataManager
+from .op_sm import SubvolumeOpSm
+from ..template import SubvolumeOpType
+from ...exception import MetadataMgrException, OpSmException, VolumeException
+
+log = logging.getLogger(__name__)
+
+class SubvolumeLoader(object):
+ INVALID_VERSION = -1
+
+ SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2']
+
+ def __init__(self):
+ self.max_version = SubvolumeLoader.INVALID_VERSION
+ self.versions = {}
+
+ def _load_module(self, mod_cls):
+ mod_name, cls_name = mod_cls.split('.')
+ mod = importlib.import_module('.versions.{0}'.format(mod_name), package='volumes.fs.operations')
+ return getattr(mod, cls_name)
+
+ def _load_supported_versions(self):
+ for mod_cls in SubvolumeLoader.SUPPORTED_MODULES:
+ cls = self._load_module(mod_cls)
+ log.info("loaded v{0} subvolume".format(cls.version()))
+ if self.max_version is not None or cls.version() > self.max_version:
+ self.max_version = cls.version()
+ self.versions[cls.version()] = cls
+ if self.max_version == SubvolumeLoader.INVALID_VERSION:
+ raise VolumeException(-errno.EINVAL, "no subvolume version available")
+ log.info("max subvolume version is v{0}".format(self.max_version))
+
+ def _get_subvolume_version(self, version):
+ try:
+ return self.versions[version]
+ except KeyError:
+ raise VolumeException(-errno.EINVAL, "subvolume class v{0} does not exist".format(version))
+
+ def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname):
+ return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname)
+
+ def upgrade_to_v2_subvolume(self, subvolume):
+ # legacy mode subvolumes cannot be upgraded to v2
+ if subvolume.legacy_mode:
+ return
+
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ if version >= SubvolumeV2.version():
+ return
+
+ v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname)
+ try:
+ v1_subvolume.open(SubvolumeOpType.SNAP_LIST)
+ except VolumeException as ve:
+ # if volume is not ready for snapshot listing, do not upgrade at present
+ if ve.errno == -errno.EAGAIN:
+ return
+ raise
+
+ # v1 subvolumes with snapshots cannot be upgraded to v2
+ if v1_subvolume.list_snapshots():
+ return
+
+ subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version())
+ subvolume.metadata_mgr.flush()
+
+ def upgrade_legacy_subvolume(self, fs, subvolume):
+ assert subvolume.legacy_mode
+ try:
+ fs.mkdirs(subvolume.legacy_dir, 0o700)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "error accessing subvolume")
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+ qpath = subvolume.base_path.decode('utf-8')
+ # legacy is only upgradable to v1
+ subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state)
+
+ def get_subvolume_object(self, mgr, fs, vol_spec, group, subvolname, upgrade=True):
+ subvolume = SubvolumeBase(mgr, fs, vol_spec, group, subvolname)
+ try:
+ subvolume.discover()
+ self.upgrade_to_v2_subvolume(subvolume)
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ subvolume_version_object = self._get_subvolume_version(version)(mgr, fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode)
+ subvolume_version_object.metadata_mgr.refresh()
+ subvolume_version_object.clean_stale_snapshot_metadata()
+ return subvolume_version_object
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT and upgrade:
+ self.upgrade_legacy_subvolume(fs, subvolume)
+ return self.get_subvolume_object(mgr, fs, vol_spec, group, subvolname, upgrade=False)
+ else:
+ # log the actual error and generalize error string returned to user
+ log.error("error accessing subvolume metadata for '{0}' ({1})".format(subvolname, me))
+ raise VolumeException(-errno.EINVAL, "error accessing subvolume metadata")
+
+loaded_subvolumes = SubvolumeLoader()
+loaded_subvolumes._load_supported_versions()
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py
new file mode 100644
index 000000000..259dcd0e0
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py
@@ -0,0 +1,208 @@
+from contextlib import contextmanager
+import os
+import fcntl
+import json
+import logging
+import struct
+import uuid
+
+import cephfs
+
+from ..group import Group
+
+log = logging.getLogger(__name__)
+
+class AuthMetadataError(Exception):
+ pass
+
+class AuthMetadataManager(object):
+
+ # Current version
+ version = 6
+
+ # Filename extensions for meta files.
+ META_FILE_EXT = ".meta"
+ DEFAULT_VOL_PREFIX = "/volumes"
+
+ def __init__(self, fs):
+ self.fs = fs
+ self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
+ self.volume_prefix = self.DEFAULT_VOL_PREFIX
+
+ def _to_bytes(self, param):
+ '''
+ Helper method that returns byte representation of the given parameter.
+ '''
+ if isinstance(param, str):
+ return param.encode('utf-8')
+ elif param is None:
+ return param
+ else:
+ return str(param).encode('utf-8')
+
+ def _subvolume_metadata_path(self, group_name, subvol_name):
+ return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
+ group_name if group_name != Group.NO_GROUP_NAME else "",
+ subvol_name,
+ self.META_FILE_EXT))
+
+ def _check_compat_version(self, compat_version):
+ if self.version < compat_version:
+ msg = ("The current version of AuthMetadataManager, version {0} "
+ "does not support the required feature. Need version {1} "
+ "or greater".format(self.version, compat_version)
+ )
+ log.error(msg)
+ raise AuthMetadataError(msg)
+
+ def _metadata_get(self, path):
+ """
+ Return a deserialized JSON object, or None
+ """
+ fd = self.fs.open(path, "r")
+ # TODO iterate instead of assuming file < 4MB
+ read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+ self.fs.close(fd)
+ if read_bytes:
+ return json.loads(read_bytes.decode())
+ else:
+ return None
+
+ def _metadata_set(self, path, data):
+ serialized = json.dumps(data)
+ fd = self.fs.open(path, "w")
+ try:
+ self.fs.write(fd, self._to_bytes(serialized), 0)
+ self.fs.fsync(fd, 0)
+ finally:
+ self.fs.close(fd)
+
+ def _lock(self, path):
+ @contextmanager
+ def fn():
+ while(1):
+ fd = self.fs.open(path, os.O_CREAT, 0o755)
+ self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+
+ # The locked file will be cleaned up sometime. It could be
+ # unlinked by consumer e.g., an another manila-share service
+ # instance, before lock was applied on it. Perform checks to
+ # ensure that this does not happen.
+ try:
+ statbuf = self.fs.stat(path)
+ except cephfs.ObjectNotFound:
+ self.fs.close(fd)
+ continue
+
+ fstatbuf = self.fs.fstat(fd)
+ if statbuf.st_ino == fstatbuf.st_ino:
+ break
+
+ try:
+ yield
+ finally:
+ self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+ self.fs.close(fd)
+
+ return fn()
+
+ def _auth_metadata_path(self, auth_id):
+ return os.path.join(self.volume_prefix, "${0}{1}".format(
+ auth_id, self.META_FILE_EXT))
+
+ def auth_lock(self, auth_id):
+ return self._lock(self._auth_metadata_path(auth_id))
+
+ def auth_metadata_get(self, auth_id):
+ """
+ Call me with the metadata locked!
+
+ Check whether a auth metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return auth metadata that the current version of AuthMetadataManager
+ can decode.
+ """
+ auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
+
+ if auth_metadata:
+ self._check_compat_version(auth_metadata['compat_version'])
+
+ return auth_metadata
+
+ def auth_metadata_set(self, auth_id, data):
+ """
+ Call me with the metadata locked!
+
+ Fsync the auth metadata.
+
+ Add two version attributes to the auth metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata, and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 6
+ data['version'] = self.version
+ return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+ def create_subvolume_metadata_file(self, group_name, subvol_name):
+ """
+ Create a subvolume metadata file, if it does not already exist, to store
+ data about auth ids having access to the subvolume
+ """
+ fd = self.fs.open(self._subvolume_metadata_path(group_name, subvol_name),
+ os.O_CREAT, 0o755)
+ self.fs.close(fd)
+
+ def delete_subvolume_metadata_file(self, group_name, subvol_name):
+ vol_meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+ try:
+ self.fs.unlink(vol_meta_path)
+ except cephfs.ObjectNotFound:
+ pass
+
+ def subvol_metadata_lock(self, group_name, subvol_name):
+ """
+ Return a ContextManager which locks the authorization metadata for
+ a particular subvolume, and persists a flag to the metadata indicating
+ that it is currently locked, so that we can detect dirty situations
+ during recovery.
+
+ This lock isn't just to make access to the metadata safe: it's also
+ designed to be used over the two-step process of checking the
+ metadata and then responding to an authorization request, to
+ ensure that at the point we respond the metadata hasn't changed
+ in the background. It's key to how we avoid security holes
+ resulting from races during that problem ,
+ """
+ return self._lock(self._subvolume_metadata_path(group_name, subvol_name))
+
+ def subvol_metadata_get(self, group_name, subvol_name):
+ """
+ Call me with the metadata locked!
+
+ Check whether a subvolume metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return a subvolume_metadata structure that the current version of
+ AuthMetadataManager can decode.
+ """
+ subvolume_metadata = self._metadata_get(self._subvolume_metadata_path(group_name, subvol_name))
+
+ if subvolume_metadata:
+ self._check_compat_version(subvolume_metadata['compat_version'])
+
+ return subvolume_metadata
+
+ def subvol_metadata_set(self, group_name, subvol_name, data):
+ """
+ Call me with the metadata locked!
+
+ Add two version attributes to the subvolume metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 1
+ data['version'] = self.version
+ return self._metadata_set(self._subvolume_metadata_path(group_name, subvol_name), data)
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
new file mode 100644
index 000000000..718735d91
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
@@ -0,0 +1,200 @@
+import os
+import errno
+import logging
+import sys
+import threading
+import configparser
+import re
+
+import cephfs
+
+from ...exception import MetadataMgrException
+
+log = logging.getLogger(__name__)
+
+# _lock needs to be shared across all instances of MetadataManager.
+# that is why we have a file level instance
+_lock = threading.Lock()
+
+
+def _conf_reader(fs, fd, offset=0, length=4096):
+ while True:
+ buf = fs.read(fd, offset, length)
+ offset += len(buf)
+ if not buf:
+ return
+ yield buf.decode('utf-8')
+
+
+class _ConfigWriter:
+ def __init__(self, fs, fd):
+ self._fs = fs
+ self._fd = fd
+ self._wrote = 0
+
+ def write(self, value):
+ buf = value.encode('utf-8')
+ wrote = self._fs.write(self._fd, buf, -1)
+ self._wrote += wrote
+ return wrote
+
+ def fsync(self):
+ self._fs.fsync(self._fd, 0)
+
+ @property
+ def wrote(self):
+ return self._wrote
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self._fs.close(self._fd)
+
+
+class MetadataManager(object):
+ GLOBAL_SECTION = "GLOBAL"
+ USER_METADATA_SECTION = "USER_METADATA"
+ GLOBAL_META_KEY_VERSION = "version"
+ GLOBAL_META_KEY_TYPE = "type"
+ GLOBAL_META_KEY_PATH = "path"
+ GLOBAL_META_KEY_STATE = "state"
+
+ CLONE_FAILURE_SECTION = "CLONE_FAILURE"
+ CLONE_FAILURE_META_KEY_ERRNO = "errno"
+ CLONE_FAILURE_META_KEY_ERROR_MSG = "error_msg"
+
+ def __init__(self, fs, config_path, mode):
+ self.fs = fs
+ self.mode = mode
+ self.config_path = config_path
+ self.config = configparser.ConfigParser()
+
+ def refresh(self):
+ fd = None
+ try:
+ log.debug("opening config {0}".format(self.config_path))
+ with _lock:
+ fd = self.fs.open(self.config_path, os.O_RDONLY)
+ cfg = ''.join(_conf_reader(self.fs, fd))
+ self.config.read_string(cfg, source=self.config_path)
+ except UnicodeDecodeError:
+ raise MetadataMgrException(-errno.EINVAL,
+ "failed to decode, erroneous metadata config '{0}'".format(self.config_path))
+ except cephfs.ObjectNotFound:
+ raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path))
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+ except configparser.Error:
+ raise MetadataMgrException(-errno.EINVAL, "failed to parse, erroneous metadata config "
+ "'{0}'".format(self.config_path))
+ finally:
+ if fd is not None:
+ self.fs.close(fd)
+
+ def flush(self):
+ # cull empty sections
+ for section in list(self.config.sections()):
+ if len(self.config.items(section)) == 0:
+ self.config.remove_section(section)
+
+ try:
+ with _lock:
+ tmp_config_path = self.config_path + b'.tmp'
+ fd = self.fs.open(tmp_config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
+ with _ConfigWriter(self.fs, fd) as cfg_writer:
+ self.config.write(cfg_writer)
+ cfg_writer.fsync()
+ self.fs.rename(tmp_config_path, self.config_path)
+ log.info(f"wrote {cfg_writer.wrote} bytes to config {tmp_config_path}")
+ log.info(f"Renamed {tmp_config_path} to config {self.config_path}")
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+
+ def init(self, version, typ, path, state):
+ # you may init just once before refresh (helps to overwrite conf)
+ if self.config.has_section(MetadataManager.GLOBAL_SECTION):
+ raise MetadataMgrException(-errno.EINVAL, "init called on an existing config")
+
+ self.add_section(MetadataManager.GLOBAL_SECTION)
+ self.update_section_multi(
+ MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version),
+ MetadataManager.GLOBAL_META_KEY_TYPE : str(typ),
+ MetadataManager.GLOBAL_META_KEY_PATH : str(path),
+ MetadataManager.GLOBAL_META_KEY_STATE : str(state)
+ })
+
+ def add_section(self, section):
+ try:
+ self.config.add_section(section)
+ except configparser.DuplicateSectionError:
+ return
+ except:
+ raise MetadataMgrException(-errno.EINVAL, "error adding section to config")
+
+ def remove_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ return self.config.remove_option(section, key)
+
+ def remove_section(self, section):
+ self.config.remove_section(section)
+
+ def update_section(self, section, key, value):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ self.config.set(section, key, str(value))
+
+ def update_section_multi(self, section, dct):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ for key,value in dct.items():
+ self.config.set(section, key, str(value))
+
+ def update_global_section(self, key, value):
+ self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value))
+
+ def get_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ if not self.config.has_option(section, key):
+ raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section))
+ return self.config.get(section, key)
+
+ def get_global_option(self, key):
+ return self.get_option(MetadataManager.GLOBAL_SECTION, key)
+
+ def list_all_options_from_section(self, section):
+ metadata_dict = {}
+ if self.config.has_section(section):
+ options = self.config.options(section)
+ for option in options:
+ metadata_dict[option] = self.config.get(section,option)
+ return metadata_dict
+
+ def list_all_keys_with_specified_values_from_section(self, section, value):
+ keys = []
+ if self.config.has_section(section):
+ options = self.config.options(section)
+ for option in options:
+ if (value == self.config.get(section, option)) :
+ keys.append(option)
+ return keys
+
+ def section_has_item(self, section, item):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ return item in [v[1] for v in self.config.items(section)]
+
+ def has_snap_metadata_section(self):
+ sections = self.config.sections()
+ r = re.compile('SNAP_METADATA_.*')
+ for section in sections:
+ if r.match(section):
+ return True
+ return False
+
+ def list_snaps_with_metadata(self):
+ sections = self.config.sections()
+ r = re.compile('SNAP_METADATA_.*')
+ return [section[len("SNAP_METADATA_"):] for section in sections if r.match(section)]
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
new file mode 100644
index 000000000..1142600cb
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
@@ -0,0 +1,114 @@
+import errno
+
+from typing import Dict
+
+from ...exception import OpSmException
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
+
+class TransitionKey(object):
+ def __init__(self, subvol_type, state, action_type):
+ self.transition_key = [subvol_type, state, action_type]
+
+ def __hash__(self):
+ return hash(tuple(self.transition_key))
+
+ def __eq__(self, other):
+ return self.transition_key == other.transition_key
+
+ def __neq__(self, other):
+ return not(self == other)
+
+class SubvolumeOpSm(object):
+ transition_table = {} # type: Dict
+
+ @staticmethod
+ def is_complete_state(state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_COMPLETE
+
+ @staticmethod
+ def is_failed_state(state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED
+
+ @staticmethod
+ def is_init_state(stm_type, state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeOpSm.get_init_state(stm_type)
+
+ @staticmethod
+ def get_init_state(stm_type):
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE)]
+ if not init_state:
+ raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type))
+ return init_state
+
+ @staticmethod
+ def transition(stm_type, current_state, action):
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ if not isinstance(current_state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state))
+ if not isinstance(action, SubvolumeActions):
+ raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action))
+
+ transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)]
+ if not transition:
+ raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type))
+
+ return transition
+
+SubvolumeOpSm.transition_table = {
+ # state transitions for state machine type TYPE_NORMAL
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
+
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ # state transitions for state machine type TYPE_CLONE
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_CANCELED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_FAILED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+}
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py
new file mode 100644
index 000000000..ec7138cbd
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py
@@ -0,0 +1,61 @@
+import errno
+from enum import Enum, unique
+
+from ...exception import VolumeException
+
+@unique
+class SubvolumeTypes(Enum):
+ TYPE_NORMAL = "subvolume"
+ TYPE_CLONE = "clone"
+
+ @staticmethod
+ def from_value(value):
+ if value == "subvolume":
+ return SubvolumeTypes.TYPE_NORMAL
+ if value == "clone":
+ return SubvolumeTypes.TYPE_CLONE
+
+ raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value))
+
+@unique
+class SubvolumeStates(Enum):
+ STATE_INIT = 'init'
+ STATE_PENDING = 'pending'
+ STATE_INPROGRESS = 'in-progress'
+ STATE_FAILED = 'failed'
+ STATE_COMPLETE = 'complete'
+ STATE_CANCELED = 'canceled'
+ STATE_RETAINED = 'snapshot-retained'
+
+ @staticmethod
+ def from_value(value):
+ if value == "init":
+ return SubvolumeStates.STATE_INIT
+ if value == "pending":
+ return SubvolumeStates.STATE_PENDING
+ if value == "in-progress":
+ return SubvolumeStates.STATE_INPROGRESS
+ if value == "failed":
+ return SubvolumeStates.STATE_FAILED
+ if value == "complete":
+ return SubvolumeStates.STATE_COMPLETE
+ if value == "canceled":
+ return SubvolumeStates.STATE_CANCELED
+ if value == "snapshot-retained":
+ return SubvolumeStates.STATE_RETAINED
+
+ raise VolumeException(-errno.EINVAL, "invalid state '{0}'".format(value))
+
+@unique
+class SubvolumeActions(Enum):
+ ACTION_NONE = 0
+ ACTION_SUCCESS = 1
+ ACTION_FAILED = 2
+ ACTION_CANCELLED = 3
+ ACTION_RETAINED = 4
+
+@unique
+class SubvolumeFeatures(Enum):
+ FEATURE_SNAPSHOT_CLONE = "snapshot-clone"
+ FEATURE_SNAPSHOT_RETENTION = "snapshot-retention"
+ FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
new file mode 100644
index 000000000..72fc45a42
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
@@ -0,0 +1,449 @@
+import os
+import stat
+import uuid
+import errno
+import logging
+import hashlib
+from typing import Dict, Union
+from pathlib import Path
+
+import cephfs
+
+from ..pin_util import pin
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates
+from .metadata_manager import MetadataManager
+from ..trash import create_trashcan, open_trashcan
+from ...fs_util import get_ancestor_xattr
+from ...exception import MetadataMgrException, VolumeException
+from .op_sm import SubvolumeOpSm
+from .auth_metadata import AuthMetadataManager
+from .subvolume_attrs import SubvolumeStates
+
+log = logging.getLogger(__name__)
+
+class SubvolumeBase(object):
+ LEGACY_CONF_DIR = "_legacy"
+
+ def __init__(self, mgr, fs, vol_spec, group, subvolname, legacy=False):
+ self.mgr = mgr
+ self.fs = fs
+ self.auth_mdata_mgr = AuthMetadataManager(fs)
+ self.cmode = None
+ self.user_id = None
+ self.group_id = None
+ self.vol_spec = vol_spec
+ self.group = group
+ self.subvolname = subvolname
+ self.legacy_mode = legacy
+ self.load_config()
+
+ @property
+ def uid(self):
+ return self.user_id
+
+ @uid.setter
+ def uid(self, val):
+ self.user_id = val
+
+ @property
+ def gid(self):
+ return self.group_id
+
+ @gid.setter
+ def gid(self, val):
+ self.group_id = val
+
+ @property
+ def mode(self):
+ return self.cmode
+
+ @mode.setter
+ def mode(self, val):
+ self.cmode = val
+
+ @property
+ def base_path(self):
+ return os.path.join(self.group.path, self.subvolname.encode('utf-8'))
+
+ @property
+ def config_path(self):
+ return os.path.join(self.base_path, b".meta")
+
+ @property
+ def legacy_dir(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), SubvolumeBase.LEGACY_CONF_DIR.encode('utf-8'))
+
+ @property
+ def legacy_config_path(self):
+ try:
+ m = hashlib.md5(self.base_path)
+ except ValueError:
+ try:
+ m = hashlib.md5(self.base_path, usedforsecurity=False) # type: ignore
+ except TypeError:
+ raise VolumeException(-errno.EINVAL,
+ "require python's hashlib library to support usedforsecurity flag in FIPS enabled systems")
+
+ meta_config = "{0}.meta".format(m.hexdigest())
+ return os.path.join(self.legacy_dir, meta_config.encode('utf-8'))
+
+ @property
+ def namespace(self):
+ return "{0}{1}".format(self.vol_spec.fs_namespace, self.subvolname)
+
+ @property
+ def group_name(self):
+ return self.group.group_name
+
+ @property
+ def subvol_name(self):
+ return self.subvolname
+
+ @property
+ def legacy_mode(self):
+ return self.legacy
+
+ @legacy_mode.setter
+ def legacy_mode(self, mode):
+ self.legacy = mode
+
+ @property
+ def path(self):
+ """ Path to subvolume data directory """
+ raise NotImplementedError
+
+ @property
+ def features(self):
+ """ List of features supported by the subvolume, containing items from SubvolumeFeatures """
+ raise NotImplementedError
+
+ @property
+ def state(self):
+ """ Subvolume state, one of SubvolumeStates """
+ return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+
+ @property
+ def subvol_type(self):
+ return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
+ @property
+ def purgeable(self):
+ """ Boolean declaring if subvolume can be purged """
+ raise NotImplementedError
+
+ def clean_stale_snapshot_metadata(self):
+ """ Clean up stale snapshot metadata """
+ raise NotImplementedError
+
+ def load_config(self):
+ try:
+ self.fs.stat(self.legacy_config_path)
+ self.legacy_mode = True
+ except cephfs.Error as e:
+ pass
+
+ log.debug("loading config "
+ "'{0}' [mode: {1}]".format(self.subvolname, "legacy"
+ if self.legacy_mode else "new"))
+ if self.legacy_mode:
+ self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
+ else:
+ self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640)
+
+ def get_attrs(self, pathname):
+ # get subvolume attributes
+ attrs = {} # type: Dict[str, Union[int, str, None]]
+ stx = self.fs.statx(pathname,
+ cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+
+ attrs["uid"] = int(stx["uid"])
+ attrs["gid"] = int(stx["gid"])
+ attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"]))
+
+ try:
+ attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.NoData:
+ attrs["data_pool"] = None
+
+ try:
+ attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ attrs["pool_namespace"] = None
+
+ try:
+ attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ attrs["quota"] = None
+
+ return attrs
+
+ def set_attrs(self, path, attrs):
+ # set subvolume attributes
+ # set size
+ quota = attrs.get("quota")
+ if quota is not None:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set pool layout
+ data_pool = attrs.get("data_pool")
+ if data_pool is not None:
+ try:
+ self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL,
+ "invalid pool layout '{0}' -- need a valid data pool".format(data_pool))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # isolate namespace
+ xattr_key = xattr_val = None
+ pool_namespace = attrs.get("pool_namespace")
+ if pool_namespace is not None:
+ # enforce security isolation, use separate namespace for this subvolume
+ xattr_key = 'ceph.dir.layout.pool_namespace'
+ xattr_val = pool_namespace
+ elif not data_pool:
+ # If subvolume's namespace layout is not set, then the subvolume's pool
+ # layout remains unset and will undesirably change with ancestor's
+ # pool layout changes.
+ xattr_key = 'ceph.dir.layout.pool'
+ xattr_val = None
+ try:
+ self.fs.getxattr(path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.NoData as e:
+ xattr_val = get_ancestor_xattr(self.fs, os.path.split(path)[0], "ceph.dir.layout.pool")
+ if xattr_key and xattr_val:
+ try:
+ self.fs.setxattr(path, xattr_key, xattr_val.encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set uid/gid
+ uid = attrs.get("uid")
+ if uid is None:
+ uid = self.group.uid
+ else:
+ try:
+ if uid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid UID")
+
+ gid = attrs.get("gid")
+ if gid is None:
+ gid = self.group.gid
+ else:
+ try:
+ if gid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid GID")
+
+ if uid is not None and gid is not None:
+ self.fs.chown(path, uid, gid)
+
+ # set mode
+ mode = attrs.get("mode", None)
+ if mode is not None:
+ self.fs.lchmod(path, mode)
+
+ def _resize(self, path, newsize, noshrink):
+ try:
+ newsize = int(newsize)
+ if newsize <= 0:
+ raise VolumeException(-errno.EINVAL, "Invalid subvolume size")
+ except ValueError:
+ newsize = newsize.lower()
+ if not (newsize == "inf" or newsize == "infinite"):
+ raise VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize))
+ newsize = 0
+ noshrink = False
+
+ try:
+ maxbytes = int(self.fs.getxattr(path, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ maxbytes = 0
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ subvolstat = self.fs.stat(path)
+ if newsize > 0 and newsize < subvolstat.st_size:
+ if noshrink:
+ raise VolumeException(-errno.EINVAL, "Can't resize the subvolume. The new size '{0}' would be lesser than the current "
+ "used size '{1}'".format(newsize, subvolstat.st_size))
+
+ if not newsize == maxbytes:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "Cannot set new size for the subvolume. '{0}'".format(e.args[1]))
+ return newsize, subvolstat.st_size
+
+ def pin(self, pin_type, pin_setting):
+ return pin(self.fs, self.base_path, pin_type, pin_setting)
+
+ def init_config(self, version, subvolume_type, subvolume_path, subvolume_state):
+ self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value)
+ self.metadata_mgr.flush()
+
+ def discover(self):
+ log.debug("discovering subvolume '{0}' [mode: {1}]".format(self.subvolname, "legacy" if self.legacy_mode else "new"))
+ try:
+ self.fs.stat(self.base_path)
+ self.metadata_mgr.refresh()
+ log.debug("loaded subvolume '{0}'".format(self.subvolname))
+ subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+ # subvolume with retained snapshots has empty path, don't mistake it for
+ # fabricated metadata.
+ if (not self.legacy_mode and self.state != SubvolumeStates.STATE_RETAINED and
+ self.base_path.decode('utf-8') != str(Path(subvolpath).parent)):
+ raise MetadataMgrException(-errno.ENOENT, 'fabricated .meta')
+ except MetadataMgrException as me:
+ if me.errno in (-errno.ENOENT, -errno.EINVAL) and not self.legacy_mode:
+ log.warn("subvolume '{0}', {1}, "
+ "assuming legacy_mode".format(self.subvolname, me.error_str))
+ self.legacy_mode = True
+ self.load_config()
+ self.discover()
+ else:
+ raise
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname))
+
+ def _trash_dir(self, path):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.dump(path)
+ log.info("subvolume path '{0}' moved to trashcan".format(path))
+
+ def _link_dir(self, path, bname):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.link(path, bname)
+ log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname))
+
+ def trash_base_dir(self):
+ if self.legacy_mode:
+ self.fs.unlink(self.legacy_config_path)
+ self._trash_dir(self.base_path)
+
+ def create_base_dir(self, mode):
+ try:
+ self.fs.mkdirs(self.base_path, mode)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def info (self):
+ subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+ etype = self.subvol_type
+ st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
+ cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
+ cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
+ cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ usedbytes = st["size"]
+ try:
+ nsize = int(self.fs.getxattr(subvolpath, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ nsize = 0
+
+ try:
+ data_pool = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool').decode('utf-8')
+ pool_namespace = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
+ 'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]),
+ 'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
+ 'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
+ 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0),
+ 'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value}
+
+ def set_user_metadata(self, keyname, value):
+ try:
+ self.metadata_mgr.add_section(MetadataManager.USER_METADATA_SECTION)
+ self.metadata_mgr.update_section(MetadataManager.USER_METADATA_SECTION, keyname, str(value))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to set user metadata key={keyname} value={value} on subvolume={self.subvol_name} "
+ f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-me.args[0], me.args[1])
+
+ def get_user_metadata(self, keyname):
+ try:
+ value = self.metadata_mgr.get_option(MetadataManager.USER_METADATA_SECTION, keyname)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname))
+ raise VolumeException(-me.args[0], me.args[1])
+ return value
+
+ def list_user_metadata(self):
+ return self.metadata_mgr.list_all_options_from_section(MetadataManager.USER_METADATA_SECTION)
+
+ def remove_user_metadata(self, keyname):
+ try:
+ ret = self.metadata_mgr.remove_option(MetadataManager.USER_METADATA_SECTION, keyname)
+ if not ret:
+ raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume metadata does not exist")
+ log.error(f"Failed to remove user metadata key={keyname} on subvolume={self.subvol_name} "
+ f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-me.args[0], me.args[1])
+
+ def get_snap_section_name(self, snapname):
+ section = "SNAP_METADATA" + "_" + snapname;
+ return section;
+
+ def set_snapshot_metadata(self, snapname, keyname, value):
+ try:
+ section = self.get_snap_section_name(snapname)
+ self.metadata_mgr.add_section(section)
+ self.metadata_mgr.update_section(section, keyname, str(value))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to set snapshot metadata key={keyname} value={value} on snap={snapname} "
+ f"subvolume={self.subvol_name} group={self.group_name} "
+ f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-me.args[0], me.args[1])
+
+ def get_snapshot_metadata(self, snapname, keyname):
+ try:
+ value = self.metadata_mgr.get_option(self.get_snap_section_name(snapname), keyname)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname))
+ log.error(f"Failed to get snapshot metadata key={keyname} on snap={snapname} "
+ f"subvolume={self.subvol_name} group={self.group_name} "
+ f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-me.args[0], me.args[1])
+ return value
+
+ def list_snapshot_metadata(self, snapname):
+ return self.metadata_mgr.list_all_options_from_section(self.get_snap_section_name(snapname))
+
+ def remove_snapshot_metadata(self, snapname, keyname):
+ try:
+ ret = self.metadata_mgr.remove_option(self.get_snap_section_name(snapname), keyname)
+ if not ret:
+ raise VolumeException(-errno.ENOENT, "key '{0}' does not exist.".format(keyname))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "snapshot metadata not does not exist")
+ log.error(f"Failed to remove snapshot metadata key={keyname} on snap={snapname} "
+ f"subvolume={self.subvol_name} group={self.group_name} "
+ f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-me.args[0], me.args[1])
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
new file mode 100644
index 000000000..b5a10dd6c
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
@@ -0,0 +1,904 @@
+import os
+import sys
+import stat
+import uuid
+import errno
+import logging
+import json
+from datetime import datetime
+from typing import Any, List, Dict
+from pathlib import Path
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
+from .op_sm import SubvolumeOpSm
+from .subvolume_base import SubvolumeBase
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ..access import allow_access, deny_access
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
+from ...fs_util import listsnaps, is_inherited_snap, create_base_dir
+from ..template import SubvolumeOpType
+from ..group import Group
+from ..rankevicter import RankEvicter
+from ..volume import get_mds_map
+
+from ..clone_index import open_clone_index, create_clone_index
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+ """
+ Version 1 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <uuid> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
+ - global information about the subvolume (version, path, type, state)
+ - snapshots attached to an ongoing clone operation
+ - clone snapshot source if subvolume is a clone of a snapshot
+ - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
+ /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
+ component in the path.
+ """
+ VERSION = 1
+
+ @staticmethod
+ def version():
+ return SubvolumeV1.VERSION
+
+ @property
+ def path(self):
+ try:
+ # no need to stat the path -- open() does that
+ return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+
+ @property
+ def features(self):
+ return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
+
+ def mark_subvolume(self):
+ # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
+ # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
+ try:
+ # MDS treats this as a noop for already marked subvolume
+ self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def snapshot_base_path(self):
+ """ Base path for all snapshots """
+ return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+
+ def snapshot_path(self, snapname):
+ """ Path to a specific snapshot named 'snapname' """
+ return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))
+
+ def snapshot_data_path(self, snapname):
+ """ Path to user data directory within a subvolume snapshot named 'snapname' """
+ return self.snapshot_path(snapname)
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # create group directory with default mode(0o755) if it doesn't exist.
+ create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, mode)
+ self.mark_subvolume()
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'data_pool': pool,
+ 'pool_namespace': self.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def add_clone_source(self, volname, subvolume, snapname, flush=False):
+ self.metadata_mgr.add_section("source")
+ self.metadata_mgr.update_section("source", "volume", volname)
+ if not subvolume.group.is_default_group():
+ self.metadata_mgr.update_section("source", "group", subvolume.group_name)
+ self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
+ self.metadata_mgr.update_section("source", "snapshot", snapname)
+ if flush:
+ self.metadata_mgr.flush()
+
+ def remove_clone_source(self, flush=False):
+ self.metadata_mgr.remove_section("source")
+ if flush:
+ self.metadata_mgr.flush()
+
+ def add_clone_failure(self, errno, error_msg):
+ try:
+ self.metadata_mgr.add_section(MetadataManager.CLONE_FAILURE_SECTION)
+ self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+ MetadataManager.CLONE_FAILURE_META_KEY_ERRNO, errno)
+ self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+ MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG, error_msg)
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to add clone failure status clone={self.subvol_name} group={self.group_name} "
+ f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # source snapshot attrs are used to create clone subvolume.
+ # attributes of subvolume's content though, are synced during the cloning process.
+ attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+ # The source of the clone may have exceeded its quota limit as
+ # CephFS quotas are imprecise. Cloning such a source may fail if
+ # the quota on the destination is set before starting the clone
+ # copy. So always set the quota on destination after cloning is
+ # successful.
+ attrs["quota"] = None
+
+ # override snapshot pool setting, if one is provided for the clone
+ if pool is not None:
+ attrs["data_pool"] = pool
+ attrs["pool_namespace"] = None
+
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, attrs.get("mode"))
+ self.mark_subvolume()
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata and clone source
+ qpath = subvol_path.decode('utf-8')
+ self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
+ self.add_clone_source(source_volname, source_subvolume, snapname)
+ self.metadata_mgr.flush()
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
+ try:
+ self.metadata_mgr.refresh()
+
+ etype = self.subvol_type
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate):
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ st = self.fs.stat(subvol_path)
+ # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
+ self.mark_subvolume()
+
+ self.uid = int(st.st_uid)
+ self.gid = int(st.st_gid)
+ self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def _recover_auth_meta(self, auth_id, auth_meta):
+ """
+ Call me after locking the auth meta file.
+ """
+ remove_subvolumes = []
+
+ for subvol, subvol_data in auth_meta['subvolumes'].items():
+ if not subvol_data['dirty']:
+ continue
+
+ (group_name, subvol_name) = subvol.split('/')
+ group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
+ access_level = subvol_data['access_level']
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
+
+ # No SVMeta update indicates that there was no auth update
+ # in Ceph either. So it's safe to remove corresponding
+ # partial update in AMeta.
+ if not subvol_meta or auth_id not in subvol_meta['auths']:
+ remove_subvolumes.append(subvol)
+ continue
+
+ want_auth = {
+ 'access_level': access_level,
+ 'dirty': False,
+ }
+ # SVMeta update looks clean. Ceph auth update must have been
+ # clean. Update the dirty flag and continue
+ if subvol_meta['auths'][auth_id] == want_auth:
+ auth_meta['subvolumes'][subvol]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+ continue
+
+ client_entity = "client.{0}".format(auth_id)
+ ret, out, err = self.mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+ if ret == 0:
+ existing_caps = json.loads(out)
+ elif ret == -errno.ENOENT:
+ existing_caps = None
+ else:
+ log.error(err)
+ raise VolumeException(ret, err)
+
+ self._authorize_subvolume(auth_id, access_level, existing_caps)
+
+ # Recovered from partial auth updates for the auth ID's access
+ # to a subvolume.
+ auth_meta['subvolumes'][subvol]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ for subvol in remove_subvolumes:
+ del auth_meta['subvolumes'][subvol]
+
+ if not auth_meta['subvolumes']:
+ # Clean up auth meta file
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ # Recovered from all partial auth updates for the auth ID.
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
+ """
+ Get-or-create a Ceph auth identity for `auth_id` and grant them access
+ to
+ :param auth_id:
+ :param access_level:
+ :param tenant_id: Optionally provide a stringizable object to
+ restrict any created cephx IDs to other callers
+ passing the same tenant ID.
+ :allow_existing_id: Optionally authorize existing auth-ids not
+ created by ceph_volume_client.
+ :return:
+ """
+
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ client_entity = "client.{0}".format(auth_id)
+ ret, out, err = self.mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+
+ if ret == 0:
+ existing_caps = json.loads(out)
+ elif ret == -errno.ENOENT:
+ existing_caps = None
+ else:
+ log.error(err)
+ raise VolumeException(ret, err)
+
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ # subvolume data to be inserted
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ subvolume = {
+ group_subvol_id : {
+ # The access level at which the auth_id is authorized to
+ # access the volume.
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+
+ if auth_meta is None:
+ if not allow_existing_id and existing_caps is not None:
+ msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.EPERM, msg)
+
+ # non-existent auth IDs
+ sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+ auth_id, tenant_id
+ ))
+ log.debug("Authorize: no existing meta")
+ auth_meta = {
+ 'dirty': True,
+ 'tenant_id': str(tenant_id) if tenant_id else None,
+ 'subvolumes': subvolume
+ }
+ else:
+ # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+ if 'volumes' in auth_meta:
+ auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+ # Disallow tenants to share auth IDs
+ if str(auth_meta['tenant_id']) != str(tenant_id):
+ msg = "auth ID: {0} is already in use".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.EPERM, msg)
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ log.debug("Authorize: existing tenant {tenant}".format(
+ tenant=auth_meta['tenant_id']
+ ))
+ auth_meta['dirty'] = True
+ auth_meta['subvolumes'].update(subvolume)
+
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ key = self._authorize_subvolume(auth_id, access_level, existing_caps)
+
+ auth_meta['dirty'] = False
+ auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ if tenant_id:
+ return key
+ else:
+ # Caller wasn't multi-tenant aware: be safe and don't give
+ # them a key
+ return ""
+
+ def _authorize_subvolume(self, auth_id, access_level, existing_caps):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ auth = {
+ auth_id: {
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+
+ if subvol_meta is None:
+ subvol_meta = {
+ 'auths': auth
+ }
+ else:
+ subvol_meta['auths'].update(auth)
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ key = self._authorize(auth_id, access_level, existing_caps)
+
+ subvol_meta['auths'][auth_id]['dirty'] = False
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ return key
+
+ def _authorize(self, auth_id, access_level, existing_caps):
+ subvol_path = self.path
+ log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
+
+ # First I need to work out what the data pool is for this share:
+ # read the layout
+ try:
+ pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ try:
+ namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ namespace = None
+
+ # Now construct auth capabilities that give the guest just enough
+ # permissions to access the share
+ client_entity = "client.{0}".format(auth_id)
+ want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
+ want_osd_cap = "allow {0} pool={1}{2}".format(
+ access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+ # Construct auth caps that if present might conflict with the desired
+ # auth caps.
+ unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
+ unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
+ unwanted_osd_cap = "allow {0} pool={1}{2}".format(
+ unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+ return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap, existing_caps)
+
+ def deauthorize(self, auth_id):
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ if auth_meta is None:
+ msg = "auth ID: {0} doesn't exist".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.ENOENT, msg)
+
+ # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+ if 'volumes' in auth_meta:
+ auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ if (auth_meta is None) or (not auth_meta['subvolumes']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ # Clean up the auth meta file of an auth ID
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ if group_subvol_id not in auth_meta['subvolumes']:
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ auth_meta['dirty'] = True
+ auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ self._deauthorize_subvolume(auth_id)
+
+ # Filter out the volume we're deauthorizing
+ del auth_meta['subvolumes'][group_subvol_id]
+
+ # Clean up auth meta file
+ if not auth_meta['subvolumes']:
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def _deauthorize_subvolume(self, auth_id):
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ subvol_meta['auths'][auth_id]['dirty'] = True
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ self._deauthorize(auth_id)
+
+ # Remove the auth_id from the metadata *after* removing it
+ # from ceph, so that if we crashed here, we would actually
+ # recreate the auth ID during recovery (i.e. end up with
+ # a consistent state).
+
+ # Filter out the auth we're removing
+ del subvol_meta['auths'][auth_id]
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ def _deauthorize(self, auth_id):
+ """
+ The volume must still exist.
+ """
+ client_entity = "client.{0}".format(auth_id)
+ subvol_path = self.path
+ try:
+ pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ try:
+ namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ namespace = None
+
+ # The auth_id might have read-only or read-write mount access for the
+ # subvolume path.
+ access_levels = ('r', 'rw')
+ want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
+ for access_level in access_levels]
+ want_osd_caps = ['allow {0} pool={1}{2}'.format(
+ access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
+ for access_level in access_levels]
+ deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)
+
+ def authorized_list(self):
+ """
+ Expose a list of auth IDs that have access to a subvolume.
+
+ return: a list of (auth_id, access_level) tuples, where
+ the access_level can be 'r' , or 'rw'.
+ None if no auth ID is given access to the subvolume.
+ """
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+ auths = [] # type: List[Dict[str,str]]
+ if not meta or not meta['auths']:
+ return auths
+
+ for auth, auth_data in meta['auths'].items():
+ # Skip partial auth updates.
+ if not auth_data['dirty']:
+ auths.append({auth: auth_data['access_level']})
+
+ return auths
+
+ def evict(self, volname, auth_id, timeout=30):
+ """
+ Evict all clients based on the authorization ID and the subvolume path mounted.
+ Assumes that the authorization key has been revoked prior to calling this function.
+
+ This operation can throw an exception if the mon cluster is unresponsive, or
+ any individual MDS daemon is unresponsive for longer than the timeout passed in.
+ """
+
+ client_spec = ["auth_name={0}".format(auth_id), ]
+ client_spec.append("client_metadata.root={0}".
+ format(self.path.decode('utf-8')))
+
+ log.info("evict clients with {0}".format(', '.join(client_spec)))
+
+ mds_map = get_mds_map(self.mgr, volname)
+ if not mds_map:
+ raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))
+
+ up = {}
+ for name, gid in mds_map['up'].items():
+ # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
+ assert name.startswith("mds_")
+ up[int(name[4:])] = gid
+
+ # For all MDS ranks held by a daemon
+ # Do the parallelism in python instead of using "tell mds.*", because
+ # the latter doesn't give us per-mds output
+ threads = []
+ for rank, gid in up.items():
+ thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
+ thread.start()
+ threads.append(thread)
+
+ for t in threads:
+ t.join()
+
+ log.info("evict: joined all")
+
+ for t in threads:
+ if not t.success:
+ msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
+ format(', '.join(client_spec), t.rank, t.gid, t.exception)
+ )
+ log.error(msg)
+ raise EvictionError(msg)
+
+ def _get_clone_source(self):
+ try:
+ clone_source = {
+ 'volume' : self.metadata_mgr.get_option("source", "volume"),
+ 'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
+ 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
+ }
+
+ try:
+ clone_source["group"] = self.metadata_mgr.get_option("source", "group")
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ pass
+ else:
+ raise
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+ return clone_source
+
+ def _get_clone_failure(self):
+ clone_failure = {
+ 'errno' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERRNO),
+ 'error_msg' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG),
+ }
+ return clone_failure
+
+ @property
+ def status(self):
+ state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+ subvolume_type = self.subvol_type
+ subvolume_status = {
+ 'state' : state.value
+ }
+ if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+ subvolume_status["source"] = self._get_clone_source()
+ if SubvolumeOpSm.is_failed_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+ try:
+ subvolume_status["failure"] = self._get_clone_failure()
+ except MetadataMgrException:
+ pass
+
+ return subvolume_status
+
+ @property
+ def state(self):
+ return super(SubvolumeV1, self).state
+
+ @state.setter
+ def state(self, val):
+ state = val[0].value
+ flush = val[1]
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+ if flush:
+ self.metadata_mgr.flush()
+
+ def remove(self, retainsnaps=False):
+ if retainsnaps:
+ raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
+ if self.list_snapshots():
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+ self.trash_base_dir()
+
+ def resize(self, newsize, noshrink):
+ subvol_path = self.path
+ return self._resize(subvol_path, newsize, noshrink)
+
+ def create_snapshot(self, snapname):
+ try:
+ group_snapshot_path = os.path.join(self.group.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ self.fs.stat(group_snapshot_path)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ snappath = self.snapshot_path(snapname)
+ mksnap(self.fs, snappath)
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same")
+
+ def has_pending_clones(self, snapname):
+ try:
+ return self.metadata_mgr.section_has_item('clone snaps', snapname)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ return False
+ raise
+
+ def get_pending_clones(self, snapname):
+ pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any]
+ pending_track_id_list = []
+ pending_clone_list = []
+ index_path = ""
+ orphan_clones_count = 0
+
+ try:
+ if self.has_pending_clones(snapname):
+ pending_track_id_list = self.metadata_mgr.list_all_keys_with_specified_values_from_section('clone snaps', snapname)
+ else:
+ return pending_clones_info
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(-me.args[0], me.args[1])
+
+ try:
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ index_path = index.path.decode('utf-8')
+ except IndexException as e:
+ log.warning("failed to open clone index '{0}' for snapshot '{1}'".format(e, snapname))
+ raise VolumeException(-errno.EINVAL, "failed to open clone index")
+
+ for track_id in pending_track_id_list:
+ try:
+ link_path = self.fs.readlink(os.path.join(index_path, track_id), 4096)
+ except cephfs.Error as e:
+ if e.errno != errno.ENOENT:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ try:
+ # If clone is completed between 'list_all_keys_with_specified_values_from_section'
+ # and readlink(track_id_path) call then readlink will fail with error ENOENT (2)
+ # Hence we double check whether track_id is exist in .meta file or not.
+ value = self.metadata_mgr.get_option('clone snaps', track_id)
+ # Edge case scenario.
+ # If track_id for clone exist but path /volumes/_index/clone/{track_id} not found
+ # then clone is orphan.
+ orphan_clones_count += 1
+ continue
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(-me.args[0], me.args[1])
+
+ path = Path(link_path.decode('utf-8'))
+ clone_name = os.path.basename(link_path).decode('utf-8')
+ group_name = os.path.basename(path.parent.absolute())
+ details = {"name": clone_name} # type: Dict[str, str]
+ if group_name != Group.NO_GROUP_NAME:
+ details["target_group"] = group_name
+ pending_clone_list.append(details)
+
+ if len(pending_clone_list) != 0:
+ pending_clones_info["has_pending_clones"] = "yes"
+ pending_clones_info["pending_clones"] = pending_clone_list
+ else:
+ pending_clones_info["has_pending_clones"] = "no"
+
+ if orphan_clones_count > 0:
+ pending_clones_info["orphan_clones_count"] = orphan_clones_count
+
+ return pending_clones_info
+
+ def remove_snapshot(self, snapname, force=False):
+ if self.has_pending_clones(snapname):
+ raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
+ snappath = self.snapshot_path(snapname)
+ try:
+ self.metadata_mgr.remove_section(self.get_snap_section_name(snapname))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ if force:
+ log.info(f"Allowing snapshot removal on failure of it's metadata removal with force on "
+ f"snap={snapname} subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+ f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ pass
+ else:
+ log.error(f"Failed to remove snapshot metadata on snap={snapname} subvol={self.subvol_name} "
+ f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-errno.EAGAIN,
+ f"failed to remove snapshot metadata on snap={snapname} reason={me.args[0]} {me.args[1]}")
+ rmsnap(self.fs, snappath)
+
+ def snapshot_info(self, snapname):
+ if is_inherited_snap(snapname):
+ raise VolumeException(-errno.EINVAL,
+ "snapshot name '{0}' is invalid".format(snapname))
+ snappath = self.snapshot_data_path(snapname)
+ snap_info = {}
+ try:
+ snap_attrs = {'created_at':'ceph.snap.btime',
+ 'data_pool':'ceph.dir.layout.pool'}
+ for key, val in snap_attrs.items():
+ snap_info[key] = self.fs.getxattr(snappath, val)
+ pending_clones_info = self.get_pending_clones(snapname)
+ info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any]
+ info_dict.update(pending_clones_info);
+ return info_dict
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT,
+ "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def list_snapshots(self):
+ try:
+ dirpath = self.snapshot_base_path()
+ return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise
+
+ def clean_stale_snapshot_metadata(self):
+ """ Clean up stale snapshot metadata """
+ if self.metadata_mgr.has_snap_metadata_section():
+ snap_list = self.list_snapshots()
+ snaps_with_metadata_list = self.metadata_mgr.list_snaps_with_metadata()
+ for snap_with_metadata in snaps_with_metadata_list:
+ if snap_with_metadata.encode('utf-8') not in snap_list:
+ try:
+ self.metadata_mgr.remove_section(self.get_snap_section_name(snap_with_metadata))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to remove stale snap metadata on snap={snap_with_metadata} "
+ f"subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+ f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ pass
+
+ def _add_snap_clone(self, track_id, snapname):
+ self.metadata_mgr.add_section("clone snaps")
+ self.metadata_mgr.update_section("clone snaps", track_id, snapname)
+ self.metadata_mgr.flush()
+
+ def _remove_snap_clone(self, track_id):
+ self.metadata_mgr.remove_option("clone snaps", track_id)
+ self.metadata_mgr.flush()
+
+ def attach_snapshot(self, snapname, tgt_subvolume):
+ if not snapname.encode('utf-8') in self.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ try:
+ create_clone_index(self.fs, self.vol_spec)
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ track_idx = index.track(tgt_subvolume.base_path)
+ self._add_snap_clone(track_idx, snapname)
+ except (IndexException, MetadataMgrException) as e:
+ log.warning("error creating clone index: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error cloning subvolume")
+
+ def detach_snapshot(self, snapname, track_id):
+ try:
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ index.untrack(track_id)
+ self._remove_snap_clone(track_id)
+ except (IndexException, MetadataMgrException) as e:
+ log.warning("error delining snapshot from clone: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
new file mode 100644
index 000000000..03085d049
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
@@ -0,0 +1,394 @@
+import os
+import stat
+import uuid
+import errno
+import logging
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
+from .op_sm import SubvolumeOpSm
+from .subvolume_v1 import SubvolumeV1
+from ..template import SubvolumeTemplate
+from ...exception import OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir, create_base_dir
+from ..template import SubvolumeOpType
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV2(SubvolumeV1):
+ """
+ Version 2 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots
+ of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory,
+ rather than under the <uuid> directory, as is the case of V1 subvolumes.
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <subvolume-name> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar
+ to V1 subvolumes
+ - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining
+ the rest of the subvolume and its meta file.
+ - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of
+ older incarnations of the same subvolume.
+ - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the
+ snapshot retention feature
+ """
+ VERSION = 2
+
+ @staticmethod
+ def version():
+ return SubvolumeV2.VERSION
+
+ @property
+ def features(self):
+ return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]
+
+ @property
+ def retained(self):
+ try:
+ self.metadata_mgr.refresh()
+ if self.state == SubvolumeStates.STATE_RETAINED:
+ return True
+ return False
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
+ return False
+
+ @property
+ def purgeable(self):
+ if not self.retained or self.list_snapshots() or self.has_pending_purges:
+ return False
+ return True
+
+ @property
+ def has_pending_purges(self):
+ try:
+ return not listdir(self.fs, self.trash_dir) == []
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return False
+ raise
+
+ @property
+ def trash_dir(self):
+ return os.path.join(self.base_path, b".trash")
+
+ def create_trashcan(self):
+ """per subvolume trash directory"""
+ try:
+ self.fs.stat(self.trash_dir)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ try:
+ self.fs.mkdir(self.trash_dir, 0o700)
+ except cephfs.Error as ce:
+ raise VolumeException(-ce.args[0], ce.args[1])
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def mark_subvolume(self):
+ # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
+ # subvolume root is where snapshots would be taken, and hence is the base_path for v2 subvolumes
+ try:
+ # MDS treats this as a noop for already marked subvolume
+ self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ @staticmethod
+ def is_valid_uuid(uuid_str):
+ try:
+ uuid.UUID(uuid_str)
+ return True
+ except ValueError:
+ return False
+
+ def snapshot_base_path(self):
+ return os.path.join(self.base_path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+
+ def snapshot_data_path(self, snapname):
+ snap_base_path = self.snapshot_path(snapname)
+ uuid_str = None
+ try:
+ with self.fs.opendir(snap_base_path) as dir_handle:
+ d = self.fs.readdir(dir_handle)
+ while d:
+ if d.d_name not in (b".", b".."):
+ d_full_path = os.path.join(snap_base_path, d.d_name)
+ stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISDIR(stx.get('mode')):
+ if self.is_valid_uuid(d.d_name.decode('utf-8')):
+ uuid_str = d.d_name
+ d = self.fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ if not uuid_str:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+
+ return os.path.join(snap_base_path, uuid_str)
+
+ def _remove_on_failure(self, subvol_path, retained):
+ if retained:
+ log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path))
+ try:
+ self.fs.rmdir(subvol_path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove(internal_cleanup=True)
+
+ def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state):
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+ retained = self.retained
+ if retained and self.has_pending_purges:
+ raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # create group directory with default mode(0o755) if it doesn't exist.
+ create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
+ self.fs.mkdirs(subvol_path, mode)
+ self.mark_subvolume()
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'data_pool': pool,
+ 'pool_namespace': self.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ if retained:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ self.metadata_mgr.flush()
+ else:
+ self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)
+
+ # Create the subvolume metadata file which manages auth-ids if it doesn't exist
+ self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname)
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ self._remove_on_failure(subvol_path, retained)
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+ retained = self.retained
+ if retained and self.has_pending_purges:
+ raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # source snapshot attrs are used to create clone subvolume
+ # attributes of subvolume's content though, are synced during the cloning process.
+ attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+ # The source of the clone may have exceeded its quota limit as
+ # CephFS quotas are imprecise. Cloning such a source may fail if
+ # the quota on the destination is set before starting the clone
+ # copy. So always set the quota on destination after cloning is
+ # successful.
+ attrs["quota"] = None
+
+ # override snapshot pool setting, if one is provided for the clone
+ if pool is not None:
+ attrs["data_pool"] = pool
+ attrs["pool_namespace"] = None
+
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, attrs.get("mode"))
+ self.mark_subvolume()
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata and clone source
+ qpath = subvol_path.decode('utf-8')
+ if retained:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ else:
+ self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value)
+ self.add_clone_source(source_volname, source_subvolume, snapname)
+ self.metadata_mgr.flush()
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ self._remove_on_failure(subvol_path, retained)
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_state == SubvolumeStates.STATE_RETAINED:
+ return {
+ SubvolumeOpType.REMOVE,
+ SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.LIST,
+ SubvolumeOpType.INFO,
+ SubvolumeOpType.SNAP_REMOVE,
+ SubvolumeOpType.SNAP_LIST,
+ SubvolumeOpType.SNAP_INFO,
+ SubvolumeOpType.SNAP_PROTECT,
+ SubvolumeOpType.SNAP_UNPROTECT,
+ SubvolumeOpType.CLONE_SOURCE
+ }
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL,
+ SubvolumeOpType.CLONE_SOURCE}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
+ try:
+ self.metadata_mgr.refresh()
+ # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
+ self.mark_subvolume()
+
+ etype = self.subvol_type
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
+ self.subvolname))
+
+ if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
+ if estate != SubvolumeStates.STATE_RETAINED:
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ st = self.fs.stat(subvol_path)
+
+ self.uid = int(st.st_uid)
+ self.gid = int(st.st_gid)
+ self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def trash_incarnation_dir(self):
+ """rename subvolume (uuid component) to trash"""
+ self.create_trashcan()
+ try:
+ bname = os.path.basename(self.path)
+ tpath = os.path.join(self.trash_dir, bname)
+ log.debug("trash: {0} -> {1}".format(self.path, tpath))
+ self.fs.rename(self.path, tpath)
+ self._link_dir(tpath, bname)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ @staticmethod
+ def safe_to_remove_subvolume_clone(subvol_state):
+ # Both the STATE_FAILED and STATE_CANCELED are handled by 'handle_clone_failed' in the state
+ # machine which removes the entry from the index. Hence, it's safe to removed clone with
+ # force option for both.
+ acceptable_rm_clone_states = [SubvolumeStates.STATE_COMPLETE, SubvolumeStates.STATE_CANCELED,
+ SubvolumeStates.STATE_FAILED, SubvolumeStates.STATE_RETAINED]
+ if subvol_state not in acceptable_rm_clone_states:
+ return False
+ return True
+
+ def remove(self, retainsnaps=False, internal_cleanup=False):
+ if self.list_snapshots():
+ if not retainsnaps:
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+ else:
+ if not internal_cleanup and not self.safe_to_remove_subvolume_clone(self.state):
+ raise VolumeException(-errno.EAGAIN,
+ "{0} clone in-progress -- please cancel the clone and retry".format(self.subvolname))
+ if not self.has_pending_purges:
+ self.trash_base_dir()
+ # Delete the volume meta file, if it's not already deleted
+ self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
+ return
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ self.trash_incarnation_dir()
+ self.metadata_mgr.remove_section(MetadataManager.USER_METADATA_SECTION)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+ self.metadata_mgr.flush()
+ # Delete the volume meta file, if it's not already deleted
+ self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
+
+ def info(self):
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ return super(SubvolumeV2, self).info()
+
+ return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}
+
+ def remove_snapshot(self, snapname, force=False):
+ super(SubvolumeV2, self).remove_snapshot(snapname, force)
+ if self.purgeable:
+ self.trash_base_dir()
+ # tickle the volume purge job to purge this entry, using ESTALE
+ raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
+ # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
new file mode 100644
index 000000000..a79aa55e1
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -0,0 +1,198 @@
+import errno
+import logging
+import os
+
+from typing import List
+
+from contextlib import contextmanager
+
+import orchestrator
+
+from .lock import GlobalLock
+from ..exception import VolumeException
+from ..fs_util import create_pool, remove_pool, create_filesystem, \
+ remove_filesystem, create_mds, volume_exists
+from .trash import Trash
+from mgr_util import open_filesystem, CephfsConnectionException
+
+log = logging.getLogger(__name__)
+
+def gen_pool_names(volname):
+ """
+ return metadata and data pool name (from a filesystem/volume name) as a tuple
+ """
+ return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+
+def get_mds_map(mgr, volname):
+ """
+ return mdsmap for a volname
+ """
+ mds_map = None
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ return f['mdsmap']
+ return mds_map
+
+def get_pool_names(mgr, volname):
+ """
+ return metadata and data pools (list) names of volume as a tuple
+ """
+ fs_map = mgr.get("fs_map")
+ metadata_pool_id = None
+ data_pool_ids = [] # type: List[int]
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ metadata_pool_id = f['mdsmap']['metadata_pool']
+ data_pool_ids = f['mdsmap']['data_pools']
+ break
+ if metadata_pool_id is None:
+ return None, None
+
+ osdmap = mgr.get("osd_map")
+ pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
+ metadata_pool = pools[metadata_pool_id]
+ data_pools = [pools[id] for id in data_pool_ids]
+ return metadata_pool, data_pools
+
+def get_pool_ids(mgr, volname):
+ """
+ return metadata and data pools (list) id of volume as a tuple
+ """
+ fs_map = mgr.get("fs_map")
+ metadata_pool_id = None
+ data_pool_ids = [] # type: List[int]
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ metadata_pool_id = f['mdsmap']['metadata_pool']
+ data_pool_ids = f['mdsmap']['data_pools']
+ break
+ if metadata_pool_id is None:
+ return None, None
+ return metadata_pool_id, data_pool_ids
+
+def create_volume(mgr, volname, placement):
+ """
+ create volume (pool, filesystem and mds)
+ """
+ metadata_pool, data_pool = gen_pool_names(volname)
+ # create pools
+ r, outb, outs = create_pool(mgr, metadata_pool)
+ if r != 0:
+ return r, outb, outs
+ r, outb, outs = create_pool(mgr, data_pool)
+ if r != 0:
+ #cleanup
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ # create filesystem
+ r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
+ if r != 0:
+ log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
+ #cleanup
+ remove_pool(mgr, data_pool)
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ return create_mds(mgr, volname, placement)
+
+
+def delete_volume(mgr, volname, metadata_pool, data_pools):
+ """
+ delete the given module (tear down mds, remove filesystem, remove pools)
+ """
+ # Tear down MDS daemons
+ try:
+ completion = mgr.remove_service('mds.' + volname)
+ orchestrator.raise_if_exception(completion)
+ except (ImportError, orchestrator.OrchestratorError):
+ log.warning("OrchestratorError, not tearing down MDS daemons")
+ except Exception as e:
+ # Don't let detailed orchestrator exceptions (python backtraces)
+ # bubble out to the user
+ log.exception("Failed to tear down MDS daemons")
+ return -errno.EINVAL, "", str(e)
+
+ # In case orchestrator didn't tear down MDS daemons cleanly, or
+ # there was no orchestrator, we force the daemons down.
+ if volume_exists(mgr, volname):
+ r, outb, outs = remove_filesystem(mgr, volname)
+ if r != 0:
+ return r, outb, outs
+ else:
+ err = "Filesystem not found for volume '{0}'".format(volname)
+ log.warning(err)
+ return -errno.ENOENT, "", err
+ r, outb, outs = remove_pool(mgr, metadata_pool)
+ if r != 0:
+ return r, outb, outs
+
+ for data_pool in data_pools:
+ r, outb, outs = remove_pool(mgr, data_pool)
+ if r != 0:
+ return r, outb, outs
+ result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
+ return r, result_str, ""
+
+
+def list_volumes(mgr):
+ """
+ list all filesystem volumes.
+
+ :param: None
+ :return: None
+ """
+ result = []
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ result.append({'name': f['mdsmap']['fs_name']})
+ return result
+
+
+def get_pending_subvol_deletions_count(path):
+ """
+ Get the number of pending subvolumes deletions.
+ """
+ trashdir = os.path.join(path, Trash.GROUP_NAME)
+ try:
+ num_pending_subvol_del = len(os.listdir(trashdir))
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ num_pending_subvol_del = 0
+
+ return {'pending_subvolume_deletions': num_pending_subvol_del}
+
+
+@contextmanager
+def open_volume(vc, volname):
+ """
+ open a volume for exclusive access. This API is to be used as a contextr
+ manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ g_lock = GlobalLock()
+ with g_lock.lock_op():
+ try:
+ with open_filesystem(vc, volname) as fs_handle:
+ yield fs_handle
+ except CephfsConnectionException as ce:
+ raise VolumeException(ce.errno, ce.error_str)
+
+
+@contextmanager
+def open_volume_lockless(vc, volname):
+ """
+ open a volume with shared access. This API is to be used as a context
+ manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ try:
+ with open_filesystem(vc, volname) as fs_handle:
+ yield fs_handle
+ except CephfsConnectionException as ce:
+ raise VolumeException(ce.errno, ce.error_str)