import asyncio import json import errno import ipaddress import logging import re import shlex from collections import defaultdict from configparser import ConfigParser from contextlib import contextmanager from functools import wraps from tempfile import TemporaryDirectory, NamedTemporaryFile from threading import Event from cephadm.service_discovery import ServiceDiscovery import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \ Awaitable, Iterator import datetime import os import random import multiprocessing.pool import subprocess from prettytable import PrettyTable from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ ServiceSpec, PlacementSpec, \ HostPlacementSpec, IngressSpec, \ TunedProfileSpec, IscsiServiceSpec from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec from cephadm.http_server import CephadmHttpServer from cephadm.agent import CephadmAgentHelpers from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType import orchestrator from orchestrator.module import to_format, Format from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \ service_to_daemon_types from orchestrator._interface import GenericSpec from orchestrator._interface import daemon_type_to_service from . import utils from . import ssh from .migrations import Migrations from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \ CephExporterService from .services.ingress import IngressService from .services.container import CustomContainerService from .services.iscsi import IscsiService from .services.nvmeof import NvmeofService from .services.nfs import NFSService from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService, SNMPGatewayService, LokiService, PromtailService from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService from .schedule import HostAssignment from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ ClientKeyringStore, ClientKeyringSpec, TunedProfileStore from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels from .configchecks import CephadmConfigChecks from .offline_watcher import OfflineHostWatcher from .tuned_profiles import TunedProfileUtils try: import asyncssh except ImportError as e: asyncssh = None # type: ignore asyncssh_import_error = str(e) logger = logging.getLogger(__name__) T = TypeVar('T') DEFAULT_SSH_CONFIG = """ Host * User root StrictHostKeyChecking no UserKnownHostsFile /dev/null ConnectTimeout=30 """ # cherrypy likes to sys.exit on error. don't let it take us down too! def os_exit_noop(status: int) -> None: pass os._exit = os_exit_noop # type: ignore # Default container images ----------------------------------------------------- DEFAULT_IMAGE = 'quay.io/ceph/ceph' # DO NOT ADD TAG TO THIS DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0' DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0' DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2' DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0' DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0' DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0' DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7' DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3' DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4' DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1' DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23' DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29' DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29' DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29' # ------------------------------------------------------------------------------ def host_exists(hostname_position: int = 1) -> Callable: """Check that a hostname exists in the inventory""" def inner(func: Callable) -> Callable: @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: this = args[0] # self object hostname = args[hostname_position] if hostname not in this.cache.get_hosts(): candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)]) help_msg = f"Did you mean {candidates}?" if candidates else "" raise OrchestratorError( f"Cannot find host '{hostname}' in the inventory. {help_msg}") return func(*args, **kwargs) return wrapper return inner class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, metaclass=CLICommandMeta): _STORE_HOST_PREFIX = "host" instance = None NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary] NATIVE_OPTIONS = [] # type: List[Any] MODULE_OPTIONS = [ Option( 'ssh_config_file', type='str', default=None, desc='customized SSH config file to connect to managed hosts', ), Option( 'device_cache_timeout', type='secs', default=30 * 60, desc='seconds to cache device inventory', ), Option( 'device_enhanced_scan', type='bool', default=False, desc='Use libstoragemgmt during device scans', ), Option( 'inventory_list_all', type='bool', default=False, desc='Whether ceph-volume inventory should report ' 'more devices (mostly mappers (LVs / mpaths), partitions...)', ), Option( 'daemon_cache_timeout', type='secs', default=10 * 60, desc='seconds to cache service (daemon) inventory', ), Option( 'facts_cache_timeout', type='secs', default=1 * 60, desc='seconds to cache host facts data', ), Option( 'host_check_interval', type='secs', default=10 * 60, desc='how frequently to perform a host check', ), Option( 'mode', type='str', enum_allowed=['root', 'cephadm-package'], default='root', desc='mode for remote execution of cephadm', ), Option( 'container_image_base', default=DEFAULT_IMAGE, desc='Container image name, without the tag', runtime=True, ), Option( 'container_image_prometheus', default=DEFAULT_PROMETHEUS_IMAGE, desc='Prometheus container image', ), Option( 'container_image_nvmeof', default=DEFAULT_NVMEOF_IMAGE, desc='Nvme-of container image', ), Option( 'container_image_grafana', default=DEFAULT_GRAFANA_IMAGE, desc='Prometheus container image', ), Option( 'container_image_alertmanager', default=DEFAULT_ALERT_MANAGER_IMAGE, desc='Prometheus container image', ), Option( 'container_image_node_exporter', default=DEFAULT_NODE_EXPORTER_IMAGE, desc='Prometheus container image', ), Option( 'container_image_loki', default=DEFAULT_LOKI_IMAGE, desc='Loki container image', ), Option( 'container_image_promtail', default=DEFAULT_PROMTAIL_IMAGE, desc='Promtail container image', ), Option( 'container_image_haproxy', default=DEFAULT_HAPROXY_IMAGE, desc='HAproxy container image', ), Option( 'container_image_keepalived', default=DEFAULT_KEEPALIVED_IMAGE, desc='Keepalived container image', ), Option( 'container_image_snmp_gateway', default=DEFAULT_SNMP_GATEWAY_IMAGE, desc='SNMP Gateway container image', ), Option( 'container_image_elasticsearch', default=DEFAULT_ELASTICSEARCH_IMAGE, desc='elasticsearch container image', ), Option( 'container_image_jaeger_agent', default=DEFAULT_JAEGER_AGENT_IMAGE, desc='Jaeger agent container image', ), Option( 'container_image_jaeger_collector', default=DEFAULT_JAEGER_COLLECTOR_IMAGE, desc='Jaeger collector container image', ), Option( 'container_image_jaeger_query', default=DEFAULT_JAEGER_QUERY_IMAGE, desc='Jaeger query container image', ), Option( 'warn_on_stray_hosts', type='bool', default=True, desc='raise a health warning if daemons are detected on a host ' 'that is not managed by cephadm', ), Option( 'warn_on_stray_daemons', type='bool', default=True, desc='raise a health warning if daemons are detected ' 'that are not managed by cephadm', ), Option( 'warn_on_failed_host_check', type='bool', default=True, desc='raise a health warning if the host check fails', ), Option( 'log_to_cluster', type='bool', default=True, desc='log to the "cephadm" cluster log channel"', ), Option( 'allow_ptrace', type='bool', default=False, desc='allow SYS_PTRACE capability on ceph containers', long_desc='The SYS_PTRACE capability is needed to attach to a ' 'process with gdb or strace. Enabling this options ' 'can allow debugging daemons that encounter problems ' 'at runtime.', ), Option( 'container_init', type='bool', default=True, desc='Run podman/docker with `--init`' ), Option( 'prometheus_alerts_path', type='str', default='/etc/prometheus/ceph/ceph_default_alerts.yml', desc='location of alerts to include in prometheus deployments', ), Option( 'migration_current', type='int', default=None, desc='internal - do not modify', # used to track spec and other data migrations. ), Option( 'config_dashboard', type='bool', default=True, desc='manage configs like API endpoints in Dashboard.' ), Option( 'manage_etc_ceph_ceph_conf', type='bool', default=False, desc='Manage and own /etc/ceph/ceph.conf on the hosts.', ), Option( 'manage_etc_ceph_ceph_conf_hosts', type='str', default='*', desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf', ), # not used anymore Option( 'registry_url', type='str', default=None, desc='Registry url for login purposes. This is not the default registry' ), Option( 'registry_username', type='str', default=None, desc='Custom repository username. Only used for logging into a registry.' ), Option( 'registry_password', type='str', default=None, desc='Custom repository password. Only used for logging into a registry.' ), #### Option( 'registry_insecure', type='bool', default=False, desc='Registry is to be considered insecure (no TLS available). Only for development purposes.' ), Option( 'use_repo_digest', type='bool', default=True, desc='Automatically convert image tags to image digest. Make sure all daemons use the same image', ), Option( 'config_checks_enabled', type='bool', default=False, desc='Enable or disable the cephadm configuration analysis', ), Option( 'default_registry', type='str', default='docker.io', desc='Search-registry to which we should normalize unqualified image names. ' 'This is not the default registry', ), Option( 'max_count_per_host', type='int', default=10, desc='max number of daemons per service per host', ), Option( 'autotune_memory_target_ratio', type='float', default=.7, desc='ratio of total system memory to divide amongst autotuned daemons' ), Option( 'autotune_interval', type='secs', default=10 * 60, desc='how frequently to autotune daemon memory' ), Option( 'use_agent', type='bool', default=False, desc='Use cephadm agent on each host to gather and send metadata' ), Option( 'agent_refresh_rate', type='secs', default=20, desc='How often agent on each host will try to gather and send metadata' ), Option( 'agent_starting_port', type='int', default=4721, desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)' ), Option( 'agent_down_multiplier', type='float', default=3.0, desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down' ), Option( 'max_osd_draining_count', type='int', default=10, desc='max number of osds that will be drained simultaneously when osds are removed' ), Option( 'service_discovery_port', type='int', default=8765, desc='cephadm service discovery port' ), Option( 'cgroups_split', type='bool', default=True, desc='Pass --cgroups=split when cephadm creates containers (currently podman only)' ), Option( 'log_refresh_metadata', type='bool', default=False, desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level' ), Option( 'secure_monitoring_stack', type='bool', default=False, desc='Enable TLS security for all the monitoring stack daemons' ), Option( 'default_cephadm_command_timeout', type='secs', default=15 * 60, desc='Default timeout applied to cephadm commands run directly on ' 'the host (in seconds)' ), ] def __init__(self, *args: Any, **kwargs: Any): super(CephadmOrchestrator, self).__init__(*args, **kwargs) self._cluster_fsid: str = self.get('mon_map')['fsid'] self.last_monmap: Optional[datetime.datetime] = None # for serve() self.run = True self.event = Event() self.ssh = ssh.SSHManager(self) if self.get_store('pause'): self.paused = True else: self.paused = False # for mypy which does not run the code if TYPE_CHECKING: self.ssh_config_file = None # type: Optional[str] self.device_cache_timeout = 0 self.daemon_cache_timeout = 0 self.facts_cache_timeout = 0 self.host_check_interval = 0 self.max_count_per_host = 0 self.mode = '' self.container_image_base = '' self.container_image_prometheus = '' self.container_image_nvmeof = '' self.container_image_grafana = '' self.container_image_alertmanager = '' self.container_image_node_exporter = '' self.container_image_loki = '' self.container_image_promtail = '' self.container_image_haproxy = '' self.container_image_keepalived = '' self.container_image_snmp_gateway = '' self.container_image_elasticsearch = '' self.container_image_jaeger_agent = '' self.container_image_jaeger_collector = '' self.container_image_jaeger_query = '' self.warn_on_stray_hosts = True self.warn_on_stray_daemons = True self.warn_on_failed_host_check = True self.allow_ptrace = False self.container_init = True self.prometheus_alerts_path = '' self.migration_current: Optional[int] = None self.config_dashboard = True self.manage_etc_ceph_ceph_conf = True self.manage_etc_ceph_ceph_conf_hosts = '*' self.registry_url: Optional[str] = None self.registry_username: Optional[str] = None self.registry_password: Optional[str] = None self.registry_insecure: bool = False self.use_repo_digest = True self.default_registry = '' self.autotune_memory_target_ratio = 0.0 self.autotune_interval = 0 self.ssh_user: Optional[str] = None self._ssh_options: Optional[str] = None self.tkey = NamedTemporaryFile() self.ssh_config_fname: Optional[str] = None self.ssh_config: Optional[str] = None self._temp_files: List = [] self.ssh_key: Optional[str] = None self.ssh_pub: Optional[str] = None self.ssh_cert: Optional[str] = None self.use_agent = False self.agent_refresh_rate = 0 self.agent_down_multiplier = 0.0 self.agent_starting_port = 0 self.service_discovery_port = 0 self.secure_monitoring_stack = False self.apply_spec_fails: List[Tuple[str, str]] = [] self.max_osd_draining_count = 10 self.device_enhanced_scan = False self.inventory_list_all = False self.cgroups_split = True self.log_refresh_metadata = False self.default_cephadm_command_timeout = 0 self.notify(NotifyType.mon_map, None) self.config_notify() path = self.get_ceph_option('cephadm_path') try: assert isinstance(path, str) with open(path, 'rb') as f: self._cephadm = f.read() except (IOError, TypeError) as e: raise RuntimeError("unable to read cephadm at '%s': %s" % ( path, str(e))) self.cephadm_binary_path = self._get_cephadm_binary_path() self._worker_pool = multiprocessing.pool.ThreadPool(10) self.ssh._reconfig_ssh() CephadmOrchestrator.instance = self self.upgrade = CephadmUpgrade(self) self.health_checks: Dict[str, dict] = {} self.inventory = Inventory(self) self.cache = HostCache(self) self.cache.load() self.agent_cache = AgentCache(self) self.agent_cache.load() self.to_remove_osds = OSDRemovalQueue(self) self.to_remove_osds.load_from_store() self.spec_store = SpecStore(self) self.spec_store.load() self.keys = ClientKeyringStore(self) self.keys.load() self.tuned_profiles = TunedProfileStore(self) self.tuned_profiles.load() self.tuned_profile_utils = TunedProfileUtils(self) # ensure the host lists are in sync for h in self.inventory.keys(): if h not in self.cache.daemons: self.cache.prime_empty_host(h) for h in self.cache.get_hosts(): if h not in self.inventory: self.cache.rm_host(h) # in-memory only. self.events = EventStore(self) self.offline_hosts: Set[str] = set() self.migration = Migrations(self) _service_classes: Sequence[Type[CephadmService]] = [ OSDService, NFSService, MonService, MgrService, MdsService, RgwService, RbdMirrorService, GrafanaService, AlertmanagerService, PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService, IngressService, CustomContainerService, CephfsMirrorService, NvmeofService, CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService, JaegerQueryService, JaegerAgentService, JaegerCollectorService ] # https://github.com/python/mypy/issues/8993 self.cephadm_services: Dict[str, CephadmService] = { cls.TYPE: cls(self) for cls in _service_classes} # type: ignore self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr']) self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd']) self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi']) self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof']) self.scheduled_async_actions: List[Callable] = [] self.template = TemplateMgr(self) self.requires_post_actions: Set[str] = set() self.need_connect_dashboard_rgw = False self.config_checker = CephadmConfigChecks(self) self.http_server = CephadmHttpServer(self) self.http_server.start() self.agent_helpers = CephadmAgentHelpers(self) if self.use_agent: self.agent_helpers._apply_agent() self.offline_watcher = OfflineHostWatcher(self) self.offline_watcher.start() def shutdown(self) -> None: self.log.debug('shutdown') self._worker_pool.close() self._worker_pool.join() self.http_server.shutdown() self.offline_watcher.shutdown() self.run = False self.event.set() def _get_cephadm_service(self, service_type: str) -> CephadmService: assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES return self.cephadm_services[service_type] def _get_cephadm_binary_path(self) -> str: import hashlib m = hashlib.sha256() m.update(self._cephadm) return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}' def _kick_serve_loop(self) -> None: self.log.debug('_kick_serve_loop') self.event.set() def serve(self) -> None: """ The main loop of cephadm. A command handler will typically change the declarative state of cephadm. This loop will then attempt to apply this new state. """ # for ssh in serve self.event_loop = ssh.EventLoopThread() serve = CephadmServe(self) serve.serve() def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T: if not timeout: timeout = self.default_cephadm_command_timeout # put a lower bound of 60 seconds in case users # accidentally set it to something unreasonable. # For example if they though it was in minutes # rather than seconds if timeout < 60: self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.') timeout = 60 return self.event_loop.get_result(coro, timeout) @contextmanager def async_timeout_handler(self, host: Optional[str] = '', cmd: Optional[str] = '', timeout: Optional[int] = None) -> Iterator[None]: # this is meant to catch asyncio.TimeoutError and convert it into an # OrchestratorError which much of the cephadm codebase is better equipped to handle. # If the command being run, the host it is run on, or the timeout being used # are provided, that will be included in the OrchestratorError's message try: yield except asyncio.TimeoutError: err_str: str = '' if cmd: err_str = f'Command "{cmd}" timed out ' else: err_str = 'Command timed out ' if host: err_str += f'on host {host} ' if timeout: err_str += f'(non-default {timeout} second timeout)' else: err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)') raise OrchestratorError(err_str) def set_container_image(self, entity: str, image: str) -> None: self.check_mon_command({ 'prefix': 'config set', 'name': 'container_image', 'value': image, 'who': entity, }) def config_notify(self) -> None: """ This method is called whenever one of our config options is changed. TODO: this method should be moved into mgr_module.py """ for opt in self.MODULE_OPTIONS: setattr(self, opt['name'], # type: ignore self.get_module_option(opt['name'])) # type: ignore self.log.debug(' mgr option %s = %s', opt['name'], getattr(self, opt['name'])) # type: ignore for opt in self.NATIVE_OPTIONS: setattr(self, opt, # type: ignore self.get_ceph_option(opt)) self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore self.event.set() def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None: if notify_type == NotifyType.mon_map: # get monmap mtime so we can refresh configs when mons change monmap = self.get('mon_map') self.last_monmap = str_to_datetime(monmap['modified']) if self.last_monmap and self.last_monmap > datetime_now(): self.last_monmap = None # just in case clocks are skewed if getattr(self, 'manage_etc_ceph_ceph_conf', False): # getattr, due to notify() being called before config_notify() self._kick_serve_loop() if notify_type == NotifyType.pg_summary: self._trigger_osd_removal() def _trigger_osd_removal(self) -> None: remove_queue = self.to_remove_osds.as_osd_ids() if not remove_queue: return data = self.get("osd_stats") for osd in data.get('osd_stats', []): if osd.get('num_pgs') == 0: # if _ANY_ osd that is currently in the queue appears to be empty, # start the removal process if int(osd.get('osd')) in remove_queue: self.log.debug('Found empty osd. Starting removal process') # if the osd that is now empty is also part of the removal queue # start the process self._kick_serve_loop() def pause(self) -> None: if not self.paused: self.log.info('Paused') self.set_store('pause', 'true') self.paused = True # wake loop so we update the health status self._kick_serve_loop() def resume(self) -> None: if self.paused: self.log.info('Resumed') self.paused = False self.set_store('pause', None) # unconditionally wake loop so that 'orch resume' can be used to kick # cephadm self._kick_serve_loop() def get_unique_name( self, daemon_type: str, host: str, existing: List[orchestrator.DaemonDescription], prefix: Optional[str] = None, forcename: Optional[str] = None, rank: Optional[int] = None, rank_generation: Optional[int] = None, ) -> str: """ Generate a unique random service name """ suffix = daemon_type not in [ 'mon', 'crash', 'ceph-exporter', 'prometheus', 'node-exporter', 'grafana', 'alertmanager', 'container', 'agent', 'snmp-gateway', 'loki', 'promtail', 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query' ] if forcename: if len([d for d in existing if d.daemon_id == forcename]): raise orchestrator.OrchestratorValidationError( f'name {daemon_type}.{forcename} already in use') return forcename if '.' in host: host = host.split('.')[0] while True: if prefix: name = prefix + '.' else: name = '' if rank is not None and rank_generation is not None: name += f'{rank}.{rank_generation}.' name += host if suffix: name += '.' + ''.join(random.choice(string.ascii_lowercase) for _ in range(6)) if len([d for d in existing if d.daemon_id == name]): if not suffix: raise orchestrator.OrchestratorValidationError( f'name {daemon_type}.{name} already in use') self.log.debug('name %s exists, trying again', name) continue return name def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None: if ssh_config is None or len(ssh_config.strip()) == 0: raise OrchestratorValidationError('ssh_config cannot be empty') # StrictHostKeyChecking is [yes|no] ? res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config) if not res: raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking') for s in res: if 'ask' in s.lower(): raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'') def validate_ssh_config_fname(self, ssh_config_fname: str) -> None: if not os.path.isfile(ssh_config_fname): raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format( ssh_config_fname)) def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None: def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]: return str_to_datetime(value) if value is not None else None dm = {} for d in ls: if not d['style'].startswith('cephadm'): continue if d['fsid'] != self._cluster_fsid: continue if '.' not in d['name']: continue daemon_type = d['name'].split('.')[0] if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES: logger.warning(f"Found unknown daemon type {daemon_type} on host {host}") continue container_id = d.get('container_id') if container_id: # shorten the hash container_id = container_id[0:12] rank = int(d['rank']) if d.get('rank') is not None else None rank_generation = int(d['rank_generation']) if d.get( 'rank_generation') is not None else None status, status_desc = None, 'unknown' if 'state' in d: status_desc = d['state'] status = { 'running': DaemonDescriptionStatus.running, 'stopped': DaemonDescriptionStatus.stopped, 'error': DaemonDescriptionStatus.error, 'unknown': DaemonDescriptionStatus.error, }[d['state']] sd = orchestrator.DaemonDescription( daemon_type=daemon_type, daemon_id='.'.join(d['name'].split('.')[1:]), hostname=host, container_id=container_id, container_image_id=d.get('container_image_id'), container_image_name=d.get('container_image_name'), container_image_digests=d.get('container_image_digests'), version=d.get('version'), status=status, status_desc=status_desc, created=_as_datetime(d.get('created')), started=_as_datetime(d.get('started')), last_refresh=datetime_now(), last_configured=_as_datetime(d.get('last_configured')), last_deployed=_as_datetime(d.get('last_deployed')), memory_usage=d.get('memory_usage'), memory_request=d.get('memory_request'), memory_limit=d.get('memory_limit'), cpu_percentage=d.get('cpu_percentage'), service_name=d.get('service_name'), ports=d.get('ports'), ip=d.get('ip'), deployed_by=d.get('deployed_by'), rank=rank, rank_generation=rank_generation, extra_container_args=d.get('extra_container_args'), extra_entrypoint_args=d.get('extra_entrypoint_args'), ) dm[sd.name()] = sd self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm))) self.cache.update_host_daemons(host, dm) self.cache.save_host(host) return None def update_watched_hosts(self) -> None: # currently, we are watching hosts with nfs daemons hosts_to_watch = [d.hostname for d in self.cache.get_daemons( ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES] self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None]))) def offline_hosts_remove(self, host: str) -> None: if host in self.offline_hosts: self.offline_hosts.remove(host) def update_failed_daemon_health_check(self) -> None: failed_daemons = [] for dd in self.cache.get_error_daemons(): if dd.daemon_type != 'agent': # agents tracked by CEPHADM_AGENT_DOWN failed_daemons.append('daemon %s on %s is in %s state' % ( dd.name(), dd.hostname, dd.status_desc )) self.remove_health_warning('CEPHADM_FAILED_DAEMON') if failed_daemons: self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len( failed_daemons), failed_daemons) @staticmethod def can_run() -> Tuple[bool, str]: if asyncssh is not None: return True, "" else: return False, "loading asyncssh library:{}".format( asyncssh_import_error) def available(self) -> Tuple[bool, str, Dict[str, Any]]: """ The cephadm orchestrator is always available. """ ok, err = self.can_run() if not ok: return ok, err, {} if not self.ssh_key or not self.ssh_pub: return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {} # mypy is unable to determine type for _processes since it's private worker_count: int = self._worker_pool._processes # type: ignore ret = { "workers": worker_count, "paused": self.paused, } return True, err, ret def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None: self.set_store(what, new) self.ssh._reconfig_ssh() if self.cache.get_hosts(): # Can't check anything without hosts host = self.cache.get_hosts()[0] r = CephadmServe(self)._check_host(host) if r is not None: # connection failed reset user self.set_store(what, old) self.ssh._reconfig_ssh() raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host)) self.log.info(f'Set ssh {what}') @orchestrator._cli_write_command( prefix='cephadm set-ssh-config') def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """ Set the ssh_config file (use -i ) """ # Set an ssh_config file provided from stdin old = self.ssh_config if inbuf == old: return 0, "value unchanged", "" self.validate_ssh_config_content(inbuf) self._validate_and_set_ssh_val('ssh_config', inbuf, old) return 0, "", "" @orchestrator._cli_write_command('cephadm clear-ssh-config') def _clear_ssh_config(self) -> Tuple[int, str, str]: """ Clear the ssh_config file """ # Clear the ssh_config file provided from stdin self.set_store("ssh_config", None) self.ssh_config_tmp = None self.log.info('Cleared ssh_config') self.ssh._reconfig_ssh() return 0, "", "" @orchestrator._cli_read_command('cephadm get-ssh-config') def _get_ssh_config(self) -> HandleCommandResult: """ Returns the ssh config as used by cephadm """ if self.ssh_config_file: self.validate_ssh_config_fname(self.ssh_config_file) with open(self.ssh_config_file) as f: return HandleCommandResult(stdout=f.read()) ssh_config = self.get_store("ssh_config") if ssh_config: return HandleCommandResult(stdout=ssh_config) return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG) @orchestrator._cli_write_command('cephadm generate-key') def _generate_key(self) -> Tuple[int, str, str]: """ Generate a cluster SSH key (if not present) """ if not self.ssh_pub or not self.ssh_key: self.log.info('Generating ssh key...') tmp_dir = TemporaryDirectory() path = tmp_dir.name + '/key' try: subprocess.check_call([ '/usr/bin/ssh-keygen', '-C', 'ceph-%s' % self._cluster_fsid, '-N', '', '-f', path ]) with open(path, 'r') as f: secret = f.read() with open(path + '.pub', 'r') as f: pub = f.read() finally: os.unlink(path) os.unlink(path + '.pub') tmp_dir.cleanup() self.set_store('ssh_identity_key', secret) self.set_store('ssh_identity_pub', pub) self.ssh._reconfig_ssh() return 0, '', '' @orchestrator._cli_write_command( 'cephadm set-priv-key') def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """Set cluster SSH private key (use -i )""" if inbuf is None or len(inbuf) == 0: return -errno.EINVAL, "", "empty private ssh key provided" old = self.ssh_key if inbuf == old: return 0, "value unchanged", "" self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old) self.log.info('Set ssh private key') return 0, "", "" @orchestrator._cli_write_command( 'cephadm set-pub-key') def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """Set cluster SSH public key (use -i )""" if inbuf is None or len(inbuf) == 0: return -errno.EINVAL, "", "empty public ssh key provided" old = self.ssh_pub if inbuf == old: return 0, "value unchanged", "" self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old) return 0, "", "" @orchestrator._cli_write_command( 'cephadm set-signed-cert') def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """Set a signed cert if CA signed keys are being used (use -i )""" if inbuf is None or len(inbuf) == 0: return -errno.EINVAL, "", "empty cert file provided" old = self.ssh_cert if inbuf == old: return 0, "value unchanged", "" self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old) return 0, "", "" @orchestrator._cli_write_command( 'cephadm clear-key') def _clear_key(self) -> Tuple[int, str, str]: """Clear cluster SSH key""" self.set_store('ssh_identity_key', None) self.set_store('ssh_identity_pub', None) self.set_store('ssh_identity_cert', None) self.ssh._reconfig_ssh() self.log.info('Cleared cluster SSH key') return 0, '', '' @orchestrator._cli_read_command( 'cephadm get-pub-key') def _get_pub_key(self) -> Tuple[int, str, str]: """Show SSH public key for connecting to cluster hosts""" if self.ssh_pub: return 0, self.ssh_pub, '' else: return -errno.ENOENT, '', 'No cluster SSH key defined' @orchestrator._cli_read_command( 'cephadm get-signed-cert') def _get_signed_cert(self) -> Tuple[int, str, str]: """Show SSH signed cert for connecting to cluster hosts using CA signed keys""" if self.ssh_cert: return 0, self.ssh_cert, '' else: return -errno.ENOENT, '', 'No signed cert defined' @orchestrator._cli_read_command( 'cephadm get-user') def _get_user(self) -> Tuple[int, str, str]: """ Show user for SSHing to cluster hosts """ if self.ssh_user is None: return -errno.ENOENT, '', 'No cluster SSH user configured' else: return 0, self.ssh_user, '' @orchestrator._cli_read_command( 'cephadm set-user') def set_ssh_user(self, user: str) -> Tuple[int, str, str]: """ Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users """ current_user = self.ssh_user if user == current_user: return 0, "value unchanged", "" self._validate_and_set_ssh_val('ssh_user', user, current_user) current_ssh_config = self._get_ssh_config() new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout) self._set_ssh_config(new_ssh_config) msg = 'ssh user set to %s' % user if user != 'root': msg += '. sudo will be used' self.log.info(msg) return 0, msg, '' @orchestrator._cli_read_command( 'cephadm registry-login') def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """ Set custom registry login info by providing url, username and password or json file with login info (-i ) """ # if password not given in command line, get it through file input if not (url and username and password) and (inbuf is None or len(inbuf) == 0): return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments " "or -i ") elif (url and username and password): registry_json = {'url': url, 'username': username, 'password': password} else: assert isinstance(inbuf, str) registry_json = json.loads(inbuf) if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json: return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. " "Please setup json file as\n" "{\n" " \"url\": \"REGISTRY_URL\",\n" " \"username\": \"REGISTRY_USERNAME\",\n" " \"password\": \"REGISTRY_PASSWORD\"\n" "}\n") # verify login info works by attempting login on random host host = None for host_name in self.inventory.keys(): host = host_name break if not host: raise OrchestratorError('no hosts defined') with self.async_timeout_handler(host, 'cephadm registry-login'): r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json)) if r is not None: return 1, '', r # if logins succeeded, store info self.log.debug("Host logins successful. Storing login info.") self.set_store('registry_credentials', json.dumps(registry_json)) # distribute new login info to all hosts self.cache.distribute_new_registry_login_info() return 0, "registry login scheduled", '' @orchestrator._cli_read_command('cephadm check-host') def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Check whether we can access and manage a remote host""" try: with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): out, err, code = self.wait_async( CephadmServe(self)._run_cephadm( host, cephadmNoImage, 'check-host', ['--expect-hostname', host], addr=addr, error_ok=True, no_fsid=True)) if code: return 1, '', ('check-host failed:\n' + '\n'.join(err)) except ssh.HostConnectionError as e: self.log.exception( f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}") return 1, '', ('check-host failed:\n' + f"Failed to connect to {host} at address ({e.addr}): {str(e)}") except OrchestratorError: self.log.exception(f"check-host failed for '{host}'") return 1, '', ('check-host failed:\n' + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.") # if we have an outstanding health alert for this host, give the # serve thread a kick if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: if item.startswith('host %s ' % host): self.event.set() return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) @orchestrator._cli_read_command( 'cephadm prepare-host') def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: """Prepare a remote host for use with cephadm""" with self.async_timeout_handler(host, 'cephadm prepare-host'): out, err, code = self.wait_async( CephadmServe(self)._run_cephadm( host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host], addr=addr, error_ok=True, no_fsid=True)) if code: return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) # if we have an outstanding health alert for this host, give the # serve thread a kick if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: if item.startswith('host %s ' % host): self.event.set() return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) @orchestrator._cli_write_command( prefix='cephadm set-extra-ceph-conf') def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult: """ Text that is appended to all daemon's ceph.conf. Mainly a workaround, till `config generate-minimal-conf` generates a complete ceph.conf. Warning: this is a dangerous operation. """ if inbuf: # sanity check. cp = ConfigParser() cp.read_string(inbuf, source='') self.set_store("extra_ceph_conf", json.dumps({ 'conf': inbuf, 'last_modified': datetime_to_str(datetime_now()) })) self.log.info('Set extra_ceph_conf') self._kick_serve_loop() return HandleCommandResult() @orchestrator._cli_read_command( 'cephadm get-extra-ceph-conf') def _get_extra_ceph_conf(self) -> HandleCommandResult: """ Get extra ceph conf that is appended """ return HandleCommandResult(stdout=self.extra_ceph_conf().conf) @orchestrator._cli_read_command('cephadm config-check ls') def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult: """List the available configuration checks and their current state""" if format not in [Format.plain, Format.json, Format.json_pretty]: return HandleCommandResult( retval=1, stderr="Requested format is not supported when listing configuration checks" ) if format in [Format.json, Format.json_pretty]: return HandleCommandResult( stdout=to_format(self.config_checker.health_checks, format, many=True, cls=None)) # plain formatting table = PrettyTable( ['NAME', 'HEALTHCHECK', 'STATUS', 'DESCRIPTION' ], border=False) table.align['NAME'] = 'l' table.align['HEALTHCHECK'] = 'l' table.align['STATUS'] = 'l' table.align['DESCRIPTION'] = 'l' table.left_padding_width = 0 table.right_padding_width = 2 for c in self.config_checker.health_checks: table.add_row(( c.name, c.healthcheck_name, c.status, c.description, )) return HandleCommandResult(stdout=table.get_string()) @orchestrator._cli_read_command('cephadm config-check status') def _config_check_status(self) -> HandleCommandResult: """Show whether the configuration checker feature is enabled/disabled""" status = self.get_module_option('config_checks_enabled') return HandleCommandResult(stdout="Enabled" if status else "Disabled") @orchestrator._cli_write_command('cephadm config-check enable') def _config_check_enable(self, check_name: str) -> HandleCommandResult: """Enable a specific configuration check""" if not self._config_check_valid(check_name): return HandleCommandResult(retval=1, stderr="Invalid check name") err, msg = self._update_config_check(check_name, 'enabled') if err: return HandleCommandResult( retval=err, stderr=f"Failed to enable check '{check_name}' : {msg}") return HandleCommandResult(stdout="ok") @orchestrator._cli_write_command('cephadm config-check disable') def _config_check_disable(self, check_name: str) -> HandleCommandResult: """Disable a specific configuration check""" if not self._config_check_valid(check_name): return HandleCommandResult(retval=1, stderr="Invalid check name") err, msg = self._update_config_check(check_name, 'disabled') if err: return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}") else: # drop any outstanding raised healthcheck for this check config_check = self.config_checker.lookup_check(check_name) if config_check: if config_check.healthcheck_name in self.health_checks: self.health_checks.pop(config_check.healthcheck_name, None) self.set_health_checks(self.health_checks) else: self.log.error( f"Unable to resolve a check name ({check_name}) to a healthcheck definition?") return HandleCommandResult(stdout="ok") def _config_check_valid(self, check_name: str) -> bool: return check_name in [chk.name for chk in self.config_checker.health_checks] def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]: checks_raw = self.get_store('config_checks') if not checks_raw: return 1, "config_checks setting is not available" checks = json.loads(checks_raw) checks.update({ check_name: status }) self.log.info(f"updated config check '{check_name}' : {status}") self.set_store('config_checks', json.dumps(checks)) return 0, "" class ExtraCephConf(NamedTuple): conf: str last_modified: Optional[datetime.datetime] def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf': data = self.get_store('extra_ceph_conf') if not data: return CephadmOrchestrator.ExtraCephConf('', None) try: j = json.loads(data) except ValueError: msg = 'Unable to load extra_ceph_conf: Cannot decode JSON' self.log.exception('%s: \'%s\'', msg, data) return CephadmOrchestrator.ExtraCephConf('', None) return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified'])) def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool: conf = self.extra_ceph_conf() if not conf.last_modified: return False return conf.last_modified > dt @orchestrator._cli_write_command( 'cephadm osd activate' ) def _osd_activate(self, host: List[str]) -> HandleCommandResult: """ Start OSD containers for existing OSDs """ @forall_hosts def run(h: str) -> str: with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'): return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')) return HandleCommandResult(stdout='\n'.join(run(host))) @orchestrator._cli_read_command('orch client-keyring ls') def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult: """ List client keyrings under cephadm management """ if format != Format.plain: output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec) else: table = PrettyTable( ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'], border=False) table.align = 'l' table.left_padding_width = 0 table.right_padding_width = 2 for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity): table.add_row(( ks.entity, ks.placement.pretty_str(), utils.file_mode_to_str(ks.mode), f'{ks.uid}:{ks.gid}', ks.path, )) output = table.get_string() return HandleCommandResult(stdout=output) @orchestrator._cli_write_command('orch client-keyring set') def _client_keyring_set( self, entity: str, placement: str, owner: Optional[str] = None, mode: Optional[str] = None, ) -> HandleCommandResult: """ Add or update client keyring under cephadm management """ if not entity.startswith('client.'): raise OrchestratorError('entity must start with client.') if owner: try: uid, gid = map(int, owner.split(':')) except Exception: raise OrchestratorError('owner must look like ":", e.g., "0:0"') else: uid = 0 gid = 0 if mode: try: imode = int(mode, 8) except Exception: raise OrchestratorError('mode must be an octal mode, e.g. "600"') else: imode = 0o600 pspec = PlacementSpec.from_string(placement) ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid) self.keys.update(ks) self._kick_serve_loop() return HandleCommandResult() @orchestrator._cli_write_command('orch client-keyring rm') def _client_keyring_rm( self, entity: str, ) -> HandleCommandResult: """ Remove client keyring from cephadm management """ self.keys.rm(entity) self._kick_serve_loop() return HandleCommandResult() def _get_container_image(self, daemon_name: str) -> Optional[str]: daemon_type = daemon_name.split('.', 1)[0] # type: ignore image: Optional[str] = None if daemon_type in CEPH_IMAGE_TYPES: # get container image image = str(self.get_foreign_ceph_option( utils.name_to_config_section(daemon_name), 'container_image' )).strip() elif daemon_type == 'prometheus': image = self.container_image_prometheus elif daemon_type == 'nvmeof': image = self.container_image_nvmeof elif daemon_type == 'grafana': image = self.container_image_grafana elif daemon_type == 'alertmanager': image = self.container_image_alertmanager elif daemon_type == 'node-exporter': image = self.container_image_node_exporter elif daemon_type == 'loki': image = self.container_image_loki elif daemon_type == 'promtail': image = self.container_image_promtail elif daemon_type == 'haproxy': image = self.container_image_haproxy elif daemon_type == 'keepalived': image = self.container_image_keepalived elif daemon_type == 'elasticsearch': image = self.container_image_elasticsearch elif daemon_type == 'jaeger-agent': image = self.container_image_jaeger_agent elif daemon_type == 'jaeger-collector': image = self.container_image_jaeger_collector elif daemon_type == 'jaeger-query': image = self.container_image_jaeger_query elif daemon_type == CustomContainerService.TYPE: # The image can't be resolved, the necessary information # is only available when a container is deployed (given # via spec). image = None elif daemon_type == 'snmp-gateway': image = self.container_image_snmp_gateway else: assert False, daemon_type self.log.debug('%s container image %s' % (daemon_name, image)) return image def _check_valid_addr(self, host: str, addr: str) -> str: # make sure hostname is resolvable before trying to make a connection try: ip_addr = utils.resolve_ip(addr) except OrchestratorError as e: msg = str(e) + f''' You may need to supply an address for {addr} Please make sure that the host is reachable and accepts connections using the cephadm SSH key To add the cephadm SSH key to the host: > ceph cephadm get-pub-key > ~/ceph.pub > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr} To check that the host is reachable open a new shell with the --no-hosts flag: > cephadm shell --no-hosts Then run the following: > ceph cephadm get-ssh-config > ssh_config > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key > chmod 0600 ~/cephadm_private_key > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}''' raise OrchestratorError(msg) if ipaddress.ip_address(ip_addr).is_loopback and host == addr: # if this is a re-add, use old address. otherwise error if host not in self.inventory or self.inventory.get_addr(host) == host: raise OrchestratorError( (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n' + f'Please explicitly provide the address (ceph orch host add {host} --addr )')) self.log.debug( f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.') ip_addr = self.inventory.get_addr(host) try: with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( host, cephadmNoImage, 'check-host', ['--expect-hostname', host], addr=addr, error_ok=True, no_fsid=True)) if code: msg = 'check-host failed:\n' + '\n'.join(err) # err will contain stdout and stderr, so we filter on the message text to # only show the errors errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')] if errors: msg = f'Host {host} ({addr}) failed check(s): {errors}' raise OrchestratorError(msg) except ssh.HostConnectionError as e: raise OrchestratorError(str(e)) return ip_addr def _add_host(self, spec): # type: (HostSpec) -> str """ Add a host to be managed by the orchestrator. :param host: host name """ HostSpec.validate(spec) ip_addr = self._check_valid_addr(spec.hostname, spec.addr) if spec.addr == spec.hostname and ip_addr: spec.addr = ip_addr if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr: self.cache.refresh_all_host_info(spec.hostname) # prime crush map? if spec.location: self.check_mon_command({ 'prefix': 'osd crush add-bucket', 'name': spec.hostname, 'type': 'host', 'args': [f'{k}={v}' for k, v in spec.location.items()], }) if spec.hostname not in self.inventory: self.cache.prime_empty_host(spec.hostname) self.inventory.add_host(spec) self.offline_hosts_remove(spec.hostname) if spec.status == 'maintenance': self._set_maintenance_healthcheck() self.event.set() # refresh stray health check self.log.info('Added host %s' % spec.hostname) return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr) @handle_orch_error def add_host(self, spec: HostSpec) -> str: return self._add_host(spec) @handle_orch_error def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str: """ Remove a host from orchestrator management. :param host: host name :param force: bypass running daemons check :param offline: remove offline host """ # check if host is offline host_offline = host in self.offline_hosts if host_offline and not offline: raise OrchestratorValidationError( "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host)) if not host_offline and offline: raise OrchestratorValidationError( "{} is online, please remove host without --offline.".format(host)) if offline and not force: raise OrchestratorValidationError("Removing an offline host requires --force") # check if there are daemons on the host if not force: daemons = self.cache.get_daemons_by_host(host) if daemons: self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}") daemons_table = "" daemons_table += "{:<20} {:<15}\n".format("type", "id") daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15) for d in daemons: daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id) raise OrchestratorValidationError("Not allowed to remove %s from cluster. " "The following daemons are running in the host:" "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % ( host, daemons_table, host)) # check, if there we're removing the last _admin host if not force: p = PlacementSpec(label=SpecialHostLabels.ADMIN) admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) if len(admin_hosts) == 1 and admin_hosts[0] == host: raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'" f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host" " or add --force to this command") def run_cmd(cmd_args: dict) -> None: ret, out, err = self.mon_command(cmd_args) if ret != 0: self.log.debug(f"ran {cmd_args} with mon_command") self.log.error( f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") if offline: daemons = self.cache.get_daemons_by_host(host) for d in daemons: self.log.info(f"removing: {d.name()}") if d.daemon_type != 'osd': self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d) self.cephadm_services[daemon_type_to_service( str(d.daemon_type))].post_remove(d, is_failed_deploy=False) else: cmd_args = { 'prefix': 'osd purge-actual', 'id': int(str(d.daemon_id)), 'yes_i_really_mean_it': True } run_cmd(cmd_args) cmd_args = { 'prefix': 'osd crush rm', 'name': host } run_cmd(cmd_args) self.inventory.rm_host(host) self.cache.rm_host(host) self.ssh.reset_con(host) # if host was in offline host list, we should remove it now. self.offline_hosts_remove(host) self.event.set() # refresh stray health check self.log.info('Removed host %s' % host) return "Removed {} host '{}'".format('offline' if offline else '', host) @handle_orch_error def update_host_addr(self, host: str, addr: str) -> str: self._check_valid_addr(host, addr) self.inventory.set_addr(host, addr) self.ssh.reset_con(host) self.event.set() # refresh stray health check self.log.info('Set host %s addr to %s' % (host, addr)) return "Updated host '{}' addr to '{}'".format(host, addr) @handle_orch_error def get_hosts(self): # type: () -> List[orchestrator.HostSpec] """ Return a list of hosts managed by the orchestrator. Notes: - skip async: manager reads from cache. """ return list(self.inventory.all_specs()) @handle_orch_error def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]: """ Return a list of hosts metadata(gather_facts) managed by the orchestrator. Notes: - skip async: manager reads from cache. """ if hostname: return [self.cache.get_facts(hostname)] return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()] @handle_orch_error def add_host_label(self, host: str, label: str) -> str: self.inventory.add_label(host, label) self.log.info('Added label %s to host %s' % (label, host)) self._kick_serve_loop() return 'Added label %s to host %s' % (label, host) @handle_orch_error def remove_host_label(self, host: str, label: str, force: bool = False) -> str: # if we remove the _admin label from the only host that has it we could end up # removing the only instance of the config and keyring and cause issues if not force and label == SpecialHostLabels.ADMIN: p = PlacementSpec(label=SpecialHostLabels.ADMIN) admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) if len(admin_hosts) == 1 and admin_hosts[0] == host: raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'" f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal" " of the last cluster config/keyring managed by cephadm.\n" f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host" " before completing this operation.\nIf you're certain this is" " what you want rerun this command with --force.") if self.inventory.has_label(host, label): self.inventory.rm_label(host, label) msg = f'Removed label {label} from host {host}' else: msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels." self.log.info(msg) self._kick_serve_loop() return msg def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]: self.log.debug("running host-ok-to-stop checks") daemons = self.cache.get_daemons() daemon_map: Dict[str, List[str]] = defaultdict(lambda: []) for dd in daemons: assert dd.hostname is not None assert dd.daemon_type is not None assert dd.daemon_id is not None if dd.hostname == hostname: daemon_map[dd.daemon_type].append(dd.daemon_id) notifications: List[str] = [] error_notifications: List[str] = [] okay: bool = True for daemon_type, daemon_ids in daemon_map.items(): r = self.cephadm_services[daemon_type_to_service( daemon_type)].ok_to_stop(daemon_ids, force=force) if r.retval: okay = False # collect error notifications so user can see every daemon causing host # to not be okay to stop error_notifications.append(r.stderr) if r.stdout: # if extra notifications to print for user, add them to notifications list notifications.append(r.stdout) if not okay: # at least one daemon is not okay to stop return 1, '\n'.join(error_notifications) if notifications: return 0, (f'It is presumed safe to stop host {hostname}. ' + 'Note the following:\n\n' + '\n'.join(notifications)) return 0, f'It is presumed safe to stop host {hostname}' @handle_orch_error def host_ok_to_stop(self, hostname: str) -> str: if hostname not in self.cache.get_hosts(): raise OrchestratorError(f'Cannot find host "{hostname}"') rc, msg = self._host_ok_to_stop(hostname) if rc: raise OrchestratorError(msg, errno=rc) self.log.info(msg) return msg def _set_maintenance_healthcheck(self) -> None: """Raise/update or clear the maintenance health check as needed""" in_maintenance = self.inventory.get_host_with_state("maintenance") if not in_maintenance: self.remove_health_warning('HOST_IN_MAINTENANCE') else: s = "host is" if len(in_maintenance) == 1 else "hosts are" self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [ f"{h} is in maintenance" for h in in_maintenance]) @handle_orch_error @host_exists() def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str: """ Attempt to place a cluster host in maintenance Placing a host into maintenance disables the cluster's ceph target in systemd and stops all ceph daemons. If the host is an osd host we apply the noout flag for the host subtree in crush to prevent data movement during a host maintenance window. :param hostname: (str) name of the host (must match an inventory hostname) :raises OrchestratorError: Hostname is invalid, host is already in maintenance """ if yes_i_really_mean_it and not force: raise OrchestratorError("--force must be passed with --yes-i-really-mean-it") if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it: raise OrchestratorError("Maintenance feature is not supported on single node clusters") # if upgrade is active, deny if self.upgrade.upgrade_state and not yes_i_really_mean_it: raise OrchestratorError( f"Unable to place {hostname} in maintenance with upgrade active/paused") tgt_host = self.inventory._inventory[hostname] if tgt_host.get("status", "").lower() == "maintenance": raise OrchestratorError(f"Host {hostname} is already in maintenance") host_daemons = self.cache.get_daemon_types(hostname) self.log.debug("daemons on host {}".format(','.join(host_daemons))) if host_daemons: # daemons on this host, so check the daemons can be stopped # and if so, place the host into maintenance by disabling the target rc, msg = self._host_ok_to_stop(hostname, force) if rc and not yes_i_really_mean_it: raise OrchestratorError( msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) # call the host-maintenance function with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'): _out, _err, _code = self.wait_async( CephadmServe(self)._run_cephadm( hostname, cephadmNoImage, "host-maintenance", ["enter"], error_ok=True)) returned_msg = _err[0].split('\n')[-1] if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it: raise OrchestratorError( f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}") if "osd" in host_daemons: crush_node = hostname if '.' not in hostname else hostname.split('.')[0] rc, out, err = self.mon_command({ 'prefix': 'osd set-group', 'flags': 'noout', 'who': [crush_node], 'format': 'json' }) if rc and not yes_i_really_mean_it: self.log.warning( f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})") raise OrchestratorError( f"Unable to set the osds on {hostname} to noout (rc={rc})") elif not rc: self.log.info( f"maintenance mode request for {hostname} has SET the noout group") # update the host status in the inventory tgt_host["status"] = "maintenance" self.inventory._inventory[hostname] = tgt_host self.inventory.save() self._set_maintenance_healthcheck() return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode' @handle_orch_error @host_exists() def exit_host_maintenance(self, hostname: str) -> str: """Exit maintenance mode and return a host to an operational state Returning from maintenance will enable the clusters systemd target and start it, and remove any noout that has been added for the host if the host has osd daemons :param hostname: (str) host name :raises OrchestratorError: Unable to return from maintenance, or unset the noout flag """ tgt_host = self.inventory._inventory[hostname] if tgt_host['status'] != "maintenance": raise OrchestratorError(f"Host {hostname} is not in maintenance mode") with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'): outs, errs, _code = self.wait_async( CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', ['exit'], error_ok=True)) returned_msg = errs[0].split('\n')[-1] if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): raise OrchestratorError( f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}") if "osd" in self.cache.get_daemon_types(hostname): crush_node = hostname if '.' not in hostname else hostname.split('.')[0] rc, _out, _err = self.mon_command({ 'prefix': 'osd unset-group', 'flags': 'noout', 'who': [crush_node], 'format': 'json' }) if rc: self.log.warning( f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})") raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})") else: self.log.info( f"exit maintenance request has UNSET for the noout group on host {hostname}") # update the host record status tgt_host['status'] = "" self.inventory._inventory[hostname] = tgt_host self.inventory.save() self._set_maintenance_healthcheck() return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode" @handle_orch_error @host_exists() def rescan_host(self, hostname: str) -> str: """Use cephadm to issue a disk rescan on each HBA Some HBAs and external enclosures don't automatically register device insertion with the kernel, so for these scenarios we need to manually rescan :param hostname: (str) host name """ self.log.info(f'disk rescan request sent to host "{hostname}"') with self.async_timeout_handler(hostname, 'cephadm disk-rescan'): _out, _err, _code = self.wait_async( CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan", [], no_fsid=True, error_ok=True)) if not _err: raise OrchestratorError('Unexpected response from cephadm disk-rescan call') msg = _err[0].split('\n')[-1] log_msg = f'disk rescan: {msg}' if msg.upper().startswith('OK'): self.log.info(log_msg) else: self.log.warning(log_msg) return f'{msg}' def get_minimal_ceph_conf(self) -> str: _, config, _ = self.check_mon_command({ "prefix": "config generate-minimal-conf", }) extra = self.extra_ceph_conf().conf if extra: try: config = self._combine_confs(config, extra) except Exception as e: self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}') return config def _combine_confs(self, conf1: str, conf2: str) -> str: section_to_option: Dict[str, List[str]] = {} final_conf: str = '' for conf in [conf1, conf2]: if not conf: continue section = '' for line in conf.split('\n'): if line.strip().startswith('#') or not line.strip(): continue if line.strip().startswith('[') and line.strip().endswith(']'): section = line.strip().replace('[', '').replace(']', '') if section not in section_to_option: section_to_option[section] = [] else: section_to_option[section].append(line.strip()) first_section = True for section, options in section_to_option.items(): if not first_section: final_conf += '\n' final_conf += f'[{section}]\n' for option in options: final_conf += f'{option}\n' first_section = False return final_conf def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None: if filter_host: self.cache.invalidate_host_daemons(filter_host) else: for h in self.cache.get_hosts(): # Also discover daemons deployed manually self.cache.invalidate_host_daemons(h) self._kick_serve_loop() @handle_orch_error def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> List[orchestrator.ServiceDescription]: if refresh: self._invalidate_daemons_and_kick_serve() self.log.debug('Kicked serve() loop to refresh all services') sm: Dict[str, orchestrator.ServiceDescription] = {} # known services for nm, spec in self.spec_store.all_specs.items(): if service_type is not None and service_type != spec.service_type: continue if service_name is not None and service_name != nm: continue if spec.service_type != 'osd': size = spec.placement.get_target_count(self.cache.get_schedulable_hosts()) else: # osd counting is special size = 0 sm[nm] = orchestrator.ServiceDescription( spec=spec, size=size, running=0, events=self.events.get_for_service(spec.service_name()), created=self.spec_store.spec_created[nm], deleted=self.spec_store.spec_deleted.get(nm, None), virtual_ip=spec.get_virtual_ip(), ports=spec.get_port_start(), ) if spec.service_type == 'ingress': # ingress has 2 daemons running per host # but only if it's the full ingress service, not for keepalive-only if not cast(IngressSpec, spec).keepalive_only: sm[nm].size *= 2 # factor daemons into status for h, dm in self.cache.get_daemons_with_volatile_status(): for name, dd in dm.items(): assert dd.hostname is not None, f'no hostname for {dd!r}' assert dd.daemon_type is not None, f'no daemon_type for {dd!r}' n: str = dd.service_name() if ( service_type and service_type != daemon_type_to_service(dd.daemon_type) ): continue if service_name and service_name != n: continue if n not in sm: # new unmanaged service spec = ServiceSpec( unmanaged=True, service_type=daemon_type_to_service(dd.daemon_type), service_id=dd.service_id(), ) sm[n] = orchestrator.ServiceDescription( last_refresh=dd.last_refresh, container_image_id=dd.container_image_id, container_image_name=dd.container_image_name, spec=spec, size=0, ) if dd.status == DaemonDescriptionStatus.running: sm[n].running += 1 if dd.daemon_type == 'osd': # The osd count can't be determined by the Placement spec. # Showing an actual/expected representation cannot be determined # here. So we're setting running = size for now. sm[n].size += 1 if ( not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh # type: ignore ): sm[n].last_refresh = dd.last_refresh return list(sm.values()) @handle_orch_error def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> List[orchestrator.DaemonDescription]: if refresh: self._invalidate_daemons_and_kick_serve(host) self.log.debug('Kicked serve() loop to refresh all daemons') result = [] for h, dm in self.cache.get_daemons_with_volatile_status(): if host and h != host: continue for name, dd in dm.items(): if daemon_type is not None and daemon_type != dd.daemon_type: continue if daemon_id is not None and daemon_id != dd.daemon_id: continue if service_name is not None and service_name != dd.service_name(): continue if not dd.memory_request and dd.daemon_type in ['osd', 'mon']: dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option( dd.name(), f"{dd.daemon_type}_memory_target" )) result.append(dd) return result @handle_orch_error def service_action(self, action: str, service_name: str) -> List[str]: if service_name not in self.spec_store.all_specs.keys(): raise OrchestratorError(f'Invalid service name "{service_name}".' + ' View currently running services using "ceph orch ls"') dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name) if not dds: raise OrchestratorError(f'No daemons exist under service name "{service_name}".' + ' View currently running services using "ceph orch ls"') if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']: return [f'Stopping entire {service_name} service is prohibited.'] self.log.info('%s service %s' % (action.capitalize(), service_name)) return [ self._schedule_daemon_action(dd.name(), action) for dd in dds ] def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str: self.log.info(f'Rotating authentication key for {daemon_spec.name()}') rc, out, err = self.mon_command({ 'prefix': 'auth get-or-create-pending', 'entity': daemon_spec.entity_name(), 'format': 'json', }) j = json.loads(out) pending_key = j[0]['pending_key'] # deploy a new keyring file if daemon_spec.daemon_type != 'osd': daemon_spec = self.cephadm_services[daemon_type_to_service( daemon_spec.daemon_type)].prepare_create(daemon_spec) with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)) # try to be clever, or fall back to restarting the daemon rc = -1 if daemon_spec.daemon_type == 'osd': rc, out, err = self.tool_exec( args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'], stdin=pending_key.encode() ) if not rc: rc, out, err = self.tool_exec( args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], stdin=pending_key.encode() ) elif daemon_spec.daemon_type == 'mds': rc, out, err = self.tool_exec( args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], stdin=pending_key.encode() ) elif ( daemon_spec.daemon_type == 'mgr' and daemon_spec.daemon_id == self.get_mgr_id() ): rc, out, err = self.tool_exec( args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], stdin=pending_key.encode() ) if rc: self._daemon_action(daemon_spec, 'restart') return f'Rotated key for {daemon_spec.name()}' def _daemon_action(self, daemon_spec: CephadmDaemonDeploySpec, action: str, image: Optional[str] = None) -> str: self._daemon_action_set_image(action, image, daemon_spec.daemon_type, daemon_spec.daemon_id) if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type, daemon_spec.daemon_id): self.mgr_service.fail_over() return '' # unreachable if action == 'rotate-key': return self._rotate_daemon_key(daemon_spec) if action == 'redeploy' or action == 'reconfig': if daemon_spec.daemon_type != 'osd': daemon_spec = self.cephadm_services[daemon_type_to_service( daemon_spec.daemon_type)].prepare_create(daemon_spec) else: # for OSDs, we still need to update config, just not carry out the full # prepare_create function daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config( daemon_spec) with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): return self.wait_async( CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))) actions = { 'start': ['reset-failed', 'start'], 'stop': ['stop'], 'restart': ['reset-failed', 'restart'], } name = daemon_spec.name() for a in actions[action]: try: with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'): out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( daemon_spec.host, name, 'unit', ['--name', name, a])) except Exception: self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed') self.cache.invalidate_host_daemons(daemon_spec.host) msg = "{} {} from host '{}'".format(action, name, daemon_spec.host) self.events.for_daemon(name, 'INFO', msg) return msg def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None: if image is not None: if action != 'redeploy': raise OrchestratorError( f'Cannot execute {action} with new image. `action` needs to be `redeploy`') if daemon_type not in CEPH_IMAGE_TYPES: raise OrchestratorError( f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported ' f'types are: {", ".join(CEPH_IMAGE_TYPES)}') self.check_mon_command({ 'prefix': 'config set', 'name': 'container_image', 'value': image, 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id), }) @handle_orch_error def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str: d = self.cache.get_daemon(daemon_name) assert d.daemon_type is not None assert d.daemon_id is not None if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \ and not self.mgr_service.mgr_map_has_standby(): raise OrchestratorError( f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') if action == 'rotate-key': if d.daemon_type not in ['mgr', 'osd', 'mds', 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']: raise OrchestratorError( f'key rotation not supported for {d.daemon_type}' ) self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id) self.log.info(f'Schedule {action} daemon {daemon_name}') return self._schedule_daemon_action(daemon_name, action) def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() def get_active_mgr(self) -> DaemonDescription: return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr')) def get_active_mgr_digests(self) -> List[str]: digests = self.mgr_service.get_active_daemon( self.cache.get_daemons_by_type('mgr')).container_image_digests return digests if digests else [] def _schedule_daemon_action(self, daemon_name: str, action: str) -> str: dd = self.cache.get_daemon(daemon_name) assert dd.daemon_type is not None assert dd.daemon_id is not None assert dd.hostname is not None if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ and not self.mgr_service.mgr_map_has_standby(): raise OrchestratorError( f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') self.cache.schedule_daemon_action(dd.hostname, dd.name(), action) self.cache.save_host(dd.hostname) msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname) self._kick_serve_loop() return msg @handle_orch_error def remove_daemons(self, names): # type: (List[str]) -> List[str] args = [] for host, dm in self.cache.daemons.items(): for name in names: if name in dm: args.append((name, host)) if not args: raise OrchestratorError('Unable to find daemon(s) %s' % (names)) self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args])) return self._remove_daemons(args) @handle_orch_error def remove_service(self, service_name: str, force: bool = False) -> str: self.log.info('Remove service %s' % service_name) self._trigger_preview_refresh(service_name=service_name) if service_name in self.spec_store: if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'): return f'Unable to remove {service_name} service.\n' \ f'Note, you might want to mark the {service_name} service as "unmanaged"' else: return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n" # Report list of affected OSDs? if not force and service_name.startswith('osd.'): osds_msg = {} for h, dm in self.cache.get_daemons_with_volatile_status(): osds_to_remove = [] for name, dd in dm.items(): if dd.daemon_type == 'osd' and dd.service_name() == service_name: osds_to_remove.append(str(dd.daemon_id)) if osds_to_remove: osds_msg[h] = osds_to_remove if osds_msg: msg = '' for h, ls in osds_msg.items(): msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}' raise OrchestratorError( f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}') found = self.spec_store.rm(service_name) if found and service_name.startswith('osd.'): self.spec_store.finally_rm(service_name) self._kick_serve_loop() return f'Removed service {service_name}' @handle_orch_error def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: """ Return the storage inventory of hosts matching the given filter. :param host_filter: host filter TODO: - add filtering by label """ if refresh: if host_filter and host_filter.hosts: for h in host_filter.hosts: self.log.debug(f'will refresh {h} devs') self.cache.invalidate_host_devices(h) self.cache.invalidate_host_networks(h) else: for h in self.cache.get_hosts(): self.log.debug(f'will refresh {h} devs') self.cache.invalidate_host_devices(h) self.cache.invalidate_host_networks(h) self.event.set() self.log.debug('Kicked serve() loop to refresh devices') result = [] for host, dls in self.cache.devices.items(): if host_filter and host_filter.hosts and host not in host_filter.hosts: continue result.append(orchestrator.InventoryHost(host, inventory.Devices(dls))) return result @handle_orch_error def zap_device(self, host: str, path: str) -> str: """Zap a device on a managed host. Use ceph-volume zap to return a device to an unused/free state Args: host (str): hostname of the cluster host path (str): device path Raises: OrchestratorError: host is not a cluster host OrchestratorError: host is in maintenance and therefore unavailable OrchestratorError: device path not found on the host OrchestratorError: device is known to a different ceph cluster OrchestratorError: device holds active osd OrchestratorError: device cache hasn't been populated yet.. Returns: str: output from the zap command """ self.log.info('Zap device %s:%s' % (host, path)) if host not in self.inventory.keys(): raise OrchestratorError( f"Host '{host}' is not a member of the cluster") host_info = self.inventory._inventory.get(host, {}) if host_info.get('status', '').lower() == 'maintenance': raise OrchestratorError( f"Host '{host}' is in maintenance mode, which prevents any actions against it.") if host not in self.cache.devices: raise OrchestratorError( f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.") host_devices = self.cache.devices[host] path_found = False osd_id_list: List[str] = [] for dev in host_devices: if dev.path == path: path_found = True break if not path_found: raise OrchestratorError( f"Device path '{path}' not found on host '{host}'") if osd_id_list: dev_name = os.path.basename(path) active_osds: List[str] = [] for osd_id in osd_id_list: metadata = self.get_metadata('osd', str(osd_id)) if metadata: if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','): active_osds.append("osd." + osd_id) if active_osds: raise OrchestratorError( f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active " f"OSD{'s' if len(active_osds) > 1 else ''}" f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.") cv_args = ['--', 'lvm', 'zap', '--destroy', path] with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'): out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( host, 'osd', 'ceph-volume', cv_args, error_ok=True)) self.cache.invalidate_host_devices(host) self.cache.invalidate_host_networks(host) if code: raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) msg = f'zap successful for {path} on {host}' self.log.info(msg) return msg + '\n' @handle_orch_error def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: """ Blink a device light. Calling something like:: lsmcli local-disk-ident-led-on --path $path If you must, you can customize this via:: ceph config-key set mgr/cephadm/blink_device_light_cmd '' ceph config-key set mgr/cephadm//blink_device_light_cmd '' See templates/blink_device_light_cmd.j2 """ @forall_hosts def blink(host: str, dev: str, path: str) -> str: cmd_line = self.template.render('blink_device_light_cmd.j2', { 'on': on, 'ident_fault': ident_fault, 'dev': dev, 'path': path }, host=host) cmd_args = shlex.split(cmd_line) with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'): out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( host, 'osd', 'shell', ['--'] + cmd_args, error_ok=True)) if code: raise OrchestratorError( 'Unable to affect %s light for %s:%s. Command: %s' % ( ident_fault, host, dev, ' '.join(cmd_args))) self.log.info('Set %s light for %s:%s %s' % ( ident_fault, host, dev, 'on' if on else 'off')) return "Set %s light for %s:%s %s" % ( ident_fault, host, dev, 'on' if on else 'off') return blink(locs) def get_osd_uuid_map(self, only_up=False): # type: (bool) -> Dict[str, str] osd_map = self.get('osd_map') r = {} for o in osd_map['osds']: # only include OSDs that have ever started in this map. this way # an interrupted osd create can be repeated and succeed the second # time around. osd_id = o.get('osd') if osd_id is None: raise OrchestratorError("Could not retrieve osd_id from osd_map") if not only_up: r[str(osd_id)] = o.get('uuid', '') return r def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]: osd = [x for x in self.get('osd_map')['osds'] if x['osd'] == osd_id] if len(osd) != 1: return None return osd[0] def _trigger_preview_refresh(self, specs: Optional[List[DriveGroupSpec]] = None, service_name: Optional[str] = None, ) -> None: # Only trigger a refresh when a spec has changed trigger_specs = [] if specs: for spec in specs: preview_spec = self.spec_store.spec_preview.get(spec.service_name()) # the to-be-preview spec != the actual spec, this means we need to # trigger a refresh, if the spec has been removed (==None) we need to # refresh as well. if not preview_spec or spec != preview_spec: trigger_specs.append(spec) if service_name: trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))] if not any(trigger_specs): return None refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs) for host in refresh_hosts: self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") self.cache.osdspec_previews_refresh_queue.append(host) @handle_orch_error def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: """ Deprecated. Please use `apply()` instead. Keeping this around to be compatible to mgr/dashboard """ return [self._apply(spec) for spec in specs] @handle_orch_error def create_osds(self, drive_group: DriveGroupSpec) -> str: hosts: List[HostSpec] = self.inventory.all_specs() filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts) if not filtered_hosts: return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts" return self.osd_service.create_from_spec(drive_group) def _preview_osdspecs(self, osdspecs: Optional[List[DriveGroupSpec]] = None ) -> dict: if not osdspecs: return {'n/a': [{'error': True, 'message': 'No OSDSpec or matching hosts found.'}]} matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs) if not matching_hosts: return {'n/a': [{'error': True, 'message': 'No OSDSpec or matching hosts found.'}]} # Is any host still loading previews or still in the queue to be previewed pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts} if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts): # Report 'pending' when any of the matching hosts is still loading previews (flag is True) return {'n/a': [{'error': True, 'message': 'Preview data is being generated.. ' 'Please re-run this command in a bit.'}]} # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs previews_for_specs = {} for host, raw_reports in self.cache.osdspec_previews.items(): if host not in matching_hosts: continue osd_reports = [] for osd_report in raw_reports: if osd_report.get('osdspec') in [x.service_id for x in osdspecs]: osd_reports.append(osd_report) previews_for_specs.update({host: osd_reports}) return previews_for_specs def _calc_daemon_deps(self, spec: Optional[ServiceSpec], daemon_type: str, daemon_id: str) -> List[str]: def get_daemon_names(daemons: List[str]) -> List[str]: daemon_names = [] for daemon_type in daemons: for dd in self.cache.get_daemons_by_type(daemon_type): daemon_names.append(dd.name()) return daemon_names alertmanager_user, alertmanager_password = self._get_alertmanager_credentials() prometheus_user, prometheus_password = self._get_prometheus_credentials() deps = [] if daemon_type == 'haproxy': # because cephadm creates new daemon instances whenever # port or ip changes, identifying daemons by name is # sufficient to detect changes. if not spec: return [] ingress_spec = cast(IngressSpec, spec) assert ingress_spec.backend_service daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service) deps = [d.name() for d in daemons] elif daemon_type == 'keepalived': # because cephadm creates new daemon instances whenever # port or ip changes, identifying daemons by name is # sufficient to detect changes. if not spec: return [] daemons = self.cache.get_daemons_by_service(spec.service_name()) deps = [d.name() for d in daemons if d.daemon_type == 'haproxy'] elif daemon_type == 'agent': root_cert = '' server_port = '' try: server_port = str(self.http_server.agent.server_port) root_cert = self.http_server.agent.ssl_certs.get_root_cert() except Exception: pass deps = sorted([self.get_mgr_ip(), server_port, root_cert, str(self.device_enhanced_scan)]) elif daemon_type == 'iscsi': if spec: iscsi_spec = cast(IscsiServiceSpec, spec) deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)] else: deps = [self.get_mgr_ip()] elif daemon_type == 'prometheus': # for prometheus we add the active mgr as an explicit dependency, # this way we force a redeploy after a mgr failover deps.append(self.get_active_mgr().name()) deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283))) deps.append(str(self.service_discovery_port)) # prometheus yaml configuration file (generated by prometheus.yml.j2) contains # a scrape_configs section for each service type. This should be included only # when at least one daemon of the corresponding service is running. Therefore, # an explicit dependency is added for each service-type to force a reconfig # whenever the number of daemons for those service-type changes from 0 to greater # than zero and vice versa. deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)] if len(self.cache.get_daemons_by_type('ingress')) > 0: deps.append('ingress') # add dependency on ceph-exporter daemons deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')] if self.secure_monitoring_stack: if prometheus_user and prometheus_password: deps.append(f'{hash(prometheus_user + prometheus_password)}') if alertmanager_user and alertmanager_password: deps.append(f'{hash(alertmanager_user + alertmanager_password)}') elif daemon_type == 'grafana': deps += get_daemon_names(['prometheus', 'loki']) if self.secure_monitoring_stack and prometheus_user and prometheus_password: deps.append(f'{hash(prometheus_user + prometheus_password)}') elif daemon_type == 'alertmanager': deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway']) if self.secure_monitoring_stack and alertmanager_user and alertmanager_password: deps.append(f'{hash(alertmanager_user + alertmanager_password)}') elif daemon_type == 'promtail': deps += get_daemon_names(['loki']) else: # TODO(redo): some error message! pass if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']: deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}') return sorted(deps) @forall_hosts def _remove_daemons(self, name: str, host: str) -> str: return CephadmServe(self)._remove_daemon(name, host) def _check_pool_exists(self, pool: str, service_name: str) -> None: logger.info(f'Checking pool "{pool}" exists for service {service_name}') if not self.rados.pool_exists(pool): raise OrchestratorError(f'Cannot find pool "{pool}" for ' f'service {service_name}') def _add_daemon(self, daemon_type: str, spec: ServiceSpec) -> List[str]: """ Add (and place) a daemon. Require explicit host placement. Do not schedule, and do not apply the related scheduling limitations. """ if spec.service_name() not in self.spec_store: raise OrchestratorError('Unable to add a Daemon without Service.\n' 'Please use `ceph orch apply ...` to create a Service.\n' 'Note, you might want to create the service with "unmanaged=true"') self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) if not spec.placement.hosts: raise OrchestratorError('must specify host(s) to deploy on') count = spec.placement.count or len(spec.placement.hosts) daemons = self.cache.get_daemons_by_service(spec.service_name()) return self._create_daemons(daemon_type, spec, daemons, spec.placement.hosts, count) def _create_daemons(self, daemon_type: str, spec: ServiceSpec, daemons: List[DaemonDescription], hosts: List[HostPlacementSpec], count: int) -> List[str]: if count > len(hosts): raise OrchestratorError('too few hosts: want %d, have %s' % ( count, hosts)) did_config = False service_type = daemon_type_to_service(daemon_type) args = [] # type: List[CephadmDaemonDeploySpec] for host, network, name in hosts: daemon_id = self.get_unique_name(daemon_type, host, daemons, prefix=spec.service_id, forcename=name) if not did_config: self.cephadm_services[service_type].config(spec) did_config = True daemon_spec = self.cephadm_services[service_type].make_daemon_spec( host, daemon_id, network, spec, # NOTE: this does not consider port conflicts! ports=spec.get_port_start()) self.log.debug('Placing %s.%s on host %s' % ( daemon_type, daemon_id, host)) args.append(daemon_spec) # add to daemon list so next name(s) will also be unique sd = orchestrator.DaemonDescription( hostname=host, daemon_type=daemon_type, daemon_id=daemon_id, ) daemons.append(sd) @ forall_hosts def create_func_map(*args: Any) -> str: daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) return create_func_map(args) @handle_orch_error def add_daemon(self, spec: ServiceSpec) -> List[str]: ret: List[str] = [] try: with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True): for d_type in service_to_daemon_types(spec.service_type): ret.extend(self._add_daemon(d_type, spec)) return ret except OrchestratorError as e: self.events.from_orch_error(e) raise def _get_alertmanager_credentials(self) -> Tuple[str, str]: user = self.get_store(AlertmanagerService.USER_CFG_KEY) password = self.get_store(AlertmanagerService.PASS_CFG_KEY) if user is None or password is None: user = 'admin' password = 'admin' self.set_store(AlertmanagerService.USER_CFG_KEY, user) self.set_store(AlertmanagerService.PASS_CFG_KEY, password) return (user, password) def _get_prometheus_credentials(self) -> Tuple[str, str]: user = self.get_store(PrometheusService.USER_CFG_KEY) password = self.get_store(PrometheusService.PASS_CFG_KEY) if user is None or password is None: user = 'admin' password = 'admin' self.set_store(PrometheusService.USER_CFG_KEY, user) self.set_store(PrometheusService.PASS_CFG_KEY, password) return (user, password) @handle_orch_error def set_prometheus_access_info(self, user: str, password: str) -> str: self.set_store(PrometheusService.USER_CFG_KEY, user) self.set_store(PrometheusService.PASS_CFG_KEY, password) return 'prometheus credentials updated correctly' @handle_orch_error def set_alertmanager_access_info(self, user: str, password: str) -> str: self.set_store(AlertmanagerService.USER_CFG_KEY, user) self.set_store(AlertmanagerService.PASS_CFG_KEY, password) return 'alertmanager credentials updated correctly' @handle_orch_error def get_prometheus_access_info(self) -> Dict[str, str]: user, password = self._get_prometheus_credentials() return {'user': user, 'password': password, 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()} @handle_orch_error def get_alertmanager_access_info(self) -> Dict[str, str]: user, password = self._get_alertmanager_credentials() return {'user': user, 'password': password, 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()} @handle_orch_error def apply_mon(self, spec: ServiceSpec) -> str: return self._apply(spec) def _apply(self, spec: GenericSpec) -> str: if spec.service_type == 'host': return self._add_host(cast(HostSpec, spec)) if spec.service_type == 'osd': # _trigger preview refresh needs to be smart and # should only refresh if a change has been detected self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)]) return self._apply_service_spec(cast(ServiceSpec, spec)) def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]: """Return a list of candidate hosts according to the placement specification.""" all_hosts = self.cache.get_schedulable_hosts() candidates = [] if placement.hosts: candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts] elif placement.label: candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]] elif placement.host_pattern: candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)] elif (placement.count is not None or placement.count_per_host is not None): candidates = [x.hostname for x in all_hosts] return [h for h in candidates if not self.cache.is_host_draining(h)] def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None: """Validate placement specification for TunedProfileSpec and ClientKeyringSpec.""" if spec.count is not None: raise OrchestratorError( "Placement 'count' field is no supported for this specification.") if spec.count_per_host is not None: raise OrchestratorError( "Placement 'count_per_host' field is no supported for this specification.") if spec.hosts: all_hosts = [h.hostname for h in self.inventory.all_specs()] invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts] if invalid_hosts: raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. " f"Please check 'ceph orch host ls' for available hosts.") elif not self._get_candidate_hosts(spec): raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n" "Please check 'ceph orch host ls' for available hosts.\n" "Note: draining hosts are excluded from the candidate list.") def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]: candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs()) invalid_options: Dict[str, List[str]] = {} for host in candidate_hosts: host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {}) invalid_options[host] = [] for option in spec.settings: if option not in host_sysctl_options: invalid_options[host].append(option) return invalid_options def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None: if not spec.settings: raise OrchestratorError("Invalid spec: settings section cannot be empty.") self._validate_one_shot_placement_spec(spec.placement) invalid_options = self._validate_tunedprofile_settings(spec) if any(e for e in invalid_options.values()): raise OrchestratorError( f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}') @handle_orch_error def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str: outs = [] for spec in specs: self._validate_tuned_profile_spec(spec) if no_overwrite and self.tuned_profiles.exists(spec.profile_name): outs.append( f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)") else: # done, let's save the specs self.tuned_profiles.add_profile(spec) outs.append(f'Saved tuned profile {spec.profile_name}') self._kick_serve_loop() return '\n'.join(outs) @handle_orch_error def rm_tuned_profile(self, profile_name: str) -> str: if profile_name not in self.tuned_profiles: raise OrchestratorError( f'Tuned profile {profile_name} does not exist. Nothing to remove.') self.tuned_profiles.rm_profile(profile_name) self._kick_serve_loop() return f'Removed tuned profile {profile_name}' @handle_orch_error def tuned_profile_ls(self) -> List[TunedProfileSpec]: return self.tuned_profiles.list_profiles() @handle_orch_error def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str: if profile_name not in self.tuned_profiles: raise OrchestratorError( f'Tuned profile {profile_name} does not exist. Cannot add setting.') self.tuned_profiles.add_setting(profile_name, setting, value) self._kick_serve_loop() return f'Added setting {setting} with value {value} to tuned profile {profile_name}' @handle_orch_error def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str: if profile_name not in self.tuned_profiles: raise OrchestratorError( f'Tuned profile {profile_name} does not exist. Cannot remove setting.') self.tuned_profiles.rm_setting(profile_name, setting) self._kick_serve_loop() return f'Removed setting {setting} from tuned profile {profile_name}' @handle_orch_error def service_discovery_dump_cert(self) -> str: root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT) if not root_cert: raise OrchestratorError('No certificate found for service discovery') return root_cert def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None: self.health_checks[name] = { 'severity': 'warning', 'summary': summary, 'count': count, 'detail': detail, } self.set_health_checks(self.health_checks) def remove_health_warning(self, name: str) -> None: if name in self.health_checks: del self.health_checks[name] self.set_health_checks(self.health_checks) def _plan(self, spec: ServiceSpec) -> dict: if spec.service_type == 'osd': return {'service_name': spec.service_name(), 'service_type': spec.service_type, 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} svc = self.cephadm_services[spec.service_type] ha = HostAssignment( spec=spec, hosts=self.cache.get_schedulable_hosts(), unreachable_hosts=self.cache.get_unreachable_hosts(), draining_hosts=self.cache.get_draining_hosts(), networks=self.cache.networks, daemons=self.cache.get_daemons_by_service(spec.service_name()), allow_colo=svc.allow_colo(), rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None ) ha.validate() hosts, to_add, to_remove = ha.place() return { 'service_name': spec.service_name(), 'service_type': spec.service_type, 'add': [hs.hostname for hs in to_add], 'remove': [d.name() for d in to_remove] } @handle_orch_error def plan(self, specs: Sequence[GenericSpec]) -> List: results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' 'to the current inventory setup. If any of these conditions change, the \n' 'preview will be invalid. Please make sure to have a minimal \n' 'timeframe between planning and applying the specs.'}] if any([spec.service_type == 'host' for spec in specs]): return [{'error': 'Found . Previews that include Host Specifications are not supported, yet.'}] for spec in specs: results.append(self._plan(cast(ServiceSpec, spec))) return results def _apply_service_spec(self, spec: ServiceSpec) -> str: if spec.placement.is_empty(): # fill in default placement defaults = { 'mon': PlacementSpec(count=5), 'mgr': PlacementSpec(count=2), 'mds': PlacementSpec(count=2), 'rgw': PlacementSpec(count=2), 'ingress': PlacementSpec(count=2), 'iscsi': PlacementSpec(count=1), 'nvmeof': PlacementSpec(count=1), 'rbd-mirror': PlacementSpec(count=2), 'cephfs-mirror': PlacementSpec(count=1), 'nfs': PlacementSpec(count=1), 'grafana': PlacementSpec(count=1), 'alertmanager': PlacementSpec(count=1), 'prometheus': PlacementSpec(count=1), 'node-exporter': PlacementSpec(host_pattern='*'), 'ceph-exporter': PlacementSpec(host_pattern='*'), 'loki': PlacementSpec(count=1), 'promtail': PlacementSpec(host_pattern='*'), 'crash': PlacementSpec(host_pattern='*'), 'container': PlacementSpec(count=1), 'snmp-gateway': PlacementSpec(count=1), 'elasticsearch': PlacementSpec(count=1), 'jaeger-agent': PlacementSpec(host_pattern='*'), 'jaeger-collector': PlacementSpec(count=1), 'jaeger-query': PlacementSpec(count=1) } spec.placement = defaults[spec.service_type] elif spec.service_type in ['mon', 'mgr'] and \ spec.placement.count is not None and \ spec.placement.count < 1: raise OrchestratorError('cannot scale %s service below 1' % ( spec.service_type)) host_count = len(self.inventory.keys()) max_count = self.max_count_per_host if spec.placement.count is not None: if spec.service_type in ['mon', 'mgr']: if spec.placement.count > max(5, host_count): raise OrchestratorError( (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.')) elif spec.service_type != 'osd': if spec.placement.count > (max_count * host_count): raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).' + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')) if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd': raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.' + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')) HostAssignment( spec=spec, hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh unreachable_hosts=self.cache.get_unreachable_hosts(), draining_hosts=self.cache.get_draining_hosts(), networks=self.cache.networks, daemons=self.cache.get_daemons_by_service(spec.service_name()), allow_colo=self.cephadm_services[spec.service_type].allow_colo(), ).validate() self.log.info('Saving service %s spec with placement %s' % ( spec.service_name(), spec.placement.pretty_str())) self.spec_store.save(spec) self._kick_serve_loop() return "Scheduled %s update..." % spec.service_name() @handle_orch_error def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]: results = [] for spec in specs: if no_overwrite: if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory: results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag' % (cast(HostSpec, spec).hostname, spec.service_type)) continue elif cast(ServiceSpec, spec).service_name() in self.spec_store: results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag' % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name())) continue results.append(self._apply(spec)) return results @handle_orch_error def apply_mgr(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_mds(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_rgw(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_ingress(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_iscsi(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_rbd_mirror(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_nfs(self, spec: ServiceSpec) -> str: return self._apply(spec) def _get_dashboard_url(self): # type: () -> str return self.get('mgr_map').get('services', {}).get('dashboard', '') @handle_orch_error def apply_prometheus(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_loki(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_promtail(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_node_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_ceph_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_crash(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_grafana(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_alertmanager(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_container(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def apply_snmp_gateway(self, spec: ServiceSpec) -> str: return self._apply(spec) @handle_orch_error def set_unmanaged(self, service_name: str, value: bool) -> str: return self.spec_store.set_unmanaged(service_name, value) @handle_orch_error def upgrade_check(self, image: str, version: str) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("check aborted - you have hosts in maintenance state") if version: target_name = self.container_image_base + ':v' + version elif image: target_name = image else: raise OrchestratorError('must specify either image or version') with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'): image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name)) ceph_image_version = image_info.ceph_version if not ceph_image_version: return f'Unable to extract ceph version from {target_name}.' if ceph_image_version.startswith('ceph version '): ceph_image_version = ceph_image_version.split(' ')[2] version_error = self.upgrade._check_target_version(ceph_image_version) if version_error: return f'Incompatible upgrade: {version_error}' self.log.debug(f'image info {image} -> {image_info}') r: dict = { 'target_name': target_name, 'target_id': image_info.image_id, 'target_version': image_info.ceph_version, 'needs_update': dict(), 'up_to_date': list(), 'non_ceph_image_daemons': list() } for host, dm in self.cache.daemons.items(): for name, dd in dm.items(): # check if the container digest for the digest we're checking upgrades for matches # the container digests for the daemon if "use_repo_digest" setting is true # or that the image name matches the daemon's image name if "use_repo_digest" # is false. The idea is to generally check if the daemon is already using # the image we're checking upgrade to. if ( (self.use_repo_digest and dd.matches_digests(image_info.repo_digests)) or (not self.use_repo_digest and dd.matches_image_name(image)) ): r['up_to_date'].append(dd.name()) elif dd.daemon_type in CEPH_IMAGE_TYPES: r['needs_update'][dd.name()] = { 'current_name': dd.container_image_name, 'current_id': dd.container_image_id, 'current_version': dd.version, } else: r['non_ceph_image_daemons'].append(dd.name()) if self.use_repo_digest and image_info.repo_digests: # FIXME: we assume the first digest is the best one to use r['target_digest'] = image_info.repo_digests[0] return json.dumps(r, indent=4, sort_keys=True) @handle_orch_error def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: return self.upgrade.upgrade_status() @handle_orch_error def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]: return self.upgrade.upgrade_ls(image, tags, show_all_versions) @handle_orch_error def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state") if self.offline_hosts: raise OrchestratorError( f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}") if daemon_types is not None and services is not None: raise OrchestratorError('--daemon-types and --services are mutually exclusive') if daemon_types is not None: for dtype in daemon_types: if dtype not in CEPH_UPGRADE_ORDER: raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n' f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}') if services is not None: for service in services: if service not in self.spec_store: raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n' f'Known services are: {self.spec_store.all_specs.keys()}') hosts: Optional[List[str]] = None if host_placement is not None: all_hosts = list(self.inventory.all_specs()) placement = PlacementSpec.from_string(host_placement) hosts = placement.filter_matching_hostspecs(all_hosts) if not hosts: raise OrchestratorError( f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts') if limit is not None: if limit < 1: raise OrchestratorError( f'Upgrade aborted - --limit arg must be a positive integer, not {limit}') return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit) @handle_orch_error def upgrade_pause(self) -> str: return self.upgrade.upgrade_pause() @handle_orch_error def upgrade_resume(self) -> str: return self.upgrade.upgrade_resume() @handle_orch_error def upgrade_stop(self) -> str: return self.upgrade.upgrade_stop() @handle_orch_error def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False, no_destroy: bool = False) -> str: """ Takes a list of OSDs and schedules them for removal. The function that takes care of the actual removal is process_removal_queue(). """ daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd') to_remove_daemons = list() for daemon in daemons: if daemon.daemon_id in osd_ids: to_remove_daemons.append(daemon) if not to_remove_daemons: return f"Unable to find OSDs: {osd_ids}" for daemon in to_remove_daemons: assert daemon.daemon_id is not None try: self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), replace=replace, force=force, zap=zap, no_destroy=no_destroy, hostname=daemon.hostname, process_started_at=datetime_now(), remove_util=self.to_remove_osds.rm_util)) except NotFoundError: return f"Unable to find OSDs: {osd_ids}" # trigger the serve loop to initiate the removal self._kick_serve_loop() warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n" "Run the `ceph-volume lvm zap` command with `--destroy`" " against the VG/LV if you want them to be destroyed.") return f"Scheduled OSD(s) for removal.{warning_zap}" @handle_orch_error def stop_remove_osds(self, osd_ids: List[str]) -> str: """ Stops a `removal` process for a List of OSDs. This will revert their weight and remove it from the osds_to_remove queue """ for osd_id in osd_ids: try: self.to_remove_osds.rm(OSD(osd_id=int(osd_id), remove_util=self.to_remove_osds.rm_util)) except (NotFoundError, KeyError, ValueError): return f'Unable to find OSD in the queue: {osd_id}' # trigger the serve loop to halt the removal self._kick_serve_loop() return "Stopped OSD(s) removal" @handle_orch_error def remove_osds_status(self) -> List[OSD]: """ The CLI call to retrieve an osd removal report """ return self.to_remove_osds.all_osds() @handle_orch_error def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str: """ Drain all daemons from a host. :param host: host name """ # if we drain the last admin host we could end up removing the only instance # of the config and keyring and cause issues if not force: p = PlacementSpec(label=SpecialHostLabels.ADMIN) admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) if len(admin_hosts) == 1 and admin_hosts[0] == hostname: raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'" " label.\nDraining this host could cause the removal" " of the last cluster config/keyring managed by cephadm.\n" f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host" " before completing this operation.\nIf you're certain this is" " what you want rerun this command with --force.") self.add_host_label(hostname, '_no_schedule') if not keep_conf_keyring: self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING) daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname) osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd'] self.remove_osds(osds_to_remove, zap=zap_osd_devices) daemons_table = "" daemons_table += "{:<20} {:<15}\n".format("type", "id") daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15) for d in daemons: daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id) return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table) def trigger_connect_dashboard_rgw(self) -> None: self.need_connect_dashboard_rgw = True self.event.set()