diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-23 16:45:13 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-23 16:45:13 +0000 |
commit | 389020e14594e4894e28d1eb9103c210b142509e (patch) | |
tree | 2ba734cdd7a243f46dda7c3d0cc88c2293d9699f /src/pybind/mgr/rook/module.py | |
parent | Adding upstream version 18.2.2. (diff) | |
download | ceph-389020e14594e4894e28d1eb9103c210b142509e.tar.xz ceph-389020e14594e4894e28d1eb9103c210b142509e.zip |
Adding upstream version 18.2.3.upstream/18.2.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/rook/module.py')
-rw-r--r-- | src/pybind/mgr/rook/module.py | 220 |
1 files changed, 67 insertions, 153 deletions
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index fa75db2cf..34ed15bc6 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): default='local', desc='storage class name for LSO-discovered PVs', ), - Option( - 'drive_group_interval', - type='float', - default=300.0, - desc='interval in seconds between re-application of applied drive_groups', - ), ] @staticmethod @@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.config_notify() if TYPE_CHECKING: self.storage_class = 'foo' - self.drive_group_interval = 10.0 - self._load_drive_groups() self._shutdown = threading.Event() def config_notify(self) -> None: @@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.log.debug(' mgr option %s = %s', opt['name'], getattr(self, opt['name'])) # type: ignore assert isinstance(self.storage_class, str) - assert isinstance(self.drive_group_interval, float) if self._rook_cluster: self._rook_cluster.storage_class_name = self.storage_class @@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized.set() self.config_notify() - while not self._shutdown.is_set(): - self._apply_drivegroups(list(self._drive_group_map.values())) - self._shutdown.wait(self.drive_group_interval) - @handle_orch_error def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: host_list = None @@ -257,6 +244,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): image_name = cl['spec'].get('cephVersion', {}).get('image', None) num_nodes = len(self.rook_cluster.get_node_names()) + def sum_running_pods(service_type: str, service_name: Optional[str] = None) -> int: + all_pods = self.rook_cluster.describe_pods(None, None, None) + if service_name is None: + return sum(pod['phase'] == 'Running' for pod in all_pods if pod['labels']['app'] == f"rook-ceph-{service_type}") + else: + if service_type == 'mds': + key = 'rook_file_system' + elif service_type == 'rgw': + key = 'rook_object_store' + elif service_type == 'nfs': + key = 'ceph_nfs' + else: + self.log.error(f"Unknow service type {service_type}") + return 0 + + return sum(pod['phase'] == 'Running' \ + for pod in all_pods \ + if pod['labels']['app'] == f"rook-ceph-{service_type}" \ + and service_name == pod['labels'][key]) + spec = {} if service_type == 'mon' or service_type is None: spec['mon'] = orchestrator.ServiceDescription( @@ -269,6 +276,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=cl['spec'].get('mon', {}).get('count', 1), container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mon') ) if service_type == 'mgr' or service_type is None: spec['mgr'] = orchestrator.ServiceDescription( @@ -279,6 +287,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=1, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mgr') ) if ( @@ -293,13 +302,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=num_nodes, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('crashcollector') ) if service_type == 'mds' or service_type is None: # CephFilesystems all_fs = self.rook_cluster.get_resource("cephfilesystems") for fs in all_fs: - svc = 'mds.' + fs['metadata']['name'] + fs_name = fs['metadata']['name'] + svc = 'mds.' + fs_name if svc in spec: continue # FIXME: we are conflating active (+ standby) with count @@ -316,13 +327,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=total_mds, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mds', fs_name) ) if service_type == 'rgw' or service_type is None: # CephObjectstores all_zones = self.rook_cluster.get_resource("cephobjectstores") for zone in all_zones: - svc = 'rgw.' + zone['metadata']['name'] + zone_name = zone['metadata']['name'] + svc = 'rgw.' + zone_name if svc in spec: continue active = zone['spec']['gateway']['instances']; @@ -344,6 +357,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=active, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('rgw', zone_name) ) if service_type == 'nfs' or service_type is None: @@ -368,7 +382,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=active, last_refresh=now, - running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]), + running=sum_running_pods('nfs', nfs_name), created=creation_timestamp.astimezone(tz=datetime.timezone.utc) ) if service_type == 'osd' or service_type is None: @@ -385,18 +399,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=len(all_osds), last_refresh=now, - running=sum(osd.status.phase == 'Running' for osd in all_osds) + running=sum_running_pods('osd') ) - # drivegroups - for name, dg in self._drive_group_map.items(): - spec[f'osd.{name}'] = orchestrator.ServiceDescription( - spec=dg, - last_refresh=now, - size=0, - running=0, - ) - if service_type == 'rbd-mirror' or service_type is None: # rbd-mirrors all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors") @@ -414,13 +419,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=1, last_refresh=now, + running=sum_running_pods('rbd-mirror', mirror_name) ) - + for dd in self._list_daemons(): if dd.service_name() not in spec: continue service = spec[dd.service_name()] - service.running += 1 if not service.container_image_id: service.container_image_id = dd.container_image_id if not service.container_image_name: @@ -451,11 +456,25 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> List[orchestrator.DaemonDescription]: + + def _pod_to_servicename(pod: Dict[str, Any]) -> Optional[str]: + if 'ceph_daemon_type' not in pod['labels']: + return None + daemon_type = pod['labels']['ceph_daemon_type'] + if daemon_type in ['mds', 'rgw', 'nfs', 'rbd-mirror']: + if 'app.kubernetes.io/part-of' in pod['labels']: + service_name = f"{daemon_type}.{pod['labels']['app.kubernetes.io/part-of']}" + else: + service_name = f"{daemon_type}" + else: + service_name = f"{daemon_type}" + return service_name + pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host) - self.log.debug('pods %s' % pods) result = [] for p in pods: - sd = orchestrator.DaemonDescription() + pod_svc_name = _pod_to_servicename(p) + sd = orchestrator.DaemonDescription(service_name=pod_svc_name) sd.hostname = p['hostname'] # In Rook environments, the 'ceph-exporter' daemon is named 'exporter' whereas @@ -535,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): elif service_type == 'rbd-mirror': return self.rook_cluster.rm_service('cephrbdmirrors', service_id) elif service_type == 'osd': - if service_id in self._drive_group_map: - del self._drive_group_map[service_id] - self._save_drive_groups() return f'Removed {service_name}' elif service_type == 'ingress': self.log.info("{0} service '{1}' does not exist".format('ingress', service_id)) @@ -593,135 +609,33 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): def remove_daemons(self, names: List[str]) -> List[str]: return self.rook_cluster.remove_pods(names) - def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: - for drive_group in specs: - self._drive_group_map[str(drive_group.service_id)] = drive_group - self._save_drive_groups() - return OrchResult(self._apply_drivegroups(specs)) - - def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]: - all_hosts = raise_if_exception(self.get_hosts()) - result_list: List[str] = [] - for drive_group in ls: - matching_hosts = drive_group.placement.filter_matching_hosts( - lambda label=None, as_hostspec=None: all_hosts - ) + def add_host_label(self, host: str, label: str) -> OrchResult[str]: + return self.rook_cluster.add_host_label(host, label) + + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: + return self.rook_cluster.remove_host_label(host, label) - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts)) - return result_list - - def _load_drive_groups(self) -> None: - stored_drive_group = self.get_store("drive_group_map") - self._drive_group_map: Dict[str, DriveGroupSpec] = {} - if stored_drive_group: - for name, dg in json.loads(stored_drive_group).items(): - try: - self._drive_group_map[name] = DriveGroupSpec.from_json(dg) - except ValueError as e: - self.log.error(f'Failed to load drive group {name} ({dg}): {e}') - - def _save_drive_groups(self) -> None: - json_drive_group_map = { - name: dg.to_json() for name, dg in self._drive_group_map.items() - } - self.set_store("drive_group_map", json.dumps(json_drive_group_map)) + @handle_orch_error + def create_osds(self, drive_group: DriveGroupSpec) -> str: + raise orchestrator.OrchestratorError('Creating OSDs is not supported by rook orchestrator. Please, use Rook operator.') + @handle_orch_error def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False, - no_destroy: bool = False) -> OrchResult[str]: - assert self._rook_cluster is not None - if zap: - raise RuntimeError("Rook does not support zapping devices during OSD removal.") - res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command) - return OrchResult(res) + no_destroy: bool = False) -> str: + raise orchestrator.OrchestratorError('Removing OSDs is not supported by rook orchestrator. Please, use Rook operator.') - def add_host_label(self, host: str, label: str) -> OrchResult[str]: - return self.rook_cluster.add_host_label(host, label) - - def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: - return self.rook_cluster.remove_host_label(host, label) - """ @handle_orch_error - def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> str - # Creates OSDs from a drive group specification. - - # $: ceph orch osd create -i <dg.file> - - # The drivegroup file must only contain one spec at a time. - # - - targets = [] # type: List[str] - if drive_group.data_devices and drive_group.data_devices.paths: - targets += [d.path for d in drive_group.data_devices.paths] - if drive_group.data_directories: - targets += drive_group.data_directories - - all_hosts = raise_if_exception(self.get_hosts()) - - matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts) - - assert len(matching_hosts) == 1 - - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - - return self.rook_cluster.add_osds(drive_group, matching_hosts) - - # TODO: this was the code to update the progress reference: - - @handle_orch_error - def has_osds(matching_hosts: List[str]) -> bool: - - # Find OSD pods on this host - pod_osd_ids = set() - pods = self.k8s.list_namespaced_pod(self._rook_env.namespace, - label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name), - field_selector="spec.nodeName={0}".format( - matching_hosts[0] - )).items - for p in pods: - pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id'])) - - self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids)) - - found = [] - osdmap = self.get("osd_map") - for osd in osdmap['osds']: - osd_id = osd['osd'] - if osd_id not in pod_osd_ids: - continue - - metadata = self.get_metadata('osd', "%s" % osd_id) - if metadata and metadata['devices'] in targets: - found.append(osd_id) - else: - self.log.info("ignoring osd {0} {1}".format( - osd_id, metadata['devices'] if metadata else 'DNE' - )) + def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: + return self.rook_cluster.blink_light(ident_fault, on, locs) - return found is not None - """ + @handle_orch_error + def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: + return orchestrator.UpgradeStatusSpec() @handle_orch_error - def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: - return self.rook_cluster.blink_light(ident_fault, on, locs) + def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]: + return {} |