path: root/src/pybind/mgr/orchestrator
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/orchestrator
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--  src/pybind/mgr/orchestrator/README.md                      14
-rw-r--r--  src/pybind/mgr/orchestrator/__init__.py                    20
-rw-r--r--  src/pybind/mgr/orchestrator/_interface.py                1527
-rw-r--r--  src/pybind/mgr/orchestrator/module.py                    1471
-rw-r--r--  src/pybind/mgr/orchestrator/tests/__init__.py               0
-rw-r--r--  src/pybind/mgr/orchestrator/tests/test_orchestrator.py    220
6 files changed, 3252 insertions, 0 deletions
diff --git a/src/pybind/mgr/orchestrator/README.md b/src/pybind/mgr/orchestrator/README.md
new file mode 100644
index 000000000..7e3417959
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/README.md
@@ -0,0 +1,14 @@
+# Orchestrator CLI
+
+See also [orchestrator cli doc](https://docs.ceph.com/en/latest/mgr/orchestrator/).
+
+## Running the Teuthology tests
+
+To run the API tests against a real Ceph cluster, we leverage the Teuthology
+framework and the `test_orchestrator` backend.
+
+`source` the script and run the tests manually:
+
+ $ pushd ../dashboard ; source ./run-backend-api-tests.sh ; popd
+ $ run_teuthology_tests tasks.mgr.test_orchestrator_cli
+ $ cleanup_teuthology
diff --git a/src/pybind/mgr/orchestrator/__init__.py b/src/pybind/mgr/orchestrator/__init__.py
new file mode 100644
index 000000000..c901284d3
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/__init__.py
@@ -0,0 +1,20 @@
+# flake8: noqa
+
+from .module import OrchestratorCli
+
+# usage: e.g. `from orchestrator import ServiceDescription`
+from ._interface import \
+ OrchResult, raise_if_exception, handle_orch_error, \
+ CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \
+ Orchestrator, OrchestratorClientMixin, \
+ OrchestratorValidationError, OrchestratorError, NoOrchestrator, \
+ ServiceDescription, InventoryFilter, HostSpec, \
+ DaemonDescription, DaemonDescriptionStatus, \
+ OrchestratorEvent, set_exception_subject, \
+ InventoryHost, DeviceLightLoc, \
+ UpgradeStatusSpec, daemon_type_to_service, service_to_daemon_types, KNOWN_DAEMON_TYPES
+
+
+import os
+if 'UNITTEST' in os.environ:
+ import tests
diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py
new file mode 100644
index 000000000..2208a5874
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/_interface.py
@@ -0,0 +1,1527 @@
+
+"""
+ceph-mgr orchestrator interface
+
+Please see the ceph-mgr module developer's guide for more information.
+"""
+
+import copy
+import datetime
+import enum
+import errno
+import logging
+import pickle
+import re
+
+from collections import namedtuple, OrderedDict
+from contextlib import contextmanager
+from functools import wraps, reduce, update_wrapper
+
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+ Sequence, Dict, cast, Mapping
+
+try:
+ from typing import Protocol # Protocol was added in Python 3.8
+except ImportError:
+ class Protocol: # type: ignore
+ pass
+
+
+import yaml
+
+from ceph.deployment import inventory
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+from ceph.utils import datetime_to_str, str_to_datetime
+
+from mgr_module import MgrModule, CLICommand, HandleCommandResult
+
+
+logger = logging.getLogger(__name__)
+
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
+
+
+class OrchestratorError(Exception):
+ """
+ General orchestrator specific error.
+
+ Used for deployment, configuration or user errors.
+
+ It's not intended for programming errors or orchestrator internal errors.
+ """
+
+ def __init__(self,
+ msg: str,
+ errno: int = -errno.EINVAL,
+ event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
+ super(Exception, self).__init__(msg)
+ self.errno = errno
+ # See OrchestratorEvent.subject
+ self.event_subject = event_kind_subject
+
+
+class NoOrchestrator(OrchestratorError):
+ """
+ No orchestrator is configured.
+ """
+
+ def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
+ super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
+
+
+class OrchestratorValidationError(OrchestratorError):
+ """
+ Raised when an orchestrator doesn't support a specific feature.
+ """
+
+
+@contextmanager
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
+ try:
+ yield
+ except OrchestratorError as e:
+ if overwrite or hasattr(e, 'event_subject'):
+ e.event_subject = (kind, subject)
+ raise
+
+
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ try:
+ return func(*args, **kwargs)
+ except (OrchestratorError, SpecValidationError) as e:
+ # Do not print Traceback for expected errors.
+ return HandleCommandResult(e.errno, stderr=str(e))
+ except ImportError as e:
+ return HandleCommandResult(-errno.ENOENT, stderr=str(e))
+ except NotImplementedError:
+ msg = 'This Orchestrator does not support `{}`'.format(prefix)
+ return HandleCommandResult(-errno.ENOENT, stderr=msg)
+
+ # misuse lambda to copy `wrapper`
+ wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731
+ wrapper_copy._prefix = prefix # type: ignore
+ wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore
+ wrapper_copy._cli_command.store_func_metadata(func) # type: ignore
+ wrapper_copy._cli_command.func = wrapper_copy # type: ignore
+
+ return cast(FuncT, wrapper_copy)
+
+
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+ """
+ Decorator to make Orchestrator methods return
+ an OrchResult.
+ """
+
+ @wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+ try:
+ return OrchResult(f(*args, **kwargs))
+ except Exception as e:
+ logger.exception(e)
+ import os
+ if 'UNITTEST' in os.environ:
+ raise # This makes debugging of Tracebacks from unittests a bit easier
+ return OrchResult(None, exception=e)
+
+ return cast(Callable[..., OrchResult[T]], wrapper)
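A minimal sketch (not part of this patch) of what the decorator does for a backend: the decorated method returns an `OrchResult` that carries either the value or the pickled exception, so nothing leaks across the `MgrModule.remote()` boundary. `ToyBackend` and `get_hostnames` are hypothetical names.

```python
from orchestrator import OrchResult, handle_orch_error

class ToyBackend:
    @handle_orch_error
    def get_hostnames(self) -> list:
        # Any exception raised here is captured and stored in the OrchResult
        # instead of propagating to the caller.
        return ['host1', 'host2']

res = ToyBackend().get_hostnames()
assert isinstance(res, OrchResult)
```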
+
+
+class InnerCliCommandCallable(Protocol):
+ def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
+ ...
+
+
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+ def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
+ return lambda func: handle_exception(prefix, perm, func)
+ return inner_cli_command
+
+
+_cli_read_command = _cli_command('r')
+_cli_write_command = _cli_command('rw')
+
+
+class CLICommandMeta(type):
+ """
+ This is a workaround for the use of a global variable CLICommand.COMMANDS which
+ prevents modules from importing any other module.
+
+ We make use of CLICommand, except for the use of the global variable.
+ """
+ def __init__(cls, name: str, bases: Any, dct: Any) -> None:
+ super(CLICommandMeta, cls).__init__(name, bases, dct)
+ dispatch: Dict[str, CLICommand] = {}
+ for v in dct.values():
+ try:
+ dispatch[v._prefix] = v._cli_command
+ except AttributeError:
+ pass
+
+ def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
+ if cmd['prefix'] not in dispatch:
+ return self.handle_command(inbuf, cmd)
+
+ return dispatch[cmd['prefix']].call(self, cmd, inbuf)
+
+ cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
+ cls.handle_command = handle_command
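A hedged sketch (not part of this patch) of how a manager module can register CLI commands through this metaclass instead of the global `CLICommand.COMMANDS` registry. `ToyCli` and the `toy status` prefix are made up for illustration.

```python
from mgr_module import HandleCommandResult
from orchestrator import CLICommandMeta, _cli_read_command

class ToyCli(metaclass=CLICommandMeta):
    @_cli_read_command('toy status')
    def _status(self) -> HandleCommandResult:
        """Report a fixed status string"""
        return HandleCommandResult(stdout='ok')

# After class creation, ToyCli.COMMANDS holds the dumped command descriptor and
# ToyCli.handle_command dispatches the 'toy status' prefix to _status().
```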
+
+
+class OrchResult(Generic[T]):
+ """
+ Stores a result and an exception. Mainly to circumvent the
+ MgrModule.remote() method that hides all exceptions and for
+ handling different sub-interpreters.
+ """
+
+ def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+ self.result = result
+ self.serialized_exception: Optional[bytes] = None
+ self.exception_str: str = ''
+ self.set_exception(exception)
+
+ __slots__ = 'result', 'serialized_exception', 'exception_str'
+
+ def set_exception(self, e: Optional[Exception]) -> None:
+ if e is None:
+ self.serialized_exception = None
+ self.exception_str = ''
+ return
+
+ self.exception_str = f'{type(e)}: {str(e)}'
+ try:
+ self.serialized_exception = pickle.dumps(e)
+ except pickle.PicklingError:
+ logger.error(f"failed to pickle {e}")
+ if isinstance(e, Exception):
+ e = Exception(*e.args)
+ else:
+ e = Exception(str(e))
+ # degenerate to a plain Exception
+ self.serialized_exception = pickle.dumps(e)
+
+ def result_str(self) -> str:
+ """Force a string."""
+ if self.result is None:
+ return ''
+ if isinstance(self.result, list):
+ return '\n'.join(str(x) for x in self.result)
+ return str(self.result)
+
+
+def raise_if_exception(c: OrchResult[T]) -> T:
+ """
+ Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
+ """
+ if c.serialized_exception is not None:
+ try:
+ e = pickle.loads(c.serialized_exception)
+ except (KeyError, AttributeError):
+ raise Exception(c.exception_str)
+ raise e
+ assert c.result is not None, 'OrchResult should either have an exception or a result'
+ return c.result
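For illustration (not part of this patch), how a caller typically unpacks an `OrchResult`: `raise_if_exception()` re-raises the pickled exception on the caller's side, which is exactly why it cannot live inside the `OrchResult` class.

```python
from orchestrator import OrchResult, OrchestratorError, raise_if_exception

ok = OrchResult(['host1', 'host2'])
print(raise_if_exception(ok))        # ['host1', 'host2']

failed = OrchResult(None, exception=OrchestratorError('backend unavailable'))
try:
    raise_if_exception(failed)       # re-raises the stored exception
except OrchestratorError as e:
    print(f'orchestrator failed: {e}')
```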
+
+
+def _hide_in_features(f: FuncT) -> FuncT:
+ f._hide_in_features = True # type: ignore
+ return f
+
+
+class Orchestrator(object):
+ """
+ Calls in this class may do long running remote operations, with time
+ periods ranging from network latencies to package install latencies and large
+ internet downloads. For that reason, all are asynchronous, and return
+ ``Completion`` objects.
+
+ Methods should only return the completion and not directly execute
+ anything, like network calls. Otherwise the purpose of
+ those completions is defeated.
+
+ Implementations are not required to start work on an operation until
+ the caller waits on the relevant Completion objects. Callers making
+ multiple updates should not wait on Completions until they're done
+ sending operations: this enables implementations to batch up a series
+ of updates when wait() is called on a set of Completion objects.
+
+ Implementations are encouraged to keep reasonably fresh caches of
+ the status of the system: it is better to serve a stale-but-recent
+ result read of e.g. device inventory than it is to keep the caller waiting
+ while you scan hosts every time.
+ """
+
+ @_hide_in_features
+ def is_orchestrator_module(self) -> bool:
+ """
+ Enable other modules to interrogate this module to discover
+ whether it's usable as an orchestrator module.
+
+ Subclasses do not need to override this.
+ """
+ return True
+
+ @_hide_in_features
+ def available(self) -> Tuple[bool, str, Dict[str, Any]]:
+ """
+ Report whether we can talk to the orchestrator. This is the
+ place to give the user a meaningful message if the orchestrator
+ isn't running or can't be contacted.
+
+ This method may be called frequently (e.g. every page load
+ to conditionally display a warning banner), so make sure it's
+ not too expensive. It's okay to give a slightly stale status
+ (e.g. based on a periodic background ping of the orchestrator)
+ if that's necessary to make this method fast.
+
+ .. note::
+ `True` doesn't mean that the desired functionality
+ is actually available in the orchestrator. I.e. this
+ won't work as expected::
+
+ >>> #doctest: +SKIP
+ ... if OrchestratorClientMixin().available()[0]: # wrong.
+ ... OrchestratorClientMixin().get_hosts()
+
+ :return: boolean representing whether the module is available/usable
+ :return: string describing any error
+ :return: dict containing any module specific information
+ """
+ raise NotImplementedError()
+
+ @_hide_in_features
+ def get_feature_set(self) -> Dict[str, dict]:
+ """Describes which methods this orchestrator implements
+
+ .. note::
+ `True` doesn't mean that the desired functionality
+ is actually possible in the orchestrator. I.e. this
+ won't work as expected::
+
+ >>> #doctest: +SKIP
+ ... api = OrchestratorClientMixin()
+ ... if api.get_feature_set()['get_hosts']['available']: # wrong.
+ ... api.get_hosts()
+
+ It's better to ask for forgiveness instead::
+
+ >>> #doctest: +SKIP
+ ... try:
+ ... OrchestratorClientMixin().get_hosts()
+ ... except (OrchestratorError, NotImplementedError):
+ ... ...
+
+ :returns: Dict of API method names to ``{'available': True or False}``
+ """
+ module = self.__class__
+ features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
+ for a in Orchestrator.__dict__
+ if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
+ }
+ return features
+
+ def cancel_completions(self) -> None:
+ """
+ Cancel ongoing completions to unstick the mgr.
+ """
+ raise NotImplementedError()
+
+ def pause(self) -> None:
+ raise NotImplementedError()
+
+ def resume(self) -> None:
+ raise NotImplementedError()
+
+ def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
+ """
+ Add a host to the orchestrator inventory.
+
+ :param host: hostname
+ """
+ raise NotImplementedError()
+
+ def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
+ """
+ Remove a host from the orchestrator inventory.
+
+ :param host: hostname
+ """
+ raise NotImplementedError()
+
+ def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+ """
+ drain all daemons from a host
+
+ :param hostname: hostname
+ """
+ raise NotImplementedError()
+
+ def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
+ """
+ Update a host's address
+
+ :param host: hostname
+ :param addr: address (dns name or IP)
+ """
+ raise NotImplementedError()
+
+ def get_hosts(self) -> OrchResult[List[HostSpec]]:
+ """
+ Report the hosts in the cluster.
+
+ :return: list of HostSpec
+ """
+ raise NotImplementedError()
+
+ def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+ """
+ Return host metadata (gather_facts).
+ """
+ raise NotImplementedError()
+
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
+ """
+ Add a host label
+ """
+ raise NotImplementedError()
+
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
+ """
+ Remove a host label
+ """
+ raise NotImplementedError()
+
+ def host_ok_to_stop(self, hostname: str) -> OrchResult:
+ """
+ Check if the specified host can be safely stopped without reducing availability
+
+ :param host: hostname
+ """
+ raise NotImplementedError()
+
+ def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+ """
+ Place a host in maintenance, stopping daemons and disabling its systemd target
+ """
+ raise NotImplementedError()
+
+ def exit_host_maintenance(self, hostname: str) -> OrchResult:
+ """
+ Return a host from maintenance, restarting the cluster's systemd target
+ """
+ raise NotImplementedError()
+
+ def rescan_host(self, hostname: str) -> OrchResult:
+ """Use cephadm to issue a disk rescan on each HBA
+
+ Some HBAs and external enclosures don't automatically register
+ device insertion with the kernel, so for these scenarios we need
+ to manually rescan
+
+ :param hostname: (str) host name
+ """
+ raise NotImplementedError()
+
+ def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
+ """
+ Return the storage device inventory for each host, as produced by `ceph-volume inventory`.
+
+ :return: list of InventoryHost
+ """
+ raise NotImplementedError()
+
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
+ """
+ Describe a service (of any kind) that is already configured in
+ the orchestrator. For example, when viewing an OSD in the dashboard
+ we might like to also display information about the orchestrator's
+ view of the service (like the kubernetes pod ID).
+
+ When viewing a CephFS filesystem in the dashboard, we would use this
+ to display the pods being currently run for MDS daemons.
+
+ :return: list of ServiceDescription objects.
+ """
+ raise NotImplementedError()
+
+ def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
+ """
+ Describe a daemon (of any kind) that is already configured in
+ the orchestrator.
+
+ :return: list of DaemonDescription objects.
+ """
+ raise NotImplementedError()
+
+ @handle_orch_error
+ def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
+ """
+ Applies any spec
+ """
+ fns: Dict[str, Callable[..., OrchResult[str]]] = {
+ 'alertmanager': self.apply_alertmanager,
+ 'crash': self.apply_crash,
+ 'grafana': self.apply_grafana,
+ 'iscsi': self.apply_iscsi,
+ 'mds': self.apply_mds,
+ 'mgr': self.apply_mgr,
+ 'mon': self.apply_mon,
+ 'nfs': self.apply_nfs,
+ 'node-exporter': self.apply_node_exporter,
+ 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
+ 'prometheus': self.apply_prometheus,
+ 'rbd-mirror': self.apply_rbd_mirror,
+ 'rgw': self.apply_rgw,
+ 'ingress': self.apply_ingress,
+ 'snmp-gateway': self.apply_snmp_gateway,
+ 'host': self.add_host,
+ 'cephadm-exporter': self.apply_cephadm_exporter,
+ }
+
+ def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
+ l_res = raise_if_exception(l)
+ r_res = raise_if_exception(r)
+ l_res.append(r_res)
+ return OrchResult(l_res)
+ return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
+
+ def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
+ """
+ Plan (Dry-run, Preview) a List of Specs.
+ """
+ raise NotImplementedError()
+
+ def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
+ """
+ Remove specific daemon(s).
+
+ :return: None
+ """
+ raise NotImplementedError()
+
+ def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
+ """
+ Remove a service (a collection of daemons).
+
+ :return: None
+ """
+ raise NotImplementedError()
+
+ def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
+ """
+ Perform an action (start/stop/reload) on a service (i.e., all daemons
+ providing the logical service).
+
+ :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
+ :param service_name: service_type + '.' + service_id
+ (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
+ :rtype: OrchResult
+ """
+ # assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ raise NotImplementedError()
+
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
+ """
+ Perform an action (start/stop/reload) on a daemon.
+
+ :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
+ :param daemon_name: name of daemon
+ :param image: Container image when redeploying that daemon
+ :rtype: OrchResult
+ """
+ # assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ raise NotImplementedError()
+
+ def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
+ """
+ Create one or more OSDs within a single Drive Group.
+
+ The principal argument here is the drive_group member
+ of OsdSpec: other fields are advisory/extensible for any
+ finer-grained OSD feature enablement (choice of backing store,
+ compression/encryption, etc).
+ """
+ raise NotImplementedError()
+
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
+ """ Update OSD cluster """
+ raise NotImplementedError()
+
+ def set_unmanaged_flag(self,
+ unmanaged_flag: bool,
+ service_type: str = 'osd',
+ service_name: Optional[str] = None
+ ) -> HandleCommandResult:
+ raise NotImplementedError()
+
+ def preview_osdspecs(self,
+ osdspec_name: Optional[str] = 'osd',
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ) -> OrchResult[str]:
+ """ Get a preview for OSD deployments """
+ raise NotImplementedError()
+
+ def remove_osds(self, osd_ids: List[str],
+ replace: bool = False,
+ force: bool = False,
+ zap: bool = False) -> OrchResult[str]:
+ """
+ :param osd_ids: list of OSD IDs
+ :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
+ :param force: Forces the OSD removal process without waiting for the data to be drained first.
+ :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+ .. note:: this can only remove OSDs that were successfully
+ created (i.e. got an OSD ID).
+ """
+ raise NotImplementedError()
+
+ def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
+ """
+ TODO
+ """
+ raise NotImplementedError()
+
+ def remove_osds_status(self) -> OrchResult:
+ """
+ Returns a status of the ongoing OSD removal operations.
+ """
+ raise NotImplementedError()
+
+ def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
+ """
+ Instructs the orchestrator to enable or disable either the ident or the fault LED.
+
+ :param ident_fault: either ``"ident"`` or ``"fault"``
+ :param on: ``True`` = on.
+ :param locations: See :class:`orchestrator.DeviceLightLoc`
+ """
+ raise NotImplementedError()
+
+ def zap_device(self, host: str, path: str) -> OrchResult[str]:
+ """Zap/Erase a device (DESTROYS DATA)"""
+ raise NotImplementedError()
+
+ def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
+ """Create daemons daemon(s) for unmanaged services"""
+ raise NotImplementedError()
+
+ def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update mon cluster"""
+ raise NotImplementedError()
+
+ def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update mgr cluster"""
+ raise NotImplementedError()
+
+ def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
+ """Update MDS cluster"""
+ raise NotImplementedError()
+
+ def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
+ """Update RGW cluster"""
+ raise NotImplementedError()
+
+ def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
+ """Update ingress daemons"""
+ raise NotImplementedError()
+
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update rbd-mirror cluster"""
+ raise NotImplementedError()
+
+ def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
+ """Update NFS cluster"""
+ raise NotImplementedError()
+
+ def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
+ """Update iscsi cluster"""
+ raise NotImplementedError()
+
+ def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update prometheus cluster"""
+ raise NotImplementedError()
+
+ def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Node-Exporter daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a crash daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a grafana service"""
+ raise NotImplementedError()
+
+ def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update an existing AlertManager daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+ """Update an existing snmp gateway service"""
+ raise NotImplementedError()
+
+ def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update an existing cephadm exporter daemon"""
+ raise NotImplementedError()
+
+ def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
+ raise NotImplementedError()
+
+ def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+ hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def upgrade_pause(self) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def upgrade_resume(self) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def upgrade_stop(self) -> OrchResult[str]:
+ raise NotImplementedError()
+
+ def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
+ """
+ If an upgrade is currently underway, report on where
+ we are in the process, or if some error has occurred.
+
+ :return: UpgradeStatusSpec instance
+ """
+ raise NotImplementedError()
+
+ @_hide_in_features
+ def upgrade_available(self) -> OrchResult:
+ """
+ Report on what versions are available to upgrade to
+
+ :return: List of strings
+ """
+ raise NotImplementedError()
+
+
+GenericSpec = Union[ServiceSpec, HostSpec]
+
+
+def json_to_generic_spec(spec: dict) -> GenericSpec:
+ if 'service_type' in spec and spec['service_type'] == 'host':
+ return HostSpec.from_json(spec)
+ else:
+ return ServiceSpec.from_json(spec)
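A small sketch (not part of this patch): `json_to_generic_spec()` is what lets a single `apply` path accept both host and service documents, switching on the `service_type` field. The spec dictionaries below are illustrative.

```python
from orchestrator._interface import json_to_generic_spec

host = json_to_generic_spec({'service_type': 'host', 'hostname': 'node1'})
svc = json_to_generic_spec({'service_type': 'mgr', 'placement': {'count': 2}})
print(type(host).__name__, type(svc).__name__)   # HostSpec ServiceSpec
```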
+
+
+def daemon_type_to_service(dtype: str) -> str:
+ mapping = {
+ 'mon': 'mon',
+ 'mgr': 'mgr',
+ 'mds': 'mds',
+ 'rgw': 'rgw',
+ 'osd': 'osd',
+ 'haproxy': 'ingress',
+ 'keepalived': 'ingress',
+ 'iscsi': 'iscsi',
+ 'rbd-mirror': 'rbd-mirror',
+ 'cephfs-mirror': 'cephfs-mirror',
+ 'nfs': 'nfs',
+ 'grafana': 'grafana',
+ 'alertmanager': 'alertmanager',
+ 'prometheus': 'prometheus',
+ 'node-exporter': 'node-exporter',
+ 'crash': 'crash',
+ 'crashcollector': 'crash', # Specific Rook Daemon
+ 'container': 'container',
+ 'cephadm-exporter': 'cephadm-exporter',
+ 'snmp-gateway': 'snmp-gateway',
+ }
+ return mapping[dtype]
+
+
+def service_to_daemon_types(stype: str) -> List[str]:
+ mapping = {
+ 'mon': ['mon'],
+ 'mgr': ['mgr'],
+ 'mds': ['mds'],
+ 'rgw': ['rgw'],
+ 'osd': ['osd'],
+ 'ingress': ['haproxy', 'keepalived'],
+ 'iscsi': ['iscsi'],
+ 'rbd-mirror': ['rbd-mirror'],
+ 'cephfs-mirror': ['cephfs-mirror'],
+ 'nfs': ['nfs'],
+ 'grafana': ['grafana'],
+ 'alertmanager': ['alertmanager'],
+ 'prometheus': ['prometheus'],
+ 'node-exporter': ['node-exporter'],
+ 'crash': ['crash'],
+ 'container': ['container'],
+ 'cephadm-exporter': ['cephadm-exporter'],
+ 'snmp-gateway': ['snmp-gateway'],
+ }
+ return mapping[stype]
+
+
+KNOWN_DAEMON_TYPES: List[str] = list(
+ sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
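Illustration (not part of this patch) of why the two mappings are not symmetric: some services are implemented by daemons of a different type, or by more than one daemon type.

```python
from orchestrator import daemon_type_to_service, service_to_daemon_types

assert daemon_type_to_service('haproxy') == 'ingress'
assert daemon_type_to_service('keepalived') == 'ingress'
assert service_to_daemon_types('ingress') == ['haproxy', 'keepalived']
```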
+
+
+class UpgradeStatusSpec(object):
+ # Orchestrator's report on what's going on with any ongoing upgrade
+ def __init__(self) -> None:
+ self.in_progress = False # Is an upgrade underway?
+ self.target_image: Optional[str] = None
+ self.services_complete: List[str] = [] # Which daemon types are fully updated?
+ self.which: str = '<unknown>'  # which daemon types, services or hosts the user specified, if any
+ self.progress: Optional[str] = None # How many of the daemons have we upgraded
+ self.message = "" # Freeform description
+ self.is_paused: bool = False # Is the upgrade paused?
+
+
+def handle_type_error(method: FuncT) -> FuncT:
+ @wraps(method)
+ def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
+ try:
+ return method(cls, *args, **kwargs)
+ except TypeError as e:
+ error_msg = '{}: {}'.format(cls.__name__, e)
+ raise OrchestratorValidationError(error_msg)
+ return cast(FuncT, inner)
+
+
+class DaemonDescriptionStatus(enum.IntEnum):
+ unknown = -2
+ error = -1
+ stopped = 0
+ running = 1
+ starting = 2 #: Daemon is deployed, but not yet running
+
+ @staticmethod
+ def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+ if status is None:
+ status = DaemonDescriptionStatus.unknown
+ return {
+ DaemonDescriptionStatus.unknown: 'unknown',
+ DaemonDescriptionStatus.error: 'error',
+ DaemonDescriptionStatus.stopped: 'stopped',
+ DaemonDescriptionStatus.running: 'running',
+ DaemonDescriptionStatus.starting: 'starting',
+ }.get(status, '<unknown>')
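A tiny sketch (not part of this patch): `to_str()` deliberately tolerates `None`, so status rendering stays safe when a backend has not reported anything yet.

```python
from orchestrator import DaemonDescriptionStatus

assert DaemonDescriptionStatus.to_str(DaemonDescriptionStatus.running) == 'running'
assert DaemonDescriptionStatus.to_str(None) == 'unknown'
```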
+
+
+class DaemonDescription(object):
+ """
+ For responding to queries about the status of a particular daemon,
+ stateful or stateless.
+
+ This is not about health or performance monitoring of daemons: it's
+ about letting the orchestrator tell Ceph whether and where a
+ daemon is scheduled in the cluster. When an orchestrator tells
+ Ceph "it's running on host123", that's not a promise that the process
+ is literally up this second, it's a description of where the orchestrator
+ has decided the daemon should run.
+ """
+
+ def __init__(self,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ hostname: Optional[str] = None,
+ container_id: Optional[str] = None,
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ container_image_digests: Optional[List[str]] = None,
+ version: Optional[str] = None,
+ status: Optional[DaemonDescriptionStatus] = None,
+ status_desc: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ started: Optional[datetime.datetime] = None,
+ last_configured: Optional[datetime.datetime] = None,
+ osdspec_affinity: Optional[str] = None,
+ last_deployed: Optional[datetime.datetime] = None,
+ events: Optional[List['OrchestratorEvent']] = None,
+ is_active: bool = False,
+ memory_usage: Optional[int] = None,
+ memory_request: Optional[int] = None,
+ memory_limit: Optional[int] = None,
+ cpu_percentage: Optional[str] = None,
+ service_name: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ ip: Optional[str] = None,
+ deployed_by: Optional[List[str]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None,
+ ) -> None:
+
+ #: Host is at the same granularity as InventoryHost
+ self.hostname: Optional[str] = hostname
+
+ # Not everyone runs in containers, but enough people do to
+ # justify having the container_id (runtime id) and container_image
+ # (image name)
+ self.container_id = container_id # runtime id
+ self.container_image_id = container_image_id # image id locally
+ self.container_image_name = container_image_name # image friendly name
+ self.container_image_digests = container_image_digests # reg hashes
+
+ #: The type of service (osd, mon, mgr, etc.)
+ self.daemon_type = daemon_type
+
+ #: The orchestrator will have picked some names for daemons,
+ #: typically either based on hostnames or on pod names.
+ #: This is the <foo> in mds.<foo>, the ID that will appear
+ #: in the FSMap/ServiceMap.
+ self.daemon_id: Optional[str] = daemon_id
+ self.daemon_name = self.name()
+
+ #: Some daemon types have a numeric rank assigned
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
+
+ self._service_name: Optional[str] = service_name
+
+ #: Service version that was deployed
+ self.version = version
+
+ # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+ self._status = status
+
+ #: Service status description when status == error.
+ self.status_desc = status_desc
+
+ #: datetime when this info was last refreshed
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
+
+ self.created: Optional[datetime.datetime] = created
+ self.started: Optional[datetime.datetime] = started
+ self.last_configured: Optional[datetime.datetime] = last_configured
+ self.last_deployed: Optional[datetime.datetime] = last_deployed
+
+ #: Affinity to a certain OSDSpec
+ self.osdspec_affinity: Optional[str] = osdspec_affinity
+
+ self.events: List[OrchestratorEvent] = events or []
+
+ self.memory_usage: Optional[int] = memory_usage
+ self.memory_request: Optional[int] = memory_request
+ self.memory_limit: Optional[int] = memory_limit
+
+ self.cpu_percentage: Optional[str] = cpu_percentage
+
+ self.ports: Optional[List[int]] = ports
+ self.ip: Optional[str] = ip
+
+ self.deployed_by = deployed_by
+
+ self.is_active = is_active
+
+ self.extra_container_args = extra_container_args
+
+ @property
+ def status(self) -> Optional[DaemonDescriptionStatus]:
+ return self._status
+
+ @status.setter
+ def status(self, new: DaemonDescriptionStatus) -> None:
+ self._status = new
+ self.status_desc = DaemonDescriptionStatus.to_str(new)
+
+ def get_port_summary(self) -> str:
+ if not self.ports:
+ return ''
+ return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
+
+ def name(self) -> str:
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+ def matches_service(self, service_name: Optional[str]) -> bool:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
+ if service_name:
+ return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
+ return False
+
+ def service_id(self) -> str:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
+
+ if self._service_name:
+ if '.' in self._service_name:
+ return self._service_name.split('.', 1)[1]
+ else:
+ return ''
+
+ if self.daemon_type == 'osd':
+ if self.osdspec_affinity and self.osdspec_affinity != 'None':
+ return self.osdspec_affinity
+ return ''
+
+ def _match() -> str:
+ assert self.daemon_id is not None
+ err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
+ f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
+
+ if not self.hostname:
+ # TODO: can a DaemonDescription exist without a hostname?
+ raise err
+
+ # use the bare hostname, not the FQDN.
+ host = self.hostname.split('.')[0]
+
+ if host == self.daemon_id:
+ # daemon_id == "host"
+ return self.daemon_id
+
+ elif host in self.daemon_id:
+ # daemon_id == "service_id.host"
+ # daemon_id == "service_id.host.random"
+ pre, post = self.daemon_id.rsplit(host, 1)
+ if not pre.endswith('.'):
+ # '.' sep missing at front of host
+ raise err
+ elif post and not post.startswith('.'):
+ # '.' sep missing at end of host
+ raise err
+ return pre[:-1]
+
+ # daemon_id == "service_id.random"
+ if self.daemon_type == 'rgw':
+ v = self.daemon_id.split('.')
+ if len(v) in [3, 4]:
+ return '.'.join(v[0:2])
+
+ if self.daemon_type == 'iscsi':
+ v = self.daemon_id.split('.')
+ return '.'.join(v[0:-1])
+
+ # daemon_id == "service_id"
+ return self.daemon_id
+
+ if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+ return _match()
+
+ return self.daemon_id
+
+ def service_name(self) -> str:
+ if self._service_name:
+ return self._service_name
+ assert self.daemon_type is not None
+ if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+ return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
+ return daemon_type_to_service(self.daemon_type)
+
+ def __repr__(self) -> str:
+ return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
+ id=self.daemon_id)
+
+ def __str__(self) -> str:
+ return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
+ def to_json(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
+ out['daemon_type'] = self.daemon_type
+ out['daemon_id'] = self.daemon_id
+ out['service_name'] = self._service_name
+ out['daemon_name'] = self.name()
+ out['hostname'] = self.hostname
+ out['container_id'] = self.container_id
+ out['container_image_id'] = self.container_image_id
+ out['container_image_name'] = self.container_image_name
+ out['container_image_digests'] = self.container_image_digests
+ out['memory_usage'] = self.memory_usage
+ out['memory_request'] = self.memory_request
+ out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
+ out['version'] = self.version
+ out['status'] = self.status.value if self.status is not None else None
+ out['status_desc'] = self.status_desc
+ if self.daemon_type == 'osd':
+ out['osdspec_affinity'] = self.osdspec_affinity
+ out['is_active'] = self.is_active
+ out['ports'] = self.ports
+ out['ip'] = self.ip
+ out['rank'] = self.rank
+ out['rank_generation'] = self.rank_generation
+
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if getattr(self, k):
+ out[k] = datetime_to_str(getattr(self, k))
+
+ if self.events:
+ out['events'] = [e.to_json() for e in self.events]
+
+ empty = [k for k, v in out.items() if v is None]
+ for e in empty:
+ del out[e]
+ return out
+
+ def to_dict(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
+ out['daemon_type'] = self.daemon_type
+ out['daemon_id'] = self.daemon_id
+ out['daemon_name'] = self.name()
+ out['hostname'] = self.hostname
+ out['container_id'] = self.container_id
+ out['container_image_id'] = self.container_image_id
+ out['container_image_name'] = self.container_image_name
+ out['container_image_digests'] = self.container_image_digests
+ out['memory_usage'] = self.memory_usage
+ out['memory_request'] = self.memory_request
+ out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
+ out['version'] = self.version
+ out['status'] = self.status.value if self.status is not None else None
+ out['status_desc'] = self.status_desc
+ if self.daemon_type == 'osd':
+ out['osdspec_affinity'] = self.osdspec_affinity
+ out['is_active'] = self.is_active
+ out['ports'] = self.ports
+ out['ip'] = self.ip
+
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if getattr(self, k):
+ out[k] = datetime_to_str(getattr(self, k))
+
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+
+ empty = [k for k, v in out.items() if v is None]
+ for e in empty:
+ del out[e]
+ return out
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data: dict) -> 'DaemonDescription':
+ c = data.copy()
+ event_strs = c.pop('events', [])
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if k in c:
+ c[k] = str_to_datetime(c[k])
+ events = [OrchestratorEvent.from_json(e) for e in event_strs]
+ status_int = c.pop('status', None)
+ if 'daemon_name' in c:
+ del c['daemon_name']
+ if 'service_name' in c and c['service_name'].startswith('osd.'):
+ # if the service_name is an osd.NNN (numeric osd id) then
+ # ignore it -- it is not a valid service_name and
+ # (presumably) came from an older version of cephadm.
+ try:
+ int(c['service_name'][4:])
+ del c['service_name']
+ except ValueError:
+ pass
+ status = DaemonDescriptionStatus(status_int) if status_int is not None else None
+ return cls(events=events, status=status, **c)
+
+ def __copy__(self) -> 'DaemonDescription':
+ # feel free to change this:
+ return DaemonDescription.from_json(self.to_json())
+
+ @staticmethod
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
+
+
+yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
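A sketch (not part of this patch) of the service-name derivation and serialization described above, using a made-up MDS daemon: for daemon types in `ServiceSpec.REQUIRES_SERVICE_ID` the service id is recovered from the daemon id and hostname, and the registered representer lets the object be dumped directly with `yaml`.

```python
import yaml
from orchestrator import DaemonDescription, DaemonDescriptionStatus

dd = DaemonDescription(daemon_type='mds', daemon_id='myfs.host1.abcdef',
                       hostname='host1', status=DaemonDescriptionStatus.running)
print(dd.service_name())                 # 'mds.myfs'
print(yaml.dump(dd))                     # uses DaemonDescription.yaml_representer
round_trip = DaemonDescription.from_json(dd.to_json())
print(round_trip.name())                 # 'mds.myfs.host1.abcdef'
```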
+
+
+class ServiceDescription(object):
+ """
+ For responding to queries about the status of a particular service,
+ stateful or stateless.
+
+ This is not about health or performance monitoring of services: it's
+ about letting the orchestrator tell Ceph whether and where a
+ service is scheduled in the cluster. When an orchestrator tells
+ Ceph "it's running on host123", that's not a promise that the process
+ is literally up this second, it's a description of where the orchestrator
+ has decided the service should run.
+ """
+
+ def __init__(self,
+ spec: ServiceSpec,
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ service_url: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ deleted: Optional[datetime.datetime] = None,
+ size: int = 0,
+ running: int = 0,
+ events: Optional[List['OrchestratorEvent']] = None,
+ virtual_ip: Optional[str] = None,
+ ports: List[int] = []) -> None:
+ # Not everyone runs in containers, but enough people do to
+ # justify having the container_image_id (image hash) and container_image
+ # (image name)
+ self.container_image_id = container_image_id # image hash
+ self.container_image_name = container_image_name # image friendly name
+
+ # If the service exposes REST-like API, this attribute should hold
+ # the URL.
+ self.service_url = service_url
+
+ # Number of daemons
+ self.size = size
+
+ # Number of daemons up
+ self.running = running
+
+ # datetime when this info was last refreshed
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
+ self.created: Optional[datetime.datetime] = created
+ self.deleted: Optional[datetime.datetime] = deleted
+
+ self.spec: ServiceSpec = spec
+
+ self.events: List[OrchestratorEvent] = events or []
+
+ self.virtual_ip = virtual_ip
+ self.ports = ports
+
+ def service_type(self) -> str:
+ return self.spec.service_type
+
+ def __repr__(self) -> str:
+ return f"<ServiceDescription of {self.spec.one_line_str()}>"
+
+ def get_port_summary(self) -> str:
+ if not self.ports:
+ return ''
+ return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+
+ def to_json(self) -> OrderedDict:
+ out = self.spec.to_json()
+ status = {
+ 'container_image_id': self.container_image_id,
+ 'container_image_name': self.container_image_name,
+ 'service_url': self.service_url,
+ 'size': self.size,
+ 'running': self.running,
+ 'last_refresh': self.last_refresh,
+ 'created': self.created,
+ 'virtual_ip': self.virtual_ip,
+ 'ports': self.ports if self.ports else None,
+ }
+ for k in ['last_refresh', 'created']:
+ if getattr(self, k):
+ status[k] = datetime_to_str(getattr(self, k))
+ status = {k: v for (k, v) in status.items() if v is not None}
+ out['status'] = status
+ if self.events:
+ out['events'] = [e.to_json() for e in self.events]
+ return out
+
+ def to_dict(self) -> OrderedDict:
+ out = self.spec.to_json()
+ status = {
+ 'container_image_id': self.container_image_id,
+ 'container_image_name': self.container_image_name,
+ 'service_url': self.service_url,
+ 'size': self.size,
+ 'running': self.running,
+ 'last_refresh': self.last_refresh,
+ 'created': self.created,
+ 'virtual_ip': self.virtual_ip,
+ 'ports': self.ports if self.ports else None,
+ }
+ for k in ['last_refresh', 'created']:
+ if getattr(self, k):
+ status[k] = datetime_to_str(getattr(self, k))
+ status = {k: v for (k, v) in status.items() if v is not None}
+ out['status'] = status
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+ return out
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data: dict) -> 'ServiceDescription':
+ c = data.copy()
+ status = c.pop('status', {})
+ event_strs = c.pop('events', [])
+ spec = ServiceSpec.from_json(c)
+
+ c_status = status.copy()
+ for k in ['last_refresh', 'created']:
+ if k in c_status:
+ c_status[k] = str_to_datetime(c_status[k])
+ events = [OrchestratorEvent.from_json(e) for e in event_strs]
+ return cls(spec=spec, events=events, **c_status)
+
+ @staticmethod
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
+
+
+yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
+
+
+class InventoryFilter(object):
+ """
+ When fetching inventory, use this filter to avoid unnecessarily
+ scanning the whole estate.
+
+ Typical use:
+
+ filter by host when presenting a UI workflow for configuring
+ a particular server.
+ filter by label when not all of the estate is Ceph servers,
+ and we only want to learn about the Ceph servers.
+ filter by label when we are particularly interested
+ in e.g. OSD servers.
+ """
+
+ def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
+
+ #: Optional: get info about hosts matching labels
+ self.labels = labels
+
+ #: Optional: get info about certain named hosts only
+ self.hosts = hosts
+
+
+class InventoryHost(object):
+ """
+ When fetching inventory, all Devices are grouped inside an
+ InventoryHost.
+ """
+
+ def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
+ if devices is None:
+ devices = inventory.Devices([])
+ if labels is None:
+ labels = []
+ assert isinstance(devices, inventory.Devices)
+
+ self.name = name # unique within cluster. For example a hostname.
+ self.addr = addr or name
+ self.devices = devices
+ self.labels = labels
+
+ def to_json(self) -> dict:
+ return {
+ 'name': self.name,
+ 'addr': self.addr,
+ 'devices': self.devices.to_json(),
+ 'labels': self.labels,
+ }
+
+ @classmethod
+ def from_json(cls, data: dict) -> 'InventoryHost':
+ try:
+ _data = copy.deepcopy(data)
+ name = _data.pop('name')
+ addr = _data.pop('addr', None) or name
+ devices = inventory.Devices.from_json(_data.pop('devices'))
+ labels = _data.pop('labels', list())
+ if _data:
+ error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
+ raise OrchestratorValidationError(error_msg)
+ return cls(name, devices, labels, addr)
+ except KeyError as e:
+ error_msg = '{} is required for {}'.format(e, cls.__name__)
+ raise OrchestratorValidationError(error_msg)
+ except TypeError as e:
+ raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
+
+ @classmethod
+ def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
+ devs = inventory.Devices.from_json
+ return [cls(item[0], devs(item[1].data)) for item in hosts]
+
+ def __repr__(self) -> str:
+ return "<InventoryHost>({name})".format(name=self.name)
+
+ @staticmethod
+ def get_host_names(hosts: List['InventoryHost']) -> List[str]:
+ return [host.name for host in hosts]
+
+ def __eq__(self, other: Any) -> bool:
+ return self.name == other.name and self.devices == other.devices
+
+
+class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
+ """
+ Describes a specific device on a specific host. Used for enabling or disabling LEDs
+ on devices.
+
+ hostname as in :func:`orchestrator.Orchestrator.get_hosts`
+
+ device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
+ See ``ceph osd metadata | jq '.[].device_ids'``
+ """
+ __slots__ = ()
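For illustration (not part of this patch), a location tuple as it would be passed to `blink_device_light()`; the host, device id and path values are hypothetical.

```python
from orchestrator import DeviceLightLoc

loc = DeviceLightLoc(host='node1', dev='ABC1234DEF567-1R1234_ABC8DE0Q', path='/dev/sda')
# e.g. orchestrator.blink_device_light('ident', True, [loc]) to turn the ident LED on
```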
+
+
+class OrchestratorEvent:
+ """
+ Similar to K8s Events.
+
+ Some form of "important" log message attached to something.
+ """
+ INFO = 'INFO'
+ ERROR = 'ERROR'
+ regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
+
+ def __init__(self, created: Union[str, datetime.datetime], kind: str,
+ subject: str, level: str, message: str) -> None:
+ if isinstance(created, str):
+ created = str_to_datetime(created)
+ self.created: datetime.datetime = created
+
+ assert kind in "service daemon".split()
+ self.kind: str = kind
+
+ # service name or daemon name
+ self.subject: str = subject
+
+ # Events are not meant for debugging. Debug messages belong in the log.
+ assert level in "INFO ERROR".split()
+ self.level = level
+
+ self.message: str = message
+
+ __slots__ = ('created', 'kind', 'subject', 'level', 'message')
+
+ def kind_subject(self) -> str:
+ return f'{self.kind}:{self.subject}'
+
+ def to_json(self) -> str:
+ # Make a long list of events readable.
+ created = datetime_to_str(self.created)
+ return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+
+ def to_dict(self) -> dict:
+ # Convert events data to dict.
+ return {
+ 'created': datetime_to_str(self.created),
+ 'subject': self.kind_subject(),
+ 'level': self.level,
+ 'message': self.message
+ }
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data: str) -> "OrchestratorEvent":
+ """
+ >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
+ '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+
+ :param data:
+ :return:
+ """
+ match = cls.regex_v1.match(data)
+ if match:
+ return cls(*match.groups())
+ raise ValueError(f'Unable to match: "{data}"')
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, OrchestratorEvent):
+ return False
+
+ return self.created == other.created and self.kind == other.kind \
+ and self.subject == other.subject and self.message == other.message
+
+ def __repr__(self) -> str:
+ return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
+
+def _mk_orch_methods(cls: Any) -> Any:
+ # Needs to be defined outside of for.
+ # Otherwise meth is always bound to last key
+ def shim(method_name: str) -> Callable:
+ def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
+ completion = self._oremote(method_name, args, kwargs)
+ return completion
+ return inner
+
+ for name, method in Orchestrator.__dict__.items():
+ if not name.startswith('_') and name not in ['is_orchestrator_module']:
+ remote_call = update_wrapper(shim(name), method)
+ setattr(cls, name, remote_call)
+ return cls
+
+
+@_mk_orch_methods
+class OrchestratorClientMixin(Orchestrator):
+ """
+ A module that inherits from `OrchestratorClientMixin` can directly call
+ all :class:`Orchestrator` methods without manually calling remote.
+
+ Every interface method from ``Orchestrator`` is converted into a stub method that internally
+ calls :func:`OrchestratorClientMixin._oremote`
+
+ >>> class MyModule(OrchestratorClientMixin):
+ ... def func(self):
+ ... completion = self.add_host('somehost') # calls `_oremote()`
+ ... self.log.debug(completion.result)
+
+ .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
+ The reason is that OrchestratorClientMixin magically redirects all methods to the
+ "real" implementation of the orchestrator.
+
+
+ >>> import mgr_module
+ >>> #doctest: +SKIP
+ ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
+ ... def __init__(self, ...):
+ ... self.orch_client = OrchestratorClientMixin()
+ ... self.orch_client.set_mgr(self.mgr)
+ """
+
+ def set_mgr(self, mgr: MgrModule) -> None:
+ """
+ Usable in the Dashboard, which uses a global ``mgr``
+ """
+
+ self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
+
+ def __get_mgr(self) -> Any:
+ try:
+ return self.__mgr
+ except AttributeError:
+ return self
+
+ def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
+ """
+ Helper for invoking `remote` on whichever orchestrator is enabled
+
+ :raises RuntimeError: If the remote method failed.
+ :raises OrchestratorError: orchestrator failed to perform
+ :raises ImportError: no `orchestrator` module or backend not found.
+ """
+ mgr = self.__get_mgr()
+
+ try:
+ o = mgr._select_orchestrator()
+ except AttributeError:
+ o = mgr.remote('orchestrator', '_select_orchestrator')
+
+ if o is None:
+ raise NoOrchestrator()
+
+ mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
+ try:
+ return mgr.remote(o, meth, *args, **kwargs)
+ except Exception as e:
+ if meth == 'get_feature_set':
+ raise # self.get_feature_set() calls self._oremote()
+ f_set = self.get_feature_set()
+ if meth not in f_set or not f_set[meth]['available']:
+ raise NotImplementedError(f'{o} does not implement {meth}') from e
+ raise
diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py
new file mode 100644
index 000000000..2288cfeef
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/module.py
@@ -0,0 +1,1471 @@
+import enum
+import errno
+import json
+from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union, Sequence
+import re
+import datetime
+
+import yaml
+from prettytable import PrettyTable
+
+from ceph.deployment.inventory import Device # noqa: F401; pylint: disable=unused-variable
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, OSDMethod
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, service_spec_allow_invalid_from_json
+from ceph.deployment.hostspec import SpecValidationError
+from ceph.utils import datetime_now
+
+from mgr_util import to_pretty_timedelta, format_dimless, format_bytes
+from mgr_module import MgrModule, HandleCommandResult, Option
+
+from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
+ raise_if_exception, _cli_write_command, OrchestratorError, \
+ NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
+ RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
+ ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \
+ GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec
+
+
+def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
+ if t:
+ return to_pretty_timedelta(now - t) + suffix
+ else:
+ return '-'
+
+
+def nice_bytes(v: Optional[int]) -> str:
+ if not v:
+ return '-'
+ return format_bytes(v, 5)
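A quick sketch (not part of this patch) of how these helpers behave, assuming `nice_delta` and `nice_bytes` defined above are in scope; they back the age and size columns of the daemon and service listings.

```python
from ceph.utils import datetime_now

now = datetime_now()
print(nice_delta(now, None))        # '-' when the timestamp is missing
print(nice_bytes(None))             # '-'
print(nice_bytes(4 * 1024 * 1024))  # human-readable size via format_bytes, e.g. '4.0MiB'
```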
+
+
+class Format(enum.Enum):
+ plain = 'plain'
+ json = 'json'
+ json_pretty = 'json-pretty'
+ yaml = 'yaml'
+ xml_pretty = 'xml-pretty'
+ xml = 'xml'
+
+
+class ServiceType(enum.Enum):
+ mon = 'mon'
+ mgr = 'mgr'
+ rbd_mirror = 'rbd-mirror'
+ cephfs_mirror = 'cephfs-mirror'
+ crash = 'crash'
+ alertmanager = 'alertmanager'
+ grafana = 'grafana'
+ node_exporter = 'node-exporter'
+ prometheus = 'prometheus'
+ mds = 'mds'
+ rgw = 'rgw'
+ nfs = 'nfs'
+ iscsi = 'iscsi'
+ cephadm_exporter = 'cephadm-exporter'
+ snmp_gateway = 'snmp-gateway'
+
+
+class ServiceAction(enum.Enum):
+ start = 'start'
+ stop = 'stop'
+ restart = 'restart'
+ redeploy = 'redeploy'
+ reconfig = 'reconfig'
+
+
+class DaemonAction(enum.Enum):
+ start = 'start'
+ stop = 'stop'
+ restart = 'restart'
+ reconfig = 'reconfig'
+
+
+def to_format(what: Any, format: Format, many: bool, cls: Any) -> Any:
+ def to_json_1(obj: Any) -> Any:
+ if hasattr(obj, 'to_json'):
+ return obj.to_json()
+ return obj
+
+ def to_json_n(objs: List) -> List:
+ return [to_json_1(o) for o in objs]
+
+ to_json = to_json_n if many else to_json_1
+
+ if format == Format.json:
+ return json.dumps(to_json(what), sort_keys=True)
+ elif format == Format.json_pretty:
+ return json.dumps(to_json(what), indent=2, sort_keys=True)
+ elif format == Format.yaml:
+ # fun with subinterpreters again. pyyaml depends on object identity.
+ # as `what` originates from a different subinterpreter, we have to copy things here.
+ if cls:
+ flat = to_json(what)
+ copy = [cls.from_json(o) for o in flat] if many else cls.from_json(flat)
+ else:
+ copy = what
+
+ def to_yaml_1(obj: Any) -> Any:
+ if hasattr(obj, 'yaml_representer'):
+ return obj
+ return to_json_1(obj)
+
+ def to_yaml_n(objs: list) -> list:
+ return [to_yaml_1(o) for o in objs]
+
+ to_yaml = to_yaml_n if many else to_yaml_1
+
+ if many:
+ return yaml.dump_all(to_yaml(copy), default_flow_style=False)
+ return yaml.dump(to_yaml(copy), default_flow_style=False)
+ elif format == Format.xml or format == Format.xml_pretty:
+ raise OrchestratorError(f"format '{format.name}' is not implemented.")
+ else:
+ raise OrchestratorError(f'unsupported format type: {format}')
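A sketch (not part of this patch) of the typical call, assuming `to_format` and `Format` from above are in scope: the `cls` argument matters only for YAML, where the objects are re-created in this subinterpreter so the registered yaml representers apply.

```python
from orchestrator import HostSpec

hosts = [HostSpec('node1', addr='10.0.0.1'), HostSpec('node2')]
print(to_format(hosts, Format.json_pretty, many=True, cls=None))
print(to_format(hosts, Format.yaml, many=True, cls=HostSpec))
```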
+
+
+def generate_preview_tables(data: Any, osd_only: bool = False) -> str:
+ error = [x.get('error') for x in data if x.get('error')]
+ if error:
+ return json.dumps(error)
+ warning = [x.get('warning') for x in data if x.get('warning')]
+ osd_table = preview_table_osd(data)
+ service_table = preview_table_services(data)
+
+ if osd_only:
+ tables = f"""
+{''.join(warning)}
+
+################
+OSDSPEC PREVIEWS
+################
+{osd_table}
+"""
+ return tables
+ else:
+ tables = f"""
+{''.join(warning)}
+
+####################
+SERVICESPEC PREVIEWS
+####################
+{service_table}
+
+################
+OSDSPEC PREVIEWS
+################
+{osd_table}
+"""
+ return tables
+
+
+def preview_table_osd(data: List) -> str:
+ table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True)
+ table.field_names = "service name host data db wal".split()
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ notes = ''
+ for osd_data in data:
+ if osd_data.get('service_type') != 'osd':
+ continue
+ for host, specs in osd_data.get('data').items():
+ for spec in specs:
+ if spec.get('error'):
+ return spec.get('message')
+ dg_name = spec.get('osdspec')
+ if spec.get('notes', []):
+ notes += '\n'.join(spec.get('notes')) + '\n'
+ for osd in spec.get('data', []):
+ db_path = osd.get('block_db', '-')
+ wal_path = osd.get('block_wal', '-')
+ block_data = osd.get('data', '')
+ if not block_data:
+ continue
+ table.add_row(('osd', dg_name, host, block_data, db_path, wal_path))
+ return notes + table.get_string()
+
+
+def preview_table_services(data: List) -> str:
+ table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True)
+ table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split()
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for item in data:
+ if item.get('warning'):
+ continue
+ if item.get('service_type') != 'osd':
+ table.add_row((item.get('service_type'), item.get('service_name'),
+ " ".join(item.get('add')), " ".join(item.get('remove'))))
+ return table.get_string()
+
+
+class OrchestratorCli(OrchestratorClientMixin, MgrModule,
+ metaclass=CLICommandMeta):
+ MODULE_OPTIONS = [
+ Option(
+ 'orchestrator',
+ type='str',
+ default=None,
+ desc='Orchestrator backend',
+ enum_allowed=['cephadm', 'rook', 'test_orchestrator'],
+ runtime=True,
+ )
+ ]
+ NATIVE_OPTIONS = [] # type: List[dict]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(OrchestratorCli, self).__init__(*args, **kwargs)
+ self.ident: Set[str] = set()
+ self.fault: Set[str] = set()
+ self._load()
+ self._refresh_health()
+
+ def _load(self) -> None:
+ active = self.get_store('active_devices')
+ if active:
+ decoded = json.loads(active)
+ self.ident = set(decoded.get('ident', []))
+ self.fault = set(decoded.get('fault', []))
+ self.log.debug('ident {}, fault {}'.format(self.ident, self.fault))
+
+ def _save(self) -> None:
+ encoded = json.dumps({
+ 'ident': list(self.ident),
+ 'fault': list(self.fault),
+ })
+ self.set_store('active_devices', encoded)
+
+ def _refresh_health(self) -> None:
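+        # Surface active ident/fault lights as health warnings so they remain
+        # visible in the cluster health output.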
+ h = {}
+ if self.ident:
+ h['DEVICE_IDENT_ON'] = {
+ 'severity': 'warning',
+ 'summary': '%d devices have ident light turned on' % len(
+ self.ident),
+ 'detail': ['{} ident light enabled'.format(d) for d in self.ident]
+ }
+ if self.fault:
+ h['DEVICE_FAULT_ON'] = {
+ 'severity': 'warning',
+ 'summary': '%d devices have fault light turned on' % len(
+ self.fault),
+                'detail': ['{} fault light enabled'.format(d) for d in self.fault]
+ }
+ self.set_health_checks(h)
+
+ def _get_device_locations(self, dev_id):
+ # type: (str) -> List[DeviceLightLoc]
+ locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
+ return [DeviceLightLoc(**loc) for loc in sum(locs, [])]
+
+ @_cli_read_command(prefix='device ls-lights')
+ def _device_ls(self) -> HandleCommandResult:
+ """List currently active device indicator lights"""
+ return HandleCommandResult(
+ stdout=json.dumps({
+ 'ident': list(self.ident),
+ 'fault': list(self.fault)
+ }, indent=4, sort_keys=True))
+
+ def light_on(self, fault_ident, devid):
+ # type: (str, str) -> HandleCommandResult
+ assert fault_ident in ("fault", "ident")
+ locs = self._get_device_locations(devid)
+        if not locs:
+ return HandleCommandResult(stderr='device {} not found'.format(devid),
+ retval=-errno.ENOENT)
+
+ getattr(self, fault_ident).add(devid)
+ self._save()
+ self._refresh_health()
+ completion = self.blink_device_light(fault_ident, True, locs)
+ return HandleCommandResult(stdout=str(completion.result))
+
+ def light_off(self, fault_ident, devid, force):
+ # type: (str, str, bool) -> HandleCommandResult
+ assert fault_ident in ("fault", "ident")
+ locs = self._get_device_locations(devid)
+        if not locs:
+ return HandleCommandResult(stderr='device {} not found'.format(devid),
+ retval=-errno.ENOENT)
+
+ try:
+ completion = self.blink_device_light(fault_ident, False, locs)
+
+ if devid in getattr(self, fault_ident):
+ getattr(self, fault_ident).remove(devid)
+ self._save()
+ self._refresh_health()
+ return HandleCommandResult(stdout=str(completion.result))
+
+ except Exception:
+ # There are several reasons the try: block might fail:
+            # 1. the device no longer exists
+ # 2. the device is no longer known to Ceph
+ # 3. the host is not reachable
+ if force and devid in getattr(self, fault_ident):
+ getattr(self, fault_ident).remove(devid)
+ self._save()
+ self._refresh_health()
+ raise
+
+ class DeviceLightEnable(enum.Enum):
+ on = 'on'
+ off = 'off'
+
+ class DeviceLightType(enum.Enum):
+ ident = 'ident'
+ fault = 'fault'
+
+ @_cli_write_command(prefix='device light')
+ def _device_light(self,
+ enable: DeviceLightEnable,
+ devid: str,
+ light_type: DeviceLightType = DeviceLightType.ident,
+ force: bool = False) -> HandleCommandResult:
+ """
+ Enable or disable the device light. Default type is `ident`
+        Usage: device light (on|off) <devid> [ident|fault] [--force]
+        """
+ if enable == self.DeviceLightEnable.on:
+ return self.light_on(light_type.value, devid)
+ else:
+ return self.light_off(light_type.value, devid, force)
+
+ def _select_orchestrator(self) -> str:
+ return cast(str, self.get_module_option("orchestrator"))
+
+ @_cli_write_command('orch host add')
+ def _add_host(self,
+ hostname: str,
+ addr: Optional[str] = None,
+ labels: Optional[List[str]] = None,
+ maintenance: Optional[bool] = False) -> HandleCommandResult:
+ """Add a host"""
+ _status = 'maintenance' if maintenance else ''
+
+ # split multiple labels passed in with --labels=label1,label2
+ if labels and len(labels) == 1:
+ labels = labels[0].split(',')
+
+ s = HostSpec(hostname=hostname, addr=addr, labels=labels, status=_status)
+
+ return self._apply_misc([s], False, Format.plain)
+
+ @_cli_write_command('orch host rm')
+ def _remove_host(self, hostname: str, force: bool = False, offline: bool = False) -> HandleCommandResult:
+ """Remove a host"""
+ completion = self.remove_host(hostname, force, offline)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch host drain')
+ def _drain_host(self, hostname: str, force: bool = False) -> HandleCommandResult:
+ """drain all daemons from a host"""
+ completion = self.drain_host(hostname, force)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch host set-addr')
+ def _update_set_addr(self, hostname: str, addr: str) -> HandleCommandResult:
+ """Update a host address"""
+ completion = self.update_host_addr(hostname, addr)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_read_command('orch host ls')
+ def _get_hosts(self, format: Format = Format.plain, host_pattern: str = '', label: str = '', host_status: str = '') -> HandleCommandResult:
+ """List hosts"""
+ completion = self.get_hosts()
+ hosts = raise_if_exception(completion)
+
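+        # Narrow the host list using the same matching logic as placement specs
+        # (--host-pattern and --label are applied via PlacementSpec).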
+ filter_spec = PlacementSpec(
+ host_pattern=host_pattern,
+ label=label
+ )
+ filtered_hosts: List[str] = filter_spec.filter_matching_hostspecs(hosts)
+ hosts = [h for h in hosts if h.hostname in filtered_hosts]
+
+ if host_status:
+ hosts = [h for h in hosts if h.status.lower() == host_status]
+
+ if format != Format.plain:
+ output = to_format(hosts, format, many=True, cls=HostSpec)
+ else:
+ table = PrettyTable(
+ ['HOST', 'ADDR', 'LABELS', 'STATUS'],
+ border=False)
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for host in sorted(hosts, key=lambda h: h.hostname):
+ table.add_row((host.hostname, host.addr, ' '.join(
+ host.labels), host.status.capitalize()))
+ output = table.get_string()
+ if format == Format.plain:
+ output += f'\n{len(hosts)} hosts in cluster'
+ if label:
+ output += f' who had label {label}'
+ if host_pattern:
+ output += f' whose hostname matched {host_pattern}'
+ if host_status:
+ output += f' with status {host_status}'
+ return HandleCommandResult(stdout=output)
+
+ @_cli_write_command('orch host label add')
+ def _host_label_add(self, hostname: str, label: str) -> HandleCommandResult:
+ """Add a host label"""
+ completion = self.add_host_label(hostname, label)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch host label rm')
+ def _host_label_rm(self, hostname: str, label: str, force: bool = False) -> HandleCommandResult:
+ """Remove a host label"""
+ completion = self.remove_host_label(hostname, label, force)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch host ok-to-stop')
+ def _host_ok_to_stop(self, hostname: str) -> HandleCommandResult:
+ """Check if the specified host can be safely stopped without reducing availability"""""
+ completion = self.host_ok_to_stop(hostname)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command(
+ 'orch host maintenance enter')
+ def _host_maintenance_enter(self, hostname: str, force: bool = False) -> HandleCommandResult:
+ """
+ Prepare a host for maintenance by shutting down and disabling all Ceph daemons (cephadm only)
+ """
+ completion = self.enter_host_maintenance(hostname, force=force)
+ raise_if_exception(completion)
+
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command(
+ 'orch host maintenance exit')
+ def _host_maintenance_exit(self, hostname: str) -> HandleCommandResult:
+ """
+ Return a host from maintenance, restarting all Ceph daemons (cephadm only)
+ """
+ completion = self.exit_host_maintenance(hostname)
+ raise_if_exception(completion)
+
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch host rescan')
+ def _host_rescan(self, hostname: str, with_summary: bool = False) -> HandleCommandResult:
+ """Perform a disk rescan on a host"""
+ completion = self.rescan_host(hostname)
+ raise_if_exception(completion)
+
+ if with_summary:
+ return HandleCommandResult(stdout=completion.result_str())
+ return HandleCommandResult(stdout=completion.result_str().split('.')[0])
+
+ @_cli_read_command('orch device ls')
+ def _list_devices(self,
+ hostname: Optional[List[str]] = None,
+ format: Format = Format.plain,
+ refresh: bool = False,
+ wide: bool = False) -> HandleCommandResult:
+ """
+ List devices on a host
+ """
+ # Provide information about storage devices present in cluster hosts
+ #
+ # Note: this does not have to be completely synchronous. Slightly out of
+ # date hardware inventory is fine as long as hardware ultimately appears
+ # in the output of this command.
+ nf = InventoryFilter(hosts=hostname) if hostname else None
+
+ completion = self.get_inventory(host_filter=nf, refresh=refresh)
+
+ inv_hosts = raise_if_exception(completion)
+
+ if format != Format.plain:
+ return HandleCommandResult(stdout=to_format(inv_hosts,
+ format,
+ many=True,
+ cls=InventoryHost))
+ else:
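+            # Map raw availability/LED values onto the strings shown in the table.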
+ display_map = {
+ "Unsupported": "N/A",
+ "N/A": "N/A",
+ "On": "On",
+ "Off": "Off",
+ True: "Yes",
+ False: "",
+ }
+
+ out = []
+ if wide:
+ table = PrettyTable(
+ ['HOST', 'PATH', 'TYPE', 'TRANSPORT', 'RPM', 'DEVICE ID', 'SIZE',
+ 'HEALTH', 'IDENT', 'FAULT',
+ 'AVAILABLE', 'REFRESHED', 'REJECT REASONS'],
+ border=False)
+ else:
+ table = PrettyTable(
+ ['HOST', 'PATH', 'TYPE', 'DEVICE ID', 'SIZE',
+ 'AVAILABLE', 'REFRESHED', 'REJECT REASONS'],
+ border=False)
+ table.align = 'l'
+ table._align['SIZE'] = 'r'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ now = datetime_now()
+ for host_ in sorted(inv_hosts, key=lambda h: h.name): # type: InventoryHost
+ for d in sorted(host_.devices.devices, key=lambda d: d.path): # type: Device
+
+ led_ident = 'N/A'
+ led_fail = 'N/A'
+ if d.lsm_data.get('ledSupport', None):
+ led_ident = d.lsm_data['ledSupport']['IDENTstatus']
+ led_fail = d.lsm_data['ledSupport']['FAILstatus']
+
+ if wide:
+ table.add_row(
+ (
+ host_.name,
+ d.path,
+ d.human_readable_type,
+ d.lsm_data.get('transport', ''),
+ d.lsm_data.get('rpm', ''),
+ d.device_id,
+ format_dimless(d.sys_api.get('size', 0), 5),
+ d.lsm_data.get('health', ''),
+ display_map[led_ident],
+ display_map[led_fail],
+ display_map[d.available],
+ nice_delta(now, d.created, ' ago'),
+ ', '.join(d.rejected_reasons)
+ )
+ )
+ else:
+ table.add_row(
+ (
+ host_.name,
+ d.path,
+ d.human_readable_type,
+ d.device_id,
+ format_dimless(d.sys_api.get('size', 0), 5),
+ display_map[d.available],
+ nice_delta(now, d.created, ' ago'),
+ ', '.join(d.rejected_reasons)
+ )
+ )
+ out.append(table.get_string())
+ return HandleCommandResult(stdout='\n'.join(out))
+
+ @_cli_write_command('orch device zap')
+ def _zap_device(self, hostname: str, path: str, force: bool = False) -> HandleCommandResult:
+ """
+ Zap (erase!) a device so it can be re-used
+ """
+ if not force:
+ raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA')
+ completion = self.zap_device(hostname, path)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_read_command('orch ls')
+ def _list_services(self,
+ service_type: Optional[str] = None,
+ service_name: Optional[str] = None,
+ export: bool = False,
+ format: Format = Format.plain,
+ refresh: bool = False) -> HandleCommandResult:
+ """
+ List services known to orchestrator
+ """
+ if export and format == Format.plain:
+ format = Format.yaml
+
+ completion = self.describe_service(service_type,
+ service_name,
+ refresh=refresh)
+
+ services = raise_if_exception(completion)
+
+ def ukn(s: Optional[str]) -> str:
+ return '<unknown>' if s is None else s
+
+ # Sort the list for display
+ services.sort(key=lambda s: (ukn(s.spec.service_name())))
+
+ if len(services) == 0:
+ return HandleCommandResult(stdout="No services reported")
+ elif format != Format.plain:
+ with service_spec_allow_invalid_from_json():
+ if export:
+ data = [s.spec for s in services if s.deleted is None]
+ return HandleCommandResult(stdout=to_format(data, format, many=True, cls=ServiceSpec))
+ else:
+ return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription))
+ else:
+ now = datetime_now()
+ table = PrettyTable(
+ [
+ 'NAME', 'PORTS',
+ 'RUNNING', 'REFRESHED', 'AGE',
+ 'PLACEMENT',
+ ],
+ border=False)
+ table.align['NAME'] = 'l'
+ table.align['PORTS'] = 'l'
+ table.align['RUNNING'] = 'r'
+ table.align['REFRESHED'] = 'l'
+ table.align['AGE'] = 'l'
+ table.align['PLACEMENT'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for s in services:
+ if not s.spec:
+ pl = '<no spec>'
+ elif s.spec.unmanaged:
+ pl = '<unmanaged>'
+ else:
+ pl = s.spec.placement.pretty_str()
+ if s.deleted:
+ refreshed = '<deleting>'
+ else:
+ refreshed = nice_delta(now, s.last_refresh, ' ago')
+
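+                # OSD services have no fixed target size, so show the raw count
+                # instead of the usual running/size pair.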
+ if s.spec.service_type == 'osd':
+ running = str(s.running)
+ else:
+ running = '{}/{}'.format(s.running, s.size)
+
+ table.add_row((
+ s.spec.service_name(),
+ s.get_port_summary(),
+ running,
+ refreshed,
+ nice_delta(now, s.created),
+ pl,
+ ))
+
+ return HandleCommandResult(stdout=table.get_string())
+
+ @_cli_read_command('orch ps')
+ def _list_daemons(self,
+ hostname: Optional[str] = None,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ format: Format = Format.plain,
+ refresh: bool = False) -> HandleCommandResult:
+ """
+ List daemons known to orchestrator
+ """
+ completion = self.list_daemons(service_name,
+ daemon_type,
+ daemon_id=daemon_id,
+ host=hostname,
+ refresh=refresh)
+
+ daemons = raise_if_exception(completion)
+
+ def ukn(s: Optional[str]) -> str:
+ return '<unknown>' if s is None else s
+ # Sort the list for display
+ daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id)))
+
+ if format != Format.plain:
+ return HandleCommandResult(stdout=to_format(daemons, format, many=True, cls=DaemonDescription))
+ else:
+ if len(daemons) == 0:
+ return HandleCommandResult(stdout="No daemons reported")
+
+ now = datetime_now()
+ table = PrettyTable(
+ ['NAME', 'HOST', 'PORTS',
+ 'STATUS', 'REFRESHED', 'AGE',
+ 'MEM USE', 'MEM LIM',
+ 'VERSION', 'IMAGE ID', 'CONTAINER ID'],
+ border=False)
+ table.align = 'l'
+ table._align['REFRESHED'] = 'r'
+ table._align['AGE'] = 'r'
+ table._align['MEM USE'] = 'r'
+ table._align['MEM LIM'] = 'r'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for s in sorted(daemons, key=lambda s: s.name()):
+ if s.status_desc:
+ status = s.status_desc
+ else:
+ status = DaemonDescriptionStatus.to_str(s.status)
+ if s.status == DaemonDescriptionStatus.running and s.started: # See DDS.starting
+ status += ' (%s)' % to_pretty_timedelta(now - s.started)
+
+ table.add_row((
+ s.name(),
+ ukn(s.hostname),
+ s.get_port_summary(),
+ status,
+ nice_delta(now, s.last_refresh, ' ago'),
+ nice_delta(now, s.created),
+ nice_bytes(s.memory_usage),
+ nice_bytes(s.memory_request),
+ ukn(s.version),
+ ukn(s.container_image_id)[0:12],
+ ukn(s.container_id)))
+
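+            # Drop the CONTAINER ID column entirely if no daemon reported one.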
+ remove_column = 'CONTAINER ID'
+ if table.get_string(fields=[remove_column], border=False,
+ header=False).count('<unknown>') == len(daemons):
+ try:
+ table.del_column(remove_column)
+ except AttributeError as e:
+ # del_column method was introduced in prettytable 2.0
+ if str(e) != "del_column":
+ raise
+ table.field_names.remove(remove_column)
+ table._rows = [row[:-1] for row in table._rows]
+
+ return HandleCommandResult(stdout=table.get_string())
+
+ @_cli_write_command('orch apply osd')
+ def _apply_osd(self,
+ all_available_devices: bool = False,
+ format: Format = Format.plain,
+ unmanaged: Optional[bool] = None,
+ dry_run: bool = False,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None # deprecated. Was deprecated before Quincy
+ ) -> HandleCommandResult:
+ """
+ Create OSD daemon(s) on all available devices
+ """
+
+ if inbuf and all_available_devices:
+            return HandleCommandResult(-errno.EINVAL, stderr='-i infile and --all-available-devices are mutually exclusive')
+
+ if not inbuf and not all_available_devices:
+ # one parameter must be present
+            return HandleCommandResult(-errno.EINVAL, stderr='--all-available-devices is required')
+
+ if inbuf:
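+            # A spec file was passed with -i: parse one or more drive group
+            # specs from the YAML documents it contains.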
+ if unmanaged is not None:
+ return HandleCommandResult(-errno.EINVAL, stderr='-i infile and --unmanaged are mutually exclusive')
+
+ try:
+ drivegroups = [_dg for _dg in yaml.safe_load_all(inbuf)]
+ except yaml.scanner.ScannerError as e:
+ msg = f"Invalid YAML received : {str(e)}"
+ self.log.exception(e)
+ return HandleCommandResult(-errno.EINVAL, stderr=msg)
+
+ dg_specs = []
+ for dg in drivegroups:
+ spec = DriveGroupSpec.from_json(dg)
+ if dry_run:
+ spec.preview_only = True
+ dg_specs.append(spec)
+
+ return self._apply_misc(dg_specs, dry_run, format, no_overwrite)
+
+ if all_available_devices:
+ if unmanaged is None:
+ unmanaged = False
+ dg_specs = [
+ DriveGroupSpec(
+ service_id='all-available-devices',
+ placement=PlacementSpec(host_pattern='*'),
+ data_devices=DeviceSelection(all=True),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+ ]
+ return self._apply_misc(dg_specs, dry_run, format, no_overwrite)
+
+ return HandleCommandResult(-errno.EINVAL, stderr='--all-available-devices is required')
+
+ @_cli_write_command('orch daemon add osd')
+ def _daemon_add_osd(self,
+ svc_arg: Optional[str] = None,
+ method: Optional[OSDMethod] = None) -> HandleCommandResult:
+ """Create OSD daemon(s) on specified host and device(s) (e.g., ceph orch daemon add osd myhost:/dev/sdb)"""
+        # Create one or more OSDs
+
+ usage = """
+Usage:
+ ceph orch daemon add osd host:device1,device2,...
+ ceph orch daemon add osd host:data_devices=device1,device2,db_devices=device3,osds_per_device=2,...
+"""
+ if not svc_arg:
+ return HandleCommandResult(-errno.EINVAL, stderr=usage)
+ try:
+ host_name, raw = svc_arg.split(":")
+ drive_group_spec = {
+ 'data_devices': []
+ } # type: Dict
+ drv_grp_spec_arg = None
+ values = raw.split(',')
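+            # Parse the comma-separated spec: key=value tokens either start a
+            # new device list (data/db/wal/journal_devices) or set a scalar
+            # drive group option; bare tokens are appended to the last named
+            # key (data_devices when none has been given).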
+ while values:
+ v = values[0].split(',', 1)[0]
+ if '=' in v:
+ drv_grp_spec_arg, value = v.split('=')
+ if drv_grp_spec_arg in ['data_devices',
+ 'db_devices',
+ 'wal_devices',
+ 'journal_devices']:
+ drive_group_spec[drv_grp_spec_arg] = []
+ drive_group_spec[drv_grp_spec_arg].append(value)
+ else:
+ drive_group_spec[drv_grp_spec_arg] = value
+ elif drv_grp_spec_arg is not None:
+ drive_group_spec[drv_grp_spec_arg].append(v)
+ else:
+ drive_group_spec['data_devices'].append(v)
+ values.remove(v)
+
+ for dev_type in ['data_devices', 'db_devices', 'wal_devices', 'journal_devices']:
+ drive_group_spec[dev_type] = DeviceSelection(paths=drive_group_spec[dev_type]) if drive_group_spec.get(dev_type) else None
+
+ drive_group = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern=host_name),
+ method=method,
+ **drive_group_spec,
+ )
+ except (TypeError, KeyError, ValueError) as e:
+ msg = f"Invalid host:device spec: '{svc_arg}': {e}" + usage
+ return HandleCommandResult(-errno.EINVAL, stderr=msg)
+
+ completion = self.create_osds(drive_group)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch osd rm')
+ def _osd_rm_start(self,
+ osd_id: List[str],
+ replace: bool = False,
+ force: bool = False,
+ zap: bool = False) -> HandleCommandResult:
+ """Remove OSD daemons"""
+ completion = self.remove_osds(osd_id, replace=replace, force=force, zap=zap)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch osd rm stop')
+ def _osd_rm_stop(self, osd_id: List[str]) -> HandleCommandResult:
+ """Cancel ongoing OSD removal operation"""
+ completion = self.stop_remove_osds(osd_id)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch osd rm status')
+ def _osd_rm_status(self, format: Format = Format.plain) -> HandleCommandResult:
+ """Status of OSD removal operation"""
+ completion = self.remove_osds_status()
+ raise_if_exception(completion)
+ report = completion.result
+
+ if not report:
+ return HandleCommandResult(stdout="No OSD remove/replace operations reported")
+
+ if format != Format.plain:
+ out = to_format(report, format, many=True, cls=None)
+ else:
+ table = PrettyTable(
+ ['OSD', 'HOST', 'STATE', 'PGS', 'REPLACE', 'FORCE', 'ZAP',
+ 'DRAIN STARTED AT'],
+ border=False)
+ table.align = 'l'
+ table._align['PGS'] = 'r'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for osd in sorted(report, key=lambda o: o.osd_id):
+ table.add_row([osd.osd_id, osd.hostname, osd.drain_status_human(),
+ osd.get_pg_count(), osd.replace, osd.force, osd.zap,
+ osd.drain_started_at or ''])
+ out = table.get_string()
+
+ return HandleCommandResult(stdout=out)
+
+ @_cli_write_command('orch daemon add')
+ def daemon_add_misc(self,
+ daemon_type: Optional[ServiceType] = None,
+ placement: Optional[str] = None,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Add daemon(s)"""
+ usage = f"""Usage:
+ ceph orch daemon add -i <json_file>
+ ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>"""
+ if inbuf:
+ if daemon_type or placement:
+ raise OrchestratorValidationError(usage)
+ spec = ServiceSpec.from_json(yaml.safe_load(inbuf))
+ else:
+ if not placement or not daemon_type:
+ raise OrchestratorValidationError(usage)
+ placement_spec = PlacementSpec.from_string(placement)
+ spec = ServiceSpec(daemon_type.value, placement=placement_spec)
+
+ return self._daemon_add_misc(spec)
+
+ def _daemon_add_misc(self, spec: ServiceSpec) -> HandleCommandResult:
+ completion = self.add_daemon(spec)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch daemon add mds')
+ def _mds_add(self,
+ fs_name: str,
+ placement: Optional[str] = None,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Start MDS daemon(s)"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = ServiceSpec(
+ service_type='mds',
+ service_id=fs_name,
+ placement=PlacementSpec.from_string(placement),
+ )
+ return self._daemon_add_misc(spec)
+
+ @_cli_write_command('orch daemon add rgw')
+ def _rgw_add(self,
+ svc_id: str,
+ placement: Optional[str] = None,
+ port: Optional[int] = None,
+ ssl: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Start RGW daemon(s)"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = RGWSpec(
+ service_id=svc_id,
+ rgw_frontend_port=port,
+ ssl=ssl,
+ placement=PlacementSpec.from_string(placement),
+ )
+ return self._daemon_add_misc(spec)
+
+ @_cli_write_command('orch daemon add nfs')
+ def _nfs_add(self,
+ svc_id: str,
+ placement: Optional[str] = None,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Start NFS daemon(s)"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = NFSServiceSpec(
+ service_id=svc_id,
+ placement=PlacementSpec.from_string(placement),
+ )
+ return self._daemon_add_misc(spec)
+
+ @_cli_write_command('orch daemon add iscsi')
+ def _iscsi_add(self,
+ pool: str,
+ api_user: str,
+ api_password: str,
+ trusted_ip_list: Optional[str] = None,
+ placement: Optional[str] = None,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Start iscsi daemon(s)"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = IscsiServiceSpec(
+ service_id='iscsi',
+ pool=pool,
+ api_user=api_user,
+ api_password=api_password,
+ trusted_ip_list=trusted_ip_list,
+ placement=PlacementSpec.from_string(placement),
+ )
+ return self._daemon_add_misc(spec)
+
+ @_cli_write_command('orch')
+ def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult:
+ """Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)"""
+ completion = self.service_action(action.value, service_name)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch daemon')
+ def _daemon_action(self, action: DaemonAction, name: str) -> HandleCommandResult:
+ """Start, stop, restart, (redeploy,) or reconfig a specific daemon"""
+ if '.' not in name:
+ raise OrchestratorError('%s is not a valid daemon name' % name)
+ completion = self.daemon_action(action.value, name)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch daemon redeploy')
+ def _daemon_action_redeploy(self,
+ name: str,
+ image: Optional[str] = None) -> HandleCommandResult:
+ """Redeploy a daemon (with a specifc image)"""
+ if '.' not in name:
+ raise OrchestratorError('%s is not a valid daemon name' % name)
+ completion = self.daemon_action("redeploy", name, image=image)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch daemon rm')
+ def _daemon_rm(self,
+ names: List[str],
+ force: Optional[bool] = False) -> HandleCommandResult:
+ """Remove specific daemon(s)"""
+ for name in names:
+ if '.' not in name:
+ raise OrchestratorError('%s is not a valid daemon name' % name)
+            daemon_type = name.split('.')[0]
+ if not force and daemon_type in ['osd', 'mon', 'prometheus']:
+ raise OrchestratorError(
+ 'must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name)
+ completion = self.remove_daemons(names)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch rm')
+ def _service_rm(self,
+ service_name: str,
+ force: bool = False) -> HandleCommandResult:
+ """Remove a service"""
+ if service_name in ['mon', 'mgr'] and not force:
+ raise OrchestratorError('The mon and mgr services cannot be removed')
+ completion = self.remove_service(service_name, force=force)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch apply')
+ def apply_misc(self,
+ service_type: Optional[ServiceType] = None,
+ placement: Optional[str] = None,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ unmanaged: bool = False,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Update the size or placement for a service or apply a large yaml spec"""
+ usage = """Usage:
+ ceph orch apply -i <yaml spec> [--dry-run]
+ ceph orch apply <service_type> [--placement=<placement_string>] [--unmanaged]
+ """
+ if inbuf:
+ if service_type or placement or unmanaged:
+ raise OrchestratorValidationError(usage)
+ yaml_objs: Iterator = yaml.safe_load_all(inbuf)
+ specs: List[Union[ServiceSpec, HostSpec]] = []
+ # YAML '---' document separator with no content generates
+ # None entries in the output. Let's skip them silently.
+ content = [o for o in yaml_objs if o is not None]
+ for s in content:
+ spec = json_to_generic_spec(s)
+
+ # validate the config (we need MgrModule for that)
+ if isinstance(spec, ServiceSpec) and spec.config:
+ for k, v in spec.config.items():
+ try:
+ self.get_foreign_ceph_option('mon', k)
+ except KeyError:
+ raise SpecValidationError(f'Invalid config option {k} in spec')
+
+ if dry_run and not isinstance(spec, HostSpec):
+ spec.preview_only = dry_run
+ specs.append(spec)
+ else:
+ placementspec = PlacementSpec.from_string(placement)
+ if not service_type:
+ raise OrchestratorValidationError(usage)
+ specs = [ServiceSpec(service_type.value, placement=placementspec,
+ unmanaged=unmanaged, preview_only=dry_run)]
+ return self._apply_misc(specs, dry_run, format, no_overwrite)
+
+ def _apply_misc(self, specs: Sequence[GenericSpec], dry_run: bool, format: Format, no_overwrite: bool = False) -> HandleCommandResult:
+ completion = self.apply(specs, no_overwrite)
+ raise_if_exception(completion)
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan(specs)
+ raise_if_exception(completion)
+ data = completion.result
+ if format == Format.plain:
+ out = generate_preview_tables(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
+
+ @_cli_write_command('orch apply mds')
+ def _apply_mds(self,
+ fs_name: str,
+ placement: Optional[str] = None,
+ dry_run: bool = False,
+ unmanaged: bool = False,
+ format: Format = Format.plain,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Update the number of MDS instances for the given fs_name"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = MDSSpec(
+ service_type='mds',
+ service_id=fs_name,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run)
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
+ @_cli_write_command('orch apply rgw')
+ def _apply_rgw(self,
+ svc_id: str,
+ placement: Optional[str] = None,
+ realm: Optional[str] = None,
+ zone: Optional[str] = None,
+ port: Optional[int] = None,
+ ssl: bool = False,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ unmanaged: bool = False,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Update the number of RGW instances for the given zone"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ if realm and not zone:
+ raise OrchestratorValidationError(
+ 'Cannot add RGW: Realm specified but no zone specified')
+ if zone and not realm:
+ raise OrchestratorValidationError(
+ 'Cannot add RGW: Zone specified but no realm specified')
+
+ spec = RGWSpec(
+ service_id=svc_id,
+ rgw_realm=realm,
+ rgw_zone=zone,
+ rgw_frontend_port=port,
+ ssl=ssl,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
+ @_cli_write_command('orch apply nfs')
+ def _apply_nfs(self,
+ svc_id: str,
+ placement: Optional[str] = None,
+ format: Format = Format.plain,
+ port: Optional[int] = None,
+ dry_run: bool = False,
+ unmanaged: bool = False,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Scale an NFS service"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = NFSServiceSpec(
+ service_id=svc_id,
+ port=port,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
+ @_cli_write_command('orch apply iscsi')
+ def _apply_iscsi(self,
+ pool: str,
+ api_user: str,
+ api_password: str,
+ trusted_ip_list: Optional[str] = None,
+ placement: Optional[str] = None,
+ unmanaged: bool = False,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Scale an iSCSI service"""
+ if inbuf:
+ raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
+
+ spec = IscsiServiceSpec(
+ service_id=pool,
+ pool=pool,
+ api_user=api_user,
+ api_password=api_password,
+ trusted_ip_list=trusted_ip_list,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
+ @_cli_write_command('orch apply snmp-gateway')
+ def _apply_snmp_gateway(self,
+ snmp_version: SNMPGatewaySpec.SNMPVersion,
+ destination: str,
+ port: Optional[int] = None,
+ engine_id: Optional[str] = None,
+ auth_protocol: Optional[SNMPGatewaySpec.SNMPAuthType] = None,
+ privacy_protocol: Optional[SNMPGatewaySpec.SNMPPrivacyType] = None,
+ placement: Optional[str] = None,
+ unmanaged: bool = False,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Add a Prometheus to SNMP gateway service (cephadm only)"""
+
+ if not inbuf:
+ raise OrchestratorValidationError(
+ 'missing credential configuration file. Retry with -i <filename>')
+
+ try:
+ # load inbuf
+ credentials = yaml.safe_load(inbuf)
+ except (OSError, yaml.YAMLError):
+ raise OrchestratorValidationError('credentials file must be valid YAML')
+
+ spec = SNMPGatewaySpec(
+ snmp_version=snmp_version,
+ port=port,
+ credentials=credentials,
+ snmp_destination=destination,
+ engine_id=engine_id,
+ auth_protocol=auth_protocol,
+ privacy_protocol=privacy_protocol,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
+ @_cli_write_command('orch set backend')
+ def _set_backend(self, module_name: Optional[str] = None) -> HandleCommandResult:
+ """
+ Select orchestrator module backend
+ """
+ # We implement a setter command instead of just having the user
+ # modify the setting directly, so that we can validate they're setting
+ # it to a module that really exists and is enabled.
+
+ # There isn't a mechanism for ensuring they don't *disable* the module
+ # later, but this is better than nothing.
+ mgr_map = self.get("mgr_map")
+
+ if module_name is None or module_name == "":
+ self.set_module_option("orchestrator", None)
+ return HandleCommandResult()
+
+ for module in mgr_map['available_modules']:
+ if module['name'] != module_name:
+ continue
+
+ if not module['can_run']:
+ continue
+
+ enabled = module['name'] in mgr_map['modules']
+ if not enabled:
+ return HandleCommandResult(-errno.EINVAL,
+ stderr="Module '{module_name}' is not enabled. \n Run "
+ "`ceph mgr module enable {module_name}` "
+ "to enable.".format(module_name=module_name))
+
+ try:
+ is_orchestrator = self.remote(module_name,
+ "is_orchestrator_module")
+ except NameError:
+ is_orchestrator = False
+
+ if not is_orchestrator:
+ return HandleCommandResult(-errno.EINVAL,
+ stderr="'{0}' is not an orchestrator module".format(module_name))
+
+ self.set_module_option("orchestrator", module_name)
+
+ return HandleCommandResult()
+
+ return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
+
+ @_cli_write_command('orch pause')
+ def _pause(self) -> HandleCommandResult:
+ """Pause orchestrator background work"""
+ self.pause()
+ return HandleCommandResult()
+
+ @_cli_write_command('orch resume')
+ def _resume(self) -> HandleCommandResult:
+ """Resume orchestrator background work (if paused)"""
+ self.resume()
+ return HandleCommandResult()
+
+ @_cli_write_command('orch cancel')
+ def _cancel(self) -> HandleCommandResult:
+ """
+ Cancel ongoing background operations
+ """
+ self.cancel_completions()
+ return HandleCommandResult()
+
+ @_cli_read_command('orch status')
+ def _status(self,
+ detail: bool = False,
+ format: Format = Format.plain) -> HandleCommandResult:
+ """Report configured backend and its status"""
+ o = self._select_orchestrator()
+ if o is None:
+ raise NoOrchestrator()
+
+ avail, why, module_details = self.available()
+ result: Dict[str, Any] = {
+ "available": avail,
+ "backend": o,
+ }
+
+ if avail:
+ result.update(module_details)
+ else:
+ result['reason'] = why
+
+ if format != Format.plain:
+ output = to_format(result, format, many=False, cls=None)
+ else:
+ output = "Backend: {0}".format(result['backend'])
+ output += f"\nAvailable: {'Yes' if result['available'] else 'No'}"
+ if 'reason' in result:
+ output += ' ({0})'.format(result['reason'])
+ if 'paused' in result:
+ output += f"\nPaused: {'Yes' if result['paused'] else 'No'}"
+ if 'workers' in result and detail:
+ output += f"\nHost Parallelism: {result['workers']}"
+ return HandleCommandResult(stdout=output)
+
+ def self_test(self) -> None:
+ old_orch = self._select_orchestrator()
+ self._set_backend('')
+ assert self._select_orchestrator() is None
+ self._set_backend(old_orch)
+
+ e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
+ try:
+ raise_if_exception(e1)
+ assert False
+ except ZeroDivisionError as e:
+ assert e.args == ('hello, world',)
+
+ e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
+ try:
+ raise_if_exception(e2)
+ assert False
+ except OrchestratorError as e:
+ assert e.args == ('hello, world',)
+
+ @staticmethod
+ def _upgrade_check_image_name(image: Optional[str], ceph_version: Optional[str]) -> None:
+ """
+ >>> OrchestratorCli._upgrade_check_image_name('v15.2.0', None)
+ Traceback (most recent call last):
+ orchestrator._interface.OrchestratorValidationError: Error: unable to pull image name `v15.2.0`.
+ Maybe you meant `--ceph-version 15.2.0`?
+
+ """
+ if image and re.match(r'^v?\d+\.\d+\.\d+$', image) and ceph_version is None:
+ ver = image[1:] if image.startswith('v') else image
+ s = f"Error: unable to pull image name `{image}`.\n" \
+ f" Maybe you meant `--ceph-version {ver}`?"
+ raise OrchestratorValidationError(s)
+
+ @_cli_write_command('orch upgrade check')
+ def _upgrade_check(self,
+ image: Optional[str] = None,
+ ceph_version: Optional[str] = None) -> HandleCommandResult:
+ """Check service versions vs available and target containers"""
+ self._upgrade_check_image_name(image, ceph_version)
+ completion = self.upgrade_check(image=image, version=ceph_version)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_read_command('orch upgrade ls')
+ def _upgrade_ls(self,
+ image: Optional[str] = None,
+ tags: bool = False) -> HandleCommandResult:
+ """Check for available versions (or tags) we can upgrade to"""
+ completion = self.upgrade_ls(image, tags)
+ r = raise_if_exception(completion)
+ out = json.dumps(r, indent=4)
+ return HandleCommandResult(stdout=out)
+
+ @_cli_write_command('orch upgrade status')
+ def _upgrade_status(self) -> HandleCommandResult:
+ """Check service versions vs available and target containers"""
+ completion = self.upgrade_status()
+ status = raise_if_exception(completion)
+ r = {
+ 'target_image': status.target_image,
+ 'in_progress': status.in_progress,
+ 'which': status.which,
+ 'services_complete': status.services_complete,
+ 'progress': status.progress,
+ 'message': status.message,
+ 'is_paused': status.is_paused,
+ }
+ out = json.dumps(r, indent=4)
+ return HandleCommandResult(stdout=out)
+
+ @_cli_write_command('orch upgrade start')
+ def _upgrade_start(self,
+ image: Optional[str] = None,
+ ceph_version: Optional[str] = None,
+ daemon_types: Optional[str] = None,
+ hosts: Optional[str] = None,
+ services: Optional[str] = None,
+ limit: Optional[int] = None) -> HandleCommandResult:
+ """Initiate upgrade"""
+ self._upgrade_check_image_name(image, ceph_version)
+ dtypes = daemon_types.split(',') if daemon_types is not None else None
+ service_names = services.split(',') if services is not None else None
+ completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit)
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch upgrade pause')
+ def _upgrade_pause(self) -> HandleCommandResult:
+ """Pause an in-progress upgrade"""
+ completion = self.upgrade_pause()
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch upgrade resume')
+ def _upgrade_resume(self) -> HandleCommandResult:
+ """Resume paused upgrade"""
+ completion = self.upgrade_resume()
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command('orch upgrade stop')
+ def _upgrade_stop(self) -> HandleCommandResult:
+ """Stop an in-progress upgrade"""
+ completion = self.upgrade_stop()
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
diff --git a/src/pybind/mgr/orchestrator/tests/__init__.py b/src/pybind/mgr/orchestrator/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/tests/__init__.py
diff --git a/src/pybind/mgr/orchestrator/tests/test_orchestrator.py b/src/pybind/mgr/orchestrator/tests/test_orchestrator.py
new file mode 100644
index 000000000..7fe32e559
--- /dev/null
+++ b/src/pybind/mgr/orchestrator/tests/test_orchestrator.py
@@ -0,0 +1,220 @@
+from __future__ import absolute_import
+
+import json
+import textwrap
+
+import pytest
+import yaml
+
+from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment import inventory
+from ceph.utils import datetime_now
+from mgr_module import HandleCommandResult
+
+from test_orchestrator import TestOrchestrator as _TestOrchestrator
+
+from orchestrator import InventoryHost, DaemonDescription, ServiceDescription, DaemonDescriptionStatus, OrchResult
+from orchestrator import OrchestratorValidationError
+from orchestrator.module import to_format, Format, OrchestratorCli, preview_table_osd
+from unittest import mock
+
+
+def _test_resource(data, resource_class, extra=None):
+ # ensure we can deserialize and serialize
+ rsc = resource_class.from_json(data)
+ assert rsc.to_json() == resource_class.from_json(rsc.to_json()).to_json()
+
+ if extra:
+ # if there is an unexpected data provided
+ data_copy = data.copy()
+ data_copy.update(extra)
+ with pytest.raises(OrchestratorValidationError):
+ resource_class.from_json(data_copy)
+
+
+def test_inventory():
+ json_data = {
+ 'name': 'host0',
+ 'addr': '1.2.3.4',
+ 'devices': [
+ {
+ 'sys_api': {
+ 'rotational': '1',
+ 'size': 1024,
+ },
+ 'path': '/dev/sda',
+ 'available': False,
+ 'rejected_reasons': [],
+ 'lvs': []
+ }
+ ]
+ }
+ _test_resource(json_data, InventoryHost, {'abc': False})
+ for devices in json_data['devices']:
+ _test_resource(devices, inventory.Device)
+
+ json_data = [{}, {'name': 'host0', 'addr': '1.2.3.4'}, {'devices': []}]
+ for data in json_data:
+ with pytest.raises(OrchestratorValidationError):
+ InventoryHost.from_json(data)
+
+
+def test_daemon_description():
+ json_data = {
+ 'hostname': 'test',
+ 'daemon_type': 'mon',
+ 'daemon_id': 'a',
+ 'status': -1,
+ }
+ _test_resource(json_data, DaemonDescription, {'abc': False})
+
+ dd = DaemonDescription.from_json(json_data)
+ assert dd.status.value == DaemonDescriptionStatus.error.value
+
+
+def test_apply():
+ to = _TestOrchestrator('', 0, 0)
+ completion = to.apply([
+ ServiceSpec(service_type='nfs', service_id='foo'),
+ ServiceSpec(service_type='nfs', service_id='foo'),
+ ServiceSpec(service_type='nfs', service_id='foo'),
+ ])
+ res = '<NFSServiceSpec for service_name=nfs.foo>'
+ assert completion.result == [res, res, res]
+
+
+def test_yaml():
+ y = """daemon_type: crash
+daemon_id: ubuntu
+daemon_name: crash.ubuntu
+hostname: ubuntu
+status: 1
+status_desc: starting
+is_active: false
+events:
+- 2020-06-10T10:08:22.933241Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on
+ host 'ubuntu'"
+---
+service_type: crash
+service_name: crash
+placement:
+ host_pattern: '*'
+status:
+ container_image_id: 74803e884bea289d2d2d3ebdf6d37cd560499e955595695b1390a89800f4e37a
+ container_image_name: docker.io/ceph/daemon-base:latest-master-devel
+ created: '2020-06-10T10:37:31.051288Z'
+ last_refresh: '2020-06-10T10:57:40.715637Z'
+ running: 1
+ size: 1
+events:
+- 2020-06-10T10:37:31.139159Z service:crash [INFO] "service was created"
+"""
+ types = (DaemonDescription, ServiceDescription)
+
+ for y, cls in zip(y.split('---\n'), types):
+ data = yaml.safe_load(y)
+ object = cls.from_json(data)
+
+ assert to_format(object, Format.yaml, False, cls) == y
+ assert to_format([object], Format.yaml, True, cls) == y
+
+ j = json.loads(to_format(object, Format.json, False, cls))
+ assert to_format(cls.from_json(j), Format.yaml, False, cls) == y
+
+
+def test_event_multiline():
+ from .._interface import OrchestratorEvent
+ e = OrchestratorEvent(datetime_now(), 'service', 'subject', 'ERROR', 'message')
+ assert OrchestratorEvent.from_json(e.to_json()) == e
+
+ e = OrchestratorEvent(datetime_now(), 'service',
+ 'subject', 'ERROR', 'multiline\nmessage')
+ assert OrchestratorEvent.from_json(e.to_json()) == e
+
+
+def test_handle_command():
+ cmd = {
+ 'prefix': 'orch daemon add',
+ 'daemon_type': 'mon',
+ 'placement': 'smithi044:[v2:172.21.15.44:3301,v1:172.21.15.44:6790]=c',
+ }
+ m = OrchestratorCli('orchestrator', 0, 0)
+ r = m._handle_command(None, cmd)
+ assert r == HandleCommandResult(
+ retval=-2, stdout='', stderr='No orchestrator configured (try `ceph orch set backend`)')
+
+
+r = OrchResult([ServiceDescription(spec=ServiceSpec(service_type='osd'), running=123)])
+
+
+@mock.patch("orchestrator.OrchestratorCli.describe_service", return_value=r)
+def test_orch_ls(_describe_service):
+ cmd = {
+ 'prefix': 'orch ls',
+ }
+ m = OrchestratorCli('orchestrator', 0, 0)
+ r = m._handle_command(None, cmd)
+ out = 'NAME PORTS RUNNING REFRESHED AGE PLACEMENT \n' \
+ 'osd 123 - - '
+ assert r == HandleCommandResult(retval=0, stdout=out, stderr='')
+
+ cmd = {
+ 'prefix': 'orch ls',
+ 'format': 'yaml',
+ }
+ m = OrchestratorCli('orchestrator', 0, 0)
+ r = m._handle_command(None, cmd)
+ out = textwrap.dedent("""
+ service_type: osd
+ service_name: osd
+ spec:
+ filter_logic: AND
+ objectstore: bluestore
+ status:
+ running: 123
+ size: 0
+ """).lstrip()
+ assert r == HandleCommandResult(retval=0, stdout=out, stderr='')
+
+
+def test_preview_table_osd_smoke():
+ data = [
+ {
+ 'service_type': 'osd',
+ 'data':
+ {
+ 'foo host':
+ [
+ {
+ 'osdspec': 'foo',
+ 'error': '',
+ 'data':
+ [
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdb",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ },
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdc",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ },
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdd",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ preview_table_osd(data)