diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/rook/rook_cluster.py | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py new file mode 100644 index 00000000..ef404075 --- /dev/null +++ b/src/pybind/mgr/rook/rook_cluster.py @@ -0,0 +1,442 @@ +""" +This module wrap's Rook + Kubernetes APIs to expose the calls +needed to implement an orchestrator module. While the orchestrator +module exposes an async API, this module simply exposes blocking API +call methods. + +This module is runnable outside of ceph-mgr, useful for testing. +""" +import logging +import json +from contextlib import contextmanager + +from six.moves.urllib.parse import urljoin # pylint: disable=import-error + +# Optional kubernetes imports to enable MgrModule.can_run +# to behave cleanly. +try: + from kubernetes.client.rest import ApiException +except ImportError: + ApiException = None + +try: + import orchestrator + from rook.module import RookEnv + from typing import List +except ImportError: + pass # just used for type checking. + + +log = logging.getLogger(__name__) + + +class ApplyException(Exception): + """ + For failures to update the Rook CRDs, usually indicating + some kind of interference between our attempted update + and other conflicting activity. + """ + + +class RookCluster(object): + def __init__(self, k8s, rook_env): + self.rook_env = rook_env # type: RookEnv + self.k8s = k8s + + def rook_url(self, path): + prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( + self.rook_env.crd_version, self.rook_env.namespace) + return urljoin(prefix, path) + + def rook_api_call(self, verb, path, **kwargs): + full_path = self.rook_url(path) + log.debug("[%s] %s" % (verb, full_path)) + + return self.k8s.api_client.call_api( + full_path, + verb, + auth_settings=['BearerToken'], + response_type="object", + _return_http_data_only=True, + _preload_content=True, + **kwargs) + + def rook_api_get(self, path, **kwargs): + return self.rook_api_call("GET", path, **kwargs) + + def rook_api_delete(self, path): + return self.rook_api_call("DELETE", path) + + def rook_api_patch(self, path, **kwargs): + return self.rook_api_call("PATCH", path, + header_params={"Content-Type": "application/json-patch+json"}, + **kwargs) + + def rook_api_post(self, path, **kwargs): + return self.rook_api_call("POST", path, **kwargs) + + def get_discovered_devices(self, nodenames=None): + # TODO: replace direct k8s calls with Rook API calls + # when they're implemented + label_selector = "app=rook-discover" + if nodenames is not None: + # FIXME: is there a practical or official limit on the + # number of entries in a label selector + label_selector += ", rook.io/node in ({0})".format( + ", ".join(nodenames)) + + try: + result = self.k8s.list_namespaced_config_map( + self.rook_env.operator_namespace, + label_selector=label_selector) + except ApiException as e: + log.exception("Failed to fetch device metadata: {0}".format(e)) + raise + + nodename_to_devices = {} + for i in result.items: + drives = json.loads(i.data['devices']) + nodename_to_devices[i.metadata.labels['rook.io/node']] = drives + + return nodename_to_devices + + def get_nfs_conf_url(self, nfs_cluster, instance): + # + # Fetch cephnfs object for "nfs_cluster" and then return a rados:// + # URL for the instance within that cluster. If the fetch fails, just + # return None. + # + try: + ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster)) + except ApiException as e: + log.info("Unable to fetch cephnfs object: {}".format(e.status)) + return None + + pool = ceph_nfs['spec']['rados']['pool'] + namespace = ceph_nfs['spec']['rados'].get('namespace', None) + + if namespace == None: + url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance) + else: + url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance) + return url + + + def describe_pods(self, service_type, service_id, nodename): + # Go query the k8s API about deployment, containers related to this + # filesystem + + # Inspect the Rook YAML, to decide whether this filesystem + # is Ceph-managed or Rook-managed + # TODO: extend Orchestrator interface to describe whether FS + # is manageable by us or not + + # Example Rook Pod labels for a mgr daemon: + # Labels: app=rook-ceph-mgr + # pod-template-hash=2171958073 + # rook_cluster=rook + # And MDS containers additionally have `rook_filesystem` label + + # Label filter is rook_cluster=<cluster name> + # rook_file_system=<self.fs_name> + + label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name) + if service_type != None: + label_filter += ",app=rook-ceph-{0}".format(service_type) + if service_id != None: + if service_type == "mds": + label_filter += ",rook_file_system={0}".format(service_id) + elif service_type == "osd": + # Label added in https://github.com/rook/rook/pull/1698 + label_filter += ",ceph-osd-id={0}".format(service_id) + elif service_type == "mon": + # label like mon=rook-ceph-mon0 + label_filter += ",mon={0}".format(service_id) + elif service_type == "mgr": + label_filter += ",mgr={0}".format(service_id) + elif service_type == "nfs": + label_filter += ",ceph_nfs={0}".format(service_id) + elif service_type == "rgw": + # TODO: rgw + pass + + field_filter = "" + if nodename != None: + field_filter = "spec.nodeName={0}".format(nodename) + + pods = self.k8s.list_namespaced_pod( + self.rook_env.namespace, + label_selector=label_filter, + field_selector=field_filter) + + # import json + # print json.dumps(pods.items[0]) + + pods_summary = [] + + for p in pods.items: + d = p.to_dict() + # p['metadata']['creationTimestamp'] + # p['metadata']['nodeName'] + pods_summary.append({ + "name": d['metadata']['name'], + "nodename": d['spec']['node_name'], + "labels": d['metadata']['labels'] + }) + pass + + return pods_summary + + @contextmanager + def ignore_409(self, what): + try: + yield + except ApiException as e: + if e.status == 409: + # Idempotent, succeed. + log.info("{} already exists".format(what)) + else: + raise + + def add_filesystem(self, spec): + # TODO use spec.placement + # TODO warn if spec.extended has entries we don't kow how + # to action. + + rook_fs = { + "apiVersion": self.rook_env.api_name, + "kind": "CephFilesystem", + "metadata": { + "name": spec.name, + "namespace": self.rook_env.namespace + }, + "spec": { + "onlyManageDaemons": True, + "metadataServer": { + "activeCount": spec.count, + "activeStandby": True + + } + } + } + + with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)): + self.rook_api_post("cephfilesystems/", body=rook_fs) + + def add_nfsgw(self, spec): + # TODO use spec.placement + # TODO warn if spec.extended has entries we don't kow how + # to action. + + rook_nfsgw = { + "apiVersion": self.rook_env.api_name, + "kind": "CephNFS", + "metadata": { + "name": spec.name, + "namespace": self.rook_env.namespace + }, + "spec": { + "rados": { + "pool": spec.extended["pool"] + }, + "server": { + "active": spec.count, + } + } + } + + if "namespace" in spec.extended: + rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"] + + with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)): + self.rook_api_post("cephnfses/", body=rook_nfsgw) + + def add_objectstore(self, spec): + rook_os = { + "apiVersion": self.rook_env.api_name, + "kind": "CephObjectStore", + "metadata": { + "name": spec.name, + "namespace": self.rook_env.namespace + }, + "spec": { + "metadataPool": { + "failureDomain": "host", + "replicated": { + "size": 1 + } + }, + "dataPool": { + "failureDomain": "osd", + "replicated": { + "size": 1 + } + }, + "gateway": { + "type": "s3", + "port": 80, + "instances": 1, + "allNodes": False + } + } + } + + with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)): + self.rook_api_post("cephobjectstores/", body=rook_os) + + def rm_service(self, service_type, service_id): + assert service_type in ("mds", "rgw", "nfs") + + if service_type == "mds": + rooktype = "cephfilesystems" + elif service_type == "rgw": + rooktype = "cephobjectstores" + elif service_type == "nfs": + rooktype = "cephnfses" + + objpath = "{0}/{1}".format(rooktype, service_id) + + try: + self.rook_api_delete(objpath) + except ApiException as e: + if e.status == 404: + log.info("{0} service '{1}' does not exist".format(service_type, service_id)) + # Idempotent, succeed. + else: + raise + + def can_create_osd(self): + current_cluster = self.rook_api_get( + "cephclusters/{0}".format(self.rook_env.cluster_name)) + use_all_nodes = current_cluster['spec'].get('useAllNodes', False) + + # If useAllNodes is set, then Rook will not be paying attention + # to anything we put in 'nodes', so can't do OSD creation. + return not use_all_nodes + + def node_exists(self, node_name): + try: + self.k8s.read_node(node_name, exact=False, export=True) + except ApiException as e: + if e.status == 404: + return False + else: + raise + else: + return True + + def update_mon_count(self, newcount): + patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}] + + try: + self.rook_api_patch( + "cephclusters/{0}".format(self.rook_env.cluster_name), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e)) + raise ApplyException( + "Failed to update mon count in Cluster CRD: {0}".format(e)) + + return "Updated mon count to {0}".format(newcount) + + def update_nfs_count(self, svc_id, newcount): + patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}] + + try: + self.rook_api_patch( + "cephnfses/{0}".format(svc_id), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e)) + raise ApplyException( + "Failed to update NFS server count for {0}: {1}".format(svc_id, e)) + return "Updated NFS server count for {0} to {1}".format(svc_id, newcount) + + def add_osds(self, drive_group, all_hosts): + # type: (orchestrator.DriveGroupSpec, List[str]) -> str + """ + Rook currently (0.8) can only do single-drive OSDs, so we + treat all drive groups as just a list of individual OSDs. + """ + block_devices = drive_group.data_devices.paths if drive_group.data_devices else None + directories = drive_group.data_directories + + assert drive_group.objectstore in ("bluestore", "filestore") + + # The CRD looks something like this: + # nodes: + # - name: "gravel1.rockery" + # devices: + # - name: "sdb" + # config: + # storeType: bluestore + + current_cluster = self.rook_api_get( + "cephclusters/{0}".format(self.rook_env.cluster_name)) + + patch = [] + + # FIXME: this is all not really atomic, because jsonpatch doesn't + # let us do "test" operations that would check if items with + # matching names were in existing lists. + + if 'nodes' not in current_cluster['spec']['storage']: + patch.append({ + 'op': 'add', 'path': '/spec/storage/nodes', 'value': [] + }) + + current_nodes = current_cluster['spec']['storage'].get('nodes', []) + + if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]: + pd = { "name": drive_group.hosts(all_hosts)[0], + "config": { "storeType": drive_group.objectstore }} + + if block_devices: + pd["devices"] = [{'name': d} for d in block_devices] + if directories: + pd["directories"] = [{'path': p} for p in directories] + + patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd }) + else: + # Extend existing node + node_idx = None + current_node = None + for i, c in enumerate(current_nodes): + if c['name'] == drive_group.hosts(all_hosts)[0]: + current_node = c + node_idx = i + break + + assert node_idx is not None + assert current_node is not None + + new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']])) + for n in new_devices: + patch.append({ + "op": "add", + "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx), + "value": {'name': n} + }) + + new_dirs = list(set(directories) - set(current_node['directories'])) + for p in new_dirs: + patch.append({ + "op": "add", + "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx), + "value": {'path': p} + }) + + if len(patch) == 0: + return "No change" + + try: + self.rook_api_patch( + "cephclusters/{0}".format(self.rook_env.cluster_name), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e)) + raise ApplyException( + "Failed to create OSD entries in Cluster CRD: {0}".format( + e)) + + return "Success" |