summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/volumes/fs
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs')
-rw-r--r--src/pybind/mgr/volumes/fs/__init__.py0
-rw-r--r--src/pybind/mgr/volumes/fs/async_cloner.py345
-rw-r--r--src/pybind/mgr/volumes/fs/async_job.py279
-rw-r--r--src/pybind/mgr/volumes/fs/exception.py63
-rw-r--r--src/pybind/mgr/volumes/fs/fs_util.py161
-rw-r--r--src/pybind/mgr/volumes/fs/operations/__init__.py0
-rw-r--r--src/pybind/mgr/volumes/fs/operations/access.py142
-rw-r--r--src/pybind/mgr/volumes/fs/operations/clone_index.py98
-rw-r--r--src/pybind/mgr/volumes/fs/operations/group.py186
-rw-r--r--src/pybind/mgr/volumes/fs/operations/index.py23
-rw-r--r--src/pybind/mgr/volumes/fs/operations/lock.py42
-rw-r--r--src/pybind/mgr/volumes/fs/operations/rankevicter.py114
-rw-r--r--src/pybind/mgr/volumes/fs/operations/resolver.py26
-rw-r--r--src/pybind/mgr/volumes/fs/operations/snapshot_util.py30
-rw-r--r--src/pybind/mgr/volumes/fs/operations/subvolume.py74
-rw-r--r--src/pybind/mgr/volumes/fs/operations/template.py173
-rw-r--r--src/pybind/mgr/volumes/fs/operations/trash.py145
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/__init__.py109
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py208
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py144
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/op_sm.py114
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py61
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py331
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py780
-rw-r--r--src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py370
-rw-r--r--src/pybind/mgr/volumes/fs/operations/volume.py348
-rw-r--r--src/pybind/mgr/volumes/fs/purge_queue.py109
-rw-r--r--src/pybind/mgr/volumes/fs/vol_spec.py37
-rw-r--r--src/pybind/mgr/volumes/fs/volume.py660
29 files changed, 5172 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/__init__.py b/src/pybind/mgr/volumes/fs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/__init__.py
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
new file mode 100644
index 00000000..0c5155e7
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -0,0 +1,345 @@
+import os
+import stat
+import time
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .async_job import AsyncJobs
+from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
+from .fs_util import copy_file
+from .operations.versions.op_sm import SubvolumeOpSm
+from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
+from .operations.resolver import resolve
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.clone_index import open_clone_index
+from .operations.template import SubvolumeOpType
+
+log = logging.getLogger(__name__)
+
+# helper for fetching a clone entry for a given volume
+def get_next_clone_entry(volume_client, volname, running_jobs):
+ log.debug("fetching clone entry for volume '{0}'".format(volname))
+
+ try:
+ with open_volume_lockless(volume_client, volname) as fs_handle:
+ try:
+ with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+ job = clone_index.get_oldest_clone_entry(running_jobs)
+ return 0, job
+ except IndexException as ve:
+ if ve.errno == -errno.ENOENT:
+ return 0, None
+ raise ve
+ except VolumeException as ve:
+ log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+ return ve.errno, None
+
+@contextmanager
+def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
+ with open_volume(volume_client, volname) as fs_handle:
+ with open_group(fs_handle, volume_client.volspec, groupname) as group:
+ with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+ yield subvolume
+
+@contextmanager
+def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
+ with open_group(fs_handle, volume_client.volspec, groupname) as group:
+ with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+ yield subvolume
+
+@contextmanager
+def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+ # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
+ # the clone subvolume as the source subvolume
+ if s_groupname == c_groupname and s_subvolname == c_subvolname:
+ yield c_subvolume
+ else:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+ yield s_subvolume
+
+
+@contextmanager
+def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
+ with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+ s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+ if groupname == s_groupname and subvolname == s_subvolname:
+ # use the same subvolume to avoid metadata overwrites
+ yield (clone_subvolume, clone_subvolume, s_snapname)
+ else:
+ with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+ yield (clone_subvolume, source_subvolume, s_snapname)
+
+def get_clone_state(volume_client, volname, groupname, subvolname):
+ with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+ return subvolume.state
+
+def set_clone_state(volume_client, volname, groupname, subvolname, state):
+ with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+ subvolume.state = (state, True)
+
+def get_clone_source(clone_subvolume):
+ source = clone_subvolume._get_clone_source()
+ return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
+
+def get_next_state_on_error(errnum):
+ if errnum == -errno.EINTR:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_CANCELLED)
+ else:
+ # jump to failed state, on all other errors
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_FAILED)
+ return next_state
+
+def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+ try:
+ if should_cancel():
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_CANCELLED)
+ else:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_SUCCESS)
+ except OpSmException as oe:
+ raise VolumeException(oe.errno, oe.error_str)
+ return (next_state, False)
+
+def sync_attrs(fs_handle, target_path, source_statx):
+ try:
+ fs_handle.lchown(target_path, int(source_statx["uid"]), int(source_statx["gid"]))
+ fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
+ time.mktime(source_statx["mtime"].timetuple())))
+ except cephfs.Error as e:
+ log.warn("error synchronizing attrs for {0} ({1})".format(target_path, e))
+ raise e
+
+def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
+ """
+ bulk copy data from source to destination -- only directories, symlinks
+ and regular files are synced.
+ """
+ log.info("copying data from {0} to {1}".format(source_path, dst_path))
+ def cptree(src_root_path, dst_root_path):
+ log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
+ try:
+ with fs_handle.opendir(src_root_path) as dir_handle:
+ d = fs_handle.readdir(dir_handle)
+ while d and not should_cancel():
+ if d.d_name not in (b".", b".."):
+ log.debug("d={0}".format(d))
+ d_full_src = os.path.join(src_root_path, d.d_name)
+ d_full_dst = os.path.join(dst_root_path, d.d_name)
+ stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
+ cephfs.CEPH_STATX_UID |
+ cephfs.CEPH_STATX_GID |
+ cephfs.CEPH_STATX_ATIME |
+ cephfs.CEPH_STATX_MTIME |
+ cephfs.CEPH_STATX_SIZE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ handled = True
+ mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
+ if stat.S_ISDIR(stx["mode"]):
+ log.debug("cptree: (DIR) {0}".format(d_full_src))
+ try:
+ fs_handle.mkdir(d_full_dst, mo)
+ except cephfs.Error as e:
+ if not e.args[0] == errno.EEXIST:
+ raise
+ cptree(d_full_src, d_full_dst)
+ elif stat.S_ISLNK(stx["mode"]):
+ log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
+ target = fs_handle.readlink(d_full_src, 4096)
+ try:
+ fs_handle.symlink(target[:stx["size"]], d_full_dst)
+ except cephfs.Error as e:
+ if not e.args[0] == errno.EEXIST:
+ raise
+ elif stat.S_ISREG(stx["mode"]):
+ log.debug("cptree: (REG) {0}".format(d_full_src))
+ copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
+ else:
+ handled = False
+ log.warn("cptree: (IGNORE) {0}".format(d_full_src))
+ if handled:
+ sync_attrs(fs_handle, d_full_dst, stx)
+ d = fs_handle.readdir(dir_handle)
+ stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
+ cephfs.CEPH_STATX_MTIME,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
+ time.mktime(stx_root["mtime"].timetuple())))
+ except cephfs.Error as e:
+ if not e.args[0] == errno.ENOENT:
+ raise VolumeException(-e.args[0], e.args[1])
+ cptree(source_path, dst_path)
+ if should_cancel():
+ raise VolumeException(-errno.EINTR, "clone operation interrupted")
+
+def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
+ with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
+ dst_path = clone_volumes[0].path
+ bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+
+def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+ try:
+ do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_SUCCESS)
+ except VolumeException as ve:
+ next_state = get_next_state_on_error(ve.errno)
+ except OpSmException as oe:
+ raise VolumeException(oe.errno, oe.error_str)
+ return (next_state, False)
+
+def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+ try:
+ with open_volume(volume_client, volname) as fs_handle:
+ # detach source but leave the clone section intact for later inspection
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+ except (MetadataMgrException, VolumeException) as e:
+ log.error("failed to detach clone from snapshot: {0}".format(e))
+ return (None, True)
+
+def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+ try:
+ with open_volume(volume_client, volname) as fs_handle:
+ with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ clone_volumes[1].detach_snapshot(clone_volumes[2], index)
+ clone_volumes[0].remove_clone_source(flush=True)
+ except (MetadataMgrException, VolumeException) as e:
+ log.error("failed to detach clone from snapshot: {0}".format(e))
+ return (None, True)
+
+def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
+ finished = False
+ current_state = None
+ try:
+ current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+ log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
+ while not finished:
+ handler = state_table.get(current_state, None)
+ if not handler:
+ raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
+ (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+ if next_state:
+ log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
+ current_state, next_state))
+ set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+ current_state = next_state
+ except VolumeException as ve:
+ log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
+ subvolname, current_state, ve))
+
+def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
+ log.info("cloning to subvolume path: {0}".format(clone_path))
+ resolved = resolve(volume_client.volspec, clone_path)
+
+ groupname = resolved[0]
+ subvolname = resolved[1]
+ log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))
+
+ try:
+ log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+ start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
+ log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+ except VolumeException as ve:
+ log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
+
+class Cloner(AsyncJobs):
+ """
+ Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
+ this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
+ the driver. file types supported are directories, symbolic links and regular files.
+ """
+ def __init__(self, volume_client, tp_size):
+ self.vc = volume_client
+ self.state_table = {
+ SubvolumeStates.STATE_PENDING : handle_clone_pending,
+ SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
+ SubvolumeStates.STATE_COMPLETE : handle_clone_complete,
+ SubvolumeStates.STATE_FAILED : handle_clone_failed,
+ SubvolumeStates.STATE_CANCELED : handle_clone_failed,
+ }
+ super(Cloner, self).__init__(volume_client, "cloner", tp_size)
+
+ def reconfigure_max_concurrent_clones(self, tp_size):
+ super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
+ def is_clone_cancelable(self, clone_state):
+ return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
+
+ def get_clone_tracking_index(self, fs_handle, clone_subvolume):
+ with open_clone_index(fs_handle, self.vc.volspec) as index:
+ return index.find_clone_entry_index(clone_subvolume.base_path)
+
+ def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
+ clone_state = SubvolumeStates.from_value(status['state'])
+ assert self.is_clone_cancelable(clone_state)
+
+ s_groupname = status['source'].get('group', None)
+ s_subvolname = status['source']['subvolume']
+ s_snapname = status['source']['snapshot']
+
+ with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
+ clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
+ clone_state,
+ SubvolumeActions.ACTION_CANCELLED)
+ clone_subvolume.state = (next_state, True)
+ s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))
+
+ def cancel_job(self, volname, job):
+ """
+ override base class `cancel_job`. interpret @job as (clone, group) tuple.
+ """
+ clonename = job[0]
+ groupname = job[1]
+ track_idx = None
+
+ try:
+ with open_volume(self.vc, volname) as fs_handle:
+ with open_group(fs_handle, self.vc.volspec, groupname) as group:
+ with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+ status = clone_subvolume.status
+ clone_state = SubvolumeStates.from_value(status['state'])
+ if not self.is_clone_cancelable(clone_state):
+ raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
+ track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
+ if not track_idx:
+ log.warn("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
+ raise VolumeException(-errno.EINVAL, "error canceling clone")
+ if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
+ # clone has not started yet -- cancel right away.
+ self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
+ return
+ # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
+ # to persist the new state, async cloner accesses the volume in exclusive mode.
+ # accessing the volume in exclusive mode here would lead to deadlock.
+ assert track_idx is not None
+ with self.lock:
+ with open_volume_lockless(self.vc, volname) as fs_handle:
+ with open_group(fs_handle, self.vc.volspec, groupname) as group:
+ with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+ if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
+ raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
+ except (IndexException, MetadataMgrException) as e:
+ log.error("error cancelling clone {0}: ({1})".format(job, e))
+ raise VolumeException(-errno.EINVAL, "error canceling clone")
+
+ def get_next_job(self, volname, running_jobs):
+ return get_next_clone_entry(self.vc, volname, running_jobs)
+
+ def execute_job(self, volname, job, should_cancel):
+ clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
new file mode 100644
index 00000000..954e89c4
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -0,0 +1,279 @@
+import sys
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+
+from .exception import NotImplementedException
+
+log = logging.getLogger(__name__)
+
+class JobThread(threading.Thread):
+ # this is "not" configurable and there is no need for it to be
+ # configurable. if a thread encounters an exception, we retry
+ # until it hits this many consecutive exceptions.
+ MAX_RETRIES_ON_EXCEPTION = 10
+
+ def __init__(self, async_job, volume_client, name):
+ self.vc = volume_client
+ self.async_job = async_job
+ # event object to cancel jobs
+ self.cancel_event = threading.Event()
+ threading.Thread.__init__(self, name=name)
+
+ def run(self):
+ retries = 0
+ thread_id = threading.currentThread()
+ assert isinstance(thread_id, JobThread)
+ thread_name = thread_id.getName()
+ log.debug("thread [{0}] starting".format(thread_name))
+
+ while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
+ vol_job = None
+ try:
+ # fetch next job to execute
+ with self.async_job.lock:
+ while True:
+ if self.should_reconfigure_num_threads():
+ log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+ self.async_job.threads.remove(self)
+ return
+ vol_job = self.async_job.get_job()
+ if vol_job:
+ break
+ self.async_job.cv.wait()
+ self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
+
+ # execute the job (outside lock)
+ self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
+ retries = 0
+ except NotImplementedException:
+ raise
+ except Exception:
+ # unless the jobs fetching and execution routines are not implemented
+ # retry till we hit cap limit.
+ retries += 1
+ log.warning("thread [{0}] encountered fatal error: (attempt#" \
+ " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ log.warning("traceback: {0}".format("".join(
+ traceback.format_exception(exc_type, exc_value, exc_traceback))))
+ finally:
+ # when done, unregister the job
+ if vol_job:
+ with self.async_job.lock:
+ self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
+ time.sleep(1)
+ log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
+ self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+ with self.async_job.lock:
+ self.async_job.threads.remove(self)
+
+ def should_reconfigure_num_threads(self):
+ # reconfigure of max_concurrent_clones
+ return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
+
+ def cancel_job(self):
+ self.cancel_event.set()
+
+ def should_cancel(self):
+ return self.cancel_event.is_set()
+
+ def reset_cancel(self):
+ self.cancel_event.clear()
+
+class AsyncJobs(object):
+ """
+ Class providing asynchronous execution of jobs via worker threads.
+ `jobs` are grouped by `volume`, so a `volume` can have N number of
+ `jobs` executing concurrently (capped by number of concurrent jobs).
+
+ Usability is simple: subclass this and implement the following:
+ - get_next_job(volname, running_jobs)
+ - execute_job(volname, job, should_cancel)
+
+ ... and do not forget to invoke base class constructor.
+
+ Job cancelation is for a volume as a whole, i.e., all executing jobs
+ for a volume are canceled. Cancelation is poll based -- jobs need to
+ periodically check if cancelation is requested, after which the job
+ should return as soon as possible. Cancelation check is provided
+ via `should_cancel()` lambda passed to `execute_job()`.
+ """
+
+ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+ self.vc = volume_client
+ # queue of volumes for starting async jobs
+ self.q = deque()
+ # volume => job tracking
+ self.jobs = {}
+ # lock, cv for kickstarting jobs
+ self.lock = threading.Lock()
+ self.cv = threading.Condition(self.lock)
+ # cv for job cancelation
+ self.waiting = False
+ self.cancel_cv = threading.Condition(self.lock)
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+
+ self.threads = []
+ for i in range(nr_concurrent_jobs):
+ self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+ self.threads[-1].start()
+
+ def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+ """
+ reconfigure number of cloner threads
+ """
+ with self.lock:
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+ # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+ if len(self.threads) > nr_concurrent_jobs:
+ self.cv.notifyAll()
+ # Increase in concurrency
+ if len(self.threads) < nr_concurrent_jobs:
+ for i in range(len(self.threads), nr_concurrent_jobs):
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+ self.threads[-1].start()
+
+ def get_job(self):
+ log.debug("processing {0} volume entries".format(len(self.q)))
+ nr_vols = len(self.q)
+ to_remove = []
+ next_job = None
+ while nr_vols > 0:
+ volname = self.q[0]
+ # do this now so that the other thread pick up jobs for other volumes
+ self.q.rotate(1)
+ running_jobs = [j[0] for j in self.jobs[volname]]
+ (ret, job) = self.get_next_job(volname, running_jobs)
+ if job:
+ next_job = (volname, job)
+ break
+ # this is an optimization when for a given volume there are no more
+ # jobs and no jobs are in progress. in such cases we remove the volume
+ # from the tracking list so as to:
+ #
+ # a. not query the filesystem for jobs over and over again
+ # b. keep the filesystem connection idle so that it can be freed
+ # from the connection pool
+ #
+ # if at all there are jobs for a volume, the volume gets added again
+ # to the tracking list and the jobs get kickstarted.
+ # note that, we do not iterate the volume list fully if there is a
+ # jobs to process (that will take place eventually).
+ if ret == 0 and not job and not running_jobs:
+ to_remove.append(volname)
+ nr_vols -= 1
+ for vol in to_remove:
+ log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
+ self.q.remove(vol)
+ self.jobs.pop(vol)
+ return next_job
+
+ def register_async_job(self, volname, job, thread_id):
+ log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].append((job, thread_id))
+
+ def unregister_async_job(self, volname, job, thread_id):
+ log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].remove((job, thread_id))
+
+ cancelled = thread_id.should_cancel()
+ thread_id.reset_cancel()
+
+ # wake up cancellation waiters if needed
+ if cancelled:
+ logging.info("waking up cancellation waiters")
+ self.cancel_cv.notifyAll()
+
+ def queue_job(self, volname):
+ """
+ queue a volume for asynchronous job execution.
+ """
+ log.info("queuing job for volume '{0}'".format(volname))
+ with self.lock:
+ if not volname in self.q:
+ self.q.append(volname)
+ self.jobs[volname] = []
+ self.cv.notifyAll()
+
+ def _cancel_jobs(self, volname):
+ """
+ cancel all jobs for the volume. do nothing is the no jobs are
+ executing for the given volume. this would wait until all jobs
+ get interrupted and finish execution.
+ """
+ log.info("cancelling jobs for volume '{0}'".format(volname))
+ try:
+ if not volname in self.q and not volname in self.jobs:
+ return
+ self.q.remove(volname)
+ # cancel in-progress operation and wait until complete
+ for j in self.jobs[volname]:
+ j[1].cancel_job()
+ # wait for cancellation to complete
+ while self.jobs[volname]:
+ log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
+ "cancel".format(len(self.jobs[volname]), volname))
+ self.cancel_cv.wait()
+ self.jobs.pop(volname)
+ except (KeyError, ValueError):
+ pass
+
+ def _cancel_job(self, volname, job):
+ """
+ cancel a executing job for a given volume. return True if canceled, False
+ otherwise (volume/job not found).
+ """
+ canceled = False
+ log.info("canceling job {0} for volume {1}".format(job, volname))
+ try:
+ if not volname in self.q and not volname in self.jobs and not job in self.jobs[volname]:
+ return canceled
+ for j in self.jobs[volname]:
+ if j[0] == job:
+ j[1].cancel_job()
+ # be safe against _cancel_jobs() running concurrently
+ while j in self.jobs.get(volname, []):
+ self.cancel_cv.wait()
+ canceled = True
+ break
+ except (KeyError, ValueError):
+ pass
+ return canceled
+
+ def cancel_job(self, volname, job):
+ with self.lock:
+ return self._cancel_job(volname, job)
+
+ def cancel_jobs(self, volname):
+ """
+ cancel all executing jobs for a given volume.
+ """
+ with self.lock:
+ self._cancel_jobs(volname)
+
+ def cancel_all_jobs(self):
+ """
+ call all executing jobs for all volumes.
+ """
+ with self.lock:
+ for volname in list(self.q):
+ self._cancel_jobs(volname)
+
+ def get_next_job(self, volname, running_jobs):
+ """
+ get the next job for asynchronous execution as (retcode, job) tuple. if no
+ jobs are available return (0, None) else return (0, job). on error return
+ (-ret, None). called under `self.lock`.
+ """
+ raise NotImplementedException()
+
+ def execute_job(self, volname, job, should_cancel):
+ """
+ execute a job for a volume. the job can block on I/O operations, sleep for long
+ hours and do all kinds of synchronous work. called outside `self.lock`.
+ """
+ raise NotImplementedException()
+
diff --git a/src/pybind/mgr/volumes/fs/exception.py b/src/pybind/mgr/volumes/fs/exception.py
new file mode 100644
index 00000000..4f903b99
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/exception.py
@@ -0,0 +1,63 @@
+class VolumeException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def to_tuple(self):
+ return self.errno, "", self.error_str
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
+
+class MetadataMgrException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
+
+class IndexException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
+
+class OpSmException(Exception):
+ def __init__(self, error_code, error_message):
+ self.errno = error_code
+ self.error_str = error_message
+
+ def __str__(self):
+ return "{0} ({1})".format(self.errno, self.error_str)
+
+class NotImplementedException(Exception):
+ pass
+
+class ClusterTimeout(Exception):
+ """
+ Exception indicating that we timed out trying to talk to the Ceph cluster,
+ either to the mons, or to any individual daemon that the mons indicate ought
+ to be up but isn't responding to us.
+ """
+ pass
+
+class ClusterError(Exception):
+ """
+ Exception indicating that the cluster returned an error to a command that
+ we thought should be successful based on our last knowledge of the cluster
+ state.
+ """
+ def __init__(self, action, result_code, result_str):
+ self._action = action
+ self._result_code = result_code
+ self._result_str = result_str
+
+ def __str__(self):
+ return "Error {0} (\"{1}\") while {2}".format(
+ self._result_code, self._result_str, self._action)
+
+class EvictionError(Exception):
+ pass
diff --git a/src/pybind/mgr/volumes/fs/fs_util.py b/src/pybind/mgr/volumes/fs/fs_util.py
new file mode 100644
index 00000000..6fe02f58
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/fs_util.py
@@ -0,0 +1,161 @@
+import os
+import errno
+import logging
+
+import cephfs
+import orchestrator
+
+from .exception import VolumeException
+
+log = logging.getLogger(__name__)
+
+def create_pool(mgr, pool_name, pg_num):
+ # create the given pool
+ command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
+ return mgr.mon_command(command)
+
+def remove_pool(mgr, pool_name):
+ command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
+ 'yes_i_really_really_mean_it': True}
+ return mgr.mon_command(command)
+
+def create_filesystem(mgr, fs_name, metadata_pool, data_pool):
+ command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
+ 'data': data_pool}
+ return mgr.mon_command(command)
+
+def remove_filesystem(mgr, fs_name):
+ command = {'prefix': 'fs fail', 'fs_name': fs_name}
+ r, outb, outs = mgr.mon_command(command)
+ if r != 0:
+ return r, outb, outs
+
+ command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
+ return mgr.mon_command(command)
+
+def create_mds(mgr, fs_name):
+ spec = orchestrator.StatelessServiceSpec()
+ spec.name = fs_name
+ try:
+ completion = mgr.add_stateless_service("mds", spec)
+ mgr._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ except (ImportError, orchestrator.OrchestratorError):
+ return 0, "", "Volume created successfully (no MDS daemons created)"
+ except Exception as e:
+ # Don't let detailed orchestrator exceptions (python backtraces)
+ # bubble out to the user
+ log.exception("Failed to create MDS daemons")
+ return -errno.EINVAL, "", str(e)
+ return 0, "", ""
+
+def volume_exists(mgr, fs_name):
+ fs_map = mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == fs_name:
+ return True
+ return False
+
+def listdir(fs, dirpath):
+ """
+ Get the directory names (only dirs) for a given path
+ """
+ dirs = []
+ try:
+ with fs.opendir(dirpath) as dir_handle:
+ d = fs.readdir(dir_handle)
+ while d:
+ if (d.d_name not in (b".", b"..")) and d.is_dir():
+ dirs.append(d.d_name)
+ d = fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ return dirs
+
+def is_inherited_snap(snapname):
+ """
+ Returns True if the snapname is inherited else False
+ """
+ return snapname.startswith("_")
+
+def listsnaps(fs, volspec, snapdirpath, filter_inherited_snaps=False):
+ """
+ Get the snap names from a given snap directory path
+ """
+ if os.path.basename(snapdirpath) != volspec.snapshot_prefix.encode('utf-8'):
+ raise VolumeException(-errno.EINVAL, "Not a snap directory: {0}".format(snapdirpath))
+ snaps = []
+ try:
+ with fs.opendir(snapdirpath) as dir_handle:
+ d = fs.readdir(dir_handle)
+ while d:
+ if (d.d_name not in (b".", b"..")) and d.is_dir():
+ d_name = d.d_name.decode('utf-8')
+ if not is_inherited_snap(d_name):
+ snaps.append(d.d_name)
+ elif is_inherited_snap(d_name) and not filter_inherited_snaps:
+ snaps.append(d.d_name)
+ d = fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ return snaps
+
+def list_one_entry_at_a_time(fs, dirpath):
+ """
+ Get a directory entry (one entry a time)
+ """
+ try:
+ with fs.opendir(dirpath) as dir_handle:
+ d = fs.readdir(dir_handle)
+ while d:
+ if d.d_name not in (b".", b".."):
+ yield d
+ d = fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+def copy_file(fs, src, dst, mode, cancel_check=None):
+ """
+ Copy a regular file from @src to @dst. @dst is overwritten if it exists.
+ """
+ src_fd = dst_fd = None
+ try:
+ src_fd = fs.open(src, os.O_RDONLY);
+ dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
+ except cephfs.Error as e:
+ if src_fd is not None:
+ fs.close(src_fd)
+ if dst_fd is not None:
+ fs.close(dst_fd)
+ raise VolumeException(-e.args[0], e.args[1])
+
+ IO_SIZE = 8 * 1024 * 1024
+ try:
+ while True:
+ if cancel_check and cancel_check():
+ raise VolumeException(-errno.EINTR, "copy operation interrupted")
+ data = fs.read(src_fd, -1, IO_SIZE)
+ if not len(data):
+ break
+ written = 0
+ while written < len(data):
+ written += fs.write(dst_fd, data[written:], -1)
+ fs.fsync(dst_fd, 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ finally:
+ fs.close(src_fd)
+ fs.close(dst_fd)
+
+def get_ancestor_xattr(fs, path, attr):
+ """
+ Helper for reading layout information: if this xattr is missing
+ on the requested path, keep checking parents until we find it.
+ """
+ try:
+ return fs.getxattr(path, attr).decode('utf-8')
+ except cephfs.NoData as e:
+ if path == "/":
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ return get_ancestor_xattr(fs, os.path.split(path)[0], attr)
diff --git a/src/pybind/mgr/volumes/fs/operations/__init__.py b/src/pybind/mgr/volumes/fs/operations/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/__init__.py
diff --git a/src/pybind/mgr/volumes/fs/operations/access.py b/src/pybind/mgr/volumes/fs/operations/access.py
new file mode 100644
index 00000000..44430f59
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/access.py
@@ -0,0 +1,142 @@
+import errno
+import json
+try:
+ from typing import List
+except ImportError:
+ pass # For typing only
+
+def prepare_updated_caps_list(existing_caps, mds_cap_str, osd_cap_str, authorize=True):
+ caps_list = [] # type: List[str]
+ for k, v in existing_caps['caps'].items():
+ if k == 'mds' or k == 'osd':
+ continue
+ elif k == 'mon':
+ if not authorize and v == 'allow r':
+ continue
+ caps_list.extend((k,v))
+
+ if mds_cap_str:
+ caps_list.extend(('mds', mds_cap_str))
+ if osd_cap_str:
+ caps_list.extend(('osd', osd_cap_str))
+
+ if authorize and 'mon' not in caps_list:
+ caps_list.extend(('mon', 'allow r'))
+
+ return caps_list
+
+
+def allow_access(mgr, client_entity, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap, existing_caps):
+ if existing_caps is None:
+ ret, out, err = mgr.mon_command({
+ "prefix": "auth get-or-create",
+ "entity": client_entity,
+ "caps": ['mds', want_mds_cap, 'osd', want_osd_cap, 'mon', 'allow r'],
+ "format": "json"})
+ else:
+ cap = existing_caps[0]
+
+ def cap_update(
+ orig_mds_caps, orig_osd_caps, want_mds_cap,
+ want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
+
+ if not orig_mds_caps:
+ return want_mds_cap, want_osd_cap
+
+ mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+ osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+ if want_mds_cap in mds_cap_tokens:
+ return orig_mds_caps, orig_osd_caps
+
+ if unwanted_mds_cap in mds_cap_tokens:
+ mds_cap_tokens.remove(unwanted_mds_cap)
+ osd_cap_tokens.remove(unwanted_osd_cap)
+
+ mds_cap_tokens.append(want_mds_cap)
+ osd_cap_tokens.append(want_osd_cap)
+
+ return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
+ orig_mds_caps = cap['caps'].get('mds', "")
+ orig_osd_caps = cap['caps'].get('osd', "")
+
+ mds_cap_str, osd_cap_str = cap_update(
+ orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap)
+
+ caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
+ mgr.mon_command(
+ {
+ "prefix": "auth caps",
+ 'entity': client_entity,
+ 'caps': caps_list
+ })
+ ret, out, err = mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+
+ # Result expected like this:
+ # [
+ # {
+ # "entity": "client.foobar",
+ # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
+ # "caps": {
+ # "mds": "allow *",
+ # "mon": "allow *"
+ # }
+ # }
+ # ]
+
+ caps = json.loads(out)
+ assert len(caps) == 1
+ assert caps[0]['entity'] == client_entity
+ return caps[0]['key']
+
+def deny_access(mgr, client_entity, want_mds_caps, want_osd_caps):
+ ret, out, err = mgr.mon_command({
+ "prefix": "auth get",
+ "entity": client_entity,
+ "format": "json",
+ })
+
+ if ret == -errno.ENOENT:
+ # Already gone, great.
+ return
+
+ def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
+ mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
+ osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
+
+ for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
+ if want_mds_cap in mds_cap_tokens:
+ mds_cap_tokens.remove(want_mds_cap)
+ osd_cap_tokens.remove(want_osd_cap)
+ break
+
+ return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
+
+ cap = json.loads(out)[0]
+ orig_mds_caps = cap['caps'].get('mds', "")
+ orig_osd_caps = cap['caps'].get('osd', "")
+ mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
+ want_mds_caps, want_osd_caps)
+
+ caps_list = prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
+ if not caps_list:
+ mgr.mon_command(
+ {
+ 'prefix': 'auth rm',
+ 'entity': client_entity
+ })
+ else:
+ mgr.mon_command(
+ {
+ "prefix": "auth caps",
+ 'entity': client_entity,
+ 'caps': caps_list
+ })
diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py
new file mode 100644
index 00000000..a2b31f85
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py
@@ -0,0 +1,98 @@
+import os
+import uuid
+import stat
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .index import Index
+from ..exception import IndexException, VolumeException
+from ..fs_util import list_one_entry_at_a_time
+
+log = logging.getLogger(__name__)
+
+class CloneIndex(Index):
+ SUB_GROUP_NAME = "clone"
+ PATH_MAX = 4096
+
+ @property
+ def path(self):
+ return os.path.join(super(CloneIndex, self).path, CloneIndex.SUB_GROUP_NAME.encode('utf-8'))
+
+ def _track(self, sink_path):
+ tracking_id = str(uuid.uuid4())
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ log.info("tracking-id {0} for path {1}".format(tracking_id, sink_path))
+
+ self.fs.symlink(sink_path, source_path)
+ return tracking_id
+
+ def track(self, sink_path):
+ try:
+ return self._track(sink_path)
+ except (VolumeException, cephfs.Error) as e:
+ if isinstance(e, cephfs.Error):
+ e = IndexException(-e.args[0], e.args[1])
+ elif isinstance(e, VolumeException):
+ e = IndexException(e.errno, e.error_str)
+ raise e
+
+ def untrack(self, tracking_id):
+ log.info("untracking {0}".format(tracking_id))
+ source_path = os.path.join(self.path, tracking_id.encode('utf-8'))
+ try:
+ self.fs.unlink(source_path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+ def get_oldest_clone_entry(self, exclude=[]):
+ min_ctime_entry = None
+ exclude_tracking_ids = [v[0] for v in exclude]
+ log.debug("excluded tracking ids: {0}".format(exclude_tracking_ids))
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if dname not in exclude_tracking_ids and stat.S_ISLNK(st.st_mode):
+ if min_ctime_entry is None or st.st_ctime < min_ctime_entry[1].st_ctime:
+ min_ctime_entry = (dname, st)
+ if min_ctime_entry:
+ try:
+ linklen = min_ctime_entry[1].st_size
+ sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+ return (min_ctime_entry[0], sink_path[:linklen])
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ return None
+
+ def find_clone_entry_index(self, sink_path):
+ try:
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if stat.S_ISLNK(st.st_mode):
+ target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+ if sink_path == target_path[:st.st_size]:
+ return dname
+ return None
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+def create_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.mkdirs(clone_index.path, 0o700)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_clone_index(fs, vol_spec):
+ clone_index = CloneIndex(fs, vol_spec)
+ try:
+ fs.stat(clone_index.path)
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+ yield clone_index
diff --git a/src/pybind/mgr/volumes/fs/operations/group.py b/src/pybind/mgr/volumes/fs/operations/group.py
new file mode 100644
index 00000000..ae334563
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/group.py
@@ -0,0 +1,186 @@
+import os
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .snapshot_util import mksnap, rmsnap
+from .template import GroupTemplate
+from ..fs_util import listdir, listsnaps, get_ancestor_xattr
+from ..exception import VolumeException
+
+log = logging.getLogger(__name__)
+
+class Group(GroupTemplate):
+ # Reserved subvolume group name which we use in paths for subvolumes
+ # that are not assigned to a group (i.e. created with group=None)
+ NO_GROUP_NAME = "_nogroup"
+
+ def __init__(self, fs, vol_spec, groupname):
+ assert groupname != Group.NO_GROUP_NAME
+ self.fs = fs
+ self.user_id = None
+ self.group_id = None
+ self.vol_spec = vol_spec
+ self.groupname = groupname if groupname else Group.NO_GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ @property
+ def group_name(self):
+ return self.groupname
+
+ @property
+ def uid(self):
+ return self.user_id
+
+ @uid.setter
+ def uid(self, val):
+ self.user_id = val
+
+ @property
+ def gid(self):
+ return self.group_id
+
+ @gid.setter
+ def gid(self, val):
+ self.group_id = val
+
+ def is_default_group(self):
+ return self.groupname == Group.NO_GROUP_NAME
+
+ def list_subvolumes(self):
+ try:
+ return listdir(self.fs, self.path)
+ except VolumeException as ve:
+ # listing a default group when it's not yet created
+ if ve.errno == -errno.ENOENT and self.is_default_group():
+ return []
+ raise
+
+ def create_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ mksnap(self.fs, snappath)
+
+ def remove_snapshot(self, snapname):
+ snappath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ rmsnap(self.fs, snappath)
+
+ def list_snapshots(self):
+ try:
+ dirpath = os.path.join(self.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+ return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise
+
+def create_group(fs, vol_spec, groupname, pool, mode, uid, gid):
+ """
+ create a subvolume group.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ group = Group(fs, vol_spec, groupname)
+ path = group.path
+ fs.mkdirs(path, mode)
+ try:
+ if not pool:
+ pool = get_ancestor_xattr(fs, path, "ceph.dir.layout.pool")
+ try:
+ fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL,
+ "Invalid pool layout '{0}'. It must be a valid data pool".format(pool))
+ if uid is None:
+ uid = 0
+ else:
+ try:
+ uid = int(uid)
+ if uid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid UID")
+ if gid is None:
+ gid = 0
+ else:
+ try:
+ gid = int(gid)
+ if gid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid GID")
+ fs.chown(path, uid, gid)
+ except (cephfs.Error, VolumeException) as e:
+ try:
+ # cleanup group path on best effort basis
+ log.debug("cleaning up subvolume group path: {0}".format(path))
+ fs.rmdir(path)
+ except cephfs.Error as ce:
+ log.debug("failed to clean up subvolume group {0} with path: {1} ({2})".format(groupname, path, ce))
+ if isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+def remove_group(fs, vol_spec, groupname):
+ """
+ remove a subvolume group.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :return: None
+ """
+ group = Group(fs, vol_spec, groupname)
+ try:
+ fs.rmdir(group.path)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_group(fs, vol_spec, groupname):
+ """
+ open a subvolume group. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param groupname: subvolume group name
+ :return: yields a group object (subclass of GroupTemplate)
+ """
+ group = Group(fs, vol_spec, groupname)
+ try:
+ st = fs.stat(group.path)
+ group.uid = int(st.st_uid)
+ group.gid = int(st.st_gid)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ if not group.is_default_group():
+ raise VolumeException(-errno.ENOENT, "subvolume group '{0}' does not exist".format(groupname))
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+ yield group
+
+@contextmanager
+def open_group_unique(fs, vol_spec, groupname, c_group, c_groupname):
+ if groupname == c_groupname:
+ yield c_group
+ else:
+ with open_group(fs, vol_spec, groupname) as group:
+ yield group
diff --git a/src/pybind/mgr/volumes/fs/operations/index.py b/src/pybind/mgr/volumes/fs/operations/index.py
new file mode 100644
index 00000000..0e4296d7
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/index.py
@@ -0,0 +1,23 @@
+import errno
+import os
+
+from ..exception import VolumeException
+from .template import GroupTemplate
+
+class Index(GroupTemplate):
+ GROUP_NAME = "_index"
+
+ def __init__(self, fs, vol_spec):
+ self.fs = fs
+ self.vol_spec = vol_spec
+ self.groupname = Index.GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ def track(self, *args):
+ raise VolumeException(-errno.EINVAL, "operation not supported.")
+
+ def untrack(self, tracking_id):
+ raise VolumeException(-errno.EINVAL, "operation not supported.")
diff --git a/src/pybind/mgr/volumes/fs/operations/lock.py b/src/pybind/mgr/volumes/fs/operations/lock.py
new file mode 100644
index 00000000..ab5f1d04
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/lock.py
@@ -0,0 +1,42 @@
+from contextlib import contextmanager
+import logging
+from threading import Lock
+
+log = logging.getLogger(__name__)
+
+# singleton design pattern taken from http://www.aleax.it/5ep.html
+
+class GlobalLock(object):
+ """
+ Global lock to serialize operations in mgr/volumes. This lock
+ is currently held when accessing (opening) a volume to perform
+ group/subvolume operations. Since this is a big lock, it's rather
+ inefficient -- but right now it's ok since mgr/volumes does not
+ expect concurrent operations via its APIs.
+
+ As and when features get added (such as clone, where mgr/volumes
+ would maintain subvolume states in the filesystem), there might
+ be a need to allow concurrent operations. In that case it would
+ be nice to implement an efficient path based locking mechanism.
+
+ See: https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project6_report.pdf
+ """
+ _shared_state = {
+ 'lock' : Lock(),
+ 'init' : False
+ }
+
+ def __init__(self):
+ with self._shared_state['lock']:
+ if not self._shared_state['init']:
+ self._shared_state['init'] = True
+ # share this state among all instances
+ self.__dict__ = self._shared_state
+
+ @contextmanager
+ def lock_op(self):
+ log.debug("entering global lock")
+ with self._shared_state['lock']:
+ log.debug("acquired global lock")
+ yield
+ log.debug("exited global lock")
diff --git a/src/pybind/mgr/volumes/fs/operations/rankevicter.py b/src/pybind/mgr/volumes/fs/operations/rankevicter.py
new file mode 100644
index 00000000..5b945c38
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/rankevicter.py
@@ -0,0 +1,114 @@
+import errno
+import json
+import logging
+import threading
+import time
+
+from .volume import get_mds_map
+from ..exception import ClusterTimeout, ClusterError
+
+log = logging.getLogger(__name__)
+
+class RankEvicter(threading.Thread):
+ """
+ Thread for evicting client(s) from a particular MDS daemon instance.
+
+ This is more complex than simply sending a command, because we have to
+ handle cases where MDS daemons might not be fully up yet, and/or might
+ be transiently unresponsive to commands.
+ """
+ class GidGone(Exception):
+ pass
+
+ POLL_PERIOD = 5
+
+ def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout):
+ """
+ :param client_spec: list of strings, used as filter arguments to "session evict"
+ pass ["id=123"] to evict a single client with session id 123.
+ """
+ self.volname = volname
+ self.rank = rank
+ self.gid = gid
+ self._mds_map = mds_map
+ self._client_spec = client_spec
+ self._fs = fs
+ self._ready_timeout = ready_timeout
+ self._ready_waited = 0
+ self.mgr = mgr
+
+ self.success = False
+ self.exception = None
+
+ super(RankEvicter, self).__init__()
+
+ def _ready_to_evict(self):
+ if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
+ log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
+ self._client_spec, self.rank, self.gid
+ ))
+ raise RankEvicter.GidGone()
+
+ info = self._mds_map['info']["gid_{0}".format(self.gid)]
+ log.debug("_ready_to_evict: state={0}".format(info['state']))
+ return info['state'] in ["up:active", "up:clientreplay"]
+
+ def _wait_for_ready(self):
+ """
+ Wait for that MDS rank to reach an active or clientreplay state, and
+ not be laggy.
+ """
+ while not self._ready_to_evict():
+ if self._ready_waited > self._ready_timeout:
+ raise ClusterTimeout()
+
+ time.sleep(self.POLL_PERIOD)
+ self._ready_waited += self.POLL_PERIOD
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+
+ def _evict(self):
+ """
+ Run the eviction procedure. Return true on success, false on errors.
+ """
+
+ # Wait til the MDS is believed by the mon to be available for commands
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+
+ # Then send it an evict
+ ret = -errno.ETIMEDOUT
+ while ret == -errno.ETIMEDOUT:
+ log.debug("mds_command: {0}, {1}".format(
+ "%s" % self.gid, ["session", "evict"] + self._client_spec
+ ))
+ ret, outb, outs = self._fs.mds_command(
+ "%s" % self.gid,
+ json.dumps({
+ "prefix": "session evict",
+ "filters": self._client_spec
+ }), "")
+ log.debug("mds_command: complete {0} {1}".format(ret, outs))
+
+ # If we get a clean response, great, it's gone from that rank.
+ if ret == 0:
+ return True
+ elif ret == -errno.ETIMEDOUT:
+ # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+ else:
+ raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
+
+ def run(self):
+ try:
+ self._evict()
+ except Exception as e:
+ self.success = False
+ self.exception = e
+ else:
+ self.success = True
diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py
new file mode 100644
index 00000000..a9543654
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/resolver.py
@@ -0,0 +1,26 @@
+import os
+
+from .group import Group
+
+def splitall(path):
+ if path == "/":
+ return ["/"]
+ s = os.path.split(path)
+ return splitall(s[0]) + [s[1]]
+
+def resolve(vol_spec, path):
+ parts = splitall(path)
+ if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix:
+ return None
+ groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+ subvolname = parts[3]
+ return (groupname, subvolname)
+
+def resolve_trash(vol_spec, path):
+ parts = splitall(path)
+ if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \
+ parts[4] != '.trash':
+ return None
+ groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+ subvolname = parts[3]
+ return (groupname, subvolname)
diff --git a/src/pybind/mgr/volumes/fs/operations/snapshot_util.py b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py
new file mode 100644
index 00000000..2223c58e
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/snapshot_util.py
@@ -0,0 +1,30 @@
+import os
+import errno
+
+import cephfs
+
+from ..exception import VolumeException
+
+def mksnap(fs, snappath):
+ """
+ Create a snapshot, or do nothing if it already exists.
+ """
+ try:
+ # snap create does not accept mode -- use default
+ fs.mkdir(snappath, 0o755)
+ except cephfs.ObjectExists:
+ return
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+def rmsnap(fs, snappath):
+ """
+ Remove a snapshot
+ """
+ try:
+ fs.stat(snappath)
+ fs.rmdir(snappath)
+ except cephfs.ObjectNotFound:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(os.path.basename(snappath)))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
diff --git a/src/pybind/mgr/volumes/fs/operations/subvolume.py b/src/pybind/mgr/volumes/fs/operations/subvolume.py
new file mode 100644
index 00000000..dc36477b
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/subvolume.py
@@ -0,0 +1,74 @@
+import os
+import errno
+from contextlib import contextmanager
+
+from ..exception import VolumeException
+from .template import SubvolumeOpType
+
+from .versions import loaded_subvolumes
+
+def create_subvol(mgr, fs, vol_spec, group, subvolname, size, isolate_nspace, pool, mode, uid, gid):
+ """
+ create a subvolume (create a subvolume with the max known version).
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param size: In bytes, or None for no size limit
+ :param isolate_nspace: If true, use separate RADOS namespace for this subvolume
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
+ subvolume.create(size, isolate_nspace, pool, mode, uid, gid)
+
+def create_clone(mgr, fs, vol_spec, group, subvolname, pool, source_volume, source_subvolume, snapname):
+ """
+ create a cloned subvolume.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the clone
+ :param subvolname: clone subvolume nam
+ :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored
+ :param source_volume: source subvolumes volume name
+ :param source_subvolume: source (parent) subvolume object
+ :param snapname: source subvolume snapshot
+ :return None
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object_max(mgr, fs, vol_spec, group, subvolname)
+ subvolume.create_clone(pool, source_volume, source_subvolume, snapname)
+
+def remove_subvol(mgr, fs, vol_spec, group, subvolname, force=False, retainsnaps=False):
+ """
+ remove a subvolume.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param subvolname: subvolume name
+ :param force: force remove subvolumes
+ :return: None
+ """
+ op_type = SubvolumeOpType.REMOVE if not force else SubvolumeOpType.REMOVE_FORCE
+ with open_subvol(mgr, fs, vol_spec, group, subvolname, op_type) as subvolume:
+ subvolume.remove(retainsnaps)
+
+@contextmanager
+def open_subvol(mgr, fs, vol_spec, group, subvolname, op_type):
+ """
+ open a subvolume. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :param group: group object for the subvolume
+ :param subvolname: subvolume name
+ :param op_type: operation type for which subvolume is being opened
+ :return: yields a subvolume object (subclass of SubvolumeTemplate)
+ """
+ subvolume = loaded_subvolumes.get_subvolume_object(mgr, fs, vol_spec, group, subvolname)
+ subvolume.open(op_type)
+ yield subvolume
diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py
new file mode 100644
index 00000000..d35ad0de
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/template.py
@@ -0,0 +1,173 @@
+import errno
+
+from enum import Enum, unique
+
+from ..exception import VolumeException
+
+class GroupTemplate(object):
+ def list_subvolumes(self):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_snapshot(self, snapname):
+ """
+ create a subvolume group snapshot.
+
+ :param: group snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove_snapshot(self, snapname):
+ """
+ remove a subvolume group snapshot.
+
+ :param: group snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def list_snapshots(self):
+ """
+ list all subvolume group snapshots.
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+@unique
+class SubvolumeOpType(Enum):
+ CREATE = 'create'
+ REMOVE = 'rm'
+ REMOVE_FORCE = 'rm-force'
+ PIN = 'pin'
+ LIST = 'ls'
+ GETPATH = 'getpath'
+ INFO = 'info'
+ RESIZE = 'resize'
+ SNAP_CREATE = 'snap-create'
+ SNAP_REMOVE = 'snap-rm'
+ SNAP_LIST = 'snap-ls'
+ SNAP_INFO = 'snap-info'
+ SNAP_PROTECT = 'snap-protect'
+ SNAP_UNPROTECT = 'snap-unprotect'
+ CLONE_SOURCE = 'clone-source'
+ CLONE_CREATE = 'clone-create'
+ CLONE_STATUS = 'clone-status'
+ CLONE_CANCEL = 'clone-cancel'
+ CLONE_INTERNAL = 'clone_internal'
+ ALLOW_ACCESS = 'allow-access'
+ DENY_ACCESS = 'deny-access'
+ AUTH_LIST = 'auth-list'
+ EVICT = 'evict'
+
+class SubvolumeTemplate(object):
+ VERSION = None # type: int
+
+ @staticmethod
+ def version():
+ return SubvolumeTemplate.VERSION
+
+ def open(self, op_type):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def status(self):
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ """
+ set up metadata, pools and auth for a subvolume.
+
+ This function is idempotent. It is safe to call this again
+ for an already-created subvolume, even if it is in use.
+
+ :param size: In bytes, or None for no size limit
+ :param isolate_nspace: If true, use separate RADOS namespace for this subvolume
+ :param pool: the RADOS pool where the data objects of the subvolumes will be stored
+ :param mode: the user permissions
+ :param uid: the user identifier
+ :param gid: the group identifier
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ """
+ prepare a subvolume to be cloned.
+
+ :param pool: the RADOS pool where the data objects of the cloned subvolume will be stored
+ :param source_volname: source volume of snapshot
+ :param source_subvolume: source subvolume of snapshot
+ :param snapname: snapshot name to be cloned from
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove(self):
+ """
+ make a subvolume inaccessible to guests.
+
+ This function is idempotent. It is safe to call this again
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def resize(self, newsize, nshrink):
+ """
+ resize a subvolume
+
+ :param newsize: new size In bytes (or inf/infinite)
+ :return: new quota size and used bytes as a tuple
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def create_snapshot(self, snapname):
+ """
+ snapshot a subvolume.
+
+ :param: subvolume snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def remove_snapshot(self, snapname):
+ """
+ remove a subvolume snapshot.
+
+ :param: subvolume snapshot name
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def list_snapshots(self):
+ """
+ list all subvolume snapshots.
+
+ :param: None
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def attach_snapshot(self, snapname, tgt_subvolume):
+ """
+ attach a snapshot to a target cloned subvolume. the target subvolume
+ should be an empty subvolume (type "clone") in "pending" state.
+
+ :param: snapname: snapshot to attach to a clone
+ :param: tgt_subvolume: target clone subvolume
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
+ def detach_snapshot(self, snapname, tgt_subvolume):
+ """
+ detach a snapshot from a target cloned subvolume. the target subvolume
+ should either be in "failed" or "completed" state.
+
+ :param: snapname: snapshot to detach from a clone
+ :param: tgt_subvolume: target clone subvolume
+ :return: None
+ """
+ raise VolumeException(-errno.ENOTSUP, "operation not supported.")
diff --git a/src/pybind/mgr/volumes/fs/operations/trash.py b/src/pybind/mgr/volumes/fs/operations/trash.py
new file mode 100644
index 00000000..66f1d71c
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/trash.py
@@ -0,0 +1,145 @@
+import os
+import uuid
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .template import GroupTemplate
+from ..fs_util import listdir
+from ..exception import VolumeException
+
+log = logging.getLogger(__name__)
+
+class Trash(GroupTemplate):
+ GROUP_NAME = "_deleting"
+
+ def __init__(self, fs, vol_spec):
+ self.fs = fs
+ self.vol_spec = vol_spec
+ self.groupname = Trash.GROUP_NAME
+
+ @property
+ def path(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), self.groupname.encode('utf-8'))
+
+ @property
+ def unique_trash_path(self):
+ """
+ return a unique trash directory entry path
+ """
+ return os.path.join(self.path, str(uuid.uuid4()).encode('utf-8'))
+
+ def _get_single_dir_entry(self, exclude_list=[]):
+ exclude_list.extend((b".", b".."))
+ try:
+ with self.fs.opendir(self.path) as d:
+ entry = self.fs.readdir(d)
+ while entry:
+ if entry.d_name not in exclude_list:
+ return entry.d_name
+ entry = self.fs.readdir(d)
+ return None
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def get_trash_entry(self, exclude_list):
+ """
+ get a trash entry excluding entries provided.
+
+ :praram exclude_list: entries to exclude
+ :return: trash entry
+ """
+ return self._get_single_dir_entry(exclude_list)
+
+ def purge(self, trashpath, should_cancel):
+ """
+ purge a trash entry.
+
+ :praram trash_entry: the trash entry to purge
+ :praram should_cancel: callback to check if the purge should be aborted
+ :return: None
+ """
+ def rmtree(root_path):
+ log.debug("rmtree {0}".format(root_path))
+ try:
+ with self.fs.opendir(root_path) as dir_handle:
+ d = self.fs.readdir(dir_handle)
+ while d and not should_cancel():
+ if d.d_name not in (b".", b".."):
+ d_full = os.path.join(root_path, d.d_name)
+ if d.is_dir():
+ rmtree(d_full)
+ else:
+ self.fs.unlink(d_full)
+ d = self.fs.readdir(dir_handle)
+ except cephfs.ObjectNotFound:
+ return
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ # remove the directory only if we were not asked to cancel
+ # (else we would fail to remove this anyway)
+ if not should_cancel():
+ self.fs.rmdir(root_path)
+
+ # catch any unlink errors
+ try:
+ rmtree(trashpath)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def dump(self, path):
+ """
+ move an filesystem entity to trash can.
+
+ :praram path: the filesystem path to be moved
+ :return: None
+ """
+ try:
+ self.fs.rename(path, self.unique_trash_path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def link(self, path, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.symlink(path, pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def delink(self, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.unlink(pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+def create_trashcan(fs, vol_spec):
+ """
+ create a trash can.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :return: None
+ """
+ trashcan = Trash(fs, vol_spec)
+ try:
+ fs.mkdirs(trashcan.path, 0o700)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+@contextmanager
+def open_trashcan(fs, vol_spec):
+ """
+ open a trash can. This API is to be used as a context manager.
+
+ :param fs: ceph filesystem handle
+ :param vol_spec: volume specification
+ :return: yields a trash can object (subclass of GroupTemplate)
+ """
+ trashcan = Trash(fs, vol_spec)
+ try:
+ fs.stat(trashcan.path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ yield trashcan
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/__init__.py b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
new file mode 100644
index 00000000..3dcdd7c1
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/__init__.py
@@ -0,0 +1,109 @@
+import errno
+import logging
+import importlib
+
+import cephfs
+
+from .subvolume_base import SubvolumeBase
+from .subvolume_attrs import SubvolumeTypes
+from .subvolume_v1 import SubvolumeV1
+from .subvolume_v2 import SubvolumeV2
+from .metadata_manager import MetadataManager
+from .op_sm import SubvolumeOpSm
+from ..template import SubvolumeOpType
+from ...exception import MetadataMgrException, OpSmException, VolumeException
+
+log = logging.getLogger(__name__)
+
+class SubvolumeLoader(object):
+ INVALID_VERSION = -1
+
+ SUPPORTED_MODULES = ['subvolume_v1.SubvolumeV1', 'subvolume_v2.SubvolumeV2']
+
+ def __init__(self):
+ self.max_version = SubvolumeLoader.INVALID_VERSION
+ self.versions = {}
+
+ def _load_module(self, mod_cls):
+ mod_name, cls_name = mod_cls.split('.')
+ mod = importlib.import_module('.versions.{0}'.format(mod_name), package='volumes.fs.operations')
+ return getattr(mod, cls_name)
+
+ def _load_supported_versions(self):
+ for mod_cls in SubvolumeLoader.SUPPORTED_MODULES:
+ cls = self._load_module(mod_cls)
+ log.info("loaded v{0} subvolume".format(cls.version()))
+ if self.max_version is not None or cls.version() > self.max_version:
+ self.max_version = cls.version()
+ self.versions[cls.version()] = cls
+ if self.max_version == SubvolumeLoader.INVALID_VERSION:
+ raise VolumeException(-errno.EINVAL, "no subvolume version available")
+ log.info("max subvolume version is v{0}".format(self.max_version))
+
+ def _get_subvolume_version(self, version):
+ try:
+ return self.versions[version]
+ except KeyError:
+ raise VolumeException(-errno.EINVAL, "subvolume class v{0} does not exist".format(version))
+
+ def get_subvolume_object_max(self, mgr, fs, vol_spec, group, subvolname):
+ return self._get_subvolume_version(self.max_version)(mgr, fs, vol_spec, group, subvolname)
+
+ def upgrade_to_v2_subvolume(self, subvolume):
+ # legacy mode subvolumes cannot be upgraded to v2
+ if subvolume.legacy_mode:
+ return
+
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ if version >= SubvolumeV2.version():
+ return
+
+ v1_subvolume = self._get_subvolume_version(version)(subvolume.mgr, subvolume.fs, subvolume.vol_spec, subvolume.group, subvolume.subvolname)
+ try:
+ v1_subvolume.open(SubvolumeOpType.SNAP_LIST)
+ except VolumeException as ve:
+ # if volume is not ready for snapshot listing, do not upgrade at present
+ if ve.errno == -errno.EAGAIN:
+ return
+ raise
+
+ # v1 subvolumes with snapshots cannot be upgraded to v2
+ if v1_subvolume.list_snapshots():
+ return
+
+ subvolume.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_VERSION, SubvolumeV2.version())
+ subvolume.metadata_mgr.flush()
+
+ def upgrade_legacy_subvolume(self, fs, subvolume):
+ assert subvolume.legacy_mode
+ try:
+ fs.mkdirs(subvolume.legacy_dir, 0o700)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "error accessing subvolume")
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+ qpath = subvolume.base_path.decode('utf-8')
+ # legacy is only upgradable to v1
+ subvolume.init_config(SubvolumeV1.version(), subvolume_type, qpath, initial_state)
+
+ def get_subvolume_object(self, mgr, fs, vol_spec, group, subvolname, upgrade=True):
+ subvolume = SubvolumeBase(mgr, fs, vol_spec, group, subvolname)
+ try:
+ subvolume.discover()
+ self.upgrade_to_v2_subvolume(subvolume)
+ version = int(subvolume.metadata_mgr.get_global_option('version'))
+ return self._get_subvolume_version(version)(mgr, fs, vol_spec, group, subvolname, legacy=subvolume.legacy_mode)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT and upgrade:
+ self.upgrade_legacy_subvolume(fs, subvolume)
+ return self.get_subvolume_object(mgr, fs, vol_spec, group, subvolname, upgrade=False)
+ else:
+ # log the actual error and generalize error string returned to user
+ log.error("error accessing subvolume metadata for '{0}' ({1})".format(subvolname, me))
+ raise VolumeException(-errno.EINVAL, "error accessing subvolume metadata")
+
+loaded_subvolumes = SubvolumeLoader()
+loaded_subvolumes._load_supported_versions()
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py
new file mode 100644
index 00000000..259dcd0e
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py
@@ -0,0 +1,208 @@
+from contextlib import contextmanager
+import os
+import fcntl
+import json
+import logging
+import struct
+import uuid
+
+import cephfs
+
+from ..group import Group
+
+log = logging.getLogger(__name__)
+
+class AuthMetadataError(Exception):
+ pass
+
+class AuthMetadataManager(object):
+
+ # Current version
+ version = 6
+
+ # Filename extensions for meta files.
+ META_FILE_EXT = ".meta"
+ DEFAULT_VOL_PREFIX = "/volumes"
+
+ def __init__(self, fs):
+ self.fs = fs
+ self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
+ self.volume_prefix = self.DEFAULT_VOL_PREFIX
+
+ def _to_bytes(self, param):
+ '''
+ Helper method that returns byte representation of the given parameter.
+ '''
+ if isinstance(param, str):
+ return param.encode('utf-8')
+ elif param is None:
+ return param
+ else:
+ return str(param).encode('utf-8')
+
+ def _subvolume_metadata_path(self, group_name, subvol_name):
+ return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
+ group_name if group_name != Group.NO_GROUP_NAME else "",
+ subvol_name,
+ self.META_FILE_EXT))
+
+ def _check_compat_version(self, compat_version):
+ if self.version < compat_version:
+ msg = ("The current version of AuthMetadataManager, version {0} "
+ "does not support the required feature. Need version {1} "
+ "or greater".format(self.version, compat_version)
+ )
+ log.error(msg)
+ raise AuthMetadataError(msg)
+
+ def _metadata_get(self, path):
+ """
+ Return a deserialized JSON object, or None
+ """
+ fd = self.fs.open(path, "r")
+ # TODO iterate instead of assuming file < 4MB
+ read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+ self.fs.close(fd)
+ if read_bytes:
+ return json.loads(read_bytes.decode())
+ else:
+ return None
+
+ def _metadata_set(self, path, data):
+ serialized = json.dumps(data)
+ fd = self.fs.open(path, "w")
+ try:
+ self.fs.write(fd, self._to_bytes(serialized), 0)
+ self.fs.fsync(fd, 0)
+ finally:
+ self.fs.close(fd)
+
+ def _lock(self, path):
+ @contextmanager
+ def fn():
+ while(1):
+ fd = self.fs.open(path, os.O_CREAT, 0o755)
+ self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+
+ # The locked file will be cleaned up sometime. It could be
+ # unlinked by consumer e.g., an another manila-share service
+ # instance, before lock was applied on it. Perform checks to
+ # ensure that this does not happen.
+ try:
+ statbuf = self.fs.stat(path)
+ except cephfs.ObjectNotFound:
+ self.fs.close(fd)
+ continue
+
+ fstatbuf = self.fs.fstat(fd)
+ if statbuf.st_ino == fstatbuf.st_ino:
+ break
+
+ try:
+ yield
+ finally:
+ self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+ self.fs.close(fd)
+
+ return fn()
+
+ def _auth_metadata_path(self, auth_id):
+ return os.path.join(self.volume_prefix, "${0}{1}".format(
+ auth_id, self.META_FILE_EXT))
+
+ def auth_lock(self, auth_id):
+ return self._lock(self._auth_metadata_path(auth_id))
+
+ def auth_metadata_get(self, auth_id):
+ """
+ Call me with the metadata locked!
+
+ Check whether a auth metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return auth metadata that the current version of AuthMetadataManager
+ can decode.
+ """
+ auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
+
+ if auth_metadata:
+ self._check_compat_version(auth_metadata['compat_version'])
+
+ return auth_metadata
+
+ def auth_metadata_set(self, auth_id, data):
+ """
+ Call me with the metadata locked!
+
+ Fsync the auth metadata.
+
+ Add two version attributes to the auth metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata, and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 6
+ data['version'] = self.version
+ return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+ def create_subvolume_metadata_file(self, group_name, subvol_name):
+ """
+ Create a subvolume metadata file, if it does not already exist, to store
+ data about auth ids having access to the subvolume
+ """
+ fd = self.fs.open(self._subvolume_metadata_path(group_name, subvol_name),
+ os.O_CREAT, 0o755)
+ self.fs.close(fd)
+
+ def delete_subvolume_metadata_file(self, group_name, subvol_name):
+ vol_meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+ try:
+ self.fs.unlink(vol_meta_path)
+ except cephfs.ObjectNotFound:
+ pass
+
+ def subvol_metadata_lock(self, group_name, subvol_name):
+ """
+ Return a ContextManager which locks the authorization metadata for
+ a particular subvolume, and persists a flag to the metadata indicating
+ that it is currently locked, so that we can detect dirty situations
+ during recovery.
+
+ This lock isn't just to make access to the metadata safe: it's also
+ designed to be used over the two-step process of checking the
+ metadata and then responding to an authorization request, to
+ ensure that at the point we respond the metadata hasn't changed
+ in the background. It's key to how we avoid security holes
+ resulting from races during that problem ,
+ """
+ return self._lock(self._subvolume_metadata_path(group_name, subvol_name))
+
+ def subvol_metadata_get(self, group_name, subvol_name):
+ """
+ Call me with the metadata locked!
+
+ Check whether a subvolume metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return a subvolume_metadata structure that the current version of
+ AuthMetadataManager can decode.
+ """
+ subvolume_metadata = self._metadata_get(self._subvolume_metadata_path(group_name, subvol_name))
+
+ if subvolume_metadata:
+ self._check_compat_version(subvolume_metadata['compat_version'])
+
+ return subvolume_metadata
+
+ def subvol_metadata_set(self, group_name, subvol_name, data):
+ """
+ Call me with the metadata locked!
+
+ Add two version attributes to the subvolume metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 1
+ data['version'] = self.version
+ return self._metadata_set(self._subvolume_metadata_path(group_name, subvol_name), data)
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
new file mode 100644
index 00000000..1b6c4327
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
@@ -0,0 +1,144 @@
+import os
+import errno
+import logging
+import sys
+
+if sys.version_info >= (3, 2):
+ import configparser
+else:
+ import ConfigParser as configparser
+
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
+
+import cephfs
+
+from ...exception import MetadataMgrException
+
+log = logging.getLogger(__name__)
+
+class MetadataManager(object):
+ GLOBAL_SECTION = "GLOBAL"
+ GLOBAL_META_KEY_VERSION = "version"
+ GLOBAL_META_KEY_TYPE = "type"
+ GLOBAL_META_KEY_PATH = "path"
+ GLOBAL_META_KEY_STATE = "state"
+
+ MAX_IO_BYTES = 8 * 1024
+
+ def __init__(self, fs, config_path, mode):
+ self.fs = fs
+ self.mode = mode
+ self.config_path = config_path
+ if sys.version_info >= (3, 2):
+ self.config = configparser.ConfigParser()
+ else:
+ self.config = configparser.SafeConfigParser()
+
+ def refresh(self):
+ fd = None
+ conf_data = StringIO()
+ try:
+ log.debug("opening config {0}".format(self.config_path))
+ fd = self.fs.open(self.config_path, os.O_RDONLY)
+ while True:
+ data = self.fs.read(fd, -1, MetadataManager.MAX_IO_BYTES)
+ if not len(data):
+ break
+ conf_data.write(data.decode('utf-8'))
+ conf_data.seek(0)
+ self.config.readfp(conf_data)
+ except cephfs.ObjectNotFound:
+ raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path))
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+ finally:
+ if fd is not None:
+ self.fs.close(fd)
+
+ def flush(self):
+ # cull empty sections
+ for section in list(self.config.sections()):
+ if len(self.config.items(section)) == 0:
+ self.config.remove_section(section)
+
+ conf_data = StringIO()
+ self.config.write(conf_data)
+ conf_data.seek(0)
+
+ fd = None
+ try:
+ fd = self.fs.open(self.config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
+ wrote = 0
+ while True:
+ data = conf_data.read()
+ if not len(data):
+ break
+ wrote += self.fs.write(fd, data.encode('utf-8'), -1)
+ self.fs.fsync(fd, 0)
+ log.info("wrote {0} bytes to config {1}".format(wrote, self.config_path))
+ except cephfs.Error as e:
+ raise MetadataMgrException(-e.args[0], e.args[1])
+ finally:
+ if fd is not None:
+ self.fs.close(fd)
+
+ def init(self, version, typ, path, state):
+ # you may init just once before refresh (helps to overwrite conf)
+ if self.config.has_section(MetadataManager.GLOBAL_SECTION):
+ raise MetadataMgrException(-errno.EINVAL, "init called on an existing config")
+
+ self.add_section(MetadataManager.GLOBAL_SECTION)
+ self.update_section_multi(
+ MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version),
+ MetadataManager.GLOBAL_META_KEY_TYPE : str(typ),
+ MetadataManager.GLOBAL_META_KEY_PATH : str(path),
+ MetadataManager.GLOBAL_META_KEY_STATE : str(state)
+ })
+
+ def add_section(self, section):
+ try:
+ self.config.add_section(section)
+ except configparser.DuplicateSectionError:
+ return
+ except:
+ raise MetadataMgrException(-errno.EINVAL, "error adding section to config")
+
+ def remove_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ self.config.remove_option(section, key)
+
+ def remove_section(self, section):
+ self.config.remove_section(section)
+
+ def update_section(self, section, key, value):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ self.config.set(section, key, str(value))
+
+ def update_section_multi(self, section, dct):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ for key,value in dct.items():
+ self.config.set(section, key, str(value))
+
+ def update_global_section(self, key, value):
+ self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value))
+
+ def get_option(self, section, key):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ if not self.config.has_option(section, key):
+ raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section))
+ return self.config.get(section, key)
+
+ def get_global_option(self, key):
+ return self.get_option(MetadataManager.GLOBAL_SECTION, key)
+
+ def section_has_item(self, section, item):
+ if not self.config.has_section(section):
+ raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
+ return item in [v[1] for v in self.config.items(section)]
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
new file mode 100644
index 00000000..c2b5f582
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/op_sm.py
@@ -0,0 +1,114 @@
+import errno
+
+from enum import Enum, unique
+
+from ...exception import OpSmException
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
+
+class TransitionKey(object):
+ def __init__(self, subvol_type, state, action_type):
+ self.transition_key = [subvol_type, state, action_type]
+
+ def __hash__(self):
+ return hash(tuple(self.transition_key))
+
+ def __eq__(self, other):
+ return self.transition_key == other.transition_key
+
+ def __neq__(self, other):
+ return not(self == other)
+
+class SubvolumeOpSm(object):
+ transition_table = {}
+
+ @staticmethod
+ def is_complete_state(state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_COMPLETE
+
+ @staticmethod
+ def is_failed_state(state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeStates.STATE_FAILED or state == SubvolumeStates.STATE_CANCELED
+
+ @staticmethod
+ def is_init_state(stm_type, state):
+ if not isinstance(state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(state))
+ return state == SubvolumeOpSm.get_init_state(stm_type)
+
+ @staticmethod
+ def get_init_state(stm_type):
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ init_state = SubvolumeOpSm.transition_table[TransitionKey(stm_type,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE)]
+ if not init_state:
+ raise OpSmException(-errno.ENOENT, "initial state for state machine '{0}' not found".format(stm_type))
+ return init_state
+
+ @staticmethod
+ def transition(stm_type, current_state, action):
+ if not isinstance(stm_type, SubvolumeTypes):
+ raise OpSmException(-errno.EINVAL, "unknown state machine '{0}'".format(stm_type))
+ if not isinstance(current_state, SubvolumeStates):
+ raise OpSmException(-errno.EINVAL, "unknown state '{0}'".format(current_state))
+ if not isinstance(action, SubvolumeActions):
+ raise OpSmException(-errno.EINVAL, "unknown action '{0}'".format(action))
+
+ transition = SubvolumeOpSm.transition_table[TransitionKey(stm_type, current_state, action)]
+ if not transition:
+ raise OpSmException(-errno.EINVAL, "invalid action '{0}' on current state {1} for state machine '{2}'".format(action, current_state, stm_type))
+
+ return transition
+
+SubvolumeOpSm.transition_table = {
+ # state transitions for state machine type TYPE_NORMAL
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_COMPLETE,
+
+ TransitionKey(SubvolumeTypes.TYPE_NORMAL,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ # state transitions for state machine type TYPE_CLONE
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INIT,
+ SubvolumeActions.ACTION_NONE) : SubvolumeStates.STATE_PENDING,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_INPROGRESS,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_PENDING,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_SUCCESS) : SubvolumeStates.STATE_COMPLETE,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_CANCELLED) : SubvolumeStates.STATE_CANCELED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_INPROGRESS,
+ SubvolumeActions.ACTION_FAILED) : SubvolumeStates.STATE_FAILED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_COMPLETE,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_CANCELED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+
+ TransitionKey(SubvolumeTypes.TYPE_CLONE,
+ SubvolumeStates.STATE_FAILED,
+ SubvolumeActions.ACTION_RETAINED) : SubvolumeStates.STATE_RETAINED,
+}
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py
new file mode 100644
index 00000000..ec7138cb
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_attrs.py
@@ -0,0 +1,61 @@
+import errno
+from enum import Enum, unique
+
+from ...exception import VolumeException
+
+@unique
+class SubvolumeTypes(Enum):
+ TYPE_NORMAL = "subvolume"
+ TYPE_CLONE = "clone"
+
+ @staticmethod
+ def from_value(value):
+ if value == "subvolume":
+ return SubvolumeTypes.TYPE_NORMAL
+ if value == "clone":
+ return SubvolumeTypes.TYPE_CLONE
+
+ raise VolumeException(-errno.EINVAL, "invalid subvolume type '{0}'".format(value))
+
+@unique
+class SubvolumeStates(Enum):
+ STATE_INIT = 'init'
+ STATE_PENDING = 'pending'
+ STATE_INPROGRESS = 'in-progress'
+ STATE_FAILED = 'failed'
+ STATE_COMPLETE = 'complete'
+ STATE_CANCELED = 'canceled'
+ STATE_RETAINED = 'snapshot-retained'
+
+ @staticmethod
+ def from_value(value):
+ if value == "init":
+ return SubvolumeStates.STATE_INIT
+ if value == "pending":
+ return SubvolumeStates.STATE_PENDING
+ if value == "in-progress":
+ return SubvolumeStates.STATE_INPROGRESS
+ if value == "failed":
+ return SubvolumeStates.STATE_FAILED
+ if value == "complete":
+ return SubvolumeStates.STATE_COMPLETE
+ if value == "canceled":
+ return SubvolumeStates.STATE_CANCELED
+ if value == "snapshot-retained":
+ return SubvolumeStates.STATE_RETAINED
+
+ raise VolumeException(-errno.EINVAL, "invalid state '{0}'".format(value))
+
+@unique
+class SubvolumeActions(Enum):
+ ACTION_NONE = 0
+ ACTION_SUCCESS = 1
+ ACTION_FAILED = 2
+ ACTION_CANCELLED = 3
+ ACTION_RETAINED = 4
+
+@unique
+class SubvolumeFeatures(Enum):
+ FEATURE_SNAPSHOT_CLONE = "snapshot-clone"
+ FEATURE_SNAPSHOT_RETENTION = "snapshot-retention"
+ FEATURE_SNAPSHOT_AUTOPROTECT = "snapshot-autoprotect"
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
new file mode 100644
index 00000000..f193dabd
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
@@ -0,0 +1,331 @@
+import os
+import stat
+import uuid
+import errno
+import logging
+from hashlib import md5
+
+import cephfs
+
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates
+from .metadata_manager import MetadataManager
+from ..trash import create_trashcan, open_trashcan
+from ...fs_util import get_ancestor_xattr
+from ...exception import MetadataMgrException, VolumeException
+from .op_sm import SubvolumeOpSm
+from .auth_metadata import AuthMetadataManager
+
+log = logging.getLogger(__name__)
+
+class SubvolumeBase(object):
+ LEGACY_CONF_DIR = "_legacy"
+
+ def __init__(self, mgr, fs, vol_spec, group, subvolname, legacy=False):
+ self.mgr = mgr
+ self.fs = fs
+ self.auth_mdata_mgr = AuthMetadataManager(fs)
+ self.cmode = None
+ self.user_id = None
+ self.group_id = None
+ self.vol_spec = vol_spec
+ self.group = group
+ self.subvolname = subvolname
+ self.legacy_mode = legacy
+ self.load_config()
+
+ @property
+ def uid(self):
+ return self.user_id
+
+ @uid.setter
+ def uid(self, val):
+ self.user_id = val
+
+ @property
+ def gid(self):
+ return self.group_id
+
+ @gid.setter
+ def gid(self, val):
+ self.group_id = val
+
+ @property
+ def mode(self):
+ return self.cmode
+
+ @mode.setter
+ def mode(self, val):
+ self.cmode = val
+
+ @property
+ def base_path(self):
+ return os.path.join(self.group.path, self.subvolname.encode('utf-8'))
+
+ @property
+ def config_path(self):
+ return os.path.join(self.base_path, b".meta")
+
+ @property
+ def legacy_dir(self):
+ return os.path.join(self.vol_spec.base_dir.encode('utf-8'), SubvolumeBase.LEGACY_CONF_DIR.encode('utf-8'))
+
+ @property
+ def legacy_config_path(self):
+ m = md5()
+ m.update(self.base_path)
+ meta_config = "{0}.meta".format(m.hexdigest())
+ return os.path.join(self.legacy_dir, meta_config.encode('utf-8'))
+
+ @property
+ def namespace(self):
+ return "{0}{1}".format(self.vol_spec.fs_namespace, self.subvolname)
+
+ @property
+ def group_name(self):
+ return self.group.group_name
+
+ @property
+ def subvol_name(self):
+ return self.subvolname
+
+ @property
+ def legacy_mode(self):
+ return self.legacy
+
+ @legacy_mode.setter
+ def legacy_mode(self, mode):
+ self.legacy = mode
+
+ @property
+ def path(self):
+ """ Path to subvolume data directory """
+ raise NotImplementedError
+
+ @property
+ def features(self):
+ """ List of features supported by the subvolume, containing items from SubvolumeFeatures """
+ raise NotImplementedError
+
+ @property
+ def state(self):
+ """ Subvolume state, one of SubvolumeStates """
+ raise NotImplementedError
+
+ @property
+ def subvol_type(self):
+ return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
+
+ @property
+ def purgeable(self):
+ """ Boolean declaring if subvolume can be purged """
+ raise NotImplementedError
+
+ def load_config(self):
+ if self.legacy_mode:
+ self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
+ else:
+ self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640)
+
+ def get_attrs(self, pathname):
+ # get subvolume attributes
+ attrs = {}
+ stx = self.fs.statx(pathname,
+ cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+
+ attrs["uid"] = int(stx["uid"])
+ attrs["gid"] = int(stx["gid"])
+ attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"]))
+
+ try:
+ attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.NoData:
+ attrs["data_pool"] = None
+
+ try:
+ attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ attrs["pool_namespace"] = None
+
+ try:
+ attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ attrs["quota"] = None
+
+ return attrs
+
+ def set_attrs(self, path, attrs):
+ # set subvolume attributes
+ # set size
+ quota = attrs.get("quota")
+ if quota is not None:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set pool layout
+ data_pool = attrs.get("data_pool")
+ if data_pool is not None:
+ try:
+ self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0)
+ except cephfs.InvalidValue:
+ raise VolumeException(-errno.EINVAL,
+ "invalid pool layout '{0}' -- need a valid data pool".format(data_pool))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # isolate namespace
+ xattr_key = xattr_val = None
+ pool_namespace = attrs.get("pool_namespace")
+ if pool_namespace is not None:
+ # enforce security isolation, use separate namespace for this subvolume
+ xattr_key = 'ceph.dir.layout.pool_namespace'
+ xattr_val = pool_namespace
+ elif not data_pool:
+ # If subvolume's namespace layout is not set, then the subvolume's pool
+ # layout remains unset and will undesirably change with ancestor's
+ # pool layout changes.
+ xattr_key = 'ceph.dir.layout.pool'
+ xattr_val = None
+ try:
+ self.fs.getxattr(path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.NoData as e:
+ xattr_val = get_ancestor_xattr(self.fs, os.path.split(path)[0], "ceph.dir.layout.pool")
+ if xattr_key and xattr_val:
+ try:
+ self.fs.setxattr(path, xattr_key, xattr_val.encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ # set uid/gid
+ uid = attrs.get("uid")
+ if uid is None:
+ uid = self.group.uid
+ else:
+ try:
+ uid = int(uid)
+ if uid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid UID")
+
+ gid = attrs.get("gid")
+ if gid is None:
+ gid = self.group.gid
+ else:
+ try:
+ gid = int(gid)
+ if gid < 0:
+ raise ValueError
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "invalid GID")
+
+ if uid is not None and gid is not None:
+ self.fs.chown(path, uid, gid)
+
+ def _resize(self, path, newsize, noshrink):
+ try:
+ newsize = int(newsize)
+ if newsize <= 0:
+ raise VolumeException(-errno.EINVAL, "Invalid subvolume size")
+ except ValueError:
+ newsize = newsize.lower()
+ if not (newsize == "inf" or newsize == "infinite"):
+ raise VolumeException(-errno.EINVAL, "invalid size option '{0}'".format(newsize))
+ newsize = 0
+ noshrink = False
+
+ try:
+ maxbytes = int(self.fs.getxattr(path, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ maxbytes = 0
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ subvolstat = self.fs.stat(path)
+ if newsize > 0 and newsize < subvolstat.st_size:
+ if noshrink:
+ raise VolumeException(-errno.EINVAL, "Can't resize the subvolume. The new size '{0}' would be lesser than the current "
+ "used size '{1}'".format(newsize, subvolstat.st_size))
+
+ if not newsize == maxbytes:
+ try:
+ self.fs.setxattr(path, 'ceph.quota.max_bytes', str(newsize).encode('utf-8'), 0)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], "Cannot set new size for the subvolume. '{0}'".format(e.args[1]))
+ return newsize, subvolstat.st_size
+
+ def init_config(self, version, subvolume_type, subvolume_path, subvolume_state):
+ self.metadata_mgr.init(version, subvolume_type.value, subvolume_path, subvolume_state.value)
+ self.metadata_mgr.flush()
+
+ def discover(self):
+ log.debug("discovering subvolume '{0}' [mode: {1}]".format(self.subvolname, "legacy" if self.legacy_mode else "new"))
+ try:
+ self.fs.stat(self.base_path)
+ self.metadata_mgr.refresh()
+ log.debug("loaded subvolume '{0}'".format(self.subvolname))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT and not self.legacy_mode:
+ self.legacy_mode = True
+ self.load_config()
+ self.discover()
+ else:
+ raise
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(-e.args[0], "error accessing subvolume '{0}'".format(self.subvolname))
+
+ def _trash_dir(self, path):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.dump(path)
+ log.info("subvolume path '{0}' moved to trashcan".format(path))
+
+ def _link_dir(self, path, bname):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.link(path, bname)
+ log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname))
+
+ def trash_base_dir(self):
+ if self.legacy_mode:
+ self.fs.unlink(self.legacy_config_path)
+ self._trash_dir(self.base_path)
+
+ def create_base_dir(self, mode):
+ try:
+ self.fs.mkdirs(self.base_path, mode)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def info (self):
+ subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+ etype = self.subvol_type
+ st = self.fs.statx(subvolpath, cephfs.CEPH_STATX_BTIME | cephfs.CEPH_STATX_SIZE |
+ cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID |
+ cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_ATIME |
+ cephfs.CEPH_STATX_MTIME | cephfs.CEPH_STATX_CTIME,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ usedbytes = st["size"]
+ try:
+ nsize = int(self.fs.getxattr(subvolpath, 'ceph.quota.max_bytes').decode('utf-8'))
+ except cephfs.NoData:
+ nsize = 0
+
+ try:
+ data_pool = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool').decode('utf-8')
+ pool_namespace = self.fs.getxattr(subvolpath, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ return {'path': subvolpath, 'type': etype.value, 'uid': int(st["uid"]), 'gid': int(st["gid"]),
+ 'atime': str(st["atime"]), 'mtime': str(st["mtime"]), 'ctime': str(st["ctime"]),
+ 'mode': int(st["mode"]), 'data_pool': data_pool, 'created_at': str(st["btime"]),
+ 'bytes_quota': "infinite" if nsize == 0 else nsize, 'bytes_used': int(usedbytes),
+ 'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0),
+ 'pool_namespace': pool_namespace, 'features': self.features, 'state': self.state.value}
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
new file mode 100644
index 00000000..b4cca736
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
@@ -0,0 +1,780 @@
+import os
+import sys
+import stat
+import uuid
+import errno
+import logging
+import json
+from datetime import datetime
+try:
+ from typing import List, Dict
+except ImportError:
+ pass # For typing only
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
+from .op_sm import SubvolumeOpSm
+from .subvolume_base import SubvolumeBase
+from ..template import SubvolumeTemplate
+from ..snapshot_util import mksnap, rmsnap
+from ..access import allow_access, deny_access
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
+from ...fs_util import listsnaps, is_inherited_snap
+from ..template import SubvolumeOpType
+from ..group import Group
+from ..rankevicter import RankEvicter
+from ..volume import get_mds_map
+
+from ..clone_index import open_clone_index, create_clone_index
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
+ """
+ Version 1 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <uuid> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
+ - global information about the subvolume (version, path, type, state)
+ - snapshots attached to an ongoing clone operation
+ - clone snapshot source if subvolume is a clone of a snapshot
+ - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
+ /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
+ component in the path.
+ """
+ VERSION = 1
+
+ @staticmethod
+ def version():
+ return SubvolumeV1.VERSION
+
+ @property
+ def path(self):
+ try:
+ # no need to stat the path -- open() does that
+ return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+
+ @property
+ def features(self):
+ return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
+
+ def mark_subvolume(self):
+ # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
+ # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
+ try:
+ # MDS treats this as a noop for already marked subvolume
+ self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def snapshot_base_path(self):
+ """ Base path for all snapshots """
+ return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+
+ def snapshot_path(self, snapname):
+ """ Path to a specific snapshot named 'snapname' """
+ return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))
+
+ def snapshot_data_path(self, snapname):
+ """ Path to user data directory within a subvolume snapshot named 'snapname' """
+ return self.snapshot_path(snapname)
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, mode)
+ self.mark_subvolume()
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'data_pool': pool,
+ 'pool_namespace': self.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def add_clone_source(self, volname, subvolume, snapname, flush=False):
+ self.metadata_mgr.add_section("source")
+ self.metadata_mgr.update_section("source", "volume", volname)
+ if not subvolume.group.is_default_group():
+ self.metadata_mgr.update_section("source", "group", subvolume.group_name)
+ self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
+ self.metadata_mgr.update_section("source", "snapshot", snapname)
+ if flush:
+ self.metadata_mgr.flush()
+
+ def remove_clone_source(self, flush=False):
+ self.metadata_mgr.remove_section("source")
+ if flush:
+ self.metadata_mgr.flush()
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # source snapshot attrs are used to create clone subvolume.
+ # attributes of subvolume's content though, are synced during the cloning process.
+ attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+ # override snapshot pool setting, if one is provided for the clone
+ if pool is not None:
+ attrs["data_pool"] = pool
+ attrs["pool_namespace"] = None
+
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, attrs.get("mode"))
+ self.mark_subvolume()
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata and clone source
+ qpath = subvol_path.decode('utf-8')
+ self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
+ self.add_clone_source(source_volname, source_subvolume, snapname)
+ self.metadata_mgr.flush()
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
+ try:
+ self.metadata_mgr.refresh()
+
+ etype = self.subvol_type
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate):
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ st = self.fs.stat(subvol_path)
+ # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
+ self.mark_subvolume()
+
+ self.uid = int(st.st_uid)
+ self.gid = int(st.st_gid)
+ self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def _recover_auth_meta(self, auth_id, auth_meta):
+ """
+ Call me after locking the auth meta file.
+ """
+ remove_subvolumes = []
+
+ for subvol, subvol_data in auth_meta['subvolumes'].items():
+ if not subvol_data['dirty']:
+ continue
+
+ (group_name, subvol_name) = subvol.split('/')
+ group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
+ access_level = subvol_data['access_level']
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
+
+ # No SVMeta update indicates that there was no auth update
+ # in Ceph either. So it's safe to remove corresponding
+ # partial update in AMeta.
+ if not subvol_meta or auth_id not in subvol_meta['auths']:
+ remove_subvolumes.append(subvol)
+ continue
+
+ want_auth = {
+ 'access_level': access_level,
+ 'dirty': False,
+ }
+ # SVMeta update looks clean. Ceph auth update must have been
+ # clean. Update the dirty flag and continue
+ if subvol_meta['auths'][auth_id] == want_auth:
+ auth_meta['subvolumes'][subvol]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+ continue
+
+ client_entity = "client.{0}".format(auth_id)
+ ret, out, err = self.mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+ if ret == 0:
+ existing_caps = json.loads(out)
+ elif ret == -errno.ENOENT:
+ existing_caps = None
+ else:
+ log.error(err)
+ raise VolumeException(ret, err)
+
+ self._authorize_subvolume(auth_id, access_level, existing_caps)
+
+ # Recovered from partial auth updates for the auth ID's access
+ # to a subvolume.
+ auth_meta['subvolumes'][subvol]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ for subvol in remove_subvolumes:
+ del auth_meta['subvolumes'][subvol]
+
+ if not auth_meta['subvolumes']:
+ # Clean up auth meta file
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ # Recovered from all partial auth updates for the auth ID.
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
+ """
+ Get-or-create a Ceph auth identity for `auth_id` and grant them access
+ to
+ :param auth_id:
+ :param access_level:
+ :param tenant_id: Optionally provide a stringizable object to
+ restrict any created cephx IDs to other callers
+ passing the same tenant ID.
+ :allow_existing_id: Optionally authorize existing auth-ids not
+ created by ceph_volume_client.
+ :return:
+ """
+
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ client_entity = "client.{0}".format(auth_id)
+ ret, out, err = self.mgr.mon_command(
+ {
+ 'prefix': 'auth get',
+ 'entity': client_entity,
+ 'format': 'json'
+ })
+
+ if ret == 0:
+ existing_caps = json.loads(out)
+ elif ret == -errno.ENOENT:
+ existing_caps = None
+ else:
+ log.error(err)
+ raise VolumeException(ret, err)
+
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ # subvolume data to be inserted
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ subvolume = {
+ group_subvol_id : {
+ # The access level at which the auth_id is authorized to
+ # access the volume.
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+
+ if auth_meta is None:
+ if not allow_existing_id and existing_caps is not None:
+ msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.EPERM, msg)
+
+ # non-existent auth IDs
+ sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+ auth_id, tenant_id
+ ))
+ log.debug("Authorize: no existing meta")
+ auth_meta = {
+ 'dirty': True,
+ 'tenant_id': str(tenant_id) if tenant_id else None,
+ 'subvolumes': subvolume
+ }
+ else:
+ # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+ if 'volumes' in auth_meta:
+ auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+ # Disallow tenants to share auth IDs
+ if str(auth_meta['tenant_id']) != str(tenant_id):
+ msg = "auth ID: {0} is already in use".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.EPERM, msg)
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ log.debug("Authorize: existing tenant {tenant}".format(
+ tenant=auth_meta['tenant_id']
+ ))
+ auth_meta['dirty'] = True
+ auth_meta['subvolumes'].update(subvolume)
+
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ key = self._authorize_subvolume(auth_id, access_level, existing_caps)
+
+ auth_meta['dirty'] = False
+ auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ if tenant_id:
+ return key
+ else:
+ # Caller wasn't multi-tenant aware: be safe and don't give
+ # them a key
+ return ""
+
+ def _authorize_subvolume(self, auth_id, access_level, existing_caps):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ auth = {
+ auth_id: {
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+
+ if subvol_meta is None:
+ subvol_meta = {
+ 'auths': auth
+ }
+ else:
+ subvol_meta['auths'].update(auth)
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ key = self._authorize(auth_id, access_level, existing_caps)
+
+ subvol_meta['auths'][auth_id]['dirty'] = False
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ return key
+
+ def _authorize(self, auth_id, access_level, existing_caps):
+ subvol_path = self.path
+ log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
+
+ # First I need to work out what the data pool is for this share:
+ # read the layout
+ try:
+ pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ try:
+ namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ namespace = None
+
+ # Now construct auth capabilities that give the guest just enough
+ # permissions to access the share
+ client_entity = "client.{0}".format(auth_id)
+ want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
+ want_osd_cap = "allow {0} pool={1}{2}".format(
+ access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+ # Construct auth caps that if present might conflict with the desired
+ # auth caps.
+ unwanted_access_level = 'r' if access_level is 'rw' else 'rw'
+ unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
+ unwanted_osd_cap = "allow {0} pool={1}{2}".format(
+ unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")
+
+ return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
+ unwanted_mds_cap, unwanted_osd_cap, existing_caps)
+
+ def deauthorize(self, auth_id):
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ if auth_meta is None:
+ msg = "auth ID: {0} doesn't exist".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.ENOENT, msg)
+
+ # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
+ if 'volumes' in auth_meta:
+ auth_meta['subvolumes'] = auth_meta.pop('volumes')
+
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ if (auth_meta is None) or (not auth_meta['subvolumes']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ # Clean up the auth meta file of an auth ID
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ if group_subvol_id not in auth_meta['subvolumes']:
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ auth_meta['dirty'] = True
+ auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ self._deauthorize_subvolume(auth_id)
+
+ # Filter out the volume we're deauthorizing
+ del auth_meta['subvolumes'][group_subvol_id]
+
+ # Clean up auth meta file
+ if not auth_meta['subvolumes']:
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def _deauthorize_subvolume(self, auth_id):
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ subvol_meta['auths'][auth_id]['dirty'] = True
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ self._deauthorize(auth_id)
+
+ # Remove the auth_id from the metadata *after* removing it
+ # from ceph, so that if we crashed here, we would actually
+ # recreate the auth ID during recovery (i.e. end up with
+ # a consistent state).
+
+ # Filter out the auth we're removing
+ del subvol_meta['auths'][auth_id]
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ def _deauthorize(self, auth_id):
+ """
+ The volume must still exist.
+ """
+ client_entity = "client.{0}".format(auth_id)
+ subvol_path = self.path
+ try:
+ pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ try:
+ namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
+ except cephfs.NoData:
+ namespace = None
+
+ # The auth_id might have read-only or read-write mount access for the
+ # subvolume path.
+ access_levels = ('r', 'rw')
+ want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
+ for access_level in access_levels]
+ want_osd_caps = ['allow {0} pool={1}{2}'.format(
+ access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
+ for access_level in access_levels]
+ deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)
+
+ def authorized_list(self):
+ """
+ Expose a list of auth IDs that have access to a subvolume.
+
+ return: a list of (auth_id, access_level) tuples, where
+ the access_level can be 'r' , or 'rw'.
+ None if no auth ID is given access to the subvolume.
+ """
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+ auths = [] # type: List[Dict[str,str]]
+ if not meta or not meta['auths']:
+ return auths
+
+ for auth, auth_data in meta['auths'].items():
+ # Skip partial auth updates.
+ if not auth_data['dirty']:
+ auths.append({auth: auth_data['access_level']})
+
+ return auths
+
+ def evict(self, volname, auth_id, timeout=30):
+ """
+ Evict all clients based on the authorization ID and the subvolume path mounted.
+ Assumes that the authorization key has been revoked prior to calling this function.
+
+ This operation can throw an exception if the mon cluster is unresponsive, or
+ any individual MDS daemon is unresponsive for longer than the timeout passed in.
+ """
+
+ client_spec = ["auth_name={0}".format(auth_id), ]
+ client_spec.append("client_metadata.root={0}".
+ format(self.path.decode('utf-8')))
+
+ log.info("evict clients with {0}".format(', '.join(client_spec)))
+
+ mds_map = get_mds_map(self.mgr, volname)
+ if not mds_map:
+ raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))
+
+ up = {}
+ for name, gid in mds_map['up'].items():
+ # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
+ assert name.startswith("mds_")
+ up[int(name[4:])] = gid
+
+ # For all MDS ranks held by a daemon
+ # Do the parallelism in python instead of using "tell mds.*", because
+ # the latter doesn't give us per-mds output
+ threads = []
+ for rank, gid in up.items():
+ thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
+ thread.start()
+ threads.append(thread)
+
+ for t in threads:
+ t.join()
+
+ log.info("evict: joined all")
+
+ for t in threads:
+ if not t.success:
+ msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
+ format(', '.join(client_spec), t.rank, t.gid, t.exception)
+ )
+ log.error(msg)
+ raise EvictionError(msg)
+
+ def _get_clone_source(self):
+ try:
+ clone_source = {
+ 'volume' : self.metadata_mgr.get_option("source", "volume"),
+ 'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
+ 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
+ }
+
+ try:
+ clone_source["group"] = self.metadata_mgr.get_option("source", "group")
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ pass
+ else:
+ raise
+ except MetadataMgrException as me:
+ raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
+ return clone_source
+
+ @property
+ def status(self):
+ state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+ subvolume_type = self.subvol_type
+ subvolume_status = {
+ 'state' : state.value
+ }
+ if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
+ subvolume_status["source"] = self._get_clone_source()
+ return subvolume_status
+
+ @property
+ def state(self):
+ return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
+
+ @state.setter
+ def state(self, val):
+ state = val[0].value
+ flush = val[1]
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+ if flush:
+ self.metadata_mgr.flush()
+
+ def remove(self, retainsnaps=False):
+ if retainsnaps:
+ raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
+ if self.list_snapshots():
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+ self.trash_base_dir()
+
+ def resize(self, newsize, noshrink):
+ subvol_path = self.path
+ return self._resize(subvol_path, newsize, noshrink)
+
+ def create_snapshot(self, snapname):
+ try:
+ group_snapshot_path = os.path.join(self.group.path,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+ snapname.encode('utf-8'))
+ self.fs.stat(group_snapshot_path)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ snappath = self.snapshot_path(snapname)
+ mksnap(self.fs, snappath)
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same")
+
+ def has_pending_clones(self, snapname):
+ try:
+ return self.metadata_mgr.section_has_item('clone snaps', snapname)
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ return False
+ raise
+
+ def remove_snapshot(self, snapname):
+ if self.has_pending_clones(snapname):
+ raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
+ snappath = self.snapshot_path(snapname)
+ rmsnap(self.fs, snappath)
+
+ def snapshot_info(self, snapname):
+ if is_inherited_snap(snapname):
+ raise VolumeException(-errno.EINVAL,
+ "snapshot name '{0}' is invalid".format(snapname))
+ snappath = self.snapshot_data_path(snapname)
+ snap_info = {}
+ try:
+ snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
+ 'data_pool':'ceph.dir.layout.pool'}
+ for key, val in snap_attrs.items():
+ snap_info[key] = self.fs.getxattr(snappath, val)
+ return {'size': int(snap_info['size']),
+ 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8'),
+ 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT,
+ "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def list_snapshots(self):
+ try:
+ dirpath = self.snapshot_base_path()
+ return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return []
+ raise
+
+ def _add_snap_clone(self, track_id, snapname):
+ self.metadata_mgr.add_section("clone snaps")
+ self.metadata_mgr.update_section("clone snaps", track_id, snapname)
+ self.metadata_mgr.flush()
+
+ def _remove_snap_clone(self, track_id):
+ self.metadata_mgr.remove_option("clone snaps", track_id)
+ self.metadata_mgr.flush()
+
+ def attach_snapshot(self, snapname, tgt_subvolume):
+ if not snapname.encode('utf-8') in self.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ try:
+ create_clone_index(self.fs, self.vol_spec)
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ track_idx = index.track(tgt_subvolume.base_path)
+ self._add_snap_clone(track_idx, snapname)
+ except (IndexException, MetadataMgrException) as e:
+ log.warn("error creating clone index: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error cloning subvolume")
+
+ def detach_snapshot(self, snapname, track_id):
+ if not snapname.encode('utf-8') in self.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ try:
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ index.untrack(track_id)
+ self._remove_snap_clone(track_id)
+ except (IndexException, MetadataMgrException) as e:
+ log.warn("error delining snapshot from clone: {0}".format(e))
+ raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
new file mode 100644
index 00000000..1dd6f3fe
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py
@@ -0,0 +1,370 @@
+import os
+import stat
+import uuid
+import errno
+import logging
+
+import cephfs
+
+from .metadata_manager import MetadataManager
+from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
+from .op_sm import SubvolumeOpSm
+from .subvolume_v1 import SubvolumeV1
+from ..template import SubvolumeTemplate
+from ...exception import OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir
+from ..template import SubvolumeOpType
+
+log = logging.getLogger(__name__)
+
+class SubvolumeV2(SubvolumeV1):
+ """
+ Version 2 subvolumes creates a subvolume with path as follows,
+ volumes/<group-name>/<subvolume-name>/<uuid>/
+
+ The distinguishing feature of V2 subvolume as compared to V1 subvolumes is its ability to retain snapshots
+ of a subvolume on removal. This is done by creating snapshots under the <subvolume-name> directory,
+ rather than under the <uuid> directory, as is the case of V1 subvolumes.
+
+ - The directory under which user data resides is <uuid>
+ - Snapshots of the subvolume are taken within the <subvolume-name> directory
+ - A meta file is maintained under the <subvolume-name> directory as a metadata store, storing information similar
+ to V1 subvolumes
+ - On a request to remove subvolume but retain its snapshots, only the <uuid> directory is moved to trash, retaining
+ the rest of the subvolume and its meta file.
+ - The <uuid> directory, when present, is the current incarnation of the subvolume, which may have snapshots of
+ older incarnations of the same subvolume.
+ - V1 subvolumes that currently do not have any snapshots are upgraded to V2 subvolumes automatically, to support the
+ snapshot retention feature
+ """
+ VERSION = 2
+
+ @staticmethod
+ def version():
+ return SubvolumeV2.VERSION
+
+ @property
+ def features(self):
+ return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
+ SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]
+
+ @property
+ def retained(self):
+ try:
+ self.metadata_mgr.refresh()
+ if self.state == SubvolumeStates.STATE_RETAINED:
+ return True
+ return False
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
+ return False
+
+ @property
+ def purgeable(self):
+ if not self.retained or self.list_snapshots() or self.has_pending_purges:
+ return False
+ return True
+
+ @property
+ def has_pending_purges(self):
+ try:
+ return not listdir(self.fs, self.trash_dir) == []
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return False
+ raise
+
+ @property
+ def trash_dir(self):
+ return os.path.join(self.base_path, b".trash")
+
+ def create_trashcan(self):
+ """per subvolume trash directory"""
+ try:
+ self.fs.stat(self.trash_dir)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ try:
+ self.fs.mkdir(self.trash_dir, 0o700)
+ except cephfs.Error as ce:
+ raise VolumeException(-ce.args[0], ce.args[1])
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def mark_subvolume(self):
+ # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
+ # subvolume root is where snapshots would be taken, and hence is the base_path for v2 subvolumes
+ try:
+ # MDS treats this as a noop for already marked subvolume
+ self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
+ except cephfs.InvalidValue as e:
+ raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ @staticmethod
+ def is_valid_uuid(uuid_str):
+ try:
+ uuid.UUID(uuid_str)
+ return True
+ except ValueError:
+ return False
+
+ def snapshot_base_path(self):
+ return os.path.join(self.base_path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+
+ def snapshot_data_path(self, snapname):
+ snap_base_path = self.snapshot_path(snapname)
+ uuid_str = None
+ try:
+ with self.fs.opendir(snap_base_path) as dir_handle:
+ d = self.fs.readdir(dir_handle)
+ while d:
+ if d.d_name not in (b".", b".."):
+ d_full_path = os.path.join(snap_base_path, d.d_name)
+ stx = self.fs.statx(d_full_path, cephfs.CEPH_STATX_MODE, cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISDIR(stx.get('mode')):
+ if self.is_valid_uuid(d.d_name.decode('utf-8')):
+ uuid_str = d.d_name
+ d = self.fs.readdir(dir_handle)
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ raise VolumeException(-e.args[0], e.args[1])
+
+ if not uuid_str:
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+
+ return os.path.join(snap_base_path, uuid_str)
+
+ def _remove_on_failure(self, subvol_path, retained):
+ if retained:
+ log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path))
+ try:
+ self.fs.rmdir(subvol_path)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
+ self.remove()
+
+ def _set_incarnation_metadata(self, subvolume_type, qpath, initial_state):
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_TYPE, subvolume_type.value)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, qpath)
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, initial_state.value)
+
+ def create(self, size, isolate_nspace, pool, mode, uid, gid):
+ subvolume_type = SubvolumeTypes.TYPE_NORMAL
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
+
+ retained = self.retained
+ if retained and self.has_pending_purges:
+ raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ self.fs.mkdirs(subvol_path, mode)
+ self.mark_subvolume()
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'data_pool': pool,
+ 'pool_namespace': self.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata
+ qpath = subvol_path.decode('utf-8')
+ if retained:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ self.metadata_mgr.flush()
+ else:
+ self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)
+
+ # Create the subvolume metadata file which manages auth-ids if it doesn't exist
+ self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname)
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ self._remove_on_failure(subvol_path, retained)
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def create_clone(self, pool, source_volname, source_subvolume, snapname):
+ subvolume_type = SubvolumeTypes.TYPE_CLONE
+ try:
+ initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
+ except OpSmException as oe:
+ raise VolumeException(-errno.EINVAL, "clone failed: internal error")
+
+ retained = self.retained
+ if retained and self.has_pending_purges:
+ raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
+ subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
+ try:
+ # source snapshot attrs are used to create clone subvolume
+ # attributes of subvolume's content though, are synced during the cloning process.
+ attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
+
+ # override snapshot pool setting, if one is provided for the clone
+ if pool is not None:
+ attrs["data_pool"] = pool
+ attrs["pool_namespace"] = None
+
+ # create directory and set attributes
+ self.fs.mkdirs(subvol_path, attrs.get("mode"))
+ self.mark_subvolume()
+ self.set_attrs(subvol_path, attrs)
+
+ # persist subvolume metadata and clone source
+ qpath = subvol_path.decode('utf-8')
+ if retained:
+ self._set_incarnation_metadata(subvolume_type, qpath, initial_state)
+ else:
+ self.metadata_mgr.init(SubvolumeV2.VERSION, subvolume_type.value, qpath, initial_state.value)
+ self.add_clone_source(source_volname, source_subvolume, snapname)
+ self.metadata_mgr.flush()
+ except (VolumeException, MetadataMgrException, cephfs.Error) as e:
+ try:
+ self._remove_on_failure(subvol_path, retained)
+ except VolumeException as ve:
+ log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
+
+ if isinstance(e, MetadataMgrException):
+ log.error("metadata manager exception: {0}".format(e))
+ e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ elif isinstance(e, cephfs.Error):
+ e = VolumeException(-e.args[0], e.args[1])
+ raise e
+
+ def allowed_ops_by_type(self, vol_type):
+ if vol_type == SubvolumeTypes.TYPE_CLONE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_type == SubvolumeTypes.TYPE_NORMAL:
+ return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL}
+
+ return {}
+
+ def allowed_ops_by_state(self, vol_state):
+ if vol_state == SubvolumeStates.STATE_COMPLETE:
+ return {op_type for op_type in SubvolumeOpType}
+
+ if vol_state == SubvolumeStates.STATE_RETAINED:
+ return {
+ SubvolumeOpType.REMOVE,
+ SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.LIST,
+ SubvolumeOpType.INFO,
+ SubvolumeOpType.SNAP_REMOVE,
+ SubvolumeOpType.SNAP_LIST,
+ SubvolumeOpType.SNAP_INFO,
+ SubvolumeOpType.SNAP_PROTECT,
+ SubvolumeOpType.SNAP_UNPROTECT,
+ SubvolumeOpType.CLONE_SOURCE
+ }
+
+ return {SubvolumeOpType.REMOVE_FORCE,
+ SubvolumeOpType.CLONE_CREATE,
+ SubvolumeOpType.CLONE_STATUS,
+ SubvolumeOpType.CLONE_CANCEL,
+ SubvolumeOpType.CLONE_INTERNAL,
+ SubvolumeOpType.CLONE_SOURCE}
+
+ def open(self, op_type):
+ if not isinstance(op_type, SubvolumeOpType):
+ raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
+ op_type.value, self.subvolname))
+ try:
+ self.metadata_mgr.refresh()
+ # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
+ self.mark_subvolume()
+
+ etype = self.subvol_type
+ if op_type not in self.allowed_ops_by_type(etype):
+ raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
+ op_type.value, self.subvolname, etype.value))
+
+ estate = self.state
+ if op_type not in self.allowed_ops_by_state(estate) and estate == SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' is removed and has only snapshots retained".format(
+ self.subvolname))
+
+ if op_type not in self.allowed_ops_by_state(estate) and estate != SubvolumeStates.STATE_RETAINED:
+ raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
+ self.subvolname, op_type.value))
+
+ if estate != SubvolumeStates.STATE_RETAINED:
+ subvol_path = self.path
+ log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
+ st = self.fs.stat(subvol_path)
+
+ self.uid = int(st.st_uid)
+ self.gid = int(st.st_gid)
+ self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ except MetadataMgrException as me:
+ if me.errno == -errno.ENOENT:
+ raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
+ raise VolumeException(me.args[0], me.args[1])
+ except cephfs.ObjectNotFound:
+ log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
+ raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def trash_incarnation_dir(self):
+ """rename subvolume (uuid component) to trash"""
+ self.create_trashcan()
+ try:
+ bname = os.path.basename(self.path)
+ tpath = os.path.join(self.trash_dir, bname)
+ log.debug("trash: {0} -> {1}".format(self.path, tpath))
+ self.fs.rename(self.path, tpath)
+ self._link_dir(tpath, bname)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def remove(self, retainsnaps=False):
+ if self.list_snapshots():
+ if not retainsnaps:
+ raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
+ else:
+ if not self.has_pending_purges:
+ self.trash_base_dir()
+ # Delete the volume meta file, if it's not already deleted
+ self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
+ return
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ self.trash_incarnation_dir()
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+ self.metadata_mgr.flush()
+ # Delete the volume meta file, if it's not already deleted
+ self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
+
+ def info(self):
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ return super(SubvolumeV2, self).info()
+
+ return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}
+
+ def remove_snapshot(self, snapname):
+ super(SubvolumeV2, self).remove_snapshot(snapname)
+ if self.purgeable:
+ self.trash_base_dir()
+ # tickle the volume purge job to purge this entry, using ESTALE
+ raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
+ # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
new file mode 100644
index 00000000..410e5c44
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -0,0 +1,348 @@
+import time
+import errno
+import logging
+import sys
+
+try:
+ from typing import List
+except ImportError:
+ pass # For typing only
+
+from contextlib import contextmanager
+from threading import Lock, Condition
+
+if sys.version_info >= (3, 3):
+ from threading import Timer
+else:
+ from threading import _Timer as Timer
+
+import cephfs
+import orchestrator
+
+from .lock import GlobalLock
+from ..exception import VolumeException
+from ..fs_util import create_pool, remove_pool, create_filesystem, \
+ remove_filesystem, create_mds, volume_exists
+
+log = logging.getLogger(__name__)
+
+class ConnectionPool(object):
+ class Connection(object):
+ def __init__(self, mgr, fs_name):
+ self.fs = None
+ self.mgr = mgr
+ self.fs_name = fs_name
+ self.ops_in_progress = 0
+ self.last_used = time.time()
+ self.fs_id = self.get_fs_id()
+
+ def get_fs_id(self):
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == self.fs_name:
+ return fs['id']
+ raise VolumeException(
+ -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+ def get_fs_handle(self):
+ self.last_used = time.time()
+ self.ops_in_progress += 1
+ return self.fs
+
+ def put_fs_handle(self, notify):
+ assert self.ops_in_progress > 0
+ self.ops_in_progress -= 1
+ if self.ops_in_progress == 0:
+ notify()
+
+ def del_fs_handle(self, waiter):
+ if waiter:
+ while self.ops_in_progress != 0:
+ waiter()
+ if self.is_connection_valid():
+ self.disconnect()
+ else:
+ self.abort()
+
+ def is_connection_valid(self):
+ fs_id = None
+ try:
+ fs_id = self.get_fs_id()
+ except:
+ # the filesystem does not exist now -- connection is not valid.
+ pass
+ log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
+ return self.fs_id == fs_id
+
+ def is_connection_idle(self, timeout):
+ return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+ def connect(self):
+ assert self.ops_in_progress == 0
+ log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+ self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+ log.debug("Setting user ID and group ID of CephFS mount as root...")
+ self.fs.conf_set("client_mount_uid", "0")
+ self.fs.conf_set("client_mount_gid", "0")
+ log.debug("CephFS initializing...")
+ self.fs.init()
+ log.debug("CephFS mounting...")
+ self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+ log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+ def disconnect(self):
+ try:
+ assert self.fs
+ assert self.ops_in_progress == 0
+ log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+ self.fs.shutdown()
+ self.fs = None
+ except Exception as e:
+ log.debug("disconnect: ({0})".format(e))
+ raise
+
+ def abort(self):
+ assert self.fs
+ assert self.ops_in_progress == 0
+ log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+ self.fs.abort_conn()
+ log.info("abort done from cephfs '{0}'".format(self.fs_name))
+ self.fs = None
+
+ class RTimer(Timer):
+ """
+ recurring timer variant of Timer
+ """
+ def run(self):
+ try:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+ except Exception as e:
+ log.error("ConnectionPool.RTimer: %s", e)
+ raise
+
+ # TODO: make this configurable
+ TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
+ CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.connections = {}
+ self.lock = Lock()
+ self.cond = Condition(self.lock)
+ self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+ self.cleanup_connections)
+ self.timer_task.start()
+
+ def cleanup_connections(self):
+ with self.lock:
+ log.info("scanning for idle connections..")
+ idle_fs = [fs_name for fs_name,conn in self.connections.items()
+ if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+ for fs_name in idle_fs:
+ log.info("cleaning up connection for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name)
+
+ def get_fs_handle(self, fs_name):
+ with self.lock:
+ conn = None
+ try:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+ if conn.is_connection_valid():
+ return conn.get_fs_handle()
+ else:
+ # filesystem id changed beneath us (or the filesystem does not exist).
+ # this is possible if the filesystem got removed (and recreated with
+ # same name) via "ceph fs rm/new" mon command.
+ log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+ self._del_fs_handle(fs_name)
+ conn = ConnectionPool.Connection(self.mgr, fs_name)
+ conn.connect()
+ except cephfs.Error as e:
+ # try to provide a better error string if possible
+ if e.args[0] == errno.ENOENT:
+ raise VolumeException(
+ -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+ raise VolumeException(-e.args[0], e.args[1])
+ self.connections[fs_name] = conn
+ return conn.get_fs_handle()
+
+ def put_fs_handle(self, fs_name):
+ with self.lock:
+ conn = self.connections.get(fs_name, None)
+ if conn:
+ conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+
+ def _del_fs_handle(self, fs_name, wait=False):
+ conn = self.connections.pop(fs_name, None)
+ if conn:
+ conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+ def del_fs_handle(self, fs_name, wait=False):
+ with self.lock:
+ self._del_fs_handle(fs_name, wait)
+
+ def del_all_handles(self):
+ with self.lock:
+ for fs_name in list(self.connections.keys()):
+ log.info("waiting for pending ops for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name, wait=True)
+ log.info("pending ops completed for '{}'".format(fs_name))
+ # no new connections should have been initialized since its
+ # guarded on shutdown.
+ assert len(self.connections) == 0
+
+def gen_pool_names(volname):
+ """
+ return metadata and data pool name (from a filesystem/volume name) as a tuple
+ """
+ return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+
+def get_mds_map(mgr, volname):
+ """
+ return mdsmap for a volname
+ """
+ mds_map = None
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ return f['mdsmap']
+ return mds_map
+
+def get_pool_names(mgr, volname):
+ """
+ return metadata and data pools (list) names of volume as a tuple
+ """
+ fs_map = mgr.get("fs_map")
+ metadata_pool_id = None
+ data_pool_ids = [] # type: List[int]
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ metadata_pool_id = f['mdsmap']['metadata_pool']
+ data_pool_ids = f['mdsmap']['data_pools']
+ break
+ if metadata_pool_id is None:
+ return None, None
+
+ osdmap = mgr.get("osd_map")
+ pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
+ metadata_pool = pools[metadata_pool_id]
+ data_pools = [pools[id] for id in data_pool_ids]
+ return metadata_pool, data_pools
+
+def create_volume(mgr, volname):
+ """
+ create volume (pool, filesystem and mds)
+ """
+ metadata_pool, data_pool = gen_pool_names(volname)
+ # create pools
+ r, outs, outb = create_pool(mgr, metadata_pool, 16)
+ if r != 0:
+ return r, outb, outs
+ r, outb, outs = create_pool(mgr, data_pool, 8)
+ if r != 0:
+ #cleanup
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ # create filesystem
+ r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
+ if r != 0:
+ log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
+ #cleanup
+ remove_pool(mgr, data_pool)
+ remove_pool(mgr, metadata_pool)
+ return r, outb, outs
+ # create mds
+ return create_mds(mgr, volname)
+
+def delete_volume(mgr, volname, metadata_pool, data_pools):
+ """
+ delete the given module (tear down mds, remove filesystem, remove pools)
+ """
+ # Tear down MDS daemons
+ try:
+ completion = mgr.remove_stateless_service("mds", volname)
+ mgr._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ except (ImportError, orchestrator.OrchestratorError):
+ log.warning("OrchestratorError, not tearing down MDS daemons")
+ except Exception as e:
+ # Don't let detailed orchestrator exceptions (python backtraces)
+ # bubble out to the user
+ log.exception("Failed to tear down MDS daemons")
+ return -errno.EINVAL, "", str(e)
+
+ # In case orchestrator didn't tear down MDS daemons cleanly, or
+ # there was no orchestrator, we force the daemons down.
+ if volume_exists(mgr, volname):
+ r, outb, outs = remove_filesystem(mgr, volname)
+ if r != 0:
+ return r, outb, outs
+ else:
+ err = "Filesystem not found for volume '{0}'".format(volname)
+ log.warning(err)
+ return -errno.ENOENT, "", err
+ r, outb, outs = remove_pool(mgr, metadata_pool)
+ if r != 0:
+ return r, outb, outs
+
+ for data_pool in data_pools:
+ r, outb, outs = remove_pool(mgr, data_pool)
+ if r != 0:
+ return r, outb, outs
+ result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
+ return r, result_str, ""
+
+def list_volumes(mgr):
+ """
+ list all filesystem volumes.
+
+ :param: None
+ :return: None
+ """
+ result = []
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ result.append({'name': f['mdsmap']['fs_name']})
+ return result
+
+@contextmanager
+def open_volume(vc, volname):
+ """
+ open a volume for exclusive access. This API is to be used as a context manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ if vc.is_stopping():
+ raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+ g_lock = GlobalLock()
+ fs_handle = vc.connection_pool.get_fs_handle(volname)
+ try:
+ with g_lock.lock_op():
+ yield fs_handle
+ finally:
+ vc.connection_pool.put_fs_handle(volname)
+
+@contextmanager
+def open_volume_lockless(vc, volname):
+ """
+ open a volume with shared access. This API is to be used as a context manager.
+
+ :param vc: volume client instance
+ :param volname: volume name
+ :return: yields a volume handle (ceph filesystem handle)
+ """
+ if vc.is_stopping():
+ raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+ fs_handle = vc.connection_pool.get_fs_handle(volname)
+ try:
+ yield fs_handle
+ finally:
+ vc.connection_pool.put_fs_handle(volname)
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
new file mode 100644
index 00000000..7c902572
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -0,0 +1,109 @@
+import errno
+import logging
+import os
+import stat
+
+import cephfs
+
+from .async_job import AsyncJobs
+from .exception import VolumeException
+from .operations.resolver import resolve_trash
+from .operations.template import SubvolumeOpType
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.trash import open_trashcan
+
+log = logging.getLogger(__name__)
+
+# helper for fetching a trash entry for a given volume
+def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+ log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+ try:
+ with open_volume_lockless(volume_client, volname) as fs_handle:
+ try:
+ with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ path = trashcan.get_trash_entry(running_jobs)
+ return 0, path
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return 0, None
+ raise ve
+ except VolumeException as ve:
+ log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
+ return ve.errno, None
+
+def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
+ groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+ log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
+
+ try:
+ with open_volume(volume_client, volname) as fs_handle:
+ with open_group(fs_handle, volume_client.volspec, groupname) as group:
+ with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+ log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
+ if not subvolume.purgeable:
+ return
+ # this is fine under the global lock -- there are just a handful
+ # of entries in the subvolume to purge. moreover, the purge needs
+ # to be guarded since a create request might sneak in.
+ trashcan.purge(subvolume.base_path, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ raise
+
+# helper for starting a purge operation on a trash entry
+def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+ log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
+
+ ret = 0
+ try:
+ with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ try:
+ pth = os.path.join(trashcan.path, purge_entry)
+ stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISLNK(stx['mode']):
+ tgt = fs_handle.readlink(pth, 4096)
+ tgt = tgt[:stx['size']]
+ log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
+ delink = True
+ try:
+ trashcan.purge(tgt, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ delink = False
+ return ve.errno
+ finally:
+ if delink:
+ subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+ log.debug("purging trash link: {0}".format(purge_entry))
+ trashcan.delink(purge_entry)
+ else:
+ log.debug("purging entry pointing to trash: {0}".format(pth))
+ trashcan.purge(pth, should_cancel)
+ except cephfs.Error as e:
+ log.warn("failed to remove trash entry: {0}".format(e))
+ except VolumeException as ve:
+ ret = ve.errno
+ return ret
+
+class ThreadPoolPurgeQueueMixin(AsyncJobs):
+ """
+ Purge queue mixin class maintaining a pool of threads for purging trash entries.
+ Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
+ entries (belonging to a set of volumes) have huge directory tree's (such as, lots
+ of small files in a directory w/ deep directory trees), this model may lead to
+ _all_ threads purging entries for one volume (starving other volumes).
+ """
+ def __init__(self, volume_client, tp_size):
+ self.vc = volume_client
+ super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
+
+ def get_next_job(self, volname, running_jobs):
+ return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+
+ def execute_job(self, volname, job, should_cancel):
+ purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
diff --git a/src/pybind/mgr/volumes/fs/vol_spec.py b/src/pybind/mgr/volumes/fs/vol_spec.py
new file mode 100644
index 00000000..e18ab069
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/vol_spec.py
@@ -0,0 +1,37 @@
+import os
+
+class VolSpec(object):
+ """
+ specification of a "volume" -- base directory and various prefixes.
+ """
+
+ # where shall we (by default) create subvolumes
+ DEFAULT_SUBVOL_PREFIX = "/volumes"
+ # and the default namespace
+ DEFAULT_NS_PREFIX = "fsvolumens_"
+
+ def __init__(self, snapshot_prefix, subvolume_prefix=None, pool_ns_prefix=None):
+ self.snapshot_prefix = snapshot_prefix
+ self.subvolume_prefix = subvolume_prefix if subvolume_prefix else VolSpec.DEFAULT_SUBVOL_PREFIX
+ self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else VolSpec.DEFAULT_NS_PREFIX
+
+ @property
+ def snapshot_dir_prefix(self):
+ """
+ Return the snapshot directory prefix
+ """
+ return self.snapshot_prefix
+
+ @property
+ def base_dir(self):
+ """
+ Return the top level directory under which subvolumes/groups are created
+ """
+ return self.subvolume_prefix
+
+ @property
+ def fs_namespace(self):
+ """
+ return a filesystem namespace by stashing pool namespace prefix and subvolume-id
+ """
+ return self.pool_ns_prefix
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
new file mode 100644
index 00000000..ab287539
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -0,0 +1,660 @@
+import json
+import errno
+import logging
+from threading import Event
+
+import cephfs
+
+from .fs_util import listdir
+
+from .operations.volume import ConnectionPool, open_volume, create_volume, \
+ delete_volume, list_volumes, get_pool_names
+from .operations.group import open_group, create_group, remove_group, open_group_unique
+from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
+ create_clone
+
+from .vol_spec import VolSpec
+from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .async_cloner import Cloner
+from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
+
+log = logging.getLogger(__name__)
+
+ALLOWED_ACCESS_LEVELS = ('r', 'rw')
+
+
+def octal_str_to_decimal_int(mode):
+ try:
+ return int(mode, 8)
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
+def name_to_json(names):
+ """
+ convert the list of names to json
+ """
+ namedict = []
+ for i in range(len(names)):
+ namedict.append({'name': names[i].decode('utf-8')})
+ return json.dumps(namedict, indent=4, sort_keys=True)
+
+class VolumeClient(object):
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.stopping = Event()
+ # volume specification
+ self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
+ self.connection_pool = ConnectionPool(self.mgr)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
+ self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
+ # on startup, queue purge job for available volumes to kickstart
+ # purge for leftover subvolume entries in trash. note that, if the
+ # trash directory does not exist or if there are no purge entries
+ # available for a volume, the volume is removed from the purge
+ # job list.
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ self.cloner.queue_job(fs['mdsmap']['fs_name'])
+ self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
+
+ def is_stopping(self):
+ return self.stopping.is_set()
+
+ def shutdown(self):
+ log.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # second, ask purge threads to quit
+ self.purge_queue.cancel_all_jobs()
+ # third, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_handles()
+
+ def cluster_log(self, msg, lvl=None):
+ """
+ log to cluster log with default log level as WARN.
+ """
+ if not lvl:
+ lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
+ self.mgr.cluster_log("cluster", lvl, msg)
+
+ def volume_exception_to_retval(self, ve):
+ """
+ return a tuple representation from a volume exception
+ """
+ return ve.to_tuple()
+
+ ### volume operations -- create, rm, ls
+
+ def create_fs_volume(self, volname):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ return create_volume(self.mgr, volname)
+
+ def delete_fs_volume(self, volname, confirm):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+ if confirm != "--yes-i-really-mean-it":
+ return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
+ "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
+ "that is what you want, re-issue the command followed by " \
+ "--yes-i-really-mean-it.".format(volname)
+
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'config get',
+ 'key': 'mon_allow_pool_delete',
+ 'who': 'mon.*',
+ 'format': 'json',
+ })
+ if ret != 0:
+ return ret, out, err
+ mon_allow_pool_delete = json.loads(out)
+ if not mon_allow_pool_delete:
+ return -errno.EPERM, "", "pool deletion is disabled; you must first " \
+ "set the mon_allow_pool_delete config option to true before volumes " \
+ "can be deleted"
+
+ metadata_pool, data_pools = get_pool_names(self.mgr, volname)
+ if not metadata_pool:
+ return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
+ self.purge_queue.cancel_jobs(volname)
+ self.connection_pool.del_fs_handle(volname, wait=True)
+ return delete_volume(self.mgr, volname, metadata_pool, data_pools)
+
+ def list_fs_volumes(self):
+ if self.stopping.is_set():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ volumes = list_volumes(self.mgr)
+ return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
+
+ ### subvolume operations
+
+ def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ oct_mode = octal_str_to_decimal_int(mode)
+ try:
+ create_subvol(
+ self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
+ except VolumeException as ve:
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trashcan for cleanup on error.
+ self.purge_queue.queue_job(volname)
+ raise ve
+
+ def create_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
+ # idempotent creation -- valid. Attributes set is supported.
+ attrs = {
+ 'uid': uid if uid else subvolume.uid,
+ 'gid': gid if gid else subvolume.gid,
+ 'data_pool': pool,
+ 'pool_namespace': subvolume.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ subvolume.set_attrs(subvolume.path, attrs)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume/group does not exist or subvolume creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+ retainsnaps = kwargs['retain_snapshots']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps)
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trash can.
+ # TODO: make purge queue as singleton so that trash can kicks
+ # the purge threads on dump.
+ self.purge_queue.queue_job(volname)
+ except VolumeException as ve:
+ if ve.errno == -errno.EAGAIN:
+ ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
+ ret = self.volume_exception_to_retval(ve)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+ accesslevel = kwargs['access_level']
+ tenant_id = kwargs['tenant_id']
+ allow_existing_id = kwargs['allow_existing_id']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
+ key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id)
+ ret = 0, key, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def deauthorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume:
+ subvolume.deauthorize(authid)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorized_list(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
+ auths = subvolume.authorized_list()
+ ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def evict(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
+ key = subvolume.evict(volname, authid)
+ ret = 0, "", ""
+ except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
+ if isinstance(e, VolumeException):
+ ret = self.volume_exception_to_retval(e)
+ elif isinstance(e, ClusterTimeout):
+ ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster"
+ elif isinstance(e, ClusterError):
+ ret = e._result_code , "", e._result_str
+ elif isinstance(e, EvictionError):
+ ret = -errno.EINVAL, "", str(e)
+ return ret
+
+ def resize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ newsize = kwargs['new_size']
+ noshrink = kwargs['no_shrink']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
+ nsize, usedbytes = subvolume.resize(newsize, noshrink)
+ ret = 0, json.dumps(
+ [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+ {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+ indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_getpath(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
+ subvolpath = subvolume.path
+ ret = 0, subvolpath.decode("utf-8"), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_info(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
+ mon_addr_lst = []
+ mon_map_mons = self.mgr.get('mon_map')['mons']
+ for mon in mon_map_mons:
+ ip_port = mon['addr'].split("/")[0]
+ mon_addr_lst.append(ip_port)
+
+ subvol_info_dict = subvolume.info()
+ subvol_info_dict["mon_addrs"] = mon_addr_lst
+ ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolumes(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ subvolumes = group.list_subvolumes()
+ ret = 0, name_to_json(subvolumes), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### subvolume snapshot
+
+ def create_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
+ subvolume.create_snapshot(snapname)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
+ subvolume.remove_snapshot(snapname)
+ except VolumeException as ve:
+ # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
+ # we should tickle the purge jobs to purge the same
+ if ve.errno == -errno.ESTALE:
+ self.purge_queue.queue_job(volname)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_snapshot_info(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
+ snap_info_dict = subvolume.snapshot_info(snapname)
+ ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
+ snapshots = subvolume.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def protect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+ log.warning("snapshot protect call is deprecated and will be removed in a future release")
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def unprotect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+ log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+ t_pool = kwargs['pool_layout']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+ t_groupname = kwargs['target_group_name']
+
+ create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+ with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
+ try:
+ if t_groupname == s_groupname and t_subvolname == s_subvolname:
+ t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ else:
+ s_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ self.cloner.queue_job(volname)
+ except VolumeException as ve:
+ try:
+ t_subvolume.remove()
+ self.purge_queue.queue_job(volname)
+ except Exception as e:
+ log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
+ raise ve
+
+ def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+ s_snapname = kwargs['snap_name']
+ target_subvolname = kwargs['target_sub_name']
+ target_groupname = kwargs['target_group_name']
+ s_groupname = kwargs['group_name']
+
+ if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
+
+ with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
+ raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+ target_group, target_subvolname, **kwargs)
+ else:
+ raise
+
+ def clone_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_status(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
+ ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_cancel(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ self.cloner.cancel_job(volname, (clonename, groupname))
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group operations
+
+ def create_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ try:
+ with open_group(fs_handle, self.volspec, groupname):
+ # idempotent creation -- valid.
+ pass
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ oct_mode = octal_str_to_decimal_int(mode)
+ create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume does not exist or subvolume group creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ remove_group(fs_handle, self.volspec, groupname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def getpath_subvolume_group(self, **kwargs):
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ return 0, group.path.decode('utf-8'), ""
+ except VolumeException as ve:
+ return self.volume_exception_to_retval(ve)
+
+ def list_subvolume_groups(self, **kwargs):
+ volname = kwargs['vol_name']
+ ret = 0, '[]', ""
+ try:
+ with open_volume(self, volname) as fs_handle:
+ groups = listdir(fs_handle, self.volspec.base_dir)
+ ret = 0, name_to_json(groups), ""
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group snapshot
+
+ def create_subvolume_group_snapshot(self, **kwargs):
+ ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ # snapname = kwargs['snap_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
+ # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
+ # group.create_snapshot(snapname)
+ pass
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ snapname = kwargs['snap_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ group.remove_snapshot(snapname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_group_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ snapshots = group.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret