summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/services
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/cephadm/services/__init__.py0
-rw-r--r--src/pybind/mgr/cephadm/services/cephadmservice.py1043
-rw-r--r--src/pybind/mgr/cephadm/services/container.py29
-rw-r--r--src/pybind/mgr/cephadm/services/exporter.py147
-rw-r--r--src/pybind/mgr/cephadm/services/ingress.py296
-rw-r--r--src/pybind/mgr/cephadm/services/iscsi.py210
-rw-r--r--src/pybind/mgr/cephadm/services/monitoring.py502
-rw-r--r--src/pybind/mgr/cephadm/services/nfs.py290
-rw-r--r--src/pybind/mgr/cephadm/services/osd.py956
9 files changed, 3473 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/services/__init__.py b/src/pybind/mgr/cephadm/services/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/__init__.py
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
new file mode 100644
index 000000000..92d293fcd
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -0,0 +1,1043 @@
+import errno
+import json
+import logging
+import re
+import socket
+import time
+from abc import ABCMeta, abstractmethod
+from typing import TYPE_CHECKING, List, Callable, TypeVar, \
+ Optional, Dict, Any, Tuple, NewType, cast
+
+from mgr_module import HandleCommandResult, MonCommandFailed
+
+from ceph.deployment.service_spec import ServiceSpec, RGWSpec
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from mgr_util import build_url
+from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator._interface import daemon_type_to_service
+from cephadm import utils
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
+AuthEntity = NewType('AuthEntity', str)
+
+
+class CephadmDaemonDeploySpec:
+ # typing.NamedTuple + Generic is broken in py36
+ def __init__(self, host: str, daemon_id: str,
+ service_name: str,
+ network: Optional[str] = None,
+ keyring: Optional[str] = None,
+ extra_args: Optional[List[str]] = None,
+ ceph_conf: str = '',
+ extra_files: Optional[Dict[str, Any]] = None,
+ daemon_type: Optional[str] = None,
+ ip: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None):
+ """
+ A data struction to encapsulate `cephadm deploy ...
+ """
+ self.host: str = host
+ self.daemon_id = daemon_id
+ self.service_name = service_name
+ daemon_type = daemon_type or (service_name.split('.')[0])
+ assert daemon_type is not None
+ self.daemon_type: str = daemon_type
+
+ # mons
+ self.network = network
+
+ # for run_cephadm.
+ self.keyring: Optional[str] = keyring
+
+ # For run_cephadm. Would be great to have more expressive names.
+ self.extra_args: List[str] = extra_args or []
+
+ self.ceph_conf = ceph_conf
+ self.extra_files = extra_files or {}
+
+ # TCP ports used by the daemon
+ self.ports: List[int] = ports or []
+ self.ip: Optional[str] = ip
+
+ # values to be populated during generate_config calls
+ # and then used in _run_cephadm
+ self.final_config: Dict[str, Any] = {}
+ self.deps: List[str] = []
+
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
+
+ self.extra_container_args = extra_container_args
+
+ def name(self) -> str:
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+ def config_get_files(self) -> Dict[str, Any]:
+ files = self.extra_files
+ if self.ceph_conf:
+ files['config'] = self.ceph_conf
+
+ return files
+
+ @staticmethod
+ def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
+ assert dd.hostname
+ assert dd.daemon_id
+ assert dd.daemon_type
+ return CephadmDaemonDeploySpec(
+ host=dd.hostname,
+ daemon_id=dd.daemon_id,
+ daemon_type=dd.daemon_type,
+ service_name=dd.service_name(),
+ ip=dd.ip,
+ ports=dd.ports,
+ rank=dd.rank,
+ rank_generation=dd.rank_generation,
+ extra_container_args=dd.extra_container_args,
+ )
+
+ def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
+ return DaemonDescription(
+ daemon_type=self.daemon_type,
+ daemon_id=self.daemon_id,
+ service_name=self.service_name,
+ hostname=self.host,
+ status=status,
+ status_desc=status_desc,
+ ip=self.ip,
+ ports=self.ports,
+ rank=self.rank,
+ rank_generation=self.rank_generation,
+ extra_container_args=self.extra_container_args,
+ )
+
+
+class CephadmService(metaclass=ABCMeta):
+ """
+ Base class for service types. Often providing a create() and config() fn.
+ """
+
+ @property
+ @abstractmethod
+ def TYPE(self) -> str:
+ pass
+
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr: "CephadmOrchestrator" = mgr
+
+ def allow_colo(self) -> bool:
+ """
+ Return True if multiple daemons of the same type can colocate on
+ the same host.
+ """
+ return False
+
+ def primary_daemon_type(self) -> str:
+ """
+ This is the type of the primary (usually only) daemon to be deployed.
+ """
+ return self.TYPE
+
+ def per_host_daemon_type(self) -> Optional[str]:
+ """
+ If defined, this type of daemon will be deployed once for each host
+ containing one or more daemons of the primary type.
+ """
+ return None
+
+ def ranked(self) -> bool:
+ """
+ If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+ generation (0, 1, ...) to each daemon we create/deploy.
+ """
+ return False
+
+ def fence_old_ranks(self,
+ spec: ServiceSpec,
+ rank_map: Dict[int, Dict[int, Optional[str]]],
+ num_ranks: int) -> None:
+ assert False
+
+ def make_daemon_spec(
+ self,
+ host: str,
+ daemon_id: str,
+ network: str,
+ spec: ServiceSpecs,
+ daemon_type: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ ip: Optional[str] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ ) -> CephadmDaemonDeploySpec:
+ try:
+ eca = spec.extra_container_args
+ except AttributeError:
+ eca = None
+ return CephadmDaemonDeploySpec(
+ host=host,
+ daemon_id=daemon_id,
+ service_name=spec.service_name(),
+ network=network,
+ daemon_type=daemon_type,
+ ports=ports,
+ ip=ip,
+ rank=rank,
+ rank_generation=rank_generation,
+ extra_container_args=eca,
+ )
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ raise NotImplementedError()
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ raise NotImplementedError()
+
+ def config(self, spec: ServiceSpec) -> None:
+ """
+ Configure the cluster for this service. Only called *once* per
+ service apply. Not for every daemon.
+ """
+ pass
+
+ def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
+ """The post actions needed to be done after daemons are checked"""
+ if self.mgr.config_dashboard:
+ if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+ self.config_dashboard(daemon_descrs)
+ else:
+ logger.debug('Dashboard is not enabled. Skip configuration.')
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ """Config dashboard settings."""
+ raise NotImplementedError()
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # if this is called for a service type where it hasn't explcitly been
+ # defined, return empty Daemon Desc
+ return DaemonDescription()
+
+ def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': entity,
+ 'caps': caps,
+ })
+ if err:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': entity,
+ 'caps': caps,
+ })
+ if err:
+ self.mgr.log.warning(f"Unable to update caps for {entity}")
+ return keyring
+
+ def _inventory_get_fqdn(self, hostname: str) -> str:
+ """Get a host's FQDN with its hostname.
+
+ If the FQDN can't be resolved, the address from the inventory will
+ be returned instead.
+ """
+ addr = self.mgr.inventory.get_addr(hostname)
+ return socket.getfqdn(addr)
+
+ def _set_service_url_on_dashboard(self,
+ service_name: str,
+ get_mon_cmd: str,
+ set_mon_cmd: str,
+ service_url: str) -> None:
+ """A helper to get and set service_url via Dashboard's MON command.
+
+ If result of get_mon_cmd differs from service_url, set_mon_cmd will
+ be sent to set the service_url.
+ """
+ def get_set_cmd_dicts(out: str) -> List[dict]:
+ cmd_dict = {
+ 'prefix': set_mon_cmd,
+ 'value': service_url
+ }
+ return [cmd_dict] if service_url != out else []
+
+ self._check_and_set_dashboard(
+ service_name=service_name,
+ get_cmd=get_mon_cmd,
+ get_set_cmd_dicts=get_set_cmd_dicts
+ )
+
+ def _check_and_set_dashboard(self,
+ service_name: str,
+ get_cmd: str,
+ get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
+ """A helper to set configs in the Dashboard.
+
+ The method is useful for the pattern:
+ - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
+ gateways.
+ - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
+ - Determine if the config need to be update. NOTE: This step is important because if a
+ Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
+ kicks the serve() loop and the logic using this method is likely to be called again.
+ A config should be updated only when needed.
+ - Update a config in Dashboard by using a Dashboard command.
+
+ :param service_name: the service name to be used for logging
+ :type service_name: str
+ :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
+ :type get_cmd: str
+ :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
+ e.g.
+ [
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@aaa:5000',
+ 'name': 'aaa'
+ },
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@bbb:5000',
+ 'name': 'bbb'
+ }
+ ]
+ The function should return empty list if no command need to be sent.
+ :type get_set_cmd_dicts: Callable[[str], List[dict]]
+ """
+
+ try:
+ _, out, _ = self.mgr.check_mon_command({
+ 'prefix': get_cmd
+ })
+ except MonCommandFailed as e:
+ logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
+ return
+ cmd_dicts = get_set_cmd_dicts(out.strip())
+ for cmd_dict in list(cmd_dicts):
+ try:
+ inbuf = cmd_dict.pop('inbuf', None)
+ _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
+ except MonCommandFailed as e:
+ logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
+
+ def ok_to_stop_osd(
+ self,
+ osds: List[str],
+ known: Optional[List[str]] = None, # output argument
+ force: bool = False) -> HandleCommandResult:
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': "osd ok-to-stop",
+ 'ids': osds,
+ 'max': 16,
+ }))
+ j = None
+ try:
+ j = json.loads(r.stdout)
+ except json.decoder.JSONDecodeError:
+ self.mgr.log.warning("osd ok-to-stop didn't return structured result")
+ raise
+ if r.retval:
+ return r
+ if known is not None and j and j.get('ok_to_stop'):
+ self.mgr.log.debug(f"got {j}")
+ known.extend([f'osd.{x}' for x in j.get('osds', [])])
+ return HandleCommandResult(
+ 0,
+ f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
+ ''
+ )
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ out = f'It appears safe to stop {",".join(names)}'
+ err = f'It is NOT safe to stop {",".join(names)} at this time'
+
+ if self.TYPE not in ['mon', 'osd', 'mds']:
+ logger.debug(out)
+ return HandleCommandResult(0, out)
+
+ if self.TYPE == 'osd':
+ return self.ok_to_stop_osd(daemon_ids, known, force)
+
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': f'{self.TYPE} ok-to-stop',
+ 'ids': daemon_ids,
+ }))
+
+ if r.retval:
+ err = f'{err}: {r.stderr}' if r.stderr else err
+ logger.debug(err)
+ return HandleCommandResult(r.retval, r.stdout, err)
+
+ out = f'{out}: {r.stdout}' if r.stdout else out
+ logger.debug(out)
+ return HandleCommandResult(r.retval, out, r.stderr)
+
+ def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
+ # Provides a warning about if it possible or not to stop <n> daemons in a service
+ names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
+ number_of_running_daemons = len(
+ [daemon
+ for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
+ if daemon.status == DaemonDescriptionStatus.running])
+ if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
+ return False, f'It is presumed safe to stop {names}'
+
+ num_daemons_left = number_of_running_daemons - len(daemon_ids)
+
+ def plural(count: int) -> str:
+ return 'daemon' if count == 1 else 'daemons'
+
+ left_count = "no" if num_daemons_left == 0 else num_daemons_left
+
+ if alert:
+ out = (f'ALERT: Cannot stop {names} in {service} service. '
+ f'Not enough remaining {service} daemons. '
+ f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
+ else:
+ out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
+ f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
+ f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
+ return True, out
+
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ """
+ Called before the daemon is removed.
+ """
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
+ logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ """
+ Called after the daemon is removed.
+ """
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
+ logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ def purge(self, service_name: str) -> None:
+ """Called to carry out any purge tasks following service removal"""
+ logger.debug(f'Purge called for {self.TYPE} - no action taken')
+
+
+class CephService(CephadmService):
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ # Ceph.daemons (mon, mgr, mds, osd, etc)
+ cephadm_config = self.get_config_and_keyring(
+ daemon_spec.daemon_type,
+ daemon_spec.daemon_id,
+ host=daemon_spec.host,
+ keyring=daemon_spec.keyring,
+ extra_ceph_config=daemon_spec.ceph_conf)
+
+ if daemon_spec.config_get_files():
+ cephadm_config.update({'files': daemon_spec.config_get_files()})
+
+ return cephadm_config, []
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.remove_keyring(daemon)
+
+ def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
+ """
+ Map the daemon id to a cephx keyring entity name
+ """
+ # despite this mapping entity names to daemons, self.TYPE within
+ # the CephService class refers to service types, not daemon types
+ if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
+ return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
+ elif self.TYPE == 'crash':
+ if host == "":
+ raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ return AuthEntity(f'client.{self.TYPE}.{host}')
+ elif self.TYPE == 'mon':
+ return AuthEntity('mon.')
+ elif self.TYPE in ['mgr', 'osd', 'mds']:
+ return AuthEntity(f'{self.TYPE}.{daemon_id}')
+ else:
+ raise OrchestratorError("unknown daemon type")
+
+ def get_config_and_keyring(self,
+ daemon_type: str,
+ daemon_id: str,
+ host: str,
+ keyring: Optional[str] = None,
+ extra_ceph_config: Optional[str] = None
+ ) -> Dict[str, Any]:
+ # keyring
+ if not keyring:
+ entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': entity,
+ })
+
+ config = self.mgr.get_minimal_ceph_conf()
+
+ if extra_ceph_config:
+ config += extra_ceph_config
+
+ return {
+ 'config': config,
+ 'keyring': keyring,
+ }
+
+ def remove_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
+ assert daemon.hostname is not None
+ daemon_id: str = daemon.daemon_id
+ host: str = daemon.hostname
+
+ assert daemon.daemon_type != 'mon'
+
+ entity = self.get_auth_entity(daemon_id, host=host)
+
+ logger.info(f'Removing key for {entity}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+
+
+class MonService(CephService):
+ TYPE = 'mon'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ """
+ Create a new monitor on the given host.
+ """
+ assert self.TYPE == daemon_spec.daemon_type
+ name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+
+ # get mon. key
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': self.get_auth_entity(name),
+ })
+
+ extra_config = '[mon.%s]\n' % name
+ if network:
+ # infer whether this is a CIDR network, addrvec, or plain IP
+ if '/' in network:
+ extra_config += 'public network = %s\n' % network
+ elif network.startswith('[v') and network.endswith(']'):
+ extra_config += 'public addrv = %s\n' % network
+ elif is_ipv6(network):
+ extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
+ elif ':' not in network:
+ extra_config += 'public addr = %s\n' % network
+ else:
+ raise OrchestratorError(
+ 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
+ else:
+ # try to get the public_network from the config
+ ret, network, err = self.mgr.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'mon',
+ 'key': 'public_network',
+ })
+ network = network.strip() if network else network
+ if not network:
+ raise OrchestratorError(
+ 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
+ if '/' not in network:
+ raise OrchestratorError(
+ 'public_network is set but does not look like a CIDR network: \'%s\'' % network)
+ extra_config += 'public network = %s\n' % network
+
+ daemon_spec.ceph_conf = extra_config
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'quorum_status',
+ })
+ try:
+ j = json.loads(out)
+ except Exception:
+ raise OrchestratorError('failed to parse quorum status')
+
+ mons = [m['name'] for m in j['monmap']['mons']]
+ if mon_id not in mons:
+ logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
+ mon_id, mons))
+ return
+ new_mons = [m for m in mons if m != mon_id]
+ new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+ if len(new_quorum) > len(new_mons) / 2:
+ logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
+ (mon_id, new_quorum, new_mons))
+ return
+ raise OrchestratorError(
+ 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
+
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ super().pre_remove(daemon)
+
+ assert daemon.daemon_id is not None
+ daemon_id: str = daemon.daemon_id
+ self._check_safe_to_destroy(daemon_id)
+
+ # remove mon from quorum before we destroy the daemon
+ logger.info('Removing monitor %s from monmap...' % daemon_id)
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mon rm',
+ 'name': daemon_id,
+ })
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ # Do not remove the mon keyring.
+ # super().post_remove(daemon)
+ pass
+
+
+class MgrService(CephService):
+ TYPE = 'mgr'
+
+ def allow_colo(self) -> bool:
+ if self.mgr.get_ceph_option('mgr_standby_modules'):
+ # traditional mgr mode: standby daemons' modules listen on
+ # ports and redirect to the primary. we must not schedule
+ # multiple mgrs on the same host or else ports will
+ # conflict.
+ return False
+ else:
+ # standby daemons do nothing, and therefore port conflicts
+ # are not a concern.
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ """
+ Create a new manager instance on a host.
+ """
+ assert self.TYPE == daemon_spec.daemon_type
+ mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ # get mgr. key
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
+ ['mon', 'profile mgr',
+ 'osd', 'allow *',
+ 'mds', 'allow *'])
+
+ # Retrieve ports used by manager modules
+ # In the case of the dashboard port and with several manager daemons
+ # running in different hosts, it exists the possibility that the
+ # user has decided to use different dashboard ports in each server
+ # If this is the case then the dashboard port opened will be only the used
+ # as default.
+ ports = []
+ ret, mgr_services, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr services',
+ })
+ if mgr_services:
+ mgr_endpoints = json.loads(mgr_services)
+ for end_point in mgr_endpoints.values():
+ port = re.search(r'\:\d+\/', end_point)
+ if port:
+ ports.append(int(port[0][1:-1]))
+
+ if ports:
+ daemon_spec.ports = ports
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ for daemon in daemon_descrs:
+ assert daemon.daemon_type is not None
+ assert daemon.daemon_id is not None
+ if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
+ return daemon
+ # if no active mgr found, return empty Daemon Desc
+ return DaemonDescription()
+
+ def fail_over(self) -> None:
+ # this has been seen to sometimes transiently fail even when there are multiple
+ # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
+ class NoStandbyError(OrchestratorError):
+ pass
+ no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+ 'daemon', 'mgr' + self.mgr.get_mgr_id()))
+ for sleep_secs in [2, 8, 15]:
+ try:
+ if not self.mgr_map_has_standby():
+ raise no_standby_exc
+ self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+ 'INFO', 'Failing over to other MGR')
+ logger.info('Failing over to other MGR')
+
+ # fail over
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr fail',
+ 'who': self.mgr.get_mgr_id(),
+ })
+ return
+ except NoStandbyError:
+ logger.info(
+ f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+ time.sleep(sleep_secs)
+ raise no_standby_exc
+
+ def mgr_map_has_standby(self) -> bool:
+ """
+ This is a bit safer than asking our inventory. If the mgr joined the mgr map,
+ we know it joined the cluster
+ """
+ mgr_map = self.mgr.get('mgr_map')
+ num = len(mgr_map.get('standbys'))
+ return bool(num)
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
+
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+ active = self.get_active_daemon(mgr_daemons).daemon_id
+ if active in daemon_ids:
+ warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ return HandleCommandResult(0, warn_message, '')
+
+
+class MdsService(CephService):
+ TYPE = 'mds'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def config(self, spec: ServiceSpec) -> None:
+ assert self.TYPE == spec.service_type
+ assert spec.service_id
+
+ # ensure mds_join_fs is set for these daemons
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mds.' + spec.service_id,
+ 'name': 'mds_join_fs',
+ 'value': spec.service_id,
+ })
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ # get mds. key
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
+ ['mon', 'profile mds',
+ 'osd', 'allow rw tag cephfs *=*',
+ 'mds', 'allow'])
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ active_mds_strs = list()
+ for fs in self.mgr.get('fs_map')['filesystems']:
+ mds_map = fs['mdsmap']
+ if mds_map is not None:
+ for mds_id, mds_status in mds_map['info'].items():
+ if mds_status['state'] == 'up:active':
+ active_mds_strs.append(mds_status['name'])
+ if len(active_mds_strs) != 0:
+ for daemon in daemon_descrs:
+ if daemon.daemon_id in active_mds_strs:
+ return daemon
+ # if no mds found, return empty Daemon Desc
+ return DaemonDescription()
+
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': service_name,
+ 'name': 'mds_join_fs',
+ })
+
+
+class RgwService(CephService):
+ TYPE = 'rgw'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def config(self, spec: RGWSpec) -> None: # type: ignore
+ assert self.TYPE == spec.service_type
+
+ # set rgw_realm and rgw_zone, if present
+ if spec.rgw_realm:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_realm',
+ 'value': spec.rgw_realm,
+ })
+ if spec.rgw_zone:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_zone',
+ 'value': spec.rgw_zone,
+ })
+
+ if spec.rgw_frontend_ssl_certificate:
+ if isinstance(spec.rgw_frontend_ssl_certificate, list):
+ cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
+ elif isinstance(spec.rgw_frontend_ssl_certificate, str):
+ cert_data = spec.rgw_frontend_ssl_certificate
+ else:
+ raise OrchestratorError(
+ 'Invalid rgw_frontend_ssl_certificate: %s'
+ % spec.rgw_frontend_ssl_certificate)
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'rgw/cert/{spec.service_name()}',
+ 'val': cert_data,
+ })
+
+ # TODO: fail, if we don't have a spec
+ logger.info('Saving service %s spec with placement %s' % (
+ spec.service_name(), spec.placement.pretty_str()))
+ self.mgr.spec_store.save(spec)
+ self.mgr.trigger_connect_dashboard_rgw()
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
+ spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ keyring = self.get_keyring(rgw_id)
+
+ if daemon_spec.ports:
+ port = daemon_spec.ports[0]
+ else:
+ # this is a redeploy of older instance that doesn't have an explicitly
+ # assigned port, in which case we can assume there is only 1 per host
+ # and it matches the spec.
+ port = spec.get_port()
+
+ # configure frontend
+ args = []
+ ftype = spec.rgw_frontend_type or "beast"
+ if ftype == 'beast':
+ if spec.ssl:
+ if daemon_spec.ip:
+ args.append(
+ f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"ssl_port={port}")
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ elif ftype == 'civetweb':
+ if spec.ssl:
+ if daemon_spec.ip:
+ # note the 's' suffix on port
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
+ else:
+ args.append(f"port={port}s") # note the 's' suffix on port
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ frontend = f'{ftype} {" ".join(args)}'
+
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': utils.name_to_config_section(daemon_spec.name()),
+ 'name': 'rgw_frontends',
+ 'value': frontend
+ })
+
+ daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_keyring(self, rgw_id: str) -> str:
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
+ ['mon', 'allow *',
+ 'mgr', 'allow rw',
+ 'osd', 'allow rwx tag rgw *=*'])
+ return keyring
+
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_realm',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_zone',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config-key rm',
+ 'key': f'rgw/cert/{service_name}',
+ })
+ self.mgr.trigger_connect_dashboard_rgw()
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_frontends',
+ })
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
+ # if no load balancer, warn if > 1 daemon, block if only 1 daemon
+ def ingress_present() -> bool:
+ running_ingress_daemons = [
+ daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
+ running_haproxy_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
+ running_keepalived_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
+ # check that there is at least one haproxy and keepalived daemon running
+ if running_haproxy_daemons and running_keepalived_daemons:
+ return True
+ return False
+
+ # if only 1 rgw, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 rgw daemon.
+ # Say okay if load balancer present or force flag set
+ if ingress_present() or force:
+ return HandleCommandResult(0, warn_message, '')
+
+ # if reached here, > 1 RGW daemon, no load balancer and no force flag.
+ # Provide warning
+ warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ self.mgr.trigger_connect_dashboard_rgw()
+
+
+class RbdMirrorService(CephService):
+ TYPE = 'rbd-mirror'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
+ ['mon', 'profile rbd-mirror',
+ 'osd', 'profile rbd'])
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # if only 1 rbd-mirror, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(
+ self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
+
+class CrashService(CephService):
+ TYPE = 'crash'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
+ ['mon', 'profile crash',
+ 'mgr', 'profile crash'])
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+
+class CephfsMirrorService(CephService):
+ TYPE = 'cephfs-mirror'
+
+ def config(self, spec: ServiceSpec) -> None:
+ # make sure mirroring module is enabled
+ mgr_map = self.mgr.get('mgr_map')
+ mod_name = 'mirroring'
+ if mod_name not in mgr_map.get('services', {}):
+ self.mgr.check_mon_command({
+ 'prefix': 'mgr module enable',
+ 'module': mod_name
+ })
+ # we shouldn't get here (mon will tell the mgr to respawn), but no
+ # harm done if we do.
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': self.get_auth_entity(daemon_spec.daemon_id),
+ 'caps': ['mon', 'profile cephfs-mirror',
+ 'mds', 'allow r',
+ 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
+ 'mgr', 'allow r'],
+ })
+
+ daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
diff --git a/src/pybind/mgr/cephadm/services/container.py b/src/pybind/mgr/cephadm/services/container.py
new file mode 100644
index 000000000..b9cdfad5e
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/container.py
@@ -0,0 +1,29 @@
+import logging
+from typing import List, Any, Tuple, Dict, cast
+
+from ceph.deployment.service_spec import CustomContainerSpec
+
+from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
+
+logger = logging.getLogger(__name__)
+
+
+class CustomContainerService(CephadmService):
+ TYPE = 'container'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) \
+ -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) \
+ -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps: List[str] = []
+ spec = cast(CustomContainerSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ config: Dict[str, Any] = spec.config_json()
+ logger.debug(
+ 'Generated configuration for \'%s\' service: config-json=%s, dependencies=%s' %
+ (self.TYPE, config, deps))
+ return config, deps
diff --git a/src/pybind/mgr/cephadm/services/exporter.py b/src/pybind/mgr/cephadm/services/exporter.py
new file mode 100644
index 000000000..b9c7d85e6
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/exporter.py
@@ -0,0 +1,147 @@
+import json
+import logging
+from typing import TYPE_CHECKING, List, Dict, Any, Tuple
+
+from orchestrator import OrchestratorError
+from mgr_util import ServerConfigException, verify_tls
+
+from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class CephadmExporterConfig:
+ required_keys = ['crt', 'key', 'token', 'port']
+ DEFAULT_PORT = '9443'
+
+ def __init__(self, mgr: "CephadmOrchestrator", crt: str = "", key: str = "",
+ token: str = "", port: str = "") -> None:
+ self.mgr = mgr
+ self.crt = crt
+ self.key = key
+ self.token = token
+ self.port = port
+
+ @property
+ def ready(self) -> bool:
+ return all([self.crt, self.key, self.token, self.port])
+
+ def load_from_store(self) -> None:
+ cfg = self.mgr._get_exporter_config()
+
+ assert isinstance(cfg, dict)
+ self.crt = cfg.get('crt', "")
+ self.key = cfg.get('key', "")
+ self.token = cfg.get('token', "")
+ self.port = cfg.get('port', "")
+
+ def load_from_json(self, json_str: str) -> Tuple[int, str]:
+ try:
+ cfg = json.loads(json_str)
+ except ValueError:
+ return 1, "Invalid JSON provided - unable to load"
+
+ if not all([k in cfg for k in CephadmExporterConfig.required_keys]):
+ return 1, "JSON file must contain crt, key, token and port"
+
+ self.crt = cfg.get('crt')
+ self.key = cfg.get('key')
+ self.token = cfg.get('token')
+ self.port = cfg.get('port')
+
+ return 0, ""
+
+ def validate_config(self) -> Tuple[int, str]:
+ if not self.ready:
+ return 1, "Incomplete configuration. cephadm-exporter needs crt, key, token and port to be set"
+
+ for check in [self._validate_tls, self._validate_token, self._validate_port]:
+ rc, reason = check()
+ if rc:
+ return 1, reason
+
+ return 0, ""
+
+ def _validate_tls(self) -> Tuple[int, str]:
+
+ try:
+ verify_tls(self.crt, self.key)
+ except ServerConfigException as e:
+ return 1, str(e)
+
+ return 0, ""
+
+ def _validate_token(self) -> Tuple[int, str]:
+ if not isinstance(self.token, str):
+ return 1, "token must be a string"
+ if len(self.token) < 8:
+ return 1, "Token must be a string of at least 8 chars in length"
+
+ return 0, ""
+
+ def _validate_port(self) -> Tuple[int, str]:
+ try:
+ p = int(str(self.port))
+ if p <= 1024:
+ raise ValueError
+ except ValueError:
+ return 1, "Port must be a integer (>1024)"
+
+ return 0, ""
+
+
+class CephadmExporter(CephadmService):
+ TYPE = 'cephadm-exporter'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ cfg = CephadmExporterConfig(self.mgr)
+ cfg.load_from_store()
+
+ if cfg.ready:
+ rc, reason = cfg.validate_config()
+ if rc:
+ raise OrchestratorError(reason)
+ else:
+ logger.info(
+ "Incomplete/Missing configuration, applying defaults")
+ self.mgr._set_exporter_defaults()
+ cfg.load_from_store()
+
+ if not daemon_spec.ports:
+ daemon_spec.ports = [int(cfg.port)]
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps: List[str] = []
+
+ cfg = CephadmExporterConfig(self.mgr)
+ cfg.load_from_store()
+
+ if cfg.ready:
+ rc, reason = cfg.validate_config()
+ if rc:
+ raise OrchestratorError(reason)
+ else:
+ logger.info("Using default configuration for cephadm-exporter")
+ self.mgr._set_exporter_defaults()
+ cfg.load_from_store()
+
+ config = {
+ "crt": cfg.crt,
+ "key": cfg.key,
+ "token": cfg.token
+ }
+ return config, deps
+
+ def purge(self, service_name: str) -> None:
+ logger.info("Purging cephadm-exporter settings from mon K/V store")
+ self.mgr._clear_exporter_config_settings()
diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py
new file mode 100644
index 000000000..99fde1c43
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/ingress.py
@@ -0,0 +1,296 @@
+import ipaddress
+import logging
+import random
+import string
+from typing import List, Dict, Any, Tuple, cast, Optional
+
+from ceph.deployment.service_spec import IngressSpec
+from mgr_util import build_url
+from cephadm.utils import resolve_ip
+from orchestrator import OrchestratorError
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+
+logger = logging.getLogger(__name__)
+
+
+class IngressService(CephService):
+ TYPE = 'ingress'
+
+ def primary_daemon_type(self) -> str:
+ return 'haproxy'
+
+ def per_host_daemon_type(self) -> Optional[str]:
+ return 'keepalived'
+
+ def prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ if daemon_spec.daemon_type == 'haproxy':
+ return self.haproxy_prepare_create(daemon_spec)
+ if daemon_spec.daemon_type == 'keepalived':
+ return self.keepalived_prepare_create(daemon_spec)
+ assert False, "unexpected daemon type"
+
+ def generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ if daemon_spec.daemon_type == 'haproxy':
+ return self.haproxy_generate_config(daemon_spec)
+ else:
+ return self.keepalived_generate_config(daemon_spec)
+ assert False, "unexpected daemon type"
+
+ def haproxy_prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ assert daemon_spec.daemon_type == 'haproxy'
+
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ logger.debug('prepare_create haproxy.%s on host %s with spec %s' % (
+ daemon_id, host, spec))
+
+ daemon_spec.final_config, daemon_spec.deps = self.haproxy_generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def haproxy_generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ assert spec.backend_service
+ if spec.backend_service not in self.mgr.spec_store:
+ raise RuntimeError(
+ f'{spec.service_name()} backend service {spec.backend_service} does not exist')
+ backend_spec = self.mgr.spec_store[spec.backend_service].spec
+ daemons = self.mgr.cache.get_daemons_by_service(spec.backend_service)
+ deps = [d.name() for d in daemons]
+
+ # generate password?
+ pw_key = f'{spec.service_name()}/monitor_password'
+ password = self.mgr.get_store(pw_key)
+ if password is None:
+ if not spec.monitor_password:
+ password = ''.join(random.choice(string.ascii_lowercase) for _ in range(20))
+ self.mgr.set_store(pw_key, password)
+ else:
+ if spec.monitor_password:
+ self.mgr.set_store(pw_key, None)
+ if spec.monitor_password:
+ password = spec.monitor_password
+
+ if backend_spec.service_type == 'nfs':
+ mode = 'tcp'
+ by_rank = {d.rank: d for d in daemons if d.rank is not None}
+ servers = []
+
+ # try to establish how many ranks we *should* have
+ num_ranks = backend_spec.placement.count
+ if not num_ranks:
+ num_ranks = 1 + max(by_rank.keys())
+
+ for rank in range(num_ranks):
+ if rank in by_rank:
+ d = by_rank[rank]
+ assert d.ports
+ servers.append({
+ 'name': f"{spec.backend_service}.{rank}",
+ 'ip': d.ip or resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))),
+ 'port': d.ports[0],
+ })
+ else:
+ # offline/missing server; leave rank in place
+ servers.append({
+ 'name': f"{spec.backend_service}.{rank}",
+ 'ip': '0.0.0.0',
+ 'port': 0,
+ })
+ else:
+ mode = 'http'
+ servers = [
+ {
+ 'name': d.name(),
+ 'ip': d.ip or resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))),
+ 'port': d.ports[0],
+ } for d in daemons if d.ports
+ ]
+
+ haproxy_conf = self.mgr.template.render(
+ 'services/ingress/haproxy.cfg.j2',
+ {
+ 'spec': spec,
+ 'mode': mode,
+ 'servers': servers,
+ 'user': spec.monitor_user or 'admin',
+ 'password': password,
+ 'ip': "*" if spec.virtual_ips_list else str(spec.virtual_ip).split('/')[0] or daemon_spec.ip or '*',
+ 'frontend_port': daemon_spec.ports[0] if daemon_spec.ports else spec.frontend_port,
+ 'monitor_port': daemon_spec.ports[1] if daemon_spec.ports else spec.monitor_port,
+ }
+ )
+ config_files = {
+ 'files': {
+ "haproxy.cfg": haproxy_conf,
+ }
+ }
+ if spec.ssl_cert:
+ ssl_cert = spec.ssl_cert
+ if isinstance(ssl_cert, list):
+ ssl_cert = '\n'.join(ssl_cert)
+ config_files['files']['haproxy.pem'] = ssl_cert
+
+ return config_files, sorted(deps)
+
+ def keepalived_prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ assert daemon_spec.daemon_type == 'keepalived'
+
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ logger.debug('prepare_create keepalived.%s on host %s with spec %s' % (
+ daemon_id, host, spec))
+
+ daemon_spec.final_config, daemon_spec.deps = self.keepalived_generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def keepalived_generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ spec = cast(IngressSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ assert spec.backend_service
+
+ # generate password?
+ pw_key = f'{spec.service_name()}/keepalived_password'
+ password = self.mgr.get_store(pw_key)
+ if password is None:
+ if not spec.keepalived_password:
+ password = ''.join(random.choice(string.ascii_lowercase) for _ in range(20))
+ self.mgr.set_store(pw_key, password)
+ else:
+ if spec.keepalived_password:
+ self.mgr.set_store(pw_key, None)
+ if spec.keepalived_password:
+ password = spec.keepalived_password
+
+ daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
+
+ if not daemons:
+ raise OrchestratorError(
+ f'Failed to generate keepalived.conf: No daemons deployed for {spec.service_name()}')
+
+ deps = sorted([d.name() for d in daemons if d.daemon_type == 'haproxy'])
+
+ host = daemon_spec.host
+ hosts = sorted(list(set([host] + [str(d.hostname) for d in daemons])))
+
+ # interface
+ bare_ips = []
+ if spec.virtual_ip:
+ bare_ips.append(str(spec.virtual_ip).split('/')[0])
+ elif spec.virtual_ips_list:
+ bare_ips = [str(vip).split('/')[0] for vip in spec.virtual_ips_list]
+ interface = None
+ for bare_ip in bare_ips:
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+ interface = list(ifaces.keys())[0]
+ logger.info(
+ f'{bare_ip} is in {subnet} on {host} interface {interface}'
+ )
+ break
+ else: # nobreak
+ continue
+ break
+ # try to find interface by matching spec.virtual_interface_networks
+ if not interface and spec.virtual_interface_networks:
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if subnet in spec.virtual_interface_networks:
+ interface = list(ifaces.keys())[0]
+ logger.info(
+ f'{spec.virtual_ip} will be configured on {host} interface '
+ f'{interface} (which has guiding subnet {subnet})'
+ )
+ break
+ if not interface:
+ raise OrchestratorError(
+ f"Unable to identify interface for {spec.virtual_ip} on {host}"
+ )
+
+ # script to monitor health
+ script = '/usr/bin/false'
+ for d in daemons:
+ if d.hostname == host:
+ if d.daemon_type == 'haproxy':
+ assert d.ports
+ port = d.ports[1] # monitoring port
+ script = f'/usr/bin/curl {build_url(scheme="http", host=d.ip or "localhost", port=port)}/health'
+ assert script
+
+ states = []
+ priorities = []
+ virtual_ips = []
+
+ # Set state and priority. Have one master for each VIP. Or at least the first one as master if only one VIP.
+ if spec.virtual_ip:
+ virtual_ips.append(spec.virtual_ip)
+ if hosts[0] == host:
+ states.append('MASTER')
+ priorities.append(100)
+ else:
+ states.append('BACKUP')
+ priorities.append(90)
+
+ elif spec.virtual_ips_list:
+ virtual_ips = spec.virtual_ips_list
+ if len(virtual_ips) > len(hosts):
+ raise OrchestratorError(
+ "Number of virtual IPs for ingress is greater than number of available hosts"
+ )
+ for x in range(len(virtual_ips)):
+ if hosts[x] == host:
+ states.append('MASTER')
+ priorities.append(100)
+ else:
+ states.append('BACKUP')
+ priorities.append(90)
+
+ # remove host, daemon is being deployed on from hosts list for
+ # other_ips in conf file and converter to ips
+ if host in hosts:
+ hosts.remove(host)
+ other_ips = [resolve_ip(self.mgr.inventory.get_addr(h)) for h in hosts]
+
+ keepalived_conf = self.mgr.template.render(
+ 'services/ingress/keepalived.conf.j2',
+ {
+ 'spec': spec,
+ 'script': script,
+ 'password': password,
+ 'interface': interface,
+ 'virtual_ips': virtual_ips,
+ 'states': states,
+ 'priorities': priorities,
+ 'other_ips': other_ips,
+ 'host_ip': resolve_ip(self.mgr.inventory.get_addr(host)),
+ }
+ )
+
+ config_file = {
+ 'files': {
+ "keepalived.conf": keepalived_conf,
+ }
+ }
+
+ return config_file, deps
diff --git a/src/pybind/mgr/cephadm/services/iscsi.py b/src/pybind/mgr/cephadm/services/iscsi.py
new file mode 100644
index 000000000..c42eff683
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/iscsi.py
@@ -0,0 +1,210 @@
+import errno
+import json
+import logging
+import subprocess
+from typing import List, cast, Optional
+from ipaddress import ip_address, IPv6Address
+
+from mgr_module import HandleCommandResult
+from ceph.deployment.service_spec import IscsiServiceSpec
+
+from orchestrator import DaemonDescription, DaemonDescriptionStatus
+from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .. import utils
+
+logger = logging.getLogger(__name__)
+
+
+class IscsiService(CephService):
+ TYPE = 'iscsi'
+
+ def config(self, spec: IscsiServiceSpec) -> None: # type: ignore
+ assert self.TYPE == spec.service_type
+ assert spec.pool
+ self.mgr._check_pool_exists(spec.pool, spec.service_name())
+
+ def get_trusted_ips(self, spec: IscsiServiceSpec) -> str:
+ # add active mgr ip address to trusted list so dashboard can access
+ trusted_ip_list = spec.trusted_ip_list if spec.trusted_ip_list else ''
+ if trusted_ip_list:
+ trusted_ip_list += ','
+ trusted_ip_list += self.mgr.get_mgr_ip()
+ return trusted_ip_list
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ spec = cast(IscsiServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ igw_id = daemon_spec.daemon_id
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(igw_id),
+ ['mon', 'profile rbd, '
+ 'allow command "osd blocklist", '
+ 'allow command "config-key get" with "key" prefix "iscsi/"',
+ 'mgr', 'allow command "service status"',
+ 'osd', 'allow rwx'])
+
+ if spec.ssl_cert:
+ if isinstance(spec.ssl_cert, list):
+ cert_data = '\n'.join(spec.ssl_cert)
+ else:
+ cert_data = spec.ssl_cert
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'iscsi/{utils.name_to_config_section("iscsi")}.{igw_id}/iscsi-gateway.crt',
+ 'val': cert_data,
+ })
+
+ if spec.ssl_key:
+ if isinstance(spec.ssl_key, list):
+ key_data = '\n'.join(spec.ssl_key)
+ else:
+ key_data = spec.ssl_key
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'iscsi/{utils.name_to_config_section("iscsi")}.{igw_id}/iscsi-gateway.key',
+ 'val': key_data,
+ })
+
+ trusted_ip_list = self.get_trusted_ips(spec)
+
+ context = {
+ 'client_name': '{}.{}'.format(utils.name_to_config_section('iscsi'), igw_id),
+ 'trusted_ip_list': trusted_ip_list,
+ 'spec': spec
+ }
+ igw_conf = self.mgr.template.render('services/iscsi/iscsi-gateway.cfg.j2', context)
+
+ daemon_spec.keyring = keyring
+ daemon_spec.extra_files = {'iscsi-gateway.cfg': igw_conf}
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ daemon_spec.deps = [trusted_ip_list]
+ return daemon_spec
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ def get_set_cmd_dicts(out: str) -> List[dict]:
+ gateways = json.loads(out)['gateways']
+ cmd_dicts = []
+ # TODO: fail, if we don't have a spec
+ spec = cast(IscsiServiceSpec,
+ self.mgr.spec_store.all_specs.get(daemon_descrs[0].service_name(), None))
+ if spec.api_secure and spec.ssl_cert and spec.ssl_key:
+ cmd_dicts.append({
+ 'prefix': 'dashboard set-iscsi-api-ssl-verification',
+ 'value': "false"
+ })
+ else:
+ cmd_dicts.append({
+ 'prefix': 'dashboard set-iscsi-api-ssl-verification',
+ 'value': "true"
+ })
+ for dd in daemon_descrs:
+ assert dd.hostname is not None
+ # todo: this can fail:
+ spec = cast(IscsiServiceSpec,
+ self.mgr.spec_store.all_specs.get(dd.service_name(), None))
+ if not spec:
+ logger.warning('No ServiceSpec found for %s', dd)
+ continue
+ ip = utils.resolve_ip(self.mgr.inventory.get_addr(dd.hostname))
+ # IPv6 URL encoding requires square brackets enclosing the ip
+ if type(ip_address(ip)) is IPv6Address:
+ ip = f'[{ip}]'
+ protocol = "http"
+ if spec.api_secure and spec.ssl_cert and spec.ssl_key:
+ protocol = "https"
+ service_url = '{}://{}:{}@{}:{}'.format(
+ protocol, spec.api_user or 'admin', spec.api_password or 'admin', ip, spec.api_port or '5000')
+ gw = gateways.get(dd.hostname)
+ if not gw or gw['service_url'] != service_url:
+ safe_service_url = '{}://{}:{}@{}:{}'.format(
+ protocol, '<api-user>', '<api-password>', ip, spec.api_port or '5000')
+ logger.info('Adding iSCSI gateway %s to Dashboard', safe_service_url)
+ cmd_dicts.append({
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'inbuf': service_url,
+ 'name': dd.hostname
+ })
+ return cmd_dicts
+
+ self._check_and_set_dashboard(
+ service_name='iSCSI',
+ get_cmd='dashboard iscsi-gateway-list',
+ get_set_cmd_dicts=get_set_cmd_dicts
+ )
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ # if only 1 iscsi, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Iscsi', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 nfs daemon. make sure none are down
+ warn_message = (
+ 'ALERT: 1 iscsi daemon is already down. Please bring it back up before stopping this one')
+ iscsi_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+ for i in iscsi_daemons:
+ if i.status != DaemonDescriptionStatus.running:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ warn_message = f'It is presumed safe to stop {names}'
+ return HandleCommandResult(0, warn_message, '')
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ """
+ Called after the daemon is removed.
+ """
+ logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ # remove config for dashboard iscsi gateways
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'dashboard iscsi-gateway-rm',
+ 'name': daemon.hostname,
+ })
+ if not ret:
+ logger.info(f'{daemon.hostname} removed from iscsi gateways dashboard config')
+
+ # needed to know if we have ssl stuff for iscsi in ceph config
+ iscsi_config_dict = {}
+ ret, iscsi_config, err = self.mgr.mon_command({
+ 'prefix': 'config-key dump',
+ 'key': 'iscsi',
+ })
+ if iscsi_config:
+ iscsi_config_dict = json.loads(iscsi_config)
+
+ # remove iscsi cert and key from ceph config
+ for iscsi_key, value in iscsi_config_dict.items():
+ if f'iscsi/client.{daemon.name()}/' in iscsi_key:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'config-key rm',
+ 'key': iscsi_key,
+ })
+ logger.info(f'{iscsi_key} removed from ceph config')
+
+ def purge(self, service_name: str) -> None:
+ """Removes configuration
+ """
+ spec = cast(IscsiServiceSpec, self.mgr.spec_store[service_name].spec)
+ try:
+ # remove service configuration from the pool
+ try:
+ subprocess.run(['rados',
+ '-k', str(self.mgr.get_ceph_option('keyring')),
+ '-n', f'mgr.{self.mgr.get_mgr_id()}',
+ '-p', cast(str, spec.pool),
+ 'rm',
+ 'gateway.conf'],
+ timeout=5)
+ logger.info(f'<gateway.conf> removed from {spec.pool}')
+ except subprocess.CalledProcessError as ex:
+ logger.error(f'Error executing <<{ex.cmd}>>: {ex.output}')
+ except subprocess.TimeoutExpired:
+ logger.error(f'timeout (5s) trying to remove <gateway.conf> from {spec.pool}')
+
+ except Exception:
+ logger.exception(f'failed to purge {service_name}')
diff --git a/src/pybind/mgr/cephadm/services/monitoring.py b/src/pybind/mgr/cephadm/services/monitoring.py
new file mode 100644
index 000000000..8de7195a3
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/monitoring.py
@@ -0,0 +1,502 @@
+import errno
+import ipaddress
+import logging
+import os
+import socket
+from typing import List, Any, Tuple, Dict, Optional, cast
+from urllib.parse import urlparse
+
+from mgr_module import HandleCommandResult
+
+from orchestrator import DaemonDescription
+from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \
+ SNMPGatewaySpec, PrometheusSpec
+from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
+from cephadm.services.ingress import IngressSpec
+from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url
+
+logger = logging.getLogger(__name__)
+
+
+class GrafanaService(CephadmService):
+ TYPE = 'grafana'
+ DEFAULT_SERVICE_PORT = 3000
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps = [] # type: List[str]
+
+ prom_services = [] # type: List[str]
+ for dd in self.mgr.cache.get_daemons_by_service('prometheus'):
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9095
+ prom_services.append(build_url(scheme='http', host=addr, port=port))
+
+ deps.append(dd.name())
+ grafana_data_sources = self.mgr.template.render(
+ 'services/grafana/ceph-dashboard.yml.j2', {'hosts': prom_services})
+
+ cert_path = f'{daemon_spec.host}/grafana_crt'
+ key_path = f'{daemon_spec.host}/grafana_key'
+ cert = self.mgr.get_store(cert_path)
+ pkey = self.mgr.get_store(key_path)
+ if cert and pkey:
+ try:
+ verify_tls(cert, pkey)
+ except ServerConfigException as e:
+ logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
+ cert, pkey = None, None
+ if not (cert and pkey):
+ cert, pkey = create_self_signed_cert('Ceph', daemon_spec.host)
+ self.mgr.set_store(cert_path, cert)
+ self.mgr.set_store(key_path, pkey)
+ if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+ self.mgr.check_mon_command({
+ 'prefix': 'dashboard set-grafana-api-ssl-verify',
+ 'value': 'false',
+ })
+
+ spec: GrafanaSpec = cast(
+ GrafanaSpec, self.mgr.spec_store.active_specs[daemon_spec.service_name])
+ grafana_ini = self.mgr.template.render(
+ 'services/grafana/grafana.ini.j2', {
+ 'initial_admin_password': spec.initial_admin_password,
+ 'http_port': daemon_spec.ports[0] if daemon_spec.ports else self.DEFAULT_SERVICE_PORT,
+ 'http_addr': daemon_spec.ip if daemon_spec.ip else ''
+ })
+
+ if 'dashboard' in self.mgr.get('mgr_map')['modules'] and spec.initial_admin_password:
+ self.mgr.check_mon_command(
+ {'prefix': 'dashboard set-grafana-api-password'}, inbuf=spec.initial_admin_password)
+
+ config_file = {
+ 'files': {
+ "grafana.ini": grafana_ini,
+ 'provisioning/datasources/ceph-dashboard.yml': grafana_data_sources,
+ 'certs/cert_file': '# generated by cephadm\n%s' % cert,
+ 'certs/cert_key': '# generated by cephadm\n%s' % pkey,
+ }
+ }
+ return config_file, sorted(deps)
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # Use the least-created one as the active daemon
+ if daemon_descrs:
+ return daemon_descrs[-1]
+ # if empty list provided, return empty Daemon Desc
+ return DaemonDescription()
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ # TODO: signed cert
+ dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT
+ service_url = build_url(scheme='https', host=addr, port=port)
+ self._set_service_url_on_dashboard(
+ 'Grafana',
+ 'dashboard get-grafana-api-url',
+ 'dashboard set-grafana-api-url',
+ service_url
+ )
+
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ """
+ Called before grafana daemon is removed.
+ """
+ if daemon.hostname is not None:
+ # delete cert/key entires for this grafana daemon
+ cert_path = f'{daemon.hostname}/grafana_crt'
+ key_path = f'{daemon.hostname}/grafana_key'
+ self.mgr.set_store(cert_path, None)
+ self.mgr.set_store(key_path, None)
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Grafana', 1)
+ if warn and not force:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
+
+class AlertmanagerService(CephadmService):
+ TYPE = 'alertmanager'
+ DEFAULT_SERVICE_PORT = 9093
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps: List[str] = []
+ default_webhook_urls: List[str] = []
+
+ spec = cast(AlertManagerSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ try:
+ secure = spec.secure
+ except AttributeError:
+ secure = False
+ user_data = spec.user_data
+ if 'default_webhook_urls' in user_data and isinstance(
+ user_data['default_webhook_urls'], list):
+ default_webhook_urls.extend(user_data['default_webhook_urls'])
+
+ # dashboard(s)
+ dashboard_urls: List[str] = []
+ snmp_gateway_urls: List[str] = []
+ mgr_map = self.mgr.get('mgr_map')
+ port = None
+ proto = None # http: or https:
+ url = mgr_map.get('services', {}).get('dashboard', None)
+ if url:
+ p_result = urlparse(url.rstrip('/'))
+ hostname = socket.getfqdn(p_result.hostname)
+
+ try:
+ ip = ipaddress.ip_address(hostname)
+ except ValueError:
+ pass
+ else:
+ if ip.version == 6:
+ hostname = f'[{hostname}]'
+
+ dashboard_urls.append(
+ f'{p_result.scheme}://{hostname}:{p_result.port}{p_result.path}')
+ proto = p_result.scheme
+ port = p_result.port
+ # scan all mgrs to generate deps and to get standbys too.
+ # assume that they are all on the same port as the active mgr.
+ for dd in self.mgr.cache.get_daemons_by_service('mgr'):
+ # we consider mgr a dep even if the dashboard is disabled
+ # in order to be consistent with _calc_daemon_deps().
+ deps.append(dd.name())
+ if not port:
+ continue
+ if dd.daemon_id == self.mgr.get_mgr_id():
+ continue
+ assert dd.hostname is not None
+ addr = self._inventory_get_fqdn(dd.hostname)
+ dashboard_urls.append(build_url(scheme=proto, host=addr, port=port).rstrip('/'))
+
+ for dd in self.mgr.cache.get_daemons_by_service('snmp-gateway'):
+ assert dd.hostname is not None
+ assert dd.ports
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ deps.append(dd.name())
+
+ snmp_gateway_urls.append(build_url(scheme='http', host=addr,
+ port=dd.ports[0], path='/alerts'))
+
+ context = {
+ 'dashboard_urls': dashboard_urls,
+ 'default_webhook_urls': default_webhook_urls,
+ 'snmp_gateway_urls': snmp_gateway_urls,
+ 'secure': secure,
+ }
+ yml = self.mgr.template.render('services/alertmanager/alertmanager.yml.j2', context)
+
+ peers = []
+ port = 9094
+ for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
+ assert dd.hostname is not None
+ deps.append(dd.name())
+ addr = self._inventory_get_fqdn(dd.hostname)
+ peers.append(build_url(host=addr, port=port).lstrip('/'))
+
+ return {
+ "files": {
+ "alertmanager.yml": yml
+ },
+ "peers": peers
+ }, sorted(deps)
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # TODO: if there are multiple daemons, who is the active one?
+ if daemon_descrs:
+ return daemon_descrs[0]
+ # if empty list provided, return empty Daemon Desc
+ return DaemonDescription()
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT
+ service_url = build_url(scheme='http', host=addr, port=port)
+ self._set_service_url_on_dashboard(
+ 'AlertManager',
+ 'dashboard get-alertmanager-api-host',
+ 'dashboard set-alertmanager-api-host',
+ service_url
+ )
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Alertmanager', 1)
+ if warn and not force:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
+
+class PrometheusService(CephadmService):
+ TYPE = 'prometheus'
+ DEFAULT_SERVICE_PORT = 9095
+ DEFAULT_MGR_PROMETHEUS_PORT = 9283
+
+ def config(self, spec: ServiceSpec) -> None:
+ # make sure module is enabled
+ mgr_map = self.mgr.get('mgr_map')
+ if 'prometheus' not in mgr_map.get('services', {}):
+ self.mgr.check_mon_command({
+ 'prefix': 'mgr module enable',
+ 'module': 'prometheus'
+ })
+ # we shouldn't get here (mon will tell the mgr to respawn), but no
+ # harm done if we do.
+
+ def prepare_create(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ ) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps = [] # type: List[str]
+
+ prom_spec = cast(PrometheusSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ try:
+ retention_time = prom_spec.retention_time if prom_spec.retention_time else '15d'
+ except AttributeError:
+ retention_time = '15d'
+
+ # scrape mgrs
+ mgr_scrape_list = []
+ mgr_map = self.mgr.get('mgr_map')
+ port = cast(int, self.mgr.get_module_option_ex(
+ 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT))
+ deps.append(str(port))
+ t = mgr_map.get('services', {}).get('prometheus', None)
+ if t:
+ p_result = urlparse(t)
+ # urlparse .hostname removes '[]' from the hostname in case
+ # of ipv6 addresses so if this is the case then we just
+ # append the brackets when building the final scrape endpoint
+ if '[' in p_result.netloc and ']' in p_result.netloc:
+ mgr_scrape_list.append(f"[{p_result.hostname}]:{port}")
+ else:
+ mgr_scrape_list.append(f"{p_result.hostname}:{port}")
+ # scan all mgrs to generate deps and to get standbys too.
+ # assume that they are all on the same port as the active mgr.
+ for dd in self.mgr.cache.get_daemons_by_service('mgr'):
+ # we consider the mgr a dep even if the prometheus module is
+ # disabled in order to be consistent with _calc_daemon_deps().
+ deps.append(dd.name())
+ if not port:
+ continue
+ if dd.daemon_id == self.mgr.get_mgr_id():
+ continue
+ assert dd.hostname is not None
+ addr = self._inventory_get_fqdn(dd.hostname)
+ mgr_scrape_list.append(build_url(host=addr, port=port).lstrip('/'))
+
+ # scrape node exporters
+ nodes = []
+ for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
+ assert dd.hostname is not None
+ deps.append(dd.name())
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9100
+ nodes.append({
+ 'hostname': dd.hostname,
+ 'url': build_url(host=addr, port=port).lstrip('/')
+ })
+
+ # scrape alert managers
+ alertmgr_targets = []
+ for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
+ assert dd.hostname is not None
+ deps.append(dd.name())
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9093
+ alertmgr_targets.append("'{}'".format(build_url(host=addr, port=port).lstrip('/')))
+
+ # scrape haproxies
+ haproxy_targets = []
+ for dd in self.mgr.cache.get_daemons_by_type('ingress'):
+ if dd.service_name() in self.mgr.spec_store:
+ spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
+ assert dd.hostname is not None
+ deps.append(dd.name())
+ if dd.daemon_type == 'haproxy':
+ addr = self._inventory_get_fqdn(dd.hostname)
+ haproxy_targets.append({
+ "url": f"'{build_url(host=addr, port=spec.monitor_port).lstrip('/')}'",
+ "service": dd.service_name(),
+ })
+
+ # generate the prometheus configuration
+ context = {
+ 'alertmgr_targets': alertmgr_targets,
+ 'mgr_scrape_list': mgr_scrape_list,
+ 'haproxy_targets': haproxy_targets,
+ 'nodes': nodes,
+ }
+ r: Dict[str, Any] = {
+ 'files': {
+ 'prometheus.yml':
+ self.mgr.template.render(
+ 'services/prometheus/prometheus.yml.j2', context)
+ },
+ 'retention_time': retention_time
+ }
+
+ # include alerts, if present in the container
+ if os.path.exists(self.mgr.prometheus_alerts_path):
+ with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
+ alerts = f.read()
+ r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
+
+ # Include custom alerts if present in key value store. This enables the
+ # users to add custom alerts. Write the file in any case, so that if the
+ # content of the key value store changed, that file is overwritten
+ # (emptied in case they value has been removed from the key value
+ # store). This prevents the necessity to adapt `cephadm` binary to
+ # remove the file.
+ #
+ # Don't use the template engine for it as
+ #
+ # 1. the alerts are always static and
+ # 2. they are a template themselves for the Go template engine, which
+ # use curly braces and escaping that is cumbersome and unnecessary
+ # for the user.
+ #
+ r['files']['/etc/prometheus/alerting/custom_alerts.yml'] = \
+ self.mgr.get_store('services/prometheus/alerting/custom_alerts.yml', '')
+
+ return r, sorted(deps)
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # TODO: if there are multiple daemons, who is the active one?
+ if daemon_descrs:
+ return daemon_descrs[0]
+ # if empty list provided, return empty Daemon Desc
+ return DaemonDescription()
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
+ port = dd.ports[0] if dd.ports else self.DEFAULT_SERVICE_PORT
+ service_url = build_url(scheme='http', host=addr, port=port)
+ self._set_service_url_on_dashboard(
+ 'Prometheus',
+ 'dashboard get-prometheus-api-host',
+ 'dashboard set-prometheus-api-host',
+ service_url
+ )
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Prometheus', 1)
+ if warn and not force:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
+
+class NodeExporterService(CephadmService):
+ TYPE = 'node-exporter'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ return {}, []
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ # since node exporter runs on each host and cannot compromise data, no extra checks required
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ out = f'It is presumed safe to stop {names}'
+ return HandleCommandResult(0, out, '')
+
+
+class SNMPGatewayService(CephadmService):
+ TYPE = 'snmp-gateway'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps: List[str] = []
+
+ spec = cast(SNMPGatewaySpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ config = {
+ "destination": spec.snmp_destination,
+ "snmp_version": spec.snmp_version,
+ }
+ if spec.snmp_version == 'V2c':
+ community = spec.credentials.get('snmp_community', None)
+ assert community is not None
+
+ config.update({
+ "snmp_community": community
+ })
+ else:
+ # SNMP v3 settings can be either authNoPriv or authPriv
+ auth_protocol = 'SHA' if not spec.auth_protocol else spec.auth_protocol
+
+ auth_username = spec.credentials.get('snmp_v3_auth_username', None)
+ auth_password = spec.credentials.get('snmp_v3_auth_password', None)
+ assert auth_username is not None
+ assert auth_password is not None
+ assert spec.engine_id is not None
+
+ config.update({
+ "snmp_v3_auth_protocol": auth_protocol,
+ "snmp_v3_auth_username": auth_username,
+ "snmp_v3_auth_password": auth_password,
+ "snmp_v3_engine_id": spec.engine_id,
+ })
+ # authPriv adds encryption
+ if spec.privacy_protocol:
+ priv_password = spec.credentials.get('snmp_v3_priv_password', None)
+ assert priv_password is not None
+
+ config.update({
+ "snmp_v3_priv_protocol": spec.privacy_protocol,
+ "snmp_v3_priv_password": priv_password,
+ })
+
+ logger.debug(
+ f"Generated configuration for '{self.TYPE}' service. Dependencies={deps}")
+
+ return config, sorted(deps)
diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py
new file mode 100644
index 000000000..ee53283bd
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/nfs.py
@@ -0,0 +1,290 @@
+import errno
+import logging
+import os
+import subprocess
+import tempfile
+from typing import Dict, Tuple, Any, List, cast, Optional
+
+from mgr_module import HandleCommandResult
+from mgr_module import NFS_POOL_NAME as POOL_NAME
+
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+
+from orchestrator import DaemonDescription
+
+from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec, CephService
+
+logger = logging.getLogger(__name__)
+
+
+class NFSService(CephService):
+ TYPE = 'nfs'
+
+ def ranked(self) -> bool:
+ return True
+
+ def fence(self, daemon_id: str) -> None:
+ logger.info(f'Fencing old nfs.{daemon_id}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth rm',
+ 'entity': f'client.nfs.{daemon_id}',
+ })
+
+ # TODO: block/fence this entity (in case it is still running somewhere)
+
+ def fence_old_ranks(self,
+ spec: ServiceSpec,
+ rank_map: Dict[int, Dict[int, Optional[str]]],
+ num_ranks: int) -> None:
+ for rank, m in list(rank_map.items()):
+ if rank >= num_ranks:
+ for daemon_id in m.values():
+ if daemon_id is not None:
+ self.fence(daemon_id)
+ del rank_map[rank]
+ nodeid = f'{spec.service_name()}.{rank}'
+ self.mgr.log.info(f'Removing {nodeid} from the ganesha grace table')
+ self.run_grace_tool(cast(NFSServiceSpec, spec), 'remove', nodeid)
+ self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
+ else:
+ max_gen = max(m.keys())
+ for gen, daemon_id in list(m.items()):
+ if gen < max_gen:
+ if daemon_id is not None:
+ self.fence(daemon_id)
+ del rank_map[rank][gen]
+ self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
+
+ def config(self, spec: NFSServiceSpec) -> None: # type: ignore
+ from nfs.cluster import create_ganesha_pool
+
+ assert self.TYPE == spec.service_type
+ create_ganesha_pool(self.mgr)
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ daemon_type = daemon_spec.daemon_type
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ deps: List[str] = []
+
+ nodeid = f'{daemon_spec.service_name}.{daemon_spec.rank}'
+
+ # create the RADOS recovery pool keyring
+ rados_user = f'{daemon_type}.{daemon_id}'
+ rados_keyring = self.create_keyring(daemon_spec)
+
+ # ensure rank is known to ganesha
+ self.mgr.log.info(f'Ensuring {nodeid} is in the ganesha grace table')
+ self.run_grace_tool(spec, 'add', nodeid)
+
+ # create the rados config object
+ self.create_rados_config_obj(spec)
+
+ # create the RGW keyring
+ rgw_user = f'{rados_user}-rgw'
+ rgw_keyring = self.create_rgw_keyring(daemon_spec)
+
+ # generate the ganesha config
+ def get_ganesha_conf() -> str:
+ context = {
+ "user": rados_user,
+ "nodeid": nodeid,
+ "pool": POOL_NAME,
+ "namespace": spec.service_id,
+ "rgw_user": rgw_user,
+ "url": f'rados://{POOL_NAME}/{spec.service_id}/{spec.rados_config_name()}',
+ # fall back to default NFS port if not present in daemon_spec
+ "port": daemon_spec.ports[0] if daemon_spec.ports else 2049,
+ "bind_addr": daemon_spec.ip if daemon_spec.ip else '',
+ }
+ return self.mgr.template.render('services/nfs/ganesha.conf.j2', context)
+
+ # generate the cephadm config json
+ def get_cephadm_config() -> Dict[str, Any]:
+ config: Dict[str, Any] = {}
+ config['pool'] = POOL_NAME
+ config['namespace'] = spec.service_id
+ config['userid'] = rados_user
+ config['extra_args'] = ['-N', 'NIV_EVENT']
+ config['files'] = {
+ 'ganesha.conf': get_ganesha_conf(),
+ }
+ config.update(
+ self.get_config_and_keyring(
+ daemon_type, daemon_id,
+ keyring=rados_keyring,
+ host=host
+ )
+ )
+ config['rgw'] = {
+ 'cluster': 'ceph',
+ 'user': rgw_user,
+ 'keyring': rgw_keyring,
+ }
+ logger.debug('Generated cephadm config-json: %s' % config)
+ return config
+
+ return get_cephadm_config(), deps
+
+ def create_rados_config_obj(self,
+ spec: NFSServiceSpec,
+ clobber: bool = False) -> None:
+ objname = spec.rados_config_name()
+ cmd = [
+ 'rados',
+ '-n', f"mgr.{self.mgr.get_mgr_id()}",
+ '-k', str(self.mgr.get_ceph_option('keyring')),
+ '-p', POOL_NAME,
+ '--namespace', cast(str, spec.service_id),
+ ]
+ result = subprocess.run(
+ cmd + ['get', objname, '-'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ timeout=10)
+ if not result.returncode and not clobber:
+ logger.info('Rados config object exists: %s' % objname)
+ else:
+ logger.info('Creating rados config object: %s' % objname)
+ result = subprocess.run(
+ cmd + ['put', objname, '-'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ timeout=10)
+ if result.returncode:
+ self.mgr.log.warning(
+ f'Unable to create rados config object {objname}: {result.stderr.decode("utf-8")}'
+ )
+ raise RuntimeError(result.stderr.decode("utf-8"))
+
+ def create_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
+ daemon_id = daemon_spec.daemon_id
+ spec = cast(NFSServiceSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ entity: AuthEntity = self.get_auth_entity(daemon_id)
+
+ osd_caps = 'allow rw pool=%s namespace=%s' % (POOL_NAME, spec.service_id)
+
+ logger.info('Creating key for %s' % entity)
+ keyring = self.get_keyring_with_caps(entity,
+ ['mon', 'allow r',
+ 'osd', osd_caps])
+
+ return keyring
+
+ def create_rgw_keyring(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
+ daemon_id = daemon_spec.daemon_id
+ entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')
+
+ logger.info('Creating key for %s' % entity)
+ keyring = self.get_keyring_with_caps(entity,
+ ['mon', 'allow r',
+ 'osd', 'allow rwx tag rgw *=*'])
+
+ return keyring
+
+ def run_grace_tool(self,
+ spec: NFSServiceSpec,
+ action: str,
+ nodeid: str) -> None:
+ # write a temp keyring and referencing config file. this is a kludge
+ # because the ganesha-grace-tool can only authenticate as a client (and
+ # not a mgr). Also, it doesn't allow you to pass a keyring location via
+ # the command line, nor does it parse the CEPH_ARGS env var.
+ tmp_id = f'mgr.nfs.grace.{spec.service_name()}'
+ entity = AuthEntity(f'client.{tmp_id}')
+ keyring = self.get_keyring_with_caps(
+ entity,
+ ['mon', 'allow r', 'osd', f'allow rwx pool {POOL_NAME}']
+ )
+ tmp_keyring = tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-keyring')
+ os.fchmod(tmp_keyring.fileno(), 0o600)
+ tmp_keyring.write(keyring)
+ tmp_keyring.flush()
+ tmp_conf = tempfile.NamedTemporaryFile(mode='w', prefix='mgr-grace-conf')
+ tmp_conf.write(self.mgr.get_minimal_ceph_conf())
+ tmp_conf.write(f'\tkeyring = {tmp_keyring.name}\n')
+ tmp_conf.flush()
+ try:
+ cmd: List[str] = [
+ 'ganesha-rados-grace',
+ '--cephconf', tmp_conf.name,
+ '--userid', tmp_id,
+ '--pool', POOL_NAME,
+ '--ns', cast(str, spec.service_id),
+ action, nodeid,
+ ]
+ self.mgr.log.debug(cmd)
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ timeout=10)
+ if result.returncode:
+ self.mgr.log.warning(
+ f'ganesha-rados-grace tool failed: {result.stderr.decode("utf-8")}'
+ )
+ raise RuntimeError(f'grace tool failed: {result.stderr.decode("utf-8")}')
+
+ finally:
+ self.mgr.check_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+
+ def remove_rgw_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
+ daemon_id: str = daemon.daemon_id
+ entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')
+
+ logger.info(f'Removing key for {entity}')
+ self.mgr.check_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.remove_rgw_keyring(daemon)
+
+ def ok_to_stop(self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None) -> HandleCommandResult:
+ # if only 1 nfs, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'NFS', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 nfs daemon.
+ if force:
+ return HandleCommandResult(0, warn_message, '')
+
+ # if reached here, > 1 nfs daemon and no force flag.
+ # Provide warning
+ warn_message = "WARNING: Removing NFS daemons can cause clients to lose connectivity. "
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ def purge(self, service_name: str) -> None:
+ if service_name not in self.mgr.spec_store:
+ return
+ spec = cast(NFSServiceSpec, self.mgr.spec_store[service_name].spec)
+
+ logger.info(f'Removing grace file for {service_name}')
+ cmd = [
+ 'rados',
+ '-n', f"mgr.{self.mgr.get_mgr_id()}",
+ '-k', str(self.mgr.get_ceph_option('keyring')),
+ '-p', POOL_NAME,
+ '--namespace', cast(str, spec.service_id),
+ 'rm', 'grace',
+ ]
+ subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ timeout=10
+ )
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
new file mode 100644
index 000000000..5899ba49a
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -0,0 +1,956 @@
+import json
+import logging
+from threading import Lock
+from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
+
+from ceph.deployment import translate
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.drive_selection import DriveSelection
+from ceph.deployment.inventory import Device
+from ceph.utils import datetime_to_str, str_to_datetime
+
+from datetime import datetime
+import orchestrator
+from cephadm.serve import CephadmServe
+from cephadm.utils import forall_hosts
+from ceph.utils import datetime_now
+from orchestrator import OrchestratorError, DaemonDescription
+from mgr_module import MonCommandFailed
+
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OSDService(CephService):
+ TYPE = 'osd'
+
+ def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+ logger.debug(f"Processing DriveGroup {drive_group}")
+ osd_id_claims = OsdIdClaims(self.mgr)
+ if osd_id_claims.get():
+ logger.info(
+ f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
+
+ @forall_hosts
+ def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+ # skip this host if there has been no change in inventory
+ if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+ self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
+ host, drive_group))
+ return None
+ # skip this host if we cannot schedule here
+ if self.mgr.inventory.has_label(host, '_no_schedule'):
+ return None
+
+ osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
+
+ cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
+ osd_id_claims_for_host)
+ if not cmds:
+ logger.debug("No data_devices, skipping DriveGroup: {}".format(
+ drive_group.service_id))
+ return None
+
+ logger.debug('Applying service osd.%s on host %s...' % (
+ drive_group.service_id, host
+ ))
+ start_ts = datetime_now()
+ env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
+ ret_msg = self.create_single_host(
+ drive_group, host, cmds,
+ replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
+ )
+ self.mgr.cache.update_osdspec_last_applied(
+ host, drive_group.service_name(), start_ts
+ )
+ self.mgr.cache.save_host(host)
+ return ret_msg
+
+ ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+ return ", ".join(filter(None, ret))
+
+ def create_single_host(self,
+ drive_group: DriveGroupSpec,
+ host: str, cmds: List[str], replace_osd_ids: List[str],
+ env_vars: Optional[List[str]] = None) -> str:
+ for cmd in cmds:
+ out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+ if code == 1 and ', it is already prepared' in '\n'.join(err):
+ # HACK: when we create against an existing LV, ceph-volume
+ # returns an error and the above message. To make this
+ # command idempotent, tolerate this "error" and continue.
+ logger.debug('the device was already prepared; continuing')
+ code = 0
+ if code:
+ raise RuntimeError(
+ 'cephadm exited with an error code: %d, stderr:%s' % (
+ code, '\n'.join(err)))
+ return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
+ replace_osd_ids)
+
+ def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
+ replace_osd_ids: Optional[List[str]] = None) -> str:
+
+ if replace_osd_ids is None:
+ replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
+ assert replace_osd_ids is not None
+
+ # check result: lvm
+ osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+ host, 'osd', 'ceph-volume',
+ [
+ '--',
+ 'lvm', 'list',
+ '--format', 'json',
+ ])
+ before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
+ fsid = self.mgr._cluster_fsid
+ osd_uuid_map = self.mgr.get_osd_uuid_map()
+ created = []
+ for osd_id, osds in osds_elems.items():
+ for osd in osds:
+ if osd['type'] == 'db':
+ continue
+ if osd['tags']['ceph.cluster_fsid'] != fsid:
+ logger.debug('mismatched fsid, skipping %s' % osd)
+ continue
+ if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+ # if it exists but is part of the replacement operation, don't skip
+ continue
+ if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
+ # cephadm daemon instance already exists
+ logger.debug(f'osd id {osd_id} daemon already exists')
+ continue
+ if osd_id not in osd_uuid_map:
+ logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+ continue
+ if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
+ logger.debug('mismatched osd uuid (cluster has %s, osd '
+ 'has %s)' % (
+ osd_uuid_map.get(osd_id),
+ osd['tags']['ceph.osd_fsid']))
+ continue
+
+ created.append(osd_id)
+ daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
+ service_name=service_name,
+ daemon_id=str(osd_id),
+ host=host,
+ daemon_type='osd',
+ )
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ CephadmServe(self.mgr)._create_daemon(
+ daemon_spec,
+ osd_uuid_map=osd_uuid_map)
+
+ # check result: raw
+ raw_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
+ host, 'osd', 'ceph-volume',
+ [
+ '--',
+ 'raw', 'list',
+ '--format', 'json',
+ ])
+ for osd_uuid, osd in raw_elems.items():
+ if osd.get('ceph_fsid') != fsid:
+ continue
+ osd_id = str(osd.get('osd_id', '-1'))
+ if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
+ # if it exists but is part of the replacement operation, don't skip
+ continue
+ if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
+ # cephadm daemon instance already exists
+ logger.debug(f'osd id {osd_id} daemon already exists')
+ continue
+ if osd_id not in osd_uuid_map:
+ logger.debug('osd id {} does not exist in cluster'.format(osd_id))
+ continue
+ if osd_uuid_map.get(osd_id) != osd_uuid:
+ logger.debug('mismatched osd uuid (cluster has %s, osd '
+ 'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
+ continue
+ if osd_id in created:
+ continue
+
+ created.append(osd_id)
+ daemon_spec = CephadmDaemonDeploySpec(
+ service_name=service_name,
+ daemon_id=osd_id,
+ host=host,
+ daemon_type='osd',
+ )
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ CephadmServe(self.mgr)._create_daemon(
+ daemon_spec,
+ osd_uuid_map=osd_uuid_map)
+
+ if created:
+ self.mgr.cache.invalidate_host_devices(host)
+ self.mgr.cache.invalidate_autotune(host)
+ return "Created osd(s) %s on host '%s'" % (','.join(created), host)
+ else:
+ return "Created no osd(s) on host %s; already created?" % host
+
+ def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
+ # 1) use fn_filter to determine matching_hosts
+ matching_hosts = drive_group.placement.filter_matching_hostspecs(
+ self.mgr._schedulable_hosts())
+ # 2) Map the inventory to the InventoryHost object
+ host_ds_map = []
+
+ # set osd_id_claims
+
+ def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
+ # This is stupid and needs to be loaded with the host
+ for _host, _inventory in inventory_dict.items():
+ if _host == hostname:
+ return _inventory
+ raise OrchestratorError("No inventory found for host: {}".format(hostname))
+
+ # 3) iterate over matching_host and call DriveSelection
+ logger.debug(f"Checking matching hosts -> {matching_hosts}")
+ for host in matching_hosts:
+ inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
+ logger.debug(f"Found inventory for host {inventory_for_host}")
+
+ # List of Daemons on that host
+ dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
+ dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]
+
+ drive_selection = DriveSelection(drive_group, inventory_for_host,
+ existing_daemons=len(dd_for_spec_and_host))
+ logger.debug(f"Found drive selection {drive_selection}")
+ if drive_group.method and drive_group.method == 'raw':
+ # ceph-volume can currently only handle a 1:1 mapping
+ # of data/db/wal devices for raw mode osds. If db/wal devices
+ # are defined and the number does not match the number of data
+ # devices, we need to bail out
+ if drive_selection.data_devices() and drive_selection.db_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
+ if drive_selection.data_devices() and drive_selection.wal_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
+ host_ds_map.append((host, drive_selection))
+ return host_ds_map
+
+ @staticmethod
+ def driveselection_to_ceph_volume(drive_selection: DriveSelection,
+ osd_id_claims: Optional[List[str]] = None,
+ preview: bool = False) -> List[str]:
+ logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
+ cmds: List[str] = translate.to_ceph_volume(drive_selection,
+ osd_id_claims, preview=preview).run()
+ logger.debug(f"Resulting ceph-volume cmds: {cmds}")
+ return cmds
+
+ def get_previews(self, host: str) -> List[Dict[str, Any]]:
+ # Find OSDSpecs that match host.
+ osdspecs = self.resolve_osdspecs_for_host(host)
+ return self.generate_previews(osdspecs, host)
+
+ def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
+ """
+
+ The return should look like this:
+
+ [
+ {'data': {<metadata>},
+ 'osdspec': <name of osdspec>,
+ 'host': <name of host>,
+ 'notes': <notes>
+ },
+
+ {'data': ...,
+ 'osdspec': ..,
+ 'host': ...,
+ 'notes': ...
+ }
+ ]
+
+ Note: One host can have multiple previews based on its assigned OSDSpecs.
+ """
+ self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
+ ret_all: List[Dict[str, Any]] = []
+ if not osdspecs:
+ return ret_all
+ for osdspec in osdspecs:
+
+ # populate osd_id_claims
+ osd_id_claims = OsdIdClaims(self.mgr)
+
+ # prepare driveselection
+ for host, ds in self.prepare_drivegroup(osdspec):
+ if host != for_host:
+ continue
+
+ # driveselection for host
+ cmds: List[str] = self.driveselection_to_ceph_volume(ds,
+ osd_id_claims.filtered_by_host(host),
+ preview=True)
+ if not cmds:
+ logger.debug("No data_devices, skipping DriveGroup: {}".format(
+ osdspec.service_name()))
+ continue
+
+ # get preview data from ceph-volume
+ for cmd in cmds:
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+ if out:
+ try:
+ concat_out: Dict[str, Any] = json.loads(' '.join(out))
+ except ValueError:
+ logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
+ concat_out = {}
+ notes = []
+ if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
+ found = len(concat_out)
+ limit = osdspec.data_devices.limit
+ notes.append(
+ f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
+ ret_all.append({'data': concat_out,
+ 'osdspec': osdspec.service_id,
+ 'host': host,
+ 'notes': notes})
+ return ret_all
+
+ def resolve_hosts_for_osdspecs(self,
+ specs: Optional[List[DriveGroupSpec]] = None
+ ) -> List[str]:
+ osdspecs = []
+ if specs:
+ osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+ if not osdspecs:
+ self.mgr.log.debug("No OSDSpecs found")
+ return []
+ return sum([spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()) for spec in osdspecs], [])
+
+ def resolve_osdspecs_for_host(self, host: str,
+ specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
+ matching_specs = []
+ self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
+ if not specs:
+ specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
+ if spec.service_type == 'osd']
+ for spec in specs:
+ if host in spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()):
+ self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+ matching_specs.append(spec)
+ return matching_specs
+
+ def _run_ceph_volume_command(self, host: str,
+ cmd: str, env_vars: Optional[List[str]] = None
+ ) -> Tuple[List[str], List[str], int]:
+ self.mgr.inventory.assert_host(host)
+
+ # get bootstrap key
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': 'client.bootstrap-osd',
+ })
+
+ j = json.dumps({
+ 'config': self.mgr.get_minimal_ceph_conf(),
+ 'keyring': keyring,
+ })
+
+ split_cmd = cmd.split(' ')
+ _cmd = ['--config-json', '-', '--']
+ _cmd.extend(split_cmd)
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
+ host, 'osd', 'ceph-volume',
+ _cmd,
+ env_vars=env_vars,
+ stdin=j,
+ error_ok=True)
+ return out, err, code
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
+ # we cannot recover from it. The OSD keys are created by ceph-volume and not by
+ # us.
+ if not is_failed_deploy:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+
+
+class OsdIdClaims(object):
+ """
+ Retrieve and provide osd ids that can be reused in the cluster
+ """
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
+ self.osd_host_map: Dict[str, List[str]] = dict()
+ self.refresh()
+
+ def refresh(self) -> None:
+ try:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'osd tree',
+ 'states': ['destroyed'],
+ 'format': 'json'
+ })
+ except MonCommandFailed as e:
+ logger.exception('osd tree failed')
+ raise OrchestratorError(str(e))
+ try:
+ tree = json.loads(out)
+ except ValueError:
+ logger.exception(f'Cannot decode JSON: \'{out}\'')
+ return
+
+ nodes = tree.get('nodes', {})
+ for node in nodes:
+ if node.get('type') == 'host':
+ self.osd_host_map.update(
+ {node.get('name'): [str(_id) for _id in node.get('children', list())]}
+ )
+ if self.osd_host_map:
+ self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
+
+ def get(self) -> Dict[str, List[str]]:
+ return self.osd_host_map
+
+ def filtered_by_host(self, host: str) -> List[str]:
+ """
+ Return the list of osd ids that can be reused in a host
+
+ OSD id claims in CRUSH map are linked to the bare name of
+ the hostname. In case of FQDN hostnames the host is searched by the
+ bare name
+ """
+ return self.osd_host_map.get(host.split(".")[0], [])
+
+
+class RemoveUtil(object):
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
+
+ def get_osds_in_cluster(self) -> List[str]:
+ osd_map = self.mgr.get_osdmap()
+ return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
+
+ def osd_df(self) -> dict:
+ base_cmd = 'osd df'
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': base_cmd,
+ 'format': 'json'
+ })
+ try:
+ return json.loads(out)
+ except ValueError:
+ logger.exception(f'Cannot decode JSON: \'{out}\'')
+ return {}
+
+ def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+ if not osd_df:
+ osd_df = self.osd_df()
+ osd_nodes = osd_df.get('nodes', [])
+ for osd_node in osd_nodes:
+ if osd_node.get('id') == int(osd_id):
+ return osd_node.get('pgs', -1)
+ return -1
+
+ def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
+ """
+ Cut osd_id list in half until it's ok-to-stop
+
+ :param osds: list of osd_ids
+ :return: list of ods_ids that can be stopped at once
+ """
+ if not osds:
+ return []
+ while not self.ok_to_stop(osds):
+ if len(osds) <= 1:
+ # can't even stop one OSD, aborting
+ self.mgr.log.debug(
+ "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+ return []
+
+ # This potentially prolongs the global wait time.
+ self.mgr.event.wait(1)
+ # splitting osd_ids in half until ok_to_stop yields success
+ # maybe popping ids off one by one is better here..depends on the cluster size I guess..
+ # There's a lot of room for micro adjustments here
+ osds = osds[len(osds) // 2:]
+ return osds
+
+ # todo start draining
+ # return all([osd.start_draining() for osd in osds])
+
+ def ok_to_stop(self, osds: List["OSD"]) -> bool:
+ cmd_args = {
+ 'prefix': "osd ok-to-stop",
+ 'ids': [str(osd.osd_id) for osd in osds]
+ }
+ return self._run_mon_cmd(cmd_args, error_ok=True)
+
+ def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
+ base_cmd = f"osd {flag}"
+ self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': base_cmd,
+ 'ids': [str(osd.osd_id) for osd in osds]
+ })
+ if ret != 0:
+ self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
+ return False
+ self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
+ return True
+
+ def get_weight(self, osd: "OSD") -> Optional[float]:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'osd crush tree',
+ 'format': 'json',
+ })
+ if ret != 0:
+ self.mgr.log.error(f"Could not dump crush weights. <{err}>")
+ return None
+ j = json.loads(out)
+ for n in j.get("nodes", []):
+ if n.get("name") == f"osd.{osd.osd_id}":
+ self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
+ return n.get("crush_weight")
+ return None
+
+ def reweight_osd(self, osd: "OSD", weight: float) -> bool:
+ self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': "osd crush reweight",
+ 'name': f"osd.{osd.osd_id}",
+ 'weight': weight,
+ })
+ if ret != 0:
+ self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
+ return False
+ self.mgr.log.info(f"{osd} weight is now {weight}")
+ return True
+
+ def zap_osd(self, osd: "OSD") -> str:
+ "Zaps all devices that are associated with an OSD"
+ if osd.hostname is not None:
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
+ osd.hostname, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
+ error_ok=True)
+ self.mgr.cache.invalidate_host_devices(osd.hostname)
+ if code:
+ raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
+ return '\n'.join(out + err)
+ raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
+
+ def safe_to_destroy(self, osd_ids: List[int]) -> bool:
+ """ Queries the safe-to-destroy flag for OSDs """
+ cmd_args = {'prefix': 'osd safe-to-destroy',
+ 'ids': [str(x) for x in osd_ids]}
+ return self._run_mon_cmd(cmd_args, error_ok=True)
+
+ def destroy_osd(self, osd_id: int) -> bool:
+ """ Destroys an OSD (forcefully) """
+ cmd_args = {'prefix': 'osd destroy-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True}
+ return self._run_mon_cmd(cmd_args)
+
+ def purge_osd(self, osd_id: int) -> bool:
+ """ Purges an OSD from the cluster (forcefully) """
+ cmd_args = {
+ 'prefix': 'osd purge-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
+ """
+ Generic command to run mon_command and evaluate/log the results
+ """
+ ret, out, err = self.mgr.mon_command(cmd_args)
+ if ret != 0:
+ self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+ if not error_ok:
+ self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+ return False
+ self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+ return True
+
+
+class NotFoundError(Exception):
+ pass
+
+
+class OSD:
+
+ def __init__(self,
+ osd_id: int,
+ remove_util: RemoveUtil,
+ drain_started_at: Optional[datetime] = None,
+ process_started_at: Optional[datetime] = None,
+ drain_stopped_at: Optional[datetime] = None,
+ drain_done_at: Optional[datetime] = None,
+ draining: bool = False,
+ started: bool = False,
+ stopped: bool = False,
+ replace: bool = False,
+ force: bool = False,
+ hostname: Optional[str] = None,
+ zap: bool = False):
+ # the ID of the OSD
+ self.osd_id = osd_id
+
+ # when did process (not the actual draining) start
+ self.process_started_at = process_started_at
+
+ # when did the drain start
+ self.drain_started_at = drain_started_at
+
+ # when did the drain stop
+ self.drain_stopped_at = drain_stopped_at
+
+ # when did the drain finish
+ self.drain_done_at = drain_done_at
+
+ # did the draining start
+ self.draining = draining
+
+ # was the operation started
+ self.started = started
+
+ # was the operation stopped
+ self.stopped = stopped
+
+ # If this is a replace or remove operation
+ self.replace = replace
+ # If we wait for the osd to be drained
+ self.force = force
+ # The name of the node
+ self.hostname = hostname
+
+ # mgr obj to make mgr/mon calls
+ self.rm_util: RemoveUtil = remove_util
+
+ self.original_weight: Optional[float] = None
+
+ # Whether devices associated with the OSD should be zapped (DATA ERASED)
+ self.zap = zap
+
+ def start(self) -> None:
+ if self.started:
+ logger.debug(f"Already started draining {self}")
+ return None
+ self.started = True
+ self.stopped = False
+
+ def start_draining(self) -> bool:
+ if self.stopped:
+ logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
+ return False
+ if self.replace:
+ self.rm_util.set_osd_flag([self], 'out')
+ else:
+ self.original_weight = self.rm_util.get_weight(self)
+ self.rm_util.reweight_osd(self, 0.0)
+ self.drain_started_at = datetime.utcnow()
+ self.draining = True
+ logger.debug(f"Started draining {self}.")
+ return True
+
+ def stop_draining(self) -> bool:
+ if self.replace:
+ self.rm_util.set_osd_flag([self], 'in')
+ else:
+ if self.original_weight:
+ self.rm_util.reweight_osd(self, self.original_weight)
+ self.drain_stopped_at = datetime.utcnow()
+ self.draining = False
+ logger.debug(f"Stopped draining {self}.")
+ return True
+
+ def stop(self) -> None:
+ if self.stopped:
+ logger.debug(f"Already stopped draining {self}")
+ return None
+ self.started = False
+ self.stopped = True
+ self.stop_draining()
+
+ @property
+ def is_draining(self) -> bool:
+ """
+ Consider an OSD draining when it is
+ actively draining but not yet empty
+ """
+ return self.draining and not self.is_empty
+
+ @property
+ def is_ok_to_stop(self) -> bool:
+ return self.rm_util.ok_to_stop([self])
+
+ @property
+ def is_empty(self) -> bool:
+ if self.get_pg_count() == 0:
+ if not self.drain_done_at:
+ self.drain_done_at = datetime.utcnow()
+ self.draining = False
+ return True
+ return False
+
+ def safe_to_destroy(self) -> bool:
+ return self.rm_util.safe_to_destroy([self.osd_id])
+
+ def down(self) -> bool:
+ return self.rm_util.set_osd_flag([self], 'down')
+
+ def destroy(self) -> bool:
+ return self.rm_util.destroy_osd(self.osd_id)
+
+ def do_zap(self) -> str:
+ return self.rm_util.zap_osd(self)
+
+ def purge(self) -> bool:
+ return self.rm_util.purge_osd(self.osd_id)
+
+ def get_pg_count(self) -> int:
+ return self.rm_util.get_pg_count(self.osd_id)
+
+ @property
+ def exists(self) -> bool:
+ return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
+
+ def drain_status_human(self) -> str:
+ default_status = 'not started'
+ status = 'started' if self.started and not self.draining else default_status
+ status = 'draining' if self.draining else status
+ status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
+ return status
+
+ def pg_count_str(self) -> str:
+ return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
+
+ def to_json(self) -> dict:
+ out: Dict[str, Any] = dict()
+ out['osd_id'] = self.osd_id
+ out['started'] = self.started
+ out['draining'] = self.draining
+ out['stopped'] = self.stopped
+ out['replace'] = self.replace
+ out['force'] = self.force
+ out['zap'] = self.zap
+ out['hostname'] = self.hostname # type: ignore
+
+ for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+ if getattr(self, k):
+ out[k] = datetime_to_str(getattr(self, k))
+ else:
+ out[k] = getattr(self, k)
+ return out
+
+ @classmethod
+ def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
+ if not inp:
+ return None
+ for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+ if inp.get(date_field):
+ inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
+ inp.update({'remove_util': rm_util})
+ if 'nodename' in inp:
+ hostname = inp.pop('nodename')
+ inp['hostname'] = hostname
+ return cls(**inp)
+
+ def __hash__(self) -> int:
+ return hash(self.osd_id)
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, OSD):
+ return NotImplemented
+ return self.osd_id == other.osd_id
+
+ def __repr__(self) -> str:
+ return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
+
+
+class OSDRemovalQueue(object):
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
+ self.osds: Set[OSD] = set()
+ self.rm_util = RemoveUtil(mgr)
+
+ # locks multithreaded access to self.osds. Please avoid locking
+ # network calls, like mon commands.
+ self.lock = Lock()
+
+ def process_removal_queue(self) -> None:
+ """
+ Performs actions in the _serve() loop to remove an OSD
+ when criteria is met.
+
+ we can't hold self.lock, as we're calling _remove_daemon in the loop
+ """
+
+ # make sure that we don't run on OSDs that are not in the cluster anymore.
+ self.cleanup()
+
+ # find osds that are ok-to-stop and not yet draining
+ ready_to_drain_osds = self._ready_to_drain_osds()
+ if ready_to_drain_osds:
+ # start draining those
+ _ = [osd.start_draining() for osd in ready_to_drain_osds]
+
+ all_osds = self.all_osds()
+
+ logger.debug(
+ f"{self.queue_size()} OSDs are scheduled "
+ f"for removal: {all_osds}")
+
+ # Check all osds for their state and take action (remove, purge etc)
+ new_queue: Set[OSD] = set()
+ for osd in all_osds: # type: OSD
+ if not osd.force:
+ # skip criteria
+ if not osd.is_empty:
+ logger.debug(f"{osd} is not empty yet. Waiting a bit more")
+ new_queue.add(osd)
+ continue
+
+ if not osd.safe_to_destroy():
+ logger.debug(
+ f"{osd} is not safe-to-destroy yet. Waiting a bit more")
+ new_queue.add(osd)
+ continue
+
+ # abort criteria
+ if not osd.down():
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not mark {osd} down")
+
+ # stop and remove daemon
+ assert osd.hostname is not None
+
+ if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
+ CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+ logger.info(f"Successfully removed {osd} on {osd.hostname}")
+ else:
+ logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
+
+ if osd.replace:
+ # mark destroyed in osdmap
+ if not osd.destroy():
+ raise orchestrator.OrchestratorError(
+ f"Could not destroy {osd}")
+ logger.info(
+ f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
+ else:
+ # purge from osdmap
+ if not osd.purge():
+ raise orchestrator.OrchestratorError(f"Could not purge {osd}")
+ logger.info(f"Successfully purged {osd} on {osd.hostname}")
+
+ if osd.zap:
+ # throws an exception if the zap fails
+ logger.info(f"Zapping devices for {osd} on {osd.hostname}")
+ osd.do_zap()
+ logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
+
+ logger.debug(f"Removing {osd} from the queue.")
+
+ # self could change while this is processing (osds get added from the CLI)
+ # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+ # osds that were added while this method was executed'
+ with self.lock:
+ self.osds.intersection_update(new_queue)
+ self._save_to_store()
+
+ def cleanup(self) -> None:
+ # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+ with self.lock:
+ for osd in self._not_in_cluster():
+ self.osds.remove(osd)
+
+ def _ready_to_drain_osds(self) -> List["OSD"]:
+ """
+ Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
+ be accomodated by the 'max_osd_draining_count' config value, considering the number of OSDs
+ that are already draining.
+ """
+ draining_limit = max(1, self.mgr.max_osd_draining_count)
+ num_already_draining = len(self.draining_osds())
+ num_to_start_draining = max(0, draining_limit - num_already_draining)
+ stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+ return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
+
+ def _save_to_store(self) -> None:
+ osd_queue = [osd.to_json() for osd in self.osds]
+ logger.debug(f"Saving {osd_queue} to store")
+ self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+ def load_from_store(self) -> None:
+ with self.lock:
+ for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+ for osd in json.loads(v):
+ logger.debug(f"Loading osd ->{osd} from store")
+ osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+ if osd_obj is not None:
+ self.osds.add(osd_obj)
+
+ def as_osd_ids(self) -> List[int]:
+ with self.lock:
+ return [osd.osd_id for osd in self.osds]
+
+ def queue_size(self) -> int:
+ with self.lock:
+ return len(self.osds)
+
+ def draining_osds(self) -> List["OSD"]:
+ with self.lock:
+ return [osd for osd in self.osds if osd.is_draining]
+
+ def idling_osds(self) -> List["OSD"]:
+ with self.lock:
+ return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+
+ def empty_osds(self) -> List["OSD"]:
+ with self.lock:
+ return [osd for osd in self.osds if osd.is_empty]
+
+ def all_osds(self) -> List["OSD"]:
+ with self.lock:
+ return [osd for osd in self.osds]
+
+ def _not_in_cluster(self) -> List["OSD"]:
+ return [osd for osd in self.osds if not osd.exists]
+
+ def enqueue(self, osd: "OSD") -> None:
+ if not osd.exists:
+ raise NotFoundError()
+ with self.lock:
+ self.osds.add(osd)
+ osd.start()
+
+ def rm(self, osd: "OSD") -> None:
+ if not osd.exists:
+ raise NotFoundError()
+ osd.stop()
+ with self.lock:
+ try:
+ logger.debug(f'Removing {osd} from the queue.')
+ self.osds.remove(osd)
+ except KeyError:
+ logger.debug(f"Could not find {osd} in queue.")
+ raise KeyError
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, OSDRemovalQueue):
+ return False
+ with self.lock:
+ return self.osds == other.osds