From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/pybind/mgr/cephadm/services/osd.py | 972 +++++++++++++++++++++++++++++++++
 1 file changed, 972 insertions(+)
 create mode 100644 src/pybind/mgr/cephadm/services/osd.py

(limited to 'src/pybind/mgr/cephadm/services/osd.py')

diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
new file mode 100644
index 000000000..bfecc5723
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -0,0 +1,972 @@
+import json
+import logging
+from asyncio import gather
+from threading import Lock
+from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
+
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
+from ceph.deployment.inventory import Device
+from ceph.utils import datetime_to_str, str_to_datetime
+
+from datetime import datetime
+import orchestrator
+from cephadm.serve import CephadmServe
+from cephadm.utils import SpecialHostLabels
+from ceph.utils import datetime_now
+from orchestrator import OrchestratorError, DaemonDescription
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OSDService(CephService):
+    TYPE = 'osd'
+
+    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+        logger.debug(f"Processing DriveGroup {drive_group}")
+        osd_id_claims = OsdIdClaims(self.mgr)
+        if osd_id_claims.get():
+            logger.info(
+                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
+
+        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+            # skip this host if there has been no change in inventory
+            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
+                    host, drive_group))
+                return None
+            # skip this host if we cannot schedule here
+            if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
+                return None
+
+            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
+
+            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
+                                                                 osd_id_claims_for_host)
+            if not cmds:
+                logger.debug("No data_devices, skipping DriveGroup: {}".format(
+                    drive_group.service_id))
+                return None
+
+            logger.debug('Applying service osd.%s on host %s...'
% ( + drive_group.service_id, host + )) + start_ts = datetime_now() + env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"] + ret_msg = await self.create_single_host( + drive_group, host, cmds, + replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars + ) + self.mgr.cache.update_osdspec_last_applied( + host, drive_group.service_name(), start_ts + ) + self.mgr.cache.save_host(host) + return ret_msg + + async def all_hosts() -> List[Optional[str]]: + futures = [create_from_spec_one(h, ds) + for h, ds in self.prepare_drivegroup(drive_group)] + return await gather(*futures) + + with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'): + ret = self.mgr.wait_async(all_hosts()) + return ", ".join(filter(None, ret)) + + async def create_single_host(self, + drive_group: DriveGroupSpec, + host: str, cmds: List[str], replace_osd_ids: List[str], + env_vars: Optional[List[str]] = None) -> str: + for cmd in cmds: + out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars) + if code == 1 and ', it is already prepared' in '\n'.join(err): + # HACK: when we create against an existing LV, ceph-volume + # returns an error and the above message. To make this + # command idempotent, tolerate this "error" and continue. + logger.debug('the device was already prepared; continuing') + code = 0 + if code: + raise RuntimeError( + 'cephadm exited with an error code: %d, stderr:%s' % ( + code, '\n'.join(err))) + return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(), + replace_osd_ids) + + async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str, + replace_osd_ids: Optional[List[str]] = None) -> str: + + if replace_osd_ids is None: + replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host) + assert replace_osd_ids is not None + + # check result: lvm + osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json( + host, 'osd', 'ceph-volume', + [ + '--', + 'lvm', 'list', + '--format', 'json', + ]) + before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True) + fsid = self.mgr._cluster_fsid + osd_uuid_map = self.mgr.get_osd_uuid_map() + created = [] + for osd_id, osds in osds_elems.items(): + for osd in osds: + if osd['type'] == 'db': + continue + if osd['tags']['ceph.cluster_fsid'] != fsid: + logger.debug('mismatched fsid, skipping %s' % osd) + continue + if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids: + # if it exists but is part of the replacement operation, don't skip + continue + if self.mgr.cache.has_daemon(f'osd.{osd_id}', host): + # cephadm daemon instance already exists + logger.debug(f'osd id {osd_id} daemon already exists') + continue + if osd_id not in osd_uuid_map: + logger.debug('osd id {} does not exist in cluster'.format(osd_id)) + continue + if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']: + logger.debug('mismatched osd uuid (cluster has %s, osd ' + 'has %s)' % ( + osd_uuid_map.get(osd_id), + osd['tags']['ceph.osd_fsid'])) + continue + + created.append(osd_id) + daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec( + service_name=service_name, + daemon_id=str(osd_id), + host=host, + daemon_type='osd', + ) + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + await CephadmServe(self.mgr)._create_daemon( + daemon_spec, + osd_uuid_map=osd_uuid_map) + + # check result: raw + raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json( + host, 'osd', 'ceph-volume', + [ + '--', + 'raw', 'list', + 
'--format', 'json', + ]) + for osd_uuid, osd in raw_elems.items(): + if osd.get('ceph_fsid') != fsid: + continue + osd_id = str(osd.get('osd_id', '-1')) + if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids: + # if it exists but is part of the replacement operation, don't skip + continue + if self.mgr.cache.has_daemon(f'osd.{osd_id}', host): + # cephadm daemon instance already exists + logger.debug(f'osd id {osd_id} daemon already exists') + continue + if osd_id not in osd_uuid_map: + logger.debug('osd id {} does not exist in cluster'.format(osd_id)) + continue + if osd_uuid_map.get(osd_id) != osd_uuid: + logger.debug('mismatched osd uuid (cluster has %s, osd ' + 'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid)) + continue + if osd_id in created: + continue + + created.append(osd_id) + daemon_spec = CephadmDaemonDeploySpec( + service_name=service_name, + daemon_id=osd_id, + host=host, + daemon_type='osd', + ) + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + await CephadmServe(self.mgr)._create_daemon( + daemon_spec, + osd_uuid_map=osd_uuid_map) + + if created: + self.mgr.cache.invalidate_host_devices(host) + self.mgr.cache.invalidate_autotune(host) + return "Created osd(s) %s on host '%s'" % (','.join(created), host) + else: + return "Created no osd(s) on host %s; already created?" % host + + def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]: + # 1) use fn_filter to determine matching_hosts + matching_hosts = drive_group.placement.filter_matching_hostspecs( + self.mgr.cache.get_schedulable_hosts()) + # 2) Map the inventory to the InventoryHost object + host_ds_map = [] + + # set osd_id_claims + + def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]: + # This is stupid and needs to be loaded with the host + for _host, _inventory in inventory_dict.items(): + if _host == hostname: + return _inventory + raise OrchestratorError("No inventory found for host: {}".format(hostname)) + + # 3) iterate over matching_host and call DriveSelection + logger.debug(f"Checking matching hosts -> {matching_hosts}") + for host in matching_hosts: + inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices) + logger.debug(f"Found inventory for host {inventory_for_host}") + + # List of Daemons on that host + dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name()) + dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host] + + drive_selection = DriveSelection(drive_group, inventory_for_host, + existing_daemons=len(dd_for_spec_and_host)) + logger.debug(f"Found drive selection {drive_selection}") + if drive_group.method and drive_group.method == 'raw': + # ceph-volume can currently only handle a 1:1 mapping + # of data/db/wal devices for raw mode osds. If db/wal devices + # are defined and the number does not match the number of data + # devices, we need to bail out + if drive_selection.data_devices() and drive_selection.db_devices(): + if len(drive_selection.data_devices()) != len(drive_selection.db_devices()): + raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. 
Found ' + f'{len(drive_selection.data_devices())} potential data device(s) and ' + f'{len(drive_selection.db_devices())} potential db device(s) on host {host}') + if drive_selection.data_devices() and drive_selection.wal_devices(): + if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()): + raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found ' + f'{len(drive_selection.data_devices())} potential data device(s) and ' + f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}') + host_ds_map.append((host, drive_selection)) + return host_ds_map + + @staticmethod + def driveselection_to_ceph_volume(drive_selection: DriveSelection, + osd_id_claims: Optional[List[str]] = None, + preview: bool = False) -> List[str]: + logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command") + cmds: List[str] = translate.to_ceph_volume(drive_selection, + osd_id_claims, preview=preview).run() + logger.debug(f"Resulting ceph-volume cmds: {cmds}") + return cmds + + def get_previews(self, host: str) -> List[Dict[str, Any]]: + # Find OSDSpecs that match host. + osdspecs = self.resolve_osdspecs_for_host(host) + return self.generate_previews(osdspecs, host) + + def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]: + """ + + The return should look like this: + + [ + {'data': {}, + 'osdspec': , + 'host': , + 'notes': + }, + + {'data': ..., + 'osdspec': .., + 'host': ..., + 'notes': ... + } + ] + + Note: One host can have multiple previews based on its assigned OSDSpecs. + """ + self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}") + ret_all: List[Dict[str, Any]] = [] + if not osdspecs: + return ret_all + for osdspec in osdspecs: + + # populate osd_id_claims + osd_id_claims = OsdIdClaims(self.mgr) + + # prepare driveselection + for host, ds in self.prepare_drivegroup(osdspec): + if host != for_host: + continue + + # driveselection for host + cmds: List[str] = self.driveselection_to_ceph_volume(ds, + osd_id_claims.filtered_by_host( + host), + preview=True) + if not cmds: + logger.debug("No data_devices, skipping DriveGroup: {}".format( + osdspec.service_name())) + continue + + # get preview data from ceph-volume + for cmd in cmds: + with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'): + out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd)) + if out: + try: + concat_out: Dict[str, Any] = json.loads(' '.join(out)) + except ValueError: + logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out)) + concat_out = {} + notes = [] + if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit: + found = len(concat_out) + limit = osdspec.data_devices.limit + notes.append( + f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})') + ret_all.append({'data': concat_out, + 'osdspec': osdspec.service_id, + 'host': host, + 'notes': notes}) + return ret_all + + def resolve_hosts_for_osdspecs(self, + specs: Optional[List[DriveGroupSpec]] = None + ) -> List[str]: + osdspecs = [] + if specs: + osdspecs = [cast(DriveGroupSpec, spec) for spec in specs] + if not osdspecs: + self.mgr.log.debug("No OSDSpecs found") + return [] + return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], []) + + def resolve_osdspecs_for_host(self, host: str, + specs: 
Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]: + matching_specs = [] + self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>") + if not specs: + specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items() + if spec.service_type == 'osd'] + for spec in specs: + if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()): + self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>") + matching_specs.append(spec) + return matching_specs + + async def _run_ceph_volume_command(self, host: str, + cmd: str, env_vars: Optional[List[str]] = None + ) -> Tuple[List[str], List[str], int]: + self.mgr.inventory.assert_host(host) + + # get bootstrap key + ret, keyring, err = self.mgr.check_mon_command({ + 'prefix': 'auth get', + 'entity': 'client.bootstrap-osd', + }) + + j = json.dumps({ + 'config': self.mgr.get_minimal_ceph_conf(), + 'keyring': keyring, + }) + + split_cmd = cmd.split(' ') + _cmd = ['--config-json', '-', '--'] + _cmd.extend(split_cmd) + out, err, code = await CephadmServe(self.mgr)._run_cephadm( + host, 'osd', 'ceph-volume', + _cmd, + env_vars=env_vars, + stdin=j, + error_ok=True) + return out, err, code + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + # Do not remove the osd.N keyring, if we failed to deploy the OSD, because + # we cannot recover from it. The OSD keys are created by ceph-volume and not by + # us. + if not is_failed_deploy: + super().post_remove(daemon, is_failed_deploy=is_failed_deploy) + + +class OsdIdClaims(object): + """ + Retrieve and provide osd ids that can be reused in the cluster + """ + + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr: "CephadmOrchestrator" = mgr + self.osd_host_map: Dict[str, List[str]] = dict() + self.refresh() + + def refresh(self) -> None: + try: + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'osd tree', + 'states': ['destroyed'], + 'format': 'json' + }) + except MonCommandFailed as e: + logger.exception('osd tree failed') + raise OrchestratorError(str(e)) + try: + tree = json.loads(out) + except ValueError: + logger.exception(f'Cannot decode JSON: \'{out}\'') + return + + nodes = tree.get('nodes', {}) + for node in nodes: + if node.get('type') == 'host': + self.osd_host_map.update( + {node.get('name'): [str(_id) for _id in node.get('children', list())]} + ) + if self.osd_host_map: + self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}") + + def get(self) -> Dict[str, List[str]]: + return self.osd_host_map + + def filtered_by_host(self, host: str) -> List[str]: + """ + Return the list of osd ids that can be reused in a host + + OSD id claims in CRUSH map are linked to the bare name of + the hostname. 
In case of FQDN hostnames the host is searched by the + bare name + """ + return self.osd_host_map.get(host.split(".")[0], []) + + +class RemoveUtil(object): + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr: "CephadmOrchestrator" = mgr + + def get_osds_in_cluster(self) -> List[str]: + osd_map = self.mgr.get_osdmap() + return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])] + + def osd_df(self) -> dict: + base_cmd = 'osd df' + ret, out, err = self.mgr.mon_command({ + 'prefix': base_cmd, + 'format': 'json' + }) + try: + return json.loads(out) + except ValueError: + logger.exception(f'Cannot decode JSON: \'{out}\'') + return {} + + def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int: + if not osd_df: + osd_df = self.osd_df() + osd_nodes = osd_df.get('nodes', []) + for osd_node in osd_nodes: + if osd_node.get('id') == int(osd_id): + return osd_node.get('pgs', -1) + return -1 + + def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]: + """ + Cut osd_id list in half until it's ok-to-stop + + :param osds: list of osd_ids + :return: list of ods_ids that can be stopped at once + """ + if not osds: + return [] + while not self.ok_to_stop(osds): + if len(osds) <= 1: + # can't even stop one OSD, aborting + self.mgr.log.debug( + "Can't even stop one OSD. Cluster is probably busy. Retrying later..") + return [] + + # This potentially prolongs the global wait time. + self.mgr.event.wait(1) + # splitting osd_ids in half until ok_to_stop yields success + # maybe popping ids off one by one is better here..depends on the cluster size I guess.. + # There's a lot of room for micro adjustments here + osds = osds[len(osds) // 2:] + return osds + + # todo start draining + # return all([osd.start_draining() for osd in osds]) + + def ok_to_stop(self, osds: List["OSD"]) -> bool: + cmd_args = { + 'prefix': "osd ok-to-stop", + 'ids': [str(osd.osd_id) for osd in osds] + } + return self._run_mon_cmd(cmd_args, error_ok=True) + + def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool: + base_cmd = f"osd {flag}" + self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}") + ret, out, err = self.mgr.mon_command({ + 'prefix': base_cmd, + 'ids': [str(osd.osd_id) for osd in osds] + }) + if ret != 0: + self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>") + return False + self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}") + return True + + def get_weight(self, osd: "OSD") -> Optional[float]: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'osd crush tree', + 'format': 'json', + }) + if ret != 0: + self.mgr.log.error(f"Could not dump crush weights. <{err}>") + return None + j = json.loads(out) + for n in j.get("nodes", []): + if n.get("name") == f"osd.{osd.osd_id}": + self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}") + return n.get("crush_weight") + return None + + def reweight_osd(self, osd: "OSD", weight: float) -> bool: + self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}") + ret, out, err = self.mgr.mon_command({ + 'prefix': "osd crush reweight", + 'name': f"osd.{osd.osd_id}", + 'weight': weight, + }) + if ret != 0: + self.mgr.log.error(f"Could not reweight {osd} to {weight}. 
<{err}>") + return False + self.mgr.log.info(f"{osd} weight is now {weight}") + return True + + def zap_osd(self, osd: "OSD") -> str: + "Zaps all devices that are associated with an OSD" + if osd.hostname is not None: + cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)] + if not osd.no_destroy: + cmd.append('--destroy') + with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'): + out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( + osd.hostname, 'osd', 'ceph-volume', + cmd, + error_ok=True)) + self.mgr.cache.invalidate_host_devices(osd.hostname) + if code: + raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) + return '\n'.join(out + err) + raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown") + + def safe_to_destroy(self, osd_ids: List[int]) -> bool: + """ Queries the safe-to-destroy flag for OSDs """ + cmd_args = {'prefix': 'osd safe-to-destroy', + 'ids': [str(x) for x in osd_ids]} + return self._run_mon_cmd(cmd_args, error_ok=True) + + def destroy_osd(self, osd_id: int) -> bool: + """ Destroys an OSD (forcefully) """ + cmd_args = {'prefix': 'osd destroy-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True} + return self._run_mon_cmd(cmd_args) + + def purge_osd(self, osd_id: int) -> bool: + """ Purges an OSD from the cluster (forcefully) """ + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True + } + return self._run_mon_cmd(cmd_args) + + def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool: + """ + Generic command to run mon_command and evaluate/log the results + """ + ret, out, err = self.mgr.mon_command(cmd_args) + if ret != 0: + self.mgr.log.debug(f"ran {cmd_args} with mon_command") + if not error_ok: + self.mgr.log.error( + f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + return False + self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + return True + + +class NotFoundError(Exception): + pass + + +class OSD: + + def __init__(self, + osd_id: int, + remove_util: RemoveUtil, + drain_started_at: Optional[datetime] = None, + process_started_at: Optional[datetime] = None, + drain_stopped_at: Optional[datetime] = None, + drain_done_at: Optional[datetime] = None, + draining: bool = False, + started: bool = False, + stopped: bool = False, + replace: bool = False, + force: bool = False, + hostname: Optional[str] = None, + zap: bool = False, + no_destroy: bool = False): + # the ID of the OSD + self.osd_id = osd_id + + # when did process (not the actual draining) start + self.process_started_at = process_started_at + + # when did the drain start + self.drain_started_at = drain_started_at + + # when did the drain stop + self.drain_stopped_at = drain_stopped_at + + # when did the drain finish + self.drain_done_at = drain_done_at + + # did the draining start + self.draining = draining + + # was the operation started + self.started = started + + # was the operation stopped + self.stopped = stopped + + # If this is a replace or remove operation + self.replace = replace + # If we wait for the osd to be drained + self.force = force + # The name of the node + self.hostname = hostname + + # mgr obj to make mgr/mon calls + self.rm_util: RemoveUtil = remove_util + + self.original_weight: Optional[float] = None + + # Whether devices associated with the OSD should be zapped (DATA ERASED) + self.zap = zap + # Whether all associated LV devices should be destroyed. 
+ self.no_destroy = no_destroy + + def start(self) -> None: + if self.started: + logger.debug(f"Already started draining {self}") + return None + self.started = True + self.stopped = False + + def start_draining(self) -> bool: + if self.stopped: + logger.debug(f"Won't start draining {self}. OSD draining is stopped.") + return False + if self.replace: + self.rm_util.set_osd_flag([self], 'out') + else: + self.original_weight = self.rm_util.get_weight(self) + self.rm_util.reweight_osd(self, 0.0) + self.drain_started_at = datetime.utcnow() + self.draining = True + logger.debug(f"Started draining {self}.") + return True + + def stop_draining(self) -> bool: + if self.replace: + self.rm_util.set_osd_flag([self], 'in') + else: + if self.original_weight: + self.rm_util.reweight_osd(self, self.original_weight) + self.drain_stopped_at = datetime.utcnow() + self.draining = False + logger.debug(f"Stopped draining {self}.") + return True + + def stop(self) -> None: + if self.stopped: + logger.debug(f"Already stopped draining {self}") + return None + self.started = False + self.stopped = True + self.stop_draining() + + @property + def is_draining(self) -> bool: + """ + Consider an OSD draining when it is + actively draining but not yet empty + """ + return self.draining and not self.is_empty + + @property + def is_ok_to_stop(self) -> bool: + return self.rm_util.ok_to_stop([self]) + + @property + def is_empty(self) -> bool: + if self.get_pg_count() == 0: + if not self.drain_done_at: + self.drain_done_at = datetime.utcnow() + self.draining = False + return True + return False + + def safe_to_destroy(self) -> bool: + return self.rm_util.safe_to_destroy([self.osd_id]) + + def down(self) -> bool: + return self.rm_util.set_osd_flag([self], 'down') + + def destroy(self) -> bool: + return self.rm_util.destroy_osd(self.osd_id) + + def do_zap(self) -> str: + return self.rm_util.zap_osd(self) + + def purge(self) -> bool: + return self.rm_util.purge_osd(self.osd_id) + + def get_pg_count(self) -> int: + return self.rm_util.get_pg_count(self.osd_id) + + @property + def exists(self) -> bool: + return str(self.osd_id) in self.rm_util.get_osds_in_cluster() + + def drain_status_human(self) -> str: + default_status = 'not started' + status = 'started' if self.started and not self.draining else default_status + status = 'draining' if self.draining else status + status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status + return status + + def pg_count_str(self) -> str: + return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count()) + + def to_json(self) -> dict: + out: Dict[str, Any] = dict() + out['osd_id'] = self.osd_id + out['started'] = self.started + out['draining'] = self.draining + out['stopped'] = self.stopped + out['replace'] = self.replace + out['force'] = self.force + out['zap'] = self.zap + out['hostname'] = self.hostname # type: ignore + + for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + else: + out[k] = getattr(self, k) + return out + + @classmethod + def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]: + if not inp: + return None + for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if inp.get(date_field): + inp.update({date_field: str_to_datetime(inp.get(date_field, ''))}) + inp.update({'remove_util': rm_util}) + if 'nodename' in inp: + hostname = inp.pop('nodename') + 
inp['hostname'] = hostname + return cls(**inp) + + def __hash__(self) -> int: + return hash(self.osd_id) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, OSD): + return NotImplemented + return self.osd_id == other.osd_id + + def __repr__(self) -> str: + return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}" + + +class OSDRemovalQueue(object): + + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr: "CephadmOrchestrator" = mgr + self.osds: Set[OSD] = set() + self.rm_util = RemoveUtil(mgr) + + # locks multithreaded access to self.osds. Please avoid locking + # network calls, like mon commands. + self.lock = Lock() + + def process_removal_queue(self) -> None: + """ + Performs actions in the _serve() loop to remove an OSD + when criteria is met. + + we can't hold self.lock, as we're calling _remove_daemon in the loop + """ + + # make sure that we don't run on OSDs that are not in the cluster anymore. + self.cleanup() + + # find osds that are ok-to-stop and not yet draining + ready_to_drain_osds = self._ready_to_drain_osds() + if ready_to_drain_osds: + # start draining those + _ = [osd.start_draining() for osd in ready_to_drain_osds] + + all_osds = self.all_osds() + + logger.debug( + f"{self.queue_size()} OSDs are scheduled " + f"for removal: {all_osds}") + + # Check all osds for their state and take action (remove, purge etc) + new_queue: Set[OSD] = set() + for osd in all_osds: # type: OSD + if not osd.force: + # skip criteria + if not osd.is_empty: + logger.debug(f"{osd} is not empty yet. Waiting a bit more") + new_queue.add(osd) + continue + + if not osd.safe_to_destroy(): + logger.debug( + f"{osd} is not safe-to-destroy yet. Waiting a bit more") + new_queue.add(osd) + continue + + # abort criteria + if not osd.down(): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError( + f"Could not mark {osd} down") + + # stop and remove daemon + assert osd.hostname is not None + + if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'): + CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname) + logger.info(f"Successfully removed {osd} on {osd.hostname}") + else: + logger.info(f"Daemon {osd} on {osd.hostname} was already removed") + + if osd.replace: + # mark destroyed in osdmap + if not osd.destroy(): + raise orchestrator.OrchestratorError( + f"Could not destroy {osd}") + logger.info( + f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement") + else: + # purge from osdmap + if not osd.purge(): + raise orchestrator.OrchestratorError(f"Could not purge {osd}") + logger.info(f"Successfully purged {osd} on {osd.hostname}") + + if osd.zap: + # throws an exception if the zap fails + logger.info(f"Zapping devices for {osd} on {osd.hostname}") + osd.do_zap() + logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}") + + logger.debug(f"Removing {osd} from the queue.") + + # self could change while this is processing (osds get added from the CLI) + # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and + # osds that were added while this method was executed' + with self.lock: + self.osds.intersection_update(new_queue) + self._save_to_store() + + def cleanup(self) -> None: + # OSDs can always be cleaned up manually. 
This ensures that we run on existing OSDs
+        with self.lock:
+            for osd in self._not_in_cluster():
+                self.osds.remove(osd)
+
+    def _ready_to_drain_osds(self) -> List["OSD"]:
+        """
+        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
+        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
+        that are already draining.
+        """
+        draining_limit = max(1, self.mgr.max_osd_draining_count)
+        num_already_draining = len(self.draining_osds())
+        num_to_start_draining = max(0, draining_limit - num_already_draining)
+        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
+
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)
+
+    def as_osd_ids(self) -> List[int]:
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]
+
+    def queue_size(self) -> int:
+        with self.lock:
+            return len(self.osds)
+
+    def draining_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]
+
+    def idling_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+
+    def empty_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]
+
+    def all_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds]
+
+    def _not_in_cluster(self) -> List["OSD"]:
+        return [osd for osd in self.osds if not osd.exists]
+
+    def enqueue(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        with self.lock:
+            self.osds.add(osd)
+        osd.start()
+
+    def rm(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        osd.stop()
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, OSDRemovalQueue):
+            return False
+        with self.lock:
+            return self.osds == other.osds
--
cgit v1.2.3
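
A note on the removal logic in the patched file: RemoveUtil.find_osd_stop_threshold keeps discarding the first half of the candidate list until `osd ok-to-stop` succeeds, rather than probing OSDs one by one. The following standalone Python sketch illustrates only that halving strategy; the ok_to_stop callable and find_stop_threshold name are hypothetical stand-ins for the mon command wrapper and are not part of the patch, and the real method additionally waits briefly between retries.

from typing import Callable, List, Optional


def find_stop_threshold(osd_ids: List[int],
                        ok_to_stop: Callable[[List[int]], bool]) -> Optional[List[int]]:
    # Mirror of the halving loop in RemoveUtil.find_osd_stop_threshold:
    # keep the second half of the list until the remainder is ok to stop.
    if not osd_ids:
        return []
    while not ok_to_stop(osd_ids):
        if len(osd_ids) <= 1:
            # not even a single OSD can be stopped right now
            return []
        osd_ids = osd_ids[len(osd_ids) // 2:]
    return osd_ids


# Example: pretend only osd.3 and osd.4 can currently be stopped together.
print(find_stop_threshold([1, 2, 3, 4], lambda ids: set(ids) <= {3, 4}))  # -> [3, 4]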