diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/cephadm/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/conftest.py | 27 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/fixtures.py | 151 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_autotune.py | 69 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_cephadm.py | 1805 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_completion.py | 40 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_configchecks.py | 668 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_facts.py | 10 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_migration.py | 229 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_osd_removal.py | 298 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_scheduling.py | 1591 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_services.py | 1031 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_spec.py | 600 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_template.py | 33 | ||||
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_upgrade.py | 322 |
15 files changed, 6874 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/tests/__init__.py b/src/pybind/mgr/cephadm/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/__init__.py diff --git a/src/pybind/mgr/cephadm/tests/conftest.py b/src/pybind/mgr/cephadm/tests/conftest.py new file mode 100644 index 000000000..e8add2c7b --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/conftest.py @@ -0,0 +1,27 @@ +import pytest + +from cephadm.services.osd import RemoveUtil, OSD +from tests import mock + +from .fixtures import with_cephadm_module + + +@pytest.fixture() +def cephadm_module(): + with with_cephadm_module({}) as m: + yield m + + +@pytest.fixture() +def rm_util(): + with with_cephadm_module({}) as m: + r = RemoveUtil.__new__(RemoveUtil) + r.__init__(m) + yield r + + +@pytest.fixture() +def osd_obj(): + with mock.patch("cephadm.services.osd.RemoveUtil"): + o = OSD(0, mock.MagicMock()) + yield o diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py new file mode 100644 index 000000000..8c2e1cfbf --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -0,0 +1,151 @@ +import fnmatch +from contextlib import contextmanager + +from ceph.deployment.service_spec import PlacementSpec, ServiceSpec +from ceph.utils import datetime_to_str, datetime_now +from cephadm.serve import CephadmServe + +try: + from typing import Any, Iterator, List +except ImportError: + pass + +from cephadm import CephadmOrchestrator +from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus +from tests import mock + + +def get_ceph_option(_, key): + return __file__ + + +def get_module_option_ex(_, module, key, default=None): + if module == 'prometheus': + if key == 'server_port': + return 9283 + return None + + +def _run_cephadm(ret): + def foo(s, host, entity, cmd, e, **kwargs): + if cmd == 'gather-facts': + return '{}', '', 0 + return [ret], '', 0 + return foo + + +def match_glob(val, pat): + ok = fnmatch.fnmatchcase(val, pat) + if not ok: + assert pat in val + + +@contextmanager +def with_cephadm_module(module_options=None, store=None): + """ + :param module_options: Set opts as if they were set before module.__init__ is called + :param store: Set the store before module.__init__ is called + """ + with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\ + mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \ + mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex),\ + mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \ + mock.patch("cephadm.module.CephadmOrchestrator.remote"), \ + mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'): + + m = CephadmOrchestrator.__new__(CephadmOrchestrator) + if module_options is not None: + for k, v in module_options.items(): + m._ceph_set_module_option('cephadm', k, v) + if store is None: + store = {} + if '_ceph_get/mon_map' not in store: + m.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + if '_ceph_get/mgr_map' not in store: + m.mock_store_set('_ceph_get', 'mgr_map', { + 'services': { + 'dashboard': 'http://[::1]:8080', + 'prometheus': 'http://[::1]:8081' + }, + 'modules': ['dashboard', 'prometheus'], + }) + for k, v in store.items(): + m._ceph_set_store(k, v) + + m.__init__('cephadm', 0, 0) + m._cluster_fsid = "fsid" + yield m + + +def wait(m: CephadmOrchestrator, c: OrchResult) -> Any: + return raise_if_exception(c) + + +@contextmanager +def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True): + with mock.patch("cephadm.utils.resolve_ip", return_value=addr): + wait(m, m.add_host(HostSpec(hostname=name))) + if refresh_hosts: + CephadmServe(m)._refresh_hosts_and_daemons() + yield + wait(m, m.remove_host(name, force=rm_with_force)) + + +def assert_rm_service(cephadm: CephadmOrchestrator, srv_name): + mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr') + if mon_or_mgr: + assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name)) + return + assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}' + assert cephadm.spec_store[srv_name].deleted is not None + CephadmServe(cephadm)._check_daemons() + CephadmServe(cephadm)._apply_all_services() + assert cephadm.spec_store[srv_name].deleted + unmanaged = cephadm.spec_store[srv_name].spec.unmanaged + CephadmServe(cephadm)._purge_deleted_services() + if not unmanaged: # cause then we're not deleting daemons + assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}' + + +@contextmanager +def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]: + if spec.placement.is_empty() and host: + spec.placement = PlacementSpec(hosts=[host], count=1) + if meth is not None: + c = meth(cephadm_module, spec) + assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...' + else: + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...'] + + specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] + assert spec in specs + + CephadmServe(cephadm_module)._apply_all_services() + + if status_running: + make_daemons_running(cephadm_module, spec.service_name()) + + dds = wait(cephadm_module, cephadm_module.list_daemons()) + own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()] + if host and spec.service_type != 'osd': + assert own_dds + + yield [dd.name() for dd in own_dds] + + assert_rm_service(cephadm_module, spec.service_name()) + + +def make_daemons_running(cephadm_module, service_name): + own_dds = cephadm_module.cache.get_daemons_by_service(service_name) + for dd in own_dds: + dd.status = DaemonDescriptionStatus.running # We're changing the reference + + +def _deploy_cephadm_binary(host): + def foo(*args, **kwargs): + return True + return foo diff --git a/src/pybind/mgr/cephadm/tests/test_autotune.py b/src/pybind/mgr/cephadm/tests/test_autotune.py new file mode 100644 index 000000000..524da9c00 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_autotune.py @@ -0,0 +1,69 @@ +# Disable autopep8 for this file: + +# fmt: off + +import pytest + +from cephadm.autotune import MemoryAutotuner +from orchestrator import DaemonDescription + + +@pytest.mark.parametrize("total,daemons,config,result", + [ # noqa: E128 + ( + 128 * 1024 * 1024 * 1024, + [], + {}, + None, + ), + ( + 128 * 1024 * 1024 * 1024, + [ + DaemonDescription('osd', '1', 'host1'), + DaemonDescription('osd', '2', 'host1'), + ], + {}, + 64 * 1024 * 1024 * 1024, + ), + ( + 128 * 1024 * 1024 * 1024, + [ + DaemonDescription('osd', '1', 'host1'), + DaemonDescription('osd', '2', 'host1'), + DaemonDescription('osd', '3', 'host1'), + ], + { + 'osd.3': 16 * 1024 * 1024 * 1024, + }, + 56 * 1024 * 1024 * 1024, + ), + ( + 128 * 1024 * 1024 * 1024, + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('osd', '1', 'host1'), + DaemonDescription('osd', '2', 'host1'), + ], + {}, + 62 * 1024 * 1024 * 1024, + ) + ]) +def test_autotune(total, daemons, config, result): + def fake_getter(who, opt): + if opt == 'osd_memory_target_autotune': + if who in config: + return False + else: + return True + if opt == 'osd_memory_target': + return config.get(who, 4 * 1024 * 1024 * 1024) + if opt == 'mds_cache_memory_limit': + return 16 * 1024 * 1024 * 1024 + + a = MemoryAutotuner( + total_mem=total, + daemons=daemons, + config_get=fake_getter, + ) + val, osds = a.tune() + assert val == result diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py new file mode 100644 index 000000000..a6850f6cb --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -0,0 +1,1805 @@ +import json +import logging +from contextlib import contextmanager + +import pytest + +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection +from cephadm.serve import CephadmServe +from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims + +try: + from typing import List +except ImportError: + pass + +from execnet.gateway_bootstrap import HostNotFound + +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \ + NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec, MDSSpec +from ceph.deployment.drive_selection.selector import DriveSelection +from ceph.deployment.inventory import Devices, Device +from ceph.utils import datetime_to_str, datetime_now +from orchestrator import DaemonDescription, InventoryHost, \ + HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent +from tests import mock +from .fixtures import wait, _run_cephadm, match_glob, with_host, \ + with_cephadm_module, with_service, _deploy_cephadm_binary, make_daemons_running +from cephadm.module import CephadmOrchestrator + +""" +TODOs: + There is really room for improvement here. I just quickly assembled theses tests. + I general, everything should be testes in Teuthology as well. Reasons for + also testing this here is the development roundtrip time. +""" + + +def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host): + dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host)) + d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)] + assert d_names + # there should only be one daemon (if not match_glob will throw mismatch) + assert len(d_names) == 1 + + c = cephadm.remove_daemons(d_names) + [out] = wait(cephadm, c) + # picking the 1st element is needed, rather than passing the list when the daemon + # name contains '-' char. If not, the '-' is treated as a range i.e. cephadm-exporter + # is treated like a m-e range which is invalid. rbd-mirror (d-m) and node-exporter (e-e) + # are valid, so pass without incident! Also, match_gob acts on strings anyway! + match_glob(out, f"Removed {d_names[0]}* from host '{host}'") + + +@contextmanager +def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str): + spec.placement = PlacementSpec(hosts=[host], count=1) + + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'") + + dds = cephadm_module.cache.get_daemons_by_service(spec.service_name()) + for dd in dds: + if dd.hostname == host: + yield dd.daemon_id + assert_rm_daemon(cephadm_module, spec.service_name(), host) + return + + assert False, 'Daemon not found' + + +@contextmanager +def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None): + cephadm_module.mock_store_set('_ceph_get', 'osd_map', { + 'osds': [ + { + 'osd': 1, + 'up_from': 0, + 'up': True, + 'uuid': 'uuid' + } + ] + }) + + _run_cephadm.reset_mock(return_value=True, side_effect=True) + if ceph_volume_lvm_list: + _run_cephadm.side_effect = ceph_volume_lvm_list + else: + def _ceph_volume_list(s, host, entity, cmd, **kwargs): + logging.info(f'ceph-volume cmd: {cmd}') + if 'raw' in cmd: + return json.dumps({ + "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": { + "ceph_fsid": cephadm_module._cluster_fsid, + "device": "/dev/loop0", + "osd_id": 21, + "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5", + "type": "bluestore" + }, + }), '', 0 + if 'lvm' in cmd: + return json.dumps({ + str(osd_id): [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }] + }), '', 0 + return '{}', '', 0 + + _run_cephadm.side_effect = _ceph_volume_list + + assert cephadm_module._osd_activate( + [host]).stdout == f"Created osd(s) 1 on host '{host}'" + assert _run_cephadm.mock_calls == [ + mock.call(host, 'osd', 'ceph-volume', + ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''), + mock.call(host, f'osd.{osd_id}', 'deploy', + ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY, + '--config-json', '-', '--osd-fsid', 'uuid'], + stdin=mock.ANY, image=''), + mock.call(host, 'osd', 'ceph-volume', + ['--', 'raw', 'list', '--format', 'json'], no_fsid=False, image=''), + ] + dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host) + assert dd.name() == f'osd.{osd_id}' + yield dd + cephadm_module._remove_daemons([(f'osd.{osd_id}', host)]) + + +class TestCephadm(object): + + def test_get_unique_name(self, cephadm_module): + # type: (CephadmOrchestrator) -> None + existing = [ + DaemonDescription(daemon_type='mon', daemon_id='a') + ] + new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing) + match_glob(new_mon, 'myhost') + new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing) + match_glob(new_mgr, 'myhost.*') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_host(self, cephadm_module): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + with with_host(cephadm_module, 'test'): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')] + + # Be careful with backward compatibility when changing things here: + assert json.loads(cephadm_module.get_store('inventory')) == \ + {"test": {"hostname": "test", "addr": "1::4", "labels": [], "status": ""}} + + with with_host(cephadm_module, 'second', '1.2.3.5'): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [ + HostSpec('test', '1::4'), + HostSpec('second', '1.2.3.5') + ] + + assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')] + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.utils.resolve_ip") + def test_re_add_host_receive_loopback(self, resolve_ip, cephadm_module): + resolve_ip.side_effect = ['192.168.122.1', '127.0.0.1', '127.0.0.1'] + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + cephadm_module._add_host(HostSpec('test', '192.168.122.1')) + assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')] + cephadm_module._add_host(HostSpec('test')) + assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')] + with pytest.raises(OrchestratorError): + cephadm_module._add_host(HostSpec('test2')) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_service_ls(self, cephadm_module): + with with_host(cephadm_module, 'test'): + c = cephadm_module.list_daemons(refresh=True) + assert wait(cephadm_module, c) == [] + with with_service(cephadm_module, MDSSpec('mds', 'name', unmanaged=True)) as _, \ + with_daemon(cephadm_module, MDSSpec('mds', 'name'), 'test') as _: + + c = cephadm_module.list_daemons() + + def remove_id_events(dd): + out = dd.to_json() + del out['daemon_id'] + del out['events'] + del out['daemon_name'] + return out + + assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [ + { + 'service_name': 'mds.name', + 'daemon_type': 'mds', + 'hostname': 'test', + 'status': 2, + 'status_desc': 'starting', + 'is_active': False, + 'ports': [], + } + ] + + with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), + CephadmOrchestrator.apply_rgw, 'test', status_running=True): + make_daemons_running(cephadm_module, 'mds.name') + + c = cephadm_module.describe_service() + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_id': 'name', + 'service_name': 'mds.name', + 'service_type': 'mds', + 'status': {'created': mock.ANY, 'running': 1, 'size': 2}, + 'unmanaged': True + }, + { + 'placement': { + 'count': 1, + 'hosts': ["test"] + }, + 'service_id': 'r.z', + 'service_name': 'rgw.r.z', + 'service_type': 'rgw', + 'status': {'created': mock.ANY, 'running': 1, 'size': 1, + 'ports': [80]}, + } + ] + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_service_ls_service_type_flag(self, cephadm_module): + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), + CephadmOrchestrator.apply_mgr, '', status_running=True): + with with_service(cephadm_module, MDSSpec('mds', 'test-id', placement=PlacementSpec(count=2)), + CephadmOrchestrator.apply_mds, '', status_running=True): + + # with no service-type. Should provide info fot both services + c = cephadm_module.describe_service() + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_name': 'mgr', + 'service_type': 'mgr', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + { + 'placement': {'count': 2}, + 'service_id': 'test-id', + 'service_name': 'mds.test-id', + 'service_type': 'mds', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + ] + + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + # with service-type. Should provide info fot only mds + c = cephadm_module.describe_service(service_type='mds') + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_id': 'test-id', + 'service_name': 'mds.test-id', + 'service_type': 'mds', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + ] + + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + # service-type should not match with service names + c = cephadm_module.describe_service(service_type='mds.test-id') + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + assert out == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_device_ls(self, cephadm_module): + with with_host(cephadm_module, 'test'): + c = cephadm_module.get_inventory() + assert wait(cephadm_module, c) == [InventoryHost('test')] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='rgw.myrgw.foobar', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ), + dict( + name='something.foo.bar', + style='cephadm', + fsid='fsid', + ), + dict( + name='haproxy.test.bar', + style='cephadm', + fsid='fsid', + ), + + ]) + )) + def test_list_daemons(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + dds = wait(cephadm_module, cephadm_module.list_daemons()) + assert {d.name() for d in dds} == {'rgw.myrgw.foobar', 'haproxy.test.bar'} + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_daemon_action(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \ + with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id: + + d_name = 'rgw.' + daemon_id + + c = cephadm_module.daemon_action('redeploy', d_name) + assert wait(cephadm_module, + c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'" + + for what in ('start', 'stop', 'restart'): + c = cephadm_module.daemon_action(what, d_name) + assert wait(cephadm_module, + c) == F"Scheduled to {what} {d_name} on host 'test'" + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module._store['_ceph_get/mon_map'] = { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + } + cephadm_module.notify('mon_map', None) + + CephadmServe(cephadm_module)._check_daemons() + + assert cephadm_module.events.get_for_daemon(d_name) == [ + OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO', + f"Deployed {d_name} on host \'test\'"), + OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO', + f"stop {d_name} from host \'test\'"), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \ + with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id: + with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command: + + _ceph_send_command.side_effect = Exception("myerror") + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + + CephadmServe(cephadm_module)._check_daemons() + + evs = [e.message for e in cephadm_module.events.get_for_daemon( + f'rgw.{daemon_id}')] + + assert 'myerror' in ''.join(evs) + + @pytest.mark.parametrize( + "action", + [ + 'start', + 'stop', + 'restart', + 'reconfig', + 'redeploy' + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names: + [daemon_name] = d_names + + cephadm_module._schedule_daemon_action(daemon_name, action) + + assert cephadm_module.cache.get_scheduled_daemon_action( + 'test', daemon_name) == action + + CephadmServe(cephadm_module)._check_daemons() + + assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + + # Also testing deploying mons without explicit network placement + cephadm_module.check_mon_command({ + 'prefix': 'config set', + 'who': 'mon', + 'name': 'public_network', + 'value': '127.0.0.0/8' + }) + + cephadm_module.cache.update_host_devices_networks( + 'test', + [], + { + "127.0.0.0/8": [ + "127.0.0.1" + ], + } + ) + + with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names: + [daemon_name] = d_names + + cephadm_module._set_extra_ceph_conf('[mon]\nk=v') + + CephadmServe(cephadm_module)._check_daemons() + + _run_cephadm.assert_called_with( + 'test', 'mon.test', 'deploy', [ + '--name', 'mon.test', + '--meta-json', '{"service_name": "mon", "ports": [], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--reconfig', + ], + stdin='{"config": "\\n\\n[mon]\\nk=v\\n[mon.test]\\npublic network = 127.0.0.0/8\\n", ' + + '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}', + image='') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_extra_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='crash', extra_container_args=['--cpus=2', '--quiet']), CephadmOrchestrator.apply_crash): + _run_cephadm.assert_called_with( + 'test', 'crash.test', 'deploy', [ + '--name', 'crash.test', + '--meta-json', '{"service_name": "crash", "ports": [], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": ["--cpus=2", "--quiet"]}', + '--config-json', '-', + '--extra-container-args=--cpus=2', + '--extra-container-args=--quiet' + ], + stdin='{"config": "", "keyring": ""}', + image='', + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'): + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + cephadm_module.mock_store_set('_ceph_get', 'mgr_map', { + 'modules': ['dashboard'] + }) + + with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd: + CephadmServe(cephadm_module)._check_daemons() + _mon_cmd.assert_any_call( + {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://[1::4]:3000'}, + None) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1.2.3.4') + def test_iscsi_post_actions_with_missing_daemon_in_cache(self, cephadm_module: CephadmOrchestrator): + # https://tracker.ceph.com/issues/52866 + with with_host(cephadm_module, 'test1'): + with with_host(cephadm_module, 'test2'): + with with_service(cephadm_module, IscsiServiceSpec(service_id='foobar', pool='pool', placement=PlacementSpec(host_pattern='*')), CephadmOrchestrator.apply_iscsi, 'test'): + + CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('iscsi')) == 2 + + # get a deamons from postaction list (ARRGH sets!!) + tempset = cephadm_module.requires_post_actions.copy() + tempdeamon1 = tempset.pop() + tempdeamon2 = tempset.pop() + + # make sure post actions has 2 daemons in it + assert len(cephadm_module.requires_post_actions) == 2 + + # replicate a host cache that is not in sync when check_daemons is called + tempdd1 = cephadm_module.cache.get_daemon(tempdeamon1) + tempdd2 = cephadm_module.cache.get_daemon(tempdeamon2) + host = 'test1' + if 'test1' not in tempdeamon1: + host = 'test2' + cephadm_module.cache.rm_daemon(host, tempdeamon1) + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + cephadm_module.mock_store_set('_ceph_get', 'mgr_map', { + 'modules': ['dashboard'] + }) + + with mock.patch("cephadm.module.IscsiService.config_dashboard") as _cfg_db: + CephadmServe(cephadm_module)._check_daemons() + _cfg_db.assert_called_once_with([tempdd2]) + + # post actions still has the other deamon in it and will run next _check_deamons + assert len(cephadm_module.requires_post_actions) == 1 + + # post actions was missed for a daemon + assert tempdeamon1 in cephadm_module.requires_post_actions + + # put the daemon back in the cache + cephadm_module.cache.add_daemon(host, tempdd1) + + _cfg_db.reset_mock() + # replicate serve loop running again + CephadmServe(cephadm_module)._check_daemons() + + # post actions should have been called again + _cfg_db.asset_called() + + # post actions is now empty + assert len(cephadm_module.requires_post_actions) == 0 + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_mon_add(self, cephadm_module): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps)) + assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"] + + with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"): + ps = PlacementSpec(hosts=['test'], count=1) + c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps)) + wait(cephadm_module, c) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_mgr_update(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert r + + assert_rm_daemon(cephadm_module, 'mgr.a', 'test') + + @mock.patch("cephadm.module.CephadmOrchestrator.mon_command") + def test_find_destroyed_osds(self, _mon_cmd, cephadm_module): + dict_out = { + "nodes": [ + { + "id": -1, + "name": "default", + "type": "root", + "type_id": 11, + "children": [ + -3 + ] + }, + { + "id": -3, + "name": "host1", + "type": "host", + "type_id": 1, + "pool_weights": {}, + "children": [ + 0 + ] + }, + { + "id": 0, + "device_class": "hdd", + "name": "osd.0", + "type": "osd", + "type_id": 0, + "crush_weight": 0.0243988037109375, + "depth": 2, + "pool_weights": {}, + "exists": 1, + "status": "destroyed", + "reweight": 1, + "primary_affinity": 1 + } + ], + "stray": [] + } + json_out = json.dumps(dict_out) + _mon_cmd.return_value = (0, json_out, '') + osd_claims = OsdIdClaims(cephadm_module) + assert osd_claims.get() == {'host1': ['0']} + assert osd_claims.filtered_by_host('host1') == ['0'] + assert osd_claims.filtered_by_host('host1.domain.com') == ['0'] + + @ pytest.mark.parametrize( + "ceph_services, cephadm_daemons, strays_expected, metadata", + # [ ([(daemon_type, daemon_id), ... ], [...], [...]), ... ] + [ + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [], + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + {}, + ), + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [], + {}, + ), + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [('mds', 'a'), ('osd', '0')], + [('mgr', 'x')], + {}, + ), + # https://tracker.ceph.com/issues/49573 + ( + [('rgw-nfs', '14649')], + [], + [('nfs', 'foo-rgw.host1')], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}}, + ), + ( + [('rgw-nfs', '14649'), ('rgw-nfs', '14650')], + [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')], + [], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}}, + ), + ( + [('rgw-nfs', '14649'), ('rgw-nfs', '14650')], + [('nfs', 'foo-rgw.host1')], + [('nfs', 'foo2.host2')], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}}, + ), + ] + ) + def test_check_for_stray_daemons( + self, + cephadm_module, + ceph_services, + cephadm_daemons, + strays_expected, + metadata + ): + # mock ceph service-map + services = [] + for service in ceph_services: + s = {'type': service[0], 'id': service[1]} + services.append(s) + ls = [{'hostname': 'host1', 'services': services}] + + with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers: + list_servers.return_value = ls + list_servers.__iter__.side_effect = ls.__iter__ + + # populate cephadm daemon cache + dm = {} + for daemon_type, daemon_id in cephadm_daemons: + dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id) + dm[dd.name()] = dd + cephadm_module.cache.update_host_daemons('host1', dm) + + def get_metadata_mock(svc_type, svc_id, default): + return metadata[svc_id] + + with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock): + + # test + CephadmServe(cephadm_module)._check_for_strays() + + # verify + strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON') + if not strays: + assert len(strays_expected) == 0 + else: + for dt, di in strays_expected: + name = '%s.%s' % (dt, di) + for detail in strays['detail']: + if name in detail: + strays['detail'].remove(detail) + break + assert name in detail + assert len(strays['detail']) == 0 + assert strays['count'] == len(strays_expected) + + @mock.patch("cephadm.module.CephadmOrchestrator.mon_command") + def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module): + _mon_cmd.return_value = (1, "", "fail_msg") + with pytest.raises(OrchestratorError): + OsdIdClaims(cephadm_module) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + + spec = DriveGroupSpec( + service_id='foo', + placement=PlacementSpec( + host_pattern='*', + ), + data_devices=DeviceSelection( + all=True + ) + ) + + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.foo update...'] + + inventory = Devices([ + Device( + '/dev/sdb', + available=True + ), + ]) + + cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {}) + + _run_cephadm.return_value = (['{}'], '', 0) + + assert CephadmServe(cephadm_module)._apply_all_services() is False + + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', + ['--config-json', '-', '--', 'lvm', 'batch', + '--no-auto', '/dev/sdb', '--yes', '--no-systemd'], + env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}') + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False) + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + + spec = DriveGroupSpec( + service_id='noncollocated', + placement=PlacementSpec( + hosts=['test'] + ), + data_devices=DeviceSelection(paths=['/dev/sdb']), + db_devices=DeviceSelection(paths=['/dev/sdc']), + wal_devices=DeviceSelection(paths=['/dev/sdd']) + ) + + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...'] + + inventory = Devices([ + Device('/dev/sdb', available=True), + Device('/dev/sdc', available=True), + Device('/dev/sdd', available=True) + ]) + + cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {}) + + _run_cephadm.return_value = (['{}'], '', 0) + + assert CephadmServe(cephadm_module)._apply_all_services() is False + + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', + ['--config-json', '-', '--', 'lvm', 'batch', + '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc', + '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'], + env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'], + error_ok=True, stdin='{"config": "", "keyring": ""}') + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False) + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.module.SpecStore.save") + def test_apply_osd_save_placement(self, _save_spec, cephadm_module): + with with_host(cephadm_module, 'test'): + json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, + 'service_id': 'foo', 'data_devices': {'all': True}} + spec = ServiceSpec.from_json(json_spec) + assert isinstance(spec, DriveGroupSpec) + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.foo update...'] + _save_spec.assert_called_with(spec) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_create_osds(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(dg) + out = wait(cephadm_module, c) + assert out == "Created no osd(s) on host test; already created?" + bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_hsot'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(bad_dg) + out = wait(cephadm_module, c) + assert "Invalid 'host:device' spec: host not found in cluster" in out + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_create_noncollocated_osd(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(dg) + out = wait(cephadm_module, c) + assert out == "Created no osd(s) on host test; already created?" + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch('cephadm.services.osd.OSDService._run_ceph_volume_command') + @mock.patch('cephadm.services.osd.OSDService.driveselection_to_ceph_volume') + @mock.patch('cephadm.services.osd.OsdIdClaims.refresh', lambda _: None) + @mock.patch('cephadm.services.osd.OsdIdClaims.get', lambda _: {}) + def test_limit_not_reached(self, d_to_cv, _run_cv_cmd, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(limit=5, rotational=1), + service_id='not_enough') + + disks_found = [ + '[{"data": "/dev/vdb", "data_size": "50.00 GB", "encryption": "None"}, {"data": "/dev/vdc", "data_size": "50.00 GB", "encryption": "None"}]'] + d_to_cv.return_value = 'foo' + _run_cv_cmd.return_value = (disks_found, '', 0) + preview = cephadm_module.osd_service.generate_previews([dg], 'test') + + for osd in preview: + assert 'notes' in osd + assert osd['notes'] == [ + 'NOTE: Did not find enough disks matching filter on host test to reach data device limit (Found: 2 | Limit: 5)'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_prepare_drivegroup(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + out = cephadm_module.osd_service.prepare_drivegroup(dg) + assert len(out) == 1 + f1 = out[0] + assert f1[0] == 'test' + assert isinstance(f1[1], DriveSelection) + + @pytest.mark.parametrize( + "devices, preview, exp_commands", + [ + # no preview and only one disk, prepare is used due the hack that is in place. + (['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]), + # no preview and multiple disks, uses batch + (['/dev/sda', '/dev/sdb'], False, + ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]), + # preview and only one disk needs to use batch again to generate the preview + (['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]), + # preview and multiple disks work the same + (['/dev/sda', '/dev/sdb'], True, + ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec( + host_pattern='test'), data_devices=DeviceSelection(paths=devices)) + ds = DriveSelection(dg, Devices([Device(path) for path in devices])) + preview = preview + out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview) + assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}' + + @pytest.mark.parametrize( + "devices, preview, exp_commands", + [ + # one data device, no preview + (['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]), + # multiple data devices, no preview + (['/dev/sda', '/dev/sdb'], False, + ["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]), + # one data device, preview + (['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]), + # multiple data devices, preview + (['/dev/sda', '/dev/sdb'], True, + ["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec( + host_pattern='test'), data_devices=DeviceSelection(paths=devices)) + ds = DriveSelection(dg, Devices([Device(path) for path in devices])) + preview = preview + out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview) + assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='osd.0', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ) + ]) + )) + @mock.patch("cephadm.services.osd.OSD.exists", True) + @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0) + def test_remove_osds(self, cephadm_module): + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + c = cephadm_module.list_daemons() + wait(cephadm_module, c) + + c = cephadm_module.remove_daemons(['osd.0']) + out = wait(cephadm_module, c) + assert out == ["Removed osd.0 from host 'test'"] + + cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0, + replace=False, + force=False, + hostname='test', + process_started_at=datetime_now(), + remove_util=cephadm_module.to_remove_osds.rm_util + )) + cephadm_module.to_remove_osds.process_removal_queue() + assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module) + + c = cephadm_module.remove_osds_status() + out = wait(cephadm_module, c) + assert out == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_rgw_update(self, cephadm_module): + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)): + ps = PlacementSpec(hosts=['host1'], count=1) + c = cephadm_module.add_daemon( + RGWSpec(service_id="foo", placement=ps)) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed rgw.foo.* on host 'host1'") + + ps = PlacementSpec(hosts=['host1', 'host2'], count=2) + r = CephadmServe(cephadm_module)._apply_service( + RGWSpec(service_id="foo", placement=ps)) + assert r + + assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1') + assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='rgw.myrgw.myhost.myid', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ) + ]) + )) + def test_remove_daemon(self, cephadm_module): + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + c = cephadm_module.list_daemons() + wait(cephadm_module, c) + c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid']) + out = wait(cephadm_module, c) + assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription + with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription + CephadmServe(cephadm_module)._check_for_moved_osds() + # both are in status "starting" + assert len(cephadm_module.cache.get_daemons()) == 2 + + dd1.status = DaemonDescriptionStatus.running + dd2.status = DaemonDescriptionStatus.error + cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1}) + cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2}) + CephadmServe(cephadm_module)._check_for_moved_osds() + assert len(cephadm_module.cache.get_daemons()) == 1 + + assert cephadm_module.events.get_for_daemon('osd.1') == [ + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Deployed osd.1 on host 'host1'"), + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Deployed osd.1 on host 'host2'"), + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Removed duplicated daemon on host 'host2'"), + ] + + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + @pytest.mark.parametrize( + "spec", + [ + ServiceSpec('crash'), + ServiceSpec('prometheus'), + ServiceSpec('grafana'), + ServiceSpec('node-exporter'), + ServiceSpec('alertmanager'), + ServiceSpec('rbd-mirror'), + ServiceSpec('cephfs-mirror'), + ServiceSpec('mds', service_id='fsname'), + RGWSpec(rgw_realm='realm', rgw_zone='zone'), + RGWSpec(service_id="foo"), + ServiceSpec('cephadm-exporter'), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_daemon_add(self, spec: ServiceSpec, cephadm_module): + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, unmanaged_spec): + with with_daemon(cephadm_module, spec, 'test'): + pass + + @pytest.mark.parametrize( + "entity,success,spec", + [ + ('mgr.x', True, ServiceSpec( + service_type='mgr', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('client.rgw.x', True, ServiceSpec( + service_type='rgw', + service_id='id', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('client.nfs.x', True, ServiceSpec( + service_type='nfs', + service_id='id', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('mon.', False, ServiceSpec( + service_type='mon', + placement=PlacementSpec( + hosts=[HostPlacementSpec('test', '127.0.0.0/24', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_module): + _run_cephadm.return_value = '{}', '', 0 + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.side_effect = OrchestratorError('fail') + with pytest.raises(OrchestratorError): + wait(cephadm_module, cephadm_module.add_daemon(spec)) + if success: + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': entity, + }) + else: + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': entity, + }) + assert cephadm_module.events.get_for_service(spec.service_name()) == [ + OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'INFO', + "service was created"), + OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'ERROR', + "fail"), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + _run_cephadm.side_effect = OrchestratorError('fail') + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert not r + assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None + assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1 + assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['summary'] + assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['detail'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._apply_all_services() + ps = PlacementSpec(hosts=['fail'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert not r + assert cephadm_module.apply_spec_fails + assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None + assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1 + assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['summary'] + + @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + get_foreign_ceph_option.side_effect = KeyError + CephadmServe(cephadm_module)._apply_service_config( + ServiceSpec('mgr', placement=ps, config={'test': 'foo'})) + assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None + assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1 + assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks[ + 'CEPHADM_INVALID_CONFIG_OPTION']['summary'] + assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[ + 'CEPHADM_INVALID_CONFIG_OPTION']['detail'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + def test_nfs(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test'], count=1) + spec = NFSServiceSpec( + service_id='name', + placement=ps) + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_service(cephadm_module, unmanaged_spec): + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed nfs.name.* on host 'test'") + + assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("subprocess.run", None) + @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock()) + @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4') + def test_iscsi(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test'], count=1) + spec = IscsiServiceSpec( + service_id='name', + pool='pool', + api_user='user', + api_password='password', + placement=ps) + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_service(cephadm_module, unmanaged_spec): + + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed iscsi.name.* on host 'test'") + + assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test') + + @pytest.mark.parametrize( + "on_bool", + [ + True, + False + ] + ) + @pytest.mark.parametrize( + "fault_ident", + [ + 'fault', + 'ident' + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module): + _run_cephadm.return_value = '{}', '', 0 + with with_host(cephadm_module, 'test'): + c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')]) + on_off = 'on' if on_bool else 'off' + assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}'] + _run_cephadm.assert_called_with('test', 'osd', 'shell', [ + '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light_custom(self, _run_cephadm, cephadm_module): + _run_cephadm.return_value = '{}', '', 0 + with with_host(cephadm_module, 'test'): + cephadm_module.set_store('blink_device_light_cmd', 'echo hello') + c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')]) + assert wait(cephadm_module, c) == ['Set ident light for test: on'] + _run_cephadm.assert_called_with('test', 'osd', 'shell', [ + '--', 'echo', 'hello'], error_ok=True) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module): + _run_cephadm.return_value = '{}', '', 0 + with with_host(cephadm_module, 'mgr0'): + cephadm_module.set_store('mgr0/blink_device_light_cmd', + 'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'') + c = cephadm_module.blink_device_light( + 'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')]) + assert wait(cephadm_module, c) == [ + 'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on'] + _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [ + '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784' + ], error_ok=True) + + @pytest.mark.parametrize( + "spec, meth", + [ + (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr), + (ServiceSpec('crash'), CephadmOrchestrator.apply_crash), + (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus), + (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana), + (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter), + (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager), + (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror), + (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror), + (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds), + (ServiceSpec( + 'mds', service_id='fsname', + placement=PlacementSpec( + hosts=[HostPlacementSpec( + hostname='test', + name='fsname', + network='' + )] + ) + ), CephadmOrchestrator.apply_mds), + (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw), + (RGWSpec( + service_id='bar', + rgw_realm='realm', rgw_zone='zone', + placement=PlacementSpec( + hosts=[HostPlacementSpec( + hostname='test', + name='bar', + network='' + )] + ) + ), CephadmOrchestrator.apply_rgw), + (NFSServiceSpec( + service_id='name', + ), CephadmOrchestrator.apply_nfs), + (IscsiServiceSpec( + service_id='name', + pool='pool', + api_user='user', + api_password='password' + ), CephadmOrchestrator.apply_iscsi), + (CustomContainerSpec( + service_id='hello-world', + image='docker.io/library/hello-world:latest', + uid=65534, + gid=65534, + dirs=['foo/bar'], + files={ + 'foo/bar/xyz.conf': 'aaa\nbbb' + }, + bind_mounts=[[ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ]], + volume_mounts={ + 'foo/bar': '/foo/bar:Z' + }, + args=['--no-healthcheck'], + envs=['SECRET=password'], + ports=[8080, 8443] + ), CephadmOrchestrator.apply_container), + (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test')) + @mock.patch("subprocess.run", None) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + @mock.patch("subprocess.run", mock.MagicMock()) + def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec, meth, 'test'): + pass + + @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test')) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator): + spec = MDSSpec('mds', service_id='fsname', config={'test': 'foo'}) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec, host='test'): + ret, out, err = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': spec.service_name(), + 'key': 'mds_join_fs', + }) + assert out == 'fsname' + ret, out, err = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': spec.service_name(), + 'key': 'mds_join_fs', + }) + assert not out + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop") + def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator): + spec = MDSSpec( + 'mds', + service_id='fsname', + placement=PlacementSpec(hosts=['host1', 'host2']), + config={'test': 'foo'} + ) + with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'): + c = cephadm_module.apply_mds(spec) + out = wait(cephadm_module, c) + match_glob(out, "Scheduled mds.fsname update...") + CephadmServe(cephadm_module)._apply_all_services() + + [daemon] = cephadm_module.cache.daemons['host1'].keys() + + spec.placement.set_hosts(['host2']) + + ok_to_stop.side_effect = False + + c = cephadm_module.apply_mds(spec) + out = wait(cephadm_module, c) + match_glob(out, "Scheduled mds.fsname update...") + CephadmServe(cephadm_module)._apply_all_services() + + ok_to_stop.assert_called_with([daemon[4:]], force=True) + + assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop + assert_rm_daemon(cephadm_module, spec.service_name(), 'host2') + + @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") + @mock.patch("remoto.process.check") + def test_offline(self, _check, _get_connection, cephadm_module): + _check.return_value = '{}', '', 0 + _get_connection.return_value = mock.Mock(), mock.Mock() + with with_host(cephadm_module, 'test'): + _get_connection.side_effect = HostNotFound + code, out, err = cephadm_module.check_host('test') + assert out == '' + assert "Host 'test' not found" in err + + out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json() + assert out == HostSpec('test', '1::4', status='Offline').to_json() + + _get_connection.side_effect = None + assert CephadmServe(cephadm_module)._check_host('test') is None + out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json() + assert out == HostSpec('test', '1::4').to_json() + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_dont_touch_offline_or_maintenance_host_daemons(self, cephadm_module): + # test daemons on offline/maint hosts not removed when applying specs + # test daemons not added to hosts in maint/offline state + with with_host(cephadm_module, 'test1'): + with with_host(cephadm_module, 'test2'): + with with_host(cephadm_module, 'test3'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*'))): + # should get a mgr on all 3 hosts + # CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3 + + # put one host in offline state and one host in maintenance state + cephadm_module.offline_hosts = {'test2'} + cephadm_module.inventory._inventory['test3']['status'] = 'maintenance' + cephadm_module.inventory.save() + + # being in offline/maint mode should disqualify hosts from being + # candidates for scheduling + candidates = [ + h.hostname for h in cephadm_module._schedulable_hosts()] + assert 'test2' in candidates + assert 'test3' in candidates + + unreachable = [h.hostname for h in cephadm_module._unreachable_hosts()] + assert 'test2' in unreachable + assert 'test3' in unreachable + + with with_service(cephadm_module, ServiceSpec('crash', placement=PlacementSpec(host_pattern='*'))): + # re-apply services. No mgr should be removed from maint/offline hosts + # crash daemon should only be on host not in maint/offline mode + CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3 + assert len(cephadm_module.cache.get_daemons_by_type('crash')) == 1 + + cephadm_module.offline_hosts = {} + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_enter_success(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.return_value = [''], ['something\nsuccess - systemd target xxx disabled'], 0 + _host_ok.return_value = 0, 'it is okay' + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname)) + # should not raise an error + retval = cephadm_module.enter_host_maintenance(hostname) + assert retval.result_str().startswith('Daemons for Ceph cluster') + assert not retval.exception_str + assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_enter_failure(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.return_value = [''], ['something\nfailed - disable the target'], 0 + _host_ok.return_value = 0, 'it is okay' + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname)) + + with pytest.raises(OrchestratorError, match='Failed to place host1 into maintenance for cluster fsid'): + cephadm_module.enter_host_maintenance(hostname) + + assert not cephadm_module.inventory._inventory[hostname]['status'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_exit_success(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.return_value = [''], [ + 'something\nsuccess - systemd target xxx enabled and started'], 0 + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance')) + # should not raise an error + retval = cephadm_module.exit_host_maintenance(hostname) + assert retval.result_str().startswith('Ceph cluster') + assert not retval.exception_str + assert not cephadm_module.inventory._inventory[hostname]['status'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_exit_failure(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.return_value = [''], ['something\nfailed - unable to enable the target'], 0 + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance')) + + with pytest.raises(OrchestratorError, match='Failed to exit maintenance state for host host1, cluster fsid'): + cephadm_module.exit_host_maintenance(hostname) + + assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' + + def test_stale_connections(self, cephadm_module): + class Connection(object): + """ + A mocked connection class that only allows the use of the connection + once. If you attempt to use it again via a _check, it'll explode (go + boom!). + + The old code triggers the boom. The new code checks the has_connection + and will recreate the connection. + """ + fuse = False + + @ staticmethod + def has_connection(): + return False + + def import_module(self, *args, **kargs): + return mock.Mock() + + @ staticmethod + def exit(): + pass + + def _check(conn, *args, **kargs): + if conn.fuse: + raise Exception("boom: connection is dead") + else: + conn.fuse = True + return '{}', [], 0 + with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]): + with mock.patch("remoto.process.check", _check): + with with_host(cephadm_module, 'test', refresh_hosts=False): + code, out, err = cephadm_module.check_host('test') + # First should succeed. + assert err == '' + + # On second it should attempt to reuse the connection, where the + # connection is "down" so will recreate the connection. The old + # code will blow up here triggering the BOOM! + code, out, err = cephadm_module.check_host('test') + assert err == '' + + @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") + @mock.patch("remoto.process.check") + @mock.patch("cephadm.module.CephadmServe._write_remote_file") + def test_etc_ceph(self, _write_file, _check, _get_connection, cephadm_module): + _get_connection.return_value = mock.Mock(), mock.Mock() + _check.return_value = '{}', '', 0 + _write_file.return_value = None + + assert cephadm_module.manage_etc_ceph_ceph_conf is False + + with with_host(cephadm_module, 'test'): + assert '/etc/ceph/ceph.conf' not in cephadm_module.cache.get_host_client_files('test') + + with with_host(cephadm_module, 'test'): + cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True) + cephadm_module.config_notify() + assert cephadm_module.manage_etc_ceph_ceph_conf is True + + CephadmServe(cephadm_module)._refresh_hosts_and_daemons() + _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', b'', + 0o644, 0, 0) + + assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test') + + # set extra config and expect that we deploy another ceph.conf + cephadm_module._set_extra_ceph_conf('[mon]\nk=v') + CephadmServe(cephadm_module)._refresh_hosts_and_daemons() + _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', + b'\n\n[mon]\nk=v\n', 0o644, 0, 0) + + # reload + cephadm_module.cache.last_client_files = {} + cephadm_module.cache.load() + + assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test') + + # Make sure, _check_daemons does a redeploy due to monmap change: + before_digest = cephadm_module.cache.get_host_client_files('test')[ + '/etc/ceph/ceph.conf'][0] + cephadm_module._set_extra_ceph_conf('[mon]\nk2=v2') + CephadmServe(cephadm_module)._refresh_hosts_and_daemons() + after_digest = cephadm_module.cache.get_host_client_files('test')[ + '/etc/ceph/ceph.conf'][0] + assert before_digest != after_digest + + def test_etc_ceph_init(self): + with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m: + assert m.manage_etc_ceph_ceph_conf is True + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + def check_registry_credentials(url, username, password): + assert json.loads(cephadm_module.get_store('registry_credentials')) == { + 'url': url, 'username': username, 'password': password} + + _run_cephadm.return_value = '{}', '', 0 + with with_host(cephadm_module, 'test'): + # test successful login with valid args + code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password') + assert out == 'registry login scheduled' + assert err == '' + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test bad login attempt with invalid args + code, out, err = cephadm_module.registry_login('bad-args') + assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> " + "or -i <login credentials json file>") + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test bad login using invalid json file + code, out, err = cephadm_module.registry_login( + None, None, None, '{"bad-json": "bad-json"}') + assert err == ("json provided for custom registry login did not include all necessary fields. " + "Please setup json file as\n" + "{\n" + " \"url\": \"REGISTRY_URL\",\n" + " \"username\": \"REGISTRY_USERNAME\",\n" + " \"password\": \"REGISTRY_PASSWORD\"\n" + "}\n") + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test good login using valid json file + good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", " + " \"password\": \"" + "json-pass" + "\"}") + code, out, err = cephadm_module.registry_login(None, None, None, good_json) + assert out == 'registry login scheduled' + assert err == '' + check_registry_credentials('json-url', 'json-user', 'json-pass') + + # test bad login where args are valid but login command fails + _run_cephadm.return_value = '{}', 'error', 1 + code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password') + assert err == 'Host test failed to login to fail-url as fail-user with given password' + check_registry_credentials('json-url', 'json-user', 'json-pass') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ + 'image_id': 'image_id', + 'repo_digests': ['image@repo_digest'], + }))) + @pytest.mark.parametrize("use_repo_digest", + [ + False, + True + ]) + def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator): + cephadm_module.use_repo_digest = use_repo_digest + + with with_host(cephadm_module, 'test', refresh_hosts=False): + cephadm_module.set_container_image('global', 'image') + + if use_repo_digest: + + CephadmServe(cephadm_module).convert_tags_to_repo_digest() + + _, image, _ = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': 'global', + 'key': 'container_image', + }) + if use_repo_digest: + assert image == 'image@repo_digest' + else: + assert image == 'image' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch +Traceback (most recent call last): + File "<stdin>", line 6112, in <module> + File "<stdin>", line 1299, in _infer_fsid + File "<stdin>", line 1382, in _infer_image + File "<stdin>", line 3612, in command_ceph_volume + File "<stdin>", line 1061, in call_throws""" + + with with_host(cephadm_module, 'test'): + _run_cephadm.reset_mock() + _run_cephadm.side_effect = OrchestratorError(error_message) + + s = CephadmServe(cephadm_module)._refresh_host_devices('test') + assert s == 'host test `cephadm ceph-volume` failed: ' + error_message + + assert _run_cephadm.mock_calls == [ + mock.call('test', 'osd', 'ceph-volume', + ['--', 'inventory', '--format=json-pretty', '--filter-for-batch'], image='', + no_fsid=False), + mock.call('test', 'osd', 'ceph-volume', + ['--', 'inventory', '--format=json-pretty'], image='', + no_fsid=False), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test', refresh_hosts=False): + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1): + pass + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice_fail(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test', refresh_hosts=False): + cephadm_module.mock_store_set('_ceph_get', 'osd_map', { + 'osds': [ + { + 'osd': 1, + 'up_from': 0, + 'uuid': 'uuid' + } + ] + }) + + ceph_volume_lvm_list = { + '1': [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }] + } + _run_cephadm.reset_mock(return_value=True) + + def _r_c(*args, **kwargs): + if 'ceph-volume' in args: + return (json.dumps(ceph_volume_lvm_list), '', 0) + else: + assert 'deploy' in args + raise OrchestratorError("let's fail somehow") + _run_cephadm.side_effect = _r_c + assert cephadm_module._osd_activate( + ['test']).stderr == "let's fail somehow" + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice_dbdevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test', refresh_hosts=False): + + def _ceph_volume_list(s, host, entity, cmd, **kwargs): + logging.info(f'ceph-volume cmd: {cmd}') + if 'raw' in cmd: + return json.dumps({ + "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": { + "ceph_fsid": "64c84f19-fe1d-452a-a731-ab19dc144aa8", + "device": "/dev/loop0", + "osd_id": 21, + "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5", + "type": "bluestore" + }, + }), '', 0 + if 'lvm' in cmd: + return json.dumps({ + '1': [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }, { + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'db' + }] + }), '', 0 + return '{}', '', 0 + + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1, ceph_volume_lvm_list=_ceph_volume_list): + pass + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_count(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + dg = DriveGroupSpec(service_id='', data_devices=DeviceSelection(all=True)) + with with_host(cephadm_module, 'test', refresh_hosts=False): + with with_service(cephadm_module, dg, host='test'): + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1): + assert wait(cephadm_module, cephadm_module.describe_service())[0].size == 1 + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_host_rm_last_admin(self, cephadm_module: CephadmOrchestrator): + with pytest.raises(OrchestratorError): + with with_host(cephadm_module, 'test', refresh_hosts=False, rm_with_force=False): + cephadm_module.inventory.add_label('test', '_admin') + pass + assert False + with with_host(cephadm_module, 'test1', refresh_hosts=False, rm_with_force=True): + with with_host(cephadm_module, 'test2', refresh_hosts=False, rm_with_force=False): + cephadm_module.inventory.add_label('test2', '_admin') diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py new file mode 100644 index 000000000..327c12d2a --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -0,0 +1,40 @@ +import pytest + +from ..module import forall_hosts + + +class TestCompletion(object): + + @pytest.mark.parametrize("input,expected", [ + ([], []), + ([1], ["(1,)"]), + (["hallo"], ["('hallo',)"]), + ("hi", ["('h',)", "('i',)"]), + (list(range(5)), [str((x, )) for x in range(5)]), + ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]), + ]) + def test_async_map(self, input, expected, cephadm_module): + @forall_hosts + def run_forall(*args): + return str(args) + assert run_forall(input) == expected + + @pytest.mark.parametrize("input,expected", [ + ([], []), + ([1], ["(1,)"]), + (["hallo"], ["('hallo',)"]), + ("hi", ["('h',)", "('i',)"]), + (list(range(5)), [str((x, )) for x in range(5)]), + ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]), + ]) + def test_async_map_self(self, input, expected, cephadm_module): + class Run(object): + def __init__(self): + self.attr = 1 + + @forall_hosts + def run_forall(self, *args): + assert self.attr == 1 + return str(args) + + assert Run().run_forall(input) == expected diff --git a/src/pybind/mgr/cephadm/tests/test_configchecks.py b/src/pybind/mgr/cephadm/tests/test_configchecks.py new file mode 100644 index 000000000..3cae0a27d --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_configchecks.py @@ -0,0 +1,668 @@ +import copy +import json +import logging +import ipaddress +import pytest +import uuid + +from time import time as now + +from ..configchecks import CephadmConfigChecks +from ..inventory import HostCache +from ..upgrade import CephadmUpgrade, UpgradeState +from orchestrator import DaemonDescription + +from typing import List, Dict, Any, Optional + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +host_sample = { + "arch": "x86_64", + "bios_date": "04/01/2014", + "bios_version": "F2", + "cpu_cores": 16, + "cpu_count": 2, + "cpu_load": { + "15min": 0.0, + "1min": 0.01, + "5min": 0.01 + }, + "cpu_model": "Intel® Xeon® Processor E5-2698 v3", + "cpu_threads": 64, + "flash_capacity": "4.0TB", + "flash_capacity_bytes": 4000797868032, + "flash_count": 2, + "flash_list": [ + { + "description": "ATA CT2000MX500SSD1 (2.0TB)", + "dev_name": "sda", + "disk_size_bytes": 2000398934016, + "model": "CT2000MX500SSD1", + "rev": "023", + "vendor": "ATA", + "wwid": "t10.ATA CT2000MX500SSD1 193023156DE0" + }, + { + "description": "ATA CT2000MX500SSD1 (2.0TB)", + "dev_name": "sdb", + "disk_size_bytes": 2000398934016, + "model": "CT2000MX500SSD1", + "rev": "023", + "vendor": "ATA", + "wwid": "t10.ATA CT2000MX500SSD1 193023156DE0" + }, + ], + "hdd_capacity": "16.0TB", + "hdd_capacity_bytes": 16003148120064, + "hdd_count": 4, + "hdd_list": [ + { + "description": "ST4000VN008-2DR1 (4.0TB)", + "dev_name": "sdc", + "disk_size_bytes": 4000787030016, + "model": "ST4000VN008-2DR1", + "rev": "SC60", + "vendor": "ATA", + "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ" + }, + { + "description": "ST4000VN008-2DR1 (4.0TB)", + "dev_name": "sdd", + "disk_size_bytes": 4000787030016, + "model": "ST4000VN008-2DR1", + "rev": "SC60", + "vendor": "ATA", + "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ" + }, + { + "description": "ST4000VN008-2DR1 (4.0TB)", + "dev_name": "sde", + "disk_size_bytes": 4000787030016, + "model": "ST4000VN008-2DR1", + "rev": "SC60", + "vendor": "ATA", + "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ" + }, + { + "description": "ST4000VN008-2DR1 (4.0TB)", + "dev_name": "sdf", + "disk_size_bytes": 4000787030016, + "model": "ST4000VN008-2DR1", + "rev": "SC60", + "vendor": "ATA", + "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ" + }, + ], + "hostname": "dummy", + "interfaces": { + "eth0": { + "driver": "e1000e", + "iftype": "physical", + "ipv4_address": "10.7.17.1/24", + "ipv6_address": "fe80::215:17ff:feab:50e2/64", + "lower_devs_list": [], + "mtu": 9000, + "nic_type": "ethernet", + "operstate": "up", + "speed": 1000, + "upper_devs_list": [], + }, + "eth1": { + "driver": "e1000e", + "iftype": "physical", + "ipv4_address": "10.7.18.1/24", + "ipv6_address": "fe80::215:17ff:feab:50e2/64", + "lower_devs_list": [], + "mtu": 9000, + "nic_type": "ethernet", + "operstate": "up", + "speed": 1000, + "upper_devs_list": [], + }, + "eth2": { + "driver": "r8169", + "iftype": "physical", + "ipv4_address": "10.7.19.1/24", + "ipv6_address": "fe80::76d4:35ff:fe58:9a79/64", + "lower_devs_list": [], + "mtu": 1500, + "nic_type": "ethernet", + "operstate": "up", + "speed": 1000, + "upper_devs_list": [] + }, + }, + "kernel": "4.18.0-240.10.1.el8_3.x86_64", + "kernel_parameters": { + "net.ipv4.ip_nonlocal_bind": "0", + }, + "kernel_security": { + "SELINUX": "enforcing", + "SELINUXTYPE": "targeted", + "description": "SELinux: Enabled(enforcing, targeted)", + "type": "SELinux" + }, + "memory_available_kb": 19489212, + "memory_free_kb": 245164, + "memory_total_kb": 32900916, + "model": "StorageHeavy", + "nic_count": 3, + "operating_system": "Red Hat Enterprise Linux 8.3 (Ootpa)", + "subscribed": "Yes", + "system_uptime": 777600.0, + "timestamp": now(), + "vendor": "Ceph Servers Inc", +} + + +def role_list(n: int) -> List[str]: + if n == 1: + return ['mon', 'mgr', 'osd'] + if n in [2, 3]: + return ['mon', 'mds', 'osd'] + + return ['osd'] + + +def generate_testdata(count: int = 10, public_network: str = '10.7.17.0/24', cluster_network: str = '10.7.18.0/24'): + # public network = eth0, cluster_network = eth1 + assert count > 3 + assert public_network + num_disks = host_sample['hdd_count'] + hosts = {} + daemons = {} + daemon_to_host = {} + osd_num = 0 + public_netmask = public_network.split('/')[1] + cluster_ip_list = [] + cluster_netmask = '' + + public_ip_list = [str(i) for i in list(ipaddress.ip_network(public_network).hosts())] + if cluster_network: + cluster_ip_list = [str(i) for i in list(ipaddress.ip_network(cluster_network).hosts())] + cluster_netmask = cluster_network.split('/')[1] + + for n in range(1, count + 1, 1): + + new_host = copy.deepcopy(host_sample) + hostname = f"node-{n}.ceph.com" + + new_host['hostname'] = hostname + new_host['interfaces']['eth0']['ipv4_address'] = f"{public_ip_list.pop(0)}/{public_netmask}" + if cluster_ip_list: + new_host['interfaces']['eth1']['ipv4_address'] = f"{cluster_ip_list.pop(0)}/{cluster_netmask}" + else: + new_host['interfaces']['eth1']['ipv4_address'] = '' + + hosts[hostname] = new_host + daemons[hostname] = {} + for r in role_list(n): + name = '' + if r == 'osd': + for n in range(num_disks): + osd = DaemonDescription( + hostname=hostname, daemon_type='osd', daemon_id=osd_num) + name = f"osd.{osd_num}" + daemons[hostname][name] = osd + daemon_to_host[name] = hostname + osd_num += 1 + else: + name = f"{r}.{hostname}" + daemons[hostname][name] = DaemonDescription( + hostname=hostname, daemon_type=r, daemon_id=hostname) + daemon_to_host[name] = hostname + + logger.debug(f"daemon to host lookup - {json.dumps(daemon_to_host)}") + return hosts, daemons, daemon_to_host + + +@pytest.fixture() +def mgr(): + """Provide a fake ceph mgr object preloaded with a configuration""" + mgr = FakeMgr() + mgr.cache.facts, mgr.cache.daemons, mgr.daemon_to_host = \ + generate_testdata(public_network='10.9.64.0/24', cluster_network='') + mgr.module_option.update({ + "config_checks_enabled": True, + }) + yield mgr + + +class FakeMgr: + + def __init__(self): + self.datastore = {} + self.module_option = {} + self.health_checks = {} + self.default_version = 'quincy' + self.version_overrides = {} + self.daemon_to_host = {} + + self.cache = HostCache(self) + self.upgrade = CephadmUpgrade(self) + + def set_health_checks(self, checks: dict): + return + + def get_module_option(self, keyname: str) -> Optional[str]: + return self.module_option.get(keyname, None) + + def set_module_option(self, keyname: str, value: str) -> None: + return None + + def get_store(self, keyname: str, default=None) -> Optional[str]: + return self.datastore.get(keyname, None) + + def set_store(self, keyname: str, value: str) -> None: + self.datastore[keyname] = value + return None + + def _ceph_get_server(self) -> None: + pass + + def get_metadata(self, daemon_type: str, daemon_id: str) -> Dict[str, Any]: + key = f"{daemon_type}.{daemon_id}" + if key in self.version_overrides: + logger.debug(f"override applied for {key}") + version_str = self.version_overrides[key] + else: + version_str = self.default_version + + return {"ceph_release": version_str, "hostname": self.daemon_to_host[key]} + + def list_servers(self) -> List[Dict[str, List[Dict[str, str]]]]: + num_disks = host_sample['hdd_count'] + osd_num = 0 + service_map = [] + + for hostname in self.cache.facts: + + host_num = int(hostname.split('.')[0].split('-')[1]) + svc_list = [] + for r in role_list(host_num): + if r == 'osd': + for _n in range(num_disks): + svc_list.append({ + "type": "osd", + "id": osd_num, + }) + osd_num += 1 + else: + svc_list.append({ + "type": r, + "id": hostname, + }) + + service_map.append({"services": svc_list}) + logger.debug(f"services map - {json.dumps(service_map)}") + return service_map + + def use_repo_digest(self) -> None: + return None + + +class TestConfigCheck: + + def test_to_json(self, mgr): + checker = CephadmConfigChecks(mgr) + out = checker.to_json() + assert out + assert len(out) == len(checker.health_checks) + + def test_lookup_check(self, mgr): + checker = CephadmConfigChecks(mgr) + check = checker.lookup_check('osd_mtu_size') + logger.debug(json.dumps(check.to_json())) + assert check + assert check.healthcheck_name == "CEPHADM_CHECK_MTU" + + def test_old_checks_removed(self, mgr): + mgr.datastore.update({ + "config_checks": '{"bogus_one": "enabled", "bogus_two": "enabled", ' + '"kernel_security": "enabled", "public_network": "enabled", ' + '"kernel_version": "enabled", "network_missing": "enabled", ' + '"osd_mtu_size": "enabled", "osd_linkspeed": "enabled", ' + '"os_subscription": "enabled", "ceph_release": "enabled"}' + }) + checker = CephadmConfigChecks(mgr) + raw = mgr.get_store('config_checks') + checks = json.loads(raw) + assert "bogus_one" not in checks + assert "bogus_two" not in checks + assert len(checks) == len(checker.health_checks) + + def test_new_checks(self, mgr): + mgr.datastore.update({ + "config_checks": '{"kernel_security": "enabled", "public_network": "enabled", ' + '"osd_mtu_size": "enabled", "osd_linkspeed": "enabled"}' + }) + checker = CephadmConfigChecks(mgr) + raw = mgr.get_store('config_checks') + checks = json.loads(raw) + assert len(checks) == len(checker.health_checks) + + def test_no_issues(self, mgr): + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + checker.run_checks() + + assert not mgr.health_checks + + def test_no_public_network(self, mgr): + bad_node = mgr.cache.facts['node-1.ceph.com'] + bad_node['interfaces']['eth0']['ipv4_address'] = "192.168.1.20/24" + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + checker.run_checks() + logger.debug(mgr.health_checks) + assert len(mgr.health_checks) == 1 + assert 'CEPHADM_CHECK_PUBLIC_MEMBERSHIP' in mgr.health_checks + assert mgr.health_checks['CEPHADM_CHECK_PUBLIC_MEMBERSHIP']['detail'][0] == \ + 'node-1.ceph.com does not have an interface on any public network' + + def test_missing_networks(self, mgr): + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.66.0/24'] + checker.run_checks() + + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert len(mgr.health_checks) == 1 + assert 'CEPHADM_CHECK_NETWORK_MISSING' in mgr.health_checks + assert mgr.health_checks['CEPHADM_CHECK_NETWORK_MISSING']['detail'][0] == \ + "10.9.66.0/24 not found on any host in the cluster" + + def test_bad_mtu_single(self, mgr): + + bad_node = mgr.cache.facts['node-1.ceph.com'] + bad_node['interfaces']['eth0']['mtu'] = 1500 + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert "CEPHADM_CHECK_MTU" in mgr.health_checks and len(mgr.health_checks) == 1 + assert mgr.health_checks['CEPHADM_CHECK_MTU']['detail'][0] == \ + 'host node-1.ceph.com(eth0) is using MTU 1500 on 10.9.64.0/24, NICs on other hosts use 9000' + + def test_bad_mtu_multiple(self, mgr): + + for n in [1, 5]: + bad_node = mgr.cache.facts[f'node-{n}.ceph.com'] + bad_node['interfaces']['eth0']['mtu'] = 1500 + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert "CEPHADM_CHECK_MTU" in mgr.health_checks and len(mgr.health_checks) == 1 + assert mgr.health_checks['CEPHADM_CHECK_MTU']['count'] == 2 + + def test_bad_linkspeed_single(self, mgr): + + bad_node = mgr.cache.facts['node-1.ceph.com'] + bad_node['interfaces']['eth0']['speed'] = 100 + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert mgr.health_checks + assert "CEPHADM_CHECK_LINKSPEED" in mgr.health_checks and len(mgr.health_checks) == 1 + assert mgr.health_checks['CEPHADM_CHECK_LINKSPEED']['detail'][0] == \ + 'host node-1.ceph.com(eth0) has linkspeed of 100 on 10.9.64.0/24, NICs on other hosts use 1000' + + def test_super_linkspeed_single(self, mgr): + + bad_node = mgr.cache.facts['node-1.ceph.com'] + bad_node['interfaces']['eth0']['speed'] = 10000 + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert not mgr.health_checks + + def test_release_mismatch_single(self, mgr): + + mgr.version_overrides = { + "osd.1": "pacific", + } + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + assert mgr.health_checks + assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and len(mgr.health_checks) == 1 + assert mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE']['detail'][0] == \ + 'osd.1 is running pacific (majority of cluster is using quincy)' + + def test_release_mismatch_multi(self, mgr): + + mgr.version_overrides = { + "osd.1": "pacific", + "osd.5": "octopus", + } + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + assert mgr.health_checks + assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and len(mgr.health_checks) == 1 + assert len(mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE']['detail']) == 2 + + def test_kernel_mismatch(self, mgr): + + bad_host = mgr.cache.facts['node-1.ceph.com'] + bad_host['kernel'] = "5.10.18.0-241.10.1.el8.x86_64" + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + assert len(mgr.health_checks) == 1 + assert 'CEPHADM_CHECK_KERNEL_VERSION' in mgr.health_checks + assert mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION']['detail'][0] == \ + "host node-1.ceph.com running kernel 5.10, majority of hosts(9) running 4.18" + assert mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION']['count'] == 1 + + def test_inconsistent_subscription(self, mgr): + + bad_host = mgr.cache.facts['node-5.ceph.com'] + bad_host['subscribed'] = "no" + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + assert len(mgr.health_checks) == 1 + assert "CEPHADM_CHECK_SUBSCRIPTION" in mgr.health_checks + assert mgr.health_checks['CEPHADM_CHECK_SUBSCRIPTION']['detail'][0] == \ + "node-5.ceph.com does not have an active subscription" + + def test_kernel_security_inconsistent(self, mgr): + + bad_node = mgr.cache.facts['node-3.ceph.com'] + bad_node['kernel_security'] = { + "SELINUX": "permissive", + "SELINUXTYPE": "targeted", + "description": "SELinux: Enabled(permissive, targeted)", + "type": "SELinux" + } + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + assert len(mgr.health_checks) == 1 + assert 'CEPHADM_CHECK_KERNEL_LSM' in mgr.health_checks + assert mgr.health_checks['CEPHADM_CHECK_KERNEL_LSM']['detail'][0] == \ + "node-3.ceph.com has inconsistent KSM settings compared to the majority of hosts(9) in the cluster" + + def test_release_and_bad_mtu(self, mgr): + + mgr.version_overrides = { + "osd.1": "pacific", + } + bad_node = mgr.cache.facts['node-1.ceph.com'] + bad_node['interfaces']['eth0']['mtu'] = 1500 + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert mgr.health_checks + assert len(mgr.health_checks) == 2 + assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \ + "CEPHADM_CHECK_MTU" in mgr.health_checks + + def test_release_mtu_LSM(self, mgr): + + mgr.version_overrides = { + "osd.1": "pacific", + } + bad_node1 = mgr.cache.facts['node-1.ceph.com'] + bad_node1['interfaces']['eth0']['mtu'] = 1500 + bad_node2 = mgr.cache.facts['node-3.ceph.com'] + bad_node2['kernel_security'] = { + "SELINUX": "permissive", + "SELINUXTYPE": "targeted", + "description": "SELinux: Enabled(permissive, targeted)", + "type": "SELinux" + } + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert mgr.health_checks + assert len(mgr.health_checks) == 3 + assert \ + "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \ + "CEPHADM_CHECK_MTU" in mgr.health_checks and \ + "CEPHADM_CHECK_KERNEL_LSM" in mgr.health_checks + + def test_release_mtu_LSM_subscription(self, mgr): + + mgr.version_overrides = { + "osd.1": "pacific", + } + bad_node1 = mgr.cache.facts['node-1.ceph.com'] + bad_node1['interfaces']['eth0']['mtu'] = 1500 + bad_node1['subscribed'] = "no" + bad_node2 = mgr.cache.facts['node-3.ceph.com'] + bad_node2['kernel_security'] = { + "SELINUX": "permissive", + "SELINUXTYPE": "targeted", + "description": "SELinux: Enabled(permissive, targeted)", + "type": "SELinux" + } + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(json.dumps(mgr.health_checks)) + logger.info(checker.subnet_lookup) + assert mgr.health_checks + assert len(mgr.health_checks) == 4 + assert \ + "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \ + "CEPHADM_CHECK_MTU" in mgr.health_checks and \ + "CEPHADM_CHECK_KERNEL_LSM" in mgr.health_checks and \ + "CEPHADM_CHECK_SUBSCRIPTION" in mgr.health_checks + + def test_skip_release_during_upgrade(self, mgr): + mgr.upgrade.upgrade_state = UpgradeState.from_json({ + 'target_name': 'wah', + 'progress_id': str(uuid.uuid4()), + 'target_id': 'wah', + 'error': '', + 'paused': False, + }) + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(f"{checker.skipped_checks_count} skipped check(s): {checker.skipped_checks}") + assert checker.skipped_checks_count == 1 + assert 'ceph_release' in checker.skipped_checks + + def test_skip_when_disabled(self, mgr): + mgr.module_option.update({ + "config_checks_enabled": "false" + }) + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(checker.active_checks) + logger.info(checker.defined_checks) + assert checker.active_checks_count == 0 + + def test_skip_mtu_checks(self, mgr): + mgr.datastore.update({ + 'config_checks': '{"osd_mtu_size": "disabled"}' + }) + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(checker.active_checks) + logger.info(checker.defined_checks) + assert 'osd_mtu_size' not in checker.active_checks + assert checker.defined_checks == 8 and checker.active_checks_count == 7 + + def test_skip_mtu_lsm_checks(self, mgr): + mgr.datastore.update({ + 'config_checks': '{"osd_mtu_size": "disabled", "kernel_security": "disabled"}' + }) + + checker = CephadmConfigChecks(mgr) + checker.cluster_network_list = [] + checker.public_network_list = ['10.9.64.0/24'] + + checker.run_checks() + logger.info(checker.active_checks) + logger.info(checker.defined_checks) + assert 'osd_mtu_size' not in checker.active_checks and \ + 'kernel_security' not in checker.active_checks + assert checker.defined_checks == 8 and checker.active_checks_count == 6 + assert not mgr.health_checks diff --git a/src/pybind/mgr/cephadm/tests/test_facts.py b/src/pybind/mgr/cephadm/tests/test_facts.py new file mode 100644 index 000000000..6c33f5368 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_facts.py @@ -0,0 +1,10 @@ +from ..import CephadmOrchestrator + +from .fixtures import wait + + +def test_facts(cephadm_module: CephadmOrchestrator): + facts = {'node-1.ceph.com': {'bios_version': 'F2', 'cpu_cores': 16}} + cephadm_module.cache.facts = facts + ret_facts = cephadm_module.get_facts('node-1.ceph.com') + assert wait(cephadm_module, ret_facts) == [{'bios_version': 'F2', 'cpu_cores': 16}] diff --git a/src/pybind/mgr/cephadm/tests/test_migration.py b/src/pybind/mgr/cephadm/tests/test_migration.py new file mode 100644 index 000000000..b95f54f7c --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_migration.py @@ -0,0 +1,229 @@ +import json + +from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec +from ceph.utils import datetime_to_str, datetime_now +from cephadm import CephadmOrchestrator +from cephadm.inventory import SPEC_STORE_PREFIX +from cephadm.migrations import LAST_MIGRATION +from cephadm.tests.fixtures import _run_cephadm, wait, with_host +from cephadm.serve import CephadmServe +from tests import mock + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_scheduler(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1', refresh_hosts=False): + with with_host(cephadm_module, 'host2', refresh_hosts=False): + + # emulate the old scheduler: + c = cephadm_module.apply_rgw( + ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='*', count=2)) + ) + assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...' + + # with pytest.raises(OrchestratorError, match="cephadm migration still ongoing. Please wait, until the migration is complete."): + CephadmServe(cephadm_module)._apply_all_services() + + cephadm_module.migration_current = 0 + cephadm_module.migration.migrate() + # assert we need all daemons. + assert cephadm_module.migration_current == 0 + + CephadmServe(cephadm_module)._refresh_hosts_and_daemons() + cephadm_module.migration.migrate() + + CephadmServe(cephadm_module)._apply_all_services() + + out = {o.hostname for o in wait(cephadm_module, cephadm_module.list_daemons())} + assert out == {'host1', 'host2'} + + c = cephadm_module.apply_rgw( + ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='host1', count=2)) + ) + assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...' + + # Sorry, for this hack, but I need to make sure, Migration thinks, + # we have updated all daemons already. + cephadm_module.cache.last_daemon_update['host1'] = datetime_now() + cephadm_module.cache.last_daemon_update['host2'] = datetime_now() + + cephadm_module.migration_current = 0 + cephadm_module.migration.migrate() + assert cephadm_module.migration_current >= 2 + + out = [o.spec.placement for o in wait( + cephadm_module, cephadm_module.describe_service())] + assert out == [PlacementSpec(count=2, hosts=[HostPlacementSpec( + hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])] + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({ + 'spec': { + 'service_type': 'mon', + 'service_id': 'wrong', + 'placement': { + 'hosts': ['host1'] + } + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + + cephadm_module.spec_store.load() + + assert len(cephadm_module.spec_store.all_specs) == 1 + assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon' + + cephadm_module.migration_current = 1 + cephadm_module.migration.migrate() + assert cephadm_module.migration_current >= 2 + + assert len(cephadm_module.spec_store.all_specs) == 1 + assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec( + service_type='mon', + unmanaged=True, + placement=PlacementSpec(hosts=['host1']) + ) + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon', json.dumps({ + 'spec': { + 'service_type': 'mon', + 'placement': { + 'count': 5, + } + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({ + 'spec': { + 'service_type': 'mon', + 'service_id': 'wrong', + 'placement': { + 'hosts': ['host1'] + } + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + + cephadm_module.spec_store.load() + + assert len(cephadm_module.spec_store.all_specs) == 2 + assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon' + assert cephadm_module.spec_store.all_specs['mon'].service_name() == 'mon' + + cephadm_module.migration_current = 1 + cephadm_module.migration.migrate() + assert cephadm_module.migration_current >= 2 + + assert len(cephadm_module.spec_store.all_specs) == 1 + assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec( + service_type='mon', + unmanaged=True, + placement=PlacementSpec(count=5) + ) + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.set_store(SPEC_STORE_PREFIX + 'mds', json.dumps({ + 'spec': { + 'service_type': 'mds', + 'placement': { + 'hosts': ['host1'] + } + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + + cephadm_module.spec_store.load() + + # there is nothing to migrate, as the spec is gone now. + assert len(cephadm_module.spec_store.all_specs) == 0 + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_nfs_initial(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.set_store( + SPEC_STORE_PREFIX + 'mds', + json.dumps({ + 'spec': { + 'service_type': 'nfs', + 'service_id': 'foo', + 'placement': { + 'hosts': ['host1'] + }, + 'spec': { + 'pool': 'mypool', + 'namespace': 'foons', + }, + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + cephadm_module.migration_current = 1 + cephadm_module.spec_store.load() + + ls = json.loads(cephadm_module.get_store('nfs_migration_queue')) + assert ls == [['foo', 'mypool', 'foons']] + + cephadm_module.migration.migrate(True) + assert cephadm_module.migration_current == 2 + + cephadm_module.migration.migrate() + assert cephadm_module.migration_current == LAST_MIGRATION + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_nfs_initial_octopus(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.set_store( + SPEC_STORE_PREFIX + 'mds', + json.dumps({ + 'spec': { + 'service_type': 'nfs', + 'service_id': 'ganesha-foo', + 'placement': { + 'hosts': ['host1'] + }, + 'spec': { + 'pool': 'mypool', + 'namespace': 'foons', + }, + }, + 'created': datetime_to_str(datetime_now()), + }, sort_keys=True), + ) + cephadm_module.migration_current = 1 + cephadm_module.spec_store.load() + + ls = json.loads(cephadm_module.get_store('nfs_migration_queue')) + assert ls == [['ganesha-foo', 'mypool', 'foons']] + + cephadm_module.migration.migrate(True) + assert cephadm_module.migration_current == 2 + + cephadm_module.migration.migrate() + assert cephadm_module.migration_current == LAST_MIGRATION + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) +def test_migrate_admin_client_keyring(cephadm_module: CephadmOrchestrator): + assert 'client.admin' not in cephadm_module.keys.keys + + cephadm_module.migration_current = 3 + cephadm_module.migration.migrate() + assert cephadm_module.migration_current == LAST_MIGRATION + + assert cephadm_module.keys.keys['client.admin'].placement.label == '_admin' diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py new file mode 100644 index 000000000..6685fcb2a --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_osd_removal.py @@ -0,0 +1,298 @@ +import json + +from cephadm.services.osd import OSDRemovalQueue, OSD +import pytest +from tests import mock +from .fixtures import with_cephadm_module +from datetime import datetime + + +class MockOSD: + + def __init__(self, osd_id): + self.osd_id = osd_id + + +class TestOSDRemoval: + + @pytest.mark.parametrize( + "osd_id, osd_df, expected", + [ + # missing 'nodes' key + (1, dict(nodes=[]), -1), + # missing 'pgs' key + (1, dict(nodes=[dict(id=1)]), -1), + # id != osd_id + (1, dict(nodes=[dict(id=999, pgs=1)]), -1), + # valid + (1, dict(nodes=[dict(id=1, pgs=1)]), 1), + ] + ) + def test_get_pg_count(self, rm_util, osd_id, osd_df, expected): + with mock.patch("cephadm.services.osd.RemoveUtil.osd_df", return_value=osd_df): + assert rm_util.get_pg_count(osd_id) == expected + + @pytest.mark.parametrize( + "osds, ok_to_stop, expected", + [ + # no osd_ids provided + ([], [False], []), + # all osds are ok_to_stop + ([1, 2], [True], [1, 2]), + # osds are ok_to_stop after the second iteration + ([1, 2], [False, True], [2]), + # osds are never ok_to_stop, (taking the sample size `(len(osd_ids))` into account), + # expected to get False + ([1, 2], [False, False], []), + ] + ) + def test_find_stop_threshold(self, rm_util, osds, ok_to_stop, expected): + with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop): + assert rm_util.find_osd_stop_threshold(osds) == expected + + def test_process_removal_queue(self, rm_util): + # TODO: ! + # rm_util.process_removal_queue() + pass + + @pytest.mark.parametrize( + "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected", + [ + # drain one at a time, one already draining + (1, [1], [1], [True], 0), + # drain one at a time, none draining yet + (1, [], [1, 2, 3], [True, True, True], 1), + # drain one at a time, one already draining, none ok-to-stop + (1, [1], [1], [False], 0), + # drain one at a time, none draining, one ok-to-stop + (1, [], [1, 2, 3], [False, False, True], 1), + # drain three at a time, one already draining, all ok-to-stop + (3, [1], [1, 2, 3], [True, True, True], 2), + # drain two at a time, none already draining, none ok-to-stop + (2, [], [1, 2, 3], [False, False, False], 0), + # drain two at a time, none already draining, none idling + (2, [], [], [], 0), + ] + ) + def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected): + with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m: + with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds): + with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds): + with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop): + removal_queue = OSDRemovalQueue(m) + assert len(removal_queue._ready_to_drain_osds()) == expected + + def test_ok_to_stop(self, rm_util): + rm_util.ok_to_stop([MockOSD(1)]) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']}, + error_ok=True) + + def test_safe_to_destroy(self, rm_util): + rm_util.safe_to_destroy([1]) + rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy', + 'ids': ['1']}, error_ok=True) + + def test_destroy_osd(self, rm_util): + rm_util.destroy_osd(1) + rm_util._run_mon_cmd.assert_called_with( + {'prefix': 'osd destroy-actual', 'id': 1, 'yes_i_really_mean_it': True}) + + def test_purge_osd(self, rm_util): + rm_util.purge_osd(1) + rm_util._run_mon_cmd.assert_called_with( + {'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True}) + + def test_load(self, cephadm_module, rm_util): + data = json.dumps([ + { + "osd_id": 35, + "started": True, + "draining": True, + "stopped": False, + "replace": False, + "force": False, + "zap": False, + "nodename": "node2", + "drain_started_at": "2020-09-14T11:41:53.960463", + "drain_stopped_at": None, + "drain_done_at": None, + "process_started_at": "2020-09-14T11:41:52.245832" + } + ]) + cephadm_module.set_store('osd_remove_queue', data) + cephadm_module.to_remove_osds.load_from_store() + + expected = OSDRemovalQueue(cephadm_module) + expected.osds.add(OSD(osd_id=35, remove_util=rm_util, draining=True)) + assert cephadm_module.to_remove_osds == expected + + +class TestOSD: + + def test_start(self, osd_obj): + assert osd_obj.started is False + osd_obj.start() + assert osd_obj.started is True + assert osd_obj.stopped is False + + def test_start_draining_purge(self, osd_obj): + assert osd_obj.draining is False + assert osd_obj.drain_started_at is None + ret = osd_obj.start_draining() + osd_obj.rm_util.reweight_osd.assert_called_with(osd_obj, 0.0) + assert isinstance(osd_obj.drain_started_at, datetime) + assert osd_obj.draining is True + assert osd_obj.replace is False + assert ret is True + + def test_start_draining_replace(self, osd_obj): + assert osd_obj.draining is False + assert osd_obj.drain_started_at is None + osd_obj.replace = True + ret = osd_obj.start_draining() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'out') + assert isinstance(osd_obj.drain_started_at, datetime) + assert osd_obj.draining is True + assert osd_obj.replace is True + assert ret is True + + def test_start_draining_stopped(self, osd_obj): + osd_obj.stopped = True + ret = osd_obj.start_draining() + assert osd_obj.drain_started_at is None + assert ret is False + assert osd_obj.draining is False + + def test_stop_draining_replace(self, osd_obj): + osd_obj.replace = True + ret = osd_obj.stop_draining() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'in') + assert isinstance(osd_obj.drain_stopped_at, datetime) + assert osd_obj.draining is False + assert ret is True + + def test_stop_draining_purge(self, osd_obj): + osd_obj.original_weight = 1.0 + ret = osd_obj.stop_draining() + osd_obj.rm_util.reweight_osd.assert_called_with(osd_obj, 1.0) + assert isinstance(osd_obj.drain_stopped_at, datetime) + assert osd_obj.draining is False + assert ret is True + + @mock.patch('cephadm.services.osd.OSD.stop_draining') + def test_stop(self, stop_draining_mock, osd_obj): + osd_obj.stop() + assert osd_obj.started is False + assert osd_obj.stopped is True + stop_draining_mock.assert_called_once() + + @pytest.mark.parametrize( + "draining, empty, expected", + [ + # must be !draining! and !not empty! to yield True + (True, not True, True), + # not draining and not empty + (False, not True, False), + # not draining and empty + (False, True, False), + # draining and empty + (True, True, False), + ] + ) + def test_is_draining(self, osd_obj, draining, empty, expected): + with mock.patch("cephadm.services.osd.OSD.is_empty", new_callable=mock.PropertyMock(return_value=empty)): + osd_obj.draining = draining + assert osd_obj.is_draining is expected + + @mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop") + def test_is_ok_to_stop(self, _, osd_obj): + osd_obj.is_ok_to_stop + osd_obj.rm_util.ok_to_stop.assert_called_once() + + @pytest.mark.parametrize( + "pg_count, expected", + [ + (0, True), + (1, False), + (9999, False), + (-1, False), + ] + ) + def test_is_empty(self, osd_obj, pg_count, expected): + with mock.patch("cephadm.services.osd.OSD.get_pg_count", return_value=pg_count): + assert osd_obj.is_empty is expected + + @mock.patch("cephadm.services.osd.RemoveUtil.safe_to_destroy") + def test_safe_to_destroy(self, _, osd_obj): + osd_obj.safe_to_destroy() + osd_obj.rm_util.safe_to_destroy.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.set_osd_flag") + def test_down(self, _, osd_obj): + osd_obj.down() + osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'down') + + @mock.patch("cephadm.services.osd.RemoveUtil.destroy_osd") + def test_destroy_osd(self, _, osd_obj): + osd_obj.destroy() + osd_obj.rm_util.destroy_osd.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.purge_osd") + def test_purge(self, _, osd_obj): + osd_obj.purge() + osd_obj.rm_util.purge_osd.assert_called_once() + + @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count") + def test_pg_count(self, _, osd_obj): + osd_obj.get_pg_count() + osd_obj.rm_util.get_pg_count.assert_called_once() + + def test_drain_status_human_not_started(self, osd_obj): + assert osd_obj.drain_status_human() == 'not started' + + def test_drain_status_human_started(self, osd_obj): + osd_obj.started = True + assert osd_obj.drain_status_human() == 'started' + + def test_drain_status_human_draining(self, osd_obj): + osd_obj.started = True + osd_obj.draining = True + assert osd_obj.drain_status_human() == 'draining' + + def test_drain_status_human_done(self, osd_obj): + osd_obj.started = True + osd_obj.draining = False + osd_obj.drain_done_at = datetime.utcnow() + assert osd_obj.drain_status_human() == 'done, waiting for purge' + + +class TestOSDRemovalQueue: + + def test_queue_size(self, osd_obj): + q = OSDRemovalQueue(mock.Mock()) + assert q.queue_size() == 0 + q.osds.add(osd_obj) + assert q.queue_size() == 1 + + @mock.patch("cephadm.services.osd.OSD.start") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_enqueue(self, exist, start, osd_obj): + q = OSDRemovalQueue(mock.Mock()) + q.enqueue(osd_obj) + osd_obj.start.assert_called_once() + + @mock.patch("cephadm.services.osd.OSD.stop") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_rm_raise(self, exist, stop, osd_obj): + q = OSDRemovalQueue(mock.Mock()) + with pytest.raises(KeyError): + q.rm(osd_obj) + osd_obj.stop.assert_called_once() + + @mock.patch("cephadm.services.osd.OSD.stop") + @mock.patch("cephadm.services.osd.OSD.exists") + def test_rm(self, exist, stop, osd_obj): + q = OSDRemovalQueue(mock.Mock()) + q.osds.add(osd_obj) + q.rm(osd_obj) + osd_obj.stop.assert_called_once() diff --git a/src/pybind/mgr/cephadm/tests/test_scheduling.py b/src/pybind/mgr/cephadm/tests/test_scheduling.py new file mode 100644 index 000000000..2454dc0d1 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_scheduling.py @@ -0,0 +1,1591 @@ +# Disable autopep8 for this file: + +# fmt: off + +from typing import NamedTuple, List, Dict, Optional +import pytest + +from ceph.deployment.hostspec import HostSpec +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, IngressSpec +from ceph.deployment.hostspec import SpecValidationError + +from cephadm.module import HostAssignment +from cephadm.schedule import DaemonPlacement +from orchestrator import DaemonDescription, OrchestratorValidationError, OrchestratorError + + +def wrapper(func): + # some odd thingy to revert the order or arguments + def inner(*args): + def inner2(expected): + func(expected, *args) + return inner2 + return inner + + +@wrapper +def none(expected): + assert expected == [] + + +@wrapper +def one_of(expected, *hosts): + if not isinstance(expected, list): + assert False, str(expected) + assert len(expected) == 1, f'one_of failed len({expected}) != 1' + assert expected[0] in hosts + + +@wrapper +def two_of(expected, *hosts): + if not isinstance(expected, list): + assert False, str(expected) + assert len(expected) == 2, f'one_of failed len({expected}) != 2' + matches = 0 + for h in hosts: + matches += int(h in expected) + if matches != 2: + assert False, f'two of {hosts} not in {expected}' + + +@wrapper +def exactly(expected, *hosts): + assert expected == list(hosts) + + +@wrapper +def error(expected, kind, match): + assert isinstance(expected, kind), (str(expected), match) + assert str(expected) == match, (str(expected), match) + + +@wrapper +def _or(expected, *inners): + def catch(inner): + try: + inner(expected) + except AssertionError as e: + return e + result = [catch(i) for i in inners] + if None not in result: + assert False, f"_or failed: {expected}" + + +def _always_true(_): + pass + + +def k(s): + return [e for e in s.split(' ') if e] + + +def get_result(key, results): + def match(one): + for o, k in zip(one, key): + if o != k and o != '*': + return False + return True + return [v for k, v in results if match(k)][0] + + +def mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count): + + if spec_section == 'hosts': + mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731 + hosts=explicit, + count=count, + )) + elif spec_section == 'label': + mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731 + label='mylabel', + count=count, + )) + elif spec_section == 'host_pattern': + pattern = { + 'e': 'notfound', + '1': '1', + '12': '[1-2]', + '123': '*', + }[explicit_key] + mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731 + host_pattern=pattern, + count=count, + )) + else: + assert False + + hosts = [ + HostSpec(h, labels=['mylabel']) if h in explicit else HostSpec(h) + for h in hosts + ] + + return mk_spec, hosts + + +def run_scheduler_test(results, mk_spec, hosts, daemons, key_elems): + key = ' '.join('N' if e is None else str(e) for e in key_elems) + try: + assert_res = get_result(k(key), results) + except IndexError: + try: + spec = mk_spec() + host_res, to_add, to_remove = HostAssignment( + spec=spec, + hosts=hosts, + unreachable_hosts=[], + daemons=daemons, + ).place() + if isinstance(host_res, list): + e = ', '.join(repr(h.hostname) for h in host_res) + assert False, f'`(k("{key}"), exactly({e})),` not found' + assert False, f'`(k("{key}"), ...),` not found' + except OrchestratorError as e: + assert False, f'`(k("{key}"), error({type(e).__name__}, {repr(str(e))})),` not found' + + for _ in range(10): # scheduler has a random component + try: + spec = mk_spec() + host_res, to_add, to_remove = HostAssignment( + spec=spec, + hosts=hosts, + unreachable_hosts=[], + daemons=daemons + ).place() + + assert_res(sorted([h.hostname for h in host_res])) + except Exception as e: + assert_res(e) + + +@pytest.mark.parametrize("dp,n,result", + [ # noqa: E128 + ( + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]), + 0, + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]), + ), + ( + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]), + 2, + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[82]), + ), + ( + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80, 90]), + 2, + DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[82, 92]), + ), + ]) +def test_daemon_placement_renumber(dp, n, result): + assert dp.renumber_ports(n) == result + + +@pytest.mark.parametrize( + 'dp,dd,result', + [ + ( + DaemonPlacement(daemon_type='mgr', hostname='host1'), + DaemonDescription('mgr', 'a', 'host1'), + True + ), + ( + DaemonPlacement(daemon_type='mgr', hostname='host1', name='a'), + DaemonDescription('mgr', 'a', 'host1'), + True + ), + ( + DaemonPlacement(daemon_type='mon', hostname='host1', name='a'), + DaemonDescription('mgr', 'a', 'host1'), + False + ), + ( + DaemonPlacement(daemon_type='mgr', hostname='host1', name='a'), + DaemonDescription('mgr', 'b', 'host1'), + False + ), + ]) +def test_daemon_placement_match(dp, dd, result): + assert dp.matches_daemon(dd) == result + + +# * first match from the top wins +# * where e=[], *=any +# +# + list of known hosts available for scheduling (host_key) +# | + hosts used for explict placement (explicit_key) +# | | + count +# | | | + section (host, label, pattern) +# | | | | + expected result +# | | | | | +test_explicit_scheduler_results = [ + (k("* * 0 *"), error(SpecValidationError, 'num/count must be >= 1')), + (k("* e N l"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr>: No matching hosts for label mylabel')), + (k("* e N p"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr>: No matching hosts')), + (k("* e N h"), error(OrchestratorValidationError, 'placement spec is empty: no hosts, no label, no pattern, no count')), + (k("* e * *"), none), + (k("1 12 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 2: Unknown hosts")), + (k("1 123 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 2, 3: Unknown hosts")), + (k("1 * * *"), exactly('1')), + (k("12 1 * *"), exactly('1')), + (k("12 12 1 *"), one_of('1', '2')), + (k("12 12 * *"), exactly('1', '2')), + (k("12 123 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 3: Unknown hosts")), + (k("12 123 1 *"), one_of('1', '2', '3')), + (k("12 123 * *"), two_of('1', '2', '3')), + (k("123 1 * *"), exactly('1')), + (k("123 12 1 *"), one_of('1', '2')), + (k("123 12 * *"), exactly('1', '2')), + (k("123 123 1 *"), one_of('1', '2', '3')), + (k("123 123 2 *"), two_of('1', '2', '3')), + (k("123 123 * *"), exactly('1', '2', '3')), +] + + +@pytest.mark.parametrize("spec_section_key,spec_section", + [ # noqa: E128 + ('h', 'hosts'), + ('l', 'label'), + ('p', 'host_pattern'), + ]) +@pytest.mark.parametrize("count", + [ # noqa: E128 + None, + 0, + 1, + 2, + 3, + ]) +@pytest.mark.parametrize("explicit_key, explicit", + [ # noqa: E128 + ('e', []), + ('1', ['1']), + ('12', ['1', '2']), + ('123', ['1', '2', '3']), + ]) +@pytest.mark.parametrize("host_key, hosts", + [ # noqa: E128 + ('1', ['1']), + ('12', ['1', '2']), + ('123', ['1', '2', '3']), + ]) +def test_explicit_scheduler(host_key, hosts, + explicit_key, explicit, + count, + spec_section_key, spec_section): + + mk_spec, hosts = mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count) + run_scheduler_test( + results=test_explicit_scheduler_results, + mk_spec=mk_spec, + hosts=hosts, + daemons=[], + key_elems=(host_key, explicit_key, count, spec_section_key) + ) + + +# * first match from the top wins +# * where e=[], *=any +# +# + list of known hosts available for scheduling (host_key) +# | + hosts used for explict placement (explicit_key) +# | | + count +# | | | + existing daemons +# | | | | + section (host, label, pattern) +# | | | | | + expected result +# | | | | | | +test_scheduler_daemons_results = [ + (k("* 1 * * *"), exactly('1')), + (k("1 123 * * h"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr> on 2, 3: Unknown hosts')), + (k("1 123 * * *"), exactly('1')), + (k("12 123 * * h"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr> on 3: Unknown hosts')), + (k("12 123 N * *"), exactly('1', '2')), + (k("12 123 1 * *"), one_of('1', '2')), + (k("12 123 2 * *"), exactly('1', '2')), + (k("12 123 3 * *"), exactly('1', '2')), + (k("123 123 N * *"), exactly('1', '2', '3')), + (k("123 123 1 e *"), one_of('1', '2', '3')), + (k("123 123 1 1 *"), exactly('1')), + (k("123 123 1 3 *"), exactly('3')), + (k("123 123 1 12 *"), one_of('1', '2')), + (k("123 123 1 112 *"), one_of('1', '2')), + (k("123 123 1 23 *"), one_of('2', '3')), + (k("123 123 1 123 *"), one_of('1', '2', '3')), + (k("123 123 2 e *"), two_of('1', '2', '3')), + (k("123 123 2 1 *"), _or(exactly('1', '2'), exactly('1', '3'))), + (k("123 123 2 3 *"), _or(exactly('1', '3'), exactly('2', '3'))), + (k("123 123 2 12 *"), exactly('1', '2')), + (k("123 123 2 112 *"), exactly('1', '2')), + (k("123 123 2 23 *"), exactly('2', '3')), + (k("123 123 2 123 *"), two_of('1', '2', '3')), + (k("123 123 3 * *"), exactly('1', '2', '3')), +] + + +@pytest.mark.parametrize("spec_section_key,spec_section", + [ # noqa: E128 + ('h', 'hosts'), + ('l', 'label'), + ('p', 'host_pattern'), + ]) +@pytest.mark.parametrize("daemons_key, daemons", + [ # noqa: E128 + ('e', []), + ('1', ['1']), + ('3', ['3']), + ('12', ['1', '2']), + ('112', ['1', '1', '2']), # deal with existing co-located daemons + ('23', ['2', '3']), + ('123', ['1', '2', '3']), + ]) +@pytest.mark.parametrize("count", + [ # noqa: E128 + None, + 1, + 2, + 3, + ]) +@pytest.mark.parametrize("explicit_key, explicit", + [ # noqa: E128 + ('1', ['1']), + ('123', ['1', '2', '3']), + ]) +@pytest.mark.parametrize("host_key, hosts", + [ # noqa: E128 + ('1', ['1']), + ('12', ['1', '2']), + ('123', ['1', '2', '3']), + ]) +def test_scheduler_daemons(host_key, hosts, + explicit_key, explicit, + count, + daemons_key, daemons, + spec_section_key, spec_section): + mk_spec, hosts = mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count) + dds = [ + DaemonDescription('mgr', d, d) + for d in daemons + ] + run_scheduler_test( + results=test_scheduler_daemons_results, + mk_spec=mk_spec, + hosts=hosts, + daemons=dds, + key_elems=(host_key, explicit_key, count, daemons_key, spec_section_key) + ) + + +# ========================= + + +class NodeAssignmentTest(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + daemons: List[DaemonDescription] + rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] + post_rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] + expected: List[str] + expected_add: List[str] + expected_remove: List[DaemonDescription] + + +@pytest.mark.parametrize("service_type,placement,hosts,daemons,rank_map,post_rank_map,expected,expected_add,expected_remove", + [ # noqa: E128 + # just hosts + NodeAssignmentTest( + 'mgr', + PlacementSpec(hosts=['smithi060']), + ['smithi060'], + [], + None, None, + ['mgr:smithi060'], ['mgr:smithi060'], [] + ), + # all_hosts + NodeAssignmentTest( + 'mgr', + PlacementSpec(host_pattern='*'), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + ], + None, None, + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + ['mgr:host3'], + [] + ), + # all_hosts + count_per_host + NodeAssignmentTest( + 'mds', + PlacementSpec(host_pattern='*', count_per_host=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mds', 'a', 'host1'), + DaemonDescription('mds', 'b', 'host2'), + ], + None, None, + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + ['mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + [] + ), + # count that is bigger than the amount of hosts. Truncate to len(hosts) + # mgr should not be co-located to each other. + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=4), + 'host1 host2 host3'.split(), + [], + None, None, + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + [] + ), + # count that is bigger than the amount of hosts; wrap around. + NodeAssignmentTest( + 'mds', + PlacementSpec(count=6), + 'host1 host2 host3'.split(), + [], + None, None, + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + [] + ), + # count + partial host list + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=3, hosts=['host3']), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + ], + None, None, + ['mgr:host3'], + ['mgr:host3'], + ['mgr.a', 'mgr.b'] + ), + # count + partial host list (with colo) + NodeAssignmentTest( + 'mds', + PlacementSpec(count=3, hosts=['host3']), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mds', 'a', 'host1'), + DaemonDescription('mds', 'b', 'host2'), + ], + None, None, + ['mds:host3', 'mds:host3', 'mds:host3'], + ['mds:host3', 'mds:host3', 'mds:host3'], + ['mds.a', 'mds.b'] + ), + # count 1 + partial host list + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=1, hosts=['host3']), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + ], + None, None, + ['mgr:host3'], + ['mgr:host3'], + ['mgr.a', 'mgr.b'] + ), + # count + partial host list + existing + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=2, hosts=['host3']), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + ], + None, None, + ['mgr:host3'], + ['mgr:host3'], + ['mgr.a'] + ), + # count + partial host list + existing (deterministic) + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=2, hosts=['host1']), + 'host1 host2'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + ], + None, None, + ['mgr:host1'], + [], + [] + ), + # count + partial host list + existing (deterministic) + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=2, hosts=['host1']), + 'host1 host2'.split(), + [ + DaemonDescription('mgr', 'a', 'host2'), + ], + None, None, + ['mgr:host1'], + ['mgr:host1'], + ['mgr.a'] + ), + # label only + NodeAssignmentTest( + 'mgr', + PlacementSpec(label='foo'), + 'host1 host2 host3'.split(), + [], + None, None, + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + [] + ), + # label + count (truncate to host list) + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=4, label='foo'), + 'host1 host2 host3'.split(), + [], + None, None, + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + ['mgr:host1', 'mgr:host2', 'mgr:host3'], + [] + ), + # label + count (with colo) + NodeAssignmentTest( + 'mds', + PlacementSpec(count=6, label='foo'), + 'host1 host2 host3'.split(), + [], + None, None, + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'], + [] + ), + # label only + count_per_hst + NodeAssignmentTest( + 'mds', + PlacementSpec(label='foo', count_per_host=3), + 'host1 host2 host3'.split(), + [], + None, None, + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3', + 'mds:host1', 'mds:host2', 'mds:host3'], + ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3', + 'mds:host1', 'mds:host2', 'mds:host3'], + [] + ), + # host_pattern + NodeAssignmentTest( + 'mgr', + PlacementSpec(host_pattern='mgr*'), + 'mgrhost1 mgrhost2 datahost'.split(), + [], + None, None, + ['mgr:mgrhost1', 'mgr:mgrhost2'], + ['mgr:mgrhost1', 'mgr:mgrhost2'], + [] + ), + # host_pattern + count_per_host + NodeAssignmentTest( + 'mds', + PlacementSpec(host_pattern='mds*', count_per_host=3), + 'mdshost1 mdshost2 datahost'.split(), + [], + None, None, + ['mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2'], + ['mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2'], + [] + ), + # label + count_per_host + ports + NodeAssignmentTest( + 'rgw', + PlacementSpec(count=6, label='foo'), + 'host1 host2 host3'.split(), + [], + None, None, + ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)', + 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'], + ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)', + 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'], + [] + ), + # label + count_per_host + ports (+ xisting) + NodeAssignmentTest( + 'rgw', + PlacementSpec(count=6, label='foo'), + 'host1 host2 host3'.split(), + [ + DaemonDescription('rgw', 'a', 'host1', ports=[81]), + DaemonDescription('rgw', 'b', 'host2', ports=[80]), + DaemonDescription('rgw', 'c', 'host1', ports=[82]), + ], + None, None, + ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)', + 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'], + ['rgw:host1(*:80)', 'rgw:host3(*:80)', + 'rgw:host2(*:81)', 'rgw:host3(*:81)'], + ['rgw.c'] + ), + # cephadm.py teuth case + NodeAssignmentTest( + 'mgr', + PlacementSpec(count=3, hosts=['host1=y', 'host2=x']), + 'host1 host2'.split(), + [ + DaemonDescription('mgr', 'y', 'host1'), + DaemonDescription('mgr', 'x', 'host2'), + ], + None, None, + ['mgr:host1(name=y)', 'mgr:host2(name=x)'], + [], [] + ), + + # note: host -> rank mapping is psuedo-random based on svc name, so these + # host/rank pairs may seem random but they match the nfs.mynfs seed used by + # the test. + + # ranked, fresh + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [], + {}, + {0: {0: None}, 1: {0: None}, 2: {0: None}}, + ['nfs:host3(rank=0.0)', 'nfs:host2(rank=1.0)', 'nfs:host1(rank=2.0)'], + ['nfs:host3(rank=0.0)', 'nfs:host2(rank=1.0)', 'nfs:host1(rank=2.0)'], + [] + ), + # 21: ranked, exist + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1), + ], + {0: {1: '0.1'}}, + {0: {1: '0.1'}, 1: {0: None}, 2: {0: None}}, + ['nfs:host1(rank=0.1)', 'nfs:host3(rank=1.0)', 'nfs:host2(rank=2.0)'], + ['nfs:host3(rank=1.0)', 'nfs:host2(rank=2.0)'], + [] + ), + # ranked, exist, different ranks + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1), + DaemonDescription('nfs', '1.1', 'host2', rank=1, rank_generation=1), + ], + {0: {1: '0.1'}, 1: {1: '1.1'}}, + {0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}}, + ['nfs:host1(rank=0.1)', 'nfs:host2(rank=1.1)', 'nfs:host3(rank=2.0)'], + ['nfs:host3(rank=2.0)'], + [] + ), + # ranked, exist, different ranks (2) + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1), + DaemonDescription('nfs', '1.1', 'host3', rank=1, rank_generation=1), + ], + {0: {1: '0.1'}, 1: {1: '1.1'}}, + {0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}}, + ['nfs:host1(rank=0.1)', 'nfs:host3(rank=1.1)', 'nfs:host2(rank=2.0)'], + ['nfs:host2(rank=2.0)'], + [] + ), + # ranked, exist, extra ranks + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5), + DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5), + DaemonDescription('nfs', '4.5', 'host2', rank=4, rank_generation=5), + ], + {0: {5: '0.5'}, 1: {5: '1.5'}}, + {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {0: None}}, + ['nfs:host1(rank=0.5)', 'nfs:host2(rank=1.5)', 'nfs:host3(rank=2.0)'], + ['nfs:host3(rank=2.0)'], + ['nfs.4.5'] + ), + # 25: ranked, exist, extra ranks (scale down: kill off high rank) + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=2), + 'host3 host2 host1'.split(), + [ + DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5), + DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5), + DaemonDescription('nfs', '2.5', 'host3', rank=2, rank_generation=5), + ], + {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}}, + {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}}, + ['nfs:host1(rank=0.5)', 'nfs:host2(rank=1.5)'], + [], + ['nfs.2.5'] + ), + # ranked, exist, extra ranks (scale down hosts) + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=2), + 'host1 host3'.split(), + [ + DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5), + DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5), + DaemonDescription('nfs', '2.5', 'host3', rank=4, rank_generation=5), + ], + {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}}, + {0: {5: '0.5'}, 1: {5: '1.5', 6: None}, 2: {5: '2.5'}}, + ['nfs:host1(rank=0.5)', 'nfs:host3(rank=1.6)'], + ['nfs:host3(rank=1.6)'], + ['nfs.2.5', 'nfs.1.5'] + ), + # ranked, exist, duplicate rank + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.0', 'host1', rank=0, rank_generation=0), + DaemonDescription('nfs', '1.1', 'host2', rank=1, rank_generation=1), + DaemonDescription('nfs', '1.2', 'host3', rank=1, rank_generation=2), + ], + {0: {0: '0.0'}, 1: {2: '1.2'}}, + {0: {0: '0.0'}, 1: {2: '1.2'}, 2: {0: None}}, + ['nfs:host1(rank=0.0)', 'nfs:host3(rank=1.2)', 'nfs:host2(rank=2.0)'], + ['nfs:host2(rank=2.0)'], + ['nfs.1.1'] + ), + # 28: ranked, all gens stale (failure during update cycle) + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2), + DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2), + ], + {0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3'}}, + {0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3', 4: None}}, + ['nfs:host1(rank=0.2)', 'nfs:host3(rank=1.4)'], + ['nfs:host3(rank=1.4)'], + ['nfs.1.2'] + ), + # ranked, not enough hosts + NodeAssignmentTest( + 'nfs', + PlacementSpec(count=4), + 'host1 host2 host3'.split(), + [ + DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2), + DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2), + ], + {0: {2: '0.2'}, 1: {2: '1.2'}}, + {0: {2: '0.2'}, 1: {2: '1.2'}, 2: {0: None}}, + ['nfs:host1(rank=0.2)', 'nfs:host2(rank=1.2)', 'nfs:host3(rank=2.0)'], + ['nfs:host3(rank=2.0)'], + [] + ), + # ranked, scale down + NodeAssignmentTest( + 'nfs', + PlacementSpec(hosts=['host2']), + 'host1 host2'.split(), + [ + DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2), + DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2), + DaemonDescription('nfs', '2.2', 'host3', rank=2, rank_generation=2), + ], + {0: {2: '0.2'}, 1: {2: '1.2'}, 2: {2: '2.2'}}, + {0: {2: '0.2', 3: None}, 1: {2: '1.2'}, 2: {2: '2.2'}}, + ['nfs:host2(rank=0.3)'], + ['nfs:host2(rank=0.3)'], + ['nfs.0.2', 'nfs.1.2', 'nfs.2.2'] + ), + + ]) +def test_node_assignment(service_type, placement, hosts, daemons, rank_map, post_rank_map, + expected, expected_add, expected_remove): + spec = None + service_id = None + allow_colo = False + if service_type == 'rgw': + service_id = 'realm.zone' + allow_colo = True + elif service_type == 'mds': + service_id = 'myfs' + allow_colo = True + elif service_type == 'nfs': + service_id = 'mynfs' + spec = ServiceSpec(service_type=service_type, + service_id=service_id, + placement=placement) + + if not spec: + spec = ServiceSpec(service_type=service_type, + service_id=service_id, + placement=placement) + + all_slots, to_add, to_remove = HostAssignment( + spec=spec, + hosts=[HostSpec(h, labels=['foo']) for h in hosts], + unreachable_hosts=[], + daemons=daemons, + allow_colo=allow_colo, + rank_map=rank_map, + ).place() + + assert rank_map == post_rank_map + + got = [str(p) for p in all_slots] + num_wildcard = 0 + for i in expected: + if i == '*': + num_wildcard += 1 + else: + assert i in got + got.remove(i) + assert num_wildcard == len(got) + + got = [str(p) for p in to_add] + num_wildcard = 0 + for i in expected_add: + if i == '*': + num_wildcard += 1 + else: + assert i in got + got.remove(i) + assert num_wildcard == len(got) + + assert sorted([d.name() for d in to_remove]) == sorted(expected_remove) + + +class NodeAssignmentTest5(NamedTuple): + service_type: str + placement: PlacementSpec + available_hosts: List[str] + candidates_hosts: List[str] + + +@pytest.mark.parametrize("service_type, placement, available_hosts, expected_candidates", + [ # noqa: E128 + NodeAssignmentTest5( + 'alertmanager', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host3 host1 host4 host2'.split(), + ), + NodeAssignmentTest5( + 'prometheus', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host3 host2 host4 host1'.split(), + ), + NodeAssignmentTest5( + 'grafana', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host1 host2 host4 host3'.split(), + ), + NodeAssignmentTest5( + 'mgr', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host4 host2 host1 host3'.split(), + ), + NodeAssignmentTest5( + 'mon', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host1 host3 host4 host2'.split(), + ), + NodeAssignmentTest5( + 'rgw', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host1 host3 host2 host4'.split(), + ), + NodeAssignmentTest5( + 'cephfs-mirror', + PlacementSpec(hosts='host1 host2 host3 host4'.split()), + 'host1 host2 host3 host4'.split(), + 'host4 host3 host1 host2'.split(), + ), + ]) +def test_node_assignment_random_shuffle(service_type, placement, available_hosts, expected_candidates): + spec = None + service_id = None + allow_colo = False + spec = ServiceSpec(service_type=service_type, + service_id=service_id, + placement=placement) + + candidates = HostAssignment( + spec=spec, + hosts=[HostSpec(h, labels=['foo']) for h in available_hosts], + unreachable_hosts=[], + daemons=[], + allow_colo=allow_colo, + ).get_candidates() + + candidates_hosts = [h.hostname for h in candidates] + assert candidates_hosts == expected_candidates + + +class NodeAssignmentTest2(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + daemons: List[DaemonDescription] + expected_len: int + in_set: List[str] + + +@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,in_set", + [ # noqa: E128 + # just count + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [], + 1, + ['host1', 'host2', 'host3'], + ), + + # hosts + (smaller) count + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=1, hosts='host1 host2'.split()), + 'host1 host2'.split(), + [], + 1, + ['host1', 'host2'], + ), + # hosts + (smaller) count, existing + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=1, hosts='host1 host2 host3'.split()), + 'host1 host2 host3'.split(), + [DaemonDescription('mgr', 'mgr.a', 'host1')], + 1, + ['host1', 'host2', 'host3'], + ), + # hosts + (smaller) count, (more) existing + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=1, hosts='host1 host2 host3'.split()), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + ], + 1, + ['host1', 'host2'] + ), + # count + partial host list + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=2, hosts=['host3']), + 'host1 host2 host3'.split(), + [], + 1, + ['host1', 'host2', 'host3'] + ), + # label + count + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=1, label='foo'), + 'host1 host2 host3'.split(), + [], + 1, + ['host1', 'host2', 'host3'] + ), + ]) +def test_node_assignment2(service_type, placement, hosts, + daemons, expected_len, in_set): + hosts, to_add, to_remove = HostAssignment( + spec=ServiceSpec(service_type, placement=placement), + hosts=[HostSpec(h, labels=['foo']) for h in hosts], + unreachable_hosts=[], + daemons=daemons, + ).place() + assert len(hosts) == expected_len + for h in [h.hostname for h in hosts]: + assert h in in_set + + +@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,must_have", + [ # noqa: E128 + # hosts + (smaller) count, (more) existing + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=3, hosts='host3'.split()), + 'host1 host2 host3'.split(), + [], + 1, + ['host3'] + ), + # count + partial host list + NodeAssignmentTest2( + 'mgr', + PlacementSpec(count=2, hosts=['host3']), + 'host1 host2 host3'.split(), + [], + 1, + ['host3'] + ), + ]) +def test_node_assignment3(service_type, placement, hosts, + daemons, expected_len, must_have): + hosts, to_add, to_remove = HostAssignment( + spec=ServiceSpec(service_type, placement=placement), + hosts=[HostSpec(h) for h in hosts], + unreachable_hosts=[], + daemons=daemons, + ).place() + assert len(hosts) == expected_len + for h in must_have: + assert h in [h.hostname for h in hosts] + + +class NodeAssignmentTest4(NamedTuple): + spec: ServiceSpec + networks: Dict[str, Dict[str, Dict[str, List[str]]]] + daemons: List[DaemonDescription] + expected: List[str] + expected_add: List[str] + expected_remove: List[DaemonDescription] + + +@pytest.mark.parametrize("spec,networks,daemons,expected,expected_add,expected_remove", + [ # noqa: E128 + NodeAssignmentTest4( + ServiceSpec( + service_type='rgw', + service_id='foo', + placement=PlacementSpec(count=6, label='foo'), + networks=['10.0.0.0/8'], + ), + { + 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}}, + 'host2': {'10.0.0.0/8': {'eth0': ['10.0.0.2']}}, + 'host3': {'192.168.0.0/16': {'eth0': ['192.168.0.1']}}, + }, + [], + ['rgw:host1(10.0.0.1:80)', 'rgw:host2(10.0.0.2:80)', + 'rgw:host1(10.0.0.1:81)', 'rgw:host2(10.0.0.2:81)', + 'rgw:host1(10.0.0.1:82)', 'rgw:host2(10.0.0.2:82)'], + ['rgw:host1(10.0.0.1:80)', 'rgw:host2(10.0.0.2:80)', + 'rgw:host1(10.0.0.1:81)', 'rgw:host2(10.0.0.2:81)', + 'rgw:host1(10.0.0.1:82)', 'rgw:host2(10.0.0.2:82)'], + [] + ), + NodeAssignmentTest4( + IngressSpec( + service_type='ingress', + service_id='rgw.foo', + frontend_port=443, + monitor_port=8888, + virtual_ip='10.0.0.20/8', + backend_service='rgw.foo', + placement=PlacementSpec(label='foo'), + networks=['10.0.0.0/8'], + ), + { + 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}}, + 'host2': {'10.0.0.0/8': {'eth1': ['10.0.0.2']}}, + 'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}}, + }, + [], + ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)', + 'keepalived:host1', 'keepalived:host2'], + ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)', + 'keepalived:host1', 'keepalived:host2'], + [] + ), + NodeAssignmentTest4( + IngressSpec( + service_type='ingress', + service_id='rgw.foo', + frontend_port=443, + monitor_port=8888, + virtual_ip='10.0.0.20/8', + backend_service='rgw.foo', + placement=PlacementSpec(label='foo'), + networks=['10.0.0.0/8'], + ), + { + 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}}, + 'host2': {'10.0.0.0/8': {'eth1': ['10.0.0.2']}}, + 'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}}, + }, + [ + DaemonDescription('haproxy', 'a', 'host1', ip='10.0.0.1', + ports=[443, 8888]), + DaemonDescription('keepalived', 'b', 'host2'), + DaemonDescription('keepalived', 'c', 'host3'), + ], + ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)', + 'keepalived:host1', 'keepalived:host2'], + ['haproxy:host2(10.0.0.2:443,8888)', + 'keepalived:host1'], + ['keepalived.c'] + ), + ]) +def test_node_assignment4(spec, networks, daemons, + expected, expected_add, expected_remove): + all_slots, to_add, to_remove = HostAssignment( + spec=spec, + hosts=[HostSpec(h, labels=['foo']) for h in networks.keys()], + unreachable_hosts=[], + daemons=daemons, + allow_colo=True, + networks=networks, + primary_daemon_type='haproxy' if spec.service_type == 'ingress' else spec.service_type, + per_host_daemon_type='keepalived' if spec.service_type == 'ingress' else None, + ).place() + + got = [str(p) for p in all_slots] + num_wildcard = 0 + for i in expected: + if i == '*': + num_wildcard += 1 + else: + assert i in got + got.remove(i) + assert num_wildcard == len(got) + + got = [str(p) for p in to_add] + num_wildcard = 0 + for i in expected_add: + if i == '*': + num_wildcard += 1 + else: + assert i in got + got.remove(i) + assert num_wildcard == len(got) + + assert sorted([d.name() for d in to_remove]) == sorted(expected_remove) + + +@pytest.mark.parametrize("placement", + [ # noqa: E128 + ('1 *'), + ('* label:foo'), + ('* host1 host2'), + ('hostname12hostname12hostname12hostname12hostname12hostname12hostname12'), # > 63 chars + ]) +def test_bad_placements(placement): + try: + PlacementSpec.from_string(placement.split(' ')) + assert False + except SpecValidationError: + pass + + +class NodeAssignmentTestBadSpec(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + daemons: List[DaemonDescription] + expected: str + + +@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected", + [ # noqa: E128 + # unknown host + NodeAssignmentTestBadSpec( + 'mgr', + PlacementSpec(hosts=['unknownhost']), + ['knownhost'], + [], + "Cannot place <ServiceSpec for service_name=mgr> on unknownhost: Unknown hosts" + ), + # unknown host pattern + NodeAssignmentTestBadSpec( + 'mgr', + PlacementSpec(host_pattern='unknownhost'), + ['knownhost'], + [], + "Cannot place <ServiceSpec for service_name=mgr>: No matching hosts" + ), + # unknown label + NodeAssignmentTestBadSpec( + 'mgr', + PlacementSpec(label='unknownlabel'), + [], + [], + "Cannot place <ServiceSpec for service_name=mgr>: No matching hosts for label unknownlabel" + ), + ]) +def test_bad_specs(service_type, placement, hosts, daemons, expected): + with pytest.raises(OrchestratorValidationError) as e: + hosts, to_add, to_remove = HostAssignment( + spec=ServiceSpec(service_type, placement=placement), + hosts=[HostSpec(h) for h in hosts], + unreachable_hosts=[], + daemons=daemons, + ).place() + assert str(e.value) == expected + + +class ActiveAssignmentTest(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + daemons: List[DaemonDescription] + expected: List[List[str]] + expected_add: List[List[str]] + expected_remove: List[List[str]] + + +@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected,expected_add,expected_remove", + [ + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3'), + ], + [['host1', 'host2'], ['host1', 'host3']], + [[]], + [['mgr.b'], ['mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host1', 'host3'], ['host2', 'host3']], + [[]], + [['mgr.a'], ['mgr.b']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2', is_active=True), + DaemonDescription('mgr', 'c', 'host3'), + ], + [['host2']], + [[]], + [['mgr.a', 'mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host3']], + [[]], + [['mgr.a', 'mgr.b']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host1'], ['host3']], + [[]], + [['mgr.a', 'mgr.b'], ['mgr.b', 'mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2', is_active=True), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host2', 'host3']], + [[]], + [['mgr.a']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'b', 'host2', is_active=True), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host1'], ['host2'], ['host3']], + [[]], + [['mgr.a', 'mgr.b'], ['mgr.b', 'mgr.c'], ['mgr.a', 'mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'a2', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3'), + ], + [['host1']], + [[]], + [['mgr.a2', 'mgr.b', 'mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'a2', 'host1', is_active=True), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3'), + ], + [['host1']], + [[]], + [['mgr.a', 'mgr.b', 'mgr.c'], ['mgr.a2', 'mgr.b', 'mgr.c']] + ), + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1', is_active=True), + DaemonDescription('mgr', 'a2', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host1', 'host3']], + [[]], + [['mgr.a2', 'mgr.b']] + ), + # Explicit placement should override preference for active daemon + ActiveAssignmentTest( + 'mgr', + PlacementSpec(count=1, hosts=['host1']), + 'host1 host2 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [['host1']], + [[]], + [['mgr.b', 'mgr.c']] + ), + + ]) +def test_active_assignment(service_type, placement, hosts, daemons, expected, expected_add, expected_remove): + + spec = ServiceSpec(service_type=service_type, + service_id=None, + placement=placement) + + hosts, to_add, to_remove = HostAssignment( + spec=spec, + hosts=[HostSpec(h) for h in hosts], + unreachable_hosts=[], + daemons=daemons, + ).place() + assert sorted([h.hostname for h in hosts]) in expected + assert sorted([h.hostname for h in to_add]) in expected_add + assert sorted([h.name() for h in to_remove]) in expected_remove + + +class UnreachableHostsTest(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + unreachables_hosts: List[str] + daemons: List[DaemonDescription] + expected_add: List[List[str]] + expected_remove: List[List[str]] + + +@pytest.mark.parametrize("service_type,placement,hosts,unreachable_hosts,daemons,expected_add,expected_remove", + [ + UnreachableHostsTest( + 'mgr', + PlacementSpec(count=3), + 'host1 host2 host3'.split(), + ['host2'], + [], + [['host1', 'host3']], + [[]], + ), + UnreachableHostsTest( + 'mgr', + PlacementSpec(hosts=['host3']), + 'host1 host2 host3'.split(), + ['host1'], + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [[]], + [['mgr.b']], + ), + UnreachableHostsTest( + 'mgr', + PlacementSpec(count=3), + 'host1 host2 host3 host4'.split(), + ['host1'], + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [[]], + [[]], + ), + UnreachableHostsTest( + 'mgr', + PlacementSpec(count=1), + 'host1 host2 host3 host4'.split(), + 'host1 host3'.split(), + [ + DaemonDescription('mgr', 'a', 'host1'), + DaemonDescription('mgr', 'b', 'host2'), + DaemonDescription('mgr', 'c', 'host3', is_active=True), + ], + [[]], + [['mgr.b']], + ), + UnreachableHostsTest( + 'mgr', + PlacementSpec(count=3), + 'host1 host2 host3 host4'.split(), + ['host2'], + [], + [['host1', 'host3', 'host4']], + [[]], + ), + UnreachableHostsTest( + 'mgr', + PlacementSpec(count=3), + 'host1 host2 host3 host4'.split(), + 'host1 host4'.split(), + [], + [['host2', 'host3']], + [[]], + ), + + ]) +def test_unreachable_host(service_type, placement, hosts, unreachable_hosts, daemons, expected_add, expected_remove): + + spec = ServiceSpec(service_type=service_type, + service_id=None, + placement=placement) + + hosts, to_add, to_remove = HostAssignment( + spec=spec, + hosts=[HostSpec(h) for h in hosts], + unreachable_hosts=[HostSpec(h) for h in unreachable_hosts], + daemons=daemons, + ).place() + assert sorted([h.hostname for h in to_add]) in expected_add + assert sorted([h.name() for h in to_remove]) in expected_remove + + +class RescheduleFromOfflineTest(NamedTuple): + service_type: str + placement: PlacementSpec + hosts: List[str] + maintenance_hosts: List[str] + offline_hosts: List[str] + daemons: List[DaemonDescription] + expected_add: List[List[str]] + expected_remove: List[List[str]] + + +@pytest.mark.parametrize("service_type,placement,hosts,maintenance_hosts,offline_hosts,daemons,expected_add,expected_remove", + [ + RescheduleFromOfflineTest( + 'nfs', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [], + ['host2'], + [ + DaemonDescription('nfs', 'a', 'host1'), + DaemonDescription('nfs', 'b', 'host2'), + ], + [['host3']], + [[]], + ), + RescheduleFromOfflineTest( + 'nfs', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + ['host2'], + [], + [ + DaemonDescription('nfs', 'a', 'host1'), + DaemonDescription('nfs', 'b', 'host2'), + ], + [[]], + [[]], + ), + RescheduleFromOfflineTest( + 'mon', + PlacementSpec(count=2), + 'host1 host2 host3'.split(), + [], + ['host2'], + [ + DaemonDescription('mon', 'a', 'host1'), + DaemonDescription('mon', 'b', 'host2'), + ], + [[]], + [[]], + ), + ]) +def test_remove_from_offline(service_type, placement, hosts, maintenance_hosts, offline_hosts, daemons, expected_add, expected_remove): + + spec = ServiceSpec(service_type=service_type, + service_id='test', + placement=placement) + + host_specs = [HostSpec(h) for h in hosts] + for h in host_specs: + if h.hostname in offline_hosts: + h.status = 'offline' + if h.hostname in maintenance_hosts: + h.status = 'maintenance' + + hosts, to_add, to_remove = HostAssignment( + spec=spec, + hosts=host_specs, + unreachable_hosts=[h for h in host_specs if h.status], + daemons=daemons, + ).place() + assert sorted([h.hostname for h in to_add]) in expected_add + assert sorted([h.name() for h in to_remove]) in expected_remove diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py new file mode 100644 index 000000000..57cd12456 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_services.py @@ -0,0 +1,1031 @@ +from textwrap import dedent +import json +import yaml + +import pytest + +from unittest.mock import MagicMock, call, patch, ANY + +from cephadm.serve import CephadmServe +from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ + RbdMirrorService, CrashService, CephadmDaemonDeploySpec +from cephadm.services.iscsi import IscsiService +from cephadm.services.nfs import NFSService +from cephadm.services.osd import OSDService +from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ + NodeExporterService +from cephadm.services.exporter import CephadmExporter +from cephadm.module import CephadmOrchestrator +from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, \ + ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec +from cephadm.tests.fixtures import with_host, with_service, _run_cephadm + +from orchestrator import OrchestratorError +from orchestrator._interface import DaemonDescription + + +class FakeInventory: + def get_addr(self, name: str) -> str: + return '1.2.3.4' + + +class FakeMgr: + def __init__(self): + self.config = '' + self.check_mon_command = MagicMock(side_effect=self._check_mon_command) + self.mon_command = MagicMock(side_effect=self._check_mon_command) + self.template = MagicMock() + self.log = MagicMock() + self.inventory = FakeInventory() + + def _check_mon_command(self, cmd_dict, inbuf=None): + prefix = cmd_dict.get('prefix') + if prefix == 'get-cmd': + return 0, self.config, '' + if prefix == 'set-cmd': + self.config = cmd_dict.get('value') + return 0, 'value set', '' + return -1, '', 'error' + + def get_minimal_ceph_conf(self) -> str: + return '' + + def get_mgr_ip(self) -> str: + return '1.2.3.4' + + +class TestCephadmService: + def test_set_service_url_on_dashboard(self): + # pylint: disable=protected-access + mgr = FakeMgr() + service_url = 'http://svc:1000' + service = GrafanaService(mgr) + service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url) + assert mgr.config == service_url + + # set-cmd should not be called if value doesn't change + mgr.check_mon_command.reset_mock() + service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url) + mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'}) + + def _get_services(self, mgr): + # services: + osd_service = OSDService(mgr) + nfs_service = NFSService(mgr) + mon_service = MonService(mgr) + mgr_service = MgrService(mgr) + mds_service = MdsService(mgr) + rgw_service = RgwService(mgr) + rbd_mirror_service = RbdMirrorService(mgr) + grafana_service = GrafanaService(mgr) + alertmanager_service = AlertmanagerService(mgr) + prometheus_service = PrometheusService(mgr) + node_exporter_service = NodeExporterService(mgr) + crash_service = CrashService(mgr) + iscsi_service = IscsiService(mgr) + cephadm_exporter_service = CephadmExporter(mgr) + cephadm_services = { + 'mon': mon_service, + 'mgr': mgr_service, + 'osd': osd_service, + 'mds': mds_service, + 'rgw': rgw_service, + 'rbd-mirror': rbd_mirror_service, + 'nfs': nfs_service, + 'grafana': grafana_service, + 'alertmanager': alertmanager_service, + 'prometheus': prometheus_service, + 'node-exporter': node_exporter_service, + 'crash': crash_service, + 'iscsi': iscsi_service, + 'cephadm-exporter': cephadm_exporter_service, + } + return cephadm_services + + def test_get_auth_entity(self): + mgr = FakeMgr() + cephadm_services = self._get_services(mgr) + + for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]: + assert "client.%s.id1" % (daemon_type) == \ + cephadm_services[daemon_type].get_auth_entity("id1", "host") + assert "client.%s.id1" % (daemon_type) == \ + cephadm_services[daemon_type].get_auth_entity("id1", "") + assert "client.%s.id1" % (daemon_type) == \ + cephadm_services[daemon_type].get_auth_entity("id1") + + assert "client.crash.host" == \ + cephadm_services["crash"].get_auth_entity("id1", "host") + with pytest.raises(OrchestratorError): + cephadm_services["crash"].get_auth_entity("id1", "") + cephadm_services["crash"].get_auth_entity("id1") + + assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host") + assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "") + assert "mon." == cephadm_services["mon"].get_auth_entity("id1") + + assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host") + assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "") + assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1") + + for daemon_type in ["osd", "mds"]: + assert "%s.id1" % daemon_type == \ + cephadm_services[daemon_type].get_auth_entity("id1", "host") + assert "%s.id1" % daemon_type == \ + cephadm_services[daemon_type].get_auth_entity("id1", "") + assert "%s.id1" % daemon_type == \ + cephadm_services[daemon_type].get_auth_entity("id1") + + # services based on CephadmService shouldn't have get_auth_entity + with pytest.raises(AttributeError): + for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'cephadm-exporter']: + cephadm_services[daemon_type].get_auth_entity("id1", "host") + cephadm_services[daemon_type].get_auth_entity("id1", "") + cephadm_services[daemon_type].get_auth_entity("id1") + + +class TestISCSIService: + + mgr = FakeMgr() + iscsi_service = IscsiService(mgr) + + iscsi_spec = IscsiServiceSpec(service_type='iscsi', service_id="a") + iscsi_spec.daemon_type = "iscsi" + iscsi_spec.daemon_id = "a" + iscsi_spec.spec = MagicMock() + iscsi_spec.spec.daemon_type = "iscsi" + iscsi_spec.spec.ssl_cert = '' + iscsi_spec.api_user = "user" + iscsi_spec.api_password = "password" + iscsi_spec.api_port = 5000 + iscsi_spec.api_secure = False + iscsi_spec.ssl_cert = "cert" + iscsi_spec.ssl_key = "key" + + mgr.spec_store = MagicMock() + mgr.spec_store.all_specs.get.return_value = iscsi_spec + + def test_iscsi_client_caps(self): + + iscsi_daemon_spec = CephadmDaemonDeploySpec( + host='host', daemon_id='a', service_name=self.iscsi_spec.service_name()) + + self.iscsi_service.prepare_create(iscsi_daemon_spec) + + expected_caps = ['mon', + 'profile rbd, allow command "osd blocklist", allow command "config-key get" with "key" prefix "iscsi/"', + 'mgr', 'allow command "service status"', + 'osd', 'allow rwx'] + + expected_call = call({'prefix': 'auth get-or-create', + 'entity': 'client.iscsi.a', + 'caps': expected_caps}) + expected_call2 = call({'prefix': 'auth caps', + 'entity': 'client.iscsi.a', + 'caps': expected_caps}) + + assert expected_call in self.mgr.mon_command.mock_calls + assert expected_call2 in self.mgr.mon_command.mock_calls + + @patch('cephadm.utils.resolve_ip') + def test_iscsi_dashboard_config(self, mock_resolve_ip): + + self.mgr.check_mon_command = MagicMock() + self.mgr.check_mon_command.return_value = ('', '{"gateways": {}}', '') + + # Case 1: use IPV4 address + id1 = DaemonDescription(daemon_type='iscsi', hostname="testhost1", + daemon_id="a", ip='192.168.1.1') + daemon_list = [id1] + mock_resolve_ip.return_value = '192.168.1.1' + + self.iscsi_service.config_dashboard(daemon_list) + + dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add', + 'name': 'testhost1'}, + 'http://user:password@192.168.1.1:5000') + + assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls + + # Case 2: use IPV6 address + self.mgr.check_mon_command.reset_mock() + + id1 = DaemonDescription(daemon_type='iscsi', hostname="testhost1", + daemon_id="a", ip='FEDC:BA98:7654:3210:FEDC:BA98:7654:3210') + mock_resolve_ip.return_value = 'FEDC:BA98:7654:3210:FEDC:BA98:7654:3210' + + self.iscsi_service.config_dashboard(daemon_list) + + dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add', + 'name': 'testhost1'}, + 'http://user:password@[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:5000') + + assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls + + # Case 3: IPV6 Address . Secure protocol + self.mgr.check_mon_command.reset_mock() + + self.iscsi_spec.api_secure = True + + self.iscsi_service.config_dashboard(daemon_list) + + dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add', + 'name': 'testhost1'}, + 'https://user:password@[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:5000') + + assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls + + +class TestMonitoring: + def _get_config(self, url: str) -> str: + return f""" + # This file is generated by cephadm. + # See https://prometheus.io/docs/alerting/configuration/ for documentation. + + global: + resolve_timeout: 5m + http_config: + tls_config: + insecure_skip_verify: true + + route: + receiver: 'default' + routes: + - group_by: ['alertname'] + group_wait: 10s + group_interval: 10s + repeat_interval: 1h + receiver: 'ceph-dashboard' + + receivers: + - name: 'default' + webhook_configs: + - name: 'ceph-dashboard' + webhook_configs: + - url: '{url}/api/prometheus_receiver' + """ + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("mgr_module.MgrModule.get") + def test_alertmanager_config(self, mock_get, _run_cephadm, + cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + mock_get.return_value = {"services": {"dashboard": "http://[::1]:8080"}} + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, AlertManagerSpec()): + y = dedent(self._get_config('http://localhost:8080')).lstrip() + _run_cephadm.assert_called_with( + 'test', + 'alertmanager.test', + 'deploy', + [ + '--name', 'alertmanager.test', + '--meta-json', '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '9093 9094' + ], + stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("mgr_module.MgrModule.get") + def test_alertmanager_config_v6(self, mock_get, _run_cephadm, + cephadm_module: CephadmOrchestrator): + dashboard_url = "http://[2001:db8:4321:0000:0000:0000:0000:0000]:8080" + _run_cephadm.return_value = ('{}', '', 0) + mock_get.return_value = {"services": {"dashboard": dashboard_url}} + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, AlertManagerSpec()): + y = dedent(self._get_config(dashboard_url)).lstrip() + _run_cephadm.assert_called_with( + 'test', + 'alertmanager.test', + 'deploy', + [ + '--name', 'alertmanager.test', + '--meta-json', + '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '9093 9094' + ], + stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("mgr_module.MgrModule.get") + @patch("socket.getfqdn") + def test_alertmanager_config_v6_fqdn(self, mock_getfqdn, mock_get, _run_cephadm, + cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + mock_getfqdn.return_value = "mgr.test.fqdn" + mock_get.return_value = {"services": { + "dashboard": "http://[2001:db8:4321:0000:0000:0000:0000:0000]:8080"}} + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, AlertManagerSpec()): + y = dedent(self._get_config("http://mgr.test.fqdn:8080")).lstrip() + _run_cephadm.assert_called_with( + 'test', + 'alertmanager.test', + 'deploy', + [ + '--name', 'alertmanager.test', + '--meta-json', + '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '9093 9094' + ], + stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("mgr_module.MgrModule.get") + def test_alertmanager_config_v4(self, mock_get, _run_cephadm, cephadm_module: CephadmOrchestrator): + dashboard_url = "http://192.168.0.123:8080" + _run_cephadm.return_value = ('{}', '', 0) + mock_get.return_value = {"services": {"dashboard": dashboard_url}} + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, AlertManagerSpec()): + y = dedent(self._get_config(dashboard_url)).lstrip() + _run_cephadm.assert_called_with( + 'test', + 'alertmanager.test', + 'deploy', + [ + '--name', 'alertmanager.test', + '--meta-json', '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '9093 9094' + ], + stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("mgr_module.MgrModule.get") + @patch("socket.getfqdn") + def test_alertmanager_config_v4_fqdn(self, mock_getfqdn, mock_get, _run_cephadm, + cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + mock_getfqdn.return_value = "mgr.test.fqdn" + mock_get.return_value = {"services": {"dashboard": "http://192.168.0.123:8080"}} + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, AlertManagerSpec()): + y = dedent(self._get_config("http://mgr.test.fqdn:8080")).lstrip() + _run_cephadm.assert_called_with( + 'test', + 'alertmanager.test', + 'deploy', + [ + '--name', 'alertmanager.test', + '--meta-json', + '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '9093 9094' + ], + stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_prometheus_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \ + with_service(cephadm_module, MonitoringSpec('prometheus')) as _: + + y = dedent(""" + # This file is generated by cephadm. + global: + scrape_interval: 10s + evaluation_interval: 10s + rule_files: + - /etc/prometheus/alerting/* + scrape_configs: + - job_name: 'ceph' + honor_labels: true + static_configs: + - targets: + - '[::1]:9283' + + - job_name: 'node' + static_configs: + - targets: ['[1::4]:9100'] + labels: + instance: 'test' + + """).lstrip() + + _run_cephadm.assert_called_with( + 'test', + 'prometheus.test', + 'deploy', + [ + '--name', 'prometheus.test', + '--meta-json', + '{"service_name": "prometheus", "ports": [9095], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '9095' + ], + stdin=json.dumps({"files": {"prometheus.yml": y, + "/etc/prometheus/alerting/custom_alerts.yml": ""}, + 'retention_time': '15d'}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm") + @patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4') + @patch("cephadm.services.monitoring.verify_tls", lambda *_: None) + def test_grafana_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + cephadm_module.set_store('test/grafana_crt', 'c') + cephadm_module.set_store('test/grafana_key', 'k') + with with_service(cephadm_module, MonitoringSpec('prometheus')) as _, \ + with_service(cephadm_module, GrafanaSpec('grafana')) as _: + files = { + 'grafana.ini': dedent(""" + # This file is generated by cephadm. + [users] + default_theme = light + [auth.anonymous] + enabled = true + org_name = 'Main Org.' + org_role = 'Viewer' + [server] + domain = 'bootstrap.storage.lab' + protocol = https + cert_file = /etc/grafana/certs/cert_file + cert_key = /etc/grafana/certs/cert_key + http_port = 3000 + http_addr = + [security] + disable_initial_admin_creation = true + cookie_secure = true + cookie_samesite = none + allow_embedding = true""").lstrip(), # noqa: W291 + 'provisioning/datasources/ceph-dashboard.yml': dedent(""" + # This file is generated by cephadm. + deleteDatasources: + - name: 'Dashboard1' + orgId: 1 + + datasources: + - name: 'Dashboard1' + type: 'prometheus' + access: 'proxy' + orgId: 1 + url: 'http://[1::4]:9095' + basicAuth: false + isDefault: true + editable: false + """).lstrip(), + 'certs/cert_file': dedent(""" + # generated by cephadm + c""").lstrip(), + 'certs/cert_key': dedent(""" + # generated by cephadm + k""").lstrip(), + } + + _run_cephadm.assert_called_with( + 'test', + 'grafana.test', + 'deploy', + [ + '--name', 'grafana.test', + '--meta-json', + '{"service_name": "grafana", "ports": [3000], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', '--tcp-ports', '3000'], + stdin=json.dumps({"files": files}), + image='') + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_grafana_initial_admin_pw(self, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, GrafanaSpec(initial_admin_password='secure')): + out = cephadm_module.cephadm_services['grafana'].generate_config( + CephadmDaemonDeploySpec('test', 'daemon', 'grafana')) + assert out == ( + { + 'files': + { + 'certs/cert_file': ANY, + 'certs/cert_key': ANY, + 'grafana.ini': + '# This file is generated by cephadm.\n' + '[users]\n' + ' default_theme = light\n' + '[auth.anonymous]\n' + ' enabled = true\n' + " org_name = 'Main Org.'\n" + " org_role = 'Viewer'\n" + '[server]\n' + " domain = 'bootstrap.storage.lab'\n" + ' protocol = https\n' + ' cert_file = /etc/grafana/certs/cert_file\n' + ' cert_key = /etc/grafana/certs/cert_key\n' + ' http_port = 3000\n' + ' http_addr = \n' + '[security]\n' + ' admin_user = admin\n' + ' admin_password = secure\n' + ' cookie_secure = true\n' + ' cookie_samesite = none\n' + ' allow_embedding = true', + 'provisioning/datasources/ceph-dashboard.yml': + '# This file is generated by cephadm.\n' + 'deleteDatasources:\n' + '\n' + 'datasources:\n' + } + }, + [], + ) + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_monitoring_ports(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + + yaml_str = """service_type: alertmanager +service_name: alertmanager +placement: + count: 1 +spec: + port: 4200 +""" + yaml_file = yaml.safe_load(yaml_str) + spec = ServiceSpec.from_json(yaml_file) + + with patch("cephadm.services.monitoring.AlertmanagerService.generate_config", return_value=({}, [])): + with with_service(cephadm_module, spec): + + CephadmServe(cephadm_module)._check_daemons() + + _run_cephadm.assert_called_with( + 'test', 'alertmanager.test', 'deploy', [ + '--name', 'alertmanager.test', + '--meta-json', '{"service_name": "alertmanager", "ports": [4200, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '4200 9094', + '--reconfig' + ], + stdin='{}', + image='') + + +class TestRGWService: + + @pytest.mark.parametrize( + "frontend, ssl, expected", + [ + ('beast', False, 'beast endpoint=[fd00:fd00:fd00:3000::1]:80'), + ('beast', True, + 'beast ssl_endpoint=[fd00:fd00:fd00:3000::1]:443 ssl_certificate=config://rgw/cert/rgw.foo'), + ('civetweb', False, 'civetweb port=[fd00:fd00:fd00:3000::1]:80'), + ('civetweb', True, + 'civetweb port=[fd00:fd00:fd00:3000::1]:443s ssl_certificate=config://rgw/cert/rgw.foo'), + ] + ) + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_rgw_update(self, frontend, ssl, expected, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + cephadm_module.cache.update_host_devices_networks( + 'host1', + dls=cephadm_module.cache.devices['host1'], + nets={ + 'fd00:fd00:fd00:3000::/64': { + 'if0': ['fd00:fd00:fd00:3000::1'] + } + }) + s = RGWSpec(service_id="foo", + networks=['fd00:fd00:fd00:3000::/64'], + ssl=ssl, + rgw_frontend_type=frontend) + with with_service(cephadm_module, s) as dds: + _, f, _ = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': f'client.{dds[0]}', + 'key': 'rgw_frontends', + }) + assert f == expected + + +class TestSNMPGateway: + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_snmp_v2c_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + spec = SNMPGatewaySpec( + snmp_version='V2c', + snmp_destination='192.168.1.1:162', + credentials={ + 'snmp_community': 'public' + }) + + config = { + "destination": spec.snmp_destination, + "snmp_version": spec.snmp_version, + "snmp_community": spec.credentials.get('snmp_community') + } + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.assert_called_with( + 'test', + 'snmp-gateway.test', + 'deploy', + [ + '--name', 'snmp-gateway.test', + '--meta-json', + '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '9464' + ], + stdin=json.dumps(config), + image='' + ) + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_snmp_v2c_with_port(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + spec = SNMPGatewaySpec( + snmp_version='V2c', + snmp_destination='192.168.1.1:162', + credentials={ + 'snmp_community': 'public' + }, + port=9465) + + config = { + "destination": spec.snmp_destination, + "snmp_version": spec.snmp_version, + "snmp_community": spec.credentials.get('snmp_community') + } + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.assert_called_with( + 'test', + 'snmp-gateway.test', + 'deploy', + [ + '--name', 'snmp-gateway.test', + '--meta-json', + '{"service_name": "snmp-gateway", "ports": [9465], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '9465' + ], + stdin=json.dumps(config), + image='' + ) + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_snmp_v3nopriv_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + spec = SNMPGatewaySpec( + snmp_version='V3', + snmp_destination='192.168.1.1:162', + engine_id='8000C53F00000000', + credentials={ + 'snmp_v3_auth_username': 'myuser', + 'snmp_v3_auth_password': 'mypassword' + }) + + config = { + 'destination': spec.snmp_destination, + 'snmp_version': spec.snmp_version, + 'snmp_v3_auth_protocol': 'SHA', + 'snmp_v3_auth_username': 'myuser', + 'snmp_v3_auth_password': 'mypassword', + 'snmp_v3_engine_id': '8000C53F00000000' + } + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.assert_called_with( + 'test', + 'snmp-gateway.test', + 'deploy', + [ + '--name', 'snmp-gateway.test', + '--meta-json', + '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '9464' + ], + stdin=json.dumps(config), + image='' + ) + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_snmp_v3priv_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + spec = SNMPGatewaySpec( + snmp_version='V3', + snmp_destination='192.168.1.1:162', + engine_id='8000C53F00000000', + auth_protocol='MD5', + privacy_protocol='AES', + credentials={ + 'snmp_v3_auth_username': 'myuser', + 'snmp_v3_auth_password': 'mypassword', + 'snmp_v3_priv_password': 'mysecret', + }) + + config = { + 'destination': spec.snmp_destination, + 'snmp_version': spec.snmp_version, + 'snmp_v3_auth_protocol': 'MD5', + 'snmp_v3_auth_username': spec.credentials.get('snmp_v3_auth_username'), + 'snmp_v3_auth_password': spec.credentials.get('snmp_v3_auth_password'), + 'snmp_v3_engine_id': '8000C53F00000000', + 'snmp_v3_priv_protocol': spec.privacy_protocol, + 'snmp_v3_priv_password': spec.credentials.get('snmp_v3_priv_password'), + } + + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.assert_called_with( + 'test', + 'snmp-gateway.test', + 'deploy', + [ + '--name', 'snmp-gateway.test', + '--meta-json', + '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}', + '--config-json', '-', + '--tcp-ports', '9464' + ], + stdin=json.dumps(config), + image='' + ) + + +class TestIngressService: + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_ingress_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + cephadm_module.cache.update_host_devices_networks( + 'test', + cephadm_module.cache.devices['test'], + { + '1.2.3.0/24': { + 'if0': ['1.2.3.4/32'] + } + } + ) + + # the ingress backend + s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1), + rgw_frontend_type='beast') + + ispec = IngressSpec(service_type='ingress', + service_id='test', + backend_service='rgw.foo', + frontend_port=8089, + monitor_port=8999, + monitor_user='admin', + monitor_password='12345', + keepalived_password='12345', + virtual_interface_networks=['1.2.3.0/24'], + virtual_ip="1.2.3.4/32") + with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: + # generate the keepalived conf based on the specified spec + keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) + + keepalived_expected_conf = { + 'files': + { + 'keepalived.conf': + '# This file is generated by cephadm.\n' + 'vrrp_script check_backend {\n ' + 'script "/usr/bin/curl http://localhost:8999/health"\n ' + 'weight -20\n ' + 'interval 2\n ' + 'rise 2\n ' + 'fall 2\n}\n\n' + 'vrrp_instance VI_0 {\n ' + 'state MASTER\n ' + 'priority 100\n ' + 'interface if0\n ' + 'virtual_router_id 50\n ' + 'advert_int 1\n ' + 'authentication {\n ' + 'auth_type PASS\n ' + 'auth_pass 12345\n ' + '}\n ' + 'unicast_src_ip 1::4\n ' + 'unicast_peer {\n ' + '}\n ' + 'virtual_ipaddress {\n ' + '1.2.3.4/32 dev if0\n ' + '}\n ' + 'track_script {\n ' + 'check_backend\n }\n' + '}\n' + } + } + + # check keepalived config + assert keepalived_generated_conf[0] == keepalived_expected_conf + + # generate the haproxy conf based on the specified spec + haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) + + haproxy_expected_conf = { + 'files': + { + 'haproxy.cfg': + '# This file is generated by cephadm.' + '\nglobal\n log ' + '127.0.0.1 local2\n ' + 'chroot /var/lib/haproxy\n ' + 'pidfile /var/lib/haproxy/haproxy.pid\n ' + 'maxconn 8000\n ' + 'daemon\n ' + 'stats socket /var/lib/haproxy/stats\n' + '\ndefaults\n ' + 'mode http\n ' + 'log global\n ' + 'option httplog\n ' + 'option dontlognull\n ' + 'option http-server-close\n ' + 'option forwardfor except 127.0.0.0/8\n ' + 'option redispatch\n ' + 'retries 3\n ' + 'timeout queue 20s\n ' + 'timeout connect 5s\n ' + 'timeout http-request 1s\n ' + 'timeout http-keep-alive 5s\n ' + 'timeout client 1s\n ' + 'timeout server 1s\n ' + 'timeout check 5s\n ' + 'maxconn 8000\n' + '\nfrontend stats\n ' + 'mode http\n ' + 'bind 1.2.3.4:8999\n ' + 'bind localhost:8999\n ' + 'stats enable\n ' + 'stats uri /stats\n ' + 'stats refresh 10s\n ' + 'stats auth admin:12345\n ' + 'http-request use-service prometheus-exporter if { path /metrics }\n ' + 'monitor-uri /health\n' + '\nfrontend frontend\n ' + 'bind 1.2.3.4:8089\n ' + 'default_backend backend\n\n' + 'backend backend\n ' + 'option forwardfor\n ' + 'balance static-rr\n ' + 'option httpchk HEAD / HTTP/1.0\n ' + 'server ' + haproxy_generated_conf[1][0] + ' 1::4:80 check weight 100\n' + } + } + + assert haproxy_generated_conf[0] == haproxy_expected_conf + + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_ingress_config_multi_vips(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + + with with_host(cephadm_module, 'test'): + cephadm_module.cache.update_host_devices_networks('test', [], { + '1.2.3.0/24': { + 'if0': ['1.2.3.4/32'] + } + }) + + # Check the ingress with multiple VIPs + s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1), + rgw_frontend_type='beast') + + ispec = IngressSpec(service_type='ingress', + service_id='test', + backend_service='rgw.foo', + frontend_port=8089, + monitor_port=8999, + monitor_user='admin', + monitor_password='12345', + keepalived_password='12345', + virtual_interface_networks=['1.2.3.0/24'], + virtual_ips_list=["1.2.3.4/32"]) + with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: + # generate the keepalived conf based on the specified spec + # Test with only 1 IP on the list, as it will fail with more VIPS but only one host. + keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) + + keepalived_expected_conf = { + 'files': + { + 'keepalived.conf': + '# This file is generated by cephadm.\n' + 'vrrp_script check_backend {\n ' + 'script "/usr/bin/curl http://localhost:8999/health"\n ' + 'weight -20\n ' + 'interval 2\n ' + 'rise 2\n ' + 'fall 2\n}\n\n' + 'vrrp_instance VI_0 {\n ' + 'state MASTER\n ' + 'priority 100\n ' + 'interface if0\n ' + 'virtual_router_id 50\n ' + 'advert_int 1\n ' + 'authentication {\n ' + 'auth_type PASS\n ' + 'auth_pass 12345\n ' + '}\n ' + 'unicast_src_ip 1::4\n ' + 'unicast_peer {\n ' + '}\n ' + 'virtual_ipaddress {\n ' + '1.2.3.4/32 dev if0\n ' + '}\n ' + 'track_script {\n ' + 'check_backend\n }\n' + '}\n' + } + } + + # check keepalived config + assert keepalived_generated_conf[0] == keepalived_expected_conf + + # generate the haproxy conf based on the specified spec + haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) + + haproxy_expected_conf = { + 'files': + { + 'haproxy.cfg': + '# This file is generated by cephadm.' + '\nglobal\n log ' + '127.0.0.1 local2\n ' + 'chroot /var/lib/haproxy\n ' + 'pidfile /var/lib/haproxy/haproxy.pid\n ' + 'maxconn 8000\n ' + 'daemon\n ' + 'stats socket /var/lib/haproxy/stats\n' + '\ndefaults\n ' + 'mode http\n ' + 'log global\n ' + 'option httplog\n ' + 'option dontlognull\n ' + 'option http-server-close\n ' + 'option forwardfor except 127.0.0.0/8\n ' + 'option redispatch\n ' + 'retries 3\n ' + 'timeout queue 20s\n ' + 'timeout connect 5s\n ' + 'timeout http-request 1s\n ' + 'timeout http-keep-alive 5s\n ' + 'timeout client 1s\n ' + 'timeout server 1s\n ' + 'timeout check 5s\n ' + 'maxconn 8000\n' + '\nfrontend stats\n ' + 'mode http\n ' + 'bind *:8999\n ' + 'bind localhost:8999\n ' + 'stats enable\n ' + 'stats uri /stats\n ' + 'stats refresh 10s\n ' + 'stats auth admin:12345\n ' + 'http-request use-service prometheus-exporter if { path /metrics }\n ' + 'monitor-uri /health\n' + '\nfrontend frontend\n ' + 'bind *:8089\n ' + 'default_backend backend\n\n' + 'backend backend\n ' + 'option forwardfor\n ' + 'balance static-rr\n ' + 'option httpchk HEAD / HTTP/1.0\n ' + 'server ' + + haproxy_generated_conf[1][0] + ' 1::4:80 check weight 100\n' + } + } + + assert haproxy_generated_conf[0] == haproxy_expected_conf + + +class TestCephFsMirror: + @patch("cephadm.serve.CephadmServe._run_cephadm") + def test_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec('cephfs-mirror')): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'mgr module enable', + 'module': 'mirroring' + }) diff --git a/src/pybind/mgr/cephadm/tests/test_spec.py b/src/pybind/mgr/cephadm/tests/test_spec.py new file mode 100644 index 000000000..54aa0a7ab --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_spec.py @@ -0,0 +1,600 @@ +# Disable autopep8 for this file: + +# fmt: off + +import json + +import pytest + +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ + IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec +from orchestrator import DaemonDescription, OrchestratorError + + +@pytest.mark.parametrize( + "spec_json", + json.loads("""[ +{ + "placement": { + "count": 1 + }, + "service_type": "alertmanager" +}, +{ + "placement": { + "host_pattern": "*" + }, + "service_type": "crash" +}, +{ + "placement": { + "count": 1 + }, + "service_type": "grafana" +}, +{ + "placement": { + "count": 2 + }, + "service_type": "mgr" +}, +{ + "placement": { + "count": 5 + }, + "service_type": "mon" +}, +{ + "placement": { + "host_pattern": "*" + }, + "service_type": "node-exporter" +}, +{ + "placement": { + "count": 1 + }, + "service_type": "prometheus" +}, +{ + "placement": { + "hosts": [ + { + "hostname": "ceph-001", + "network": "", + "name": "" + } + ] + }, + "service_type": "rgw", + "service_id": "default-rgw-realm.eu-central-1.1", + "rgw_realm": "default-rgw-realm", + "rgw_zone": "eu-central-1" +}, +{ + "service_type": "osd", + "service_id": "osd_spec_default", + "placement": { + "host_pattern": "*" + }, + "data_devices": { + "model": "MC-55-44-XZ" + }, + "db_devices": { + "model": "SSD-123-foo" + }, + "wal_devices": { + "model": "NVME-QQQQ-987" + } +} +] +""") +) +def test_spec_octopus(spec_json): + # https://tracker.ceph.com/issues/44934 + # Those are real user data from early octopus. + # Please do not modify those JSON values. + + spec = ServiceSpec.from_json(spec_json) + + # just some verification that we can sill read old octopus specs + def convert_to_old_style_json(j): + j_c = dict(j.copy()) + j_c.pop('service_name', None) + if 'spec' in j_c: + spec = j_c.pop('spec') + j_c.update(spec) + if 'placement' in j_c: + if 'hosts' in j_c['placement']: + j_c['placement']['hosts'] = [ + { + 'hostname': HostPlacementSpec.parse(h).hostname, + 'network': HostPlacementSpec.parse(h).network, + 'name': HostPlacementSpec.parse(h).name + } + for h in j_c['placement']['hosts'] + ] + j_c.pop('objectstore', None) + j_c.pop('filter_logic', None) + return j_c + + assert spec_json == convert_to_old_style_json(spec.to_json()) + + +@pytest.mark.parametrize( + "dd_json", + json.loads("""[ + { + "hostname": "ceph-001", + "container_id": "d94d7969094d", + "container_image_id": "0881eb8f169f5556a292b4e2c01d683172b12830a62a9225a98a8e206bb734f0", + "container_image_name": "docker.io/prom/alertmanager:latest", + "daemon_id": "ceph-001", + "daemon_type": "alertmanager", + "version": "0.20.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725856", + "created": "2020-04-02T19:23:08.829543", + "started": "2020-04-03T07:29:16.932838", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "c4b036202241", + "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1", + "container_image_name": "docker.io/ceph/ceph:v15", + "daemon_id": "ceph-001", + "daemon_type": "crash", + "version": "15.2.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725903", + "created": "2020-04-02T19:23:11.390694", + "started": "2020-04-03T07:29:16.910897", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "5b7b94b48f31", + "container_image_id": "87a51ecf0b1c9a7b187b21c1b071425dafea0d765a96d5bc371c791169b3d7f4", + "container_image_name": "docker.io/ceph/ceph-grafana:latest", + "daemon_id": "ceph-001", + "daemon_type": "grafana", + "version": "6.6.2", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725950", + "created": "2020-04-02T19:23:52.025088", + "started": "2020-04-03T07:29:16.847972", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "9ca007280456", + "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1", + "container_image_name": "docker.io/ceph/ceph:v15", + "daemon_id": "ceph-001.gkjwqp", + "daemon_type": "mgr", + "version": "15.2.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725807", + "created": "2020-04-02T19:22:18.648584", + "started": "2020-04-03T07:29:16.856153", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "3d1ba9a2b697", + "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1", + "container_image_name": "docker.io/ceph/ceph:v15", + "daemon_id": "ceph-001", + "daemon_type": "mon", + "version": "15.2.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725715", + "created": "2020-04-02T19:22:13.863300", + "started": "2020-04-03T07:29:17.206024", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "36d026c68ba1", + "container_image_id": "e5a616e4b9cf68dfcad7782b78e118be4310022e874d52da85c55923fb615f87", + "container_image_name": "docker.io/prom/node-exporter:latest", + "daemon_id": "ceph-001", + "daemon_type": "node-exporter", + "version": "0.18.1", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.725996", + "created": "2020-04-02T19:23:53.880197", + "started": "2020-04-03T07:29:16.880044", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "faf76193cbfe", + "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1", + "container_image_name": "docker.io/ceph/ceph:v15", + "daemon_id": "0", + "daemon_type": "osd", + "version": "15.2.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.726088", + "created": "2020-04-02T20:35:02.991435", + "started": "2020-04-03T07:29:19.373956", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "f82505bae0f1", + "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1", + "container_image_name": "docker.io/ceph/ceph:v15", + "daemon_id": "1", + "daemon_type": "osd", + "version": "15.2.0", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.726134", + "created": "2020-04-02T20:35:17.142272", + "started": "2020-04-03T07:29:19.374002", + "is_active": false + }, + { + "hostname": "ceph-001", + "container_id": "2708d84cd484", + "container_image_id": "358a0d2395fe711bb8258e8fb4b2d7865c0a9a6463969bcd1452ee8869ea6653", + "container_image_name": "docker.io/prom/prometheus:latest", + "daemon_id": "ceph-001", + "daemon_type": "prometheus", + "version": "2.17.1", + "status": 1, + "status_desc": "running", + "last_refresh": "2020-04-03T15:31:48.726042", + "created": "2020-04-02T19:24:10.281163", + "started": "2020-04-03T07:29:16.926292", + "is_active": false + }, + { + "hostname": "ceph-001", + "daemon_id": "default-rgw-realm.eu-central-1.1.ceph-001.ytywjo", + "daemon_type": "rgw", + "status": 1, + "status_desc": "starting", + "is_active": false + } +]""") +) +def test_dd_octopus(dd_json): + # https://tracker.ceph.com/issues/44934 + # Those are real user data from early octopus. + # Please do not modify those JSON values. + + # Convert datetime properties to old style. + # 2020-04-03T07:29:16.926292Z -> 2020-04-03T07:29:16.926292 + def convert_to_old_style_json(j): + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if k in j: + j[k] = j[k].rstrip('Z') + del j['daemon_name'] + return j + + assert dd_json == convert_to_old_style_json( + DaemonDescription.from_json(dd_json).to_json()) + + +@pytest.mark.parametrize("spec,dd,valid", +[ # noqa: E128 + # https://tracker.ceph.com/issues/44934 + ( + RGWSpec( + service_id="foo", + rgw_realm="default-rgw-realm", + rgw_zone="eu-central-1", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="foo.ceph-001.ytywjo", + hostname="ceph-001", + ), + True + ), + ( + # no realm + RGWSpec( + service_id="foo.bar", + rgw_zone="eu-central-1", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="foo.bar.ceph-001.ytywjo", + hostname="ceph-001", + ), + True + ), + ( + # no realm or zone + RGWSpec( + service_id="bar", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="bar.host.domain.tld.ytywjo", + hostname="host.domain.tld", + ), + True + ), + ( + # explicit naming + RGWSpec( + service_id="realm.zone", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="realm.zone.a", + hostname="smithi028", + ), + True + ), + ( + # without host + RGWSpec( + service_type='rgw', + service_id="foo", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="foo.hostname.ytywjo", + hostname=None, + ), + False + ), + ( + # without host (2) + RGWSpec( + service_type='rgw', + service_id="default-rgw-realm.eu-central-1.1", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="default-rgw-realm.eu-central-1.1.hostname.ytywjo", + hostname=None, + ), + False + ), + ( + # service_id contains hostname + # (sort of) https://tracker.ceph.com/issues/45294 + RGWSpec( + service_id="default.rgw.realm.ceph.001", + ), + DaemonDescription( + daemon_type='rgw', + daemon_id="default.rgw.realm.ceph.001.ceph.001.ytywjo", + hostname="ceph.001", + ), + True + ), + + # https://tracker.ceph.com/issues/45293 + ( + ServiceSpec( + service_type='mds', + service_id="a", + ), + DaemonDescription( + daemon_type='mds', + daemon_id="a.host1.abc123", + hostname="host1", + ), + True + ), + ( + # '.' char in service_id + ServiceSpec( + service_type='mds', + service_id="a.b.c", + ), + DaemonDescription( + daemon_type='mds', + daemon_id="a.b.c.host1.abc123", + hostname="host1", + ), + True + ), + + # https://tracker.ceph.com/issues/45617 + ( + # daemon_id does not contain hostname + ServiceSpec( + service_type='mds', + service_id="a", + ), + DaemonDescription( + daemon_type='mds', + daemon_id="a", + hostname="host1", + ), + True + ), + ( + # daemon_id only contains hostname + ServiceSpec( + service_type='mds', + service_id="host1", + ), + DaemonDescription( + daemon_type='mds', + daemon_id="host1", + hostname="host1", + ), + True + ), + + # https://tracker.ceph.com/issues/45399 + ( + # daemon_id only contains hostname + ServiceSpec( + service_type='mds', + service_id="a", + ), + DaemonDescription( + daemon_type='mds', + daemon_id="a.host1.abc123", + hostname="host1.site", + ), + True + ), + ( + NFSServiceSpec( + service_id="a", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="a.host1", + hostname="host1.site", + ), + True + ), + + # https://tracker.ceph.com/issues/45293 + ( + NFSServiceSpec( + service_id="a", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="a.host1", + hostname="host1", + ), + True + ), + ( + # service_id contains a '.' char + NFSServiceSpec( + service_id="a.b.c", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="a.b.c.host1", + hostname="host1", + ), + True + ), + ( + # trailing chars after hostname + NFSServiceSpec( + service_id="a.b.c", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="a.b.c.host1.abc123", + hostname="host1", + ), + True + ), + ( + # chars after hostname without '.' + NFSServiceSpec( + service_id="a", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="a.host1abc123", + hostname="host1", + ), + False + ), + ( + # chars before hostname without '.' + NFSServiceSpec( + service_id="a", + ), + DaemonDescription( + daemon_type='nfs', + daemon_id="ahost1.abc123", + hostname="host1", + ), + False + ), + + # https://tracker.ceph.com/issues/45293 + ( + IscsiServiceSpec( + service_type='iscsi', + service_id="a", + ), + DaemonDescription( + daemon_type='iscsi', + daemon_id="a.host1.abc123", + hostname="host1", + ), + True + ), + ( + # '.' char in service_id + IscsiServiceSpec( + service_type='iscsi', + service_id="a.b.c", + ), + DaemonDescription( + daemon_type='iscsi', + daemon_id="a.b.c.host1.abc123", + hostname="host1", + ), + True + ), + ( + # fixed daemon id for teuthology. + IscsiServiceSpec( + service_type='iscsi', + service_id='iscsi', + ), + DaemonDescription( + daemon_type='iscsi', + daemon_id="iscsi.a", + hostname="host1", + ), + True + ), + + ( + CustomContainerSpec( + service_type='container', + service_id='hello-world', + image='docker.io/library/hello-world:latest', + ), + DaemonDescription( + daemon_type='container', + daemon_id='hello-world.mgr0', + hostname='mgr0', + ), + True + ), + + ( + # daemon_id only contains hostname + ServiceSpec( + service_type='cephadm-exporter', + ), + DaemonDescription( + daemon_type='cephadm-exporter', + daemon_id="testhost", + hostname="testhost", + ), + True + ), +]) +def test_daemon_description_service_name(spec: ServiceSpec, + dd: DaemonDescription, + valid: bool): + if valid: + assert spec.service_name() == dd.service_name() + else: + with pytest.raises(OrchestratorError): + dd.service_name() diff --git a/src/pybind/mgr/cephadm/tests/test_template.py b/src/pybind/mgr/cephadm/tests/test_template.py new file mode 100644 index 000000000..f67304348 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_template.py @@ -0,0 +1,33 @@ +import pathlib + +import pytest + +from cephadm.template import TemplateMgr, UndefinedError, TemplateNotFoundError + + +def test_render(cephadm_module, fs): + template_base = (pathlib.Path(__file__).parent / '../templates').resolve() + fake_template = template_base / 'foo/bar' + fs.create_file(fake_template, contents='{{ cephadm_managed }}{{ var }}') + + template_mgr = TemplateMgr(cephadm_module) + value = 'test' + + # with base context + expected_text = '{}{}'.format(template_mgr.base_context['cephadm_managed'], value) + assert template_mgr.render('foo/bar', {'var': value}) == expected_text + + # without base context + with pytest.raises(UndefinedError): + template_mgr.render('foo/bar', {'var': value}, managed_context=False) + + # override the base context + context = { + 'cephadm_managed': 'abc', + 'var': value + } + assert template_mgr.render('foo/bar', context) == 'abc{}'.format(value) + + # template not found + with pytest.raises(TemplateNotFoundError): + template_mgr.render('foo/bar/2', {}) diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py new file mode 100644 index 000000000..9368f4dc3 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py @@ -0,0 +1,322 @@ +import json +from unittest import mock + +import pytest + +from ceph.deployment.service_spec import PlacementSpec, ServiceSpec +from cephadm import CephadmOrchestrator +from cephadm.upgrade import CephadmUpgrade +from cephadm.serve import CephadmServe +from orchestrator import OrchestratorError, DaemonDescription +from .fixtures import _run_cephadm, wait, with_host, with_service + +from typing import List, Tuple, Optional + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_start(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_host(cephadm_module, 'test2'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), status_running=True): + assert wait(cephadm_module, cephadm_module.upgrade_start( + 'image_id', None)) == 'Initiating upgrade to image_id' + + assert wait(cephadm_module, cephadm_module.upgrade_status() + ).target_image == 'image_id' + + assert wait(cephadm_module, cephadm_module.upgrade_pause() + ) == 'Paused upgrade to image_id' + + assert wait(cephadm_module, cephadm_module.upgrade_resume() + ) == 'Resumed upgrade to image_id' + + assert wait(cephadm_module, cephadm_module.upgrade_stop() + ) == 'Stopped upgrade to image_id' + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +@pytest.mark.parametrize("use_repo_digest", + [ + False, + True + ]) +def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + cephadm_module.set_container_image('global', 'from_image') + cephadm_module.use_repo_digest = use_repo_digest + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*', count=2)), + CephadmOrchestrator.apply_mgr, '', status_running=True),\ + mock.patch("cephadm.module.CephadmOrchestrator.lookup_release_name", + return_value='foo'),\ + mock.patch("cephadm.module.CephadmOrchestrator.version", + new_callable=mock.PropertyMock) as version_mock,\ + mock.patch("cephadm.module.CephadmOrchestrator.get", + return_value={ + # capture fields in both mon and osd maps + "require_osd_release": "pacific", + "min_mon_release": 16, + }): + version_mock.return_value = 'ceph version 18.2.1 (somehash)' + assert wait(cephadm_module, cephadm_module.upgrade_start( + 'to_image', None)) == 'Initiating upgrade to to_image' + + assert wait(cephadm_module, cephadm_module.upgrade_status() + ).target_image == 'to_image' + + def _versions_mock(cmd): + return json.dumps({ + 'mgr': { + 'ceph version 1.2.3 (asdf) blah': 1 + } + }) + + cephadm_module._mon_command_mock_versions = _versions_mock + + with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ + 'image_id': 'image_id', + 'repo_digests': ['to_image@repo_digest'], + 'ceph_version': 'ceph version 18.2.3 (hash)', + }))): + + cephadm_module.upgrade._do_upgrade() + + assert cephadm_module.upgrade_status is not None + + with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name=list(cephadm_module.cache.daemons['host1'].keys())[0], + style='cephadm', + fsid='fsid', + container_id='container_id', + container_image_id='image_id', + container_image_digests=['to_image@repo_digest'], + deployed_by=['to_image@repo_digest'], + version='version', + state='running', + ) + ]) + )): + CephadmServe(cephadm_module)._refresh_hosts_and_daemons() + + with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ + 'image_id': 'image_id', + 'repo_digests': ['to_image@repo_digest'], + 'ceph_version': 'ceph version 18.2.3 (hash)', + }))): + cephadm_module.upgrade._do_upgrade() + + _, image, _ = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': 'global', + 'key': 'container_image', + }) + if use_repo_digest: + assert image == 'to_image@repo_digest' + else: + assert image == 'to_image' + + +def test_upgrade_state_null(cephadm_module: CephadmOrchestrator): + # This test validates https://tracker.ceph.com/issues/47580 + cephadm_module.set_store('upgrade_state', 'null') + CephadmUpgrade(cephadm_module) + assert CephadmUpgrade(cephadm_module).upgrade_state is None + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'host1'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''): + with pytest.raises(OrchestratorError): + wait(cephadm_module, cephadm_module.upgrade_start('image_id', None)) + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.CephadmOrchestrator.check_mon_command") +def test_enough_mons_for_ok_to_stop(check_mon_command, cephadm_module: CephadmOrchestrator): + # only 2 monitors, not enough for ok-to-stop to ever pass + check_mon_command.return_value = ( + 0, '{"monmap": {"mons": [{"name": "mon.1"}, {"name": "mon.2"}]}}', '') + assert not cephadm_module.upgrade._enough_mons_for_ok_to_stop() + + # 3 monitors, ok-to-stop should work fine + check_mon_command.return_value = ( + 0, '{"monmap": {"mons": [{"name": "mon.1"}, {"name": "mon.2"}, {"name": "mon.3"}]}}', '') + assert cephadm_module.upgrade._enough_mons_for_ok_to_stop() + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.module.HostCache.get_daemons_by_service") +@mock.patch("cephadm.CephadmOrchestrator.get") +def test_enough_mds_for_ok_to_stop(get, get_daemons_by_service, cephadm_module: CephadmOrchestrator): + get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'test', 'max_mds': 1}}]}] + get_daemons_by_service.side_effect = [[DaemonDescription()]] + assert not cephadm_module.upgrade._enough_mds_for_ok_to_stop( + DaemonDescription(daemon_type='mds', daemon_id='test.host1.gfknd', service_name='mds.test')) + + get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'myfs.test', 'max_mds': 2}}]}] + get_daemons_by_service.side_effect = [[DaemonDescription(), DaemonDescription()]] + assert not cephadm_module.upgrade._enough_mds_for_ok_to_stop( + DaemonDescription(daemon_type='mds', daemon_id='myfs.test.host1.gfknd', service_name='mds.myfs.test')) + + get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'myfs.test', 'max_mds': 1}}]}] + get_daemons_by_service.side_effect = [[DaemonDescription(), DaemonDescription()]] + assert cephadm_module.upgrade._enough_mds_for_ok_to_stop( + DaemonDescription(daemon_type='mds', daemon_id='myfs.test.host1.gfknd', service_name='mds.myfs.test')) + + +@pytest.mark.parametrize( + "upgraded, not_upgraded, daemon_types, hosts, services, should_block", + # [ ([(type, host, id), ... ], [...], [daemon types], [hosts], [services], True/False), ... ] + [ + ( # valid, upgrade mgr daemons + [], + [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')], + ['mgr'], + None, + None, + False + ), + ( # invalid, can't upgrade mons until mgr is upgraded + [], + [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')], + ['mon'], + None, + None, + True + ), + ( # invalid, can't upgrade mon service until all mgr daemons are upgraded + [], + [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')], + None, + None, + ['mon'], + True + ), + ( # valid, upgrade mgr service + [], + [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')], + None, + None, + ['mgr'], + False + ), + ( # valid, mgr is already upgraded so can upgrade mons + [('mgr', 'a', 'a.x')], + [('mon', 'a', 'a')], + ['mon'], + None, + None, + False + ), + ( # invalid, can't upgrade all daemons on b b/c un-upgraded mgr on a + [], + [('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + None, + ['a'], + None, + True + ), + ( # valid, only daemon on b is a mgr + [], + [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + None, + ['b'], + None, + False + ), + ( # invalid, can't upgrade mon on a while mgr on b is un-upgraded + [], + [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + None, + ['a'], + None, + True + ), + ( # valid, only upgrading the mgr on a + [], + [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + ['mgr'], + ['a'], + None, + False + ), + ( # valid, mgr daemon not on b are upgraded + [('mgr', 'a', 'a.x')], + [('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + None, + ['b'], + None, + False + ), + ( # valid, all the necessary hosts are covered, mgr on c is already upgraded + [('mgr', 'c', 'c.z')], + [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a'), ('osd', 'c', '0')], + None, + ['a', 'b'], + None, + False + ), + ( # invalid, can't upgrade mon on a while mgr on b is un-upgraded + [], + [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')], + ['mgr', 'mon'], + ['a'], + None, + True + ), + ( # valid, only mon not on "b" is upgraded already. Case hit while making teuthology test + [('mon', 'a', 'a')], + [('mon', 'b', 'x'), ('mon', 'b', 'y'), ('osd', 'a', '1'), ('osd', 'b', '2')], + ['mon', 'osd'], + ['b'], + None, + False + ), + ] +) +@mock.patch("cephadm.module.HostCache.get_daemons") +@mock.patch("cephadm.serve.CephadmServe._get_container_image_info") +@mock.patch('cephadm.module.SpecStore.__getitem__') +def test_staggered_upgrade_validation( + get_spec, + get_image_info, + get_daemons, + upgraded: List[Tuple[str, str, str]], + not_upgraded: List[Tuple[str, str, str, str]], + daemon_types: Optional[str], + hosts: Optional[str], + services: Optional[str], + should_block: bool, + cephadm_module: CephadmOrchestrator, +): + def to_dds(ts: List[Tuple[str, str]], upgraded: bool) -> List[DaemonDescription]: + dds = [] + digest = 'new_image@repo_digest' if upgraded else 'old_image@repo_digest' + for t in ts: + dds.append(DaemonDescription(daemon_type=t[0], + hostname=t[1], + daemon_id=t[2], + container_image_digests=[digest], + deployed_by=[digest],)) + return dds + get_daemons.return_value = to_dds(upgraded, True) + to_dds(not_upgraded, False) + get_image_info.return_value = ('new_id', 'ceph version 99.99.99 (hash)', ['new_image@repo_digest']) + + class FakeSpecDesc(): + def __init__(self, spec): + self.spec = spec + + def _get_spec(s): + return FakeSpecDesc(ServiceSpec(s)) + + get_spec.side_effect = _get_spec + if should_block: + with pytest.raises(OrchestratorError): + cephadm_module.upgrade._validate_upgrade_filters( + 'new_image_name', daemon_types, hosts, services) + else: + cephadm_module.upgrade._validate_upgrade_filters( + 'new_image_name', daemon_types, hosts, services) |