author    | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
commit    | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree      | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/orchestrator
parent    | Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/orchestrator')
-rw-r--r-- | src/pybind/mgr/orchestrator/README.md                  |   14
-rw-r--r-- | src/pybind/mgr/orchestrator/__init__.py                |   20
-rw-r--r-- | src/pybind/mgr/orchestrator/_interface.py              | 1527
-rw-r--r-- | src/pybind/mgr/orchestrator/module.py                  | 1471
-rw-r--r-- | src/pybind/mgr/orchestrator/tests/__init__.py          |    0
-rw-r--r-- | src/pybind/mgr/orchestrator/tests/test_orchestrator.py |  220
6 files changed, 3252 insertions, 0 deletions
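
The bulk of the change is _interface.py, which defines the client-facing API for the orchestrator: `Orchestrator` methods return `OrchResult` completions, `raise_if_exception` unwraps them, and other mgr modules consume the whole thing through `OrchestratorClientMixin`, whose generated stub methods forward each call to the active backend via `_oremote`. As a rough illustration of the consumption pattern described in the `OrchestratorClientMixin` docstring below — `MyHostLister` is a hypothetical consumer, not part of this commit — a minimal sketch:

```python
# Hypothetical consumer of the orchestrator API added by this commit.
# The imported names are exported by orchestrator/__init__.py below.
from typing import List

from mgr_module import MgrModule
from orchestrator import (OrchestratorClientMixin, OrchestratorError,
                          raise_if_exception)


class MyHostLister(OrchestratorClientMixin, MgrModule):
    """Toy module that lists hosts known to the active orchestrator backend."""

    def list_host_names(self) -> List[str]:
        try:
            # get_hosts() is a stub generated by _mk_orch_methods; it forwards
            # the call to the enabled backend and returns an OrchResult.
            completion = self.get_hosts()
            hosts = raise_if_exception(completion)  # re-raises any remote error
            return [h.hostname for h in hosts]
        except OrchestratorError as e:
            # e.g. NoOrchestrator when `ceph orch set backend` was never run
            self.log.error('orchestrator call failed: %s', e)
            return []
```

Within this tree, the real consumer is `OrchestratorCli` in module.py, which combines the same mixin with `MgrModule` and `CLICommandMeta` to expose the `ceph orch ...` commands.
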
diff --git a/src/pybind/mgr/orchestrator/README.md b/src/pybind/mgr/orchestrator/README.md new file mode 100644 index 000000000..7e3417959 --- /dev/null +++ b/src/pybind/mgr/orchestrator/README.md @@ -0,0 +1,14 @@ +# Orchestrator CLI + +See also [orchestrator cli doc](https://docs.ceph.com/en/latest/mgr/orchestrator/). + +## Running the Teuthology tests + +To run the API tests against a real Ceph cluster, we leverage the Teuthology +framework and the `test_orchestrator` backend. + +``source`` the script and run the tests manually:: + + $ pushd ../dashboard ; source ./run-backend-api-tests.sh ; popd + $ run_teuthology_tests tasks.mgr.test_orchestrator_cli + $ cleanup_teuthology diff --git a/src/pybind/mgr/orchestrator/__init__.py b/src/pybind/mgr/orchestrator/__init__.py new file mode 100644 index 000000000..c901284d3 --- /dev/null +++ b/src/pybind/mgr/orchestrator/__init__.py @@ -0,0 +1,20 @@ +# flake8: noqa + +from .module import OrchestratorCli + +# usage: E.g. `from orchestrator import StatelessServiceSpec` +from ._interface import \ + OrchResult, raise_if_exception, handle_orch_error, \ + CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \ + Orchestrator, OrchestratorClientMixin, \ + OrchestratorValidationError, OrchestratorError, NoOrchestrator, \ + ServiceDescription, InventoryFilter, HostSpec, \ + DaemonDescription, DaemonDescriptionStatus, \ + OrchestratorEvent, set_exception_subject, \ + InventoryHost, DeviceLightLoc, \ + UpgradeStatusSpec, daemon_type_to_service, service_to_daemon_types, KNOWN_DAEMON_TYPES + + +import os +if 'UNITTEST' in os.environ: + import tests diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py new file mode 100644 index 000000000..2208a5874 --- /dev/null +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -0,0 +1,1527 @@ + +""" +ceph-mgr orchestrator interface + +Please see the ceph-mgr module developer's guide for more information. +""" + +import copy +import datetime +import enum +import errno +import logging +import pickle +import re + +from collections import namedtuple, OrderedDict +from contextlib import contextmanager +from functools import wraps, reduce, update_wrapper + +from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ + Sequence, Dict, cast, Mapping + +try: + from typing import Protocol # Protocol was added in Python 3.8 +except ImportError: + class Protocol: # type: ignore + pass + + +import yaml + +from ceph.deployment import inventory +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ + IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec +from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.hostspec import HostSpec, SpecValidationError +from ceph.utils import datetime_to_str, str_to_datetime + +from mgr_module import MgrModule, CLICommand, HandleCommandResult + + +logger = logging.getLogger(__name__) + +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable[..., Any]) + + +class OrchestratorError(Exception): + """ + General orchestrator specific error. + + Used for deployment, configuration or user errors. + + It's not intended for programming errors or orchestrator internal errors. 
+ """ + + def __init__(self, + msg: str, + errno: int = -errno.EINVAL, + event_kind_subject: Optional[Tuple[str, str]] = None) -> None: + super(Exception, self).__init__(msg) + self.errno = errno + # See OrchestratorEvent.subject + self.event_subject = event_kind_subject + + +class NoOrchestrator(OrchestratorError): + """ + No orchestrator in configured. + """ + + def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None: + super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT) + + +class OrchestratorValidationError(OrchestratorError): + """ + Raised when an orchestrator doesn't support a specific feature. + """ + + +@contextmanager +def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]: + try: + yield + except OrchestratorError as e: + if overwrite or hasattr(e, 'event_subject'): + e.event_subject = (kind, subject) + raise + + +def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return func(*args, **kwargs) + except (OrchestratorError, SpecValidationError) as e: + # Do not print Traceback for expected errors. + return HandleCommandResult(e.errno, stderr=str(e)) + except ImportError as e: + return HandleCommandResult(-errno.ENOENT, stderr=str(e)) + except NotImplementedError: + msg = 'This Orchestrator does not support `{}`'.format(prefix) + return HandleCommandResult(-errno.ENOENT, stderr=msg) + + # misuse lambda to copy `wrapper` + wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731 + wrapper_copy._prefix = prefix # type: ignore + wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore + wrapper_copy._cli_command.store_func_metadata(func) # type: ignore + wrapper_copy._cli_command.func = wrapper_copy # type: ignore + + return cast(FuncT, wrapper_copy) + + +def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']: + """ + Decorator to make Orchestrator methods return + an OrchResult. + """ + + @wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]: + try: + return OrchResult(f(*args, **kwargs)) + except Exception as e: + logger.exception(e) + import os + if 'UNITTEST' in os.environ: + raise # This makes debugging of Tracebacks from unittests a bit easier + return OrchResult(None, exception=e) + + return cast(Callable[..., OrchResult[T]], wrapper) + + +class InnerCliCommandCallable(Protocol): + def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]: + ... + + +def _cli_command(perm: str) -> InnerCliCommandCallable: + def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]: + return lambda func: handle_exception(prefix, perm, func) + return inner_cli_command + + +_cli_read_command = _cli_command('r') +_cli_write_command = _cli_command('rw') + + +class CLICommandMeta(type): + """ + This is a workaround for the use of a global variable CLICommand.COMMANDS which + prevents modules from importing any other module. + + We make use of CLICommand, except for the use of the global variable. 
+ """ + def __init__(cls, name: str, bases: Any, dct: Any) -> None: + super(CLICommandMeta, cls).__init__(name, bases, dct) + dispatch: Dict[str, CLICommand] = {} + for v in dct.values(): + try: + dispatch[v._prefix] = v._cli_command + except AttributeError: + pass + + def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any: + if cmd['prefix'] not in dispatch: + return self.handle_command(inbuf, cmd) + + return dispatch[cmd['prefix']].call(self, cmd, inbuf) + + cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()] + cls.handle_command = handle_command + + +class OrchResult(Generic[T]): + """ + Stores a result and an exception. Mainly to circumvent the + MgrModule.remote() method that hides all exceptions and for + handling different sub-interpreters. + """ + + def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None: + self.result = result + self.serialized_exception: Optional[bytes] = None + self.exception_str: str = '' + self.set_exception(exception) + + __slots__ = 'result', 'serialized_exception', 'exception_str' + + def set_exception(self, e: Optional[Exception]) -> None: + if e is None: + self.serialized_exception = None + self.exception_str = '' + return + + self.exception_str = f'{type(e)}: {str(e)}' + try: + self.serialized_exception = pickle.dumps(e) + except pickle.PicklingError: + logger.error(f"failed to pickle {e}") + if isinstance(e, Exception): + e = Exception(*e.args) + else: + e = Exception(str(e)) + # degenerate to a plain Exception + self.serialized_exception = pickle.dumps(e) + + def result_str(self) -> str: + """Force a string.""" + if self.result is None: + return '' + if isinstance(self.result, list): + return '\n'.join(str(x) for x in self.result) + return str(self.result) + + +def raise_if_exception(c: OrchResult[T]) -> T: + """ + Due to different sub-interpreters, this MUST not be in the `OrchResult` class. + """ + if c.serialized_exception is not None: + try: + e = pickle.loads(c.serialized_exception) + except (KeyError, AttributeError): + raise Exception(c.exception_str) + raise e + assert c.result is not None, 'OrchResult should either have an exception or a result' + return c.result + + +def _hide_in_features(f: FuncT) -> FuncT: + f._hide_in_features = True # type: ignore + return f + + +class Orchestrator(object): + """ + Calls in this class may do long running remote operations, with time + periods ranging from network latencies to package install latencies and large + internet downloads. For that reason, all are asynchronous, and return + ``Completion`` objects. + + Methods should only return the completion and not directly execute + anything, like network calls. Otherwise the purpose of + those completions is defeated. + + Implementations are not required to start work on an operation until + the caller waits on the relevant Completion objects. Callers making + multiple updates should not wait on Completions until they're done + sending operations: this enables implementations to batch up a series + of updates when wait() is called on a set of Completion objects. + + Implementations are encouraged to keep reasonably fresh caches of + the status of the system: it is better to serve a stale-but-recent + result read of e.g. device inventory than it is to keep the caller waiting + while you scan hosts every time. + """ + + @_hide_in_features + def is_orchestrator_module(self) -> bool: + """ + Enable other modules to interrogate this module to discover + whether it's usable as an orchestrator module. 
+ + Subclasses do not need to override this. + """ + return True + + @_hide_in_features + def available(self) -> Tuple[bool, str, Dict[str, Any]]: + """ + Report whether we can talk to the orchestrator. This is the + place to give the user a meaningful message if the orchestrator + isn't running or can't be contacted. + + This method may be called frequently (e.g. every page load + to conditionally display a warning banner), so make sure it's + not too expensive. It's okay to give a slightly stale status + (e.g. based on a periodic background ping of the orchestrator) + if that's necessary to make this method fast. + + .. note:: + `True` doesn't mean that the desired functionality + is actually available in the orchestrator. I.e. this + won't work as expected:: + + >>> #doctest: +SKIP + ... if OrchestratorClientMixin().available()[0]: # wrong. + ... OrchestratorClientMixin().get_hosts() + + :return: boolean representing whether the module is available/usable + :return: string describing any error + :return: dict containing any module specific information + """ + raise NotImplementedError() + + @_hide_in_features + def get_feature_set(self) -> Dict[str, dict]: + """Describes which methods this orchestrator implements + + .. note:: + `True` doesn't mean that the desired functionality + is actually possible in the orchestrator. I.e. this + won't work as expected:: + + >>> #doctest: +SKIP + ... api = OrchestratorClientMixin() + ... if api.get_feature_set()['get_hosts']['available']: # wrong. + ... api.get_hosts() + + It's better to ask for forgiveness instead:: + + >>> #doctest: +SKIP + ... try: + ... OrchestratorClientMixin().get_hosts() + ... except (OrchestratorError, NotImplementedError): + ... ... + + :returns: Dict of API method names to ``{'available': True or False}`` + """ + module = self.__class__ + features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)} + for a in Orchestrator.__dict__ + if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False) + } + return features + + def cancel_completions(self) -> None: + """ + Cancels ongoing completions. Unstuck the mgr. + """ + raise NotImplementedError() + + def pause(self) -> None: + raise NotImplementedError() + + def resume(self) -> None: + raise NotImplementedError() + + def add_host(self, host_spec: HostSpec) -> OrchResult[str]: + """ + Add a host to the orchestrator inventory. + + :param host: hostname + """ + raise NotImplementedError() + + def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]: + """ + Remove a host from the orchestrator inventory. + + :param host: hostname + """ + raise NotImplementedError() + + def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]: + """ + drain all daemons from a host + + :param hostname: hostname + """ + raise NotImplementedError() + + def update_host_addr(self, host: str, addr: str) -> OrchResult[str]: + """ + Update a host's address + + :param host: hostname + :param addr: address (dns name or IP) + """ + raise NotImplementedError() + + def get_hosts(self) -> OrchResult[List[HostSpec]]: + """ + Report the hosts in the cluster. + + :return: list of HostSpec + """ + raise NotImplementedError() + + def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]: + """ + Return hosts metadata(gather_facts). 
+ """ + raise NotImplementedError() + + def add_host_label(self, host: str, label: str) -> OrchResult[str]: + """ + Add a host label + """ + raise NotImplementedError() + + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: + """ + Remove a host label + """ + raise NotImplementedError() + + def host_ok_to_stop(self, hostname: str) -> OrchResult: + """ + Check if the specified host can be safely stopped without reducing availability + + :param host: hostname + """ + raise NotImplementedError() + + def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult: + """ + Place a host in maintenance, stopping daemons and disabling it's systemd target + """ + raise NotImplementedError() + + def exit_host_maintenance(self, hostname: str) -> OrchResult: + """ + Return a host from maintenance, restarting the clusters systemd target + """ + raise NotImplementedError() + + def rescan_host(self, hostname: str) -> OrchResult: + """Use cephadm to issue a disk rescan on each HBA + + Some HBAs and external enclosures don't automatically register + device insertion with the kernel, so for these scenarios we need + to manually rescan + + :param hostname: (str) host name + """ + raise NotImplementedError() + + def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]: + """ + Returns something that was created by `ceph-volume inventory`. + + :return: list of InventoryHost + """ + raise NotImplementedError() + + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]: + """ + Describe a service (of any kind) that is already configured in + the orchestrator. For example, when viewing an OSD in the dashboard + we might like to also display information about the orchestrator's + view of the service (like the kubernetes pod ID). + + When viewing a CephFS filesystem in the dashboard, we would use this + to display the pods being currently run for MDS daemons. + + :return: list of ServiceDescription objects. + """ + raise NotImplementedError() + + def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]: + """ + Describe a daemon (of any kind) that is already configured in + the orchestrator. + + :return: list of DaemonDescription objects. 
+ """ + raise NotImplementedError() + + @handle_orch_error + def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]: + """ + Applies any spec + """ + fns: Dict[str, Callable[..., OrchResult[str]]] = { + 'alertmanager': self.apply_alertmanager, + 'crash': self.apply_crash, + 'grafana': self.apply_grafana, + 'iscsi': self.apply_iscsi, + 'mds': self.apply_mds, + 'mgr': self.apply_mgr, + 'mon': self.apply_mon, + 'nfs': self.apply_nfs, + 'node-exporter': self.apply_node_exporter, + 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore + 'prometheus': self.apply_prometheus, + 'rbd-mirror': self.apply_rbd_mirror, + 'rgw': self.apply_rgw, + 'ingress': self.apply_ingress, + 'snmp-gateway': self.apply_snmp_gateway, + 'host': self.add_host, + 'cephadm-exporter': self.apply_cephadm_exporter, + } + + def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741 + l_res = raise_if_exception(l) + r_res = raise_if_exception(r) + l_res.append(r_res) + return OrchResult(l_res) + return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([]))) + + def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]: + """ + Plan (Dry-run, Preview) a List of Specs. + """ + raise NotImplementedError() + + def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]: + """ + Remove specific daemon(s). + + :return: None + """ + raise NotImplementedError() + + def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]: + """ + Remove a service (a collection of daemons). + + :return: None + """ + raise NotImplementedError() + + def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]: + """ + Perform an action (start/stop/reload) on a service (i.e., all daemons + providing the logical service). + + :param action: one of "start", "stop", "restart", "redeploy", "reconfig" + :param service_name: service_type + '.' + service_id + (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...) + :rtype: OrchResult + """ + # assert action in ["start", "stop", "reload, "restart", "redeploy"] + raise NotImplementedError() + + def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]: + """ + Perform an action (start/stop/reload) on a daemon. + + :param action: one of "start", "stop", "restart", "redeploy", "reconfig" + :param daemon_name: name of daemon + :param image: Container image when redeploying that daemon + :rtype: OrchResult + """ + # assert action in ["start", "stop", "reload, "restart", "redeploy"] + raise NotImplementedError() + + def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]: + """ + Create one or more OSDs within a single Drive Group. + + The principal argument here is the drive_group member + of OsdSpec: other fields are advisory/extensible for any + finer-grained OSD feature enablement (choice of backing store, + compression/encryption, etc). 
+ """ + raise NotImplementedError() + + def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: + """ Update OSD cluster """ + raise NotImplementedError() + + def set_unmanaged_flag(self, + unmanaged_flag: bool, + service_type: str = 'osd', + service_name: Optional[str] = None + ) -> HandleCommandResult: + raise NotImplementedError() + + def preview_osdspecs(self, + osdspec_name: Optional[str] = 'osd', + osdspecs: Optional[List[DriveGroupSpec]] = None + ) -> OrchResult[str]: + """ Get a preview for OSD deployments """ + raise NotImplementedError() + + def remove_osds(self, osd_ids: List[str], + replace: bool = False, + force: bool = False, + zap: bool = False) -> OrchResult[str]: + """ + :param osd_ids: list of OSD IDs + :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` + :param force: Forces the OSD removal process without waiting for the data to be drained first. + :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA) + + .. note:: this can only remove OSDs that were successfully + created (i.e. got an OSD ID). + """ + raise NotImplementedError() + + def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult: + """ + TODO + """ + raise NotImplementedError() + + def remove_osds_status(self) -> OrchResult: + """ + Returns a status of the ongoing OSD removal operations. + """ + raise NotImplementedError() + + def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]: + """ + Instructs the orchestrator to enable or disable either the ident or the fault LED. + + :param ident_fault: either ``"ident"`` or ``"fault"`` + :param on: ``True`` = on. + :param locations: See :class:`orchestrator.DeviceLightLoc` + """ + raise NotImplementedError() + + def zap_device(self, host: str, path: str) -> OrchResult[str]: + """Zap/Erase a device (DESTROYS DATA)""" + raise NotImplementedError() + + def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]: + """Create daemons daemon(s) for unmanaged services""" + raise NotImplementedError() + + def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]: + """Update mon cluster""" + raise NotImplementedError() + + def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]: + """Update mgr cluster""" + raise NotImplementedError() + + def apply_mds(self, spec: MDSSpec) -> OrchResult[str]: + """Update MDS cluster""" + raise NotImplementedError() + + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: + """Update RGW cluster""" + raise NotImplementedError() + + def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]: + """Update ingress daemons""" + raise NotImplementedError() + + def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]: + """Update rbd-mirror cluster""" + raise NotImplementedError() + + def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]: + """Update NFS cluster""" + raise NotImplementedError() + + def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]: + """Update iscsi cluster""" + raise NotImplementedError() + + def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]: + """Update prometheus cluster""" + raise NotImplementedError() + + def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a Node-Exporter daemon(s)""" + raise NotImplementedError() + + def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a crash daemon(s)""" + raise NotImplementedError() + + def apply_grafana(self, spec: 
ServiceSpec) -> OrchResult[str]: + """Update existing a grafana service""" + raise NotImplementedError() + + def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]: + """Update an existing AlertManager daemon(s)""" + raise NotImplementedError() + + def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]: + """Update an existing snmp gateway service""" + raise NotImplementedError() + + def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]: + """Update an existing cephadm exporter daemon""" + raise NotImplementedError() + + def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]: + raise NotImplementedError() + + def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]], + hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_pause(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_resume(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_stop(self) -> OrchResult[str]: + raise NotImplementedError() + + def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']: + """ + If an upgrade is currently underway, report on where + we are in the process, or if some error has occurred. + + :return: UpgradeStatusSpec instance + """ + raise NotImplementedError() + + @_hide_in_features + def upgrade_available(self) -> OrchResult: + """ + Report on what versions are available to upgrade to + + :return: List of strings + """ + raise NotImplementedError() + + +GenericSpec = Union[ServiceSpec, HostSpec] + + +def json_to_generic_spec(spec: dict) -> GenericSpec: + if 'service_type' in spec and spec['service_type'] == 'host': + return HostSpec.from_json(spec) + else: + return ServiceSpec.from_json(spec) + + +def daemon_type_to_service(dtype: str) -> str: + mapping = { + 'mon': 'mon', + 'mgr': 'mgr', + 'mds': 'mds', + 'rgw': 'rgw', + 'osd': 'osd', + 'haproxy': 'ingress', + 'keepalived': 'ingress', + 'iscsi': 'iscsi', + 'rbd-mirror': 'rbd-mirror', + 'cephfs-mirror': 'cephfs-mirror', + 'nfs': 'nfs', + 'grafana': 'grafana', + 'alertmanager': 'alertmanager', + 'prometheus': 'prometheus', + 'node-exporter': 'node-exporter', + 'crash': 'crash', + 'crashcollector': 'crash', # Specific Rook Daemon + 'container': 'container', + 'cephadm-exporter': 'cephadm-exporter', + 'snmp-gateway': 'snmp-gateway', + } + return mapping[dtype] + + +def service_to_daemon_types(stype: str) -> List[str]: + mapping = { + 'mon': ['mon'], + 'mgr': ['mgr'], + 'mds': ['mds'], + 'rgw': ['rgw'], + 'osd': ['osd'], + 'ingress': ['haproxy', 'keepalived'], + 'iscsi': ['iscsi'], + 'rbd-mirror': ['rbd-mirror'], + 'cephfs-mirror': ['cephfs-mirror'], + 'nfs': ['nfs'], + 'grafana': ['grafana'], + 'alertmanager': ['alertmanager'], + 'prometheus': ['prometheus'], + 'node-exporter': ['node-exporter'], + 'crash': ['crash'], + 'container': ['container'], + 'cephadm-exporter': ['cephadm-exporter'], + 'snmp-gateway': ['snmp-gateway'], + } + return mapping[stype] + + +KNOWN_DAEMON_TYPES: List[str] = list( + sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), [])) + + +class UpgradeStatusSpec(object): + # Orchestrator's report on what's going on with any ongoing upgrade + def __init__(self) -> None: + self.in_progress = False # Is an upgrade underway? 
+ self.target_image: Optional[str] = None + self.services_complete: List[str] = [] # Which daemon types are fully updated? + self.which: str = '<unknown>' # for if user specified daemon types, services or hosts + self.progress: Optional[str] = None # How many of the daemons have we upgraded + self.message = "" # Freeform description + self.is_paused: bool = False # Is the upgrade paused? + + +def handle_type_error(method: FuncT) -> FuncT: + @wraps(method) + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: + try: + return method(cls, *args, **kwargs) + except TypeError as e: + error_msg = '{}: {}'.format(cls.__name__, e) + raise OrchestratorValidationError(error_msg) + return cast(FuncT, inner) + + +class DaemonDescriptionStatus(enum.IntEnum): + unknown = -2 + error = -1 + stopped = 0 + running = 1 + starting = 2 #: Daemon is deployed, but not yet running + + @staticmethod + def to_str(status: Optional['DaemonDescriptionStatus']) -> str: + if status is None: + status = DaemonDescriptionStatus.unknown + return { + DaemonDescriptionStatus.unknown: 'unknown', + DaemonDescriptionStatus.error: 'error', + DaemonDescriptionStatus.stopped: 'stopped', + DaemonDescriptionStatus.running: 'running', + DaemonDescriptionStatus.starting: 'starting', + }.get(status, '<unknown>') + + +class DaemonDescription(object): + """ + For responding to queries about the status of a particular daemon, + stateful or stateless. + + This is not about health or performance monitoring of daemons: it's + about letting the orchestrator tell Ceph whether and where a + daemon is scheduled in the cluster. When an orchestrator tells + Ceph "it's running on host123", that's not a promise that the process + is literally up this second, it's a description of where the orchestrator + has decided the daemon should run. 
+ """ + + def __init__(self, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + hostname: Optional[str] = None, + container_id: Optional[str] = None, + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + container_image_digests: Optional[List[str]] = None, + version: Optional[str] = None, + status: Optional[DaemonDescriptionStatus] = None, + status_desc: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + started: Optional[datetime.datetime] = None, + last_configured: Optional[datetime.datetime] = None, + osdspec_affinity: Optional[str] = None, + last_deployed: Optional[datetime.datetime] = None, + events: Optional[List['OrchestratorEvent']] = None, + is_active: bool = False, + memory_usage: Optional[int] = None, + memory_request: Optional[int] = None, + memory_limit: Optional[int] = None, + cpu_percentage: Optional[str] = None, + service_name: Optional[str] = None, + ports: Optional[List[int]] = None, + ip: Optional[str] = None, + deployed_by: Optional[List[str]] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + ) -> None: + + #: Host is at the same granularity as InventoryHost + self.hostname: Optional[str] = hostname + + # Not everyone runs in containers, but enough people do to + # justify having the container_id (runtime id) and container_image + # (image name) + self.container_id = container_id # runtime id + self.container_image_id = container_image_id # image id locally + self.container_image_name = container_image_name # image friendly name + self.container_image_digests = container_image_digests # reg hashes + + #: The type of service (osd, mon, mgr, etc.) + self.daemon_type = daemon_type + + #: The orchestrator will have picked some names for daemons, + #: typically either based on hostnames or on pod names. + #: This is the <foo> in mds.<foo>, the ID that will appear + #: in the FSMap/ServiceMap. + self.daemon_id: Optional[str] = daemon_id + self.daemon_name = self.name() + + #: Some daemon types have a numeric rank assigned + self.rank: Optional[int] = rank + self.rank_generation: Optional[int] = rank_generation + + self._service_name: Optional[str] = service_name + + #: Service version that was deployed + self.version = version + + # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting + self._status = status + + #: Service status description when status == error. 
+ self.status_desc = status_desc + + #: datetime when this info was last refreshed + self.last_refresh: Optional[datetime.datetime] = last_refresh + + self.created: Optional[datetime.datetime] = created + self.started: Optional[datetime.datetime] = started + self.last_configured: Optional[datetime.datetime] = last_configured + self.last_deployed: Optional[datetime.datetime] = last_deployed + + #: Affinity to a certain OSDSpec + self.osdspec_affinity: Optional[str] = osdspec_affinity + + self.events: List[OrchestratorEvent] = events or [] + + self.memory_usage: Optional[int] = memory_usage + self.memory_request: Optional[int] = memory_request + self.memory_limit: Optional[int] = memory_limit + + self.cpu_percentage: Optional[str] = cpu_percentage + + self.ports: Optional[List[int]] = ports + self.ip: Optional[str] = ip + + self.deployed_by = deployed_by + + self.is_active = is_active + + self.extra_container_args = extra_container_args + + @property + def status(self) -> Optional[DaemonDescriptionStatus]: + return self._status + + @status.setter + def status(self, new: DaemonDescriptionStatus) -> None: + self._status = new + self.status_desc = DaemonDescriptionStatus.to_str(new) + + def get_port_summary(self) -> str: + if not self.ports: + return '' + return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}" + + def name(self) -> str: + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def matches_service(self, service_name: Optional[str]) -> bool: + assert self.daemon_id is not None + assert self.daemon_type is not None + if service_name: + return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.') + return False + + def service_id(self) -> str: + assert self.daemon_id is not None + assert self.daemon_type is not None + + if self._service_name: + if '.' in self._service_name: + return self._service_name.split('.', 1)[1] + else: + return '' + + if self.daemon_type == 'osd': + if self.osdspec_affinity and self.osdspec_affinity != 'None': + return self.osdspec_affinity + return '' + + def _match() -> str: + assert self.daemon_id is not None + err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " + f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'") + + if not self.hostname: + # TODO: can a DaemonDescription exist without a hostname? + raise err + + # use the bare hostname, not the FQDN. + host = self.hostname.split('.')[0] + + if host == self.daemon_id: + # daemon_id == "host" + return self.daemon_id + + elif host in self.daemon_id: + # daemon_id == "service_id.host" + # daemon_id == "service_id.host.random" + pre, post = self.daemon_id.rsplit(host, 1) + if not pre.endswith('.'): + # '.' sep missing at front of host + raise err + elif post and not post.startswith('.'): + # '.' 
sep missing at end of host + raise err + return pre[:-1] + + # daemon_id == "service_id.random" + if self.daemon_type == 'rgw': + v = self.daemon_id.split('.') + if len(v) in [3, 4]: + return '.'.join(v[0:2]) + + if self.daemon_type == 'iscsi': + v = self.daemon_id.split('.') + return '.'.join(v[0:-1]) + + # daemon_id == "service_id" + return self.daemon_id + + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return _match() + + return self.daemon_id + + def service_name(self) -> str: + if self._service_name: + return self._service_name + assert self.daemon_type is not None + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}' + return daemon_type_to_service(self.daemon_type) + + def __repr__(self) -> str: + return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type, + id=self.daemon_id) + + def __str__(self) -> str: + return f"{self.name()} in status {self.status_desc} on {self.hostname}" + + def to_json(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['service_name'] = self._service_name + out['daemon_name'] = self.name() + out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + out['rank'] = self.rank + out['rank_generation'] = self.rank_generation + + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_json() for e in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out + + def to_dict(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['daemon_name'] = self.name() + out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_dict() for e 
in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'DaemonDescription': + c = data.copy() + event_strs = c.pop('events', []) + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if k in c: + c[k] = str_to_datetime(c[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + status_int = c.pop('status', None) + if 'daemon_name' in c: + del c['daemon_name'] + if 'service_name' in c and c['service_name'].startswith('osd.'): + # if the service_name is a osd.NNN (numeric osd id) then + # ignore it -- it is not a valid service_name and + # (presumably) came from an older version of cephadm. + try: + int(c['service_name'][4:]) + del c['service_name'] + except ValueError: + pass + status = DaemonDescriptionStatus(status_int) if status_int is not None else None + return cls(events=events, status=status, **c) + + def __copy__(self) -> 'DaemonDescription': + # feel free to change this: + return DaemonDescription.from_json(self.to_json()) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer) + + +class ServiceDescription(object): + """ + For responding to queries about the status of a particular service, + stateful or stateless. + + This is not about health or performance monitoring of services: it's + about letting the orchestrator tell Ceph whether and where a + service is scheduled in the cluster. When an orchestrator tells + Ceph "it's running on host123", that's not a promise that the process + is literally up this second, it's a description of where the orchestrator + has decided the service should run. + """ + + def __init__(self, + spec: ServiceSpec, + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + service_url: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + deleted: Optional[datetime.datetime] = None, + size: int = 0, + running: int = 0, + events: Optional[List['OrchestratorEvent']] = None, + virtual_ip: Optional[str] = None, + ports: List[int] = []) -> None: + # Not everyone runs in containers, but enough people do to + # justify having the container_image_id (image hash) and container_image + # (image name) + self.container_image_id = container_image_id # image hash + self.container_image_name = container_image_name # image friendly name + + # If the service exposes REST-like API, this attribute should hold + # the URL. 
+ self.service_url = service_url + + # Number of daemons + self.size = size + + # Number of daemons up + self.running = running + + # datetime when this info was last refreshed + self.last_refresh: Optional[datetime.datetime] = last_refresh + self.created: Optional[datetime.datetime] = created + self.deleted: Optional[datetime.datetime] = deleted + + self.spec: ServiceSpec = spec + + self.events: List[OrchestratorEvent] = events or [] + + self.virtual_ip = virtual_ip + self.ports = ports + + def service_type(self) -> str: + return self.spec.service_type + + def __repr__(self) -> str: + return f"<ServiceDescription of {self.spec.one_line_str()}>" + + def get_port_summary(self) -> str: + if not self.ports: + return '' + return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}" + + def to_json(self) -> OrderedDict: + out = self.spec.to_json() + status = { + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'service_url': self.service_url, + 'size': self.size, + 'running': self.running, + 'last_refresh': self.last_refresh, + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, + } + for k in ['last_refresh', 'created']: + if getattr(self, k): + status[k] = datetime_to_str(getattr(self, k)) + status = {k: v for (k, v) in status.items() if v is not None} + out['status'] = status + if self.events: + out['events'] = [e.to_json() for e in self.events] + return out + + def to_dict(self) -> OrderedDict: + out = self.spec.to_json() + status = { + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'service_url': self.service_url, + 'size': self.size, + 'running': self.running, + 'last_refresh': self.last_refresh, + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, + } + for k in ['last_refresh', 'created']: + if getattr(self, k): + status[k] = datetime_to_str(getattr(self, k)) + status = {k: v for (k, v) in status.items() if v is not None} + out['status'] = status + if self.events: + out['events'] = [e.to_dict() for e in self.events] + return out + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'ServiceDescription': + c = data.copy() + status = c.pop('status', {}) + event_strs = c.pop('events', []) + spec = ServiceSpec.from_json(c) + + c_status = status.copy() + for k in ['last_refresh', 'created']: + if k in c_status: + c_status[k] = str_to_datetime(c_status[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + return cls(spec=spec, events=events, **c_status) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer) + + +class InventoryFilter(object): + """ + When fetching inventory, use this filter to avoid unnecessarily + scanning the whole estate. + + Typical use: + + filter by host when presentig UI workflow for configuring + a particular server. + filter by label when not all of estate is Ceph servers, + and we want to only learn about the Ceph servers. + filter by label when we are interested particularly + in e.g. OSD servers. 
+ """ + + def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None: + + #: Optional: get info about hosts matching labels + self.labels = labels + + #: Optional: get info about certain named hosts only + self.hosts = hosts + + +class InventoryHost(object): + """ + When fetching inventory, all Devices are groups inside of an + InventoryHost. + """ + + def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None: + if devices is None: + devices = inventory.Devices([]) + if labels is None: + labels = [] + assert isinstance(devices, inventory.Devices) + + self.name = name # unique within cluster. For example a hostname. + self.addr = addr or name + self.devices = devices + self.labels = labels + + def to_json(self) -> dict: + return { + 'name': self.name, + 'addr': self.addr, + 'devices': self.devices.to_json(), + 'labels': self.labels, + } + + @classmethod + def from_json(cls, data: dict) -> 'InventoryHost': + try: + _data = copy.deepcopy(data) + name = _data.pop('name') + addr = _data.pop('addr', None) or name + devices = inventory.Devices.from_json(_data.pop('devices')) + labels = _data.pop('labels', list()) + if _data: + error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys())) + raise OrchestratorValidationError(error_msg) + return cls(name, devices, labels, addr) + except KeyError as e: + error_msg = '{} is required for {}'.format(e, cls.__name__) + raise OrchestratorValidationError(error_msg) + except TypeError as e: + raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) + + @classmethod + def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']: + devs = inventory.Devices.from_json + return [cls(item[0], devs(item[1].data)) for item in hosts] + + def __repr__(self) -> str: + return "<InventoryHost>({name})".format(name=self.name) + + @staticmethod + def get_host_names(hosts: List['InventoryHost']) -> List[str]: + return [host.name for host in hosts] + + def __eq__(self, other: Any) -> bool: + return self.name == other.name and self.devices == other.devices + + +class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])): + """ + Describes a specific device on a specific host. Used for enabling or disabling LEDs + on devices. + + hostname as in :func:`orchestrator.Orchestrator.get_hosts` + + device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``. + See ``ceph osd metadata | jq '.[].device_ids'`` + """ + __slots__ = () + + +class OrchestratorEvent: + """ + Similar to K8s Events. + + Some form of "important" log message attached to something. + """ + INFO = 'INFO' + ERROR = 'ERROR' + regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE) + + def __init__(self, created: Union[str, datetime.datetime], kind: str, + subject: str, level: str, message: str) -> None: + if isinstance(created, str): + created = str_to_datetime(created) + self.created: datetime.datetime = created + + assert kind in "service daemon".split() + self.kind: str = kind + + # service name, or daemon danem or something + self.subject: str = subject + + # Events are not meant for debugging. debugs should end in the log. 
+ assert level in "INFO ERROR".split() + self.level = level + + self.message: str = message + + __slots__ = ('created', 'kind', 'subject', 'level', 'message') + + def kind_subject(self) -> str: + return f'{self.kind}:{self.subject}' + + def to_json(self) -> str: + # Make a long list of events readable. + created = datetime_to_str(self.created) + return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"' + + def to_dict(self) -> dict: + # Convert events data to dict. + return { + 'created': datetime_to_str(self.created), + 'subject': self.kind_subject(), + 'level': self.level, + 'message': self.message + } + + @classmethod + @handle_type_error + def from_json(cls, data: str) -> "OrchestratorEvent": + """ + >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json() + '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"' + + :param data: + :return: + """ + match = cls.regex_v1.match(data) + if match: + return cls(*match.groups()) + raise ValueError(f'Unable to match: "{data}"') + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, OrchestratorEvent): + return False + + return self.created == other.created and self.kind == other.kind \ + and self.subject == other.subject and self.message == other.message + + def __repr__(self) -> str: + return f'OrchestratorEvent.from_json({self.to_json()!r})' + + +def _mk_orch_methods(cls: Any) -> Any: + # Needs to be defined outside of for. + # Otherwise meth is always bound to last key + def shim(method_name: str) -> Callable: + def inner(self: Any, *args: Any, **kwargs: Any) -> Any: + completion = self._oremote(method_name, args, kwargs) + return completion + return inner + + for name, method in Orchestrator.__dict__.items(): + if not name.startswith('_') and name not in ['is_orchestrator_module']: + remote_call = update_wrapper(shim(name), method) + setattr(cls, name, remote_call) + return cls + + +@_mk_orch_methods +class OrchestratorClientMixin(Orchestrator): + """ + A module that inherents from `OrchestratorClientMixin` can directly call + all :class:`Orchestrator` methods without manually calling remote. + + Every interface method from ``Orchestrator`` is converted into a stub method that internally + calls :func:`OrchestratorClientMixin._oremote` + + >>> class MyModule(OrchestratorClientMixin): + ... def func(self): + ... completion = self.add_host('somehost') # calls `_oremote()` + ... self.log.debug(completion.result) + + .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`. + Reason is, that OrchestratorClientMixin magically redirects all methods to the + "real" implementation of the orchestrator. + + + >>> import mgr_module + >>> #doctest: +SKIP + ... class MyImplentation(mgr_module.MgrModule, Orchestrator): + ... def __init__(self, ...): + ... self.orch_client = OrchestratorClientMixin() + ... self.orch_client.set_mgr(self.mgr)) + """ + + def set_mgr(self, mgr: MgrModule) -> None: + """ + Useable in the Dashbord that uses a global ``mgr`` + """ + + self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties + + def __get_mgr(self) -> Any: + try: + return self.__mgr + except AttributeError: + return self + + def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any: + """ + Helper for invoking `remote` on whichever orchestrator is enabled + + :raises RuntimeError: If the remote method failed. 
+ :raises OrchestratorError: orchestrator failed to perform + :raises ImportError: no `orchestrator` module or backend not found. + """ + mgr = self.__get_mgr() + + try: + o = mgr._select_orchestrator() + except AttributeError: + o = mgr.remote('orchestrator', '_select_orchestrator') + + if o is None: + raise NoOrchestrator() + + mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs)) + try: + return mgr.remote(o, meth, *args, **kwargs) + except Exception as e: + if meth == 'get_feature_set': + raise # self.get_feature_set() calls self._oremote() + f_set = self.get_feature_set() + if meth not in f_set or not f_set[meth]['available']: + raise NotImplementedError(f'{o} does not implement {meth}') from e + raise diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py new file mode 100644 index 000000000..2288cfeef --- /dev/null +++ b/src/pybind/mgr/orchestrator/module.py @@ -0,0 +1,1471 @@ +import enum +import errno +import json +from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union, Sequence +import re +import datetime + +import yaml +from prettytable import PrettyTable + +from ceph.deployment.inventory import Device # noqa: F401; pylint: disable=unused-variable +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, OSDMethod +from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, service_spec_allow_invalid_from_json +from ceph.deployment.hostspec import SpecValidationError +from ceph.utils import datetime_now + +from mgr_util import to_pretty_timedelta, format_dimless, format_bytes +from mgr_module import MgrModule, HandleCommandResult, Option + +from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \ + raise_if_exception, _cli_write_command, OrchestratorError, \ + NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \ + RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \ + ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \ + GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec + + +def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str: + if t: + return to_pretty_timedelta(now - t) + suffix + else: + return '-' + + +def nice_bytes(v: Optional[int]) -> str: + if not v: + return '-' + return format_bytes(v, 5) + + +class Format(enum.Enum): + plain = 'plain' + json = 'json' + json_pretty = 'json-pretty' + yaml = 'yaml' + xml_pretty = 'xml-pretty' + xml = 'xml' + + +class ServiceType(enum.Enum): + mon = 'mon' + mgr = 'mgr' + rbd_mirror = 'rbd-mirror' + cephfs_mirror = 'cephfs-mirror' + crash = 'crash' + alertmanager = 'alertmanager' + grafana = 'grafana' + node_exporter = 'node-exporter' + prometheus = 'prometheus' + mds = 'mds' + rgw = 'rgw' + nfs = 'nfs' + iscsi = 'iscsi' + cephadm_exporter = 'cephadm-exporter' + snmp_gateway = 'snmp-gateway' + + +class ServiceAction(enum.Enum): + start = 'start' + stop = 'stop' + restart = 'restart' + redeploy = 'redeploy' + reconfig = 'reconfig' + + +class DaemonAction(enum.Enum): + start = 'start' + stop = 'stop' + restart = 'restart' + reconfig = 'reconfig' + + +def to_format(what: Any, format: Format, many: bool, cls: Any) -> Any: + def to_json_1(obj: Any) -> Any: + if hasattr(obj, 'to_json'): + return obj.to_json() + return obj + + def to_json_n(objs: List) -> List: + return [to_json_1(o) for o in objs] + + to_json = to_json_n if many else to_json_1 + + if format == Format.json: + return 
json.dumps(to_json(what), sort_keys=True) + elif format == Format.json_pretty: + return json.dumps(to_json(what), indent=2, sort_keys=True) + elif format == Format.yaml: + # fun with subinterpreters again. pyyaml depends on object identity. + # as what originates from a different subinterpreter we have to copy things here. + if cls: + flat = to_json(what) + copy = [cls.from_json(o) for o in flat] if many else cls.from_json(flat) + else: + copy = what + + def to_yaml_1(obj: Any) -> Any: + if hasattr(obj, 'yaml_representer'): + return obj + return to_json_1(obj) + + def to_yaml_n(objs: list) -> list: + return [to_yaml_1(o) for o in objs] + + to_yaml = to_yaml_n if many else to_yaml_1 + + if many: + return yaml.dump_all(to_yaml(copy), default_flow_style=False) + return yaml.dump(to_yaml(copy), default_flow_style=False) + elif format == Format.xml or format == Format.xml_pretty: + raise OrchestratorError(f"format '{format.name}' is not implemented.") + else: + raise OrchestratorError(f'unsupported format type: {format}') + + +def generate_preview_tables(data: Any, osd_only: bool = False) -> str: + error = [x.get('error') for x in data if x.get('error')] + if error: + return json.dumps(error) + warning = [x.get('warning') for x in data if x.get('warning')] + osd_table = preview_table_osd(data) + service_table = preview_table_services(data) + + if osd_only: + tables = f""" +{''.join(warning)} + +################ +OSDSPEC PREVIEWS +################ +{osd_table} +""" + return tables + else: + tables = f""" +{''.join(warning)} + +#################### +SERVICESPEC PREVIEWS +#################### +{service_table} + +################ +OSDSPEC PREVIEWS +################ +{osd_table} +""" + return tables + + +def preview_table_osd(data: List) -> str: + table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True) + table.field_names = "service name host data db wal".split() + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + notes = '' + for osd_data in data: + if osd_data.get('service_type') != 'osd': + continue + for host, specs in osd_data.get('data').items(): + for spec in specs: + if spec.get('error'): + return spec.get('message') + dg_name = spec.get('osdspec') + if spec.get('notes', []): + notes += '\n'.join(spec.get('notes')) + '\n' + for osd in spec.get('data', []): + db_path = osd.get('block_db', '-') + wal_path = osd.get('block_wal', '-') + block_data = osd.get('data', '') + if not block_data: + continue + table.add_row(('osd', dg_name, host, block_data, db_path, wal_path)) + return notes + table.get_string() + + +def preview_table_services(data: List) -> str: + table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True) + table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split() + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for item in data: + if item.get('warning'): + continue + if item.get('service_type') != 'osd': + table.add_row((item.get('service_type'), item.get('service_name'), + " ".join(item.get('add')), " ".join(item.get('remove')))) + return table.get_string() + + +class OrchestratorCli(OrchestratorClientMixin, MgrModule, + metaclass=CLICommandMeta): + MODULE_OPTIONS = [ + Option( + 'orchestrator', + type='str', + default=None, + desc='Orchestrator backend', + enum_allowed=['cephadm', 'rook', 'test_orchestrator'], + runtime=True, + ) + ] + NATIVE_OPTIONS = [] # type: List[dict] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(OrchestratorCli, 
self).__init__(*args, **kwargs) + self.ident: Set[str] = set() + self.fault: Set[str] = set() + self._load() + self._refresh_health() + + def _load(self) -> None: + active = self.get_store('active_devices') + if active: + decoded = json.loads(active) + self.ident = set(decoded.get('ident', [])) + self.fault = set(decoded.get('fault', [])) + self.log.debug('ident {}, fault {}'.format(self.ident, self.fault)) + + def _save(self) -> None: + encoded = json.dumps({ + 'ident': list(self.ident), + 'fault': list(self.fault), + }) + self.set_store('active_devices', encoded) + + def _refresh_health(self) -> None: + h = {} + if self.ident: + h['DEVICE_IDENT_ON'] = { + 'severity': 'warning', + 'summary': '%d devices have ident light turned on' % len( + self.ident), + 'detail': ['{} ident light enabled'.format(d) for d in self.ident] + } + if self.fault: + h['DEVICE_FAULT_ON'] = { + 'severity': 'warning', + 'summary': '%d devices have fault light turned on' % len( + self.fault), + 'detail': ['{} fault light enabled'.format(d) for d in self.ident] + } + self.set_health_checks(h) + + def _get_device_locations(self, dev_id): + # type: (str) -> List[DeviceLightLoc] + locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id] + return [DeviceLightLoc(**loc) for loc in sum(locs, [])] + + @_cli_read_command(prefix='device ls-lights') + def _device_ls(self) -> HandleCommandResult: + """List currently active device indicator lights""" + return HandleCommandResult( + stdout=json.dumps({ + 'ident': list(self.ident), + 'fault': list(self.fault) + }, indent=4, sort_keys=True)) + + def light_on(self, fault_ident, devid): + # type: (str, str) -> HandleCommandResult + assert fault_ident in ("fault", "ident") + locs = self._get_device_locations(devid) + if locs is None: + return HandleCommandResult(stderr='device {} not found'.format(devid), + retval=-errno.ENOENT) + + getattr(self, fault_ident).add(devid) + self._save() + self._refresh_health() + completion = self.blink_device_light(fault_ident, True, locs) + return HandleCommandResult(stdout=str(completion.result)) + + def light_off(self, fault_ident, devid, force): + # type: (str, str, bool) -> HandleCommandResult + assert fault_ident in ("fault", "ident") + locs = self._get_device_locations(devid) + if locs is None: + return HandleCommandResult(stderr='device {} not found'.format(devid), + retval=-errno.ENOENT) + + try: + completion = self.blink_device_light(fault_ident, False, locs) + + if devid in getattr(self, fault_ident): + getattr(self, fault_ident).remove(devid) + self._save() + self._refresh_health() + return HandleCommandResult(stdout=str(completion.result)) + + except Exception: + # There are several reasons the try: block might fail: + # 1. the device no longer exist + # 2. the device is no longer known to Ceph + # 3. the host is not reachable + if force and devid in getattr(self, fault_ident): + getattr(self, fault_ident).remove(devid) + self._save() + self._refresh_health() + raise + + class DeviceLightEnable(enum.Enum): + on = 'on' + off = 'off' + + class DeviceLightType(enum.Enum): + ident = 'ident' + fault = 'fault' + + @_cli_write_command(prefix='device light') + def _device_light(self, + enable: DeviceLightEnable, + devid: str, + light_type: DeviceLightType = DeviceLightType.ident, + force: bool = False) -> HandleCommandResult: + """ + Enable or disable the device light. 
Default type is `ident` + 'Usage: device light (on|off) <devid> [ident|fault] [--force]' + """"" + if enable == self.DeviceLightEnable.on: + return self.light_on(light_type.value, devid) + else: + return self.light_off(light_type.value, devid, force) + + def _select_orchestrator(self) -> str: + return cast(str, self.get_module_option("orchestrator")) + + @_cli_write_command('orch host add') + def _add_host(self, + hostname: str, + addr: Optional[str] = None, + labels: Optional[List[str]] = None, + maintenance: Optional[bool] = False) -> HandleCommandResult: + """Add a host""" + _status = 'maintenance' if maintenance else '' + + # split multiple labels passed in with --labels=label1,label2 + if labels and len(labels) == 1: + labels = labels[0].split(',') + + s = HostSpec(hostname=hostname, addr=addr, labels=labels, status=_status) + + return self._apply_misc([s], False, Format.plain) + + @_cli_write_command('orch host rm') + def _remove_host(self, hostname: str, force: bool = False, offline: bool = False) -> HandleCommandResult: + """Remove a host""" + completion = self.remove_host(hostname, force, offline) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch host drain') + def _drain_host(self, hostname: str, force: bool = False) -> HandleCommandResult: + """drain all daemons from a host""" + completion = self.drain_host(hostname, force) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch host set-addr') + def _update_set_addr(self, hostname: str, addr: str) -> HandleCommandResult: + """Update a host address""" + completion = self.update_host_addr(hostname, addr) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_read_command('orch host ls') + def _get_hosts(self, format: Format = Format.plain, host_pattern: str = '', label: str = '', host_status: str = '') -> HandleCommandResult: + """List hosts""" + completion = self.get_hosts() + hosts = raise_if_exception(completion) + + filter_spec = PlacementSpec( + host_pattern=host_pattern, + label=label + ) + filtered_hosts: List[str] = filter_spec.filter_matching_hostspecs(hosts) + hosts = [h for h in hosts if h.hostname in filtered_hosts] + + if host_status: + hosts = [h for h in hosts if h.status.lower() == host_status] + + if format != Format.plain: + output = to_format(hosts, format, many=True, cls=HostSpec) + else: + table = PrettyTable( + ['HOST', 'ADDR', 'LABELS', 'STATUS'], + border=False) + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for host in sorted(hosts, key=lambda h: h.hostname): + table.add_row((host.hostname, host.addr, ' '.join( + host.labels), host.status.capitalize())) + output = table.get_string() + if format == Format.plain: + output += f'\n{len(hosts)} hosts in cluster' + if label: + output += f' who had label {label}' + if host_pattern: + output += f' whose hostname matched {host_pattern}' + if host_status: + output += f' with status {host_status}' + return HandleCommandResult(stdout=output) + + @_cli_write_command('orch host label add') + def _host_label_add(self, hostname: str, label: str) -> HandleCommandResult: + """Add a host label""" + completion = self.add_host_label(hostname, label) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch host label rm') + def _host_label_rm(self, hostname: str, label: str, force: bool 
= False) -> HandleCommandResult: + """Remove a host label""" + completion = self.remove_host_label(hostname, label, force) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch host ok-to-stop') + def _host_ok_to_stop(self, hostname: str) -> HandleCommandResult: + """Check if the specified host can be safely stopped without reducing availability""""" + completion = self.host_ok_to_stop(hostname) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch host maintenance enter') + def _host_maintenance_enter(self, hostname: str, force: bool = False) -> HandleCommandResult: + """ + Prepare a host for maintenance by shutting down and disabling all Ceph daemons (cephadm only) + """ + completion = self.enter_host_maintenance(hostname, force=force) + raise_if_exception(completion) + + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch host maintenance exit') + def _host_maintenance_exit(self, hostname: str) -> HandleCommandResult: + """ + Return a host from maintenance, restarting all Ceph daemons (cephadm only) + """ + completion = self.exit_host_maintenance(hostname) + raise_if_exception(completion) + + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch host rescan') + def _host_rescan(self, hostname: str, with_summary: bool = False) -> HandleCommandResult: + """Perform a disk rescan on a host""" + completion = self.rescan_host(hostname) + raise_if_exception(completion) + + if with_summary: + return HandleCommandResult(stdout=completion.result_str()) + return HandleCommandResult(stdout=completion.result_str().split('.')[0]) + + @_cli_read_command('orch device ls') + def _list_devices(self, + hostname: Optional[List[str]] = None, + format: Format = Format.plain, + refresh: bool = False, + wide: bool = False) -> HandleCommandResult: + """ + List devices on a host + """ + # Provide information about storage devices present in cluster hosts + # + # Note: this does not have to be completely synchronous. Slightly out of + # date hardware inventory is fine as long as hardware ultimately appears + # in the output of this command. 
+ nf = InventoryFilter(hosts=hostname) if hostname else None + + completion = self.get_inventory(host_filter=nf, refresh=refresh) + + inv_hosts = raise_if_exception(completion) + + if format != Format.plain: + return HandleCommandResult(stdout=to_format(inv_hosts, + format, + many=True, + cls=InventoryHost)) + else: + display_map = { + "Unsupported": "N/A", + "N/A": "N/A", + "On": "On", + "Off": "Off", + True: "Yes", + False: "", + } + + out = [] + if wide: + table = PrettyTable( + ['HOST', 'PATH', 'TYPE', 'TRANSPORT', 'RPM', 'DEVICE ID', 'SIZE', + 'HEALTH', 'IDENT', 'FAULT', + 'AVAILABLE', 'REFRESHED', 'REJECT REASONS'], + border=False) + else: + table = PrettyTable( + ['HOST', 'PATH', 'TYPE', 'DEVICE ID', 'SIZE', + 'AVAILABLE', 'REFRESHED', 'REJECT REASONS'], + border=False) + table.align = 'l' + table._align['SIZE'] = 'r' + table.left_padding_width = 0 + table.right_padding_width = 2 + now = datetime_now() + for host_ in sorted(inv_hosts, key=lambda h: h.name): # type: InventoryHost + for d in sorted(host_.devices.devices, key=lambda d: d.path): # type: Device + + led_ident = 'N/A' + led_fail = 'N/A' + if d.lsm_data.get('ledSupport', None): + led_ident = d.lsm_data['ledSupport']['IDENTstatus'] + led_fail = d.lsm_data['ledSupport']['FAILstatus'] + + if wide: + table.add_row( + ( + host_.name, + d.path, + d.human_readable_type, + d.lsm_data.get('transport', ''), + d.lsm_data.get('rpm', ''), + d.device_id, + format_dimless(d.sys_api.get('size', 0), 5), + d.lsm_data.get('health', ''), + display_map[led_ident], + display_map[led_fail], + display_map[d.available], + nice_delta(now, d.created, ' ago'), + ', '.join(d.rejected_reasons) + ) + ) + else: + table.add_row( + ( + host_.name, + d.path, + d.human_readable_type, + d.device_id, + format_dimless(d.sys_api.get('size', 0), 5), + display_map[d.available], + nice_delta(now, d.created, ' ago'), + ', '.join(d.rejected_reasons) + ) + ) + out.append(table.get_string()) + return HandleCommandResult(stdout='\n'.join(out)) + + @_cli_write_command('orch device zap') + def _zap_device(self, hostname: str, path: str, force: bool = False) -> HandleCommandResult: + """ + Zap (erase!) 
a device so it can be re-used + """ + if not force: + raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA') + completion = self.zap_device(hostname, path) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_read_command('orch ls') + def _list_services(self, + service_type: Optional[str] = None, + service_name: Optional[str] = None, + export: bool = False, + format: Format = Format.plain, + refresh: bool = False) -> HandleCommandResult: + """ + List services known to orchestrator + """ + if export and format == Format.plain: + format = Format.yaml + + completion = self.describe_service(service_type, + service_name, + refresh=refresh) + + services = raise_if_exception(completion) + + def ukn(s: Optional[str]) -> str: + return '<unknown>' if s is None else s + + # Sort the list for display + services.sort(key=lambda s: (ukn(s.spec.service_name()))) + + if len(services) == 0: + return HandleCommandResult(stdout="No services reported") + elif format != Format.plain: + with service_spec_allow_invalid_from_json(): + if export: + data = [s.spec for s in services if s.deleted is None] + return HandleCommandResult(stdout=to_format(data, format, many=True, cls=ServiceSpec)) + else: + return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription)) + else: + now = datetime_now() + table = PrettyTable( + [ + 'NAME', 'PORTS', + 'RUNNING', 'REFRESHED', 'AGE', + 'PLACEMENT', + ], + border=False) + table.align['NAME'] = 'l' + table.align['PORTS'] = 'l' + table.align['RUNNING'] = 'r' + table.align['REFRESHED'] = 'l' + table.align['AGE'] = 'l' + table.align['PLACEMENT'] = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for s in services: + if not s.spec: + pl = '<no spec>' + elif s.spec.unmanaged: + pl = '<unmanaged>' + else: + pl = s.spec.placement.pretty_str() + if s.deleted: + refreshed = '<deleting>' + else: + refreshed = nice_delta(now, s.last_refresh, ' ago') + + if s.spec.service_type == 'osd': + running = str(s.running) + else: + running = '{}/{}'.format(s.running, s.size) + + table.add_row(( + s.spec.service_name(), + s.get_port_summary(), + running, + refreshed, + nice_delta(now, s.created), + pl, + )) + + return HandleCommandResult(stdout=table.get_string()) + + @_cli_read_command('orch ps') + def _list_daemons(self, + hostname: Optional[str] = None, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + format: Format = Format.plain, + refresh: bool = False) -> HandleCommandResult: + """ + List daemons known to orchestrator + """ + completion = self.list_daemons(service_name, + daemon_type, + daemon_id=daemon_id, + host=hostname, + refresh=refresh) + + daemons = raise_if_exception(completion) + + def ukn(s: Optional[str]) -> str: + return '<unknown>' if s is None else s + # Sort the list for display + daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id))) + + if format != Format.plain: + return HandleCommandResult(stdout=to_format(daemons, format, many=True, cls=DaemonDescription)) + else: + if len(daemons) == 0: + return HandleCommandResult(stdout="No daemons reported") + + now = datetime_now() + table = PrettyTable( + ['NAME', 'HOST', 'PORTS', + 'STATUS', 'REFRESHED', 'AGE', + 'MEM USE', 'MEM LIM', + 'VERSION', 'IMAGE ID', 'CONTAINER ID'], + border=False) + table.align = 'l' + table._align['REFRESHED'] = 'r' + table._align['AGE'] = 'r' + table._align['MEM USE'] = 'r' + 
table._align['MEM LIM'] = 'r' + table.left_padding_width = 0 + table.right_padding_width = 2 + for s in sorted(daemons, key=lambda s: s.name()): + if s.status_desc: + status = s.status_desc + else: + status = DaemonDescriptionStatus.to_str(s.status) + if s.status == DaemonDescriptionStatus.running and s.started: # See DDS.starting + status += ' (%s)' % to_pretty_timedelta(now - s.started) + + table.add_row(( + s.name(), + ukn(s.hostname), + s.get_port_summary(), + status, + nice_delta(now, s.last_refresh, ' ago'), + nice_delta(now, s.created), + nice_bytes(s.memory_usage), + nice_bytes(s.memory_request), + ukn(s.version), + ukn(s.container_image_id)[0:12], + ukn(s.container_id))) + + remove_column = 'CONTAINER ID' + if table.get_string(fields=[remove_column], border=False, + header=False).count('<unknown>') == len(daemons): + try: + table.del_column(remove_column) + except AttributeError as e: + # del_column method was introduced in prettytable 2.0 + if str(e) != "del_column": + raise + table.field_names.remove(remove_column) + table._rows = [row[:-1] for row in table._rows] + + return HandleCommandResult(stdout=table.get_string()) + + @_cli_write_command('orch apply osd') + def _apply_osd(self, + all_available_devices: bool = False, + format: Format = Format.plain, + unmanaged: Optional[bool] = None, + dry_run: bool = False, + no_overwrite: bool = False, + inbuf: Optional[str] = None # deprecated. Was deprecated before Quincy + ) -> HandleCommandResult: + """ + Create OSD daemon(s) on all available devices + """ + + if inbuf and all_available_devices: + return HandleCommandResult(-errno.EINVAL, '-i infile and --all-available-devices are mutually exclusive') + + if not inbuf and not all_available_devices: + # one parameter must be present + return HandleCommandResult(-errno.EINVAL, '--all-available-devices is required') + + if inbuf: + if unmanaged is not None: + return HandleCommandResult(-errno.EINVAL, stderr='-i infile and --unmanaged are mutually exclusive') + + try: + drivegroups = [_dg for _dg in yaml.safe_load_all(inbuf)] + except yaml.scanner.ScannerError as e: + msg = f"Invalid YAML received : {str(e)}" + self.log.exception(e) + return HandleCommandResult(-errno.EINVAL, stderr=msg) + + dg_specs = [] + for dg in drivegroups: + spec = DriveGroupSpec.from_json(dg) + if dry_run: + spec.preview_only = True + dg_specs.append(spec) + + return self._apply_misc(dg_specs, dry_run, format, no_overwrite) + + if all_available_devices: + if unmanaged is None: + unmanaged = False + dg_specs = [ + DriveGroupSpec( + service_id='all-available-devices', + placement=PlacementSpec(host_pattern='*'), + data_devices=DeviceSelection(all=True), + unmanaged=unmanaged, + preview_only=dry_run + ) + ] + return self._apply_misc(dg_specs, dry_run, format, no_overwrite) + + return HandleCommandResult(-errno.EINVAL, stderr='--all-available-devices is required') + + @_cli_write_command('orch daemon add osd') + def _daemon_add_osd(self, + svc_arg: Optional[str] = None, + method: Optional[OSDMethod] = None) -> HandleCommandResult: + """Create OSD daemon(s) on specified host and device(s) (e.g., ceph orch daemon add osd myhost:/dev/sdb)""" + # Create one or more OSDs""" + + usage = """ +Usage: + ceph orch daemon add osd host:device1,device2,... + ceph orch daemon add osd host:data_devices=device1,device2,db_devices=device3,osds_per_device=2,... 
+""" + if not svc_arg: + return HandleCommandResult(-errno.EINVAL, stderr=usage) + try: + host_name, raw = svc_arg.split(":") + drive_group_spec = { + 'data_devices': [] + } # type: Dict + drv_grp_spec_arg = None + values = raw.split(',') + while values: + v = values[0].split(',', 1)[0] + if '=' in v: + drv_grp_spec_arg, value = v.split('=') + if drv_grp_spec_arg in ['data_devices', + 'db_devices', + 'wal_devices', + 'journal_devices']: + drive_group_spec[drv_grp_spec_arg] = [] + drive_group_spec[drv_grp_spec_arg].append(value) + else: + drive_group_spec[drv_grp_spec_arg] = value + elif drv_grp_spec_arg is not None: + drive_group_spec[drv_grp_spec_arg].append(v) + else: + drive_group_spec['data_devices'].append(v) + values.remove(v) + + for dev_type in ['data_devices', 'db_devices', 'wal_devices', 'journal_devices']: + drive_group_spec[dev_type] = DeviceSelection(paths=drive_group_spec[dev_type]) if drive_group_spec.get(dev_type) else None + + drive_group = DriveGroupSpec( + placement=PlacementSpec(host_pattern=host_name), + method=method, + **drive_group_spec, + ) + except (TypeError, KeyError, ValueError) as e: + msg = f"Invalid host:device spec: '{svc_arg}': {e}" + usage + return HandleCommandResult(-errno.EINVAL, stderr=msg) + + completion = self.create_osds(drive_group) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch osd rm') + def _osd_rm_start(self, + osd_id: List[str], + replace: bool = False, + force: bool = False, + zap: bool = False) -> HandleCommandResult: + """Remove OSD daemons""" + completion = self.remove_osds(osd_id, replace=replace, force=force, zap=zap) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch osd rm stop') + def _osd_rm_stop(self, osd_id: List[str]) -> HandleCommandResult: + """Cancel ongoing OSD removal operation""" + completion = self.stop_remove_osds(osd_id) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch osd rm status') + def _osd_rm_status(self, format: Format = Format.plain) -> HandleCommandResult: + """Status of OSD removal operation""" + completion = self.remove_osds_status() + raise_if_exception(completion) + report = completion.result + + if not report: + return HandleCommandResult(stdout="No OSD remove/replace operations reported") + + if format != Format.plain: + out = to_format(report, format, many=True, cls=None) + else: + table = PrettyTable( + ['OSD', 'HOST', 'STATE', 'PGS', 'REPLACE', 'FORCE', 'ZAP', + 'DRAIN STARTED AT'], + border=False) + table.align = 'l' + table._align['PGS'] = 'r' + table.left_padding_width = 0 + table.right_padding_width = 2 + for osd in sorted(report, key=lambda o: o.osd_id): + table.add_row([osd.osd_id, osd.hostname, osd.drain_status_human(), + osd.get_pg_count(), osd.replace, osd.force, osd.zap, + osd.drain_started_at or '']) + out = table.get_string() + + return HandleCommandResult(stdout=out) + + @_cli_write_command('orch daemon add') + def daemon_add_misc(self, + daemon_type: Optional[ServiceType] = None, + placement: Optional[str] = None, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Add daemon(s)""" + usage = f"""Usage: + ceph orch daemon add -i <json_file> + ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>""" + if inbuf: + if daemon_type or placement: + raise OrchestratorValidationError(usage) + spec = ServiceSpec.from_json(yaml.safe_load(inbuf)) + else: + if 
not placement or not daemon_type: + raise OrchestratorValidationError(usage) + placement_spec = PlacementSpec.from_string(placement) + spec = ServiceSpec(daemon_type.value, placement=placement_spec) + + return self._daemon_add_misc(spec) + + def _daemon_add_misc(self, spec: ServiceSpec) -> HandleCommandResult: + completion = self.add_daemon(spec) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch daemon add mds') + def _mds_add(self, + fs_name: str, + placement: Optional[str] = None, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Start MDS daemon(s)""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = ServiceSpec( + service_type='mds', + service_id=fs_name, + placement=PlacementSpec.from_string(placement), + ) + return self._daemon_add_misc(spec) + + @_cli_write_command('orch daemon add rgw') + def _rgw_add(self, + svc_id: str, + placement: Optional[str] = None, + port: Optional[int] = None, + ssl: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Start RGW daemon(s)""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = RGWSpec( + service_id=svc_id, + rgw_frontend_port=port, + ssl=ssl, + placement=PlacementSpec.from_string(placement), + ) + return self._daemon_add_misc(spec) + + @_cli_write_command('orch daemon add nfs') + def _nfs_add(self, + svc_id: str, + placement: Optional[str] = None, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Start NFS daemon(s)""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = NFSServiceSpec( + service_id=svc_id, + placement=PlacementSpec.from_string(placement), + ) + return self._daemon_add_misc(spec) + + @_cli_write_command('orch daemon add iscsi') + def _iscsi_add(self, + pool: str, + api_user: str, + api_password: str, + trusted_ip_list: Optional[str] = None, + placement: Optional[str] = None, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Start iscsi daemon(s)""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = IscsiServiceSpec( + service_id='iscsi', + pool=pool, + api_user=api_user, + api_password=api_password, + trusted_ip_list=trusted_ip_list, + placement=PlacementSpec.from_string(placement), + ) + return self._daemon_add_misc(spec) + + @_cli_write_command('orch') + def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult: + """Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)""" + completion = self.service_action(action.value, service_name) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch daemon') + def _daemon_action(self, action: DaemonAction, name: str) -> HandleCommandResult: + """Start, stop, restart, (redeploy,) or reconfig a specific daemon""" + if '.' not in name: + raise OrchestratorError('%s is not a valid daemon name' % name) + completion = self.daemon_action(action.value, name) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch daemon redeploy') + def _daemon_action_redeploy(self, + name: str, + image: Optional[str] = None) -> HandleCommandResult: + """Redeploy a daemon (with a specifc image)""" + if '.' 
not in name: + raise OrchestratorError('%s is not a valid daemon name' % name) + completion = self.daemon_action("redeploy", name, image=image) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch daemon rm') + def _daemon_rm(self, + names: List[str], + force: Optional[bool] = False) -> HandleCommandResult: + """Remove specific daemon(s)""" + for name in names: + if '.' not in name: + raise OrchestratorError('%s is not a valid daemon name' % name) + (daemon_type) = name.split('.')[0] + if not force and daemon_type in ['osd', 'mon', 'prometheus']: + raise OrchestratorError( + 'must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name) + completion = self.remove_daemons(names) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch rm') + def _service_rm(self, + service_name: str, + force: bool = False) -> HandleCommandResult: + """Remove a service""" + if service_name in ['mon', 'mgr'] and not force: + raise OrchestratorError('The mon and mgr services cannot be removed') + completion = self.remove_service(service_name, force=force) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch apply') + def apply_misc(self, + service_type: Optional[ServiceType] = None, + placement: Optional[str] = None, + dry_run: bool = False, + format: Format = Format.plain, + unmanaged: bool = False, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Update the size or placement for a service or apply a large yaml spec""" + usage = """Usage: + ceph orch apply -i <yaml spec> [--dry-run] + ceph orch apply <service_type> [--placement=<placement_string>] [--unmanaged] + """ + if inbuf: + if service_type or placement or unmanaged: + raise OrchestratorValidationError(usage) + yaml_objs: Iterator = yaml.safe_load_all(inbuf) + specs: List[Union[ServiceSpec, HostSpec]] = [] + # YAML '---' document separator with no content generates + # None entries in the output. Let's skip them silently. 
+ content = [o for o in yaml_objs if o is not None] + for s in content: + spec = json_to_generic_spec(s) + + # validate the config (we need MgrModule for that) + if isinstance(spec, ServiceSpec) and spec.config: + for k, v in spec.config.items(): + try: + self.get_foreign_ceph_option('mon', k) + except KeyError: + raise SpecValidationError(f'Invalid config option {k} in spec') + + if dry_run and not isinstance(spec, HostSpec): + spec.preview_only = dry_run + specs.append(spec) + else: + placementspec = PlacementSpec.from_string(placement) + if not service_type: + raise OrchestratorValidationError(usage) + specs = [ServiceSpec(service_type.value, placement=placementspec, + unmanaged=unmanaged, preview_only=dry_run)] + return self._apply_misc(specs, dry_run, format, no_overwrite) + + def _apply_misc(self, specs: Sequence[GenericSpec], dry_run: bool, format: Format, no_overwrite: bool = False) -> HandleCommandResult: + completion = self.apply(specs, no_overwrite) + raise_if_exception(completion) + out = completion.result_str() + if dry_run: + completion = self.plan(specs) + raise_if_exception(completion) + data = completion.result + if format == Format.plain: + out = generate_preview_tables(data) + else: + out = to_format(data, format, many=True, cls=None) + return HandleCommandResult(stdout=out) + + @_cli_write_command('orch apply mds') + def _apply_mds(self, + fs_name: str, + placement: Optional[str] = None, + dry_run: bool = False, + unmanaged: bool = False, + format: Format = Format.plain, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Update the number of MDS instances for the given fs_name""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = MDSSpec( + service_type='mds', + service_id=fs_name, + placement=PlacementSpec.from_string(placement), + unmanaged=unmanaged, + preview_only=dry_run) + + spec.validate() # force any validation exceptions to be caught correctly + + return self._apply_misc([spec], dry_run, format, no_overwrite) + + @_cli_write_command('orch apply rgw') + def _apply_rgw(self, + svc_id: str, + placement: Optional[str] = None, + realm: Optional[str] = None, + zone: Optional[str] = None, + port: Optional[int] = None, + ssl: bool = False, + dry_run: bool = False, + format: Format = Format.plain, + unmanaged: bool = False, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Update the number of RGW instances for the given zone""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + if realm and not zone: + raise OrchestratorValidationError( + 'Cannot add RGW: Realm specified but no zone specified') + if zone and not realm: + raise OrchestratorValidationError( + 'Cannot add RGW: Zone specified but no realm specified') + + spec = RGWSpec( + service_id=svc_id, + rgw_realm=realm, + rgw_zone=zone, + rgw_frontend_port=port, + ssl=ssl, + placement=PlacementSpec.from_string(placement), + unmanaged=unmanaged, + preview_only=dry_run + ) + + spec.validate() # force any validation exceptions to be caught correctly + + return self._apply_misc([spec], dry_run, format, no_overwrite) + + @_cli_write_command('orch apply nfs') + def _apply_nfs(self, + svc_id: str, + placement: Optional[str] = None, + format: Format = Format.plain, + port: Optional[int] = None, + dry_run: bool = False, + unmanaged: bool = False, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + 
"""Scale an NFS service""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = NFSServiceSpec( + service_id=svc_id, + port=port, + placement=PlacementSpec.from_string(placement), + unmanaged=unmanaged, + preview_only=dry_run + ) + + spec.validate() # force any validation exceptions to be caught correctly + + return self._apply_misc([spec], dry_run, format, no_overwrite) + + @_cli_write_command('orch apply iscsi') + def _apply_iscsi(self, + pool: str, + api_user: str, + api_password: str, + trusted_ip_list: Optional[str] = None, + placement: Optional[str] = None, + unmanaged: bool = False, + dry_run: bool = False, + format: Format = Format.plain, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Scale an iSCSI service""" + if inbuf: + raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage') + + spec = IscsiServiceSpec( + service_id=pool, + pool=pool, + api_user=api_user, + api_password=api_password, + trusted_ip_list=trusted_ip_list, + placement=PlacementSpec.from_string(placement), + unmanaged=unmanaged, + preview_only=dry_run + ) + + spec.validate() # force any validation exceptions to be caught correctly + + return self._apply_misc([spec], dry_run, format, no_overwrite) + + @_cli_write_command('orch apply snmp-gateway') + def _apply_snmp_gateway(self, + snmp_version: SNMPGatewaySpec.SNMPVersion, + destination: str, + port: Optional[int] = None, + engine_id: Optional[str] = None, + auth_protocol: Optional[SNMPGatewaySpec.SNMPAuthType] = None, + privacy_protocol: Optional[SNMPGatewaySpec.SNMPPrivacyType] = None, + placement: Optional[str] = None, + unmanaged: bool = False, + dry_run: bool = False, + format: Format = Format.plain, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Add a Prometheus to SNMP gateway service (cephadm only)""" + + if not inbuf: + raise OrchestratorValidationError( + 'missing credential configuration file. Retry with -i <filename>') + + try: + # load inbuf + credentials = yaml.safe_load(inbuf) + except (OSError, yaml.YAMLError): + raise OrchestratorValidationError('credentials file must be valid YAML') + + spec = SNMPGatewaySpec( + snmp_version=snmp_version, + port=port, + credentials=credentials, + snmp_destination=destination, + engine_id=engine_id, + auth_protocol=auth_protocol, + privacy_protocol=privacy_protocol, + placement=PlacementSpec.from_string(placement), + unmanaged=unmanaged, + preview_only=dry_run + ) + + spec.validate() # force any validation exceptions to be caught correctly + + return self._apply_misc([spec], dry_run, format, no_overwrite) + + @_cli_write_command('orch set backend') + def _set_backend(self, module_name: Optional[str] = None) -> HandleCommandResult: + """ + Select orchestrator module backend + """ + # We implement a setter command instead of just having the user + # modify the setting directly, so that we can validate they're setting + # it to a module that really exists and is enabled. + + # There isn't a mechanism for ensuring they don't *disable* the module + # later, but this is better than nothing. 
+ mgr_map = self.get("mgr_map") + + if module_name is None or module_name == "": + self.set_module_option("orchestrator", None) + return HandleCommandResult() + + for module in mgr_map['available_modules']: + if module['name'] != module_name: + continue + + if not module['can_run']: + continue + + enabled = module['name'] in mgr_map['modules'] + if not enabled: + return HandleCommandResult(-errno.EINVAL, + stderr="Module '{module_name}' is not enabled. \n Run " + "`ceph mgr module enable {module_name}` " + "to enable.".format(module_name=module_name)) + + try: + is_orchestrator = self.remote(module_name, + "is_orchestrator_module") + except NameError: + is_orchestrator = False + + if not is_orchestrator: + return HandleCommandResult(-errno.EINVAL, + stderr="'{0}' is not an orchestrator module".format(module_name)) + + self.set_module_option("orchestrator", module_name) + + return HandleCommandResult() + + return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name)) + + @_cli_write_command('orch pause') + def _pause(self) -> HandleCommandResult: + """Pause orchestrator background work""" + self.pause() + return HandleCommandResult() + + @_cli_write_command('orch resume') + def _resume(self) -> HandleCommandResult: + """Resume orchestrator background work (if paused)""" + self.resume() + return HandleCommandResult() + + @_cli_write_command('orch cancel') + def _cancel(self) -> HandleCommandResult: + """ + Cancel ongoing background operations + """ + self.cancel_completions() + return HandleCommandResult() + + @_cli_read_command('orch status') + def _status(self, + detail: bool = False, + format: Format = Format.plain) -> HandleCommandResult: + """Report configured backend and its status""" + o = self._select_orchestrator() + if o is None: + raise NoOrchestrator() + + avail, why, module_details = self.available() + result: Dict[str, Any] = { + "available": avail, + "backend": o, + } + + if avail: + result.update(module_details) + else: + result['reason'] = why + + if format != Format.plain: + output = to_format(result, format, many=False, cls=None) + else: + output = "Backend: {0}".format(result['backend']) + output += f"\nAvailable: {'Yes' if result['available'] else 'No'}" + if 'reason' in result: + output += ' ({0})'.format(result['reason']) + if 'paused' in result: + output += f"\nPaused: {'Yes' if result['paused'] else 'No'}" + if 'workers' in result and detail: + output += f"\nHost Parallelism: {result['workers']}" + return HandleCommandResult(stdout=output) + + def self_test(self) -> None: + old_orch = self._select_orchestrator() + self._set_backend('') + assert self._select_orchestrator() is None + self._set_backend(old_orch) + + e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError") + try: + raise_if_exception(e1) + assert False + except ZeroDivisionError as e: + assert e.args == ('hello, world',) + + e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError") + try: + raise_if_exception(e2) + assert False + except OrchestratorError as e: + assert e.args == ('hello, world',) + + @staticmethod + def _upgrade_check_image_name(image: Optional[str], ceph_version: Optional[str]) -> None: + """ + >>> OrchestratorCli._upgrade_check_image_name('v15.2.0', None) + Traceback (most recent call last): + orchestrator._interface.OrchestratorValidationError: Error: unable to pull image name `v15.2.0`. + Maybe you meant `--ceph-version 15.2.0`? 
+ + """ + if image and re.match(r'^v?\d+\.\d+\.\d+$', image) and ceph_version is None: + ver = image[1:] if image.startswith('v') else image + s = f"Error: unable to pull image name `{image}`.\n" \ + f" Maybe you meant `--ceph-version {ver}`?" + raise OrchestratorValidationError(s) + + @_cli_write_command('orch upgrade check') + def _upgrade_check(self, + image: Optional[str] = None, + ceph_version: Optional[str] = None) -> HandleCommandResult: + """Check service versions vs available and target containers""" + self._upgrade_check_image_name(image, ceph_version) + completion = self.upgrade_check(image=image, version=ceph_version) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_read_command('orch upgrade ls') + def _upgrade_ls(self, + image: Optional[str] = None, + tags: bool = False) -> HandleCommandResult: + """Check for available versions (or tags) we can upgrade to""" + completion = self.upgrade_ls(image, tags) + r = raise_if_exception(completion) + out = json.dumps(r, indent=4) + return HandleCommandResult(stdout=out) + + @_cli_write_command('orch upgrade status') + def _upgrade_status(self) -> HandleCommandResult: + """Check service versions vs available and target containers""" + completion = self.upgrade_status() + status = raise_if_exception(completion) + r = { + 'target_image': status.target_image, + 'in_progress': status.in_progress, + 'which': status.which, + 'services_complete': status.services_complete, + 'progress': status.progress, + 'message': status.message, + 'is_paused': status.is_paused, + } + out = json.dumps(r, indent=4) + return HandleCommandResult(stdout=out) + + @_cli_write_command('orch upgrade start') + def _upgrade_start(self, + image: Optional[str] = None, + ceph_version: Optional[str] = None, + daemon_types: Optional[str] = None, + hosts: Optional[str] = None, + services: Optional[str] = None, + limit: Optional[int] = None) -> HandleCommandResult: + """Initiate upgrade""" + self._upgrade_check_image_name(image, ceph_version) + dtypes = daemon_types.split(',') if daemon_types is not None else None + service_names = services.split(',') if services is not None else None + completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch upgrade pause') + def _upgrade_pause(self) -> HandleCommandResult: + """Pause an in-progress upgrade""" + completion = self.upgrade_pause() + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch upgrade resume') + def _upgrade_resume(self) -> HandleCommandResult: + """Resume paused upgrade""" + completion = self.upgrade_resume() + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command('orch upgrade stop') + def _upgrade_stop(self) -> HandleCommandResult: + """Stop an in-progress upgrade""" + completion = self.upgrade_stop() + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) diff --git a/src/pybind/mgr/orchestrator/tests/__init__.py b/src/pybind/mgr/orchestrator/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/orchestrator/tests/__init__.py diff --git a/src/pybind/mgr/orchestrator/tests/test_orchestrator.py b/src/pybind/mgr/orchestrator/tests/test_orchestrator.py new file mode 100644 index 000000000..7fe32e559 
--- /dev/null +++ b/src/pybind/mgr/orchestrator/tests/test_orchestrator.py @@ -0,0 +1,220 @@ +from __future__ import absolute_import + +import json +import textwrap + +import pytest +import yaml + +from ceph.deployment.service_spec import ServiceSpec +from ceph.deployment import inventory +from ceph.utils import datetime_now +from mgr_module import HandleCommandResult + +from test_orchestrator import TestOrchestrator as _TestOrchestrator + +from orchestrator import InventoryHost, DaemonDescription, ServiceDescription, DaemonDescriptionStatus, OrchResult +from orchestrator import OrchestratorValidationError +from orchestrator.module import to_format, Format, OrchestratorCli, preview_table_osd +from unittest import mock + + +def _test_resource(data, resource_class, extra=None): + # ensure we can deserialize and serialize + rsc = resource_class.from_json(data) + assert rsc.to_json() == resource_class.from_json(rsc.to_json()).to_json() + + if extra: + # if there is an unexpected data provided + data_copy = data.copy() + data_copy.update(extra) + with pytest.raises(OrchestratorValidationError): + resource_class.from_json(data_copy) + + +def test_inventory(): + json_data = { + 'name': 'host0', + 'addr': '1.2.3.4', + 'devices': [ + { + 'sys_api': { + 'rotational': '1', + 'size': 1024, + }, + 'path': '/dev/sda', + 'available': False, + 'rejected_reasons': [], + 'lvs': [] + } + ] + } + _test_resource(json_data, InventoryHost, {'abc': False}) + for devices in json_data['devices']: + _test_resource(devices, inventory.Device) + + json_data = [{}, {'name': 'host0', 'addr': '1.2.3.4'}, {'devices': []}] + for data in json_data: + with pytest.raises(OrchestratorValidationError): + InventoryHost.from_json(data) + + +def test_daemon_description(): + json_data = { + 'hostname': 'test', + 'daemon_type': 'mon', + 'daemon_id': 'a', + 'status': -1, + } + _test_resource(json_data, DaemonDescription, {'abc': False}) + + dd = DaemonDescription.from_json(json_data) + assert dd.status.value == DaemonDescriptionStatus.error.value + + +def test_apply(): + to = _TestOrchestrator('', 0, 0) + completion = to.apply([ + ServiceSpec(service_type='nfs', service_id='foo'), + ServiceSpec(service_type='nfs', service_id='foo'), + ServiceSpec(service_type='nfs', service_id='foo'), + ]) + res = '<NFSServiceSpec for service_name=nfs.foo>' + assert completion.result == [res, res, res] + + +def test_yaml(): + y = """daemon_type: crash +daemon_id: ubuntu +daemon_name: crash.ubuntu +hostname: ubuntu +status: 1 +status_desc: starting +is_active: false +events: +- 2020-06-10T10:08:22.933241Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on + host 'ubuntu'" +--- +service_type: crash +service_name: crash +placement: + host_pattern: '*' +status: + container_image_id: 74803e884bea289d2d2d3ebdf6d37cd560499e955595695b1390a89800f4e37a + container_image_name: docker.io/ceph/daemon-base:latest-master-devel + created: '2020-06-10T10:37:31.051288Z' + last_refresh: '2020-06-10T10:57:40.715637Z' + running: 1 + size: 1 +events: +- 2020-06-10T10:37:31.139159Z service:crash [INFO] "service was created" +""" + types = (DaemonDescription, ServiceDescription) + + for y, cls in zip(y.split('---\n'), types): + data = yaml.safe_load(y) + object = cls.from_json(data) + + assert to_format(object, Format.yaml, False, cls) == y + assert to_format([object], Format.yaml, True, cls) == y + + j = json.loads(to_format(object, Format.json, False, cls)) + assert to_format(cls.from_json(j), Format.yaml, False, cls) == y + + +def test_event_multiline(): + from 
.._interface import OrchestratorEvent + e = OrchestratorEvent(datetime_now(), 'service', 'subject', 'ERROR', 'message') + assert OrchestratorEvent.from_json(e.to_json()) == e + + e = OrchestratorEvent(datetime_now(), 'service', + 'subject', 'ERROR', 'multiline\nmessage') + assert OrchestratorEvent.from_json(e.to_json()) == e + + +def test_handle_command(): + cmd = { + 'prefix': 'orch daemon add', + 'daemon_type': 'mon', + 'placement': 'smithi044:[v2:172.21.15.44:3301,v1:172.21.15.44:6790]=c', + } + m = OrchestratorCli('orchestrator', 0, 0) + r = m._handle_command(None, cmd) + assert r == HandleCommandResult( + retval=-2, stdout='', stderr='No orchestrator configured (try `ceph orch set backend`)') + + +r = OrchResult([ServiceDescription(spec=ServiceSpec(service_type='osd'), running=123)]) + + +@mock.patch("orchestrator.OrchestratorCli.describe_service", return_value=r) +def test_orch_ls(_describe_service): + cmd = { + 'prefix': 'orch ls', + } + m = OrchestratorCli('orchestrator', 0, 0) + r = m._handle_command(None, cmd) + out = 'NAME PORTS RUNNING REFRESHED AGE PLACEMENT \n' \ + 'osd 123 - - ' + assert r == HandleCommandResult(retval=0, stdout=out, stderr='') + + cmd = { + 'prefix': 'orch ls', + 'format': 'yaml', + } + m = OrchestratorCli('orchestrator', 0, 0) + r = m._handle_command(None, cmd) + out = textwrap.dedent(""" + service_type: osd + service_name: osd + spec: + filter_logic: AND + objectstore: bluestore + status: + running: 123 + size: 0 + """).lstrip() + assert r == HandleCommandResult(retval=0, stdout=out, stderr='') + + +def test_preview_table_osd_smoke(): + data = [ + { + 'service_type': 'osd', + 'data': + { + 'foo host': + [ + { + 'osdspec': 'foo', + 'error': '', + 'data': + [ + { + "block_db": "/dev/nvme0n1", + "block_db_size": "66.67 GB", + "data": "/dev/sdb", + "data_size": "300.00 GB", + "encryption": "None" + }, + { + "block_db": "/dev/nvme0n1", + "block_db_size": "66.67 GB", + "data": "/dev/sdc", + "data_size": "300.00 GB", + "encryption": "None" + }, + { + "block_db": "/dev/nvme0n1", + "block_db_size": "66.67 GB", + "data": "/dev/sdd", + "data_size": "300.00 GB", + "encryption": "None" + } + ] + } + ] + } + } + ] + preview_table_osd(data) |
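
As a side note on the helpers introduced above, here is a minimal sketch (not part of this patch) of how the `to_format()` helper from `module.py` can be exercised directly, the same way `orch host ls` renders its output. It assumes an importable dev checkout of the mgr modules, as the unit tests use; the `HostSpec` values are illustrative only.

```python
# Minimal sketch, not part of the upstream patch: exercising to_format()
# against HostSpec objects, mirroring what `ceph orch host ls --format ...`
# does internally. Host names/addresses below are made up for illustration.
from ceph.deployment.hostspec import HostSpec
from orchestrator.module import to_format, Format

hosts = [HostSpec(hostname='host0', addr='1.2.3.4', labels=['mon'])]

# JSON rendering: each object is converted via its to_json() method,
# then serialized with sorted keys.
print(to_format(hosts, Format.json, many=True, cls=HostSpec))

# YAML rendering: to_format() first round-trips each entry through
# cls.from_json(), working around pyyaml's reliance on object identity
# across subinterpreters (see the comment inside to_format()).
print(to_format(hosts, Format.yaml, many=True, cls=HostSpec))
```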