diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/ceph_volume_client.py | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/ceph_volume_client.py')
-rw-r--r-- | src/pybind/ceph_volume_client.py | 1586 |
1 files changed, 1586 insertions, 0 deletions
diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py new file mode 100644 index 00000000..8fcb0cf8 --- /dev/null +++ b/src/pybind/ceph_volume_client.py @@ -0,0 +1,1586 @@ +""" +Copyright (C) 2015 Red Hat, Inc. + +LGPL2.1. See file COPYING. +""" + +from contextlib import contextmanager +import errno +import fcntl +import json +import logging +import os +import re +import struct +import sys +import threading +import time +import uuid + +from ceph_argparse import json_command + +import cephfs +import rados + +def to_bytes(param): + ''' + Helper method that returns byte representation of the given parameter. + ''' + if isinstance(param, str): + return param.encode('utf-8') + elif param is None: + return param + else: + return str(param).encode('utf-8') + +class RadosError(Exception): + """ + Something went wrong talking to Ceph with librados + """ + pass + + +RADOS_TIMEOUT = 10 + +log = logging.getLogger(__name__) + +# Reserved volume group name which we use in paths for volumes +# that are not assigned to a group (i.e. created with group=None) +NO_GROUP_NAME = "_nogroup" + +# Filename extensions for meta files. +META_FILE_EXT = ".meta" + +class VolumePath(object): + """ + Identify a volume's path as group->volume + The Volume ID is a unique identifier, but this is a much more + helpful thing to pass around. + """ + def __init__(self, group_id, volume_id): + self.group_id = group_id + self.volume_id = volume_id + assert self.group_id != NO_GROUP_NAME + assert self.volume_id != "" and self.volume_id is not None + + def __str__(self): + return "{0}/{1}".format(self.group_id, self.volume_id) + + +class ClusterTimeout(Exception): + """ + Exception indicating that we timed out trying to talk to the Ceph cluster, + either to the mons, or to any individual daemon that the mons indicate ought + to be up but isn't responding to us. + """ + pass + + +class ClusterError(Exception): + """ + Exception indicating that the cluster returned an error to a command that + we thought should be successful based on our last knowledge of the cluster + state. + """ + def __init__(self, action, result_code, result_str): + self._action = action + self._result_code = result_code + self._result_str = result_str + + def __str__(self): + return "Error {0} (\"{1}\") while {2}".format( + self._result_code, self._result_str, self._action) + + +class RankEvicter(threading.Thread): + """ + Thread for evicting client(s) from a particular MDS daemon instance. + + This is more complex than simply sending a command, because we have to + handle cases where MDS daemons might not be fully up yet, and/or might + be transiently unresponsive to commands. + """ + class GidGone(Exception): + pass + + POLL_PERIOD = 5 + + def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout): + """ + :param client_spec: list of strings, used as filter arguments to "session evict" + pass ["id=123"] to evict a single client with session id 123. + """ + self.rank = rank + self.gid = gid + self._mds_map = mds_map + self._client_spec = client_spec + self._volume_client = volume_client + self._ready_timeout = ready_timeout + self._ready_waited = 0 + + self.success = False + self.exception = None + + super(RankEvicter, self).__init__() + + def _ready_to_evict(self): + if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: + log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( + self._client_spec, self.rank, self.gid + )) + raise RankEvicter.GidGone() + + info = self._mds_map['info']["gid_{0}".format(self.gid)] + log.debug("_ready_to_evict: state={0}".format(info['state'])) + return info['state'] in ["up:active", "up:clientreplay"] + + def _wait_for_ready(self): + """ + Wait for that MDS rank to reach an active or clientreplay state, and + not be laggy. + """ + while not self._ready_to_evict(): + if self._ready_waited > self._ready_timeout: + raise ClusterTimeout() + + time.sleep(self.POLL_PERIOD) + self._ready_waited += self.POLL_PERIOD + + self._mds_map = self._volume_client.get_mds_map() + + def _evict(self): + """ + Run the eviction procedure. Return true on success, false on errors. + """ + + # Wait til the MDS is believed by the mon to be available for commands + try: + self._wait_for_ready() + except self.GidGone: + return True + + # Then send it an evict + ret = errno.ETIMEDOUT + while ret == errno.ETIMEDOUT: + log.debug("mds_command: {0}, {1}".format( + "%s" % self.gid, ["session", "evict"] + self._client_spec + )) + ret, outb, outs = self._volume_client.fs.mds_command( + "%s" % self.gid, + json.dumps({ + "prefix": "session evict", + "filters": self._client_spec + }), "") + log.debug("mds_command: complete {0} {1}".format(ret, outs)) + + # If we get a clean response, great, it's gone from that rank. + if ret == 0: + return True + elif ret == errno.ETIMEDOUT: + # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) + self._mds_map = self._volume_client.get_mds_map() + try: + self._wait_for_ready() + except self.GidGone: + return True + else: + raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) + + def run(self): + try: + self._evict() + except Exception as e: + self.success = False + self.exception = e + else: + self.success = True + + +class EvictionError(Exception): + pass + + +class CephFSVolumeClientError(Exception): + """ + Something went wrong talking to Ceph using CephFSVolumeClient. + """ + pass + + +CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ + + CephFSVolumeClient Version History: + + * 1 - Initial version + * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient + * 3 - Allow volumes to be created without RADOS namespace isolation + * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient + * 5 - Disallow authorize API for users not created by CephFSVolumeClient + * 6 - The 'volumes' key in auth-metadata-file is changed to 'subvolumes'. +""" + + +class CephFSVolumeClient(object): + """ + Combine libcephfs and librados interfaces to implement a + 'Volume' concept implemented as a cephfs directory and + client capabilities which restrict mount access to this + directory. + + Additionally, volumes may be in a 'Group'. Conveniently, + volumes are a lot like manila shares, and groups are a lot + like manila consistency groups. + + Refer to volumes with VolumePath, which specifies the + volume and group IDs (both strings). The group ID may + be None. + + In general, functions in this class are allowed raise rados.Error + or cephfs.Error exceptions in unexpected situations. + """ + + # Current version + version = 6 + + # Where shall we create our volumes? + POOL_PREFIX = "fsvolume_" + DEFAULT_VOL_PREFIX = "/volumes" + DEFAULT_NS_PREFIX = "fsvolumens_" + + def __init__(self, auth_id=None, conf_path=None, cluster_name=None, + volume_prefix=None, pool_ns_prefix=None, rados=None, + fs_name=None): + """ + Either set all three of ``auth_id``, ``conf_path`` and + ``cluster_name`` (rados constructed on connect), or + set ``rados`` (existing rados instance). + """ + self.fs = None + self.fs_name = fs_name + self.connected = False + + self.conf_path = conf_path + self.cluster_name = cluster_name + self.auth_id = auth_id + + self.rados = rados + if self.rados: + # Using an externally owned rados, so we won't tear it down + # on disconnect + self.own_rados = False + else: + # self.rados will be constructed in connect + self.own_rados = True + + self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX + self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX + # For flock'ing in cephfs, I want a unique ID to distinguish me + # from any other manila-share services that are loading this module. + # We could use pid, but that's unnecessary weak: generate a + # UUID + self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0] + + # TODO: version the on-disk structures + + def recover(self): + # Scan all auth keys to see if they're dirty: if they are, they have + # state that might not have propagated to Ceph or to the related + # volumes yet. + + # Important: we *always* acquire locks in the order auth->volume + # That means a volume can never be dirty without the auth key + # we're updating it with being dirty at the same time. + + # First list the auth IDs that have potentially dirty on-disk metadata + log.debug("Recovering from partial auth updates (if any)...") + + try: + dir_handle = self.fs.opendir(self.volume_prefix) + except cephfs.ObjectNotFound: + log.debug("Nothing to recover. No auth meta files.") + return + + d = self.fs.readdir(dir_handle) + auth_ids = [] + + if not d: + log.debug("Nothing to recover. No auth meta files.") + + while d: + # Identify auth IDs from auth meta filenames. The auth meta files + # are named as, "$<auth_id><meta filename extension>" + regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT)) + match = re.search(regex, d.d_name.decode(encoding='utf-8')) + if match: + auth_ids.append(match.group(1)) + + d = self.fs.readdir(dir_handle) + + self.fs.closedir(dir_handle) + + # Key points based on ordering: + # * Anything added in VMeta is already added in AMeta + # * Anything added in Ceph is already added in VMeta + # * Anything removed in VMeta is already removed in Ceph + # * Anything removed in AMeta is already removed in VMeta + + # Deauthorization: because I only update metadata AFTER the + # update of the next level down, I have the same ordering of + # -> things which exist in the AMeta should also exist + # in the VMeta, should also exist in Ceph, and the same + # recovery procedure that gets me consistent after crashes + # during authorization will also work during deauthorization + + # Now for each auth ID, check for dirty flag and apply updates + # if dirty flag is found + for auth_id in auth_ids: + with self._auth_lock(auth_id): + auth_meta = self._auth_metadata_get(auth_id) + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if auth_meta and 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + if not auth_meta or not auth_meta['subvolumes']: + # Clean up auth meta file + self.fs.unlink(self._auth_metadata_path(auth_id)) + continue + if not auth_meta['dirty']: + continue + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Recovered from partial auth updates (if any).") + + def _recover_auth_meta(self, auth_id, auth_meta): + """ + Call me after locking the auth meta file. + """ + remove_volumes = [] + + for volume, volume_data in auth_meta['subvolumes'].items(): + if not volume_data['dirty']: + continue + + (group_id, volume_id) = volume.split('/') + group_id = group_id if group_id != 'None' else None + volume_path = VolumePath(group_id, volume_id) + access_level = volume_data['access_level'] + + with self._volume_lock(volume_path): + vol_meta = self._volume_metadata_get(volume_path) + + # No VMeta update indicates that there was no auth update + # in Ceph either. So it's safe to remove corresponding + # partial update in AMeta. + if not vol_meta or auth_id not in vol_meta['auths']: + remove_volumes.append(volume) + continue + + want_auth = { + 'access_level': access_level, + 'dirty': False, + } + # VMeta update looks clean. Ceph auth update must have been + # clean. + if vol_meta['auths'][auth_id] == want_auth: + auth_meta['subvolumes'][volume]['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + continue + + readonly = access_level == 'r' + client_entity = "client.{0}".format(auth_id) + try: + existing_caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + existing_caps = None + self._authorize_volume(volume_path, auth_id, readonly, existing_caps) + + # Recovered from partial auth updates for the auth ID's access + # to a volume. + auth_meta['subvolumes'][volume]['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + for volume in remove_volumes: + del auth_meta['subvolumes'][volume] + + if not auth_meta['subvolumes']: + # Clean up auth meta file + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + # Recovered from all partial auth updates for the auth ID. + auth_meta['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + def get_mds_map(self): + fs_map = self._rados_command("fs dump", {}) + return fs_map['filesystems'][0]['mdsmap'] + + def evict(self, auth_id, timeout=30, volume_path=None): + """ + Evict all clients based on the authorization ID and optionally based on + the volume path mounted. Assumes that the authorization key has been + revoked prior to calling this function. + + This operation can throw an exception if the mon cluster is unresponsive, or + any individual MDS daemon is unresponsive for longer than the timeout passed in. + """ + + client_spec = ["auth_name={0}".format(auth_id), ] + if volume_path: + client_spec.append("client_metadata.root={0}". + format(self._get_path(volume_path))) + + log.info("evict clients with {0}".format(', '.join(client_spec))) + + mds_map = self.get_mds_map() + up = {} + for name, gid in mds_map['up'].items(): + # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" + assert name.startswith("mds_") + up[int(name[4:])] = gid + + # For all MDS ranks held by a daemon + # Do the parallelism in python instead of using "tell mds.*", because + # the latter doesn't give us per-mds output + threads = [] + for rank, gid in up.items(): + thread = RankEvicter(self, client_spec, rank, gid, mds_map, + timeout) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + log.info("evict: joined all") + + for t in threads: + if not t.success: + msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". + format(', '.join(client_spec), t.rank, t.gid, t.exception) + ) + log.error(msg) + raise EvictionError(msg) + + def _get_path(self, volume_path): + """ + Determine the path within CephFS where this volume will live + :return: absolute path (string) + """ + return os.path.join( + self.volume_prefix, + volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME, + volume_path.volume_id) + + def _get_group_path(self, group_id): + if group_id is None: + raise ValueError("group_id may not be None") + + return os.path.join( + self.volume_prefix, + group_id + ) + + def _connect(self, premount_evict): + log.debug("Connecting to cephfs...") + self.fs = cephfs.LibCephFS(rados_inst=self.rados) + log.debug("CephFS initializing...") + self.fs.init() + if premount_evict is not None: + log.debug("Premount eviction of {0} starting".format(premount_evict)) + self.evict(premount_evict) + log.debug("Premount eviction of {0} completes".format(premount_evict)) + log.debug("CephFS mounting...") + self.fs.mount(filesystem_name=to_bytes(self.fs_name)) + log.debug("Connection to cephfs complete") + + # Recover from partial auth updates due to a previous + # crash. + self.recover() + + def connect(self, premount_evict = None): + """ + + :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers + may want to use this to specify their own auth ID if they expect + to be a unique instance and don't want to wait for caps to time + out after failure of another instance of themselves. + """ + if self.own_rados: + log.debug("Configuring to RADOS with config {0}...".format(self.conf_path)) + self.rados = rados.Rados( + name="client.{0}".format(self.auth_id), + clustername=self.cluster_name, + conffile=self.conf_path, + conf={} + ) + if self.rados.state != "connected": + log.debug("Connecting to RADOS...") + self.rados.connect() + log.debug("Connection to RADOS complete") + self._connect(premount_evict) + + def get_mon_addrs(self): + log.info("get_mon_addrs") + result = [] + mon_map = self._rados_command("mon dump") + for mon in mon_map['mons']: + ip_port = mon['addr'].split("/")[0] + result.append(ip_port) + + return result + + def disconnect(self): + log.info("disconnect") + if self.fs: + log.debug("Disconnecting cephfs...") + self.fs.shutdown() + self.fs = None + log.debug("Disconnecting cephfs complete") + + if self.rados and self.own_rados: + log.debug("Disconnecting rados...") + self.rados.shutdown() + self.rados = None + log.debug("Disconnecting rados complete") + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect() + + def __del__(self): + self.disconnect() + + def _get_pool_id(self, osd_map, pool_name): + # Maybe borrow the OSDMap wrapper class from calamari if more helpers + # like this are needed. + for pool in osd_map['pools']: + if pool['pool_name'] == pool_name: + return pool['pool'] + + return None + + def _create_volume_pool(self, pool_name): + """ + Idempotently create a pool for use as a CephFS data pool, with the given name + + :return The ID of the created pool + """ + osd_map = self._rados_command('osd dump', {}) + + existing_id = self._get_pool_id(osd_map, pool_name) + if existing_id is not None: + log.info("Pool {0} already exists".format(pool_name)) + return existing_id + + osd_count = len(osd_map['osds']) + + # We can't query the actual cluster config remotely, but since this is + # just a heuristic we'll assume that the ceph.conf we have locally reflects + # that in use in the rest of the cluster. + pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd')) + + other_pgs = 0 + for pool in osd_map['pools']: + if not pool['pool_name'].startswith(self.POOL_PREFIX): + other_pgs += pool['pg_num'] + + # A basic heuristic for picking pg_num: work out the max number of + # PGs we can have without tripping a warning, then subtract the number + # of PGs already created by non-manila pools, then divide by ten. That'll + # give you a reasonable result on a system where you have "a few" manila + # shares. + pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10 + # TODO Alternatively, respect an override set by the user. + + self._rados_command( + 'osd pool create', + { + 'pool': pool_name, + 'pg_num': int(pg_num), + } + ) + + osd_map = self._rados_command('osd dump', {}) + pool_id = self._get_pool_id(osd_map, pool_name) + + if pool_id is None: + # If the pool isn't there, that's either a ceph bug, or it's some outside influence + # removing it right after we created it. + log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format( + pool_name, json.dumps(osd_map, indent=2) + )) + raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name)) + else: + return pool_id + + def create_group(self, group_id, mode=0o755): + # Prevent craftily-named volume groups from colliding with the meta + # files. + if group_id.endswith(META_FILE_EXT): + raise ValueError("group ID cannot end with '{0}'.".format( + META_FILE_EXT)) + path = self._get_group_path(group_id) + self._mkdir_p(path, mode) + + def destroy_group(self, group_id): + path = self._get_group_path(group_id) + try: + self.fs.stat(self.volume_prefix) + except cephfs.ObjectNotFound: + pass + else: + self.fs.rmdir(path) + + def _mkdir_p(self, path, mode=0o755): + try: + self.fs.stat(path) + except cephfs.ObjectNotFound: + pass + else: + return + + parts = path.split(os.path.sep) + + for i in range(1, len(parts) + 1): + subpath = os.path.join(*parts[0:i]) + try: + self.fs.stat(subpath) + except cephfs.ObjectNotFound: + self.fs.mkdir(subpath, mode) + + def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True, + mode=0o755): + """ + Set up metadata, pools and auth for a volume. + + This function is idempotent. It is safe to call this again + for an already-created volume, even if it is in use. + + :param volume_path: VolumePath instance + :param size: In bytes, or None for no size limit + :param data_isolated: If true, create a separate OSD pool for this volume + :param namespace_isolated: If true, use separate RADOS namespace for this volume + :return: + """ + path = self._get_path(volume_path) + log.info("create_volume: {0}".format(path)) + + self._mkdir_p(path, mode) + + if size is not None: + self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0) + + # data_isolated means create a separate pool for this volume + if data_isolated: + pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) + log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name)) + pool_id = self._create_volume_pool(pool_name) + mds_map = self.get_mds_map() + if pool_id not in mds_map['data_pools']: + self._rados_command("fs add_data_pool", { + 'fs_name': mds_map['fs_name'], + 'pool': pool_name + }) + time.sleep(5) # time for MDSMap to be distributed + self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0) + + # enforce security isolation, use separate namespace for this volume + if namespace_isolated: + namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id) + log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace)) + self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', + to_bytes(namespace), 0) + else: + # If volume's namespace layout is not set, then the volume's pool + # layout remains unset and will undesirably change with ancestor's + # pool layout changes. + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + self.fs.setxattr(path, 'ceph.dir.layout.pool', + to_bytes(pool_name), 0) + + # Create a volume meta file, if it does not already exist, to store + # data about auth ids having access to the volume + fd = self.fs.open(self._volume_metadata_path(volume_path), + os.O_CREAT, 0o755) + self.fs.close(fd) + + return { + 'mount_path': path + } + + def delete_volume(self, volume_path, data_isolated=False): + """ + Make a volume inaccessible to guests. This function is + idempotent. This is the fast part of tearing down a volume: you must + also later call purge_volume, which is the slow part. + + :param volume_path: Same identifier used in create_volume + :return: + """ + + path = self._get_path(volume_path) + log.info("delete_volume: {0}".format(path)) + + # Create the trash folder if it doesn't already exist + trash = os.path.join(self.volume_prefix, "_deleting") + self._mkdir_p(trash) + + # We'll move it to here + trashed_volume = os.path.join(trash, volume_path.volume_id) + + # Move the volume's data to the trash folder + try: + self.fs.stat(path) + except cephfs.ObjectNotFound: + log.warning("Trying to delete volume '{0}' but it's already gone".format( + path)) + else: + self.fs.rename(path, trashed_volume) + + # Delete the volume meta file, if it's not already deleted + vol_meta_path = self._volume_metadata_path(volume_path) + try: + self.fs.unlink(vol_meta_path) + except cephfs.ObjectNotFound: + pass + + def purge_volume(self, volume_path, data_isolated=False): + """ + Finish clearing up a volume that was previously passed to delete_volume. This + function is idempotent. + """ + + trash = os.path.join(self.volume_prefix, "_deleting") + trashed_volume = os.path.join(trash, volume_path.volume_id) + + try: + self.fs.stat(trashed_volume) + except cephfs.ObjectNotFound: + log.warning("Trying to purge volume '{0}' but it's already been purged".format( + trashed_volume)) + return + + def rmtree(root_path): + log.debug("rmtree {0}".format(root_path)) + dir_handle = self.fs.opendir(root_path) + d = self.fs.readdir(dir_handle) + while d: + d_name = d.d_name.decode(encoding='utf-8') + if d_name not in [".", ".."]: + # Do not use os.path.join because it is sensitive + # to string encoding, we just pass through dnames + # as byte arrays + d_full = u"{0}/{1}".format(root_path, d_name) + if d.is_dir(): + rmtree(d_full) + else: + self.fs.unlink(d_full) + + d = self.fs.readdir(dir_handle) + self.fs.closedir(dir_handle) + + self.fs.rmdir(root_path) + + rmtree(trashed_volume) + + if data_isolated: + pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) + osd_map = self._rados_command("osd dump", {}) + pool_id = self._get_pool_id(osd_map, pool_name) + mds_map = self.get_mds_map() + if pool_id in mds_map['data_pools']: + self._rados_command("fs rm_data_pool", { + 'fs_name': mds_map['fs_name'], + 'pool': pool_name + }) + self._rados_command("osd pool delete", + { + "pool": pool_name, + "pool2": pool_name, + "yes_i_really_really_mean_it": True + }) + + def _get_ancestor_xattr(self, path, attr): + """ + Helper for reading layout information: if this xattr is missing + on the requested path, keep checking parents until we find it. + """ + try: + result = self.fs.getxattr(path, attr).decode() + if result == "": + # Annoying! cephfs gives us empty instead of an error when attr not found + raise cephfs.NoData() + else: + return result + except cephfs.NoData: + if path == "/": + raise + else: + return self._get_ancestor_xattr(os.path.split(path)[0], attr) + + def _check_compat_version(self, compat_version): + if self.version < compat_version: + msg = ("The current version of CephFSVolumeClient, version {0} " + "does not support the required feature. Need version {1} " + "or greater".format(self.version, compat_version) + ) + log.error(msg) + raise CephFSVolumeClientError(msg) + + def _metadata_get(self, path): + """ + Return a deserialized JSON object, or None + """ + fd = self.fs.open(path, "r") + # TODO iterate instead of assuming file < 4MB + read_bytes = self.fs.read(fd, 0, 4096 * 1024) + self.fs.close(fd) + if read_bytes: + return json.loads(read_bytes.decode()) + else: + return None + + def _metadata_set(self, path, data): + serialized = json.dumps(data) + fd = self.fs.open(path, "w") + try: + self.fs.write(fd, to_bytes(serialized), 0) + self.fs.fsync(fd, 0) + finally: + self.fs.close(fd) + + def _lock(self, path): + @contextmanager + def fn(): + while(1): + fd = self.fs.open(path, os.O_CREAT, 0o755) + self.fs.flock(fd, fcntl.LOCK_EX, self._id) + + # The locked file will be cleaned up sometime. It could be + # unlinked e.g., by an another manila share instance, before + # lock was applied on it. Perform checks to ensure that this + # does not happen. + try: + statbuf = self.fs.stat(path) + except cephfs.ObjectNotFound: + self.fs.close(fd) + continue + + fstatbuf = self.fs.fstat(fd) + if statbuf.st_ino == fstatbuf.st_ino: + break + + try: + yield + finally: + self.fs.flock(fd, fcntl.LOCK_UN, self._id) + self.fs.close(fd) + + return fn() + + def _auth_metadata_path(self, auth_id): + return os.path.join(self.volume_prefix, "${0}{1}".format( + auth_id, META_FILE_EXT)) + + def _auth_lock(self, auth_id): + return self._lock(self._auth_metadata_path(auth_id)) + + def _auth_metadata_get(self, auth_id): + """ + Call me with the metadata locked! + + Check whether a auth metadata structure can be decoded by the current + version of CephFSVolumeClient. + + Return auth metadata that the current version of CephFSVolumeClient + can decode. + """ + auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) + + if auth_metadata: + self._check_compat_version(auth_metadata['compat_version']) + + return auth_metadata + + def _auth_metadata_set(self, auth_id, data): + """ + Call me with the metadata locked! + + Fsync the auth metadata. + + Add two version attributes to the auth metadata, + 'compat_version', the minimum CephFSVolumeClient version that can + decode the metadata, and 'version', the CephFSVolumeClient version + that encoded the metadata. + """ + data['compat_version'] = 6 + data['version'] = self.version + return self._metadata_set(self._auth_metadata_path(auth_id), data) + + def _volume_metadata_path(self, volume_path): + return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( + volume_path.group_id if volume_path.group_id else "", + volume_path.volume_id, + META_FILE_EXT + )) + + def _volume_lock(self, volume_path): + """ + Return a ContextManager which locks the authorization metadata for + a particular volume, and persists a flag to the metadata indicating + that it is currently locked, so that we can detect dirty situations + during recovery. + + This lock isn't just to make access to the metadata safe: it's also + designed to be used over the two-step process of checking the + metadata and then responding to an authorization request, to + ensure that at the point we respond the metadata hasn't changed + in the background. It's key to how we avoid security holes + resulting from races during that problem , + """ + return self._lock(self._volume_metadata_path(volume_path)) + + def _volume_metadata_get(self, volume_path): + """ + Call me with the metadata locked! + + Check whether a volume metadata structure can be decoded by the current + version of CephFSVolumeClient. + + Return a volume_metadata structure that the current version of + CephFSVolumeClient can decode. + """ + volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path)) + + if volume_metadata: + self._check_compat_version(volume_metadata['compat_version']) + + return volume_metadata + + def _volume_metadata_set(self, volume_path, data): + """ + Call me with the metadata locked! + + Add two version attributes to the volume metadata, + 'compat_version', the minimum CephFSVolumeClient version that can + decode the metadata and 'version', the CephFSVolumeClient version + that encoded the metadata. + """ + data['compat_version'] = 1 + data['version'] = self.version + return self._metadata_set(self._volume_metadata_path(volume_path), data) + + def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True): + caps_list = [] + for k, v in existing_caps['caps'].items(): + if k == 'mds' or k == 'osd': + continue + elif k == 'mon': + if not authorize and v == 'allow r': + continue + caps_list.extend((k,v)) + + if mds_cap_str: + caps_list.extend(('mds', mds_cap_str)) + if osd_cap_str: + caps_list.extend(('osd', osd_cap_str)) + + if authorize and 'mon' not in caps_list: + caps_list.extend(('mon', 'allow r')) + + return caps_list + + def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False): + """ + Get-or-create a Ceph auth identity for `auth_id` and grant them access + to + :param volume_path: + :param auth_id: + :param readonly: + :param tenant_id: Optionally provide a stringizable object to + restrict any created cephx IDs to other callers + passing the same tenant ID. + :allow_existing_id: Optionally authorize existing auth-ids not + created by ceph_volume_client + :return: + """ + + with self._auth_lock(auth_id): + client_entity = "client.{0}".format(auth_id) + try: + existing_caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + existing_caps = None + + # Existing meta, or None, to be updated + auth_meta = self._auth_metadata_get(auth_id) + + # subvolume data to be inserted + volume_path_str = str(volume_path) + subvolume = { + volume_path_str : { + # The access level at which the auth_id is authorized to + # access the volume. + 'access_level': 'r' if readonly else 'rw', + 'dirty': True, + } + } + + if auth_meta is None: + if not allow_existing_id and existing_caps is not None: + msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id) + log.error(msg) + raise CephFSVolumeClientError(msg) + + # non-existent auth IDs + sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( + auth_id, tenant_id + )) + log.debug("Authorize: no existing meta") + auth_meta = { + 'dirty': True, + 'tenant_id': tenant_id.__str__() if tenant_id else None, + 'subvolumes': subvolume + } + else: + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + # Disallow tenants to share auth IDs + if auth_meta['tenant_id'].__str__() != tenant_id.__str__(): + msg = "auth ID: {0} is already in use".format(auth_id) + log.error(msg) + raise CephFSVolumeClientError(msg) + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + log.debug("Authorize: existing tenant {tenant}".format( + tenant=auth_meta['tenant_id'] + )) + auth_meta['dirty'] = True + auth_meta['subvolumes'].update(subvolume) + + self._auth_metadata_set(auth_id, auth_meta) + + with self._volume_lock(volume_path): + key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps) + + auth_meta['dirty'] = False + auth_meta['subvolumes'][volume_path_str]['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + if tenant_id: + return { + 'auth_key': key + } + else: + # Caller wasn't multi-tenant aware: be safe and don't give + # them a key + return { + 'auth_key': None + } + + def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps): + vol_meta = self._volume_metadata_get(volume_path) + + access_level = 'r' if readonly else 'rw' + auth = { + auth_id: { + 'access_level': access_level, + 'dirty': True, + } + } + + if vol_meta is None: + vol_meta = { + 'auths': auth + } + else: + vol_meta['auths'].update(auth) + self._volume_metadata_set(volume_path, vol_meta) + + key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps) + + vol_meta['auths'][auth_id]['dirty'] = False + self._volume_metadata_set(volume_path, vol_meta) + + return key + + def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps): + path = self._get_path(volume_path) + log.debug("Authorizing Ceph id '{0}' for path '{1}'".format( + auth_id, path + )) + + # First I need to work out what the data pool is for this share: + # read the layout + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + + try: + namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_" + "namespace").decode() + except cephfs.NoData: + namespace = None + + # Now construct auth capabilities that give the guest just enough + # permissions to access the share + client_entity = "client.{0}".format(auth_id) + want_access_level = 'r' if readonly else 'rw' + want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path) + if namespace: + want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( + want_access_level, pool_name, namespace) + else: + want_osd_cap = 'allow {0} pool={1}'.format(want_access_level, + pool_name) + + if existing_caps is None: + caps = self._rados_command( + 'auth get-or-create', + { + 'entity': client_entity, + 'caps': [ + 'mds', want_mds_cap, + 'osd', want_osd_cap, + 'mon', 'allow r'] + }) + else: + # entity exists, update it + cap = existing_caps[0] + + # Construct auth caps that if present might conflict with the desired + # auth caps. + unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw' + unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) + if namespace: + unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( + unwanted_access_level, pool_name, namespace) + else: + unwanted_osd_cap = 'allow {0} pool={1}'.format( + unwanted_access_level, pool_name) + + def cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, + want_osd_cap, unwanted_mds_cap, unwanted_osd_cap): + + if not orig_mds_caps: + return want_mds_cap, want_osd_cap + + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + if want_mds_cap in mds_cap_tokens: + return orig_mds_caps, orig_osd_caps + + if unwanted_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(unwanted_mds_cap) + osd_cap_tokens.remove(unwanted_osd_cap) + + mds_cap_tokens.append(want_mds_cap) + osd_cap_tokens.append(want_osd_cap) + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + + mds_cap_str, osd_cap_str = cap_update( + orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap, + unwanted_mds_cap, unwanted_osd_cap) + + caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str) + caps = self._rados_command( + 'auth caps', + { + 'entity': client_entity, + 'caps': caps_list + }) + + caps = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + + # Result expected like this: + # [ + # { + # "entity": "client.foobar", + # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", + # "caps": { + # "mds": "allow *", + # "mon": "allow *" + # } + # } + # ] + assert len(caps) == 1 + assert caps[0]['entity'] == client_entity + return caps[0]['key'] + + def deauthorize(self, volume_path, auth_id): + with self._auth_lock(auth_id): + # Existing meta, or None, to be updated + auth_meta = self._auth_metadata_get(auth_id) + + # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key + if auth_meta and 'volumes' in auth_meta: + auth_meta['subvolumes'] = auth_meta.pop('volumes') + + volume_path_str = str(volume_path) + if (auth_meta is None) or (not auth_meta['subvolumes']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + # Clean up the auth meta file of an auth ID + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + if volume_path_str not in auth_meta['subvolumes']: + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + return + + if auth_meta['dirty']: + self._recover_auth_meta(auth_id, auth_meta) + + auth_meta['dirty'] = True + auth_meta['subvolumes'][volume_path_str]['dirty'] = True + self._auth_metadata_set(auth_id, auth_meta) + + self._deauthorize_volume(volume_path, auth_id) + + # Filter out the volume we're deauthorizing + del auth_meta['subvolumes'][volume_path_str] + + # Clean up auth meta file + if not auth_meta['subvolumes']: + self.fs.unlink(self._auth_metadata_path(auth_id)) + return + + auth_meta['dirty'] = False + self._auth_metadata_set(auth_id, auth_meta) + + def _deauthorize_volume(self, volume_path, auth_id): + with self._volume_lock(volume_path): + vol_meta = self._volume_metadata_get(volume_path) + + if (vol_meta is None) or (auth_id not in vol_meta['auths']): + log.warning("deauthorized called for already-removed auth" + "ID '{auth_id}' for volume ID '{volume}'".format( + auth_id=auth_id, volume=volume_path.volume_id + )) + return + + vol_meta['auths'][auth_id]['dirty'] = True + self._volume_metadata_set(volume_path, vol_meta) + + self._deauthorize(volume_path, auth_id) + + # Remove the auth_id from the metadata *after* removing it + # from ceph, so that if we crashed here, we would actually + # recreate the auth ID during recovery (i.e. end up with + # a consistent state). + + # Filter out the auth we're removing + del vol_meta['auths'][auth_id] + self._volume_metadata_set(volume_path, vol_meta) + + def _deauthorize(self, volume_path, auth_id): + """ + The volume must still exist. + """ + client_entity = "client.{0}".format(auth_id) + path = self._get_path(volume_path) + pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") + try: + namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_" + "namespace").decode() + except cephfs.NoData: + namespace = None + + # The auth_id might have read-only or read-write mount access for the + # volume path. + access_levels = ('r', 'rw') + want_mds_caps = ['allow {0} path={1}'.format(access_level, path) + for access_level in access_levels] + if namespace: + want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace) + for access_level in access_levels] + else: + want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name) + for access_level in access_levels] + + + try: + existing = self._rados_command( + 'auth get', + { + 'entity': client_entity + } + ) + + def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps): + mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")] + osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")] + + for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps): + if want_mds_cap in mds_cap_tokens: + mds_cap_tokens.remove(want_mds_cap) + osd_cap_tokens.remove(want_osd_cap) + break + + return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) + + cap = existing[0] + orig_mds_caps = cap['caps'].get('mds', "") + orig_osd_caps = cap['caps'].get('osd', "") + mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps, + want_mds_caps, want_osd_caps) + + caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False) + if not caps_list: + self._rados_command('auth del', {'entity': client_entity}, decode=False) + else: + self._rados_command( + 'auth caps', + { + 'entity': client_entity, + 'caps': caps_list + }) + + # FIXME: rados raising Error instead of ObjectNotFound in auth get failure + except rados.Error: + # Already gone, great. + return + + def get_authorized_ids(self, volume_path): + """ + Expose a list of auth IDs that have access to a volume. + + return: a list of (auth_id, access_level) tuples, where + the access_level can be 'r' , or 'rw'. + None if no auth ID is given access to the volume. + """ + with self._volume_lock(volume_path): + meta = self._volume_metadata_get(volume_path) + auths = [] + if not meta or not meta['auths']: + return None + + for auth, auth_data in meta['auths'].items(): + # Skip partial auth updates. + if not auth_data['dirty']: + auths.append((auth, auth_data['access_level'])) + + return auths + + def _rados_command(self, prefix, args=None, decode=True): + """ + Safer wrapper for ceph_argparse.json_command, which raises + Error exception instead of relying on caller to check return + codes. + + Error exception can result from: + * Timeout + * Actual legitimate errors + * Malformed JSON output + + return: Decoded object from ceph, or None if empty string returned. + If decode is False, return a string (the data returned by + ceph command) + """ + if args is None: + args = {} + + argdict = args.copy() + argdict['format'] = 'json' + + ret, outbuf, outs = json_command(self.rados, + prefix=prefix, + argdict=argdict, + timeout=RADOS_TIMEOUT) + if ret != 0: + raise rados.Error(outs) + else: + if decode: + if outbuf: + try: + return json.loads(outbuf.decode()) + except (ValueError, TypeError): + raise RadosError("Invalid JSON output for command {0}".format(argdict)) + else: + return None + else: + return outbuf + + def get_used_bytes(self, volume_path): + return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir." + "rbytes").decode()) + + def set_max_bytes(self, volume_path, max_bytes): + self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes', + to_bytes(max_bytes if max_bytes else 0), 0) + + def _snapshot_path(self, dir_path, snapshot_name): + return os.path.join( + dir_path, self.rados.conf_get('client_snapdir'), snapshot_name + ) + + def _snapshot_create(self, dir_path, snapshot_name, mode=0o755): + # TODO: raise intelligible exception for clusters where snaps are disabled + self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode) + + def _snapshot_destroy(self, dir_path, snapshot_name): + """ + Remove a snapshot, or do nothing if it already doesn't exist. + """ + try: + self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name)) + except cephfs.ObjectNotFound: + log.warning("Snapshot was already gone: {0}".format(snapshot_name)) + + def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755): + self._snapshot_create(self._get_path(volume_path), snapshot_name, mode) + + def destroy_snapshot_volume(self, volume_path, snapshot_name): + self._snapshot_destroy(self._get_path(volume_path), snapshot_name) + + def create_snapshot_group(self, group_id, snapshot_name, mode=0o755): + if group_id is None: + raise RuntimeError("Group ID may not be None") + + return self._snapshot_create(self._get_group_path(group_id), snapshot_name, + mode) + + def destroy_snapshot_group(self, group_id, snapshot_name): + if group_id is None: + raise RuntimeError("Group ID may not be None") + if snapshot_name is None: + raise RuntimeError("Snapshot name may not be None") + + return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name) + + def _cp_r(self, src, dst): + # TODO + raise NotImplementedError() + + def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name): + dest_fs_path = self._get_path(dest_volume_path) + src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) + + self._cp_r(src_snapshot_path, dest_fs_path) + + def put_object(self, pool_name, object_name, data): + """ + Synchronously write data to an object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + """ + return self.put_object_versioned(pool_name, object_name, data) + + def put_object_versioned(self, pool_name, object_name, data, version=None): + """ + Synchronously write data to an object only if version of the object + version matches the expected version. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + :param data: data to write + :type data: bytes + :param version: expected version of the object to write + :type version: int + """ + ioctx = self.rados.open_ioctx(pool_name) + + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + if len(data) > max_size: + msg = ("Data to be written to object '{0}' exceeds " + "{1} bytes".format(object_name, max_size)) + log.error(msg) + raise CephFSVolumeClientError(msg) + + try: + with rados.WriteOpCtx() as wop: + if version is not None: + wop.assert_version(version) + wop.write_full(data) + ioctx.operate_write_op(wop, object_name) + except rados.OSError as e: + log.error(e) + raise e + finally: + ioctx.close() + + def get_object(self, pool_name, object_name): + """ + Synchronously read data from object. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: bytes - data read from object + """ + return self.get_object_and_version(pool_name, object_name)[0] + + def get_object_and_version(self, pool_name, object_name): + """ + Synchronously read data from object and get its version. + + :param pool_name: name of the pool + :type pool_name: str + :param object_name: name of the object + :type object_name: str + + :returns: tuple of object data and version + """ + ioctx = self.rados.open_ioctx(pool_name) + max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 + try: + bytes_read = ioctx.read(object_name, max_size) + if ((len(bytes_read) == max_size) and + (ioctx.read(object_name, 1, offset=max_size))): + log.warning("Size of object {0} exceeds '{1}' bytes " + "read".format(object_name, max_size)) + obj_version = ioctx.get_last_version() + finally: + ioctx.close() + return (bytes_read, obj_version) + + def delete_object(self, pool_name, object_name): + ioctx = self.rados.open_ioctx(pool_name) + try: + ioctx.remove_object(object_name) + except rados.ObjectNotFound: + log.warning("Object '{0}' was already removed".format(object_name)) + finally: + ioctx.close() |