import json import errno import ipaddress import logging import re import shlex from collections import defaultdict from configparser import ConfigParser from functools import wraps from tempfile import TemporaryDirectory from threading import Event import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type import datetime import os import random import tempfile import multiprocessing.pool import subprocess from prettytable import PrettyTable from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ ServiceSpec, PlacementSpec, \ HostPlacementSpec, IngressSpec, IscsiServiceSpec from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType from mgr_util import create_self_signed_cert import secrets import orchestrator from orchestrator.module import to_format, Format from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \ service_to_daemon_types from orchestrator._interface import GenericSpec from orchestrator._interface import daemon_type_to_service from . import remotes from . 
import utils from .migrations import Migrations from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ RbdMirrorService, CrashService, CephadmService, CephfsMirrorService from .services.ingress import IngressService from .services.container import CustomContainerService from .services.iscsi import IscsiService from .services.nfs import NFSService from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService, SNMPGatewayService from .services.exporter import CephadmExporter, CephadmExporterConfig from .schedule import HostAssignment from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ cephadmNoImage, CEPH_UPGRADE_ORDER from .configchecks import CephadmConfigChecks from .offline_watcher import OfflineHostWatcher try: import remoto # NOTE(mattoliverau) Patch remoto until remoto PR # (https://github.com/alfredodeza/remoto/pull/56) lands from distutils.version import StrictVersion if StrictVersion(remoto.__version__) <= StrictVersion('1.2'): def remoto_has_connection(self: Any) -> bool: return self.gateway.hasreceiver() from remoto.backends import BaseConnection BaseConnection.has_connection = remoto_has_connection import remoto.process except ImportError as e: remoto = None remoto_import_error = str(e) logger = logging.getLogger(__name__) T = TypeVar('T') DEFAULT_SSH_CONFIG = """ Host * User root StrictHostKeyChecking no UserKnownHostsFile /dev/null ConnectTimeout=30 """ # Default container images ----------------------------------------------------- DEFAULT_IMAGE = 'quay.io/ceph/ceph' DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4' DEFAULT_NODE_EXPORTER_IMAGE = 
def service_inactive(spec_name: str) -> Callable:
    """Build a decorator that blocks a handler while a service spec is stored.

    The decorated callable must receive, as its first positional argument, an
    object offering ``get_store()``. If ``get_store("spec.<spec_name>")``
    returns anything other than ``None``, the handler is not invoked and the
    tuple ``(1, "", <message>)`` is returned instead; otherwise the handler is
    called with the original arguments and its result is passed through.
    """
    store_key = f"spec.{spec_name}"
    refusal = f"Unable to change configuration of an active service {spec_name}"

    def decorate(handler: Callable) -> Callable:
        @wraps(handler)
        def guarded(*args: Any, **kwargs: Any) -> Any:
            owner = args[0]  # the object the handler is bound to
            if owner.get_store(store_key) is None:
                return handler(*args, **kwargs)
            return 1, "", refusal
        return guarded
    return decorate
# Options of this mgr module. config_notify() reads each entry through
# get_module_option() and stores the value on an attribute named after the
# option, so every name listed here is also an instance attribute.
MODULE_OPTIONS = [
    Option(
        'ssh_config_file',
        type='str',
        default=None,
        desc='customized SSH config file to connect to managed hosts',
    ),
    Option(
        'device_cache_timeout',
        type='secs',
        default=30 * 60,
        desc='seconds to cache device inventory',
    ),
    Option(
        'device_enhanced_scan',
        type='bool',
        default=False,
        desc='Use libstoragemgmt during device scans',
    ),
    Option(
        'daemon_cache_timeout',
        type='secs',
        default=10 * 60,
        desc='seconds to cache service (daemon) inventory',
    ),
    Option(
        'facts_cache_timeout',
        type='secs',
        default=1 * 60,
        desc='seconds to cache host facts data',
    ),
    Option(
        'host_check_interval',
        type='secs',
        default=10 * 60,
        desc='how frequently to perform a host check',
    ),
    Option(
        'mode',
        type='str',
        enum_allowed=['root', 'cephadm-package'],
        default='root',
        desc='mode for remote execution of cephadm',
    ),
    Option(
        'container_image_base',
        default=DEFAULT_IMAGE,
        desc='Container image name, without the tag',
        runtime=True,
    ),
    Option(
        'container_image_prometheus',
        default=DEFAULT_PROMETHEUS_IMAGE,
        desc='Prometheus container image',
    ),
    Option(
        'container_image_grafana',
        default=DEFAULT_GRAFANA_IMAGE,
        desc='Grafana container image',
    ),
    Option(
        'container_image_alertmanager',
        default=DEFAULT_ALERT_MANAGER_IMAGE,
        desc='Alertmanager container image',
    ),
    Option(
        'container_image_node_exporter',
        default=DEFAULT_NODE_EXPORTER_IMAGE,
        desc='Node exporter container image',
    ),
    Option(
        'container_image_haproxy',
        default=DEFAULT_HAPROXY_IMAGE,
        desc='HAproxy container image',
    ),
    Option(
        'container_image_keepalived',
        default=DEFAULT_KEEPALIVED_IMAGE,
        desc='Keepalived container image',
    ),
    Option(
        'container_image_snmp_gateway',
        default=DEFAULT_SNMP_GATEWAY_IMAGE,
        desc='SNMP Gateway container image',
    ),
    Option(
        'warn_on_stray_hosts',
        type='bool',
        default=True,
        desc='raise a health warning if daemons are detected on a host '
        'that is not managed by cephadm',
    ),
    Option(
        'warn_on_stray_daemons',
        type='bool',
        default=True,
        desc='raise a health warning if daemons are detected '
        'that are not managed by cephadm',
    ),
    Option(
        'warn_on_failed_host_check',
        type='bool',
        default=True,
        desc='raise a health warning if the host check fails',
    ),
    Option(
        'log_to_cluster',
        type='bool',
        default=True,
        desc='log to the "cephadm" cluster log channel',
    ),
    Option(
        'allow_ptrace',
        type='bool',
        default=False,
        desc='allow SYS_PTRACE capability on ceph containers',
        long_desc='The SYS_PTRACE capability is needed to attach to a '
        'process with gdb or strace. Enabling this options '
        'can allow debugging daemons that encounter problems '
        'at runtime.',
    ),
    Option(
        'container_init',
        type='bool',
        default=True,
        desc='Run podman/docker with `--init`'
    ),
    Option(
        'prometheus_alerts_path',
        type='str',
        default='/etc/prometheus/ceph/ceph_default_alerts.yml',
        desc='location of alerts to include in prometheus deployments',
    ),
    Option(
        'migration_current',
        type='int',
        default=None,
        desc='internal - do not modify',
        # used to track spec and other data migrations.
    ),
    Option(
        'config_dashboard',
        type='bool',
        default=True,
        desc='manage configs like API endpoints in Dashboard.'
    ),
    Option(
        'manage_etc_ceph_ceph_conf',
        type='bool',
        default=False,
        desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
    ),
    Option(
        'manage_etc_ceph_ceph_conf_hosts',
        type='str',
        default='*',
        desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
    ),
    # not used anymore
    Option(
        'registry_url',
        type='str',
        default=None,
        desc='Registry url for login purposes. This is not the default registry'
    ),
    Option(
        'registry_username',
        type='str',
        default=None,
        desc='Custom repository username. Only used for logging into a registry.'
    ),
    Option(
        'registry_password',
        type='str',
        default=None,
        desc='Custom repository password. Only used for logging into a registry.'
    ),
    ####
    Option(
        'registry_insecure',
        type='bool',
        default=False,
        desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
    ),
    Option(
        'use_repo_digest',
        type='bool',
        default=True,
        desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
    ),
    Option(
        'config_checks_enabled',
        type='bool',
        default=False,
        desc='Enable or disable the cephadm configuration analysis',
    ),
    Option(
        'default_registry',
        type='str',
        default='docker.io',
        desc='Search-registry to which we should normalize unqualified image names. '
        'This is not the default registry',
    ),
    Option(
        'max_count_per_host',
        type='int',
        default=10,
        desc='max number of daemons per service per host',
    ),
    Option(
        'autotune_memory_target_ratio',
        type='float',
        default=.7,
        desc='ratio of total system memory to divide amongst autotuned daemons'
    ),
    Option(
        'autotune_interval',
        type='secs',
        default=10 * 60,
        desc='how frequently to autotune daemon memory'
    ),
    Option(
        'max_osd_draining_count',
        type='int',
        default=10,
        desc='max number of osds that will be drained simultaneously when osds are removed'
    ),
]
self.container_image_alertmanager = '' self.container_image_node_exporter = '' self.container_image_haproxy = '' self.container_image_keepalived = '' self.container_image_snmp_gateway = '' self.warn_on_stray_hosts = True self.warn_on_stray_daemons = True self.warn_on_failed_host_check = True self.allow_ptrace = False self.container_init = True self.prometheus_alerts_path = '' self.migration_current: Optional[int] = None self.config_dashboard = True self.manage_etc_ceph_ceph_conf = True self.manage_etc_ceph_ceph_conf_hosts = '*' self.registry_url: Optional[str] = None self.registry_username: Optional[str] = None self.registry_password: Optional[str] = None self.registry_insecure: bool = False self.use_repo_digest = True self.default_registry = '' self.autotune_memory_target_ratio = 0.0 self.autotune_interval = 0 self.apply_spec_fails: List[Tuple[str, str]] = [] self.max_osd_draining_count = 10 self.device_enhanced_scan = False self._cons: Dict[str, Tuple[remoto.backends.BaseConnection, remoto.backends.LegacyModuleExecute]] = {} self.notify(NotifyType.mon_map, None) self.config_notify() path = self.get_ceph_option('cephadm_path') try: assert isinstance(path, str) with open(path, 'r') as f: self._cephadm = f.read() except (IOError, TypeError) as e: raise RuntimeError("unable to read cephadm at '%s': %s" % ( path, str(e))) self.cephadm_binary_path = self._get_cephadm_binary_path() self._worker_pool = multiprocessing.pool.ThreadPool(10) self._reconfig_ssh() CephadmOrchestrator.instance = self self.upgrade = CephadmUpgrade(self) self.health_checks: Dict[str, dict] = {} self.inventory = Inventory(self) self.cache = HostCache(self) self.cache.load() self.to_remove_osds = OSDRemovalQueue(self) self.to_remove_osds.load_from_store() self.spec_store = SpecStore(self) self.spec_store.load() self.keys = ClientKeyringStore(self) self.keys.load() # ensure the host lists are in sync for h in self.inventory.keys(): if h not in self.cache.daemons: self.cache.prime_empty_host(h) for h 
in self.cache.get_hosts(): if h not in self.inventory: self.cache.rm_host(h) # in-memory only. self.events = EventStore(self) self.offline_hosts: Set[str] = set() self.migration = Migrations(self) _service_clses: Sequence[Type[CephadmService]] = [ OSDService, NFSService, MonService, MgrService, MdsService, RgwService, RbdMirrorService, GrafanaService, AlertmanagerService, PrometheusService, NodeExporterService, CrashService, IscsiService, IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService, SNMPGatewayService, ] # https://github.com/python/mypy/issues/8993 self.cephadm_services: Dict[str, CephadmService] = { cls.TYPE: cls(self) for cls in _service_clses} # type: ignore self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr']) self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd']) self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi']) self.template = TemplateMgr(self) self.requires_post_actions: Set[str] = set() self.need_connect_dashboard_rgw = False self.config_checker = CephadmConfigChecks(self) self.offline_watcher = OfflineHostWatcher(self) self.offline_watcher.start() def shutdown(self) -> None: self.log.debug('shutdown') self._worker_pool.close() self._worker_pool.join() self.offline_watcher.shutdown() self.run = False self.event.set() def _get_cephadm_service(self, service_type: str) -> CephadmService: assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES return self.cephadm_services[service_type] def _get_cephadm_binary_path(self) -> str: import hashlib m = hashlib.sha256() m.update(self._cephadm.encode()) return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}' def _kick_serve_loop(self) -> None: self.log.debug('_kick_serve_loop') self.event.set() def serve(self) -> None: """ The main loop of cephadm. A command handler will typically change the declarative state of cephadm. This loop will then attempt to apply this new state. 
def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
    """React to a mon_map or pg_summary notification.

    * ``mon_map``: record the monmap's ``modified`` time in
      ``self.last_monmap`` (dropped to ``None`` when it lies in the future)
      and wake the serve loop if ``manage_etc_ceph_ceph_conf`` is set.
    * ``pg_summary``: run ``_trigger_osd_removal()``.

    ``notify_id`` is not used.
    """
    if notify_type == NotifyType.mon_map:
        # remember the monmap mtime so configs can be refreshed when mons change
        modified_stamp = self.get('mon_map')['modified']
        self.last_monmap = str_to_datetime(modified_stamp)
        in_future = self.last_monmap and self.last_monmap > datetime_now()
        if in_future:
            # just in case clocks are skewed
            self.last_monmap = None
        # getattr, because notify() is called before config_notify() has
        # populated the option attributes
        conf_managed = getattr(self, 'manage_etc_ceph_ceph_conf', False)
        if conf_managed:
            self._kick_serve_loop()
    if notify_type == NotifyType.pg_summary:
        self._trigger_osd_removal()
def get_unique_name(
        self,
        daemon_type: str,
        host: str,
        existing: List[orchestrator.DaemonDescription],
        prefix: Optional[str] = None,
        forcename: Optional[str] = None,
        rank: Optional[int] = None,
        rank_generation: Optional[int] = None,
) -> str:
    """
    Generate a daemon id that is not used by any entry of ``existing``.

    * ``forcename`` is returned unchanged when given, provided it is free.
    * Otherwise the id is ``[<prefix>.][<rank>.<rank_generation>.]<short host>``
      where the short host is ``host`` up to its first dot; the rank part is
      only added when both ``rank`` and ``rank_generation`` are not None.
    * Daemon types outside the fixed-name list below additionally get a
      ``.<six random lowercase letters>`` suffix, re-drawn until the id is free.

    Raises OrchestratorValidationError when a forced or fixed (suffix-less)
    name is already taken.
    """
    fixed_name_types = [
        'mon', 'crash',
        'prometheus', 'node-exporter', 'grafana', 'alertmanager',
        'container', 'cephadm-exporter', 'snmp-gateway'
    ]
    wants_suffix = daemon_type not in fixed_name_types

    def in_use(candidate: str) -> bool:
        return any(d.daemon_id == candidate for d in existing)

    if forcename:
        if in_use(forcename):
            raise orchestrator.OrchestratorValidationError(
                f'name {daemon_type}.{forcename} already in use')
        return forcename

    # everything except the random suffix is the same for every attempt
    stem = prefix + '.' if prefix else ''
    if rank is not None and rank_generation is not None:
        stem += f'{rank}.{rank_generation}.'
    stem += host.split('.')[0]

    if not wants_suffix:
        # nothing random to vary, so a clash cannot be resolved by retrying
        if in_use(stem):
            raise orchestrator.OrchestratorValidationError(
                f'name {daemon_type}.{stem} already in use')
        return stem

    while True:
        name = stem + '.' + ''.join(random.choice(string.ascii_lowercase)
                                    for _ in range(6))
        if not in_use(name):
            return name
        self.log.debug('name %s exists, trying again', name)
def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
    """Check that ``ssh_config_fname`` names an existing regular file.

    Raises OrchestratorValidationError when ``os.path.isfile()`` is false
    for the given path; returns None otherwise.
    """
    if os.path.isfile(ssh_config_fname):
        return
    message = "ssh_config \"{}\" does not exist".format(ssh_config_fname)
    raise OrchestratorValidationError(message)
Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {} # mypy is unable to determine type for _processes since it's private worker_count: int = self._worker_pool._processes # type: ignore ret = { "workers": worker_count, "paused": self.paused, } return True, err, ret def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None: self.set_store(what, new) self._reconfig_ssh() if self.cache.get_hosts(): # Can't check anything without hosts host = self.cache.get_hosts()[0] r = CephadmServe(self)._check_host(host) if r is not None: # connection failed reset user self.set_store(what, old) self._reconfig_ssh() raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host)) self.log.info(f'Set ssh {what}') @orchestrator._cli_write_command( prefix='cephadm set-ssh-config') def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """ Set the ssh_config file (use -i ) """ # Set an ssh_config file provided from stdin old = self.ssh_config if inbuf == old: return 0, "value unchanged", "" self.validate_ssh_config_content(inbuf) self._validate_and_set_ssh_val('ssh_config', inbuf, old) return 0, "", "" @orchestrator._cli_write_command('cephadm clear-ssh-config') def _clear_ssh_config(self) -> Tuple[int, str, str]: """ Clear the ssh_config file """ # Clear the ssh_config file provided from stdin self.set_store("ssh_config", None) self.ssh_config_tmp = None self.log.info('Cleared ssh_config') self._reconfig_ssh() return 0, "", "" @orchestrator._cli_read_command('cephadm get-ssh-config') def _get_ssh_config(self) -> HandleCommandResult: """ Returns the ssh config as used by cephadm """ if self.ssh_config_file: self.validate_ssh_config_fname(self.ssh_config_file) with open(self.ssh_config_file) as f: return HandleCommandResult(stdout=f.read()) ssh_config = self.get_store("ssh_config") if ssh_config: return HandleCommandResult(stdout=ssh_config) return 
@orchestrator._cli_write_command('cephadm generate-key')
def _generate_key(self) -> Tuple[int, str, str]:
    """
    Generate a cluster SSH key (if not present)
    """
    # Nothing to do when both halves of the key pair are already loaded.
    if self.ssh_pub and self.ssh_key:
        return 0, '', ''

    self.log.info('Generating ssh key...')
    # The context manager removes the directory and everything in it, also
    # when ssh-keygen fails. Unlinking the key files by hand in a `finally`
    # would raise FileNotFoundError in that case and hide the real error.
    with TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, 'key')
        subprocess.check_call([
            '/usr/bin/ssh-keygen',
            '-C', 'ceph-%s' % self._cluster_fsid,
            '-N', '',  # empty passphrase
            '-f', path
        ])
        # the key pair is read back from `path` and `path + '.pub'`
        with open(path, 'r') as f:
            secret = f.read()
        with open(path + '.pub', 'r') as f:
            pub = f.read()

    self.set_store('ssh_identity_key', secret)
    self.set_store('ssh_identity_pub', pub)
    self._reconfig_ssh()
    return 0, '', ''
@orchestrator._cli_read_command(
    'cephadm registry-login')
def registry_login(self, url: Optional[str] = None, username: Optional[str] = None,
                   password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Set custom registry login info by providing url, username and password or json file with login info (-i )
    """
    # NOTE(review): this handler stores credentials but is registered as a
    # read command - confirm whether a write command is intended.

    # Credentials come either from the three positional arguments or, when
    # any of them is missing, from a JSON document supplied through inbuf.
    have_cli_creds = bool(url and username and password)
    if not have_cli_creds and (inbuf is None or len(inbuf) == 0):
        return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments "
                                   "or -i ")

    if have_cli_creds:
        registry_json = {'url': url, 'username': username, 'password': password}
    else:
        assert isinstance(inbuf, str)
        try:
            registry_json = json.loads(inbuf)
        except ValueError as e:
            # json.JSONDecodeError is a ValueError; report malformed input
            # as an argument error instead of letting it propagate
            return -errno.EINVAL, "", f"json provided for custom registry login could not be parsed: {e}"
        # Valid JSON need not be an object (null, a number, a list, ...);
        # treat anything but an object carrying all three keys as incomplete.
        if not isinstance(registry_json, dict) or not all(
                field in registry_json for field in ("url", "username", "password")):
            return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                       "Please setup json file as\n"
                                       "{\n"
                                       " \"url\": \"REGISTRY_URL\",\n"
                                       " \"username\": \"REGISTRY_USERNAME\",\n"
                                       " \"password\": \"REGISTRY_PASSWORD\"\n"
                                       "}\n")

    # verify login info works by attempting login on the first host that the
    # inventory yields
    host = next(iter(self.inventory.keys()), None)
    if not host:
        raise OrchestratorError('no hosts defined')
    r = CephadmServe(self)._registry_login(host, registry_json)
    if r is not None:
        return 1, '', r

    # if logins succeeded, store info
    self.log.debug("Host logins successful. Storing login info.")
    self.set_store('registry_credentials', json.dumps(registry_json))
    # distribute new login info to all hosts
    self.cache.distribute_new_registry_login_info()
    return 0, "registry login scheduled", ''
@orchestrator._cli_read_command(
    'cephadm prepare-host')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
    """Prepare a remote host for use with cephadm"""
    # Runs `cephadm prepare-host --expect-hostname <host>` through
    # CephadmServe; with error_ok=True a failure shows up as a non-zero
    # return code rather than an exception.
    _out, err_lines, rc = CephadmServe(self)._run_cephadm(
        host, cephadmNoImage, 'prepare-host',
        ['--expect-hostname', host],
        addr=addr,
        error_ok=True, no_fsid=True)
    if rc:
        return 1, '', ('prepare-host failed:\n' + '\n'.join(err_lines))

    # if we have an outstanding health alert for this host, give the
    # serve thread a kick
    if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
        marker = 'host %s ' % host
        for detail_line in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
            if detail_line.startswith(marker):
                self.event.set()
    return 0, '%s (%s) ok' % (host, addr), '\n'.join(err_lines)
cp = ConfigParser() cp.read_string(inbuf, source='') self.set_store("extra_ceph_conf", json.dumps({ 'conf': inbuf, 'last_modified': datetime_to_str(datetime_now()) })) self.log.info('Set extra_ceph_conf') self._kick_serve_loop() return HandleCommandResult() @orchestrator._cli_read_command( 'cephadm get-extra-ceph-conf') def _get_extra_ceph_conf(self) -> HandleCommandResult: """ Get extra ceph conf that is appended """ return HandleCommandResult(stdout=self.extra_ceph_conf().conf) def _set_exporter_config(self, config: Dict[str, str]) -> None: self.set_store('exporter_config', json.dumps(config)) def _get_exporter_config(self) -> Dict[str, str]: cfg_str = self.get_store('exporter_config') return json.loads(cfg_str) if cfg_str else {} def _set_exporter_option(self, option: str, value: Optional[str] = None) -> None: kv_option = f'exporter_{option}' self.set_store(kv_option, value) def _get_exporter_option(self, option: str) -> Optional[str]: kv_option = f'exporter_{option}' return self.get_store(kv_option) @orchestrator._cli_write_command( prefix='cephadm generate-exporter-config') @service_inactive('cephadm-exporter') def _generate_exporter_config(self) -> Tuple[int, str, str]: """ Generate default SSL crt/key and token for cephadm exporter daemons """ self._set_exporter_defaults() self.log.info('Default settings created for cephadm exporter(s)') return 0, "", "" def _set_exporter_defaults(self) -> None: crt, key = self._generate_exporter_ssl() token = self._generate_exporter_token() self._set_exporter_config({ "crt": crt, "key": key, "token": token, "port": CephadmExporterConfig.DEFAULT_PORT }) self._set_exporter_option('enabled', 'true') def _generate_exporter_ssl(self) -> Tuple[str, str]: return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"}) def _generate_exporter_token(self) -> str: return secrets.token_hex(32) @orchestrator._cli_write_command( prefix='cephadm clear-exporter-config') @service_inactive('cephadm-exporter') def 
@orchestrator._cli_write_command(
    prefix='cephadm set-exporter-config')
@service_inactive('cephadm-exporter')
def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port
    """
    # Returns (retval, stdout, stderr); any load/validation failure is
    # reported as retval 1 with the reason on stderr and nothing is stored.
    if not inbuf:
        return 1, "", "JSON configuration has not been provided (-i <file>)"

    cfg = CephadmExporterConfig(self)

    # parse first, then validate: both steps return (rc, reason)
    rc, reason = cfg.load_from_json(inbuf)
    if rc:
        return 1, "", reason

    rc, reason = cfg.validate_config()
    if rc:
        return 1, "", reason

    # only a fully validated configuration reaches the store
    self._set_exporter_config({
        "crt": cfg.crt,
        "key": cfg.key,
        "token": cfg.token,
        "port": cfg.port
    })
    self.log.info("Loaded and verified the TLS configuration")
    return 0, "", ""
@orchestrator._cli_write_command('cephadm config-check enable')
def _config_check_enable(self, check_name: str) -> HandleCommandResult:
    """Enable a specific configuration check"""
    if self._config_check_valid(check_name):
        # the update helper reports (rc, reason); rc == 0 means stored
        rc, reason = self._update_config_check(check_name, 'enabled')
        if not rc:
            return HandleCommandResult(stdout="ok")
        failure = f"Failed to enable check '{check_name}' : {reason}"
        return HandleCommandResult(retval=rc, stderr=failure)

    # unknown check names are rejected before touching the store
    return HandleCommandResult(retval=1, stderr="Invalid check name")
def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
    """
    Record a new status for one configuration check.

    The ``config_checks`` store key is read as a JSON object, the entry for
    ``check_name`` is set to ``status`` and the object is written back.

    :param check_name: name of the check whose entry is updated
    :param status: value to store (callers in this file pass ``'enabled'``
                   or ``'disabled'``)
    :return: ``(0, "")`` on success, ``(1, reason)`` when the setting is
             missing or does not decode to a JSON object
    """
    checks_raw = self.get_store('config_checks')
    if not checks_raw:
        return 1, "config_checks setting is not available"

    # A damaged setting is reported through the normal (rc, msg) channel
    # instead of escaping as an unhandled exception.
    try:
        checks = json.loads(checks_raw)
    except ValueError:
        self.log.exception("Unable to decode the config_checks setting")
        return 1, "config_checks setting could not be decoded (invalid JSON)"
    if not isinstance(checks, dict):
        return 1, "config_checks setting is not a JSON object"

    checks[check_name] = status
    self.set_store('config_checks', json.dumps(checks))
    # log only once the new state has actually been written
    self.log.info(f"updated config check '{check_name}' : {status}")
    return 0, ""
@orchestrator._cli_write_command('orch client-keyring set')
def _client_keyring_set(
        self,
        entity: str,
        placement: str,
        owner: Optional[str] = None,
        mode: Optional[str] = None,
) -> HandleCommandResult:
    """
    Add or update client keyring under cephadm management
    """
    if not entity.startswith('client.'):
        raise OrchestratorError('entity must start with client.')

    # owner is "<uid>:<gid>"; both parts must be integers. A wrong number of
    # parts and a non-numeric part both surface as ValueError.
    if owner:
        try:
            uid, gid = map(int, owner.split(':'))
        except (TypeError, ValueError) as e:
            raise OrchestratorError(
                'owner must look like "<uid>:<gid>", e.g., "0:0"') from e
    else:
        uid = 0
        gid = 0

    # mode is given as an octal string; default is owner read/write only
    if mode:
        try:
            imode = int(mode, 8)
        except (TypeError, ValueError) as e:
            raise OrchestratorError('mode must be an octal mode, e.g. "600"') from e
    else:
        imode = 0o600

    pspec = PlacementSpec.from_string(placement)
    ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
    self.keys.update(ks)
    self._kick_serve_loop()
    return HandleCommandResult()
def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
    """
    Look up ``executable`` on the remote end of ``conn``.

    The lookup is delegated to ``conn.remote_module.which()``.

    :param conn: connection whose remote module performs the lookup
    :param executable: name of the program to locate
    :return: the path reported by the remote side
    :raises RuntimeError: when the remote lookup yields nothing; the message
                          names both the executable and ``conn.hostname``
    """
    resolved = conn.remote_module.which(executable)
    if resolved:
        self.log.debug("Found executable '{}' at path '{}'".format(executable, resolved))
        return resolved
    raise RuntimeError(
        "Executable '{}' not found on host '{}'".format(executable, conn.hostname))
def _unreachable_hosts(self) -> List[HostSpec]:
    """
    Collect the hosts that are offline or in maintenance mode.

    Daemons on such hosts should be left alone (the hosts are presumed
    inaccessible), yet they still count toward placement so they are not
    simply rescheduled somewhere else.

    A host qualifies when its status, compared case-insensitively, is
    ``maintenance`` or ``offline``, or when its hostname is listed in
    ``self.offline_hosts``.
    """
    unreachable = []
    for spec in self.inventory.all_specs():
        if spec.status.lower() in ['maintenance', 'offline']:
            unreachable.append(spec)
        elif spec.hostname in self.offline_hosts:
            unreachable.append(spec)
    return unreachable
def _add_host(self, spec):
    # type: (HostSpec) -> str
    """
    Register the host described by ``spec`` with the orchestrator.

    :param spec: host specification; ``spec.addr`` is replaced by the
                 resolved address when it was given as the bare hostname
    :return: confirmation message with the hostname and the address in use
    """
    HostSpec.validate(spec)
    resolved_addr = self._check_valid_addr(spec.hostname, spec.addr)
    if resolved_addr and spec.addr == spec.hostname:
        spec.addr = resolved_addr

    # a known host that is being re-added under a different address
    known_under_other_addr = (
        spec.hostname in self.inventory
        and self.inventory.get_addr(spec.hostname) != spec.addr
    )
    if known_under_other_addr:
        self.cache.refresh_all_host_info(spec.hostname)

    # prime crush map?
    if spec.location:
        crush_location = [f'{k}={v}' for k, v in spec.location.items()]
        self.check_mon_command({
            'prefix': 'osd crush add-bucket',
            'name': spec.hostname,
            'type': 'host',
            'args': crush_location,
        })

    is_new_host = spec.hostname not in self.inventory
    if is_new_host:
        self.cache.prime_empty_host(spec.hostname)
    self.inventory.add_host(spec)
    self.offline_hosts_remove(spec.hostname)
    if spec.status == 'maintenance':
        self._set_maintenance_healthcheck()
    self.event.set()  # refresh stray health check
    self.log.info('Added host %s' % spec.hostname)
    return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
" "The following daemons are running in the host:" "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % ( host, daemons_table, host)) # check, if there we're removing the last _admin host if not force: p = PlacementSpec(label='_admin') admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) if len(admin_hosts) == 1 and admin_hosts[0] == host: raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'" " label. Please add the '_admin' label to a host" " or add --force to this command") def run_cmd(cmd_args: dict) -> None: ret, out, err = self.mon_command(cmd_args) if ret != 0: self.log.debug(f"ran {cmd_args} with mon_command") self.log.error( f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") if offline: daemons = self.cache.get_daemons_by_host(host) for d in daemons: self.log.info(f"removing: {d.name()}") if d.daemon_type != 'osd': self.cephadm_services[str(d.daemon_type)].pre_remove(d) self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False) else: cmd_args = { 'prefix': 'osd purge-actual', 'id': int(str(d.daemon_id)), 'yes_i_really_mean_it': True } run_cmd(cmd_args) cmd_args = { 'prefix': 'osd crush rm', 'name': host } run_cmd(cmd_args) self.inventory.rm_host(host) self.cache.rm_host(host) self._reset_con(host) self.event.set() # refresh stray health check self.log.info('Removed host %s' % host) return "Removed {} host '{}'".format('offline' if offline else '', host) @handle_orch_error def update_host_addr(self, host: str, addr: str) -> str: self._check_valid_addr(host, addr) self.inventory.set_addr(host, addr) self._reset_con(host) self.event.set() # refresh stray health check self.log.info('Set host %s addr to %s' % (host, addr)) return "Updated host '{}' addr to '{}'".format(host, addr) @handle_orch_error def get_hosts(self): # type: () -> List[orchestrator.HostSpec] """ Return a list of hosts 
@handle_orch_error
def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
    """
    Remove ``label`` from ``host`` and wake the serve loop.

    :param host: host name
    :param label: label to remove
    :param force: skip the safety check for the last ``_admin`` host
    :return: confirmation message
    :raises OrchestratorValidationError: ``label`` is ``_admin``, ``host`` is
        the only host carrying it and ``force`` was not given
    """
    # if we remove the _admin label from the only host that has it we could end up
    # removing the only instance of the config and keyring and cause issues
    if not force and label == '_admin':
        p = PlacementSpec(label='_admin')
        admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
        if len(admin_hosts) == 1 and admin_hosts[0] == host:
            raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
                                              " label.\nRemoving the _admin label from this host could cause the removal"
                                              " of the last cluster config/keyring managed by cephadm.\n"
                                              "It is recommended to add the _admin label to another host"
                                              " before completing this operation.\nIf you're certain this is"
                                              " what you want rerun this command with --force.")
    self.inventory.rm_label(host, label)
    # the log line and the returned message share one wording
    msg = 'Removed label %s from host %s' % (label, host)
    self.log.info(msg)
    self._kick_serve_loop()
    return msg
def _set_maintenance_healthcheck(self) -> None:
    """
    Bring the ``HOST_IN_MAINTENANCE`` health warning in line with the
    inventory.

    With no host in the ``maintenance`` state the warning is removed;
    otherwise it is (re)raised with a count of 1, a summary stating how many
    hosts are affected and one detail line per host.
    """
    hosts = self.inventory.get_host_with_state("maintenance")
    if hosts:
        count = len(hosts)
        noun = "hosts are" if count != 1 else "host is"
        summary = f"{count} {noun} in maintenance mode"
        detail = [f"{h} is in maintenance" for h in hosts]
        self.set_health_warning("HOST_IN_MAINTENANCE", summary, 1, detail)
        return
    self.remove_health_warning('HOST_IN_MAINTENANCE')
If the host is an osd host we apply the noout flag for the host subtree in crush to prevent data movement during a host maintenance window. :param hostname: (str) name of the host (must match an inventory hostname) :raises OrchestratorError: Hostname is invalid, host is already in maintenance """ if len(self.cache.get_hosts()) == 1: raise OrchestratorError("Maintenance feature is not supported on single node clusters") # if upgrade is active, deny if self.upgrade.upgrade_state: raise OrchestratorError( f"Unable to place {hostname} in maintenance with upgrade active/paused") tgt_host = self.inventory._inventory[hostname] if tgt_host.get("status", "").lower() == "maintenance": raise OrchestratorError(f"Host {hostname} is already in maintenance") host_daemons = self.cache.get_daemon_types(hostname) self.log.debug("daemons on host {}".format(','.join(host_daemons))) if host_daemons: # daemons on this host, so check the daemons can be stopped # and if so, place the host into maintenance by disabling the target rc, msg = self._host_ok_to_stop(hostname, force) if rc: raise OrchestratorError( msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) # call the host-maintenance function _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", ["enter"], error_ok=True) returned_msg = _err[0].split('\n')[-1] if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): raise OrchestratorError( f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}") if "osd" in host_daemons: crush_node = hostname if '.' 
@handle_orch_error
@host_exists()
def exit_host_maintenance(self, hostname: str) -> str:
    """Exit maintenance mode and return a host to an operational state

    Returning from maintenance will enable the cluster's systemd target and
    start it, and remove any noout that has been added for the host if the
    host has osd daemons

    :param hostname: (str) host name

    :raises OrchestratorError: Host is not in maintenance, unable to return
        from maintenance, or unable to unset the noout flag
    """
    tgt_host = self.inventory._inventory[hostname]
    # a record without a status is simply "not in maintenance"; same lookup
    # style as enter_host_maintenance
    if (tgt_host.get('status') or '').lower() != "maintenance":
        raise OrchestratorError(f"Host {hostname} is not in maintenance mode")

    _outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
                                                         'host-maintenance',
                                                         ['exit'],
                                                         error_ok=True)
    # the verdict is taken from the last line of the first stderr entry, so
    # an empty stderr cannot be interpreted
    if not errs:
        raise OrchestratorError(
            'Unexpected response from cephadm host-maintenance exit call')
    returned_msg = errs[0].split('\n')[-1]
    if returned_msg.startswith(('failed', 'ERROR')):
        raise OrchestratorError(
            f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")

    if "osd" in self.cache.get_daemon_types(hostname):
        # the crush node is addressed by the short hostname
        crush_node = hostname.split('.')[0]
        rc, _out, _err = self.mon_command({
            'prefix': 'osd unset-group',
            'flags': 'noout',
            'who': [crush_node],
            'format': 'json'
        })
        if rc:
            self.log.warning(
                f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
            raise OrchestratorError(
                f"Unable to unset noout for the osds on {hostname} (rc={rc})")
        self.log.info(
            f"exit maintenance request has UNSET for the noout group on host {hostname}")

    # update the host record status
    tgt_host['status'] = ""
    self.inventory._inventory[hostname] = tgt_host
    self.inventory.save()

    self._set_maintenance_healthcheck()

    return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
@handle_orch_error def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> List[orchestrator.ServiceDescription]: if refresh: self._invalidate_daemons_and_kick_serve() self.log.debug('Kicked serve() loop to refresh all services') sm: Dict[str, orchestrator.ServiceDescription] = {} # known services for nm, spec in self.spec_store.all_specs.items(): if service_type is not None and service_type != spec.service_type: continue if service_name is not None and service_name != nm: continue if spec.service_type != 'osd': size = spec.placement.get_target_count(self._schedulable_hosts()) else: # osd counting is special size = 0 sm[nm] = orchestrator.ServiceDescription( spec=spec, size=size, running=0, events=self.events.get_for_service(spec.service_name()), created=self.spec_store.spec_created[nm], deleted=self.spec_store.spec_deleted.get(nm, None), virtual_ip=spec.get_virtual_ip(), ports=spec.get_port_start(), ) if spec.service_type == 'ingress': # ingress has 2 daemons running per host sm[nm].size *= 2 # factor daemons into status for h, dm in self.cache.get_daemons_with_volatile_status(): for name, dd in dm.items(): assert dd.hostname is not None, f'no hostname for {dd!r}' assert dd.daemon_type is not None, f'no daemon_type for {dd!r}' n: str = dd.service_name() if ( service_type and service_type != daemon_type_to_service(dd.daemon_type) ): continue if service_name and service_name != n: continue if n not in sm: # new unmanaged service spec = ServiceSpec( unmanaged=True, service_type=daemon_type_to_service(dd.daemon_type), service_id=dd.service_id(), ) sm[n] = orchestrator.ServiceDescription( last_refresh=dd.last_refresh, container_image_id=dd.container_image_id, container_image_name=dd.container_image_name, spec=spec, size=0, ) if dd.status == DaemonDescriptionStatus.running: sm[n].running += 1 if dd.daemon_type == 'osd': # The osd count can't be determined by the Placement spec. 
@handle_orch_error
def list_daemons(self,
                 service_name: Optional[str] = None,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 host: Optional[str] = None,
                 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
    """
    Return the cached daemon descriptions that match every given filter.

    :param service_name: keep only daemons of this service
    :param daemon_type: keep only daemons of this type
    :param daemon_id: keep only daemons with this id
    :param host: keep only daemons on this host
    :param refresh: invalidate the cached daemons (of ``host``, or of all
                    hosts) and kick the serve loop before reading the cache
    """
    if refresh:
        self._invalidate_daemons_and_kick_serve(host)
        self.log.debug('Kicked serve() loop to refresh all daemons')

    selected = []
    for cached_host, daemon_map in self.cache.get_daemons_with_volatile_status():
        if host and cached_host != host:
            continue
        for dd in daemon_map.values():
            filtered_out = (
                (daemon_type is not None and daemon_type != dd.daemon_type)
                or (daemon_id is not None and daemon_id != dd.daemon_id)
                or (service_name is not None and service_name != dd.service_name())
            )
            if filtered_out:
                continue
            # osd/mon entries without a memory request fall back to the
            # daemon's <type>_memory_target option
            if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
                target = self.get_foreign_ceph_option(
                    dd.name(),
                    f"{dd.daemon_type}_memory_target"
                )
                dd.memory_request = cast(Optional[int], target)
            selected.append(dd)
    return selected
def _daemon_action(self,
                   daemon_spec: CephadmDaemonDeploySpec,
                   action: str,
                   image: Optional[str] = None) -> str:
    """
    Carry out ``action`` on the daemon described by ``daemon_spec``.

    ``redeploy``/``reconfig`` go through daemon (re)creation; ``start``,
    ``stop`` and ``restart`` are translated into ``cephadm unit`` calls.
    Redeploying or restarting the mgr this code runs in triggers a fail-over
    instead.

    :return: for redeploy/reconfig whatever the daemon creation returns,
             otherwise a message naming action, daemon and host
    """
    self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
                                  daemon_spec.daemon_id)

    targets_self = action in ('redeploy', 'restart') and self.daemon_is_self(
        daemon_spec.daemon_type, daemon_spec.daemon_id)
    if targets_self:
        self.mgr_service.fail_over()
        return ''  # unreachable

    if action in ('redeploy', 'reconfig'):
        if daemon_spec.daemon_type == 'osd':
            # for OSDs, we still need to update config, just not carry out the full
            # prepare_create function
            daemon_spec.final_config, daemon_spec.deps = \
                self.osd_service.generate_config(daemon_spec)
        else:
            service = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)]
            daemon_spec = service.prepare_create(daemon_spec)
        return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))

    # systemd unit sub-commands issued, in order, for each remaining action
    unit_commands = {
        'start': ['reset-failed', 'start'],
        'stop': ['stop'],
        'restart': ['reset-failed', 'restart'],
    }
    name = daemon_spec.name()
    for unit_cmd in unit_commands[action]:
        try:
            _out, _err, _code = CephadmServe(self)._run_cephadm(
                daemon_spec.host, name, 'unit',
                ['--name', name, unit_cmd])
        except Exception:
            # a failing step is logged; the remaining steps still run
            self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {unit_cmd}` failed')

    self.cache.invalidate_host_daemons(daemon_spec.host)
    msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
    self.events.for_daemon(name, 'INFO', msg)
    return msg
def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
    """
    Queue ``action`` for ``daemon_name`` in the cache and kick the serve loop.

    :param daemon_name: name of a daemon known to the cache
    :param action: action to schedule
    :return: message naming the action, the daemon and its host
    :raises OrchestratorError: a redeploy/restart targets the mgr this code
        runs in while the mgr map has no standby
    """
    dd = self.cache.get_daemon(daemon_name)
    assert dd.daemon_type is not None
    assert dd.daemon_id is not None
    assert dd.hostname is not None
    if action in ('redeploy', 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
            and not self.mgr_service.mgr_map_has_standby():
        # name the action that was actually requested, not always "redeploy"
        raise OrchestratorError(
            f'Unable to schedule {action} for {daemon_name}: No standby MGRs')
    self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
    msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
    self._kick_serve_loop()
    return msg
@handle_orch_error
def remove_daemons(self, names):
    # type: (List[str]) -> List[str]
    """
    Remove the daemons with the given names.

    Each name is matched against the cached daemons of every host; the
    resulting (name, host) pairs are passed to ``_remove_daemons``.

    :raises OrchestratorError: if none of the names is found in the cache
    """
    args = [
        (name, host)
        for host, dm in self.cache.daemons.items()
        for name in names
        if name in dm
    ]
    if not args:
        # wrap in a 1-tuple so a tuple argument cannot break the formatting
        raise OrchestratorError('Unable to find daemon(s) %s' % (names,))
    self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
    return self._remove_daemons(args)


@handle_orch_error
def remove_service(self, service_name: str, force: bool = False) -> str:
    """
    Remove a service spec from the spec store.

    mon and mgr services are never removed, and unknown services are
    reported; both cases return a message instead of raising.

    :param service_name: name of the service to remove
    :param force: skip the check for OSDs belonging to an ``osd.*`` service
    :return: human readable result message
    :raises OrchestratorError: if an ``osd.*`` service still has OSD
        daemons and ``force`` is not set
    """
    self.log.info('Remove service %s' % service_name)
    self._trigger_preview_refresh(service_name=service_name)
    if service_name in self.spec_store:
        if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
            return f'Unable to remove {service_name} service.\n' \
                   f'Note, you might want to mark the {service_name} service as "unmanaged"'
    else:
        return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"

    # Report list of affected OSDs?
    if not force and service_name.startswith('osd.'):
        osds_msg = {}
        for h, dm in self.cache.get_daemons_with_volatile_status():
            osds_to_remove = [
                str(dd.daemon_id)
                for dd in dm.values()
                if dd.daemon_type == 'osd' and dd.service_name() == service_name
            ]
            if osds_to_remove:
                osds_msg[h] = osds_to_remove
        if osds_msg:
            # one line per host
            host_lines = []
            for h, ls in osds_msg.items():
                osd_names = " ".join([f"osd.{osd_id}" for osd_id in ls])
                host_lines.append(f'\thost {h}: {osd_names}')
            msg = '\n'.join(host_lines)
            raise OrchestratorError(
                f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')

    found = self.spec_store.rm(service_name)
    if found and service_name.startswith('osd.'):
        self.spec_store.finally_rm(service_name)
    self._kick_serve_loop()
    return f'Removed service {service_name}'


@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None,
                  refresh: bool = False) -> List[orchestrator.InventoryHost]:
    """
    Return the storage inventory of hosts matching the given filter.

    :param host_filter: host filter
    :param refresh: invalidate the cached devices of the selected hosts
        (all cached hosts when no hosts are given in the filter) and wake
        up the serve loop before answering from the cache

    TODO:
      - add filtering by label
    """
    if refresh:
        if host_filter and host_filter.hosts:
            hosts_to_refresh = host_filter.hosts
        else:
            hosts_to_refresh = self.cache.get_hosts()
        for h in hosts_to_refresh:
            self.log.debug(f'will refresh {h} devs')
            self.cache.invalidate_host_devices(h)

        self.event.set()
        self.log.debug('Kicked serve() loop to refresh devices')

    result = []
    for host, dls in self.cache.devices.items():
        if host_filter and host_filter.hosts and host not in host_filter.hosts:
            continue
        result.append(orchestrator.InventoryHost(host,
                                                 inventory.Devices(dls)))
    return result


@handle_orch_error
def zap_device(self, host: str, path: str) -> str:
    """Zap a device on a managed host.

    Use ceph-volume zap to return a device to an unused/free state

    Args:
        host (str): hostname of the cluster host
        path (str): device path

    Raises:
        OrchestratorError: host is not a cluster host
        OrchestratorError: host is in maintenance and therefore unavailable
        OrchestratorError: device path not found on the host
        OrchestratorError: device is known to a different ceph cluster
        OrchestratorError: device holds active osd
        OrchestratorError: device cache hasn't been populated yet
        OrchestratorError: the zap command returned a non-zero code

    Returns:
        str: confirmation message for the zapped device
    """

    self.log.info('Zap device %s:%s' % (host, path))

    if host not in self.inventory.keys():
        raise OrchestratorError(
            f"Host '{host}' is not a member of the cluster")

    host_info = self.inventory._inventory.get(host, {})
    if host_info.get('status', '').lower() == 'maintenance':
        raise OrchestratorError(
            f"Host '{host}' is in maintenance mode, which prevents any actions against it.")

    if host not in self.cache.devices:
        raise OrchestratorError(
            f"Host '{host}' hasn't been scanned yet to determine its inventory. Please try again later.")

    host_devices = self.cache.devices[host]
    path_found = False
    osd_id_list: List[str] = []

    for dev in host_devices:
        if dev.path == path:
            # match, so look a little deeper
            if dev.lvs:
                for lv in cast(List[Dict[str, str]], dev.lvs):
                    if lv.get('osd_id', ''):
                        lv_fsid = lv.get('cluster_fsid')
                        if lv_fsid != self._cluster_fsid:
                            raise OrchestratorError(
                                f"device {path} has lv's from a different Ceph cluster ({lv_fsid})")
                        osd_id_list.append(lv.get('osd_id', ''))
            path_found = True
            break
    if not path_found:
        raise OrchestratorError(
            f"Device path '{path}' not found on host '{host}'")

    if osd_id_list:
        dev_name = os.path.basename(path)
        active_osds: List[str] = []
        for osd_id in osd_id_list:
            metadata = self.get_metadata('osd', str(osd_id))
            if metadata:
                # an OSD counts as active when its metadata places it on
                # this host and lists this device
                if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
                    active_osds.append("osd." + osd_id)
        if active_osds:
            raise OrchestratorError(
                f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
                f"OSD{'s' if len(active_osds) > 1 else ''}"
                f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")

    out, err, code = CephadmServe(self)._run_cephadm(
        host, 'osd', 'ceph-volume',
        ['--', 'lvm', 'zap', '--destroy', path],
        error_ok=True)

    self.cache.invalidate_host_devices(host)
    if code:
        raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
    msg = f'zap successful for {path} on {host}'
    self.log.info(msg)

    return msg + '\n'


@handle_orch_error
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
    """
    Blink a device light. Calling something like::

      lsmcli local-disk-ident-led-on --path $path

    If you must, you can customize this via::

      ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
      ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

    See templates/blink_device_light_cmd.j2

    :param ident_fault: which light to affect; passed to the template
    :param on: switch the light on (True) or off (False)
    :param locs: device locations handed to the per-host ``blink`` helper
    """
    @forall_hosts
    def blink(host: str, dev: str, path: str) -> str:
        cmd_line = self.template.render('blink_device_light_cmd.j2',
                                        {
                                            'on': on,
                                            'ident_fault': ident_fault,
                                            'dev': dev,
                                            'path': path
                                        },
                                        host=host)
        cmd_args = shlex.split(cmd_line)

        out, err, code = CephadmServe(self)._run_cephadm(
            host, 'osd', 'shell', ['--'] + cmd_args,
            error_ok=True)
        if code:
            raise OrchestratorError(
                'Unable to affect %s light for %s:%s. Command: %s' % (
                    ident_fault, host, dev, ' '.join(cmd_args)))
        self.log.info('Set %s light for %s:%s %s' % (
            ident_fault, host, dev, 'on' if on else 'off'))
        return "Set %s light for %s:%s %s" % (
            ident_fault, host, dev, 'on' if on else 'off')

    return blink(locs)


def get_osd_uuid_map(self, only_up=False):
    # type: (bool) -> Dict[str, str]
    """
    Return a mapping of OSD id (as str) to OSD uuid taken from the osd map.

    :raises OrchestratorError: if an osd map entry has no ``osd`` id
    """
    osd_map = self.get('osd_map')
    r = {}
    for o in osd_map['osds']:
        # only include OSDs that have ever started in this map.  this way
        # an interrupted osd create can be repeated and succeed the second
        # time around.
        osd_id = o.get('osd')
        if osd_id is None:
            raise OrchestratorError("Could not retrieve osd_id from osd_map")
        # NOTE(review): entries are only added when only_up is False, so
        # only_up=True always yields an empty map -- confirm this is intended.
        if not only_up:
            r[str(osd_id)] = o.get('uuid', '')
    return r


def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
    """
    Return the osd map entry whose ``osd`` field equals ``osd_id``, or
    None unless exactly one entry matches.
    """
    osd = [x for x in self.get('osd_map')['osds'] if x['osd'] == osd_id]

    if len(osd) != 1:
        return None

    return osd[0]


def _trigger_preview_refresh(self,
                             specs: Optional[List[DriveGroupSpec]] = None,
                             service_name: Optional[str] = None,
                             ) -> None:
    """
    Queue hosts for an OSDSpec preview refresh.

    :param specs: specs to compare against the stored previews; only specs
        that differ from (or are missing in) ``spec_store.spec_preview``
        trigger a refresh
    :param service_name: if given, the stored preview spec of that service
        replaces whatever was selected via ``specs``
    """
    # Only trigger a refresh when a spec has changed
    trigger_specs = []
    if specs:
        for spec in specs:
            preview_spec = self.spec_store.spec_preview.get(spec.service_name())
            # the to-be-preview spec != the actual spec, this means we need to
            # trigger a refresh, if the spec has been removed (==None) we need to
            # refresh as well.
            if not preview_spec or spec != preview_spec:
                trigger_specs.append(spec)
    if service_name:
        trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
    if not any(trigger_specs):
        return None

    refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
    for host in refresh_hosts:
        self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
        self.cache.osdspec_previews_refresh_queue.append(host)


@handle_orch_error
def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
    """
    Deprecated. Please use `apply()` instead.

    Keeping this around to be compatible to mgr/dashboard
    """
    return [self._apply(spec) for spec in specs]


@handle_orch_error
def create_osds(self, drive_group: DriveGroupSpec) -> str:
    """
    Create OSDs from ``drive_group`` via ``osd_service.create_from_spec``.

    Returns an explanatory message (without creating anything) when the
    placement of the drive group matches none of the inventory hosts.
    """
    hosts: List[HostSpec] = self.inventory.all_specs()
    filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
    if not filtered_hosts:
        return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
    return self.osd_service.create_from_spec(drive_group)


def _preview_osdspecs(self,
                      osdspecs: Optional[List[DriveGroupSpec]] = None
                      ) -> dict:
    """
    Return the cached OSD previews for ``osdspecs``, keyed by host.

    When there are no specs, no matching hosts, or preview data is still
    being generated, a dict with the single key ``'n/a'`` carrying an
    error entry is returned instead.
    """
    if not osdspecs:
        return {'n/a': [{'error': True,
                         'message': 'No OSDSpec or matching hosts found.'}]}
    matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
    if not matching_hosts:
        return {'n/a': [{'error': True,
                         'message': 'No OSDSpec or matching hosts found.'}]}
    # Is any host still loading previews or still in the queue to be previewed
    pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
    if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
        # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
        return {'n/a': [{'error': True,
                         'message': 'Preview data is being generated.. '
                                    'Please re-run this command in a bit.'}]}
    # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
    wanted_service_ids = [x.service_id for x in osdspecs]  # loop-invariant
    previews_for_specs = {}
    for host, raw_reports in self.cache.osdspec_previews.items():
        if host not in matching_hosts:
            continue
        osd_reports = [
            osd_report for osd_report in raw_reports
            if osd_report.get('osdspec') in wanted_service_ids
        ]
        previews_for_specs.update({host: osd_reports})
    return previews_for_specs


def _calc_daemon_deps(self,
                      spec: Optional[ServiceSpec],
                      daemon_type: str,
                      daemon_id: str) -> List[str]:
    """
    Return the sorted list of dependency strings for a daemon.

    :param spec: service spec of the daemon, if any
    :param daemon_type: type of the daemon the deps are calculated for
    :param daemon_id: not used by this implementation
    """
    deps = []
    if daemon_type == 'haproxy':
        # because cephadm creates new daemon instances whenever
        # port or ip changes, identifying daemons by name is
        # sufficient to detect changes.
        if not spec:
            return []
        ingress_spec = cast(IngressSpec, spec)
        assert ingress_spec.backend_service
        daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
        deps = [d.name() for d in daemons]
    elif daemon_type == 'keepalived':
        # because cephadm creates new daemon instances whenever
        # port or ip changes, identifying daemons by name is
        # sufficient to detect changes.
        if not spec:
            return []
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
    elif daemon_type == 'iscsi':
        if spec:
            iscsi_spec = cast(IscsiServiceSpec, spec)
            deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
        else:
            deps = [self.get_mgr_ip()]
    else:
        need = {
            'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
            'grafana': ['prometheus'],
            'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
        }
        for dep_type in need.get(daemon_type, []):
            for dd in self.cache.get_daemons_by_type(dep_type):
                deps.append(dd.name())
        if daemon_type == 'prometheus':
            # the configured port of the prometheus mgr module is a dep as well
            deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
    return sorted(deps)


@forall_hosts
def _remove_daemons(self, name: str, host: str) -> str:
    """Remove daemon ``name`` from ``host`` via ``CephadmServe._remove_daemon``."""
    return CephadmServe(self)._remove_daemon(name, host)


def _check_pool_exists(self, pool: str, service_name: str) -> None:
    """
    Ensure that ``pool`` exists.

    :raises OrchestratorError: if ``self.rados.pool_exists`` reports the
        pool as missing
    """
    logger.info(f'Checking pool "{pool}" exists for service {service_name}')
    if not self.rados.pool_exists(pool):
        raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                f'service {service_name}')


def _add_daemon(self,
                daemon_type: str,
                spec: ServiceSpec) -> List[str]:
    """
    Add (and place) a daemon. Require explicit host placement.  Do not
    schedule, and do not apply the related scheduling limitations.

    :raises OrchestratorError: if the service is not in the spec store or
        the placement names no hosts
    """
    if spec.service_name() not in self.spec_store:
        raise OrchestratorError('Unable to add a Daemon without Service.\n'
                                'Please use `ceph orch apply ...` to create a Service.\n'
                                'Note, you might want to create the service with "unmanaged=true"')

    self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
    if not spec.placement.hosts:
        raise OrchestratorError('must specify host(s) to deploy on')
    count = spec.placement.count or len(spec.placement.hosts)
    daemons = self.cache.get_daemons_by_service(spec.service_name())
    return self._create_daemons(daemon_type, spec, daemons,
                                spec.placement.hosts, count)


def _create_daemons(self,
                    daemon_type: str,
                    spec: ServiceSpec,
                    daemons: List[DaemonDescription],
                    hosts: List[HostPlacementSpec],
                    count: int) -> List[str]:
    """
    Create one daemon of ``daemon_type`` on each of ``hosts``.

    :param daemons: existing daemons of the service; extended in place with
        a description of every daemon placed here so that generated names
        stay unique
    :raises OrchestratorError: if ``count`` exceeds the number of hosts
    """
    if count > len(hosts):
        raise OrchestratorError('too few hosts: want %d, have %s' % (
            count, hosts))

    did_config = False
    service_type = daemon_type_to_service(daemon_type)

    args = []  # type: List[CephadmDaemonDeploySpec]
    for host, network, name in hosts:
        daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                         prefix=spec.service_id,
                                         forcename=name)

        if not did_config:
            # service level config is only done once per call
            self.cephadm_services[service_type].config(spec)
            did_config = True

        daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
            host, daemon_id, network, spec,
            # NOTE: this does not consider port conflicts!
            ports=spec.get_port_start())
        self.log.debug('Placing %s.%s on host %s' % (
            daemon_type, daemon_id, host))
        args.append(daemon_spec)

        # add to daemon list so next name(s) will also be unique
        sd = orchestrator.DaemonDescription(
            hostname=host,
            daemon_type=daemon_type,
            daemon_id=daemon_id,
        )
        daemons.append(sd)

    @forall_hosts
    def create_func_map(*args: Any) -> str:
        # index the service registry by service type, like the calls above
        daemon_spec = self.cephadm_services[service_type].prepare_create(*args)
        return CephadmServe(self)._create_daemon(daemon_spec)

    return create_func_map(args)


@handle_orch_error
def add_daemon(self, spec: ServiceSpec) -> List[str]:
    """
    Add daemons for every daemon type belonging to the service type of
    ``spec``. An OrchestratorError is recorded via ``self.events`` and
    re-raised.
    """
    ret: List[str] = []
    try:
        with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
            for d_type in service_to_daemon_types(spec.service_type):
                ret.extend(self._add_daemon(d_type, spec))
            return ret
    except OrchestratorError as e:
        self.events.from_orch_error(e)
        raise


@handle_orch_error
def apply_mon(self, spec: ServiceSpec) -> str:
    """Apply a mon service spec; see ``_apply``."""
    return self._apply(spec)


def _apply(self, spec: GenericSpec) -> str:
    """
    Apply a single spec.

    Host specs are added via ``_add_host``; everything else is handled by
    ``_apply_service_spec``. OSD specs additionally trigger a preview
    refresh first.
    """
    if spec.service_type == 'host':
        return self._add_host(cast(HostSpec, spec))

    if spec.service_type == 'osd':
        # _trigger preview refresh needs to be smart and
        # should only refresh if a change has been detected
        self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

    return self._apply_service_spec(cast(ServiceSpec, spec))


def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
    """Set (or replace) the health check ``name`` with severity 'warning' and publish all health checks."""
    self.health_checks[name] = {
        'severity': 'warning',
        'summary': summary,
        'count': count,
        'detail': detail,
    }
    self.set_health_checks(self.health_checks)


def remove_health_warning(self, name: str) -> None:
    """Drop the health check ``name`` if present and publish the remaining health checks."""
    if name in self.health_checks:
        del self.health_checks[name]
        self.set_health_checks(self.health_checks)


def _plan(self, spec: ServiceSpec) -> dict:
    """
    Return a dry-run description for ``spec``.

    For OSD specs the result carries the OSD previews under ``'data'``;
    for all other services it lists the hostnames daemons would be added
    to (``'add'``) and the daemons that would be removed (``'remove'``).
    """
    if spec.service_type == 'osd':
        return {'service_name': spec.service_name(),
                'service_type': spec.service_type,
                'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

    svc = self.cephadm_services[spec.service_type]
    ha = HostAssignment(
        spec=spec,
        hosts=self._schedulable_hosts(),
        unreachable_hosts=self._unreachable_hosts(),
        networks=self.cache.networks,
        daemons=self.cache.get_daemons_by_service(spec.service_name()),
        allow_colo=svc.allow_colo(),
        rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
    )
    ha.validate()
    hosts, to_add, to_remove = ha.place()

    return {
        'service_name': spec.service_name(),
        'service_type': spec.service_type,
        'add': [hs.hostname for hs in to_add],
        'remove': [d.name() for d in to_remove]
    }


@handle_orch_error
def plan(self, specs: Sequence[GenericSpec]) -> List:
    """
    Return dry-run results for ``specs``.

    The first entry of the result is always a warning about the limited
    validity of dry-runs. Host specs are not supported: if any is present,
    a single error entry is returned instead.
    """
    results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                'to the current inventory setup. If any of these conditions change, the \n'
                'preview will be invalid. Please make sure to have a minimal \n'
                'timeframe between planning and applying the specs.'}]
    if any(spec.service_type == 'host' for spec in specs):
        return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
    for spec in specs:
        results.append(self._plan(cast(ServiceSpec, spec)))
    return results


def _apply_service_spec(self, spec: ServiceSpec) -> str:
    """
    Validate ``spec``, save it to the spec store and kick the serve loop.

    An empty placement is replaced by a per-service-type default.

    :raises OrchestratorError: if mon/mgr would be scaled below 1, or the
        requested count / count_per_host exceeds the allowed maximum
    :raises KeyError: if the placement is empty and the service type has
        no default placement
    """
    if spec.placement.is_empty():
        # fill in default placement
        defaults = {
            'mon': PlacementSpec(count=5),
            'mgr': PlacementSpec(count=2),
            'mds': PlacementSpec(count=2),
            'rgw': PlacementSpec(count=2),
            'ingress': PlacementSpec(count=2),
            'iscsi': PlacementSpec(count=1),
            'rbd-mirror': PlacementSpec(count=2),
            'cephfs-mirror': PlacementSpec(count=1),
            'nfs': PlacementSpec(count=1),
            'grafana': PlacementSpec(count=1),
            'alertmanager': PlacementSpec(count=1),
            'prometheus': PlacementSpec(count=1),
            'node-exporter': PlacementSpec(host_pattern='*'),
            'crash': PlacementSpec(host_pattern='*'),
            'container': PlacementSpec(count=1),
            'cephadm-exporter': PlacementSpec(host_pattern='*'),
            'snmp-gateway': PlacementSpec(count=1),
        }
        spec.placement = defaults[spec.service_type]
    elif spec.service_type in ['mon', 'mgr'] and \
            spec.placement.count is not None and \
            spec.placement.count < 1:
        raise OrchestratorError('cannot scale %s service below 1' % (
            spec.service_type))

    host_count = len(self.inventory.keys())
    max_count = self.max_count_per_host

    if spec.placement.count is not None:
        if spec.service_type in ['mon', 'mgr']:
            if spec.placement.count > max(5, host_count):
                raise OrchestratorError(
                    (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
        elif spec.service_type != 'osd':
            if spec.placement.count > (max_count * host_count):
                raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
                                         + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

    if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
        raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
                                 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

    HostAssignment(
        spec=spec,
        hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
        unreachable_hosts=self._unreachable_hosts(),
        networks=self.cache.networks,
        daemons=self.cache.get_daemons_by_service(spec.service_name()),
        allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
    ).validate()

    self.log.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
    self._kick_serve_loop()
    return "Scheduled %s update..." % spec.service_name()


@handle_orch_error
def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
    """
    Apply each spec in ``specs`` and return one result message per spec.

    :param no_overwrite: skip host specs whose hostname is already in the
        inventory and service specs whose name is already in the spec store
    """
    results = []
    for spec in specs:
        if no_overwrite:
            if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
                results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
                               % (cast(HostSpec, spec).hostname, spec.service_type))
                continue
            elif cast(ServiceSpec, spec).service_name() in self.spec_store:
                results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
                               % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
                continue
        results.append(self._apply(spec))
    return results


@handle_orch_error
def apply_mgr(self, spec: ServiceSpec) -> str:
    """Apply a mgr service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_mds(self, spec: ServiceSpec) -> str:
    """Apply an mds service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_rgw(self, spec: ServiceSpec) -> str:
    """Apply an rgw service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_ingress(self, spec: ServiceSpec) -> str:
    """Apply an ingress service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_iscsi(self, spec: ServiceSpec) -> str:
    """Apply an iscsi service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
    """Apply an rbd-mirror service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_nfs(self, spec: ServiceSpec) -> str:
    """Apply an nfs service spec; see ``_apply``."""
    return self._apply(spec)


def _get_dashboard_url(self):
    # type: () -> str
    """Return the ``dashboard`` entry of the mgr map services, or '' if there is none."""
    return self.get('mgr_map').get('services', {}).get('dashboard', '')


@handle_orch_error
def apply_prometheus(self, spec: ServiceSpec) -> str:
    """Apply a prometheus service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_node_exporter(self, spec: ServiceSpec) -> str:
    """Apply a node-exporter service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_crash(self, spec: ServiceSpec) -> str:
    """Apply a crash service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_grafana(self, spec: ServiceSpec) -> str:
    """Apply a grafana service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_alertmanager(self, spec: ServiceSpec) -> str:
    """Apply an alertmanager service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_container(self, spec: ServiceSpec) -> str:
    """Apply a custom container service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
    """Apply an snmp-gateway service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
    """Apply a cephadm-exporter service spec; see ``_apply``."""
    return self._apply(spec)


@handle_orch_error
def upgrade_check(self, image: str, version: str) -> str:
    """
    Report which cached daemons would need an update for a target image.

    :param image: target image name; used when ``version`` is empty
    :param version: target version; takes precedence and is appended to
        ``self.container_image_base`` as ``:v<version>``
    :return: a JSON document describing the target image and the daemons
        that are up to date / need an update / do not use a ceph image, or
        a plain message when the image's ceph version cannot be extracted
        or is rejected by ``self.upgrade._check_target_version``
    :raises OrchestratorError: if hosts are in maintenance state or neither
        ``image`` nor ``version`` is given
    """
    if self.inventory.get_host_with_state("maintenance"):
        raise OrchestratorError("check aborted - you have hosts in maintenance state")

    if version:
        target_name = self.container_image_base + ':v' + version
    elif image:
        target_name = image
    else:
        raise OrchestratorError('must specify either image or version')

    image_info = CephadmServe(self)._get_container_image_info(target_name)

    ceph_image_version = image_info.ceph_version
    if not ceph_image_version:
        return f'Unable to extract ceph version from {target_name}.'
    if ceph_image_version.startswith('ceph version '):
        ceph_image_version = ceph_image_version.split(' ')[2]
    version_error = self.upgrade._check_target_version(ceph_image_version)
    if version_error:
        return f'Incompatible upgrade: {version_error}'

    # log the resolved target: `image` is empty when `version` was given
    self.log.debug(f'image info {target_name} -> {image_info}')
    r: dict = {
        'target_name': target_name,
        'target_id': image_info.image_id,
        'target_version': image_info.ceph_version,
        'needs_update': {},
        'up_to_date': [],
        'non_ceph_image_daemons': []
    }
    for host, dm in self.cache.daemons.items():
        for name, dd in dm.items():
            if image_info.image_id == dd.container_image_id:
                r['up_to_date'].append(dd.name())
            elif dd.daemon_type in CEPH_IMAGE_TYPES:
                r['needs_update'][dd.name()] = {
                    'current_name': dd.container_image_name,
                    'current_id': dd.container_image_id,
                    'current_version': dd.version,
                }
            else:
                r['non_ceph_image_daemons'].append(dd.name())
    if self.use_repo_digest and image_info.repo_digests:
        # FIXME: we assume the first digest is the best one to use
        r['target_digest'] = image_info.repo_digests[0]

    return json.dumps(r, indent=4, sort_keys=True)


@handle_orch_error
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
    """Return ``self.upgrade.upgrade_status()``."""
    return self.upgrade.upgrade_status()


@handle_orch_error
def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict[Any, Any]:
    """Return ``self.upgrade.upgrade_ls(image, tags)``."""
    return self.upgrade.upgrade_ls(image, tags)


@handle_orch_error
def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
                  services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
    """
    Validate the upgrade parameters and start the upgrade.

    :param image: target image, passed through to the upgrade
    :param version: target version, passed through to the upgrade
    :param daemon_types: restrict the upgrade to these daemon types
        (mutually exclusive with ``services``)
    :param host_placement: placement string selecting the hosts to upgrade
    :param services: restrict the upgrade to these services
    :param limit: positive integer passed through to the upgrade
    :raises OrchestratorError: if hosts are in maintenance state or any of
        the parameters fails validation
    """
    if self.inventory.get_host_with_state("maintenance"):
        raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
    if daemon_types is not None and services is not None:
        raise OrchestratorError('--daemon-types and --services are mutually exclusive')
    if daemon_types is not None:
        for dtype in daemon_types:
            if dtype not in CEPH_UPGRADE_ORDER:
                raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
                                        f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
    if services is not None:
        for service in services:
            if service not in self.spec_store:
                # list() so the message shows the names, not a keys view repr
                raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
                                        f'Known services are: {list(self.spec_store.all_specs.keys())}')
    hosts: Optional[List[str]] = None
    if host_placement is not None:
        all_hosts = list(self.inventory.all_specs())
        placement = PlacementSpec.from_string(host_placement)
        hosts = placement.filter_matching_hostspecs(all_hosts)
        if not hosts:
            raise OrchestratorError(
                f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')

    if limit is not None:
        if limit < 1:
            raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')

    return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)


@handle_orch_error
def upgrade_pause(self) -> str:
    """Return ``self.upgrade.upgrade_pause()``."""
    return self.upgrade.upgrade_pause()


@handle_orch_error
def upgrade_resume(self) -> str:
    """Return ``self.upgrade.upgrade_resume()``."""
    return self.upgrade.upgrade_resume()


@handle_orch_error
def upgrade_stop(self) -> str:
    """Return ``self.upgrade.upgrade_stop()``."""
    return self.upgrade.upgrade_stop()


@handle_orch_error
def remove_osds(self, osd_ids: List[str],
                replace: bool = False,
                force: bool = False,
                zap: bool = False) -> str:
    """
    Takes a list of OSDs and schedules them for removal.
    The function that takes care of the actual removal is
    process_removal_queue().

    :param osd_ids: ids of the OSDs to remove; ids without a cached OSD
        daemon are ignored
    :param replace: passed on to the queued OSD entries
    :param force: passed on to the queued OSD entries
    :param zap: passed on to the queued OSD entries
    :return: confirmation message, or an "Unable to find OSDs" message
    """

    daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
    to_remove_daemons = [daemon for daemon in daemons if daemon.daemon_id in osd_ids]
    if not to_remove_daemons:
        return f"Unable to find OSDs: {osd_ids}"

    for daemon in to_remove_daemons:
        assert daemon.daemon_id is not None
        try:
            self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                            replace=replace,
                                            force=force,
                                            zap=zap,
                                            hostname=daemon.hostname,
                                            process_started_at=datetime_now(),
                                            remove_util=self.to_remove_osds.rm_util))
        except NotFoundError:
            return f"Unable to find OSDs: {osd_ids}"

    # trigger the serve loop to initiate the removal
    self._kick_serve_loop()
    return "Scheduled OSD(s) for removal"


@handle_orch_error
def stop_remove_osds(self, osd_ids: List[str]) -> str:
    """
    Stops a `removal` process for a List of OSDs.
    This will revert their weight and remove it from the osds_to_remove queue

    Processing stops at the first id that cannot be handled; a message
    naming that id is returned.
    """
    for osd_id in osd_ids:
        try:
            self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                       remove_util=self.to_remove_osds.rm_util))
        except (NotFoundError, KeyError, ValueError):
            return f'Unable to find OSD in the queue: {osd_id}'

    # trigger the serve loop to halt the removal
    self._kick_serve_loop()
    return "Stopped OSD(s) removal"


@handle_orch_error
def remove_osds_status(self) -> List[OSD]:
    """
    The CLI call to retrieve an osd removal report
    """
    return self.to_remove_osds.all_osds()


@handle_orch_error
def drain_host(self, hostname, force=False):
    # type: (str, bool) -> str
    """
    Drain all daemons from a host.

    Adds the ``_no_schedule`` label to the host and schedules the host's
    OSDs for removal.

    :param hostname: host name
    :param force: skip the check that protects the last ``_admin`` host
    :return: message listing the daemons cached for the host
    :raises OrchestratorValidationError: if the host is the only one with
        the ``_admin`` label and ``force`` is not set
    """

    # if we drain the last admin host we could end up removing the only instance
    # of the config and keyring and cause issues
    if not force:
        p = PlacementSpec(label='_admin')
        admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
        if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
            raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
                                              " label.\nDraining this host could cause the removal"
                                              " of the last cluster config/keyring managed by cephadm.\n"
                                              "It is recommended to add the _admin label to another host"
                                              " before completing this operation.\nIf you're certain this is"
                                              " what you want rerun this command with --force.")

    self.add_host_label(hostname, '_no_schedule')

    daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)

    osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
    self.remove_osds(osds_to_remove)

    # header, separator and one row per daemon, each terminated by a newline
    row = "{:<20} {:<15}\n"
    table_rows = [row.format("type", "id"), row.format("-" * 20, "-" * 15)]
    table_rows.extend(row.format(d.daemon_type, d.daemon_id) for d in daemons)
    daemons_table = "".join(table_rows)

    return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)


def trigger_connect_dashboard_rgw(self) -> None:
    """Set the ``need_connect_dashboard_rgw`` flag and wake up the serve loop via ``self.event``."""
    self.need_connect_dashboard_rgw = True
    self.event.set()