summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/volume.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/volumes/fs/volume.py')
-rw-r--r--src/pybind/mgr/volumes/fs/volume.py999
1 files changed, 999 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
new file mode 100644
index 000000000..e2576ffff
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -0,0 +1,999 @@
+import json
+import errno
+import logging
+import os
+import mgr_util
+from typing import TYPE_CHECKING
+
+import cephfs
+
+from mgr_util import CephfsClient
+
+from .fs_util import listdir, has_subdir
+
+from .operations.group import open_group, create_group, remove_group, \
+ open_group_unique, set_group_attrs
+from .operations.volume import create_volume, delete_volume, \
+ list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
+from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
+ create_clone
+from .operations.trash import Trash
+
+from .vol_spec import VolSpec
+from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .async_cloner import Cloner
+from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
+
+if TYPE_CHECKING:
+ from volumes import Module
+
+log = logging.getLogger(__name__)
+
+ALLOWED_ACCESS_LEVELS = ('r', 'rw')
+
+
+def octal_str_to_decimal_int(mode):
+ try:
+ return int(mode, 8)
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
+
+def name_to_json(names):
+ """
+ convert the list of names to json
+ """
+ namedict = []
+ for i in range(len(names)):
+ namedict.append({'name': names[i].decode('utf-8')})
+ return json.dumps(namedict, indent=4, sort_keys=True)
+
+
+class VolumeClient(CephfsClient["Module"]):
+ def __init__(self, mgr):
+ super().__init__(mgr)
+ # volume specification
+ self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+ self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
+ # on startup, queue purge job for available volumes to kickstart
+ # purge for leftover subvolume entries in trash. note that, if the
+ # trash directory does not exist or if there are no purge entries
+ # available for a volume, the volume is removed from the purge
+ # job list.
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ self.cloner.queue_job(fs['mdsmap']['fs_name'])
+ self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
+
+ def shutdown(self):
+ # Overrides CephfsClient.shutdown()
+ log.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # stop clones
+ self.cloner.shutdown()
+ # stop purge threads
+ self.purge_queue.shutdown()
+ # last, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_connections()
+
+ def cluster_log(self, msg, lvl=None):
+ """
+ log to cluster log with default log level as WARN.
+ """
+ if not lvl:
+ lvl = self.mgr.ClusterLogPrio.WARN
+ self.mgr.cluster_log("cluster", lvl, msg)
+
+ def volume_exception_to_retval(self, ve):
+ """
+ return a tuple representation from a volume exception
+ """
+ return ve.to_tuple()
+
+ ### volume operations -- create, rm, ls
+
+ def create_fs_volume(self, volname, placement):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ return create_volume(self.mgr, volname, placement)
+
+ def delete_fs_volume(self, volname, confirm):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+ if confirm != "--yes-i-really-mean-it":
+ return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
+ "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
+ "that is what you want, re-issue the command followed by " \
+ "--yes-i-really-mean-it.".format(volname)
+
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config get',
+ 'key': 'mon_allow_pool_delete',
+ 'who': 'mon',
+ 'format': 'json',
+ })
+ mon_allow_pool_delete = json.loads(out)
+ if not mon_allow_pool_delete:
+ return -errno.EPERM, "", "pool deletion is disabled; you must first " \
+ "set the mon_allow_pool_delete config option to true before volumes " \
+ "can be deleted"
+
+ metadata_pool, data_pools = get_pool_names(self.mgr, volname)
+ if not metadata_pool:
+ return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
+ self.purge_queue.cancel_jobs(volname)
+ self.connection_pool.del_connections(volname, wait=True)
+ return delete_volume(self.mgr, volname, metadata_pool, data_pools)
+
+ def list_fs_volumes(self):
+ if self.stopping.is_set():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ volumes = list_volumes(self.mgr)
+ return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
+
+ def volume_info(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ human_readable = kwargs['human_readable']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ path = self.volspec.base_dir
+ vol_info_dict = {}
+ try:
+ st = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_SIZE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+
+ usedbytes = st['size']
+ vol_info_dict = get_pending_subvol_deletions_count(path)
+ if human_readable:
+ vol_info_dict['used_size'] = mgr_util.format_bytes(int(usedbytes), 5)
+ else:
+ vol_info_dict['used_size'] = int(usedbytes)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ pass
+ df = self.mgr.get("df")
+ pool_stats = dict([(p['id'], p['stats']) for p in df['pools']])
+ osdmap = self.mgr.get("osd_map")
+ pools = dict([(p['pool'], p) for p in osdmap['pools']])
+ metadata_pool_id, data_pool_ids = get_pool_ids(self.mgr, volname)
+ vol_info_dict["pools"] = {"metadata": [], "data": []}
+ for pool_id in [metadata_pool_id] + data_pool_ids:
+ if pool_id == metadata_pool_id:
+ pool_type = "metadata"
+ else:
+ pool_type = "data"
+ if human_readable:
+ vol_info_dict["pools"][pool_type].append({
+ 'name': pools[pool_id]['pool_name'],
+ 'used': mgr_util.format_bytes(pool_stats[pool_id]['bytes_used'], 5),
+ 'avail': mgr_util.format_bytes(pool_stats[pool_id]['max_avail'], 5)})
+ else:
+ vol_info_dict["pools"][pool_type].append({
+ 'name': pools[pool_id]['pool_name'],
+ 'used': pool_stats[pool_id]['bytes_used'],
+ 'avail': pool_stats[pool_id]['max_avail']})
+
+ mon_addr_lst = []
+ mon_map_mons = self.mgr.get('mon_map')['mons']
+ for mon in mon_map_mons:
+ ip_port = mon['addr'].split("/")[0]
+ mon_addr_lst.append(ip_port)
+ vol_info_dict["mon_addrs"] = mon_addr_lst
+ ret = 0, json.dumps(vol_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### subvolume operations
+
+ def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ oct_mode = octal_str_to_decimal_int(mode)
+ try:
+ create_subvol(
+ self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
+ except VolumeException as ve:
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trashcan for cleanup on error.
+ self.purge_queue.queue_job(volname)
+ raise ve
+
+ def create_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
+ # idempotent creation -- valid. Attributes set is supported.
+ attrs = {
+ 'uid': uid if uid else subvolume.uid,
+ 'gid': gid if gid else subvolume.gid,
+ 'mode': octal_str_to_decimal_int(mode),
+ 'data_pool': pool,
+ 'pool_namespace': subvolume.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ subvolume.set_attrs(subvolume.path, attrs)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume/group does not exist or subvolume creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+ retainsnaps = kwargs['retain_snapshots']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps)
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trash can.
+ # TODO: make purge queue as singleton so that trash can kicks
+ # the purge threads on dump.
+ self.purge_queue.queue_job(volname)
+ except VolumeException as ve:
+ if ve.errno == -errno.EAGAIN and not force:
+ ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
+ ret = self.volume_exception_to_retval(ve)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+ accesslevel = kwargs['access_level']
+ tenant_id = kwargs['tenant_id']
+ allow_existing_id = kwargs['allow_existing_id']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
+ key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id)
+ ret = 0, key, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def deauthorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume:
+ subvolume.deauthorize(authid)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorized_list(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
+ auths = subvolume.authorized_list()
+ ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def evict(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
+ key = subvolume.evict(volname, authid)
+ ret = 0, "", ""
+ except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
+ if isinstance(e, VolumeException):
+ ret = self.volume_exception_to_retval(e)
+ elif isinstance(e, ClusterTimeout):
+ ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster"
+ elif isinstance(e, ClusterError):
+ ret = e._result_code , "", e._result_str
+ elif isinstance(e, EvictionError):
+ ret = -errno.EINVAL, "", str(e)
+ return ret
+
+ def resize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ newsize = kwargs['new_size']
+ noshrink = kwargs['no_shrink']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
+ nsize, usedbytes = subvolume.resize(newsize, noshrink)
+ ret = 0, json.dumps(
+ [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+ {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+ indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_pin(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ pin_type = kwargs['pin_type']
+ pin_setting = kwargs['pin_setting']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
+ subvolume.pin(pin_type, pin_setting)
+ ret = 0, json.dumps({}), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_getpath(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
+ subvolpath = subvolume.path
+ ret = 0, subvolpath.decode("utf-8"), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_info(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
+ mon_addr_lst = []
+ mon_map_mons = self.mgr.get('mon_map')['mons']
+ for mon in mon_map_mons:
+ ip_port = mon['addr'].split("/")[0]
+ mon_addr_lst.append(ip_port)
+
+ subvol_info_dict = subvolume.info()
+ subvol_info_dict["mon_addrs"] = mon_addr_lst
+ ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def set_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume:
+ subvolume.set_user_metadata(keyname, value)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def get_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume:
+ value = subvolume.get_user_metadata(keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume:
+ subvol_metadata_dict = subvolume.list_user_metadata()
+ ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
+ subvolume.remove_user_metadata(keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolumes(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ subvolumes = group.list_subvolumes()
+ ret = 0, name_to_json(subvolumes), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_exists(self, **kwargs):
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ ret = 0, "", ""
+ volume_exists = False
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ volume_exists = True
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ res = group.has_subvolumes()
+ if res:
+ ret = 0, "subvolume exists", ""
+ else:
+ ret = 0, "no subvolume exists", ""
+ except VolumeException as ve:
+ if volume_exists and ve.errno == -errno.ENOENT:
+ ret = 0, "no subvolume exists", ""
+ else:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### subvolume snapshot
+
+ def create_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
+ subvolume.create_snapshot(snapname)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
+ subvolume.remove_snapshot(snapname, force)
+ except VolumeException as ve:
+ # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
+ # we should tickle the purge jobs to purge the same
+ if ve.errno == -errno.ESTALE:
+ self.purge_queue.queue_job(volname)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_snapshot_info(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
+ snap_info_dict = subvolume.snapshot_info(snapname)
+ ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def set_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.set_snapshot_metadata(snapname, keyname, value)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def get_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ value = subvolume.get_snapshot_metadata(snapname, keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ snap_metadata_dict = subvolume.list_snapshot_metadata(snapname)
+ ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.remove_snapshot_metadata(snapname, keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
+ snapshots = subvolume.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def protect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+ log.warning("snapshot protect call is deprecated and will be removed in a future release")
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def unprotect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+ log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+ t_pool = kwargs['pool_layout']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+ t_groupname = kwargs['target_group_name']
+
+ create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+ with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
+ try:
+ if t_groupname == s_groupname and t_subvolname == s_subvolname:
+ t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ else:
+ s_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ self.cloner.queue_job(volname)
+ except VolumeException as ve:
+ try:
+ t_subvolume.remove()
+ self.purge_queue.queue_job(volname)
+ except Exception as e:
+ log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
+ raise ve
+
+ def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+ s_snapname = kwargs['snap_name']
+ target_subvolname = kwargs['target_sub_name']
+ target_groupname = kwargs['target_group_name']
+ s_groupname = kwargs['group_name']
+
+ if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
+
+ with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
+ raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+ target_group, target_subvolname, **kwargs)
+ else:
+ raise
+
+ def clone_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_status(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
+ ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_cancel(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ self.cloner.cancel_job(volname, (clonename, groupname))
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group operations
+
+ def create_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ try:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ # idempotent creation -- valid.
+ attrs = {
+ 'uid': uid,
+ 'gid': gid,
+ 'mode': octal_str_to_decimal_int(mode),
+ 'data_pool': pool,
+ 'quota': size
+ }
+ set_group_attrs(fs_handle, group.path, attrs)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ oct_mode = octal_str_to_decimal_int(mode)
+ create_group(fs_handle, self.volspec, groupname, size, pool, oct_mode, uid, gid)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume does not exist or subvolume group creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ remove_group(fs_handle, self.volspec, groupname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolumegroup_info(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ mon_addr_lst = []
+ mon_map_mons = self.mgr.get('mon_map')['mons']
+ for mon in mon_map_mons:
+ ip_port = mon['addr'].split("/")[0]
+ mon_addr_lst.append(ip_port)
+
+ group_info_dict = group.info()
+ group_info_dict["mon_addrs"] = mon_addr_lst
+ ret = 0, json.dumps(group_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def resize_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ newsize = kwargs['new_size']
+ noshrink = kwargs['no_shrink']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ nsize, usedbytes = group.resize(newsize, noshrink)
+ ret = 0, json.dumps(
+ [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+ {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+ indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def getpath_subvolume_group(self, **kwargs):
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ return 0, group.path.decode('utf-8'), ""
+ except VolumeException as ve:
+ return self.volume_exception_to_retval(ve)
+
+ def list_subvolume_groups(self, **kwargs):
+ volname = kwargs['vol_name']
+ ret = 0, '[]', ""
+ volume_exists = False
+ try:
+ with open_volume(self, volname) as fs_handle:
+ volume_exists = True
+ groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS])
+ ret = 0, name_to_json(groups), ""
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT or not volume_exists:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def pin_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ pin_type = kwargs['pin_type']
+ pin_setting = kwargs['pin_setting']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ group.pin(pin_type, pin_setting)
+ ret = 0, json.dumps({}), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_group_exists(self, **kwargs):
+ volname = kwargs['vol_name']
+ ret = 0, "", ""
+ volume_exists = False
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ volume_exists = True
+ res = has_subdir(fs_handle, self.volspec.base_dir, filter_entries=[
+ dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS])
+ if res:
+ ret = 0, "subvolumegroup exists", ""
+ else:
+ ret = 0, "no subvolumegroup exists", ""
+ except VolumeException as ve:
+ if volume_exists and ve.errno == -errno.ENOENT:
+ ret = 0, "no subvolumegroup exists", ""
+ else:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group snapshot
+
+ def create_subvolume_group_snapshot(self, **kwargs):
+ ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ # snapname = kwargs['snap_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
+ # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
+ # group.create_snapshot(snapname)
+ pass
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ snapname = kwargs['snap_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ group.remove_snapshot(snapname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_group_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ snapshots = group.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret