Diffstat
-rw-r--r--  src/pybind/mgr/cephadm/services/__init__.py           0
-rw-r--r--  src/pybind/mgr/cephadm/services/cephadmservice.py   1043
-rw-r--r--  src/pybind/mgr/cephadm/services/container.py          29
-rw-r--r--  src/pybind/mgr/cephadm/services/exporter.py          147
-rw-r--r--  src/pybind/mgr/cephadm/services/ingress.py           296
-rw-r--r--  src/pybind/mgr/cephadm/services/iscsi.py             210
-rw-r--r--  src/pybind/mgr/cephadm/services/monitoring.py        502
-rw-r--r--  src/pybind/mgr/cephadm/services/nfs.py               290
-rw-r--r--  src/pybind/mgr/cephadm/services/osd.py               956
9 files changed, 3473 insertions, 0 deletions
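All of the service classes added below share one deployment contract: the orchestrator builds a CephadmDaemonDeploySpec, the service's prepare_create() fills in keyring/ports/config, and generate_config() returns the config payload handed to `cephadm deploy` plus a list of dependency daemon names. A minimal sketch of that contract, assuming the imports resolve against this patch (the NoopService class and its 'noop' type are hypothetical, for illustration only):

    from typing import Any, Dict, List, Tuple

    from cephadm.services.cephadmservice import (CephadmDaemonDeploySpec,
                                                 CephadmService)


    class NoopService(CephadmService):
        TYPE = 'noop'  # hypothetical service type, not part of this patch

        def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec
                           ) -> CephadmDaemonDeploySpec:
            assert self.TYPE == daemon_spec.daemon_type
            # every service in this patch populates final_config/deps this way
            daemon_spec.final_config, daemon_spec.deps = \
                self.generate_config(daemon_spec)
            return daemon_spec

        def generate_config(self, daemon_spec: CephadmDaemonDeploySpec
                            ) -> Tuple[Dict[str, Any], List[str]]:
            # config payload for `cephadm deploy`, plus dependency names
            return {'files': {}}, []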
diff --git a/src/pybind/mgr/cephadm/services/__init__.py b/src/pybind/mgr/cephadm/services/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/cephadm/services/__init__.py diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py new file mode 100644 index 000000000..92d293fcd --- /dev/null +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -0,0 +1,1043 @@ +import errno +import json +import logging +import re +import socket +import time +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, List, Callable, TypeVar, \ + Optional, Dict, Any, Tuple, NewType, cast + +from mgr_module import HandleCommandResult, MonCommandFailed + +from ceph.deployment.service_spec import ServiceSpec, RGWSpec +from ceph.deployment.utils import is_ipv6, unwrap_ipv6 +from mgr_util import build_url +from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus +from orchestrator._interface import daemon_type_to_service +from cephadm import utils + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + +logger = logging.getLogger(__name__) + +ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec) +AuthEntity = NewType('AuthEntity', str) + + +class CephadmDaemonDeploySpec: + # typing.NamedTuple + Generic is broken in py36 + def __init__(self, host: str, daemon_id: str, + service_name: str, + network: Optional[str] = None, + keyring: Optional[str] = None, + extra_args: Optional[List[str]] = None, + ceph_conf: str = '', + extra_files: Optional[Dict[str, Any]] = None, + daemon_type: Optional[str] = None, + ip: Optional[str] = None, + ports: Optional[List[int]] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + extra_container_args: Optional[List[str]] = None): + """ + A data structure to encapsulate the arguments to `cephadm deploy ...` + """ + self.host: str = host + self.daemon_id = daemon_id + self.service_name = service_name + daemon_type = daemon_type or (service_name.split('.')[0]) + assert daemon_type is not None + self.daemon_type: str = daemon_type + + # mons + self.network = network + + # for run_cephadm. + self.keyring: Optional[str] = keyring + + # For run_cephadm. Would be great to have more expressive names.
+ self.extra_args: List[str] = extra_args or [] + + self.ceph_conf = ceph_conf + self.extra_files = extra_files or {} + + # TCP ports used by the daemon + self.ports: List[int] = ports or [] + self.ip: Optional[str] = ip + + # values to be populated during generate_config calls + # and then used in _run_cephadm + self.final_config: Dict[str, Any] = {} + self.deps: List[str] = [] + + self.rank: Optional[int] = rank + self.rank_generation: Optional[int] = rank_generation + + self.extra_container_args = extra_container_args + + def name(self) -> str: + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def config_get_files(self) -> Dict[str, Any]: + files = self.extra_files + if self.ceph_conf: + files['config'] = self.ceph_conf + + return files + + @staticmethod + def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec': + assert dd.hostname + assert dd.daemon_id + assert dd.daemon_type + return CephadmDaemonDeploySpec( + host=dd.hostname, + daemon_id=dd.daemon_id, + daemon_type=dd.daemon_type, + service_name=dd.service_name(), + ip=dd.ip, + ports=dd.ports, + rank=dd.rank, + rank_generation=dd.rank_generation, + extra_container_args=dd.extra_container_args, + ) + + def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription: + return DaemonDescription( + daemon_type=self.daemon_type, + daemon_id=self.daemon_id, + service_name=self.service_name, + hostname=self.host, + status=status, + status_desc=status_desc, + ip=self.ip, + ports=self.ports, + rank=self.rank, + rank_generation=self.rank_generation, + extra_container_args=self.extra_container_args, + ) + + +class CephadmService(metaclass=ABCMeta): + """ + Base class for service types. Often providing a create() and config() fn. + """ + + @property + @abstractmethod + def TYPE(self) -> str: + pass + + def __init__(self, mgr: "CephadmOrchestrator"): + self.mgr: "CephadmOrchestrator" = mgr + + def allow_colo(self) -> bool: + """ + Return True if multiple daemons of the same type can colocate on + the same host. + """ + return False + + def primary_daemon_type(self) -> str: + """ + This is the type of the primary (usually only) daemon to be deployed. + """ + return self.TYPE + + def per_host_daemon_type(self) -> Optional[str]: + """ + If defined, this type of daemon will be deployed once for each host + containing one or more daemons of the primary type. + """ + return None + + def ranked(self) -> bool: + """ + If True, we will assign a stable rank (0, 1, ...) and monotonically increasing + generation (0, 1, ...) to each daemon we create/deploy. 
+ """ + return False + + def fence_old_ranks(self, + spec: ServiceSpec, + rank_map: Dict[int, Dict[int, Optional[str]]], + num_ranks: int) -> None: + assert False + + def make_daemon_spec( + self, + host: str, + daemon_id: str, + network: str, + spec: ServiceSpecs, + daemon_type: Optional[str] = None, + ports: Optional[List[int]] = None, + ip: Optional[str] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + ) -> CephadmDaemonDeploySpec: + try: + eca = spec.extra_container_args + except AttributeError: + eca = None + return CephadmDaemonDeploySpec( + host=host, + daemon_id=daemon_id, + service_name=spec.service_name(), + network=network, + daemon_type=daemon_type, + ports=ports, + ip=ip, + rank=rank, + rank_generation=rank_generation, + extra_container_args=eca, + ) + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + raise NotImplementedError() + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + raise NotImplementedError() + + def config(self, spec: ServiceSpec) -> None: + """ + Configure the cluster for this service. Only called *once* per + service apply. Not for every daemon. + """ + pass + + def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None: + """The post actions needed to be done after daemons are checked""" + if self.mgr.config_dashboard: + if 'dashboard' in self.mgr.get('mgr_map')['modules']: + self.config_dashboard(daemon_descrs) + else: + logger.debug('Dashboard is not enabled. Skip configuration.') + + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: + """Config dashboard settings.""" + raise NotImplementedError() + + def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: + # if this is called for a service type where it hasn't explcitly been + # defined, return empty Daemon Desc + return DaemonDescription() + + def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str: + ret, keyring, err = self.mgr.mon_command({ + 'prefix': 'auth get-or-create', + 'entity': entity, + 'caps': caps, + }) + if err: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth caps', + 'entity': entity, + 'caps': caps, + }) + if err: + self.mgr.log.warning(f"Unable to update caps for {entity}") + return keyring + + def _inventory_get_fqdn(self, hostname: str) -> str: + """Get a host's FQDN with its hostname. + + If the FQDN can't be resolved, the address from the inventory will + be returned instead. + """ + addr = self.mgr.inventory.get_addr(hostname) + return socket.getfqdn(addr) + + def _set_service_url_on_dashboard(self, + service_name: str, + get_mon_cmd: str, + set_mon_cmd: str, + service_url: str) -> None: + """A helper to get and set service_url via Dashboard's MON command. + + If result of get_mon_cmd differs from service_url, set_mon_cmd will + be sent to set the service_url. + """ + def get_set_cmd_dicts(out: str) -> List[dict]: + cmd_dict = { + 'prefix': set_mon_cmd, + 'value': service_url + } + return [cmd_dict] if service_url != out else [] + + self._check_and_set_dashboard( + service_name=service_name, + get_cmd=get_mon_cmd, + get_set_cmd_dicts=get_set_cmd_dicts + ) + + def _check_and_set_dashboard(self, + service_name: str, + get_cmd: str, + get_set_cmd_dicts: Callable[[str], List[dict]]) -> None: + """A helper to set configs in the Dashboard. + + The method is useful for the pattern: + - Getting a config from Dashboard by using a Dashboard command. e.g. 
current iSCSI + gateways. + - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string. + - Determine if the config needs to be updated. NOTE: This step is important because if a + Dashboard command modified Ceph config, cephadm's config_notify() is called, which + kicks the serve() loop, and the logic using this method is likely to be called again. + A config should be updated only when needed. + - Update a config in Dashboard by using a Dashboard command. + + :param service_name: the service name to be used for logging + :type service_name: str + :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url + :type get_cmd: str + :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary. + e.g. + [ + { + 'prefix': 'dashboard iscsi-gateway-add', + 'service_url': 'http://admin:admin@aaa:5000', + 'name': 'aaa' + }, + { + 'prefix': 'dashboard iscsi-gateway-add', + 'service_url': 'http://admin:admin@bbb:5000', + 'name': 'bbb' + } + ] + The function should return an empty list if no commands need to be sent. + :type get_set_cmd_dicts: Callable[[str], List[dict]] + """ + + try: + _, out, _ = self.mgr.check_mon_command({ + 'prefix': get_cmd + }) + except MonCommandFailed as e: + logger.warning('Failed to get Dashboard config for %s: %s', service_name, e) + return + cmd_dicts = get_set_cmd_dicts(out.strip()) + for cmd_dict in list(cmd_dicts): + try: + inbuf = cmd_dict.pop('inbuf', None) + _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf) + except MonCommandFailed as e: + logger.warning('Failed to set Dashboard config for %s: %s', service_name, e) + + def ok_to_stop_osd( + self, + osds: List[str], + known: Optional[List[str]] = None, # output argument + force: bool = False) -> HandleCommandResult: + r = HandleCommandResult(*self.mgr.mon_command({ + 'prefix': "osd ok-to-stop", + 'ids': osds, + 'max': 16, + })) + j = None + try: + j = json.loads(r.stdout) + except json.decoder.JSONDecodeError: + self.mgr.log.warning("osd ok-to-stop didn't return structured result") + raise + if r.retval: + return r + if known is not None and j and j.get('ok_to_stop'): + self.mgr.log.debug(f"got {j}") + known.extend([f'osd.{x}' for x in j.get('osds', [])]) + return HandleCommandResult( + 0, + f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart', + '' + ) + + def ok_to_stop( + self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None # output argument + ) -> HandleCommandResult: + names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids] + out = f'It appears safe to stop {",".join(names)}' + err = f'It is NOT safe to stop {",".join(names)} at this time' + + if self.TYPE not in ['mon', 'osd', 'mds']: + logger.debug(out) + return HandleCommandResult(0, out) + + if self.TYPE == 'osd': + return self.ok_to_stop_osd(daemon_ids, known, force) + + r = HandleCommandResult(*self.mgr.mon_command({ + 'prefix': f'{self.TYPE} ok-to-stop', + 'ids': daemon_ids, + })) + + if r.retval: + err = f'{err}: {r.stderr}' if r.stderr else err + logger.debug(err) + return HandleCommandResult(r.retval, r.stdout, err) + + out = f'{out}: {r.stdout}' if r.stdout else out + logger.debug(out) + return HandleCommandResult(r.retval, out, r.stderr) + + def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]: + # Provides a warning about whether it is possible to stop <n> daemons in a service + names = 
[f'{daemon_type}.{d_id}' for d_id in daemon_ids] + number_of_running_daemons = len( + [daemon + for daemon in self.mgr.cache.get_daemons_by_type(daemon_type) + if daemon.status == DaemonDescriptionStatus.running]) + if (number_of_running_daemons - len(daemon_ids)) >= low_limit: + return False, f'It is presumed safe to stop {names}' + + num_daemons_left = number_of_running_daemons - len(daemon_ids) + + def plural(count: int) -> str: + return 'daemon' if count == 1 else 'daemons' + + left_count = "no" if num_daemons_left == 0 else num_daemons_left + + if alert: + out = (f'ALERT: Cannot stop {names} in {service} service. ' + f'Not enough remaining {service} daemons. ' + f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ') + else: + out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. ' + f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. ' + f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ') + return True, out + + def pre_remove(self, daemon: DaemonDescription) -> None: + """ + Called before the daemon is removed. + """ + assert daemon.daemon_type is not None + assert self.TYPE == daemon_type_to_service(daemon.daemon_type) + logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}') + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + """ + Called after the daemon is removed. + """ + assert daemon.daemon_type is not None + assert self.TYPE == daemon_type_to_service(daemon.daemon_type) + logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}') + + def purge(self, service_name: str) -> None: + """Called to carry out any purge tasks following service removal""" + logger.debug(f'Purge called for {self.TYPE} - no action taken') + + +class CephService(CephadmService): + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + # Ceph.daemons (mon, mgr, mds, osd, etc) + cephadm_config = self.get_config_and_keyring( + daemon_spec.daemon_type, + daemon_spec.daemon_id, + host=daemon_spec.host, + keyring=daemon_spec.keyring, + extra_ceph_config=daemon_spec.ceph_conf) + + if daemon_spec.config_get_files(): + cephadm_config.update({'files': daemon_spec.config_get_files()}) + + return cephadm_config, [] + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + super().post_remove(daemon, is_failed_deploy=is_failed_deploy) + self.remove_keyring(daemon) + + def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity: + """ + Map the daemon id to a cephx keyring entity name + """ + # despite this mapping entity names to daemons, self.TYPE within + # the CephService class refers to service types, not daemon types + if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']: + return AuthEntity(f'client.{self.TYPE}.{daemon_id}') + elif self.TYPE == 'crash': + if host == "": + raise OrchestratorError("Host not provided to generate <crash> auth entity name") + return AuthEntity(f'client.{self.TYPE}.{host}') + elif self.TYPE == 'mon': + return AuthEntity('mon.') + elif self.TYPE in ['mgr', 'osd', 'mds']: + return AuthEntity(f'{self.TYPE}.{daemon_id}') + else: + raise OrchestratorError("unknown daemon type") + + def get_config_and_keyring(self, + daemon_type: str, + daemon_id: str, + host: str, + keyring: Optional[str] = None, + extra_ceph_config: Optional[str] = None + ) -> Dict[str, Any]: + # 
keyring + if not keyring: + entity: AuthEntity = self.get_auth_entity(daemon_id, host=host) + ret, keyring, err = self.mgr.check_mon_command({ + 'prefix': 'auth get', + 'entity': entity, + }) + + config = self.mgr.get_minimal_ceph_conf() + + if extra_ceph_config: + config += extra_ceph_config + + return { + 'config': config, + 'keyring': keyring, + } + + def remove_keyring(self, daemon: DaemonDescription) -> None: + assert daemon.daemon_id is not None + assert daemon.hostname is not None + daemon_id: str = daemon.daemon_id + host: str = daemon.hostname + + assert daemon.daemon_type != 'mon' + + entity = self.get_auth_entity(daemon_id, host=host) + + logger.info(f'Removing key for {entity}') + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth rm', + 'entity': entity, + }) + + +class MonService(CephService): + TYPE = 'mon' + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + """ + Create a new monitor on the given host. + """ + assert self.TYPE == daemon_spec.daemon_type + name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network + + # get mon. key + ret, keyring, err = self.mgr.check_mon_command({ + 'prefix': 'auth get', + 'entity': self.get_auth_entity(name), + }) + + extra_config = '[mon.%s]\n' % name + if network: + # infer whether this is a CIDR network, addrvec, or plain IP + if '/' in network: + extra_config += 'public network = %s\n' % network + elif network.startswith('[v') and network.endswith(']'): + extra_config += 'public addrv = %s\n' % network + elif is_ipv6(network): + extra_config += 'public addr = %s\n' % unwrap_ipv6(network) + elif ':' not in network: + extra_config += 'public addr = %s\n' % network + else: + raise OrchestratorError( + 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network) + else: + # try to get the public_network from the config + ret, network, err = self.mgr.check_mon_command({ + 'prefix': 'config get', + 'who': 'mon', + 'key': 'public_network', + }) + network = network.strip() if network else network + if not network: + raise OrchestratorError( + 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP') + if '/' not in network: + raise OrchestratorError( + 'public_network is set but does not look like a CIDR network: \'%s\'' % network) + extra_config += 'public network = %s\n' % network + + daemon_spec.ceph_conf = extra_config + daemon_spec.keyring = keyring + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def _check_safe_to_destroy(self, mon_id: str) -> None: + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'quorum_status', + }) + try: + j = json.loads(out) + except Exception: + raise OrchestratorError('failed to parse quorum status') + + mons = [m['name'] for m in j['monmap']['mons']] + if mon_id not in mons: + logger.info('Safe to remove mon.%s: not in monmap (%s)' % ( + mon_id, mons)) + return + new_mons = [m for m in mons if m != mon_id] + new_quorum = [m for m in j['quorum_names'] if m != mon_id] + if len(new_quorum) > len(new_mons) / 2: + logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % + (mon_id, new_quorum, new_mons)) + return + raise OrchestratorError( + 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons)) + + def pre_remove(self, daemon: DaemonDescription) -> None: + super().pre_remove(daemon) + + assert daemon.daemon_id is not None + daemon_id: str = daemon.daemon_id + 
self._check_safe_to_destroy(daemon_id) + + # remove mon from quorum before we destroy the daemon + logger.info('Removing monitor %s from monmap...' % daemon_id) + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'mon rm', + 'name': daemon_id, + }) + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + # Do not remove the mon keyring. + # super().post_remove(daemon) + pass + + +class MgrService(CephService): + TYPE = 'mgr' + + def allow_colo(self) -> bool: + if self.mgr.get_ceph_option('mgr_standby_modules'): + # traditional mgr mode: standby daemons' modules listen on + # ports and redirect to the primary. we must not schedule + # multiple mgrs on the same host or else ports will + # conflict. + return False + else: + # standby daemons do nothing, and therefore port conflicts + # are not a concern. + return True + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + """ + Create a new manager instance on a host. + """ + assert self.TYPE == daemon_spec.daemon_type + mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host + + # get mgr. key + keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id), + ['mon', 'profile mgr', + 'osd', 'allow *', + 'mds', 'allow *']) + + # Retrieve ports used by manager modules. + # In the case of the dashboard port, with several manager daemons + # running on different hosts, the user may have chosen a different + # dashboard port on each server. If that is the case, only the port + # currently in use will be opened, as the default. + ports = [] + ret, mgr_services, err = self.mgr.check_mon_command({ + 'prefix': 'mgr services', + }) + if mgr_services: + mgr_endpoints = json.loads(mgr_services) + for end_point in mgr_endpoints.values(): + port = re.search(r'\:\d+\/', end_point) + if port: + ports.append(int(port[0][1:-1])) + + if ports: + daemon_spec.ports = ports + + daemon_spec.keyring = keyring + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: + for daemon in daemon_descrs: + assert daemon.daemon_type is not None + assert daemon.daemon_id is not None + if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id): + return daemon + # if no active mgr found, return empty Daemon Desc + return DaemonDescription() + + def fail_over(self) -> None: + # this has been seen to sometimes transiently fail even when there are multiple + # mgr daemons. As long as there are multiple known mgr daemons, we should retry. + class NoStandbyError(OrchestratorError): + pass + no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=( + 'daemon', 'mgr' + self.mgr.get_mgr_id())) + for sleep_secs in [2, 8, 15]: + try: + if not self.mgr_map_has_standby(): + raise no_standby_exc + self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(), + 'INFO', 'Failing over to other MGR') + logger.info('Failing over to other MGR') + + # fail over + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'mgr fail', + 'who': self.mgr.get_mgr_id(), + }) + return + except NoStandbyError: + logger.info( + f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds') + time.sleep(sleep_secs) + raise no_standby_exc + + def mgr_map_has_standby(self) -> bool: + """ + This is a bit safer than asking our inventory. 
If the mgr joined the mgr map, + we know it joined the cluster + """ + mgr_map = self.mgr.get('mgr_map') + num = len(mgr_map.get('standbys')) + return bool(num) + + def ok_to_stop( + self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None # output argument + ) -> HandleCommandResult: + # ok to stop if there is more than 1 mgr and not trying to stop the active mgr + + warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True) + if warn: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE) + active = self.get_active_daemon(mgr_daemons).daemon_id + if active in daemon_ids: + warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + return HandleCommandResult(0, warn_message, '') + + +class MdsService(CephService): + TYPE = 'mds' + + def allow_colo(self) -> bool: + return True + + def config(self, spec: ServiceSpec) -> None: + assert self.TYPE == spec.service_type + assert spec.service_id + + # ensure mds_join_fs is set for these daemons + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': 'mds.' + spec.service_id, + 'name': 'mds_join_fs', + 'value': spec.service_id, + }) + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + mds_id, _ = daemon_spec.daemon_id, daemon_spec.host + + # get mds. key + keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id), + ['mon', 'profile mds', + 'osd', 'allow rw tag cephfs *=*', + 'mds', 'allow']) + daemon_spec.keyring = keyring + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: + active_mds_strs = list() + for fs in self.mgr.get('fs_map')['filesystems']: + mds_map = fs['mdsmap'] + if mds_map is not None: + for mds_id, mds_status in mds_map['info'].items(): + if mds_status['state'] == 'up:active': + active_mds_strs.append(mds_status['name']) + if len(active_mds_strs) != 0: + for daemon in daemon_descrs: + if daemon.daemon_id in active_mds_strs: + return daemon + # if no mds found, return empty Daemon Desc + return DaemonDescription() + + def purge(self, service_name: str) -> None: + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': service_name, + 'name': 'mds_join_fs', + }) + + +class RgwService(CephService): + TYPE = 'rgw' + + def allow_colo(self) -> bool: + return True + + def config(self, spec: RGWSpec) -> None: # type: ignore + assert self.TYPE == spec.service_type + + # set rgw_realm and rgw_zone, if present + if spec.rgw_realm: + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}", + 'name': 'rgw_realm', + 'value': spec.rgw_realm, + }) + if spec.rgw_zone: + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}", + 'name': 'rgw_zone', + 'value': spec.rgw_zone, + }) + + if spec.rgw_frontend_ssl_certificate: + if isinstance(spec.rgw_frontend_ssl_certificate, list): + cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate) + elif isinstance(spec.rgw_frontend_ssl_certificate, str): + cert_data = spec.rgw_frontend_ssl_certificate + else: + raise 
OrchestratorError( + 'Invalid rgw_frontend_ssl_certificate: %s' + % spec.rgw_frontend_ssl_certificate) + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config-key set', + 'key': f'rgw/cert/{spec.service_name()}', + 'val': cert_data, + }) + + # TODO: fail, if we don't have a spec + logger.info('Saving service %s spec with placement %s' % ( + spec.service_name(), spec.placement.pretty_str())) + self.mgr.spec_store.save(spec) + self.mgr.trigger_connect_dashboard_rgw() + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host + spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + + keyring = self.get_keyring(rgw_id) + + if daemon_spec.ports: + port = daemon_spec.ports[0] + else: + # this is a redeploy of older instance that doesn't have an explicitly + # assigned port, in which case we can assume there is only 1 per host + # and it matches the spec. + port = spec.get_port() + + # configure frontend + args = [] + ftype = spec.rgw_frontend_type or "beast" + if ftype == 'beast': + if spec.ssl: + if daemon_spec.ip: + args.append( + f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}") + else: + args.append(f"ssl_port={port}") + args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}") + else: + if daemon_spec.ip: + args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}") + else: + args.append(f"port={port}") + elif ftype == 'civetweb': + if spec.ssl: + if daemon_spec.ip: + # note the 's' suffix on port + args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s") + else: + args.append(f"port={port}s") # note the 's' suffix on port + args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}") + else: + if daemon_spec.ip: + args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}") + else: + args.append(f"port={port}") + frontend = f'{ftype} {" ".join(args)}' + + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': utils.name_to_config_section(daemon_spec.name()), + 'name': 'rgw_frontends', + 'value': frontend + }) + + daemon_spec.keyring = keyring + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def get_keyring(self, rgw_id: str) -> str: + keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id), + ['mon', 'allow *', + 'mgr', 'allow rw', + 'osd', 'allow rwx tag rgw *=*']) + return keyring + + def purge(self, service_name: str) -> None: + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(service_name), + 'name': 'rgw_realm', + }) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(service_name), + 'name': 'rgw_zone', + }) + self.mgr.check_mon_command({ + 'prefix': 'config-key rm', + 'key': f'rgw/cert/{service_name}', + }) + self.mgr.trigger_connect_dashboard_rgw() + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + super().post_remove(daemon, is_failed_deploy=is_failed_deploy) + self.mgr.check_mon_command({ + 'prefix': 'config rm', + 'who': utils.name_to_config_section(daemon.name()), + 'name': 'rgw_frontends', + }) + + def ok_to_stop( + self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None # output argument + ) -> HandleCommandResult: + # if load balancer (ingress) is present block 
if only 1 daemon up otherwise ok + # if no load balancer, warn if > 1 daemon, block if only 1 daemon + def ingress_present() -> bool: + running_ingress_daemons = [ + daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1] + running_haproxy_daemons = [ + daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy'] + running_keepalived_daemons = [ + daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived'] + # check that there is at least one haproxy and keepalived daemon running + if running_haproxy_daemons and running_keepalived_daemons: + return True + return False + + # if only 1 rgw, alert user (this is not passable with --force) + warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True) + if warn: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + # if reached here, there is > 1 rgw daemon. + # Say okay if load balancer present or force flag set + if ingress_present() or force: + return HandleCommandResult(0, warn_message, '') + + # if reached here, > 1 RGW daemon, no load balancer and no force flag. + # Provide warning + warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. " + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: + self.mgr.trigger_connect_dashboard_rgw() + + +class RbdMirrorService(CephService): + TYPE = 'rbd-mirror' + + def allow_colo(self) -> bool: + return True + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host + + keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id), + ['mon', 'profile rbd-mirror', + 'osd', 'profile rbd']) + + daemon_spec.keyring = keyring + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def ok_to_stop( + self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None # output argument + ) -> HandleCommandResult: + # if only 1 rbd-mirror, alert user (this is not passable with --force) + warn, warn_message = self._enough_daemons_to_stop( + self.TYPE, daemon_ids, 'Rbdmirror', 1, True) + if warn: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + return HandleCommandResult(0, warn_message, '') + + +class CrashService(CephService): + TYPE = 'crash' + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_id, host = daemon_spec.daemon_id, daemon_spec.host + + keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), + ['mon', 'profile crash', + 'mgr', 'profile crash']) + + daemon_spec.keyring = keyring + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + +class CephfsMirrorService(CephService): + TYPE = 'cephfs-mirror' + + def config(self, spec: ServiceSpec) -> None: + # make sure mirroring module is enabled + mgr_map = self.mgr.get('mgr_map') + mod_name = 'mirroring' + if mod_name not in mgr_map.get('services', {}): + self.mgr.check_mon_command({ + 'prefix': 'mgr module enable', + 'module': mod_name + }) + # we shouldn't get here (mon will tell the mgr to respawn), but no + # harm done if we do. 
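+ # As a hedged illustration (an editorial sketch, not part of the original
+ # patch): the keyring fetch in prepare_create() below is equivalent to
+ # running the following CLI command, assuming a daemon_id of 'a':
+ #   ceph auth get-or-create client.cephfs-mirror.a \
+ #       mon 'profile cephfs-mirror' mds 'allow r' \
+ #       osd 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*' \
+ #       mgr 'allow r'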
+ + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + + ret, keyring, err = self.mgr.check_mon_command({ + 'prefix': 'auth get-or-create', + 'entity': self.get_auth_entity(daemon_spec.daemon_id), + 'caps': ['mon', 'profile cephfs-mirror', + 'mds', 'allow r', + 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*', + 'mgr', 'allow r'], + }) + + daemon_spec.keyring = keyring + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec diff --git a/src/pybind/mgr/cephadm/services/container.py b/src/pybind/mgr/cephadm/services/container.py new file mode 100644 index 000000000..b9cdfad5e --- /dev/null +++ b/src/pybind/mgr/cephadm/services/container.py @@ -0,0 +1,29 @@ +import logging +from typing import List, Any, Tuple, Dict, cast + +from ceph.deployment.service_spec import CustomContainerSpec + +from .cephadmservice import CephadmService, CephadmDaemonDeploySpec + +logger = logging.getLogger(__name__) + + +class CustomContainerService(CephadmService): + TYPE = 'container' + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) \ + -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) \ + -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type + deps: List[str] = [] + spec = cast(CustomContainerSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + config: Dict[str, Any] = spec.config_json() + logger.debug( + 'Generated configuration for \'%s\' service: config-json=%s, dependencies=%s' % + (self.TYPE, config, deps)) + return config, deps diff --git a/src/pybind/mgr/cephadm/services/exporter.py b/src/pybind/mgr/cephadm/services/exporter.py new file mode 100644 index 000000000..b9c7d85e6 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/exporter.py @@ -0,0 +1,147 @@ +import json +import logging +from typing import TYPE_CHECKING, List, Dict, Any, Tuple + +from orchestrator import OrchestratorError +from mgr_util import ServerConfigException, verify_tls + +from .cephadmservice import CephadmService, CephadmDaemonDeploySpec + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + +logger = logging.getLogger(__name__) + + +class CephadmExporterConfig: + required_keys = ['crt', 'key', 'token', 'port'] + DEFAULT_PORT = '9443' + + def __init__(self, mgr: "CephadmOrchestrator", crt: str = "", key: str = "", + token: str = "", port: str = "") -> None: + self.mgr = mgr + self.crt = crt + self.key = key + self.token = token + self.port = port + + @property + def ready(self) -> bool: + return all([self.crt, self.key, self.token, self.port]) + + def load_from_store(self) -> None: + cfg = self.mgr._get_exporter_config() + + assert isinstance(cfg, dict) + self.crt = cfg.get('crt', "") + self.key = cfg.get('key', "") + self.token = cfg.get('token', "") + self.port = cfg.get('port', "") + + def load_from_json(self, json_str: str) -> Tuple[int, str]: + try: + cfg = json.loads(json_str) + except ValueError: + return 1, "Invalid JSON provided - unable to load" + + if not all([k in cfg for k in CephadmExporterConfig.required_keys]): + return 1, "JSON file must contain crt, key, token and port" + + self.crt = cfg.get('crt') + self.key = cfg.get('key') + self.token = cfg.get('token') + self.port = cfg.get('port') + + return 0, "" + 
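+ # A hedged example of the JSON document load_from_json() accepts; all
+ # four required_keys must be present, and the values here are
+ # placeholders only (token >= 8 chars, port > 1024):
+ #   {
+ #       "crt": "-----BEGIN CERTIFICATE-----...",
+ #       "key": "-----BEGIN PRIVATE KEY-----...",
+ #       "token": "0123456789abcdef",
+ #       "port": "9443"
+ #   }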
+ def validate_config(self) -> Tuple[int, str]: + if not self.ready: + return 1, "Incomplete configuration. cephadm-exporter needs crt, key, token and port to be set" + + for check in [self._validate_tls, self._validate_token, self._validate_port]: + rc, reason = check() + if rc: + return 1, reason + + return 0, "" + + def _validate_tls(self) -> Tuple[int, str]: + + try: + verify_tls(self.crt, self.key) + except ServerConfigException as e: + return 1, str(e) + + return 0, "" + + def _validate_token(self) -> Tuple[int, str]: + if not isinstance(self.token, str): + return 1, "token must be a string" + if len(self.token) < 8: + return 1, "Token must be a string of at least 8 chars in length" + + return 0, "" + + def _validate_port(self) -> Tuple[int, str]: + try: + p = int(str(self.port)) + if p <= 1024: + raise ValueError + except ValueError: + return 1, "Port must be an integer (>1024)" + + return 0, "" + + +class CephadmExporter(CephadmService): + TYPE = 'cephadm-exporter' + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + + cfg = CephadmExporterConfig(self.mgr) + cfg.load_from_store() + + if cfg.ready: + rc, reason = cfg.validate_config() + if rc: + raise OrchestratorError(reason) + else: + logger.info( + "Incomplete/Missing configuration, applying defaults") + self.mgr._set_exporter_defaults() + cfg.load_from_store() + + if not daemon_spec.ports: + daemon_spec.ports = [int(cfg.port)] + + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + + return daemon_spec + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type + deps: List[str] = [] + + cfg = CephadmExporterConfig(self.mgr) + cfg.load_from_store() + + if cfg.ready: + rc, reason = cfg.validate_config() + if rc: + raise OrchestratorError(reason) + else: + logger.info("Using default configuration for cephadm-exporter") + self.mgr._set_exporter_defaults() + cfg.load_from_store() + + config = { + "crt": cfg.crt, + "key": cfg.key, + "token": cfg.token + } + return config, deps + + def purge(self, service_name: str) -> None: + logger.info("Purging cephadm-exporter settings from mon K/V store") + self.mgr._clear_exporter_config_settings() diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py new file mode 100644 index 000000000..99fde1c43 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/ingress.py @@ -0,0 +1,296 @@ +import ipaddress +import logging +import random +import string +from typing import List, Dict, Any, Tuple, cast, Optional + +from ceph.deployment.service_spec import IngressSpec +from mgr_util import build_url +from cephadm.utils import resolve_ip +from orchestrator import OrchestratorError +from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService + +logger = logging.getLogger(__name__) + + +class IngressService(CephService): + TYPE = 'ingress' + + def primary_daemon_type(self) -> str: + return 'haproxy' + + def per_host_daemon_type(self) -> Optional[str]: + return 'keepalived' + + def prepare_create( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> CephadmDaemonDeploySpec: + if daemon_spec.daemon_type == 'haproxy': + return self.haproxy_prepare_create(daemon_spec) + if daemon_spec.daemon_type == 'keepalived': + return self.keepalived_prepare_create(daemon_spec) + assert False, "unexpected daemon type" + + def generate_config( + self, + 
daemon_spec: CephadmDaemonDeploySpec + ) -> Tuple[Dict[str, Any], List[str]]: + if daemon_spec.daemon_type == 'haproxy': + return self.haproxy_generate_config(daemon_spec) + else: + return self.keepalived_generate_config(daemon_spec) + assert False, "unexpected daemon type" + + def haproxy_prepare_create( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> CephadmDaemonDeploySpec: + assert daemon_spec.daemon_type == 'haproxy' + + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + + logger.debug('prepare_create haproxy.%s on host %s with spec %s' % ( + daemon_id, host, spec)) + + daemon_spec.final_config, daemon_spec.deps = self.haproxy_generate_config(daemon_spec) + + return daemon_spec + + def haproxy_generate_config( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> Tuple[Dict[str, Any], List[str]]: + spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + assert spec.backend_service + if spec.backend_service not in self.mgr.spec_store: + raise RuntimeError( + f'{spec.service_name()} backend service {spec.backend_service} does not exist') + backend_spec = self.mgr.spec_store[spec.backend_service].spec + daemons = self.mgr.cache.get_daemons_by_service(spec.backend_service) + deps = [d.name() for d in daemons] + + # generate password? + pw_key = f'{spec.service_name()}/monitor_password' + password = self.mgr.get_store(pw_key) + if password is None: + if not spec.monitor_password: + password = ''.join(random.choice(string.ascii_lowercase) for _ in range(20)) + self.mgr.set_store(pw_key, password) + else: + if spec.monitor_password: + self.mgr.set_store(pw_key, None) + if spec.monitor_password: + password = spec.monitor_password + + if backend_spec.service_type == 'nfs': + mode = 'tcp' + by_rank = {d.rank: d for d in daemons if d.rank is not None} + servers = [] + + # try to establish how many ranks we *should* have + num_ranks = backend_spec.placement.count + if not num_ranks: + num_ranks = 1 + max(by_rank.keys()) + + for rank in range(num_ranks): + if rank in by_rank: + d = by_rank[rank] + assert d.ports + servers.append({ + 'name': f"{spec.backend_service}.{rank}", + 'ip': d.ip or resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))), + 'port': d.ports[0], + }) + else: + # offline/missing server; leave rank in place + servers.append({ + 'name': f"{spec.backend_service}.{rank}", + 'ip': '0.0.0.0', + 'port': 0, + }) + else: + mode = 'http' + servers = [ + { + 'name': d.name(), + 'ip': d.ip or resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))), + 'port': d.ports[0], + } for d in daemons if d.ports + ] + + haproxy_conf = self.mgr.template.render( + 'services/ingress/haproxy.cfg.j2', + { + 'spec': spec, + 'mode': mode, + 'servers': servers, + 'user': spec.monitor_user or 'admin', + 'password': password, + 'ip': "*" if spec.virtual_ips_list else str(spec.virtual_ip).split('/')[0] or daemon_spec.ip or '*', + 'frontend_port': daemon_spec.ports[0] if daemon_spec.ports else spec.frontend_port, + 'monitor_port': daemon_spec.ports[1] if daemon_spec.ports else spec.monitor_port, + } + ) + config_files = { + 'files': { + "haproxy.cfg": haproxy_conf, + } + } + if spec.ssl_cert: + ssl_cert = spec.ssl_cert + if isinstance(ssl_cert, list): + ssl_cert = '\n'.join(ssl_cert) + config_files['files']['haproxy.pem'] = ssl_cert + + return config_files, sorted(deps) + + def keepalived_prepare_create( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> CephadmDaemonDeploySpec: + 
assert daemon_spec.daemon_type == 'keepalived' + + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + + logger.debug('prepare_create keepalived.%s on host %s with spec %s' % ( + daemon_id, host, spec)) + + daemon_spec.final_config, daemon_spec.deps = self.keepalived_generate_config(daemon_spec) + + return daemon_spec + + def keepalived_generate_config( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> Tuple[Dict[str, Any], List[str]]: + spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + assert spec.backend_service + + # generate password? + pw_key = f'{spec.service_name()}/keepalived_password' + password = self.mgr.get_store(pw_key) + if password is None: + if not spec.keepalived_password: + password = ''.join(random.choice(string.ascii_lowercase) for _ in range(20)) + self.mgr.set_store(pw_key, password) + else: + if spec.keepalived_password: + self.mgr.set_store(pw_key, None) + if spec.keepalived_password: + password = spec.keepalived_password + + daemons = self.mgr.cache.get_daemons_by_service(spec.service_name()) + + if not daemons: + raise OrchestratorError( + f'Failed to generate keepalived.conf: No daemons deployed for {spec.service_name()}') + + deps = sorted([d.name() for d in daemons if d.daemon_type == 'haproxy']) + + host = daemon_spec.host + hosts = sorted(list(set([host] + [str(d.hostname) for d in daemons]))) + + # interface + bare_ips = [] + if spec.virtual_ip: + bare_ips.append(str(spec.virtual_ip).split('/')[0]) + elif spec.virtual_ips_list: + bare_ips = [str(vip).split('/')[0] for vip in spec.virtual_ips_list] + interface = None + for bare_ip in bare_ips: + for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items(): + if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet): + interface = list(ifaces.keys())[0] + logger.info( + f'{bare_ip} is in {subnet} on {host} interface {interface}' + ) + break + else: # nobreak + continue + break + # try to find interface by matching spec.virtual_interface_networks + if not interface and spec.virtual_interface_networks: + for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items(): + if subnet in spec.virtual_interface_networks: + interface = list(ifaces.keys())[0] + logger.info( + f'{spec.virtual_ip} will be configured on {host} interface ' + f'{interface} (which has guiding subnet {subnet})' + ) + break + if not interface: + raise OrchestratorError( + f"Unable to identify interface for {spec.virtual_ip} on {host}" + ) + + # script to monitor health + script = '/usr/bin/false' + for d in daemons: + if d.hostname == host: + if d.daemon_type == 'haproxy': + assert d.ports + port = d.ports[1] # monitoring port + script = f'/usr/bin/curl {build_url(scheme="http", host=d.ip or "localhost", port=port)}/health' + assert script + + states = [] + priorities = [] + virtual_ips = [] + + # Set state and priority. Have one master for each VIP. Or at least the first one as master if only one VIP. 
+ if spec.virtual_ip: + virtual_ips.append(spec.virtual_ip) + if hosts[0] == host: + states.append('MASTER') + priorities.append(100) + else: + states.append('BACKUP') + priorities.append(90) + + elif spec.virtual_ips_list: + virtual_ips = spec.virtual_ips_list + if len(virtual_ips) > len(hosts): + raise OrchestratorError( + "Number of virtual IPs for ingress is greater than number of available hosts" + ) + for x in range(len(virtual_ips)): + if hosts[x] == host: + states.append('MASTER') + priorities.append(100) + else: + states.append('BACKUP') + priorities.append(90) + + # remove the host the daemon is being deployed on from the hosts list + # used for other_ips in the conf file, and convert the rest to IPs + if host in hosts: + hosts.remove(host) + other_ips = [resolve_ip(self.mgr.inventory.get_addr(h)) for h in hosts] + + keepalived_conf = self.mgr.template.render( + 'services/ingress/keepalived.conf.j2', + { + 'spec': spec, + 'script': script, + 'password': password, + 'interface': interface, + 'virtual_ips': virtual_ips, + 'states': states, + 'priorities': priorities, + 'other_ips': other_ips, + 'host_ip': resolve_ip(self.mgr.inventory.get_addr(host)), + } + ) + + config_file = { + 'files': { + "keepalived.conf": keepalived_conf, + } + } + + return config_file, deps diff --git a/src/pybind/mgr/cephadm/services/iscsi.py b/src/pybind/mgr/cephadm/services/iscsi.py new file mode 100644 index 000000000..c42eff683 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/iscsi.py @@ -0,0 +1,210 @@ +import errno +import json +import logging +import subprocess +from typing import List, cast, Optional +from ipaddress import ip_address, IPv6Address + +from mgr_module import HandleCommandResult +from ceph.deployment.service_spec import IscsiServiceSpec + +from orchestrator import DaemonDescription, DaemonDescriptionStatus +from .cephadmservice import CephadmDaemonDeploySpec, CephService +from .. 
import utils + +logger = logging.getLogger(__name__) + + +class IscsiService(CephService): + TYPE = 'iscsi' + + def config(self, spec: IscsiServiceSpec) -> None: # type: ignore + assert self.TYPE == spec.service_type + assert spec.pool + self.mgr._check_pool_exists(spec.pool, spec.service_name()) + + def get_trusted_ips(self, spec: IscsiServiceSpec) -> str: + # add active mgr ip address to trusted list so dashboard can access + trusted_ip_list = spec.trusted_ip_list if spec.trusted_ip_list else '' + if trusted_ip_list: + trusted_ip_list += ',' + trusted_ip_list += self.mgr.get_mgr_ip() + return trusted_ip_list + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + + spec = cast(IscsiServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + igw_id = daemon_spec.daemon_id + + keyring = self.get_keyring_with_caps(self.get_auth_entity(igw_id), + ['mon', 'profile rbd, ' + 'allow command "osd blocklist", ' + 'allow command "config-key get" with "key" prefix "iscsi/"', + 'mgr', 'allow command "service status"', + 'osd', 'allow rwx']) + + if spec.ssl_cert: + if isinstance(spec.ssl_cert, list): + cert_data = '\n'.join(spec.ssl_cert) + else: + cert_data = spec.ssl_cert + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config-key set', + 'key': f'iscsi/{utils.name_to_config_section("iscsi")}.{igw_id}/iscsi-gateway.crt', + 'val': cert_data, + }) + + if spec.ssl_key: + if isinstance(spec.ssl_key, list): + key_data = '\n'.join(spec.ssl_key) + else: + key_data = spec.ssl_key + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'config-key set', + 'key': f'iscsi/{utils.name_to_config_section("iscsi")}.{igw_id}/iscsi-gateway.key', + 'val': key_data, + }) + + trusted_ip_list = self.get_trusted_ips(spec) + + context = { + 'client_name': '{}.{}'.format(utils.name_to_config_section('iscsi'), igw_id), + 'trusted_ip_list': trusted_ip_list, + 'spec': spec + } + igw_conf = self.mgr.template.render('services/iscsi/iscsi-gateway.cfg.j2', context) + + daemon_spec.keyring = keyring + daemon_spec.extra_files = {'iscsi-gateway.cfg': igw_conf} + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + daemon_spec.deps = [trusted_ip_list] + return daemon_spec + + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: + def get_set_cmd_dicts(out: str) -> List[dict]: + gateways = json.loads(out)['gateways'] + cmd_dicts = [] + # TODO: fail, if we don't have a spec + spec = cast(IscsiServiceSpec, + self.mgr.spec_store.all_specs.get(daemon_descrs[0].service_name(), None)) + if spec.api_secure and spec.ssl_cert and spec.ssl_key: + cmd_dicts.append({ + 'prefix': 'dashboard set-iscsi-api-ssl-verification', + 'value': "false" + }) + else: + cmd_dicts.append({ + 'prefix': 'dashboard set-iscsi-api-ssl-verification', + 'value': "true" + }) + for dd in daemon_descrs: + assert dd.hostname is not None + # todo: this can fail: + spec = cast(IscsiServiceSpec, + self.mgr.spec_store.all_specs.get(dd.service_name(), None)) + if not spec: + logger.warning('No ServiceSpec found for %s', dd) + continue + ip = utils.resolve_ip(self.mgr.inventory.get_addr(dd.hostname)) + # IPv6 URL encoding requires square brackets enclosing the ip + if type(ip_address(ip)) is IPv6Address: + ip = f'[{ip}]' + protocol = "http" + if spec.api_secure and spec.ssl_cert and spec.ssl_key: + protocol = "https" + service_url = '{}://{}:{}@{}:{}'.format( + protocol, spec.api_user or 'admin', spec.api_password or 
'admin', ip, spec.api_port or '5000') + gw = gateways.get(dd.hostname) + if not gw or gw['service_url'] != service_url: + safe_service_url = '{}://{}:{}@{}:{}'.format( + protocol, '<api-user>', '<api-password>', ip, spec.api_port or '5000') + logger.info('Adding iSCSI gateway %s to Dashboard', safe_service_url) + cmd_dicts.append({ + 'prefix': 'dashboard iscsi-gateway-add', + 'inbuf': service_url, + 'name': dd.hostname + }) + return cmd_dicts + + self._check_and_set_dashboard( + service_name='iSCSI', + get_cmd='dashboard iscsi-gateway-list', + get_set_cmd_dicts=get_set_cmd_dicts + ) + + def ok_to_stop(self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None) -> HandleCommandResult: + # if only 1 iscsi, alert user (this is not passable with --force) + warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Iscsi', 1, True) + if warn: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + # if reached here, there is > 1 iscsi daemon. make sure none are down + warn_message = ( + 'ALERT: 1 iscsi daemon is already down. Please bring it back up before stopping this one') + iscsi_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE) + for i in iscsi_daemons: + if i.status != DaemonDescriptionStatus.running: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + + names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids] + warn_message = f'It is presumed safe to stop {names}' + return HandleCommandResult(0, warn_message, '') + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + """ + Called after the daemon is removed. + """ + logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}') + + # remove config for dashboard iscsi gateways + ret, out, err = self.mgr.mon_command({ + 'prefix': 'dashboard iscsi-gateway-rm', + 'name': daemon.hostname, + }) + if not ret: + logger.info(f'{daemon.hostname} removed from iscsi gateways dashboard config') + + # needed to know if we have ssl stuff for iscsi in ceph config + iscsi_config_dict = {} + ret, iscsi_config, err = self.mgr.mon_command({ + 'prefix': 'config-key dump', + 'key': 'iscsi', + }) + if iscsi_config: + iscsi_config_dict = json.loads(iscsi_config) + + # remove iscsi cert and key from ceph config + for iscsi_key, value in iscsi_config_dict.items(): + if f'iscsi/client.{daemon.name()}/' in iscsi_key: + ret, out, err = self.mgr.mon_command({ + 'prefix': 'config-key rm', + 'key': iscsi_key, + }) + logger.info(f'{iscsi_key} removed from ceph config') + + def purge(self, service_name: str) -> None: + """Removes configuration + """ + spec = cast(IscsiServiceSpec, self.mgr.spec_store[service_name].spec) + try: + # remove service configuration from the pool + try: + subprocess.run(['rados', + '-k', str(self.mgr.get_ceph_option('keyring')), + '-n', f'mgr.{self.mgr.get_mgr_id()}', + '-p', cast(str, spec.pool), + 'rm', + 'gateway.conf'], + timeout=5) + logger.info(f'<gateway.conf> removed from {spec.pool}') + except subprocess.CalledProcessError as ex: + logger.error(f'Error executing <<{ex.cmd}>>: {ex.output}') + except subprocess.TimeoutExpired: + logger.error(f'timeout (5s) trying to remove <gateway.conf> from {spec.pool}') + + except Exception: + logger.exception(f'failed to purge {service_name}') diff --git a/src/pybind/mgr/cephadm/services/monitoring.py b/src/pybind/mgr/cephadm/services/monitoring.py new file mode 100644 index 000000000..8de7195a3 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/monitoring.py @@ -0,0 +1,502 @@ +import errno
+import ipaddress +import logging +import os +import socket +from typing import List, Any, Tuple, Dict, Optional, cast +from urllib.parse import urlparse + +from mgr_module import HandleCommandResult + +from orchestrator import DaemonDescription +from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \ + SNMPGatewaySpec, PrometheusSpec +from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec +from cephadm.services.ingress import IngressSpec +from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url + +logger = logging.getLogger(__name__) + + +class GrafanaService(CephadmService): + TYPE = 'grafana' + DEFAULT_SERVICE_PORT = 3000 + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type + deps = [] # type: List[str] + + prom_services = [] # type: List[str] + for dd in self.mgr.cache.get_daemons_by_service('prometheus'): + assert dd.hostname is not None + addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) + port = dd.ports[0] if dd.ports else 9095 + prom_services.append(build_url(scheme='http', host=addr, port=port)) + + deps.append(dd.name()) + grafana_data_sources = self.mgr.template.render( + 'services/grafana/ceph-dashboard.yml.j2', {'hosts': prom_services}) + + cert_path = f'{daemon_spec.host}/grafana_crt' + key_path = f'{daemon_spec.host}/grafana_key' + cert = self.mgr.get_store(cert_path) + pkey = self.mgr.get_store(key_path) + if cert and pkey: + try: + verify_tls(cert, pkey) + except ServerConfigException as e: + logger.warning('Provided grafana TLS certificates invalid: %s', str(e)) + cert, pkey = None, None + if not (cert and pkey): + cert, pkey = create_self_signed_cert('Ceph', daemon_spec.host) + self.mgr.set_store(cert_path, cert) + self.mgr.set_store(key_path, pkey) + if 'dashboard' in self.mgr.get('mgr_map')['modules']: + self.mgr.check_mon_command({ + 'prefix': 'dashboard set-grafana-api-ssl-verify', + 'value': 'false', + }) + + spec: GrafanaSpec = cast( + GrafanaSpec, self.mgr.spec_store.active_specs[daemon_spec.service_name]) + grafana_ini = self.mgr.template.render( + 'services/grafana/grafana.ini.j2', { + 'initial_admin_password': spec.initial_admin_password, + 'http_port': daemon_spec.ports[0] if daemon_spec.ports else self.DEFAULT_SERVICE_PORT, + 'http_addr': daemon_spec.ip if daemon_spec.ip else '' + }) + + if 'dashboard' in self.mgr.get('mgr_map')['modules'] and spec.initial_admin_password: + self.mgr.check_mon_command( + {'prefix': 'dashboard set-grafana-api-password'}, inbuf=spec.initial_admin_password) + + config_file = { + 'files': { + "grafana.ini": grafana_ini, + 'provisioning/datasources/ceph-dashboard.yml': grafana_data_sources, + 'certs/cert_file': '# generated by cephadm\n%s' % cert, + 'certs/cert_key': '# generated by cephadm\n%s' % pkey, + } + } + return config_file, sorted(deps) + + def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: + # Use the least-created one as the active daemon + if daemon_descrs: + return daemon_descrs[-1] + # if empty list provided, return empty Daemon Desc + return DaemonDescription() + + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> 
None:
+        # TODO: signed cert
+        dd = self.get_active_daemon(daemon_descrs)
+        assert dd.hostname is not None
+        addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+        port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT
+        service_url = build_url(scheme='https', host=addr, port=port)
+        self._set_service_url_on_dashboard(
+            'Grafana',
+            'dashboard get-grafana-api-url',
+            'dashboard set-grafana-api-url',
+            service_url
+        )
+
+    def pre_remove(self, daemon: DaemonDescription) -> None:
+        """
+        Called before grafana daemon is removed.
+        """
+        if daemon.hostname is not None:
+            # delete cert/key entries for this grafana daemon
+            cert_path = f'{daemon.hostname}/grafana_crt'
+            key_path = f'{daemon.hostname}/grafana_key'
+            self.mgr.set_store(cert_path, None)
+            self.mgr.set_store(key_path, None)
+
+    def ok_to_stop(self,
+                   daemon_ids: List[str],
+                   force: bool = False,
+                   known: Optional[List[str]] = None) -> HandleCommandResult:
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Grafana', 1)
+        if warn and not force:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+        return HandleCommandResult(0, warn_message, '')
+
+
+class AlertmanagerService(CephadmService):
+    TYPE = 'alertmanager'
+    DEFAULT_SERVICE_PORT = 9093
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        assert self.TYPE == daemon_spec.daemon_type
+        deps: List[str] = []
+        default_webhook_urls: List[str] = []
+
+        spec = cast(AlertManagerSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+        try:
+            secure = spec.secure
+        except AttributeError:
+            secure = False
+        user_data = spec.user_data
+        if 'default_webhook_urls' in user_data and isinstance(
+                user_data['default_webhook_urls'], list):
+            default_webhook_urls.extend(user_data['default_webhook_urls'])
+
+        # dashboard(s)
+        dashboard_urls: List[str] = []
+        snmp_gateway_urls: List[str] = []
+        mgr_map = self.mgr.get('mgr_map')
+        port = None
+        proto = None  # http: or https:
+        url = mgr_map.get('services', {}).get('dashboard', None)
+        if url:
+            p_result = urlparse(url.rstrip('/'))
+            hostname = socket.getfqdn(p_result.hostname)
+
+            try:
+                ip = ipaddress.ip_address(hostname)
+            except ValueError:
+                pass
+            else:
+                if ip.version == 6:
+                    hostname = f'[{hostname}]'
+
+            dashboard_urls.append(
+                f'{p_result.scheme}://{hostname}:{p_result.port}{p_result.path}')
+            proto = p_result.scheme
+            port = p_result.port
+        # scan all mgrs to generate deps and to get standbys too.
+        # assume that they are all on the same port as the active mgr.
+        for dd in self.mgr.cache.get_daemons_by_service('mgr'):
+            # we consider mgr a dep even if the dashboard is disabled
+            # in order to be consistent with _calc_daemon_deps().
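+            # (listing every mgr in deps means that a change in the set of mgr
+            # daemons, e.g. a failover, changes the dependency hash and so
+            # triggers a reconfig of this alertmanager daemon)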
+ deps.append(dd.name()) + if not port: + continue + if dd.daemon_id == self.mgr.get_mgr_id(): + continue + assert dd.hostname is not None + addr = self._inventory_get_fqdn(dd.hostname) + dashboard_urls.append(build_url(scheme=proto, host=addr, port=port).rstrip('/')) + + for dd in self.mgr.cache.get_daemons_by_service('snmp-gateway'): + assert dd.hostname is not None + assert dd.ports + addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) + deps.append(dd.name()) + + snmp_gateway_urls.append(build_url(scheme='http', host=addr, + port=dd.ports[0], path='/alerts')) + + context = { + 'dashboard_urls': dashboard_urls, + 'default_webhook_urls': default_webhook_urls, + 'snmp_gateway_urls': snmp_gateway_urls, + 'secure': secure, + } + yml = self.mgr.template.render('services/alertmanager/alertmanager.yml.j2', context) + + peers = [] + port = 9094 + for dd in self.mgr.cache.get_daemons_by_service('alertmanager'): + assert dd.hostname is not None + deps.append(dd.name()) + addr = self._inventory_get_fqdn(dd.hostname) + peers.append(build_url(host=addr, port=port).lstrip('/')) + + return { + "files": { + "alertmanager.yml": yml + }, + "peers": peers + }, sorted(deps) + + def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: + # TODO: if there are multiple daemons, who is the active one? + if daemon_descrs: + return daemon_descrs[0] + # if empty list provided, return empty Daemon Desc + return DaemonDescription() + + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: + dd = self.get_active_daemon(daemon_descrs) + assert dd.hostname is not None + addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) + port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT + service_url = build_url(scheme='http', host=addr, port=port) + self._set_service_url_on_dashboard( + 'AlertManager', + 'dashboard get-alertmanager-api-host', + 'dashboard set-alertmanager-api-host', + service_url + ) + + def ok_to_stop(self, + daemon_ids: List[str], + force: bool = False, + known: Optional[List[str]] = None) -> HandleCommandResult: + warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Alertmanager', 1) + if warn and not force: + return HandleCommandResult(-errno.EBUSY, '', warn_message) + return HandleCommandResult(0, warn_message, '') + + +class PrometheusService(CephadmService): + TYPE = 'prometheus' + DEFAULT_SERVICE_PORT = 9095 + DEFAULT_MGR_PROMETHEUS_PORT = 9283 + + def config(self, spec: ServiceSpec) -> None: + # make sure module is enabled + mgr_map = self.mgr.get('mgr_map') + if 'prometheus' not in mgr_map.get('services', {}): + self.mgr.check_mon_command({ + 'prefix': 'mgr module enable', + 'module': 'prometheus' + }) + # we shouldn't get here (mon will tell the mgr to respawn), but no + # harm done if we do. 
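+        # (this is the same as running `ceph mgr module enable prometheus` on
+        # the CLI; once enabled, the active mgr exports metrics on the port
+        # configured via the prometheus module's `server_port` option, which
+        # defaults to 9283, see DEFAULT_MGR_PROMETHEUS_PORT above)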
+ + def prepare_create( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec + + def generate_config( + self, + daemon_spec: CephadmDaemonDeploySpec, + ) -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type + deps = [] # type: List[str] + + prom_spec = cast(PrometheusSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + + try: + retention_time = prom_spec.retention_time if prom_spec.retention_time else '15d' + except AttributeError: + retention_time = '15d' + + # scrape mgrs + mgr_scrape_list = [] + mgr_map = self.mgr.get('mgr_map') + port = cast(int, self.mgr.get_module_option_ex( + 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT)) + deps.append(str(port)) + t = mgr_map.get('services', {}).get('prometheus', None) + if t: + p_result = urlparse(t) + # urlparse .hostname removes '[]' from the hostname in case + # of ipv6 addresses so if this is the case then we just + # append the brackets when building the final scrape endpoint + if '[' in p_result.netloc and ']' in p_result.netloc: + mgr_scrape_list.append(f"[{p_result.hostname}]:{port}") + else: + mgr_scrape_list.append(f"{p_result.hostname}:{port}") + # scan all mgrs to generate deps and to get standbys too. + # assume that they are all on the same port as the active mgr. + for dd in self.mgr.cache.get_daemons_by_service('mgr'): + # we consider the mgr a dep even if the prometheus module is + # disabled in order to be consistent with _calc_daemon_deps(). + deps.append(dd.name()) + if not port: + continue + if dd.daemon_id == self.mgr.get_mgr_id(): + continue + assert dd.hostname is not None + addr = self._inventory_get_fqdn(dd.hostname) + mgr_scrape_list.append(build_url(host=addr, port=port).lstrip('/')) + + # scrape node exporters + nodes = [] + for dd in self.mgr.cache.get_daemons_by_service('node-exporter'): + assert dd.hostname is not None + deps.append(dd.name()) + addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) + port = dd.ports[0] if dd.ports else 9100 + nodes.append({ + 'hostname': dd.hostname, + 'url': build_url(host=addr, port=port).lstrip('/') + }) + + # scrape alert managers + alertmgr_targets = [] + for dd in self.mgr.cache.get_daemons_by_service('alertmanager'): + assert dd.hostname is not None + deps.append(dd.name()) + addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) + port = dd.ports[0] if dd.ports else 9093 + alertmgr_targets.append("'{}'".format(build_url(host=addr, port=port).lstrip('/'))) + + # scrape haproxies + haproxy_targets = [] + for dd in self.mgr.cache.get_daemons_by_type('ingress'): + if dd.service_name() in self.mgr.spec_store: + spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec) + assert dd.hostname is not None + deps.append(dd.name()) + if dd.daemon_type == 'haproxy': + addr = self._inventory_get_fqdn(dd.hostname) + haproxy_targets.append({ + "url": f"'{build_url(host=addr, port=spec.monitor_port).lstrip('/')}'", + "service": dd.service_name(), + }) + + # generate the prometheus configuration + context = { + 'alertmgr_targets': alertmgr_targets, + 'mgr_scrape_list': mgr_scrape_list, + 'haproxy_targets': haproxy_targets, + 'nodes': nodes, + } + r: Dict[str, Any] = { + 'files': { + 'prometheus.yml': + self.mgr.template.render( + 'services/prometheus/prometheus.yml.j2', context) + }, + 'retention_time': retention_time + } + + # include 
alerts, if present in the container
+        if os.path.exists(self.mgr.prometheus_alerts_path):
+            with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
+                alerts = f.read()
+            r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
+
+        # Include custom alerts if present in the key value store. This enables
+        # users to add custom alerts. Write the file in any case, so that if the
+        # content of the key value store changed, that file is overwritten
+        # (emptied in case the value has been removed from the key value
+        # store). This prevents the necessity to adapt the `cephadm` binary to
+        # remove the file.
+        #
+        # Don't use the template engine for it as
+        #
+        # 1. the alerts are always static and
+        # 2. they are themselves templates for the Go template engine, which
+        #    uses curly braces, and escaping those is cumbersome and unnecessary
+        #    for the user.
+        #
+        r['files']['/etc/prometheus/alerting/custom_alerts.yml'] = \
+            self.mgr.get_store('services/prometheus/alerting/custom_alerts.yml', '')
+
+        return r, sorted(deps)
+
+    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+        # TODO: if there are multiple daemons, who is the active one?
+        if daemon_descrs:
+            return daemon_descrs[0]
+        # if empty list provided, return empty Daemon Desc
+        return DaemonDescription()
+
+    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+        dd = self.get_active_daemon(daemon_descrs)
+        assert dd.hostname is not None
+        addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+        port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT
+        service_url = build_url(scheme='http', host=addr, port=port)
+        self._set_service_url_on_dashboard(
+            'Prometheus',
+            'dashboard get-prometheus-api-host',
+            'dashboard set-prometheus-api-host',
+            service_url
+        )
+
+    def ok_to_stop(self,
+                   daemon_ids: List[str],
+                   force: bool = False,
+                   known: Optional[List[str]] = None) -> HandleCommandResult:
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Prometheus', 1)
+        if warn and not force:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+        return HandleCommandResult(0, warn_message, '')
+
+
+class NodeExporterService(CephadmService):
+    TYPE = 'node-exporter'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        assert self.TYPE == daemon_spec.daemon_type
+        return {}, []
+
+    def ok_to_stop(self,
+                   daemon_ids: List[str],
+                   force: bool = False,
+                   known: Optional[List[str]] = None) -> HandleCommandResult:
+        # since node exporter runs on each host and cannot compromise data, no extra checks required
+        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+        out = f'It is presumed safe to stop {names}'
+        return HandleCommandResult(0, out, '')
+
+
+class SNMPGatewayService(CephadmService):
+    TYPE = 'snmp-gateway'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        assert self.TYPE == daemon_spec.daemon_type
+        deps: List[str] = []
+
+        spec =
cast(SNMPGatewaySpec, self.mgr.spec_store[daemon_spec.service_name].spec) + config = { + "destination": spec.snmp_destination, + "snmp_version": spec.snmp_version, + } + if spec.snmp_version == 'V2c': + community = spec.credentials.get('snmp_community', None) + assert community is not None + + config.update({ + "snmp_community": community + }) + else: + # SNMP v3 settings can be either authNoPriv or authPriv + auth_protocol = 'SHA' if not spec.auth_protocol else spec.auth_protocol + + auth_username = spec.credentials.get('snmp_v3_auth_username', None) + auth_password = spec.credentials.get('snmp_v3_auth_password', None) + assert auth_username is not None + assert auth_password is not None + assert spec.engine_id is not None + + config.update({ + "snmp_v3_auth_protocol": auth_protocol, + "snmp_v3_auth_username": auth_username, + "snmp_v3_auth_password": auth_password, + "snmp_v3_engine_id": spec.engine_id, + }) + # authPriv adds encryption + if spec.privacy_protocol: + priv_password = spec.credentials.get('snmp_v3_priv_password', None) + assert priv_password is not None + + config.update({ + "snmp_v3_priv_protocol": spec.privacy_protocol, + "snmp_v3_priv_password": priv_password, + }) + + logger.debug( + f"Generated configuration for '{self.TYPE}' service. Dependencies={deps}") + + return config, sorted(deps) diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py new file mode 100644 index 000000000..ee53283bd --- /dev/null +++ b/src/pybind/mgr/cephadm/services/nfs.py @@ -0,0 +1,290 @@ +import errno +import logging +import os +import subprocess +import tempfile +from typing import Dict, Tuple, Any, List, cast, Optional + +from mgr_module import HandleCommandResult +from mgr_module import NFS_POOL_NAME as POOL_NAME + +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec + +from orchestrator import DaemonDescription + +from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec, CephService + +logger = logging.getLogger(__name__) + + +class NFSService(CephService): + TYPE = 'nfs' + + def ranked(self) -> bool: + return True + + def fence(self, daemon_id: str) -> None: + logger.info(f'Fencing old nfs.{daemon_id}') + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth rm', + 'entity': f'client.nfs.{daemon_id}', + }) + + # TODO: block/fence this entity (in case it is still running somewhere) + + def fence_old_ranks(self, + spec: ServiceSpec, + rank_map: Dict[int, Dict[int, Optional[str]]], + num_ranks: int) -> None: + for rank, m in list(rank_map.items()): + if rank >= num_ranks: + for daemon_id in m.values(): + if daemon_id is not None: + self.fence(daemon_id) + del rank_map[rank] + nodeid = f'{spec.service_name()}.{rank}' + self.mgr.log.info(f'Removing {nodeid} from the ganesha grace table') + self.run_grace_tool(cast(NFSServiceSpec, spec), 'remove', nodeid) + self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map) + else: + max_gen = max(m.keys()) + for gen, daemon_id in list(m.items()): + if gen < max_gen: + if daemon_id is not None: + self.fence(daemon_id) + del rank_map[rank][gen] + self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map) + + def config(self, spec: NFSServiceSpec) -> None: # type: ignore + from nfs.cluster import create_ganesha_pool + + assert self.TYPE == spec.service_type + create_ganesha_pool(self.mgr) + + def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: + assert self.TYPE == daemon_spec.daemon_type + daemon_spec.final_config, 
daemon_spec.deps = self.generate_config(daemon_spec) + return daemon_spec + + def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type + + daemon_type = daemon_spec.daemon_type + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + + deps: List[str] = [] + + nodeid = f'{daemon_spec.service_name}.{daemon_spec.rank}' + + # create the RADOS recovery pool keyring + rados_user = f'{daemon_type}.{daemon_id}' + rados_keyring = self.create_keyring(daemon_spec) + + # ensure rank is known to ganesha + self.mgr.log.info(f'Ensuring {nodeid} is in the ganesha grace table') + self.run_grace_tool(spec, 'add', nodeid) + + # create the rados config object + self.create_rados_config_obj(spec) + + # create the RGW keyring + rgw_user = f'{rados_user}-rgw' + rgw_keyring = self.create_rgw_keyring(daemon_spec) + + # generate the ganesha config + def get_ganesha_conf() -> str: + context = { + "user": rados_user, + "nodeid": nodeid, + "pool": POOL_NAME, + "namespace": spec.service_id, + "rgw_user": rgw_user, + "url": f'rados://{POOL_NAME}/{spec.service_id}/{spec.rados_config_name()}', + # fall back to default NFS port if not present in daemon_spec + "port": daemon_spec.ports[0] if daemon_spec.ports else 2049, + "bind_addr": daemon_spec.ip if daemon_spec.ip else '', + } + return self.mgr.template.render('services/nfs/ganesha.conf.j2', context) + + # generate the cephadm config json + def get_cephadm_config() -> Dict[str, Any]: + config: Dict[str, Any] = {} + config['pool'] = POOL_NAME + config['namespace'] = spec.service_id + config['userid'] = rados_user + config['extra_args'] = ['-N', 'NIV_EVENT'] + config['files'] = { + 'ganesha.conf': get_ganesha_conf(), + } + config.update( + self.get_config_and_keyring( + daemon_type, daemon_id, + keyring=rados_keyring, + host=host + ) + ) + config['rgw'] = { + 'cluster': 'ceph', + 'user': rgw_user, + 'keyring': rgw_keyring, + } + logger.debug('Generated cephadm config-json: %s' % config) + return config + + return get_cephadm_config(), deps + + def create_rados_config_obj(self, + spec: NFSServiceSpec, + clobber: bool = False) -> None: + objname = spec.rados_config_name() + cmd = [ + 'rados', + '-n', f"mgr.{self.mgr.get_mgr_id()}", + '-k', str(self.mgr.get_ceph_option('keyring')), + '-p', POOL_NAME, + '--namespace', cast(str, spec.service_id), + ] + result = subprocess.run( + cmd + ['get', objname, '-'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + timeout=10) + if not result.returncode and not clobber: + logger.info('Rados config object exists: %s' % objname) + else: + logger.info('Creating rados config object: %s' % objname) + result = subprocess.run( + cmd + ['put', objname, '-'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + timeout=10) + if result.returncode: + self.mgr.log.warning( + f'Unable to create rados config object {objname}: {result.stderr.decode("utf-8")}' + ) + raise RuntimeError(result.stderr.decode("utf-8")) + + def create_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str: + daemon_id = daemon_spec.daemon_id + spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec) + entity: AuthEntity = self.get_auth_entity(daemon_id) + + osd_caps = 'allow rw pool=%s namespace=%s' % (POOL_NAME, spec.service_id) + + logger.info('Creating key for %s' % entity) + keyring = self.get_keyring_with_caps(entity, + ['mon', 'allow r', + 'osd', osd_caps]) + + 
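+        # the resulting entity gets read access to the mons and read/write
+        # access confined to this service's namespace within the shared NFS
+        # recovery pool (POOL_NAME), so one ganesha cluster cannot touch
+        # another cluster's state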
return keyring
+
+    def create_rgw_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
+        daemon_id = daemon_spec.daemon_id
+        entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')
+
+        logger.info('Creating key for %s' % entity)
+        keyring = self.get_keyring_with_caps(entity,
+                                             ['mon', 'allow r',
+                                              'osd', 'allow rwx tag rgw *=*'])
+
+        return keyring
+
+    def run_grace_tool(self,
+                       spec: NFSServiceSpec,
+                       action: str,
+                       nodeid: str) -> None:
+        # write a temp keyring and referencing config file. this is a kludge
+        # because the ganesha-rados-grace tool can only authenticate as a client
+        # (and not a mgr). Also, it doesn't allow you to pass a keyring location
+        # via the command line, nor does it parse the CEPH_ARGS env var.
+        tmp_id = f'mgr.nfs.grace.{spec.service_name()}'
+        entity = AuthEntity(f'client.{tmp_id}')
+        keyring = self.get_keyring_with_caps(
+            entity,
+            ['mon', 'allow r', 'osd', f'allow rwx pool {POOL_NAME}']
+        )
+        tmp_keyring = tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-keyring')
+        os.fchmod(tmp_keyring.fileno(), 0o600)
+        tmp_keyring.write(keyring)
+        tmp_keyring.flush()
+        tmp_conf = tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-conf')
+        tmp_conf.write(self.mgr.get_minimal_ceph_conf())
+        tmp_conf.write(f'\tkeyring = {tmp_keyring.name}\n')
+        tmp_conf.flush()
+        try:
+            cmd: List[str] = [
+                'ganesha-rados-grace',
+                '--cephconf', tmp_conf.name,
+                '--userid', tmp_id,
+                '--pool', POOL_NAME,
+                '--ns', cast(str, spec.service_id),
+                action, nodeid,
+            ]
+            self.mgr.log.debug(cmd)
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                                    timeout=10)
+            if result.returncode:
+                self.mgr.log.warning(
+                    f'ganesha-rados-grace tool failed: {result.stderr.decode("utf-8")}'
+                )
+                raise RuntimeError(f'grace tool failed: {result.stderr.decode("utf-8")}')
+
+        finally:
+            self.mgr.check_mon_command({
+                'prefix': 'auth rm',
+                'entity': entity,
+            })
+
+    def remove_rgw_keyring(self, daemon: DaemonDescription) -> None:
+        assert daemon.daemon_id is not None
+        daemon_id: str = daemon.daemon_id
+        entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')
+
+        logger.info(f'Removing key for {entity}')
+        self.mgr.check_mon_command({
+            'prefix': 'auth rm',
+            'entity': entity,
+        })
+
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+        self.remove_rgw_keyring(daemon)
+
+    def ok_to_stop(self,
+                   daemon_ids: List[str],
+                   force: bool = False,
+                   known: Optional[List[str]] = None) -> HandleCommandResult:
+        # if only 1 nfs, alert user (this check cannot be bypassed with --force)
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'NFS', 1, True)
+        if warn:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        # if we have reached here, there is more than one nfs daemon.
+        if force:
+            return HandleCommandResult(0, warn_message, '')
+
+        # more than one nfs daemon and no force flag given; warn the user
+        warn_message = "WARNING: Removing NFS daemons can cause clients to lose connectivity. "
+        return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+    def purge(self, service_name: str) -> None:
+        if service_name not in self.mgr.spec_store:
+            return
+        spec = cast(NFSServiceSpec, self.mgr.spec_store[service_name].spec)
+
+        logger.info(f'Removing grace file for {service_name}')
+        cmd = [
+            'rados',
+            '-n', f"mgr.{self.mgr.get_mgr_id()}",
+            '-k', str(self.mgr.get_ceph_option('keyring')),
+            '-p', POOL_NAME,
+            '--namespace', cast(str, spec.service_id),
+            'rm', 'grace',
+        ]
+        subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            timeout=10
+        )
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
new file mode 100644
index 000000000..5899ba49a
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -0,0 +1,956 @@
+import json
+import logging
+from threading import Lock
+from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
+
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
+from ceph.deployment.inventory import Device
+from ceph.utils import datetime_to_str, str_to_datetime
+
+from datetime import datetime
+import orchestrator
+from cephadm.serve import CephadmServe
+from cephadm.utils import forall_hosts
+from ceph.utils import datetime_now
+from orchestrator import OrchestratorError, DaemonDescription
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OSDService(CephService):
+    TYPE = 'osd'
+
+    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+        logger.debug(f"Processing DriveGroup {drive_group}")
+        osd_id_claims = OsdIdClaims(self.mgr)
+        if osd_id_claims.get():
+            logger.info(
+                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
+
+        @forall_hosts
+        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+            # skip this host if there has been no change in inventory
+            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
+                    drive_group, host))
+                return None
+            # skip this host if we cannot schedule here
+            if self.mgr.inventory.has_label(host, '_no_schedule'):
+                return None
+
+            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
+
+            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
+                                                                 osd_id_claims_for_host)
+            if not cmds:
+                logger.debug("No data_devices, skipping DriveGroup: {}".format(
+                    drive_group.service_id))
+                return None
+
+            logger.debug('Applying service osd.%s on host %s...' % (
+                drive_group.service_id, host
+            ))
+            start_ts = datetime_now()
+            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
+            ret_msg = self.create_single_host(
+                drive_group, host, cmds,
+                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
+            )
+            self.mgr.cache.update_osdspec_last_applied(
+                host, drive_group.service_name(), start_ts
+            )
+            self.mgr.cache.save_host(host)
+            return ret_msg
+
+        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+        return ", ".join(filter(None, ret))
+
+    def create_single_host(self,
+                           drive_group: DriveGroupSpec,
+                           host: str, cmds: List[str], replace_osd_ids: List[str],
+                           env_vars: Optional[List[str]] = None) -> str:
+        for cmd in cmds:
+            out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+            if code == 1 and ', it is already prepared' in '\n'.join(err):
+                # HACK: when we create against an existing LV, ceph-volume
+                # returns an error and the above message. To make this
+                # command idempotent, tolerate this "error" and continue.
+                logger.debug('the device was already prepared; continuing')
+                code = 0
+            if code:
+                raise RuntimeError(
+                    'cephadm exited with an error code: %d, stderr:%s' % (
+                        code, '\n'.join(err)))
+        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
+                                                         replace_osd_ids)
+
+    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
+                                             replace_osd_ids: Optional[List[str]] = None) -> str:
+
+        if replace_osd_ids is None:
+            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
+        assert replace_osd_ids is not None
+
+        # check result: lvm
+        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+            host, 'osd', 'ceph-volume',
+            [
+                '--',
+                'lvm', 'list',
+                '--format', 'json',
+            ])
+        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
+        fsid = self.mgr._cluster_fsid
+        osd_uuid_map = self.mgr.get_osd_uuid_map()
+        created = []
+        for osd_id, osds in osds_elems.items():
+            for osd in osds:
+                if osd['type'] == 'db':
+                    continue
+                if osd['tags']['ceph.cluster_fsid'] != fsid:
+                    logger.debug('mismatched fsid, skipping %s' % osd)
+                    continue
+                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+                    # this OSD already existed before this call; skip it unless
+                    # it is being replaced as part of a replacement operation
+                    continue
+                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
+                    # cephadm daemon instance already exists
+                    logger.debug(f'osd id {osd_id} daemon already exists')
+                    continue
+                if osd_id not in osd_uuid_map:
+                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+                    continue
+                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
+                    logger.debug('mismatched osd uuid (cluster has %s, osd '
+                                 'has %s)' % (
+                                     osd_uuid_map.get(osd_id),
+                                     osd['tags']['ceph.osd_fsid']))
+                    continue
+
+                created.append(osd_id)
+                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
+                    service_name=service_name,
+                    daemon_id=str(osd_id),
+                    host=host,
+                    daemon_type='osd',
+                )
+                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+                CephadmServe(self.mgr)._create_daemon(
+                    daemon_spec,
+                    osd_uuid_map=osd_uuid_map)
+
+        # check result: raw
+        raw_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+            host, 'osd', 'ceph-volume',
+            [
+                '--',
+                'raw', 'list',
+                '--format', 'json',
+            ])
+        for osd_uuid, osd in raw_elems.items():
+            if osd.get('ceph_fsid') != fsid:
+                continue
+            osd_id = str(osd.get('osd_id', '-1'))
+            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+                # this OSD already existed before this call; skip it unless
+                # it is being replaced as part of a replacement operation
+                continue
+            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
+                # cephadm daemon instance already exists
+                logger.debug(f'osd id {osd_id} daemon already exists')
+                continue
+            if osd_id not in osd_uuid_map:
+                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+                continue
+            if osd_uuid_map.get(osd_id) != osd_uuid:
+                logger.debug('mismatched osd uuid (cluster has %s, osd '
+                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
+                continue
+            if osd_id in created:
+                continue
+
+            created.append(osd_id)
+            daemon_spec = CephadmDaemonDeploySpec(
+                service_name=service_name,
+                daemon_id=osd_id,
+                host=host,
+                daemon_type='osd',
+            )
+            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+            CephadmServe(self.mgr)._create_daemon(
+                daemon_spec,
+                osd_uuid_map=osd_uuid_map)
+
+        if created:
+            self.mgr.cache.invalidate_host_devices(host)
+            self.mgr.cache.invalidate_autotune(host)
+            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
+        else:
+            return "Created no osd(s) on host %s; already created?" % host
+
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
+        # 1) use fn_filter to determine matching_hosts
+        matching_hosts = drive_group.placement.filter_matching_hostspecs(
+            self.mgr._schedulable_hosts())
+        # 2) Map the inventory to the InventoryHost object
+        host_ds_map = []
+
+        # set osd_id_claims
+
+        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
+            # This is stupid and needs to be loaded with the host
+            for _host, _inventory in inventory_dict.items():
+                if _host == hostname:
+                    return _inventory
+            raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+        # 3) iterate over matching_host and call DriveSelection
+        logger.debug(f"Checking matching hosts -> {matching_hosts}")
+        for host in matching_hosts:
+            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
+            logger.debug(f"Found inventory for host {inventory_for_host}")
+
+            # List of Daemons on that host
+            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
+            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]
+
+            drive_selection = DriveSelection(drive_group, inventory_for_host,
+                                             existing_daemons=len(dd_for_spec_and_host))
+            logger.debug(f"Found drive selection {drive_selection}")
+            if drive_group.method and drive_group.method == 'raw':
+                # ceph-volume can currently only handle a 1:1 mapping
+                # of data/db/wal devices for raw mode osds. If db/wal devices
+                # are defined and the number does not match the number of data
+                # devices, we need to bail out
+                if drive_selection.data_devices() and drive_selection.db_devices():
+                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
+                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
+                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
+                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
+                if drive_selection.data_devices() and drive_selection.wal_devices():
+                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
+                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. 
Found ' + f'{len(drive_selection.data_devices())} potential data device(s) and ' + f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}') + host_ds_map.append((host, drive_selection)) + return host_ds_map + + @staticmethod + def driveselection_to_ceph_volume(drive_selection: DriveSelection, + osd_id_claims: Optional[List[str]] = None, + preview: bool = False) -> List[str]: + logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command") + cmds: List[str] = translate.to_ceph_volume(drive_selection, + osd_id_claims, preview=preview).run() + logger.debug(f"Resulting ceph-volume cmds: {cmds}") + return cmds + + def get_previews(self, host: str) -> List[Dict[str, Any]]: + # Find OSDSpecs that match host. + osdspecs = self.resolve_osdspecs_for_host(host) + return self.generate_previews(osdspecs, host) + + def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]: + """ + + The return should look like this: + + [ + {'data': {<metadata>}, + 'osdspec': <name of osdspec>, + 'host': <name of host>, + 'notes': <notes> + }, + + {'data': ..., + 'osdspec': .., + 'host': ..., + 'notes': ... + } + ] + + Note: One host can have multiple previews based on its assigned OSDSpecs. + """ + self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}") + ret_all: List[Dict[str, Any]] = [] + if not osdspecs: + return ret_all + for osdspec in osdspecs: + + # populate osd_id_claims + osd_id_claims = OsdIdClaims(self.mgr) + + # prepare driveselection + for host, ds in self.prepare_drivegroup(osdspec): + if host != for_host: + continue + + # driveselection for host + cmds: List[str] = self.driveselection_to_ceph_volume(ds, + osd_id_claims.filtered_by_host(host), + preview=True) + if not cmds: + logger.debug("No data_devices, skipping DriveGroup: {}".format( + osdspec.service_name())) + continue + + # get preview data from ceph-volume + for cmd in cmds: + out, err, code = self._run_ceph_volume_command(host, cmd) + if out: + try: + concat_out: Dict[str, Any] = json.loads(' '.join(out)) + except ValueError: + logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out)) + concat_out = {} + notes = [] + if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit: + found = len(concat_out) + limit = osdspec.data_devices.limit + notes.append( + f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})') + ret_all.append({'data': concat_out, + 'osdspec': osdspec.service_id, + 'host': host, + 'notes': notes}) + return ret_all + + def resolve_hosts_for_osdspecs(self, + specs: Optional[List[DriveGroupSpec]] = None + ) -> List[str]: + osdspecs = [] + if specs: + osdspecs = [cast(DriveGroupSpec, spec) for spec in specs] + if not osdspecs: + self.mgr.log.debug("No OSDSpecs found") + return [] + return sum([spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()) for spec in osdspecs], []) + + def resolve_osdspecs_for_host(self, host: str, + specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]: + matching_specs = [] + self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>") + if not specs: + specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items() + if spec.service_type == 'osd'] + for spec in specs: + if host in spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()): + self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> 
<{spec}>") + matching_specs.append(spec) + return matching_specs + + def _run_ceph_volume_command(self, host: str, + cmd: str, env_vars: Optional[List[str]] = None + ) -> Tuple[List[str], List[str], int]: + self.mgr.inventory.assert_host(host) + + # get bootstrap key + ret, keyring, err = self.mgr.check_mon_command({ + 'prefix': 'auth get', + 'entity': 'client.bootstrap-osd', + }) + + j = json.dumps({ + 'config': self.mgr.get_minimal_ceph_conf(), + 'keyring': keyring, + }) + + split_cmd = cmd.split(' ') + _cmd = ['--config-json', '-', '--'] + _cmd.extend(split_cmd) + out, err, code = CephadmServe(self.mgr)._run_cephadm( + host, 'osd', 'ceph-volume', + _cmd, + env_vars=env_vars, + stdin=j, + error_ok=True) + return out, err, code + + def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None: + # Do not remove the osd.N keyring, if we failed to deploy the OSD, because + # we cannot recover from it. The OSD keys are created by ceph-volume and not by + # us. + if not is_failed_deploy: + super().post_remove(daemon, is_failed_deploy=is_failed_deploy) + + +class OsdIdClaims(object): + """ + Retrieve and provide osd ids that can be reused in the cluster + """ + + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr: "CephadmOrchestrator" = mgr + self.osd_host_map: Dict[str, List[str]] = dict() + self.refresh() + + def refresh(self) -> None: + try: + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'osd tree', + 'states': ['destroyed'], + 'format': 'json' + }) + except MonCommandFailed as e: + logger.exception('osd tree failed') + raise OrchestratorError(str(e)) + try: + tree = json.loads(out) + except ValueError: + logger.exception(f'Cannot decode JSON: \'{out}\'') + return + + nodes = tree.get('nodes', {}) + for node in nodes: + if node.get('type') == 'host': + self.osd_host_map.update( + {node.get('name'): [str(_id) for _id in node.get('children', list())]} + ) + if self.osd_host_map: + self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}") + + def get(self) -> Dict[str, List[str]]: + return self.osd_host_map + + def filtered_by_host(self, host: str) -> List[str]: + """ + Return the list of osd ids that can be reused in a host + + OSD id claims in CRUSH map are linked to the bare name of + the hostname. 
In case of FQDN hostnames the host is searched by the
+        bare name (e.g. claims recorded for 'host1' also apply to
+        'host1.example.com')
+        """
+        return self.osd_host_map.get(host.split(".")[0], [])
+
+
+class RemoveUtil(object):
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
+
+    def get_osds_in_cluster(self) -> List[str]:
+        osd_map = self.mgr.get_osdmap()
+        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
+
+    def osd_df(self) -> dict:
+        base_cmd = 'osd df'
+        ret, out, err = self.mgr.mon_command({
+            'prefix': base_cmd,
+            'format': 'json'
+        })
+        try:
+            return json.loads(out)
+        except ValueError:
+            logger.exception(f'Cannot decode JSON: \'{out}\'')
+            return {}
+
+    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+        if not osd_df:
+            osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id') == int(osd_id):
+                return osd_node.get('pgs', -1)
+        return -1
+
+    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
+        """
+        Cut the OSD list in half until it's ok-to-stop
+
+        :param osds: list of OSD objects
+        :return: list of OSDs that can be stopped at once
+        """
+        if not osds:
+            return []
+        while not self.ok_to_stop(osds):
+            if len(osds) <= 1:
+                # can't even stop one OSD, aborting
+                self.mgr.log.debug(
+                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+                return []
+
+            # This potentially prolongs the global wait time.
+            self.mgr.event.wait(1)
+            # splitting osd_ids in half until ok_to_stop yields success
+            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
+            # There's a lot of room for micro adjustments here
+            osds = osds[len(osds) // 2:]
+        return osds
+
+        # todo start draining
+        #  return all([osd.start_draining() for osd in osds])
+
+    def ok_to_stop(self, osds: List["OSD"]) -> bool:
+        cmd_args = {
+            'prefix': "osd ok-to-stop",
+            'ids': [str(osd.osd_id) for osd in osds]
+        }
+        return self._run_mon_cmd(cmd_args, error_ok=True)
+
+    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
+        base_cmd = f"osd {flag}"
+        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': base_cmd,
+            'ids': [str(osd.osd_id) for osd in osds]
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
+            return False
+        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
+        return True
+
+    def get_weight(self, osd: "OSD") -> Optional[float]:
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd crush tree',
+            'format': 'json',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
+            return None
+        j = json.loads(out)
+        for n in j.get("nodes", []):
+            if n.get("name") == f"osd.{osd.osd_id}":
+                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
+                return n.get("crush_weight")
+        return None
+
+    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
+        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': "osd crush reweight",
+            'name': f"osd.{osd.osd_id}",
+            'weight': weight,
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not reweight {osd} to {weight}. 
<{err}>") + return False + self.mgr.log.info(f"{osd} weight is now {weight}") + return True + + def zap_osd(self, osd: "OSD") -> str: + "Zaps all devices that are associated with an OSD" + if osd.hostname is not None: + out, err, code = CephadmServe(self.mgr)._run_cephadm( + osd.hostname, 'osd', 'ceph-volume', + ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)], + error_ok=True) + self.mgr.cache.invalidate_host_devices(osd.hostname) + if code: + raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) + return '\n'.join(out + err) + raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown") + + def safe_to_destroy(self, osd_ids: List[int]) -> bool: + """ Queries the safe-to-destroy flag for OSDs """ + cmd_args = {'prefix': 'osd safe-to-destroy', + 'ids': [str(x) for x in osd_ids]} + return self._run_mon_cmd(cmd_args, error_ok=True) + + def destroy_osd(self, osd_id: int) -> bool: + """ Destroys an OSD (forcefully) """ + cmd_args = {'prefix': 'osd destroy-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True} + return self._run_mon_cmd(cmd_args) + + def purge_osd(self, osd_id: int) -> bool: + """ Purges an OSD from the cluster (forcefully) """ + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True + } + return self._run_mon_cmd(cmd_args) + + def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool: + """ + Generic command to run mon_command and evaluate/log the results + """ + ret, out, err = self.mgr.mon_command(cmd_args) + if ret != 0: + self.mgr.log.debug(f"ran {cmd_args} with mon_command") + if not error_ok: + self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + return False + self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + return True + + +class NotFoundError(Exception): + pass + + +class OSD: + + def __init__(self, + osd_id: int, + remove_util: RemoveUtil, + drain_started_at: Optional[datetime] = None, + process_started_at: Optional[datetime] = None, + drain_stopped_at: Optional[datetime] = None, + drain_done_at: Optional[datetime] = None, + draining: bool = False, + started: bool = False, + stopped: bool = False, + replace: bool = False, + force: bool = False, + hostname: Optional[str] = None, + zap: bool = False): + # the ID of the OSD + self.osd_id = osd_id + + # when did process (not the actual draining) start + self.process_started_at = process_started_at + + # when did the drain start + self.drain_started_at = drain_started_at + + # when did the drain stop + self.drain_stopped_at = drain_stopped_at + + # when did the drain finish + self.drain_done_at = drain_done_at + + # did the draining start + self.draining = draining + + # was the operation started + self.started = started + + # was the operation stopped + self.stopped = stopped + + # If this is a replace or remove operation + self.replace = replace + # If we wait for the osd to be drained + self.force = force + # The name of the node + self.hostname = hostname + + # mgr obj to make mgr/mon calls + self.rm_util: RemoveUtil = remove_util + + self.original_weight: Optional[float] = None + + # Whether devices associated with the OSD should be zapped (DATA ERASED) + self.zap = zap + + def start(self) -> None: + if self.started: + logger.debug(f"Already started draining {self}") + return None + self.started = True + self.stopped = False + + def start_draining(self) -> bool: + if self.stopped: + logger.debug(f"Won't start draining {self}. 
OSD draining is stopped.") + return False + if self.replace: + self.rm_util.set_osd_flag([self], 'out') + else: + self.original_weight = self.rm_util.get_weight(self) + self.rm_util.reweight_osd(self, 0.0) + self.drain_started_at = datetime.utcnow() + self.draining = True + logger.debug(f"Started draining {self}.") + return True + + def stop_draining(self) -> bool: + if self.replace: + self.rm_util.set_osd_flag([self], 'in') + else: + if self.original_weight: + self.rm_util.reweight_osd(self, self.original_weight) + self.drain_stopped_at = datetime.utcnow() + self.draining = False + logger.debug(f"Stopped draining {self}.") + return True + + def stop(self) -> None: + if self.stopped: + logger.debug(f"Already stopped draining {self}") + return None + self.started = False + self.stopped = True + self.stop_draining() + + @property + def is_draining(self) -> bool: + """ + Consider an OSD draining when it is + actively draining but not yet empty + """ + return self.draining and not self.is_empty + + @property + def is_ok_to_stop(self) -> bool: + return self.rm_util.ok_to_stop([self]) + + @property + def is_empty(self) -> bool: + if self.get_pg_count() == 0: + if not self.drain_done_at: + self.drain_done_at = datetime.utcnow() + self.draining = False + return True + return False + + def safe_to_destroy(self) -> bool: + return self.rm_util.safe_to_destroy([self.osd_id]) + + def down(self) -> bool: + return self.rm_util.set_osd_flag([self], 'down') + + def destroy(self) -> bool: + return self.rm_util.destroy_osd(self.osd_id) + + def do_zap(self) -> str: + return self.rm_util.zap_osd(self) + + def purge(self) -> bool: + return self.rm_util.purge_osd(self.osd_id) + + def get_pg_count(self) -> int: + return self.rm_util.get_pg_count(self.osd_id) + + @property + def exists(self) -> bool: + return str(self.osd_id) in self.rm_util.get_osds_in_cluster() + + def drain_status_human(self) -> str: + default_status = 'not started' + status = 'started' if self.started and not self.draining else default_status + status = 'draining' if self.draining else status + status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status + return status + + def pg_count_str(self) -> str: + return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count()) + + def to_json(self) -> dict: + out: Dict[str, Any] = dict() + out['osd_id'] = self.osd_id + out['started'] = self.started + out['draining'] = self.draining + out['stopped'] = self.stopped + out['replace'] = self.replace + out['force'] = self.force + out['zap'] = self.zap + out['hostname'] = self.hostname # type: ignore + + for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + else: + out[k] = getattr(self, k) + return out + + @classmethod + def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]: + if not inp: + return None + for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if inp.get(date_field): + inp.update({date_field: str_to_datetime(inp.get(date_field, ''))}) + inp.update({'remove_util': rm_util}) + if 'nodename' in inp: + hostname = inp.pop('nodename') + inp['hostname'] = hostname + return cls(**inp) + + def __hash__(self) -> int: + return hash(self.osd_id) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, OSD): + return NotImplemented + return self.osd_id == other.osd_id + + def __repr__(self) -> str: + return 
f"osd.{self.osd_id}{' (draining)' if self.draining else ''}" + + +class OSDRemovalQueue(object): + + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr: "CephadmOrchestrator" = mgr + self.osds: Set[OSD] = set() + self.rm_util = RemoveUtil(mgr) + + # locks multithreaded access to self.osds. Please avoid locking + # network calls, like mon commands. + self.lock = Lock() + + def process_removal_queue(self) -> None: + """ + Performs actions in the _serve() loop to remove an OSD + when criteria is met. + + we can't hold self.lock, as we're calling _remove_daemon in the loop + """ + + # make sure that we don't run on OSDs that are not in the cluster anymore. + self.cleanup() + + # find osds that are ok-to-stop and not yet draining + ready_to_drain_osds = self._ready_to_drain_osds() + if ready_to_drain_osds: + # start draining those + _ = [osd.start_draining() for osd in ready_to_drain_osds] + + all_osds = self.all_osds() + + logger.debug( + f"{self.queue_size()} OSDs are scheduled " + f"for removal: {all_osds}") + + # Check all osds for their state and take action (remove, purge etc) + new_queue: Set[OSD] = set() + for osd in all_osds: # type: OSD + if not osd.force: + # skip criteria + if not osd.is_empty: + logger.debug(f"{osd} is not empty yet. Waiting a bit more") + new_queue.add(osd) + continue + + if not osd.safe_to_destroy(): + logger.debug( + f"{osd} is not safe-to-destroy yet. Waiting a bit more") + new_queue.add(osd) + continue + + # abort criteria + if not osd.down(): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError( + f"Could not mark {osd} down") + + # stop and remove daemon + assert osd.hostname is not None + + if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'): + CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname) + logger.info(f"Successfully removed {osd} on {osd.hostname}") + else: + logger.info(f"Daemon {osd} on {osd.hostname} was already removed") + + if osd.replace: + # mark destroyed in osdmap + if not osd.destroy(): + raise orchestrator.OrchestratorError( + f"Could not destroy {osd}") + logger.info( + f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement") + else: + # purge from osdmap + if not osd.purge(): + raise orchestrator.OrchestratorError(f"Could not purge {osd}") + logger.info(f"Successfully purged {osd} on {osd.hostname}") + + if osd.zap: + # throws an exception if the zap fails + logger.info(f"Zapping devices for {osd} on {osd.hostname}") + osd.do_zap() + logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}") + + logger.debug(f"Removing {osd} from the queue.") + + # self could change while this is processing (osds get added from the CLI) + # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and + # osds that were added while this method was executed' + with self.lock: + self.osds.intersection_update(new_queue) + self._save_to_store() + + def cleanup(self) -> None: + # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs + with self.lock: + for osd in self._not_in_cluster(): + self.osds.remove(osd) + + def _ready_to_drain_osds(self) -> List["OSD"]: + """ + Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can + be accomodated by the 'max_osd_draining_count' config value, considering the number of OSDs + that are already draining. 
+        draining_limit = max(1, self.mgr.max_osd_draining_count)
+        num_already_draining = len(self.draining_osds())
+        num_to_start_draining = max(0, draining_limit - num_already_draining)
+        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
+
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)
+
+    def as_osd_ids(self) -> List[int]:
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]
+
+    def queue_size(self) -> int:
+        with self.lock:
+            return len(self.osds)
+
+    def draining_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]
+
+    def idling_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+
+    def empty_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]
+
+    def all_osds(self) -> List["OSD"]:
+        with self.lock:
+            return [osd for osd in self.osds]
+
+    def _not_in_cluster(self) -> List["OSD"]:
+        return [osd for osd in self.osds if not osd.exists]
+
+    def enqueue(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        with self.lock:
+            self.osds.add(osd)
+            osd.start()
+
+    def rm(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        osd.stop()
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, OSDRemovalQueue):
+            return False
+        with self.lock:
+            return self.osds == other.osds
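A minimal usage sketch (illustrative only, not part of the patch) of how the
removal queue above is driven, assuming `mgr` is a CephadmOrchestrator
instance; the host name is hypothetical:

    # restore any persisted queue state, then schedule osd.3 for removal
    queue = OSDRemovalQueue(mgr)
    queue.load_from_store()

    osd = OSD(osd_id=3, remove_util=queue.rm_util,
              hostname='host1', replace=False, zap=True)
    queue.enqueue(osd)  # raises NotFoundError if osd.3 is not in the cluster

    # called periodically from the cephadm serve() loop; drains the OSD,
    # removes the daemon, then purges it (or destroys it when replace=True)
    queue.process_removal_queue()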