From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/cephadm/module.py | 3405 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 3405 insertions(+) create mode 100644 src/pybind/mgr/cephadm/module.py (limited to 'src/pybind/mgr/cephadm/module.py') diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py new file mode 100644 index 000000000..7b97ce74a --- /dev/null +++ b/src/pybind/mgr/cephadm/module.py @@ -0,0 +1,3405 @@ +import asyncio +import json +import errno +import ipaddress +import logging +import re +import shlex +from collections import defaultdict +from configparser import ConfigParser +from contextlib import contextmanager +from functools import wraps +from tempfile import TemporaryDirectory, NamedTemporaryFile +from threading import Event + +from cephadm.service_discovery import ServiceDiscovery + +import string +from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ + Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \ + Awaitable, Iterator + +import datetime +import os +import random +import multiprocessing.pool +import subprocess +from prettytable import PrettyTable + +from ceph.deployment import inventory +from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.service_spec import \ + ServiceSpec, PlacementSpec, \ + HostPlacementSpec, IngressSpec, \ + TunedProfileSpec, IscsiServiceSpec +from ceph.utils import str_to_datetime, datetime_to_str, datetime_now +from cephadm.serve import CephadmServe +from cephadm.services.cephadmservice import CephadmDaemonDeploySpec +from cephadm.http_server import CephadmHttpServer +from cephadm.agent import CephadmAgentHelpers + + +from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType +import orchestrator +from orchestrator.module import to_format, Format + +from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ + CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \ + service_to_daemon_types +from orchestrator._interface import GenericSpec +from orchestrator._interface import daemon_type_to_service + +from . import utils +from . import ssh +from .migrations import Migrations +from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ + RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \ + CephExporterService +from .services.ingress import IngressService +from .services.container import CustomContainerService +from .services.iscsi import IscsiService +from .services.nvmeof import NvmeofService +from .services.nfs import NFSService +from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError +from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ + NodeExporterService, SNMPGatewayService, LokiService, PromtailService +from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService +from .schedule import HostAssignment +from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ + ClientKeyringStore, ClientKeyringSpec, TunedProfileStore +from .upgrade import CephadmUpgrade +from .template import TemplateMgr +from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ + cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels +from .configchecks import CephadmConfigChecks +from .offline_watcher import OfflineHostWatcher +from .tuned_profiles import TunedProfileUtils + +try: + import asyncssh +except ImportError as e: + asyncssh = None # type: ignore + asyncssh_import_error = str(e) + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + +DEFAULT_SSH_CONFIG = """ +Host * + User root + StrictHostKeyChecking no + UserKnownHostsFile /dev/null + ConnectTimeout=30 +""" + +# cherrypy likes to sys.exit on error. don't let it take us down too! + + +def os_exit_noop(status: int) -> None: + pass + + +os._exit = os_exit_noop # type: ignore + + +# Default container images ----------------------------------------------------- +DEFAULT_IMAGE = 'quay.io/ceph/ceph' # DO NOT ADD TAG TO THIS +DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0' +DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0' +DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2' +DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0' +DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0' +DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0' +DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7' +DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3' +DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4' +DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1' +DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23' +DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29' +DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29' +DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29' +# ------------------------------------------------------------------------------ + + +def host_exists(hostname_position: int = 1) -> Callable: + """Check that a hostname exists in the inventory""" + def inner(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + this = args[0] # self object + hostname = args[hostname_position] + if hostname not in this.cache.get_hosts(): + candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)]) + help_msg = f"Did you mean {candidates}?" if candidates else "" + raise OrchestratorError( + f"Cannot find host '{hostname}' in the inventory. {help_msg}") + + return func(*args, **kwargs) + return wrapper + return inner + + +class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, + metaclass=CLICommandMeta): + + _STORE_HOST_PREFIX = "host" + + instance = None + NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary] + NATIVE_OPTIONS = [] # type: List[Any] + MODULE_OPTIONS = [ + Option( + 'ssh_config_file', + type='str', + default=None, + desc='customized SSH config file to connect to managed hosts', + ), + Option( + 'device_cache_timeout', + type='secs', + default=30 * 60, + desc='seconds to cache device inventory', + ), + Option( + 'device_enhanced_scan', + type='bool', + default=False, + desc='Use libstoragemgmt during device scans', + ), + Option( + 'inventory_list_all', + type='bool', + default=False, + desc='Whether ceph-volume inventory should report ' + 'more devices (mostly mappers (LVs / mpaths), partitions...)', + ), + Option( + 'daemon_cache_timeout', + type='secs', + default=10 * 60, + desc='seconds to cache service (daemon) inventory', + ), + Option( + 'facts_cache_timeout', + type='secs', + default=1 * 60, + desc='seconds to cache host facts data', + ), + Option( + 'host_check_interval', + type='secs', + default=10 * 60, + desc='how frequently to perform a host check', + ), + Option( + 'mode', + type='str', + enum_allowed=['root', 'cephadm-package'], + default='root', + desc='mode for remote execution of cephadm', + ), + Option( + 'container_image_base', + default=DEFAULT_IMAGE, + desc='Container image name, without the tag', + runtime=True, + ), + Option( + 'container_image_prometheus', + default=DEFAULT_PROMETHEUS_IMAGE, + desc='Prometheus container image', + ), + Option( + 'container_image_nvmeof', + default=DEFAULT_NVMEOF_IMAGE, + desc='Nvme-of container image', + ), + Option( + 'container_image_grafana', + default=DEFAULT_GRAFANA_IMAGE, + desc='Prometheus container image', + ), + Option( + 'container_image_alertmanager', + default=DEFAULT_ALERT_MANAGER_IMAGE, + desc='Prometheus container image', + ), + Option( + 'container_image_node_exporter', + default=DEFAULT_NODE_EXPORTER_IMAGE, + desc='Prometheus container image', + ), + Option( + 'container_image_loki', + default=DEFAULT_LOKI_IMAGE, + desc='Loki container image', + ), + Option( + 'container_image_promtail', + default=DEFAULT_PROMTAIL_IMAGE, + desc='Promtail container image', + ), + Option( + 'container_image_haproxy', + default=DEFAULT_HAPROXY_IMAGE, + desc='HAproxy container image', + ), + Option( + 'container_image_keepalived', + default=DEFAULT_KEEPALIVED_IMAGE, + desc='Keepalived container image', + ), + Option( + 'container_image_snmp_gateway', + default=DEFAULT_SNMP_GATEWAY_IMAGE, + desc='SNMP Gateway container image', + ), + Option( + 'container_image_elasticsearch', + default=DEFAULT_ELASTICSEARCH_IMAGE, + desc='elasticsearch container image', + ), + Option( + 'container_image_jaeger_agent', + default=DEFAULT_JAEGER_AGENT_IMAGE, + desc='Jaeger agent container image', + ), + Option( + 'container_image_jaeger_collector', + default=DEFAULT_JAEGER_COLLECTOR_IMAGE, + desc='Jaeger collector container image', + ), + Option( + 'container_image_jaeger_query', + default=DEFAULT_JAEGER_QUERY_IMAGE, + desc='Jaeger query container image', + ), + Option( + 'warn_on_stray_hosts', + type='bool', + default=True, + desc='raise a health warning if daemons are detected on a host ' + 'that is not managed by cephadm', + ), + Option( + 'warn_on_stray_daemons', + type='bool', + default=True, + desc='raise a health warning if daemons are detected ' + 'that are not managed by cephadm', + ), + Option( + 'warn_on_failed_host_check', + type='bool', + default=True, + desc='raise a health warning if the host check fails', + ), + Option( + 'log_to_cluster', + type='bool', + default=True, + desc='log to the "cephadm" cluster log channel"', + ), + Option( + 'allow_ptrace', + type='bool', + default=False, + desc='allow SYS_PTRACE capability on ceph containers', + long_desc='The SYS_PTRACE capability is needed to attach to a ' + 'process with gdb or strace. Enabling this options ' + 'can allow debugging daemons that encounter problems ' + 'at runtime.', + ), + Option( + 'container_init', + type='bool', + default=True, + desc='Run podman/docker with `--init`' + ), + Option( + 'prometheus_alerts_path', + type='str', + default='/etc/prometheus/ceph/ceph_default_alerts.yml', + desc='location of alerts to include in prometheus deployments', + ), + Option( + 'migration_current', + type='int', + default=None, + desc='internal - do not modify', + # used to track spec and other data migrations. + ), + Option( + 'config_dashboard', + type='bool', + default=True, + desc='manage configs like API endpoints in Dashboard.' + ), + Option( + 'manage_etc_ceph_ceph_conf', + type='bool', + default=False, + desc='Manage and own /etc/ceph/ceph.conf on the hosts.', + ), + Option( + 'manage_etc_ceph_ceph_conf_hosts', + type='str', + default='*', + desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf', + ), + # not used anymore + Option( + 'registry_url', + type='str', + default=None, + desc='Registry url for login purposes. This is not the default registry' + ), + Option( + 'registry_username', + type='str', + default=None, + desc='Custom repository username. Only used for logging into a registry.' + ), + Option( + 'registry_password', + type='str', + default=None, + desc='Custom repository password. Only used for logging into a registry.' + ), + #### + Option( + 'registry_insecure', + type='bool', + default=False, + desc='Registry is to be considered insecure (no TLS available). Only for development purposes.' + ), + Option( + 'use_repo_digest', + type='bool', + default=True, + desc='Automatically convert image tags to image digest. Make sure all daemons use the same image', + ), + Option( + 'config_checks_enabled', + type='bool', + default=False, + desc='Enable or disable the cephadm configuration analysis', + ), + Option( + 'default_registry', + type='str', + default='docker.io', + desc='Search-registry to which we should normalize unqualified image names. ' + 'This is not the default registry', + ), + Option( + 'max_count_per_host', + type='int', + default=10, + desc='max number of daemons per service per host', + ), + Option( + 'autotune_memory_target_ratio', + type='float', + default=.7, + desc='ratio of total system memory to divide amongst autotuned daemons' + ), + Option( + 'autotune_interval', + type='secs', + default=10 * 60, + desc='how frequently to autotune daemon memory' + ), + Option( + 'use_agent', + type='bool', + default=False, + desc='Use cephadm agent on each host to gather and send metadata' + ), + Option( + 'agent_refresh_rate', + type='secs', + default=20, + desc='How often agent on each host will try to gather and send metadata' + ), + Option( + 'agent_starting_port', + type='int', + default=4721, + desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)' + ), + Option( + 'agent_down_multiplier', + type='float', + default=3.0, + desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down' + ), + Option( + 'max_osd_draining_count', + type='int', + default=10, + desc='max number of osds that will be drained simultaneously when osds are removed' + ), + Option( + 'service_discovery_port', + type='int', + default=8765, + desc='cephadm service discovery port' + ), + Option( + 'cgroups_split', + type='bool', + default=True, + desc='Pass --cgroups=split when cephadm creates containers (currently podman only)' + ), + Option( + 'log_refresh_metadata', + type='bool', + default=False, + desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level' + ), + Option( + 'secure_monitoring_stack', + type='bool', + default=False, + desc='Enable TLS security for all the monitoring stack daemons' + ), + Option( + 'default_cephadm_command_timeout', + type='secs', + default=15 * 60, + desc='Default timeout applied to cephadm commands run directly on ' + 'the host (in seconds)' + ), + ] + + def __init__(self, *args: Any, **kwargs: Any): + super(CephadmOrchestrator, self).__init__(*args, **kwargs) + self._cluster_fsid: str = self.get('mon_map')['fsid'] + self.last_monmap: Optional[datetime.datetime] = None + + # for serve() + self.run = True + self.event = Event() + + self.ssh = ssh.SSHManager(self) + + if self.get_store('pause'): + self.paused = True + else: + self.paused = False + + # for mypy which does not run the code + if TYPE_CHECKING: + self.ssh_config_file = None # type: Optional[str] + self.device_cache_timeout = 0 + self.daemon_cache_timeout = 0 + self.facts_cache_timeout = 0 + self.host_check_interval = 0 + self.max_count_per_host = 0 + self.mode = '' + self.container_image_base = '' + self.container_image_prometheus = '' + self.container_image_nvmeof = '' + self.container_image_grafana = '' + self.container_image_alertmanager = '' + self.container_image_node_exporter = '' + self.container_image_loki = '' + self.container_image_promtail = '' + self.container_image_haproxy = '' + self.container_image_keepalived = '' + self.container_image_snmp_gateway = '' + self.container_image_elasticsearch = '' + self.container_image_jaeger_agent = '' + self.container_image_jaeger_collector = '' + self.container_image_jaeger_query = '' + self.warn_on_stray_hosts = True + self.warn_on_stray_daemons = True + self.warn_on_failed_host_check = True + self.allow_ptrace = False + self.container_init = True + self.prometheus_alerts_path = '' + self.migration_current: Optional[int] = None + self.config_dashboard = True + self.manage_etc_ceph_ceph_conf = True + self.manage_etc_ceph_ceph_conf_hosts = '*' + self.registry_url: Optional[str] = None + self.registry_username: Optional[str] = None + self.registry_password: Optional[str] = None + self.registry_insecure: bool = False + self.use_repo_digest = True + self.default_registry = '' + self.autotune_memory_target_ratio = 0.0 + self.autotune_interval = 0 + self.ssh_user: Optional[str] = None + self._ssh_options: Optional[str] = None + self.tkey = NamedTemporaryFile() + self.ssh_config_fname: Optional[str] = None + self.ssh_config: Optional[str] = None + self._temp_files: List = [] + self.ssh_key: Optional[str] = None + self.ssh_pub: Optional[str] = None + self.ssh_cert: Optional[str] = None + self.use_agent = False + self.agent_refresh_rate = 0 + self.agent_down_multiplier = 0.0 + self.agent_starting_port = 0 + self.service_discovery_port = 0 + self.secure_monitoring_stack = False + self.apply_spec_fails: List[Tuple[str, str]] = [] + self.max_osd_draining_count = 10 + self.device_enhanced_scan = False + self.inventory_list_all = False + self.cgroups_split = True + self.log_refresh_metadata = False + self.default_cephadm_command_timeout = 0 + + self.notify(NotifyType.mon_map, None) + self.config_notify() + + path = self.get_ceph_option('cephadm_path') + try: + assert isinstance(path, str) + with open(path, 'rb') as f: + self._cephadm = f.read() + except (IOError, TypeError) as e: + raise RuntimeError("unable to read cephadm at '%s': %s" % ( + path, str(e))) + + self.cephadm_binary_path = self._get_cephadm_binary_path() + + self._worker_pool = multiprocessing.pool.ThreadPool(10) + + self.ssh._reconfig_ssh() + + CephadmOrchestrator.instance = self + + self.upgrade = CephadmUpgrade(self) + + self.health_checks: Dict[str, dict] = {} + + self.inventory = Inventory(self) + + self.cache = HostCache(self) + self.cache.load() + + self.agent_cache = AgentCache(self) + self.agent_cache.load() + + self.to_remove_osds = OSDRemovalQueue(self) + self.to_remove_osds.load_from_store() + + self.spec_store = SpecStore(self) + self.spec_store.load() + + self.keys = ClientKeyringStore(self) + self.keys.load() + + self.tuned_profiles = TunedProfileStore(self) + self.tuned_profiles.load() + + self.tuned_profile_utils = TunedProfileUtils(self) + + # ensure the host lists are in sync + for h in self.inventory.keys(): + if h not in self.cache.daemons: + self.cache.prime_empty_host(h) + for h in self.cache.get_hosts(): + if h not in self.inventory: + self.cache.rm_host(h) + + # in-memory only. + self.events = EventStore(self) + self.offline_hosts: Set[str] = set() + + self.migration = Migrations(self) + + _service_classes: Sequence[Type[CephadmService]] = [ + OSDService, NFSService, MonService, MgrService, MdsService, + RgwService, RbdMirrorService, GrafanaService, AlertmanagerService, + PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService, + IngressService, CustomContainerService, CephfsMirrorService, NvmeofService, + CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService, + JaegerQueryService, JaegerAgentService, JaegerCollectorService + ] + + # https://github.com/python/mypy/issues/8993 + self.cephadm_services: Dict[str, CephadmService] = { + cls.TYPE: cls(self) for cls in _service_classes} # type: ignore + + self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr']) + self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd']) + self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi']) + self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof']) + + self.scheduled_async_actions: List[Callable] = [] + + self.template = TemplateMgr(self) + + self.requires_post_actions: Set[str] = set() + self.need_connect_dashboard_rgw = False + + self.config_checker = CephadmConfigChecks(self) + + self.http_server = CephadmHttpServer(self) + self.http_server.start() + self.agent_helpers = CephadmAgentHelpers(self) + if self.use_agent: + self.agent_helpers._apply_agent() + + self.offline_watcher = OfflineHostWatcher(self) + self.offline_watcher.start() + + def shutdown(self) -> None: + self.log.debug('shutdown') + self._worker_pool.close() + self._worker_pool.join() + self.http_server.shutdown() + self.offline_watcher.shutdown() + self.run = False + self.event.set() + + def _get_cephadm_service(self, service_type: str) -> CephadmService: + assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES + return self.cephadm_services[service_type] + + def _get_cephadm_binary_path(self) -> str: + import hashlib + m = hashlib.sha256() + m.update(self._cephadm) + return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}' + + def _kick_serve_loop(self) -> None: + self.log.debug('_kick_serve_loop') + self.event.set() + + def serve(self) -> None: + """ + The main loop of cephadm. + + A command handler will typically change the declarative state + of cephadm. This loop will then attempt to apply this new state. + """ + # for ssh in serve + self.event_loop = ssh.EventLoopThread() + + serve = CephadmServe(self) + serve.serve() + + def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T: + if not timeout: + timeout = self.default_cephadm_command_timeout + # put a lower bound of 60 seconds in case users + # accidentally set it to something unreasonable. + # For example if they though it was in minutes + # rather than seconds + if timeout < 60: + self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.') + timeout = 60 + return self.event_loop.get_result(coro, timeout) + + @contextmanager + def async_timeout_handler(self, host: Optional[str] = '', + cmd: Optional[str] = '', + timeout: Optional[int] = None) -> Iterator[None]: + # this is meant to catch asyncio.TimeoutError and convert it into an + # OrchestratorError which much of the cephadm codebase is better equipped to handle. + # If the command being run, the host it is run on, or the timeout being used + # are provided, that will be included in the OrchestratorError's message + try: + yield + except asyncio.TimeoutError: + err_str: str = '' + if cmd: + err_str = f'Command "{cmd}" timed out ' + else: + err_str = 'Command timed out ' + if host: + err_str += f'on host {host} ' + if timeout: + err_str += f'(non-default {timeout} second timeout)' + else: + err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)') + raise OrchestratorError(err_str) + + def set_container_image(self, entity: str, image: str) -> None: + self.check_mon_command({ + 'prefix': 'config set', + 'name': 'container_image', + 'value': image, + 'who': entity, + }) + + def config_notify(self) -> None: + """ + This method is called whenever one of our config options is changed. + + TODO: this method should be moved into mgr_module.py + """ + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], # type: ignore + self.get_module_option(opt['name'])) # type: ignore + self.log.debug(' mgr option %s = %s', + opt['name'], getattr(self, opt['name'])) # type: ignore + for opt in self.NATIVE_OPTIONS: + setattr(self, + opt, # type: ignore + self.get_ceph_option(opt)) + self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore + + self.event.set() + + def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None: + if notify_type == NotifyType.mon_map: + # get monmap mtime so we can refresh configs when mons change + monmap = self.get('mon_map') + self.last_monmap = str_to_datetime(monmap['modified']) + if self.last_monmap and self.last_monmap > datetime_now(): + self.last_monmap = None # just in case clocks are skewed + if getattr(self, 'manage_etc_ceph_ceph_conf', False): + # getattr, due to notify() being called before config_notify() + self._kick_serve_loop() + if notify_type == NotifyType.pg_summary: + self._trigger_osd_removal() + + def _trigger_osd_removal(self) -> None: + remove_queue = self.to_remove_osds.as_osd_ids() + if not remove_queue: + return + data = self.get("osd_stats") + for osd in data.get('osd_stats', []): + if osd.get('num_pgs') == 0: + # if _ANY_ osd that is currently in the queue appears to be empty, + # start the removal process + if int(osd.get('osd')) in remove_queue: + self.log.debug('Found empty osd. Starting removal process') + # if the osd that is now empty is also part of the removal queue + # start the process + self._kick_serve_loop() + + def pause(self) -> None: + if not self.paused: + self.log.info('Paused') + self.set_store('pause', 'true') + self.paused = True + # wake loop so we update the health status + self._kick_serve_loop() + + def resume(self) -> None: + if self.paused: + self.log.info('Resumed') + self.paused = False + self.set_store('pause', None) + # unconditionally wake loop so that 'orch resume' can be used to kick + # cephadm + self._kick_serve_loop() + + def get_unique_name( + self, + daemon_type: str, + host: str, + existing: List[orchestrator.DaemonDescription], + prefix: Optional[str] = None, + forcename: Optional[str] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + ) -> str: + """ + Generate a unique random service name + """ + suffix = daemon_type not in [ + 'mon', 'crash', 'ceph-exporter', + 'prometheus', 'node-exporter', 'grafana', 'alertmanager', + 'container', 'agent', 'snmp-gateway', 'loki', 'promtail', + 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query' + ] + if forcename: + if len([d for d in existing if d.daemon_id == forcename]): + raise orchestrator.OrchestratorValidationError( + f'name {daemon_type}.{forcename} already in use') + return forcename + + if '.' in host: + host = host.split('.')[0] + while True: + if prefix: + name = prefix + '.' + else: + name = '' + if rank is not None and rank_generation is not None: + name += f'{rank}.{rank_generation}.' + name += host + if suffix: + name += '.' + ''.join(random.choice(string.ascii_lowercase) + for _ in range(6)) + if len([d for d in existing if d.daemon_id == name]): + if not suffix: + raise orchestrator.OrchestratorValidationError( + f'name {daemon_type}.{name} already in use') + self.log.debug('name %s exists, trying again', name) + continue + return name + + def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None: + if ssh_config is None or len(ssh_config.strip()) == 0: + raise OrchestratorValidationError('ssh_config cannot be empty') + # StrictHostKeyChecking is [yes|no] ? + res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config) + if not res: + raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking') + for s in res: + if 'ask' in s.lower(): + raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'') + + def validate_ssh_config_fname(self, ssh_config_fname: str) -> None: + if not os.path.isfile(ssh_config_fname): + raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format( + ssh_config_fname)) + + def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None: + def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]: + return str_to_datetime(value) if value is not None else None + + dm = {} + for d in ls: + if not d['style'].startswith('cephadm'): + continue + if d['fsid'] != self._cluster_fsid: + continue + if '.' not in d['name']: + continue + daemon_type = d['name'].split('.')[0] + if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES: + logger.warning(f"Found unknown daemon type {daemon_type} on host {host}") + continue + + container_id = d.get('container_id') + if container_id: + # shorten the hash + container_id = container_id[0:12] + rank = int(d['rank']) if d.get('rank') is not None else None + rank_generation = int(d['rank_generation']) if d.get( + 'rank_generation') is not None else None + status, status_desc = None, 'unknown' + if 'state' in d: + status_desc = d['state'] + status = { + 'running': DaemonDescriptionStatus.running, + 'stopped': DaemonDescriptionStatus.stopped, + 'error': DaemonDescriptionStatus.error, + 'unknown': DaemonDescriptionStatus.error, + }[d['state']] + sd = orchestrator.DaemonDescription( + daemon_type=daemon_type, + daemon_id='.'.join(d['name'].split('.')[1:]), + hostname=host, + container_id=container_id, + container_image_id=d.get('container_image_id'), + container_image_name=d.get('container_image_name'), + container_image_digests=d.get('container_image_digests'), + version=d.get('version'), + status=status, + status_desc=status_desc, + created=_as_datetime(d.get('created')), + started=_as_datetime(d.get('started')), + last_refresh=datetime_now(), + last_configured=_as_datetime(d.get('last_configured')), + last_deployed=_as_datetime(d.get('last_deployed')), + memory_usage=d.get('memory_usage'), + memory_request=d.get('memory_request'), + memory_limit=d.get('memory_limit'), + cpu_percentage=d.get('cpu_percentage'), + service_name=d.get('service_name'), + ports=d.get('ports'), + ip=d.get('ip'), + deployed_by=d.get('deployed_by'), + rank=rank, + rank_generation=rank_generation, + extra_container_args=d.get('extra_container_args'), + extra_entrypoint_args=d.get('extra_entrypoint_args'), + ) + dm[sd.name()] = sd + self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm))) + self.cache.update_host_daemons(host, dm) + self.cache.save_host(host) + return None + + def update_watched_hosts(self) -> None: + # currently, we are watching hosts with nfs daemons + hosts_to_watch = [d.hostname for d in self.cache.get_daemons( + ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES] + self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None]))) + + def offline_hosts_remove(self, host: str) -> None: + if host in self.offline_hosts: + self.offline_hosts.remove(host) + + def update_failed_daemon_health_check(self) -> None: + failed_daemons = [] + for dd in self.cache.get_error_daemons(): + if dd.daemon_type != 'agent': # agents tracked by CEPHADM_AGENT_DOWN + failed_daemons.append('daemon %s on %s is in %s state' % ( + dd.name(), dd.hostname, dd.status_desc + )) + self.remove_health_warning('CEPHADM_FAILED_DAEMON') + if failed_daemons: + self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len( + failed_daemons), failed_daemons) + + @staticmethod + def can_run() -> Tuple[bool, str]: + if asyncssh is not None: + return True, "" + else: + return False, "loading asyncssh library:{}".format( + asyncssh_import_error) + + def available(self) -> Tuple[bool, str, Dict[str, Any]]: + """ + The cephadm orchestrator is always available. + """ + ok, err = self.can_run() + if not ok: + return ok, err, {} + if not self.ssh_key or not self.ssh_pub: + return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {} + + # mypy is unable to determine type for _processes since it's private + worker_count: int = self._worker_pool._processes # type: ignore + ret = { + "workers": worker_count, + "paused": self.paused, + } + + return True, err, ret + + def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None: + self.set_store(what, new) + self.ssh._reconfig_ssh() + if self.cache.get_hosts(): + # Can't check anything without hosts + host = self.cache.get_hosts()[0] + r = CephadmServe(self)._check_host(host) + if r is not None: + # connection failed reset user + self.set_store(what, old) + self.ssh._reconfig_ssh() + raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host)) + self.log.info(f'Set ssh {what}') + + @orchestrator._cli_write_command( + prefix='cephadm set-ssh-config') + def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """ + Set the ssh_config file (use -i ) + """ + # Set an ssh_config file provided from stdin + + old = self.ssh_config + if inbuf == old: + return 0, "value unchanged", "" + self.validate_ssh_config_content(inbuf) + self._validate_and_set_ssh_val('ssh_config', inbuf, old) + return 0, "", "" + + @orchestrator._cli_write_command('cephadm clear-ssh-config') + def _clear_ssh_config(self) -> Tuple[int, str, str]: + """ + Clear the ssh_config file + """ + # Clear the ssh_config file provided from stdin + self.set_store("ssh_config", None) + self.ssh_config_tmp = None + self.log.info('Cleared ssh_config') + self.ssh._reconfig_ssh() + return 0, "", "" + + @orchestrator._cli_read_command('cephadm get-ssh-config') + def _get_ssh_config(self) -> HandleCommandResult: + """ + Returns the ssh config as used by cephadm + """ + if self.ssh_config_file: + self.validate_ssh_config_fname(self.ssh_config_file) + with open(self.ssh_config_file) as f: + return HandleCommandResult(stdout=f.read()) + ssh_config = self.get_store("ssh_config") + if ssh_config: + return HandleCommandResult(stdout=ssh_config) + return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG) + + @orchestrator._cli_write_command('cephadm generate-key') + def _generate_key(self) -> Tuple[int, str, str]: + """ + Generate a cluster SSH key (if not present) + """ + if not self.ssh_pub or not self.ssh_key: + self.log.info('Generating ssh key...') + tmp_dir = TemporaryDirectory() + path = tmp_dir.name + '/key' + try: + subprocess.check_call([ + '/usr/bin/ssh-keygen', + '-C', 'ceph-%s' % self._cluster_fsid, + '-N', '', + '-f', path + ]) + with open(path, 'r') as f: + secret = f.read() + with open(path + '.pub', 'r') as f: + pub = f.read() + finally: + os.unlink(path) + os.unlink(path + '.pub') + tmp_dir.cleanup() + self.set_store('ssh_identity_key', secret) + self.set_store('ssh_identity_pub', pub) + self.ssh._reconfig_ssh() + return 0, '', '' + + @orchestrator._cli_write_command( + 'cephadm set-priv-key') + def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """Set cluster SSH private key (use -i )""" + if inbuf is None or len(inbuf) == 0: + return -errno.EINVAL, "", "empty private ssh key provided" + old = self.ssh_key + if inbuf == old: + return 0, "value unchanged", "" + self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old) + self.log.info('Set ssh private key') + return 0, "", "" + + @orchestrator._cli_write_command( + 'cephadm set-pub-key') + def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """Set cluster SSH public key (use -i )""" + if inbuf is None or len(inbuf) == 0: + return -errno.EINVAL, "", "empty public ssh key provided" + old = self.ssh_pub + if inbuf == old: + return 0, "value unchanged", "" + self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old) + return 0, "", "" + + @orchestrator._cli_write_command( + 'cephadm set-signed-cert') + def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """Set a signed cert if CA signed keys are being used (use -i )""" + if inbuf is None or len(inbuf) == 0: + return -errno.EINVAL, "", "empty cert file provided" + old = self.ssh_cert + if inbuf == old: + return 0, "value unchanged", "" + self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old) + return 0, "", "" + + @orchestrator._cli_write_command( + 'cephadm clear-key') + def _clear_key(self) -> Tuple[int, str, str]: + """Clear cluster SSH key""" + self.set_store('ssh_identity_key', None) + self.set_store('ssh_identity_pub', None) + self.set_store('ssh_identity_cert', None) + self.ssh._reconfig_ssh() + self.log.info('Cleared cluster SSH key') + return 0, '', '' + + @orchestrator._cli_read_command( + 'cephadm get-pub-key') + def _get_pub_key(self) -> Tuple[int, str, str]: + """Show SSH public key for connecting to cluster hosts""" + if self.ssh_pub: + return 0, self.ssh_pub, '' + else: + return -errno.ENOENT, '', 'No cluster SSH key defined' + + @orchestrator._cli_read_command( + 'cephadm get-signed-cert') + def _get_signed_cert(self) -> Tuple[int, str, str]: + """Show SSH signed cert for connecting to cluster hosts using CA signed keys""" + if self.ssh_cert: + return 0, self.ssh_cert, '' + else: + return -errno.ENOENT, '', 'No signed cert defined' + + @orchestrator._cli_read_command( + 'cephadm get-user') + def _get_user(self) -> Tuple[int, str, str]: + """ + Show user for SSHing to cluster hosts + """ + if self.ssh_user is None: + return -errno.ENOENT, '', 'No cluster SSH user configured' + else: + return 0, self.ssh_user, '' + + @orchestrator._cli_read_command( + 'cephadm set-user') + def set_ssh_user(self, user: str) -> Tuple[int, str, str]: + """ + Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users + """ + current_user = self.ssh_user + if user == current_user: + return 0, "value unchanged", "" + + self._validate_and_set_ssh_val('ssh_user', user, current_user) + current_ssh_config = self._get_ssh_config() + new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout) + self._set_ssh_config(new_ssh_config) + + msg = 'ssh user set to %s' % user + if user != 'root': + msg += '. sudo will be used' + self.log.info(msg) + return 0, msg, '' + + @orchestrator._cli_read_command( + 'cephadm registry-login') + def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """ + Set custom registry login info by providing url, username and password or json file with login info (-i ) + """ + # if password not given in command line, get it through file input + if not (url and username and password) and (inbuf is None or len(inbuf) == 0): + return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments " + "or -i ") + elif (url and username and password): + registry_json = {'url': url, 'username': username, 'password': password} + else: + assert isinstance(inbuf, str) + registry_json = json.loads(inbuf) + if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json: + return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. " + "Please setup json file as\n" + "{\n" + " \"url\": \"REGISTRY_URL\",\n" + " \"username\": \"REGISTRY_USERNAME\",\n" + " \"password\": \"REGISTRY_PASSWORD\"\n" + "}\n") + + # verify login info works by attempting login on random host + host = None + for host_name in self.inventory.keys(): + host = host_name + break + if not host: + raise OrchestratorError('no hosts defined') + with self.async_timeout_handler(host, 'cephadm registry-login'): + r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json)) + if r is not None: + return 1, '', r + # if logins succeeded, store info + self.log.debug("Host logins successful. Storing login info.") + self.set_store('registry_credentials', json.dumps(registry_json)) + # distribute new login info to all hosts + self.cache.distribute_new_registry_login_info() + return 0, "registry login scheduled", '' + + @orchestrator._cli_read_command('cephadm check-host') + def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: + """Check whether we can access and manage a remote host""" + try: + with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): + out, err, code = self.wait_async( + CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'check-host', ['--expect-hostname', host], + addr=addr, error_ok=True, no_fsid=True)) + if code: + return 1, '', ('check-host failed:\n' + '\n'.join(err)) + except ssh.HostConnectionError as e: + self.log.exception( + f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}") + return 1, '', ('check-host failed:\n' + + f"Failed to connect to {host} at address ({e.addr}): {str(e)}") + except OrchestratorError: + self.log.exception(f"check-host failed for '{host}'") + return 1, '', ('check-host failed:\n' + + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.") + # if we have an outstanding health alert for this host, give the + # serve thread a kick + if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: + for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: + if item.startswith('host %s ' % host): + self.event.set() + return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) + + @orchestrator._cli_read_command( + 'cephadm prepare-host') + def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: + """Prepare a remote host for use with cephadm""" + with self.async_timeout_handler(host, 'cephadm prepare-host'): + out, err, code = self.wait_async( + CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host], + addr=addr, error_ok=True, no_fsid=True)) + if code: + return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) + # if we have an outstanding health alert for this host, give the + # serve thread a kick + if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: + for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: + if item.startswith('host %s ' % host): + self.event.set() + return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) + + @orchestrator._cli_write_command( + prefix='cephadm set-extra-ceph-conf') + def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult: + """ + Text that is appended to all daemon's ceph.conf. + Mainly a workaround, till `config generate-minimal-conf` generates + a complete ceph.conf. + + Warning: this is a dangerous operation. + """ + if inbuf: + # sanity check. + cp = ConfigParser() + cp.read_string(inbuf, source='') + + self.set_store("extra_ceph_conf", json.dumps({ + 'conf': inbuf, + 'last_modified': datetime_to_str(datetime_now()) + })) + self.log.info('Set extra_ceph_conf') + self._kick_serve_loop() + return HandleCommandResult() + + @orchestrator._cli_read_command( + 'cephadm get-extra-ceph-conf') + def _get_extra_ceph_conf(self) -> HandleCommandResult: + """ + Get extra ceph conf that is appended + """ + return HandleCommandResult(stdout=self.extra_ceph_conf().conf) + + @orchestrator._cli_read_command('cephadm config-check ls') + def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult: + """List the available configuration checks and their current state""" + + if format not in [Format.plain, Format.json, Format.json_pretty]: + return HandleCommandResult( + retval=1, + stderr="Requested format is not supported when listing configuration checks" + ) + + if format in [Format.json, Format.json_pretty]: + return HandleCommandResult( + stdout=to_format(self.config_checker.health_checks, + format, + many=True, + cls=None)) + + # plain formatting + table = PrettyTable( + ['NAME', + 'HEALTHCHECK', + 'STATUS', + 'DESCRIPTION' + ], border=False) + table.align['NAME'] = 'l' + table.align['HEALTHCHECK'] = 'l' + table.align['STATUS'] = 'l' + table.align['DESCRIPTION'] = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for c in self.config_checker.health_checks: + table.add_row(( + c.name, + c.healthcheck_name, + c.status, + c.description, + )) + + return HandleCommandResult(stdout=table.get_string()) + + @orchestrator._cli_read_command('cephadm config-check status') + def _config_check_status(self) -> HandleCommandResult: + """Show whether the configuration checker feature is enabled/disabled""" + status = self.get_module_option('config_checks_enabled') + return HandleCommandResult(stdout="Enabled" if status else "Disabled") + + @orchestrator._cli_write_command('cephadm config-check enable') + def _config_check_enable(self, check_name: str) -> HandleCommandResult: + """Enable a specific configuration check""" + if not self._config_check_valid(check_name): + return HandleCommandResult(retval=1, stderr="Invalid check name") + + err, msg = self._update_config_check(check_name, 'enabled') + if err: + return HandleCommandResult( + retval=err, + stderr=f"Failed to enable check '{check_name}' : {msg}") + + return HandleCommandResult(stdout="ok") + + @orchestrator._cli_write_command('cephadm config-check disable') + def _config_check_disable(self, check_name: str) -> HandleCommandResult: + """Disable a specific configuration check""" + if not self._config_check_valid(check_name): + return HandleCommandResult(retval=1, stderr="Invalid check name") + + err, msg = self._update_config_check(check_name, 'disabled') + if err: + return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}") + else: + # drop any outstanding raised healthcheck for this check + config_check = self.config_checker.lookup_check(check_name) + if config_check: + if config_check.healthcheck_name in self.health_checks: + self.health_checks.pop(config_check.healthcheck_name, None) + self.set_health_checks(self.health_checks) + else: + self.log.error( + f"Unable to resolve a check name ({check_name}) to a healthcheck definition?") + + return HandleCommandResult(stdout="ok") + + def _config_check_valid(self, check_name: str) -> bool: + return check_name in [chk.name for chk in self.config_checker.health_checks] + + def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]: + checks_raw = self.get_store('config_checks') + if not checks_raw: + return 1, "config_checks setting is not available" + + checks = json.loads(checks_raw) + checks.update({ + check_name: status + }) + self.log.info(f"updated config check '{check_name}' : {status}") + self.set_store('config_checks', json.dumps(checks)) + return 0, "" + + class ExtraCephConf(NamedTuple): + conf: str + last_modified: Optional[datetime.datetime] + + def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf': + data = self.get_store('extra_ceph_conf') + if not data: + return CephadmOrchestrator.ExtraCephConf('', None) + try: + j = json.loads(data) + except ValueError: + msg = 'Unable to load extra_ceph_conf: Cannot decode JSON' + self.log.exception('%s: \'%s\'', msg, data) + return CephadmOrchestrator.ExtraCephConf('', None) + return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified'])) + + def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool: + conf = self.extra_ceph_conf() + if not conf.last_modified: + return False + return conf.last_modified > dt + + @orchestrator._cli_write_command( + 'cephadm osd activate' + ) + def _osd_activate(self, host: List[str]) -> HandleCommandResult: + """ + Start OSD containers for existing OSDs + """ + + @forall_hosts + def run(h: str) -> str: + with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'): + return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')) + + return HandleCommandResult(stdout='\n'.join(run(host))) + + @orchestrator._cli_read_command('orch client-keyring ls') + def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult: + """ + List client keyrings under cephadm management + """ + if format != Format.plain: + output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec) + else: + table = PrettyTable( + ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'], + border=False) + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity): + table.add_row(( + ks.entity, ks.placement.pretty_str(), + utils.file_mode_to_str(ks.mode), + f'{ks.uid}:{ks.gid}', + ks.path, + )) + output = table.get_string() + return HandleCommandResult(stdout=output) + + @orchestrator._cli_write_command('orch client-keyring set') + def _client_keyring_set( + self, + entity: str, + placement: str, + owner: Optional[str] = None, + mode: Optional[str] = None, + ) -> HandleCommandResult: + """ + Add or update client keyring under cephadm management + """ + if not entity.startswith('client.'): + raise OrchestratorError('entity must start with client.') + if owner: + try: + uid, gid = map(int, owner.split(':')) + except Exception: + raise OrchestratorError('owner must look like ":", e.g., "0:0"') + else: + uid = 0 + gid = 0 + if mode: + try: + imode = int(mode, 8) + except Exception: + raise OrchestratorError('mode must be an octal mode, e.g. "600"') + else: + imode = 0o600 + pspec = PlacementSpec.from_string(placement) + ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid) + self.keys.update(ks) + self._kick_serve_loop() + return HandleCommandResult() + + @orchestrator._cli_write_command('orch client-keyring rm') + def _client_keyring_rm( + self, + entity: str, + ) -> HandleCommandResult: + """ + Remove client keyring from cephadm management + """ + self.keys.rm(entity) + self._kick_serve_loop() + return HandleCommandResult() + + def _get_container_image(self, daemon_name: str) -> Optional[str]: + daemon_type = daemon_name.split('.', 1)[0] # type: ignore + image: Optional[str] = None + if daemon_type in CEPH_IMAGE_TYPES: + # get container image + image = str(self.get_foreign_ceph_option( + utils.name_to_config_section(daemon_name), + 'container_image' + )).strip() + elif daemon_type == 'prometheus': + image = self.container_image_prometheus + elif daemon_type == 'nvmeof': + image = self.container_image_nvmeof + elif daemon_type == 'grafana': + image = self.container_image_grafana + elif daemon_type == 'alertmanager': + image = self.container_image_alertmanager + elif daemon_type == 'node-exporter': + image = self.container_image_node_exporter + elif daemon_type == 'loki': + image = self.container_image_loki + elif daemon_type == 'promtail': + image = self.container_image_promtail + elif daemon_type == 'haproxy': + image = self.container_image_haproxy + elif daemon_type == 'keepalived': + image = self.container_image_keepalived + elif daemon_type == 'elasticsearch': + image = self.container_image_elasticsearch + elif daemon_type == 'jaeger-agent': + image = self.container_image_jaeger_agent + elif daemon_type == 'jaeger-collector': + image = self.container_image_jaeger_collector + elif daemon_type == 'jaeger-query': + image = self.container_image_jaeger_query + elif daemon_type == CustomContainerService.TYPE: + # The image can't be resolved, the necessary information + # is only available when a container is deployed (given + # via spec). + image = None + elif daemon_type == 'snmp-gateway': + image = self.container_image_snmp_gateway + else: + assert False, daemon_type + + self.log.debug('%s container image %s' % (daemon_name, image)) + + return image + + def _check_valid_addr(self, host: str, addr: str) -> str: + # make sure hostname is resolvable before trying to make a connection + try: + ip_addr = utils.resolve_ip(addr) + except OrchestratorError as e: + msg = str(e) + f''' +You may need to supply an address for {addr} + +Please make sure that the host is reachable and accepts connections using the cephadm SSH key +To add the cephadm SSH key to the host: +> ceph cephadm get-pub-key > ~/ceph.pub +> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr} + +To check that the host is reachable open a new shell with the --no-hosts flag: +> cephadm shell --no-hosts + +Then run the following: +> ceph cephadm get-ssh-config > ssh_config +> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key +> chmod 0600 ~/cephadm_private_key +> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}''' + raise OrchestratorError(msg) + + if ipaddress.ip_address(ip_addr).is_loopback and host == addr: + # if this is a re-add, use old address. otherwise error + if host not in self.inventory or self.inventory.get_addr(host) == host: + raise OrchestratorError( + (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n' + + f'Please explicitly provide the address (ceph orch host add {host} --addr )')) + self.log.debug( + f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.') + ip_addr = self.inventory.get_addr(host) + try: + with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, cephadmNoImage, 'check-host', + ['--expect-hostname', host], + addr=addr, + error_ok=True, no_fsid=True)) + if code: + msg = 'check-host failed:\n' + '\n'.join(err) + # err will contain stdout and stderr, so we filter on the message text to + # only show the errors + errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')] + if errors: + msg = f'Host {host} ({addr}) failed check(s): {errors}' + raise OrchestratorError(msg) + except ssh.HostConnectionError as e: + raise OrchestratorError(str(e)) + return ip_addr + + def _add_host(self, spec): + # type: (HostSpec) -> str + """ + Add a host to be managed by the orchestrator. + + :param host: host name + """ + HostSpec.validate(spec) + ip_addr = self._check_valid_addr(spec.hostname, spec.addr) + if spec.addr == spec.hostname and ip_addr: + spec.addr = ip_addr + + if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr: + self.cache.refresh_all_host_info(spec.hostname) + + # prime crush map? + if spec.location: + self.check_mon_command({ + 'prefix': 'osd crush add-bucket', + 'name': spec.hostname, + 'type': 'host', + 'args': [f'{k}={v}' for k, v in spec.location.items()], + }) + + if spec.hostname not in self.inventory: + self.cache.prime_empty_host(spec.hostname) + self.inventory.add_host(spec) + self.offline_hosts_remove(spec.hostname) + if spec.status == 'maintenance': + self._set_maintenance_healthcheck() + self.event.set() # refresh stray health check + self.log.info('Added host %s' % spec.hostname) + return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr) + + @handle_orch_error + def add_host(self, spec: HostSpec) -> str: + return self._add_host(spec) + + @handle_orch_error + def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str: + """ + Remove a host from orchestrator management. + + :param host: host name + :param force: bypass running daemons check + :param offline: remove offline host + """ + + # check if host is offline + host_offline = host in self.offline_hosts + + if host_offline and not offline: + raise OrchestratorValidationError( + "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host)) + + if not host_offline and offline: + raise OrchestratorValidationError( + "{} is online, please remove host without --offline.".format(host)) + + if offline and not force: + raise OrchestratorValidationError("Removing an offline host requires --force") + + # check if there are daemons on the host + if not force: + daemons = self.cache.get_daemons_by_host(host) + if daemons: + self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}") + + daemons_table = "" + daemons_table += "{:<20} {:<15}\n".format("type", "id") + daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15) + for d in daemons: + daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id) + + raise OrchestratorValidationError("Not allowed to remove %s from cluster. " + "The following daemons are running in the host:" + "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % ( + host, daemons_table, host)) + + # check, if there we're removing the last _admin host + if not force: + p = PlacementSpec(label=SpecialHostLabels.ADMIN) + admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) + if len(admin_hosts) == 1 and admin_hosts[0] == host: + raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'" + f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host" + " or add --force to this command") + + def run_cmd(cmd_args: dict) -> None: + ret, out, err = self.mon_command(cmd_args) + if ret != 0: + self.log.debug(f"ran {cmd_args} with mon_command") + self.log.error( + f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + + if offline: + daemons = self.cache.get_daemons_by_host(host) + for d in daemons: + self.log.info(f"removing: {d.name()}") + + if d.daemon_type != 'osd': + self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d) + self.cephadm_services[daemon_type_to_service( + str(d.daemon_type))].post_remove(d, is_failed_deploy=False) + else: + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(str(d.daemon_id)), + 'yes_i_really_mean_it': True + } + run_cmd(cmd_args) + + cmd_args = { + 'prefix': 'osd crush rm', + 'name': host + } + run_cmd(cmd_args) + + self.inventory.rm_host(host) + self.cache.rm_host(host) + self.ssh.reset_con(host) + # if host was in offline host list, we should remove it now. + self.offline_hosts_remove(host) + self.event.set() # refresh stray health check + self.log.info('Removed host %s' % host) + return "Removed {} host '{}'".format('offline' if offline else '', host) + + @handle_orch_error + def update_host_addr(self, host: str, addr: str) -> str: + self._check_valid_addr(host, addr) + self.inventory.set_addr(host, addr) + self.ssh.reset_con(host) + self.event.set() # refresh stray health check + self.log.info('Set host %s addr to %s' % (host, addr)) + return "Updated host '{}' addr to '{}'".format(host, addr) + + @handle_orch_error + def get_hosts(self): + # type: () -> List[orchestrator.HostSpec] + """ + Return a list of hosts managed by the orchestrator. + + Notes: + - skip async: manager reads from cache. + """ + return list(self.inventory.all_specs()) + + @handle_orch_error + def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]: + """ + Return a list of hosts metadata(gather_facts) managed by the orchestrator. + + Notes: + - skip async: manager reads from cache. + """ + if hostname: + return [self.cache.get_facts(hostname)] + + return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()] + + @handle_orch_error + def add_host_label(self, host: str, label: str) -> str: + self.inventory.add_label(host, label) + self.log.info('Added label %s to host %s' % (label, host)) + self._kick_serve_loop() + return 'Added label %s to host %s' % (label, host) + + @handle_orch_error + def remove_host_label(self, host: str, label: str, force: bool = False) -> str: + # if we remove the _admin label from the only host that has it we could end up + # removing the only instance of the config and keyring and cause issues + if not force and label == SpecialHostLabels.ADMIN: + p = PlacementSpec(label=SpecialHostLabels.ADMIN) + admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) + if len(admin_hosts) == 1 and admin_hosts[0] == host: + raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'" + f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal" + " of the last cluster config/keyring managed by cephadm.\n" + f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host" + " before completing this operation.\nIf you're certain this is" + " what you want rerun this command with --force.") + if self.inventory.has_label(host, label): + self.inventory.rm_label(host, label) + msg = f'Removed label {label} from host {host}' + else: + msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels." + self.log.info(msg) + self._kick_serve_loop() + return msg + + def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]: + self.log.debug("running host-ok-to-stop checks") + daemons = self.cache.get_daemons() + daemon_map: Dict[str, List[str]] = defaultdict(lambda: []) + for dd in daemons: + assert dd.hostname is not None + assert dd.daemon_type is not None + assert dd.daemon_id is not None + if dd.hostname == hostname: + daemon_map[dd.daemon_type].append(dd.daemon_id) + + notifications: List[str] = [] + error_notifications: List[str] = [] + okay: bool = True + for daemon_type, daemon_ids in daemon_map.items(): + r = self.cephadm_services[daemon_type_to_service( + daemon_type)].ok_to_stop(daemon_ids, force=force) + if r.retval: + okay = False + # collect error notifications so user can see every daemon causing host + # to not be okay to stop + error_notifications.append(r.stderr) + if r.stdout: + # if extra notifications to print for user, add them to notifications list + notifications.append(r.stdout) + + if not okay: + # at least one daemon is not okay to stop + return 1, '\n'.join(error_notifications) + + if notifications: + return 0, (f'It is presumed safe to stop host {hostname}. ' + + 'Note the following:\n\n' + '\n'.join(notifications)) + return 0, f'It is presumed safe to stop host {hostname}' + + @handle_orch_error + def host_ok_to_stop(self, hostname: str) -> str: + if hostname not in self.cache.get_hosts(): + raise OrchestratorError(f'Cannot find host "{hostname}"') + + rc, msg = self._host_ok_to_stop(hostname) + if rc: + raise OrchestratorError(msg, errno=rc) + + self.log.info(msg) + return msg + + def _set_maintenance_healthcheck(self) -> None: + """Raise/update or clear the maintenance health check as needed""" + + in_maintenance = self.inventory.get_host_with_state("maintenance") + if not in_maintenance: + self.remove_health_warning('HOST_IN_MAINTENANCE') + else: + s = "host is" if len(in_maintenance) == 1 else "hosts are" + self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [ + f"{h} is in maintenance" for h in in_maintenance]) + + @handle_orch_error + @host_exists() + def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str: + """ Attempt to place a cluster host in maintenance + + Placing a host into maintenance disables the cluster's ceph target in systemd + and stops all ceph daemons. If the host is an osd host we apply the noout flag + for the host subtree in crush to prevent data movement during a host maintenance + window. + + :param hostname: (str) name of the host (must match an inventory hostname) + + :raises OrchestratorError: Hostname is invalid, host is already in maintenance + """ + if yes_i_really_mean_it and not force: + raise OrchestratorError("--force must be passed with --yes-i-really-mean-it") + + if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it: + raise OrchestratorError("Maintenance feature is not supported on single node clusters") + + # if upgrade is active, deny + if self.upgrade.upgrade_state and not yes_i_really_mean_it: + raise OrchestratorError( + f"Unable to place {hostname} in maintenance with upgrade active/paused") + + tgt_host = self.inventory._inventory[hostname] + if tgt_host.get("status", "").lower() == "maintenance": + raise OrchestratorError(f"Host {hostname} is already in maintenance") + + host_daemons = self.cache.get_daemon_types(hostname) + self.log.debug("daemons on host {}".format(','.join(host_daemons))) + if host_daemons: + # daemons on this host, so check the daemons can be stopped + # and if so, place the host into maintenance by disabling the target + rc, msg = self._host_ok_to_stop(hostname, force) + if rc and not yes_i_really_mean_it: + raise OrchestratorError( + msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) + + # call the host-maintenance function + with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'): + _out, _err, _code = self.wait_async( + CephadmServe(self)._run_cephadm( + hostname, cephadmNoImage, "host-maintenance", + ["enter"], + error_ok=True)) + returned_msg = _err[0].split('\n')[-1] + if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it: + raise OrchestratorError( + f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}") + + if "osd" in host_daemons: + crush_node = hostname if '.' not in hostname else hostname.split('.')[0] + rc, out, err = self.mon_command({ + 'prefix': 'osd set-group', + 'flags': 'noout', + 'who': [crush_node], + 'format': 'json' + }) + if rc and not yes_i_really_mean_it: + self.log.warning( + f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})") + raise OrchestratorError( + f"Unable to set the osds on {hostname} to noout (rc={rc})") + elif not rc: + self.log.info( + f"maintenance mode request for {hostname} has SET the noout group") + + # update the host status in the inventory + tgt_host["status"] = "maintenance" + self.inventory._inventory[hostname] = tgt_host + self.inventory.save() + + self._set_maintenance_healthcheck() + return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode' + + @handle_orch_error + @host_exists() + def exit_host_maintenance(self, hostname: str) -> str: + """Exit maintenance mode and return a host to an operational state + + Returning from maintenance will enable the clusters systemd target and + start it, and remove any noout that has been added for the host if the + host has osd daemons + + :param hostname: (str) host name + + :raises OrchestratorError: Unable to return from maintenance, or unset the + noout flag + """ + tgt_host = self.inventory._inventory[hostname] + if tgt_host['status'] != "maintenance": + raise OrchestratorError(f"Host {hostname} is not in maintenance mode") + + with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'): + outs, errs, _code = self.wait_async( + CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, + 'host-maintenance', ['exit'], error_ok=True)) + returned_msg = errs[0].split('\n')[-1] + if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): + raise OrchestratorError( + f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}") + + if "osd" in self.cache.get_daemon_types(hostname): + crush_node = hostname if '.' not in hostname else hostname.split('.')[0] + rc, _out, _err = self.mon_command({ + 'prefix': 'osd unset-group', + 'flags': 'noout', + 'who': [crush_node], + 'format': 'json' + }) + if rc: + self.log.warning( + f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})") + raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})") + else: + self.log.info( + f"exit maintenance request has UNSET for the noout group on host {hostname}") + + # update the host record status + tgt_host['status'] = "" + self.inventory._inventory[hostname] = tgt_host + self.inventory.save() + + self._set_maintenance_healthcheck() + + return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode" + + @handle_orch_error + @host_exists() + def rescan_host(self, hostname: str) -> str: + """Use cephadm to issue a disk rescan on each HBA + + Some HBAs and external enclosures don't automatically register + device insertion with the kernel, so for these scenarios we need + to manually rescan + + :param hostname: (str) host name + """ + self.log.info(f'disk rescan request sent to host "{hostname}"') + with self.async_timeout_handler(hostname, 'cephadm disk-rescan'): + _out, _err, _code = self.wait_async( + CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan", + [], no_fsid=True, error_ok=True)) + if not _err: + raise OrchestratorError('Unexpected response from cephadm disk-rescan call') + + msg = _err[0].split('\n')[-1] + log_msg = f'disk rescan: {msg}' + if msg.upper().startswith('OK'): + self.log.info(log_msg) + else: + self.log.warning(log_msg) + + return f'{msg}' + + def get_minimal_ceph_conf(self) -> str: + _, config, _ = self.check_mon_command({ + "prefix": "config generate-minimal-conf", + }) + extra = self.extra_ceph_conf().conf + if extra: + try: + config = self._combine_confs(config, extra) + except Exception as e: + self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}') + return config + + def _combine_confs(self, conf1: str, conf2: str) -> str: + section_to_option: Dict[str, List[str]] = {} + final_conf: str = '' + for conf in [conf1, conf2]: + if not conf: + continue + section = '' + for line in conf.split('\n'): + if line.strip().startswith('#') or not line.strip(): + continue + if line.strip().startswith('[') and line.strip().endswith(']'): + section = line.strip().replace('[', '').replace(']', '') + if section not in section_to_option: + section_to_option[section] = [] + else: + section_to_option[section].append(line.strip()) + + first_section = True + for section, options in section_to_option.items(): + if not first_section: + final_conf += '\n' + final_conf += f'[{section}]\n' + for option in options: + final_conf += f'{option}\n' + first_section = False + + return final_conf + + def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None: + if filter_host: + self.cache.invalidate_host_daemons(filter_host) + else: + for h in self.cache.get_hosts(): + # Also discover daemons deployed manually + self.cache.invalidate_host_daemons(h) + + self._kick_serve_loop() + + @handle_orch_error + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.ServiceDescription]: + if refresh: + self._invalidate_daemons_and_kick_serve() + self.log.debug('Kicked serve() loop to refresh all services') + + sm: Dict[str, orchestrator.ServiceDescription] = {} + + # known services + for nm, spec in self.spec_store.all_specs.items(): + if service_type is not None and service_type != spec.service_type: + continue + if service_name is not None and service_name != nm: + continue + + if spec.service_type != 'osd': + size = spec.placement.get_target_count(self.cache.get_schedulable_hosts()) + else: + # osd counting is special + size = 0 + + sm[nm] = orchestrator.ServiceDescription( + spec=spec, + size=size, + running=0, + events=self.events.get_for_service(spec.service_name()), + created=self.spec_store.spec_created[nm], + deleted=self.spec_store.spec_deleted.get(nm, None), + virtual_ip=spec.get_virtual_ip(), + ports=spec.get_port_start(), + ) + if spec.service_type == 'ingress': + # ingress has 2 daemons running per host + # but only if it's the full ingress service, not for keepalive-only + if not cast(IngressSpec, spec).keepalive_only: + sm[nm].size *= 2 + + # factor daemons into status + for h, dm in self.cache.get_daemons_with_volatile_status(): + for name, dd in dm.items(): + assert dd.hostname is not None, f'no hostname for {dd!r}' + assert dd.daemon_type is not None, f'no daemon_type for {dd!r}' + + n: str = dd.service_name() + + if ( + service_type + and service_type != daemon_type_to_service(dd.daemon_type) + ): + continue + if service_name and service_name != n: + continue + + if n not in sm: + # new unmanaged service + spec = ServiceSpec( + unmanaged=True, + service_type=daemon_type_to_service(dd.daemon_type), + service_id=dd.service_id(), + ) + sm[n] = orchestrator.ServiceDescription( + last_refresh=dd.last_refresh, + container_image_id=dd.container_image_id, + container_image_name=dd.container_image_name, + spec=spec, + size=0, + ) + + if dd.status == DaemonDescriptionStatus.running: + sm[n].running += 1 + if dd.daemon_type == 'osd': + # The osd count can't be determined by the Placement spec. + # Showing an actual/expected representation cannot be determined + # here. So we're setting running = size for now. + sm[n].size += 1 + if ( + not sm[n].last_refresh + or not dd.last_refresh + or dd.last_refresh < sm[n].last_refresh # type: ignore + ): + sm[n].last_refresh = dd.last_refresh + + return list(sm.values()) + + @handle_orch_error + def list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.DaemonDescription]: + if refresh: + self._invalidate_daemons_and_kick_serve(host) + self.log.debug('Kicked serve() loop to refresh all daemons') + + result = [] + for h, dm in self.cache.get_daemons_with_volatile_status(): + if host and h != host: + continue + for name, dd in dm.items(): + if daemon_type is not None and daemon_type != dd.daemon_type: + continue + if daemon_id is not None and daemon_id != dd.daemon_id: + continue + if service_name is not None and service_name != dd.service_name(): + continue + if not dd.memory_request and dd.daemon_type in ['osd', 'mon']: + dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option( + dd.name(), + f"{dd.daemon_type}_memory_target" + )) + result.append(dd) + return result + + @handle_orch_error + def service_action(self, action: str, service_name: str) -> List[str]: + if service_name not in self.spec_store.all_specs.keys(): + raise OrchestratorError(f'Invalid service name "{service_name}".' + + ' View currently running services using "ceph orch ls"') + dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name) + if not dds: + raise OrchestratorError(f'No daemons exist under service name "{service_name}".' + + ' View currently running services using "ceph orch ls"') + if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']: + return [f'Stopping entire {service_name} service is prohibited.'] + self.log.info('%s service %s' % (action.capitalize(), service_name)) + return [ + self._schedule_daemon_action(dd.name(), action) + for dd in dds + ] + + def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str: + self.log.info(f'Rotating authentication key for {daemon_spec.name()}') + rc, out, err = self.mon_command({ + 'prefix': 'auth get-or-create-pending', + 'entity': daemon_spec.entity_name(), + 'format': 'json', + }) + j = json.loads(out) + pending_key = j[0]['pending_key'] + + # deploy a new keyring file + if daemon_spec.daemon_type != 'osd': + daemon_spec = self.cephadm_services[daemon_type_to_service( + daemon_spec.daemon_type)].prepare_create(daemon_spec) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)) + + # try to be clever, or fall back to restarting the daemon + rc = -1 + if daemon_spec.daemon_type == 'osd': + rc, out, err = self.tool_exec( + args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'], + stdin=pending_key.encode() + ) + if not rc: + rc, out, err = self.tool_exec( + args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], + stdin=pending_key.encode() + ) + elif daemon_spec.daemon_type == 'mds': + rc, out, err = self.tool_exec( + args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], + stdin=pending_key.encode() + ) + elif ( + daemon_spec.daemon_type == 'mgr' + and daemon_spec.daemon_id == self.get_mgr_id() + ): + rc, out, err = self.tool_exec( + args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'], + stdin=pending_key.encode() + ) + if rc: + self._daemon_action(daemon_spec, 'restart') + + return f'Rotated key for {daemon_spec.name()}' + + def _daemon_action(self, + daemon_spec: CephadmDaemonDeploySpec, + action: str, + image: Optional[str] = None) -> str: + self._daemon_action_set_image(action, image, daemon_spec.daemon_type, + daemon_spec.daemon_id) + + if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type, + daemon_spec.daemon_id): + self.mgr_service.fail_over() + return '' # unreachable + + if action == 'rotate-key': + return self._rotate_daemon_key(daemon_spec) + + if action == 'redeploy' or action == 'reconfig': + if daemon_spec.daemon_type != 'osd': + daemon_spec = self.cephadm_services[daemon_type_to_service( + daemon_spec.daemon_type)].prepare_create(daemon_spec) + else: + # for OSDs, we still need to update config, just not carry out the full + # prepare_create function + daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config( + daemon_spec) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + return self.wait_async( + CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))) + + actions = { + 'start': ['reset-failed', 'start'], + 'stop': ['stop'], + 'restart': ['reset-failed', 'restart'], + } + name = daemon_spec.name() + for a in actions[action]: + try: + with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + daemon_spec.host, name, 'unit', + ['--name', name, a])) + except Exception: + self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed') + self.cache.invalidate_host_daemons(daemon_spec.host) + msg = "{} {} from host '{}'".format(action, name, daemon_spec.host) + self.events.for_daemon(name, 'INFO', msg) + return msg + + def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None: + if image is not None: + if action != 'redeploy': + raise OrchestratorError( + f'Cannot execute {action} with new image. `action` needs to be `redeploy`') + if daemon_type not in CEPH_IMAGE_TYPES: + raise OrchestratorError( + f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported ' + f'types are: {", ".join(CEPH_IMAGE_TYPES)}') + + self.check_mon_command({ + 'prefix': 'config set', + 'name': 'container_image', + 'value': image, + 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id), + }) + + @handle_orch_error + def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str: + d = self.cache.get_daemon(daemon_name) + assert d.daemon_type is not None + assert d.daemon_id is not None + + if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \ + and not self.mgr_service.mgr_map_has_standby(): + raise OrchestratorError( + f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') + + if action == 'rotate-key': + if d.daemon_type not in ['mgr', 'osd', 'mds', + 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']: + raise OrchestratorError( + f'key rotation not supported for {d.daemon_type}' + ) + + self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id) + + self.log.info(f'Schedule {action} daemon {daemon_name}') + return self._schedule_daemon_action(daemon_name, action) + + def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: + return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() + + def get_active_mgr(self) -> DaemonDescription: + return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr')) + + def get_active_mgr_digests(self) -> List[str]: + digests = self.mgr_service.get_active_daemon( + self.cache.get_daemons_by_type('mgr')).container_image_digests + return digests if digests else [] + + def _schedule_daemon_action(self, daemon_name: str, action: str) -> str: + dd = self.cache.get_daemon(daemon_name) + assert dd.daemon_type is not None + assert dd.daemon_id is not None + assert dd.hostname is not None + if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ + and not self.mgr_service.mgr_map_has_standby(): + raise OrchestratorError( + f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') + self.cache.schedule_daemon_action(dd.hostname, dd.name(), action) + self.cache.save_host(dd.hostname) + msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname) + self._kick_serve_loop() + return msg + + @handle_orch_error + def remove_daemons(self, names): + # type: (List[str]) -> List[str] + args = [] + for host, dm in self.cache.daemons.items(): + for name in names: + if name in dm: + args.append((name, host)) + if not args: + raise OrchestratorError('Unable to find daemon(s) %s' % (names)) + self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args])) + return self._remove_daemons(args) + + @handle_orch_error + def remove_service(self, service_name: str, force: bool = False) -> str: + self.log.info('Remove service %s' % service_name) + self._trigger_preview_refresh(service_name=service_name) + if service_name in self.spec_store: + if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'): + return f'Unable to remove {service_name} service.\n' \ + f'Note, you might want to mark the {service_name} service as "unmanaged"' + else: + return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n" + + # Report list of affected OSDs? + if not force and service_name.startswith('osd.'): + osds_msg = {} + for h, dm in self.cache.get_daemons_with_volatile_status(): + osds_to_remove = [] + for name, dd in dm.items(): + if dd.daemon_type == 'osd' and dd.service_name() == service_name: + osds_to_remove.append(str(dd.daemon_id)) + if osds_to_remove: + osds_msg[h] = osds_to_remove + if osds_msg: + msg = '' + for h, ls in osds_msg.items(): + msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}' + raise OrchestratorError( + f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}') + + found = self.spec_store.rm(service_name) + if found and service_name.startswith('osd.'): + self.spec_store.finally_rm(service_name) + self._kick_serve_loop() + return f'Removed service {service_name}' + + @handle_orch_error + def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: + """ + Return the storage inventory of hosts matching the given filter. + + :param host_filter: host filter + + TODO: + - add filtering by label + """ + if refresh: + if host_filter and host_filter.hosts: + for h in host_filter.hosts: + self.log.debug(f'will refresh {h} devs') + self.cache.invalidate_host_devices(h) + self.cache.invalidate_host_networks(h) + else: + for h in self.cache.get_hosts(): + self.log.debug(f'will refresh {h} devs') + self.cache.invalidate_host_devices(h) + self.cache.invalidate_host_networks(h) + + self.event.set() + self.log.debug('Kicked serve() loop to refresh devices') + + result = [] + for host, dls in self.cache.devices.items(): + if host_filter and host_filter.hosts and host not in host_filter.hosts: + continue + result.append(orchestrator.InventoryHost(host, + inventory.Devices(dls))) + return result + + @handle_orch_error + def zap_device(self, host: str, path: str) -> str: + """Zap a device on a managed host. + + Use ceph-volume zap to return a device to an unused/free state + + Args: + host (str): hostname of the cluster host + path (str): device path + + Raises: + OrchestratorError: host is not a cluster host + OrchestratorError: host is in maintenance and therefore unavailable + OrchestratorError: device path not found on the host + OrchestratorError: device is known to a different ceph cluster + OrchestratorError: device holds active osd + OrchestratorError: device cache hasn't been populated yet.. + + Returns: + str: output from the zap command + """ + + self.log.info('Zap device %s:%s' % (host, path)) + + if host not in self.inventory.keys(): + raise OrchestratorError( + f"Host '{host}' is not a member of the cluster") + + host_info = self.inventory._inventory.get(host, {}) + if host_info.get('status', '').lower() == 'maintenance': + raise OrchestratorError( + f"Host '{host}' is in maintenance mode, which prevents any actions against it.") + + if host not in self.cache.devices: + raise OrchestratorError( + f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.") + + host_devices = self.cache.devices[host] + path_found = False + osd_id_list: List[str] = [] + + for dev in host_devices: + if dev.path == path: + path_found = True + break + if not path_found: + raise OrchestratorError( + f"Device path '{path}' not found on host '{host}'") + + if osd_id_list: + dev_name = os.path.basename(path) + active_osds: List[str] = [] + for osd_id in osd_id_list: + metadata = self.get_metadata('osd', str(osd_id)) + if metadata: + if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','): + active_osds.append("osd." + osd_id) + if active_osds: + raise OrchestratorError( + f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active " + f"OSD{'s' if len(active_osds) > 1 else ''}" + f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.") + + cv_args = ['--', 'lvm', 'zap', '--destroy', path] + with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, 'osd', 'ceph-volume', cv_args, error_ok=True)) + + self.cache.invalidate_host_devices(host) + self.cache.invalidate_host_networks(host) + if code: + raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) + msg = f'zap successful for {path} on {host}' + self.log.info(msg) + + return msg + '\n' + + @handle_orch_error + def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: + """ + Blink a device light. Calling something like:: + + lsmcli local-disk-ident-led-on --path $path + + If you must, you can customize this via:: + + ceph config-key set mgr/cephadm/blink_device_light_cmd '' + ceph config-key set mgr/cephadm//blink_device_light_cmd '' + + See templates/blink_device_light_cmd.j2 + """ + @forall_hosts + def blink(host: str, dev: str, path: str) -> str: + cmd_line = self.template.render('blink_device_light_cmd.j2', + { + 'on': on, + 'ident_fault': ident_fault, + 'dev': dev, + 'path': path + }, + host=host) + cmd_args = shlex.split(cmd_line) + + with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'): + out, err, code = self.wait_async(CephadmServe(self)._run_cephadm( + host, 'osd', 'shell', ['--'] + cmd_args, + error_ok=True)) + if code: + raise OrchestratorError( + 'Unable to affect %s light for %s:%s. Command: %s' % ( + ident_fault, host, dev, ' '.join(cmd_args))) + self.log.info('Set %s light for %s:%s %s' % ( + ident_fault, host, dev, 'on' if on else 'off')) + return "Set %s light for %s:%s %s" % ( + ident_fault, host, dev, 'on' if on else 'off') + + return blink(locs) + + def get_osd_uuid_map(self, only_up=False): + # type: (bool) -> Dict[str, str] + osd_map = self.get('osd_map') + r = {} + for o in osd_map['osds']: + # only include OSDs that have ever started in this map. this way + # an interrupted osd create can be repeated and succeed the second + # time around. + osd_id = o.get('osd') + if osd_id is None: + raise OrchestratorError("Could not retrieve osd_id from osd_map") + if not only_up: + r[str(osd_id)] = o.get('uuid', '') + return r + + def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]: + osd = [x for x in self.get('osd_map')['osds'] + if x['osd'] == osd_id] + + if len(osd) != 1: + return None + + return osd[0] + + def _trigger_preview_refresh(self, + specs: Optional[List[DriveGroupSpec]] = None, + service_name: Optional[str] = None, + ) -> None: + # Only trigger a refresh when a spec has changed + trigger_specs = [] + if specs: + for spec in specs: + preview_spec = self.spec_store.spec_preview.get(spec.service_name()) + # the to-be-preview spec != the actual spec, this means we need to + # trigger a refresh, if the spec has been removed (==None) we need to + # refresh as well. + if not preview_spec or spec != preview_spec: + trigger_specs.append(spec) + if service_name: + trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))] + if not any(trigger_specs): + return None + + refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs) + for host in refresh_hosts: + self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") + self.cache.osdspec_previews_refresh_queue.append(host) + + @handle_orch_error + def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: + """ + Deprecated. Please use `apply()` instead. + + Keeping this around to be compatible to mgr/dashboard + """ + return [self._apply(spec) for spec in specs] + + @handle_orch_error + def create_osds(self, drive_group: DriveGroupSpec) -> str: + hosts: List[HostSpec] = self.inventory.all_specs() + filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts) + if not filtered_hosts: + return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts" + return self.osd_service.create_from_spec(drive_group) + + def _preview_osdspecs(self, + osdspecs: Optional[List[DriveGroupSpec]] = None + ) -> dict: + if not osdspecs: + return {'n/a': [{'error': True, + 'message': 'No OSDSpec or matching hosts found.'}]} + matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs) + if not matching_hosts: + return {'n/a': [{'error': True, + 'message': 'No OSDSpec or matching hosts found.'}]} + # Is any host still loading previews or still in the queue to be previewed + pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts} + if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts): + # Report 'pending' when any of the matching hosts is still loading previews (flag is True) + return {'n/a': [{'error': True, + 'message': 'Preview data is being generated.. ' + 'Please re-run this command in a bit.'}]} + # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs + previews_for_specs = {} + for host, raw_reports in self.cache.osdspec_previews.items(): + if host not in matching_hosts: + continue + osd_reports = [] + for osd_report in raw_reports: + if osd_report.get('osdspec') in [x.service_id for x in osdspecs]: + osd_reports.append(osd_report) + previews_for_specs.update({host: osd_reports}) + return previews_for_specs + + def _calc_daemon_deps(self, + spec: Optional[ServiceSpec], + daemon_type: str, + daemon_id: str) -> List[str]: + + def get_daemon_names(daemons: List[str]) -> List[str]: + daemon_names = [] + for daemon_type in daemons: + for dd in self.cache.get_daemons_by_type(daemon_type): + daemon_names.append(dd.name()) + return daemon_names + + alertmanager_user, alertmanager_password = self._get_alertmanager_credentials() + prometheus_user, prometheus_password = self._get_prometheus_credentials() + + deps = [] + if daemon_type == 'haproxy': + # because cephadm creates new daemon instances whenever + # port or ip changes, identifying daemons by name is + # sufficient to detect changes. + if not spec: + return [] + ingress_spec = cast(IngressSpec, spec) + assert ingress_spec.backend_service + daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service) + deps = [d.name() for d in daemons] + elif daemon_type == 'keepalived': + # because cephadm creates new daemon instances whenever + # port or ip changes, identifying daemons by name is + # sufficient to detect changes. + if not spec: + return [] + daemons = self.cache.get_daemons_by_service(spec.service_name()) + deps = [d.name() for d in daemons if d.daemon_type == 'haproxy'] + elif daemon_type == 'agent': + root_cert = '' + server_port = '' + try: + server_port = str(self.http_server.agent.server_port) + root_cert = self.http_server.agent.ssl_certs.get_root_cert() + except Exception: + pass + deps = sorted([self.get_mgr_ip(), server_port, root_cert, + str(self.device_enhanced_scan)]) + elif daemon_type == 'iscsi': + if spec: + iscsi_spec = cast(IscsiServiceSpec, spec) + deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)] + else: + deps = [self.get_mgr_ip()] + elif daemon_type == 'prometheus': + # for prometheus we add the active mgr as an explicit dependency, + # this way we force a redeploy after a mgr failover + deps.append(self.get_active_mgr().name()) + deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283))) + deps.append(str(self.service_discovery_port)) + # prometheus yaml configuration file (generated by prometheus.yml.j2) contains + # a scrape_configs section for each service type. This should be included only + # when at least one daemon of the corresponding service is running. Therefore, + # an explicit dependency is added for each service-type to force a reconfig + # whenever the number of daemons for those service-type changes from 0 to greater + # than zero and vice versa. + deps += [s for s in ['node-exporter', 'alertmanager'] + if self.cache.get_daemons_by_service(s)] + if len(self.cache.get_daemons_by_type('ingress')) > 0: + deps.append('ingress') + # add dependency on ceph-exporter daemons + deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')] + if self.secure_monitoring_stack: + if prometheus_user and prometheus_password: + deps.append(f'{hash(prometheus_user + prometheus_password)}') + if alertmanager_user and alertmanager_password: + deps.append(f'{hash(alertmanager_user + alertmanager_password)}') + elif daemon_type == 'grafana': + deps += get_daemon_names(['prometheus', 'loki']) + if self.secure_monitoring_stack and prometheus_user and prometheus_password: + deps.append(f'{hash(prometheus_user + prometheus_password)}') + elif daemon_type == 'alertmanager': + deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway']) + if self.secure_monitoring_stack and alertmanager_user and alertmanager_password: + deps.append(f'{hash(alertmanager_user + alertmanager_password)}') + elif daemon_type == 'promtail': + deps += get_daemon_names(['loki']) + else: + # TODO(redo): some error message! + pass + + if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']: + deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}') + + return sorted(deps) + + @forall_hosts + def _remove_daemons(self, name: str, host: str) -> str: + return CephadmServe(self)._remove_daemon(name, host) + + def _check_pool_exists(self, pool: str, service_name: str) -> None: + logger.info(f'Checking pool "{pool}" exists for service {service_name}') + if not self.rados.pool_exists(pool): + raise OrchestratorError(f'Cannot find pool "{pool}" for ' + f'service {service_name}') + + def _add_daemon(self, + daemon_type: str, + spec: ServiceSpec) -> List[str]: + """ + Add (and place) a daemon. Require explicit host placement. Do not + schedule, and do not apply the related scheduling limitations. + """ + if spec.service_name() not in self.spec_store: + raise OrchestratorError('Unable to add a Daemon without Service.\n' + 'Please use `ceph orch apply ...` to create a Service.\n' + 'Note, you might want to create the service with "unmanaged=true"') + + self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) + if not spec.placement.hosts: + raise OrchestratorError('must specify host(s) to deploy on') + count = spec.placement.count or len(spec.placement.hosts) + daemons = self.cache.get_daemons_by_service(spec.service_name()) + return self._create_daemons(daemon_type, spec, daemons, + spec.placement.hosts, count) + + def _create_daemons(self, + daemon_type: str, + spec: ServiceSpec, + daemons: List[DaemonDescription], + hosts: List[HostPlacementSpec], + count: int) -> List[str]: + if count > len(hosts): + raise OrchestratorError('too few hosts: want %d, have %s' % ( + count, hosts)) + + did_config = False + service_type = daemon_type_to_service(daemon_type) + + args = [] # type: List[CephadmDaemonDeploySpec] + for host, network, name in hosts: + daemon_id = self.get_unique_name(daemon_type, host, daemons, + prefix=spec.service_id, + forcename=name) + + if not did_config: + self.cephadm_services[service_type].config(spec) + did_config = True + + daemon_spec = self.cephadm_services[service_type].make_daemon_spec( + host, daemon_id, network, spec, + # NOTE: this does not consider port conflicts! + ports=spec.get_port_start()) + self.log.debug('Placing %s.%s on host %s' % ( + daemon_type, daemon_id, host)) + args.append(daemon_spec) + + # add to daemon list so next name(s) will also be unique + sd = orchestrator.DaemonDescription( + hostname=host, + daemon_type=daemon_type, + daemon_id=daemon_id, + ) + daemons.append(sd) + + @ forall_hosts + def create_func_map(*args: Any) -> str: + daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) + with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): + return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) + + return create_func_map(args) + + @handle_orch_error + def add_daemon(self, spec: ServiceSpec) -> List[str]: + ret: List[str] = [] + try: + with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True): + for d_type in service_to_daemon_types(spec.service_type): + ret.extend(self._add_daemon(d_type, spec)) + return ret + except OrchestratorError as e: + self.events.from_orch_error(e) + raise + + def _get_alertmanager_credentials(self) -> Tuple[str, str]: + user = self.get_store(AlertmanagerService.USER_CFG_KEY) + password = self.get_store(AlertmanagerService.PASS_CFG_KEY) + if user is None or password is None: + user = 'admin' + password = 'admin' + self.set_store(AlertmanagerService.USER_CFG_KEY, user) + self.set_store(AlertmanagerService.PASS_CFG_KEY, password) + return (user, password) + + def _get_prometheus_credentials(self) -> Tuple[str, str]: + user = self.get_store(PrometheusService.USER_CFG_KEY) + password = self.get_store(PrometheusService.PASS_CFG_KEY) + if user is None or password is None: + user = 'admin' + password = 'admin' + self.set_store(PrometheusService.USER_CFG_KEY, user) + self.set_store(PrometheusService.PASS_CFG_KEY, password) + return (user, password) + + @handle_orch_error + def set_prometheus_access_info(self, user: str, password: str) -> str: + self.set_store(PrometheusService.USER_CFG_KEY, user) + self.set_store(PrometheusService.PASS_CFG_KEY, password) + return 'prometheus credentials updated correctly' + + @handle_orch_error + def set_alertmanager_access_info(self, user: str, password: str) -> str: + self.set_store(AlertmanagerService.USER_CFG_KEY, user) + self.set_store(AlertmanagerService.PASS_CFG_KEY, password) + return 'alertmanager credentials updated correctly' + + @handle_orch_error + def get_prometheus_access_info(self) -> Dict[str, str]: + user, password = self._get_prometheus_credentials() + return {'user': user, + 'password': password, + 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()} + + @handle_orch_error + def get_alertmanager_access_info(self) -> Dict[str, str]: + user, password = self._get_alertmanager_credentials() + return {'user': user, + 'password': password, + 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()} + + @handle_orch_error + def apply_mon(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + def _apply(self, spec: GenericSpec) -> str: + if spec.service_type == 'host': + return self._add_host(cast(HostSpec, spec)) + + if spec.service_type == 'osd': + # _trigger preview refresh needs to be smart and + # should only refresh if a change has been detected + self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)]) + + return self._apply_service_spec(cast(ServiceSpec, spec)) + + def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]: + """Return a list of candidate hosts according to the placement specification.""" + all_hosts = self.cache.get_schedulable_hosts() + candidates = [] + if placement.hosts: + candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts] + elif placement.label: + candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]] + elif placement.host_pattern: + candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)] + elif (placement.count is not None or placement.count_per_host is not None): + candidates = [x.hostname for x in all_hosts] + return [h for h in candidates if not self.cache.is_host_draining(h)] + + def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None: + """Validate placement specification for TunedProfileSpec and ClientKeyringSpec.""" + if spec.count is not None: + raise OrchestratorError( + "Placement 'count' field is no supported for this specification.") + if spec.count_per_host is not None: + raise OrchestratorError( + "Placement 'count_per_host' field is no supported for this specification.") + if spec.hosts: + all_hosts = [h.hostname for h in self.inventory.all_specs()] + invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts] + if invalid_hosts: + raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. " + f"Please check 'ceph orch host ls' for available hosts.") + elif not self._get_candidate_hosts(spec): + raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n" + "Please check 'ceph orch host ls' for available hosts.\n" + "Note: draining hosts are excluded from the candidate list.") + + def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]: + candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs()) + invalid_options: Dict[str, List[str]] = {} + for host in candidate_hosts: + host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {}) + invalid_options[host] = [] + for option in spec.settings: + if option not in host_sysctl_options: + invalid_options[host].append(option) + return invalid_options + + def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None: + if not spec.settings: + raise OrchestratorError("Invalid spec: settings section cannot be empty.") + self._validate_one_shot_placement_spec(spec.placement) + invalid_options = self._validate_tunedprofile_settings(spec) + if any(e for e in invalid_options.values()): + raise OrchestratorError( + f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}') + + @handle_orch_error + def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str: + outs = [] + for spec in specs: + self._validate_tuned_profile_spec(spec) + if no_overwrite and self.tuned_profiles.exists(spec.profile_name): + outs.append( + f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)") + else: + # done, let's save the specs + self.tuned_profiles.add_profile(spec) + outs.append(f'Saved tuned profile {spec.profile_name}') + self._kick_serve_loop() + return '\n'.join(outs) + + @handle_orch_error + def rm_tuned_profile(self, profile_name: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. Nothing to remove.') + self.tuned_profiles.rm_profile(profile_name) + self._kick_serve_loop() + return f'Removed tuned profile {profile_name}' + + @handle_orch_error + def tuned_profile_ls(self) -> List[TunedProfileSpec]: + return self.tuned_profiles.list_profiles() + + @handle_orch_error + def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. Cannot add setting.') + self.tuned_profiles.add_setting(profile_name, setting, value) + self._kick_serve_loop() + return f'Added setting {setting} with value {value} to tuned profile {profile_name}' + + @handle_orch_error + def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. Cannot remove setting.') + self.tuned_profiles.rm_setting(profile_name, setting) + self._kick_serve_loop() + return f'Removed setting {setting} from tuned profile {profile_name}' + + @handle_orch_error + def service_discovery_dump_cert(self) -> str: + root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT) + if not root_cert: + raise OrchestratorError('No certificate found for service discovery') + return root_cert + + def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None: + self.health_checks[name] = { + 'severity': 'warning', + 'summary': summary, + 'count': count, + 'detail': detail, + } + self.set_health_checks(self.health_checks) + + def remove_health_warning(self, name: str) -> None: + if name in self.health_checks: + del self.health_checks[name] + self.set_health_checks(self.health_checks) + + def _plan(self, spec: ServiceSpec) -> dict: + if spec.service_type == 'osd': + return {'service_name': spec.service_name(), + 'service_type': spec.service_type, + 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} + + svc = self.cephadm_services[spec.service_type] + ha = HostAssignment( + spec=spec, + hosts=self.cache.get_schedulable_hosts(), + unreachable_hosts=self.cache.get_unreachable_hosts(), + draining_hosts=self.cache.get_draining_hosts(), + networks=self.cache.networks, + daemons=self.cache.get_daemons_by_service(spec.service_name()), + allow_colo=svc.allow_colo(), + rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None + ) + ha.validate() + hosts, to_add, to_remove = ha.place() + + return { + 'service_name': spec.service_name(), + 'service_type': spec.service_type, + 'add': [hs.hostname for hs in to_add], + 'remove': [d.name() for d in to_remove] + } + + @handle_orch_error + def plan(self, specs: Sequence[GenericSpec]) -> List: + results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' + 'to the current inventory setup. If any of these conditions change, the \n' + 'preview will be invalid. Please make sure to have a minimal \n' + 'timeframe between planning and applying the specs.'}] + if any([spec.service_type == 'host' for spec in specs]): + return [{'error': 'Found . Previews that include Host Specifications are not supported, yet.'}] + for spec in specs: + results.append(self._plan(cast(ServiceSpec, spec))) + return results + + def _apply_service_spec(self, spec: ServiceSpec) -> str: + if spec.placement.is_empty(): + # fill in default placement + defaults = { + 'mon': PlacementSpec(count=5), + 'mgr': PlacementSpec(count=2), + 'mds': PlacementSpec(count=2), + 'rgw': PlacementSpec(count=2), + 'ingress': PlacementSpec(count=2), + 'iscsi': PlacementSpec(count=1), + 'nvmeof': PlacementSpec(count=1), + 'rbd-mirror': PlacementSpec(count=2), + 'cephfs-mirror': PlacementSpec(count=1), + 'nfs': PlacementSpec(count=1), + 'grafana': PlacementSpec(count=1), + 'alertmanager': PlacementSpec(count=1), + 'prometheus': PlacementSpec(count=1), + 'node-exporter': PlacementSpec(host_pattern='*'), + 'ceph-exporter': PlacementSpec(host_pattern='*'), + 'loki': PlacementSpec(count=1), + 'promtail': PlacementSpec(host_pattern='*'), + 'crash': PlacementSpec(host_pattern='*'), + 'container': PlacementSpec(count=1), + 'snmp-gateway': PlacementSpec(count=1), + 'elasticsearch': PlacementSpec(count=1), + 'jaeger-agent': PlacementSpec(host_pattern='*'), + 'jaeger-collector': PlacementSpec(count=1), + 'jaeger-query': PlacementSpec(count=1) + } + spec.placement = defaults[spec.service_type] + elif spec.service_type in ['mon', 'mgr'] and \ + spec.placement.count is not None and \ + spec.placement.count < 1: + raise OrchestratorError('cannot scale %s service below 1' % ( + spec.service_type)) + + host_count = len(self.inventory.keys()) + max_count = self.max_count_per_host + + if spec.placement.count is not None: + if spec.service_type in ['mon', 'mgr']: + if spec.placement.count > max(5, host_count): + raise OrchestratorError( + (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.')) + elif spec.service_type != 'osd': + if spec.placement.count > (max_count * host_count): + raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).' + + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')) + + if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd': + raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.' + + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')) + + HostAssignment( + spec=spec, + hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh + unreachable_hosts=self.cache.get_unreachable_hosts(), + draining_hosts=self.cache.get_draining_hosts(), + networks=self.cache.networks, + daemons=self.cache.get_daemons_by_service(spec.service_name()), + allow_colo=self.cephadm_services[spec.service_type].allow_colo(), + ).validate() + + self.log.info('Saving service %s spec with placement %s' % ( + spec.service_name(), spec.placement.pretty_str())) + self.spec_store.save(spec) + self._kick_serve_loop() + return "Scheduled %s update..." % spec.service_name() + + @handle_orch_error + def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]: + results = [] + for spec in specs: + if no_overwrite: + if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory: + results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag' + % (cast(HostSpec, spec).hostname, spec.service_type)) + continue + elif cast(ServiceSpec, spec).service_name() in self.spec_store: + results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag' + % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name())) + continue + results.append(self._apply(spec)) + return results + + @handle_orch_error + def apply_mgr(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_mds(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_rgw(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_ingress(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_iscsi(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_rbd_mirror(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_nfs(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + def _get_dashboard_url(self): + # type: () -> str + return self.get('mgr_map').get('services', {}).get('dashboard', '') + + @handle_orch_error + def apply_prometheus(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_loki(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_promtail(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_node_exporter(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_ceph_exporter(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_crash(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_grafana(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_alertmanager(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_container(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def apply_snmp_gateway(self, spec: ServiceSpec) -> str: + return self._apply(spec) + + @handle_orch_error + def set_unmanaged(self, service_name: str, value: bool) -> str: + return self.spec_store.set_unmanaged(service_name, value) + + @handle_orch_error + def upgrade_check(self, image: str, version: str) -> str: + if self.inventory.get_host_with_state("maintenance"): + raise OrchestratorError("check aborted - you have hosts in maintenance state") + + if version: + target_name = self.container_image_base + ':v' + version + elif image: + target_name = image + else: + raise OrchestratorError('must specify either image or version') + + with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'): + image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name)) + + ceph_image_version = image_info.ceph_version + if not ceph_image_version: + return f'Unable to extract ceph version from {target_name}.' + if ceph_image_version.startswith('ceph version '): + ceph_image_version = ceph_image_version.split(' ')[2] + version_error = self.upgrade._check_target_version(ceph_image_version) + if version_error: + return f'Incompatible upgrade: {version_error}' + + self.log.debug(f'image info {image} -> {image_info}') + r: dict = { + 'target_name': target_name, + 'target_id': image_info.image_id, + 'target_version': image_info.ceph_version, + 'needs_update': dict(), + 'up_to_date': list(), + 'non_ceph_image_daemons': list() + } + for host, dm in self.cache.daemons.items(): + for name, dd in dm.items(): + # check if the container digest for the digest we're checking upgrades for matches + # the container digests for the daemon if "use_repo_digest" setting is true + # or that the image name matches the daemon's image name if "use_repo_digest" + # is false. The idea is to generally check if the daemon is already using + # the image we're checking upgrade to. + if ( + (self.use_repo_digest and dd.matches_digests(image_info.repo_digests)) + or (not self.use_repo_digest and dd.matches_image_name(image)) + ): + r['up_to_date'].append(dd.name()) + elif dd.daemon_type in CEPH_IMAGE_TYPES: + r['needs_update'][dd.name()] = { + 'current_name': dd.container_image_name, + 'current_id': dd.container_image_id, + 'current_version': dd.version, + } + else: + r['non_ceph_image_daemons'].append(dd.name()) + if self.use_repo_digest and image_info.repo_digests: + # FIXME: we assume the first digest is the best one to use + r['target_digest'] = image_info.repo_digests[0] + + return json.dumps(r, indent=4, sort_keys=True) + + @handle_orch_error + def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: + return self.upgrade.upgrade_status() + + @handle_orch_error + def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]: + return self.upgrade.upgrade_ls(image, tags, show_all_versions) + + @handle_orch_error + def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None, + services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: + if self.inventory.get_host_with_state("maintenance"): + raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state") + if self.offline_hosts: + raise OrchestratorError( + f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}") + if daemon_types is not None and services is not None: + raise OrchestratorError('--daemon-types and --services are mutually exclusive') + if daemon_types is not None: + for dtype in daemon_types: + if dtype not in CEPH_UPGRADE_ORDER: + raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n' + f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}') + if services is not None: + for service in services: + if service not in self.spec_store: + raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n' + f'Known services are: {self.spec_store.all_specs.keys()}') + hosts: Optional[List[str]] = None + if host_placement is not None: + all_hosts = list(self.inventory.all_specs()) + placement = PlacementSpec.from_string(host_placement) + hosts = placement.filter_matching_hostspecs(all_hosts) + if not hosts: + raise OrchestratorError( + f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts') + + if limit is not None: + if limit < 1: + raise OrchestratorError( + f'Upgrade aborted - --limit arg must be a positive integer, not {limit}') + + return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit) + + @handle_orch_error + def upgrade_pause(self) -> str: + return self.upgrade.upgrade_pause() + + @handle_orch_error + def upgrade_resume(self) -> str: + return self.upgrade.upgrade_resume() + + @handle_orch_error + def upgrade_stop(self) -> str: + return self.upgrade.upgrade_stop() + + @handle_orch_error + def remove_osds(self, osd_ids: List[str], + replace: bool = False, + force: bool = False, + zap: bool = False, + no_destroy: bool = False) -> str: + """ + Takes a list of OSDs and schedules them for removal. + The function that takes care of the actual removal is + process_removal_queue(). + """ + + daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd') + to_remove_daemons = list() + for daemon in daemons: + if daemon.daemon_id in osd_ids: + to_remove_daemons.append(daemon) + if not to_remove_daemons: + return f"Unable to find OSDs: {osd_ids}" + + for daemon in to_remove_daemons: + assert daemon.daemon_id is not None + try: + self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), + replace=replace, + force=force, + zap=zap, + no_destroy=no_destroy, + hostname=daemon.hostname, + process_started_at=datetime_now(), + remove_util=self.to_remove_osds.rm_util)) + except NotFoundError: + return f"Unable to find OSDs: {osd_ids}" + + # trigger the serve loop to initiate the removal + self._kick_serve_loop() + warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n" + "Run the `ceph-volume lvm zap` command with `--destroy`" + " against the VG/LV if you want them to be destroyed.") + return f"Scheduled OSD(s) for removal.{warning_zap}" + + @handle_orch_error + def stop_remove_osds(self, osd_ids: List[str]) -> str: + """ + Stops a `removal` process for a List of OSDs. + This will revert their weight and remove it from the osds_to_remove queue + """ + for osd_id in osd_ids: + try: + self.to_remove_osds.rm(OSD(osd_id=int(osd_id), + remove_util=self.to_remove_osds.rm_util)) + except (NotFoundError, KeyError, ValueError): + return f'Unable to find OSD in the queue: {osd_id}' + + # trigger the serve loop to halt the removal + self._kick_serve_loop() + return "Stopped OSD(s) removal" + + @handle_orch_error + def remove_osds_status(self) -> List[OSD]: + """ + The CLI call to retrieve an osd removal report + """ + return self.to_remove_osds.all_osds() + + @handle_orch_error + def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str: + """ + Drain all daemons from a host. + :param host: host name + """ + + # if we drain the last admin host we could end up removing the only instance + # of the config and keyring and cause issues + if not force: + p = PlacementSpec(label=SpecialHostLabels.ADMIN) + admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs()) + if len(admin_hosts) == 1 and admin_hosts[0] == hostname: + raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'" + " label.\nDraining this host could cause the removal" + " of the last cluster config/keyring managed by cephadm.\n" + f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host" + " before completing this operation.\nIf you're certain this is" + " what you want rerun this command with --force.") + + self.add_host_label(hostname, '_no_schedule') + if not keep_conf_keyring: + self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING) + + daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname) + + osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd'] + self.remove_osds(osds_to_remove, zap=zap_osd_devices) + + daemons_table = "" + daemons_table += "{:<20} {:<15}\n".format("type", "id") + daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15) + for d in daemons: + daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id) + + return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table) + + def trigger_connect_dashboard_rgw(self) -> None: + self.need_connect_dashboard_rgw = True + self.event.set() -- cgit v1.2.3