author    | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
commit    | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree      | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/cephadm/inventory.py
parent    | Initial commit. (diff)
download  | ceph-upstream.tar.xz, ceph-upstream.zip
Adding upstream version 16.2.11+ds. (refs: upstream/16.2.11+ds, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/cephadm/inventory.py')
-rw-r--r-- | src/pybind/mgr/cephadm/inventory.py | 1019 |
1 file changed, 1019 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py new file mode 100644 index 000000000..92e10ea39 --- /dev/null +++ b/src/pybind/mgr/cephadm/inventory.py @@ -0,0 +1,1019 @@ +import datetime +from copy import copy +import ipaddress +import json +import logging +import socket +from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \ + NamedTuple, Type + +import orchestrator +from ceph.deployment import inventory +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec +from ceph.utils import str_to_datetime, datetime_to_str, datetime_now +from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types + +from .utils import resolve_ip +from .migrations import queue_migrate_nfs_spec + +if TYPE_CHECKING: + from .module import CephadmOrchestrator + + +logger = logging.getLogger(__name__) + +HOST_CACHE_PREFIX = "host." +SPEC_STORE_PREFIX = "spec." + + +class Inventory: + """ + The inventory stores a HostSpec for all hosts persistently. + """ + + def __init__(self, mgr: 'CephadmOrchestrator'): + self.mgr = mgr + adjusted_addrs = False + + def is_valid_ip(ip: str) -> bool: + try: + ipaddress.ip_address(ip) + return True + except ValueError: + return False + + # load inventory + i = self.mgr.get_store('inventory') + if i: + self._inventory: Dict[str, dict] = json.loads(i) + # handle old clusters missing 'hostname' key from hostspec + for k, v in self._inventory.items(): + if 'hostname' not in v: + v['hostname'] = k + + # convert legacy non-IP addr? + if is_valid_ip(str(v.get('addr'))): + continue + if len(self._inventory) > 1: + if k == socket.gethostname(): + # Never try to resolve our own host! This is + # fraught and can lead to either a loopback + # address (due to podman's futzing with + # /etc/hosts) or a private IP based on the CNI + # configuration. Instead, wait until the mgr + # fails over to another host and let them resolve + # this host. + continue + ip = resolve_ip(cast(str, v.get('addr'))) + else: + # we only have 1 node in the cluster, so we can't + # rely on another host doing the lookup. use the + # IP the mgr binds to. 
+ ip = self.mgr.get_mgr_ip() + if is_valid_ip(ip) and not ip.startswith('127.0.'): + self.mgr.log.info( + f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'" + ) + v['addr'] = ip + adjusted_addrs = True + if adjusted_addrs: + self.save() + else: + self._inventory = dict() + logger.debug('Loaded inventory %s' % self._inventory) + + def keys(self) -> List[str]: + return list(self._inventory.keys()) + + def __contains__(self, host: str) -> bool: + return host in self._inventory + + def assert_host(self, host: str) -> None: + if host not in self._inventory: + raise OrchestratorError('host %s does not exist' % host) + + def add_host(self, spec: HostSpec) -> None: + if spec.hostname in self._inventory: + # addr + if self.get_addr(spec.hostname) != spec.addr: + self.set_addr(spec.hostname, spec.addr) + # labels + for label in spec.labels: + self.add_label(spec.hostname, label) + else: + self._inventory[spec.hostname] = spec.to_json() + self.save() + + def rm_host(self, host: str) -> None: + self.assert_host(host) + del self._inventory[host] + self.save() + + def set_addr(self, host: str, addr: str) -> None: + self.assert_host(host) + self._inventory[host]['addr'] = addr + self.save() + + def add_label(self, host: str, label: str) -> None: + self.assert_host(host) + + if 'labels' not in self._inventory[host]: + self._inventory[host]['labels'] = list() + if label not in self._inventory[host]['labels']: + self._inventory[host]['labels'].append(label) + self.save() + + def rm_label(self, host: str, label: str) -> None: + self.assert_host(host) + + if 'labels' not in self._inventory[host]: + self._inventory[host]['labels'] = list() + if label in self._inventory[host]['labels']: + self._inventory[host]['labels'].remove(label) + self.save() + + def has_label(self, host: str, label: str) -> bool: + return ( + host in self._inventory + and label in self._inventory[host].get('labels', []) + ) + + def get_addr(self, host: str) -> str: + self.assert_host(host) + return self._inventory[host].get('addr', host) + + def spec_from_dict(self, info: dict) -> HostSpec: + hostname = info['hostname'] + return HostSpec( + hostname, + addr=info.get('addr', hostname), + labels=info.get('labels', []), + status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''), + ) + + def all_specs(self) -> List[HostSpec]: + return list(map(self.spec_from_dict, self._inventory.values())) + + def get_host_with_state(self, state: str = "") -> List[str]: + """return a list of host names in a specific state""" + return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state] + + def save(self) -> None: + self.mgr.set_store('inventory', json.dumps(self._inventory)) + + +class SpecDescription(NamedTuple): + spec: ServiceSpec + rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] + created: datetime.datetime + deleted: Optional[datetime.datetime] + + +class SpecStore(): + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr = mgr + self._specs = {} # type: Dict[str, ServiceSpec] + # service_name -> rank -> gen -> daemon_id + self._rank_maps = {} # type: Dict[str, Dict[int, Dict[int, Optional[str]]]] + self.spec_created = {} # type: Dict[str, datetime.datetime] + self.spec_deleted = {} # type: Dict[str, datetime.datetime] + self.spec_preview = {} # type: Dict[str, ServiceSpec] + + @property + def all_specs(self) -> Mapping[str, ServiceSpec]: + """ + returns active and deleted specs. Returns read-only dict. 
+ """ + return self._specs + + def __contains__(self, name: str) -> bool: + return name in self._specs + + def __getitem__(self, name: str) -> SpecDescription: + if name not in self._specs: + raise OrchestratorError(f'Service {name} not found.') + return SpecDescription(self._specs[name], + self._rank_maps.get(name), + self.spec_created[name], + self.spec_deleted.get(name, None)) + + @property + def active_specs(self) -> Mapping[str, ServiceSpec]: + return {k: v for k, v in self._specs.items() if k not in self.spec_deleted} + + def load(self): + # type: () -> None + for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items(): + service_name = k[len(SPEC_STORE_PREFIX):] + try: + j = cast(Dict[str, dict], json.loads(v)) + if ( + (self.mgr.migration_current or 0) < 3 + and j['spec'].get('service_type') == 'nfs' + ): + self.mgr.log.debug(f'found legacy nfs spec {j}') + queue_migrate_nfs_spec(self.mgr, j) + spec = ServiceSpec.from_json(j['spec']) + created = str_to_datetime(cast(str, j['created'])) + self._specs[service_name] = spec + self.spec_created[service_name] = created + + if 'deleted' in j: + deleted = str_to_datetime(cast(str, j['deleted'])) + self.spec_deleted[service_name] = deleted + + if 'rank_map' in j and isinstance(j['rank_map'], dict): + self._rank_maps[service_name] = {} + for rank_str, m in j['rank_map'].items(): + try: + rank = int(rank_str) + except ValueError: + logger.exception(f"failed to parse rank in {j['rank_map']}") + continue + if isinstance(m, dict): + self._rank_maps[service_name][rank] = {} + for gen_str, name in m.items(): + try: + gen = int(gen_str) + except ValueError: + logger.exception(f"failed to parse gen in {j['rank_map']}") + continue + if isinstance(name, str) or m is None: + self._rank_maps[service_name][rank][gen] = name + + self.mgr.log.debug('SpecStore: loaded spec for %s' % ( + service_name)) + except Exception as e: + self.mgr.log.warning('unable to load spec for %s: %s' % ( + service_name, e)) + pass + + def save( + self, + spec: ServiceSpec, + update_create: bool = True, + ) -> None: + name = spec.service_name() + if spec.preview_only: + self.spec_preview[name] = spec + return None + self._specs[name] = spec + + if update_create: + self.spec_created[name] = datetime_now() + self._save(name) + + def save_rank_map(self, + name: str, + rank_map: Dict[int, Dict[int, Optional[str]]]) -> None: + self._rank_maps[name] = rank_map + self._save(name) + + def _save(self, name: str) -> None: + data: Dict[str, Any] = { + 'spec': self._specs[name].to_json(), + 'created': datetime_to_str(self.spec_created[name]), + } + if name in self._rank_maps: + data['rank_map'] = self._rank_maps[name] + if name in self.spec_deleted: + data['deleted'] = datetime_to_str(self.spec_deleted[name]) + + self.mgr.set_store( + SPEC_STORE_PREFIX + name, + json.dumps(data, sort_keys=True), + ) + self.mgr.events.for_service(self._specs[name], + OrchestratorEvent.INFO, + 'service was created') + + def rm(self, service_name: str) -> bool: + if service_name not in self._specs: + return False + + if self._specs[service_name].preview_only: + self.finally_rm(service_name) + return True + + self.spec_deleted[service_name] = datetime_now() + self.save(self._specs[service_name], update_create=False) + return True + + def finally_rm(self, service_name): + # type: (str) -> bool + found = service_name in self._specs + if found: + del self._specs[service_name] + if service_name in self._rank_maps: + del self._rank_maps[service_name] + del self.spec_created[service_name] + if service_name in 
self.spec_deleted: + del self.spec_deleted[service_name] + self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None) + return found + + def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]: + return self.spec_created.get(spec.service_name()) + + +class ClientKeyringSpec(object): + """ + A client keyring file that we should maintain + """ + + def __init__( + self, + entity: str, + placement: PlacementSpec, + mode: Optional[int] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + ) -> None: + self.entity = entity + self.placement = placement + self.mode = mode or 0o600 + self.uid = uid or 0 + self.gid = gid or 0 + + def validate(self) -> None: + pass + + def to_json(self) -> Dict[str, Any]: + return { + 'entity': self.entity, + 'placement': self.placement.to_json(), + 'mode': self.mode, + 'uid': self.uid, + 'gid': self.gid, + } + + @property + def path(self) -> str: + return f'/etc/ceph/ceph.{self.entity}.keyring' + + @classmethod + def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec': + c = data.copy() + if 'placement' in c: + c['placement'] = PlacementSpec.from_json(c['placement']) + _cls = cls(**c) + _cls.validate() + return _cls + + +class ClientKeyringStore(): + """ + Track client keyring files that we are supposed to maintain + """ + + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr: CephadmOrchestrator = mgr + self.mgr = mgr + self.keys: Dict[str, ClientKeyringSpec] = {} + + def load(self) -> None: + c = self.mgr.get_store('client_keyrings') or b'{}' + j = json.loads(c) + for e, d in j.items(): + self.keys[e] = ClientKeyringSpec.from_json(d) + + def save(self) -> None: + data = { + k: v.to_json() for k, v in self.keys.items() + } + self.mgr.set_store('client_keyrings', json.dumps(data)) + + def update(self, ks: ClientKeyringSpec) -> None: + self.keys[ks.entity] = ks + self.save() + + def rm(self, entity: str) -> None: + if entity in self.keys: + del self.keys[entity] + self.save() + + +class HostCache(): + """ + HostCache stores different things: + + 1. `daemons`: Deployed daemons O(daemons) + + They're part of the configuration nowadays and need to be + persistent. The name "daemon cache" is unfortunately a bit misleading. + Like for example we really need to know where daemons are deployed on + hosts that are offline. + + 2. `devices`: ceph-volume inventory cache O(hosts) + + As soon as this is populated, it becomes more or less read-only. + + 3. `networks`: network interfaces for each host. O(hosts) + + This is needed in order to deploy MONs. As this is mostly read-only. + + 4. `last_client_files` O(hosts) + + Stores the last digest and owner/mode for files we've pushed to /etc/ceph + (ceph.conf or client keyrings). + + 5. `scheduled_daemon_actions`: O(daemons) + + Used to run daemon actions after deploying a daemon. We need to + store it persistently, in order to stay consistent across + MGR failovers. 
+ """ + + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr: CephadmOrchestrator = mgr + self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] + self.last_daemon_update = {} # type: Dict[str, datetime.datetime] + self.devices = {} # type: Dict[str, List[inventory.Device]] + self.facts = {} # type: Dict[str, Dict[str, Any]] + self.last_facts_update = {} # type: Dict[str, datetime.datetime] + self.last_autotune = {} # type: Dict[str, datetime.datetime] + self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]] + self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]] + self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]] + self.last_device_update = {} # type: Dict[str, datetime.datetime] + self.last_device_change = {} # type: Dict[str, datetime.datetime] + self.daemon_refresh_queue = [] # type: List[str] + self.device_refresh_queue = [] # type: List[str] + self.osdspec_previews_refresh_queue = [] # type: List[str] + + # host -> daemon name -> dict + self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] + self.last_host_check = {} # type: Dict[str, datetime.datetime] + self.loading_osdspec_preview = set() # type: Set[str] + self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {} + self.registry_login_queue: Set[str] = set() + + self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {} + + def load(self): + # type: () -> None + for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items(): + host = k[len(HOST_CACHE_PREFIX):] + if host not in self.mgr.inventory: + self.mgr.log.warning('removing stray HostCache host record %s' % ( + host)) + self.mgr.set_store(k, None) + try: + j = json.loads(v) + if 'last_device_update' in j: + self.last_device_update[host] = str_to_datetime(j['last_device_update']) + else: + self.device_refresh_queue.append(host) + if 'last_device_change' in j: + self.last_device_change[host] = str_to_datetime(j['last_device_change']) + # for services, we ignore the persisted last_*_update + # and always trigger a new scrape on mgr restart. 
+ self.daemon_refresh_queue.append(host) + self.daemons[host] = {} + self.osdspec_previews[host] = [] + self.osdspec_last_applied[host] = {} + self.devices[host] = [] + self.networks[host] = {} + self.daemon_config_deps[host] = {} + for name, d in j.get('daemons', {}).items(): + self.daemons[host][name] = \ + orchestrator.DaemonDescription.from_json(d) + for d in j.get('devices', []): + self.devices[host].append(inventory.Device.from_json(d)) + self.networks[host] = j.get('networks_and_interfaces', {}) + self.osdspec_previews[host] = j.get('osdspec_previews', {}) + self.last_client_files[host] = j.get('last_client_files', {}) + for name, ts in j.get('osdspec_last_applied', {}).items(): + self.osdspec_last_applied[host][name] = str_to_datetime(ts) + + for name, d in j.get('daemon_config_deps', {}).items(): + self.daemon_config_deps[host][name] = { + 'deps': d.get('deps', []), + 'last_config': str_to_datetime(d['last_config']), + } + if 'last_host_check' in j: + self.last_host_check[host] = str_to_datetime(j['last_host_check']) + self.registry_login_queue.add(host) + self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) + + self.mgr.log.debug( + 'HostCache.load: host %s has %d daemons, ' + '%d devices, %d networks' % ( + host, len(self.daemons[host]), len(self.devices[host]), + len(self.networks[host]))) + except Exception as e: + self.mgr.log.warning('unable to load cached state for %s: %s' % ( + host, e)) + pass + + def update_host_daemons(self, host, dm): + # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None + self.daemons[host] = dm + self.last_daemon_update[host] = datetime_now() + + def update_host_facts(self, host, facts): + # type: (str, Dict[str, Dict[str, Any]]) -> None + self.facts[host] = facts + self.last_facts_update[host] = datetime_now() + + def update_autotune(self, host: str) -> None: + self.last_autotune[host] = datetime_now() + + def invalidate_autotune(self, host: str) -> None: + if host in self.last_autotune: + del self.last_autotune[host] + + def devices_changed(self, host: str, b: List[inventory.Device]) -> bool: + a = self.devices[host] + if len(a) != len(b): + return True + aj = {d.path: d.to_json() for d in a} + bj = {d.path: d.to_json() for d in b} + if aj != bj: + self.mgr.log.info("Detected new or changed devices on %s" % host) + return True + return False + + def update_host_devices_networks( + self, + host: str, + dls: List[inventory.Device], + nets: Dict[str, Dict[str, List[str]]] + ) -> None: + if ( + host not in self.devices + or host not in self.last_device_change + or self.devices_changed(host, dls) + ): + self.last_device_change[host] = datetime_now() + self.last_device_update[host] = datetime_now() + self.devices[host] = dls + self.networks[host] = nets + + def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None: + self.daemon_config_deps[host][name] = { + 'deps': deps, + 'last_config': stamp, + } + + def update_last_host_check(self, host): + # type: (str) -> None + self.last_host_check[host] = datetime_now() + + def update_osdspec_last_applied(self, host, service_name, ts): + # type: (str, str, datetime.datetime) -> None + self.osdspec_last_applied[host][service_name] = ts + + def update_client_file(self, + host: str, + path: str, + digest: str, + mode: int, + uid: int, + gid: int) -> None: + if host not in self.last_client_files: + self.last_client_files[host] = {} + self.last_client_files[host][path] = (digest, mode, uid, gid) + + def removed_client_file(self, 
host: str, path: str) -> None: + if ( + host in self.last_client_files + and path in self.last_client_files[host] + ): + del self.last_client_files[host][path] + + def prime_empty_host(self, host): + # type: (str) -> None + """ + Install an empty entry for a host + """ + self.daemons[host] = {} + self.devices[host] = [] + self.networks[host] = {} + self.osdspec_previews[host] = [] + self.osdspec_last_applied[host] = {} + self.daemon_config_deps[host] = {} + self.daemon_refresh_queue.append(host) + self.device_refresh_queue.append(host) + self.osdspec_previews_refresh_queue.append(host) + self.registry_login_queue.add(host) + self.last_client_files[host] = {} + + def refresh_all_host_info(self, host): + # type: (str) -> None + + self.last_host_check.pop(host, None) + self.daemon_refresh_queue.append(host) + self.registry_login_queue.add(host) + self.device_refresh_queue.append(host) + self.last_facts_update.pop(host, None) + self.osdspec_previews_refresh_queue.append(host) + self.last_autotune.pop(host, None) + + def invalidate_host_daemons(self, host): + # type: (str) -> None + self.daemon_refresh_queue.append(host) + if host in self.last_daemon_update: + del self.last_daemon_update[host] + self.mgr.event.set() + + def invalidate_host_devices(self, host): + # type: (str) -> None + self.device_refresh_queue.append(host) + if host in self.last_device_update: + del self.last_device_update[host] + self.mgr.event.set() + + def distribute_new_registry_login_info(self) -> None: + self.registry_login_queue = set(self.mgr.inventory.keys()) + + def save_host(self, host: str) -> None: + j: Dict[str, Any] = { + 'daemons': {}, + 'devices': [], + 'osdspec_previews': [], + 'osdspec_last_applied': {}, + 'daemon_config_deps': {}, + } + if host in self.last_daemon_update: + j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host]) + if host in self.last_device_update: + j['last_device_update'] = datetime_to_str(self.last_device_update[host]) + if host in self.last_device_change: + j['last_device_change'] = datetime_to_str(self.last_device_change[host]) + if host in self.daemons: + for name, dd in self.daemons[host].items(): + j['daemons'][name] = dd.to_json() + if host in self.devices: + for d in self.devices[host]: + j['devices'].append(d.to_json()) + if host in self.networks: + j['networks_and_interfaces'] = self.networks[host] + if host in self.daemon_config_deps: + for name, depi in self.daemon_config_deps[host].items(): + j['daemon_config_deps'][name] = { + 'deps': depi.get('deps', []), + 'last_config': datetime_to_str(depi['last_config']), + } + if host in self.osdspec_previews and self.osdspec_previews[host]: + j['osdspec_previews'] = self.osdspec_previews[host] + if host in self.osdspec_last_applied: + for name, ts in self.osdspec_last_applied[host].items(): + j['osdspec_last_applied'][name] = datetime_to_str(ts) + + if host in self.last_host_check: + j['last_host_check'] = datetime_to_str(self.last_host_check[host]) + + if host in self.last_client_files: + j['last_client_files'] = self.last_client_files[host] + if host in self.scheduled_daemon_actions: + j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host] + + self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) + + def rm_host(self, host): + # type: (str) -> None + if host in self.daemons: + del self.daemons[host] + if host in self.devices: + del self.devices[host] + if host in self.facts: + del self.facts[host] + if host in self.last_facts_update: + del self.last_facts_update[host] + if host in 
self.last_autotune: + del self.last_autotune[host] + if host in self.osdspec_previews: + del self.osdspec_previews[host] + if host in self.osdspec_last_applied: + del self.osdspec_last_applied[host] + if host in self.loading_osdspec_preview: + self.loading_osdspec_preview.remove(host) + if host in self.networks: + del self.networks[host] + if host in self.last_daemon_update: + del self.last_daemon_update[host] + if host in self.last_device_update: + del self.last_device_update[host] + if host in self.last_device_change: + del self.last_device_change[host] + if host in self.daemon_config_deps: + del self.daemon_config_deps[host] + if host in self.scheduled_daemon_actions: + del self.scheduled_daemon_actions[host] + if host in self.last_client_files: + del self.last_client_files[host] + self.mgr.set_store(HOST_CACHE_PREFIX + host, None) + + def get_hosts(self): + # type: () -> List[str] + return list(self.daemons) + + def get_facts(self, host: str) -> Dict[str, Any]: + return self.facts.get(host, {}) + + def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]: + for dm in self.daemons.copy().values(): + yield from dm.values() + + def get_daemons(self): + # type: () -> List[orchestrator.DaemonDescription] + return list(self._get_daemons()) + + def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]: + return list(self.daemons.get(host, {}).values()) + + def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription: + assert not daemon_name.startswith('ha-rgw.') + dds = self.get_daemons_by_host(host) if host else self._get_daemons() + for dd in dds: + if dd.name() == daemon_name: + return dd + + raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') + + def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool: + try: + self.get_daemon(daemon_name, host) + except orchestrator.OrchestratorError: + return False + return True + + def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: + def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: + dd = copy(dd_orig) + if host in self.mgr.offline_hosts: + dd.status = orchestrator.DaemonDescriptionStatus.error + dd.status_desc = 'host is offline' + elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance": + # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses + # could be wrong. 
We must assume maintenance is working and daemons are stopped + dd.status = orchestrator.DaemonDescriptionStatus.stopped + dd.events = self.mgr.events.get_for_daemon(dd.name()) + return dd + + for host, dm in self.daemons.copy().items(): + yield host, {name: alter(host, d) for name, d in dm.items()} + + def get_daemons_by_service(self, service_name): + # type: (str) -> List[orchestrator.DaemonDescription] + assert not service_name.startswith('keepalived.') + assert not service_name.startswith('haproxy.') + + return list(dd for dd in self._get_daemons() if dd.service_name() == service_name) + + def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]: + assert service_type not in ['keepalived', 'haproxy'] + + daemons = self.daemons[host].values() if host else self._get_daemons() + + return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)] + + def get_daemon_types(self, hostname: str) -> Set[str]: + """Provide a list of the types of daemons on the host""" + return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()}) + + def get_daemon_names(self): + # type: () -> List[str] + return [d.name() for d in self._get_daemons()] + + def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: + if host in self.daemon_config_deps: + if name in self.daemon_config_deps[host]: + return self.daemon_config_deps[host][name].get('deps', []), \ + self.daemon_config_deps[host][name].get('last_config', None) + return None, None + + def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]: + return self.last_client_files.get(host, {}) + + def host_needs_daemon_refresh(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') + return False + if host in self.daemon_refresh_queue: + self.daemon_refresh_queue.remove(host) + return True + cutoff = datetime_now() - datetime.timedelta( + seconds=self.mgr.daemon_cache_timeout) + if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: + return True + return False + + def host_needs_facts_refresh(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh') + return False + cutoff = datetime_now() - datetime.timedelta( + seconds=self.mgr.facts_cache_timeout) + if host not in self.last_facts_update or self.last_facts_update[host] < cutoff: + return True + return False + + def host_needs_autotune_memory(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping autotune') + return False + cutoff = datetime_now() - datetime.timedelta( + seconds=self.mgr.autotune_interval) + if host not in self.last_autotune or self.last_autotune[host] < cutoff: + return True + return False + + def host_had_daemon_refresh(self, host: str) -> bool: + """ + ... at least once. + """ + if host in self.last_daemon_update: + return True + if host not in self.daemons: + return False + return bool(self.daemons[host]) + + def host_needs_device_refresh(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. 
Skipping device refresh') + return False + if host in self.device_refresh_queue: + self.device_refresh_queue.remove(host) + return True + cutoff = datetime_now() - datetime.timedelta( + seconds=self.mgr.device_cache_timeout) + if host not in self.last_device_update or self.last_device_update[host] < cutoff: + return True + return False + + def host_needs_osdspec_preview_refresh(self, host: str) -> bool: + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh') + return False + if host in self.osdspec_previews_refresh_queue: + self.osdspec_previews_refresh_queue.remove(host) + return True + # Since this is dependent on other factors (device and spec) this does not need + # to be updated periodically. + return False + + def host_needs_check(self, host): + # type: (str) -> bool + cutoff = datetime_now() - datetime.timedelta( + seconds=self.mgr.host_check_interval) + return host not in self.last_host_check or self.last_host_check[host] < cutoff + + def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool: + if ( + host not in self.devices + or host not in self.last_device_change + or host not in self.last_device_update + or host not in self.osdspec_last_applied + or spec.service_name() not in self.osdspec_last_applied[host] + ): + return True + created = self.mgr.spec_store.get_created(spec) + if not created or created > self.last_device_change[host]: + return True + return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host] + + def host_needs_registry_login(self, host: str) -> bool: + if host in self.mgr.offline_hosts: + return False + if host in self.registry_login_queue: + self.registry_login_queue.remove(host) + return True + return False + + def add_daemon(self, host, dd): + # type: (str, orchestrator.DaemonDescription) -> None + assert host in self.daemons + self.daemons[host][dd.name()] = dd + + def rm_daemon(self, host: str, name: str) -> None: + assert not name.startswith('ha-rgw.') + + if host in self.daemons: + if name in self.daemons[host]: + del self.daemons[host][name] + + def daemon_cache_filled(self) -> bool: + """ + i.e. we have checked the daemons for each hosts at least once. + excluding offline hosts. + + We're not checking for `host_needs_daemon_refresh`, as this might never be + False for all hosts. 
+ """ + return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts) + for h in self.get_hosts()) + + def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None: + assert not daemon_name.startswith('ha-rgw.') + + priorities = { + 'start': 1, + 'restart': 2, + 'reconfig': 3, + 'redeploy': 4, + 'stop': 5, + } + existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None) + if existing_action and priorities[existing_action] > priorities[action]: + logger.debug( + f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.') + return + + if host not in self.scheduled_daemon_actions: + self.scheduled_daemon_actions[host] = {} + self.scheduled_daemon_actions[host][daemon_name] = action + + def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool: + found = False + if host in self.scheduled_daemon_actions: + if daemon_name in self.scheduled_daemon_actions[host]: + del self.scheduled_daemon_actions[host][daemon_name] + found = True + if not self.scheduled_daemon_actions[host]: + del self.scheduled_daemon_actions[host] + return found + + def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]: + assert not daemon.startswith('ha-rgw.') + + return self.scheduled_daemon_actions.get(host, {}).get(daemon) + + +class EventStore(): + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr: CephadmOrchestrator = mgr + self.events = {} # type: Dict[str, List[OrchestratorEvent]] + + def add(self, event: OrchestratorEvent) -> None: + + if event.kind_subject() not in self.events: + self.events[event.kind_subject()] = [event] + + for e in self.events[event.kind_subject()]: + if e.message == event.message: + return + + self.events[event.kind_subject()].append(event) + + # limit to five events for now. + self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:] + + def for_service(self, spec: ServiceSpec, level: str, message: str) -> None: + e = OrchestratorEvent(datetime_now(), 'service', + spec.service_name(), level, message) + self.add(e) + + def from_orch_error(self, e: OrchestratorError) -> None: + if e.event_subject is not None: + self.add(OrchestratorEvent( + datetime_now(), + e.event_subject[0], + e.event_subject[1], + "ERROR", + str(e) + )) + + def for_daemon(self, daemon_name: str, level: str, message: str) -> None: + e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message) + self.add(e) + + def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None: + self.for_daemon( + daemon_name, + "ERROR", + str(e) + ) + + def cleanup(self) -> None: + # Needs to be properly done, in case events are persistently stored. + + unknowns: List[str] = [] + daemons = self.mgr.cache.get_daemon_names() + specs = self.mgr.spec_store.all_specs.keys() + for k_s, v in self.events.items(): + kind, subject = k_s.split(':') + if kind == 'service': + if subject not in specs: + unknowns.append(k_s) + elif kind == 'daemon': + if subject not in daemons: + unknowns.append(k_s) + + for k_s in unknowns: + del self.events[k_s] + + def get_for_service(self, name: str) -> List[OrchestratorEvent]: + return self.events.get('service:' + name, []) + + def get_for_daemon(self, name: str) -> List[OrchestratorEvent]: + return self.events.get('daemon:' + name, []) |