""" This module wrap's Rook + Kubernetes APIs to expose the calls needed to implement an orchestrator module. While the orchestrator module exposes an async API, this module simply exposes blocking API call methods. This module is runnable outside of ceph-mgr, useful for testing. """ import logging import json from contextlib import contextmanager from six.moves.urllib.parse import urljoin # pylint: disable=import-error # Optional kubernetes imports to enable MgrModule.can_run # to behave cleanly. try: from kubernetes.client.rest import ApiException except ImportError: ApiException = None try: import orchestrator from rook.module import RookEnv from typing import List except ImportError: pass # just used for type checking. log = logging.getLogger(__name__) class ApplyException(Exception): """ For failures to update the Rook CRDs, usually indicating some kind of interference between our attempted update and other conflicting activity. """ class RookCluster(object): def __init__(self, k8s, rook_env): self.rook_env = rook_env # type: RookEnv self.k8s = k8s def rook_url(self, path): prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( self.rook_env.crd_version, self.rook_env.namespace) return urljoin(prefix, path) def rook_api_call(self, verb, path, **kwargs): full_path = self.rook_url(path) log.debug("[%s] %s" % (verb, full_path)) return self.k8s.api_client.call_api( full_path, verb, auth_settings=['BearerToken'], response_type="object", _return_http_data_only=True, _preload_content=True, **kwargs) def rook_api_get(self, path, **kwargs): return self.rook_api_call("GET", path, **kwargs) def rook_api_delete(self, path): return self.rook_api_call("DELETE", path) def rook_api_patch(self, path, **kwargs): return self.rook_api_call("PATCH", path, header_params={"Content-Type": "application/json-patch+json"}, **kwargs) def rook_api_post(self, path, **kwargs): return self.rook_api_call("POST", path, **kwargs) def get_discovered_devices(self, nodenames=None): # TODO: replace direct k8s calls with Rook API calls # when they're implemented label_selector = "app=rook-discover" if nodenames is not None: # FIXME: is there a practical or official limit on the # number of entries in a label selector label_selector += ", rook.io/node in ({0})".format( ", ".join(nodenames)) try: result = self.k8s.list_namespaced_config_map( self.rook_env.operator_namespace, label_selector=label_selector) except ApiException as e: log.exception("Failed to fetch device metadata: {0}".format(e)) raise nodename_to_devices = {} for i in result.items: drives = json.loads(i.data['devices']) nodename_to_devices[i.metadata.labels['rook.io/node']] = drives return nodename_to_devices def get_nfs_conf_url(self, nfs_cluster, instance): # # Fetch cephnfs object for "nfs_cluster" and then return a rados:// # URL for the instance within that cluster. If the fetch fails, just # return None. 
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(
                pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self, service_type, service_id, nodename):
        # Go query the k8s API about deployment, containers related to this
        # filesystem

        # Inspect the Rook YAML, to decide whether this filesystem
        # is Ceph-managed or Rook-managed
        # TODO: extend Orchestrator interface to describe whether FS
        # is manageable by us or not

        # Example Rook Pod labels for a mgr daemon:
        # Labels:         app=rook-ceph-mgr
        #                 pod-template-hash=2171958073
        #                 rook_cluster=rook
        # And MDS containers additionally have `rook_filesystem` label

        # Label filter is rook_cluster=<cluster name>
        #                 rook_file_system=<filesystem name>
        label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
        if service_type is not None:
            label_filter += ",app=rook-ceph-{0}".format(service_type)
            if service_id is not None:
                if service_type == "mds":
                    label_filter += ",rook_file_system={0}".format(service_id)
                elif service_type == "osd":
                    # Label added in https://github.com/rook/rook/pull/1698
                    label_filter += ",ceph-osd-id={0}".format(service_id)
                elif service_type == "mon":
                    # label like mon=rook-ceph-mon0
                    label_filter += ",mon={0}".format(service_id)
                elif service_type == "mgr":
                    label_filter += ",mgr={0}".format(service_id)
                elif service_type == "nfs":
                    label_filter += ",ceph_nfs={0}".format(service_id)
                elif service_type == "rgw":
                    # TODO: rgw
                    pass

        field_filter = ""
        if nodename is not None:
            field_filter = "spec.nodeName={0}".format(nodename)

        pods = self.k8s.list_namespaced_pod(
            self.rook_env.namespace,
            label_selector=label_filter,
            field_selector=field_filter)

        pods_summary = []
        for p in pods.items:
            d = p.to_dict()
            pods_summary.append({
                "name": d['metadata']['name'],
                "nodename": d['spec']['node_name'],
                "labels": d['metadata']['labels']
            })

        return pods_summary

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def add_filesystem(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        rook_fs = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephFilesystem",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "onlyManageDaemons": True,
                "metadataServer": {
                    "activeCount": spec.count,
                    "activeStandby": True
                }
            }
        }

        with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
            self.rook_api_post("cephfilesystems/", body=rook_fs)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        rook_nfsgw = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephNFS",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "rados": {
                    "pool": spec.extended["pool"]
                },
                "server": {
                    "active": spec.count,
                }
            }
        }

        if "namespace" in spec.extended:
            rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"]

        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw)

    def add_objectstore(self, spec):
        rook_os = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephObjectStore",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "metadataPool": {
                    "failureDomain": "host",
                    "replicated": {
                        "size": 1
                    }
                },
                "dataPool": {
                    "failureDomain": "osd",
                    "replicated": {
                        "size": 1
                    }
                },
                "gateway": {
                    "type": "s3",
                    "port": 80,
                    "instances": 1,
                    "allNodes": False
                }
            }
        }

        with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
            self.rook_api_post("cephobjectstores/", body=rook_os)

    def rm_service(self, service_type, service_id):
        assert service_type in ("mds", "rgw", "nfs")

        if service_type == "mds":
            rooktype = "cephfilesystems"
        elif service_type == "rgw":
            rooktype = "cephobjectstores"
        elif service_type == "nfs":
            rooktype = "cephnfses"

        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(
                    service_type, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        try:
            self.k8s.read_node(node_name, exact=False, export=True)
        except ApiException as e:
            if e.status == 404:
                return False
            else:
                raise
        else:
            return True

    def update_mon_count(self, newcount):
        patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]

        try:
            self.rook_api_patch(
                "cephclusters/{0}".format(self.rook_env.cluster_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update mon count in Cluster CRD: {0}".format(e))

        return "Updated mon count to {0}".format(newcount)

    def update_nfs_count(self, svc_id, newcount):
        patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]

        try:
            self.rook_api_patch(
                "cephnfses/{0}".format(svc_id),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))

        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)

    def add_osds(self, drive_group, all_hosts):
        # type: (orchestrator.DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
""" block_devices = drive_group.data_devices.paths if drive_group.data_devices else None directories = drive_group.data_directories assert drive_group.objectstore in ("bluestore", "filestore") # The CRD looks something like this: # nodes: # - name: "gravel1.rockery" # devices: # - name: "sdb" # config: # storeType: bluestore current_cluster = self.rook_api_get( "cephclusters/{0}".format(self.rook_env.cluster_name)) patch = [] # FIXME: this is all not really atomic, because jsonpatch doesn't # let us do "test" operations that would check if items with # matching names were in existing lists. if 'nodes' not in current_cluster['spec']['storage']: patch.append({ 'op': 'add', 'path': '/spec/storage/nodes', 'value': [] }) current_nodes = current_cluster['spec']['storage'].get('nodes', []) if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]: pd = { "name": drive_group.hosts(all_hosts)[0], "config": { "storeType": drive_group.objectstore }} if block_devices: pd["devices"] = [{'name': d} for d in block_devices] if directories: pd["directories"] = [{'path': p} for p in directories] patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd }) else: # Extend existing node node_idx = None current_node = None for i, c in enumerate(current_nodes): if c['name'] == drive_group.hosts(all_hosts)[0]: current_node = c node_idx = i break assert node_idx is not None assert current_node is not None new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']])) for n in new_devices: patch.append({ "op": "add", "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx), "value": {'name': n} }) new_dirs = list(set(directories) - set(current_node['directories'])) for p in new_dirs: patch.append({ "op": "add", "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx), "value": {'path': p} }) if len(patch) == 0: return "No change" try: self.rook_api_patch( "cephclusters/{0}".format(self.rook_env.cluster_name), body=patch) except ApiException as e: log.exception("API exception: {0}".format(e)) raise ApplyException( "Failed to create OSD entries in Cluster CRD: {0}".format( e)) return "Success"