summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/serve.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/cephadm/serve.py
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/cephadm/serve.py1680
1 files changed, 1680 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
new file mode 100644
index 000000000..5dfdc27a3
--- /dev/null
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -0,0 +1,1680 @@
+import ipaddress
+import hashlib
+import json
+import logging
+import uuid
+import os
+from collections import defaultdict
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
+ DefaultDict, Callable
+
+from ceph.deployment import inventory
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import (
+ ArgumentList,
+ ArgumentSpec,
+ CustomContainerSpec,
+ PlacementSpec,
+ RGWSpec,
+ ServiceSpec,
+ IngressSpec,
+)
+from ceph.utils import datetime_now
+
+import orchestrator
+from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
+ DaemonDescriptionStatus, daemon_type_to_service
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
+from cephadm.schedule import HostAssignment
+from cephadm.autotune import MemoryAutotuner
+from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
+ CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels
+from mgr_module import MonCommandFailed
+from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
+
+from . import utils
+from . import exchange
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
+
+
+class CephadmServe:
+ """
+ This module contains functions that are executed in the
+ serve() thread. Thus they don't block the CLI.
+
+ Please see the `Note regarding network calls from CLI handlers`
+ chapter in the cephadm developer guide.
+
+ On the other hand, These function should *not* be called form
+ CLI handlers, to avoid blocking the CLI
+ """
+
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr: "CephadmOrchestrator" = mgr
+ self.log = logger
+
+ def serve(self) -> None:
+ """
+ The main loop of cephadm.
+
+ A command handler will typically change the declarative state
+ of cephadm. This loop will then attempt to apply this new state.
+ """
+ self.log.debug("serve starting")
+ self.mgr.config_checker.load_network_config()
+
+ while self.mgr.run:
+ self.log.debug("serve loop start")
+
+ try:
+
+ self.convert_tags_to_repo_digest()
+
+ # refresh daemons
+ self.log.debug('refreshing hosts and daemons')
+ self._refresh_hosts_and_daemons()
+
+ self._check_for_strays()
+
+ self._update_paused_health()
+
+ if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
+ self.mgr.need_connect_dashboard_rgw = False
+ if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+ self.log.info('Checking dashboard <-> RGW credentials')
+ self.mgr.remote('dashboard', 'set_rgw_credentials')
+
+ if not self.mgr.paused:
+ self._run_async_actions()
+
+ self.mgr.to_remove_osds.process_removal_queue()
+
+ self.mgr.migration.migrate()
+ if self.mgr.migration.is_migration_ongoing():
+ continue
+
+ if self._apply_all_services():
+ continue # did something, refresh
+
+ self._check_daemons()
+
+ self._check_certificates()
+
+ self._purge_deleted_services()
+
+ self._check_for_moved_osds()
+
+ if self.mgr.agent_helpers._handle_use_agent_setting():
+ continue
+
+ if self.mgr.upgrade.continue_upgrade():
+ continue
+
+ except OrchestratorError as e:
+ if e.event_subject:
+ self.mgr.events.from_orch_error(e)
+
+ self.log.debug("serve loop sleep")
+ self._serve_sleep()
+ self.log.debug("serve loop wake")
+ self.log.debug("serve exit")
+
+ def _check_certificates(self) -> None:
+ for d in self.mgr.cache.get_daemons_by_type('grafana'):
+ cert = self.mgr.get_store(f'{d.hostname}/grafana_crt')
+ key = self.mgr.get_store(f'{d.hostname}/grafana_key')
+ if (not cert or not cert.strip()) and (not key or not key.strip()):
+ # certificate/key are empty... nothing to check
+ return
+
+ try:
+ get_cert_issuer_info(cert)
+ verify_tls(cert, key)
+ self.mgr.remove_health_warning('CEPHADM_CERT_ERROR')
+ except ServerConfigException as e:
+ err_msg = f"""
+ Detected invalid grafana certificates. Please, use the following commands:
+
+ > ceph config-key set mgr/cephadm/{d.hostname}/grafana_crt -i <path-to-ctr-file>
+ > ceph config-key set mgr/cephadm/{d.hostname}/grafana_key -i <path-to-key-file>
+
+ to set valid key and certificate or reset their value to an empty string
+ in case you want cephadm to generate self-signed Grafana certificates.
+
+ Once done, run the following command to reconfig the daemon:
+
+ > ceph orch daemon reconfig grafana.{d.hostname}
+
+ """
+ self.log.error(f'Detected invalid grafana certificate on host {d.hostname}: {e}')
+ self.mgr.set_health_warning('CEPHADM_CERT_ERROR',
+ f'Invalid grafana certificate on host {d.hostname}: {e}',
+ 1, [err_msg])
+ break
+
+ def _serve_sleep(self) -> None:
+ sleep_interval = max(
+ 30,
+ min(
+ self.mgr.host_check_interval,
+ self.mgr.facts_cache_timeout,
+ self.mgr.daemon_cache_timeout,
+ self.mgr.device_cache_timeout,
+ )
+ )
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ self.mgr.event.wait(sleep_interval)
+ self.mgr.event.clear()
+
+ def _update_paused_health(self) -> None:
+ self.log.debug('_update_paused_health')
+ if self.mgr.paused:
+ self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
+ "'ceph orch resume' to resume"])
+ else:
+ self.mgr.remove_health_warning('CEPHADM_PAUSED')
+
+ def _autotune_host_memory(self, host: str) -> None:
+ total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
+ if not total_mem:
+ val = None
+ else:
+ total_mem *= 1024 # kb -> bytes
+ total_mem *= self.mgr.autotune_memory_target_ratio
+ a = MemoryAutotuner(
+ daemons=self.mgr.cache.get_daemons_by_host(host),
+ config_get=self.mgr.get_foreign_ceph_option,
+ total_mem=total_mem,
+ )
+ val, osds = a.tune()
+ any_changed = False
+ for o in osds:
+ if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': o,
+ 'name': 'osd_memory_target',
+ })
+ any_changed = True
+ if val is not None:
+ if any_changed:
+ self.mgr.log.info(
+ f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
+ )
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'config set',
+ 'who': f'osd/host:{host.split(".")[0]}',
+ 'name': 'osd_memory_target',
+ 'value': str(val),
+ })
+ if ret:
+ self.log.warning(
+ f'Unable to set osd_memory_target on {host} to {val}: {err}'
+ )
+ else:
+ # if osd memory autotuning is off, we don't want to remove these config
+ # options as users may be using them. Since there is no way to set autotuning
+ # on/off at a host level, best we can do is check if it is globally on.
+ if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': f'osd/host:{host.split(".")[0]}',
+ 'name': 'osd_memory_target',
+ })
+ self.mgr.cache.update_autotune(host)
+
+ def _refresh_hosts_and_daemons(self) -> None:
+ self.log.debug('_refresh_hosts_and_daemons')
+ bad_hosts = []
+ failures = []
+ agents_down: List[str] = []
+
+ @forall_hosts
+ def refresh(host: str) -> None:
+
+ # skip hosts that are in maintenance - they could be powered off
+ if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
+ return
+
+ if self.mgr.use_agent:
+ if self.mgr.agent_helpers._check_agent(host):
+ agents_down.append(host)
+
+ if self.mgr.cache.host_needs_check(host):
+ r = self._check_host(host)
+ if r is not None:
+ bad_hosts.append(r)
+
+ if (
+ not self.mgr.use_agent
+ or self.mgr.cache.is_host_draining(host)
+ or host in agents_down
+ ):
+ if self.mgr.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_facts_refresh(host):
+ self.log.debug(('Refreshing %s facts' % host))
+ r = self._refresh_facts(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_network_refresh(host):
+ self.log.debug(('Refreshing %s networks' % host))
+ r = self._refresh_host_networks(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_device_refresh(host):
+ self.log.debug('refreshing %s devices' % host)
+ r = self._refresh_host_devices(host)
+ if r:
+ failures.append(r)
+ self.mgr.cache.metadata_up_to_date[host] = True
+ elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
+ if self.mgr.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+ self.mgr.cache.metadata_up_to_date[host] = True
+
+ if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
+ self.log.debug(f"Logging `{host}` into custom registry")
+ with self.mgr.async_timeout_handler(host, 'cephadm registry-login'):
+ r = self.mgr.wait_async(self._registry_login(
+ host, json.loads(str(self.mgr.get_store('registry_credentials')))))
+ if r:
+ bad_hosts.append(r)
+
+ if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
+ self.log.debug(f"refreshing OSDSpec previews for {host}")
+ r = self._refresh_host_osdspec_previews(host)
+ if r:
+ failures.append(r)
+
+ if (
+ self.mgr.cache.host_needs_autotune_memory(host)
+ and not self.mgr.inventory.has_label(host, SpecialHostLabels.NO_MEMORY_AUTOTUNE)
+ ):
+ self.log.debug(f"autotuning memory for {host}")
+ self._autotune_host_memory(host)
+
+ refresh(self.mgr.cache.get_hosts())
+
+ self._write_all_client_files()
+
+ self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
+ self.mgr.http_server.config_update()
+
+ self.mgr.config_checker.run_checks()
+
+ for k in [
+ 'CEPHADM_HOST_CHECK_FAILED',
+ 'CEPHADM_REFRESH_FAILED',
+ ]:
+ self.mgr.remove_health_warning(k)
+ if bad_hosts:
+ self.mgr.set_health_warning(
+ 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
+ if failures:
+ self.mgr.set_health_warning(
+ 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
+ self.mgr.update_failed_daemon_health_check()
+
+ def _check_host(self, host: str) -> Optional[str]:
+ if host not in self.mgr.inventory:
+ return None
+ self.log.debug(' checking %s' % host)
+ try:
+ addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
+ with self.mgr.async_timeout_handler(host, 'cephadm check-host'):
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, cephadmNoImage, 'check-host', [],
+ error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ self.mgr.cache.update_last_host_check(host)
+ self.mgr.cache.save_host(host)
+ if code:
+ self.log.debug(' host %s (%s) failed check' % (host, addr))
+ if self.mgr.warn_on_failed_host_check:
+ return 'host %s (%s) failed check: %s' % (host, addr, err)
+ else:
+ self.log.debug(' host %s (%s) ok' % (host, addr))
+ except Exception as e:
+ self.log.debug(' host %s (%s) failed check' % (host, addr))
+ return 'host %s (%s) failed check: %s' % (host, addr, e)
+ return None
+
+ def _refresh_host_daemons(self, host: str) -> Optional[str]:
+ try:
+ with self.mgr.async_timeout_handler(host, 'cephadm ls'):
+ ls = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ except OrchestratorError as e:
+ return str(e)
+ self.mgr._process_ls_output(host, ls)
+ return None
+
+ def _refresh_facts(self, host: str) -> Optional[str]:
+ try:
+ with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'):
+ val = self.mgr.wait_async(self._run_cephadm_json(
+ host, cephadmNoImage, 'gather-facts', [],
+ no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ except OrchestratorError as e:
+ return str(e)
+
+ self.mgr.cache.update_host_facts(host, val)
+
+ return None
+
+ def _refresh_host_devices(self, host: str) -> Optional[str]:
+ with_lsm = self.mgr.device_enhanced_scan
+ list_all = self.mgr.inventory_list_all
+ inventory_args = ['--', 'inventory',
+ '--format=json-pretty',
+ '--filter-for-batch']
+ if with_lsm:
+ inventory_args.insert(-1, "--with-lsm")
+ if list_all:
+ inventory_args.insert(-1, "--list-all")
+
+ try:
+ try:
+ with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+ devices = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'osd', 'ceph-volume', inventory_args, log_output=self.mgr.log_refresh_metadata))
+ except OrchestratorError as e:
+ if 'unrecognized arguments: --filter-for-batch' in str(e):
+ rerun_args = inventory_args.copy()
+ rerun_args.remove('--filter-for-batch')
+ with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
+ devices = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'osd', 'ceph-volume', rerun_args, log_output=self.mgr.log_refresh_metadata))
+ else:
+ raise
+
+ except OrchestratorError as e:
+ return str(e)
+
+ self.log.debug('Refreshed host %s devices (%d)' % (
+ host, len(devices)))
+ ret = inventory.Devices.from_json(devices)
+ self.mgr.cache.update_host_devices(host, ret.devices)
+ self.update_osdspec_previews(host)
+ self.mgr.cache.save_host(host)
+ return None
+
+ def _refresh_host_networks(self, host: str) -> Optional[str]:
+ try:
+ with self.mgr.async_timeout_handler(host, 'cephadm list-networks'):
+ networks = self.mgr.wait_async(self._run_cephadm_json(
+ host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
+ except OrchestratorError as e:
+ return str(e)
+
+ self.log.debug('Refreshed host %s networks (%s)' % (
+ host, len(networks)))
+ self.mgr.cache.update_host_networks(host, networks)
+ self.mgr.cache.save_host(host)
+ return None
+
+ def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
+ self.update_osdspec_previews(host)
+ self.mgr.cache.save_host(host)
+ self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
+ return None
+
+ def update_osdspec_previews(self, search_host: str = '') -> None:
+ # Set global 'pending' flag for host
+ self.mgr.cache.loading_osdspec_preview.add(search_host)
+ previews = []
+ # query OSDSpecs for host <search host> and generate/get the preview
+ # There can be multiple previews for one host due to multiple OSDSpecs.
+ previews.extend(self.mgr.osd_service.get_previews(search_host))
+ self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
+ self.mgr.cache.osdspec_previews[search_host] = previews
+ # Unset global 'pending' flag for host
+ self.mgr.cache.loading_osdspec_preview.remove(search_host)
+
+ def _run_async_actions(self) -> None:
+ while self.mgr.scheduled_async_actions:
+ (self.mgr.scheduled_async_actions.pop(0))()
+
+ def _check_for_strays(self) -> None:
+ self.log.debug('_check_for_strays')
+ for k in ['CEPHADM_STRAY_HOST',
+ 'CEPHADM_STRAY_DAEMON']:
+ self.mgr.remove_health_warning(k)
+ if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
+ ls = self.mgr.list_servers()
+ self.log.debug(ls)
+ managed = self.mgr.cache.get_daemon_names()
+ host_detail = [] # type: List[str]
+ host_num_daemons = 0
+ daemon_detail = [] # type: List[str]
+ for item in ls:
+ host = item.get('hostname')
+ assert isinstance(host, str)
+ daemons = item.get('services') # misnomer!
+ assert isinstance(daemons, list)
+ missing_names = []
+ for s in daemons:
+ daemon_id = s.get('id')
+ assert daemon_id
+ name = '%s.%s' % (s.get('type'), daemon_id)
+ if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
+ metadata = self.mgr.get_metadata(
+ cast(str, s.get('type')), daemon_id, {})
+ assert metadata is not None
+ try:
+ if s.get('type') == 'rgw-nfs':
+ # https://tracker.ceph.com/issues/49573
+ name = metadata['id'][:-4]
+ else:
+ name = '%s.%s' % (s.get('type'), metadata['id'])
+ except (KeyError, TypeError):
+ self.log.debug(
+ "Failed to find daemon id for %s service %s" % (
+ s.get('type'), s.get('id')
+ )
+ )
+ if s.get('type') == 'tcmu-runner':
+ # because we don't track tcmu-runner daemons in the host cache
+ # and don't have a way to check if the daemon is part of iscsi service
+ # we assume that all tcmu-runner daemons are managed by cephadm
+ managed.append(name)
+ if host not in self.mgr.inventory:
+ missing_names.append(name)
+ host_num_daemons += 1
+ if name not in managed:
+ daemon_detail.append(
+ 'stray daemon %s on host %s not managed by cephadm' % (name, host))
+ if missing_names:
+ host_detail.append(
+ 'stray host %s has %d stray daemons: %s' % (
+ host, len(missing_names), missing_names))
+ if self.mgr.warn_on_stray_hosts and host_detail:
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
+ if self.mgr.warn_on_stray_daemons and daemon_detail:
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+
+ def _check_for_moved_osds(self) -> None:
+ self.log.debug('_check_for_moved_osds')
+ all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+ for dd in self.mgr.cache.get_daemons_by_type('osd'):
+ assert dd.daemon_id
+ all_osds[int(dd.daemon_id)].append(dd)
+ for osd_id, dds in all_osds.items():
+ if len(dds) <= 1:
+ continue
+ running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+ error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+ msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+ logger.info(msg)
+ if len(running) != 1:
+ continue
+ osd = self.mgr.get_osd_by_id(osd_id)
+ if not osd or not osd['up']:
+ continue
+ for e in error:
+ assert e.hostname
+ try:
+ self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
+ self.mgr.events.for_daemon(
+ e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
+ except OrchestratorError as ex:
+ self.mgr.events.from_orch_error(ex)
+ logger.exception(f'failed to remove duplicated daemon {e}')
+
+ def _apply_all_services(self) -> bool:
+ self.log.debug('_apply_all_services')
+ r = False
+ specs = [] # type: List[ServiceSpec]
+ # if metadata is not up to date, we still need to apply spec for agent
+ # since the agent is the one who gather the metadata. If we don't we
+ # end up stuck between wanting metadata to be up to date to apply specs
+ # and needing to apply the agent spec to get up to date metadata
+ if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+ self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
+ try:
+ specs.append(self.mgr.spec_store['agent'].spec)
+ except Exception as e:
+ self.log.debug(f'Failed to find agent spec: {e}')
+ self.mgr.agent_helpers._apply_agent()
+ return r
+ else:
+ _specs: List[ServiceSpec] = []
+ for sn, spec in self.mgr.spec_store.active_specs.items():
+ _specs.append(spec)
+ # apply specs that don't use count first sice their placement is deterministic
+ # and not dependant on other daemon's placements in any way
+ specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
+ for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
+ self.mgr.remove_health_warning(name)
+ self.mgr.apply_spec_fails = []
+ for spec in specs:
+ try:
+ if self._apply_service(spec):
+ r = True
+ except Exception as e:
+ msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
+ self.log.exception(msg)
+ self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+ self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
+ warnings = []
+ for x in self.mgr.apply_spec_fails:
+ warnings.append(f'{x[0]}: {x[1]}')
+ self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
+ f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
+ len(self.mgr.apply_spec_fails),
+ warnings)
+ self.mgr.update_watched_hosts()
+ self.mgr.tuned_profile_utils._write_all_tuned_profiles()
+ return r
+
+ def _apply_service_config(self, spec: ServiceSpec) -> None:
+ if spec.config:
+ section = utils.name_to_config_section(spec.service_name())
+ for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
+ self.mgr.remove_health_warning(name)
+ invalid_config_options = []
+ options_failed_to_set = []
+ for k, v in spec.config.items():
+ try:
+ current = self.mgr.get_foreign_ceph_option(section, k)
+ except KeyError:
+ msg = f'Ignoring invalid {spec.service_name()} config option {k}'
+ self.log.warning(msg)
+ self.mgr.events.for_service(
+ spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
+ )
+ invalid_config_options.append(msg)
+ continue
+ if current != v:
+ self.log.debug(f'setting [{section}] {k} = {v}')
+ try:
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'name': k,
+ 'value': str(v),
+ 'who': section,
+ })
+ except MonCommandFailed as e:
+ msg = f'Failed to set {spec.service_name()} option {k}: {e}'
+ self.log.warning(msg)
+ options_failed_to_set.append(msg)
+
+ if invalid_config_options:
+ self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
+ invalid_config_options), invalid_config_options)
+ if options_failed_to_set:
+ self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
+ options_failed_to_set), options_failed_to_set)
+
+ def _update_rgw_endpoints(self, rgw_spec: RGWSpec) -> None:
+
+ if not rgw_spec.update_endpoints or rgw_spec.rgw_realm_token is None:
+ return
+
+ ep = []
+ protocol = 'https' if rgw_spec.ssl else 'http'
+ for s in self.mgr.cache.get_daemons_by_service(rgw_spec.service_name()):
+ if s.ports:
+ for p in s.ports:
+ ep.append(f'{protocol}://{s.hostname}:{p}')
+ zone_update_cmd = {
+ 'prefix': 'rgw zone modify',
+ 'realm_name': rgw_spec.rgw_realm,
+ 'zonegroup_name': rgw_spec.rgw_zonegroup,
+ 'zone_name': rgw_spec.rgw_zone,
+ 'realm_token': rgw_spec.rgw_realm_token,
+ 'zone_endpoints': ep,
+ }
+ self.log.debug(f'rgw cmd: {zone_update_cmd}')
+ rc, out, err = self.mgr.mon_command(zone_update_cmd)
+ rgw_spec.update_endpoints = (rc != 0) # keep trying on failure
+ if rc != 0:
+ self.log.error(f'Error when trying to update rgw zone: {err}')
+ self.mgr.set_health_warning('CEPHADM_RGW', 'Cannot update rgw endpoints, error: {err}', 1,
+ [f'Cannot update rgw endpoints for daemon {rgw_spec.service_name()}, error: {err}'])
+ else:
+ self.mgr.remove_health_warning('CEPHADM_RGW')
+
+ def _apply_service(self, spec: ServiceSpec) -> bool:
+ """
+ Schedule a service. Deploy new daemons or remove old ones, depending
+ on the target label and count specified in the placement.
+ """
+ self.mgr.migration.verify_no_migration()
+
+ service_type = spec.service_type
+ service_name = spec.service_name()
+ if spec.unmanaged:
+ self.log.debug('Skipping unmanaged service %s' % service_name)
+ return False
+ if spec.preview_only:
+ self.log.debug('Skipping preview_only service %s' % service_name)
+ return False
+ self.log.debug('Applying service %s spec' % service_name)
+
+ if service_type == 'agent':
+ try:
+ assert self.mgr.http_server.agent
+ assert self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ except Exception:
+ self.log.info(
+ 'Delaying applying agent spec until cephadm endpoint root cert created')
+ return False
+
+ self._apply_service_config(spec)
+
+ if service_type == 'osd':
+ self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
+ # TODO: return True would result in a busy loop
+ # can't know if daemon count changed; create_from_spec doesn't
+ # return a solid indication
+ return False
+
+ svc = self.mgr.cephadm_services[service_type]
+ daemons = self.mgr.cache.get_daemons_by_service(service_name)
+ related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
+
+ public_networks: List[str] = []
+ if service_type == 'mon':
+ out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
+ if '/' in out:
+ public_networks = [x.strip() for x in out.split(',')]
+ self.log.debug('mon public_network(s) is %s' % public_networks)
+
+ def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
+ # make sure the host has at least one network that belongs to some configured public network(s)
+ for pn in public_networks:
+ public_network = ipaddress.ip_network(pn)
+ for hn in self.mgr.cache.networks[host]:
+ host_network = ipaddress.ip_network(hn)
+ if host_network.overlaps(public_network):
+ return True
+
+ host_networks = ','.join(self.mgr.cache.networks[host])
+ pub_networks = ','.join(public_networks)
+ self.log.info(
+ f"Filtered out host {host}: does not belong to mon public_network(s): "
+ f" {pub_networks}, host network(s): {host_networks}"
+ )
+ return False
+
+ def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
+ # make sure the host has an interface that can
+ # actually accomodate the VIP
+ if not sspec or sspec.service_type != 'ingress':
+ return True
+ ingress_spec = cast(IngressSpec, sspec)
+ virtual_ips = []
+ if ingress_spec.virtual_ip:
+ virtual_ips.append(ingress_spec.virtual_ip)
+ elif ingress_spec.virtual_ips_list:
+ virtual_ips = ingress_spec.virtual_ips_list
+ for vip in virtual_ips:
+ found = False
+ bare_ip = str(vip).split('/')[0]
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+ # found matching interface for this IP, move on
+ self.log.debug(
+ f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
+ )
+ found = True
+ break
+ if not found:
+ self.log.info(
+ f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
+ )
+ return False
+ return True
+
+ host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
+ 'mon': matches_public_network,
+ 'ingress': has_interface_for_vip
+ }
+
+ rank_map = None
+ if svc.ranked():
+ rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
+ ha = HostAssignment(
+ spec=spec,
+ hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
+ ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ draining_hosts=self.mgr.cache.get_draining_hosts(),
+ daemons=daemons,
+ related_service_daemons=related_service_daemons,
+ networks=self.mgr.cache.networks,
+ filter_new_host=host_filters.get(service_type, None),
+ allow_colo=svc.allow_colo(),
+ primary_daemon_type=svc.primary_daemon_type(spec),
+ per_host_daemon_type=svc.per_host_daemon_type(spec),
+ rank_map=rank_map,
+ )
+
+ try:
+ all_slots, slots_to_add, daemons_to_remove = ha.place()
+ daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
+ 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
+ self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
+ except OrchestratorError as e:
+ msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
+ self.log.error(msg)
+ self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+ self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
+ warnings = []
+ for x in self.mgr.apply_spec_fails:
+ warnings.append(f'{x[0]}: {x[1]}')
+ self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
+ f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
+ len(self.mgr.apply_spec_fails),
+ warnings)
+ return False
+
+ r = None
+
+ # sanity check
+ final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
+ if service_type in ['mon', 'mgr'] and final_count < 1:
+ self.log.debug('cannot scale mon|mgr below 1)')
+ return False
+
+ # progress
+ progress_id = str(uuid.uuid4())
+ delta: List[str] = []
+ if slots_to_add:
+ delta += [f'+{len(slots_to_add)}']
+ if daemons_to_remove:
+ delta += [f'-{len(daemons_to_remove)}']
+ progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
+ progress_total = len(slots_to_add) + len(daemons_to_remove)
+ progress_done = 0
+
+ def update_progress() -> None:
+ self.mgr.remote(
+ 'progress', 'update', progress_id,
+ ev_msg=progress_title,
+ ev_progress=(progress_done / progress_total),
+ add_to_ceph_s=True,
+ )
+
+ if progress_total:
+ update_progress()
+
+ self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
+ self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
+
+ hosts_altered: Set[str] = set()
+
+ try:
+ # assign names
+ for i in range(len(slots_to_add)):
+ slot = slots_to_add[i]
+ slot = slot.assign_name(self.mgr.get_unique_name(
+ slot.daemon_type,
+ slot.hostname,
+ [d for d in daemons if d not in daemons_to_remove],
+ prefix=spec.service_id,
+ forcename=slot.name,
+ rank=slot.rank,
+ rank_generation=slot.rank_generation,
+ ))
+ slots_to_add[i] = slot
+ if rank_map is not None:
+ assert slot.rank is not None
+ assert slot.rank_generation is not None
+ assert rank_map[slot.rank][slot.rank_generation] is None
+ rank_map[slot.rank][slot.rank_generation] = slot.name
+
+ if rank_map:
+ # record the rank_map before we make changes so that if we fail the
+ # next mgr will clean up.
+ self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
+
+ # remove daemons now, since we are going to fence them anyway
+ for d in daemons_to_remove:
+ assert d.hostname is not None
+ self._remove_daemon(d.name(), d.hostname)
+ daemons_to_remove = []
+
+ # fence them
+ svc.fence_old_ranks(spec, rank_map, len(all_slots))
+
+ # create daemons
+ daemon_place_fails = []
+ for slot in slots_to_add:
+ # first remove daemon with conflicting port or name?
+ if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
+ for d in daemons_to_remove:
+ if (
+ d.hostname != slot.hostname
+ or not (set(d.ports or []) & set(slot.ports))
+ or (d.ip and slot.ip and d.ip != slot.ip)
+ and d.name() != slot.name
+ ):
+ continue
+ if d.name() != slot.name:
+ self.log.info(
+ f'Removing {d.name()} before deploying to {slot} to avoid a port or conflict'
+ )
+ # NOTE: we don't check ok-to-stop here to avoid starvation if
+ # there is only 1 gateway.
+ self._remove_daemon(d.name(), d.hostname)
+ daemons_to_remove.remove(d)
+ progress_done += 1
+ hosts_altered.add(d.hostname)
+ break
+
+ # deploy new daemon
+ daemon_id = slot.name
+
+ daemon_spec = svc.make_daemon_spec(
+ slot.hostname, daemon_id, slot.network, spec,
+ daemon_type=slot.daemon_type,
+ ports=slot.ports,
+ ip=slot.ip,
+ rank=slot.rank,
+ rank_generation=slot.rank_generation,
+ )
+ self.log.debug('Placing %s.%s on host %s' % (
+ slot.daemon_type, daemon_id, slot.hostname))
+
+ try:
+ daemon_spec = svc.prepare_create(daemon_spec)
+ with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} type dameon)'):
+ self.mgr.wait_async(self._create_daemon(daemon_spec))
+ r = True
+ progress_done += 1
+ update_progress()
+ hosts_altered.add(daemon_spec.host)
+ self.mgr.spec_store.mark_needs_configuration(spec.service_name())
+ except (RuntimeError, OrchestratorError) as e:
+ msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
+ f"on {slot.hostname}: {e}")
+ self.mgr.events.for_service(spec, 'ERROR', msg)
+ self.mgr.log.error(msg)
+ daemon_place_fails.append(msg)
+ # only return "no change" if no one else has already succeeded.
+ # later successes will also change to True
+ if r is None:
+ r = False
+ progress_done += 1
+ update_progress()
+ continue
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=slot.hostname,
+ daemon_type=slot.daemon_type,
+ daemon_id=daemon_id,
+ service_name=spec.service_name()
+ )
+ daemons.append(sd)
+ self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
+
+ if daemon_place_fails:
+ self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
+ daemon_place_fails), daemon_place_fails)
+
+ if service_type == 'mgr':
+ active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
+ if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
+ # We can't just remove the active mgr like any other daemon.
+ # Need to fail over later so it can be removed on next pass.
+ # This can be accomplished by scheduling a restart of the active mgr.
+ self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')
+
+ if service_type == 'rgw':
+ self._update_rgw_endpoints(cast(RGWSpec, spec))
+
+ # remove any?
+ def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
+ daemon_ids = [d.daemon_id for d in remove_daemons]
+ assert None not in daemon_ids
+ # setting force flag retains previous behavior
+ r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
+ return not r.retval
+
+ while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
+ # let's find a subset that is ok-to-stop
+ non_error_daemon_index = -1
+ # prioritize removing daemons in error state
+ for i, dmon in enumerate(daemons_to_remove):
+ if dmon.status != DaemonDescriptionStatus.error:
+ non_error_daemon_index = i
+ break
+ if non_error_daemon_index != -1:
+ daemons_to_remove.pop(non_error_daemon_index)
+ else:
+ # all daemons in list are in error state
+ # we should be able to remove all of them
+ break
+ for d in daemons_to_remove:
+ r = True
+ assert d.hostname is not None
+ self._remove_daemon(d.name(), d.hostname)
+
+ progress_done += 1
+ update_progress()
+ hosts_altered.add(d.hostname)
+ self.mgr.spec_store.mark_needs_configuration(spec.service_name())
+
+ self.mgr.remote('progress', 'complete', progress_id)
+ except Exception as e:
+ self.mgr.remote('progress', 'fail', progress_id, str(e))
+ raise
+ finally:
+ if self.mgr.spec_store.needs_configuration(spec.service_name()):
+ svc.config(spec)
+ self.mgr.spec_store.mark_configured(spec.service_name())
+ if self.mgr.use_agent:
+ # can only send ack to agents if we know for sure port they bound to
+ hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and not self.mgr.cache.is_host_draining(h))])
+ self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
+
+ if r is None:
+ r = False
+ return r
+
+ def _check_daemons(self) -> None:
+ self.log.debug('_check_daemons')
+ daemons = self.mgr.cache.get_daemons()
+ daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
+ for dd in daemons:
+ # orphan?
+ spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
+ assert dd.hostname is not None
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
+
+ # any action we can try will fail for a daemon on an offline host,
+ # including removing the daemon
+ if dd.hostname in self.mgr.offline_hosts:
+ continue
+
+ if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
+ # (mon and mgr specs should always exist; osds aren't matched
+ # to a service spec)
+ self.log.info('Removing orphan daemon %s...' % dd.name())
+ self._remove_daemon(dd.name(), dd.hostname)
+
+ # ignore unmanaged services
+ if spec and spec.unmanaged:
+ continue
+
+ # ignore daemons for deleted services
+ if dd.service_name() in self.mgr.spec_store.spec_deleted:
+ continue
+
+ if dd.daemon_type == 'agent':
+ try:
+ self.mgr.agent_helpers._check_agent(dd.hostname)
+ except Exception as e:
+ self.log.debug(
+ f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
+ continue
+
+ # These daemon types require additional configs after creation
+ if dd.daemon_type in REQUIRES_POST_ACTIONS:
+ daemons_post[dd.daemon_type].append(dd)
+
+ if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
+ self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
+ dd.is_active = True
+ else:
+ dd.is_active = False
+
+ deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
+ last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
+ dd.hostname, dd.name())
+ if last_deps is None:
+ last_deps = []
+ action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
+ if not last_config:
+ self.log.info('Reconfiguring %s (unknown last config time)...' % (
+ dd.name()))
+ action = 'reconfig'
+ elif last_deps != deps:
+ self.log.debug(f'{dd.name()} deps {last_deps} -> {deps}')
+ self.log.info(f'Reconfiguring {dd.name()} (dependencies changed)...')
+ action = 'reconfig'
+ # we need only redeploy if secure_monitoring_stack value has changed:
+ if dd.daemon_type in ['prometheus', 'node-exporter', 'alertmanager']:
+ diff = list(set(last_deps) - set(deps))
+ if any('secure_monitoring_stack' in e for e in diff):
+ action = 'redeploy'
+
+ elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
+ self.log.debug(
+ f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
+ self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
+ dd.extra_container_args = spec.extra_container_args
+ action = 'redeploy'
+ elif spec is not None and hasattr(spec, 'extra_entrypoint_args') and dd.extra_entrypoint_args != spec.extra_entrypoint_args:
+ self.log.info(f'Redeploying {dd.name()}, (entrypoint args changed) . . .')
+ self.log.debug(
+ f'{dd.name()} daemon entrypoint args {dd.extra_entrypoint_args} -> {spec.extra_entrypoint_args}')
+ dd.extra_entrypoint_args = spec.extra_entrypoint_args
+ action = 'redeploy'
+ elif self.mgr.last_monmap and \
+ self.mgr.last_monmap > last_config and \
+ dd.daemon_type in CEPH_TYPES:
+ self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
+ action = 'reconfig'
+ elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
+ dd.daemon_type in CEPH_TYPES:
+ self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
+ action = 'reconfig'
+ if action:
+ if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
+ and action == 'reconfig':
+ action = 'redeploy'
+ try:
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
+ self.mgr._daemon_action(daemon_spec, action=action)
+ if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
+ self.mgr.cache.save_host(dd.hostname)
+ except OrchestratorError as e:
+ self.log.exception(e)
+ self.mgr.events.from_orch_error(e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
+ except Exception as e:
+ self.log.exception(e)
+ self.mgr.events.for_daemon_from_exception(dd.name(), e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
+
+ # do daemon post actions
+ for daemon_type, daemon_descs in daemons_post.items():
+ run_post = False
+ for d in daemon_descs:
+ if d.name() in self.mgr.requires_post_actions:
+ self.mgr.requires_post_actions.remove(d.name())
+ run_post = True
+ if run_post:
+ self.mgr._get_cephadm_service(daemon_type_to_service(
+ daemon_type)).daemon_check_post(daemon_descs)
+
+ def _purge_deleted_services(self) -> None:
+ self.log.debug('_purge_deleted_services')
+ existing_services = self.mgr.spec_store.all_specs.items()
+ for service_name, spec in list(existing_services):
+ if service_name not in self.mgr.spec_store.spec_deleted:
+ continue
+ if self.mgr.cache.get_daemons_by_service(service_name):
+ continue
+ if spec.service_type in ['mon', 'mgr']:
+ continue
+
+ logger.info(f'Purge service {service_name}')
+
+ self.mgr.cephadm_services[spec.service_type].purge(service_name)
+ self.mgr.spec_store.finally_rm(service_name)
+
+ def convert_tags_to_repo_digest(self) -> None:
+ if not self.mgr.use_repo_digest:
+ return
+ settings = self.mgr.upgrade.get_distinct_container_image_settings()
+ digests: Dict[str, ContainerInspectInfo] = {}
+ for container_image_ref in set(settings.values()):
+ if not is_repo_digest(container_image_ref):
+ with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'):
+ image_info = self.mgr.wait_async(
+ self._get_container_image_info(container_image_ref))
+ if image_info.repo_digests:
+ # FIXME: we assume the first digest here is the best
+ assert is_repo_digest(image_info.repo_digests[0]), image_info
+ digests[container_image_ref] = image_info
+
+ for entity, container_image_ref in settings.items():
+ if not is_repo_digest(container_image_ref):
+ image_info = digests[container_image_ref]
+ if image_info.repo_digests:
+ # FIXME: we assume the first digest here is the best
+ self.mgr.set_container_image(entity, image_info.repo_digests[0])
+
+ def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
+ # host -> path -> (mode, uid, gid, content, digest)
+ client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
+
+ # ceph.conf
+ config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
+ config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
+ cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'
+
+ if self.mgr.manage_etc_ceph_ceph_conf:
+ try:
+ pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
+ ha = HostAssignment(
+ spec=ServiceSpec('mon', placement=pspec),
+ hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
+ daemons=[],
+ networks=self.mgr.cache.networks,
+ )
+ all_slots, _, _ = ha.place()
+ for host in {s.hostname for s in all_slots}:
+ if host not in client_files:
+ client_files[host] = {}
+ ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
+ client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
+ client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
+ except Exception as e:
+ self.mgr.log.warning(
+ f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
+
+ # client keyrings
+ for ks in self.mgr.keys.keys.values():
+ try:
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get',
+ 'entity': ks.entity,
+ })
+ if ret:
+ self.log.warning(f'unable to fetch keyring for {ks.entity}')
+ continue
+ digest = ''.join('%02x' % c for c in hashlib.sha256(
+ keyring.encode('utf-8')).digest())
+ ha = HostAssignment(
+ spec=ServiceSpec('mon', placement=ks.placement),
+ hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
+ daemons=[],
+ networks=self.mgr.cache.networks,
+ )
+ all_slots, _, _ = ha.place()
+ for host in {s.hostname for s in all_slots}:
+ if host not in client_files:
+ client_files[host] = {}
+ ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
+ client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
+ client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
+ ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
+ client_files[host][ks.path] = ceph_admin_key
+ client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
+ except Exception as e:
+ self.log.warning(
+ f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
+ return client_files
+
+ def _write_all_client_files(self) -> None:
+ if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
+ client_files = self._calc_client_files()
+ else:
+ client_files = {}
+
+ @forall_hosts
+ def _write_files(host: str) -> None:
+ self._write_client_files(client_files, host)
+
+ _write_files(self.mgr.cache.get_hosts())
+
+ def _write_client_files(self,
+ client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
+ host: str) -> None:
+ updated_files = False
+ if self.mgr.cache.is_host_unreachable(host):
+ return
+ old_files = self.mgr.cache.get_host_client_files(host).copy()
+ for path, m in client_files.get(host, {}).items():
+ mode, uid, gid, content, digest = m
+ if path in old_files:
+ match = old_files[path] == (digest, mode, uid, gid)
+ del old_files[path]
+ if match:
+ continue
+ self.log.info(f'Updating {host}:{path}')
+ self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
+ self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
+ updated_files = True
+ for path in old_files.keys():
+ if path == '/etc/ceph/ceph.conf':
+ continue
+ self.log.info(f'Removing {host}:{path}')
+ cmd = ['rm', '-f', path]
+ self.mgr.ssh.check_execute_command(host, cmd)
+ updated_files = True
+ self.mgr.cache.removed_client_file(host, path)
+ if updated_files:
+ self.mgr.cache.save_host(host)
+
+ async def _create_daemon(self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ reconfig: bool = False,
+ osd_uuid_map: Optional[Dict[str, Any]] = None,
+ ) -> str:
+
+ daemon_params: Dict[str, Any] = {}
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_spec.daemon_type,
+ daemon_id=daemon_spec.daemon_id,
+ hostname=daemon_spec.host,
+ ).service_id(), overwrite=True):
+
+ try:
+ image = ''
+ start_time = datetime_now()
+ ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+ port_ips: Dict[str, str] = daemon_spec.port_ips if daemon_spec.port_ips else {}
+
+ if daemon_spec.daemon_type == 'container':
+ spec = cast(CustomContainerSpec,
+ self.mgr.spec_store[daemon_spec.service_name].spec)
+ image = spec.image
+ if spec.ports:
+ ports.extend(spec.ports)
+
+ # TCP port to open in the host firewall
+ if len(ports) > 0:
+ daemon_params['tcp_ports'] = list(ports)
+
+ if port_ips:
+ daemon_params['port_ips'] = port_ips
+
+ # osd deployments needs an --osd-uuid arg
+ if daemon_spec.daemon_type == 'osd':
+ if not osd_uuid_map:
+ osd_uuid_map = self.mgr.get_osd_uuid_map()
+ osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+ if not osd_uuid:
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+ daemon_params['osd_fsid'] = osd_uuid
+
+ if reconfig:
+ daemon_params['reconfig'] = True
+ if self.mgr.allow_ptrace:
+ daemon_params['allow_ptrace'] = True
+
+ daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec, daemon_params)
+
+ if daemon_spec.service_name in self.mgr.spec_store:
+ configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
+ if configs is not None:
+ daemon_spec.final_config.update(
+ {'custom_config_files': [c.to_json() for c in configs]})
+
+ if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
+ await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
+
+ self.log.info('%s daemon %s on %s' % (
+ 'Reconfiguring' if reconfig else 'Deploying',
+ daemon_spec.name(), daemon_spec.host))
+
+ out, err, code = await self._run_cephadm(
+ daemon_spec.host,
+ daemon_spec.name(),
+ ['_orch', 'deploy'],
+ [],
+ stdin=exchange.Deploy(
+ fsid=self.mgr._cluster_fsid,
+ name=daemon_spec.name(),
+ image=image,
+ params=daemon_params,
+ meta=exchange.DeployMeta(
+ service_name=daemon_spec.service_name,
+ ports=daemon_spec.ports,
+ ip=daemon_spec.ip,
+ deployed_by=self.mgr.get_active_mgr_digests(),
+ rank=daemon_spec.rank,
+ rank_generation=daemon_spec.rank_generation,
+ extra_container_args=ArgumentSpec.map_json(
+ extra_container_args,
+ ),
+ extra_entrypoint_args=ArgumentSpec.map_json(
+ extra_entrypoint_args,
+ ),
+ ),
+ config_blobs=daemon_spec.final_config,
+ ).dump_json_str(),
+ )
+
+ if daemon_spec.daemon_type == 'agent':
+ self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
+ self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
+
+ # refresh daemon state? (ceph daemon reconfig does not need it)
+ if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
+ if not code and daemon_spec.host in self.mgr.cache.daemons:
+ # prime cached service state with what we (should have)
+ # just created
+ sd = daemon_spec.to_daemon_description(
+ DaemonDescriptionStatus.starting, 'starting')
+ self.mgr.cache.add_daemon(daemon_spec.host, sd)
+ if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
+ self.mgr.requires_post_actions.add(daemon_spec.name())
+ self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
+
+ if daemon_spec.daemon_type != 'agent':
+ self.mgr.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
+ self.mgr.cache.save_host(daemon_spec.host)
+ else:
+ self.mgr.agent_cache.update_agent_config_deps(
+ daemon_spec.host, daemon_spec.deps, start_time)
+ self.mgr.agent_cache.save_agent(daemon_spec.host)
+ msg = "{} {} on host '{}'".format(
+ 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+ if not code:
+ self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+ else:
+ what = 'reconfigure' if reconfig else 'deploy'
+ self.mgr.events.for_daemon(
+ daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+ return msg
+ except OrchestratorError:
+ redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
+ if not reconfig and not redeploy:
+ # we have to clean up the daemon. E.g. keyrings.
+ servict_type = daemon_type_to_service(daemon_spec.daemon_type)
+ dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
+ self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
+ raise
+
+ def _setup_extra_deployment_args(
+ self,
+ daemon_spec: CephadmDaemonDeploySpec,
+ params: Dict[str, Any],
+ ) -> Tuple[CephadmDaemonDeploySpec, Optional[ArgumentList], Optional[ArgumentList]]:
+ # this function is for handling any potential user specified
+ # (in the service spec) extra runtime or entrypoint args for a daemon
+ # we are going to deploy. Effectively just adds a set of extra args to
+ # pass to the cephadm binary to indicate the daemon being deployed
+ # needs extra runtime/entrypoint args. Returns the modified daemon spec
+ # as well as what args were added (as those are included in unit.meta file)
+ def _to_args(lst: ArgumentList) -> List[str]:
+ out: List[str] = []
+ for argspec in lst:
+ out.extend(argspec.to_args())
+ return out
+
+ try:
+ eca = daemon_spec.extra_container_args
+ if eca:
+ params['extra_container_args'] = _to_args(eca)
+ except AttributeError:
+ eca = None
+ try:
+ eea = daemon_spec.extra_entrypoint_args
+ if eea:
+ params['extra_entrypoint_args'] = _to_args(eea)
+ except AttributeError:
+ eea = None
+ return daemon_spec, eca, eea
+
+ def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
+ """
+ Remove a daemon
+ """
+ (daemon_type, daemon_id) = name.split('.', 1)
+ daemon = orchestrator.DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ hostname=host)
+
+ with set_exception_subject('service', daemon.service_id(), overwrite=True):
+
+ self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
+ # NOTE: we are passing the 'force' flag here, which means
+ # we can delete a mon instances data.
+ dd = self.mgr.cache.get_daemon(daemon.daemon_name)
+ if dd.ports:
+ args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
+ else:
+ args = ['--name', name, '--force']
+
+ self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
+ with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
+ out, err, code = self.mgr.wait_async(self._run_cephadm(
+ host, name, 'rm-daemon', args))
+ if not code:
+ # remove item from cache
+ self.mgr.cache.rm_daemon(host, name)
+ self.mgr.cache.invalidate_host_daemons(host)
+
+ if not no_post_remove:
+ if daemon_type not in ['iscsi']:
+ self.mgr.cephadm_services[daemon_type_to_service(
+ daemon_type)].post_remove(daemon, is_failed_deploy=False)
+ else:
+ self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
+ daemon_type)].post_remove(daemon, is_failed_deploy=False))
+ self.mgr._kick_serve_loop()
+
+ return "Removed {} from host '{}'".format(name, host)
+
+ async def _run_cephadm_json(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ log_output: Optional[bool] = True,
+ ) -> Any:
+ try:
+ out, err, code = await self._run_cephadm(
+ host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok,
+ image=image, log_output=log_output)
+ if code:
+ raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
+ except Exception as e:
+ raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
+ try:
+ return json.loads(''.join(out))
+ except (ValueError, KeyError):
+ msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
+ self.log.exception(f'{msg}: {"".join(out)}')
+ raise OrchestratorError(msg)
+
+ async def _run_cephadm(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: Union[str, List[str]],
+ args: List[str],
+ addr: Optional[str] = "",
+ stdin: Optional[str] = "",
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ env_vars: Optional[List[str]] = None,
+ log_output: Optional[bool] = True,
+ timeout: Optional[int] = None, # timeout in seconds
+ ) -> Tuple[List[str], List[str], int]:
+ """
+ Run cephadm on the remote host with the given command + args
+
+ Important: You probably don't want to run _run_cephadm from CLI handlers
+
+ :env_vars: in format -> [KEY=VALUE, ..]
+ """
+
+ await self.mgr.ssh._remote_connection(host, addr)
+
+ self.log.debug(f"_run_cephadm : command = {command}")
+ self.log.debug(f"_run_cephadm : args = {args}")
+
+ bypass_image = ('agent')
+
+ assert image or entity
+ # Skip the image check for daemons deployed that are not ceph containers
+ if not str(entity).startswith(bypass_image):
+ if not image and entity is not cephadmNoImage:
+ image = self.mgr._get_container_image(entity)
+
+ final_args = []
+
+ # global args
+ if env_vars:
+ for env_var_pair in env_vars:
+ final_args.extend(['--env', env_var_pair])
+
+ if image:
+ final_args.extend(['--image', image])
+
+ if not self.mgr.container_init:
+ final_args += ['--no-container-init']
+
+ if not self.mgr.cgroups_split:
+ final_args += ['--no-cgroups-split']
+
+ if not timeout:
+ # default global timeout if no timeout was passed
+ timeout = self.mgr.default_cephadm_command_timeout
+ # put a lower bound of 60 seconds in case users
+ # accidentally set it to something unreasonable.
+ # For example if they though it was in minutes
+ # rather than seconds
+ if timeout < 60:
+ self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
+ timeout = 60
+ # subtract a small amount to give this timeout
+ # in the binary a chance to actually happen over
+ # the asyncio based timeout in the mgr module
+ timeout -= 5
+ final_args += ['--timeout', str(timeout)]
+
+ # subcommand
+ if isinstance(command, list):
+ final_args.extend([str(v) for v in command])
+ else:
+ final_args.append(command)
+
+ # subcommand args
+ if not no_fsid:
+ final_args += ['--fsid', self.mgr._cluster_fsid]
+
+ final_args += args
+
+ # exec
+ self.log.debug('args: %s' % (' '.join(final_args)))
+ if self.mgr.mode == 'root':
+ # agent has cephadm binary as an extra file which is
+ # therefore passed over stdin. Even for debug logs it's too much
+ if stdin and 'agent' not in str(entity):
+ self.log.debug('stdin: %s' % stdin)
+
+ cmd = ['which', 'python3']
+ python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
+ cmd = [python, self.mgr.cephadm_binary_path] + final_args
+
+ try:
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
+ if code == 2:
+ ls_cmd = ['ls', self.mgr.cephadm_binary_path]
+ out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
+ log_command=log_output)
+ if code_ls == 2:
+ await self._deploy_cephadm_binary(host, addr)
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
+ # if there is an agent on this host, make sure it is using the most recent
+ # version of cephadm binary
+ if host in self.mgr.inventory:
+ for agent in self.mgr.cache.get_daemons_by_type('agent', host):
+ self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+
+ except Exception as e:
+ await self.mgr.ssh._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+
+ elif self.mgr.mode == 'cephadm-package':
+ try:
+ cmd = ['/usr/bin/cephadm'] + final_args
+ out, err, code = await self.mgr.ssh._execute_command(
+ host, cmd, stdin=stdin, addr=addr)
+ except Exception as e:
+ await self.mgr.ssh._reset_con(host)
+ if error_ok:
+ return [], [str(e)], 1
+ raise
+ else:
+ assert False, 'unsupported mode'
+
+ if log_output:
+ self.log.debug(f'code: {code}')
+ if out:
+ self.log.debug(f'out: {out}')
+ if err:
+ self.log.debug(f'err: {err}')
+ if code and not error_ok:
+ raise OrchestratorError(
+ f'cephadm exited with an error code: {code}, stderr: {err}')
+ return [out], [err], code
+
+ async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+ # pick a random host...
+ host = None
+ for host_name in self.mgr.inventory.keys():
+ host = host_name
+ break
+ if not host:
+ raise OrchestratorError('no hosts defined')
+ if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
+ await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
+
+ j = None
+ try:
+ j = await self._run_cephadm_json(host, '', 'inspect-image', [],
+ image=image_name, no_fsid=True,
+ error_ok=True)
+ except OrchestratorError:
+ pass
+
+ if not j:
+ pullargs: List[str] = []
+ if self.mgr.registry_insecure:
+ pullargs.append("--insecure")
+
+ j = await self._run_cephadm_json(host, '', 'pull', pullargs,
+ image=image_name, no_fsid=True)
+ r = ContainerInspectInfo(
+ j['image_id'],
+ j.get('ceph_version'),
+ j.get('repo_digests')
+ )
+ self.log.debug(f'image {image_name} -> {r}')
+ return r
+
+ # function responsible for logging single host into custom registry
+ async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
+ self.log.debug(
+ f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
+ # want to pass info over stdin rather than through normal list of args
+ out, err, code = await self._run_cephadm(
+ host, 'mon', 'registry-login',
+ ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
+ if code:
+ return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
+ return None
+
+ async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
+ # Use tee (from coreutils) to create a copy of cephadm on the target machine
+ self.log.info(f"Deploying cephadm binary to {host}")
+ await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
+ self.mgr._cephadm, addr=addr)