From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/orchestrator/_interface.py | 1664 +++++++++++++++++++++++++++++ 1 file changed, 1664 insertions(+) create mode 100644 src/pybind/mgr/orchestrator/_interface.py diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py new file mode 100644 index 000000000..e9a6c3f07 --- /dev/null +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -0,0 +1,1664 @@ + +""" +ceph-mgr orchestrator interface + +Please see the ceph-mgr module developer's guide for more information. +""" + +import copy +import datetime +import enum +import errno +import logging +import pickle +import re + +from collections import namedtuple, OrderedDict +from contextlib import contextmanager +from functools import wraps, reduce, update_wrapper + +from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ + Sequence, Dict, cast, Mapping + +try: + from typing import Protocol # Protocol was added in Python 3.8 +except ImportError: + class Protocol: # type: ignore + pass + + +import yaml + +from ceph.deployment import inventory +from ceph.deployment.service_spec import ( + ArgumentList, + ArgumentSpec, + GeneralArgList, + IngressSpec, + IscsiServiceSpec, + MDSSpec, + NFSServiceSpec, + RGWSpec, + SNMPGatewaySpec, + ServiceSpec, + TunedProfileSpec, + NvmeofServiceSpec +) +from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.hostspec import HostSpec, SpecValidationError +from ceph.utils import datetime_to_str, str_to_datetime + +from mgr_module import MgrModule, CLICommand, HandleCommandResult + + +logger = logging.getLogger(__name__) + +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable[..., Any]) + + +class OrchestratorError(Exception): + """ + General orchestrator specific error. + + Used for deployment, configuration or user errors. + + It's not intended for programming errors or orchestrator internal errors. + """ + + def __init__(self, + msg: str, + errno: int = -errno.EINVAL, + event_kind_subject: Optional[Tuple[str, str]] = None) -> None: + super(Exception, self).__init__(msg) + self.errno = errno + # See OrchestratorEvent.subject + self.event_subject = event_kind_subject + + +class NoOrchestrator(OrchestratorError): + """ + No orchestrator is configured. + """ + + def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None: + super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT) + + +class OrchestratorValidationError(OrchestratorError): + """ + Raised when an orchestrator doesn't support a specific feature. + """ + + +@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]: + try: + yield + except OrchestratorError as e: + if overwrite or hasattr(e, 'event_subject'): + e.event_subject = (kind, subject) + raise + + +def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return func(*args, **kwargs) + except (OrchestratorError, SpecValidationError) as e: + # Do not print Traceback for expected errors.
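+            # Both exception types carry an errno (negative by convention,
+            # e.g. -errno.EINVAL as defaulted above) that becomes the retval.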
+ return HandleCommandResult(e.errno, stderr=str(e)) + except ImportError as e: + return HandleCommandResult(-errno.ENOENT, stderr=str(e)) + except NotImplementedError: + msg = 'This Orchestrator does not support `{}`'.format(prefix) + return HandleCommandResult(-errno.ENOENT, stderr=msg) + + # misuse lambda to copy `wrapper` + wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731 + wrapper_copy._prefix = prefix # type: ignore + wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore + wrapper_copy._cli_command.store_func_metadata(func) # type: ignore + wrapper_copy._cli_command.func = wrapper_copy # type: ignore + + return cast(FuncT, wrapper_copy) + + +def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']: + """ + Decorator to make Orchestrator methods return + an OrchResult. + """ + + @wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]: + try: + return OrchResult(f(*args, **kwargs)) + except Exception as e: + logger.exception(e) + import os + if 'UNITTEST' in os.environ: + raise # This makes debugging of Tracebacks from unittests a bit easier + return OrchResult(None, exception=e) + + return cast(Callable[..., OrchResult[T]], wrapper) + + +class InnerCliCommandCallable(Protocol): + def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]: + ... + + +def _cli_command(perm: str) -> InnerCliCommandCallable: + def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]: + return lambda func: handle_exception(prefix, perm, func) + return inner_cli_command + + +_cli_read_command = _cli_command('r') +_cli_write_command = _cli_command('rw') + + +class CLICommandMeta(type): + """ + This is a workaround for the use of a global variable CLICommand.COMMANDS which + prevents modules from importing any other module. + + We make use of CLICommand, except for the use of the global variable. + """ + def __init__(cls, name: str, bases: Any, dct: Any) -> None: + super(CLICommandMeta, cls).__init__(name, bases, dct) + dispatch: Dict[str, CLICommand] = {} + for v in dct.values(): + try: + dispatch[v._prefix] = v._cli_command + except AttributeError: + pass + + def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any: + if cmd['prefix'] not in dispatch: + return self.handle_command(inbuf, cmd) + + return dispatch[cmd['prefix']].call(self, cmd, inbuf) + + cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()] + cls.handle_command = handle_command + + +class OrchResult(Generic[T]): + """ + Stores a result and an exception. Mainly to circumvent the + MgrModule.remote() method that hides all exceptions and for + handling different sub-interpreters. 
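+
+    Illustrative usage (hypothetical result value)::
+
+        >>> #doctest: +SKIP
+        ... res = OrchResult(['host1'])
+        ... raise_if_exception(res)  # returns ['host1'], or re-raises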
+ """ + + def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None: + self.result = result + self.serialized_exception: Optional[bytes] = None + self.exception_str: str = '' + self.set_exception(exception) + + __slots__ = 'result', 'serialized_exception', 'exception_str' + + def set_exception(self, e: Optional[Exception]) -> None: + if e is None: + self.serialized_exception = None + self.exception_str = '' + return + + self.exception_str = f'{type(e)}: {str(e)}' + try: + self.serialized_exception = pickle.dumps(e) + except pickle.PicklingError: + logger.error(f"failed to pickle {e}") + if isinstance(e, Exception): + e = Exception(*e.args) + else: + e = Exception(str(e)) + # degenerate to a plain Exception + self.serialized_exception = pickle.dumps(e) + + def result_str(self) -> str: + """Force a string.""" + if self.result is None: + return '' + if isinstance(self.result, list): + return '\n'.join(str(x) for x in self.result) + return str(self.result) + + +def raise_if_exception(c: OrchResult[T]) -> T: + """ + Due to different sub-interpreters, this MUST not be in the `OrchResult` class. + """ + if c.serialized_exception is not None: + try: + e = pickle.loads(c.serialized_exception) + except (KeyError, AttributeError): + raise Exception(c.exception_str) + raise e + assert c.result is not None, 'OrchResult should either have an exception or a result' + return c.result + + +def _hide_in_features(f: FuncT) -> FuncT: + f._hide_in_features = True # type: ignore + return f + + +class Orchestrator(object): + """ + Calls in this class may do long running remote operations, with time + periods ranging from network latencies to package install latencies and large + internet downloads. For that reason, all are asynchronous, and return + ``Completion`` objects. + + Methods should only return the completion and not directly execute + anything, like network calls. Otherwise the purpose of + those completions is defeated. + + Implementations are not required to start work on an operation until + the caller waits on the relevant Completion objects. Callers making + multiple updates should not wait on Completions until they're done + sending operations: this enables implementations to batch up a series + of updates when wait() is called on a set of Completion objects. + + Implementations are encouraged to keep reasonably fresh caches of + the status of the system: it is better to serve a stale-but-recent + result read of e.g. device inventory than it is to keep the caller waiting + while you scan hosts every time. + """ + + @_hide_in_features + def is_orchestrator_module(self) -> bool: + """ + Enable other modules to interrogate this module to discover + whether it's usable as an orchestrator module. + + Subclasses do not need to override this. + """ + return True + + @_hide_in_features + def available(self) -> Tuple[bool, str, Dict[str, Any]]: + """ + Report whether we can talk to the orchestrator. This is the + place to give the user a meaningful message if the orchestrator + isn't running or can't be contacted. + + This method may be called frequently (e.g. every page load + to conditionally display a warning banner), so make sure it's + not too expensive. It's okay to give a slightly stale status + (e.g. based on a periodic background ping of the orchestrator) + if that's necessary to make this method fast. + + .. note:: + `True` doesn't mean that the desired functionality + is actually available in the orchestrator. I.e. 
this + won't work as expected:: + + >>> #doctest: +SKIP + ... if OrchestratorClientMixin().available()[0]: # wrong. + ... OrchestratorClientMixin().get_hosts() + + :return: boolean representing whether the module is available/usable + :return: string describing any error + :return: dict containing any module specific information + """ + raise NotImplementedError() + + @_hide_in_features + def get_feature_set(self) -> Dict[str, dict]: + """Describes which methods this orchestrator implements + + .. note:: + `True` doesn't mean that the desired functionality + is actually possible in the orchestrator. I.e. this + won't work as expected:: + + >>> #doctest: +SKIP + ... api = OrchestratorClientMixin() + ... if api.get_feature_set()['get_hosts']['available']: # wrong. + ... api.get_hosts() + + It's better to ask for forgiveness instead:: + + >>> #doctest: +SKIP + ... try: + ... OrchestratorClientMixin().get_hosts() + ... except (OrchestratorError, NotImplementedError): + ... ... + + :returns: Dict of API method names to ``{'available': True or False}`` + """ + module = self.__class__ + features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)} + for a in Orchestrator.__dict__ + if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False) + } + return features + + def cancel_completions(self) -> None: + """ + Cancels ongoing completions. Unsticks the mgr. + """ + raise NotImplementedError() + + def pause(self) -> None: + raise NotImplementedError() + + def resume(self) -> None: + raise NotImplementedError() + + def add_host(self, host_spec: HostSpec) -> OrchResult[str]: + """ + Add a host to the orchestrator inventory. + + :param host_spec: host specification + """ + raise NotImplementedError() + + def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]: + """ + Remove a host from the orchestrator inventory. + + :param host: hostname + """ + raise NotImplementedError() + + def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> OrchResult[str]: + """ + Drain all daemons from a host. + + :param hostname: hostname + """ + raise NotImplementedError() + + def update_host_addr(self, host: str, addr: str) -> OrchResult[str]: + """ + Update a host's address + + :param host: hostname + :param addr: address (dns name or IP) + """ + raise NotImplementedError() + + def get_hosts(self) -> OrchResult[List[HostSpec]]: + """ + Report the hosts in the cluster. + + :return: list of HostSpec + """ + raise NotImplementedError() + + def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]: + """ + Return host metadata (gather_facts).
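+
+        Illustrative call (hypothetical hostname; the exact keys in the
+        returned dicts depend on the backend)::
+
+            >>> #doctest: +SKIP
+            ... facts = raise_if_exception(self.get_facts('host1'))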
+ """ + raise NotImplementedError() + + def add_host_label(self, host: str, label: str) -> OrchResult[str]: + """ + Add a host label + """ + raise NotImplementedError() + + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: + """ + Remove a host label + """ + raise NotImplementedError() + + def host_ok_to_stop(self, hostname: str) -> OrchResult: + """ + Check if the specified host can be safely stopped without reducing availability + + :param host: hostname + """ + raise NotImplementedError() + + def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> OrchResult: + """ + Place a host in maintenance, stopping daemons and disabling it's systemd target + """ + raise NotImplementedError() + + def exit_host_maintenance(self, hostname: str) -> OrchResult: + """ + Return a host from maintenance, restarting the clusters systemd target + """ + raise NotImplementedError() + + def rescan_host(self, hostname: str) -> OrchResult: + """Use cephadm to issue a disk rescan on each HBA + + Some HBAs and external enclosures don't automatically register + device insertion with the kernel, so for these scenarios we need + to manually rescan + + :param hostname: (str) host name + """ + raise NotImplementedError() + + def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]: + """ + Returns something that was created by `ceph-volume inventory`. + + :return: list of InventoryHost + """ + raise NotImplementedError() + + def service_discovery_dump_cert(self) -> OrchResult: + """ + Returns service discovery server root certificate + + :return: service discovery root certificate + """ + raise NotImplementedError() + + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]: + """ + Describe a service (of any kind) that is already configured in + the orchestrator. For example, when viewing an OSD in the dashboard + we might like to also display information about the orchestrator's + view of the service (like the kubernetes pod ID). + + When viewing a CephFS filesystem in the dashboard, we would use this + to display the pods being currently run for MDS daemons. + + :return: list of ServiceDescription objects. + """ + raise NotImplementedError() + + def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]: + """ + Describe a daemon (of any kind) that is already configured in + the orchestrator. + + :return: list of DaemonDescription objects. 
+ """ + raise NotImplementedError() + + @handle_orch_error + def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]: + """ + Applies any spec + """ + fns: Dict[str, Callable[..., OrchResult[str]]] = { + 'alertmanager': self.apply_alertmanager, + 'crash': self.apply_crash, + 'grafana': self.apply_grafana, + 'iscsi': self.apply_iscsi, + 'nvmeof': self.apply_nvmeof, + 'mds': self.apply_mds, + 'mgr': self.apply_mgr, + 'mon': self.apply_mon, + 'nfs': self.apply_nfs, + 'node-exporter': self.apply_node_exporter, + 'ceph-exporter': self.apply_ceph_exporter, + 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore + 'prometheus': self.apply_prometheus, + 'loki': self.apply_loki, + 'promtail': self.apply_promtail, + 'rbd-mirror': self.apply_rbd_mirror, + 'rgw': self.apply_rgw, + 'ingress': self.apply_ingress, + 'snmp-gateway': self.apply_snmp_gateway, + 'host': self.add_host, + } + + def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741 + l_res = raise_if_exception(l) + r_res = raise_if_exception(r) + l_res.append(r_res) + return OrchResult(l_res) + return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([]))) + + def set_unmanaged(self, service_name: str, value: bool) -> OrchResult[str]: + """ + Set unmanaged parameter to True/False for a given service + + :return: None + """ + raise NotImplementedError() + + def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]: + """ + Plan (Dry-run, Preview) a List of Specs. + """ + raise NotImplementedError() + + def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]: + """ + Remove specific daemon(s). + + :return: None + """ + raise NotImplementedError() + + def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]: + """ + Remove a service (a collection of daemons). + + :return: None + """ + raise NotImplementedError() + + def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]: + """ + Perform an action (start/stop/reload) on a service (i.e., all daemons + providing the logical service). + + :param action: one of "start", "stop", "restart", "redeploy", "reconfig" + :param service_name: service_type + '.' + service_id + (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...) + :rtype: OrchResult + """ + # assert action in ["start", "stop", "reload, "restart", "redeploy"] + raise NotImplementedError() + + def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]: + """ + Perform an action (start/stop/reload) on a daemon. + + :param action: one of "start", "stop", "restart", "redeploy", "reconfig" + :param daemon_name: name of daemon + :param image: Container image when redeploying that daemon + :rtype: OrchResult + """ + # assert action in ["start", "stop", "reload, "restart", "redeploy"] + raise NotImplementedError() + + def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]: + """ + Create one or more OSDs within a single Drive Group. + + The principal argument here is the drive_group member + of OsdSpec: other fields are advisory/extensible for any + finer-grained OSD feature enablement (choice of backing store, + compression/encryption, etc). 
+ """ + raise NotImplementedError() + + def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: + """ Update OSD cluster """ + raise NotImplementedError() + + def set_unmanaged_flag(self, + unmanaged_flag: bool, + service_type: str = 'osd', + service_name: Optional[str] = None + ) -> HandleCommandResult: + raise NotImplementedError() + + def preview_osdspecs(self, + osdspec_name: Optional[str] = 'osd', + osdspecs: Optional[List[DriveGroupSpec]] = None + ) -> OrchResult[str]: + """ Get a preview for OSD deployments """ + raise NotImplementedError() + + def remove_osds(self, osd_ids: List[str], + replace: bool = False, + force: bool = False, + zap: bool = False, + no_destroy: bool = False) -> OrchResult[str]: + """ + :param osd_ids: list of OSD IDs + :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` + :param force: Forces the OSD removal process without waiting for the data to be drained first. + :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA) + :param no_destroy: Do not destroy associated VGs/LVs with the OSD. + + + .. note:: this can only remove OSDs that were successfully + created (i.e. got an OSD ID). + """ + raise NotImplementedError() + + def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult: + """ + TODO + """ + raise NotImplementedError() + + def remove_osds_status(self) -> OrchResult: + """ + Returns a status of the ongoing OSD removal operations. + """ + raise NotImplementedError() + + def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]: + """ + Instructs the orchestrator to enable or disable either the ident or the fault LED. + + :param ident_fault: either ``"ident"`` or ``"fault"`` + :param on: ``True`` = on. 
:param locations: See :class:`orchestrator.DeviceLightLoc` + """ + raise NotImplementedError() + + def zap_device(self, host: str, path: str) -> OrchResult[str]: + """Zap/Erase a device (DESTROYS DATA)""" + raise NotImplementedError() + + def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]: + """Create daemon(s) for unmanaged services""" + raise NotImplementedError() + + def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]: + """Update mon cluster""" + raise NotImplementedError() + + def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]: + """Update mgr cluster""" + raise NotImplementedError() + + def apply_mds(self, spec: MDSSpec) -> OrchResult[str]: + """Update MDS cluster""" + raise NotImplementedError() + + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: + """Update RGW cluster""" + raise NotImplementedError() + + def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]: + """Update ingress daemons""" + raise NotImplementedError() + + def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]: + """Update rbd-mirror cluster""" + raise NotImplementedError() + + def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]: + """Update NFS cluster""" + raise NotImplementedError() + + def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]: + """Update iscsi cluster""" + raise NotImplementedError() + + def apply_nvmeof(self, spec: NvmeofServiceSpec) -> OrchResult[str]: + """Update nvmeof cluster""" + raise NotImplementedError() + + def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]: + """Update prometheus cluster""" + raise NotImplementedError() + + def get_prometheus_access_info(self) -> OrchResult[Dict[str, str]]: + """get prometheus access information""" + raise NotImplementedError() + + def set_alertmanager_access_info(self, user: str, password: str) -> OrchResult[str]: + """set alertmanager access information""" + raise NotImplementedError() + + def set_prometheus_access_info(self, user: str, password: str) -> OrchResult[str]: + """set prometheus access information""" + raise NotImplementedError() + + def get_alertmanager_access_info(self) -> OrchResult[Dict[str, str]]: + """get alertmanager access information""" + raise NotImplementedError() + + def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing node-exporter daemon(s)""" + raise NotImplementedError() + + def apply_ceph_exporter(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing ceph-exporter daemon(s)""" + raise NotImplementedError() + + def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing Loki daemon(s)""" + raise NotImplementedError() + + def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing Promtail daemon(s)""" + raise NotImplementedError() + + def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing crash daemon(s)""" + raise NotImplementedError() + + def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]: + """Update an existing Grafana service""" + raise NotImplementedError() + + def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing Alertmanager daemon(s)""" + raise NotImplementedError() + + def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]: + """Update an existing snmp gateway service""" + raise NotImplementedError() + + def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]: + """Add or update an
existing tuned profile""" + raise NotImplementedError() + + def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]: + """Remove a tuned profile""" + raise NotImplementedError() + + def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]: + """See current tuned profiles""" + raise NotImplementedError() + + def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]: + """Change/Add a specific setting for a tuned profile""" + raise NotImplementedError() + + def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]: + """Remove a specific setting for a tuned profile""" + raise NotImplementedError() + + def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]: + raise NotImplementedError() + + def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]], + hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_pause(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_resume(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_stop(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']: + """ + If an upgrade is currently underway, report on where + we are in the process, or if some error has occurred. + + :return: UpgradeStatusSpec instance + """ + raise NotImplementedError() + + @_hide_in_features + def upgrade_available(self) -> OrchResult: + """ + Report on what versions are available to upgrade to + + :return: List of strings + """ + raise NotImplementedError() + + +GenericSpec = Union[ServiceSpec, HostSpec] + + +def json_to_generic_spec(spec: dict) -> GenericSpec: + if 'service_type' in spec and spec['service_type'] == 'host': + return HostSpec.from_json(spec) + else: + return ServiceSpec.from_json(spec) + + +def daemon_type_to_service(dtype: str) -> str: + mapping = { + 'mon': 'mon', + 'mgr': 'mgr', + 'mds': 'mds', + 'rgw': 'rgw', + 'osd': 'osd', + 'haproxy': 'ingress', + 'keepalived': 'ingress', + 'iscsi': 'iscsi', + 'nvmeof': 'nvmeof', + 'rbd-mirror': 'rbd-mirror', + 'cephfs-mirror': 'cephfs-mirror', + 'nfs': 'nfs', + 'grafana': 'grafana', + 'alertmanager': 'alertmanager', + 'prometheus': 'prometheus', + 'node-exporter': 'node-exporter', + 'ceph-exporter': 'ceph-exporter', + 'loki': 'loki', + 'promtail': 'promtail', + 'crash': 'crash', + 'crashcollector': 'crash', # Specific Rook Daemon + 'container': 'container', + 'agent': 'agent', + 'snmp-gateway': 'snmp-gateway', + 'elasticsearch': 'elasticsearch', + 'jaeger-agent': 'jaeger-agent', + 'jaeger-collector': 'jaeger-collector', + 'jaeger-query': 'jaeger-query' + } + return mapping[dtype] + + +def service_to_daemon_types(stype: str) -> List[str]: + mapping = { + 'mon': ['mon'], + 'mgr': ['mgr'], + 'mds': ['mds'], + 'rgw': ['rgw'], + 'osd': ['osd'], + 'ingress': ['haproxy', 'keepalived'], + 'iscsi': ['iscsi'], + 'nvmeof': ['nvmeof'], + 'rbd-mirror': ['rbd-mirror'], + 'cephfs-mirror': ['cephfs-mirror'], + 'nfs': ['nfs'], + 'grafana': ['grafana'], + 'alertmanager': ['alertmanager'], + 'prometheus': ['prometheus'], + 'loki': ['loki'], + 'promtail': ['promtail'], + 'node-exporter': ['node-exporter'], + 'ceph-exporter': ['ceph-exporter'], 
+ 'crash': ['crash'], + 'container': ['container'], + 'agent': ['agent'], + 'snmp-gateway': ['snmp-gateway'], + 'elasticsearch': ['elasticsearch'], + 'jaeger-agent': ['jaeger-agent'], + 'jaeger-collector': ['jaeger-collector'], + 'jaeger-query': ['jaeger-query'], + 'jaeger-tracing': ['elasticsearch', 'jaeger-query', 'jaeger-collector', 'jaeger-agent'] + } + return mapping[stype] + + +KNOWN_DAEMON_TYPES: List[str] = list( + sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), [])) + + +class UpgradeStatusSpec(object): + # Orchestrator's report on what's going on with any ongoing upgrade + def __init__(self) -> None: + self.in_progress = False # Is an upgrade underway? + self.target_image: Optional[str] = None + self.services_complete: List[str] = [] # Which daemon types are fully updated? + self.which: str = '' # for if user specified daemon types, services or hosts + self.progress: Optional[str] = None # How many of the daemons have we upgraded + self.message = "" # Freeform description + self.is_paused: bool = False # Is the upgrade paused? + + def to_json(self) -> dict: + return { + 'in_progress': self.in_progress, + 'target_image': self.target_image, + 'which': self.which, + 'services_complete': self.services_complete, + 'progress': self.progress, + 'message': self.message, + 'is_paused': self.is_paused, + } + + +def handle_type_error(method: FuncT) -> FuncT: + @wraps(method) + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: + try: + return method(cls, *args, **kwargs) + except TypeError as e: + error_msg = '{}: {}'.format(cls.__name__, e) + raise OrchestratorValidationError(error_msg) + return cast(FuncT, inner) + + +class DaemonDescriptionStatus(enum.IntEnum): + unknown = -2 + error = -1 + stopped = 0 + running = 1 + starting = 2 #: Daemon is deployed, but not yet running + + @staticmethod + def to_str(status: Optional['DaemonDescriptionStatus']) -> str: + if status is None: + status = DaemonDescriptionStatus.unknown + return { + DaemonDescriptionStatus.unknown: 'unknown', + DaemonDescriptionStatus.error: 'error', + DaemonDescriptionStatus.stopped: 'stopped', + DaemonDescriptionStatus.running: 'running', + DaemonDescriptionStatus.starting: 'starting', + }.get(status, '') + + +class DaemonDescription(object): + """ + For responding to queries about the status of a particular daemon, + stateful or stateless. + + This is not about health or performance monitoring of daemons: it's + about letting the orchestrator tell Ceph whether and where a + daemon is scheduled in the cluster. When an orchestrator tells + Ceph "it's running on host123", that's not a promise that the process + is literally up this second, it's a description of where the orchestrator + has decided the daemon should run. 
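+
+    Illustrative construction (hypothetical values)::
+
+        >>> #doctest: +SKIP
+        ... dd = DaemonDescription(daemon_type='mds', daemon_id='myfs.host1.abcdef',
+        ...                        hostname='host1')
+        ... dd.name()          # 'mds.myfs.host1.abcdef'
+        ... dd.service_name()  # 'mds.myfs'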
+ """ + + def __init__(self, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + hostname: Optional[str] = None, + container_id: Optional[str] = None, + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + container_image_digests: Optional[List[str]] = None, + version: Optional[str] = None, + status: Optional[DaemonDescriptionStatus] = None, + status_desc: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + started: Optional[datetime.datetime] = None, + last_configured: Optional[datetime.datetime] = None, + osdspec_affinity: Optional[str] = None, + last_deployed: Optional[datetime.datetime] = None, + events: Optional[List['OrchestratorEvent']] = None, + is_active: bool = False, + memory_usage: Optional[int] = None, + memory_request: Optional[int] = None, + memory_limit: Optional[int] = None, + cpu_percentage: Optional[str] = None, + service_name: Optional[str] = None, + ports: Optional[List[int]] = None, + ip: Optional[str] = None, + deployed_by: Optional[List[str]] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + ) -> None: + + #: Host is at the same granularity as InventoryHost + self.hostname: Optional[str] = hostname + + # Not everyone runs in containers, but enough people do to + # justify having the container_id (runtime id) and container_image + # (image name) + self.container_id = container_id # runtime id + self.container_image_id = container_image_id # image id locally + self.container_image_name = container_image_name # image friendly name + self.container_image_digests = container_image_digests # reg hashes + + #: The type of service (osd, mon, mgr, etc.) + self.daemon_type = daemon_type + + #: The orchestrator will have picked some names for daemons, + #: typically either based on hostnames or on pod names. + #: This is the in mds., the ID that will appear + #: in the FSMap/ServiceMap. + self.daemon_id: Optional[str] = daemon_id + self.daemon_name = self.name() + + #: Some daemon types have a numeric rank assigned + self.rank: Optional[int] = rank + self.rank_generation: Optional[int] = rank_generation + + self._service_name: Optional[str] = service_name + + #: Service version that was deployed + self.version = version + + # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting + self._status = status + + #: Service status description when status == error. 
+ self.status_desc = status_desc + + #: datetime when this info was last refreshed + self.last_refresh: Optional[datetime.datetime] = last_refresh + + self.created: Optional[datetime.datetime] = created + self.started: Optional[datetime.datetime] = started + self.last_configured: Optional[datetime.datetime] = last_configured + self.last_deployed: Optional[datetime.datetime] = last_deployed + + #: Affinity to a certain OSDSpec + self.osdspec_affinity: Optional[str] = osdspec_affinity + + self.events: List[OrchestratorEvent] = events or [] + + self.memory_usage: Optional[int] = memory_usage + self.memory_request: Optional[int] = memory_request + self.memory_limit: Optional[int] = memory_limit + + self.cpu_percentage: Optional[str] = cpu_percentage + + self.ports: Optional[List[int]] = ports + self.ip: Optional[str] = ip + + self.deployed_by = deployed_by + + self.is_active = is_active + + self.extra_container_args: Optional[ArgumentList] = None + self.extra_entrypoint_args: Optional[ArgumentList] = None + if extra_container_args: + self.extra_container_args = ArgumentSpec.from_general_args( + extra_container_args) + if extra_entrypoint_args: + self.extra_entrypoint_args = ArgumentSpec.from_general_args( + extra_entrypoint_args) + + @property + def status(self) -> Optional[DaemonDescriptionStatus]: + return self._status + + @status.setter + def status(self, new: DaemonDescriptionStatus) -> None: + self._status = new + self.status_desc = DaemonDescriptionStatus.to_str(new) + + def get_port_summary(self) -> str: + if not self.ports: + return '' + return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}" + + def name(self) -> str: + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def matches_service(self, service_name: Optional[str]) -> bool: + assert self.daemon_id is not None + assert self.daemon_type is not None + if service_name: + return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.') + return False + + def matches_digests(self, digests: Optional[List[str]]) -> bool: + # the DaemonDescription class maintains a list of container digests + # for the container image last reported as being used for the daemons. + # This function checks if any of those digests match any of the digests + # in the list of digests provided as an arg to this function + if not digests or not self.container_image_digests: + return False + return any(d in digests for d in self.container_image_digests) + + def matches_image_name(self, image_name: Optional[str]) -> bool: + # the DaemonDescription class has an attribute that tracks the image + # name of the container image last reported as being used by the daemon. + # This function compares if the image name provided as an arg matches + # the image name in said attribute + if not image_name or not self.container_image_name: + return False + return image_name == self.container_image_name + + def service_id(self) -> str: + assert self.daemon_id is not None + assert self.daemon_type is not None + + if self._service_name: + if '.' 
in self._service_name: + return self._service_name.split('.', 1)[1] + else: + return '' + + if self.daemon_type == 'osd': + if self.osdspec_affinity and self.osdspec_affinity != 'None': + return self.osdspec_affinity + return '' + + def _match() -> str: + assert self.daemon_id is not None + err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " + f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'") + + if not self.hostname: + # TODO: can a DaemonDescription exist without a hostname? + raise err + + # use the bare hostname, not the FQDN. + host = self.hostname.split('.')[0] + + if host == self.daemon_id: + # daemon_id == "host" + return self.daemon_id + + elif host in self.daemon_id: + # daemon_id == "service_id.host" + # daemon_id == "service_id.host.random" + pre, post = self.daemon_id.rsplit(host, 1) + if not pre.endswith('.'): + # '.' sep missing at front of host + raise err + elif post and not post.startswith('.'): + # '.' sep missing at end of host + raise err + return pre[:-1] + + # daemon_id == "service_id.random" + if self.daemon_type == 'rgw': + v = self.daemon_id.split('.') + if len(v) in [3, 4]: + return '.'.join(v[0:2]) + + if self.daemon_type == 'iscsi': + v = self.daemon_id.split('.') + return '.'.join(v[0:-1]) + + # daemon_id == "service_id" + return self.daemon_id + + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return _match() + + return self.daemon_id + + def service_name(self) -> str: + if self._service_name: + return self._service_name + assert self.daemon_type is not None + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}' + return daemon_type_to_service(self.daemon_type) + + def __repr__(self) -> str: + return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type, + id=self.daemon_id) + + def __str__(self) -> str: + return f"{self.name()} in status {self.status_desc} on {self.hostname}" + + def to_json(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['service_name'] = self._service_name + out['daemon_name'] = self.name() + out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + out['rank'] = self.rank + out['rank_generation'] = self.rank_generation + + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_json() for e in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out + + def to_dict(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['daemon_name'] = self.name()
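+        # note: unlike to_json(), to_dict() omits service_name, rank and
+        # rank_generation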
+ out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_dict() for e in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'DaemonDescription': + c = data.copy() + event_strs = c.pop('events', []) + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if k in c: + c[k] = str_to_datetime(c[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + status_int = c.pop('status', None) + if 'daemon_name' in c: + del c['daemon_name'] + if 'service_name' in c and c['service_name'].startswith('osd.'): + # if the service_name is an osd.NNN (numeric osd id) then + # ignore it -- it is not a valid service_name and + # (presumably) came from an older version of cephadm. + try: + int(c['service_name'][4:]) + del c['service_name'] + except ValueError: + pass + status = DaemonDescriptionStatus(status_int) if status_int is not None else None + return cls(events=events, status=status, **c) + + def __copy__(self) -> 'DaemonDescription': + # feel free to change this: + return DaemonDescription.from_json(self.to_json()) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer) + + +class ServiceDescription(object): + """ + For responding to queries about the status of a particular service, + stateful or stateless. + + This is not about health or performance monitoring of services: it's + about letting the orchestrator tell Ceph whether and where a + service is scheduled in the cluster. When an orchestrator tells + Ceph "it's running on host123", that's not a promise that the process + is literally up this second, it's a description of where the orchestrator + has decided the service should run.
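+
+    Illustrative construction (hypothetical values)::
+
+        >>> #doctest: +SKIP
+        ... sd = ServiceDescription(spec=ServiceSpec('crash'), size=3, running=2)
+        ... sd.service_type()  # 'crash'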
+ """ + + def __init__(self, + spec: ServiceSpec, + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + service_url: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + deleted: Optional[datetime.datetime] = None, + size: int = 0, + running: int = 0, + events: Optional[List['OrchestratorEvent']] = None, + virtual_ip: Optional[str] = None, + ports: List[int] = []) -> None: + # Not everyone runs in containers, but enough people do to + # justify having the container_image_id (image hash) and container_image + # (image name) + self.container_image_id = container_image_id # image hash + self.container_image_name = container_image_name # image friendly name + + # If the service exposes REST-like API, this attribute should hold + # the URL. + self.service_url = service_url + + # Number of daemons + self.size = size + + # Number of daemons up + self.running = running + + # datetime when this info was last refreshed + self.last_refresh: Optional[datetime.datetime] = last_refresh + self.created: Optional[datetime.datetime] = created + self.deleted: Optional[datetime.datetime] = deleted + + self.spec: ServiceSpec = spec + + self.events: List[OrchestratorEvent] = events or [] + + self.virtual_ip = virtual_ip + self.ports = ports + + def service_type(self) -> str: + return self.spec.service_type + + def __repr__(self) -> str: + return f"" + + def get_port_summary(self) -> str: + if not self.ports: + return '' + ports = sorted([int(x) for x in self.ports]) + return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, ports or []))}" + + def to_json(self) -> OrderedDict: + out = self.spec.to_json() + status = { + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'service_url': self.service_url, + 'size': self.size, + 'running': self.running, + 'last_refresh': self.last_refresh, + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, + } + for k in ['last_refresh', 'created']: + if getattr(self, k): + status[k] = datetime_to_str(getattr(self, k)) + status = {k: v for (k, v) in status.items() if v is not None} + out['status'] = status + if self.events: + out['events'] = [e.to_json() for e in self.events] + return out + + def to_dict(self) -> OrderedDict: + out = self.spec.to_json() + status = { + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'service_url': self.service_url, + 'size': self.size, + 'running': self.running, + 'last_refresh': self.last_refresh, + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, + } + for k in ['last_refresh', 'created']: + if getattr(self, k): + status[k] = datetime_to_str(getattr(self, k)) + status = {k: v for (k, v) in status.items() if v is not None} + out['status'] = status + if self.events: + out['events'] = [e.to_dict() for e in self.events] + return out + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'ServiceDescription': + c = data.copy() + status = c.pop('status', {}) + event_strs = c.pop('events', []) + spec = ServiceSpec.from_json(c) + + c_status = status.copy() + for k in ['last_refresh', 'created']: + if k in c_status: + c_status[k] = str_to_datetime(c_status[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + return cls(spec=spec, events=events, **c_status) + + @staticmethod + def 
yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer) + + +class InventoryFilter(object): + """ + When fetching inventory, use this filter to avoid unnecessarily + scanning the whole estate. + + Typical use: + + filter by host when presenting UI workflow for configuring + a particular server. + filter by label when not all of the estate is Ceph servers, + and we want to only learn about the Ceph servers. + filter by label when we are interested particularly + in e.g. OSD servers. + """ + + def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None: + + #: Optional: get info about hosts matching labels + self.labels = labels + + #: Optional: get info about certain named hosts only + self.hosts = hosts + + +class InventoryHost(object): + """ + When fetching inventory, all Devices are grouped inside of an + InventoryHost. + """ + + def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None: + if devices is None: + devices = inventory.Devices([]) + if labels is None: + labels = [] + assert isinstance(devices, inventory.Devices) + + self.name = name # unique within cluster. For example a hostname. + self.addr = addr or name + self.devices = devices + self.labels = labels + + def to_json(self) -> dict: + return { + 'name': self.name, + 'addr': self.addr, + 'devices': self.devices.to_json(), + 'labels': self.labels, + } + + @classmethod + def from_json(cls, data: dict) -> 'InventoryHost': + try: + _data = copy.deepcopy(data) + name = _data.pop('name') + addr = _data.pop('addr', None) or name + devices = inventory.Devices.from_json(_data.pop('devices')) + labels = _data.pop('labels', list()) + if _data: + error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys())) + raise OrchestratorValidationError(error_msg) + return cls(name, devices, labels, addr) + except KeyError as e: + error_msg = '{} is required for {}'.format(e, cls.__name__) + raise OrchestratorValidationError(error_msg) + except TypeError as e: + raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) + + @classmethod + def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']: + devs = inventory.Devices.from_json + return [cls(item[0], devs(item[1].data)) for item in hosts] + + def __repr__(self) -> str: + return "<InventoryHost>({name})".format(name=self.name) + + @staticmethod + def get_host_names(hosts: List['InventoryHost']) -> List[str]: + return [host.name for host in hosts] + + def __eq__(self, other: Any) -> bool: + return self.name == other.name and self.devices == other.devices + + +class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])): + """ + Describes a specific device on a specific host. Used for enabling or disabling LEDs + on devices. + + hostname as in :func:`orchestrator.Orchestrator.get_hosts` + + device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``. + See ``ceph osd metadata | jq '.[].device_ids'`` + """ + __slots__ = () + + +class OrchestratorEvent: + """ + Similar to K8s Events. + + Some form of "important" log message attached to something.
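+
+    Illustrative event (hypothetical subject)::
+
+        >>> #doctest: +SKIP
+        ... ev = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon',
+        ...                        'crash.host1', 'INFO', 'deployed')
+        ... ev.kind_subject()  # 'daemon:crash.host1'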
+ """ + INFO = 'INFO' + ERROR = 'ERROR' + regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE) + + def __init__(self, created: Union[str, datetime.datetime], kind: str, + subject: str, level: str, message: str) -> None: + if isinstance(created, str): + created = str_to_datetime(created) + self.created: datetime.datetime = created + + assert kind in "service daemon".split() + self.kind: str = kind + + # service name, or daemon danem or something + self.subject: str = subject + + # Events are not meant for debugging. debugs should end in the log. + assert level in "INFO ERROR".split() + self.level = level + + self.message: str = message + + __slots__ = ('created', 'kind', 'subject', 'level', 'message') + + def kind_subject(self) -> str: + return f'{self.kind}:{self.subject}' + + def to_json(self) -> str: + # Make a long list of events readable. + created = datetime_to_str(self.created) + return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"' + + def to_dict(self) -> dict: + # Convert events data to dict. + return { + 'created': datetime_to_str(self.created), + 'subject': self.kind_subject(), + 'level': self.level, + 'message': self.message + } + + @classmethod + @handle_type_error + def from_json(cls, data: str) -> "OrchestratorEvent": + """ + >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json() + '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"' + + :param data: + :return: + """ + match = cls.regex_v1.match(data) + if match: + return cls(*match.groups()) + raise ValueError(f'Unable to match: "{data}"') + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, OrchestratorEvent): + return False + + return self.created == other.created and self.kind == other.kind \ + and self.subject == other.subject and self.message == other.message + + def __repr__(self) -> str: + return f'OrchestratorEvent.from_json({self.to_json()!r})' + + +def _mk_orch_methods(cls: Any) -> Any: + # Needs to be defined outside of for. + # Otherwise meth is always bound to last key + def shim(method_name: str) -> Callable: + def inner(self: Any, *args: Any, **kwargs: Any) -> Any: + completion = self._oremote(method_name, args, kwargs) + return completion + return inner + + for name, method in Orchestrator.__dict__.items(): + if not name.startswith('_') and name not in ['is_orchestrator_module']: + remote_call = update_wrapper(shim(name), method) + setattr(cls, name, remote_call) + return cls + + +@_mk_orch_methods +class OrchestratorClientMixin(Orchestrator): + """ + A module that inherents from `OrchestratorClientMixin` can directly call + all :class:`Orchestrator` methods without manually calling remote. + + Every interface method from ``Orchestrator`` is converted into a stub method that internally + calls :func:`OrchestratorClientMixin._oremote` + + >>> class MyModule(OrchestratorClientMixin): + ... def func(self): + ... completion = self.add_host('somehost') # calls `_oremote()` + ... self.log.debug(completion.result) + + .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`. + Reason is, that OrchestratorClientMixin magically redirects all methods to the + "real" implementation of the orchestrator. + + + >>> import mgr_module + >>> #doctest: +SKIP + ... class MyImplementation(mgr_module.MgrModule, Orchestrator): + ... def __init__(self, ...): + ... 
self.orch_client = OrchestratorClientMixin() + ... self.orch_client.set_mgr(self.mgr) + """ + + def set_mgr(self, mgr: MgrModule) -> None: + """ + Usable in the Dashboard, which uses a global ``mgr`` + """ + + self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties + + def __get_mgr(self) -> Any: + try: + return self.__mgr + except AttributeError: + return self + + def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any: + """ + Helper for invoking `remote` on whichever orchestrator is enabled + + :raises RuntimeError: If the remote method failed. + :raises OrchestratorError: orchestrator failed to perform + :raises ImportError: no `orchestrator` module or backend not found. + """ + mgr = self.__get_mgr() + + try: + o = mgr._select_orchestrator() + except AttributeError: + o = mgr.remote('orchestrator', '_select_orchestrator') + + if o is None: + raise NoOrchestrator() + + mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs)) + try: + return mgr.remote(o, meth, *args, **kwargs) + except Exception as e: + if meth == 'get_feature_set': + raise # self.get_feature_set() calls self._oremote() + f_set = self.get_feature_set() + if meth not in f_set or not f_set[meth]['available']: + raise NotImplementedError(f'{o} does not implement {meth}') from e + raise