summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/services/cephadmservice.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/cephadm/services/cephadmservice.py1043
1 files changed, 1043 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
new file mode 100644
index 000000000..92d293fcd
--- /dev/null
+++ b/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -0,0 +1,1043 @@
+import errno
+import json
+import logging
+import re
+import socket
+import time
+from abc import ABCMeta, abstractmethod
+from typing import TYPE_CHECKING, List, Callable, TypeVar, \
+ Optional, Dict, Any, Tuple, NewType, cast
+
+from mgr_module import HandleCommandResult, MonCommandFailed
+
+from ceph.deployment.service_spec import ServiceSpec, RGWSpec
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from mgr_util import build_url
+from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator._interface import daemon_type_to_service
+from cephadm import utils
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
+AuthEntity = NewType('AuthEntity', str)
+
+
+class CephadmDaemonDeploySpec:
+ # typing.NamedTuple + Generic is broken in py36
+ def __init__(self, host: str, daemon_id: str,
+ service_name: str,
+ network: Optional[str] = None,
+ keyring: Optional[str] = None,
+ extra_args: Optional[List[str]] = None,
+ ceph_conf: str = '',
+ extra_files: Optional[Dict[str, Any]] = None,
+ daemon_type: Optional[str] = None,
+ ip: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None):
+ """
+ A data struction to encapsulate `cephadm deploy ...
+ """
+ self.host: str = host
+ self.daemon_id = daemon_id
+ self.service_name = service_name
+ daemon_type = daemon_type or (service_name.split('.')[0])
+ assert daemon_type is not None
+ self.daemon_type: str = daemon_type
+
+ # mons
+ self.network = network
+
+ # for run_cephadm.
+ self.keyring: Optional[str] = keyring
+
+ # For run_cephadm. Would be great to have more expressive names.
+ self.extra_args: List[str] = extra_args or []
+
+ self.ceph_conf = ceph_conf
+ self.extra_files = extra_files or {}
+
+ # TCP ports used by the daemon
+ self.ports: List[int] = ports or []
+ self.ip: Optional[str] = ip
+
+ # values to be populated during generate_config calls
+ # and then used in _run_cephadm
+ self.final_config: Dict[str, Any] = {}
+ self.deps: List[str] = []
+
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
+
+ self.extra_container_args = extra_container_args
+
+ def name(self) -> str:
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+ def config_get_files(self) -> Dict[str, Any]:
+ files = self.extra_files
+ if self.ceph_conf:
+ files['config'] = self.ceph_conf
+
+ return files
+
+ @staticmethod
+ def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
+ assert dd.hostname
+ assert dd.daemon_id
+ assert dd.daemon_type
+ return CephadmDaemonDeploySpec(
+ host=dd.hostname,
+ daemon_id=dd.daemon_id,
+ daemon_type=dd.daemon_type,
+ service_name=dd.service_name(),
+ ip=dd.ip,
+ ports=dd.ports,
+ rank=dd.rank,
+ rank_generation=dd.rank_generation,
+ extra_container_args=dd.extra_container_args,
+ )
+
+ def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
+ return DaemonDescription(
+ daemon_type=self.daemon_type,
+ daemon_id=self.daemon_id,
+ service_name=self.service_name,
+ hostname=self.host,
+ status=status,
+ status_desc=status_desc,
+ ip=self.ip,
+ ports=self.ports,
+ rank=self.rank,
+ rank_generation=self.rank_generation,
+ extra_container_args=self.extra_container_args,
+ )
+
+
+class CephadmService(metaclass=ABCMeta):
+ """
+ Base class for service types. Often providing a create() and config() fn.
+ """
+
+ @property
+ @abstractmethod
+ def TYPE(self) -> str:
+ pass
+
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr: "CephadmOrchestrator" = mgr
+
+ def allow_colo(self) -> bool:
+ """
+ Return True if multiple daemons of the same type can colocate on
+ the same host.
+ """
+ return False
+
+ def primary_daemon_type(self) -> str:
+ """
+ This is the type of the primary (usually only) daemon to be deployed.
+ """
+ return self.TYPE
+
+ def per_host_daemon_type(self) -> Optional[str]:
+ """
+ If defined, this type of daemon will be deployed once for each host
+ containing one or more daemons of the primary type.
+ """
+ return None
+
+ def ranked(self) -> bool:
+ """
+ If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+ generation (0, 1, ...) to each daemon we create/deploy.
+ """
+ return False
+
+ def fence_old_ranks(self,
+ spec: ServiceSpec,
+ rank_map: Dict[int, Dict[int, Optional[str]]],
+ num_ranks: int) -> None:
+ assert False
+
+ def make_daemon_spec(
+ self,
+ host: str,
+ daemon_id: str,
+ network: str,
+ spec: ServiceSpecs,
+ daemon_type: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ ip: Optional[str] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ ) -> CephadmDaemonDeploySpec:
+ try:
+ eca = spec.extra_container_args
+ except AttributeError:
+ eca = None
+ return CephadmDaemonDeploySpec(
+ host=host,
+ daemon_id=daemon_id,
+ service_name=spec.service_name(),
+ network=network,
+ daemon_type=daemon_type,
+ ports=ports,
+ ip=ip,
+ rank=rank,
+ rank_generation=rank_generation,
+ extra_container_args=eca,
+ )
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ raise NotImplementedError()
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ raise NotImplementedError()
+
+ def config(self, spec: ServiceSpec) -> None:
+ """
+ Configure the cluster for this service. Only called *once* per
+ service apply. Not for every daemon.
+ """
+ pass
+
+ def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
+ """The post actions needed to be done after daemons are checked"""
+ if self.mgr.config_dashboard:
+ if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+ self.config_dashboard(daemon_descrs)
+ else:
+ logger.debug('Dashboard is not enabled. Skip configuration.')
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ """Config dashboard settings."""
+ raise NotImplementedError()
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ # if this is called for a service type where it hasn't explcitly been
+ # defined, return empty Daemon Desc
+ return DaemonDescription()
+
+ def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': entity,
+ 'caps': caps,
+ })
+ if err:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': entity,
+ 'caps': caps,
+ })
+ if err:
+ self.mgr.log.warning(f"Unable to update caps for {entity}")
+ return keyring
+
+ def _inventory_get_fqdn(self, hostname: str) -> str:
+ """Get a host's FQDN with its hostname.
+
+ If the FQDN can't be resolved, the address from the inventory will
+ be returned instead.
+ """
+ addr = self.mgr.inventory.get_addr(hostname)
+ return socket.getfqdn(addr)
+
+ def _set_service_url_on_dashboard(self,
+ service_name: str,
+ get_mon_cmd: str,
+ set_mon_cmd: str,
+ service_url: str) -> None:
+ """A helper to get and set service_url via Dashboard's MON command.
+
+ If result of get_mon_cmd differs from service_url, set_mon_cmd will
+ be sent to set the service_url.
+ """
+ def get_set_cmd_dicts(out: str) -> List[dict]:
+ cmd_dict = {
+ 'prefix': set_mon_cmd,
+ 'value': service_url
+ }
+ return [cmd_dict] if service_url != out else []
+
+ self._check_and_set_dashboard(
+ service_name=service_name,
+ get_cmd=get_mon_cmd,
+ get_set_cmd_dicts=get_set_cmd_dicts
+ )
+
+ def _check_and_set_dashboard(self,
+ service_name: str,
+ get_cmd: str,
+ get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
+ """A helper to set configs in the Dashboard.
+
+ The method is useful for the pattern:
+ - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
+ gateways.
+ - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
+ - Determine if the config need to be update. NOTE: This step is important because if a
+ Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
+ kicks the serve() loop and the logic using this method is likely to be called again.
+ A config should be updated only when needed.
+ - Update a config in Dashboard by using a Dashboard command.
+
+ :param service_name: the service name to be used for logging
+ :type service_name: str
+ :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
+ :type get_cmd: str
+ :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
+ e.g.
+ [
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@aaa:5000',
+ 'name': 'aaa'
+ },
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@bbb:5000',
+ 'name': 'bbb'
+ }
+ ]
+ The function should return empty list if no command need to be sent.
+ :type get_set_cmd_dicts: Callable[[str], List[dict]]
+ """
+
+ try:
+ _, out, _ = self.mgr.check_mon_command({
+ 'prefix': get_cmd
+ })
+ except MonCommandFailed as e:
+ logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
+ return
+ cmd_dicts = get_set_cmd_dicts(out.strip())
+ for cmd_dict in list(cmd_dicts):
+ try:
+ inbuf = cmd_dict.pop('inbuf', None)
+ _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
+ except MonCommandFailed as e:
+ logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
+
+ def ok_to_stop_osd(
+ self,
+ osds: List[str],
+ known: Optional[List[str]] = None, # output argument
+ force: bool = False) -> HandleCommandResult:
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': "osd ok-to-stop",
+ 'ids': osds,
+ 'max': 16,
+ }))
+ j = None
+ try:
+ j = json.loads(r.stdout)
+ except json.decoder.JSONDecodeError:
+ self.mgr.log.warning("osd ok-to-stop didn't return structured result")
+ raise
+ if r.retval:
+ return r
+ if known is not None and j and j.get('ok_to_stop'):
+ self.mgr.log.debug(f"got {j}")
+ known.extend([f'osd.{x}' for x in j.get('osds', [])])
+ return HandleCommandResult(
+ 0,
+ f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
+ ''
+ )
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ out = f'It appears safe to stop {",".join(names)}'
+ err = f'It is NOT safe to stop {",".join(names)} at this time'
+
+ if self.TYPE not in ['mon', 'osd', 'mds']:
+ logger.debug(out)
+ return HandleCommandResult(0, out)
+
+ if self.TYPE == 'osd':
+ return self.ok_to_stop_osd(daemon_ids, known, force)
+
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': f'{self.TYPE} ok-to-stop',
+ 'ids': daemon_ids,
+ }))
+
+ if r.retval:
+ err = f'{err}: {r.stderr}' if r.stderr else err
+ logger.debug(err)
+ return HandleCommandResult(r.retval, r.stdout, err)
+
+ out = f'{out}: {r.stdout}' if r.stdout else out
+ logger.debug(out)
+ return HandleCommandResult(r.retval, out, r.stderr)
+
+ def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
+ # Provides a warning about if it possible or not to stop <n> daemons in a service
+ names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
+ number_of_running_daemons = len(
+ [daemon
+ for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
+ if daemon.status == DaemonDescriptionStatus.running])
+ if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
+ return False, f'It is presumed safe to stop {names}'
+
+ num_daemons_left = number_of_running_daemons - len(daemon_ids)
+
+ def plural(count: int) -> str:
+ return 'daemon' if count == 1 else 'daemons'
+
+ left_count = "no" if num_daemons_left == 0 else num_daemons_left
+
+ if alert:
+ out = (f'ALERT: Cannot stop {names} in {service} service. '
+ f'Not enough remaining {service} daemons. '
+ f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
+ else:
+ out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
+ f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
+ f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
+ return True, out
+
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ """
+ Called before the daemon is removed.
+ """
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
+ logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ """
+ Called after the daemon is removed.
+ """
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
+ logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+
+ def purge(self, service_name: str) -> None:
+ """Called to carry out any purge tasks following service removal"""
+ logger.debug(f'Purge called for {self.TYPE} - no action taken')
+
+
+class CephService(CephadmService):
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ # Ceph.daemons (mon, mgr, mds, osd, etc)
+ cephadm_config = self.get_config_and_keyring(
+ daemon_spec.daemon_type,
+ daemon_spec.daemon_id,
+ host=daemon_spec.host,
+ keyring=daemon_spec.keyring,
+ extra_ceph_config=daemon_spec.ceph_conf)
+
+ if daemon_spec.config_get_files():
+ cephadm_config.update({'files': daemon_spec.config_get_files()})
+
+ return cephadm_config, []
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.remove_keyring(daemon)
+
+ def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
+ """
+ Map the daemon id to a cephx keyring entity name
+ """
+ # despite this mapping entity names to daemons, self.TYPE within
+ # the CephService class refers to service types, not daemon types
+ if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
+ return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
+ elif self.TYPE == 'crash':
+ if host == "":
+ raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ return AuthEntity(f'client.{self.TYPE}.{host}')
+ elif self.TYPE == 'mon':
+ return AuthEntity('mon.')
+ elif self.TYPE in ['mgr', 'osd', 'mds']:
+ return AuthEntity(f'{self.TYPE}.{daemon_id}')
+ else:
+ raise OrchestratorError("unknown daemon type")
+
+ def get_config_and_keyring(self,
+ daemon_type: str,
+ daemon_id: str,
+ host: str,
+ keyring: Optional[str] = None,
+ extra_ceph_config: Optional[str] = None
+ ) -> Dict[str, Any]:
+ # keyring
+ if not keyring:
+ entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': entity,
+ })
+
+ config = self.mgr.get_minimal_ceph_conf()
+
+ if extra_ceph_config:
+ config += extra_ceph_config
+
+ return {
+ 'config': config,
+ 'keyring': keyring,
+ }
+
+ def remove_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
+ assert daemon.hostname is not None
+ daemon_id: str = daemon.daemon_id
+ host: str = daemon.hostname
+
+ assert daemon.daemon_type != 'mon'
+
+ entity = self.get_auth_entity(daemon_id, host=host)
+
+ logger.info(f'Removing key for {entity}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+
+
+class MonService(CephService):
+ TYPE = 'mon'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ """
+ Create a new monitor on the given host.
+ """
+ assert self.TYPE == daemon_spec.daemon_type
+ name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+
+ # get mon. key
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get',
+ 'entity': self.get_auth_entity(name),
+ })
+
+ extra_config = '[mon.%s]\n' % name
+ if network:
+ # infer whether this is a CIDR network, addrvec, or plain IP
+ if '/' in network:
+ extra_config += 'public network = %s\n' % network
+ elif network.startswith('[v') and network.endswith(']'):
+ extra_config += 'public addrv = %s\n' % network
+ elif is_ipv6(network):
+ extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
+ elif ':' not in network:
+ extra_config += 'public addr = %s\n' % network
+ else:
+ raise OrchestratorError(
+ 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
+ else:
+ # try to get the public_network from the config
+ ret, network, err = self.mgr.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'mon',
+ 'key': 'public_network',
+ })
+ network = network.strip() if network else network
+ if not network:
+ raise OrchestratorError(
+ 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
+ if '/' not in network:
+ raise OrchestratorError(
+ 'public_network is set but does not look like a CIDR network: \'%s\'' % network)
+ extra_config += 'public network = %s\n' % network
+
+ daemon_spec.ceph_conf = extra_config
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'quorum_status',
+ })
+ try:
+ j = json.loads(out)
+ except Exception:
+ raise OrchestratorError('failed to parse quorum status')
+
+ mons = [m['name'] for m in j['monmap']['mons']]
+ if mon_id not in mons:
+ logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
+ mon_id, mons))
+ return
+ new_mons = [m for m in mons if m != mon_id]
+ new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+ if len(new_quorum) > len(new_mons) / 2:
+ logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
+ (mon_id, new_quorum, new_mons))
+ return
+ raise OrchestratorError(
+ 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
+
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ super().pre_remove(daemon)
+
+ assert daemon.daemon_id is not None
+ daemon_id: str = daemon.daemon_id
+ self._check_safe_to_destroy(daemon_id)
+
+ # remove mon from quorum before we destroy the daemon
+ logger.info('Removing monitor %s from monmap...' % daemon_id)
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mon rm',
+ 'name': daemon_id,
+ })
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ # Do not remove the mon keyring.
+ # super().post_remove(daemon)
+ pass
+
+
+class MgrService(CephService):
+ TYPE = 'mgr'
+
+ def allow_colo(self) -> bool:
+ if self.mgr.get_ceph_option('mgr_standby_modules'):
+ # traditional mgr mode: standby daemons' modules listen on
+ # ports and redirect to the primary. we must not schedule
+ # multiple mgrs on the same host or else ports will
+ # conflict.
+ return False
+ else:
+ # standby daemons do nothing, and therefore port conflicts
+ # are not a concern.
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ """
+ Create a new manager instance on a host.
+ """
+ assert self.TYPE == daemon_spec.daemon_type
+ mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ # get mgr. key
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
+ ['mon', 'profile mgr',
+ 'osd', 'allow *',
+ 'mds', 'allow *'])
+
+ # Retrieve ports used by manager modules
+ # In the case of the dashboard port and with several manager daemons
+ # running in different hosts, it exists the possibility that the
+ # user has decided to use different dashboard ports in each server
+ # If this is the case then the dashboard port opened will be only the used
+ # as default.
+ ports = []
+ ret, mgr_services, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr services',
+ })
+ if mgr_services:
+ mgr_endpoints = json.loads(mgr_services)
+ for end_point in mgr_endpoints.values():
+ port = re.search(r'\:\d+\/', end_point)
+ if port:
+ ports.append(int(port[0][1:-1]))
+
+ if ports:
+ daemon_spec.ports = ports
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ for daemon in daemon_descrs:
+ assert daemon.daemon_type is not None
+ assert daemon.daemon_id is not None
+ if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
+ return daemon
+ # if no active mgr found, return empty Daemon Desc
+ return DaemonDescription()
+
+ def fail_over(self) -> None:
+ # this has been seen to sometimes transiently fail even when there are multiple
+ # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
+ class NoStandbyError(OrchestratorError):
+ pass
+ no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+ 'daemon', 'mgr' + self.mgr.get_mgr_id()))
+ for sleep_secs in [2, 8, 15]:
+ try:
+ if not self.mgr_map_has_standby():
+ raise no_standby_exc
+ self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+ 'INFO', 'Failing over to other MGR')
+ logger.info('Failing over to other MGR')
+
+ # fail over
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr fail',
+ 'who': self.mgr.get_mgr_id(),
+ })
+ return
+ except NoStandbyError:
+ logger.info(
+ f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+ time.sleep(sleep_secs)
+ raise no_standby_exc
+
+ def mgr_map_has_standby(self) -> bool:
+ """
+ This is a bit safer than asking our inventory. If the mgr joined the mgr map,
+ we know it joined the cluster
+ """
+ mgr_map = self.mgr.get('mgr_map')
+ num = len(mgr_map.get('standbys'))
+ return bool(num)
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
+
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+ active = self.get_active_daemon(mgr_daemons).daemon_id
+ if active in daemon_ids:
+ warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ return HandleCommandResult(0, warn_message, '')
+
+
+class MdsService(CephService):
+ TYPE = 'mds'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def config(self, spec: ServiceSpec) -> None:
+ assert self.TYPE == spec.service_type
+ assert spec.service_id
+
+ # ensure mds_join_fs is set for these daemons
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mds.' + spec.service_id,
+ 'name': 'mds_join_fs',
+ 'value': spec.service_id,
+ })
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ # get mds. key
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
+ ['mon', 'profile mds',
+ 'osd', 'allow rw tag cephfs *=*',
+ 'mds', 'allow'])
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ active_mds_strs = list()
+ for fs in self.mgr.get('fs_map')['filesystems']:
+ mds_map = fs['mdsmap']
+ if mds_map is not None:
+ for mds_id, mds_status in mds_map['info'].items():
+ if mds_status['state'] == 'up:active':
+ active_mds_strs.append(mds_status['name'])
+ if len(active_mds_strs) != 0:
+ for daemon in daemon_descrs:
+ if daemon.daemon_id in active_mds_strs:
+ return daemon
+ # if no mds found, return empty Daemon Desc
+ return DaemonDescription()
+
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': service_name,
+ 'name': 'mds_join_fs',
+ })
+
+
+class RgwService(CephService):
+ TYPE = 'rgw'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def config(self, spec: RGWSpec) -> None: # type: ignore
+ assert self.TYPE == spec.service_type
+
+ # set rgw_realm and rgw_zone, if present
+ if spec.rgw_realm:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_realm',
+ 'value': spec.rgw_realm,
+ })
+ if spec.rgw_zone:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_zone',
+ 'value': spec.rgw_zone,
+ })
+
+ if spec.rgw_frontend_ssl_certificate:
+ if isinstance(spec.rgw_frontend_ssl_certificate, list):
+ cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
+ elif isinstance(spec.rgw_frontend_ssl_certificate, str):
+ cert_data = spec.rgw_frontend_ssl_certificate
+ else:
+ raise OrchestratorError(
+ 'Invalid rgw_frontend_ssl_certificate: %s'
+ % spec.rgw_frontend_ssl_certificate)
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'rgw/cert/{spec.service_name()}',
+ 'val': cert_data,
+ })
+
+ # TODO: fail, if we don't have a spec
+ logger.info('Saving service %s spec with placement %s' % (
+ spec.service_name(), spec.placement.pretty_str()))
+ self.mgr.spec_store.save(spec)
+ self.mgr.trigger_connect_dashboard_rgw()
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
+ spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+
+ keyring = self.get_keyring(rgw_id)
+
+ if daemon_spec.ports:
+ port = daemon_spec.ports[0]
+ else:
+ # this is a redeploy of older instance that doesn't have an explicitly
+ # assigned port, in which case we can assume there is only 1 per host
+ # and it matches the spec.
+ port = spec.get_port()
+
+ # configure frontend
+ args = []
+ ftype = spec.rgw_frontend_type or "beast"
+ if ftype == 'beast':
+ if spec.ssl:
+ if daemon_spec.ip:
+ args.append(
+ f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"ssl_port={port}")
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ elif ftype == 'civetweb':
+ if spec.ssl:
+ if daemon_spec.ip:
+ # note the 's' suffix on port
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
+ else:
+ args.append(f"port={port}s") # note the 's' suffix on port
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ frontend = f'{ftype} {" ".join(args)}'
+
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': utils.name_to_config_section(daemon_spec.name()),
+ 'name': 'rgw_frontends',
+ 'value': frontend
+ })
+
+ daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def get_keyring(self, rgw_id: str) -> str:
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
+ ['mon', 'allow *',
+ 'mgr', 'allow rw',
+ 'osd', 'allow rwx tag rgw *=*'])
+ return keyring
+
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_realm',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_zone',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config-key rm',
+ 'key': f'rgw/cert/{service_name}',
+ })
+ self.mgr.trigger_connect_dashboard_rgw()
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_frontends',
+ })
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
+ # if no load balancer, warn if > 1 daemon, block if only 1 daemon
+ def ingress_present() -> bool:
+ running_ingress_daemons = [
+ daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
+ running_haproxy_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
+ running_keepalived_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
+ # check that there is at least one haproxy and keepalived daemon running
+ if running_haproxy_daemons and running_keepalived_daemons:
+ return True
+ return False
+
+ # if only 1 rgw, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 rgw daemon.
+ # Say okay if load balancer present or force flag set
+ if ingress_present() or force:
+ return HandleCommandResult(0, warn_message, '')
+
+ # if reached here, > 1 RGW daemon, no load balancer and no force flag.
+ # Provide warning
+ warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ self.mgr.trigger_connect_dashboard_rgw()
+
+
+class RbdMirrorService(CephService):
+ TYPE = 'rbd-mirror'
+
+ def allow_colo(self) -> bool:
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
+ ['mon', 'profile rbd-mirror',
+ 'osd', 'profile rbd'])
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # if only 1 rbd-mirror, alert user (this is not passable with --force)
+ warn, warn_message = self._enough_daemons_to_stop(
+ self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
+
+class CrashService(CephService):
+ TYPE = 'crash'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
+ ['mon', 'profile crash',
+ 'mgr', 'profile crash'])
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+
+class CephfsMirrorService(CephService):
+ TYPE = 'cephfs-mirror'
+
+ def config(self, spec: ServiceSpec) -> None:
+ # make sure mirroring module is enabled
+ mgr_map = self.mgr.get('mgr_map')
+ mod_name = 'mirroring'
+ if mod_name not in mgr_map.get('services', {}):
+ self.mgr.check_mon_command({
+ 'prefix': 'mgr module enable',
+ 'module': mod_name
+ })
+ # we shouldn't get here (mon will tell the mgr to respawn), but no
+ # harm done if we do.
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
+ ret, keyring, err = self.mgr.check_mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': self.get_auth_entity(daemon_spec.daemon_id),
+ 'caps': ['mon', 'profile cephfs-mirror',
+ 'mds', 'allow r',
+ 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
+ 'mgr', 'allow r'],
+ })
+
+ daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec