author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
parent     Initial commit. (diff)
download   ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py')
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py  904
1 file changed, 904 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
new file mode 100644
index 000000000..b5a10dd6c
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
@@ -0,0 +1,904 @@
+import os
+import sys
+import stat
+import uuid
+import errno
+import logging
+import json
+from datetime import datetime
+from typing import Any, List, Dict
+from pathlib import Path
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
+from .op_sm import SubvolumeOpSm
+from .subvolume_base import SubvolumeBase
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ..access import allow_access, deny_access
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
+from ...fs_util import listsnaps, is_inherited_snap, create_base_dir
+from ..template import SubvolumeOpType
+from ..group import Group
+from ..rankevicter import RankEvicter
+from ..volume import get_mds_map
+
+from ..clone_index import open_clone_index, create_clone_index
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+    """
+    Version 1 subvolumes create a subvolume with a path of the form:
+        volumes/<group-name>/<subvolume-name>/<uuid>/
+
+    - The directory under which user data resides is <uuid>
+    - Snapshots of the subvolume are taken within the <uuid> directory
+    - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing:
+        - global information about the subvolume (version, path, type, state)
+        - snapshots attached to an ongoing clone operation
+        - the clone snapshot source if the subvolume is a clone of a snapshot
+    - It retains backward compatibility with legacy subvolumes by creating the meta file for legacy subvolumes under
+      /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
+      component in the path.
+ """ + VERSION = 1 + + @staticmethod + def version(): + return SubvolumeV1.VERSION + + @property + def path(self): + try: + # no need to stat the path -- open() does that + return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8') + except MetadataMgrException as me: + raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") + + @property + def features(self): + return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value] + + def mark_subvolume(self): + # set subvolume attr, on subvolume root, marking it as a CephFS subvolume + # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes + try: + # MDS treats this as a noop for already marked subvolume + self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def snapshot_base_path(self): + """ Base path for all snapshots """ + return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + + def snapshot_path(self, snapname): + """ Path to a specific snapshot named 'snapname' """ + return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8')) + + def snapshot_data_path(self, snapname): + """ Path to user data directory within a subvolume snapshot named 'snapname' """ + return self.snapshot_path(snapname) + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # create group directory with default mode(0o755) if it doesn't exist. 
+            create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
+            # create directory and set attributes
+            self.fs.mkdirs(subvol_path, mode)
+            self.mark_subvolume()
+            attrs = {
+                'uid': uid,
+                'gid': gid,
+                'data_pool': pool,
+                'pool_namespace': self.namespace if isolate_nspace else None,
+                'quota': size
+            }
+            self.set_attrs(subvol_path, attrs)
+
+            # persist subvolume metadata
+            qpath = subvol_path.decode('utf-8')
+            self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+            try:
+                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+                self.remove()
+            except VolumeException as ve:
+                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+            if isinstance(e, MetadataMgrException):
+                log.error("metadata manager exception: {0}".format(e))
+                e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
+            elif isinstance(e, cephfs.Error):
+                e = VolumeException(-e.args[0], e.args[1])
+            raise e
+
+    def add_clone_source(self, volname, subvolume, snapname, flush=False):
+        self.metadata_mgr.add_section("source")
+        self.metadata_mgr.update_section("source", "volume", volname)
+        if not subvolume.group.is_default_group():
+            self.metadata_mgr.update_section("source", "group", subvolume.group_name)
+        self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
+        self.metadata_mgr.update_section("source", "snapshot", snapname)
+        if flush:
+            self.metadata_mgr.flush()
+
+    def remove_clone_source(self, flush=False):
+        self.metadata_mgr.remove_section("source")
+        if flush:
+            self.metadata_mgr.flush()
+
+    def add_clone_failure(self, errno, error_msg):
+        try:
+            self.metadata_mgr.add_section(MetadataManager.CLONE_FAILURE_SECTION)
+            self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+                                             MetadataManager.CLONE_FAILURE_META_KEY_ERRNO, errno)
+            self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+                                             MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG, error_msg)
+            self.metadata_mgr.flush()
+        except MetadataMgrException as me:
+            log.error(f"Failed to add clone failure status clone={self.subvol_name} group={self.group_name} "
+                      f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+
+    def create_clone(self, pool, source_volname, source_subvolume, snapname):
+        subvolume_type = SubvolumeTypes.TYPE_CLONE
+        try:
+            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+        except OpSmException as oe:
+            raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+        try:
+            # source snapshot attrs are used to create the clone subvolume;
+            # attributes of the subvolume's content, though, are synced during the cloning process
+            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+            # The source of the clone may have exceeded its quota limit as
+            # CephFS quotas are imprecise. Cloning such a source may fail if
+            # the quota on the destination is set before starting the clone
+            # copy. So always set the quota on the destination after cloning is
+            # successful.
+ attrs["quota"] = None + + # override snapshot pool setting, if one is provided for the clone + if pool is not None: + attrs["data_pool"] = pool + attrs["pool_namespace"] = None + + # create directory and set attributes + self.fs.mkdirs(subvol_path, attrs.get("mode")) + self.mark_subvolume() + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata and clone source + qpath = subvol_path.decode('utf-8') + self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value) + self.add_clone_source(source_volname, source_subvolume, snapname) + self.metadata_mgr.flush() + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) + try: + self.metadata_mgr.refresh() + + etype = self.subvol_type + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate): + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + + subvol_path = self.path + log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) + st = self.fs.stat(subvol_path) + # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark + self.mark_subvolume() + + self.uid = int(st.st_uid) + self.gid = int(st.st_gid) + self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(me.args[0], me.args[1]) + except cephfs.ObjectNotFound: + log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname)) + raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def _recover_auth_meta(self, auth_id, auth_meta): + """ + Call me after locking the auth meta file. 
+ """ + remove_subvolumes = [] + + for subvol, subvol_data in auth_meta['subvolumes'].items(): + if not subvol_data['dirty']: + continue + + (group_name, subvol_name) = subvol.split('/') + group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME + access_level = subvol_data['access_level'] + + with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name) + + # No SVMeta update indicates that there was no auth update + # in Ceph either. So it's safe to remove corresponding + # partial update in AMeta. + if not subvol_meta or auth_id not in subvol_meta['auths']: + remove_subvolumes.append(subvol) + continue + + want_auth = { + 'access_level': access_level, + 'dirty': False, + } + # SVMeta update looks clean. Ceph auth update must have been + # clean. Update the dirty flag and continue + if subvol_meta['auths'][auth_id] == want_auth: + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + continue + + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + self._authorize_subvolume(auth_id, access_level, existing_caps) + + # Recovered from partial auth updates for the auth ID's access + # to a subvolume. + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + for subvol in remove_subvolumes: + del auth_meta['subvolumes'][subvol] + + if not auth_meta['subvolumes']: + # Clean up auth meta file + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + # Recovered from all partial auth updates for the auth ID. + auth_meta['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False): + """ + Get-or-create a Ceph auth identity for `auth_id` and grant them access + to + :param auth_id: + :param access_level: + :param tenant_id: Optionally provide a stringizable object to + restrict any created cephx IDs to other callers + passing the same tenant ID. + :allow_existing_id: Optionally authorize existing auth-ids not + created by ceph_volume_client. + :return: + """ + + with self.auth_mdata_mgr.auth_lock(auth_id): + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + # Existing meta, or None, to be updated + auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id) + + # subvolume data to be inserted + group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None + group_subvol_id = "{0}/{1}".format(group_name, self.subvolname) + subvolume = { + group_subvol_id : { + # The access level at which the auth_id is authorized to + # access the volume. + 'access_level': access_level, + 'dirty': True, + } + } + + if auth_meta is None: + if not allow_existing_id and existing_caps is not None: + msg = "auth ID: {0} exists and not created by mgr plugin. 
+                    msg = "auth ID: {0} exists and was not created by the mgr plugin. Not allowed to modify".format(auth_id)
+                    log.error(msg)
+                    raise VolumeException(-errno.EPERM, msg)
+
+                # non-existent auth IDs
+                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+                    auth_id, tenant_id
+                ))
+                log.debug("Authorize: no existing meta")
+                auth_meta = {
+                    'dirty': True,
+                    'tenant_id': str(tenant_id) if tenant_id else None,
+                    'subvolumes': subvolume
+                }
+            else:
+                # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+                if 'volumes' in auth_meta:
+                    auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+                # Disallow tenants to share auth IDs
+                if str(auth_meta['tenant_id']) != str(tenant_id):
+                    msg = "auth ID: {0} is already in use".format(auth_id)
+                    log.error(msg)
+                    raise VolumeException(-errno.EPERM, msg)
+
+                if auth_meta['dirty']:
+                    self._recover_auth_meta(auth_id, auth_meta)
+
+                log.debug("Authorize: existing tenant {tenant}".format(
+                    tenant=auth_meta['tenant_id']
+                ))
+                auth_meta['dirty'] = True
+                auth_meta['subvolumes'].update(subvolume)
+
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+                key = self._authorize_subvolume(auth_id, access_level, existing_caps)
+
+            auth_meta['dirty'] = False
+            auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            if tenant_id:
+                return key
+            else:
+                # Caller wasn't multi-tenant aware: be safe and don't give
+                # them a key
+                return ""
+
+    def _authorize_subvolume(self, auth_id, access_level, existing_caps):
+        subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+        auth = {
+            auth_id: {
+                'access_level': access_level,
+                'dirty': True,
+            }
+        }
+
+        if subvol_meta is None:
+            subvol_meta = {
+                'auths': auth
+            }
+        else:
+            subvol_meta['auths'].update(auth)
+        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+        key = self._authorize(auth_id, access_level, existing_caps)
+
+        subvol_meta['auths'][auth_id]['dirty'] = False
+        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+        return key
+
+    def _authorize(self, auth_id, access_level, existing_caps):
+        subvol_path = self.path
+        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
+
+        # First I need to work out what the data pool is for this share:
+        # read the layout
+        try:
+            pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
+
+        try:
+            namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+        except cephfs.NoData:
+            namespace = None
+
+        # Now construct auth capabilities that give the guest just enough
+        # permissions to access the share
+        client_entity = "client.{0}".format(auth_id)
+        want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
+        want_osd_cap = "allow {0} pool={1}{2}".format(
+            access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+        # Construct auth caps that, if present, might conflict with the desired
+        # auth caps.
+        unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
+        unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
+        unwanted_osd_cap = "allow {0} pool={1}{2}".format(
+            unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+        return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
+                            unwanted_mds_cap, unwanted_osd_cap, existing_caps)
+
+    def deauthorize(self, auth_id):
+        with self.auth_mdata_mgr.auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+            if auth_meta is None:
+                msg = "auth ID: {0} doesn't exist".format(auth_id)
+                log.error(msg)
+                raise VolumeException(-errno.ENOENT, msg)
+
+            # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+            if 'volumes' in auth_meta:
+                auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+            if (auth_meta is None) or (not auth_meta['subvolumes']):
+                log.warning("deauthorized called for already-removed auth "
+                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                                auth_id=auth_id, subvolume=self.subvolname
+                            ))
+                # Clean up the auth meta file of an auth ID
+                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+                return
+
+            if group_subvol_id not in auth_meta['subvolumes']:
+                log.warning("deauthorized called for already-removed auth "
+                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                                auth_id=auth_id, subvolume=self.subvolname
+                            ))
+                return
+
+            if auth_meta['dirty']:
+                self._recover_auth_meta(auth_id, auth_meta)
+
+            auth_meta['dirty'] = True
+            auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            self._deauthorize_subvolume(auth_id)
+
+            # Filter out the volume we're deauthorizing
+            del auth_meta['subvolumes'][group_subvol_id]
+
+            # Clean up auth meta file
+            if not auth_meta['subvolumes']:
+                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+                return
+
+            auth_meta['dirty'] = False
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+    def _deauthorize_subvolume(self, auth_id):
+        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+            subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+            if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
+                log.warning("deauthorized called for already-removed auth "
+                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                                auth_id=auth_id, subvolume=self.subvolname
+                            ))
+                return
+
+            subvol_meta['auths'][auth_id]['dirty'] = True
+            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+            self._deauthorize(auth_id)
+
+            # Remove the auth_id from the metadata *after* removing it
+            # from ceph, so that if we crashed here, we would actually
+            # recreate the auth ID during recovery (i.e. end up with
+            # a consistent state).
+
+            # Filter out the auth we're removing
+            del subvol_meta['auths'][auth_id]
+            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+    def _deauthorize(self, auth_id):
+        """
+        The volume must still exist.
+ """ + client_entity = "client.{0}".format(auth_id) + subvol_path = self.path + try: + pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + try: + namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + namespace = None + + # The auth_id might have read-only or read-write mount access for the + # subvolume path. + access_levels = ('r', 'rw') + want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8')) + for access_level in access_levels] + want_osd_caps = ['allow {0} pool={1}{2}'.format( + access_level, pool_name, " namespace={0}".format(namespace) if namespace else "") + for access_level in access_levels] + deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps) + + def authorized_list(self): + """ + Expose a list of auth IDs that have access to a subvolume. + + return: a list of (auth_id, access_level) tuples, where + the access_level can be 'r' , or 'rw'. + None if no auth ID is given access to the subvolume. + """ + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + auths = [] # type: List[Dict[str,str]] + if not meta or not meta['auths']: + return auths + + for auth, auth_data in meta['auths'].items(): + # Skip partial auth updates. + if not auth_data['dirty']: + auths.append({auth: auth_data['access_level']}) + + return auths + + def evict(self, volname, auth_id, timeout=30): + """ + Evict all clients based on the authorization ID and the subvolume path mounted. + Assumes that the authorization key has been revoked prior to calling this function. + + This operation can throw an exception if the mon cluster is unresponsive, or + any individual MDS daemon is unresponsive for longer than the timeout passed in. + """ + + client_spec = ["auth_name={0}".format(auth_id), ] + client_spec.append("client_metadata.root={0}". + format(self.path.decode('utf-8'))) + + log.info("evict clients with {0}".format(', '.join(client_spec))) + + mds_map = get_mds_map(self.mgr, volname) + if not mds_map: + raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname)) + + up = {} + for name, gid in mds_map['up'].items(): + # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" + assert name.startswith("mds_") + up[int(name[4:])] = gid + + # For all MDS ranks held by a daemon + # Do the parallelism in python instead of using "tell mds.*", because + # the latter doesn't give us per-mds output + threads = [] + for rank, gid in up.items(): + thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + log.info("evict: joined all") + + for t in threads: + if not t.success: + msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". 
+                       format(', '.join(client_spec), t.rank, t.gid, t.exception))
+                log.error(msg)
+                raise EvictionError(msg)
+
+    def _get_clone_source(self):
+        try:
+            clone_source = {
+                'volume' : self.metadata_mgr.get_option("source", "volume"),
+                'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
+                'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
+            }
+
+            try:
+                clone_source["group"] = self.metadata_mgr.get_option("source", "group")
+            except MetadataMgrException as me:
+                if me.errno == -errno.ENOENT:
+                    pass
+                else:
+                    raise
+        except MetadataMgrException as me:
+            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+        return clone_source
+
+    def _get_clone_failure(self):
+        clone_failure = {
+            'errno' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERRNO),
+            'error_msg' : self.metadata_mgr.get_option(MetadataManager.CLONE_FAILURE_SECTION, MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG),
+        }
+        return clone_failure
+
+    @property
+    def status(self):
+        state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+        subvolume_type = self.subvol_type
+        subvolume_status = {
+            'state' : state.value
+        }
+        if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+            subvolume_status["source"] = self._get_clone_source()
+        if SubvolumeOpSm.is_failed_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+            try:
+                subvolume_status["failure"] = self._get_clone_failure()
+            except MetadataMgrException:
+                pass
+
+        return subvolume_status
+
+    @property
+    def state(self):
+        return super(SubvolumeV1, self).state
+
+    @state.setter
+    def state(self, val):
+        state = val[0].value
+        flush = val[1]
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+        if flush:
+            self.metadata_mgr.flush()
+
+    def remove(self, retainsnaps=False):
+        if retainsnaps:
+            raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
+        if self.list_snapshots():
+            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+        self.trash_base_dir()
+
+    def resize(self, newsize, noshrink):
+        subvol_path = self.path
+        return self._resize(subvol_path, newsize, noshrink)
+
+    def create_snapshot(self, snapname):
+        try:
+            group_snapshot_path = os.path.join(self.group.path,
+                                               self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+                                               snapname.encode('utf-8'))
+            self.fs.stat(group_snapshot_path)
+        except cephfs.Error as e:
+            if e.args[0] == errno.ENOENT:
+                snappath = self.snapshot_path(snapname)
+                mksnap(self.fs, snappath)
+            else:
+                raise VolumeException(-e.args[0], e.args[1])
+        else:
+            raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same")
+
+    def has_pending_clones(self, snapname):
+        try:
+            return self.metadata_mgr.section_has_item('clone snaps', snapname)
+        except MetadataMgrException as me:
+            if me.errno == -errno.ENOENT:
+                return False
+            raise
+
+    def get_pending_clones(self, snapname):
+        pending_clones_info = {"has_pending_clones": "no"}  # type: Dict[str, Any]
+        pending_track_id_list = []
+        pending_clone_list = []
+        index_path = ""
+        orphan_clones_count = 0
+
+        try:
+            if self.has_pending_clones(snapname):
+                pending_track_id_list = self.metadata_mgr.list_all_keys_with_specified_values_from_section('clone snaps', snapname)
+            else:
+                return pending_clones_info
+        except MetadataMgrException as me:
+            if me.errno != -errno.ENOENT:
+                raise VolumeException(-me.args[0], me.args[1])
+
+        try:
+            with open_clone_index(self.fs, self.vol_spec) as index:
+                index_path = index.path.decode('utf-8')
+        except IndexException as e:
+            log.warning("failed to open clone index '{0}' for snapshot '{1}'".format(e, snapname))
+            raise VolumeException(-errno.EINVAL, "failed to open clone index")
+
+        for track_id in pending_track_id_list:
+            try:
+                link_path = self.fs.readlink(os.path.join(index_path, track_id), 4096)
+            except cephfs.Error as e:
+                if e.errno != errno.ENOENT:
+                    raise VolumeException(-e.args[0], e.args[1])
+                else:
+                    try:
+                        # If the clone completed between the 'list_all_keys_with_specified_values_from_section'
+                        # and readlink(track_id_path) calls, then readlink fails with ENOENT (2).
+                        # Hence we double-check whether track_id exists in the .meta file or not.
+                        value = self.metadata_mgr.get_option('clone snaps', track_id)
+                        # Edge case scenario.
+                        # If track_id for the clone exists but path /volumes/_index/clone/{track_id} is not found,
+                        # then the clone is an orphan.
+                        orphan_clones_count += 1
+                        continue
+                    except MetadataMgrException as me:
+                        if me.errno != -errno.ENOENT:
+                            raise VolumeException(-me.args[0], me.args[1])
+
+            path = Path(link_path.decode('utf-8'))
+            clone_name = os.path.basename(link_path).decode('utf-8')
+            group_name = os.path.basename(path.parent.absolute())
+            details = {"name": clone_name}  # type: Dict[str, str]
+            if group_name != Group.NO_GROUP_NAME:
+                details["target_group"] = group_name
+            pending_clone_list.append(details)
+
+        if len(pending_clone_list) != 0:
+            pending_clones_info["has_pending_clones"] = "yes"
+            pending_clones_info["pending_clones"] = pending_clone_list
+        else:
+            pending_clones_info["has_pending_clones"] = "no"
+
+        if orphan_clones_count > 0:
+            pending_clones_info["orphan_clones_count"] = orphan_clones_count
+
+        return pending_clones_info
+
+    def remove_snapshot(self, snapname, force=False):
+        if self.has_pending_clones(snapname):
+            raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
+        snappath = self.snapshot_path(snapname)
+        try:
+            self.metadata_mgr.remove_section(self.get_snap_section_name(snapname))
+            self.metadata_mgr.flush()
+        except MetadataMgrException as me:
+            if force:
+                log.info(f"Allowing snapshot removal on failure of its metadata removal with force on "
+                         f"snap={snapname} subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+                         f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+                pass
+            else:
+                log.error(f"Failed to remove snapshot metadata on snap={snapname} subvol={self.subvol_name} "
+                          f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+                raise VolumeException(-errno.EAGAIN,
+                                      f"failed to remove snapshot metadata on snap={snapname} reason={me.args[0]} {me.args[1]}")
+        rmsnap(self.fs, snappath)
+
+    def snapshot_info(self, snapname):
+        if is_inherited_snap(snapname):
+            raise VolumeException(-errno.EINVAL,
+                                  "snapshot name '{0}' is invalid".format(snapname))
+        snappath = self.snapshot_data_path(snapname)
+        snap_info = {}
+        try:
+            snap_attrs = {'created_at': 'ceph.snap.btime',
+                          'data_pool': 'ceph.dir.layout.pool'}
+            for key, val in snap_attrs.items():
+                snap_info[key] = self.fs.getxattr(snappath, val)
+            pending_clones_info = self.get_pending_clones(snapname)
+            info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+                         'data_pool': snap_info['data_pool'].decode('utf-8')}  # type: Dict[str, Any]
+            info_dict.update(pending_clones_info)
+            return info_dict
+        except cephfs.Error as e:
+            if e.errno == errno.ENOENT:
+                raise VolumeException(-errno.ENOENT,
+                                      "snapshot '{0}' does not exist".format(snapname))
+            raise VolumeException(-e.args[0], e.args[1])
+
+    def list_snapshots(self):
+        try:
+            dirpath = self.snapshot_base_path()
+            return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
+        except VolumeException as ve:
+            if ve.errno == -errno.ENOENT:
+                return []
+            raise
+
+    def clean_stale_snapshot_metadata(self):
+        """ Clean up stale snapshot metadata """
+        if self.metadata_mgr.has_snap_metadata_section():
+            snap_list = self.list_snapshots()
+            snaps_with_metadata_list = self.metadata_mgr.list_snaps_with_metadata()
+            for snap_with_metadata in snaps_with_metadata_list:
+                if snap_with_metadata.encode('utf-8') not in snap_list:
+                    try:
+                        self.metadata_mgr.remove_section(self.get_snap_section_name(snap_with_metadata))
+                        self.metadata_mgr.flush()
+                    except MetadataMgrException as me:
+                        log.error(f"Failed to remove stale snap metadata on snap={snap_with_metadata} "
+                                  f"subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+                                  f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+                        pass
+
+    def _add_snap_clone(self, track_id, snapname):
+        self.metadata_mgr.add_section("clone snaps")
+        self.metadata_mgr.update_section("clone snaps", track_id, snapname)
+        self.metadata_mgr.flush()
+
+    def _remove_snap_clone(self, track_id):
+        self.metadata_mgr.remove_option("clone snaps", track_id)
+        self.metadata_mgr.flush()
+
+    def attach_snapshot(self, snapname, tgt_subvolume):
+        if not snapname.encode('utf-8') in self.list_snapshots():
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+        try:
+            create_clone_index(self.fs, self.vol_spec)
+            with open_clone_index(self.fs, self.vol_spec) as index:
+                track_idx = index.track(tgt_subvolume.base_path)
+                self._add_snap_clone(track_idx, snapname)
+        except (IndexException, MetadataMgrException) as e:
+            log.warning("error creating clone index: {0}".format(e))
+            raise VolumeException(-errno.EINVAL, "error cloning subvolume")
+
+    def detach_snapshot(self, snapname, track_id):
+        try:
+            with open_clone_index(self.fs, self.vol_spec) as index:
+                index.untrack(track_id)
+                self._remove_snap_clone(track_id)
+        except (IndexException, MetadataMgrException) as e:
+            log.warning("error delinking snapshot from clone: {0}".format(e))
+            raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")
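For readers of this change, the class docstring fully determines where a v1 subvolume and its snapshots live on disk. The short Python sketch below illustrates that layout only; the volume root, the snapshot prefix, and all group/subvolume names are hypothetical stand-ins chosen for illustration, not values taken from this commit (at runtime the real values come from the vol_spec).

    # Minimal sketch of the v1 on-disk layout described in the class docstring.
    # VOLUME_ROOT and SNAPSHOT_DIR_PREFIX are assumed values for illustration.
    import os
    import uuid

    VOLUME_ROOT = "/volumes"         # assumed volumes prefix
    SNAPSHOT_DIR_PREFIX = ".snap"    # assumed vol_spec.snapshot_dir_prefix

    def v1_subvolume_path(group, subvolume, subvol_uuid):
        # user data resides under the <uuid> directory
        return os.path.join(VOLUME_ROOT, group, subvolume, subvol_uuid)

    def v1_snapshot_path(group, subvolume, subvol_uuid, snapname):
        # snapshots are taken within the <uuid> directory
        return os.path.join(v1_subvolume_path(group, subvolume, subvol_uuid),
                            SNAPSHOT_DIR_PREFIX, snapname)

    u = str(uuid.uuid4())
    print(v1_subvolume_path("grp", "sv1", u))        # /volumes/grp/sv1/<uuid>
    print(v1_snapshot_path("grp", "sv1", u, "s1"))   # /volumes/grp/sv1/<uuid>/.snap/s1

This is why snapshot_base_path() joins the subvolume path (which already ends in the <uuid> component) with the snapshot directory prefix, and why the per-subvolume .meta file can live one level up, under <subvolume-name>, without ever being captured in a snapshot.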