Diffstat (limited to 'src/pybind/mgr/volumes')
-rw-r--r--  src/pybind/mgr/volumes/fs/async_cloner.py                         | 49
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/access.py                    |  2
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/group.py                     |  3
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/lock.py                      |  4
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/pin_util.py                  |  3
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/trash.py                     |  1
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/__init__.py         | 17
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py |  2
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/op_sm.py            |  2
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py   |  4
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py     | 41
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py     | 25
-rw-r--r--  src/pybind/mgr/volumes/fs/operations/volume.py                    | 31
-rw-r--r--  src/pybind/mgr/volumes/fs/volume.py                               | 24
-rw-r--r--  src/pybind/mgr/volumes/module.py                                  | 28
15 files changed, 164 insertions, 72 deletions
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 95f7d64e1..685b2f03c 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -191,7 +191,7 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
def set_quota_on_clone(fs_handle, clone_volumes_pair):
src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
dst_path = clone_volumes_pair[0].path
- quota = None # type: Optional[int]
+ quota: Optional[int] = None
try:
quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8'))
except cephfs.NoData:
@@ -205,7 +205,7 @@ def set_quota_on_clone(fs_handle, clone_volumes_pair):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
- quota_files = None # type: Optional[int]
+ quota_files: Optional[int] = None
try:
quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8'))
except cephfs.NoData:
@@ -221,19 +221,25 @@ def set_quota_on_clone(fs_handle, clone_volumes_pair):
def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
with open_volume_lockless(fs_client, volname) as fs_handle:
- with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
- src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
- dst_path = clone_volumes[0].path
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname,
+ groupname, subvolname) \
+ as (subvol0, subvol1, subvol2):
+ src_path = subvol1.snapshot_data_path(subvol2)
+ dst_path = subvol0.path
+ # XXX: this is where cloning (of subvolume's snapshots) actually
+ # happens.
bulk_copy(fs_handle, src_path, dst_path, should_cancel)
- set_quota_on_clone(fs_handle, clone_volumes)
+ set_quota_on_clone(fs_handle, (subvol0, subvol1, subvol2))
def update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve):
with open_volume_lockless(fs_client, volname) as fs_handle:
- with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname,
+ groupname, subvolname) \
+                as (subvol0, subvol1, subvol2):
if ve.errno == -errno.EINTR:
- clone_volumes[0].add_clone_failure(-ve.errno, "user interrupted clone operation")
+ subvol0.add_clone_failure(-ve.errno, "user interrupted clone operation")
else:
- clone_volumes[0].add_clone_failure(-ve.errno, ve.error_str)
+ subvol0.add_clone_failure(-ve.errno, ve.error_str)
def log_clone_failure(volname, groupname, subvolname, ve):
if ve.errno == -errno.EINTR:
@@ -261,8 +267,10 @@ def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolnam
try:
with open_volume(fs_client, volname) as fs_handle:
# detach source but leave the clone section intact for later inspection
- with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
- clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec,
+ volname, groupname, subvolname) \
+ as (subvol0, subvol1, subvol2):
+ subvol1.detach_snapshot(subvol2, index)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
@@ -270,9 +278,11 @@ def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolnam
def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
try:
with open_volume(fs_client, volname) as fs_handle:
- with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
- clone_volumes[1].detach_snapshot(clone_volumes[2], index)
- clone_volumes[0].remove_clone_source(flush=True)
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec,
+ volname, groupname, subvolname) \
+ as (subvol0, subvol1, subvol2):
+ subvol1.detach_snapshot(subvol2, index)
+ subvol0.remove_clone_source(flush=True)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
@@ -287,9 +297,14 @@ def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, st
time.sleep(snapshot_clone_delay)
log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
while not finished:
+            # XXX: this is where the requested operation is mapped to the
+            # relevant handler function.
handler = state_table.get(current_state, None)
if not handler:
raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
+            # XXX: this is where the requested operation on the subvolume's
+            # snapshot clone is performed; the function for the requested
+            # operation is invoked through "handler".
(next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
if next_state:
log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
@@ -322,9 +337,10 @@ class Cloner(AsyncJobs):
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
- def __init__(self, volume_client, tp_size, snapshot_clone_delay):
+ def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
self.vc = volume_client
self.snapshot_clone_delay = snapshot_clone_delay
+ self.snapshot_clone_no_wait = clone_no_wait
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@@ -340,6 +356,9 @@ class Cloner(AsyncJobs):
def reconfigure_snapshot_clone_delay(self, timeout):
self.snapshot_clone_delay = timeout
+ def reconfigure_reject_clones(self, clone_no_wait):
+ self.snapshot_clone_no_wait = clone_no_wait
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
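Note: the reworked hunks above unpack the tuple yielded by open_clone_subvolume_pair() directly in the with-statement's as clause. A minimal, standalone sketch of that pattern (the context manager and names here are illustrative, not the real helper):

    from contextlib import contextmanager

    @contextmanager
    def open_pair():
        # hypothetical stand-in for open_clone_subvolume_pair(); yields a 3-tuple
        yield ("clone-subvol", "source-subvol", "source-snapshot")

    with open_pair() as (clone, source, snapshot):
        # each member gets its own name instead of clone_volumes[0], [1] and [2]
        print(clone, source, snapshot)
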
diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py
index 9b7b24316..7e916e955 100644
--- a/src/pybind/mgr/volumes/fs/operations/access.py
+++ b/src/pybind/mgr/volumes/fs/operations/access.py
@@ -4,7 +4,7 @@ from typing import List
def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True):
- caps_list = [] # type: List[str]
+ caps_list: List[str] = []
for k, v in existing_caps['caps'].items():
if k == 'mds' or k == 'osd':
continue
diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py
index 8b4061033..efc10e079 100644
--- a/src/pybind/mgr/volumes/fs/operations/group.py
+++ b/src/pybind/mgr/volumes/fs/operations/group.py
@@ -269,6 +269,9 @@ def remove_group(fs, vol_spec, groupname):
except cephfs.Error as e:
if e.args[0] == errno.ENOENT:
raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname))
+ elif e.args[0] == errno.ENOTEMPTY:
+ raise VolumeException(-errno.ENOTEMPTY, f"subvolume group {groupname} contains subvolume(s) "
+ "or retained snapshots of deleted subvolume(s)")
raise VolumeException(-e.args[0], e.args[1])
diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py
index 7ef6923e1..9588ddec1 100644
--- a/src/pybind/mgr/volumes/fs/operations/lock.py
+++ b/src/pybind/mgr/volumes/fs/operations/lock.py
@@ -22,10 +22,10 @@ class GlobalLock(object):
See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf
"""
- _shared_state = {
+ _shared_state: Dict = {
'lock' : Lock(),
'init' : False
- } # type: Dict
+ }
def __init__(self):
with self._shared_state['lock']:
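Note: several hunks in this patch replace mypy type comments with inline variable annotations (PEP 526); both spellings are equivalent to the type checker. A small illustrative sketch, not taken from the patch itself:

    from typing import Dict, Optional

    # old style: trailing type comment
    quota = None  # type: Optional[int]

    # new style: inline annotation, as used throughout this patch
    quota_files: Optional[int] = None
    _shared_state: Dict = {'lock': None, 'init': False}
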
diff --git a/src/pybind/mgr/volumes/fs/operations/pin_util.py b/src/pybind/mgr/volumes/fs/operations/pin_util.py
index 9ea79e546..a12ab5b4d 100644
--- a/src/pybind/mgr/volumes/fs/operations/pin_util.py
+++ b/src/pybind/mgr/volumes/fs/operations/pin_util.py
@@ -1,4 +1,3 @@
-import os
import errno
import cephfs
@@ -25,7 +24,7 @@ def pin(fs, path, pin_type, pin_setting):
try:
pin_setting = _pin_value[pin_type](pin_setting)
- except ValueError as e:
+ except ValueError:
raise VolumeException(-errno.EINVAL, f"pin value wrong type: {pin_setting}")
try:
diff --git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py
index 66f1d71cf..d76d43a43 100644
--- a/src/pybind/mgr/volumes/fs/operations/trash.py
+++ b/src/pybind/mgr/volumes/fs/operations/trash.py
@@ -6,7 +6,6 @@ from contextlib import contextmanager
import cephfs
from .template import GroupTemplate
-from ..fs_util import listdir
from ..exception import VolumeException
log = logging.getLogger(__name__)
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
index 544afa165..097620d73 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
@@ -49,6 +49,18 @@ class SubvolumeLoader(object):
def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname):
return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname)
+ def allow_subvolume_upgrade(self, subvolume):
+ asu = True
+ try:
+ opt = subvolume.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_ALLOW_SUBVOLUME_UPGRADE)
+ asu = False if opt == "0" else True
+ except MetadataMgrException:
+ # this key is injected for QA testing and will not be available in
+ # production
+ pass
+
+ return asu
+
def upgrade_to_v2_subvolume(self, subvolume):
# legacy mode subvolumes cannot be upgraded to v2
if subvolume.legacy_mode:
@@ -58,6 +70,9 @@ class SubvolumeLoader(object):
if version >= SubvolumeV2.version():
return
+ if not self.allow_subvolume_upgrade(subvolume):
+ return
+
v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname)
try:
v1_subvolume.open(SubvolumeOpType.SNAP_LIST)
@@ -83,7 +98,7 @@ class SubvolumeLoader(object):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
qpath = subvolume.base_path.decode('utf-8')
# legacy is only upgradable to v1
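Note: allow_subvolume_upgrade() treats a missing metadata key as permission to upgrade, so the guard only blocks the v1-to-v2 upgrade when the QA-injected flag is explicitly "0". A rough sketch of that default-open pattern, with a hypothetical getter standing in for metadata_mgr.get_global_option():

    class MissingKey(Exception):
        pass

    def get_flag(store, key):
        # stand-in for get_global_option(); raises when the key is absent
        if key not in store:
            raise MissingKey(key)
        return store[key]

    def upgrade_allowed(store):
        try:
            return get_flag(store, "allow_subvolume_upgrade") != "0"
        except MissingKey:
            # key is only injected for QA testing; absence means upgrades proceed
            return True

    assert upgrade_allowed({}) is True
    assert upgrade_allowed({"allow_subvolume_upgrade": "0"}) is False
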
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
index 718735d91..610a61e6a 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
@@ -1,7 +1,6 @@
import os
import errno
import logging
-import sys
import threading
import configparser
import re
@@ -59,6 +58,7 @@ class MetadataManager(object):
GLOBAL_META_KEY_TYPE = "type"
GLOBAL_META_KEY_PATH = "path"
GLOBAL_META_KEY_STATE = "state"
+ GLOBAL_META_KEY_ALLOW_SUBVOLUME_UPGRADE = "allow_subvolume_upgrade"
CLONE_FAILURE_SECTION = "CLONE_FAILURE"
CLONE_FAILURE_META_KEY_ERRNO = "errno"
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
index 1142600cb..93eafb2bd 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
@@ -19,7 +19,7 @@ class TransitionKey(object):
return not(self == other)
class SubvolumeOpSm(object):
- transition_table = {} # type: Dict
+ transition_table: Dict = {}
@staticmethod
def is_complete_state(state):
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
index 3bae0707a..8fbe177e5 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
@@ -144,7 +144,7 @@ class SubvolumeBase(object):
try:
self.fs.stat(self.legacy_config_path)
self.legacy_mode = True
- except cephfs.Error as e:
+ except cephfs.Error:
pass
log.debug("loading config "
@@ -160,7 +160,7 @@ class SubvolumeBase(object):
def get_attrs(self, pathname):
# get subvolume attributes
- attrs = {} # type: Dict[str, Union[int, str, None]]
+ attrs: Dict[str, Union[int, str, None]] = {}
stx = self.fs.statx(pathname,
cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID
| cephfs.CEPH_STATX_MODE,
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
index b5a10dd6c..90f35a4c9 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
@@ -55,7 +55,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
try:
# no need to stat the path -- open() does that
return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
- except MetadataMgrException as me:
+ except MetadataMgrException:
raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
@property
@@ -68,7 +68,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
try:
# MDS treats this as a noop for already marked subvolume
self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
- except cephfs.InvalidValue as e:
+ except cephfs.InvalidValue:
raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
@@ -89,7 +89,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
@@ -98,7 +98,6 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
# create directory and set attributes
self.fs.mkdirs(subvol_path, mode)
- self.mark_subvolume()
attrs = {
'uid': uid,
'gid': gid,
@@ -111,6 +110,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
# persist subvolume metadata
qpath = subvol_path.decode('utf-8')
self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+ self.mark_subvolume()
except (VolumeException, MetadataMgrException, cephfs.Error) as e:
try:
log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
@@ -156,7 +156,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
subvolume_type = SubvolumeTypes.TYPE_CLONE
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
@@ -596,7 +596,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
"""
with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
- auths = [] # type: List[Dict[str,str]]
+ auths: List[Dict[str,str]] = []
if not meta or not meta['auths']:
return auths
@@ -669,7 +669,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
pass
else:
raise
- except MetadataMgrException as me:
+ except MetadataMgrException:
raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
return clone_source
@@ -684,16 +684,16 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
def status(self):
state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
subvolume_type = self.subvol_type
- subvolume_status = {
- 'state' : state.value
- }
- if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
- subvolume_status["source"] = self._get_clone_source()
- if SubvolumeOpSm.is_failed_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
- try:
- subvolume_status["failure"] = self._get_clone_failure()
- except MetadataMgrException:
- pass
+ subvolume_status = {'state' : state.value}
+
+ if subvolume_type == SubvolumeTypes.TYPE_CLONE:
+ if not SubvolumeOpSm.is_complete_state(state):
+ subvolume_status["source"] = self._get_clone_source()
+ if SubvolumeOpSm.is_failed_state(state):
+ try:
+ subvolume_status["failure"] = self._get_clone_failure()
+ except MetadataMgrException:
+ pass
return subvolume_status
@@ -744,7 +744,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
raise
def get_pending_clones(self, snapname):
- pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any]
+ pending_clones_info: Dict[str, Any] = {"has_pending_clones": "no"}
pending_track_id_list = []
pending_clone_list = []
index_path = ""
@@ -777,7 +777,6 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
# If clone is completed between 'list_all_keys_with_specified_values_from_section'
# and readlink(track_id_path) call then readlink will fail with error ENOENT (2)
# Hence we double check whether track_id is exist in .meta file or not.
- value = self.metadata_mgr.get_option('clone snaps', track_id)
# Edge case scenario.
# If track_id for clone exist but path /volumes/_index/clone/{track_id} not found
# then clone is orphan.
@@ -790,7 +789,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
path = Path(link_path.decode('utf-8'))
clone_name = os.path.basename(link_path).decode('utf-8')
group_name = os.path.basename(path.parent.absolute())
- details = {"name": clone_name} # type: Dict[str, str]
+ details = {"name": clone_name}
if group_name != Group.NO_GROUP_NAME:
details["target_group"] = group_name
pending_clone_list.append(details)
@@ -839,7 +838,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
snap_info[key] = self.fs.getxattr(snappath, val)
pending_clones_info = self.get_pending_clones(snapname)
info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
- 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any]
+ 'data_pool': snap_info['data_pool'].decode('utf-8')}
info_dict.update(pending_clones_info);
return info_dict
except cephfs.Error as e:
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
index 03085d049..55d7f945b 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
@@ -10,7 +10,6 @@ from .metadata_manager import MetadataManager
from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
from .op_sm import SubvolumeOpSm
from .subvolume_v1 import SubvolumeV1
-from ..template import SubvolumeTemplate
from ...exception import OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listdir, create_base_dir
from ..template import SubvolumeOpType
@@ -99,7 +98,7 @@ class SubvolumeV2(SubvolumeV1):
try:
# MDS treats this as a noop for already marked subvolume
self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
- except cephfs.InvalidValue as e:
+ except cephfs.InvalidValue:
raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
@@ -159,7 +158,7 @@ class SubvolumeV2(SubvolumeV1):
subvolume_type = SubvolumeTypes.TYPE_NORMAL
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
retained = self.retained
@@ -207,7 +206,7 @@ class SubvolumeV2(SubvolumeV1):
subvolume_type = SubvolumeTypes.TYPE_CLONE
try:
initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
- except OpSmException as oe:
+ except OpSmException:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
retained = self.retained
@@ -308,13 +307,17 @@ class SubvolumeV2(SubvolumeV1):
op_type.value, self.subvolname, etype.value))
estate = self.state
- if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
- raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
- self.subvolname))
-
- if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
- raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
- self.subvolname, op_type.value))
+ if op_type not in self.allowed_ops_by_state(estate):
+ if estate == SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(
+ -errno.ENOENT,
+ f'subvolume "{self.subvolname}" is removed and has '
+ 'only snapshots retained')
+ else:
+ raise VolumeException(
+ -errno.EAGAIN,
+ f'subvolume "{self.subvolname}" is not ready for '
+ f'operation "{op_type.value}"')
if estate != SubvolumeStates.STATE_RETAINED:
subvol_path = self.path
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index 395a3fb4e..0bf428271 100644
--- a/src/pybind/mgr/volumes/fs/operations/volume.py
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@ from contextlib import contextmanager
import orchestrator
from .lock import GlobalLock
-from ..exception import VolumeException
+from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
+from .clone_index import open_clone_index
log = logging.getLogger(__name__)
@@ -40,7 +41,7 @@ def get_pool_names(mgr, volname):
"""
fs_map = mgr.get("fs_map")
metadata_pool_id = None
- data_pool_ids = [] # type: List[int]
+ data_pool_ids: List[int] = []
for f in fs_map['filesystems']:
if volname == f['mdsmap']['fs_name']:
metadata_pool_id = f['mdsmap']['metadata_pool']
@@ -61,7 +62,7 @@ def get_pool_ids(mgr, volname):
"""
fs_map = mgr.get("fs_map")
metadata_pool_id = None
- data_pool_ids = [] # type: List[int]
+ data_pool_ids: List[int] = []
for f in fs_map['filesystems']:
if volname == f['mdsmap']['fs_name']:
metadata_pool_id = f['mdsmap']['metadata_pool']
@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
return {'pending_subvolume_deletions': num_pending_subvol_del}
+def get_all_pending_clones_count(self, mgr, vol_spec):
+ pending_clones_cnt = 0
+ index_path = ""
+ fs_map = mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ volname = fs['mdsmap']['fs_name']
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_clone_index(fs_handle, vol_spec) as index:
+ index_path = index.path.decode('utf-8')
+ pending_clones_cnt = pending_clones_cnt \
+ + len(listdir(fs_handle, index_path,
+ filter_entries=None, filter_files=False))
+ except IndexException as e:
+ if e.errno == -errno.ENOENT:
+ continue
+ raise VolumeException(-e.args[0], e.args[1])
+ except VolumeException as ve:
+ log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+ raise ve
+
+ return pending_clones_cnt
+
+
@contextmanager
def open_volume(vc, volname):
"""
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 5c6642444..2e96f8306 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -1,7 +1,6 @@
import json
import errno
import logging
-import os
import mgr_util
from typing import TYPE_CHECKING
@@ -14,13 +13,14 @@ from .fs_util import listdir, has_subdir
from .operations.group import open_group, create_group, remove_group, \
open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
- list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+ list_volumes, open_volume, get_pool_names, get_pool_ids, \
+ get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
-from .operations.trash import Trash
from .vol_spec import VolSpec
-from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .exception import VolumeException, ClusterError, ClusterTimeout, \
+ EvictionError
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
@@ -55,7 +55,8 @@ class VolumeClient(CephfsClient["Module"]):
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
+ self.mgr.snapshot_clone_no_wait)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
@@ -338,7 +339,7 @@ class VolumeClient(CephfsClient["Module"]):
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
- key = subvolume.evict(volname, authid)
+ subvolume.evict(volname, authid)
ret = 0, "", ""
except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
if isinstance(e, VolumeException):
@@ -424,6 +425,7 @@ class VolumeClient(CephfsClient["Module"]):
subvol_info_dict = subvolume.info()
subvol_info_dict["mon_addrs"] = mon_addr_lst
+ subvol_info_dict["flavor"] = subvolume.VERSION
ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
@@ -695,7 +697,7 @@ class VolumeClient(CephfsClient["Module"]):
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT):
log.warning("snapshot protect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
@@ -710,7 +712,7 @@ class VolumeClient(CephfsClient["Module"]):
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT):
log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
@@ -765,6 +767,10 @@ class VolumeClient(CephfsClient["Module"]):
s_groupname = kwargs['group_name']
try:
+ if self.mgr.snapshot_clone_no_wait and \
+ get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
+ raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))
+
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, s_groupname) as s_group:
with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
@@ -962,7 +968,7 @@ class VolumeClient(CephfsClient["Module"]):
try:
with open_volume(self, volname) as fs_handle:
- with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_group(fs_handle, self.volspec, groupname):
# as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
# at the subvolume group (see: https://tracker.ceph.com/issues/46074)
# group.create_snapshot(snapname)
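Note: with snapshot_clone_no_wait enabled, the clone-create handler above fails fast with EAGAIN once the pending-clone count reaches max_concurrent_clones, instead of queueing the request. A simplified sketch of that gate (function and argument names are placeholders, not the module's API):

    import errno

    def check_clone_admission(no_wait, pending_clones, max_concurrent):
        # mirrors the guard added before open_volume() above
        if no_wait and pending_clones >= max_concurrent:
            # surfaced to the caller as -EAGAIN with a retry hint
            return (-errno.EAGAIN,
                    "all cloner threads are busy, please try again later")
        return (0, "")

    # e.g. with a limit of 4 cloner threads, a fifth simultaneous clone
    # request is rejected rather than queued
    print(check_clone_admission(True, 4, 4))
    print(check_clone_admission(False, 4, 4))
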
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index b9c8e7893..8a50baaad 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -483,8 +483,13 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
Option(
'snapshot_clone_delay',
type='int',
- default=0,
- desc='Delay clone begin operation by snapshot_clone_delay seconds')
+ default=0,
+ desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+ Option(
+ 'snapshot_clone_no_wait',
+ type='bool',
+ default=True,
+ desc='Reject subvolume clone request when cloner threads are busy')
]
def __init__(self, *args, **kwargs):
@@ -492,6 +497,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
# for mypy
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
+ self.snapshot_clone_no_wait = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
@@ -522,6 +528,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
elif opt['name'] == "snapshot_clone_delay":
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+ elif opt['name'] == "snapshot_clone_no_wait":
+ self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
@@ -845,3 +853,19 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
def _cmd_fs_clone_cancel(self, inbuf, cmd):
return self.vc.clone_cancel(
vol_name=cmd['vol_name'], clone_name=cmd['clone_name'], group_name=cmd.get('group_name', None))
+
+ # remote method
+ def subvolume_getpath(self, vol_name, subvol, group_name):
+ return self.vc.subvolume_getpath(vol_name=vol_name,
+ sub_name=subvol,
+ group_name=group_name)
+
+ # remote method
+ def subvolume_ls(self, vol_name, group_name):
+ return self.vc.list_subvolumes(vol_name=vol_name, group_name=group_name)
+
+ # remote method
+ def subvolume_info(self, vol_name, subvol, group_name):
+ return self.vc.subvolume_info(vol_name=vol_name,
+ sub_name=subvol,
+ group_name=group_name)
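
Note: module.py registers snapshot_clone_no_wait as a bool option (default True) and re-pushes changes to the Cloner through config_notify(). A condensed sketch of that dispatch path, with the MgrModule/Option machinery omitted and the classes reduced to the relevant attributes:

    class Cloner:
        def __init__(self, clone_no_wait=True):
            self.snapshot_clone_no_wait = clone_no_wait

        def reconfigure_reject_clones(self, clone_no_wait):
            self.snapshot_clone_no_wait = clone_no_wait

    class Module:
        def __init__(self):
            self.snapshot_clone_no_wait = True
            self.cloner = Cloner(self.snapshot_clone_no_wait)

        def config_notify(self, changed_option):
            # same shape as the elif chain added to config_notify() above
            if changed_option == "snapshot_clone_no_wait":
                self.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)

    m = Module()
    m.snapshot_clone_no_wait = False   # e.g. flipped at runtime via ceph config
    m.config_notify("snapshot_clone_no_wait")
    assert m.cloner.snapshot_clone_no_wait is False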