diff options
Diffstat (limited to 'src/pybind/mgr/test_orchestrator/module.py')
-rw-r--r-- | src/pybind/mgr/test_orchestrator/module.py | 306 |
1 files changed, 306 insertions, 0 deletions
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py new file mode 100644 index 000000000..d89c23bf1 --- /dev/null +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -0,0 +1,306 @@ +import errno +import json +import re +import os +import threading +import functools +import itertools +from subprocess import check_output, CalledProcessError + +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, IscsiServiceSpec + +try: + from typing import Callable, List, Sequence, Tuple +except ImportError: + pass # type checking + +from ceph.deployment import inventory +from ceph.deployment.drive_group import DriveGroupSpec +from mgr_module import CLICommand, HandleCommandResult +from mgr_module import MgrModule + +import orchestrator +from orchestrator import handle_orch_error, raise_if_exception + + +class TestOrchestrator(MgrModule, orchestrator.Orchestrator): + """ + This is an orchestrator implementation used for internal testing. It's meant for + development environments and integration testing. + + It does not actually do anything. + + The implementation is similar to the Rook orchestrator, but simpler. + """ + + @CLICommand('test_orchestrator load_data', perm='w') + def _load_data(self, inbuf): + """ + load dummy data into test orchestrator + """ + try: + data = json.loads(inbuf) + self._init_data(data) + return HandleCommandResult() + except json.decoder.JSONDecodeError as e: + msg = 'Invalid JSON file: {}'.format(e) + return HandleCommandResult(retval=-errno.EINVAL, stderr=msg) + except orchestrator.OrchestratorValidationError as e: + return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e)) + + def available(self): + return True, "", {} + + def __init__(self, *args, **kwargs): + super(TestOrchestrator, self).__init__(*args, **kwargs) + + self._initialized = threading.Event() + self._shutdown = threading.Event() + self._init_data({}) + + def shutdown(self): + self._shutdown.set() + + def serve(self): + + self._initialized.set() + + while not self._shutdown.is_set(): + self._shutdown.wait(5) + + def _init_data(self, data=None): + self._inventory = [orchestrator.InventoryHost.from_json(inventory_host) + for inventory_host in data.get('inventory', [])] + self._services = [orchestrator.ServiceDescription.from_json(service) + for service in data.get('services', [])] + self._daemons = [orchestrator.DaemonDescription.from_json(daemon) + for daemon in data.get('daemons', [])] + + @handle_orch_error + def get_inventory(self, host_filter=None, refresh=False): + """ + There is no guarantee which devices are returned by get_inventory. + """ + if host_filter and host_filter.hosts is not None: + assert isinstance(host_filter.hosts, list) + + if self._inventory: + if host_filter: + return list(filter(lambda host: host.name in host_filter.hosts, + self._inventory)) + return self._inventory + + try: + c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json']) + except OSError: + cmd = """ + . {tmpdir}/ceph-volume-virtualenv/bin/activate + ceph-volume inventory --format json + """ + try: + c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True) + except (OSError, CalledProcessError): + c_v_out = check_output(cmd.format(tmpdir='.'),shell=True) + + for out in c_v_out.splitlines(): + self.log.error(out) + devs = inventory.Devices.from_json(json.loads(out)) + return [orchestrator.InventoryHost('localhost', devs)] + self.log.error('c-v failed: ' + str(c_v_out)) + raise Exception('c-v failed') + + def _get_ceph_daemons(self): + # type: () -> List[orchestrator.DaemonDescription] + """ Return ceph daemons on the running host.""" + types = ("mds", "osd", "mon", "rgw", "mgr", "nfs", "iscsi") + out = map(str, check_output(['ps', 'aux']).splitlines()) + processes = [p for p in out if any( + [('ceph-{} '.format(t) in p) for t in types])] + + daemons = [] + for p in processes: + # parse daemon type + m = re.search('ceph-([^ ]+)', p) + if m: + _daemon_type = m.group(1) + else: + raise AssertionError('Fail to determine daemon type from {}'.format(p)) + + # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>` + patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)'] + for pattern in patterns: + m = re.search(pattern, p) + if m: + daemon_id = m.group(1) + break + else: + raise AssertionError('Fail to determine daemon ID from {}'.format(p)) + daemon = orchestrator.DaemonDescription( + daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost') + daemons.append(daemon) + return daemons + + @handle_orch_error + def describe_service(self, service_type=None, service_name=None, refresh=False): + if self._services: + # Dummy data + services = self._services + if service_type is not None: + services = list(filter(lambda s: s.spec.service_type == service_type, services)) + else: + # Deduce services from daemons running on localhost + all_daemons = self._get_ceph_daemons() + services = [] + for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type): + if service_type is not None and service_type != daemon_type: + continue + daemon_size = len(list(daemons)) + services.append(orchestrator.ServiceDescription( + spec=ServiceSpec( + service_type=daemon_type, # type: ignore + ), + size=daemon_size, running=daemon_size)) + + def _filter_func(svc): + if service_name is not None and service_name != svc.spec.service_name(): + return False + return True + + return list(filter(_filter_func, services)) + + @handle_orch_error + def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False): + """ + There is no guarantee which daemons are returned by describe_service, except that + it returns the mgr we're running in. + """ + if daemon_type: + daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash", "nfs") + assert daemon_type in daemon_types, daemon_type + " unsupported" + + daemons = self._daemons if self._daemons else self._get_ceph_daemons() + + def _filter_func(d): + if service_name is not None and service_name != d.service_name(): + return False + if daemon_type is not None and daemon_type != d.daemon_type: + return False + if daemon_id is not None and daemon_id != d.daemon_id: + return False + if host is not None and host != d.hostname: + return False + return True + + return list(filter(_filter_func, daemons)) + + def preview_drivegroups(self, drive_group_name=None, dg_specs=None): + return [{}] + + @handle_orch_error + def create_osds(self, drive_group): + # type: (DriveGroupSpec) -> str + """ Creates OSDs from a drive group specification. + + $: ceph orch osd create -i <dg.file> + + The drivegroup file must only contain one spec at a time. + """ + return self._create_osds(drive_group) + + def _create_osds(self, drive_group): + # type: (DriveGroupSpec) -> str + + drive_group.validate() + all_hosts = raise_if_exception(self.get_hosts()) + if not drive_group.placement.filter_matching_hostspecs(all_hosts): + raise orchestrator.OrchestratorValidationError('failed to match') + return '' + + @handle_orch_error + def apply_drivegroups(self, specs): + # type: (List[DriveGroupSpec]) -> List[str] + return [self._create_osds(dg) for dg in specs] + + @handle_orch_error + def remove_daemons(self, names): + assert isinstance(names, list) + return 'done' + + @handle_orch_error + def remove_service(self, service_name, force = False): + assert isinstance(service_name, str) + return 'done' + + @handle_orch_error + def blink_device_light(self, ident_fault, on, locations): + assert ident_fault in ("ident", "fault") + assert len(locations) + return '' + + @handle_orch_error + def service_action(self, action, service_name): + return 'done' + + @handle_orch_error + def daemon_action(self, action, daemon_name, image=None): + return 'done' + + @handle_orch_error + def add_daemon(self, spec: ServiceSpec): + return [spec.one_line_str()] + + @handle_orch_error + def apply_nfs(self, spec): + return spec.one_line_str() + + @handle_orch_error + def apply_iscsi(self, spec): + # type: (IscsiServiceSpec) -> str + return spec.one_line_str() + + @handle_orch_error + def get_hosts(self): + if self._inventory: + return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory] + return [orchestrator.HostSpec('localhost')] + + @handle_orch_error + def add_host(self, spec): + # type: (orchestrator.HostSpec) -> str + host = spec.hostname + if host == 'raise_validation_error': + raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5") + if host == 'raise_error': + raise orchestrator.OrchestratorError("host address is empty") + if host == 'raise_bug': + raise ZeroDivisionError() + if host == 'raise_not_implemented': + raise NotImplementedError() + if host == 'raise_no_orchestrator': + raise orchestrator.NoOrchestrator() + if host == 'raise_import_error': + raise ImportError("test_orchestrator not enabled") + assert isinstance(host, str) + return '' + + @handle_orch_error + def remove_host(self, host, force: bool, offline: bool): + assert isinstance(host, str) + return 'done' + + @handle_orch_error + def apply_mgr(self, spec): + # type: (ServiceSpec) -> str + + assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count + assert all([isinstance(h, str) for h in spec.placement.hosts]) + return spec.one_line_str() + + @handle_orch_error + def apply_mon(self, spec): + # type: (ServiceSpec) -> str + + assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count + assert all([isinstance(h[0], str) for h in spec.placement.hosts]) + assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts]) + return spec.one_line_str() |