diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/volumes/fs | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs')
29 files changed, 5172 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/__init__.py b/src/pybind/mgr/volumes/fs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/pybind/mgr/volumes/fs/__init__.py diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py new file mode 100644 index 00000000..0c5155e7 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/async_cloner.py @@ -0,0 +1,345 @@ +import os +import stat +import time +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .async_job import AsyncJobs +from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException +from .fs_util import copy_file +from .operations.versions.op_sm import SubvolumeOpSm +from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions +from .operations.resolver import resolve +from .operations.volume import open_volume, open_volume_lockless +from .operations.group import open_group +from .operations.subvolume import open_subvol +from .operations.clone_index import open_clone_index +from .operations.template import SubvolumeOpType + +log = logging.getLogger(__name__) + +# helper for fetching a clone entry for a given volume +def get_next_clone_entry(volume_client, volname, running_jobs): + log.debug("fetching clone entry for volume '{0}'".format(volname)) + + try: + with open_volume_lockless(volume_client, volname) as fs_handle: + try: + with open_clone_index(fs_handle, volume_client.volspec) as clone_index: + job = clone_index.get_oldest_clone_entry(running_jobs) + return 0, job + except IndexException as ve: + if ve.errno == -errno.ENOENT: + return 0, None + raise ve + except VolumeException as ve: + log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve)) + return ve.errno, None + +@contextmanager +def open_at_volume(volume_client, volname, groupname, subvolname, op_type): + with open_volume(volume_client, volname) as fs_handle: + with open_group(fs_handle, volume_client.volspec, groupname) as group: + with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume: + yield subvolume + +@contextmanager +def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type): + with open_group(fs_handle, volume_client.volspec, groupname) as group: + with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume: + yield subvolume + +@contextmanager +def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type): + # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return + # the clone subvolume as the source subvolume + if s_groupname == c_groupname and s_subvolname == c_subvolname: + yield c_subvolume + else: + with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume: + yield s_subvolume + + +@contextmanager +def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname): + with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume: + s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume) + if groupname == s_groupname and subvolname == s_subvolname: + # use the same subvolume to avoid metadata overwrites + yield (clone_subvolume, clone_subvolume, s_snapname) + else: + with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume: + yield (clone_subvolume, source_subvolume, s_snapname) + +def get_clone_state(volume_client, volname, groupname, subvolname): + with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: + return subvolume.state + +def set_clone_state(volume_client, volname, groupname, subvolname, state): + with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: + subvolume.state = (state, True) + +def get_clone_source(clone_subvolume): + source = clone_subvolume._get_clone_source() + return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot']) + +def get_next_state_on_error(errnum): + if errnum == -errno.EINTR: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_CANCELLED) + else: + # jump to failed state, on all other errors + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_FAILED) + return next_state + +def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel): + try: + if should_cancel(): + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_CANCELLED) + else: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_SUCCESS) + except OpSmException as oe: + raise VolumeException(oe.errno, oe.error_str) + return (next_state, False) + +def sync_attrs(fs_handle, target_path, source_statx): + try: + fs_handle.lchown(target_path, int(source_statx["uid"]), int(source_statx["gid"])) + fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()), + time.mktime(source_statx["mtime"].timetuple()))) + except cephfs.Error as e: + log.warn("error synchronizing attrs for {0} ({1})".format(target_path, e)) + raise e + +def bulk_copy(fs_handle, source_path, dst_path, should_cancel): + """ + bulk copy data from source to destination -- only directories, symlinks + and regular files are synced. + """ + log.info("copying data from {0} to {1}".format(source_path, dst_path)) + def cptree(src_root_path, dst_root_path): + log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path)) + try: + with fs_handle.opendir(src_root_path) as dir_handle: + d = fs_handle.readdir(dir_handle) + while d and not should_cancel(): + if d.d_name not in (b".", b".."): + log.debug("d={0}".format(d)) + d_full_src = os.path.join(src_root_path, d.d_name) + d_full_dst = os.path.join(dst_root_path, d.d_name) + stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE | + cephfs.CEPH_STATX_UID | + cephfs.CEPH_STATX_GID | + cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME | + cephfs.CEPH_STATX_SIZE, + cephfs.AT_SYMLINK_NOFOLLOW) + handled = True + mo = stx["mode"] & ~stat.S_IFMT(stx["mode"]) + if stat.S_ISDIR(stx["mode"]): + log.debug("cptree: (DIR) {0}".format(d_full_src)) + try: + fs_handle.mkdir(d_full_dst, mo) + except cephfs.Error as e: + if not e.args[0] == errno.EEXIST: + raise + cptree(d_full_src, d_full_dst) + elif stat.S_ISLNK(stx["mode"]): + log.debug("cptree: (SYMLINK) {0}".format(d_full_src)) + target = fs_handle.readlink(d_full_src, 4096) + try: + fs_handle.symlink(target[:stx["size"]], d_full_dst) + except cephfs.Error as e: + if not e.args[0] == errno.EEXIST: + raise + elif stat.S_ISREG(stx["mode"]): + log.debug("cptree: (REG) {0}".format(d_full_src)) + copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel) + else: + handled = False + log.warn("cptree: (IGNORE) {0}".format(d_full_src)) + if handled: + sync_attrs(fs_handle, d_full_dst, stx) + d = fs_handle.readdir(dir_handle) + stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME, + cephfs.AT_SYMLINK_NOFOLLOW) + fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()), + time.mktime(stx_root["mtime"].timetuple()))) + except cephfs.Error as e: + if not e.args[0] == errno.ENOENT: + raise VolumeException(-e.args[0], e.args[1]) + cptree(source_path, dst_path) + if should_cancel(): + raise VolumeException(-errno.EINTR, "clone operation interrupted") + +def do_clone(volume_client, volname, groupname, subvolname, should_cancel): + with open_volume_lockless(volume_client, volname) as fs_handle: + with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes: + src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2]) + dst_path = clone_volumes[0].path + bulk_copy(fs_handle, src_path, dst_path, should_cancel) + +def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel): + try: + do_clone(volume_client, volname, groupname, subvolname, should_cancel) + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_SUCCESS) + except VolumeException as ve: + next_state = get_next_state_on_error(ve.errno) + except OpSmException as oe: + raise VolumeException(oe.errno, oe.error_str) + return (next_state, False) + +def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel): + try: + with open_volume(volume_client, volname) as fs_handle: + # detach source but leave the clone section intact for later inspection + with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes: + clone_volumes[1].detach_snapshot(clone_volumes[2], index) + except (MetadataMgrException, VolumeException) as e: + log.error("failed to detach clone from snapshot: {0}".format(e)) + return (None, True) + +def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel): + try: + with open_volume(volume_client, volname) as fs_handle: + with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes: + clone_volumes[1].detach_snapshot(clone_volumes[2], index) + clone_volumes[0].remove_clone_source(flush=True) + except (MetadataMgrException, VolumeException) as e: + log.error("failed to detach clone from snapshot: {0}".format(e)) + return (None, True) + +def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel): + finished = False + current_state = None + try: + current_state = get_clone_state(volume_client, volname, groupname, subvolname) + log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state)) + while not finished: + handler = state_table.get(current_state, None) + if not handler: + raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state)) + (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel) + if next_state: + log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\ + current_state, next_state)) + set_clone_state(volume_client, volname, groupname, subvolname, next_state) + current_state = next_state + except VolumeException as ve: + log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\ + subvolname, current_state, ve)) + +def clone(volume_client, volname, index, clone_path, state_table, should_cancel): + log.info("cloning to subvolume path: {0}".format(clone_path)) + resolved = resolve(volume_client.volspec, clone_path) + + groupname = resolved[0] + subvolname = resolved[1] + log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname)) + + try: + log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) + start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel) + log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) + except VolumeException as ve: + log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve)) + +class Cloner(AsyncJobs): + """ + Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume. + this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as + the driver. file types supported are directories, symbolic links and regular files. + """ + def __init__(self, volume_client, tp_size): + self.vc = volume_client + self.state_table = { + SubvolumeStates.STATE_PENDING : handle_clone_pending, + SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress, + SubvolumeStates.STATE_COMPLETE : handle_clone_complete, + SubvolumeStates.STATE_FAILED : handle_clone_failed, + SubvolumeStates.STATE_CANCELED : handle_clone_failed, + } + super(Cloner, self).__init__(volume_client, "cloner", tp_size) + + def reconfigure_max_concurrent_clones(self, tp_size): + super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size) + + def is_clone_cancelable(self, clone_state): + return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) + + def get_clone_tracking_index(self, fs_handle, clone_subvolume): + with open_clone_index(fs_handle, self.vc.volspec) as index: + return index.find_clone_entry_index(clone_subvolume.base_path) + + def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx): + clone_state = SubvolumeStates.from_value(status['state']) + assert self.is_clone_cancelable(clone_state) + + s_groupname = status['source'].get('group', None) + s_subvolname = status['source']['subvolume'] + s_snapname = status['source']['snapshot'] + + with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname, + clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: + next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, + clone_state, + SubvolumeActions.ACTION_CANCELLED) + clone_subvolume.state = (next_state, True) + s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8')) + + def cancel_job(self, volname, job): + """ + override base class `cancel_job`. interpret @job as (clone, group) tuple. + """ + clonename = job[0] + groupname = job[1] + track_idx = None + + try: + with open_volume(self.vc, volname) as fs_handle: + with open_group(fs_handle, self.vc.volspec, groupname) as group: + with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: + status = clone_subvolume.status + clone_state = SubvolumeStates.from_value(status['state']) + if not self.is_clone_cancelable(clone_state): + raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") + track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume) + if not track_idx: + log.warn("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path)) + raise VolumeException(-errno.EINVAL, "error canceling clone") + if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state): + # clone has not started yet -- cancel right away. + self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx) + return + # cancelling an on-going clone would persist "canceled" state in subvolume metadata. + # to persist the new state, async cloner accesses the volume in exclusive mode. + # accessing the volume in exclusive mode here would lead to deadlock. + assert track_idx is not None + with self.lock: + with open_volume_lockless(self.vc, volname) as fs_handle: + with open_group(fs_handle, self.vc.volspec, groupname) as group: + with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: + if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)): + raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") + except (IndexException, MetadataMgrException) as e: + log.error("error cancelling clone {0}: ({1})".format(job, e)) + raise VolumeException(-errno.EINVAL, "error canceling clone") + + def get_next_job(self, volname, running_jobs): + return get_next_clone_entry(self.vc, volname, running_jobs) + + def execute_job(self, volname, job, should_cancel): + clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel) diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py new file mode 100644 index 00000000..954e89c4 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -0,0 +1,279 @@ +import sys +import time +import logging +import threading +import traceback +from collections import deque + +from .exception import NotImplementedException + +log = logging.getLogger(__name__) + +class JobThread(threading.Thread): + # this is "not" configurable and there is no need for it to be + # configurable. if a thread encounters an exception, we retry + # until it hits this many consecutive exceptions. + MAX_RETRIES_ON_EXCEPTION = 10 + + def __init__(self, async_job, volume_client, name): + self.vc = volume_client + self.async_job = async_job + # event object to cancel jobs + self.cancel_event = threading.Event() + threading.Thread.__init__(self, name=name) + + def run(self): + retries = 0 + thread_id = threading.currentThread() + assert isinstance(thread_id, JobThread) + thread_name = thread_id.getName() + log.debug("thread [{0}] starting".format(thread_name)) + + while retries < JobThread.MAX_RETRIES_ON_EXCEPTION: + vol_job = None + try: + # fetch next job to execute + with self.async_job.lock: + while True: + if self.should_reconfigure_num_threads(): + log.info("thread [{0}] terminating due to reconfigure".format(thread_name)) + self.async_job.threads.remove(self) + return + vol_job = self.async_job.get_job() + if vol_job: + break + self.async_job.cv.wait() + self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id) + + # execute the job (outside lock) + self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel()) + retries = 0 + except NotImplementedException: + raise + except Exception: + # unless the jobs fetching and execution routines are not implemented + # retry till we hit cap limit. + retries += 1 + log.warning("thread [{0}] encountered fatal error: (attempt#" \ + " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION)) + exc_type, exc_value, exc_traceback = sys.exc_info() + log.warning("traceback: {0}".format("".join( + traceback.format_exception(exc_type, exc_value, exc_traceback)))) + finally: + # when done, unregister the job + if vol_job: + with self.async_job.lock: + self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id) + time.sleep(1) + log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name)) + self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name)) + with self.async_job.lock: + self.async_job.threads.remove(self) + + def should_reconfigure_num_threads(self): + # reconfigure of max_concurrent_clones + return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs + + def cancel_job(self): + self.cancel_event.set() + + def should_cancel(self): + return self.cancel_event.is_set() + + def reset_cancel(self): + self.cancel_event.clear() + +class AsyncJobs(object): + """ + Class providing asynchronous execution of jobs via worker threads. + `jobs` are grouped by `volume`, so a `volume` can have N number of + `jobs` executing concurrently (capped by number of concurrent jobs). + + Usability is simple: subclass this and implement the following: + - get_next_job(volname, running_jobs) + - execute_job(volname, job, should_cancel) + + ... and do not forget to invoke base class constructor. + + Job cancelation is for a volume as a whole, i.e., all executing jobs + for a volume are canceled. Cancelation is poll based -- jobs need to + periodically check if cancelation is requested, after which the job + should return as soon as possible. Cancelation check is provided + via `should_cancel()` lambda passed to `execute_job()`. + """ + + def __init__(self, volume_client, name_pfx, nr_concurrent_jobs): + self.vc = volume_client + # queue of volumes for starting async jobs + self.q = deque() + # volume => job tracking + self.jobs = {} + # lock, cv for kickstarting jobs + self.lock = threading.Lock() + self.cv = threading.Condition(self.lock) + # cv for job cancelation + self.waiting = False + self.cancel_cv = threading.Condition(self.lock) + self.nr_concurrent_jobs = nr_concurrent_jobs + + self.threads = [] + for i in range(nr_concurrent_jobs): + self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i))) + self.threads[-1].start() + + def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs): + """ + reconfigure number of cloner threads + """ + with self.lock: + self.nr_concurrent_jobs = nr_concurrent_jobs + # Decrease in concurrency. Notify threads which are waiting for a job to terminate. + if len(self.threads) > nr_concurrent_jobs: + self.cv.notifyAll() + # Increase in concurrency + if len(self.threads) < nr_concurrent_jobs: + for i in range(len(self.threads), nr_concurrent_jobs): + self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i))) + self.threads[-1].start() + + def get_job(self): + log.debug("processing {0} volume entries".format(len(self.q))) + nr_vols = len(self.q) + to_remove = [] + next_job = None + while nr_vols > 0: + volname = self.q[0] + # do this now so that the other thread pick up jobs for other volumes + self.q.rotate(1) + running_jobs = [j[0] for j in self.jobs[volname]] + (ret, job) = self.get_next_job(volname, running_jobs) + if job: + next_job = (volname, job) + break + # this is an optimization when for a given volume there are no more + # jobs and no jobs are in progress. in such cases we remove the volume + # from the tracking list so as to: + # + # a. not query the filesystem for jobs over and over again + # b. keep the filesystem connection idle so that it can be freed + # from the connection pool + # + # if at all there are jobs for a volume, the volume gets added again + # to the tracking list and the jobs get kickstarted. + # note that, we do not iterate the volume list fully if there is a + # jobs to process (that will take place eventually). + if ret == 0 and not job and not running_jobs: + to_remove.append(volname) + nr_vols -= 1 + for vol in to_remove: + log.debug("auto removing volume '{0}' from tracked volumes".format(vol)) + self.q.remove(vol) + self.jobs.pop(vol) + return next_job + + def register_async_job(self, volname, job, thread_id): + log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id)) + self.jobs[volname].append((job, thread_id)) + + def unregister_async_job(self, volname, job, thread_id): + log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id)) + self.jobs[volname].remove((job, thread_id)) + + cancelled = thread_id.should_cancel() + thread_id.reset_cancel() + + # wake up cancellation waiters if needed + if cancelled: + logging.info("waking up cancellation waiters") + self.cancel_cv.notifyAll() + + def queue_job(self, volname): + """ + queue a volume for asynchronous job execution. + """ + log.info("queuing job for volume '{0}'".format(volname)) + with self.lock: + if not volname in self.q: + self.q.append(volname) + self.jobs[volname] = [] + self.cv.notifyAll() + + def _cancel_jobs(self, volname): + """ + cancel all jobs for the volume. do nothing is the no jobs are + executing for the given volume. this would wait until all jobs + get interrupted and finish execution. + """ + log.info("cancelling jobs for volume '{0}'".format(volname)) + try: + if not volname in self.q and not volname in self.jobs: + return + self.q.remove(volname) + # cancel in-progress operation and wait until complete + for j in self.jobs[volname]: + j[1].cancel_job() + # wait for cancellation to complete + while self.jobs[volname]: + log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \ + "cancel".format(len(self.jobs[volname]), volname)) + self.cancel_cv.wait() + self.jobs.pop(volname) + except (KeyError, ValueError): + pass + + def _cancel_job(self, volname, job): + """ + cancel a executing job for a given volume. return True if canceled, False + otherwise (volume/job not found). + """ + canceled = False + log.info("canceling job {0} for volume {1}".format(job, volname)) + try: + if not volname in self.q and not volname in self.jobs and not job in self.jobs[volname]: + return canceled + for j in self.jobs[volname]: + if j[0] == job: + j[1].cancel_job() + # be safe against _cancel_jobs() running concurrently + while j in self.jobs.get(volname, []): + self.cancel_cv.wait() + canceled = True + break + except (KeyError, ValueError): + pass + return canceled + + def cancel_job(self, volname, job): + with self.lock: + return self._cancel_job(volname, job) + + def cancel_jobs(self, volname): + """ + cancel all executing jobs for a given volume. + """ + with self.lock: + self._cancel_jobs(volname) + + def cancel_all_jobs(self): + """ + call all executing jobs for all volumes. + """ + with self.lock: + for volname in list(self.q): + self._cancel_jobs(volname) + + def get_next_job(self, volname, running_jobs): + """ + get the next job for asynchronous execution as (retcode, job) tuple. if no + jobs are available return (0, None) else return (0, job). on error return + (-ret, None). called under `self.lock`. + """ + raise NotImplementedException() + + def execute_job(self, volname, job, should_cancel): + """ + execute a job for a volume. the job can block on I/O operations, sleep for long + hours and do all kinds of synchronous work. called outside `self.lock`. + """ + raise NotImplementedException() + diff --git a/src/pybind/mgr/volumes/fs/exception.py b/src/pybind/mgr/volumes/fs/exception.py new file mode 100644 index 00000000..4f903b99 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/exception.py @@ -0,0 +1,63 @@ +class VolumeException(Exception): + def __init__(self, error_code, error_message): + self.errno = error_code + self.error_str = error_message + + def to_tuple(self): + return self.errno, "", self.error_str + + def __str__(self): + return "{0} ({1})".format(self.errno, self.error_str) + +class MetadataMgrException(Exception): + def __init__(self, error_code, error_message): + self.errno = error_code + self.error_str = error_message + + def __str__(self): + return "{0} ({1})".format(self.errno, self.error_str) + +class IndexException(Exception): + def __init__(self, error_code, error_message): + self.errno = error_code + self.error_str = error_message + + def __str__(self): + return "{0} ({1})".format(self.errno, self.error_str) + +class OpSmException(Exception): + def __init__(self, error_code, error_message): + self.errno = error_code + self.error_str = error_message + + def __str__(self): + return "{0} ({1})".format(self.errno, self.error_str) + +class NotImplementedException(Exception): + pass + +class ClusterTimeout(Exception): + """ + Exception indicating that we timed out trying to talk to the Ceph cluster, + either to the mons, or to any individual daemon that the mons indicate ought + to be up but isn't responding to us. + """ + pass + +class ClusterError(Exception): + """ + Exception indicating that the cluster returned an error to a command that + we thought should be successful based on our last knowledge of the cluster + state. + """ + def __init__(self, action, result_code, result_str): + self._action = action + self._result_code = result_code + self._result_str = result_str + + def __str__(self): + return "Error {0} (\"{1}\") while {2}".format( + self._result_code, self._result_str, self._action) + +class EvictionError(Exception): + pass diff --git a/src/pybind/mgr/volumes/fs/fs_util.py b/src/pybind/mgr/volumes/fs/fs_util.py new file mode 100644 index 00000000..6fe02f58 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/fs_util.py @@ -0,0 +1,161 @@ +import os +import errno +import logging + +import cephfs +import orchestrator + +from .exception import VolumeException + +log = logging.getLogger(__name__) + +def create_pool(mgr, pool_name, pg_num): + # create the given pool + command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num} + return mgr.mon_command(command) + +def remove_pool(mgr, pool_name): + command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name, + 'yes_i_really_really_mean_it': True} + return mgr.mon_command(command) + +def create_filesystem(mgr, fs_name, metadata_pool, data_pool): + command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool, + 'data': data_pool} + return mgr.mon_command(command) + +def remove_filesystem(mgr, fs_name): + command = {'prefix': 'fs fail', 'fs_name': fs_name} + r, outb, outs = mgr.mon_command(command) + if r != 0: + return r, outb, outs + + command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True} + return mgr.mon_command(command) + +def create_mds(mgr, fs_name): + spec = orchestrator.StatelessServiceSpec() + spec.name = fs_name + try: + completion = mgr.add_stateless_service("mds", spec) + mgr._orchestrator_wait([completion]) + orchestrator.raise_if_exception(completion) + except (ImportError, orchestrator.OrchestratorError): + return 0, "", "Volume created successfully (no MDS daemons created)" + except Exception as e: + # Don't let detailed orchestrator exceptions (python backtraces) + # bubble out to the user + log.exception("Failed to create MDS daemons") + return -errno.EINVAL, "", str(e) + return 0, "", "" + +def volume_exists(mgr, fs_name): + fs_map = mgr.get('fs_map') + for fs in fs_map['filesystems']: + if fs['mdsmap']['fs_name'] == fs_name: + return True + return False + +def listdir(fs, dirpath): + """ + Get the directory names (only dirs) for a given path + """ + dirs = [] + try: + with fs.opendir(dirpath) as dir_handle: + d = fs.readdir(dir_handle) + while d: + if (d.d_name not in (b".", b"..")) and d.is_dir(): + dirs.append(d.d_name) + d = fs.readdir(dir_handle) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + return dirs + +def is_inherited_snap(snapname): + """ + Returns True if the snapname is inherited else False + """ + return snapname.startswith("_") + +def listsnaps(fs, volspec, snapdirpath, filter_inherited_snaps=False): + """ + Get the snap names from a given snap directory path + """ + if os.path.basename(snapdirpath) != volspec.snapshot_prefix.encode('utf-8'): + raise VolumeException(-errno.EINVAL, "Not a snap directory: {0}".format(snapdirpath)) + snaps = [] + try: + with fs.opendir(snapdirpath) as dir_handle: + d = fs.readdir(dir_handle) + while d: + if (d.d_name not in (b".", b"..")) and d.is_dir(): + d_name = d.d_name.decode('utf-8') + if not is_inherited_snap(d_name): + snaps.append(d.d_name) + elif is_inherited_snap(d_name) and not filter_inherited_snaps: + snaps.append(d.d_name) + d = fs.readdir(dir_handle) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + return snaps + +def list_one_entry_at_a_time(fs, dirpath): + """ + Get a directory entry (one entry a time) + """ + try: + with fs.opendir(dirpath) as dir_handle: + d = fs.readdir(dir_handle) + while d: + if d.d_name not in (b".", b".."): + yield d + d = fs.readdir(dir_handle) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def copy_file(fs, src, dst, mode, cancel_check=None): + """ + Copy a regular file from @src to @dst. @dst is overwritten if it exists. + """ + src_fd = dst_fd = None + try: + src_fd = fs.open(src, os.O_RDONLY); + dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode) + except cephfs.Error as e: + if src_fd is not None: + fs.close(src_fd) + if dst_fd is not None: + fs.close(dst_fd) + raise VolumeException(-e.args[0], e.args[1]) + + IO_SIZE = 8 * 1024 * 1024 + try: + while True: + if cancel_check and cancel_check(): + raise VolumeException(-errno.EINTR, "copy operation interrupted") + data = fs.read(src_fd, -1, IO_SIZE) + if not len(data): + break + written = 0 + while written < len(data): + written += fs.write(dst_fd, data[written:], -1) + fs.fsync(dst_fd, 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + finally: + fs.close(src_fd) + fs.close(dst_fd) + +def get_ancestor_xattr(fs, path, attr): + """ + Helper for reading layout information: if this xattr is missing + on the requested path, keep checking parents until we find it. + """ + try: + return fs.getxattr(path, attr).decode('utf-8') + except cephfs.NoData as e: + if path == "/": + raise VolumeException(-e.args[0], e.args[1]) + else: + return get_ancestor_xattr(fs, os.path.split(path)[0], attr) diff --git a/src/pybind/mgr/volumes/fs/operations/__init__.py b/src/pybind/mgr/volumes/fs/operations/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/__init__.py diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py new file mode 100644 index 00000000..44430f59 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/access.py @@ -0,0 +1,142 @@ +import errno +import json +try: + from typing import List +except ImportError: + pass # For typing only + +def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True): + caps_list = [] # type: List[str] + for k, v in existing_caps['caps'].items(): + if k == 'mds' or k == 'osd': + continue + elif k == 'mon': + if not authorize and v == 'allow r': + continue + caps_list.extend((k,v)) + + if mds_cap_str: + caps_list.extend(('mds', mds_cap_str)) + if osd_cap_str: + caps_list.extend(('osd', osd_cap_str)) + + if authorize and 'mon' not in caps_list: + caps_list.extend(('mon', 'allow r')) + + return caps_list + + +def allow_access(mgr, client_entity, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap, existing_caps): + if existing_caps is None: + ret, out, err = mgr.mon_command({ + "prefix": "auth get-or-create", + "entity": client_entity, + "caps": ['mds', want_mds_cap, 'osd', want_osd_cap, 'mon', 'allow r'], + "format": "json"}) + else: + cap = existing_caps[0] + + def cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, + want_osd_cap, unwanted_mds_cap, unwanted_osd_cap): + + if not orig_mds_caps: + return want_mds_cap, want_osd_cap + + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + if want_mds_cap in mds_cap_tokens: + return orig_mds_caps, orig_osd_caps + + if unwanted_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(unwanted_mds_cap) + osd_cap_tokens.remove(unwanted_osd_cap) + + mds_cap_tokens.append(want_mds_cap) + osd_cap_tokens.append(want_osd_cap) + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + + mds_cap_str, osd_cap_str = cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap) + + caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str) + mgr.mon_command( + { + "prefix": "auth caps", + 'entity': client_entity, + 'caps': caps_list + }) + ret, out, err = mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + + # Result expected like this: + # [ + # { + # "entity": "client.foobar", + # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", + # "caps": { + # "mds": "allow *", + # "mon": "allow *" + # } + # } + # ] + + caps = json.loads(out) + assert len(caps) == 1 + assert caps[0]['entity'] == client_entity + return caps[0]['key'] + +def deny_access(mgr, client_entity, want_mds_caps, want_osd_caps): + ret, out, err = mgr.mon_command({ + "prefix": "auth get", + "entity": client_entity, + "format": "json", + }) + + if ret == -errno.ENOENT: + # Already gone, great. + return + + def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps): + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps): + if want_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(want_mds_cap) + osd_cap_tokens.remove(want_osd_cap) + break + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + cap = json.loads(out)[0] + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps, + want_mds_caps, want_osd_caps) + + caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False) + if not caps_list: + mgr.mon_command( + { + 'prefix': 'auth rm', + 'entity': client_entity + }) + else: + mgr.mon_command( + { + "prefix": "auth caps", + 'entity': client_entity, + 'caps': caps_list + }) diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py new file mode 100644 index 00000000..a2b31f85 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py @@ -0,0 +1,98 @@ +import os +import uuid +import stat +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .index import Index +from ..exception import IndexException, VolumeException +from ..fs_util import list_one_entry_at_a_time + +log = logging.getLogger(__name__) + +class CloneIndex(Index): + SUB_GROUP_NAME = "clone" + PATH_MAX = 4096 + + @property + def path(self): + return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8')) + + def _track(self, sink_path): + tracking_id = str(uuid.uuid4()) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path)) + + self.fs.symlink(sink_path, source_path) + return tracking_id + + def track(self, sink_path): + try: + return self._track(sink_path) + except (VolumeException, cephfs.Error) as e: + if isinstance(e, cephfs.Error): + e = IndexException(-e.args[0], e.args[1]) + elif isinstance(e, VolumeException): + e = IndexException(e.errno, e.error_str) + raise e + + def untrack(self, tracking_id): + log.info("untracking {0}".format(tracking_id)) + source_path = os.path.join(self.path, tracking_id.encode('utf-8')) + try: + self.fs.unlink(source_path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + + def get_oldest_clone_entry(self, exclude=[]): + min_ctime_entry = None + exclude_tracking_ids = [v[0] for v in exclude] + log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids)) + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode): + if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime: + min_ctime_entry = (dname, st) + if min_ctime_entry: + try: + linklen = min_ctime_entry[1].st_size + sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX) + return (min_ctime_entry[0], sink_path[:linklen]) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + return None + + def find_clone_entry_index(self, sink_path): + try: + for entry in list_one_entry_at_a_time(self.fs, self.path): + dname = entry.d_name + dpath = os.path.join(self.path, dname) + st = self.fs.lstat(dpath) + if stat.S_ISLNK(st.st_mode): + target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX) + if sink_path == target_path[:st.st_size]: + return dname + return None + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +def create_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.mkdirs(clone_index.path, 0o700) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + +@contextmanager +def open_clone_index(fs, vol_spec): + clone_index = CloneIndex(fs, vol_spec) + try: + fs.stat(clone_index.path) + except cephfs.Error as e: + raise IndexException(-e.args[0], e.args[1]) + yield clone_index diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py new file mode 100644 index 00000000..ae334563 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/group.py @@ -0,0 +1,186 @@ +import os +import errno +import logging +from contextlib import contextmanager + +import cephfs + +from .snapshot_util import mksnap, rmsnap +from .template import GroupTemplate +from ..fs_util import listdir, listsnaps, get_ancestor_xattr +from ..exception import VolumeException + +log = logging.getLogger(__name__) + +class Group(GroupTemplate): + # Reserved subvolume group name which we use in paths for subvolumes + # that are not assigned to a group (i.e. created with group=None) + NO_GROUP_NAME = "_nogroup" + + def __init__(self, fs, vol_spec, groupname): + assert groupname != Group.NO_GROUP_NAME + self.fs = fs + self.user_id = None + self.group_id = None + self.vol_spec = vol_spec + self.groupname = groupname if groupname else Group.NO_GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + @property + def group_name(self): + return self.groupname + + @property + def uid(self): + return self.user_id + + @uid.setter + def uid(self, val): + self.user_id = val + + @property + def gid(self): + return self.group_id + + @gid.setter + def gid(self, val): + self.group_id = val + + def is_default_group(self): + return self.groupname == Group.NO_GROUP_NAME + + def list_subvolumes(self): + try: + return listdir(self.fs, self.path) + except VolumeException as ve: + # listing a default group when it's not yet created + if ve.errno == -errno.ENOENT and self.is_default_group(): + return [] + raise + + def create_snapshot(self, snapname): + snappath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + mksnap(self.fs, snappath) + + def remove_snapshot(self, snapname): + snappath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + rmsnap(self.fs, snappath) + + def list_snapshots(self): + try: + dirpath = os.path.join(self.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return [] + raise + +def create_group(fs, vol_spec, groupname, pool, mode, uid, gid): + """ + create a subvolume group. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + group = Group(fs, vol_spec, groupname) + path = group.path + fs.mkdirs(path, mode) + try: + if not pool: + pool = get_ancestor_xattr(fs, path, "ceph.dir.layout.pool") + try: + fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, + "Invalid pool layout '{0}'. It must be a valid data pool".format(pool)) + if uid is None: + uid = 0 + else: + try: + uid = int(uid) + if uid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid UID") + if gid is None: + gid = 0 + else: + try: + gid = int(gid) + if gid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid GID") + fs.chown(path, uid, gid) + except (cephfs.Error, VolumeException) as e: + try: + # cleanup group path on best effort basis + log.debug("cleaning up subvolume group path: {0}".format(path)) + fs.rmdir(path) + except cephfs.Error as ce: + log.debug("failed to clean up subvolume group {0} with path: {1} ({2})".format(groupname, path, ce)) + if isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + +def remove_group(fs, vol_spec, groupname): + """ + remove a subvolume group. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :return: None + """ + group = Group(fs, vol_spec, groupname) + try: + fs.rmdir(group.path) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname)) + raise VolumeException(-e.args[0], e.args[1]) + +@contextmanager +def open_group(fs, vol_spec, groupname): + """ + open a subvolume group. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param groupname: subvolume group name + :return: yields a group object (subclass of GroupTemplate) + """ + group = Group(fs, vol_spec, groupname) + try: + st = fs.stat(group.path) + group.uid = int(st.st_uid) + group.gid = int(st.st_gid) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + if not group.is_default_group(): + raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname)) + else: + raise VolumeException(-e.args[0], e.args[1]) + yield group + +@contextmanager +def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname): + if groupname == c_groupname: + yield c_group + else: + with open_group(fs, vol_spec, groupname) as group: + yield group diff --git a/src/pybind/mgr/volumes/fs/operations/index.py b/src/pybind/mgr/volumes/fs/operations/index.py new file mode 100644 index 00000000..0e4296d7 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/index.py @@ -0,0 +1,23 @@ +import errno +import os + +from ..exception import VolumeException +from .template import GroupTemplate + +class Index(GroupTemplate): + GROUP_NAME = "_index" + + def __init__(self, fs, vol_spec): + self.fs = fs + self.vol_spec = vol_spec + self.groupname = Index.GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + def track(self, *args): + raise VolumeException(-errno.EINVAL, "operation not supported.") + + def untrack(self, tracking_id): + raise VolumeException(-errno.EINVAL, "operation not supported.") diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py new file mode 100644 index 00000000..ab5f1d04 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/lock.py @@ -0,0 +1,42 @@ +from contextlib import contextmanager +import logging +from threading import Lock + +log = logging.getLogger(__name__) + +# singleton design pattern taken from http://www.aleax.it/5ep.html + +class GlobalLock(object): + """ + Global lock to serialize operations in mgr/volumes. This lock + is currently held when accessing (opening) a volume to perform + group/subvolume operations. Since this is a big lock, it's rather + inefficient -- but right now it's ok since mgr/volumes does not + expect concurrent operations via its APIs. + + As and when features get added (such as clone, where mgr/volumes + would maintain subvolume states in the filesystem), there might + be a need to allow concurrent operations. In that case it would + be nice to implement an efficient path based locking mechanism. + + See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf + """ + _shared_state = { + 'lock' : Lock(), + 'init' : False + } + + def __init__(self): + with self._shared_state['lock']: + if not self._shared_state['init']: + self._shared_state['init'] = True + # share this state among all instances + self.__dict__ = self._shared_state + + @contextmanager + def lock_op(self): + log.debug("entering global lock") + with self._shared_state['lock']: + log.debug("acquired global lock") + yield + log.debug("exited global lock") diff --git a/src/pybind/mgr/volumes/fs/operations/rankevicter.py b/src/pybind/mgr/volumes/fs/operations/rankevicter.py new file mode 100644 index 00000000..5b945c38 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/rankevicter.py @@ -0,0 +1,114 @@ +import errno +import json +import logging +import threading +import time + +from .volume import get_mds_map +from ..exception import ClusterTimeout, ClusterError + +log = logging.getLogger(__name__) + +class RankEvicter(threading.Thread): + """ + Thread for evicting client(s) from a particular MDS daemon instance. + + This is more complex than simply sending a command, because we have to + handle cases where MDS daemons might not be fully up yet, and/or might + be transiently unresponsive to commands. + """ + class GidGone(Exception): + pass + + POLL_PERIOD = 5 + + def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout): + """ + :param client_spec: list of strings, used as filter arguments to "session evict" + pass ["id=123"] to evict a single client with session id 123. + """ + self.volname = volname + self.rank = rank + self.gid = gid + self._mds_map = mds_map + self._client_spec = client_spec + self._fs = fs + self._ready_timeout = ready_timeout + self._ready_waited = 0 + self.mgr = mgr + + self.success = False + self.exception = None + + super(RankEvicter, self).__init__() + + def _ready_to_evict(self): + if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: + log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( + self._client_spec, self.rank, self.gid + )) + raise RankEvicter.GidGone() + + info = self._mds_map['info']["gid_{0}".format(self.gid)] + log.debug("_ready_to_evict: state={0}".format(info['state'])) + return info['state'] in ["up:active", "up:clientreplay"] + + def _wait_for_ready(self): + """ + Wait for that MDS rank to reach an active or clientreplay state, and + not be laggy. + """ + while not self._ready_to_evict(): + if self._ready_waited > self._ready_timeout: + raise ClusterTimeout() + + time.sleep(self.POLL_PERIOD) + self._ready_waited += self.POLL_PERIOD + self._mds_map = get_mds_map(self.mgr, self.volname) + + def _evict(self): + """ + Run the eviction procedure. Return true on success, false on errors. + """ + + # Wait til the MDS is believed by the mon to be available for commands + try: + self._wait_for_ready() + except self.GidGone: + return True + + # Then send it an evict + ret = -errno.ETIMEDOUT + while ret == -errno.ETIMEDOUT: + log.debug("mds_command: {0}, {1}".format( + "%s" % self.gid, ["session", "evict"] + self._client_spec + )) + ret, outb, outs = self._fs.mds_command( + "%s" % self.gid, + json.dumps({ + "prefix": "session evict", + "filters": self._client_spec + }), "") + log.debug("mds_command: complete {0} {1}".format(ret, outs)) + + # If we get a clean response, great, it's gone from that rank. + if ret == 0: + return True + elif ret == -errno.ETIMEDOUT: + # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) + self._mds_map = get_mds_map(self.mgr, self.volname) + try: + self._wait_for_ready() + except self.GidGone: + return True + else: + raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) + + def run(self): + try: + self._evict() + except Exception as e: + self.success = False + self.exception = e + else: + self.success = True diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py new file mode 100644 index 00000000..a9543654 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/resolver.py @@ -0,0 +1,26 @@ +import os + +from .group import Group + +def splitall(path): + if path == "/": + return ["/"] + s = os.path.split(path) + return splitall(s[0]) + [s[1]] + +def resolve(vol_spec, path): + parts = splitall(path) + if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix: + return None + groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2] + subvolname = parts[3] + return (groupname, subvolname) + +def resolve_trash(vol_spec, path): + parts = splitall(path) + if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \ + parts[4] != '.trash': + return None + groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2] + subvolname = parts[3] + return (groupname, subvolname) diff --git a/src/pybind/mgr/volumes/fs/operations/snapshot_util.py b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py new file mode 100644 index 00000000..2223c58e --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py @@ -0,0 +1,30 @@ +import os +import errno + +import cephfs + +from ..exception import VolumeException + +def mksnap(fs, snappath): + """ + Create a snapshot, or do nothing if it already exists. + """ + try: + # snap create does not accept mode -- use default + fs.mkdir(snappath, 0o755) + except cephfs.ObjectExists: + return + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def rmsnap(fs, snappath): + """ + Remove a snapshot + """ + try: + fs.stat(snappath) + fs.rmdir(snappath) + except cephfs.ObjectNotFound: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(os.path.basename(snappath))) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) diff --git a/src/pybind/mgr/volumes/fs/operations/subvolume.py b/src/pybind/mgr/volumes/fs/operations/subvolume.py new file mode 100644 index 00000000..dc36477b --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/subvolume.py @@ -0,0 +1,74 @@ +import os +import errno +from contextlib import contextmanager + +from ..exception import VolumeException +from .template import SubvolumeOpType + +from .versions import loaded_subvolumes + +def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid): + """ + create a subvolume (create a subvolume with the max known version). + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param size: In bytes, or None for no size limit + :param isolate_nspace: If true, use separate RADOS namespace for this subvolume + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname) + subvolume.create(size, isolate_nspace, pool, mode, uid, gid) + +def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname): + """ + create a cloned subvolume. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the clone + :param subvolname: clone subvolume nam + :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored + :param source_volume: source subvolumes volume name + :param source_subvolume: source (parent) subvolume object + :param snapname: source subvolume snapshot + :return None + """ + subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname) + subvolume.create_clone(pool, source_volume, source_subvolume, snapname) + +def remove_subvol(mgr, fs, vol_spec, group, subvolname, force=False, retainsnaps=False): + """ + remove a subvolume. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param subvolname: subvolume name + :param force: force remove subvolumes + :return: None + """ + op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE + with open_subvol(mgr, fs, vol_spec, group, subvolname, op_type) as subvolume: + subvolume.remove(retainsnaps) + +@contextmanager +def open_subvol(mgr, fs, vol_spec, group, subvolname, op_type): + """ + open a subvolume. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :param group: group object for the subvolume + :param subvolname: subvolume name + :param op_type: operation type for which subvolume is being opened + :return: yields a subvolume object (subclass of SubvolumeTemplate) + """ + subvolume = loaded_subvolumes.get_subvolume_object(mgr, fs, vol_spec, group, subvolname) + subvolume.open(op_type) + yield subvolume diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py new file mode 100644 index 00000000..d35ad0de --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/template.py @@ -0,0 +1,173 @@ +import errno + +from enum import Enum, unique + +from ..exception import VolumeException + +class GroupTemplate(object): + def list_subvolumes(self): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_snapshot(self, snapname): + """ + create a subvolume group snapshot. + + :param: group snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove_snapshot(self, snapname): + """ + remove a subvolume group snapshot. + + :param: group snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def list_snapshots(self): + """ + list all subvolume group snapshots. + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + +@unique +class SubvolumeOpType(Enum): + CREATE = 'create' + REMOVE = 'rm' + REMOVE_FORCE = 'rm-force' + PIN = 'pin' + LIST = 'ls' + GETPATH = 'getpath' + INFO = 'info' + RESIZE = 'resize' + SNAP_CREATE = 'snap-create' + SNAP_REMOVE = 'snap-rm' + SNAP_LIST = 'snap-ls' + SNAP_INFO = 'snap-info' + SNAP_PROTECT = 'snap-protect' + SNAP_UNPROTECT = 'snap-unprotect' + CLONE_SOURCE = 'clone-source' + CLONE_CREATE = 'clone-create' + CLONE_STATUS = 'clone-status' + CLONE_CANCEL = 'clone-cancel' + CLONE_INTERNAL = 'clone_internal' + ALLOW_ACCESS = 'allow-access' + DENY_ACCESS = 'deny-access' + AUTH_LIST = 'auth-list' + EVICT = 'evict' + +class SubvolumeTemplate(object): + VERSION = None # type: int + + @staticmethod + def version(): + return SubvolumeTemplate.VERSION + + def open(self, op_type): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def status(self): + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + """ + set up metadata, pools and auth for a subvolume. + + This function is idempotent. It is safe to call this again + for an already-created subvolume, even if it is in use. + + :param size: In bytes, or None for no size limit + :param isolate_nspace: If true, use separate RADOS namespace for this subvolume + :param pool: the RADOS pool where the data objects of the subvolumes will be stored + :param mode: the user permissions + :param uid: the user identifier + :param gid: the group identifier + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + """ + prepare a subvolume to be cloned. + + :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored + :param source_volname: source volume of snapshot + :param source_subvolume: source subvolume of snapshot + :param snapname: snapshot name to be cloned from + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove(self): + """ + make a subvolume inaccessible to guests. + + This function is idempotent. It is safe to call this again + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def resize(self, newsize, nshrink): + """ + resize a subvolume + + :param newsize: new size In bytes (or inf/infinite) + :return: new quota size and used bytes as a tuple + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def create_snapshot(self, snapname): + """ + snapshot a subvolume. + + :param: subvolume snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def remove_snapshot(self, snapname): + """ + remove a subvolume snapshot. + + :param: subvolume snapshot name + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def list_snapshots(self): + """ + list all subvolume snapshots. + + :param: None + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def attach_snapshot(self, snapname, tgt_subvolume): + """ + attach a snapshot to a target cloned subvolume. the target subvolume + should be an empty subvolume (type "clone") in "pending" state. + + :param: snapname: snapshot to attach to a clone + :param: tgt_subvolume: target clone subvolume + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") + + def detach_snapshot(self, snapname, tgt_subvolume): + """ + detach a snapshot from a target cloned subvolume. the target subvolume + should either be in "failed" or "completed" state. + + :param: snapname: snapshot to detach from a clone + :param: tgt_subvolume: target clone subvolume + :return: None + """ + raise VolumeException(-errno.ENOTSUP, "operation not supported.") diff --git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py new file mode 100644 index 00000000..66f1d71c --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/trash.py @@ -0,0 +1,145 @@ +import os +import uuid +import logging +from contextlib import contextmanager + +import cephfs + +from .template import GroupTemplate +from ..fs_util import listdir +from ..exception import VolumeException + +log = logging.getLogger(__name__) + +class Trash(GroupTemplate): + GROUP_NAME = "_deleting" + + def __init__(self, fs, vol_spec): + self.fs = fs + self.vol_spec = vol_spec + self.groupname = Trash.GROUP_NAME + + @property + def path(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8')) + + @property + def unique_trash_path(self): + """ + return a unique trash directory entry path + """ + return os.path.join(self.path, str(uuid.uuid4()).encode('utf-8')) + + def _get_single_dir_entry(self, exclude_list=[]): + exclude_list.extend((b".", b"..")) + try: + with self.fs.opendir(self.path) as d: + entry = self.fs.readdir(d) + while entry: + if entry.d_name not in exclude_list: + return entry.d_name + entry = self.fs.readdir(d) + return None + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def get_trash_entry(self, exclude_list): + """ + get a trash entry excluding entries provided. + + :praram exclude_list: entries to exclude + :return: trash entry + """ + return self._get_single_dir_entry(exclude_list) + + def purge(self, trashpath, should_cancel): + """ + purge a trash entry. + + :praram trash_entry: the trash entry to purge + :praram should_cancel: callback to check if the purge should be aborted + :return: None + """ + def rmtree(root_path): + log.debug("rmtree {0}".format(root_path)) + try: + with self.fs.opendir(root_path) as dir_handle: + d = self.fs.readdir(dir_handle) + while d and not should_cancel(): + if d.d_name not in (b".", b".."): + d_full = os.path.join(root_path, d.d_name) + if d.is_dir(): + rmtree(d_full) + else: + self.fs.unlink(d_full) + d = self.fs.readdir(dir_handle) + except cephfs.ObjectNotFound: + return + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + # remove the directory only if we were not asked to cancel + # (else we would fail to remove this anyway) + if not should_cancel(): + self.fs.rmdir(root_path) + + # catch any unlink errors + try: + rmtree(trashpath) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def dump(self, path): + """ + move an filesystem entity to trash can. + + :praram path: the filesystem path to be moved + :return: None + """ + try: + self.fs.rename(path, self.unique_trash_path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def link(self, path, bname): + pth = os.path.join(self.path, bname) + try: + self.fs.symlink(path, pth) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def delink(self, bname): + pth = os.path.join(self.path, bname) + try: + self.fs.unlink(pth) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +def create_trashcan(fs, vol_spec): + """ + create a trash can. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :return: None + """ + trashcan = Trash(fs, vol_spec) + try: + fs.mkdirs(trashcan.path, 0o700) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + +@contextmanager +def open_trashcan(fs, vol_spec): + """ + open a trash can. This API is to be used as a context manager. + + :param fs: ceph filesystem handle + :param vol_spec: volume specification + :return: yields a trash can object (subclass of GroupTemplate) + """ + trashcan = Trash(fs, vol_spec) + try: + fs.stat(trashcan.path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + yield trashcan diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py new file mode 100644 index 00000000..3dcdd7c1 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py @@ -0,0 +1,109 @@ +import errno +import logging +import importlib + +import cephfs + +from .subvolume_base import SubvolumeBase +from .subvolume_attrs import SubvolumeTypes +from .subvolume_v1 import SubvolumeV1 +from .subvolume_v2 import SubvolumeV2 +from .metadata_manager import MetadataManager +from .op_sm import SubvolumeOpSm +from ..template import SubvolumeOpType +from ...exception import MetadataMgrException, OpSmException, VolumeException + +log = logging.getLogger(__name__) + +class SubvolumeLoader(object): + INVALID_VERSION = -1 + + SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2'] + + def __init__(self): + self.max_version = SubvolumeLoader.INVALID_VERSION + self.versions = {} + + def _load_module(self, mod_cls): + mod_name, cls_name = mod_cls.split('.') + mod = importlib.import_module('.versions.{0}'.format(mod_name), package='volumes.fs.operations') + return getattr(mod, cls_name) + + def _load_supported_versions(self): + for mod_cls in SubvolumeLoader.SUPPORTED_MODULES: + cls = self._load_module(mod_cls) + log.info("loaded v{0} subvolume".format(cls.version())) + if self.max_version is not None or cls.version() > self.max_version: + self.max_version = cls.version() + self.versions[cls.version()] = cls + if self.max_version == SubvolumeLoader.INVALID_VERSION: + raise VolumeException(-errno.EINVAL, "no subvolume version available") + log.info("max subvolume version is v{0}".format(self.max_version)) + + def _get_subvolume_version(self, version): + try: + return self.versions[version] + except KeyError: + raise VolumeException(-errno.EINVAL, "subvolume class v{0} does not exist".format(version)) + + def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname): + return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname) + + def upgrade_to_v2_subvolume(self, subvolume): + # legacy mode subvolumes cannot be upgraded to v2 + if subvolume.legacy_mode: + return + + version = int(subvolume.metadata_mgr.get_global_option('version')) + if version >= SubvolumeV2.version(): + return + + v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname) + try: + v1_subvolume.open(SubvolumeOpType.SNAP_LIST) + except VolumeException as ve: + # if volume is not ready for snapshot listing, do not upgrade at present + if ve.errno == -errno.EAGAIN: + return + raise + + # v1 subvolumes with snapshots cannot be upgraded to v2 + if v1_subvolume.list_snapshots(): + return + + subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version()) + subvolume.metadata_mgr.flush() + + def upgrade_legacy_subvolume(self, fs, subvolume): + assert subvolume.legacy_mode + try: + fs.mkdirs(subvolume.legacy_dir, 0o700) + except cephfs.Error as e: + raise VolumeException(-e.args[0], "error accessing subvolume") + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + qpath = subvolume.base_path.decode('utf-8') + # legacy is only upgradable to v1 + subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state) + + def get_subvolume_object(self, mgr, fs, vol_spec, group, subvolname, upgrade=True): + subvolume = SubvolumeBase(mgr, fs, vol_spec, group, subvolname) + try: + subvolume.discover() + self.upgrade_to_v2_subvolume(subvolume) + version = int(subvolume.metadata_mgr.get_global_option('version')) + return self._get_subvolume_version(version)(mgr, fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT and upgrade: + self.upgrade_legacy_subvolume(fs, subvolume) + return self.get_subvolume_object(mgr, fs, vol_spec, group, subvolname, upgrade=False) + else: + # log the actual error and generalize error string returned to user + log.error("error accessing subvolume metadata for '{0}' ({1})".format(subvolname, me)) + raise VolumeException(-errno.EINVAL, "error accessing subvolume metadata") + +loaded_subvolumes = SubvolumeLoader() +loaded_subvolumes._load_supported_versions() diff --git a/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py new file mode 100644 index 00000000..259dcd0e --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py @@ -0,0 +1,208 @@ +from contextlib import contextmanager +import os +import fcntl +import json +import logging +import struct +import uuid + +import cephfs + +from ..group import Group + +log = logging.getLogger(__name__) + +class AuthMetadataError(Exception): + pass + +class AuthMetadataManager(object): + + # Current version + version = 6 + + # Filename extensions for meta files. + META_FILE_EXT = ".meta" + DEFAULT_VOL_PREFIX = "/volumes" + + def __init__(self, fs): + self.fs = fs + self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0] + self.volume_prefix = self.DEFAULT_VOL_PREFIX + + def _to_bytes(self, param): + ''' + Helper method that returns byte representation of the given parameter. + ''' + if isinstance(param, str): + return param.encode('utf-8') + elif param is None: + return param + else: + return str(param).encode('utf-8') + + def _subvolume_metadata_path(self, group_name, subvol_name): + return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( + group_name if group_name != Group.NO_GROUP_NAME else "", + subvol_name, + self.META_FILE_EXT)) + + def _check_compat_version(self, compat_version): + if self.version < compat_version: + msg = ("The current version of AuthMetadataManager, version {0} " + "does not support the required feature. Need version {1} " + "or greater".format(self.version, compat_version) + ) + log.error(msg) + raise AuthMetadataError(msg) + + def _metadata_get(self, path): + """ + Return a deserialized JSON object, or None + """ + fd = self.fs.open(path, "r") + # TODO iterate instead of assuming file < 4MB + read_bytes = self.fs.read(fd, 0, 4096 * 1024) + self.fs.close(fd) + if read_bytes: + return json.loads(read_bytes.decode()) + else: + return None + + def _metadata_set(self, path, data): + serialized = json.dumps(data) + fd = self.fs.open(path, "w") + try: + self.fs.write(fd, self._to_bytes(serialized), 0) + self.fs.fsync(fd, 0) + finally: + self.fs.close(fd) + + def _lock(self, path): + @contextmanager + def fn(): + while(1): + fd = self.fs.open(path, os.O_CREAT, 0o755) + self.fs.flock(fd, fcntl.LOCK_EX, self._id) + + # The locked file will be cleaned up sometime. It could be + # unlinked by consumer e.g., an another manila-share service + # instance, before lock was applied on it. Perform checks to + # ensure that this does not happen. + try: + statbuf = self.fs.stat(path) + except cephfs.ObjectNotFound: + self.fs.close(fd) + continue + + fstatbuf = self.fs.fstat(fd) + if statbuf.st_ino == fstatbuf.st_ino: + break + + try: + yield + finally: + self.fs.flock(fd, fcntl.LOCK_UN, self._id) + self.fs.close(fd) + + return fn() + + def _auth_metadata_path(self, auth_id): + return os.path.join(self.volume_prefix, "${0}{1}".format( + auth_id, self.META_FILE_EXT)) + + def auth_lock(self, auth_id): + return self._lock(self._auth_metadata_path(auth_id)) + + def auth_metadata_get(self, auth_id): + """ + Call me with the metadata locked! + + Check whether a auth metadata structure can be decoded by the current + version of AuthMetadataManager. + + Return auth metadata that the current version of AuthMetadataManager + can decode. + """ + auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) + + if auth_metadata: + self._check_compat_version(auth_metadata['compat_version']) + + return auth_metadata + + def auth_metadata_set(self, auth_id, data): + """ + Call me with the metadata locked! + + Fsync the auth metadata. + + Add two version attributes to the auth metadata, + 'compat_version', the minimum AuthMetadataManager version that can + decode the metadata, and 'version', the AuthMetadataManager version + that encoded the metadata. + """ + data['compat_version'] = 6 + data['version'] = self.version + return self._metadata_set(self._auth_metadata_path(auth_id), data) + + def create_subvolume_metadata_file(self, group_name, subvol_name): + """ + Create a subvolume metadata file, if it does not already exist, to store + data about auth ids having access to the subvolume + """ + fd = self.fs.open(self._subvolume_metadata_path(group_name, subvol_name), + os.O_CREAT, 0o755) + self.fs.close(fd) + + def delete_subvolume_metadata_file(self, group_name, subvol_name): + vol_meta_path = self._subvolume_metadata_path(group_name, subvol_name) + try: + self.fs.unlink(vol_meta_path) + except cephfs.ObjectNotFound: + pass + + def subvol_metadata_lock(self, group_name, subvol_name): + """ + Return a ContextManager which locks the authorization metadata for + a particular subvolume, and persists a flag to the metadata indicating + that it is currently locked, so that we can detect dirty situations + during recovery. + + This lock isn't just to make access to the metadata safe: it's also + designed to be used over the two-step process of checking the + metadata and then responding to an authorization request, to + ensure that at the point we respond the metadata hasn't changed + in the background. It's key to how we avoid security holes + resulting from races during that problem , + """ + return self._lock(self._subvolume_metadata_path(group_name, subvol_name)) + + def subvol_metadata_get(self, group_name, subvol_name): + """ + Call me with the metadata locked! + + Check whether a subvolume metadata structure can be decoded by the current + version of AuthMetadataManager. + + Return a subvolume_metadata structure that the current version of + AuthMetadataManager can decode. + """ + subvolume_metadata = self._metadata_get(self._subvolume_metadata_path(group_name, subvol_name)) + + if subvolume_metadata: + self._check_compat_version(subvolume_metadata['compat_version']) + + return subvolume_metadata + + def subvol_metadata_set(self, group_name, subvol_name, data): + """ + Call me with the metadata locked! + + Add two version attributes to the subvolume metadata, + 'compat_version', the minimum AuthMetadataManager version that can + decode the metadata and 'version', the AuthMetadataManager version + that encoded the metadata. + """ + data['compat_version'] = 1 + data['version'] = self.version + return self._metadata_set(self._subvolume_metadata_path(group_name, subvol_name), data) diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py new file mode 100644 index 00000000..1b6c4327 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py @@ -0,0 +1,144 @@ +import os +import errno +import logging +import sys + +if sys.version_info >= (3, 2): + import configparser +else: + import ConfigParser as configparser + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +import cephfs + +from ...exception import MetadataMgrException + +log = logging.getLogger(__name__) + +class MetadataManager(object): + GLOBAL_SECTION = "GLOBAL" + GLOBAL_META_KEY_VERSION = "version" + GLOBAL_META_KEY_TYPE = "type" + GLOBAL_META_KEY_PATH = "path" + GLOBAL_META_KEY_STATE = "state" + + MAX_IO_BYTES = 8 * 1024 + + def __init__(self, fs, config_path, mode): + self.fs = fs + self.mode = mode + self.config_path = config_path + if sys.version_info >= (3, 2): + self.config = configparser.ConfigParser() + else: + self.config = configparser.SafeConfigParser() + + def refresh(self): + fd = None + conf_data = StringIO() + try: + log.debug("opening config {0}".format(self.config_path)) + fd = self.fs.open(self.config_path, os.O_RDONLY) + while True: + data = self.fs.read(fd, -1, MetadataManager.MAX_IO_BYTES) + if not len(data): + break + conf_data.write(data.decode('utf-8')) + conf_data.seek(0) + self.config.readfp(conf_data) + except cephfs.ObjectNotFound: + raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path)) + except cephfs.Error as e: + raise MetadataMgrException(-e.args[0], e.args[1]) + finally: + if fd is not None: + self.fs.close(fd) + + def flush(self): + # cull empty sections + for section in list(self.config.sections()): + if len(self.config.items(section)) == 0: + self.config.remove_section(section) + + conf_data = StringIO() + self.config.write(conf_data) + conf_data.seek(0) + + fd = None + try: + fd = self.fs.open(self.config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode) + wrote = 0 + while True: + data = conf_data.read() + if not len(data): + break + wrote += self.fs.write(fd, data.encode('utf-8'), -1) + self.fs.fsync(fd, 0) + log.info("wrote {0} bytes to config {1}".format(wrote, self.config_path)) + except cephfs.Error as e: + raise MetadataMgrException(-e.args[0], e.args[1]) + finally: + if fd is not None: + self.fs.close(fd) + + def init(self, version, typ, path, state): + # you may init just once before refresh (helps to overwrite conf) + if self.config.has_section(MetadataManager.GLOBAL_SECTION): + raise MetadataMgrException(-errno.EINVAL, "init called on an existing config") + + self.add_section(MetadataManager.GLOBAL_SECTION) + self.update_section_multi( + MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version), + MetadataManager.GLOBAL_META_KEY_TYPE : str(typ), + MetadataManager.GLOBAL_META_KEY_PATH : str(path), + MetadataManager.GLOBAL_META_KEY_STATE : str(state) + }) + + def add_section(self, section): + try: + self.config.add_section(section) + except configparser.DuplicateSectionError: + return + except: + raise MetadataMgrException(-errno.EINVAL, "error adding section to config") + + def remove_option(self, section, key): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + self.config.remove_option(section, key) + + def remove_section(self, section): + self.config.remove_section(section) + + def update_section(self, section, key, value): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + self.config.set(section, key, str(value)) + + def update_section_multi(self, section, dct): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + for key,value in dct.items(): + self.config.set(section, key, str(value)) + + def update_global_section(self, key, value): + self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value)) + + def get_option(self, section, key): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + if not self.config.has_option(section, key): + raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section)) + return self.config.get(section, key) + + def get_global_option(self, key): + return self.get_option(MetadataManager.GLOBAL_SECTION, key) + + def section_has_item(self, section, item): + if not self.config.has_section(section): + raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) + return item in [v[1] for v in self.config.items(section)] diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py new file mode 100644 index 00000000..c2b5f582 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py @@ -0,0 +1,114 @@ +import errno + +from enum import Enum, unique + +from ...exception import OpSmException +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions + +class TransitionKey(object): + def __init__(self, subvol_type, state, action_type): + self.transition_key = [subvol_type, state, action_type] + + def __hash__(self): + return hash(tuple(self.transition_key)) + + def __eq__(self, other): + return self.transition_key == other.transition_key + + def __neq__(self, other): + return not(self == other) + +class SubvolumeOpSm(object): + transition_table = {} + + @staticmethod + def is_complete_state(state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_COMPLETE + + @staticmethod + def is_failed_state(state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED + + @staticmethod + def is_init_state(stm_type, state): + if not isinstance(state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state)) + return state == SubvolumeOpSm.get_init_state(stm_type) + + @staticmethod + def get_init_state(stm_type): + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE)] + if not init_state: + raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type)) + return init_state + + @staticmethod + def transition(stm_type, current_state, action): + if not isinstance(stm_type, SubvolumeTypes): + raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type)) + if not isinstance(current_state, SubvolumeStates): + raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state)) + if not isinstance(action, SubvolumeActions): + raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action)) + + transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)] + if not transition: + raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type)) + + return transition + +SubvolumeOpSm.transition_table = { + # state transitions for state machine type TYPE_NORMAL + TransitionKey(SubvolumeTypes.TYPE_NORMAL, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE, + + TransitionKey(SubvolumeTypes.TYPE_NORMAL, + SubvolumeStates.STATE_COMPLETE, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + # state transitions for state machine type TYPE_CLONE + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INIT, + SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_PENDING, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_INPROGRESS, + SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_COMPLETE, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_CANCELED, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, + + TransitionKey(SubvolumeTypes.TYPE_CLONE, + SubvolumeStates.STATE_FAILED, + SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED, +} diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py new file mode 100644 index 00000000..ec7138cb --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py @@ -0,0 +1,61 @@ +import errno +from enum import Enum, unique + +from ...exception import VolumeException + +@unique +class SubvolumeTypes(Enum): + TYPE_NORMAL = "subvolume" + TYPE_CLONE = "clone" + + @staticmethod + def from_value(value): + if value == "subvolume": + return SubvolumeTypes.TYPE_NORMAL + if value == "clone": + return SubvolumeTypes.TYPE_CLONE + + raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value)) + +@unique +class SubvolumeStates(Enum): + STATE_INIT = 'init' + STATE_PENDING = 'pending' + STATE_INPROGRESS = 'in-progress' + STATE_FAILED = 'failed' + STATE_COMPLETE = 'complete' + STATE_CANCELED = 'canceled' + STATE_RETAINED = 'snapshot-retained' + + @staticmethod + def from_value(value): + if value == "init": + return SubvolumeStates.STATE_INIT + if value == "pending": + return SubvolumeStates.STATE_PENDING + if value == "in-progress": + return SubvolumeStates.STATE_INPROGRESS + if value == "failed": + return SubvolumeStates.STATE_FAILED + if value == "complete": + return SubvolumeStates.STATE_COMPLETE + if value == "canceled": + return SubvolumeStates.STATE_CANCELED + if value == "snapshot-retained": + return SubvolumeStates.STATE_RETAINED + + raise VolumeException(-errno.EINVAL, "invalid state '{0}'".format(value)) + +@unique +class SubvolumeActions(Enum): + ACTION_NONE = 0 + ACTION_SUCCESS = 1 + ACTION_FAILED = 2 + ACTION_CANCELLED = 3 + ACTION_RETAINED = 4 + +@unique +class SubvolumeFeatures(Enum): + FEATURE_SNAPSHOT_CLONE = "snapshot-clone" + FEATURE_SNAPSHOT_RETENTION = "snapshot-retention" + FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect" diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py new file mode 100644 index 00000000..f193dabd --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py @@ -0,0 +1,331 @@ +import os +import stat +import uuid +import errno +import logging +from hashlib import md5 + +import cephfs + +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates +from .metadata_manager import MetadataManager +from ..trash import create_trashcan, open_trashcan +from ...fs_util import get_ancestor_xattr +from ...exception import MetadataMgrException, VolumeException +from .op_sm import SubvolumeOpSm +from .auth_metadata import AuthMetadataManager + +log = logging.getLogger(__name__) + +class SubvolumeBase(object): + LEGACY_CONF_DIR = "_legacy" + + def __init__(self, mgr, fs, vol_spec, group, subvolname, legacy=False): + self.mgr = mgr + self.fs = fs + self.auth_mdata_mgr = AuthMetadataManager(fs) + self.cmode = None + self.user_id = None + self.group_id = None + self.vol_spec = vol_spec + self.group = group + self.subvolname = subvolname + self.legacy_mode = legacy + self.load_config() + + @property + def uid(self): + return self.user_id + + @uid.setter + def uid(self, val): + self.user_id = val + + @property + def gid(self): + return self.group_id + + @gid.setter + def gid(self, val): + self.group_id = val + + @property + def mode(self): + return self.cmode + + @mode.setter + def mode(self, val): + self.cmode = val + + @property + def base_path(self): + return os.path.join(self.group.path, self.subvolname.encode('utf-8')) + + @property + def config_path(self): + return os.path.join(self.base_path, b".meta") + + @property + def legacy_dir(self): + return os.path.join(self.vol_spec.base_dir.encode('utf-8'), SubvolumeBase.LEGACY_CONF_DIR.encode('utf-8')) + + @property + def legacy_config_path(self): + m = md5() + m.update(self.base_path) + meta_config = "{0}.meta".format(m.hexdigest()) + return os.path.join(self.legacy_dir, meta_config.encode('utf-8')) + + @property + def namespace(self): + return "{0}{1}".format(self.vol_spec.fs_namespace, self.subvolname) + + @property + def group_name(self): + return self.group.group_name + + @property + def subvol_name(self): + return self.subvolname + + @property + def legacy_mode(self): + return self.legacy + + @legacy_mode.setter + def legacy_mode(self, mode): + self.legacy = mode + + @property + def path(self): + """ Path to subvolume data directory """ + raise NotImplementedError + + @property + def features(self): + """ List of features supported by the subvolume, containing items from SubvolumeFeatures """ + raise NotImplementedError + + @property + def state(self): + """ Subvolume state, one of SubvolumeStates """ + raise NotImplementedError + + @property + def subvol_type(self): + return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE)) + + @property + def purgeable(self): + """ Boolean declaring if subvolume can be purged """ + raise NotImplementedError + + def load_config(self): + if self.legacy_mode: + self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640) + else: + self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640) + + def get_attrs(self, pathname): + # get subvolume attributes + attrs = {} + stx = self.fs.statx(pathname, + cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE, + cephfs.AT_SYMLINK_NOFOLLOW) + + attrs["uid"] = int(stx["uid"]) + attrs["gid"] = int(stx["gid"]) + attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"])) + + try: + attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.NoData: + attrs["data_pool"] = None + + try: + attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + attrs["pool_namespace"] = None + + try: + attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + attrs["quota"] = None + + return attrs + + def set_attrs(self, path, attrs): + # set subvolume attributes + # set size + quota = attrs.get("quota") + if quota is not None: + try: + self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # set pool layout + data_pool = attrs.get("data_pool") + if data_pool is not None: + try: + self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0) + except cephfs.InvalidValue: + raise VolumeException(-errno.EINVAL, + "invalid pool layout '{0}' -- need a valid data pool".format(data_pool)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # isolate namespace + xattr_key = xattr_val = None + pool_namespace = attrs.get("pool_namespace") + if pool_namespace is not None: + # enforce security isolation, use separate namespace for this subvolume + xattr_key = 'ceph.dir.layout.pool_namespace' + xattr_val = pool_namespace + elif not data_pool: + # If subvolume's namespace layout is not set, then the subvolume's pool + # layout remains unset and will undesirably change with ancestor's + # pool layout changes. + xattr_key = 'ceph.dir.layout.pool' + xattr_val = None + try: + self.fs.getxattr(path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.NoData as e: + xattr_val = get_ancestor_xattr(self.fs, os.path.split(path)[0], "ceph.dir.layout.pool") + if xattr_key and xattr_val: + try: + self.fs.setxattr(path, xattr_key, xattr_val.encode('utf-8'), 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + # set uid/gid + uid = attrs.get("uid") + if uid is None: + uid = self.group.uid + else: + try: + uid = int(uid) + if uid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid UID") + + gid = attrs.get("gid") + if gid is None: + gid = self.group.gid + else: + try: + gid = int(gid) + if gid < 0: + raise ValueError + except ValueError: + raise VolumeException(-errno.EINVAL, "invalid GID") + + if uid is not None and gid is not None: + self.fs.chown(path, uid, gid) + + def _resize(self, path, newsize, noshrink): + try: + newsize = int(newsize) + if newsize <= 0: + raise VolumeException(-errno.EINVAL, "Invalid subvolume size") + except ValueError: + newsize = newsize.lower() + if not (newsize == "inf" or newsize == "infinite"): + raise VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize)) + newsize = 0 + noshrink = False + + try: + maxbytes = int(self.fs.getxattr(path, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + maxbytes = 0 + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + subvolstat = self.fs.stat(path) + if newsize > 0 and newsize < subvolstat.st_size: + if noshrink: + raise VolumeException(-errno.EINVAL, "Can't resize the subvolume. The new size '{0}' would be lesser than the current " + "used size '{1}'".format(newsize, subvolstat.st_size)) + + if not newsize == maxbytes: + try: + self.fs.setxattr(path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0) + except cephfs.Error as e: + raise VolumeException(-e.args[0], "Cannot set new size for the subvolume. '{0}'".format(e.args[1])) + return newsize, subvolstat.st_size + + def init_config(self, version, subvolume_type, subvolume_path, subvolume_state): + self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value) + self.metadata_mgr.flush() + + def discover(self): + log.debug("discovering subvolume '{0}' [mode: {1}]".format(self.subvolname, "legacy" if self.legacy_mode else "new")) + try: + self.fs.stat(self.base_path) + self.metadata_mgr.refresh() + log.debug("loaded subvolume '{0}'".format(self.subvolname)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT and not self.legacy_mode: + self.legacy_mode = True + self.load_config() + self.discover() + else: + raise + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname)) + + def _trash_dir(self, path): + create_trashcan(self.fs, self.vol_spec) + with open_trashcan(self.fs, self.vol_spec) as trashcan: + trashcan.dump(path) + log.info("subvolume path '{0}' moved to trashcan".format(path)) + + def _link_dir(self, path, bname): + create_trashcan(self.fs, self.vol_spec) + with open_trashcan(self.fs, self.vol_spec) as trashcan: + trashcan.link(path, bname) + log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname)) + + def trash_base_dir(self): + if self.legacy_mode: + self.fs.unlink(self.legacy_config_path) + self._trash_dir(self.base_path) + + def create_base_dir(self, mode): + try: + self.fs.mkdirs(self.base_path, mode) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def info (self): + subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH) + etype = self.subvol_type + st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE | + cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | + cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME | + cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME, + cephfs.AT_SYMLINK_NOFOLLOW) + usedbytes = st["size"] + try: + nsize = int(self.fs.getxattr(subvolpath, 'ceph.quota.max_bytes').decode('utf-8')) + except cephfs.NoData: + nsize = 0 + + try: + data_pool = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool').decode('utf-8') + pool_namespace = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]), + 'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]), + 'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]), + 'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes), + 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0), + 'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value} diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py new file mode 100644 index 00000000..b4cca736 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py @@ -0,0 +1,780 @@ +import os +import sys +import stat +import uuid +import errno +import logging +import json +from datetime import datetime +try: + from typing import List, Dict +except ImportError: + pass # For typing only + +import cephfs + +from .metadata_manager import MetadataManager +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures +from .op_sm import SubvolumeOpSm +from .subvolume_base import SubvolumeBase +from ..template import SubvolumeTemplate +from ..snapshot_util import mksnap, rmsnap +from ..access import allow_access, deny_access +from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError +from ...fs_util import listsnaps, is_inherited_snap +from ..template import SubvolumeOpType +from ..group import Group +from ..rankevicter import RankEvicter +from ..volume import get_mds_map + +from ..clone_index import open_clone_index, create_clone_index + +log = logging.getLogger(__name__) + +class SubvolumeV1(SubvolumeBase, SubvolumeTemplate): + """ + Version 1 subvolumes creates a subvolume with path as follows, + volumes/<group-name>/<subvolume-name>/<uuid>/ + + - The directory under which user data resides is <uuid> + - Snapshots of the subvolume are taken within the <uuid> directory + - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing, + - global information about the subvolume (version, path, type, state) + - snapshots attached to an ongoing clone operation + - clone snapshot source if subvolume is a clone of a snapshot + - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under + /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid> + component in the path. + """ + VERSION = 1 + + @staticmethod + def version(): + return SubvolumeV1.VERSION + + @property + def path(self): + try: + # no need to stat the path -- open() does that + return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8') + except MetadataMgrException as me: + raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") + + @property + def features(self): + return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value] + + def mark_subvolume(self): + # set subvolume attr, on subvolume root, marking it as a CephFS subvolume + # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes + try: + # MDS treats this as a noop for already marked subvolume + self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def snapshot_base_path(self): + """ Base path for all snapshots """ + return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + + def snapshot_path(self, snapname): + """ Path to a specific snapshot named 'snapname' """ + return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8')) + + def snapshot_data_path(self, snapname): + """ Path to user data directory within a subvolume snapshot named 'snapname' """ + return self.snapshot_path(snapname) + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # create directory and set attributes + self.fs.mkdirs(subvol_path, mode) + self.mark_subvolume() + attrs = { + 'uid': uid, + 'gid': gid, + 'data_pool': pool, + 'pool_namespace': self.namespace if isolate_nspace else None, + 'quota': size + } + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata + qpath = subvol_path.decode('utf-8') + self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state) + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, "exception in subvolume metadata") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def add_clone_source(self, volname, subvolume, snapname, flush=False): + self.metadata_mgr.add_section("source") + self.metadata_mgr.update_section("source", "volume", volname) + if not subvolume.group.is_default_group(): + self.metadata_mgr.update_section("source", "group", subvolume.group_name) + self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name) + self.metadata_mgr.update_section("source", "snapshot", snapname) + if flush: + self.metadata_mgr.flush() + + def remove_clone_source(self, flush=False): + self.metadata_mgr.remove_section("source") + if flush: + self.metadata_mgr.flush() + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + subvolume_type = SubvolumeTypes.TYPE_CLONE + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "clone failed: internal error") + + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # source snapshot attrs are used to create clone subvolume. + # attributes of subvolume's content though, are synced during the cloning process. + attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname)) + + # override snapshot pool setting, if one is provided for the clone + if pool is not None: + attrs["data_pool"] = pool + attrs["pool_namespace"] = None + + # create directory and set attributes + self.fs.mkdirs(subvol_path, attrs.get("mode")) + self.mark_subvolume() + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata and clone source + qpath = subvol_path.decode('utf-8') + self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value) + self.add_clone_source(source_volname, source_subvolume, snapname) + self.metadata_mgr.flush() + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, "exception in subvolume metadata") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) + try: + self.metadata_mgr.refresh() + + etype = self.subvol_type + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate): + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + + subvol_path = self.path + log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) + st = self.fs.stat(subvol_path) + # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark + self.mark_subvolume() + + self.uid = int(st.st_uid) + self.gid = int(st.st_gid) + self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(me.args[0], me.args[1]) + except cephfs.ObjectNotFound: + log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname)) + raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def _recover_auth_meta(self, auth_id, auth_meta): + """ + Call me after locking the auth meta file. + """ + remove_subvolumes = [] + + for subvol, subvol_data in auth_meta['subvolumes'].items(): + if not subvol_data['dirty']: + continue + + (group_name, subvol_name) = subvol.split('/') + group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME + access_level = subvol_data['access_level'] + + with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name) + + # No SVMeta update indicates that there was no auth update + # in Ceph either. So it's safe to remove corresponding + # partial update in AMeta. + if not subvol_meta or auth_id not in subvol_meta['auths']: + remove_subvolumes.append(subvol) + continue + + want_auth = { + 'access_level': access_level, + 'dirty': False, + } + # SVMeta update looks clean. Ceph auth update must have been + # clean. Update the dirty flag and continue + if subvol_meta['auths'][auth_id] == want_auth: + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + continue + + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + self._authorize_subvolume(auth_id, access_level, existing_caps) + + # Recovered from partial auth updates for the auth ID's access + # to a subvolume. + auth_meta['subvolumes'][subvol]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + for subvol in remove_subvolumes: + del auth_meta['subvolumes'][subvol] + + if not auth_meta['subvolumes']: + # Clean up auth meta file + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + # Recovered from all partial auth updates for the auth ID. + auth_meta['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False): + """ + Get-or-create a Ceph auth identity for `auth_id` and grant them access + to + :param auth_id: + :param access_level: + :param tenant_id: Optionally provide a stringizable object to + restrict any created cephx IDs to other callers + passing the same tenant ID. + :allow_existing_id: Optionally authorize existing auth-ids not + created by ceph_volume_client. + :return: + """ + + with self.auth_mdata_mgr.auth_lock(auth_id): + client_entity = "client.{0}".format(auth_id) + ret, out, err = self.mgr.mon_command( + { + 'prefix': 'auth get', + 'entity': client_entity, + 'format': 'json' + }) + + if ret == 0: + existing_caps = json.loads(out) + elif ret == -errno.ENOENT: + existing_caps = None + else: + log.error(err) + raise VolumeException(ret, err) + + # Existing meta, or None, to be updated + auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id) + + # subvolume data to be inserted + group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None + group_subvol_id = "{0}/{1}".format(group_name, self.subvolname) + subvolume = { + group_subvol_id : { + # The access level at which the auth_id is authorized to + # access the volume. + 'access_level': access_level, + 'dirty': True, + } + } + + if auth_meta is None: + if not allow_existing_id and existing_caps is not None: + msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id) + log.error(msg) + raise VolumeException(-errno.EPERM, msg) + + # non-existent auth IDs + sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( + auth_id, tenant_id + )) + log.debug("Authorize: no existing meta") + auth_meta = { + 'dirty': True, + 'tenant_id': str(tenant_id) if tenant_id else None, + 'subvolumes': subvolume + } + else: + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + # Disallow tenants to share auth IDs + if str(auth_meta['tenant_id']) != str(tenant_id): + msg = "auth ID: {0} is already in use".format(auth_id) + log.error(msg) + raise VolumeException(-errno.EPERM, msg) + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Authorize: existing tenant {tenant}".format( + tenant=auth_meta['tenant_id'] + )) + auth_meta['dirty'] = True + auth_meta['subvolumes'].update(subvolume) + + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + key = self._authorize_subvolume(auth_id, access_level, existing_caps) + + auth_meta['dirty'] = False + auth_meta['subvolumes'][group_subvol_id]['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + if tenant_id: + return key + else: + # Caller wasn't multi-tenant aware: be safe and don't give + # them a key + return "" + + def _authorize_subvolume(self, auth_id, access_level, existing_caps): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + + auth = { + auth_id: { + 'access_level': access_level, + 'dirty': True, + } + } + + if subvol_meta is None: + subvol_meta = { + 'auths': auth + } + else: + subvol_meta['auths'].update(auth) + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + key = self._authorize(auth_id, access_level, existing_caps) + + subvol_meta['auths'][auth_id]['dirty'] = False + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + return key + + def _authorize(self, auth_id, access_level, existing_caps): + subvol_path = self.path + log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path)) + + # First I need to work out what the data pool is for this share: + # read the layout + try: + pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + try: + namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + namespace = None + + # Now construct auth capabilities that give the guest just enough + # permissions to access the share + client_entity = "client.{0}".format(auth_id) + want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8')) + want_osd_cap = "allow {0} pool={1}{2}".format( + access_level, pool, " namespace={0}".format(namespace) if namespace else "") + + # Construct auth caps that if present might conflict with the desired + # auth caps. + unwanted_access_level = 'r' if access_level is 'rw' else 'rw' + unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8')) + unwanted_osd_cap = "allow {0} pool={1}{2}".format( + unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "") + + return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap, existing_caps) + + def deauthorize(self, auth_id): + with self.auth_mdata_mgr.auth_lock(auth_id): + # Existing meta, or None, to be updated + auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id) + + if auth_meta is None: + msg = "auth ID: {0} doesn't exist".format(auth_id) + log.error(msg) + raise VolumeException(-errno.ENOENT, msg) + + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None + group_subvol_id = "{0}/{1}".format(group_name, self.subvolname) + if (auth_meta is None) or (not auth_meta['subvolumes']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + # Clean up the auth meta file of an auth ID + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + if group_subvol_id not in auth_meta['subvolumes']: + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + return + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + auth_meta['dirty'] = True + auth_meta['subvolumes'][group_subvol_id]['dirty'] = True + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + self._deauthorize_subvolume(auth_id) + + # Filter out the volume we're deauthorizing + del auth_meta['subvolumes'][group_subvol_id] + + # Clean up auth meta file + if not auth_meta['subvolumes']: + self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id)) + return + + auth_meta['dirty'] = False + self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta) + + def _deauthorize_subvolume(self, auth_id): + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + + if (subvol_meta is None) or (auth_id not in subvol_meta['auths']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for subvolume '{subvolume}'".format( + auth_id=auth_id, subvolume=self.subvolname + )) + return + + subvol_meta['auths'][auth_id]['dirty'] = True + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + self._deauthorize(auth_id) + + # Remove the auth_id from the metadata *after* removing it + # from ceph, so that if we crashed here, we would actually + # recreate the auth ID during recovery (i.e. end up with + # a consistent state). + + # Filter out the auth we're removing + del subvol_meta['auths'][auth_id] + self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta) + + def _deauthorize(self, auth_id): + """ + The volume must still exist. + """ + client_entity = "client.{0}".format(auth_id) + subvol_path = self.path + try: + pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8') + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + try: + namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8') + except cephfs.NoData: + namespace = None + + # The auth_id might have read-only or read-write mount access for the + # subvolume path. + access_levels = ('r', 'rw') + want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8')) + for access_level in access_levels] + want_osd_caps = ['allow {0} pool={1}{2}'.format( + access_level, pool_name, " namespace={0}".format(namespace) if namespace else "") + for access_level in access_levels] + deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps) + + def authorized_list(self): + """ + Expose a list of auth IDs that have access to a subvolume. + + return: a list of (auth_id, access_level) tuples, where + the access_level can be 'r' , or 'rw'. + None if no auth ID is given access to the subvolume. + """ + with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname): + meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname) + auths = [] # type: List[Dict[str,str]] + if not meta or not meta['auths']: + return auths + + for auth, auth_data in meta['auths'].items(): + # Skip partial auth updates. + if not auth_data['dirty']: + auths.append({auth: auth_data['access_level']}) + + return auths + + def evict(self, volname, auth_id, timeout=30): + """ + Evict all clients based on the authorization ID and the subvolume path mounted. + Assumes that the authorization key has been revoked prior to calling this function. + + This operation can throw an exception if the mon cluster is unresponsive, or + any individual MDS daemon is unresponsive for longer than the timeout passed in. + """ + + client_spec = ["auth_name={0}".format(auth_id), ] + client_spec.append("client_metadata.root={0}". + format(self.path.decode('utf-8'))) + + log.info("evict clients with {0}".format(', '.join(client_spec))) + + mds_map = get_mds_map(self.mgr, volname) + if not mds_map: + raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname)) + + up = {} + for name, gid in mds_map['up'].items(): + # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" + assert name.startswith("mds_") + up[int(name[4:])] = gid + + # For all MDS ranks held by a daemon + # Do the parallelism in python instead of using "tell mds.*", because + # the latter doesn't give us per-mds output + threads = [] + for rank, gid in up.items(): + thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + log.info("evict: joined all") + + for t in threads: + if not t.success: + msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". + format(', '.join(client_spec), t.rank, t.gid, t.exception) + ) + log.error(msg) + raise EvictionError(msg) + + def _get_clone_source(self): + try: + clone_source = { + 'volume' : self.metadata_mgr.get_option("source", "volume"), + 'subvolume': self.metadata_mgr.get_option("source", "subvolume"), + 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"), + } + + try: + clone_source["group"] = self.metadata_mgr.get_option("source", "group") + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + pass + else: + raise + except MetadataMgrException as me: + raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata") + return clone_source + + @property + def status(self): + state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) + subvolume_type = self.subvol_type + subvolume_status = { + 'state' : state.value + } + if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE: + subvolume_status["source"] = self._get_clone_source() + return subvolume_status + + @property + def state(self): + return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)) + + @state.setter + def state(self, val): + state = val[0].value + flush = val[1] + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state) + if flush: + self.metadata_mgr.flush() + + def remove(self, retainsnaps=False): + if retainsnaps: + raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname)) + if self.list_snapshots(): + raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname)) + self.trash_base_dir() + + def resize(self, newsize, noshrink): + subvol_path = self.path + return self._resize(subvol_path, newsize, noshrink) + + def create_snapshot(self, snapname): + try: + group_snapshot_path = os.path.join(self.group.path, + self.vol_spec.snapshot_dir_prefix.encode('utf-8'), + snapname.encode('utf-8')) + self.fs.stat(group_snapshot_path) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + snappath = self.snapshot_path(snapname) + mksnap(self.fs, snappath) + else: + raise VolumeException(-e.args[0], e.args[1]) + else: + raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same") + + def has_pending_clones(self, snapname): + try: + return self.metadata_mgr.section_has_item('clone snaps', snapname) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + return False + raise + + def remove_snapshot(self, snapname): + if self.has_pending_clones(snapname): + raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname)) + snappath = self.snapshot_path(snapname) + rmsnap(self.fs, snappath) + + def snapshot_info(self, snapname): + if is_inherited_snap(snapname): + raise VolumeException(-errno.EINVAL, + "snapshot name '{0}' is invalid".format(snapname)) + snappath = self.snapshot_data_path(snapname) + snap_info = {} + try: + snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes', + 'data_pool':'ceph.dir.layout.pool'} + for key, val in snap_attrs.items(): + snap_info[key] = self.fs.getxattr(snappath, val) + return {'size': int(snap_info['size']), + 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))), + 'data_pool': snap_info['data_pool'].decode('utf-8'), + 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"} + except cephfs.Error as e: + if e.errno == errno.ENOENT: + raise VolumeException(-errno.ENOENT, + "snapshot '{0}' does not exist".format(snapname)) + raise VolumeException(-e.args[0], e.args[1]) + + def list_snapshots(self): + try: + dirpath = self.snapshot_base_path() + return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return [] + raise + + def _add_snap_clone(self, track_id, snapname): + self.metadata_mgr.add_section("clone snaps") + self.metadata_mgr.update_section("clone snaps", track_id, snapname) + self.metadata_mgr.flush() + + def _remove_snap_clone(self, track_id): + self.metadata_mgr.remove_option("clone snaps", track_id) + self.metadata_mgr.flush() + + def attach_snapshot(self, snapname, tgt_subvolume): + if not snapname.encode('utf-8') in self.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + try: + create_clone_index(self.fs, self.vol_spec) + with open_clone_index(self.fs, self.vol_spec) as index: + track_idx = index.track(tgt_subvolume.base_path) + self._add_snap_clone(track_idx, snapname) + except (IndexException, MetadataMgrException) as e: + log.warn("error creating clone index: {0}".format(e)) + raise VolumeException(-errno.EINVAL, "error cloning subvolume") + + def detach_snapshot(self, snapname, track_id): + if not snapname.encode('utf-8') in self.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + try: + with open_clone_index(self.fs, self.vol_spec) as index: + index.untrack(track_id) + self._remove_snap_clone(track_id) + except (IndexException, MetadataMgrException) as e: + log.warn("error delining snapshot from clone: {0}".format(e)) + raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone") diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py new file mode 100644 index 00000000..1dd6f3fe --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py @@ -0,0 +1,370 @@ +import os +import stat +import uuid +import errno +import logging + +import cephfs + +from .metadata_manager import MetadataManager +from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures +from .op_sm import SubvolumeOpSm +from .subvolume_v1 import SubvolumeV1 +from ..template import SubvolumeTemplate +from ...exception import OpSmException, VolumeException, MetadataMgrException +from ...fs_util import listdir +from ..template import SubvolumeOpType + +log = logging.getLogger(__name__) + +class SubvolumeV2(SubvolumeV1): + """ + Version 2 subvolumes creates a subvolume with path as follows, + volumes/<group-name>/<subvolume-name>/<uuid>/ + + The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots + of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory, + rather than under the <uuid> directory, as is the case of V1 subvolumes. + + - The directory under which user data resides is <uuid> + - Snapshots of the subvolume are taken within the <subvolume-name> directory + - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar + to V1 subvolumes + - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining + the rest of the subvolume and its meta file. + - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of + older incarnations of the same subvolume. + - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the + snapshot retention feature + """ + VERSION = 2 + + @staticmethod + def version(): + return SubvolumeV2.VERSION + + @property + def features(self): + return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, + SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value, + SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value] + + @property + def retained(self): + try: + self.metadata_mgr.refresh() + if self.state == SubvolumeStates.STATE_RETAINED: + return True + return False + except MetadataMgrException as me: + if me.errno != -errno.ENOENT: + raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname)) + return False + + @property + def purgeable(self): + if not self.retained or self.list_snapshots() or self.has_pending_purges: + return False + return True + + @property + def has_pending_purges(self): + try: + return not listdir(self.fs, self.trash_dir) == [] + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return False + raise + + @property + def trash_dir(self): + return os.path.join(self.base_path, b".trash") + + def create_trashcan(self): + """per subvolume trash directory""" + try: + self.fs.stat(self.trash_dir) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + try: + self.fs.mkdir(self.trash_dir, 0o700) + except cephfs.Error as ce: + raise VolumeException(-ce.args[0], ce.args[1]) + else: + raise VolumeException(-e.args[0], e.args[1]) + + def mark_subvolume(self): + # set subvolume attr, on subvolume root, marking it as a CephFS subvolume + # subvolume root is where snapshots would be taken, and hence is the base_path for v2 subvolumes + try: + # MDS treats this as a noop for already marked subvolume + self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0) + except cephfs.InvalidValue as e: + raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume") + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + @staticmethod + def is_valid_uuid(uuid_str): + try: + uuid.UUID(uuid_str) + return True + except ValueError: + return False + + def snapshot_base_path(self): + return os.path.join(self.base_path, self.vol_spec.snapshot_dir_prefix.encode('utf-8')) + + def snapshot_data_path(self, snapname): + snap_base_path = self.snapshot_path(snapname) + uuid_str = None + try: + with self.fs.opendir(snap_base_path) as dir_handle: + d = self.fs.readdir(dir_handle) + while d: + if d.d_name not in (b".", b".."): + d_full_path = os.path.join(snap_base_path, d.d_name) + stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW) + if stat.S_ISDIR(stx.get('mode')): + if self.is_valid_uuid(d.d_name.decode('utf-8')): + uuid_str = d.d_name + d = self.fs.readdir(dir_handle) + except cephfs.Error as e: + if e.errno == errno.ENOENT: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + raise VolumeException(-e.args[0], e.args[1]) + + if not uuid_str: + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + + return os.path.join(snap_base_path, uuid_str) + + def _remove_on_failure(self, subvol_path, retained): + if retained: + log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path)) + try: + self.fs.rmdir(subvol_path) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + else: + log.info("cleaning up subvolume with path: {0}".format(self.subvolname)) + self.remove() + + def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state): + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value) + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath) + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value) + + def create(self, size, isolate_nspace, pool, mode, uid, gid): + subvolume_type = SubvolumeTypes.TYPE_NORMAL + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error") + + retained = self.retained + if retained and self.has_pending_purges: + raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress") + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + self.fs.mkdirs(subvol_path, mode) + self.mark_subvolume() + attrs = { + 'uid': uid, + 'gid': gid, + 'data_pool': pool, + 'pool_namespace': self.namespace if isolate_nspace else None, + 'quota': size + } + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata + qpath = subvol_path.decode('utf-8') + if retained: + self._set_incarnation_metadata(subvolume_type, qpath, initial_state) + self.metadata_mgr.flush() + else: + self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state) + + # Create the subvolume metadata file which manages auth-ids if it doesn't exist + self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname) + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + self._remove_on_failure(subvol_path, retained) + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, "exception in subvolume metadata") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def create_clone(self, pool, source_volname, source_subvolume, snapname): + subvolume_type = SubvolumeTypes.TYPE_CLONE + try: + initial_state = SubvolumeOpSm.get_init_state(subvolume_type) + except OpSmException as oe: + raise VolumeException(-errno.EINVAL, "clone failed: internal error") + + retained = self.retained + if retained and self.has_pending_purges: + raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress") + subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8')) + try: + # source snapshot attrs are used to create clone subvolume + # attributes of subvolume's content though, are synced during the cloning process. + attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname)) + + # override snapshot pool setting, if one is provided for the clone + if pool is not None: + attrs["data_pool"] = pool + attrs["pool_namespace"] = None + + # create directory and set attributes + self.fs.mkdirs(subvol_path, attrs.get("mode")) + self.mark_subvolume() + self.set_attrs(subvol_path, attrs) + + # persist subvolume metadata and clone source + qpath = subvol_path.decode('utf-8') + if retained: + self._set_incarnation_metadata(subvolume_type, qpath, initial_state) + else: + self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value) + self.add_clone_source(source_volname, source_subvolume, snapname) + self.metadata_mgr.flush() + except (VolumeException, MetadataMgrException, cephfs.Error) as e: + try: + self._remove_on_failure(subvol_path, retained) + except VolumeException as ve: + log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve)) + + if isinstance(e, MetadataMgrException): + log.error("metadata manager exception: {0}".format(e)) + e = VolumeException(-errno.EINVAL, "exception in subvolume metadata") + elif isinstance(e, cephfs.Error): + e = VolumeException(-e.args[0], e.args[1]) + raise e + + def allowed_ops_by_type(self, vol_type): + if vol_type == SubvolumeTypes.TYPE_CLONE: + return {op_type for op_type in SubvolumeOpType} + + if vol_type == SubvolumeTypes.TYPE_NORMAL: + return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL} + + return {} + + def allowed_ops_by_state(self, vol_state): + if vol_state == SubvolumeStates.STATE_COMPLETE: + return {op_type for op_type in SubvolumeOpType} + + if vol_state == SubvolumeStates.STATE_RETAINED: + return { + SubvolumeOpType.REMOVE, + SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.LIST, + SubvolumeOpType.INFO, + SubvolumeOpType.SNAP_REMOVE, + SubvolumeOpType.SNAP_LIST, + SubvolumeOpType.SNAP_INFO, + SubvolumeOpType.SNAP_PROTECT, + SubvolumeOpType.SNAP_UNPROTECT, + SubvolumeOpType.CLONE_SOURCE + } + + return {SubvolumeOpType.REMOVE_FORCE, + SubvolumeOpType.CLONE_CREATE, + SubvolumeOpType.CLONE_STATUS, + SubvolumeOpType.CLONE_CANCEL, + SubvolumeOpType.CLONE_INTERNAL, + SubvolumeOpType.CLONE_SOURCE} + + def open(self, op_type): + if not isinstance(op_type, SubvolumeOpType): + raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format( + op_type.value, self.subvolname)) + try: + self.metadata_mgr.refresh() + # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark + self.mark_subvolume() + + etype = self.subvol_type + if op_type not in self.allowed_ops_by_type(etype): + raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format( + op_type.value, self.subvolname, etype.value)) + + estate = self.state + if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format( + self.subvolname)) + + if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED: + raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format( + self.subvolname, op_type.value)) + + if estate != SubvolumeStates.STATE_RETAINED: + subvol_path = self.path + log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path)) + st = self.fs.stat(subvol_path) + + self.uid = int(st.st_uid) + self.gid = int(st.st_gid) + self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode)) + except MetadataMgrException as me: + if me.errno == -errno.ENOENT: + raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname)) + raise VolumeException(me.args[0], me.args[1]) + except cephfs.ObjectNotFound: + log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname)) + raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname)) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def trash_incarnation_dir(self): + """rename subvolume (uuid component) to trash""" + self.create_trashcan() + try: + bname = os.path.basename(self.path) + tpath = os.path.join(self.trash_dir, bname) + log.debug("trash: {0} -> {1}".format(self.path, tpath)) + self.fs.rename(self.path, tpath) + self._link_dir(tpath, bname) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) + + def remove(self, retainsnaps=False): + if self.list_snapshots(): + if not retainsnaps: + raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname)) + else: + if not self.has_pending_purges: + self.trash_base_dir() + # Delete the volume meta file, if it's not already deleted + self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname) + return + if self.state != SubvolumeStates.STATE_RETAINED: + self.trash_incarnation_dir() + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "") + self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value) + self.metadata_mgr.flush() + # Delete the volume meta file, if it's not already deleted + self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname) + + def info(self): + if self.state != SubvolumeStates.STATE_RETAINED: + return super(SubvolumeV2, self).info() + + return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value} + + def remove_snapshot(self, snapname): + super(SubvolumeV2, self).remove_snapshot(snapname) + if self.purgeable: + self.trash_base_dir() + # tickle the volume purge job to purge this entry, using ESTALE + raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname)) + # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py new file mode 100644 index 00000000..410e5c44 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/operations/volume.py @@ -0,0 +1,348 @@ +import time +import errno +import logging +import sys + +try: + from typing import List +except ImportError: + pass # For typing only + +from contextlib import contextmanager +from threading import Lock, Condition + +if sys.version_info >= (3, 3): + from threading import Timer +else: + from threading import _Timer as Timer + +import cephfs +import orchestrator + +from .lock import GlobalLock +from ..exception import VolumeException +from ..fs_util import create_pool, remove_pool, create_filesystem, \ + remove_filesystem, create_mds, volume_exists + +log = logging.getLogger(__name__) + +class ConnectionPool(object): + class Connection(object): + def __init__(self, mgr, fs_name): + self.fs = None + self.mgr = mgr + self.fs_name = fs_name + self.ops_in_progress = 0 + self.last_used = time.time() + self.fs_id = self.get_fs_id() + + def get_fs_id(self): + fs_map = self.mgr.get('fs_map') + for fs in fs_map['filesystems']: + if fs['mdsmap']['fs_name'] == self.fs_name: + return fs['id'] + raise VolumeException( + -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name)) + + def get_fs_handle(self): + self.last_used = time.time() + self.ops_in_progress += 1 + return self.fs + + def put_fs_handle(self, notify): + assert self.ops_in_progress > 0 + self.ops_in_progress -= 1 + if self.ops_in_progress == 0: + notify() + + def del_fs_handle(self, waiter): + if waiter: + while self.ops_in_progress != 0: + waiter() + if self.is_connection_valid(): + self.disconnect() + else: + self.abort() + + def is_connection_valid(self): + fs_id = None + try: + fs_id = self.get_fs_id() + except: + # the filesystem does not exist now -- connection is not valid. + pass + log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id)) + return self.fs_id == fs_id + + def is_connection_idle(self, timeout): + return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout)) + + def connect(self): + assert self.ops_in_progress == 0 + log.debug("Connecting to cephfs '{0}'".format(self.fs_name)) + self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados) + log.debug("Setting user ID and group ID of CephFS mount as root...") + self.fs.conf_set("client_mount_uid", "0") + self.fs.conf_set("client_mount_gid", "0") + log.debug("CephFS initializing...") + self.fs.init() + log.debug("CephFS mounting...") + self.fs.mount(filesystem_name=self.fs_name.encode('utf-8')) + log.debug("Connection to cephfs '{0}' complete".format(self.fs_name)) + + def disconnect(self): + try: + assert self.fs + assert self.ops_in_progress == 0 + log.info("disconnecting from cephfs '{0}'".format(self.fs_name)) + self.fs.shutdown() + self.fs = None + except Exception as e: + log.debug("disconnect: ({0})".format(e)) + raise + + def abort(self): + assert self.fs + assert self.ops_in_progress == 0 + log.info("aborting connection from cephfs '{0}'".format(self.fs_name)) + self.fs.abort_conn() + log.info("abort done from cephfs '{0}'".format(self.fs_name)) + self.fs = None + + class RTimer(Timer): + """ + recurring timer variant of Timer + """ + def run(self): + try: + while not self.finished.is_set(): + self.finished.wait(self.interval) + self.function(*self.args, **self.kwargs) + self.finished.set() + except Exception as e: + log.error("ConnectionPool.RTimer: %s", e) + raise + + # TODO: make this configurable + TIMER_TASK_RUN_INTERVAL = 30.0 # seconds + CONNECTION_IDLE_INTERVAL = 60.0 # seconds + + def __init__(self, mgr): + self.mgr = mgr + self.connections = {} + self.lock = Lock() + self.cond = Condition(self.lock) + self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL, + self.cleanup_connections) + self.timer_task.start() + + def cleanup_connections(self): + with self.lock: + log.info("scanning for idle connections..") + idle_fs = [fs_name for fs_name,conn in self.connections.items() + if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)] + for fs_name in idle_fs: + log.info("cleaning up connection for '{}'".format(fs_name)) + self._del_fs_handle(fs_name) + + def get_fs_handle(self, fs_name): + with self.lock: + conn = None + try: + conn = self.connections.get(fs_name, None) + if conn: + if conn.is_connection_valid(): + return conn.get_fs_handle() + else: + # filesystem id changed beneath us (or the filesystem does not exist). + # this is possible if the filesystem got removed (and recreated with + # same name) via "ceph fs rm/new" mon command. + log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name)) + self._del_fs_handle(fs_name) + conn = ConnectionPool.Connection(self.mgr, fs_name) + conn.connect() + except cephfs.Error as e: + # try to provide a better error string if possible + if e.args[0] == errno.ENOENT: + raise VolumeException( + -errno.ENOENT, "Volume '{0}' not found".format(fs_name)) + raise VolumeException(-e.args[0], e.args[1]) + self.connections[fs_name] = conn + return conn.get_fs_handle() + + def put_fs_handle(self, fs_name): + with self.lock: + conn = self.connections.get(fs_name, None) + if conn: + conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) + + def _del_fs_handle(self, fs_name, wait=False): + conn = self.connections.pop(fs_name, None) + if conn: + conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) + + def del_fs_handle(self, fs_name, wait=False): + with self.lock: + self._del_fs_handle(fs_name, wait) + + def del_all_handles(self): + with self.lock: + for fs_name in list(self.connections.keys()): + log.info("waiting for pending ops for '{}'".format(fs_name)) + self._del_fs_handle(fs_name, wait=True) + log.info("pending ops completed for '{}'".format(fs_name)) + # no new connections should have been initialized since its + # guarded on shutdown. + assert len(self.connections) == 0 + +def gen_pool_names(volname): + """ + return metadata and data pool name (from a filesystem/volume name) as a tuple + """ + return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname) + +def get_mds_map(mgr, volname): + """ + return mdsmap for a volname + """ + mds_map = None + fs_map = mgr.get("fs_map") + for f in fs_map['filesystems']: + if volname == f['mdsmap']['fs_name']: + return f['mdsmap'] + return mds_map + +def get_pool_names(mgr, volname): + """ + return metadata and data pools (list) names of volume as a tuple + """ + fs_map = mgr.get("fs_map") + metadata_pool_id = None + data_pool_ids = [] # type: List[int] + for f in fs_map['filesystems']: + if volname == f['mdsmap']['fs_name']: + metadata_pool_id = f['mdsmap']['metadata_pool'] + data_pool_ids = f['mdsmap']['data_pools'] + break + if metadata_pool_id is None: + return None, None + + osdmap = mgr.get("osd_map") + pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']]) + metadata_pool = pools[metadata_pool_id] + data_pools = [pools[id] for id in data_pool_ids] + return metadata_pool, data_pools + +def create_volume(mgr, volname): + """ + create volume (pool, filesystem and mds) + """ + metadata_pool, data_pool = gen_pool_names(volname) + # create pools + r, outs, outb = create_pool(mgr, metadata_pool, 16) + if r != 0: + return r, outb, outs + r, outb, outs = create_pool(mgr, data_pool, 8) + if r != 0: + #cleanup + remove_pool(mgr, metadata_pool) + return r, outb, outs + # create filesystem + r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool) + if r != 0: + log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs)) + #cleanup + remove_pool(mgr, data_pool) + remove_pool(mgr, metadata_pool) + return r, outb, outs + # create mds + return create_mds(mgr, volname) + +def delete_volume(mgr, volname, metadata_pool, data_pools): + """ + delete the given module (tear down mds, remove filesystem, remove pools) + """ + # Tear down MDS daemons + try: + completion = mgr.remove_stateless_service("mds", volname) + mgr._orchestrator_wait([completion]) + orchestrator.raise_if_exception(completion) + except (ImportError, orchestrator.OrchestratorError): + log.warning("OrchestratorError, not tearing down MDS daemons") + except Exception as e: + # Don't let detailed orchestrator exceptions (python backtraces) + # bubble out to the user + log.exception("Failed to tear down MDS daemons") + return -errno.EINVAL, "", str(e) + + # In case orchestrator didn't tear down MDS daemons cleanly, or + # there was no orchestrator, we force the daemons down. + if volume_exists(mgr, volname): + r, outb, outs = remove_filesystem(mgr, volname) + if r != 0: + return r, outb, outs + else: + err = "Filesystem not found for volume '{0}'".format(volname) + log.warning(err) + return -errno.ENOENT, "", err + r, outb, outs = remove_pool(mgr, metadata_pool) + if r != 0: + return r, outb, outs + + for data_pool in data_pools: + r, outb, outs = remove_pool(mgr, data_pool) + if r != 0: + return r, outb, outs + result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools)) + return r, result_str, "" + +def list_volumes(mgr): + """ + list all filesystem volumes. + + :param: None + :return: None + """ + result = [] + fs_map = mgr.get("fs_map") + for f in fs_map['filesystems']: + result.append({'name': f['mdsmap']['fs_name']}) + return result + +@contextmanager +def open_volume(vc, volname): + """ + open a volume for exclusive access. This API is to be used as a context manager. + + :param vc: volume client instance + :param volname: volume name + :return: yields a volume handle (ceph filesystem handle) + """ + if vc.is_stopping(): + raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress") + + g_lock = GlobalLock() + fs_handle = vc.connection_pool.get_fs_handle(volname) + try: + with g_lock.lock_op(): + yield fs_handle + finally: + vc.connection_pool.put_fs_handle(volname) + +@contextmanager +def open_volume_lockless(vc, volname): + """ + open a volume with shared access. This API is to be used as a context manager. + + :param vc: volume client instance + :param volname: volume name + :return: yields a volume handle (ceph filesystem handle) + """ + if vc.is_stopping(): + raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress") + + fs_handle = vc.connection_pool.get_fs_handle(volname) + try: + yield fs_handle + finally: + vc.connection_pool.put_fs_handle(volname) diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py new file mode 100644 index 00000000..7c902572 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/purge_queue.py @@ -0,0 +1,109 @@ +import errno +import logging +import os +import stat + +import cephfs + +from .async_job import AsyncJobs +from .exception import VolumeException +from .operations.resolver import resolve_trash +from .operations.template import SubvolumeOpType +from .operations.group import open_group +from .operations.subvolume import open_subvol +from .operations.volume import open_volume, open_volume_lockless +from .operations.trash import open_trashcan + +log = logging.getLogger(__name__) + +# helper for fetching a trash entry for a given volume +def get_trash_entry_for_volume(volume_client, volname, running_jobs): + log.debug("fetching trash entry for volume '{0}'".format(volname)) + + try: + with open_volume_lockless(volume_client, volname) as fs_handle: + try: + with open_trashcan(fs_handle, volume_client.volspec) as trashcan: + path = trashcan.get_trash_entry(running_jobs) + return 0, path + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return 0, None + raise ve + except VolumeException as ve: + log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve)) + return ve.errno, None + +def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel): + groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8')) + log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname)) + + try: + with open_volume(volume_client, volname) as fs_handle: + with open_group(fs_handle, volume_client.volspec, groupname) as group: + with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume: + log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable)) + if not subvolume.purgeable: + return + # this is fine under the global lock -- there are just a handful + # of entries in the subvolume to purge. moreover, the purge needs + # to be guarded since a create request might sneak in. + trashcan.purge(subvolume.base_path, should_cancel) + except VolumeException as ve: + if not ve.errno == -errno.ENOENT: + raise + +# helper for starting a purge operation on a trash entry +def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel): + log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname)) + + ret = 0 + try: + with open_volume_lockless(volume_client, volname) as fs_handle: + with open_trashcan(fs_handle, volume_client.volspec) as trashcan: + try: + pth = os.path.join(trashcan.path, purge_entry) + stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE, + cephfs.AT_SYMLINK_NOFOLLOW) + if stat.S_ISLNK(stx['mode']): + tgt = fs_handle.readlink(pth, 4096) + tgt = tgt[:stx['size']] + log.debug("purging entry pointing to subvolume trash: {0}".format(tgt)) + delink = True + try: + trashcan.purge(tgt, should_cancel) + except VolumeException as ve: + if not ve.errno == -errno.ENOENT: + delink = False + return ve.errno + finally: + if delink: + subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel) + log.debug("purging trash link: {0}".format(purge_entry)) + trashcan.delink(purge_entry) + else: + log.debug("purging entry pointing to trash: {0}".format(pth)) + trashcan.purge(pth, should_cancel) + except cephfs.Error as e: + log.warn("failed to remove trash entry: {0}".format(e)) + except VolumeException as ve: + ret = ve.errno + return ret + +class ThreadPoolPurgeQueueMixin(AsyncJobs): + """ + Purge queue mixin class maintaining a pool of threads for purging trash entries. + Subvolumes are chosen from volumes in a round robin fashion. If some of the purge + entries (belonging to a set of volumes) have huge directory tree's (such as, lots + of small files in a directory w/ deep directory trees), this model may lead to + _all_ threads purging entries for one volume (starving other volumes). + """ + def __init__(self, volume_client, tp_size): + self.vc = volume_client + super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size) + + def get_next_job(self, volname, running_jobs): + return get_trash_entry_for_volume(self.vc, volname, running_jobs) + + def execute_job(self, volname, job, should_cancel): + purge_trash_entry_for_volume(self.vc, volname, job, should_cancel) diff --git a/src/pybind/mgr/volumes/fs/vol_spec.py b/src/pybind/mgr/volumes/fs/vol_spec.py new file mode 100644 index 00000000..e18ab069 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/vol_spec.py @@ -0,0 +1,37 @@ +import os + +class VolSpec(object): + """ + specification of a "volume" -- base directory and various prefixes. + """ + + # where shall we (by default) create subvolumes + DEFAULT_SUBVOL_PREFIX = "/volumes" + # and the default namespace + DEFAULT_NS_PREFIX = "fsvolumens_" + + def __init__(self, snapshot_prefix, subvolume_prefix=None, pool_ns_prefix=None): + self.snapshot_prefix = snapshot_prefix + self.subvolume_prefix = subvolume_prefix if subvolume_prefix else VolSpec.DEFAULT_SUBVOL_PREFIX + self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else VolSpec.DEFAULT_NS_PREFIX + + @property + def snapshot_dir_prefix(self): + """ + Return the snapshot directory prefix + """ + return self.snapshot_prefix + + @property + def base_dir(self): + """ + Return the top level directory under which subvolumes/groups are created + """ + return self.subvolume_prefix + + @property + def fs_namespace(self): + """ + return a filesystem namespace by stashing pool namespace prefix and subvolume-id + """ + return self.pool_ns_prefix diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py new file mode 100644 index 00000000..ab287539 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -0,0 +1,660 @@ +import json +import errno +import logging +from threading import Event + +import cephfs + +from .fs_util import listdir + +from .operations.volume import ConnectionPool, open_volume, create_volume, \ + delete_volume, list_volumes, get_pool_names +from .operations.group import open_group, create_group, remove_group, open_group_unique +from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \ + create_clone + +from .vol_spec import VolSpec +from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError +from .async_cloner import Cloner +from .purge_queue import ThreadPoolPurgeQueueMixin +from .operations.template import SubvolumeOpType + +log = logging.getLogger(__name__) + +ALLOWED_ACCESS_LEVELS = ('r', 'rw') + + +def octal_str_to_decimal_int(mode): + try: + return int(mode, 8) + except ValueError: + raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + +def name_to_json(names): + """ + convert the list of names to json + """ + namedict = [] + for i in range(len(names)): + namedict.append({'name': names[i].decode('utf-8')}) + return json.dumps(namedict, indent=4, sort_keys=True) + +class VolumeClient(object): + def __init__(self, mgr): + self.mgr = mgr + self.stopping = Event() + # volume specification + self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) + self.connection_pool = ConnectionPool(self.mgr) + self.cloner = Cloner(self, self.mgr.max_concurrent_clones) + self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) + # on startup, queue purge job for available volumes to kickstart + # purge for leftover subvolume entries in trash. note that, if the + # trash directory does not exist or if there are no purge entries + # available for a volume, the volume is removed from the purge + # job list. + fs_map = self.mgr.get('fs_map') + for fs in fs_map['filesystems']: + self.cloner.queue_job(fs['mdsmap']['fs_name']) + self.purge_queue.queue_job(fs['mdsmap']['fs_name']) + + def is_stopping(self): + return self.stopping.is_set() + + def shutdown(self): + log.info("shutting down") + # first, note that we're shutting down + self.stopping.set() + # second, ask purge threads to quit + self.purge_queue.cancel_all_jobs() + # third, delete all libcephfs handles from connection pool + self.connection_pool.del_all_handles() + + def cluster_log(self, msg, lvl=None): + """ + log to cluster log with default log level as WARN. + """ + if not lvl: + lvl = self.mgr.CLUSTER_LOG_PRIO_WARN + self.mgr.cluster_log("cluster", lvl, msg) + + def volume_exception_to_retval(self, ve): + """ + return a tuple representation from a volume exception + """ + return ve.to_tuple() + + ### volume operations -- create, rm, ls + + def create_fs_volume(self, volname): + if self.is_stopping(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + return create_volume(self.mgr, volname) + + def delete_fs_volume(self, volname, confirm): + if self.is_stopping(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + + if confirm != "--yes-i-really-mean-it": + return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \ + "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \ + "that is what you want, re-issue the command followed by " \ + "--yes-i-really-mean-it.".format(volname) + + ret, out, err = self.mgr.mon_command({ + 'prefix': 'config get', + 'key': 'mon_allow_pool_delete', + 'who': 'mon.*', + 'format': 'json', + }) + if ret != 0: + return ret, out, err + mon_allow_pool_delete = json.loads(out) + if not mon_allow_pool_delete: + return -errno.EPERM, "", "pool deletion is disabled; you must first " \ + "set the mon_allow_pool_delete config option to true before volumes " \ + "can be deleted" + + metadata_pool, data_pools = get_pool_names(self.mgr, volname) + if not metadata_pool: + return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname) + self.purge_queue.cancel_jobs(volname) + self.connection_pool.del_fs_handle(volname, wait=True) + return delete_volume(self.mgr, volname, metadata_pool, data_pools) + + def list_fs_volumes(self): + if self.stopping.is_set(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + volumes = list_volumes(self.mgr) + return 0, json.dumps(volumes, indent=4, sort_keys=True), "" + + ### subvolume operations + + def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs): + size = kwargs['size'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + mode = kwargs['mode'] + isolate_nspace = kwargs['namespace_isolated'] + + oct_mode = octal_str_to_decimal_int(mode) + try: + create_subvol( + self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid) + except VolumeException as ve: + # kick the purge threads for async removal -- note that this + # assumes that the subvolume is moved to trashcan for cleanup on error. + self.purge_queue.queue_job(volname) + raise ve + + def create_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + size = kwargs['size'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + isolate_nspace = kwargs['namespace_isolated'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + try: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume: + # idempotent creation -- valid. Attributes set is supported. + attrs = { + 'uid': uid if uid else subvolume.uid, + 'gid': gid if gid else subvolume.gid, + 'data_pool': pool, + 'pool_namespace': subvolume.namespace if isolate_nspace else None, + 'quota': size + } + subvolume.set_attrs(subvolume.path, attrs) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs) + else: + raise + except VolumeException as ve: + # volume/group does not exist or subvolume creation failed + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + retainsnaps = kwargs['retain_snapshots'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps) + # kick the purge threads for async removal -- note that this + # assumes that the subvolume is moved to trash can. + # TODO: make purge queue as singleton so that trash can kicks + # the purge threads on dump. + self.purge_queue.queue_job(volname) + except VolumeException as ve: + if ve.errno == -errno.EAGAIN: + ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)") + ret = self.volume_exception_to_retval(ve) + elif not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def authorize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + accesslevel = kwargs['access_level'] + tenant_id = kwargs['tenant_id'] + allow_existing_id = kwargs['allow_existing_id'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume: + key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id) + ret = 0, key, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def deauthorize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume: + subvolume.deauthorize(authid) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def authorized_list(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume: + auths = subvolume.authorized_list() + ret = 0, json.dumps(auths, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def evict(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume: + key = subvolume.evict(volname, authid) + ret = 0, "", "" + except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e: + if isinstance(e, VolumeException): + ret = self.volume_exception_to_retval(e) + elif isinstance(e, ClusterTimeout): + ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster" + elif isinstance(e, ClusterError): + ret = e._result_code , "", e._result_str + elif isinstance(e, EvictionError): + ret = -errno.EINVAL, "", str(e) + return ret + + def resize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + newsize = kwargs['new_size'] + noshrink = kwargs['no_shrink'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume: + nsize, usedbytes = subvolume.resize(newsize, noshrink) + ret = 0, json.dumps( + [{'bytes_used': usedbytes},{'bytes_quota': nsize}, + {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}], + indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_getpath(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume: + subvolpath = subvolume.path + ret = 0, subvolpath.decode("utf-8"), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_info(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume: + mon_addr_lst = [] + mon_map_mons = self.mgr.get('mon_map')['mons'] + for mon in mon_map_mons: + ip_port = mon['addr'].split("/")[0] + mon_addr_lst.append(ip_port) + + subvol_info_dict = subvolume.info() + subvol_info_dict["mon_addrs"] = mon_addr_lst + ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolumes(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + subvolumes = group.list_subvolumes() + ret = 0, name_to_json(subvolumes), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + ### subvolume snapshot + + def create_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume: + subvolume.create_snapshot(snapname) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume: + subvolume.remove_snapshot(snapname) + except VolumeException as ve: + # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and, + # we should tickle the purge jobs to purge the same + if ve.errno == -errno.ESTALE: + self.purge_queue.queue_job(volname) + elif not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_snapshot_info(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume: + snap_info_dict = subvolume.snapshot_info(snapname) + ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_snapshots(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume: + snapshots = subvolume.list_snapshots() + ret = 0, name_to_json(snapshots), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def protect_subvolume_snapshot(self, **kwargs): + ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume: + log.warning("snapshot protect call is deprecated and will be removed in a future release") + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def unprotect_subvolume_snapshot(self, **kwargs): + ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume: + log.warning("snapshot unprotect call is deprecated and will be removed in a future release") + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs): + t_pool = kwargs['pool_layout'] + s_subvolname = kwargs['sub_name'] + s_groupname = kwargs['group_name'] + t_groupname = kwargs['target_group_name'] + + create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname) + with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume: + try: + if t_groupname == s_groupname and t_subvolname == s_subvolname: + t_subvolume.attach_snapshot(s_snapname, t_subvolume) + else: + s_subvolume.attach_snapshot(s_snapname, t_subvolume) + self.cloner.queue_job(volname) + except VolumeException as ve: + try: + t_subvolume.remove() + self.purge_queue.queue_job(volname) + except Exception as e: + log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e)) + raise ve + + def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs): + s_snapname = kwargs['snap_name'] + target_subvolname = kwargs['target_sub_name'] + target_groupname = kwargs['target_group_name'] + s_groupname = kwargs['group_name'] + + if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname)) + + with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group: + try: + with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE): + raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname)) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname, + target_group, target_subvolname, **kwargs) + else: + raise + + def clone_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + s_subvolname = kwargs['sub_name'] + s_groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, s_groupname) as s_group: + with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: + self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def clone_status(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + clonename = kwargs['clone_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume: + ret = 0, json.dumps({'status' : subvolume.status}, indent=2), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def clone_cancel(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + clonename = kwargs['clone_name'] + groupname = kwargs['group_name'] + + try: + self.cloner.cancel_job(volname, (clonename, groupname)) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + ### group operations + + def create_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + mode = kwargs['mode'] + + try: + with open_volume(self, volname) as fs_handle: + try: + with open_group(fs_handle, self.volspec, groupname): + # idempotent creation -- valid. + pass + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + oct_mode = octal_str_to_decimal_int(mode) + create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid) + else: + raise + except VolumeException as ve: + # volume does not exist or subvolume group creation failed + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + remove_group(fs_handle, self.volspec, groupname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def getpath_subvolume_group(self, **kwargs): + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + return 0, group.path.decode('utf-8'), "" + except VolumeException as ve: + return self.volume_exception_to_retval(ve) + + def list_subvolume_groups(self, **kwargs): + volname = kwargs['vol_name'] + ret = 0, '[]', "" + try: + with open_volume(self, volname) as fs_handle: + groups = listdir(fs_handle, self.volspec.base_dir) + ret = 0, name_to_json(groups), "" + except VolumeException as ve: + if not ve.errno == -errno.ENOENT: + ret = self.volume_exception_to_retval(ve) + return ret + + ### group snapshot + + def create_subvolume_group_snapshot(self, **kwargs): + ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + # snapname = kwargs['snap_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots + # at the subvolume group (see: https://tracker.ceph.com/issues/46074) + # group.create_snapshot(snapname) + pass + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_group_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + snapname = kwargs['snap_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + group.remove_snapshot(snapname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_group_snapshots(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + snapshots = group.list_snapshots() + ret = 0, name_to_json(snapshots), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret |