From 389020e14594e4894e28d1eb9103c210b142509e Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Thu, 23 May 2024 18:45:13 +0200
Subject: Adding upstream version 18.2.3.

Signed-off-by: Daniel Baumann
---
 src/pybind/mgr/rook/rook_cluster.py | 382 ++----------------------------------
 1 file changed, 21 insertions(+), 361 deletions(-)

(limited to 'src/pybind/mgr/rook/rook_cluster.py')

diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 5c7c9fc04..16d498a70 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError
 
 from ceph.deployment.inventory import Device
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.deployment.service_spec import (
+    ServiceSpec,
+    NFSServiceSpec,
+    RGWSpec,
+    PlacementSpec,
+    HostPlacementSpec,
+    HostPattern,
+)
 from ceph.utils import datetime_now
-from ceph.deployment.drive_selection.matchers import SizeMatcher
+from ceph.deployment.drive_selection.matchers import (
+    AllMatcher,
+    Matcher,
+    SizeMatcher,
+)
 from nfs.cluster import create_ganesha_pool
 from nfs.module import Module
 from nfs.export import NFSRados
@@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource):
                 "{} doesn't contain a metadata.name. Unable to track changes".format(
                     self.api_func))
 
-class DefaultCreator():
-    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
-        self.coreV1_api = coreV1_api
-        self.storage_class_name = storage_class_name
-        self.inventory = inventory
-
-    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
-        device_set = ccl.StorageClassDeviceSetsItem(
-            name=d.sys_api['pv_name'],
-            volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
-            count=1,
-            encrypted=drive_group.encrypted,
-            portable=False
-        )
-        device_set.volumeClaimTemplates.append(
-            ccl.VolumeClaimTemplatesItem(
-                metadata=ccl.Metadata(
-                    name="data"
-                ),
-                spec=ccl.Spec(
-                    storageClassName=self.storage_class_name,
-                    volumeMode="Block",
-                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
-                    resources={
-                        "requests":{
-                            "storage": 1
-                        }
-                    },
-                    volumeName=d.sys_api['pv_name']
-                )
-            )
-        )
-        return device_set
-
-    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
-        device_list = []
-        assert drive_group.data_devices is not None
-        sizematcher: Optional[SizeMatcher] = None
-        if drive_group.data_devices.size:
-            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
-        limit = getattr(drive_group.data_devices, 'limit', None)
-        count = 0
-        all = getattr(drive_group.data_devices, 'all', None)
-        paths = [device.path for device in drive_group.data_devices.paths]
-        osd_list = []
-        for pod in rook_pods.items:
-            if (
-                hasattr(pod, 'metadata')
-                and hasattr(pod.metadata, 'labels')
-                and 'osd' in pod.metadata.labels
-                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
-            ):
-                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
-        for _, node in self.inventory.items():
-            for device in node:
-                if device.sys_api['pv_name'] in osd_list:
-                    count += 1
-        for _, node in self.inventory.items():
-            for device in node:
-                if not limit or (count < limit):
-                    if device.available:
-                        if (
-                            all
-                            or (
-                                device.sys_api['node'] in matching_hosts
-                                and ((sizematcher != None) or sizematcher.compare(device))
-                                and (
-                                    not drive_group.data_devices.paths
-                                    or (device.path in paths)
-                                )
-                            )
-                        ):
-                            device_list.append(device)
-                            count += 1
-
-        return device_list
-
-    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
-        to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
-        assert drive_group.data_devices is not None
-        def _add_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
-                new_cluster.spec.storage = ccl.Storage()
-
-            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
-                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
-
-            existing_scds = [
-                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
-            ]
-            for device in to_create:
-                new_scds = self.device_to_device_set(drive_group, device)
-                if new_scds.name not in existing_scds:
-                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
-            return new_cluster
-        return _add_osds
-
-class LSOCreator(DefaultCreator):
-    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
-        device_list = []
-        assert drive_group.data_devices is not None
-        sizematcher = None
-        if drive_group.data_devices.size:
-            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
-        limit = getattr(drive_group.data_devices, 'limit', None)
-        all = getattr(drive_group.data_devices, 'all', None)
-        paths = [device.path for device in drive_group.data_devices.paths]
-        vendor = getattr(drive_group.data_devices, 'vendor', None)
-        model = getattr(drive_group.data_devices, 'model', None)
-        count = 0
-        osd_list = []
-        for pod in rook_pods.items:
-            if (
-                hasattr(pod, 'metadata')
-                and hasattr(pod.metadata, 'labels')
-                and 'osd' in pod.metadata.labels
-                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
-            ):
-                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
-        for _, node in self.inventory.items():
-            for device in node:
-                if device.sys_api['pv_name'] in osd_list:
-                    count += 1
-        for _, node in self.inventory.items():
-            for device in node:
-                if not limit or (count < limit):
-                    if device.available:
-                        if (
-                            all
-                            or (
-                                device.sys_api['node'] in matching_hosts
-                                and ((sizematcher != None) or sizematcher.compare(device))
-                                and (
-                                    not drive_group.data_devices.paths
-                                    or device.path in paths
-                                )
-                                and (
-                                    not vendor
-                                    or device.sys_api['vendor'] == vendor
-                                )
-                                and (
-                                    not model
-                                    or device.sys_api['model'].startsWith(model)
-                                )
-                            )
-                        ):
-                            device_list.append(device)
-                            count += 1
-        return device_list
-
-class DefaultRemover():
-    def __init__(
-        self,
-        coreV1_api: 'client.CoreV1Api',
-        batchV1_api: 'client.BatchV1Api',
-        appsV1_api: 'client.AppsV1Api',
-        osd_ids: List[str],
-        replace_flag: bool,
-        force_flag: bool,
-        mon_command: Callable,
-        patch: Callable,
-        rook_env: 'RookEnv',
-        inventory: Dict[str, List[Device]]
-    ):
-        self.batchV1_api = batchV1_api
-        self.appsV1_api = appsV1_api
-        self.coreV1_api = coreV1_api
-
-        self.osd_ids = osd_ids
-        self.replace_flag = replace_flag
-        self.force_flag = force_flag
-
-        self.mon_command = mon_command
-
-        self.patch = patch
-        self.rook_env = rook_env
-
-        self.inventory = inventory
-        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
-                                                               namespace=self.rook_env.namespace,
-                                                               label_selector='app=rook-ceph-osd')
-        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
-                                                           namespace=self.rook_env.namespace,
-                                                           label_selector='app=rook-ceph-osd-prepare')
-        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
-                                                           namespace=self.rook_env.namespace)
-
-
-    def remove_device_sets(self) -> str:
-        self.to_remove: Dict[str, int] = {}
-        self.pvc_to_remove: List[str] = []
-        for pod in self.osd_pods.items:
-            if (
-                hasattr(pod, 'metadata')
-                and hasattr(pod.metadata, 'labels')
-                and 'osd' in pod.metadata.labels
-                and pod.metadata.labels['osd'] in self.osd_ids
-            ):
-                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
-                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
-                else:
-                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
-                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
-        def _remove_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
-            for _set in new_cluster.spec.storage.storageClassDeviceSets:
-                if _set.name in self.to_remove:
-                    if _set.count == self.to_remove[_set.name]:
-                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
-                    else:
-                        _set.count = _set.count - self.to_remove[_set.name]
-            return new_cluster
-        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
-
-    def check_force(self) -> None:
-        if not self.force_flag:
-            safe_args = {'prefix': 'osd safe-to-destroy',
-                         'ids': [str(x) for x in self.osd_ids]}
-            ret, out, err = self.mon_command(safe_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def set_osds_down(self) -> None:
-        down_flag_args = {
-            'prefix': 'osd down',
-            'ids': [str(x) for x in self.osd_ids]
-        }
-        ret, out, err = self.mon_command(down_flag_args)
-        if ret != 0:
-            raise RuntimeError(err)
-
-    def scale_deployments(self) -> None:
-        for osd_id in self.osd_ids:
-            self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
-                                                              name='rook-ceph-osd-{}'.format(osd_id),
-                                                              body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
-
-    def set_osds_out(self) -> None:
-        out_flag_args = {
-            'prefix': 'osd out',
-            'ids': [str(x) for x in self.osd_ids]
-        }
-        ret, out, err = self.mon_command(out_flag_args)
-        if ret != 0:
-            raise RuntimeError(err)
-
-    def delete_deployments(self) -> None:
-        for osd_id in self.osd_ids:
-            self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
-                                                         name='rook-ceph-osd-{}'.format(osd_id),
-                                                         propagation_policy='Foreground')
-
-    def clean_up_prepare_jobs_and_pvc(self) -> None:
-        for job in self.jobs.items:
-            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
-                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
-                                                       propagation_policy='Foreground')
-                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
-                                                                          namespace=self.rook_env.namespace,
-                                                                          propagation_policy='Foreground')
-
-    def purge_osds(self) -> None:
-        for id in self.osd_ids:
-            purge_args = {
-                'prefix': 'osd purge-actual',
-                'id': int(id),
-                'yes_i_really_mean_it': True
-            }
-            ret, out, err = self.mon_command(purge_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def destroy_osds(self) -> None:
-        for id in self.osd_ids:
-            destroy_args = {
-                'prefix': 'osd destroy-actual',
-                'id': int(id),
-                'yes_i_really_mean_it': True
-            }
-            ret, out, err = self.mon_command(destroy_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def remove(self) -> str:
-        try:
-            self.check_force()
-        except Exception as e:
-            log.exception("Error checking if OSDs are safe to destroy")
-            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
-        try:
-            remove_result = self.remove_device_sets()
-        except Exception as e:
-            log.exception("Error patching ceph cluster CRD")
-            return f"Not possible to modify Ceph cluster CRD: {e}"
-        try:
-            self.scale_deployments()
-            self.delete_deployments()
-            self.clean_up_prepare_jobs_and_pvc()
-        except Exception as e:
-            log.exception("Ceph cluster CRD patched, but error cleaning environment")
-            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
-        try:
-            self.set_osds_down()
-            self.set_osds_out()
-            if self.replace_flag:
-                self.destroy_osds()
-            else:
-                self.purge_osds()
-        except Exception as e:
-            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
-            return f"Error removing OSDs from Ceph cluster: {e}"
-
-        return remove_result
-
-
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -794,7 +487,12 @@ class RookCluster(object):
             else:
                 fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env)
             fetcher.fetch()
-            discovered_devices.update(fetcher.devices())
+            nodename_to_devices = fetcher.devices()
+            for node, devices in nodename_to_devices.items():
+                if node in discovered_devices:
+                    discovered_devices[node].extend(devices)
+                else:
+                    discovered_devices[node] = devices
 
         return discovered_devices
 
@@ -1096,7 +794,6 @@ class RookCluster(object):
                     name=spec.rgw_zone
                 )
             return object_store
-
 
         def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
             if new.spec.gateway:
@@ -1188,48 +885,11 @@ class RookCluster(object):
                 return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
 
-    def add_osds(self, drive_group, matching_hosts):
-        # type: (DriveGroupSpec, List[str]) -> str
-        assert drive_group.objectstore in ("bluestore", "filestore")
-        assert drive_group.service_id
-        storage_class = self.get_storage_class()
-        inventory = self.get_discovered_devices()
-        creator: Optional[DefaultCreator] = None
-        if (
-            storage_class.metadata.labels
-            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
-        ):
-            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
-        else:
-            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
-        return self._patch(
-            ccl.CephCluster,
-            'cephclusters',
-            self.rook_env.cluster_name,
-            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        )
-
-    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
-        inventory = self.get_discovered_devices()
-        self.remover = DefaultRemover(
-            self.coreV1_api,
-            self.batchV1_api,
-            self.appsV1_api,
-            osd_ids,
-            replace,
-            force,
-            mon_command,
-            self._patch,
-            self.rook_env,
-            inventory
-        )
-        return self.remover.remove()
-
     def get_hosts(self) -> List[orchestrator.HostSpec]:
         ret = []
         for node in self.nodes.items:
             spec = orchestrator.HostSpec(
-                node.metadata.name, 
+                node.metadata.name,
                 addr='/'.join([addr.address for addr in node.status.addresses]),
                 labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
             )
@@ -1585,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) ->
             res.label = expression.key.split('/')[1]
         elif expression.key == "kubernetes.io/hostname":
             if expression.operator == "Exists":
-                res.host_pattern = "*"
+                res.host_pattern = HostPattern("*")
             elif expression.operator == "In":
                 res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values]
     return res
-- 
cgit v1.2.3