Diffstat (limited to 'src/pybind/mgr/volumes/fs/volume.py')
-rw-r--r-- | src/pybind/mgr/volumes/fs/volume.py | 999
1 file changed, 999 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py new file mode 100644 index 000000000..e2576ffff --- /dev/null +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -0,0 +1,999 @@ +import json +import errno +import logging +import os +import mgr_util +from typing import TYPE_CHECKING + +import cephfs + +from mgr_util import CephfsClient + +from .fs_util import listdir, has_subdir + +from .operations.group import open_group, create_group, remove_group, \ + open_group_unique, set_group_attrs +from .operations.volume import create_volume, delete_volume, \ + list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count +from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \ + create_clone +from .operations.trash import Trash + +from .vol_spec import VolSpec +from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError +from .async_cloner import Cloner +from .purge_queue import ThreadPoolPurgeQueueMixin +from .operations.template import SubvolumeOpType + +if TYPE_CHECKING: + from volumes import Module + +log = logging.getLogger(__name__) + +ALLOWED_ACCESS_LEVELS = ('r', 'rw') + + +def octal_str_to_decimal_int(mode): + try: + return int(mode, 8) + except ValueError: + raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + + +def name_to_json(names): + """ + convert the list of names to json + """ + namedict = [] + for i in range(len(names)): + namedict.append({'name': names[i].decode('utf-8')}) + return json.dumps(namedict, indent=4, sort_keys=True) + + +class VolumeClient(CephfsClient["Module"]): + def __init__(self, mgr): + super().__init__(mgr) + # volume specification + self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) + self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay) + self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) + # on startup, queue purge job for available volumes to kickstart + # purge for leftover subvolume entries in trash. note that, if the + # trash directory does not exist or if there are no purge entries + # available for a volume, the volume is removed from the purge + # job list. + fs_map = self.mgr.get('fs_map') + for fs in fs_map['filesystems']: + self.cloner.queue_job(fs['mdsmap']['fs_name']) + self.purge_queue.queue_job(fs['mdsmap']['fs_name']) + + def shutdown(self): + # Overrides CephfsClient.shutdown() + log.info("shutting down") + # first, note that we're shutting down + self.stopping.set() + # stop clones + self.cloner.shutdown() + # stop purge threads + self.purge_queue.shutdown() + # last, delete all libcephfs handles from connection pool + self.connection_pool.del_all_connections() + + def cluster_log(self, msg, lvl=None): + """ + log to cluster log with default log level as WARN. 
+ """ + if not lvl: + lvl = self.mgr.ClusterLogPrio.WARN + self.mgr.cluster_log("cluster", lvl, msg) + + def volume_exception_to_retval(self, ve): + """ + return a tuple representation from a volume exception + """ + return ve.to_tuple() + + ### volume operations -- create, rm, ls + + def create_fs_volume(self, volname, placement): + if self.is_stopping(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + return create_volume(self.mgr, volname, placement) + + def delete_fs_volume(self, volname, confirm): + if self.is_stopping(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + + if confirm != "--yes-i-really-mean-it": + return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \ + "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \ + "that is what you want, re-issue the command followed by " \ + "--yes-i-really-mean-it.".format(volname) + + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config get', + 'key': 'mon_allow_pool_delete', + 'who': 'mon', + 'format': 'json', + }) + mon_allow_pool_delete = json.loads(out) + if not mon_allow_pool_delete: + return -errno.EPERM, "", "pool deletion is disabled; you must first " \ + "set the mon_allow_pool_delete config option to true before volumes " \ + "can be deleted" + + metadata_pool, data_pools = get_pool_names(self.mgr, volname) + if not metadata_pool: + return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname) + self.purge_queue.cancel_jobs(volname) + self.connection_pool.del_connections(volname, wait=True) + return delete_volume(self.mgr, volname, metadata_pool, data_pools) + + def list_fs_volumes(self): + if self.stopping.is_set(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + volumes = list_volumes(self.mgr) + return 0, json.dumps(volumes, indent=4, sort_keys=True), "" + + def volume_info(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + human_readable = kwargs['human_readable'] + + try: + with open_volume(self, volname) as fs_handle: + path = self.volspec.base_dir + vol_info_dict = {} + try: + st = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_SIZE, + cephfs.AT_SYMLINK_NOFOLLOW) + + usedbytes = st['size'] + vol_info_dict = get_pending_subvol_deletions_count(path) + if human_readable: + vol_info_dict['used_size'] = mgr_util.format_bytes(int(usedbytes), 5) + else: + vol_info_dict['used_size'] = int(usedbytes) + except cephfs.Error as e: + if e.args[0] == errno.ENOENT: + pass + df = self.mgr.get("df") + pool_stats = dict([(p['id'], p['stats']) for p in df['pools']]) + osdmap = self.mgr.get("osd_map") + pools = dict([(p['pool'], p) for p in osdmap['pools']]) + metadata_pool_id, data_pool_ids = get_pool_ids(self.mgr, volname) + vol_info_dict["pools"] = {"metadata": [], "data": []} + for pool_id in [metadata_pool_id] + data_pool_ids: + if pool_id == metadata_pool_id: + pool_type = "metadata" + else: + pool_type = "data" + if human_readable: + vol_info_dict["pools"][pool_type].append({ + 'name': pools[pool_id]['pool_name'], + 'used': mgr_util.format_bytes(pool_stats[pool_id]['bytes_used'], 5), + 'avail': mgr_util.format_bytes(pool_stats[pool_id]['max_avail'], 5)}) + else: + vol_info_dict["pools"][pool_type].append({ + 'name': pools[pool_id]['pool_name'], + 'used': pool_stats[pool_id]['bytes_used'], + 'avail': pool_stats[pool_id]['max_avail']}) + + mon_addr_lst = [] + mon_map_mons = self.mgr.get('mon_map')['mons'] + for mon in mon_map_mons: + ip_port = mon['addr'].split("/")[0] + mon_addr_lst.append(ip_port) + vol_info_dict["mon_addrs"] = 
mon_addr_lst + ret = 0, json.dumps(vol_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + ### subvolume operations + + def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs): + size = kwargs['size'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + mode = kwargs['mode'] + isolate_nspace = kwargs['namespace_isolated'] + + oct_mode = octal_str_to_decimal_int(mode) + try: + create_subvol( + self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid) + except VolumeException as ve: + # kick the purge threads for async removal -- note that this + # assumes that the subvolume is moved to trashcan for cleanup on error. + self.purge_queue.queue_job(volname) + raise ve + + def create_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + size = kwargs['size'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + mode = kwargs['mode'] + isolate_nspace = kwargs['namespace_isolated'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + try: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume: + # idempotent creation -- valid. Attributes set is supported. + attrs = { + 'uid': uid if uid else subvolume.uid, + 'gid': gid if gid else subvolume.gid, + 'mode': octal_str_to_decimal_int(mode), + 'data_pool': pool, + 'pool_namespace': subvolume.namespace if isolate_nspace else None, + 'quota': size + } + subvolume.set_attrs(subvolume.path, attrs) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs) + else: + raise + except VolumeException as ve: + # volume/group does not exist or subvolume creation failed + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + retainsnaps = kwargs['retain_snapshots'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps) + # kick the purge threads for async removal -- note that this + # assumes that the subvolume is moved to trash can. + # TODO: make purge queue as singleton so that trash can kicks + # the purge threads on dump. 
+ self.purge_queue.queue_job(volname) + except VolumeException as ve: + if ve.errno == -errno.EAGAIN and not force: + ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)") + ret = self.volume_exception_to_retval(ve) + elif not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def authorize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + accesslevel = kwargs['access_level'] + tenant_id = kwargs['tenant_id'] + allow_existing_id = kwargs['allow_existing_id'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume: + key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id) + ret = 0, key, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def deauthorize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume: + subvolume.deauthorize(authid) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def authorized_list(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume: + auths = subvolume.authorized_list() + ret = 0, json.dumps(auths, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def evict(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + authid = kwargs['auth_id'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume: + key = subvolume.evict(volname, authid) + ret = 0, "", "" + except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e: + if isinstance(e, VolumeException): + ret = self.volume_exception_to_retval(e) + elif isinstance(e, ClusterTimeout): + ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster" + elif isinstance(e, ClusterError): + ret = e._result_code , "", e._result_str + elif isinstance(e, EvictionError): + ret = -errno.EINVAL, "", str(e) + return ret + + def resize_subvolume(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + newsize = kwargs['new_size'] + noshrink = kwargs['no_shrink'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, 
SubvolumeOpType.RESIZE) as subvolume: + nsize, usedbytes = subvolume.resize(newsize, noshrink) + ret = 0, json.dumps( + [{'bytes_used': usedbytes},{'bytes_quota': nsize}, + {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}], + indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_pin(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + pin_type = kwargs['pin_type'] + pin_setting = kwargs['pin_setting'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume: + subvolume.pin(pin_type, pin_setting) + ret = 0, json.dumps({}), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_getpath(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume: + subvolpath = subvolume.path + ret = 0, subvolpath.decode("utf-8"), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_info(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume: + mon_addr_lst = [] + mon_map_mons = self.mgr.get('mon_map')['mons'] + for mon in mon_map_mons: + ip_port = mon['addr'].split("/")[0] + mon_addr_lst.append(ip_port) + + subvol_info_dict = subvolume.info() + subvol_info_dict["mon_addrs"] = mon_addr_lst + ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def set_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + value = kwargs['value'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume: + subvolume.set_user_metadata(keyname, value) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def get_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume: + value = subvolume.get_user_metadata(keyname) + ret = 0, value, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def 
list_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume: + subvol_metadata_dict = subvolume.list_user_metadata() + ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume: + subvolume.remove_user_metadata(keyname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolumes(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + subvolumes = group.list_subvolumes() + ret = 0, name_to_json(subvolumes), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_exists(self, **kwargs): + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + ret = 0, "", "" + volume_exists = False + + try: + with open_volume(self, volname) as fs_handle: + volume_exists = True + with open_group(fs_handle, self.volspec, groupname) as group: + res = group.has_subvolumes() + if res: + ret = 0, "subvolume exists", "" + else: + ret = 0, "no subvolume exists", "" + except VolumeException as ve: + if volume_exists and ve.errno == -errno.ENOENT: + ret = 0, "no subvolume exists", "" + else: + ret = self.volume_exception_to_retval(ve) + return ret + + ### subvolume snapshot + + def create_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume: + subvolume.create_snapshot(snapname) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume: + subvolume.remove_snapshot(snapname, force) + except VolumeException as ve: + # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and, + # we should tickle the purge jobs to 
purge the same + if ve.errno == -errno.ESTALE: + self.purge_queue.queue_job(volname) + elif not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolume_snapshot_info(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume: + snap_info_dict = subvolume.snapshot_info(snapname) + ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def set_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + value = kwargs['value'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + subvolume.set_snapshot_metadata(snapname, keyname, value) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def get_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + value = subvolume.get_snapshot_metadata(snapname, keyname) + ret = 0, value, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + snap_metadata_dict = subvolume.list_snapshot_metadata(snapname) + ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + force = 
kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + subvolume.remove_snapshot_metadata(snapname, keyname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_snapshots(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume: + snapshots = subvolume.list_snapshots() + ret = 0, name_to_json(snapshots), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def protect_subvolume_snapshot(self, **kwargs): + ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume: + log.warning("snapshot protect call is deprecated and will be removed in a future release") + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def unprotect_subvolume_snapshot(self, **kwargs): + ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume: + log.warning("snapshot unprotect call is deprecated and will be removed in a future release") + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs): + t_pool = kwargs['pool_layout'] + s_subvolname = kwargs['sub_name'] + s_groupname = kwargs['group_name'] + t_groupname = kwargs['target_group_name'] + + create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname) + with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume: + try: + if t_groupname == s_groupname and t_subvolname == s_subvolname: + t_subvolume.attach_snapshot(s_snapname, t_subvolume) + else: + s_subvolume.attach_snapshot(s_snapname, t_subvolume) + self.cloner.queue_job(volname) + except VolumeException as ve: + try: + t_subvolume.remove() + self.purge_queue.queue_job(volname) + except Exception as e: + log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e)) + raise ve + + def 
_clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs): + s_snapname = kwargs['snap_name'] + target_subvolname = kwargs['target_sub_name'] + target_groupname = kwargs['target_group_name'] + s_groupname = kwargs['group_name'] + + if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname)) + + with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group: + try: + with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE): + raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname)) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname, + target_group, target_subvolname, **kwargs) + else: + raise + + def clone_subvolume_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + s_subvolname = kwargs['sub_name'] + s_groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, s_groupname) as s_group: + with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: + self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def clone_status(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + clonename = kwargs['clone_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume: + ret = 0, json.dumps({'status' : subvolume.status}, indent=2), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def clone_cancel(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + clonename = kwargs['clone_name'] + groupname = kwargs['group_name'] + + try: + self.cloner.cancel_job(volname, (clonename, groupname)) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + ### group operations + + def create_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + size = kwargs['size'] + pool = kwargs['pool_layout'] + uid = kwargs['uid'] + gid = kwargs['gid'] + mode = kwargs['mode'] + + try: + with open_volume(self, volname) as fs_handle: + try: + with open_group(fs_handle, self.volspec, groupname) as group: + # idempotent creation -- valid. 
+ attrs = { + 'uid': uid, + 'gid': gid, + 'mode': octal_str_to_decimal_int(mode), + 'data_pool': pool, + 'quota': size + } + set_group_attrs(fs_handle, group.path, attrs) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + oct_mode = octal_str_to_decimal_int(mode) + create_group(fs_handle, self.volspec, groupname, size, pool, oct_mode, uid, gid) + else: + raise + except VolumeException as ve: + # volume does not exist or subvolume group creation failed + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + remove_group(fs_handle, self.volspec, groupname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def subvolumegroup_info(self, **kwargs): + ret = None + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + mon_addr_lst = [] + mon_map_mons = self.mgr.get('mon_map')['mons'] + for mon in mon_map_mons: + ip_port = mon['addr'].split("/")[0] + mon_addr_lst.append(ip_port) + + group_info_dict = group.info() + group_info_dict["mon_addrs"] = mon_addr_lst + ret = 0, json.dumps(group_info_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def resize_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + newsize = kwargs['new_size'] + noshrink = kwargs['no_shrink'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + nsize, usedbytes = group.resize(newsize, noshrink) + ret = 0, json.dumps( + [{'bytes_used': usedbytes},{'bytes_quota': nsize}, + {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}], + indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def getpath_subvolume_group(self, **kwargs): + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + return 0, group.path.decode('utf-8'), "" + except VolumeException as ve: + return self.volume_exception_to_retval(ve) + + def list_subvolume_groups(self, **kwargs): + volname = kwargs['vol_name'] + ret = 0, '[]', "" + volume_exists = False + try: + with open_volume(self, volname) as fs_handle: + volume_exists = True + groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS]) + ret = 0, name_to_json(groups), "" + except VolumeException as ve: + if not ve.errno == -errno.ENOENT or not volume_exists: + ret = self.volume_exception_to_retval(ve) + return ret + + def pin_subvolume_group(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + pin_type = kwargs['pin_type'] + pin_setting = kwargs['pin_setting'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + group.pin(pin_type, pin_setting) + ret = 0, json.dumps({}), "" + except VolumeException as ve: + ret = 
self.volume_exception_to_retval(ve) + return ret + + def subvolume_group_exists(self, **kwargs): + volname = kwargs['vol_name'] + ret = 0, "", "" + volume_exists = False + + try: + with open_volume(self, volname) as fs_handle: + volume_exists = True + res = has_subdir(fs_handle, self.volspec.base_dir, filter_entries=[ + dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS]) + if res: + ret = 0, "subvolumegroup exists", "" + else: + ret = 0, "no subvolumegroup exists", "" + except VolumeException as ve: + if volume_exists and ve.errno == -errno.ENOENT: + ret = 0, "no subvolumegroup exists", "" + else: + ret = self.volume_exception_to_retval(ve) + return ret + + ### group snapshot + + def create_subvolume_group_snapshot(self, **kwargs): + ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + # snapname = kwargs['snap_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots + # at the subvolume group (see: https://tracker.ceph.com/issues/46074) + # group.create_snapshot(snapname) + pass + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_group_snapshot(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + snapname = kwargs['snap_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + group.remove_snapshot(snapname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_group_snapshots(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + snapshots = group.list_snapshots() + ret = 0, name_to_json(snapshots), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret |
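
Note on the return convention used throughout the new module: every VolumeClient entry point returns a (retcode, stdout, stderr) tuple, with 0 plus JSON or plain text on success and a negative errno plus an error message on failure (the VolumeException handlers funnel through volume_exception_to_retval(), which unpacks ve.to_tuple(); to_tuple() itself lives in exception.py and is not shown in this diff). The snippet below is a minimal standalone sketch of that shape, not part of the change; the handler name and the volume name 'vol_a' are illustrative only.

import errno
import json

def demo_entry_point(volume_exists):
    """Sketch of the (retcode, stdout, stderr) convention used by VolumeClient.

    Success: retcode 0, JSON (or plain text) on stdout, empty stderr.
    Failure: a negative errno as retcode, empty stdout, a message on stderr,
    the same shape the module builds by hand (e.g. -errno.ESHUTDOWN, "", "...")
    or obtains from volume_exception_to_retval().
    """
    if not volume_exists:
        # 'vol_a' is a hypothetical volume name used only for illustration.
        return -errno.ENOENT, "", "volume 'vol_a' doesn't exist"
    return 0, json.dumps({"status": "ok"}, indent=4, sort_keys=True), ""

print(demo_entry_point(True))   # (0, '{\n    "status": "ok"\n}', '')
print(demo_entry_point(False))  # (-2, '', "volume 'vol_a' doesn't exist") on Linux

The negative errno is what the mgr command framework passes back to the ceph CLI as the command's return code, while stdout and stderr carry the payload and the human-readable error respectively.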