Diffstat (limited to 'src/pybind/mgr/rook/module.py')
-rw-r--r--  src/pybind/mgr/rook/module.py  220
1 file changed, 67 insertions(+), 153 deletions(-)
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index fa75db2cf..34ed15bc6 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
default='local',
desc='storage class name for LSO-discovered PVs',
),
- Option(
- 'drive_group_interval',
- type='float',
- default=300.0,
- desc='interval in seconds between re-application of applied drive_groups',
- ),
]
@staticmethod
@@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.config_notify()
if TYPE_CHECKING:
self.storage_class = 'foo'
- self.drive_group_interval = 10.0
- self._load_drive_groups()
self._shutdown = threading.Event()
def config_notify(self) -> None:
@@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
assert isinstance(self.storage_class, str)
- assert isinstance(self.drive_group_interval, float)
if self._rook_cluster:
self._rook_cluster.storage_class_name = self.storage_class
@@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self._initialized.set()
self.config_notify()
- while not self._shutdown.is_set():
- self._apply_drivegroups(list(self._drive_group_map.values()))
- self._shutdown.wait(self.drive_group_interval)
-
@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
@@ -257,6 +244,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
image_name = cl['spec'].get('cephVersion', {}).get('image', None)
num_nodes = len(self.rook_cluster.get_node_names())
+ def sum_running_pods(service_type: str, service_name: Optional[str] = None) -> int:
+ all_pods = self.rook_cluster.describe_pods(None, None, None)
+ if service_name is None:
+ return sum(pod['phase'] == 'Running' for pod in all_pods if pod['labels']['app'] == f"rook-ceph-{service_type}")
+ else:
+ if service_type == 'mds':
+ key = 'rook_file_system'
+ elif service_type == 'rgw':
+ key = 'rook_object_store'
+ elif service_type == 'nfs':
+ key = 'ceph_nfs'
+ else:
+ self.log.error(f"Unknown service type {service_type}")
+ return 0
+
+ return sum(pod['phase'] == 'Running'
+ for pod in all_pods
+ if pod['labels']['app'] == f"rook-ceph-{service_type}"
+ and service_name == pod['labels'][key])
+
spec = {}
if service_type == 'mon' or service_type is None:
spec['mon'] = orchestrator.ServiceDescription(
@@ -269,6 +276,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=cl['spec'].get('mon', {}).get('count', 1),
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mon')
)
if service_type == 'mgr' or service_type is None:
spec['mgr'] = orchestrator.ServiceDescription(
@@ -279,6 +287,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=1,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mgr')
)
if (
@@ -293,13 +302,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=num_nodes,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('crashcollector')
)
if service_type == 'mds' or service_type is None:
# CephFilesystems
all_fs = self.rook_cluster.get_resource("cephfilesystems")
for fs in all_fs:
- svc = 'mds.' + fs['metadata']['name']
+ fs_name = fs['metadata']['name']
+ svc = 'mds.' + fs_name
if svc in spec:
continue
# FIXME: we are conflating active (+ standby) with count
@@ -316,13 +327,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=total_mds,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mds', fs_name)
)
if service_type == 'rgw' or service_type is None:
# CephObjectstores
all_zones = self.rook_cluster.get_resource("cephobjectstores")
for zone in all_zones:
- svc = 'rgw.' + zone['metadata']['name']
+ zone_name = zone['metadata']['name']
+ svc = 'rgw.' + zone_name
if svc in spec:
continue
active = zone['spec']['gateway']['instances'];
@@ -344,6 +357,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=active,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('rgw', zone_name)
)
if service_type == 'nfs' or service_type is None:
@@ -368,7 +382,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=active,
last_refresh=now,
- running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]),
+ running=sum_running_pods('nfs', nfs_name),
created=creation_timestamp.astimezone(tz=datetime.timezone.utc)
)
if service_type == 'osd' or service_type is None:
@@ -385,18 +399,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=len(all_osds),
last_refresh=now,
- running=sum(osd.status.phase == 'Running' for osd in all_osds)
+ running=sum_running_pods('osd')
)
- # drivegroups
- for name, dg in self._drive_group_map.items():
- spec[f'osd.{name}'] = orchestrator.ServiceDescription(
- spec=dg,
- last_refresh=now,
- size=0,
- running=0,
- )
-
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
@@ -414,13 +419,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=1,
last_refresh=now,
+ running=sum_running_pods('rbd-mirror', mirror_name)
)
-
+
for dd in self._list_daemons():
if dd.service_name() not in spec:
continue
service = spec[dd.service_name()]
- service.running += 1
if not service.container_image_id:
service.container_image_id = dd.container_image_id
if not service.container_image_name:
@@ -451,11 +456,25 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
daemon_id: Optional[str] = None,
host: Optional[str] = None,
refresh: bool = False) -> List[orchestrator.DaemonDescription]:
+
+ def _pod_to_servicename(pod: Dict[str, Any]) -> Optional[str]:
+ if 'ceph_daemon_type' not in pod['labels']:
+ return None
+ daemon_type = pod['labels']['ceph_daemon_type']
+ if daemon_type in ['mds', 'rgw', 'nfs', 'rbd-mirror']:
+ if 'app.kubernetes.io/part-of' in pod['labels']:
+ service_name = f"{daemon_type}.{pod['labels']['app.kubernetes.io/part-of']}"
+ else:
+ service_name = f"{daemon_type}"
+ else:
+ service_name = f"{daemon_type}"
+ return service_name
+
pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
- self.log.debug('pods %s' % pods)
result = []
for p in pods:
- sd = orchestrator.DaemonDescription()
+ pod_svc_name = _pod_to_servicename(p)
+ sd = orchestrator.DaemonDescription(service_name=pod_svc_name)
sd.hostname = p['hostname']
# In Rook environments, the 'ceph-exporter' daemon is named 'exporter' whereas
@@ -535,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
elif service_type == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
elif service_type == 'osd':
- if service_id in self._drive_group_map:
- del self._drive_group_map[service_id]
- self._save_drive_groups()
return f'Removed {service_name}'
elif service_type == 'ingress':
self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
@@ -593,135 +609,33 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
def remove_daemons(self, names: List[str]) -> List[str]:
return self.rook_cluster.remove_pods(names)
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
- for drive_group in specs:
- self._drive_group_map[str(drive_group.service_id)] = drive_group
- self._save_drive_groups()
- return OrchResult(self._apply_drivegroups(specs))
-
- def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
- all_hosts = raise_if_exception(self.get_hosts())
- result_list: List[str] = []
- for drive_group in ls:
- matching_hosts = drive_group.placement.filter_matching_hosts(
- lambda label=None, as_hostspec=None: all_hosts
- )
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
+ return self.rook_cluster.add_host_label(host, label)
+
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
+ return self.rook_cluster.remove_host_label(host, label)
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
- result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
- return result_list
-
- def _load_drive_groups(self) -> None:
- stored_drive_group = self.get_store("drive_group_map")
- self._drive_group_map: Dict[str, DriveGroupSpec] = {}
- if stored_drive_group:
- for name, dg in json.loads(stored_drive_group).items():
- try:
- self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
- except ValueError as e:
- self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
-
- def _save_drive_groups(self) -> None:
- json_drive_group_map = {
- name: dg.to_json() for name, dg in self._drive_group_map.items()
- }
- self.set_store("drive_group_map", json.dumps(json_drive_group_map))
+ @handle_orch_error
+ def create_osds(self, drive_group: DriveGroupSpec) -> str:
+ raise orchestrator.OrchestratorError('Creating OSDs is not supported by the Rook orchestrator. Please use the Rook operator instead.')
+ @handle_orch_error
def remove_osds(self,
osd_ids: List[str],
replace: bool = False,
force: bool = False,
zap: bool = False,
- no_destroy: bool = False) -> OrchResult[str]:
- assert self._rook_cluster is not None
- if zap:
- raise RuntimeError("Rook does not support zapping devices during OSD removal.")
- res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
- return OrchResult(res)
+ no_destroy: bool = False) -> str:
+ raise orchestrator.OrchestratorError('Removing OSDs is not supported by the Rook orchestrator. Please use the Rook operator instead.')
- def add_host_label(self, host: str, label: str) -> OrchResult[str]:
- return self.rook_cluster.add_host_label(host, label)
-
- def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
- return self.rook_cluster.remove_host_label(host, label)
- """
@handle_orch_error
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> str
- # Creates OSDs from a drive group specification.
-
- # $: ceph orch osd create -i <dg.file>
-
- # The drivegroup file must only contain one spec at a time.
- #
-
- targets = [] # type: List[str]
- if drive_group.data_devices and drive_group.data_devices.paths:
- targets += [d.path for d in drive_group.data_devices.paths]
- if drive_group.data_directories:
- targets += drive_group.data_directories
-
- all_hosts = raise_if_exception(self.get_hosts())
-
- matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
-
- assert len(matching_hosts) == 1
-
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
-
- return self.rook_cluster.add_osds(drive_group, matching_hosts)
-
- # TODO: this was the code to update the progress reference:
-
- @handle_orch_error
- def has_osds(matching_hosts: List[str]) -> bool:
-
- # Find OSD pods on this host
- pod_osd_ids = set()
- pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
- label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
- field_selector="spec.nodeName={0}".format(
- matching_hosts[0]
- )).items
- for p in pods:
- pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
-
- self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
-
- found = []
- osdmap = self.get("osd_map")
- for osd in osdmap['osds']:
- osd_id = osd['osd']
- if osd_id not in pod_osd_ids:
- continue
-
- metadata = self.get_metadata('osd', "%s" % osd_id)
- if metadata and metadata['devices'] in targets:
- found.append(osd_id)
- else:
- self.log.info("ignoring osd {0} {1}".format(
- osd_id, metadata['devices'] if metadata else 'DNE'
- ))
+ def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+ return self.rook_cluster.blink_light(ident_fault, on, locs)
- return found is not None
- """
+ @handle_orch_error
+ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
+ return orchestrator.UpgradeStatusSpec()
@handle_orch_error
- def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
- return self.rook_cluster.blink_light(ident_fault, on, locs)
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
+ return {}
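
For reference, a minimal standalone sketch of the pod-counting logic this patch introduces. The helper name count_running_pods and the sample pod dicts below are hypothetical; they only assume the phase/labels shape that describe_pods() output is used with in the sum_running_pods helper added above.

from typing import Any, Dict, List, Optional


def count_running_pods(all_pods: List[Dict[str, Any]],
                       service_type: str,
                       service_name: Optional[str] = None) -> int:
    # Pods of a service carry an 'app' label of the form "rook-ceph-<service_type>";
    # individual service instances are identified by a service-type-specific label key.
    label_keys = {'mds': 'rook_file_system',
                  'rgw': 'rook_object_store',
                  'nfs': 'ceph_nfs'}
    app = f"rook-ceph-{service_type}"
    if service_name is None:
        return sum(p['phase'] == 'Running'
                   for p in all_pods
                   if p['labels'].get('app') == app)
    key = label_keys.get(service_type)
    if key is None:
        return 0
    return sum(p['phase'] == 'Running'
               for p in all_pods
               if p['labels'].get('app') == app
               and p['labels'].get(key) == service_name)


# Hypothetical sample input shaped like describe_pods() output:
pods = [
    {'phase': 'Running', 'labels': {'app': 'rook-ceph-mds', 'rook_file_system': 'myfs'}},
    {'phase': 'Pending', 'labels': {'app': 'rook-ceph-mds', 'rook_file_system': 'myfs'}},
]
assert count_running_pods(pods, 'mds', 'myfs') == 1
assert count_running_pods(pods, 'mds') == 1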