diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/rook/module.py | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/rook/module.py')
-rw-r--r-- | src/pybind/mgr/rook/module.py | 512 |
1 files changed, 512 insertions, 0 deletions
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py new file mode 100644 index 000000000..70512567a --- /dev/null +++ b/src/pybind/mgr/rook/module.py @@ -0,0 +1,512 @@ +import threading +import functools +import os +import json + +from ceph.deployment import inventory +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec +from ceph.utils import datetime_now + +from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple + +try: + from ceph.deployment.drive_group import DriveGroupSpec +except ImportError: + pass # just for type checking + +try: + from kubernetes import client, config + from kubernetes.client.rest import ApiException + + kubernetes_imported = True + + # https://github.com/kubernetes-client/python/issues/895 + from kubernetes.client.models.v1_container_image import V1ContainerImage + def names(self: Any, names: Any) -> None: + self._names = names + V1ContainerImage.names = V1ContainerImage.names.setter(names) + +except ImportError: + kubernetes_imported = False + client = None + config = None + +from mgr_module import MgrModule, Option +import orchestrator +from orchestrator import handle_orch_error, OrchResult, raise_if_exception + +from .rook_cluster import RookCluster + +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable) +ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec) + + + +class RookEnv(object): + def __init__(self) -> None: + # POD_NAMESPACE already exist for Rook 0.9 + self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph') + + # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0 + self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace) + + self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace) + self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1') + self.api_name = "ceph.rook.io/" + self.crd_version + + def api_version_match(self) -> bool: + return self.crd_version == 'v1' + + def has_namespace(self) -> bool: + return 'POD_NAMESPACE' in os.environ + + +class RookOrchestrator(MgrModule, orchestrator.Orchestrator): + """ + Writes are a two-phase thing, firstly sending + the write to the k8s API (fast) and then waiting + for the corresponding change to appear in the + Ceph cluster (slow) + + Right now, we are calling the k8s API synchronously. + """ + + MODULE_OPTIONS: List[Option] = [ + # TODO: configure k8s API addr instead of assuming local + ] + + @staticmethod + def can_run() -> Tuple[bool, str]: + if not kubernetes_imported: + return False, "`kubernetes` python module not found" + if not RookEnv().api_version_match(): + return False, "Rook version unsupported." + return True, '' + + def available(self) -> Tuple[bool, str, Dict[str, Any]]: + if not kubernetes_imported: + return False, "`kubernetes` python module not found", {} + elif not self._rook_env.has_namespace(): + return False, "ceph-mgr not running in Rook cluster", {} + + try: + self.k8s.list_namespaced_pod(self._rook_env.namespace) + except ApiException as e: + return False, "Cannot reach Kubernetes API: {}".format(e), {} + else: + return True, "", {} + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(RookOrchestrator, self).__init__(*args, **kwargs) + + self._initialized = threading.Event() + self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None + self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None + self._rook_cluster: Optional[RookCluster] = None + self._rook_env = RookEnv() + + self._shutdown = threading.Event() + + def shutdown(self) -> None: + self._shutdown.set() + + @property + def k8s(self): + # type: () -> client.CoreV1Api + self._initialized.wait() + assert self._k8s_CoreV1_api is not None + return self._k8s_CoreV1_api + + @property + def rook_cluster(self): + # type: () -> RookCluster + self._initialized.wait() + assert self._rook_cluster is not None + return self._rook_cluster + + def serve(self) -> None: + # For deployed clusters, we should always be running inside + # a Rook cluster. For development convenience, also support + # running outside (reading ~/.kube config) + + if self._rook_env.has_namespace(): + config.load_incluster_config() + else: + self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~") + config.load_kube_config() + + # So that I can do port forwarding from my workstation - jcsp + from kubernetes.client import configuration + configuration.verify_ssl = False + + self._k8s_CoreV1_api = client.CoreV1Api() + self._k8s_BatchV1_api = client.BatchV1Api() + + try: + # XXX mystery hack -- I need to do an API call from + # this context, or subsequent API usage from handle_command + # fails with SSLError('bad handshake'). Suspect some kind of + # thread context setup in SSL lib? + self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace) + except ApiException: + # Ignore here to make self.available() fail with a proper error message + pass + + self._rook_cluster = RookCluster( + self._k8s_CoreV1_api, + self._k8s_BatchV1_api, + self._rook_env) + + self._initialized.set() + + while not self._shutdown.is_set(): + self._shutdown.wait(5) + + @handle_orch_error + def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: + host_list = None + if host_filter and host_filter.hosts: + # Explicit host list + host_list = host_filter.hosts + elif host_filter and host_filter.labels: + # TODO: query k8s API to resolve to host list, and pass + # it into RookCluster.get_discovered_devices + raise NotImplementedError() + + discovered_devs = self.rook_cluster.get_discovered_devices(host_list) + + result = [] + for host_name, host_devs in discovered_devs.items(): + devs = [] + for d in host_devs: + if 'cephVolumeData' in d and d['cephVolumeData']: + devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData']))) + else: + devs.append(inventory.Device( + path = '/dev/' + d['name'], + sys_api = dict( + rotational = '1' if d['rotational'] else '0', + size = d['size'] + ), + available = False, + rejected_reasons=['device data coming from ceph-volume not provided'], + )) + + result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs))) + + return result + + @handle_orch_error + def get_hosts(self): + # type: () -> List[orchestrator.HostSpec] + return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()] + + @handle_orch_error + def describe_service(self, + service_type: Optional[str] = None, + service_name: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.ServiceDescription]: + now = datetime_now() + + # CephCluster + cl = self.rook_cluster.rook_api_get( + "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name)) + self.log.debug('CephCluster %s' % cl) + image_name = cl['spec'].get('cephVersion', {}).get('image', None) + num_nodes = len(self.rook_cluster.get_node_names()) + + spec = {} + if service_type == 'mon' or service_type is None: + spec['mon'] = orchestrator.ServiceDescription( + spec=ServiceSpec( + 'mon', + placement=PlacementSpec( + count=cl['spec'].get('mon', {}).get('count', 1), + ), + ), + size=cl['spec'].get('mon', {}).get('count', 1), + container_image_name=image_name, + last_refresh=now, + ) + if service_type == 'mgr' or service_type is None: + spec['mgr'] = orchestrator.ServiceDescription( + spec=ServiceSpec( + 'mgr', + placement=PlacementSpec.from_string('count:1'), + ), + size=1, + container_image_name=image_name, + last_refresh=now, + ) + if not cl['spec'].get('crashCollector', {}).get('disable', False): + spec['crash'] = orchestrator.ServiceDescription( + spec=ServiceSpec( + 'crash', + placement=PlacementSpec.from_string('*'), + ), + size=num_nodes, + container_image_name=image_name, + last_refresh=now, + ) + + if service_type == 'mds' or service_type is None: + # CephFilesystems + all_fs = self.rook_cluster.rook_api_get( + "cephfilesystems/") + self.log.debug('CephFilesystems %s' % all_fs) + for fs in all_fs.get('items', []): + svc = 'mds.' + fs['metadata']['name'] + if svc in spec: + continue + # FIXME: we are conflating active (+ standby) with count + active = fs['spec'].get('metadataServer', {}).get('activeCount', 1) + total_mds = active + if fs['spec'].get('metadataServer', {}).get('activeStandby', False): + total_mds = active * 2 + spec[svc] = orchestrator.ServiceDescription( + spec=ServiceSpec( + service_type='mds', + service_id=fs['metadata']['name'], + placement=PlacementSpec(count=active), + ), + size=total_mds, + container_image_name=image_name, + last_refresh=now, + ) + + if service_type == 'rgw' or service_type is None: + # CephObjectstores + all_zones = self.rook_cluster.rook_api_get( + "cephobjectstores/") + self.log.debug('CephObjectstores %s' % all_zones) + for zone in all_zones.get('items', []): + rgw_realm = zone['metadata']['name'] + rgw_zone = rgw_realm + svc = 'rgw.' + rgw_realm + '.' + rgw_zone + if svc in spec: + continue + active = zone['spec']['gateway']['instances']; + if 'securePort' in zone['spec']['gateway']: + ssl = True + port = zone['spec']['gateway']['securePort'] + else: + ssl = False + port = zone['spec']['gateway']['port'] or 80 + spec[svc] = orchestrator.ServiceDescription( + spec=RGWSpec( + service_id=rgw_realm + '.' + rgw_zone, + rgw_realm=rgw_realm, + rgw_zone=rgw_zone, + ssl=ssl, + rgw_frontend_port=port, + placement=PlacementSpec(count=active), + ), + size=active, + container_image_name=image_name, + last_refresh=now, + ) + + if service_type == 'nfs' or service_type is None: + # CephNFSes + all_nfs = self.rook_cluster.rook_api_get( + "cephnfses/") + self.log.warning('CephNFS %s' % all_nfs) + for nfs in all_nfs.get('items', []): + nfs_name = nfs['metadata']['name'] + svc = 'nfs.' + nfs_name + if svc in spec: + continue + active = nfs['spec'].get('server', {}).get('active') + spec[svc] = orchestrator.ServiceDescription( + spec=NFSServiceSpec( + service_id=nfs_name, + placement=PlacementSpec(count=active), + ), + size=active, + last_refresh=now, + ) + + for dd in self._list_daemons(): + if dd.service_name() not in spec: + continue + service = spec[dd.service_name()] + service.running += 1 + if not service.container_image_id: + service.container_image_id = dd.container_image_id + if not service.container_image_name: + service.container_image_name = dd.container_image_name + if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh: + service.last_refresh = dd.last_refresh + if service.created is None or dd.created is None or dd.created < service.created: + service.created = dd.created + + return [v for k, v in spec.items()] + + @handle_orch_error + def list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.DaemonDescription]: + return self._list_daemons(service_name=service_name, + daemon_type=daemon_type, + daemon_id=daemon_id, + host=host, + refresh=refresh) + + def _list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.DaemonDescription]: + pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host) + self.log.debug('pods %s' % pods) + result = [] + for p in pods: + sd = orchestrator.DaemonDescription() + sd.hostname = p['hostname'] + sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '') + status = { + 'Pending': orchestrator.DaemonDescriptionStatus.starting, + 'Running': orchestrator.DaemonDescriptionStatus.running, + 'Succeeded': orchestrator.DaemonDescriptionStatus.stopped, + 'Failed': orchestrator.DaemonDescriptionStatus.error, + 'Unknown': orchestrator.DaemonDescriptionStatus.unknown, + }[p['phase']] + sd.status = status + + if 'ceph_daemon_id' in p['labels']: + sd.daemon_id = p['labels']['ceph_daemon_id'] + elif 'ceph-osd-id' in p['labels']: + sd.daemon_id = p['labels']['ceph-osd-id'] + else: + # Unknown type -- skip it + continue + + if service_name is not None and service_name != sd.service_name(): + continue + sd.container_image_name = p['container_image_name'] + sd.container_image_id = p['container_image_id'] + sd.created = p['created'] + sd.last_configured = p['created'] + sd.last_deployed = p['created'] + sd.started = p['started'] + sd.last_refresh = p['refreshed'] + result.append(sd) + + return result + + @handle_orch_error + def remove_service(self, service_name: str, force: bool = False) -> str: + service_type, service_name = service_name.split('.', 1) + if service_type == 'mds': + return self.rook_cluster.rm_service('cephfilesystems', service_name) + elif service_type == 'rgw': + return self.rook_cluster.rm_service('cephobjectstores', service_name) + elif service_type == 'nfs': + return self.rook_cluster.rm_service('cephnfses', service_name) + else: + raise orchestrator.OrchestratorError(f'Service type {service_type} not supported') + + @handle_orch_error + def apply_mon(self, spec): + # type: (ServiceSpec) -> str + if spec.placement.hosts or spec.placement.label: + raise RuntimeError("Host list or label is not supported by rook.") + + return self.rook_cluster.update_mon_count(spec.placement.count) + + @handle_orch_error + def apply_mds(self, spec): + # type: (ServiceSpec) -> str + return self.rook_cluster.apply_filesystem(spec) + + @handle_orch_error + def apply_rgw(self, spec): + # type: (RGWSpec) -> str + return self.rook_cluster.apply_objectstore(spec) + + @handle_orch_error + def apply_nfs(self, spec): + # type: (NFSServiceSpec) -> str + return self.rook_cluster.apply_nfsgw(spec) + + @handle_orch_error + def remove_daemons(self, names: List[str]) -> List[str]: + return self.rook_cluster.remove_pods(names) + + @handle_orch_error + def create_osds(self, drive_group): + # type: (DriveGroupSpec) -> str + """ Creates OSDs from a drive group specification. + + $: ceph orch osd create -i <dg.file> + + The drivegroup file must only contain one spec at a time. + """ + + targets = [] # type: List[str] + if drive_group.data_devices and drive_group.data_devices.paths: + targets += [d.path for d in drive_group.data_devices.paths] + if drive_group.data_directories: + targets += drive_group.data_directories + + all_hosts = raise_if_exception(self.get_hosts()) + + matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts) + + assert len(matching_hosts) == 1 + + if not self.rook_cluster.node_exists(matching_hosts[0]): + raise RuntimeError("Node '{0}' is not in the Kubernetes " + "cluster".format(matching_hosts)) + + # Validate whether cluster CRD can accept individual OSD + # creations (i.e. not useAllDevices) + if not self.rook_cluster.can_create_osd(): + raise RuntimeError("Rook cluster configuration does not " + "support OSD creation.") + + return self.rook_cluster.add_osds(drive_group, matching_hosts) + + # TODO: this was the code to update the progress reference: + """ + @handle_orch_error + def has_osds(matching_hosts: List[str]) -> bool: + + # Find OSD pods on this host + pod_osd_ids = set() + pods = self.k8s.list_namespaced_pod(self._rook_env.namespace, + label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name), + field_selector="spec.nodeName={0}".format( + matching_hosts[0] + )).items + for p in pods: + pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id'])) + + self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids)) + + found = [] + osdmap = self.get("osd_map") + for osd in osdmap['osds']: + osd_id = osd['osd'] + if osd_id not in pod_osd_ids: + continue + + metadata = self.get_metadata('osd', "%s" % osd_id) + if metadata and metadata['devices'] in targets: + found.append(osd_id) + else: + self.log.info("ignoring osd {0} {1}".format( + osd_id, metadata['devices'] if metadata else 'DNE' + )) + + return found is not None + """ + + @handle_orch_error + def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: + return self.rook_cluster.blink_light(ident_fault, on, locs) |