summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/rook/rook_cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/rook/rook_cluster.py')
-rw-r--r--src/pybind/mgr/rook/rook_cluster.py382
1 files changed, 21 insertions, 361 deletions
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 5c7c9fc04..16d498a70 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError
from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.deployment.service_spec import (
+ ServiceSpec,
+ NFSServiceSpec,
+ RGWSpec,
+ PlacementSpec,
+ HostPlacementSpec,
+ HostPattern,
+)
from ceph.utils import datetime_now
-from ceph.deployment.drive_selection.matchers import SizeMatcher
+from ceph.deployment.drive_selection.matchers import (
+ AllMatcher,
+ Matcher,
+ SizeMatcher,
+)
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
@@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource):
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
-class DefaultCreator():
- def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
- self.coreV1_api = coreV1_api
- self.storage_class_name = storage_class_name
- self.inventory = inventory
-
- def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
- device_set = ccl.StorageClassDeviceSetsItem(
- name=d.sys_api['pv_name'],
- volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
- count=1,
- encrypted=drive_group.encrypted,
- portable=False
- )
- device_set.volumeClaimTemplates.append(
- ccl.VolumeClaimTemplatesItem(
- metadata=ccl.Metadata(
- name="data"
- ),
- spec=ccl.Spec(
- storageClassName=self.storage_class_name,
- volumeMode="Block",
- accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
- resources={
- "requests":{
- "storage": 1
- }
- },
- volumeName=d.sys_api['pv_name']
- )
- )
- )
- return device_set
-
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher: Optional[SizeMatcher] = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- count = 0
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
- and ((sizematcher != None) or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or (device.path in paths)
- )
- )
- ):
- device_list.append(device)
- count += 1
-
- return device_list
-
- def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
- to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
- assert drive_group.data_devices is not None
- def _add_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
- new_cluster.spec.storage = ccl.Storage()
-
- if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
- new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
-
- existing_scds = [
- scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
- ]
- for device in to_create:
- new_scds = self.device_to_device_set(drive_group, device)
- if new_scds.name not in existing_scds:
- new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
- return new_cluster
- return _add_osds
-
-class LSOCreator(DefaultCreator):
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- vendor = getattr(drive_group.data_devices, 'vendor', None)
- model = getattr(drive_group.data_devices, 'model', None)
- count = 0
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
- and ((sizematcher != None) or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or device.path in paths
- )
- and (
- not vendor
- or device.sys_api['vendor'] == vendor
- )
- and (
- not model
- or device.sys_api['model'].startsWith(model)
- )
- )
- ):
- device_list.append(device)
- count += 1
- return device_list
-
-class DefaultRemover():
- def __init__(
- self,
- coreV1_api: 'client.CoreV1Api',
- batchV1_api: 'client.BatchV1Api',
- appsV1_api: 'client.AppsV1Api',
- osd_ids: List[str],
- replace_flag: bool,
- force_flag: bool,
- mon_command: Callable,
- patch: Callable,
- rook_env: 'RookEnv',
- inventory: Dict[str, List[Device]]
- ):
- self.batchV1_api = batchV1_api
- self.appsV1_api = appsV1_api
- self.coreV1_api = coreV1_api
-
- self.osd_ids = osd_ids
- self.replace_flag = replace_flag
- self.force_flag = force_flag
-
- self.mon_command = mon_command
-
- self.patch = patch
- self.rook_env = rook_env
-
- self.inventory = inventory
- self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd')
- self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd-prepare')
- self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
- namespace=self.rook_env.namespace)
-
-
- def remove_device_sets(self) -> str:
- self.to_remove: Dict[str, int] = {}
- self.pvc_to_remove: List[str] = []
- for pod in self.osd_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and pod.metadata.labels['osd'] in self.osd_ids
- ):
- if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
- else:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
- self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
- def _remove_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
- for _set in new_cluster.spec.storage.storageClassDeviceSets:
- if _set.name in self.to_remove:
- if _set.count == self.to_remove[_set.name]:
- new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
- else:
- _set.count = _set.count - self.to_remove[_set.name]
- return new_cluster
- return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
-
- def check_force(self) -> None:
- if not self.force_flag:
- safe_args = {'prefix': 'osd safe-to-destroy',
- 'ids': [str(x) for x in self.osd_ids]}
- ret, out, err = self.mon_command(safe_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def set_osds_down(self) -> None:
- down_flag_args = {
- 'prefix': 'osd down',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(down_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def scale_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
-
- def set_osds_out(self) -> None:
- out_flag_args = {
- 'prefix': 'osd out',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(out_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def delete_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- propagation_policy='Foreground')
-
- def clean_up_prepare_jobs_and_pvc(self) -> None:
- for job in self.jobs.items:
- if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
- self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
- self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
- namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
-
- def purge_osds(self) -> None:
- for id in self.osd_ids:
- purge_args = {
- 'prefix': 'osd purge-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(purge_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def destroy_osds(self) -> None:
- for id in self.osd_ids:
- destroy_args = {
- 'prefix': 'osd destroy-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(destroy_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def remove(self) -> str:
- try:
- self.check_force()
- except Exception as e:
- log.exception("Error checking if OSDs are safe to destroy")
- return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
- try:
- remove_result = self.remove_device_sets()
- except Exception as e:
- log.exception("Error patching ceph cluster CRD")
- return f"Not possible to modify Ceph cluster CRD: {e}"
- try:
- self.scale_deployments()
- self.delete_deployments()
- self.clean_up_prepare_jobs_and_pvc()
- except Exception as e:
- log.exception("Ceph cluster CRD patched, but error cleaning environment")
- return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
- try:
- self.set_osds_down()
- self.set_osds_out()
- if self.replace_flag:
- self.destroy_osds()
- else:
- self.purge_osds()
- except Exception as e:
- log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
- return f"Error removing OSDs from Ceph cluster: {e}"
-
- return remove_result
-
-
-
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
@@ -794,7 +487,12 @@ class RookCluster(object):
else:
fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env)
fetcher.fetch()
- discovered_devices.update(fetcher.devices())
+ nodename_to_devices = fetcher.devices()
+ for node, devices in nodename_to_devices.items():
+ if node in discovered_devices:
+ discovered_devices[node].extend(devices)
+ else:
+ discovered_devices[node] = devices
return discovered_devices
@@ -1096,7 +794,6 @@ class RookCluster(object):
name=spec.rgw_zone
)
return object_store
-
def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
if new.spec.gateway:
@@ -1188,48 +885,11 @@ class RookCluster(object):
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
- def add_osds(self, drive_group, matching_hosts):
- # type: (DriveGroupSpec, List[str]) -> str
- assert drive_group.objectstore in ("bluestore", "filestore")
- assert drive_group.service_id
- storage_class = self.get_storage_class()
- inventory = self.get_discovered_devices()
- creator: Optional[DefaultCreator] = None
- if (
- storage_class.metadata.labels
- and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
- ):
- creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
- else:
- creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
- return self._patch(
- ccl.CephCluster,
- 'cephclusters',
- self.rook_env.cluster_name,
- creator.add_osds(self.rook_pods, drive_group, matching_hosts)
- )
-
- def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
- inventory = self.get_discovered_devices()
- self.remover = DefaultRemover(
- self.coreV1_api,
- self.batchV1_api,
- self.appsV1_api,
- osd_ids,
- replace,
- force,
- mon_command,
- self._patch,
- self.rook_env,
- inventory
- )
- return self.remover.remove()
-
def get_hosts(self) -> List[orchestrator.HostSpec]:
ret = []
for node in self.nodes.items:
spec = orchestrator.HostSpec(
- node.metadata.name,
+ node.metadata.name,
addr='/'.join([addr.address for addr in node.status.addresses]),
labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
)
@@ -1585,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) ->
res.label = expression.key.split('/')[1]
elif expression.key == "kubernetes.io/hostname":
if expression.operator == "Exists":
- res.host_pattern = "*"
+ res.host_pattern = HostPattern("*")
elif expression.operator == "In":
res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values]
return res