From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/pybind/mgr/volumes/fs/async_cloner.py | 413 ++++++++++++++++++++++++++++++
 1 file changed, 413 insertions(+)
 create mode 100644 src/pybind/mgr/volumes/fs/async_cloner.py

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
new file mode 100644
index 000000000..95f7d64e1
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -0,0 +1,413 @@
+import os
+import stat
+import time
+import errno
+import logging
+from contextlib import contextmanager
+from typing import Optional
+
+import cephfs
+from mgr_util import lock_timeout_log
+
+from .async_job import AsyncJobs
+from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
+from .fs_util import copy_file
+from .operations.versions.op_sm import SubvolumeOpSm
+from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
+from .operations.resolver import resolve
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.clone_index import open_clone_index
+from .operations.template import SubvolumeOpType
+
+log = logging.getLogger(__name__)
+
+# helper for fetching a clone entry for a given volume
+def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
+    log.debug("fetching clone entry for volume '{0}'".format(volname))
+
+    try:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
+            try:
+                with open_clone_index(fs_handle, volspec) as clone_index:
+                    job = clone_index.get_oldest_clone_entry(running_jobs)
+                    return 0, job
+            except IndexException as ve:
+                if ve.errno == -errno.ENOENT:
+                    return 0, None
+                raise ve
+    except VolumeException as ve:
+        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+        return ve.errno, None
+
+@contextmanager
+def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
+    with open_volume(fs_client, volname) as fs_handle:
+        with open_group(fs_handle, volspec, groupname) as group:
+            with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
+                yield subvolume
+
+@contextmanager
+def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
+    with open_group(fs_handle, volspec, groupname) as group:
+        with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
+            yield subvolume
+
+@contextmanager
+def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
+    # the clone subvolume as the source subvolume
+    if s_groupname == c_groupname and s_subvolname == c_subvolname:
+        yield c_subvolume
+    else:
+        with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
+            yield s_subvolume
+
+
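+# opens the clone and its source subvolume together, yielding the triple
+# (clone subvolume, source subvolume, source snapshot name) consumed by the
+# copy, quota and detach steps below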
+@contextmanager
+def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
+    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+        if groupname == s_groupname and subvolname == s_subvolname:
+            # use the same subvolume to avoid metadata overwrites
+            yield (clone_subvolume, clone_subvolume, s_snapname)
+        else:
+            with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+                yield (clone_subvolume, source_subvolume, s_snapname)
+
+def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+        return subvolume.state
+
+def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+        subvolume.state = (state, True)
+
+def get_clone_source(clone_subvolume):
+    source = clone_subvolume._get_clone_source()
+    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
+
+def get_next_state_on_error(errnum):
+    if errnum == -errno.EINTR:
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_CANCELLED)
+    else:
+        # jump to failed state, on all other errors
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_FAILED)
+    return next_state
+
+def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
+    try:
+        if should_cancel():
+            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                  SubvolumeStates.STATE_PENDING,
+                                                  SubvolumeActions.ACTION_CANCELLED)
+            update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname,
+                                        VolumeException(-errno.EINTR, "user interrupted clone operation"))
+        else:
+            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                                  SubvolumeStates.STATE_PENDING,
+                                                  SubvolumeActions.ACTION_SUCCESS)
+    except OpSmException as oe:
+        raise VolumeException(oe.errno, oe.error_str)
+    return (next_state, False)
+
+def sync_attrs(fs_handle, target_path, source_statx):
+    try:
+        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
+        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
+                                        time.mktime(source_statx["mtime"].timetuple())))
+        fs_handle.lchmod(target_path, source_statx["mode"])
+    except cephfs.Error as e:
+        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
+        raise e
+
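+# recursively walks the snapshot tree via the nested cptree() helper, checking
+# should_cancel() between directory entries so a cancellation request takes
+# effect mid-copy; ownership, mode and timestamps are mirrored by sync_attrs()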
+ """ + log.info("copying data from {0} to {1}".format(source_path, dst_path)) + def cptree(src_root_path, dst_root_path): + log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path)) + try: + with fs_handle.opendir(src_root_path) as dir_handle: + d = fs_handle.readdir(dir_handle) + while d and not should_cancel(): + if d.d_name not in (b".", b".."): + log.debug("d={0}".format(d)) + d_full_src = os.path.join(src_root_path, d.d_name) + d_full_dst = os.path.join(dst_root_path, d.d_name) + stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE | + cephfs.CEPH_STATX_UID | + cephfs.CEPH_STATX_GID | + cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME | + cephfs.CEPH_STATX_SIZE, + cephfs.AT_SYMLINK_NOFOLLOW) + handled = True + mo = stx["mode"] & ~stat.S_IFMT(stx["mode"]) + if stat.S_ISDIR(stx["mode"]): + log.debug("cptree: (DIR) {0}".format(d_full_src)) + try: + fs_handle.mkdir(d_full_dst, mo) + except cephfs.Error as e: + if not e.args[0] == errno.EEXIST: + raise + cptree(d_full_src, d_full_dst) + elif stat.S_ISLNK(stx["mode"]): + log.debug("cptree: (SYMLINK) {0}".format(d_full_src)) + target = fs_handle.readlink(d_full_src, 4096) + try: + fs_handle.symlink(target[:stx["size"]], d_full_dst) + except cephfs.Error as e: + if not e.args[0] == errno.EEXIST: + raise + elif stat.S_ISREG(stx["mode"]): + log.debug("cptree: (REG) {0}".format(d_full_src)) + copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel) + else: + handled = False + log.warning("cptree: (IGNORE) {0}".format(d_full_src)) + if handled: + sync_attrs(fs_handle, d_full_dst, stx) + d = fs_handle.readdir(dir_handle) + stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME, + cephfs.AT_SYMLINK_NOFOLLOW) + fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()), + time.mktime(stx_root["mtime"].timetuple()))) + except cephfs.Error as e: + if not e.args[0] == errno.ENOENT: + raise VolumeException(-e.args[0], e.args[1]) + cptree(source_path, dst_path) + if should_cancel(): + raise VolumeException(-errno.EINTR, "user interrupted clone operation") + +def set_quota_on_clone(fs_handle, clone_volumes_pair): + src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2]) + dst_path = clone_volumes_pair[0].path + quota = None # type: Optional[int] + try: + quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + pass + + if quota is not None: + try: + fs_handle.setxattr(dst_path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + quota_files = None # type: Optional[int] + try: + quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8')) + except cephfs.NoData: + pass + + if quota_files is not None: + try: + fs_handle.setxattr(dst_path, 'ceph.quota.max_files', str(quota_files).encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, "invalid file count specified: '{0}'".format(quota_files)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel): + with open_volume_lockless(fs_client, volname) as fs_handle: + with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: + src_path = 
+def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
+    with open_volume_lockless(fs_client, volname) as fs_handle:
+        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
+            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
+            dst_path = clone_volumes[0].path
+            bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+            set_quota_on_clone(fs_handle, clone_volumes)
+
+def update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve):
+    with open_volume_lockless(fs_client, volname) as fs_handle:
+        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
+            if ve.errno == -errno.EINTR:
+                clone_volumes[0].add_clone_failure(-ve.errno, "user interrupted clone operation")
+            else:
+                clone_volumes[0].add_clone_failure(-ve.errno, ve.error_str)
+
+def log_clone_failure(volname, groupname, subvolname, ve):
+    if ve.errno == -errno.EINTR:
+        log.info("Clone cancelled: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+    elif ve.errno == -errno.EDQUOT:
+        log.error("Clone failed: ({0}, {1}, {2}, reason -> Disk quota exceeded)".format(volname, groupname, subvolname))
+    else:
+        log.error("Clone failed: ({0}, {1}, {2}, reason -> {3})".format(volname, groupname, subvolname, ve))
+
+def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
+    try:
+        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
+        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+                                              SubvolumeStates.STATE_INPROGRESS,
+                                              SubvolumeActions.ACTION_SUCCESS)
+    except VolumeException as ve:
+        update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve)
+        log_clone_failure(volname, groupname, subvolname, ve)
+        next_state = get_next_state_on_error(ve.errno)
+    except OpSmException as oe:
+        raise VolumeException(oe.errno, oe.error_str)
+    return (next_state, False)
+
+def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
+    try:
+        with open_volume(fs_client, volname) as fs_handle:
+            # detach source but leave the clone section intact for later inspection
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
+                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
+def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
+    try:
+        with open_volume(fs_client, volname) as fs_handle:
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
+                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+                clone_volumes[0].remove_clone_source(flush=True)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
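+# drives a single clone through the caller-supplied state table: read the
+# persisted state, dispatch to its handler, and record every transition with
+# set_clone_state() so an interrupted cloner can resume from the last state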
VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state)) + (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel) + if next_state: + log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\ + current_state, next_state)) + set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state) + current_state = next_state + except (MetadataMgrException, VolumeException) as e: + log.error(f"clone failed for ({volname}, {groupname}, {subvolname}) " + f"(current_state: {current_state}, reason: {e} {os.strerror(-e.args[0])})") + raise + +def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay): + log.info("cloning to subvolume path: {0}".format(clone_path)) + resolved = resolve(volspec, clone_path) + + groupname = resolved[0] + subvolname = resolved[1] + log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname)) + + try: + log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) + start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay) + log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) + except (MetadataMgrException, VolumeException) as e: + log.error(f"clone failed for ({volname}, {groupname}, {subvolname}), reason: {e} {os.strerror(-e.args[0])}") + +class Cloner(AsyncJobs): + """ + Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume. + this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as + the driver. file types supported are directories, symbolic links and regular files. 
+ """ + def __init__(self, volume_client, tp_size, snapshot_clone_delay): + self.vc = volume_client + self.snapshot_clone_delay = snapshot_clone_delay + self.state_table = { + SubvolumeStates.STATE_PENDING : handle_clone_pending, + SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress, + SubvolumeStates.STATE_COMPLETE : handle_clone_complete, + SubvolumeStates.STATE_FAILED : handle_clone_failed, + SubvolumeStates.STATE_CANCELED : handle_clone_failed, + } + super(Cloner, self).__init__(volume_client, "cloner", tp_size) + + def reconfigure_max_concurrent_clones(self, tp_size): + return super(Cloner, self).reconfigure_max_async_threads(tp_size) + + def reconfigure_snapshot_clone_delay(self, timeout): + self.snapshot_clone_delay = timeout + + def is_clone_cancelable(self, clone_state): + return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) + + def get_clone_tracking_index(self, fs_handle, clone_subvolume): + with open_clone_index(fs_handle, self.vc.volspec) as index: + return index.find_clone_entry_index(clone_subvolume.base_path) + + def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx): + clone_state = SubvolumeStates.from_value(status['state']) + assert self.is_clone_cancelable(clone_state) + + s_groupname = status['source'].get('group', None) + s_subvolname = status['source']['subvolume'] + s_snapname = status['source']['snapshot'] + + with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume, + clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + clone_state, + SubvolumeActions.ACTION_CANCELLED) + clone_subvolume.state = (next_state, True) + clone_subvolume.add_clone_failure(errno.EINTR, "user interrupted clone operation") + s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8')) + + def cancel_job(self, volname, job): + """ + override base class `cancel_job`. interpret @job as (clone, group) tuple. + """ + clonename = job[0] + groupname = job[1] + track_idx = None + + try: + with open_volume(self.fs_client, volname) as fs_handle: + with open_group(fs_handle, self.vc.volspec, groupname) as group: + with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: + status = clone_subvolume.status + clone_state = SubvolumeStates.from_value(status['state']) + if not self.is_clone_cancelable(clone_state): + raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") + track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume) + if not track_idx: + log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path)) + raise VolumeException(-errno.EINVAL, "error canceling clone") + clone_job = (track_idx, clone_subvolume.base_path) + jobs = [j[0] for j in self.jobs[volname]] + with lock_timeout_log(self.lock): + if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and not clone_job in jobs: + logging.debug("Cancelling pending job {0}".format(clone_job)) + # clone has not started yet -- cancel right away. + self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx) + return + # cancelling an on-going clone would persist "canceled" state in subvolume metadata. 
+    def cancel_job(self, volname, job):
+        """
+        override base class `cancel_job`. interpret @job as (clone, group) tuple.
+        """
+        clonename = job[0]
+        groupname = job[1]
+        track_idx = None
+
+        try:
+            with open_volume(self.fs_client, volname) as fs_handle:
+                with open_group(fs_handle, self.vc.volspec, groupname) as group:
+                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                        status = clone_subvolume.status
+                        clone_state = SubvolumeStates.from_value(status['state'])
+                        if not self.is_clone_cancelable(clone_state):
+                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
+                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
+                        if not track_idx:
+                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
+                            raise VolumeException(-errno.EINVAL, "error canceling clone")
+                        clone_job = (track_idx, clone_subvolume.base_path)
+                        jobs = [j[0] for j in self.jobs[volname]]
+                        with lock_timeout_log(self.lock):
+                            if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and not clone_job in jobs:
+                                logging.debug("Cancelling pending job {0}".format(clone_job))
+                                # clone has not started yet -- cancel right away.
+                                self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
+                                return
+            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
+            # to persist the new state, async cloner accesses the volume in exclusive mode.
+            # accessing the volume in exclusive mode here would lead to deadlock.
+            assert track_idx is not None
+            with lock_timeout_log(self.lock):
+                with open_volume_lockless(self.fs_client, volname) as fs_handle:
+                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
+                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
+                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
+        except (IndexException, MetadataMgrException) as e:
+            log.error("error cancelling clone {0}: ({1})".format(job, e))
+            raise VolumeException(-errno.EINVAL, "error canceling clone")
+
+    def get_next_job(self, volname, running_jobs):
+        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)
+
+    def execute_job(self, volname, job, should_cancel):
+        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
--
cgit v1.2.3