From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/pybind/mgr/rook/rook_cluster.py | 1591 +++++++++++++++++++++++++++++++++++
 1 file changed, 1591 insertions(+)
 create mode 100644 src/pybind/mgr/rook/rook_cluster.py

diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
new file mode 100644
index 000000000..5c7c9fc04
--- /dev/null
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -0,0 +1,1591 @@
+"""
+This module wraps the Rook and Kubernetes APIs to expose the calls
+needed to implement an orchestrator module. While the orchestrator
+module exposes an async API, this module simply exposes blocking API
+call methods.
+
+This module is runnable outside of ceph-mgr, which is useful for testing.
+"""
+import datetime
+import threading
+import logging
+from contextlib import contextmanager
+from time import sleep
+import re
+from orchestrator import OrchResult
+
+import jsonpatch
+from urllib.parse import urljoin
+import json
+
+# Optional kubernetes imports to enable MgrModule.can_run
+# to behave cleanly.
+from urllib3.exceptions import ProtocolError
+
+from ceph.deployment.inventory import Device
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.utils import datetime_now
+from ceph.deployment.drive_selection.matchers import SizeMatcher
+from nfs.cluster import create_ganesha_pool
+from nfs.module import Module
+from nfs.export import NFSRados
+from mgr_module import NFS_POOL_NAME
+from mgr_util import merge_dicts
+
+from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
+    Iterable, Dict, Iterator, Type
+
+try:
+    from kubernetes import client, watch
+    from kubernetes.client.rest import ApiException
+except ImportError:
+    class ApiException(Exception):  # type: ignore
+        status = 0
+
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+from .rook_client.ceph import cephrbdmirror as crbdm
+from .rook_client._helper import CrdClass
+
+import orchestrator
+
+try:
+    from rook.module import RookEnv, RookOrchestrator
+except ImportError:
+    pass  # just used for type checking.
+
+
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
+log = logging.getLogger(__name__)
+
+
+def __urllib3_supports_read_chunked() -> bool:
+    # CentOS 7 ships a urllib3 that is older than the version required by
+    # kubernetes-client and lacks HTTPResponse.read_chunked.
+    try:
+        from urllib3.response import HTTPResponse
+        return hasattr(HTTPResponse, 'read_chunked')
+    except ImportError:
+        return False
+
+
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
+
+class ApplyException(orchestrator.OrchestratorError):
+    """
+    For failures to update the Rook CRDs, usually indicating
+    some kind of interference between our attempted update
+    and other conflicting activity.
+ """ + + +def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]: + def wrapper(*args: Any, **kwargs: Any) -> threading.Thread: + t = threading.Thread(target=f, args=args, kwargs=kwargs) + t.start() + return t + + return cast(Callable[..., threading.Thread], wrapper) + + +class DefaultFetcher(): + def __init__(self, storage_class_name: str, coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv'): + self.storage_class_name = storage_class_name + self.coreV1_api = coreV1_api + self.rook_env = rook_env + self.pvs_in_sc: List[client.V1PersistentVolumeList] = [] + + def fetch(self) -> None: + self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume) + self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class_name] + + def convert_size(self, size_str: str) -> int: + units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E") + coeff_and_unit = re.search('(\d+)(\D+)', size_str) + assert coeff_and_unit is not None + coeff = int(coeff_and_unit[1]) + unit = coeff_and_unit[2] + try: + factor = units.index(unit) % 7 + except ValueError: + log.error("PV size format invalid") + raise + size = coeff * (2 ** (10 * factor)) + return size + + def devices(self) -> Dict[str, List[Device]]: + nodename_to_devices: Dict[str, List[Device]] = {} + for i in self.pvs_in_sc: + node, device = self.device(i) + if node not in nodename_to_devices: + nodename_to_devices[node] = [] + nodename_to_devices[node].append(device) + return nodename_to_devices + + def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]: + node = 'N/A' + if i.spec.node_affinity: + terms = i.spec.node_affinity.required.node_selector_terms + if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1: + node = terms[0].match_expressions[0].values[0] + size = self.convert_size(i.spec.capacity['storage']) + path = i.spec.host_path.path if i.spec.host_path else i.spec.local.path if i.spec.local else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations else '' + state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available' + pv_name = i.metadata.name + device = Device( + path = path, + sys_api = dict( + size = size, + node = node, + pv_name = pv_name + ), + available = state, + ) + return (node, device) + + +class LSOFetcher(DefaultFetcher): + def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None): + super().__init__(storage_class, coreV1_api, rook_env) + self.customObjects_api = customObjects_api + self.nodenames = nodenames + + def fetch(self) -> None: + super().fetch() + self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object, + group="local.storage.openshift.io", + version="v1alpha1", + plural="localvolumediscoveryresults") + + def predicate(self, item: 'client.V1ConfigMapList') -> bool: + if self.nodenames is not None: + return item['spec']['nodeName'] in self.nodenames + else: + return True + + def devices(self) -> Dict[str, List[Device]]: + try: + lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)] + except ApiException as dummy_e: + 
log.error("Failed to fetch device metadata") + raise + self.lso_devices = {} + for i in lso_discovery_results: + drives = i['status']['discoveredDevices'] + for drive in drives: + self.lso_devices[drive['deviceID'].split('/')[-1]] = drive + nodename_to_devices: Dict[str, List[Device]] = {} + for i in self.pvs_in_sc: + node, device = (None, None) + if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices): + node, device = super().device(i) + else: + node, device = self.device(i) + if node not in nodename_to_devices: + nodename_to_devices[node] = [] + nodename_to_devices[node].append(device) + return nodename_to_devices + + def device(self, i: Any) -> Tuple[str, Device]: + node = i.metadata.labels['kubernetes.io/hostname'] + device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']] + pv_name = i.metadata.name + vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else '' + model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else '' + device = Device( + path = device_discovery['path'], + sys_api = dict( + size = device_discovery['size'], + rotational = '1' if device_discovery['property']=='Rotational' else '0', + node = node, + pv_name = pv_name, + model = model, + vendor = vendor + ), + available = device_discovery['status']['state']=='Available', + device_id = device_discovery['deviceID'].split('/')[-1], + lsm_data = dict( + serialNum = device_discovery['serial'] + ) + ) + return (node, device) + + +class PDFetcher(DefaultFetcher): + """ Physical Devices Fetcher""" + def __init__(self, coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv'): + super().__init__('', coreV1_api, rook_env) + + def fetch(self) -> None: + """ Collect the devices information from k8s configmaps""" + self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map, + namespace=self.rook_env.operator_namespace, + label_selector='app=rook-discover') + + def devices(self) -> Dict[str, List[Device]]: + """ Return the list of devices found""" + node_devices: Dict[str, List[Device]] = {} + for i in self.dev_cms.items: + devices_list: List[Device] = [] + for d in json.loads(i.data['devices']): + devices_list.append(self.device(d)[1]) + node_devices[i.metadata.labels['rook.io/node']] = devices_list + + return node_devices + + def device(self, devData: Dict[str,str]) -> Tuple[str, Device]: + """ Build an orchestrator device """ + if 'cephVolumeData' in devData and devData['cephVolumeData']: + return "", Device.from_json(json.loads(devData['cephVolumeData'])) + else: + return "", Device( + path='/dev/' + devData['name'], + sys_api=dict( + rotational='1' if devData['rotational'] else '0', + size=devData['size'] + ), + available=False, + rejected_reasons=['device data coming from ceph-volume not provided'], + ) + + +class KubernetesResource(Generic[T]): + def __init__(self, api_func: Callable, **kwargs: Any) -> None: + """ + Generic kubernetes Resource parent class + + The api fetch and watch methods should be common across resource types, + + Exceptions in the runner thread are propagated to the caller. + + :param api_func: kubernetes client api function that is passed to the watcher + :param filter_func: signature: ``(Item) -> bool``. 
+ """ + self.kwargs = kwargs + self.api_func = api_func + + # ``_items`` is accessed by different threads. I assume assignment is atomic. + self._items: Dict[str, T] = dict() + self.thread = None # type: Optional[threading.Thread] + self.exception: Optional[Exception] = None + if not _urllib3_supports_read_chunked: + logging.info('urllib3 is too old. Fallback to full fetches') + + def _fetch(self) -> str: + """ Execute the requested api method as a one-off fetch""" + response = self.api_func(**self.kwargs) + metadata = response.metadata + self._items = {item.metadata.name: item for item in response.items} + log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items))) + return metadata.resource_version + + @property + def items(self) -> Iterable[T]: + """ + Returns the items of the request. + Creates the watcher as a side effect. + :return: + """ + if self.exception: + e = self.exception + self.exception = None + raise e # Propagate the exception to the user. + if not self.thread or not self.thread.is_alive(): + resource_version = self._fetch() + if _urllib3_supports_read_chunked: + # Start a thread which will use the kubernetes watch client against a resource + log.debug("Attaching resource watcher for k8s {}".format(self.api_func)) + self.thread = self._watch(resource_version) + + return self._items.values() + + def get_item_name(self, item: Any) -> Any: + try: + return item.metadata.name + except AttributeError: + raise AttributeError( + "{} doesn't contain a metadata.name. Unable to track changes".format( + self.api_func)) + @threaded + def _watch(self, res_ver: Optional[str]) -> None: + """ worker thread that runs the kubernetes watch """ + + self.exception = None + + w = watch.Watch() + + try: + # execute generator to continually watch resource for changes + for event in w.stream(self.api_func, resource_version=res_ver, watch=True, + **self.kwargs): + self.health = '' + item = event['object'] + name = self.get_item_name(item) + + log.info('{} event: {}'.format(event['type'], name)) + + if event['type'] in ('ADDED', 'MODIFIED'): + self._items = merge_dicts(self._items, {name: item}) + elif event['type'] == 'DELETED': + self._items = {k:v for k,v in self._items.items() if k != name} + elif event['type'] == 'BOOKMARK': + pass + elif event['type'] == 'ERROR': + raise ApiException(str(event)) + else: + raise KeyError('Unknown watch event {}'.format(event['type'])) + except ProtocolError as e: + if 'Connection broken' in str(e): + log.info('Connection reset.') + return + raise + except ApiException as e: + log.exception('K8s API failed. {}'.format(self.api_func)) + self.exception = e + raise + except Exception as e: + log.exception("Watcher failed. ({})".format(self.api_func)) + self.exception = e + raise + +class KubernetesCustomResource(KubernetesResource): + def _fetch(self) -> str: + response = self.api_func(**self.kwargs) + metadata = response['metadata'] + self._items = {item['metadata']['name']: item for item in response['items']} + log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items))) + return metadata['resourceVersion'] + + def get_item_name(self, item: Any) -> Any: + try: + return item['metadata']['name'] + except AttributeError: + raise AttributeError( + "{} doesn't contain a metadata.name. 
Unable to track changes".format(
+                self.api_func))
+
+class DefaultCreator():
+    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
+        self.coreV1_api = coreV1_api
+        self.storage_class_name = storage_class_name
+        self.inventory = inventory
+
+    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+        device_set = ccl.StorageClassDeviceSetsItem(
+            name=d.sys_api['pv_name'],
+            volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
+            count=1,
+            encrypted=drive_group.encrypted,
+            portable=False
+        )
+        device_set.volumeClaimTemplates.append(
+            ccl.VolumeClaimTemplatesItem(
+                metadata=ccl.Metadata(
+                    name="data"
+                ),
+                spec=ccl.Spec(
+                    storageClassName=self.storage_class_name,
+                    volumeMode="Block",
+                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
+                    resources={
+                        "requests":{
+                            "storage": 1
+                        }
+                    },
+                    volumeName=d.sys_api['pv_name']
+                )
+            )
+        )
+        return device_set
+
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        sizematcher: Optional[SizeMatcher] = None
+        if drive_group.data_devices.size:
+            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+        limit = getattr(drive_group.data_devices, 'limit', None)
+        count = 0
+        all = getattr(drive_group.data_devices, 'all', None)
+        paths = [device.path for device in drive_group.data_devices.paths]
+        osd_list = []
+        for pod in rook_pods.items:
+            if (
+                hasattr(pod, 'metadata')
+                and hasattr(pod.metadata, 'labels')
+                and 'osd' in pod.metadata.labels
+                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+            ):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if (
+                            all
+                            or (
+                                device.sys_api['node'] in matching_hosts
+                                and ((sizematcher is None) or sizematcher.compare(device))
+                                and (
+                                    not drive_group.data_devices.paths
+                                    or (device.path in paths)
+                                )
+                            )
+                        ):
+                            device_list.append(device)
+                            count += 1
+
+        return device_list
+
+    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
+        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
+        assert drive_group.data_devices is not None
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+                new_cluster.spec.storage = ccl.Storage()
+
+            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+            existing_scds = [
+                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
+            ]
+            for device in to_create:
+                new_scds = self.device_to_device_set(drive_group, device)
+                if new_scds.name not in existing_scds:
+                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+            return new_cluster
+        return _add_osds
+
+class LSOCreator(DefaultCreator):
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        sizematcher = None
+        if drive_group.data_devices.size:
+            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+        limit = getattr(drive_group.data_devices, 'limit', None)
+        all = getattr(drive_group.data_devices, 'all', None)
+        paths = [device.path for device in drive_group.data_devices.paths]
+        vendor = getattr(drive_group.data_devices, 'vendor', None)
+        model = getattr(drive_group.data_devices, 'model', None)
+        count = 0
+        osd_list = []
+        for pod in rook_pods.items:
+            if (
+                hasattr(pod, 'metadata')
+                and hasattr(pod.metadata, 'labels')
+                and 'osd' in pod.metadata.labels
+                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+            ):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if (
+                            all
+                            or (
+                                device.sys_api['node'] in matching_hosts
+                                and ((sizematcher is None) or sizematcher.compare(device))
+                                and (
+                                    not drive_group.data_devices.paths
+                                    or device.path in paths
+                                )
+                                and (
+                                    not vendor
+                                    or device.sys_api['vendor'] == vendor
+                                )
+                                and (
+                                    not model
+                                    or device.sys_api['model'].startswith(model)
+                                )
+                            )
+                        ):
+                            device_list.append(device)
+                            count += 1
+        return device_list
+
+class DefaultRemover():
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api',
+        batchV1_api: 'client.BatchV1Api',
+        appsV1_api: 'client.AppsV1Api',
+        osd_ids: List[str],
+        replace_flag: bool,
+        force_flag: bool,
+        mon_command: Callable,
+        patch: Callable,
+        rook_env: 'RookEnv',
+        inventory: Dict[str, List[Device]]
+    ):
+        self.batchV1_api = batchV1_api
+        self.appsV1_api = appsV1_api
+        self.coreV1_api = coreV1_api
+
+        self.osd_ids = osd_ids
+        self.replace_flag = replace_flag
+        self.force_flag = force_flag
+
+        self.mon_command = mon_command
+
+        self.patch = patch
+        self.rook_env = rook_env
+
+        self.inventory = inventory
+        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+                                                               namespace=self.rook_env.namespace,
+                                                               label_selector='app=rook-ceph-osd')
+        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
+                                                           namespace=self.rook_env.namespace,
+                                                           label_selector='app=rook-ceph-osd-prepare')
+        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
+                                                           namespace=self.rook_env.namespace)
+
+
+    def remove_device_sets(self) -> str:
+        self.to_remove: Dict[str, int] = {}
+        self.pvc_to_remove: List[str] = []
+        for pod in self.osd_pods.items:
+            if (
+                hasattr(pod, 'metadata')
+                and hasattr(pod.metadata, 'labels')
+                and 'osd' in pod.metadata.labels
+                and pod.metadata.labels['osd'] in self.osd_ids
+            ):
+                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
+                else:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
+                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
+        def _remove_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+            for _set in new_cluster.spec.storage.storageClassDeviceSets:
+                if _set.name in self.to_remove:
+                    if _set.count == self.to_remove[_set.name]:
+                        
new_cluster.spec.storage.storageClassDeviceSets.remove(_set) + else: + _set.count = _set.count - self.to_remove[_set.name] + return new_cluster + return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds) + + def check_force(self) -> None: + if not self.force_flag: + safe_args = {'prefix': 'osd safe-to-destroy', + 'ids': [str(x) for x in self.osd_ids]} + ret, out, err = self.mon_command(safe_args) + if ret != 0: + raise RuntimeError(err) + + def set_osds_down(self) -> None: + down_flag_args = { + 'prefix': 'osd down', + 'ids': [str(x) for x in self.osd_ids] + } + ret, out, err = self.mon_command(down_flag_args) + if ret != 0: + raise RuntimeError(err) + + def scale_deployments(self) -> None: + for osd_id in self.osd_ids: + self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace, + name='rook-ceph-osd-{}'.format(osd_id), + body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0))) + + def set_osds_out(self) -> None: + out_flag_args = { + 'prefix': 'osd out', + 'ids': [str(x) for x in self.osd_ids] + } + ret, out, err = self.mon_command(out_flag_args) + if ret != 0: + raise RuntimeError(err) + + def delete_deployments(self) -> None: + for osd_id in self.osd_ids: + self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace, + name='rook-ceph-osd-{}'.format(osd_id), + propagation_policy='Foreground') + + def clean_up_prepare_jobs_and_pvc(self) -> None: + for job in self.jobs.items: + if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove: + self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace, + propagation_policy='Foreground') + self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], + namespace=self.rook_env.namespace, + propagation_policy='Foreground') + + def purge_osds(self) -> None: + for id in self.osd_ids: + purge_args = { + 'prefix': 'osd purge-actual', + 'id': int(id), + 'yes_i_really_mean_it': True + } + ret, out, err = self.mon_command(purge_args) + if ret != 0: + raise RuntimeError(err) + + def destroy_osds(self) -> None: + for id in self.osd_ids: + destroy_args = { + 'prefix': 'osd destroy-actual', + 'id': int(id), + 'yes_i_really_mean_it': True + } + ret, out, err = self.mon_command(destroy_args) + if ret != 0: + raise RuntimeError(err) + + def remove(self) -> str: + try: + self.check_force() + except Exception as e: + log.exception("Error checking if OSDs are safe to destroy") + return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}" + try: + remove_result = self.remove_device_sets() + except Exception as e: + log.exception("Error patching ceph cluster CRD") + return f"Not possible to modify Ceph cluster CRD: {e}" + try: + self.scale_deployments() + self.delete_deployments() + self.clean_up_prepare_jobs_and_pvc() + except Exception as e: + log.exception("Ceph cluster CRD patched, but error cleaning environment") + return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}" + try: + self.set_osds_down() + self.set_osds_out() + if self.replace_flag: + self.destroy_osds() + else: + self.purge_osds() + except Exception as e: + log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster") + return f"Error removing OSDs from Ceph cluster: {e}" + + return remove_result + + + +class RookCluster(object): + # import of client.CoreV1Api must be optional at import time. + # Instead allow mgr/rook to be imported anyway. 
+ def __init__( + self, + coreV1_api: 'client.CoreV1Api', + batchV1_api: 'client.BatchV1Api', + customObjects_api: 'client.CustomObjectsApi', + storageV1_api: 'client.StorageV1Api', + appsV1_api: 'client.AppsV1Api', + rook_env: 'RookEnv', + storage_class_name: 'str' + ): + self.rook_env = rook_env # type: RookEnv + self.coreV1_api = coreV1_api # client.CoreV1Api + self.batchV1_api = batchV1_api + self.customObjects_api = customObjects_api + self.storageV1_api = storageV1_api # client.StorageV1Api + self.appsV1_api = appsV1_api # client.AppsV1Api + self.storage_class_name = storage_class_name # type: str + + # TODO: replace direct k8s calls with Rook API calls + self.available_storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class) + self.configured_storage_classes = self.list_storage_classes() + + self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod, + namespace=self.rook_env.namespace, + label_selector="rook_cluster={0}".format( + self.rook_env.namespace)) + self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node) + + def rook_url(self, path: str) -> str: + prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( + self.rook_env.crd_version, self.rook_env.namespace) + return urljoin(prefix, path) + + def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any: + full_path = self.rook_url(path) + log.debug("[%s] %s" % (verb, full_path)) + + return self.coreV1_api.api_client.call_api( + full_path, + verb, + auth_settings=['BearerToken'], + response_type="object", + _return_http_data_only=True, + _preload_content=True, + **kwargs) + + def rook_api_get(self, path: str, **kwargs: Any) -> Any: + return self.rook_api_call("GET", path, **kwargs) + + def rook_api_delete(self, path: str) -> Any: + return self.rook_api_call("DELETE", path) + + def rook_api_patch(self, path: str, **kwargs: Any) -> Any: + return self.rook_api_call("PATCH", path, + header_params={"Content-Type": "application/json-patch+json"}, + **kwargs) + + def rook_api_post(self, path: str, **kwargs: Any) -> Any: + return self.rook_api_call("POST", path, **kwargs) + + def list_storage_classes(self) -> List[str]: + try: + crd = self.customObjects_api.get_namespaced_custom_object( + group="ceph.rook.io", + version="v1", + namespace=self.rook_env.namespace, + plural="cephclusters", + name=self.rook_env.cluster_name) + + sc_devicesets = crd['spec']['storage']['storageClassDeviceSets'] + sc_names = [vct['spec']['storageClassName'] for sc in sc_devicesets for vct in sc['volumeClaimTemplates']] + log.info(f"the cluster has the following configured sc: {sc_names}") + return sc_names + except Exception as e: + log.error(f"unable to list storage classes: {e}") + return [] + + # TODO: remove all the calls to code that uses rook_cluster.storage_class_name + def get_storage_class(self) -> 'client.V1StorageClass': + matching_sc = [i for i in self.available_storage_classes.items if self.storage_class_name == i.metadata.name] + if len(matching_sc) == 0: + log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class_name}>. 
This storage class can be set in ceph config (mgr/rook/storage_class)") + raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class') + return matching_sc[0] + + def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]: + discovered_devices: Dict[str, List[Device]] = {} + op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace=self.rook_env.operator_namespace).data + fetcher: Optional[DefaultFetcher] = None + if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true': + fetcher = PDFetcher(self.coreV1_api, self.rook_env) + fetcher.fetch() + discovered_devices = fetcher.devices() + else: + active_storage_classes = [sc for sc in self.available_storage_classes.items if sc.metadata.name in self.configured_storage_classes] + for sc in active_storage_classes: + if sc.metadata.labels and ('local.storage.openshift.io/owner-name' in sc.metadata.labels): + fetcher = LSOFetcher(sc.metadata.name, self.coreV1_api, self.customObjects_api, nodenames) + else: + fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env) + fetcher.fetch() + discovered_devices.update(fetcher.devices()) + + return discovered_devices + + def get_osds(self) -> List: + osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, + namespace=self.rook_env.namespace, + label_selector='app=rook-ceph-osd') + return list(osd_pods.items) + + def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]: + # + # Fetch cephnfs object for "nfs_cluster" and then return a rados:// + # URL for the instance within that cluster. If the fetch fails, just + # return None. + # + try: + ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster)) + except ApiException as e: + log.info("Unable to fetch cephnfs object: {}".format(e.status)) + return None + + pool = ceph_nfs['spec']['rados']['pool'] + namespace = ceph_nfs['spec']['rados'].get('namespace', None) + + if namespace == None: + url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance) + else: + url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance) + return url + + def describe_pods(self, + service_type: Optional[str], + service_id: Optional[str], + nodename: Optional[str]) -> List[Dict[str, Any]]: + """ + Go query the k8s API about deployment, containers related to this + filesystem + + Example Rook Pod labels for a mgr daemon: + Labels: app=rook-ceph-mgr + pod-template-hash=2171958073 + rook_cluster=rook + And MDS containers additionally have `rook_filesystem` label + + Label filter is rook_cluster= + rook_file_system= + """ + def predicate(item): + # type: (client.V1Pod) -> bool + metadata = item.metadata + if service_type is not None: + if metadata.labels['app'] != "rook-ceph-{0}".format(service_type): + return False + + if service_id is not None: + try: + k, v = { + "mds": ("rook_file_system", service_id), + "osd": ("ceph-osd-id", service_id), + "mon": ("mon", service_id), + "mgr": ("mgr", service_id), + "nfs": ("nfs", service_id), + "rgw": ("ceph_rgw", service_id), + }[service_type] + except KeyError: + raise orchestrator.OrchestratorValidationError( + '{} not supported'.format(service_type)) + if metadata.labels[k] != v: + return False + + if nodename is not None: + if item.spec.node_name != nodename: + return False + return True + + refreshed = datetime_now() + pods = [i for i in self.rook_pods.items if predicate(i)] + + 
pods_summary = [] + prefix = 'sha256:' + + for p in pods: + d = p.to_dict() + + image_name = None + for c in d['spec']['containers']: + # look at the first listed container in the pod... + image_name = c['image'] + break + + ls = d['status'].get('container_statuses') + if not ls: + # ignore pods with no containers + continue + image_id = ls[0]['image_id'] + image_id = image_id.split(prefix)[1] if prefix in image_id else image_id + + s = { + "name": d['metadata']['name'], + "hostname": d['spec']['node_name'], + "labels": d['metadata']['labels'], + 'phase': d['status']['phase'], + 'container_image_name': image_name, + 'container_image_id': image_id, + 'refreshed': refreshed, + # these may get set below... + 'started': None, + 'created': None, + } + + # note: we want UTC + if d['metadata'].get('creation_timestamp', None): + s['created'] = d['metadata']['creation_timestamp'].astimezone( + tz=datetime.timezone.utc) + if d['status'].get('start_time', None): + s['started'] = d['status']['start_time'].astimezone( + tz=datetime.timezone.utc) + + pods_summary.append(s) + + return pods_summary + + def remove_pods(self, names: List[str]) -> List[str]: + pods = [i for i in self.rook_pods.items] + for p in pods: + d = p.to_dict() + daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','') + daemon_id = d['metadata']['labels']['ceph_daemon_id'] + name = daemon_type + '.' + daemon_id + if name in names: + self.coreV1_api.delete_namespaced_pod( + d['metadata']['name'], + self.rook_env.namespace, + body=client.V1DeleteOptions() + ) + return [f'Removed Pod {n}' for n in names] + + def get_node_names(self) -> List[str]: + return [i.metadata.name for i in self.nodes.items] + + @contextmanager + def ignore_409(self, what: str) -> Iterator[None]: + try: + yield + except ApiException as e: + if e.status == 409: + # Idempotent, succeed. + log.info("{} already exists".format(what)) + else: + raise + + def apply_filesystem(self, spec: ServiceSpec, num_replicas: int, + leaf_type: str) -> str: + # TODO use spec.placement + # TODO warn if spec.extended has entries we don't kow how + # to action. 
+ all_hosts = self.get_hosts() + def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem: + new.spec.metadataServer.activeCount = spec.placement.count or 1 + new.spec.metadataServer.placement = cfs.Placement( + nodeAffinity=cfs.NodeAffinity( + requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=cfs.NodeSelectorTermsList( + [placement_spec_to_node_selector(spec.placement, all_hosts)] + ) + ) + ) + ) + return new + def _create_fs() -> cfs.CephFilesystem: + fs = cfs.CephFilesystem( + apiVersion=self.rook_env.api_name, + metadata=dict( + name=spec.service_id, + namespace=self.rook_env.namespace, + ), + spec=cfs.Spec( + dataPools=cfs.DataPoolsList( + { + cfs.DataPoolsItem( + failureDomain=leaf_type, + replicated=cfs.Replicated( + size=num_replicas + ) + ) + } + ), + metadataPool=cfs.MetadataPool( + failureDomain=leaf_type, + replicated=cfs.Replicated( + size=num_replicas + ) + ), + metadataServer=cfs.MetadataServer( + activeCount=spec.placement.count or 1, + activeStandby=True, + placement= + cfs.Placement( + nodeAffinity=cfs.NodeAffinity( + requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=cfs.NodeSelectorTermsList( + [placement_spec_to_node_selector(spec.placement, all_hosts)] + ) + ) + ) + ) + ) + ) + ) + return fs + assert spec.service_id is not None + return self._create_or_patch( + cfs.CephFilesystem, 'cephfilesystems', spec.service_id, + _update_fs, _create_fs) + + def get_matching_node(self, host: str) -> Any: + matching_node = None + for node in self.nodes.items: + if node.metadata.labels['kubernetes.io/hostname'] == host: + matching_node = node + return matching_node + + def add_host_label(self, host: str, label: str) -> OrchResult[str]: + matching_node = self.get_matching_node(host) + if matching_node == None: + return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster")) + matching_node.metadata.labels['ceph-label/'+ label] = "" + self.coreV1_api.patch_node(host, matching_node) + return OrchResult(f'Added {label} label to {host}') + + def remove_host_label(self, host: str, label: str) -> OrchResult[str]: + matching_node = self.get_matching_node(host) + if matching_node == None: + return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster")) + matching_node.metadata.labels.pop('ceph-label/' + label, None) + self.coreV1_api.patch_node(host, matching_node) + return OrchResult(f'Removed {label} label from {host}') + + def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str: + assert spec.service_id is not None + + name = spec.service_id + + if '.' in spec.service_id: + # rook does not like . in the name. this is could + # there because it is a legacy rgw spec that was named + # like $realm.$zone, except that I doubt there were any + # users of this code. Instead, focus on future users and + # translate . to - (fingers crossed!) instead. 
+ name = spec.service_id.replace('.', '-') + + all_hosts = self.get_hosts() + def _create_zone() -> cos.CephObjectStore: + port = None + secure_port = None + if spec.ssl: + secure_port = spec.get_port() + else: + port = spec.get_port() + object_store = cos.CephObjectStore( + apiVersion=self.rook_env.api_name, + metadata=dict( + name=name, + namespace=self.rook_env.namespace + ), + spec=cos.Spec( + gateway=cos.Gateway( + port=port, + securePort=secure_port, + instances=spec.placement.count or 1, + placement=cos.Placement( + cos.NodeAffinity( + requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=cos.NodeSelectorTermsList( + [ + placement_spec_to_node_selector(spec.placement, all_hosts) + ] + ) + ) + ) + ) + ), + dataPool=cos.DataPool( + failureDomain=leaf_type, + replicated=cos.Replicated( + size=num_replicas + ) + ), + metadataPool=cos.MetadataPool( + failureDomain=leaf_type, + replicated=cos.Replicated( + size=num_replicas + ) + ) + ) + ) + if spec.rgw_zone: + object_store.spec.zone=cos.Zone( + name=spec.rgw_zone + ) + return object_store + + + def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore: + if new.spec.gateway: + new.spec.gateway.instances = spec.placement.count or 1 + else: + new.spec.gateway=cos.Gateway( + instances=spec.placement.count or 1 + ) + return new + return self._create_or_patch( + cos.CephObjectStore, 'cephobjectstores', name, + _update_zone, _create_zone) + + def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str: + # TODO use spec.placement + # TODO warn if spec.extended has entries we don't kow how + # to action. + # TODO Number of pods should be based on the list of hosts in the + # PlacementSpec. + assert spec.service_id, "service id in NFS service spec cannot be an empty string or None " # for mypy typing + service_id = spec.service_id + mgr_module = cast(Module, mgr) + count = spec.placement.count or 1 + def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS: + new.spec.server.active = count + return new + + def _create_nfs() -> cnfs.CephNFS: + rook_nfsgw = cnfs.CephNFS( + apiVersion=self.rook_env.api_name, + metadata=dict( + name=spec.service_id, + namespace=self.rook_env.namespace, + ), + spec=cnfs.Spec( + rados=cnfs.Rados( + namespace=service_id, + pool=NFS_POOL_NAME, + ), + server=cnfs.Server( + active=count + ) + ) + ) + + + return rook_nfsgw + + create_ganesha_pool(mgr) + NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}') + return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id, + _update_nfs, _create_nfs) + + def rm_service(self, rooktype: str, service_id: str) -> str: + self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", + namespace=self.rook_env.namespace, + plural=rooktype, name=service_id) + objpath = "{0}/{1}".format(rooktype, service_id) + return f'Removed {objpath}' + + def get_resource(self, resource_type: str) -> Iterable: + custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, + group="ceph.rook.io", + version="v1", + namespace=self.rook_env.namespace, + plural=resource_type) + return custom_objects.items + + def can_create_osd(self) -> bool: + current_cluster = self.rook_api_get( + "cephclusters/{0}".format(self.rook_env.cluster_name)) + use_all_nodes = current_cluster['spec'].get('useAllNodes', False) + + # If useAllNodes is set, then Rook will not be paying attention + # to anything we put in 
'nodes', so can't do OSD creation. + return not use_all_nodes + + def node_exists(self, node_name: str) -> bool: + return node_name in self.get_node_names() + + def update_mon_count(self, newcount: Optional[int]) -> str: + def _update_mon_count(current, new): + # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster + if newcount is None: + raise orchestrator.OrchestratorError('unable to set mon count to None') + if not new.spec.mon: + raise orchestrator.OrchestratorError("mon attribute not specified in new spec") + new.spec.mon.count = newcount + return new + return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count) + + def add_osds(self, drive_group, matching_hosts): + # type: (DriveGroupSpec, List[str]) -> str + assert drive_group.objectstore in ("bluestore", "filestore") + assert drive_group.service_id + storage_class = self.get_storage_class() + inventory = self.get_discovered_devices() + creator: Optional[DefaultCreator] = None + if ( + storage_class.metadata.labels + and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels + ): + creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name) + else: + creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name) + return self._patch( + ccl.CephCluster, + 'cephclusters', + self.rook_env.cluster_name, + creator.add_osds(self.rook_pods, drive_group, matching_hosts) + ) + + def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str: + inventory = self.get_discovered_devices() + self.remover = DefaultRemover( + self.coreV1_api, + self.batchV1_api, + self.appsV1_api, + osd_ids, + replace, + force, + mon_command, + self._patch, + self.rook_env, + inventory + ) + return self.remover.remove() + + def get_hosts(self) -> List[orchestrator.HostSpec]: + ret = [] + for node in self.nodes.items: + spec = orchestrator.HostSpec( + node.metadata.name, + addr='/'.join([addr.address for addr in node.status.addresses]), + labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')], + ) + ret.append(spec) + return ret + + def create_zap_job(self, host: str, path: str) -> None: + body = client.V1Job( + api_version="batch/v1", + metadata=client.V1ObjectMeta( + name="rook-ceph-device-zap", + namespace=self.rook_env.namespace + ), + spec=client.V1JobSpec( + template=client.V1PodTemplateSpec( + spec=client.V1PodSpec( + containers=[ + client.V1Container( + name="device-zap", + image="rook/ceph:master", + command=["bash"], + args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"], + env=[ + client.V1EnvVar( + name="ROOK_CEPH_USERNAME", + value_from=client.V1EnvVarSource( + secret_key_ref=client.V1SecretKeySelector( + key="ceph-username", + name="rook-ceph-mon" + ) + ) + ), + client.V1EnvVar( + name="ROOK_CEPH_SECRET", + value_from=client.V1EnvVarSource( + secret_key_ref=client.V1SecretKeySelector( + key="ceph-secret", + name="rook-ceph-mon" + ) + ) + ) + ], + security_context=client.V1SecurityContext( + run_as_user=0, + privileged=True + ), + volume_mounts=[ + client.V1VolumeMount( + mount_path="/etc/ceph", + name="ceph-conf-emptydir" + ), + client.V1VolumeMount( + mount_path="/etc/rook", + name="rook-config" + ), + client.V1VolumeMount( + mount_path="/dev", + name="devices" + ) + ] + ) + ], + volumes=[ + client.V1Volume( + name="ceph-conf-emptydir", + empty_dir=client.V1EmptyDirVolumeSource() + ), + 
client.V1Volume( + name="rook-config", + empty_dir=client.V1EmptyDirVolumeSource() + ), + client.V1Volume( + name="devices", + host_path=client.V1HostPathVolumeSource( + path="/dev" + ) + ), + ], + node_selector={ + "kubernetes.io/hostname": host + }, + restart_policy="Never" + ) + ) + ) + ) + self.batchV1_api.create_namespaced_job(self.rook_env.namespace, body) + + def rbd_mirror(self, spec: ServiceSpec) -> None: + service_id = spec.service_id or "default-rbd-mirror" + all_hosts = self.get_hosts() + def _create_rbd_mirror() -> crbdm.CephRBDMirror: + return crbdm.CephRBDMirror( + apiVersion=self.rook_env.api_name, + metadata=dict( + name=service_id, + namespace=self.rook_env.namespace, + ), + spec=crbdm.Spec( + count=spec.placement.count or 1, + placement=crbdm.Placement( + nodeAffinity=crbdm.NodeAffinity( + requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=crbdm.NodeSelectorTermsList( + [ + placement_spec_to_node_selector(spec.placement, all_hosts) + ] + ) + ) + ) + ) + ) + ) + def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror: + new.spec.count = spec.placement.count or 1 + new.spec.placement = crbdm.Placement( + nodeAffinity=crbdm.NodeAffinity( + requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=crbdm.NodeSelectorTermsList( + [ + placement_spec_to_node_selector(spec.placement, all_hosts) + ] + ) + ) + ) + ) + return new + self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror) + def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str: + current_json = self.rook_api_get( + "{}/{}".format(crd_name, cr_name) + ) + + current = crd.from_json(current_json) + new = crd.from_json(current_json) # no deepcopy. + + new = func(current, new) + + patch = list(jsonpatch.make_patch(current_json, new.to_json())) + + log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch)) + + if len(patch) == 0: + return "No change" + + try: + self.rook_api_patch( + "{}/{}".format(crd_name, cr_name), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e)) + raise ApplyException( + "Failed to update {}/{}: {}".format(crd_name, cr_name, e)) + + return "Success" + + def _create_or_patch(self, + crd: Type, + crd_name: str, + cr_name: str, + update_func: Callable[[CrdClassT], CrdClassT], + create_func: Callable[[], CrdClassT]) -> str: + try: + current_json = self.rook_api_get( + "{}/{}".format(crd_name, cr_name) + ) + except ApiException as e: + if e.status == 404: + current_json = None + else: + raise + + if current_json: + new = crd.from_json(current_json) # no deepcopy. 
+ + new = update_func(new) + + patch = list(jsonpatch.make_patch(current_json, new.to_json())) + + log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch)) + + if len(patch) == 0: + return "No change" + + try: + self.rook_api_patch( + "{}/{}".format(crd_name, cr_name), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e)) + raise ApplyException( + "Failed to update {}/{}: {}".format(crd_name, cr_name, e)) + return "Updated" + else: + new = create_func() + with self.ignore_409("{} {} already exists".format(crd_name, + cr_name)): + self.rook_api_post("{}/".format(crd_name), + body=new.to_json()) + return "Created" + def get_ceph_image(self) -> str: + try: + api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace, + label_selector="app=rook-ceph-mon", + timeout_seconds=10) + if api_response.items: + return api_response.items[-1].spec.containers[0].image + else: + raise orchestrator.OrchestratorError( + "Error getting ceph image. Cluster without monitors") + except ApiException as e: + raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e)) + + + def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str: + operation_id = str(hash(loc)) + message = "" + + # job definition + job_metadata = client.V1ObjectMeta(name=operation_id, + namespace= self.rook_env.namespace, + labels={"ident": operation_id}) + pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id}) + pod_container = client.V1Container(name="ceph-lsmcli-command", + security_context=client.V1SecurityContext(privileged=True), + image=self.get_ceph_image(), + command=["lsmcli",], + args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'), + '--path', loc.path or loc.dev,], + volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"), + client.V1VolumeMount(name="run-udev", mount_path="/run/udev")]) + pod_spec = client.V1PodSpec(containers=[pod_container], + active_deadline_seconds=30, # Max time to terminate pod + restart_policy="Never", + node_selector= {"kubernetes.io/hostname": loc.host}, + volumes=[client.V1Volume(name="devices", + host_path=client.V1HostPathVolumeSource(path="/dev")), + client.V1Volume(name="run-udev", + host_path=client.V1HostPathVolumeSource(path="/run/udev"))]) + pod_template = client.V1PodTemplateSpec(metadata=pod_metadata, + spec=pod_spec) + job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job + ttl_seconds_after_finished=10, # Alfa. Lifetime after finishing (either Complete or Failed) + backoff_limit=0, + template=pod_template) + job = client.V1Job(api_version="batch/v1", + kind="Job", + metadata=job_metadata, + spec=job_spec) + + # delete previous job if it exists + try: + try: + api_response = self.batchV1_api.delete_namespaced_job(operation_id, + self.rook_env.namespace, + propagation_policy="Background") + except ApiException as e: + if e.status != 404: # No problem if the job does not exist + raise + + # wait until the job is not present + deleted = False + retries = 0 + while not deleted and retries < 10: + api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace, + label_selector="ident=%s" % operation_id, + timeout_seconds=10) + deleted = not api_response.items + if retries > 5: + sleep(0.1) + retries += 1 + if retries == 10 and not deleted: + raise orchestrator.OrchestratorError( + "Light <{}> in <{}:{}> cannot be executed. 
Cannot delete previous job <{}>".format( + on, loc.host, loc.path or loc.dev, operation_id)) + + # create the job + api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job) + + # get the result + finished = False + while not finished: + api_response = self.batchV1_api.read_namespaced_job(operation_id, + self.rook_env.namespace) + finished = api_response.status.succeeded or api_response.status.failed + if finished: + message = api_response.status.conditions[-1].message + + # get the result of the lsmcli command + api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace, + label_selector="ident=%s" % operation_id, + timeout_seconds=10) + if api_response.items: + pod_name = api_response.items[-1].metadata.name + message = self.coreV1_api.read_namespaced_pod_log(pod_name, + self.rook_env.namespace) + + except ApiException as e: + log.exception('K8s API failed. {}'.format(e)) + raise + + # Finally, delete the job. + # The job uses . This makes that the TTL controller delete automatically the job. + # This feature is in Alpha state, so extra explicit delete operations trying to delete the Job has been used strategically + try: + api_response = self.batchV1_api.delete_namespaced_job(operation_id, + self.rook_env.namespace, + propagation_policy="Background") + except ApiException as e: + if e.status != 404: # No problem if the job does not exist + raise + + return message + + def blink_light(self, ident_fault, on, locs): + # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str] + return [self._execute_blight_job(ident_fault, on, loc) for loc in locs] + +def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem: + all_hostnames = [hs.hostname for hs in all_hosts] + res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList()) + if spec.host_pattern and spec.host_pattern != "*": + raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements") + if spec.label: + res.matchExpressions.append( + ccl.MatchExpressionsItem( + key="ceph-label/" + spec.label, + operator="Exists" + ) + ) + if spec.hosts: + host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames] + res.matchExpressions.append( + ccl.MatchExpressionsItem( + key="kubernetes.io/hostname", + operator="In", + values=ccl.CrdObjectList(host_list) + ) + ) + if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern): + res.matchExpressions.append( + ccl.MatchExpressionsItem( + key="kubernetes.io/hostname", + operator="Exists", + ) + ) + return res + +def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec: + res = PlacementSpec() + for expression in node_selector.matchExpressions: + if expression.key.startswith("ceph-label/"): + res.label = expression.key.split('/')[1] + elif expression.key == "kubernetes.io/hostname": + if expression.operator == "Exists": + res.host_pattern = "*" + elif expression.operator == "In": + res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values] + return res -- cgit v1.2.3
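The module docstring above notes that this file can be imported outside of ceph-mgr for testing. As a rough illustration only (not part of the patch), the placement helpers defined at the end of the file can be exercised on their own; the import path and the availability of the ceph and rook_client python packages on sys.path are assumptions about the test environment:

    # Hypothetical smoke test for the placement helpers; assumes the ceph mgr
    # python packages and the bundled rook_client bindings are importable.
    import orchestrator
    from ceph.deployment.service_spec import PlacementSpec
    from rook.rook_cluster import (placement_spec_to_node_selector,
                                   node_selector_to_placement_spec)

    hosts = [orchestrator.HostSpec('node-a'), orchestrator.HostSpec('node-b')]
    spec = PlacementSpec(label='osd')  # place daemons on hosts carrying ceph-label/osd
    term = placement_spec_to_node_selector(spec, hosts)
    assert term.matchExpressions[0].key == 'ceph-label/osd'
    # converting the node selector back recovers an equivalent PlacementSpec
    assert node_selector_to_placement_spec(term).label == 'osd'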