summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/module.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/cephadm/module.py
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/cephadm/module.py')
-rw-r--r--src/pybind/mgr/cephadm/module.py2974
1 files changed, 2974 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
new file mode 100644
index 000000000..9fc4298a8
--- /dev/null
+++ b/src/pybind/mgr/cephadm/module.py
@@ -0,0 +1,2974 @@
+import json
+import errno
+import ipaddress
+import logging
+import re
+import shlex
+from collections import defaultdict
+from configparser import ConfigParser
+from functools import wraps
+from tempfile import TemporaryDirectory
+from threading import Event
+
+import string
+from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type
+
+import datetime
+import os
+import random
+import tempfile
+import multiprocessing.pool
+import subprocess
+from prettytable import PrettyTable
+
+from ceph.deployment import inventory
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import \
+ ServiceSpec, PlacementSpec, \
+ HostPlacementSpec, IngressSpec, IscsiServiceSpec
+from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
+from cephadm.serve import CephadmServe
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
+
+from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
+from mgr_util import create_self_signed_cert
+import secrets
+import orchestrator
+from orchestrator.module import to_format, Format
+
+from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
+ CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
+ service_to_daemon_types
+from orchestrator._interface import GenericSpec
+from orchestrator._interface import daemon_type_to_service
+
+from . import remotes
+from . import utils
+from .migrations import Migrations
+from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
+ RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
+from .services.ingress import IngressService
+from .services.container import CustomContainerService
+from .services.iscsi import IscsiService
+from .services.nfs import NFSService
+from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
+from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
+ NodeExporterService, SNMPGatewayService
+from .services.exporter import CephadmExporter, CephadmExporterConfig
+from .schedule import HostAssignment
+from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
+from .upgrade import CephadmUpgrade
+from .template import TemplateMgr
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+ cephadmNoImage, CEPH_UPGRADE_ORDER
+from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
+
+try:
+ import remoto
+ # NOTE(mattoliverau) Patch remoto until remoto PR
+ # (https://github.com/alfredodeza/remoto/pull/56) lands
+ from distutils.version import StrictVersion
+ if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
+ def remoto_has_connection(self: Any) -> bool:
+ return self.gateway.hasreceiver()
+
+ from remoto.backends import BaseConnection
+ BaseConnection.has_connection = remoto_has_connection
+ import remoto.process
+except ImportError as e:
+ remoto = None
+ remoto_import_error = str(e)
+
+logger = logging.getLogger(__name__)
+
+T = TypeVar('T')
+
+DEFAULT_SSH_CONFIG = """
+Host *
+ User root
+ StrictHostKeyChecking no
+ UserKnownHostsFile /dev/null
+ ConnectTimeout=30
+"""
+
+# Default container images -----------------------------------------------------
+DEFAULT_IMAGE = 'quay.io/ceph/ceph'
+DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4'
+DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.3.1'
+DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
+DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
+DEFAULT_HAPROXY_IMAGE = 'docker.io/library/haproxy:2.3'
+DEFAULT_KEEPALIVED_IMAGE = 'docker.io/arcts/keepalived'
+DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
+# ------------------------------------------------------------------------------
+
+
+def service_inactive(spec_name: str) -> Callable:
+ def inner(func: Callable) -> Callable:
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ obj = args[0]
+ if obj.get_store(f"spec.{spec_name}") is not None:
+ return 1, "", f"Unable to change configuration of an active service {spec_name}"
+ return func(*args, **kwargs)
+ return wrapper
+ return inner
+
+
+def host_exists(hostname_position: int = 1) -> Callable:
+ """Check that a hostname exists in the inventory"""
+ def inner(func: Callable) -> Callable:
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ this = args[0] # self object
+ hostname = args[hostname_position]
+ if hostname not in this.cache.get_hosts():
+ candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
+ help_msg = f"Did you mean {candidates}?" if candidates else ""
+ raise OrchestratorError(
+ f"Cannot find host '{hostname}' in the inventory. {help_msg}")
+
+ return func(*args, **kwargs)
+ return wrapper
+ return inner
+
+
+class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
+ metaclass=CLICommandMeta):
+
+ _STORE_HOST_PREFIX = "host"
+
+ instance = None
+ NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
+ NATIVE_OPTIONS = [] # type: List[Any]
+ MODULE_OPTIONS = [
+ Option(
+ 'ssh_config_file',
+ type='str',
+ default=None,
+ desc='customized SSH config file to connect to managed hosts',
+ ),
+ Option(
+ 'device_cache_timeout',
+ type='secs',
+ default=30 * 60,
+ desc='seconds to cache device inventory',
+ ),
+ Option(
+ 'device_enhanced_scan',
+ type='bool',
+ default=False,
+ desc='Use libstoragemgmt during device scans',
+ ),
+ Option(
+ 'daemon_cache_timeout',
+ type='secs',
+ default=10 * 60,
+ desc='seconds to cache service (daemon) inventory',
+ ),
+ Option(
+ 'facts_cache_timeout',
+ type='secs',
+ default=1 * 60,
+ desc='seconds to cache host facts data',
+ ),
+ Option(
+ 'host_check_interval',
+ type='secs',
+ default=10 * 60,
+ desc='how frequently to perform a host check',
+ ),
+ Option(
+ 'mode',
+ type='str',
+ enum_allowed=['root', 'cephadm-package'],
+ default='root',
+ desc='mode for remote execution of cephadm',
+ ),
+ Option(
+ 'container_image_base',
+ default=DEFAULT_IMAGE,
+ desc='Container image name, without the tag',
+ runtime=True,
+ ),
+ Option(
+ 'container_image_prometheus',
+ default=DEFAULT_PROMETHEUS_IMAGE,
+ desc='Prometheus container image',
+ ),
+ Option(
+ 'container_image_grafana',
+ default=DEFAULT_GRAFANA_IMAGE,
+ desc='Prometheus container image',
+ ),
+ Option(
+ 'container_image_alertmanager',
+ default=DEFAULT_ALERT_MANAGER_IMAGE,
+ desc='Prometheus container image',
+ ),
+ Option(
+ 'container_image_node_exporter',
+ default=DEFAULT_NODE_EXPORTER_IMAGE,
+ desc='Prometheus container image',
+ ),
+ Option(
+ 'container_image_haproxy',
+ default=DEFAULT_HAPROXY_IMAGE,
+ desc='HAproxy container image',
+ ),
+ Option(
+ 'container_image_keepalived',
+ default=DEFAULT_KEEPALIVED_IMAGE,
+ desc='Keepalived container image',
+ ),
+ Option(
+ 'container_image_snmp_gateway',
+ default=DEFAULT_SNMP_GATEWAY_IMAGE,
+ desc='SNMP Gateway container image',
+ ),
+ Option(
+ 'warn_on_stray_hosts',
+ type='bool',
+ default=True,
+ desc='raise a health warning if daemons are detected on a host '
+ 'that is not managed by cephadm',
+ ),
+ Option(
+ 'warn_on_stray_daemons',
+ type='bool',
+ default=True,
+ desc='raise a health warning if daemons are detected '
+ 'that are not managed by cephadm',
+ ),
+ Option(
+ 'warn_on_failed_host_check',
+ type='bool',
+ default=True,
+ desc='raise a health warning if the host check fails',
+ ),
+ Option(
+ 'log_to_cluster',
+ type='bool',
+ default=True,
+ desc='log to the "cephadm" cluster log channel"',
+ ),
+ Option(
+ 'allow_ptrace',
+ type='bool',
+ default=False,
+ desc='allow SYS_PTRACE capability on ceph containers',
+ long_desc='The SYS_PTRACE capability is needed to attach to a '
+ 'process with gdb or strace. Enabling this options '
+ 'can allow debugging daemons that encounter problems '
+ 'at runtime.',
+ ),
+ Option(
+ 'container_init',
+ type='bool',
+ default=True,
+ desc='Run podman/docker with `--init`'
+ ),
+ Option(
+ 'prometheus_alerts_path',
+ type='str',
+ default='/etc/prometheus/ceph/ceph_default_alerts.yml',
+ desc='location of alerts to include in prometheus deployments',
+ ),
+ Option(
+ 'migration_current',
+ type='int',
+ default=None,
+ desc='internal - do not modify',
+ # used to track track spec and other data migrations.
+ ),
+ Option(
+ 'config_dashboard',
+ type='bool',
+ default=True,
+ desc='manage configs like API endpoints in Dashboard.'
+ ),
+ Option(
+ 'manage_etc_ceph_ceph_conf',
+ type='bool',
+ default=False,
+ desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
+ ),
+ Option(
+ 'manage_etc_ceph_ceph_conf_hosts',
+ type='str',
+ default='*',
+ desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
+ ),
+ # not used anymore
+ Option(
+ 'registry_url',
+ type='str',
+ default=None,
+ desc='Registry url for login purposes. This is not the default registry'
+ ),
+ Option(
+ 'registry_username',
+ type='str',
+ default=None,
+ desc='Custom repository username. Only used for logging into a registry.'
+ ),
+ Option(
+ 'registry_password',
+ type='str',
+ default=None,
+ desc='Custom repository password. Only used for logging into a registry.'
+ ),
+ ####
+ Option(
+ 'registry_insecure',
+ type='bool',
+ default=False,
+ desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
+ ),
+ Option(
+ 'use_repo_digest',
+ type='bool',
+ default=True,
+ desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
+ ),
+ Option(
+ 'config_checks_enabled',
+ type='bool',
+ default=False,
+ desc='Enable or disable the cephadm configuration analysis',
+ ),
+ Option(
+ 'default_registry',
+ type='str',
+ default='docker.io',
+ desc='Search-registry to which we should normalize unqualified image names. '
+ 'This is not the default registry',
+ ),
+ Option(
+ 'max_count_per_host',
+ type='int',
+ default=10,
+ desc='max number of daemons per service per host',
+ ),
+ Option(
+ 'autotune_memory_target_ratio',
+ type='float',
+ default=.7,
+ desc='ratio of total system memory to divide amongst autotuned daemons'
+ ),
+ Option(
+ 'autotune_interval',
+ type='secs',
+ default=10 * 60,
+ desc='how frequently to autotune daemon memory'
+ ),
+ Option(
+ 'max_osd_draining_count',
+ type='int',
+ default=10,
+ desc='max number of osds that will be drained simultaneously when osds are removed'
+ ),
+ ]
+
+ def __init__(self, *args: Any, **kwargs: Any):
+ super(CephadmOrchestrator, self).__init__(*args, **kwargs)
+ self._cluster_fsid: str = self.get('mon_map')['fsid']
+ self.last_monmap: Optional[datetime.datetime] = None
+
+ # for serve()
+ self.run = True
+ self.event = Event()
+
+ if self.get_store('pause'):
+ self.paused = True
+ else:
+ self.paused = False
+
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.ssh_config_file = None # type: Optional[str]
+ self.device_cache_timeout = 0
+ self.daemon_cache_timeout = 0
+ self.facts_cache_timeout = 0
+ self.host_check_interval = 0
+ self.max_count_per_host = 0
+ self.mode = ''
+ self.container_image_base = ''
+ self.container_image_prometheus = ''
+ self.container_image_grafana = ''
+ self.container_image_alertmanager = ''
+ self.container_image_node_exporter = ''
+ self.container_image_haproxy = ''
+ self.container_image_keepalived = ''
+ self.container_image_snmp_gateway = ''
+ self.warn_on_stray_hosts = True
+ self.warn_on_stray_daemons = True
+ self.warn_on_failed_host_check = True
+ self.allow_ptrace = False
+ self.container_init = True
+ self.prometheus_alerts_path = ''
+ self.migration_current: Optional[int] = None
+ self.config_dashboard = True
+ self.manage_etc_ceph_ceph_conf = True
+ self.manage_etc_ceph_ceph_conf_hosts = '*'
+ self.registry_url: Optional[str] = None
+ self.registry_username: Optional[str] = None
+ self.registry_password: Optional[str] = None
+ self.registry_insecure: bool = False
+ self.use_repo_digest = True
+ self.default_registry = ''
+ self.autotune_memory_target_ratio = 0.0
+ self.autotune_interval = 0
+ self.apply_spec_fails: List[Tuple[str, str]] = []
+ self.max_osd_draining_count = 10
+ self.device_enhanced_scan = False
+
+ self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
+ remoto.backends.LegacyModuleExecute]] = {}
+
+ self.notify(NotifyType.mon_map, None)
+ self.config_notify()
+
+ path = self.get_ceph_option('cephadm_path')
+ try:
+ assert isinstance(path, str)
+ with open(path, 'r') as f:
+ self._cephadm = f.read()
+ except (IOError, TypeError) as e:
+ raise RuntimeError("unable to read cephadm at '%s': %s" % (
+ path, str(e)))
+
+ self.cephadm_binary_path = self._get_cephadm_binary_path()
+
+ self._worker_pool = multiprocessing.pool.ThreadPool(10)
+
+ self._reconfig_ssh()
+
+ CephadmOrchestrator.instance = self
+
+ self.upgrade = CephadmUpgrade(self)
+
+ self.health_checks: Dict[str, dict] = {}
+
+ self.inventory = Inventory(self)
+
+ self.cache = HostCache(self)
+ self.cache.load()
+
+ self.to_remove_osds = OSDRemovalQueue(self)
+ self.to_remove_osds.load_from_store()
+
+ self.spec_store = SpecStore(self)
+ self.spec_store.load()
+
+ self.keys = ClientKeyringStore(self)
+ self.keys.load()
+
+ # ensure the host lists are in sync
+ for h in self.inventory.keys():
+ if h not in self.cache.daemons:
+ self.cache.prime_empty_host(h)
+ for h in self.cache.get_hosts():
+ if h not in self.inventory:
+ self.cache.rm_host(h)
+
+ # in-memory only.
+ self.events = EventStore(self)
+ self.offline_hosts: Set[str] = set()
+
+ self.migration = Migrations(self)
+
+ _service_clses: Sequence[Type[CephadmService]] = [
+ OSDService, NFSService, MonService, MgrService, MdsService,
+ RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
+ PrometheusService, NodeExporterService, CrashService, IscsiService,
+ IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService,
+ SNMPGatewayService,
+ ]
+
+ # https://github.com/python/mypy/issues/8993
+ self.cephadm_services: Dict[str, CephadmService] = {
+ cls.TYPE: cls(self) for cls in _service_clses} # type: ignore
+
+ self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
+ self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
+ self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
+
+ self.template = TemplateMgr(self)
+
+ self.requires_post_actions: Set[str] = set()
+ self.need_connect_dashboard_rgw = False
+
+ self.config_checker = CephadmConfigChecks(self)
+
+ self.offline_watcher = OfflineHostWatcher(self)
+ self.offline_watcher.start()
+
+ def shutdown(self) -> None:
+ self.log.debug('shutdown')
+ self._worker_pool.close()
+ self._worker_pool.join()
+ self.offline_watcher.shutdown()
+ self.run = False
+ self.event.set()
+
+ def _get_cephadm_service(self, service_type: str) -> CephadmService:
+ assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
+ return self.cephadm_services[service_type]
+
+ def _get_cephadm_binary_path(self) -> str:
+ import hashlib
+ m = hashlib.sha256()
+ m.update(self._cephadm.encode())
+ return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
+
+ def _kick_serve_loop(self) -> None:
+ self.log.debug('_kick_serve_loop')
+ self.event.set()
+
+ def serve(self) -> None:
+ """
+ The main loop of cephadm.
+
+ A command handler will typically change the declarative state
+ of cephadm. This loop will then attempt to apply this new state.
+ """
+ serve = CephadmServe(self)
+ serve.serve()
+
+ def set_container_image(self, entity: str, image: str) -> None:
+ self.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': image,
+ 'who': entity,
+ })
+
+ def config_notify(self) -> None:
+ """
+ This method is called whenever one of our config options is changed.
+
+ TODO: this method should be moved into mgr_module.py
+ """
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'], # type: ignore
+ self.get_module_option(opt['name'])) # type: ignore
+ self.log.debug(' mgr option %s = %s',
+ opt['name'], getattr(self, opt['name'])) # type: ignore
+ for opt in self.NATIVE_OPTIONS:
+ setattr(self,
+ opt, # type: ignore
+ self.get_ceph_option(opt))
+ self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
+
+ self.event.set()
+
+ def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
+ if notify_type == NotifyType.mon_map:
+ # get monmap mtime so we can refresh configs when mons change
+ monmap = self.get('mon_map')
+ self.last_monmap = str_to_datetime(monmap['modified'])
+ if self.last_monmap and self.last_monmap > datetime_now():
+ self.last_monmap = None # just in case clocks are skewed
+ if getattr(self, 'manage_etc_ceph_ceph_conf', False):
+ # getattr, due to notify() being called before config_notify()
+ self._kick_serve_loop()
+ if notify_type == NotifyType.pg_summary:
+ self._trigger_osd_removal()
+
+ def _trigger_osd_removal(self) -> None:
+ remove_queue = self.to_remove_osds.as_osd_ids()
+ if not remove_queue:
+ return
+ data = self.get("osd_stats")
+ for osd in data.get('osd_stats', []):
+ if osd.get('num_pgs') == 0:
+ # if _ANY_ osd that is currently in the queue appears to be empty,
+ # start the removal process
+ if int(osd.get('osd')) in remove_queue:
+ self.log.debug('Found empty osd. Starting removal process')
+ # if the osd that is now empty is also part of the removal queue
+ # start the process
+ self._kick_serve_loop()
+
+ def pause(self) -> None:
+ if not self.paused:
+ self.log.info('Paused')
+ self.set_store('pause', 'true')
+ self.paused = True
+ # wake loop so we update the health status
+ self._kick_serve_loop()
+
+ def resume(self) -> None:
+ if self.paused:
+ self.log.info('Resumed')
+ self.paused = False
+ self.set_store('pause', None)
+ # unconditionally wake loop so that 'orch resume' can be used to kick
+ # cephadm
+ self._kick_serve_loop()
+
+ def get_unique_name(
+ self,
+ daemon_type: str,
+ host: str,
+ existing: List[orchestrator.DaemonDescription],
+ prefix: Optional[str] = None,
+ forcename: Optional[str] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ ) -> str:
+ """
+ Generate a unique random service name
+ """
+ suffix = daemon_type not in [
+ 'mon', 'crash',
+ 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
+ 'container', 'cephadm-exporter', 'snmp-gateway'
+ ]
+ if forcename:
+ if len([d for d in existing if d.daemon_id == forcename]):
+ raise orchestrator.OrchestratorValidationError(
+ f'name {daemon_type}.{forcename} already in use')
+ return forcename
+
+ if '.' in host:
+ host = host.split('.')[0]
+ while True:
+ if prefix:
+ name = prefix + '.'
+ else:
+ name = ''
+ if rank is not None and rank_generation is not None:
+ name += f'{rank}.{rank_generation}.'
+ name += host
+ if suffix:
+ name += '.' + ''.join(random.choice(string.ascii_lowercase)
+ for _ in range(6))
+ if len([d for d in existing if d.daemon_id == name]):
+ if not suffix:
+ raise orchestrator.OrchestratorValidationError(
+ f'name {daemon_type}.{name} already in use')
+ self.log.debug('name %s exists, trying again', name)
+ continue
+ return name
+
+ def _reconfig_ssh(self) -> None:
+ temp_files = [] # type: list
+ ssh_options = [] # type: List[str]
+
+ # ssh_config
+ ssh_config_fname = self.ssh_config_file
+ ssh_config = self.get_store("ssh_config")
+ if ssh_config is not None or ssh_config_fname is None:
+ if not ssh_config:
+ ssh_config = DEFAULT_SSH_CONFIG
+ f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
+ os.fchmod(f.fileno(), 0o600)
+ f.write(ssh_config.encode('utf-8'))
+ f.flush() # make visible to other processes
+ temp_files += [f]
+ ssh_config_fname = f.name
+ if ssh_config_fname:
+ self.validate_ssh_config_fname(ssh_config_fname)
+ ssh_options += ['-F', ssh_config_fname]
+ self.ssh_config = ssh_config
+
+ # identity
+ ssh_key = self.get_store("ssh_identity_key")
+ ssh_pub = self.get_store("ssh_identity_pub")
+ self.ssh_pub = ssh_pub
+ self.ssh_key = ssh_key
+ if ssh_key and ssh_pub:
+ tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
+ tkey.write(ssh_key.encode('utf-8'))
+ os.fchmod(tkey.fileno(), 0o600)
+ tkey.flush() # make visible to other processes
+ tpub = open(tkey.name + '.pub', 'w')
+ os.fchmod(tpub.fileno(), 0o600)
+ tpub.write(ssh_pub)
+ tpub.flush() # make visible to other processes
+ temp_files += [tkey, tpub]
+ ssh_options += ['-i', tkey.name]
+
+ self._temp_files = temp_files
+ ssh_options += ['-o', 'ServerAliveInterval=7', '-o', 'ServerAliveCountMax=3']
+ self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
+
+ if self.mode == 'root':
+ self.ssh_user = self.get_store('ssh_user', default='root')
+ elif self.mode == 'cephadm-package':
+ self.ssh_user = 'cephadm'
+
+ self._reset_cons()
+
+ def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
+ if ssh_config is None or len(ssh_config.strip()) == 0:
+ raise OrchestratorValidationError('ssh_config cannot be empty')
+ # StrictHostKeyChecking is [yes|no] ?
+ res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
+ if not res:
+ raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
+ for s in res:
+ if 'ask' in s.lower():
+ raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
+
+ def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
+ if not os.path.isfile(ssh_config_fname):
+ raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
+ ssh_config_fname))
+
+ def _reset_con(self, host: str) -> None:
+ conn, r = self._cons.get(host, (None, None))
+ if conn:
+ self.log.debug('_reset_con close %s' % host)
+ conn.exit()
+ del self._cons[host]
+
+ def _reset_cons(self) -> None:
+ for host, conn_and_r in self._cons.items():
+ self.log.debug('_reset_cons close %s' % host)
+ conn, r = conn_and_r
+ conn.exit()
+ self._cons = {}
+
+ def update_watched_hosts(self) -> None:
+ # currently, we are watching hosts with nfs daemons
+ hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
+ ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
+ self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
+
+ def offline_hosts_remove(self, host: str) -> None:
+ if host in self.offline_hosts:
+ self.offline_hosts.remove(host)
+
+ @staticmethod
+ def can_run() -> Tuple[bool, str]:
+ if remoto is not None:
+ return True, ""
+ else:
+ return False, "loading remoto library:{}".format(
+ remoto_import_error)
+
+ def available(self) -> Tuple[bool, str, Dict[str, Any]]:
+ """
+ The cephadm orchestrator is always available.
+ """
+ ok, err = self.can_run()
+ if not ok:
+ return ok, err, {}
+ if not self.ssh_key or not self.ssh_pub:
+ return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}
+
+ # mypy is unable to determine type for _processes since it's private
+ worker_count: int = self._worker_pool._processes # type: ignore
+ ret = {
+ "workers": worker_count,
+ "paused": self.paused,
+ }
+
+ return True, err, ret
+
+ def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
+ self.set_store(what, new)
+ self._reconfig_ssh()
+ if self.cache.get_hosts():
+ # Can't check anything without hosts
+ host = self.cache.get_hosts()[0]
+ r = CephadmServe(self)._check_host(host)
+ if r is not None:
+ # connection failed reset user
+ self.set_store(what, old)
+ self._reconfig_ssh()
+ raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
+ self.log.info(f'Set ssh {what}')
+
+ @orchestrator._cli_write_command(
+ prefix='cephadm set-ssh-config')
+ def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Set the ssh_config file (use -i <ssh_config>)
+ """
+ # Set an ssh_config file provided from stdin
+
+ old = self.ssh_config
+ if inbuf == old:
+ return 0, "value unchanged", ""
+ self.validate_ssh_config_content(inbuf)
+ self._validate_and_set_ssh_val('ssh_config', inbuf, old)
+ return 0, "", ""
+
+ @orchestrator._cli_write_command('cephadm clear-ssh-config')
+ def _clear_ssh_config(self) -> Tuple[int, str, str]:
+ """
+ Clear the ssh_config file
+ """
+ # Clear the ssh_config file provided from stdin
+ self.set_store("ssh_config", None)
+ self.ssh_config_tmp = None
+ self.log.info('Cleared ssh_config')
+ self._reconfig_ssh()
+ return 0, "", ""
+
+ @orchestrator._cli_read_command('cephadm get-ssh-config')
+ def _get_ssh_config(self) -> HandleCommandResult:
+ """
+ Returns the ssh config as used by cephadm
+ """
+ if self.ssh_config_file:
+ self.validate_ssh_config_fname(self.ssh_config_file)
+ with open(self.ssh_config_file) as f:
+ return HandleCommandResult(stdout=f.read())
+ ssh_config = self.get_store("ssh_config")
+ if ssh_config:
+ return HandleCommandResult(stdout=ssh_config)
+ return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
+
+ @orchestrator._cli_write_command('cephadm generate-key')
+ def _generate_key(self) -> Tuple[int, str, str]:
+ """
+ Generate a cluster SSH key (if not present)
+ """
+ if not self.ssh_pub or not self.ssh_key:
+ self.log.info('Generating ssh key...')
+ tmp_dir = TemporaryDirectory()
+ path = tmp_dir.name + '/key'
+ try:
+ subprocess.check_call([
+ '/usr/bin/ssh-keygen',
+ '-C', 'ceph-%s' % self._cluster_fsid,
+ '-N', '',
+ '-f', path
+ ])
+ with open(path, 'r') as f:
+ secret = f.read()
+ with open(path + '.pub', 'r') as f:
+ pub = f.read()
+ finally:
+ os.unlink(path)
+ os.unlink(path + '.pub')
+ tmp_dir.cleanup()
+ self.set_store('ssh_identity_key', secret)
+ self.set_store('ssh_identity_pub', pub)
+ self._reconfig_ssh()
+ return 0, '', ''
+
+ @orchestrator._cli_write_command(
+ 'cephadm set-priv-key')
+ def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """Set cluster SSH private key (use -i <private_key>)"""
+ if inbuf is None or len(inbuf) == 0:
+ return -errno.EINVAL, "", "empty private ssh key provided"
+ old = self.ssh_key
+ if inbuf == old:
+ return 0, "value unchanged", ""
+ self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
+ self.log.info('Set ssh private key')
+ return 0, "", ""
+
+ @orchestrator._cli_write_command(
+ 'cephadm set-pub-key')
+ def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """Set cluster SSH public key (use -i <public_key>)"""
+ if inbuf is None or len(inbuf) == 0:
+ return -errno.EINVAL, "", "empty public ssh key provided"
+ old = self.ssh_pub
+ if inbuf == old:
+ return 0, "value unchanged", ""
+ self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
+ return 0, "", ""
+
+ @orchestrator._cli_write_command(
+ 'cephadm clear-key')
+ def _clear_key(self) -> Tuple[int, str, str]:
+ """Clear cluster SSH key"""
+ self.set_store('ssh_identity_key', None)
+ self.set_store('ssh_identity_pub', None)
+ self._reconfig_ssh()
+ self.log.info('Cleared cluster SSH key')
+ return 0, '', ''
+
+ @orchestrator._cli_read_command(
+ 'cephadm get-pub-key')
+ def _get_pub_key(self) -> Tuple[int, str, str]:
+ """Show SSH public key for connecting to cluster hosts"""
+ if self.ssh_pub:
+ return 0, self.ssh_pub, ''
+ else:
+ return -errno.ENOENT, '', 'No cluster SSH key defined'
+
+ @orchestrator._cli_read_command(
+ 'cephadm get-user')
+ def _get_user(self) -> Tuple[int, str, str]:
+ """
+ Show user for SSHing to cluster hosts
+ """
+ if self.ssh_user is None:
+ return -errno.ENOENT, '', 'No cluster SSH user configured'
+ else:
+ return 0, self.ssh_user, ''
+
+ @orchestrator._cli_read_command(
+ 'cephadm set-user')
+ def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
+ """
+ Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
+ """
+ current_user = self.ssh_user
+ if user == current_user:
+ return 0, "value unchanged", ""
+
+ self._validate_and_set_ssh_val('ssh_user', user, current_user)
+ current_ssh_config = self._get_ssh_config()
+ new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
+ self._set_ssh_config(new_ssh_config)
+
+ msg = 'ssh user set to %s' % user
+ if user != 'root':
+ msg += '. sudo will be used'
+ self.log.info(msg)
+ return 0, msg, ''
+
+ @orchestrator._cli_read_command(
+ 'cephadm registry-login')
+ def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
+ """
+ # if password not given in command line, get it through file input
+ if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
+ return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
+ "or -i <login credentials json file>")
+ elif (url and username and password):
+ registry_json = {'url': url, 'username': username, 'password': password}
+ else:
+ assert isinstance(inbuf, str)
+ registry_json = json.loads(inbuf)
+ if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
+ return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+
+ # verify login info works by attempting login on random host
+ host = None
+ for host_name in self.inventory.keys():
+ host = host_name
+ break
+ if not host:
+ raise OrchestratorError('no hosts defined')
+ r = CephadmServe(self)._registry_login(host, registry_json)
+ if r is not None:
+ return 1, '', r
+ # if logins succeeded, store info
+ self.log.debug("Host logins successful. Storing login info.")
+ self.set_store('registry_credentials', json.dumps(registry_json))
+ # distribute new login info to all hosts
+ self.cache.distribute_new_registry_login_info()
+ return 0, "registry login scheduled", ''
+
+ @orchestrator._cli_read_command('cephadm check-host')
+ def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
+ """Check whether we can access and manage a remote host"""
+ try:
+ out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
+ if code:
+ return 1, '', ('check-host failed:\n' + '\n'.join(err))
+ except OrchestratorError:
+ self.log.exception(f"check-host failed for '{host}'")
+ return 1, '', ('check-host failed:\n'
+ + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
+ # if we have an outstanding health alert for this host, give the
+ # serve thread a kick
+ if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
+ for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
+ if item.startswith('host %s ' % host):
+ self.event.set()
+ return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
+
+ @orchestrator._cli_read_command(
+ 'cephadm prepare-host')
+ def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
+ """Prepare a remote host for use with cephadm"""
+ out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
+ if code:
+ return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
+ # if we have an outstanding health alert for this host, give the
+ # serve thread a kick
+ if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
+ for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
+ if item.startswith('host %s ' % host):
+ self.event.set()
+ return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
+
+ @orchestrator._cli_write_command(
+ prefix='cephadm set-extra-ceph-conf')
+ def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
+ """
+ Text that is appended to all daemon's ceph.conf.
+ Mainly a workaround, till `config generate-minimal-conf` generates
+ a complete ceph.conf.
+
+ Warning: this is a dangerous operation.
+ """
+ if inbuf:
+ # sanity check.
+ cp = ConfigParser()
+ cp.read_string(inbuf, source='<infile>')
+
+ self.set_store("extra_ceph_conf", json.dumps({
+ 'conf': inbuf,
+ 'last_modified': datetime_to_str(datetime_now())
+ }))
+ self.log.info('Set extra_ceph_conf')
+ self._kick_serve_loop()
+ return HandleCommandResult()
+
+ @orchestrator._cli_read_command(
+ 'cephadm get-extra-ceph-conf')
+ def _get_extra_ceph_conf(self) -> HandleCommandResult:
+ """
+ Get extra ceph conf that is appended
+ """
+ return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
+
+ def _set_exporter_config(self, config: Dict[str, str]) -> None:
+ self.set_store('exporter_config', json.dumps(config))
+
+ def _get_exporter_config(self) -> Dict[str, str]:
+ cfg_str = self.get_store('exporter_config')
+ return json.loads(cfg_str) if cfg_str else {}
+
+ def _set_exporter_option(self, option: str, value: Optional[str] = None) -> None:
+ kv_option = f'exporter_{option}'
+ self.set_store(kv_option, value)
+
+ def _get_exporter_option(self, option: str) -> Optional[str]:
+ kv_option = f'exporter_{option}'
+ return self.get_store(kv_option)
+
+ @orchestrator._cli_write_command(
+ prefix='cephadm generate-exporter-config')
+ @service_inactive('cephadm-exporter')
+ def _generate_exporter_config(self) -> Tuple[int, str, str]:
+ """
+ Generate default SSL crt/key and token for cephadm exporter daemons
+ """
+ self._set_exporter_defaults()
+ self.log.info('Default settings created for cephadm exporter(s)')
+ return 0, "", ""
+
+ def _set_exporter_defaults(self) -> None:
+ crt, key = self._generate_exporter_ssl()
+ token = self._generate_exporter_token()
+ self._set_exporter_config({
+ "crt": crt,
+ "key": key,
+ "token": token,
+ "port": CephadmExporterConfig.DEFAULT_PORT
+ })
+ self._set_exporter_option('enabled', 'true')
+
+ def _generate_exporter_ssl(self) -> Tuple[str, str]:
+ return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})
+
+ def _generate_exporter_token(self) -> str:
+ return secrets.token_hex(32)
+
+ @orchestrator._cli_write_command(
+ prefix='cephadm clear-exporter-config')
+ @service_inactive('cephadm-exporter')
+ def _clear_exporter_config(self) -> Tuple[int, str, str]:
+ """
+ Clear the SSL configuration used by cephadm exporter daemons
+ """
+ self._clear_exporter_config_settings()
+ self.log.info('Cleared cephadm exporter configuration')
+ return 0, "", ""
+
+ def _clear_exporter_config_settings(self) -> None:
+ self.set_store('exporter_config', None)
+ self._set_exporter_option('enabled', None)
+
+ @orchestrator._cli_write_command(
+ prefix='cephadm set-exporter-config')
+ @service_inactive('cephadm-exporter')
+ def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port
+ """
+ if not inbuf:
+ return 1, "", "JSON configuration has not been provided (-i <filename>)"
+
+ cfg = CephadmExporterConfig(self)
+ rc, reason = cfg.load_from_json(inbuf)
+ if rc:
+ return 1, "", reason
+
+ rc, reason = cfg.validate_config()
+ if rc:
+ return 1, "", reason
+
+ self._set_exporter_config({
+ "crt": cfg.crt,
+ "key": cfg.key,
+ "token": cfg.token,
+ "port": cfg.port
+ })
+ self.log.info("Loaded and verified the TLS configuration")
+ return 0, "", ""
+
+ @orchestrator._cli_read_command(
+ 'cephadm get-exporter-config')
+ def _show_exporter_config(self) -> Tuple[int, str, str]:
+ """
+ Show the current cephadm-exporter configuraion (JSON)'
+ """
+ cfg = self._get_exporter_config()
+ return 0, json.dumps(cfg, indent=2), ""
+
+ @orchestrator._cli_read_command('cephadm config-check ls')
+ def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
+ """List the available configuration checks and their current state"""
+
+ if format not in [Format.plain, Format.json, Format.json_pretty]:
+ return HandleCommandResult(
+ retval=1,
+ stderr="Requested format is not supported when listing configuration checks"
+ )
+
+ if format in [Format.json, Format.json_pretty]:
+ return HandleCommandResult(
+ stdout=to_format(self.config_checker.health_checks,
+ format,
+ many=True,
+ cls=None))
+
+ # plain formatting
+ table = PrettyTable(
+ ['NAME',
+ 'HEALTHCHECK',
+ 'STATUS',
+ 'DESCRIPTION'
+ ], border=False)
+ table.align['NAME'] = 'l'
+ table.align['HEALTHCHECK'] = 'l'
+ table.align['STATUS'] = 'l'
+ table.align['DESCRIPTION'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for c in self.config_checker.health_checks:
+ table.add_row((
+ c.name,
+ c.healthcheck_name,
+ c.status,
+ c.description,
+ ))
+
+ return HandleCommandResult(stdout=table.get_string())
+
+ @orchestrator._cli_read_command('cephadm config-check status')
+ def _config_check_status(self) -> HandleCommandResult:
+ """Show whether the configuration checker feature is enabled/disabled"""
+ status = self.get_module_option('config_checks_enabled')
+ return HandleCommandResult(stdout="Enabled" if status else "Disabled")
+
+ @orchestrator._cli_write_command('cephadm config-check enable')
+ def _config_check_enable(self, check_name: str) -> HandleCommandResult:
+ """Enable a specific configuration check"""
+ if not self._config_check_valid(check_name):
+ return HandleCommandResult(retval=1, stderr="Invalid check name")
+
+ err, msg = self._update_config_check(check_name, 'enabled')
+ if err:
+ return HandleCommandResult(
+ retval=err,
+ stderr=f"Failed to enable check '{check_name}' : {msg}")
+
+ return HandleCommandResult(stdout="ok")
+
+ @orchestrator._cli_write_command('cephadm config-check disable')
+ def _config_check_disable(self, check_name: str) -> HandleCommandResult:
+ """Disable a specific configuration check"""
+ if not self._config_check_valid(check_name):
+ return HandleCommandResult(retval=1, stderr="Invalid check name")
+
+ err, msg = self._update_config_check(check_name, 'disabled')
+ if err:
+ return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
+ else:
+ # drop any outstanding raised healthcheck for this check
+ config_check = self.config_checker.lookup_check(check_name)
+ if config_check:
+ if config_check.healthcheck_name in self.health_checks:
+ self.health_checks.pop(config_check.healthcheck_name, None)
+ self.set_health_checks(self.health_checks)
+ else:
+ self.log.error(
+ f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")
+
+ return HandleCommandResult(stdout="ok")
+
+ def _config_check_valid(self, check_name: str) -> bool:
+ return check_name in [chk.name for chk in self.config_checker.health_checks]
+
+ def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
+ checks_raw = self.get_store('config_checks')
+ if not checks_raw:
+ return 1, "config_checks setting is not available"
+
+ checks = json.loads(checks_raw)
+ checks.update({
+ check_name: status
+ })
+ self.log.info(f"updated config check '{check_name}' : {status}")
+ self.set_store('config_checks', json.dumps(checks))
+ return 0, ""
+
+ class ExtraCephConf(NamedTuple):
+ conf: str
+ last_modified: Optional[datetime.datetime]
+
+ def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
+ data = self.get_store('extra_ceph_conf')
+ if not data:
+ return CephadmOrchestrator.ExtraCephConf('', None)
+ try:
+ j = json.loads(data)
+ except ValueError:
+ msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
+ self.log.exception('%s: \'%s\'', msg, data)
+ return CephadmOrchestrator.ExtraCephConf('', None)
+ return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
+
+ def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
+ conf = self.extra_ceph_conf()
+ if not conf.last_modified:
+ return False
+ return conf.last_modified > dt
+
+ @orchestrator._cli_write_command(
+ 'cephadm osd activate'
+ )
+ def _osd_activate(self, host: List[str]) -> HandleCommandResult:
+ """
+ Start OSD containers for existing OSDs
+ """
+
+ @forall_hosts
+ def run(h: str) -> str:
+ return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')
+
+ return HandleCommandResult(stdout='\n'.join(run(host)))
+
+ @orchestrator._cli_read_command('orch client-keyring ls')
+ def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
+ """
+ List client keyrings under cephadm management
+ """
+ if format != Format.plain:
+ output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
+ else:
+ table = PrettyTable(
+ ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
+ border=False)
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
+ table.add_row((
+ ks.entity, ks.placement.pretty_str(),
+ utils.file_mode_to_str(ks.mode),
+ f'{ks.uid}:{ks.gid}',
+ ks.path,
+ ))
+ output = table.get_string()
+ return HandleCommandResult(stdout=output)
+
+ @orchestrator._cli_write_command('orch client-keyring set')
+ def _client_keyring_set(
+ self,
+ entity: str,
+ placement: str,
+ owner: Optional[str] = None,
+ mode: Optional[str] = None,
+ ) -> HandleCommandResult:
+ """
+ Add or update client keyring under cephadm management
+ """
+ if not entity.startswith('client.'):
+ raise OrchestratorError('entity must start with client.')
+ if owner:
+ try:
+ uid, gid = map(int, owner.split(':'))
+ except Exception:
+ raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
+ else:
+ uid = 0
+ gid = 0
+ if mode:
+ try:
+ imode = int(mode, 8)
+ except Exception:
+ raise OrchestratorError('mode must be an octal mode, e.g. "600"')
+ else:
+ imode = 0o600
+ pspec = PlacementSpec.from_string(placement)
+ ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
+ self.keys.update(ks)
+ self._kick_serve_loop()
+ return HandleCommandResult()
+
+ @orchestrator._cli_write_command('orch client-keyring rm')
+ def _client_keyring_rm(
+ self,
+ entity: str,
+ ) -> HandleCommandResult:
+ """
+ Remove client keyring from cephadm management
+ """
+ self.keys.rm(entity)
+ self._kick_serve_loop()
+ return HandleCommandResult()
+
+ def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
+ 'remoto.backends.LegacyModuleExecute']:
+ """
+ Setup a connection for running commands on remote host.
+ """
+ conn, r = self._cons.get(host, (None, None))
+ if conn:
+ if conn.has_connection():
+ self.log.debug('Have connection to %s' % host)
+ return conn, r
+ else:
+ self._reset_con(host)
+ assert self.ssh_user
+ n = self.ssh_user + '@' + host
+ self.log.debug("Opening connection to {} with ssh options '{}'".format(
+ n, self._ssh_options))
+ child_logger = self.log.getChild(n)
+ child_logger.setLevel('WARNING')
+ conn = remoto.Connection(
+ n,
+ logger=child_logger,
+ ssh_options=self._ssh_options,
+ sudo=True if self.ssh_user != 'root' else False)
+
+ r = conn.import_module(remotes)
+ self._cons[host] = conn, r
+
+ return conn, r
+
+ def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
+ """
+ Remote validator that accepts a connection object to ensure that a certain
+ executable is available returning its full path if so.
+
+ Otherwise an exception with thorough details will be raised, informing the
+ user that the executable was not found.
+ """
+ executable_path = conn.remote_module.which(executable)
+ if not executable_path:
+ raise RuntimeError("Executable '{}' not found on host '{}'".format(
+ executable, conn.hostname))
+ self.log.debug("Found executable '{}' at path '{}'".format(executable,
+ executable_path))
+ return executable_path
+
+ def _get_container_image(self, daemon_name: str) -> Optional[str]:
+ daemon_type = daemon_name.split('.', 1)[0] # type: ignore
+ image: Optional[str] = None
+ if daemon_type in CEPH_IMAGE_TYPES:
+ # get container image
+ image = str(self.get_foreign_ceph_option(
+ utils.name_to_config_section(daemon_name),
+ 'container_image'
+ )).strip()
+ elif daemon_type == 'prometheus':
+ image = self.container_image_prometheus
+ elif daemon_type == 'grafana':
+ image = self.container_image_grafana
+ elif daemon_type == 'alertmanager':
+ image = self.container_image_alertmanager
+ elif daemon_type == 'node-exporter':
+ image = self.container_image_node_exporter
+ elif daemon_type == 'haproxy':
+ image = self.container_image_haproxy
+ elif daemon_type == 'keepalived':
+ image = self.container_image_keepalived
+ elif daemon_type == CustomContainerService.TYPE:
+ # The image can't be resolved, the necessary information
+ # is only available when a container is deployed (given
+ # via spec).
+ image = None
+ elif daemon_type == 'snmp-gateway':
+ image = self.container_image_snmp_gateway
+ else:
+ assert False, daemon_type
+
+ self.log.debug('%s container image %s' % (daemon_name, image))
+
+ return image
+
+ def _schedulable_hosts(self) -> List[HostSpec]:
+ """
+ Returns all usable hosts that went through _refresh_host_daemons().
+
+ This mitigates a potential race, where new host was added *after*
+ ``_refresh_host_daemons()`` was called, but *before*
+ ``_apply_all_specs()`` was called. thus we end up with a hosts
+ where daemons might be running, but we have not yet detected them.
+ """
+ return [
+ h for h in self.inventory.all_specs()
+ if (
+ self.cache.host_had_daemon_refresh(h.hostname)
+ and '_no_schedule' not in h.labels
+ )
+ ]
+
+ def _unreachable_hosts(self) -> List[HostSpec]:
+ """
+ Return all hosts that are offline or in maintenance mode.
+
+ The idea is we should not touch the daemons on these hosts (since
+ in theory the hosts are inaccessible so we CAN'T touch them) but
+ we still want to count daemons that exist on these hosts toward the
+ placement so daemons on these hosts aren't just moved elsewhere
+ """
+ return [
+ h for h in self.inventory.all_specs()
+ if (
+ h.status.lower() in ['maintenance', 'offline']
+ or h.hostname in self.offline_hosts
+ )
+ ]
+
+ def _check_valid_addr(self, host: str, addr: str) -> str:
+ # make sure hostname is resolvable before trying to make a connection
+ try:
+ ip_addr = utils.resolve_ip(addr)
+ except OrchestratorError as e:
+ msg = str(e) + f'''
+You may need to supply an address for {addr}
+
+Please make sure that the host is reachable and accepts connections using the cephadm SSH key
+To add the cephadm SSH key to the host:
+> ceph cephadm get-pub-key > ~/ceph.pub
+> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}
+
+To check that the host is reachable open a new shell with the --no-hosts flag:
+> cephadm shell --no-hosts
+
+Then run the following:
+> ceph cephadm get-ssh-config > ssh_config
+> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
+> chmod 0600 ~/cephadm_private_key
+> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
+ raise OrchestratorError(msg)
+
+ if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
+ # if this is a re-add, use old address. otherwise error
+ if host not in self.inventory or self.inventory.get_addr(host) == host:
+ raise OrchestratorError(
+ (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
+ + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
+ self.log.debug(
+ f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
+ ip_addr = self.inventory.get_addr(host)
+ out, err, code = CephadmServe(self)._run_cephadm(
+ host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
+ if code:
+ msg = 'check-host failed:\n' + '\n'.join(err)
+ # err will contain stdout and stderr, so we filter on the message text to
+ # only show the errors
+ errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
+ if errors:
+ msg = f'Host {host} ({addr}) failed check(s): {errors}'
+ raise OrchestratorError(msg)
+ return ip_addr
+
+ def _add_host(self, spec):
+ # type: (HostSpec) -> str
+ """
+ Add a host to be managed by the orchestrator.
+
+ :param host: host name
+ """
+ HostSpec.validate(spec)
+ ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
+ if spec.addr == spec.hostname and ip_addr:
+ spec.addr = ip_addr
+
+ if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
+ self.cache.refresh_all_host_info(spec.hostname)
+
+ # prime crush map?
+ if spec.location:
+ self.check_mon_command({
+ 'prefix': 'osd crush add-bucket',
+ 'name': spec.hostname,
+ 'type': 'host',
+ 'args': [f'{k}={v}' for k, v in spec.location.items()],
+ })
+
+ if spec.hostname not in self.inventory:
+ self.cache.prime_empty_host(spec.hostname)
+ self.inventory.add_host(spec)
+ self.offline_hosts_remove(spec.hostname)
+ if spec.status == 'maintenance':
+ self._set_maintenance_healthcheck()
+ self.event.set() # refresh stray health check
+ self.log.info('Added host %s' % spec.hostname)
+ return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
+
+ @handle_orch_error
+ def add_host(self, spec: HostSpec) -> str:
+ return self._add_host(spec)
+
+ @handle_orch_error
+ def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
+ """
+ Remove a host from orchestrator management.
+
+ :param host: host name
+ :param force: bypass running daemons check
+ :param offline: remove offline host
+ """
+
+ # check if host is offline
+ host_offline = host in self.offline_hosts
+
+ if host_offline and not offline:
+ raise OrchestratorValidationError(
+ "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))
+
+ if not host_offline and offline:
+ raise OrchestratorValidationError(
+ "{} is online, please remove host without --offline.".format(host))
+
+ if offline and not force:
+ raise OrchestratorValidationError("Removing an offline host requires --force")
+
+ # check if there are daemons on the host
+ if not force:
+ daemons = self.cache.get_daemons_by_host(host)
+ if daemons:
+ self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
+
+ daemons_table = ""
+ daemons_table += "{:<20} {:<15}\n".format("type", "id")
+ daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
+ for d in daemons:
+ daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
+
+ raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
+ "The following daemons are running in the host:"
+ "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
+ host, daemons_table, host))
+
+ # check, if there we're removing the last _admin host
+ if not force:
+ p = PlacementSpec(label='_admin')
+ admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
+ if len(admin_hosts) == 1 and admin_hosts[0] == host:
+ raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
+ " label. Please add the '_admin' label to a host"
+ " or add --force to this command")
+
+ def run_cmd(cmd_args: dict) -> None:
+ ret, out, err = self.mon_command(cmd_args)
+ if ret != 0:
+ self.log.debug(f"ran {cmd_args} with mon_command")
+ self.log.error(
+ f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+ self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+
+ if offline:
+ daemons = self.cache.get_daemons_by_host(host)
+ for d in daemons:
+ self.log.info(f"removing: {d.name()}")
+
+ if d.daemon_type != 'osd':
+ self.cephadm_services[str(d.daemon_type)].pre_remove(d)
+ self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False)
+ else:
+ cmd_args = {
+ 'prefix': 'osd purge-actual',
+ 'id': int(str(d.daemon_id)),
+ 'yes_i_really_mean_it': True
+ }
+ run_cmd(cmd_args)
+
+ cmd_args = {
+ 'prefix': 'osd crush rm',
+ 'name': host
+ }
+ run_cmd(cmd_args)
+
+ self.inventory.rm_host(host)
+ self.cache.rm_host(host)
+ self._reset_con(host)
+ self.event.set() # refresh stray health check
+ self.log.info('Removed host %s' % host)
+ return "Removed {} host '{}'".format('offline' if offline else '', host)
+
+ @handle_orch_error
+ def update_host_addr(self, host: str, addr: str) -> str:
+ self._check_valid_addr(host, addr)
+ self.inventory.set_addr(host, addr)
+ self._reset_con(host)
+ self.event.set() # refresh stray health check
+ self.log.info('Set host %s addr to %s' % (host, addr))
+ return "Updated host '{}' addr to '{}'".format(host, addr)
+
+ @handle_orch_error
+ def get_hosts(self):
+ # type: () -> List[orchestrator.HostSpec]
+ """
+ Return a list of hosts managed by the orchestrator.
+
+ Notes:
+ - skip async: manager reads from cache.
+ """
+ return list(self.inventory.all_specs())
+
+ @handle_orch_error
+ def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
+ """
+ Return a list of hosts metadata(gather_facts) managed by the orchestrator.
+
+ Notes:
+ - skip async: manager reads from cache.
+ """
+ if hostname:
+ return [self.cache.get_facts(hostname)]
+
+ return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
+
+ @handle_orch_error
+ def add_host_label(self, host: str, label: str) -> str:
+ self.inventory.add_label(host, label)
+ self.log.info('Added label %s to host %s' % (label, host))
+ self._kick_serve_loop()
+ return 'Added label %s to host %s' % (label, host)
+
+ @handle_orch_error
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
+ # if we remove the _admin label from the only host that has it we could end up
+ # removing the only instance of the config and keyring and cause issues
+ if not force and label == '_admin':
+ p = PlacementSpec(label='_admin')
+ admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
+ if len(admin_hosts) == 1 and admin_hosts[0] == host:
+ raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
+ " label.\nRemoving the _admin label from this host could cause the removal"
+ " of the last cluster config/keyring managed by cephadm.\n"
+ "It is recommended to add the _admin label to another host"
+ " before completing this operation.\nIf you're certain this is"
+ " what you want rerun this command with --force.")
+ self.inventory.rm_label(host, label)
+ self.log.info('Removed label %s to host %s' % (label, host))
+ self._kick_serve_loop()
+ return 'Removed label %s from host %s' % (label, host)
+
+ def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
+ self.log.debug("running host-ok-to-stop checks")
+ daemons = self.cache.get_daemons()
+ daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
+ for dd in daemons:
+ assert dd.hostname is not None
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
+ if dd.hostname == hostname:
+ daemon_map[dd.daemon_type].append(dd.daemon_id)
+
+ notifications: List[str] = []
+ error_notifications: List[str] = []
+ okay: bool = True
+ for daemon_type, daemon_ids in daemon_map.items():
+ r = self.cephadm_services[daemon_type_to_service(
+ daemon_type)].ok_to_stop(daemon_ids, force=force)
+ if r.retval:
+ okay = False
+ # collect error notifications so user can see every daemon causing host
+ # to not be okay to stop
+ error_notifications.append(r.stderr)
+ if r.stdout:
+ # if extra notifications to print for user, add them to notifications list
+ notifications.append(r.stdout)
+
+ if not okay:
+ # at least one daemon is not okay to stop
+ return 1, '\n'.join(error_notifications)
+
+ if notifications:
+ return 0, (f'It is presumed safe to stop host {hostname}. '
+ + 'Note the following:\n\n' + '\n'.join(notifications))
+ return 0, f'It is presumed safe to stop host {hostname}'
+
+ @handle_orch_error
+ def host_ok_to_stop(self, hostname: str) -> str:
+ if hostname not in self.cache.get_hosts():
+ raise OrchestratorError(f'Cannot find host "{hostname}"')
+
+ rc, msg = self._host_ok_to_stop(hostname)
+ if rc:
+ raise OrchestratorError(msg, errno=rc)
+
+ self.log.info(msg)
+ return msg
+
+ def _set_maintenance_healthcheck(self) -> None:
+ """Raise/update or clear the maintenance health check as needed"""
+
+ in_maintenance = self.inventory.get_host_with_state("maintenance")
+ if not in_maintenance:
+ self.remove_health_warning('HOST_IN_MAINTENANCE')
+ else:
+ s = "host is" if len(in_maintenance) == 1 else "hosts are"
+ self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
+ f"{h} is in maintenance" for h in in_maintenance])
+
+ @handle_orch_error
+ @host_exists()
+ def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
+ """ Attempt to place a cluster host in maintenance
+
+ Placing a host into maintenance disables the cluster's ceph target in systemd
+ and stops all ceph daemons. If the host is an osd host we apply the noout flag
+ for the host subtree in crush to prevent data movement during a host maintenance
+ window.
+
+ :param hostname: (str) name of the host (must match an inventory hostname)
+
+ :raises OrchestratorError: Hostname is invalid, host is already in maintenance
+ """
+ if len(self.cache.get_hosts()) == 1:
+ raise OrchestratorError("Maintenance feature is not supported on single node clusters")
+
+ # if upgrade is active, deny
+ if self.upgrade.upgrade_state:
+ raise OrchestratorError(
+ f"Unable to place {hostname} in maintenance with upgrade active/paused")
+
+ tgt_host = self.inventory._inventory[hostname]
+ if tgt_host.get("status", "").lower() == "maintenance":
+ raise OrchestratorError(f"Host {hostname} is already in maintenance")
+
+ host_daemons = self.cache.get_daemon_types(hostname)
+ self.log.debug("daemons on host {}".format(','.join(host_daemons)))
+ if host_daemons:
+ # daemons on this host, so check the daemons can be stopped
+ # and if so, place the host into maintenance by disabling the target
+ rc, msg = self._host_ok_to_stop(hostname, force)
+ if rc:
+ raise OrchestratorError(
+ msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
+
+ # call the host-maintenance function
+ _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
+ ["enter"],
+ error_ok=True)
+ returned_msg = _err[0].split('\n')[-1]
+ if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
+ raise OrchestratorError(
+ f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
+
+ if "osd" in host_daemons:
+ crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
+ rc, out, err = self.mon_command({
+ 'prefix': 'osd set-group',
+ 'flags': 'noout',
+ 'who': [crush_node],
+ 'format': 'json'
+ })
+ if rc:
+ self.log.warning(
+ f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
+ raise OrchestratorError(
+ f"Unable to set the osds on {hostname} to noout (rc={rc})")
+ else:
+ self.log.info(
+ f"maintenance mode request for {hostname} has SET the noout group")
+
+ # update the host status in the inventory
+ tgt_host["status"] = "maintenance"
+ self.inventory._inventory[hostname] = tgt_host
+ self.inventory.save()
+
+ self._set_maintenance_healthcheck()
+ return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
+
+ @handle_orch_error
+ @host_exists()
+ def exit_host_maintenance(self, hostname: str) -> str:
+ """Exit maintenance mode and return a host to an operational state
+
+ Returning from maintnenance will enable the clusters systemd target and
+ start it, and remove any noout that has been added for the host if the
+ host has osd daemons
+
+ :param hostname: (str) host name
+
+ :raises OrchestratorError: Unable to return from maintenance, or unset the
+ noout flag
+ """
+ tgt_host = self.inventory._inventory[hostname]
+ if tgt_host['status'] != "maintenance":
+ raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
+
+ outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
+ ['exit'],
+ error_ok=True)
+ returned_msg = errs[0].split('\n')[-1]
+ if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
+ raise OrchestratorError(
+ f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
+
+ if "osd" in self.cache.get_daemon_types(hostname):
+ crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
+ rc, _out, _err = self.mon_command({
+ 'prefix': 'osd unset-group',
+ 'flags': 'noout',
+ 'who': [crush_node],
+ 'format': 'json'
+ })
+ if rc:
+ self.log.warning(
+ f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
+ raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
+ else:
+ self.log.info(
+ f"exit maintenance request has UNSET for the noout group on host {hostname}")
+
+ # update the host record status
+ tgt_host['status'] = ""
+ self.inventory._inventory[hostname] = tgt_host
+ self.inventory.save()
+
+ self._set_maintenance_healthcheck()
+
+ return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
+
+ @handle_orch_error
+ @host_exists()
+ def rescan_host(self, hostname: str) -> str:
+ """Use cephadm to issue a disk rescan on each HBA
+
+ Some HBAs and external enclosures don't automatically register
+ device insertion with the kernel, so for these scenarios we need
+ to manually rescan
+
+ :param hostname: (str) host name
+ """
+ self.log.info(f'disk rescan request sent to host "{hostname}"')
+ _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
+ [], no_fsid=True, error_ok=True)
+ if not _err:
+ raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
+
+ msg = _err[0].split('\n')[-1]
+ log_msg = f'disk rescan: {msg}'
+ if msg.upper().startswith('OK'):
+ self.log.info(log_msg)
+ else:
+ self.log.warning(log_msg)
+
+ return f'{msg}'
+
+ def get_minimal_ceph_conf(self) -> str:
+ _, config, _ = self.check_mon_command({
+ "prefix": "config generate-minimal-conf",
+ })
+ extra = self.extra_ceph_conf().conf
+ if extra:
+ config += '\n\n' + extra.strip() + '\n'
+ return config
+
+ def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
+ if filter_host:
+ self.cache.invalidate_host_daemons(filter_host)
+ else:
+ for h in self.cache.get_hosts():
+ # Also discover daemons deployed manually
+ self.cache.invalidate_host_daemons(h)
+
+ self._kick_serve_loop()
+
+ @handle_orch_error
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.ServiceDescription]:
+ if refresh:
+ self._invalidate_daemons_and_kick_serve()
+ self.log.debug('Kicked serve() loop to refresh all services')
+
+ sm: Dict[str, orchestrator.ServiceDescription] = {}
+
+ # known services
+ for nm, spec in self.spec_store.all_specs.items():
+ if service_type is not None and service_type != spec.service_type:
+ continue
+ if service_name is not None and service_name != nm:
+ continue
+
+ if spec.service_type != 'osd':
+ size = spec.placement.get_target_count(self._schedulable_hosts())
+ else:
+ # osd counting is special
+ size = 0
+
+ sm[nm] = orchestrator.ServiceDescription(
+ spec=spec,
+ size=size,
+ running=0,
+ events=self.events.get_for_service(spec.service_name()),
+ created=self.spec_store.spec_created[nm],
+ deleted=self.spec_store.spec_deleted.get(nm, None),
+ virtual_ip=spec.get_virtual_ip(),
+ ports=spec.get_port_start(),
+ )
+ if spec.service_type == 'ingress':
+ # ingress has 2 daemons running per host
+ sm[nm].size *= 2
+
+ # factor daemons into status
+ for h, dm in self.cache.get_daemons_with_volatile_status():
+ for name, dd in dm.items():
+ assert dd.hostname is not None, f'no hostname for {dd!r}'
+ assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
+
+ n: str = dd.service_name()
+
+ if (
+ service_type
+ and service_type != daemon_type_to_service(dd.daemon_type)
+ ):
+ continue
+ if service_name and service_name != n:
+ continue
+
+ if n not in sm:
+ # new unmanaged service
+ spec = ServiceSpec(
+ unmanaged=True,
+ service_type=daemon_type_to_service(dd.daemon_type),
+ service_id=dd.service_id(),
+ )
+ sm[n] = orchestrator.ServiceDescription(
+ last_refresh=dd.last_refresh,
+ container_image_id=dd.container_image_id,
+ container_image_name=dd.container_image_name,
+ spec=spec,
+ size=0,
+ )
+
+ if dd.status == DaemonDescriptionStatus.running:
+ sm[n].running += 1
+ if dd.daemon_type == 'osd':
+ # The osd count can't be determined by the Placement spec.
+ # Showing an actual/expected representation cannot be determined
+ # here. So we're setting running = size for now.
+ sm[n].size += 1
+ if (
+ not sm[n].last_refresh
+ or not dd.last_refresh
+ or dd.last_refresh < sm[n].last_refresh # type: ignore
+ ):
+ sm[n].last_refresh = dd.last_refresh
+
+ return list(sm.values())
+
+ @handle_orch_error
+ def list_daemons(self,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.DaemonDescription]:
+ if refresh:
+ self._invalidate_daemons_and_kick_serve(host)
+ self.log.debug('Kicked serve() loop to refresh all daemons')
+
+ result = []
+ for h, dm in self.cache.get_daemons_with_volatile_status():
+ if host and h != host:
+ continue
+ for name, dd in dm.items():
+ if daemon_type is not None and daemon_type != dd.daemon_type:
+ continue
+ if daemon_id is not None and daemon_id != dd.daemon_id:
+ continue
+ if service_name is not None and service_name != dd.service_name():
+ continue
+ if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
+ dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
+ dd.name(),
+ f"{dd.daemon_type}_memory_target"
+ ))
+ result.append(dd)
+ return result
+
+ @handle_orch_error
+ def service_action(self, action: str, service_name: str) -> List[str]:
+ if service_name not in self.spec_store.all_specs.keys():
+ raise OrchestratorError(f'Invalid service name "{service_name}".'
+ + ' View currently running services using "ceph orch ls"')
+ dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
+ if not dds:
+ raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
+ + ' View currently running services using "ceph orch ls"')
+ if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
+ return [f'Stopping entire {service_name} service is prohibited.']
+ self.log.info('%s service %s' % (action.capitalize(), service_name))
+ return [
+ self._schedule_daemon_action(dd.name(), action)
+ for dd in dds
+ ]
+
+ def _daemon_action(self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ action: str,
+ image: Optional[str] = None) -> str:
+ self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
+ daemon_spec.daemon_id)
+
+ if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
+ daemon_spec.daemon_id):
+ self.mgr_service.fail_over()
+ return '' # unreachable
+
+ if action == 'redeploy' or action == 'reconfig':
+ if daemon_spec.daemon_type != 'osd':
+ daemon_spec = self.cephadm_services[daemon_type_to_service(
+ daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ else:
+ # for OSDs, we still need to update config, just not carry out the full
+ # prepare_create function
+ daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(daemon_spec)
+ return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))
+
+ actions = {
+ 'start': ['reset-failed', 'start'],
+ 'stop': ['stop'],
+ 'restart': ['reset-failed', 'restart'],
+ }
+ name = daemon_spec.name()
+ for a in actions[action]:
+ try:
+ out, err, code = CephadmServe(self)._run_cephadm(
+ daemon_spec.host, name, 'unit',
+ ['--name', name, a])
+ except Exception:
+ self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
+ self.events.for_daemon(name, 'INFO', msg)
+ return msg
+
+ def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
+ if image is not None:
+ if action != 'redeploy':
+ raise OrchestratorError(
+ f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
+ if daemon_type not in CEPH_IMAGE_TYPES:
+ raise OrchestratorError(
+ f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
+ f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
+
+ self.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': image,
+ 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
+ })
+
+ @handle_orch_error
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
+ d = self.cache.get_daemon(daemon_name)
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+
+ if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
+ and not self.mgr_service.mgr_map_has_standby():
+ raise OrchestratorError(
+ f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
+
+ self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
+
+ self.log.info(f'Schedule {action} daemon {daemon_name}')
+ return self._schedule_daemon_action(daemon_name, action)
+
+ def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
+ return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
+
+ def get_active_mgr_digests(self) -> List[str]:
+ digests = self.mgr_service.get_active_daemon(
+ self.cache.get_daemons_by_type('mgr')).container_image_digests
+ return digests if digests else []
+
+ def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
+ dd = self.cache.get_daemon(daemon_name)
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
+ assert dd.hostname is not None
+ if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
+ and not self.mgr_service.mgr_map_has_standby():
+ raise OrchestratorError(
+ f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
+ self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
+ msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
+ self._kick_serve_loop()
+ return msg
+
+ @handle_orch_error
+ def remove_daemons(self, names):
+ # type: (List[str]) -> List[str]
+ args = []
+ for host, dm in self.cache.daemons.items():
+ for name in names:
+ if name in dm:
+ args.append((name, host))
+ if not args:
+ raise OrchestratorError('Unable to find daemon(s) %s' % (names))
+ self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
+ return self._remove_daemons(args)
+
+ @handle_orch_error
+ def remove_service(self, service_name: str, force: bool = False) -> str:
+ self.log.info('Remove service %s' % service_name)
+ self._trigger_preview_refresh(service_name=service_name)
+ if service_name in self.spec_store:
+ if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
+ return f'Unable to remove {service_name} service.\n' \
+ f'Note, you might want to mark the {service_name} service as "unmanaged"'
+ else:
+ return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"
+
+ # Report list of affected OSDs?
+ if not force and service_name.startswith('osd.'):
+ osds_msg = {}
+ for h, dm in self.cache.get_daemons_with_volatile_status():
+ osds_to_remove = []
+ for name, dd in dm.items():
+ if dd.daemon_type == 'osd' and dd.service_name() == service_name:
+ osds_to_remove.append(str(dd.daemon_id))
+ if osds_to_remove:
+ osds_msg[h] = osds_to_remove
+ if osds_msg:
+ msg = ''
+ for h, ls in osds_msg.items():
+ msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
+ raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
+
+ found = self.spec_store.rm(service_name)
+ if found and service_name.startswith('osd.'):
+ self.spec_store.finally_rm(service_name)
+ self._kick_serve_loop()
+ return f'Removed service {service_name}'
+
+ @handle_orch_error
+ def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
+ """
+ Return the storage inventory of hosts matching the given filter.
+
+ :param host_filter: host filter
+
+ TODO:
+ - add filtering by label
+ """
+ if refresh:
+ if host_filter and host_filter.hosts:
+ for h in host_filter.hosts:
+ self.log.debug(f'will refresh {h} devs')
+ self.cache.invalidate_host_devices(h)
+ else:
+ for h in self.cache.get_hosts():
+ self.log.debug(f'will refresh {h} devs')
+ self.cache.invalidate_host_devices(h)
+
+ self.event.set()
+ self.log.debug('Kicked serve() loop to refresh devices')
+
+ result = []
+ for host, dls in self.cache.devices.items():
+ if host_filter and host_filter.hosts and host not in host_filter.hosts:
+ continue
+ result.append(orchestrator.InventoryHost(host,
+ inventory.Devices(dls)))
+ return result
+
+ @handle_orch_error
+ def zap_device(self, host: str, path: str) -> str:
+ """Zap a device on a managed host.
+
+ Use ceph-volume zap to return a device to an unused/free state
+
+ Args:
+ host (str): hostname of the cluster host
+ path (str): device path
+
+ Raises:
+ OrchestratorError: host is not a cluster host
+ OrchestratorError: host is in maintenance and therefore unavailable
+ OrchestratorError: device path not found on the host
+ OrchestratorError: device is known to a different ceph cluster
+ OrchestratorError: device holds active osd
+ OrchestratorError: device cache hasn't been populated yet..
+
+ Returns:
+ str: output from the zap command
+ """
+
+ self.log.info('Zap device %s:%s' % (host, path))
+
+ if host not in self.inventory.keys():
+ raise OrchestratorError(
+ f"Host '{host}' is not a member of the cluster")
+
+ host_info = self.inventory._inventory.get(host, {})
+ if host_info.get('status', '').lower() == 'maintenance':
+ raise OrchestratorError(
+ f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
+
+ if host not in self.cache.devices:
+ raise OrchestratorError(
+ f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
+
+ host_devices = self.cache.devices[host]
+ path_found = False
+ osd_id_list: List[str] = []
+
+ for dev in host_devices:
+ if dev.path == path:
+ # match, so look a little deeper
+ if dev.lvs:
+ for lv in cast(List[Dict[str, str]], dev.lvs):
+ if lv.get('osd_id', ''):
+ lv_fsid = lv.get('cluster_fsid')
+ if lv_fsid != self._cluster_fsid:
+ raise OrchestratorError(
+ f"device {path} has lv's from a different Ceph cluster ({lv_fsid})")
+ osd_id_list.append(lv.get('osd_id', ''))
+ path_found = True
+ break
+ if not path_found:
+ raise OrchestratorError(
+ f"Device path '{path}' not found on host '{host}'")
+
+ if osd_id_list:
+ dev_name = os.path.basename(path)
+ active_osds: List[str] = []
+ for osd_id in osd_id_list:
+ metadata = self.get_metadata('osd', str(osd_id))
+ if metadata:
+ if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
+ active_osds.append("osd." + osd_id)
+ if active_osds:
+ raise OrchestratorError(
+ f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
+ f"OSD{'s' if len(active_osds) > 1 else ''}"
+ f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
+
+ out, err, code = CephadmServe(self)._run_cephadm(
+ host, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'zap', '--destroy', path],
+ error_ok=True)
+
+ self.cache.invalidate_host_devices(host)
+ if code:
+ raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
+ msg = f'zap successful for {path} on {host}'
+ self.log.info(msg)
+
+ return msg + '\n'
+
+ @handle_orch_error
+ def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+ """
+ Blink a device light. Calling something like::
+
+ lsmcli local-disk-ident-led-on --path $path
+
+ If you must, you can customize this via::
+
+ ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
+ ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
+
+ See templates/blink_device_light_cmd.j2
+ """
+ @forall_hosts
+ def blink(host: str, dev: str, path: str) -> str:
+ cmd_line = self.template.render('blink_device_light_cmd.j2',
+ {
+ 'on': on,
+ 'ident_fault': ident_fault,
+ 'dev': dev,
+ 'path': path
+ },
+ host=host)
+ cmd_args = shlex.split(cmd_line)
+
+ out, err, code = CephadmServe(self)._run_cephadm(
+ host, 'osd', 'shell', ['--'] + cmd_args,
+ error_ok=True)
+ if code:
+ raise OrchestratorError(
+ 'Unable to affect %s light for %s:%s. Command: %s' % (
+ ident_fault, host, dev, ' '.join(cmd_args)))
+ self.log.info('Set %s light for %s:%s %s' % (
+ ident_fault, host, dev, 'on' if on else 'off'))
+ return "Set %s light for %s:%s %s" % (
+ ident_fault, host, dev, 'on' if on else 'off')
+
+ return blink(locs)
+
+ def get_osd_uuid_map(self, only_up=False):
+ # type: (bool) -> Dict[str, str]
+ osd_map = self.get('osd_map')
+ r = {}
+ for o in osd_map['osds']:
+ # only include OSDs that have ever started in this map. this way
+ # an interrupted osd create can be repeated and succeed the second
+ # time around.
+ osd_id = o.get('osd')
+ if osd_id is None:
+ raise OrchestratorError("Could not retrieve osd_id from osd_map")
+ if not only_up:
+ r[str(osd_id)] = o.get('uuid', '')
+ return r
+
+ def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
+ osd = [x for x in self.get('osd_map')['osds']
+ if x['osd'] == osd_id]
+
+ if len(osd) != 1:
+ return None
+
+ return osd[0]
+
+ def _trigger_preview_refresh(self,
+ specs: Optional[List[DriveGroupSpec]] = None,
+ service_name: Optional[str] = None,
+ ) -> None:
+ # Only trigger a refresh when a spec has changed
+ trigger_specs = []
+ if specs:
+ for spec in specs:
+ preview_spec = self.spec_store.spec_preview.get(spec.service_name())
+ # the to-be-preview spec != the actual spec, this means we need to
+ # trigger a refresh, if the spec has been removed (==None) we need to
+ # refresh as well.
+ if not preview_spec or spec != preview_spec:
+ trigger_specs.append(spec)
+ if service_name:
+ trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
+ if not any(trigger_specs):
+ return None
+
+ refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
+ for host in refresh_hosts:
+ self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
+ self.cache.osdspec_previews_refresh_queue.append(host)
+
+ @handle_orch_error
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
+ """
+ Deprecated. Please use `apply()` instead.
+
+ Keeping this around to be compapatible to mgr/dashboard
+ """
+ return [self._apply(spec) for spec in specs]
+
+ @handle_orch_error
+ def create_osds(self, drive_group: DriveGroupSpec) -> str:
+ hosts: List[HostSpec] = self.inventory.all_specs()
+ filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
+ if not filtered_hosts:
+ return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
+ return self.osd_service.create_from_spec(drive_group)
+
+ def _preview_osdspecs(self,
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ) -> dict:
+ if not osdspecs:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
+ if not matching_hosts:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ # Is any host still loading previews or still in the queue to be previewed
+ pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
+ if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
+ # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
+ return {'n/a': [{'error': True,
+ 'message': 'Preview data is being generated.. '
+ 'Please re-run this command in a bit.'}]}
+ # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
+ previews_for_specs = {}
+ for host, raw_reports in self.cache.osdspec_previews.items():
+ if host not in matching_hosts:
+ continue
+ osd_reports = []
+ for osd_report in raw_reports:
+ if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
+ osd_reports.append(osd_report)
+ previews_for_specs.update({host: osd_reports})
+ return previews_for_specs
+
+ def _calc_daemon_deps(self,
+ spec: Optional[ServiceSpec],
+ daemon_type: str,
+ daemon_id: str) -> List[str]:
+ deps = []
+ if daemon_type == 'haproxy':
+ # because cephadm creates new daemon instances whenever
+ # port or ip changes, identifying daemons by name is
+ # sufficient to detect changes.
+ if not spec:
+ return []
+ ingress_spec = cast(IngressSpec, spec)
+ assert ingress_spec.backend_service
+ daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
+ deps = [d.name() for d in daemons]
+ elif daemon_type == 'keepalived':
+ # because cephadm creates new daemon instances whenever
+ # port or ip changes, identifying daemons by name is
+ # sufficient to detect changes.
+ if not spec:
+ return []
+ daemons = self.cache.get_daemons_by_service(spec.service_name())
+ deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
+ elif daemon_type == 'iscsi':
+ if spec:
+ iscsi_spec = cast(IscsiServiceSpec, spec)
+ deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
+ else:
+ deps = [self.get_mgr_ip()]
+ else:
+ need = {
+ 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
+ 'grafana': ['prometheus'],
+ 'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
+ }
+ for dep_type in need.get(daemon_type, []):
+ for dd in self.cache.get_daemons_by_type(dep_type):
+ deps.append(dd.name())
+ if daemon_type == 'prometheus':
+ deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
+ return sorted(deps)
+
+ @forall_hosts
+ def _remove_daemons(self, name: str, host: str) -> str:
+ return CephadmServe(self)._remove_daemon(name, host)
+
+ def _check_pool_exists(self, pool: str, service_name: str) -> None:
+ logger.info(f'Checking pool "{pool}" exists for service {service_name}')
+ if not self.rados.pool_exists(pool):
+ raise OrchestratorError(f'Cannot find pool "{pool}" for '
+ f'service {service_name}')
+
+ def _add_daemon(self,
+ daemon_type: str,
+ spec: ServiceSpec) -> List[str]:
+ """
+ Add (and place) a daemon. Require explicit host placement. Do not
+ schedule, and do not apply the related scheduling limitations.
+ """
+ if spec.service_name() not in self.spec_store:
+ raise OrchestratorError('Unable to add a Daemon without Service.\n'
+ 'Please use `ceph orch apply ...` to create a Service.\n'
+ 'Note, you might want to create the service with "unmanaged=true"')
+
+ self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
+ if not spec.placement.hosts:
+ raise OrchestratorError('must specify host(s) to deploy on')
+ count = spec.placement.count or len(spec.placement.hosts)
+ daemons = self.cache.get_daemons_by_service(spec.service_name())
+ return self._create_daemons(daemon_type, spec, daemons,
+ spec.placement.hosts, count)
+
+ def _create_daemons(self,
+ daemon_type: str,
+ spec: ServiceSpec,
+ daemons: List[DaemonDescription],
+ hosts: List[HostPlacementSpec],
+ count: int) -> List[str]:
+ if count > len(hosts):
+ raise OrchestratorError('too few hosts: want %d, have %s' % (
+ count, hosts))
+
+ did_config = False
+ service_type = daemon_type_to_service(daemon_type)
+
+ args = [] # type: List[CephadmDaemonDeploySpec]
+ for host, network, name in hosts:
+ daemon_id = self.get_unique_name(daemon_type, host, daemons,
+ prefix=spec.service_id,
+ forcename=name)
+
+ if not did_config:
+ self.cephadm_services[service_type].config(spec)
+ did_config = True
+
+ daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
+ host, daemon_id, network, spec,
+ # NOTE: this does not consider port conflicts!
+ ports=spec.get_port_start())
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+ args.append(daemon_spec)
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ )
+ daemons.append(sd)
+
+ @ forall_hosts
+ def create_func_map(*args: Any) -> str:
+ daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
+ return CephadmServe(self)._create_daemon(daemon_spec)
+
+ return create_func_map(args)
+
+ @handle_orch_error
+ def add_daemon(self, spec: ServiceSpec) -> List[str]:
+ ret: List[str] = []
+ try:
+ with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
+ for d_type in service_to_daemon_types(spec.service_type):
+ ret.extend(self._add_daemon(d_type, spec))
+ return ret
+ except OrchestratorError as e:
+ self.events.from_orch_error(e)
+ raise
+
+ @handle_orch_error
+ def apply_mon(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ def _apply(self, spec: GenericSpec) -> str:
+ if spec.service_type == 'host':
+ return self._add_host(cast(HostSpec, spec))
+
+ if spec.service_type == 'osd':
+ # _trigger preview refresh needs to be smart and
+ # should only refresh if a change has been detected
+ self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
+
+ return self._apply_service_spec(cast(ServiceSpec, spec))
+
+ def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
+ self.health_checks[name] = {
+ 'severity': 'warning',
+ 'summary': summary,
+ 'count': count,
+ 'detail': detail,
+ }
+ self.set_health_checks(self.health_checks)
+
+ def remove_health_warning(self, name: str) -> None:
+ if name in self.health_checks:
+ del self.health_checks[name]
+ self.set_health_checks(self.health_checks)
+
+ def _plan(self, spec: ServiceSpec) -> dict:
+ if spec.service_type == 'osd':
+ return {'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
+
+ svc = self.cephadm_services[spec.service_type]
+ ha = HostAssignment(
+ spec=spec,
+ hosts=self._schedulable_hosts(),
+ unreachable_hosts=self._unreachable_hosts(),
+ networks=self.cache.networks,
+ daemons=self.cache.get_daemons_by_service(spec.service_name()),
+ allow_colo=svc.allow_colo(),
+ rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
+ )
+ ha.validate()
+ hosts, to_add, to_remove = ha.place()
+
+ return {
+ 'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'add': [hs.hostname for hs in to_add],
+ 'remove': [d.name() for d in to_remove]
+ }
+
+ @handle_orch_error
+ def plan(self, specs: Sequence[GenericSpec]) -> List:
+ results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
+ 'to the current inventory setup. If any of these conditions change, the \n'
+ 'preview will be invalid. Please make sure to have a minimal \n'
+ 'timeframe between planning and applying the specs.'}]
+ if any([spec.service_type == 'host' for spec in specs]):
+ return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
+ for spec in specs:
+ results.append(self._plan(cast(ServiceSpec, spec)))
+ return results
+
+ def _apply_service_spec(self, spec: ServiceSpec) -> str:
+ if spec.placement.is_empty():
+ # fill in default placement
+ defaults = {
+ 'mon': PlacementSpec(count=5),
+ 'mgr': PlacementSpec(count=2),
+ 'mds': PlacementSpec(count=2),
+ 'rgw': PlacementSpec(count=2),
+ 'ingress': PlacementSpec(count=2),
+ 'iscsi': PlacementSpec(count=1),
+ 'rbd-mirror': PlacementSpec(count=2),
+ 'cephfs-mirror': PlacementSpec(count=1),
+ 'nfs': PlacementSpec(count=1),
+ 'grafana': PlacementSpec(count=1),
+ 'alertmanager': PlacementSpec(count=1),
+ 'prometheus': PlacementSpec(count=1),
+ 'node-exporter': PlacementSpec(host_pattern='*'),
+ 'crash': PlacementSpec(host_pattern='*'),
+ 'container': PlacementSpec(count=1),
+ 'cephadm-exporter': PlacementSpec(host_pattern='*'),
+ 'snmp-gateway': PlacementSpec(count=1),
+ }
+ spec.placement = defaults[spec.service_type]
+ elif spec.service_type in ['mon', 'mgr'] and \
+ spec.placement.count is not None and \
+ spec.placement.count < 1:
+ raise OrchestratorError('cannot scale %s service below 1' % (
+ spec.service_type))
+
+ host_count = len(self.inventory.keys())
+ max_count = self.max_count_per_host
+
+ if spec.placement.count is not None:
+ if spec.service_type in ['mon', 'mgr']:
+ if spec.placement.count > max(5, host_count):
+ raise OrchestratorError(
+ (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
+ elif spec.service_type != 'osd':
+ if spec.placement.count > (max_count * host_count):
+ raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
+ + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
+
+ if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
+ raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
+ + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
+
+ HostAssignment(
+ spec=spec,
+ hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
+ unreachable_hosts=self._unreachable_hosts(),
+ networks=self.cache.networks,
+ daemons=self.cache.get_daemons_by_service(spec.service_name()),
+ allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
+ ).validate()
+
+ self.log.info('Saving service %s spec with placement %s' % (
+ spec.service_name(), spec.placement.pretty_str()))
+ self.spec_store.save(spec)
+ self._kick_serve_loop()
+ return "Scheduled %s update..." % spec.service_name()
+
+ @handle_orch_error
+ def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
+ results = []
+ for spec in specs:
+ if no_overwrite:
+ if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
+ results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
+ % (cast(HostSpec, spec).hostname, spec.service_type))
+ continue
+ elif cast(ServiceSpec, spec).service_name() in self.spec_store:
+ results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
+ % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
+ continue
+ results.append(self._apply(spec))
+ return results
+
+ @handle_orch_error
+ def apply_mgr(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_mds(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_rgw(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_ingress(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_iscsi(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_nfs(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ def _get_dashboard_url(self):
+ # type: () -> str
+ return self.get('mgr_map').get('services', {}).get('dashboard', '')
+
+ @handle_orch_error
+ def apply_prometheus(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_node_exporter(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_crash(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_grafana(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_alertmanager(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_container(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
+ @handle_orch_error
+ def upgrade_check(self, image: str, version: str) -> str:
+ if self.inventory.get_host_with_state("maintenance"):
+ raise OrchestratorError("check aborted - you have hosts in maintenance state")
+
+ if version:
+ target_name = self.container_image_base + ':v' + version
+ elif image:
+ target_name = image
+ else:
+ raise OrchestratorError('must specify either image or version')
+
+ image_info = CephadmServe(self)._get_container_image_info(target_name)
+
+ ceph_image_version = image_info.ceph_version
+ if not ceph_image_version:
+ return f'Unable to extract ceph version from {target_name}.'
+ if ceph_image_version.startswith('ceph version '):
+ ceph_image_version = ceph_image_version.split(' ')[2]
+ version_error = self.upgrade._check_target_version(ceph_image_version)
+ if version_error:
+ return f'Incompatible upgrade: {version_error}'
+
+ self.log.debug(f'image info {image} -> {image_info}')
+ r: dict = {
+ 'target_name': target_name,
+ 'target_id': image_info.image_id,
+ 'target_version': image_info.ceph_version,
+ 'needs_update': dict(),
+ 'up_to_date': list(),
+ 'non_ceph_image_daemons': list()
+ }
+ for host, dm in self.cache.daemons.items():
+ for name, dd in dm.items():
+ if image_info.image_id == dd.container_image_id:
+ r['up_to_date'].append(dd.name())
+ elif dd.daemon_type in CEPH_IMAGE_TYPES:
+ r['needs_update'][dd.name()] = {
+ 'current_name': dd.container_image_name,
+ 'current_id': dd.container_image_id,
+ 'current_version': dd.version,
+ }
+ else:
+ r['non_ceph_image_daemons'].append(dd.name())
+ if self.use_repo_digest and image_info.repo_digests:
+ # FIXME: we assume the first digest is the best one to use
+ r['target_digest'] = image_info.repo_digests[0]
+
+ return json.dumps(r, indent=4, sort_keys=True)
+
+ @handle_orch_error
+ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
+ return self.upgrade.upgrade_status()
+
+ @handle_orch_error
+ def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict[Any, Any]:
+ return self.upgrade.upgrade_ls(image, tags)
+
+ @handle_orch_error
+ def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
+ services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
+ if self.inventory.get_host_with_state("maintenance"):
+ raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
+ if daemon_types is not None and services is not None:
+ raise OrchestratorError('--daemon-types and --services are mutually exclusive')
+ if daemon_types is not None:
+ for dtype in daemon_types:
+ if dtype not in CEPH_UPGRADE_ORDER:
+ raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
+ f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
+ if services is not None:
+ for service in services:
+ if service not in self.spec_store:
+ raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
+ f'Known services are: {self.spec_store.all_specs.keys()}')
+ hosts: Optional[List[str]] = None
+ if host_placement is not None:
+ all_hosts = list(self.inventory.all_specs())
+ placement = PlacementSpec.from_string(host_placement)
+ hosts = placement.filter_matching_hostspecs(all_hosts)
+ if not hosts:
+ raise OrchestratorError(
+ f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
+
+ if limit is not None:
+ if limit < 1:
+ raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
+
+ return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
+
+ @handle_orch_error
+ def upgrade_pause(self) -> str:
+ return self.upgrade.upgrade_pause()
+
+ @handle_orch_error
+ def upgrade_resume(self) -> str:
+ return self.upgrade.upgrade_resume()
+
+ @handle_orch_error
+ def upgrade_stop(self) -> str:
+ return self.upgrade.upgrade_stop()
+
+ @handle_orch_error
+ def remove_osds(self, osd_ids: List[str],
+ replace: bool = False,
+ force: bool = False,
+ zap: bool = False) -> str:
+ """
+ Takes a list of OSDs and schedules them for removal.
+ The function that takes care of the actual removal is
+ process_removal_queue().
+ """
+
+ daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
+ to_remove_daemons = list()
+ for daemon in daemons:
+ if daemon.daemon_id in osd_ids:
+ to_remove_daemons.append(daemon)
+ if not to_remove_daemons:
+ return f"Unable to find OSDs: {osd_ids}"
+
+ for daemon in to_remove_daemons:
+ assert daemon.daemon_id is not None
+ try:
+ self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
+ replace=replace,
+ force=force,
+ zap=zap,
+ hostname=daemon.hostname,
+ process_started_at=datetime_now(),
+ remove_util=self.to_remove_osds.rm_util))
+ except NotFoundError:
+ return f"Unable to find OSDs: {osd_ids}"
+
+ # trigger the serve loop to initiate the removal
+ self._kick_serve_loop()
+ return "Scheduled OSD(s) for removal"
+
+ @handle_orch_error
+ def stop_remove_osds(self, osd_ids: List[str]) -> str:
+ """
+ Stops a `removal` process for a List of OSDs.
+ This will revert their weight and remove it from the osds_to_remove queue
+ """
+ for osd_id in osd_ids:
+ try:
+ self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
+ remove_util=self.to_remove_osds.rm_util))
+ except (NotFoundError, KeyError, ValueError):
+ return f'Unable to find OSD in the queue: {osd_id}'
+
+ # trigger the serve loop to halt the removal
+ self._kick_serve_loop()
+ return "Stopped OSD(s) removal"
+
+ @handle_orch_error
+ def remove_osds_status(self) -> List[OSD]:
+ """
+ The CLI call to retrieve an osd removal report
+ """
+ return self.to_remove_osds.all_osds()
+
+ @handle_orch_error
+ def drain_host(self, hostname, force=False):
+ # type: (str, bool) -> str
+ """
+ Drain all daemons from a host.
+ :param host: host name
+ """
+
+ # if we drain the last admin host we could end up removing the only instance
+ # of the config and keyring and cause issues
+ if not force:
+ p = PlacementSpec(label='_admin')
+ admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
+ if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
+ raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
+ " label.\nDraining this host could cause the removal"
+ " of the last cluster config/keyring managed by cephadm.\n"
+ "It is recommended to add the _admin label to another host"
+ " before completing this operation.\nIf you're certain this is"
+ " what you want rerun this command with --force.")
+
+ self.add_host_label(hostname, '_no_schedule')
+
+ daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
+
+ osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
+ self.remove_osds(osds_to_remove)
+
+ daemons_table = ""
+ daemons_table += "{:<20} {:<15}\n".format("type", "id")
+ daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
+ for d in daemons:
+ daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
+
+ return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
+
+ def trigger_connect_dashboard_rgw(self) -> None:
+ self.need_connect_dashboard_rgw = True
+ self.event.set()