From 17d6a993fc17d533460c5f40f3908c708e057c18 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 18:45:17 +0200 Subject: Merging upstream version 18.2.3. Signed-off-by: Daniel Baumann --- src/pybind/mgr/volumes/fs/async_cloner.py | 49 +++++++++++++++------- src/pybind/mgr/volumes/fs/operations/access.py | 2 +- src/pybind/mgr/volumes/fs/operations/group.py | 3 ++ src/pybind/mgr/volumes/fs/operations/lock.py | 4 +- src/pybind/mgr/volumes/fs/operations/pin_util.py | 3 +- src/pybind/mgr/volumes/fs/operations/trash.py | 1 - .../mgr/volumes/fs/operations/versions/__init__.py | 17 +++++++- .../fs/operations/versions/metadata_manager.py | 2 +- .../mgr/volumes/fs/operations/versions/op_sm.py | 2 +- .../fs/operations/versions/subvolume_base.py | 4 +- .../volumes/fs/operations/versions/subvolume_v1.py | 41 +++++++++--------- .../volumes/fs/operations/versions/subvolume_v2.py | 25 ++++++----- src/pybind/mgr/volumes/fs/operations/volume.py | 31 ++++++++++++-- src/pybind/mgr/volumes/fs/volume.py | 24 +++++++---- 14 files changed, 138 insertions(+), 70 deletions(-) (limited to 'src/pybind/mgr/volumes/fs') diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py index 95f7d64e1..685b2f03c 100644 --- a/src/pybind/mgr/volumes/fs/async_cloner.py +++ b/src/pybind/mgr/volumes/fs/async_cloner.py @@ -191,7 +191,7 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel): def set_quota_on_clone(fs_handle, clone_volumes_pair): src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2]) dst_path = clone_volumes_pair[0].path - quota = None # type: Optional[int] + quota: Optional[int] = None try: quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8')) except cephfs.NoData: @@ -205,7 +205,7 @@ def set_quota_on_clone(fs_handle, clone_volumes_pair): except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) - quota_files = None # type: Optional[int] + quota_files: Optional[int] = None try: quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8')) except cephfs.NoData: @@ -221,19 +221,25 @@ def set_quota_on_clone(fs_handle, clone_volumes_pair): def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel): with open_volume_lockless(fs_client, volname) as fs_handle: - with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: - src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2]) - dst_path = clone_volumes[0].path + with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, + groupname, subvolname) \ + as (subvol0, subvol1, subvol2): + src_path = subvol1.snapshot_data_path(subvol2) + dst_path = subvol0.path + # XXX: this is where cloning (of subvolume's snapshots) actually + # happens. 
bulk_copy(fs_handle, src_path, dst_path, should_cancel) - set_quota_on_clone(fs_handle, clone_volumes) + set_quota_on_clone(fs_handle, (subvol0, subvol1, subvol2)) def update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve): with open_volume_lockless(fs_client, volname) as fs_handle: - with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: + with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, + groupname, subvolname) \ + as (subvol0, subvol1, subvol2) : if ve.errno == -errno.EINTR: - clone_volumes[0].add_clone_failure(-ve.errno, "user interrupted clone operation") + subvol0.add_clone_failure(-ve.errno, "user interrupted clone operation") else: - clone_volumes[0].add_clone_failure(-ve.errno, ve.error_str) + subvol0.add_clone_failure(-ve.errno, ve.error_str) def log_clone_failure(volname, groupname, subvolname, ve): if ve.errno == -errno.EINTR: @@ -261,8 +267,10 @@ def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolnam try: with open_volume(fs_client, volname) as fs_handle: # detach source but leave the clone section intact for later inspection - with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: - clone_volumes[1].detach_snapshot(clone_volumes[2], index) + with open_clone_subvolume_pair(fs_client, fs_handle, volspec, + volname, groupname, subvolname) \ + as (subvol0, subvol1, subvol2): + subvol1.detach_snapshot(subvol2, index) except (MetadataMgrException, VolumeException) as e: log.error("failed to detach clone from snapshot: {0}".format(e)) return (None, True) @@ -270,9 +278,11 @@ def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolnam def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel): try: with open_volume(fs_client, volname) as fs_handle: - with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: - clone_volumes[1].detach_snapshot(clone_volumes[2], index) - clone_volumes[0].remove_clone_source(flush=True) + with open_clone_subvolume_pair(fs_client, fs_handle, volspec, + volname, groupname, subvolname) \ + as (subvol0, subvol1, subvol2): + subvol1.detach_snapshot(subvol2, index) + subvol0.remove_clone_source(flush=True) except (MetadataMgrException, VolumeException) as e: log.error("failed to detach clone from snapshot: {0}".format(e)) return (None, True) @@ -287,9 +297,14 @@ def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, st time.sleep(snapshot_clone_delay) log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay)) while not finished: + # XXX: this is where request operation is mapped to relevant + # function. handler = state_table.get(current_state, None) if not handler: raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state)) + # XXX: this is where the requested operation for subvolume's + # snapshot clone is performed. the function for the request + # operation is run through "handler". 
(next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel) if next_state: log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\ @@ -322,9 +337,10 @@ class Cloner(AsyncJobs): this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as the driver. file types supported are directories, symbolic links and regular files. """ - def __init__(self, volume_client, tp_size, snapshot_clone_delay): + def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait): self.vc = volume_client self.snapshot_clone_delay = snapshot_clone_delay + self.snapshot_clone_no_wait = clone_no_wait self.state_table = { SubvolumeStates.STATE_PENDING : handle_clone_pending, SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress, @@ -340,6 +356,9 @@ class Cloner(AsyncJobs): def reconfigure_snapshot_clone_delay(self, timeout): self.snapshot_clone_delay = timeout + def reconfigure_reject_clones(self, clone_no_wait): + self.snapshot_clone_no_wait = clone_no_wait + def is_clone_cancelable(self, clone_state): return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py index 9b7b24316..7e916e955 100644 --- a/src/pybind/mgr/volumes/fs/operations/access.py +++ b/src/pybind/mgr/volumes/fs/operations/access.py @@ -4,7 +4,7 @@ from typing import List def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True): - caps_list = [] # type: List[str] + caps_list: List[str] = [] for k, v in existing_caps['caps'].items(): if k == 'mds' or k == 'osd': continue diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py index 8b4061033..efc10e079 100644 --- a/src/pybind/mgr/volumes/fs/operations/group.py +++ b/src/pybind/mgr/volumes/fs/operations/group.py @@ -269,6 +269,9 @@ def remove_group(fs, vol_spec, groupname): except cephfs.Error as e: if e.args[0] == errno.ENOENT: raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname)) + elif e.args[0] == errno.ENOTEMPTY: + raise VolumeException(-errno.ENOTEMPTY, f"subvolume group {groupname} contains subvolume(s) " + "or retained snapshots of deleted subvolume(s)") raise VolumeException(-e.args[0], e.args[1]) diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py index 7ef6923e1..9588ddec1 100644 --- a/src/pybind/mgr/volumes/fs/operations/lock.py +++ b/src/pybind/mgr/volumes/fs/operations/lock.py @@ -22,10 +22,10 @@ class GlobalLock(object): See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf """ - _shared_state = { + _shared_state: Dict = { 'lock' : Lock(), 'init' : False - } # type: Dict + } def __init__(self): with self._shared_state['lock']: diff --git a/src/pybind/mgr/volumes/fs/operations/pin_util.py b/src/pybind/mgr/volumes/fs/operations/pin_util.py index 9ea79e546..a12ab5b4d 100644 --- a/src/pybind/mgr/volumes/fs/operations/pin_util.py +++ b/src/pybind/mgr/volumes/fs/operations/pin_util.py @@ -1,4 +1,3 @@ -import os import errno import cephfs @@ -25,7 +24,7 @@ def pin(fs, path, pin_type, pin_setting): try: pin_setting = _pin_value[pin_type](pin_setting) - except ValueError as e: + except ValueError: raise VolumeException(-errno.EINVAL, f"pin value wrong type: {pin_setting}") try: diff 
--git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py index 66f1d71cf..d76d43a43 100644 --- a/src/pybind/mgr/volumes/fs/operations/trash.py +++ b/src/pybind/mgr/volumes/fs/operations/trash.py @@ -6,7 +6,6 @@ from contextlib import contextmanager import cephfs from .template import GroupTemplate -from ..fs_util import listdir from ..exception import VolumeException log = logging.getLogger(__name__) diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py index 544afa165..097620d73 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py @@ -49,6 +49,18 @@ class SubvolumeLoader(object): def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname): return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname) + def allow_subvolume_upgrade(self, subvolume): + asu = True + try: + opt = subvolume.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_ALLOW_SUBVOLUME_UPGRADE) + asu = False if opt == "0" else True + except MetadataMgrException: + # this key is injected for QA testing and will not be available in + # production + pass + + return asu + def upgrade_to_v2_subvolume(self, subvolume): # legacy mode subvolumes cannot be upgraded to v2 if subvolume.legacy_mode: @@ -58,6 +70,9 @@ class SubvolumeLoader(object): if version >= SubvolumeV2.version(): return + if not self.allow_subvolume_upgrade(subvolume): + return + v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname) try: v1_subvolume.open(SubvolumeOpType.SNAP_LIST) @@ -83,7 +98,7 @@ class SubvolumeLoader(object): subvolume_type = SubvolumeTypes.TYPE_NORMAL try: initial_state = SubvolumeOpSm.get_init_state(subvolume_type) - except OpSmException as oe: + except OpSmException: raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") qpath = subvolume.base_path.decode('utf-8') # legacy is only upgradable to v1 diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py index 718735d91..610a61e6a 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py @@ -1,7 +1,6 @@ import os import errno import logging -import sys import threading import configparser import re @@ -59,6 +58,7 @@ class MetadataManager(object): GLOBAL_META_KEY_TYPE = "type" GLOBAL_META_KEY_PATH = "path" GLOBAL_META_KEY_STATE = "state" + GLOBAL_META_KEY_ALLOW_SUBVOLUME_UPGRADE = "allow_subvolume_upgrade" CLONE_FAILURE_SECTION = "CLONE_FAILURE" CLONE_FAILURE_META_KEY_ERRNO = "errno" diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py index 1142600cb..93eafb2bd 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py @@ -19,7 +19,7 @@ class TransitionKey(object): return not(self == other) class SubvolumeOpSm(object): - transition_table = {} # type: Dict + transition_table: Dict = {} @staticmethod def is_complete_state(state): diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py index 3bae0707a..8fbe177e5 100644 --- 
a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py @@ -144,7 +144,7 @@ class SubvolumeBase(object): try: self.fs.stat(self.legacy_config_path) self.legacy_mode = True - except cephfs.Error as e: + except cephfs.Error: pass log.debug("loading config " @@ -160,7 +160,7 @@ class SubvolumeBase(object): def get_attrs(self, pathname): # get subvolume attributes - attrs = {} # type: Dict[str, Union[int, str, None]] + attrs: Dict[str, Union[int, str, None]] = {} stx = self.fs.statx(pathname, cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE, diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py index b5a10dd6c..90f35a4c9 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py @@ -55,7 +55,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): try: # no need to stat the path -- open() does that return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8') - except MetadataMgrException as me: + except MetadataMgrException: raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") @property @@ -68,7 +68,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): try: # MDS treats this as a noop for already marked subvolume self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0) - except cephfs.InvalidValue as e: + except cephfs.InvalidValue: raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) @@ -89,7 +89,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): subvolume_type = SubvolumeTypes.TYPE_NORMAL try: initial_state = SubvolumeOpSm.get_init_state(subvolume_type) - except OpSmException as oe: + except OpSmException: raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) @@ -98,7 +98,6 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE) # create directory and set attributes self.fs.mkdirs(subvol_path, mode) - self.mark_subvolume() attrs = { 'uid': uid, 'gid': gid, @@ -111,6 +110,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): # persist subvolume metadata qpath = subvol_path.decode('utf-8') self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state) + self.mark_subvolume() except (VolumeException, MetadataMgrException, cephfs.Error) as e: try: log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) @@ -156,7 +156,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): subvolume_type = SubvolumeTypes.TYPE_CLONE try: initial_state = SubvolumeOpSm.get_init_state(subvolume_type) - except OpSmException as oe: + except OpSmException: raise VolumeException(-errno.EINVAL, "clone failed: internal error") subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) @@ -596,7 +596,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): """ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) - auths = [] # type: List[Dict[str,str]] + auths: List[Dict[str,str]] = [] if not meta or not meta['auths']: return auths @@ -669,7 
+669,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): pass else: raise - except MetadataMgrException as me: + except MetadataMgrException: raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") return clone_source @@ -684,16 +684,16 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): def status(self): state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) subvolume_type = self.subvol_type - subvolume_status = { - 'state' : state.value - } - if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: - subvolume_status["source"] = self._get_clone_source() - if SubvolumeOpSm.is_failed_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: - try: - subvolume_status["failure"] = self._get_clone_failure() - except MetadataMgrException: - pass + subvolume_status = {'state' : state.value} + + if subvolume_type == SubvolumeTypes.TYPE_CLONE: + if not SubvolumeOpSm.is_complete_state(state): + subvolume_status["source"] = self._get_clone_source() + if SubvolumeOpSm.is_failed_state(state): + try: + subvolume_status["failure"] = self._get_clone_failure() + except MetadataMgrException: + pass return subvolume_status @@ -744,7 +744,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): raise def get_pending_clones(self, snapname): - pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any] + pending_clones_info: Dict[str, Any] = {"has_pending_clones": "no"} pending_track_id_list = [] pending_clone_list = [] index_path = "" @@ -777,7 +777,6 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): # If clone is completed between 'list_all_keys_with_specified_values_from_section' # and readlink(track_id_path) call then readlink will fail with error ENOENT (2) # Hence we double check whether track_id is exist in .meta file or not. - value = self.metadata_mgr.get_option('clone snaps', track_id) # Edge case scenario. # If track_id for clone exist but path /volumes/_index/clone/{track_id} not found # then clone is orphan. 
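The hunks above also tighten how clone progress is reported: status() now consults clone source and failure metadata only for clone-type subvolumes, and get_pending_clones() resolves each tracked clone through the clone index before listing it. The standalone sketch below illustrates that resolution step under stated assumptions; it is not code from this patch. fs_handle, index_path and track_id are illustrative names, the clone index is assumed to hold one symlink per in-flight clone (named after its track id and pointing at the clone's subvolume directory), and "_nogroup" is assumed to be the default-group marker.

```python
import errno
import os
from pathlib import Path

import cephfs  # ceph python binding, assumed importable as in the mgr runtime

NO_GROUP_NAME = "_nogroup"  # assumed default-group marker used by mgr/volumes


def resolve_pending_clone(fs_handle, index_path, track_id):
    """Best-effort sketch: map one clone-index entry to a pending-clone record."""
    entry = os.path.join(index_path, track_id)
    try:
        # each index entry is a symlink pointing at the clone's subvolume
        # directory, e.g. /volumes/<group>/<clone-name>
        target = fs_handle.readlink(entry, 4096)
    except cephfs.Error as e:
        if e.args[0] == errno.ENOENT:
            # the clone completed (or is orphaned) between the metadata scan
            # and this readlink -- nothing to report for this track id
            return None
        raise
    path = Path(target.decode('utf-8'))
    details = {"name": path.name}
    if path.parent.name != NO_GROUP_NAME:
        details["target_group"] = path.parent.name
    return details
```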
@@ -790,7 +789,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): path = Path(link_path.decode('utf-8')) clone_name = os.path.basename(link_path).decode('utf-8') group_name = os.path.basename(path.parent.absolute()) - details = {"name": clone_name} # type: Dict[str, str] + details = {"name": clone_name} if group_name != Group.NO_GROUP_NAME: details["target_group"] = group_name pending_clone_list.append(details) @@ -839,7 +838,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): snap_info[key] = self.fs.getxattr(snappath, val) pending_clones_info = self.get_pending_clones(snapname) info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))), - 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any] + 'data_pool': snap_info['data_pool'].decode('utf-8')} info_dict.update(pending_clones_info); return info_dict except cephfs.Error as e: diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py index 03085d049..55d7f945b 100644 --- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py @@ -10,7 +10,6 @@ from .metadata_manager import MetadataManager from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures from .op_sm import SubvolumeOpSm from .subvolume_v1 import SubvolumeV1 -from ..template import SubvolumeTemplate from ...exception import OpSmException, VolumeException, MetadataMgrException from ...fs_util import listdir, create_base_dir from ..template import SubvolumeOpType @@ -99,7 +98,7 @@ class SubvolumeV2(SubvolumeV1): try: # MDS treats this as a noop for already marked subvolume self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0) - except cephfs.InvalidValue as e: + except cephfs.InvalidValue: raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") except cephfs.Error as e: raise VolumeException(-e.args[0], e.args[1]) @@ -159,7 +158,7 @@ class SubvolumeV2(SubvolumeV1): subvolume_type = SubvolumeTypes.TYPE_NORMAL try: initial_state = SubvolumeOpSm.get_init_state(subvolume_type) - except OpSmException as oe: + except OpSmException: raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") retained = self.retained @@ -207,7 +206,7 @@ class SubvolumeV2(SubvolumeV1): subvolume_type = SubvolumeTypes.TYPE_CLONE try: initial_state = SubvolumeOpSm.get_init_state(subvolume_type) - except OpSmException as oe: + except OpSmException: raise VolumeException(-errno.EINVAL, "clone failed: internal error") retained = self.retained @@ -308,13 +307,17 @@ class SubvolumeV2(SubvolumeV1): op_type.value, self.subvolname, etype.value)) estate = self.state - if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED: - raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format( - self.subvolname)) - - if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED: - raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( - self.subvolname, op_type.value)) + if op_type not in self.allowed_ops_by_state(estate): + if estate == SubvolumeStates.STATE_RETAINED: + raise VolumeException( + -errno.ENOENT, + f'subvolume "{self.subvolname}" is removed and has ' + 'only snapshots retained') + else: + raise VolumeException( + -errno.EAGAIN, + f'subvolume 
"{self.subvolname}" is not ready for ' + f'operation "{op_type.value}"') if estate != SubvolumeStates.STATE_RETAINED: subvol_path = self.path diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py index 395a3fb4e..0bf428271 100644 --- a/src/pybind/mgr/volumes/fs/operations/volume.py +++ b/src/pybind/mgr/volumes/fs/operations/volume.py @@ -9,11 +9,12 @@ from contextlib import contextmanager import orchestrator from .lock import GlobalLock -from ..exception import VolumeException +from ..exception import VolumeException, IndexException from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \ remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir from .trash import Trash from mgr_util import open_filesystem, CephfsConnectionException +from .clone_index import open_clone_index log = logging.getLogger(__name__) @@ -40,7 +41,7 @@ def get_pool_names(mgr, volname): """ fs_map = mgr.get("fs_map") metadata_pool_id = None - data_pool_ids = [] # type: List[int] + data_pool_ids: List[int] = [] for f in fs_map['filesystems']: if volname == f['mdsmap']['fs_name']: metadata_pool_id = f['mdsmap']['metadata_pool'] @@ -61,7 +62,7 @@ def get_pool_ids(mgr, volname): """ fs_map = mgr.get("fs_map") metadata_pool_id = None - data_pool_ids = [] # type: List[int] + data_pool_ids: List[int] = [] for f in fs_map['filesystems']: if volname == f['mdsmap']['fs_name']: metadata_pool_id = f['mdsmap']['metadata_pool'] @@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path): return {'pending_subvolume_deletions': num_pending_subvol_del} +def get_all_pending_clones_count(self, mgr, vol_spec): + pending_clones_cnt = 0 + index_path = "" + fs_map = mgr.get('fs_map') + for fs in fs_map['filesystems']: + volname = fs['mdsmap']['fs_name'] + try: + with open_volume(self, volname) as fs_handle: + with open_clone_index(fs_handle, vol_spec) as index: + index_path = index.path.decode('utf-8') + pending_clones_cnt = pending_clones_cnt \ + + len(listdir(fs_handle, index_path, + filter_entries=None, filter_files=False)) + except IndexException as e: + if e.errno == -errno.ENOENT: + continue + raise VolumeException(-e.args[0], e.args[1]) + except VolumeException as ve: + log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve)) + raise ve + + return pending_clones_cnt + + @contextmanager def open_volume(vc, volname): """ diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 5c6642444..2e96f8306 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -1,7 +1,6 @@ import json import errno import logging -import os import mgr_util from typing import TYPE_CHECKING @@ -14,13 +13,14 @@ from .fs_util import listdir, has_subdir from .operations.group import open_group, create_group, remove_group, \ open_group_unique, set_group_attrs from .operations.volume import create_volume, delete_volume, rename_volume, \ - list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count + list_volumes, open_volume, get_pool_names, get_pool_ids, \ + get_pending_subvol_deletions_count, get_all_pending_clones_count from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \ create_clone -from .operations.trash import Trash from .vol_spec import VolSpec -from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError +from .exception import VolumeException, ClusterError, ClusterTimeout, \ 
+ EvictionError from .async_cloner import Cloner from .purge_queue import ThreadPoolPurgeQueueMixin from .operations.template import SubvolumeOpType @@ -55,7 +55,8 @@ class VolumeClient(CephfsClient["Module"]): super().__init__(mgr) # volume specification self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) - self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay) + self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay, + self.mgr.snapshot_clone_no_wait) self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) # on startup, queue purge job for available volumes to kickstart # purge for leftover subvolume entries in trash. note that, if the @@ -338,7 +339,7 @@ class VolumeClient(CephfsClient["Module"]): with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume: - key = subvolume.evict(volname, authid) + subvolume.evict(volname, authid) ret = 0, "", "" except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e: if isinstance(e, VolumeException): @@ -424,6 +425,7 @@ class VolumeClient(CephfsClient["Module"]): subvol_info_dict = subvolume.info() subvol_info_dict["mon_addrs"] = mon_addr_lst + subvol_info_dict["flavor"] = subvolume.VERSION ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), "" except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -695,7 +697,7 @@ class VolumeClient(CephfsClient["Module"]): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT): log.warning("snapshot protect call is deprecated and will be removed in a future release") except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -710,7 +712,7 @@ class VolumeClient(CephfsClient["Module"]): try: with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, groupname) as group: - with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT): log.warning("snapshot unprotect call is deprecated and will be removed in a future release") except VolumeException as ve: ret = self.volume_exception_to_retval(ve) @@ -765,6 +767,10 @@ class VolumeClient(CephfsClient["Module"]): s_groupname = kwargs['group_name'] try: + if self.mgr.snapshot_clone_no_wait and \ + get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones: + raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later")) + with open_volume(self, volname) as fs_handle: with open_group(fs_handle, self.volspec, s_groupname) as s_group: with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: @@ -962,7 +968,7 @@ class VolumeClient(CephfsClient["Module"]): try: with open_volume(self, volname) as fs_handle: - with open_group(fs_handle, self.volspec, groupname) as group: + with open_group(fs_handle, self.volspec, groupname): # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots # 
at the subvolume group (see: https://tracker.ceph.com/issues/46074) # group.create_snapshot(snapname)
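Taken together, the Cloner(..., clone_no_wait) constructor change, the new get_all_pending_clones_count() helper in operations/volume.py, and the guard added to the clone entry point in volume.py introduce an optional throttle: when snapshot_clone_no_wait is enabled and every cloner thread already has work queued, a new clone request is rejected with EAGAIN instead of being queued indefinitely. The sketch below is a minimal, self-contained illustration of that gate; maybe_reject_clone() and its parameters stand in for the mgr-side plumbing and are not the module's actual API.

```python
import errno


class VolumeException(Exception):
    """Simplified stand-in for mgr/volumes' VolumeException."""
    def __init__(self, error_code, error_str):
        super().__init__(error_code, error_str)
        self.errno = error_code
        self.error_str = error_str


def maybe_reject_clone(snapshot_clone_no_wait: bool,
                       pending_clones: int,
                       max_concurrent_clones: int) -> None:
    """Raise EAGAIN when the throttle is on and all cloner threads are busy.

    Mirrors the guard added to the clone entry point; in the patch the count
    comes from listing the per-volume clone index directories across all
    filesystems (get_all_pending_clones_count).
    """
    if snapshot_clone_no_wait and pending_clones >= max_concurrent_clones:
        raise VolumeException(-errno.EAGAIN,
                              "all cloner threads are busy, please try again later")


# example: with 4 cloner threads and 4 queued clones, a 5th request is refused
try:
    maybe_reject_clone(snapshot_clone_no_wait=True,
                       pending_clones=4,
                       max_concurrent_clones=4)
except VolumeException as ve:
    print(ve.errno, ve.error_str)  # e.g. -11 on Linux, with the busy message
```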