author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/cephadm/tests
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds. (upstream/16.2.11+ds, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/cephadm/tests')
-rw-r--r--  src/pybind/mgr/cephadm/tests/__init__.py             0
-rw-r--r--  src/pybind/mgr/cephadm/tests/conftest.py            27
-rw-r--r--  src/pybind/mgr/cephadm/tests/fixtures.py           151
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_autotune.py       69
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_cephadm.py      1805
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_completion.py     40
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_configchecks.py  668
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_facts.py          10
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_migration.py     229
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_osd_removal.py   298
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_scheduling.py   1591
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_services.py     1031
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_spec.py          600
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_template.py       33
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_upgrade.py       322
15 files changed, 6874 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/tests/__init__.py b/src/pybind/mgr/cephadm/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/__init__.py
diff --git a/src/pybind/mgr/cephadm/tests/conftest.py b/src/pybind/mgr/cephadm/tests/conftest.py
new file mode 100644
index 000000000..e8add2c7b
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/conftest.py
@@ -0,0 +1,27 @@
+import pytest
+
+from cephadm.services.osd import RemoveUtil, OSD
+from tests import mock
+
+from .fixtures import with_cephadm_module
+
+
+@pytest.fixture()
+def cephadm_module():
+ with with_cephadm_module({}) as m:
+ yield m
+
+
+@pytest.fixture()
+def rm_util():
+ with with_cephadm_module({}) as m:
+ r = RemoveUtil.__new__(RemoveUtil)
+ r.__init__(m)
+ yield r
+
+
+@pytest.fixture()
+def osd_obj():
+ with mock.patch("cephadm.services.osd.RemoveUtil"):
+ o = OSD(0, mock.MagicMock())
+ yield o
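
These fixtures are ordinary pytest fixtures, so any test module in this package can request them by parameter name. A minimal usage sketch follows (an illustration only, not part of the upstream patch; the osd_id attribute is assumed from the OSD(0, mock.MagicMock()) call above):

# Illustrative sketch -- not part of the upstream patch.
# pytest collects the fixtures from conftest.py and injects them by name.

def test_cephadm_module_fixture(cephadm_module):
    # cephadm_module is the CephadmOrchestrator built by with_cephadm_module({})
    assert cephadm_module is not None


def test_osd_fixtures(rm_util, osd_obj):
    # rm_util is a RemoveUtil bound to its own orchestrator instance;
    # osd_obj was constructed as OSD(0, mock.MagicMock()) above
    assert osd_obj.osd_id == 0  # attribute assumed to be set by OSD.__init__
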
diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py
new file mode 100644
index 000000000..8c2e1cfbf
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/fixtures.py
@@ -0,0 +1,151 @@
+import fnmatch
+from contextlib import contextmanager
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
+from ceph.utils import datetime_to_str, datetime_now
+from cephadm.serve import CephadmServe
+
+try:
+ from typing import Any, Iterator, List
+except ImportError:
+ pass
+
+from cephadm import CephadmOrchestrator
+from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
+from tests import mock
+
+
+def get_ceph_option(_, key):
+ return __file__
+
+
+def get_module_option_ex(_, module, key, default=None):
+ if module == 'prometheus':
+ if key == 'server_port':
+ return 9283
+ return None
+
+
+def _run_cephadm(ret):
+ def foo(s, host, entity, cmd, e, **kwargs):
+ if cmd == 'gather-facts':
+ return '{}', '', 0
+ return [ret], '', 0
+ return foo
+
+
+def match_glob(val, pat):
+ ok = fnmatch.fnmatchcase(val, pat)
+ if not ok:
+ assert pat in val
+
+
+@contextmanager
+def with_cephadm_module(module_options=None, store=None):
+ """
+ :param module_options: Set opts as if they were set before module.__init__ is called
+ :param store: Set the store before module.__init__ is called
+ """
+ with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
+ mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
+ mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex),\
+ mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+ mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
+
+ m = CephadmOrchestrator.__new__(CephadmOrchestrator)
+ if module_options is not None:
+ for k, v in module_options.items():
+ m._ceph_set_module_option('cephadm', k, v)
+ if store is None:
+ store = {}
+ if '_ceph_get/mon_map' not in store:
+ m.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ if '_ceph_get/mgr_map' not in store:
+ m.mock_store_set('_ceph_get', 'mgr_map', {
+ 'services': {
+ 'dashboard': 'http://[::1]:8080',
+ 'prometheus': 'http://[::1]:8081'
+ },
+ 'modules': ['dashboard', 'prometheus'],
+ })
+ for k, v in store.items():
+ m._ceph_set_store(k, v)
+
+ m.__init__('cephadm', 0, 0)
+ m._cluster_fsid = "fsid"
+ yield m
+
+
+def wait(m: CephadmOrchestrator, c: OrchResult) -> Any:
+ return raise_if_exception(c)
+
+
+@contextmanager
+def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
+ with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
+ wait(m, m.add_host(HostSpec(hostname=name)))
+ if refresh_hosts:
+ CephadmServe(m)._refresh_hosts_and_daemons()
+ yield
+ wait(m, m.remove_host(name, force=rm_with_force))
+
+
+def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
+ mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
+ if mon_or_mgr:
+ assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
+ return
+ assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
+ assert cephadm.spec_store[srv_name].deleted is not None
+ CephadmServe(cephadm)._check_daemons()
+ CephadmServe(cephadm)._apply_all_services()
+ assert cephadm.spec_store[srv_name].deleted
+ unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
+ CephadmServe(cephadm)._purge_deleted_services()
+ if not unmanaged: # because then we're not deleting daemons
+ assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
+
+
+@contextmanager
+def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
+ if spec.placement.is_empty() and host:
+ spec.placement = PlacementSpec(hosts=[host], count=1)
+ if meth is not None:
+ c = meth(cephadm_module, spec)
+ assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
+ else:
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...']
+
+ specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
+ assert spec in specs
+
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ if status_running:
+ make_daemons_running(cephadm_module, spec.service_name())
+
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
+ if host and spec.service_type != 'osd':
+ assert own_dds
+
+ yield [dd.name() for dd in own_dds]
+
+ assert_rm_service(cephadm_module, spec.service_name())
+
+
+def make_daemons_running(cephadm_module, service_name):
+ own_dds = cephadm_module.cache.get_daemons_by_service(service_name)
+ for dd in own_dds:
+ dd.status = DaemonDescriptionStatus.running # We're changing the reference
+
+
+def _deploy_cephadm_binary(host):
+ def foo(*args, **kwargs):
+ return True
+ return foo
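
The helpers in fixtures.py are meant to nest: with_cephadm_module builds a CephadmOrchestrator with the mgr plumbing mocked out, with_host registers a host (and removes it again on exit), and with_service applies a spec, checks the "Scheduled ... update..." reply, deploys via the serve loop, and finally tears the service down through assert_rm_service. A rough sketch of how they compose (an illustration only, not part of the upstream patch; real tests patch CephadmServe._run_cephadm in exactly this way, as test_cephadm.py below shows):

# Illustrative sketch -- not part of the upstream patch.
from ceph.deployment.service_spec import ServiceSpec
from tests import mock

from .fixtures import _run_cephadm, with_cephadm_module, with_host, with_service


@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_fixture_composition():
    with with_cephadm_module({}) as m:          # orchestrator with mocked mgr plumbing
        with with_host(m, 'host1'):             # host added here, removed again on exit
            with with_service(m, ServiceSpec('crash'), host='host1') as daemon_names:
                assert daemon_names             # one crash daemon placed on host1
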
diff --git a/src/pybind/mgr/cephadm/tests/test_autotune.py b/src/pybind/mgr/cephadm/tests/test_autotune.py
new file mode 100644
index 000000000..524da9c00
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_autotune.py
@@ -0,0 +1,69 @@
+# Disable autopep8 for this file:
+
+# fmt: off
+
+import pytest
+
+from cephadm.autotune import MemoryAutotuner
+from orchestrator import DaemonDescription
+
+
+@pytest.mark.parametrize("total,daemons,config,result",
+ [ # noqa: E128
+ (
+ 128 * 1024 * 1024 * 1024,
+ [],
+ {},
+ None,
+ ),
+ (
+ 128 * 1024 * 1024 * 1024,
+ [
+ DaemonDescription('osd', '1', 'host1'),
+ DaemonDescription('osd', '2', 'host1'),
+ ],
+ {},
+ 64 * 1024 * 1024 * 1024,
+ ),
+ (
+ 128 * 1024 * 1024 * 1024,
+ [
+ DaemonDescription('osd', '1', 'host1'),
+ DaemonDescription('osd', '2', 'host1'),
+ DaemonDescription('osd', '3', 'host1'),
+ ],
+ {
+ 'osd.3': 16 * 1024 * 1024 * 1024,
+ },
+ 56 * 1024 * 1024 * 1024,
+ ),
+ (
+ 128 * 1024 * 1024 * 1024,
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('osd', '1', 'host1'),
+ DaemonDescription('osd', '2', 'host1'),
+ ],
+ {},
+ 62 * 1024 * 1024 * 1024,
+ )
+ ])
+def test_autotune(total, daemons, config, result):
+ def fake_getter(who, opt):
+ if opt == 'osd_memory_target_autotune':
+ if who in config:
+ return False
+ else:
+ return True
+ if opt == 'osd_memory_target':
+ return config.get(who, 4 * 1024 * 1024 * 1024)
+ if opt == 'mds_cache_memory_limit':
+ return 16 * 1024 * 1024 * 1024
+
+ a = MemoryAutotuner(
+ total_mem=total,
+ daemons=daemons,
+ config_get=fake_getter,
+ )
+ val, osds = a.tune()
+ assert val == result
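
The expected targets above follow from simple arithmetic; the per-daemon allowances are inferred from the parametrized values rather than from MemoryAutotuner internals, so treat them as assumptions. Two tunable OSDs on a 128 GiB host get 64 GiB each; with autotuning disabled for osd.3 its 16 GiB osd_memory_target is set aside first, leaving (128 - 16) / 2 = 56 GiB apiece; and with a mgr on the host a default 4 GiB allowance appears to be carved out, giving (128 - 4) / 2 = 62 GiB.

# Editorial back-of-the-envelope check of the expected values above.
GiB = 1024 ** 3
assert (128 * GiB) // 2 == 64 * GiB         # two tunable OSDs split the whole host
assert ((128 - 16) * GiB) // 2 == 56 * GiB  # osd.3 pinned at its 16 GiB target
assert ((128 - 4) * GiB) // 2 == 62 * GiB   # mgr gets a ~4 GiB allowance first
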
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
new file mode 100644
index 000000000..a6850f6cb
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -0,0 +1,1805 @@
+import json
+import logging
+from contextlib import contextmanager
+
+import pytest
+
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
+from cephadm.serve import CephadmServe
+from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
+
+try:
+ from typing import List
+except ImportError:
+ pass
+
+from execnet.gateway_bootstrap import HostNotFound
+
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
+ NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec, MDSSpec
+from ceph.deployment.drive_selection.selector import DriveSelection
+from ceph.deployment.inventory import Devices, Device
+from ceph.utils import datetime_to_str, datetime_now
+from orchestrator import DaemonDescription, InventoryHost, \
+ HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent
+from tests import mock
+from .fixtures import wait, _run_cephadm, match_glob, with_host, \
+ with_cephadm_module, with_service, _deploy_cephadm_binary, make_daemons_running
+from cephadm.module import CephadmOrchestrator
+
+"""
+TODOs:
+ There is really room for improvement here. I just quickly assembled these tests.
+ In general, everything should be tested in Teuthology as well. The reason for
+ also testing this here is the development roundtrip time.
+"""
+
+
+def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
+ dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
+ d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
+ assert d_names
+ # there should only be one daemon (if not match_glob will throw mismatch)
+ assert len(d_names) == 1
+
+ c = cephadm.remove_daemons(d_names)
+ [out] = wait(cephadm, c)
+ # Picking the 1st element is needed, rather than passing the whole list, when the daemon
+ # name contains a '-' char. Otherwise the '-' is treated as a range, i.e. cephadm-exporter
+ # is treated like an m-e range, which is invalid. rbd-mirror (d-m) and node-exporter (e-e)
+ # are valid, so they pass without incident! Also, match_glob acts on strings anyway!
+ match_glob(out, f"Removed {d_names[0]}* from host '{host}'")
+
+
+@contextmanager
+def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str):
+ spec.placement = PlacementSpec(hosts=[host], count=1)
+
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")
+
+ dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
+ for dd in dds:
+ if dd.hostname == host:
+ yield dd.daemon_id
+ assert_rm_daemon(cephadm_module, spec.service_name(), host)
+ return
+
+ assert False, 'Daemon not found'
+
+
+@contextmanager
+def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {
+ 'osd': 1,
+ 'up_from': 0,
+ 'up': True,
+ 'uuid': 'uuid'
+ }
+ ]
+ })
+
+ _run_cephadm.reset_mock(return_value=True, side_effect=True)
+ if ceph_volume_lvm_list:
+ _run_cephadm.side_effect = ceph_volume_lvm_list
+ else:
+ def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ logging.info(f'ceph-volume cmd: {cmd}')
+ if 'raw' in cmd:
+ return json.dumps({
+ "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": {
+ "ceph_fsid": cephadm_module._cluster_fsid,
+ "device": "/dev/loop0",
+ "osd_id": 21,
+ "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5",
+ "type": "bluestore"
+ },
+ }), '', 0
+ if 'lvm' in cmd:
+ return json.dumps({
+ str(osd_id): [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }]
+ }), '', 0
+ return '{}', '', 0
+
+ _run_cephadm.side_effect = _ceph_volume_list
+
+ assert cephadm_module._osd_activate(
+ [host]).stdout == f"Created osd(s) 1 on host '{host}'"
+ assert _run_cephadm.mock_calls == [
+ mock.call(host, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''),
+ mock.call(host, f'osd.{osd_id}', 'deploy',
+ ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY,
+ '--config-json', '-', '--osd-fsid', 'uuid'],
+ stdin=mock.ANY, image=''),
+ mock.call(host, 'osd', 'ceph-volume',
+ ['--', 'raw', 'list', '--format', 'json'], no_fsid=False, image=''),
+ ]
+ dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host)
+ assert dd.name() == f'osd.{osd_id}'
+ yield dd
+ cephadm_module._remove_daemons([(f'osd.{osd_id}', host)])
+
+
+class TestCephadm(object):
+
+ def test_get_unique_name(self, cephadm_module):
+ # type: (CephadmOrchestrator) -> None
+ existing = [
+ DaemonDescription(daemon_type='mon', daemon_id='a')
+ ]
+ new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing)
+ match_glob(new_mon, 'myhost')
+ new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
+ match_glob(new_mgr, 'myhost.*')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_host(self, cephadm_module):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+ with with_host(cephadm_module, 'test'):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')]
+
+ # Be careful with backward compatibility when changing things here:
+ assert json.loads(cephadm_module.get_store('inventory')) == \
+ {"test": {"hostname": "test", "addr": "1::4", "labels": [], "status": ""}}
+
+ with with_host(cephadm_module, 'second', '1.2.3.5'):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+ HostSpec('test', '1::4'),
+ HostSpec('second', '1.2.3.5')
+ ]
+
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')]
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.utils.resolve_ip")
+ def test_re_add_host_receive_loopback(self, resolve_ip, cephadm_module):
+ resolve_ip.side_effect = ['192.168.122.1', '127.0.0.1', '127.0.0.1']
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+ cephadm_module._add_host(HostSpec('test', '192.168.122.1'))
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')]
+ cephadm_module._add_host(HostSpec('test'))
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')]
+ with pytest.raises(OrchestratorError):
+ cephadm_module._add_host(HostSpec('test2'))
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_service_ls(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.list_daemons(refresh=True)
+ assert wait(cephadm_module, c) == []
+ with with_service(cephadm_module, MDSSpec('mds', 'name', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, MDSSpec('mds', 'name'), 'test') as _:
+
+ c = cephadm_module.list_daemons()
+
+ def remove_id_events(dd):
+ out = dd.to_json()
+ del out['daemon_id']
+ del out['events']
+ del out['daemon_name']
+ return out
+
+ assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
+ {
+ 'service_name': 'mds.name',
+ 'daemon_type': 'mds',
+ 'hostname': 'test',
+ 'status': 2,
+ 'status_desc': 'starting',
+ 'is_active': False,
+ 'ports': [],
+ }
+ ]
+
+ with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'),
+ CephadmOrchestrator.apply_rgw, 'test', status_running=True):
+ make_daemons_running(cephadm_module, 'mds.name')
+
+ c = cephadm_module.describe_service()
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'name',
+ 'service_name': 'mds.name',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 2},
+ 'unmanaged': True
+ },
+ {
+ 'placement': {
+ 'count': 1,
+ 'hosts': ["test"]
+ },
+ 'service_id': 'r.z',
+ 'service_name': 'rgw.r.z',
+ 'service_type': 'rgw',
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 1,
+ 'ports': [80]},
+ }
+ ]
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_service_ls_service_type_flag(self, cephadm_module):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)),
+ CephadmOrchestrator.apply_mgr, '', status_running=True):
+ with with_service(cephadm_module, MDSSpec('mds', 'test-id', placement=PlacementSpec(count=2)),
+ CephadmOrchestrator.apply_mds, '', status_running=True):
+
+ # with no service-type. Should provide info for both services
+ c = cephadm_module.describe_service()
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_name': 'mgr',
+ 'service_type': 'mgr',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'test-id',
+ 'service_name': 'mds.test-id',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ ]
+
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+ # with service-type. Should provide info for the mds service only
+ c = cephadm_module.describe_service(service_type='mds')
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'test-id',
+ 'service_name': 'mds.test-id',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ ]
+
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+ # service-type should not match with service names
+ c = cephadm_module.describe_service(service_type='mds.test-id')
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ assert out == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_device_ls(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.get_inventory()
+ assert wait(cephadm_module, c) == [InventoryHost('test')]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='rgw.myrgw.foobar',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ ),
+ dict(
+ name='something.foo.bar',
+ style='cephadm',
+ fsid='fsid',
+ ),
+ dict(
+ name='haproxy.test.bar',
+ style='cephadm',
+ fsid='fsid',
+ ),
+
+ ])
+ ))
+ def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ assert {d.name() for d in dds} == {'rgw.myrgw.foobar', 'haproxy.test.bar'}
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
+
+ d_name = 'rgw.' + daemon_id
+
+ c = cephadm_module.daemon_action('redeploy', d_name)
+ assert wait(cephadm_module,
+ c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'"
+
+ for what in ('start', 'stop', 'restart'):
+ c = cephadm_module.daemon_action(what, d_name)
+ assert wait(cephadm_module,
+ c) == F"Scheduled to {what} {d_name} on host 'test'"
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ }
+ cephadm_module.notify('mon_map', None)
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ assert cephadm_module.events.get_for_daemon(d_name) == [
+ OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO',
+ f"Deployed {d_name} on host \'test\'"),
+ OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO',
+ f"stop {d_name} from host \'test\'"),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
+ with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
+
+ _ceph_send_command.side_effect = Exception("myerror")
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ evs = [e.message for e in cephadm_module.events.get_for_daemon(
+ f'rgw.{daemon_id}')]
+
+ assert 'myerror' in ''.join(evs)
+
+ @pytest.mark.parametrize(
+ "action",
+ [
+ 'start',
+ 'stop',
+ 'restart',
+ 'reconfig',
+ 'redeploy'
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
+ [daemon_name] = d_names
+
+ cephadm_module._schedule_daemon_action(daemon_name, action)
+
+ assert cephadm_module.cache.get_scheduled_daemon_action(
+ 'test', daemon_name) == action
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+
+ # Also testing deploying mons without explicit network placement
+ cephadm_module.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mon',
+ 'name': 'public_network',
+ 'value': '127.0.0.0/8'
+ })
+
+ cephadm_module.cache.update_host_devices_networks(
+ 'test',
+ [],
+ {
+ "127.0.0.0/8": [
+ "127.0.0.1"
+ ],
+ }
+ )
+
+ with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names:
+ [daemon_name] = d_names
+
+ cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ _run_cephadm.assert_called_with(
+ 'test', 'mon.test', 'deploy', [
+ '--name', 'mon.test',
+ '--meta-json', '{"service_name": "mon", "ports": [], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--reconfig',
+ ],
+ stdin='{"config": "\\n\\n[mon]\\nk=v\\n[mon.test]\\npublic network = 127.0.0.0/8\\n", '
+ + '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}',
+ image='')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_extra_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='crash', extra_container_args=['--cpus=2', '--quiet']), CephadmOrchestrator.apply_crash):
+ _run_cephadm.assert_called_with(
+ 'test', 'crash.test', 'deploy', [
+ '--name', 'crash.test',
+ '--meta-json', '{"service_name": "crash", "ports": [], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": ["--cpus=2", "--quiet"]}',
+ '--config-json', '-',
+ '--extra-container-args=--cpus=2',
+ '--extra-container-args=--quiet'
+ ],
+ stdin='{"config": "", "keyring": ""}',
+ image='',
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+ 'modules': ['dashboard']
+ })
+
+ with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
+ CephadmServe(cephadm_module)._check_daemons()
+ _mon_cmd.assert_any_call(
+ {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://[1::4]:3000'},
+ None)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1.2.3.4')
+ def test_iscsi_post_actions_with_missing_daemon_in_cache(self, cephadm_module: CephadmOrchestrator):
+ # https://tracker.ceph.com/issues/52866
+ with with_host(cephadm_module, 'test1'):
+ with with_host(cephadm_module, 'test2'):
+ with with_service(cephadm_module, IscsiServiceSpec(service_id='foobar', pool='pool', placement=PlacementSpec(host_pattern='*')), CephadmOrchestrator.apply_iscsi, 'test'):
+
+ CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('iscsi')) == 2
+
+ # get daemons from the post-action list (ARRGH sets!!)
+ tempset = cephadm_module.requires_post_actions.copy()
+ tempdeamon1 = tempset.pop()
+ tempdeamon2 = tempset.pop()
+
+ # make sure post actions has 2 daemons in it
+ assert len(cephadm_module.requires_post_actions) == 2
+
+ # replicate a host cache that is not in sync when check_daemons is called
+ tempdd1 = cephadm_module.cache.get_daemon(tempdeamon1)
+ tempdd2 = cephadm_module.cache.get_daemon(tempdeamon2)
+ host = 'test1'
+ if 'test1' not in tempdeamon1:
+ host = 'test2'
+ cephadm_module.cache.rm_daemon(host, tempdeamon1)
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+ 'modules': ['dashboard']
+ })
+
+ with mock.patch("cephadm.module.IscsiService.config_dashboard") as _cfg_db:
+ CephadmServe(cephadm_module)._check_daemons()
+ _cfg_db.assert_called_once_with([tempdd2])
+
+ # post actions still has the other daemon in it and will run next _check_daemons
+ assert len(cephadm_module.requires_post_actions) == 1
+
+ # post actions was missed for a daemon
+ assert tempdeamon1 in cephadm_module.requires_post_actions
+
+ # put the daemon back in the cache
+ cephadm_module.cache.add_daemon(host, tempdd1)
+
+ _cfg_db.reset_mock()
+ # replicate serve loop running again
+ CephadmServe(cephadm_module)._check_daemons()
+
+ # post actions should have been called again
+ _cfg_db.assert_called()
+
+ # post actions is now empty
+ assert len(cephadm_module.requires_post_actions) == 0
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_mon_add(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+ assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+
+ with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+ wait(cephadm_module, c)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_mgr_update(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert r
+
+ assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
+ def test_find_destroyed_osds(self, _mon_cmd, cephadm_module):
+ dict_out = {
+ "nodes": [
+ {
+ "id": -1,
+ "name": "default",
+ "type": "root",
+ "type_id": 11,
+ "children": [
+ -3
+ ]
+ },
+ {
+ "id": -3,
+ "name": "host1",
+ "type": "host",
+ "type_id": 1,
+ "pool_weights": {},
+ "children": [
+ 0
+ ]
+ },
+ {
+ "id": 0,
+ "device_class": "hdd",
+ "name": "osd.0",
+ "type": "osd",
+ "type_id": 0,
+ "crush_weight": 0.0243988037109375,
+ "depth": 2,
+ "pool_weights": {},
+ "exists": 1,
+ "status": "destroyed",
+ "reweight": 1,
+ "primary_affinity": 1
+ }
+ ],
+ "stray": []
+ }
+ json_out = json.dumps(dict_out)
+ _mon_cmd.return_value = (0, json_out, '')
+ osd_claims = OsdIdClaims(cephadm_module)
+ assert osd_claims.get() == {'host1': ['0']}
+ assert osd_claims.filtered_by_host('host1') == ['0']
+ assert osd_claims.filtered_by_host('host1.domain.com') == ['0']
+
+ @ pytest.mark.parametrize(
+ "ceph_services, cephadm_daemons, strays_expected, metadata",
+ # [ ([(daemon_type, daemon_id), ... ], [...], [...]), ... ]
+ [
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ {},
+ ),
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ {},
+ ),
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0')],
+ [('mgr', 'x')],
+ {},
+ ),
+ # https://tracker.ceph.com/issues/49573
+ (
+ [('rgw-nfs', '14649')],
+ [],
+ [('nfs', 'foo-rgw.host1')],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}},
+ ),
+ (
+ [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+ [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')],
+ [],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+ ),
+ (
+ [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+ [('nfs', 'foo-rgw.host1')],
+ [('nfs', 'foo2.host2')],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+ ),
+ ]
+ )
+ def test_check_for_stray_daemons(
+ self,
+ cephadm_module,
+ ceph_services,
+ cephadm_daemons,
+ strays_expected,
+ metadata
+ ):
+ # mock ceph service-map
+ services = []
+ for service in ceph_services:
+ s = {'type': service[0], 'id': service[1]}
+ services.append(s)
+ ls = [{'hostname': 'host1', 'services': services}]
+
+ with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
+ list_servers.return_value = ls
+ list_servers.__iter__.side_effect = ls.__iter__
+
+ # populate cephadm daemon cache
+ dm = {}
+ for daemon_type, daemon_id in cephadm_daemons:
+ dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
+ dm[dd.name()] = dd
+ cephadm_module.cache.update_host_daemons('host1', dm)
+
+ def get_metadata_mock(svc_type, svc_id, default):
+ return metadata[svc_id]
+
+ with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock):
+
+ # test
+ CephadmServe(cephadm_module)._check_for_strays()
+
+ # verify
+ strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
+ if not strays:
+ assert len(strays_expected) == 0
+ else:
+ for dt, di in strays_expected:
+ name = '%s.%s' % (dt, di)
+ for detail in strays['detail']:
+ if name in detail:
+ strays['detail'].remove(detail)
+ break
+ assert name in detail
+ assert len(strays['detail']) == 0
+ assert strays['count'] == len(strays_expected)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
+ def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
+ _mon_cmd.return_value = (1, "", "fail_msg")
+ with pytest.raises(OrchestratorError):
+ OsdIdClaims(cephadm_module)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+
+ spec = DriveGroupSpec(
+ service_id='foo',
+ placement=PlacementSpec(
+ host_pattern='*',
+ ),
+ data_devices=DeviceSelection(
+ all=True
+ )
+ )
+
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
+
+ inventory = Devices([
+ Device(
+ '/dev/sdb',
+ available=True
+ ),
+ ])
+
+ cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+ _run_cephadm.return_value = (['{}'], '', 0)
+
+ assert CephadmServe(cephadm_module)._apply_all_services() is False
+
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+
+ spec = DriveGroupSpec(
+ service_id='noncollocated',
+ placement=PlacementSpec(
+ hosts=['test']
+ ),
+ data_devices=DeviceSelection(paths=['/dev/sdb']),
+ db_devices=DeviceSelection(paths=['/dev/sdc']),
+ wal_devices=DeviceSelection(paths=['/dev/sdd'])
+ )
+
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']
+
+ inventory = Devices([
+ Device('/dev/sdb', available=True),
+ Device('/dev/sdc', available=True),
+ Device('/dev/sdd', available=True)
+ ])
+
+ cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+ _run_cephadm.return_value = (['{}'], '', 0)
+
+ assert CephadmServe(cephadm_module)._apply_all_services() is False
+
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
+ '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
+ error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.SpecStore.save")
+ def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'},
+ 'service_id': 'foo', 'data_devices': {'all': True}}
+ spec = ServiceSpec.from_json(json_spec)
+ assert isinstance(spec, DriveGroupSpec)
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
+ _save_spec.assert_called_with(spec)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_create_osds(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
+ bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_hsot'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(bad_dg)
+ out = wait(cephadm_module, c)
+ assert "Invalid 'host:device' spec: host not found in cluster" in out
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_create_noncollocated_osd(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch('cephadm.services.osd.OSDService._run_ceph_volume_command')
+ @mock.patch('cephadm.services.osd.OSDService.driveselection_to_ceph_volume')
+ @mock.patch('cephadm.services.osd.OsdIdClaims.refresh', lambda _: None)
+ @mock.patch('cephadm.services.osd.OsdIdClaims.get', lambda _: {})
+ def test_limit_not_reached(self, d_to_cv, _run_cv_cmd, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(limit=5, rotational=1),
+ service_id='not_enough')
+
+ disks_found = [
+ '[{"data": "/dev/vdb", "data_size": "50.00 GB", "encryption": "None"}, {"data": "/dev/vdc", "data_size": "50.00 GB", "encryption": "None"}]']
+ d_to_cv.return_value = 'foo'
+ _run_cv_cmd.return_value = (disks_found, '', 0)
+ preview = cephadm_module.osd_service.generate_previews([dg], 'test')
+
+ for osd in preview:
+ assert 'notes' in osd
+ assert osd['notes'] == [
+ 'NOTE: Did not find enough disks matching filter on host test to reach data device limit (Found: 2 | Limit: 5)']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_prepare_drivegroup(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ out = cephadm_module.osd_service.prepare_drivegroup(dg)
+ assert len(out) == 1
+ f1 = out[0]
+ assert f1[0] == 'test'
+ assert isinstance(f1[1], DriveSelection)
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_commands",
+ [
+ # no preview and only one disk; prepare is used due to the hack that is in place.
+ (['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]),
+ # no preview and multiple disks, uses batch
+ (['/dev/sda', '/dev/sdb'], False,
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]),
+ # preview and only one disk needs to use batch again to generate the preview
+ (['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]),
+ # preview and multiple disks work the same
+ (['/dev/sda', '/dev/sdb'], True,
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ preview = preview
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from {out} in {exp_commands}'
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_commands",
+ [
+ # one data device, no preview
+ (['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]),
+ # multiple data devices, no preview
+ (['/dev/sda', '/dev/sdb'], False,
+ ["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]),
+ # one data device, preview
+ (['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]),
+ # multiple data devices, preview
+ (['/dev/sda', '/dev/sdb'], True,
+ ["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ preview = preview
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from {out} in {exp_commands}'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='osd.0',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ )
+ ])
+ ))
+ @mock.patch("cephadm.services.osd.OSD.exists", True)
+ @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+ def test_remove_osds(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
+ wait(cephadm_module, c)
+
+ c = cephadm_module.remove_daemons(['osd.0'])
+ out = wait(cephadm_module, c)
+ assert out == ["Removed osd.0 from host 'test'"]
+
+ cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
+ replace=False,
+ force=False,
+ hostname='test',
+ process_started_at=datetime_now(),
+ remove_util=cephadm_module.to_remove_osds.rm_util
+ ))
+ cephadm_module.to_remove_osds.process_removal_queue()
+ assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
+
+ c = cephadm_module.remove_osds_status()
+ out = wait(cephadm_module, c)
+ assert out == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_rgw_update(self, cephadm_module):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)):
+ ps = PlacementSpec(hosts=['host1'], count=1)
+ c = cephadm_module.add_daemon(
+ RGWSpec(service_id="foo", placement=ps))
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed rgw.foo.* on host 'host1'")
+
+ ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
+ r = CephadmServe(cephadm_module)._apply_service(
+ RGWSpec(service_id="foo", placement=ps))
+ assert r
+
+ assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1')
+ assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='rgw.myrgw.myhost.myid',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ )
+ ])
+ ))
+ def test_remove_daemon(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
+ wait(cephadm_module, c)
+ c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
+ out = wait(cephadm_module, c)
+ assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription
+ CephadmServe(cephadm_module)._check_for_moved_osds()
+ # both are in status "starting"
+ assert len(cephadm_module.cache.get_daemons()) == 2
+
+ dd1.status = DaemonDescriptionStatus.running
+ dd2.status = DaemonDescriptionStatus.error
+ cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
+ cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+ CephadmServe(cephadm_module)._check_for_moved_osds()
+ assert len(cephadm_module.cache.get_daemons()) == 1
+
+ assert cephadm_module.events.get_for_daemon('osd.1') == [
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Deployed osd.1 on host 'host1'"),
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Deployed osd.1 on host 'host2'"),
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Removed duplicated daemon on host 'host2'"),
+ ]
+
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ @pytest.mark.parametrize(
+ "spec",
+ [
+ ServiceSpec('crash'),
+ ServiceSpec('prometheus'),
+ ServiceSpec('grafana'),
+ ServiceSpec('node-exporter'),
+ ServiceSpec('alertmanager'),
+ ServiceSpec('rbd-mirror'),
+ ServiceSpec('cephfs-mirror'),
+ ServiceSpec('mds', service_id='fsname'),
+ RGWSpec(rgw_realm='realm', rgw_zone='zone'),
+ RGWSpec(service_id="foo"),
+ ServiceSpec('cephadm-exporter'),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, unmanaged_spec):
+ with with_daemon(cephadm_module, spec, 'test'):
+ pass
+
+ @pytest.mark.parametrize(
+ "entity,success,spec",
+ [
+ ('mgr.x', True, ServiceSpec(
+ service_type='mgr',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('client.rgw.x', True, ServiceSpec(
+ service_type='rgw',
+ service_id='id',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('client.nfs.x', True, ServiceSpec(
+ service_type='nfs',
+ service_id='id',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('mon.', False, ServiceSpec(
+ service_type='mon',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec('test', '127.0.0.0/24', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_module):
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.side_effect = OrchestratorError('fail')
+ with pytest.raises(OrchestratorError):
+ wait(cephadm_module, cephadm_module.add_daemon(spec))
+ if success:
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+ else:
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+ assert cephadm_module.events.get_for_service(spec.service_name()) == [
+ OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'INFO',
+ "service was created"),
+ OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'ERROR',
+ "fail"),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+ _run_cephadm.side_effect = OrchestratorError('fail')
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert not r
+ assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None
+ assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1
+ assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['summary']
+ assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['detail']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._apply_all_services()
+ ps = PlacementSpec(hosts=['fail'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert not r
+ assert cephadm_module.apply_spec_fails
+ assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None
+ assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1
+ assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['summary']
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ get_foreign_ceph_option.side_effect = KeyError
+ CephadmServe(cephadm_module)._apply_service_config(
+ ServiceSpec('mgr', placement=ps, config={'test': 'foo'}))
+ assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None
+ assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1
+ assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['summary']
+ assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['detail']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ def test_nfs(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = NFSServiceSpec(
+ service_id='name',
+ placement=ps)
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed nfs.name.* on host 'test'")
+
+ assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("subprocess.run", None)
+ @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
+ def test_iscsi(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password',
+ placement=ps)
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
+
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed iscsi.name.* on host 'test'")
+
+ assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
+
+ @pytest.mark.parametrize(
+ "on_bool",
+ [
+ True,
+ False
+ ]
+ )
+ @pytest.mark.parametrize(
+ "fault_ident",
+ [
+ 'fault',
+ 'ident'
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
+ on_off = 'on' if on_bool else 'off'
+ assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}']
+ _run_cephadm.assert_called_with('test', 'osd', 'shell', [
+ '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
+ c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
+ assert wait(cephadm_module, c) == ['Set ident light for test: on']
+ _run_cephadm.assert_called_with('test', 'osd', 'shell', [
+ '--', 'echo', 'hello'], error_ok=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'mgr0'):
+ cephadm_module.set_store('mgr0/blink_device_light_cmd',
+ 'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
+ c = cephadm_module.blink_device_light(
+ 'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')])
+ assert wait(cephadm_module, c) == [
+ 'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on']
+ _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [
+ '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784'
+ ], error_ok=True)
+
+ @pytest.mark.parametrize(
+ "spec, meth",
+ [
+ (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr),
+ (ServiceSpec('crash'), CephadmOrchestrator.apply_crash),
+ (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus),
+ (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana),
+ (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter),
+ (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
+ (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
+ (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror),
+ (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
+ (ServiceSpec(
+ 'mds', service_id='fsname',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='fsname',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_mds),
+ (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw),
+ (RGWSpec(
+ service_id='bar',
+ rgw_realm='realm', rgw_zone='zone',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='bar',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_rgw),
+ (NFSServiceSpec(
+ service_id='name',
+ ), CephadmOrchestrator.apply_nfs),
+ (IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password'
+ ), CephadmOrchestrator.apply_iscsi),
+ (CustomContainerSpec(
+ service_id='hello-world',
+ image='docker.io/library/hello-world:latest',
+ uid=65534,
+ gid=65534,
+ dirs=['foo/bar'],
+ files={
+ 'foo/bar/xyz.conf': 'aaa\nbbb'
+ },
+ bind_mounts=[[
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]],
+ volume_mounts={
+ 'foo/bar': '/foo/bar:Z'
+ },
+ args=['--no-healthcheck'],
+ envs=['SECRET=password'],
+ ports=[8080, 8443]
+ ), CephadmOrchestrator.apply_container),
+ (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+ @mock.patch("subprocess.run", None)
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("subprocess.run", mock.MagicMock())
+ def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, meth, 'test'):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
+ spec = MDSSpec('mds', service_id='fsname', config={'test': 'foo'})
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, host='test'):
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert out == 'fsname'
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert not out
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
+ def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
+ spec = MDSSpec(
+ 'mds',
+ service_id='fsname',
+ placement=PlacementSpec(hosts=['host1', 'host2']),
+ config={'test': 'foo'}
+ )
+ with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ [daemon] = cephadm_module.cache.daemons['host1'].keys()
+
+ spec.placement.set_hosts(['host2'])
+
+ ok_to_stop.side_effect = False
+
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ ok_to_stop.assert_called_with([daemon[4:]], force=True)
+
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("remoto.process.check")
+ def test_offline(self, _check, _get_connection, cephadm_module):
+ _check.return_value = '{}', '', 0
+ _get_connection.return_value = mock.Mock(), mock.Mock()
+ with with_host(cephadm_module, 'test'):
+ _get_connection.side_effect = HostNotFound
+ code, out, err = cephadm_module.check_host('test')
+ assert out == ''
+ assert "Host 'test' not found" in err
+
+ out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
+ assert out == HostSpec('test', '1::4', status='Offline').to_json()
+
+ _get_connection.side_effect = None
+ assert CephadmServe(cephadm_module)._check_host('test') is None
+ out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
+ assert out == HostSpec('test', '1::4').to_json()
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_dont_touch_offline_or_maintenance_host_daemons(self, cephadm_module):
+ # test daemons on offline/maint hosts not removed when applying specs
+ # test daemons not added to hosts in maint/offline state
+ with with_host(cephadm_module, 'test1'):
+ with with_host(cephadm_module, 'test2'):
+ with with_host(cephadm_module, 'test3'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*'))):
+ # should get a mgr on all 3 hosts
+ # CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3
+
+ # put one host in offline state and one host in maintenance state
+ cephadm_module.offline_hosts = {'test2'}
+ cephadm_module.inventory._inventory['test3']['status'] = 'maintenance'
+ cephadm_module.inventory.save()
+
+ # offline/maint hosts are still returned by _schedulable_hosts();
+ # they are excluded from placement separately via _unreachable_hosts()
+ candidates = [
+ h.hostname for h in cephadm_module._schedulable_hosts()]
+ assert 'test2' in candidates
+ assert 'test3' in candidates
+
+ unreachable = [h.hostname for h in cephadm_module._unreachable_hosts()]
+ assert 'test2' in unreachable
+ assert 'test3' in unreachable
+
+ with with_service(cephadm_module, ServiceSpec('crash', placement=PlacementSpec(host_pattern='*'))):
+ # re-apply services. No mgr should be removed from maint/offline hosts
+ # crash daemon should only be on host not in maint/offline mode
+ CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3
+ assert len(cephadm_module.cache.get_daemons_by_type('crash')) == 1
+
+ cephadm_module.offline_hosts = {}
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_enter_success(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.return_value = [''], ['something\nsuccess - systemd target xxx disabled'], 0
+ _host_ok.return_value = 0, 'it is okay'
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname))
+ # should not raise an error
+ retval = cephadm_module.enter_host_maintenance(hostname)
+ assert retval.result_str().startswith('Daemons for Ceph cluster')
+ assert not retval.exception_str
+ assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_enter_failure(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.return_value = [''], ['something\nfailed - disable the target'], 0
+ _host_ok.return_value = 0, 'it is okay'
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname))
+
+ with pytest.raises(OrchestratorError, match='Failed to place host1 into maintenance for cluster fsid'):
+ cephadm_module.enter_host_maintenance(hostname)
+
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_exit_success(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.return_value = [''], [
+ 'something\nsuccess - systemd target xxx enabled and started'], 0
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
+ # should not raise an error
+ retval = cephadm_module.exit_host_maintenance(hostname)
+ assert retval.result_str().startswith('Ceph cluster')
+ assert not retval.exception_str
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_exit_failure(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.return_value = [''], ['something\nfailed - unable to enable the target'], 0
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
+
+ with pytest.raises(OrchestratorError, match='Failed to exit maintenance state for host host1, cluster fsid'):
+ cephadm_module.exit_host_maintenance(hostname)
+
+ assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
+
+ def test_stale_connections(self, cephadm_module):
+ class Connection(object):
+ """
+ A mocked connection class that only allows the connection to be used
+ once. If you attempt to use it again via _check, it explodes (goes
+ boom!).
+
+ The old code triggers the boom. The new code checks has_connection
+ and recreates the connection instead.
+ """
+ fuse = False
+
+ @staticmethod
+ def has_connection():
+ return False
+
+ def import_module(self, *args, **kargs):
+ return mock.Mock()
+
+ @staticmethod
+ def exit():
+ pass
+
+ def _check(conn, *args, **kargs):
+ if conn.fuse:
+ raise Exception("boom: connection is dead")
+ else:
+ conn.fuse = True
+ return '{}', [], 0
+ with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
+ with mock.patch("remoto.process.check", _check):
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ code, out, err = cephadm_module.check_host('test')
+ # First should succeed.
+ assert err == ''
+
+ # On the second call it attempts to reuse the connection; since the
+ # connection reports itself as "down", a new one is created. The old
+ # code would blow up here, triggering the BOOM!
+ code, out, err = cephadm_module.check_host('test')
+ assert err == ''
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("remoto.process.check")
+ @mock.patch("cephadm.module.CephadmServe._write_remote_file")
+ def test_etc_ceph(self, _write_file, _check, _get_connection, cephadm_module):
+ _get_connection.return_value = mock.Mock(), mock.Mock()
+ _check.return_value = '{}', '', 0
+ _write_file.return_value = None
+
+ assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
+ with with_host(cephadm_module, 'test'):
+ assert '/etc/ceph/ceph.conf' not in cephadm_module.cache.get_host_client_files('test')
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
+ cephadm_module.config_notify()
+ assert cephadm_module.manage_etc_ceph_ceph_conf is True
+
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ _write_file.assert_called_with('test', '/etc/ceph/ceph.conf', b'',
+ 0o644, 0, 0)
+
+ assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test')
+
+ # set extra config and expect that we deploy another ceph.conf
+ cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ _write_file.assert_called_with('test', '/etc/ceph/ceph.conf',
+ b'\n\n[mon]\nk=v\n', 0o644, 0, 0)
+
+ # reload
+ cephadm_module.cache.last_client_files = {}
+ cephadm_module.cache.load()
+
+ assert '/etc/ceph/ceph.conf' in cephadm_module.cache.get_host_client_files('test')
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ before_digest = cephadm_module.cache.get_host_client_files('test')[
+ '/etc/ceph/ceph.conf'][0]
+ cephadm_module._set_extra_ceph_conf('[mon]\nk2=v2')
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ after_digest = cephadm_module.cache.get_host_client_files('test')[
+ '/etc/ceph/ceph.conf'][0]
+ assert before_digest != after_digest
+
+ def test_etc_ceph_init(self):
+ with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
+ assert m.manage_etc_ceph_ceph_conf is True
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ def check_registry_credentials(url, username, password):
+ assert json.loads(cephadm_module.get_store('registry_credentials')) == {
+ 'url': url, 'username': username, 'password': password}
+
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'test'):
+ # test successful login with valid args
+ code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login attempt with invalid args
+ code, out, err = cephadm_module.registry_login('bad-args')
+ assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
+ "or -i <login credentials json file>")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login using invalid json file
+ code, out, err = cephadm_module.registry_login(
+ None, None, None, '{"bad-json": "bad-json"}')
+ assert err == ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test good login using valid json file
+ good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
+ " \"password\": \"" + "json-pass" + "\"}")
+ code, out, err = cephadm_module.registry_login(None, None, None, good_json)
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ # test bad login where args are valid but login command fails
+ _run_cephadm.return_value = '{}', 'error', 1
+ code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
+ assert err == 'Host test failed to login to fail-url as fail-user with given password'
+ check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digests': ['image@repo_digest'],
+ })))
+ @pytest.mark.parametrize("use_repo_digest",
+ [
+ False,
+ True
+ ])
+ def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
+ cephadm_module.use_repo_digest = use_repo_digest
+
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ cephadm_module.set_container_image('global', 'image')
+
+ if use_repo_digest:
+
+ CephadmServe(cephadm_module).convert_tags_to_repo_digest()
+
+ _, image, _ = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'global',
+ 'key': 'container_image',
+ })
+ if use_repo_digest:
+ assert image == 'image@repo_digest'
+ else:
+ assert image == 'image'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
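+ # stderr produced when the remote ceph-volume does not understand --filter-for-batch;
+ # _refresh_host_devices is expected to retry the inventory without that flag (asserted below)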
+ error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
+Traceback (most recent call last):
+ File "<stdin>", line 6112, in <module>
+ File "<stdin>", line 1299, in _infer_fsid
+ File "<stdin>", line 1382, in _infer_image
+ File "<stdin>", line 3612, in command_ceph_volume
+ File "<stdin>", line 1061, in call_throws"""
+
+ with with_host(cephadm_module, 'test'):
+ _run_cephadm.reset_mock()
+ _run_cephadm.side_effect = OrchestratorError(error_message)
+
+ s = CephadmServe(cephadm_module)._refresh_host_devices('test')
+ assert s == 'host test `cephadm ceph-volume` failed: ' + error_message
+
+ assert _run_cephadm.mock_calls == [
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json-pretty', '--filter-for-batch'], image='',
+ no_fsid=False),
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json-pretty'], image='',
+ no_fsid=False),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice_fail(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {
+ 'osd': 1,
+ 'up_from': 0,
+ 'uuid': 'uuid'
+ }
+ ]
+ })
+
+ ceph_volume_lvm_list = {
+ '1': [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }]
+ }
+ _run_cephadm.reset_mock(return_value=True)
+
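+ # ceph-volume calls report the existing OSD, but the subsequent deploy step fails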
+ def _r_c(*args, **kwargs):
+ if 'ceph-volume' in args:
+ return (json.dumps(ceph_volume_lvm_list), '', 0)
+ else:
+ assert 'deploy' in args
+ raise OrchestratorError("let's fail somehow")
+ _run_cephadm.side_effect = _r_c
+ assert cephadm_module._osd_activate(
+ ['test']).stderr == "let's fail somehow"
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice_dbdevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+
+ def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ logging.info(f'ceph-volume cmd: {cmd}')
+ if 'raw' in cmd:
+ return json.dumps({
+ "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": {
+ "ceph_fsid": "64c84f19-fe1d-452a-a731-ab19dc144aa8",
+ "device": "/dev/loop0",
+ "osd_id": 21,
+ "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5",
+ "type": "bluestore"
+ },
+ }), '', 0
+ if 'lvm' in cmd:
+ return json.dumps({
+ '1': [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }, {
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'db'
+ }]
+ }), '', 0
+ return '{}', '', 0
+
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1, ceph_volume_lvm_list=_ceph_volume_list):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_count(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ dg = DriveGroupSpec(service_id='', data_devices=DeviceSelection(all=True))
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ with with_service(cephadm_module, dg, host='test'):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1):
+ assert wait(cephadm_module, cephadm_module.describe_service())[0].size == 1
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_host_rm_last_admin(self, cephadm_module: CephadmOrchestrator):
+ with pytest.raises(OrchestratorError):
+ with with_host(cephadm_module, 'test', refresh_hosts=False, rm_with_force=False):
+ cephadm_module.inventory.add_label('test', '_admin')
+ pass
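+ # never reached: removing the last remaining _admin host must raise OrchestratorError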
+ assert False
+ with with_host(cephadm_module, 'test1', refresh_hosts=False, rm_with_force=True):
+ with with_host(cephadm_module, 'test2', refresh_hosts=False, rm_with_force=False):
+ cephadm_module.inventory.add_label('test2', '_admin')
diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py
new file mode 100644
index 000000000..327c12d2a
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_completion.py
@@ -0,0 +1,40 @@
+import pytest
+
+from ..module import forall_hosts
+
+
+class TestCompletion(object):
+
+ @pytest.mark.parametrize("input,expected", [
+ ([], []),
+ ([1], ["(1,)"]),
+ (["hallo"], ["('hallo',)"]),
+ ("hi", ["('h',)", "('i',)"]),
+ (list(range(5)), [str((x, )) for x in range(5)]),
+ ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
+ ])
+ def test_async_map(self, input, expected, cephadm_module):
+ @forall_hosts
+ def run_forall(*args):
+ return str(args)
+ assert run_forall(input) == expected
+
+ @pytest.mark.parametrize("input,expected", [
+ ([], []),
+ ([1], ["(1,)"]),
+ (["hallo"], ["('hallo',)"]),
+ ("hi", ["('h',)", "('i',)"]),
+ (list(range(5)), [str((x, )) for x in range(5)]),
+ ([(1, 2), (3, 4)], ["(1, 2)", "(3, 4)"]),
+ ])
+ def test_async_map_self(self, input, expected, cephadm_module):
+ class Run(object):
+ def __init__(self):
+ self.attr = 1
+
+ @forall_hosts
+ def run_forall(self, *args):
+ assert self.attr == 1
+ return str(args)
+
+ assert Run().run_forall(input) == expected
diff --git a/src/pybind/mgr/cephadm/tests/test_configchecks.py b/src/pybind/mgr/cephadm/tests/test_configchecks.py
new file mode 100644
index 000000000..3cae0a27d
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_configchecks.py
@@ -0,0 +1,668 @@
+import copy
+import json
+import logging
+import ipaddress
+import pytest
+import uuid
+
+from time import time as now
+
+from ..configchecks import CephadmConfigChecks
+from ..inventory import HostCache
+from ..upgrade import CephadmUpgrade, UpgradeState
+from orchestrator import DaemonDescription
+
+from typing import List, Dict, Any, Optional
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+host_sample = {
+ "arch": "x86_64",
+ "bios_date": "04/01/2014",
+ "bios_version": "F2",
+ "cpu_cores": 16,
+ "cpu_count": 2,
+ "cpu_load": {
+ "15min": 0.0,
+ "1min": 0.01,
+ "5min": 0.01
+ },
+ "cpu_model": "Intel® Xeon® Processor E5-2698 v3",
+ "cpu_threads": 64,
+ "flash_capacity": "4.0TB",
+ "flash_capacity_bytes": 4000797868032,
+ "flash_count": 2,
+ "flash_list": [
+ {
+ "description": "ATA CT2000MX500SSD1 (2.0TB)",
+ "dev_name": "sda",
+ "disk_size_bytes": 2000398934016,
+ "model": "CT2000MX500SSD1",
+ "rev": "023",
+ "vendor": "ATA",
+ "wwid": "t10.ATA CT2000MX500SSD1 193023156DE0"
+ },
+ {
+ "description": "ATA CT2000MX500SSD1 (2.0TB)",
+ "dev_name": "sdb",
+ "disk_size_bytes": 2000398934016,
+ "model": "CT2000MX500SSD1",
+ "rev": "023",
+ "vendor": "ATA",
+ "wwid": "t10.ATA CT2000MX500SSD1 193023156DE0"
+ },
+ ],
+ "hdd_capacity": "16.0TB",
+ "hdd_capacity_bytes": 16003148120064,
+ "hdd_count": 4,
+ "hdd_list": [
+ {
+ "description": "ST4000VN008-2DR1 (4.0TB)",
+ "dev_name": "sdc",
+ "disk_size_bytes": 4000787030016,
+ "model": "ST4000VN008-2DR1",
+ "rev": "SC60",
+ "vendor": "ATA",
+ "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ"
+ },
+ {
+ "description": "ST4000VN008-2DR1 (4.0TB)",
+ "dev_name": "sdd",
+ "disk_size_bytes": 4000787030016,
+ "model": "ST4000VN008-2DR1",
+ "rev": "SC60",
+ "vendor": "ATA",
+ "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ"
+ },
+ {
+ "description": "ST4000VN008-2DR1 (4.0TB)",
+ "dev_name": "sde",
+ "disk_size_bytes": 4000787030016,
+ "model": "ST4000VN008-2DR1",
+ "rev": "SC60",
+ "vendor": "ATA",
+ "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ"
+ },
+ {
+ "description": "ST4000VN008-2DR1 (4.0TB)",
+ "dev_name": "sdf",
+ "disk_size_bytes": 4000787030016,
+ "model": "ST4000VN008-2DR1",
+ "rev": "SC60",
+ "vendor": "ATA",
+ "wwid": "t10.ATA ST4000VN008-2DR1 Z340EPBJ"
+ },
+ ],
+ "hostname": "dummy",
+ "interfaces": {
+ "eth0": {
+ "driver": "e1000e",
+ "iftype": "physical",
+ "ipv4_address": "10.7.17.1/24",
+ "ipv6_address": "fe80::215:17ff:feab:50e2/64",
+ "lower_devs_list": [],
+ "mtu": 9000,
+ "nic_type": "ethernet",
+ "operstate": "up",
+ "speed": 1000,
+ "upper_devs_list": [],
+ },
+ "eth1": {
+ "driver": "e1000e",
+ "iftype": "physical",
+ "ipv4_address": "10.7.18.1/24",
+ "ipv6_address": "fe80::215:17ff:feab:50e2/64",
+ "lower_devs_list": [],
+ "mtu": 9000,
+ "nic_type": "ethernet",
+ "operstate": "up",
+ "speed": 1000,
+ "upper_devs_list": [],
+ },
+ "eth2": {
+ "driver": "r8169",
+ "iftype": "physical",
+ "ipv4_address": "10.7.19.1/24",
+ "ipv6_address": "fe80::76d4:35ff:fe58:9a79/64",
+ "lower_devs_list": [],
+ "mtu": 1500,
+ "nic_type": "ethernet",
+ "operstate": "up",
+ "speed": 1000,
+ "upper_devs_list": []
+ },
+ },
+ "kernel": "4.18.0-240.10.1.el8_3.x86_64",
+ "kernel_parameters": {
+ "net.ipv4.ip_nonlocal_bind": "0",
+ },
+ "kernel_security": {
+ "SELINUX": "enforcing",
+ "SELINUXTYPE": "targeted",
+ "description": "SELinux: Enabled(enforcing, targeted)",
+ "type": "SELinux"
+ },
+ "memory_available_kb": 19489212,
+ "memory_free_kb": 245164,
+ "memory_total_kb": 32900916,
+ "model": "StorageHeavy",
+ "nic_count": 3,
+ "operating_system": "Red Hat Enterprise Linux 8.3 (Ootpa)",
+ "subscribed": "Yes",
+ "system_uptime": 777600.0,
+ "timestamp": now(),
+ "vendor": "Ceph Servers Inc",
+}
+
+
+def role_list(n: int) -> List[str]:
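+ """Return the daemon roles for node n: nodes 1-3 get mon plus mgr or mds in addition to osd, the rest are osd-only."""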
+ if n == 1:
+ return ['mon', 'mgr', 'osd']
+ if n in [2, 3]:
+ return ['mon', 'mds', 'osd']
+
+ return ['osd']
+
+
+def generate_testdata(count: int = 10, public_network: str = '10.7.17.0/24', cluster_network: str = '10.7.18.0/24'):
+ # public network = eth0, cluster_network = eth1
+ assert count > 3
+ assert public_network
+ num_disks = host_sample['hdd_count']
+ hosts = {}
+ daemons = {}
+ daemon_to_host = {}
+ osd_num = 0
+ public_netmask = public_network.split('/')[1]
+ cluster_ip_list = []
+ cluster_netmask = ''
+
+ public_ip_list = [str(i) for i in list(ipaddress.ip_network(public_network).hosts())]
+ if cluster_network:
+ cluster_ip_list = [str(i) for i in list(ipaddress.ip_network(cluster_network).hosts())]
+ cluster_netmask = cluster_network.split('/')[1]
+
+ for n in range(1, count + 1, 1):
+
+ new_host = copy.deepcopy(host_sample)
+ hostname = f"node-{n}.ceph.com"
+
+ new_host['hostname'] = hostname
+ new_host['interfaces']['eth0']['ipv4_address'] = f"{public_ip_list.pop(0)}/{public_netmask}"
+ if cluster_ip_list:
+ new_host['interfaces']['eth1']['ipv4_address'] = f"{cluster_ip_list.pop(0)}/{cluster_netmask}"
+ else:
+ new_host['interfaces']['eth1']['ipv4_address'] = ''
+
+ hosts[hostname] = new_host
+ daemons[hostname] = {}
+ for r in role_list(n):
+ name = ''
+ if r == 'osd':
+ for _n in range(num_disks):
+ osd = DaemonDescription(
+ hostname=hostname, daemon_type='osd', daemon_id=osd_num)
+ name = f"osd.{osd_num}"
+ daemons[hostname][name] = osd
+ daemon_to_host[name] = hostname
+ osd_num += 1
+ else:
+ name = f"{r}.{hostname}"
+ daemons[hostname][name] = DaemonDescription(
+ hostname=hostname, daemon_type=r, daemon_id=hostname)
+ daemon_to_host[name] = hostname
+
+ logger.debug(f"daemon to host lookup - {json.dumps(daemon_to_host)}")
+ return hosts, daemons, daemon_to_host
+
+
+@pytest.fixture()
+def mgr():
+ """Provide a fake ceph mgr object preloaded with a configuration"""
+ mgr = FakeMgr()
+ mgr.cache.facts, mgr.cache.daemons, mgr.daemon_to_host = \
+ generate_testdata(public_network='10.9.64.0/24', cluster_network='')
+ mgr.module_option.update({
+ "config_checks_enabled": True,
+ })
+ yield mgr
+
+
+class FakeMgr:
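+ """Minimal stand-in for the cephadm mgr module, providing just what CephadmConfigChecks and its helpers need."""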
+
+ def __init__(self):
+ self.datastore = {}
+ self.module_option = {}
+ self.health_checks = {}
+ self.default_version = 'quincy'
+ self.version_overrides = {}
+ self.daemon_to_host = {}
+
+ self.cache = HostCache(self)
+ self.upgrade = CephadmUpgrade(self)
+
+ def set_health_checks(self, checks: dict):
+ return
+
+ def get_module_option(self, keyname: str) -> Optional[str]:
+ return self.module_option.get(keyname, None)
+
+ def set_module_option(self, keyname: str, value: str) -> None:
+ return None
+
+ def get_store(self, keyname: str, default=None) -> Optional[str]:
+ return self.datastore.get(keyname, None)
+
+ def set_store(self, keyname: str, value: str) -> None:
+ self.datastore[keyname] = value
+ return None
+
+ def _ceph_get_server(self) -> None:
+ pass
+
+ def get_metadata(self, daemon_type: str, daemon_id: str) -> Dict[str, Any]:
+ key = f"{daemon_type}.{daemon_id}"
+ if key in self.version_overrides:
+ logger.debug(f"override applied for {key}")
+ version_str = self.version_overrides[key]
+ else:
+ version_str = self.default_version
+
+ return {"ceph_release": version_str, "hostname": self.daemon_to_host[key]}
+
+ def list_servers(self) -> List[Dict[str, List[Dict[str, str]]]]:
+ num_disks = host_sample['hdd_count']
+ osd_num = 0
+ service_map = []
+
+ for hostname in self.cache.facts:
+
+ host_num = int(hostname.split('.')[0].split('-')[1])
+ svc_list = []
+ for r in role_list(host_num):
+ if r == 'osd':
+ for _n in range(num_disks):
+ svc_list.append({
+ "type": "osd",
+ "id": osd_num,
+ })
+ osd_num += 1
+ else:
+ svc_list.append({
+ "type": r,
+ "id": hostname,
+ })
+
+ service_map.append({"services": svc_list})
+ logger.debug(f"services map - {json.dumps(service_map)}")
+ return service_map
+
+ def use_repo_digest(self) -> None:
+ return None
+
+
+class TestConfigCheck:
+
+ def test_to_json(self, mgr):
+ checker = CephadmConfigChecks(mgr)
+ out = checker.to_json()
+ assert out
+ assert len(out) == len(checker.health_checks)
+
+ def test_lookup_check(self, mgr):
+ checker = CephadmConfigChecks(mgr)
+ check = checker.lookup_check('osd_mtu_size')
+ logger.debug(json.dumps(check.to_json()))
+ assert check
+ assert check.healthcheck_name == "CEPHADM_CHECK_MTU"
+
+ def test_old_checks_removed(self, mgr):
+ mgr.datastore.update({
+ "config_checks": '{"bogus_one": "enabled", "bogus_two": "enabled", '
+ '"kernel_security": "enabled", "public_network": "enabled", '
+ '"kernel_version": "enabled", "network_missing": "enabled", '
+ '"osd_mtu_size": "enabled", "osd_linkspeed": "enabled", '
+ '"os_subscription": "enabled", "ceph_release": "enabled"}'
+ })
+ checker = CephadmConfigChecks(mgr)
+ raw = mgr.get_store('config_checks')
+ checks = json.loads(raw)
+ assert "bogus_one" not in checks
+ assert "bogus_two" not in checks
+ assert len(checks) == len(checker.health_checks)
+
+ def test_new_checks(self, mgr):
+ mgr.datastore.update({
+ "config_checks": '{"kernel_security": "enabled", "public_network": "enabled", '
+ '"osd_mtu_size": "enabled", "osd_linkspeed": "enabled"}'
+ })
+ checker = CephadmConfigChecks(mgr)
+ raw = mgr.get_store('config_checks')
+ checks = json.loads(raw)
+ assert len(checks) == len(checker.health_checks)
+
+ def test_no_issues(self, mgr):
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+ checker.run_checks()
+
+ assert not mgr.health_checks
+
+ def test_no_public_network(self, mgr):
+ bad_node = mgr.cache.facts['node-1.ceph.com']
+ bad_node['interfaces']['eth0']['ipv4_address'] = "192.168.1.20/24"
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+ checker.run_checks()
+ logger.debug(mgr.health_checks)
+ assert len(mgr.health_checks) == 1
+ assert 'CEPHADM_CHECK_PUBLIC_MEMBERSHIP' in mgr.health_checks
+ assert mgr.health_checks['CEPHADM_CHECK_PUBLIC_MEMBERSHIP']['detail'][0] == \
+ 'node-1.ceph.com does not have an interface on any public network'
+
+ def test_missing_networks(self, mgr):
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.66.0/24']
+ checker.run_checks()
+
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert len(mgr.health_checks) == 1
+ assert 'CEPHADM_CHECK_NETWORK_MISSING' in mgr.health_checks
+ assert mgr.health_checks['CEPHADM_CHECK_NETWORK_MISSING']['detail'][0] == \
+ "10.9.66.0/24 not found on any host in the cluster"
+
+ def test_bad_mtu_single(self, mgr):
+
+ bad_node = mgr.cache.facts['node-1.ceph.com']
+ bad_node['interfaces']['eth0']['mtu'] = 1500
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert "CEPHADM_CHECK_MTU" in mgr.health_checks and len(mgr.health_checks) == 1
+ assert mgr.health_checks['CEPHADM_CHECK_MTU']['detail'][0] == \
+ 'host node-1.ceph.com(eth0) is using MTU 1500 on 10.9.64.0/24, NICs on other hosts use 9000'
+
+ def test_bad_mtu_multiple(self, mgr):
+
+ for n in [1, 5]:
+ bad_node = mgr.cache.facts[f'node-{n}.ceph.com']
+ bad_node['interfaces']['eth0']['mtu'] = 1500
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert "CEPHADM_CHECK_MTU" in mgr.health_checks and len(mgr.health_checks) == 1
+ assert mgr.health_checks['CEPHADM_CHECK_MTU']['count'] == 2
+
+ def test_bad_linkspeed_single(self, mgr):
+
+ bad_node = mgr.cache.facts['node-1.ceph.com']
+ bad_node['interfaces']['eth0']['speed'] = 100
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert mgr.health_checks
+ assert "CEPHADM_CHECK_LINKSPEED" in mgr.health_checks and len(mgr.health_checks) == 1
+ assert mgr.health_checks['CEPHADM_CHECK_LINKSPEED']['detail'][0] == \
+ 'host node-1.ceph.com(eth0) has linkspeed of 100 on 10.9.64.0/24, NICs on other hosts use 1000'
+
+ def test_super_linkspeed_single(self, mgr):
+
+ bad_node = mgr.cache.facts['node-1.ceph.com']
+ bad_node['interfaces']['eth0']['speed'] = 10000
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert not mgr.health_checks
+
+ def test_release_mismatch_single(self, mgr):
+
+ mgr.version_overrides = {
+ "osd.1": "pacific",
+ }
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ assert mgr.health_checks
+ assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and len(mgr.health_checks) == 1
+ assert mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE']['detail'][0] == \
+ 'osd.1 is running pacific (majority of cluster is using quincy)'
+
+ def test_release_mismatch_multi(self, mgr):
+
+ mgr.version_overrides = {
+ "osd.1": "pacific",
+ "osd.5": "octopus",
+ }
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ assert mgr.health_checks
+ assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and len(mgr.health_checks) == 1
+ assert len(mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE']['detail']) == 2
+
+ def test_kernel_mismatch(self, mgr):
+
+ bad_host = mgr.cache.facts['node-1.ceph.com']
+ bad_host['kernel'] = "5.10.18.0-241.10.1.el8.x86_64"
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ assert len(mgr.health_checks) == 1
+ assert 'CEPHADM_CHECK_KERNEL_VERSION' in mgr.health_checks
+ assert mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION']['detail'][0] == \
+ "host node-1.ceph.com running kernel 5.10, majority of hosts(9) running 4.18"
+ assert mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION']['count'] == 1
+
+ def test_inconsistent_subscription(self, mgr):
+
+ bad_host = mgr.cache.facts['node-5.ceph.com']
+ bad_host['subscribed'] = "no"
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ assert len(mgr.health_checks) == 1
+ assert "CEPHADM_CHECK_SUBSCRIPTION" in mgr.health_checks
+ assert mgr.health_checks['CEPHADM_CHECK_SUBSCRIPTION']['detail'][0] == \
+ "node-5.ceph.com does not have an active subscription"
+
+ def test_kernel_security_inconsistent(self, mgr):
+
+ bad_node = mgr.cache.facts['node-3.ceph.com']
+ bad_node['kernel_security'] = {
+ "SELINUX": "permissive",
+ "SELINUXTYPE": "targeted",
+ "description": "SELinux: Enabled(permissive, targeted)",
+ "type": "SELinux"
+ }
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ assert len(mgr.health_checks) == 1
+ assert 'CEPHADM_CHECK_KERNEL_LSM' in mgr.health_checks
+ assert mgr.health_checks['CEPHADM_CHECK_KERNEL_LSM']['detail'][0] == \
+ "node-3.ceph.com has inconsistent KSM settings compared to the majority of hosts(9) in the cluster"
+
+ def test_release_and_bad_mtu(self, mgr):
+
+ mgr.version_overrides = {
+ "osd.1": "pacific",
+ }
+ bad_node = mgr.cache.facts['node-1.ceph.com']
+ bad_node['interfaces']['eth0']['mtu'] = 1500
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert mgr.health_checks
+ assert len(mgr.health_checks) == 2
+ assert "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \
+ "CEPHADM_CHECK_MTU" in mgr.health_checks
+
+ def test_release_mtu_LSM(self, mgr):
+
+ mgr.version_overrides = {
+ "osd.1": "pacific",
+ }
+ bad_node1 = mgr.cache.facts['node-1.ceph.com']
+ bad_node1['interfaces']['eth0']['mtu'] = 1500
+ bad_node2 = mgr.cache.facts['node-3.ceph.com']
+ bad_node2['kernel_security'] = {
+ "SELINUX": "permissive",
+ "SELINUXTYPE": "targeted",
+ "description": "SELinux: Enabled(permissive, targeted)",
+ "type": "SELinux"
+ }
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert mgr.health_checks
+ assert len(mgr.health_checks) == 3
+ assert \
+ "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \
+ "CEPHADM_CHECK_MTU" in mgr.health_checks and \
+ "CEPHADM_CHECK_KERNEL_LSM" in mgr.health_checks
+
+ def test_release_mtu_LSM_subscription(self, mgr):
+
+ mgr.version_overrides = {
+ "osd.1": "pacific",
+ }
+ bad_node1 = mgr.cache.facts['node-1.ceph.com']
+ bad_node1['interfaces']['eth0']['mtu'] = 1500
+ bad_node1['subscribed'] = "no"
+ bad_node2 = mgr.cache.facts['node-3.ceph.com']
+ bad_node2['kernel_security'] = {
+ "SELINUX": "permissive",
+ "SELINUXTYPE": "targeted",
+ "description": "SELinux: Enabled(permissive, targeted)",
+ "type": "SELinux"
+ }
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(json.dumps(mgr.health_checks))
+ logger.info(checker.subnet_lookup)
+ assert mgr.health_checks
+ assert len(mgr.health_checks) == 4
+ assert \
+ "CEPHADM_CHECK_CEPH_RELEASE" in mgr.health_checks and \
+ "CEPHADM_CHECK_MTU" in mgr.health_checks and \
+ "CEPHADM_CHECK_KERNEL_LSM" in mgr.health_checks and \
+ "CEPHADM_CHECK_SUBSCRIPTION" in mgr.health_checks
+
+ def test_skip_release_during_upgrade(self, mgr):
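+ # simulate an in-flight upgrade so that the ceph_release check is skipped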
+ mgr.upgrade.upgrade_state = UpgradeState.from_json({
+ 'target_name': 'wah',
+ 'progress_id': str(uuid.uuid4()),
+ 'target_id': 'wah',
+ 'error': '',
+ 'paused': False,
+ })
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(f"{checker.skipped_checks_count} skipped check(s): {checker.skipped_checks}")
+ assert checker.skipped_checks_count == 1
+ assert 'ceph_release' in checker.skipped_checks
+
+ def test_skip_when_disabled(self, mgr):
+ mgr.module_option.update({
+ "config_checks_enabled": "false"
+ })
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(checker.active_checks)
+ logger.info(checker.defined_checks)
+ assert checker.active_checks_count == 0
+
+ def test_skip_mtu_checks(self, mgr):
+ mgr.datastore.update({
+ 'config_checks': '{"osd_mtu_size": "disabled"}'
+ })
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(checker.active_checks)
+ logger.info(checker.defined_checks)
+ assert 'osd_mtu_size' not in checker.active_checks
+ assert checker.defined_checks == 8 and checker.active_checks_count == 7
+
+ def test_skip_mtu_lsm_checks(self, mgr):
+ mgr.datastore.update({
+ 'config_checks': '{"osd_mtu_size": "disabled", "kernel_security": "disabled"}'
+ })
+
+ checker = CephadmConfigChecks(mgr)
+ checker.cluster_network_list = []
+ checker.public_network_list = ['10.9.64.0/24']
+
+ checker.run_checks()
+ logger.info(checker.active_checks)
+ logger.info(checker.defined_checks)
+ assert 'osd_mtu_size' not in checker.active_checks and \
+ 'kernel_security' not in checker.active_checks
+ assert checker.defined_checks == 8 and checker.active_checks_count == 6
+ assert not mgr.health_checks
diff --git a/src/pybind/mgr/cephadm/tests/test_facts.py b/src/pybind/mgr/cephadm/tests/test_facts.py
new file mode 100644
index 000000000..6c33f5368
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_facts.py
@@ -0,0 +1,10 @@
+from .. import CephadmOrchestrator
+
+from .fixtures import wait
+
+
+def test_facts(cephadm_module: CephadmOrchestrator):
+ facts = {'node-1.ceph.com': {'bios_version': 'F2', 'cpu_cores': 16}}
+ cephadm_module.cache.facts = facts
+ ret_facts = cephadm_module.get_facts('node-1.ceph.com')
+ assert wait(cephadm_module, ret_facts) == [{'bios_version': 'F2', 'cpu_cores': 16}]
diff --git a/src/pybind/mgr/cephadm/tests/test_migration.py b/src/pybind/mgr/cephadm/tests/test_migration.py
new file mode 100644
index 000000000..b95f54f7c
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_migration.py
@@ -0,0 +1,229 @@
+import json
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
+from ceph.utils import datetime_to_str, datetime_now
+from cephadm import CephadmOrchestrator
+from cephadm.inventory import SPEC_STORE_PREFIX
+from cephadm.migrations import LAST_MIGRATION
+from cephadm.tests.fixtures import _run_cephadm, wait, with_host
+from cephadm.serve import CephadmServe
+from tests import mock
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1', refresh_hosts=False):
+ with with_host(cephadm_module, 'host2', refresh_hosts=False):
+
+ # emulate the old scheduler:
+ c = cephadm_module.apply_rgw(
+ ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='*', count=2))
+ )
+ assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+
+ # with pytest.raises(OrchestratorError, match="cephadm migration still ongoing. Please wait, until the migration is complete."):
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ cephadm_module.migration_current = 0
+ cephadm_module.migration.migrate()
+ # assert we need all daemons.
+ assert cephadm_module.migration_current == 0
+
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ cephadm_module.migration.migrate()
+
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ out = {o.hostname for o in wait(cephadm_module, cephadm_module.list_daemons())}
+ assert out == {'host1', 'host2'}
+
+ c = cephadm_module.apply_rgw(
+ ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='host1', count=2))
+ )
+ assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+
+ # Sorry for this hack, but I need to make sure Migration thinks
+ # we have already updated all daemons.
+ cephadm_module.cache.last_daemon_update['host1'] = datetime_now()
+ cephadm_module.cache.last_daemon_update['host2'] = datetime_now()
+
+ cephadm_module.migration_current = 0
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current >= 2
+
+ out = [o.spec.placement for o in wait(
+ cephadm_module, cephadm_module.describe_service())]
+ assert out == [PlacementSpec(count=2, hosts=[HostPlacementSpec(
+ hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])]
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({
+ 'spec': {
+ 'service_type': 'mon',
+ 'service_id': 'wrong',
+ 'placement': {
+ 'hosts': ['host1']
+ }
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+
+ cephadm_module.spec_store.load()
+
+ assert len(cephadm_module.spec_store.all_specs) == 1
+ assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon'
+
+ cephadm_module.migration_current = 1
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current >= 2
+
+ assert len(cephadm_module.spec_store.all_specs) == 1
+ assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
+ service_type='mon',
+ unmanaged=True,
+ placement=PlacementSpec(hosts=['host1'])
+ )
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon', json.dumps({
+ 'spec': {
+ 'service_type': 'mon',
+ 'placement': {
+ 'count': 5,
+ }
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+ cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({
+ 'spec': {
+ 'service_type': 'mon',
+ 'service_id': 'wrong',
+ 'placement': {
+ 'hosts': ['host1']
+ }
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+
+ cephadm_module.spec_store.load()
+
+ assert len(cephadm_module.spec_store.all_specs) == 2
+ assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon'
+ assert cephadm_module.spec_store.all_specs['mon'].service_name() == 'mon'
+
+ cephadm_module.migration_current = 1
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current >= 2
+
+ assert len(cephadm_module.spec_store.all_specs) == 1
+ assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
+ service_type='mon',
+ unmanaged=True,
+ placement=PlacementSpec(count=5)
+ )
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(SPEC_STORE_PREFIX + 'mds', json.dumps({
+ 'spec': {
+ 'service_type': 'mds',
+ 'placement': {
+ 'hosts': ['host1']
+ }
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+
+ cephadm_module.spec_store.load()
+
+ # there is nothing to migrate: an mds spec without a service_id cannot be loaded, so the spec is gone now.
+ assert len(cephadm_module.spec_store.all_specs) == 0
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_nfs_initial(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(
+ SPEC_STORE_PREFIX + 'mds',
+ json.dumps({
+ 'spec': {
+ 'service_type': 'nfs',
+ 'service_id': 'foo',
+ 'placement': {
+ 'hosts': ['host1']
+ },
+ 'spec': {
+ 'pool': 'mypool',
+ 'namespace': 'foons',
+ },
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+ cephadm_module.migration_current = 1
+ cephadm_module.spec_store.load()
+
+ ls = json.loads(cephadm_module.get_store('nfs_migration_queue'))
+ assert ls == [['foo', 'mypool', 'foons']]
+
+ cephadm_module.migration.migrate(True)
+ assert cephadm_module.migration_current == 2
+
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_nfs_initial_octopus(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(
+ SPEC_STORE_PREFIX + 'mds',
+ json.dumps({
+ 'spec': {
+ 'service_type': 'nfs',
+ 'service_id': 'ganesha-foo',
+ 'placement': {
+ 'hosts': ['host1']
+ },
+ 'spec': {
+ 'pool': 'mypool',
+ 'namespace': 'foons',
+ },
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+ cephadm_module.migration_current = 1
+ cephadm_module.spec_store.load()
+
+ ls = json.loads(cephadm_module.get_store('nfs_migration_queue'))
+ assert ls == [['ganesha-foo', 'mypool', 'foons']]
+
+ cephadm_module.migration.migrate(True)
+ assert cephadm_module.migration_current == 2
+
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_admin_client_keyring(cephadm_module: CephadmOrchestrator):
+ assert 'client.admin' not in cephadm_module.keys.keys
+
+ cephadm_module.migration_current = 3
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+ assert cephadm_module.keys.keys['client.admin'].placement.label == '_admin'
diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py
new file mode 100644
index 000000000..6685fcb2a
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_osd_removal.py
@@ -0,0 +1,298 @@
+import json
+
+from cephadm.services.osd import OSDRemovalQueue, OSD
+import pytest
+from tests import mock
+from .fixtures import with_cephadm_module
+from datetime import datetime
+
+
+class MockOSD:
+
+ def __init__(self, osd_id):
+ self.osd_id = osd_id
+
+
+class TestOSDRemoval:
+
+ @pytest.mark.parametrize(
+ "osd_id, osd_df, expected",
+ [
+ # missing 'nodes' key
+ (1, dict(nodes=[]), -1),
+ # missing 'pgs' key
+ (1, dict(nodes=[dict(id=1)]), -1),
+ # id != osd_id
+ (1, dict(nodes=[dict(id=999, pgs=1)]), -1),
+ # valid
+ (1, dict(nodes=[dict(id=1, pgs=1)]), 1),
+ ]
+ )
+ def test_get_pg_count(self, rm_util, osd_id, osd_df, expected):
+ with mock.patch("cephadm.services.osd.RemoveUtil.osd_df", return_value=osd_df):
+ assert rm_util.get_pg_count(osd_id) == expected
+
+ @pytest.mark.parametrize(
+ "osds, ok_to_stop, expected",
+ [
+ # no osd_ids provided
+ ([], [False], []),
+ # all osds are ok_to_stop
+ ([1, 2], [True], [1, 2]),
+ # osds are ok_to_stop after the second iteration
+ ([1, 2], [False, True], [2]),
+ # osds are never ok_to_stop (taking the sample size (len(osd_ids)) into account),
+ # so we expect an empty result
+ ([1, 2], [False, False], []),
+ ]
+ )
+ def test_find_stop_threshold(self, rm_util, osds, ok_to_stop, expected):
+ with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
+ assert rm_util.find_osd_stop_threshold(osds) == expected
+
+ def test_process_removal_queue(self, rm_util):
+ # TODO: !
+ # rm_util.process_removal_queue()
+ pass
+
+ @pytest.mark.parametrize(
+ "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected",
+ [
+ # drain one at a time, one already draining
+ (1, [1], [1], [True], 0),
+ # drain one at a time, none draining yet
+ (1, [], [1, 2, 3], [True, True, True], 1),
+ # drain one at a time, one already draining, none ok-to-stop
+ (1, [1], [1], [False], 0),
+ # drain one at a time, none draining, one ok-to-stop
+ (1, [], [1, 2, 3], [False, False, True], 1),
+ # drain three at a time, one already draining, all ok-to-stop
+ (3, [1], [1, 2, 3], [True, True, True], 2),
+ # drain two at a time, none already draining, none ok-to-stop
+ (2, [], [1, 2, 3], [False, False, False], 0),
+ # drain two at a time, none already draining, none idling
+ (2, [], [], [], 0),
+ ]
+ )
+ def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected):
+ with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m:
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds):
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds):
+ with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
+ removal_queue = OSDRemovalQueue(m)
+ assert len(removal_queue._ready_to_drain_osds()) == expected
+
+ def test_ok_to_stop(self, rm_util):
+ rm_util.ok_to_stop([MockOSD(1)])
+ rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']},
+ error_ok=True)
+
+ def test_safe_to_destroy(self, rm_util):
+ rm_util.safe_to_destroy([1])
+ rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy',
+ 'ids': ['1']}, error_ok=True)
+
+ def test_destroy_osd(self, rm_util):
+ rm_util.destroy_osd(1)
+ rm_util._run_mon_cmd.assert_called_with(
+ {'prefix': 'osd destroy-actual', 'id': 1, 'yes_i_really_mean_it': True})
+
+ def test_purge_osd(self, rm_util):
+ rm_util.purge_osd(1)
+ rm_util._run_mon_cmd.assert_called_with(
+ {'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True})
+
+ def test_load(self, cephadm_module, rm_util):
+ data = json.dumps([
+ {
+ "osd_id": 35,
+ "started": True,
+ "draining": True,
+ "stopped": False,
+ "replace": False,
+ "force": False,
+ "zap": False,
+ "nodename": "node2",
+ "drain_started_at": "2020-09-14T11:41:53.960463",
+ "drain_stopped_at": None,
+ "drain_done_at": None,
+ "process_started_at": "2020-09-14T11:41:52.245832"
+ }
+ ])
+ cephadm_module.set_store('osd_remove_queue', data)
+ cephadm_module.to_remove_osds.load_from_store()
+
+ expected = OSDRemovalQueue(cephadm_module)
+ expected.osds.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
+ assert cephadm_module.to_remove_osds == expected
+
+
+class TestOSD:
+
+ def test_start(self, osd_obj):
+ assert osd_obj.started is False
+ osd_obj.start()
+ assert osd_obj.started is True
+ assert osd_obj.stopped is False
+
+ def test_start_draining_purge(self, osd_obj):
+ assert osd_obj.draining is False
+ assert osd_obj.drain_started_at is None
+ ret = osd_obj.start_draining()
+ osd_obj.rm_util.reweight_osd.assert_called_with(osd_obj, 0.0)
+ assert isinstance(osd_obj.drain_started_at, datetime)
+ assert osd_obj.draining is True
+ assert osd_obj.replace is False
+ assert ret is True
+
+ def test_start_draining_replace(self, osd_obj):
+ assert osd_obj.draining is False
+ assert osd_obj.drain_started_at is None
+ osd_obj.replace = True
+ ret = osd_obj.start_draining()
+ osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'out')
+ assert isinstance(osd_obj.drain_started_at, datetime)
+ assert osd_obj.draining is True
+ assert osd_obj.replace is True
+ assert ret is True
+
+ def test_start_draining_stopped(self, osd_obj):
+ osd_obj.stopped = True
+ ret = osd_obj.start_draining()
+ assert osd_obj.drain_started_at is None
+ assert ret is False
+ assert osd_obj.draining is False
+
+ def test_stop_draining_replace(self, osd_obj):
+ osd_obj.replace = True
+ ret = osd_obj.stop_draining()
+ osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'in')
+ assert isinstance(osd_obj.drain_stopped_at, datetime)
+ assert osd_obj.draining is False
+ assert ret is True
+
+ def test_stop_draining_purge(self, osd_obj):
+ osd_obj.original_weight = 1.0
+ ret = osd_obj.stop_draining()
+ osd_obj.rm_util.reweight_osd.assert_called_with(osd_obj, 1.0)
+ assert isinstance(osd_obj.drain_stopped_at, datetime)
+ assert osd_obj.draining is False
+ assert ret is True
+
+ @mock.patch('cephadm.services.osd.OSD.stop_draining')
+ def test_stop(self, stop_draining_mock, osd_obj):
+ osd_obj.stop()
+ assert osd_obj.started is False
+ assert osd_obj.stopped is True
+ stop_draining_mock.assert_called_once()
+
+ @pytest.mark.parametrize(
+ "draining, empty, expected",
+ [
+ # must be !draining! and !not empty! to yield True
+ (True, not True, True),
+ # not draining and not empty
+ (False, not True, False),
+ # not draining and empty
+ (False, True, False),
+ # draining and empty
+ (True, True, False),
+ ]
+ )
+ def test_is_draining(self, osd_obj, draining, empty, expected):
+ with mock.patch("cephadm.services.osd.OSD.is_empty", new_callable=mock.PropertyMock(return_value=empty)):
+ osd_obj.draining = draining
+ assert osd_obj.is_draining is expected
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop")
+ def test_is_ok_to_stop(self, _, osd_obj):
+ osd_obj.is_ok_to_stop
+ osd_obj.rm_util.ok_to_stop.assert_called_once()
+
+ @pytest.mark.parametrize(
+ "pg_count, expected",
+ [
+ (0, True),
+ (1, False),
+ (9999, False),
+ (-1, False),
+ ]
+ )
+ def test_is_empty(self, osd_obj, pg_count, expected):
+ with mock.patch("cephadm.services.osd.OSD.get_pg_count", return_value=pg_count):
+ assert osd_obj.is_empty is expected
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.safe_to_destroy")
+ def test_safe_to_destroy(self, _, osd_obj):
+ osd_obj.safe_to_destroy()
+ osd_obj.rm_util.safe_to_destroy.assert_called_once()
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.set_osd_flag")
+ def test_down(self, _, osd_obj):
+ osd_obj.down()
+ osd_obj.rm_util.set_osd_flag.assert_called_with([osd_obj], 'down')
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.destroy_osd")
+ def test_destroy_osd(self, _, osd_obj):
+ osd_obj.destroy()
+ osd_obj.rm_util.destroy_osd.assert_called_once()
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.purge_osd")
+ def test_purge(self, _, osd_obj):
+ osd_obj.purge()
+ osd_obj.rm_util.purge_osd.assert_called_once()
+
+ @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count")
+ def test_pg_count(self, _, osd_obj):
+ osd_obj.get_pg_count()
+ osd_obj.rm_util.get_pg_count.assert_called_once()
+
+ def test_drain_status_human_not_started(self, osd_obj):
+ assert osd_obj.drain_status_human() == 'not started'
+
+ def test_drain_status_human_started(self, osd_obj):
+ osd_obj.started = True
+ assert osd_obj.drain_status_human() == 'started'
+
+ def test_drain_status_human_draining(self, osd_obj):
+ osd_obj.started = True
+ osd_obj.draining = True
+ assert osd_obj.drain_status_human() == 'draining'
+
+ def test_drain_status_human_done(self, osd_obj):
+ osd_obj.started = True
+ osd_obj.draining = False
+ osd_obj.drain_done_at = datetime.utcnow()
+ assert osd_obj.drain_status_human() == 'done, waiting for purge'
+
+
+class TestOSDRemovalQueue:
+
+ def test_queue_size(self, osd_obj):
+ q = OSDRemovalQueue(mock.Mock())
+ assert q.queue_size() == 0
+ q.osds.add(osd_obj)
+ assert q.queue_size() == 1
+
+ @mock.patch("cephadm.services.osd.OSD.start")
+ @mock.patch("cephadm.services.osd.OSD.exists")
+ def test_enqueue(self, exist, start, osd_obj):
+ q = OSDRemovalQueue(mock.Mock())
+ q.enqueue(osd_obj)
+ osd_obj.start.assert_called_once()
+
+ @mock.patch("cephadm.services.osd.OSD.stop")
+ @mock.patch("cephadm.services.osd.OSD.exists")
+ def test_rm_raise(self, exist, stop, osd_obj):
+ q = OSDRemovalQueue(mock.Mock())
+ with pytest.raises(KeyError):
+ q.rm(osd_obj)
+ osd_obj.stop.assert_called_once()
+
+ @mock.patch("cephadm.services.osd.OSD.stop")
+ @mock.patch("cephadm.services.osd.OSD.exists")
+ def test_rm(self, exist, stop, osd_obj):
+ q = OSDRemovalQueue(mock.Mock())
+ q.osds.add(osd_obj)
+ q.rm(osd_obj)
+ osd_obj.stop.assert_called_once()
diff --git a/src/pybind/mgr/cephadm/tests/test_scheduling.py b/src/pybind/mgr/cephadm/tests/test_scheduling.py
new file mode 100644
index 000000000..2454dc0d1
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_scheduling.py
@@ -0,0 +1,1591 @@
+# Disable autopep8 for this file:
+
+# fmt: off
+
+from typing import NamedTuple, List, Dict, Optional
+import pytest
+
+from ceph.deployment.hostspec import HostSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, IngressSpec
+from ceph.deployment.hostspec import SpecValidationError
+
+from cephadm.module import HostAssignment
+from cephadm.schedule import DaemonPlacement
+from orchestrator import DaemonDescription, OrchestratorValidationError, OrchestratorError
+
+
+def wrapper(func):
+ # curries the decorated helper: bind its extra arguments first, supply 'expected' last
+ def inner(*args):
+ def inner2(expected):
+ func(expected, *args)
+ return inner2
+ return inner
+
+
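+# The assertion helpers below are curried via @wrapper: e.g. one_of('1', '2')
+# returns a callable that later receives the scheduler's placement result and
+# checks it; error(...) matches a raised exception instead of a host list.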
+@wrapper
+def none(expected):
+ assert expected == []
+
+
+@wrapper
+def one_of(expected, *hosts):
+ if not isinstance(expected, list):
+ assert False, str(expected)
+ assert len(expected) == 1, f'one_of failed len({expected}) != 1'
+ assert expected[0] in hosts
+
+
+@wrapper
+def two_of(expected, *hosts):
+ if not isinstance(expected, list):
+ assert False, str(expected)
+ assert len(expected) == 2, f'two_of failed len({expected}) != 2'
+ matches = 0
+ for h in hosts:
+ matches += int(h in expected)
+ if matches != 2:
+ assert False, f'two of {hosts} not in {expected}'
+
+
+@wrapper
+def exactly(expected, *hosts):
+ assert expected == list(hosts)
+
+
+@wrapper
+def error(expected, kind, match):
+ assert isinstance(expected, kind), (str(expected), match)
+ assert str(expected) == match, (str(expected), match)
+
+
+@wrapper
+def _or(expected, *inners):
+ def catch(inner):
+ try:
+ inner(expected)
+ except AssertionError as e:
+ return e
+ result = [catch(i) for i in inners]
+ if None not in result:
+ assert False, f"_or failed: {expected}"
+
+
+def _always_true(_):
+ pass
+
+
+def k(s):
+ return [e for e in s.split(' ') if e]
+
+
+def get_result(key, results):
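+ # 'results' is a list of (key, assertion) pairs; a '*' element in a stored
+ # key matches any value, and the first matching entry from the top wins.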
+ def match(one):
+ for o, k in zip(one, key):
+ if o != k and o != '*':
+ return False
+ return True
+ return [v for k, v in results if match(k)][0]
+
+
+def mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count):
+
+ if spec_section == 'hosts':
+ mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731
+ hosts=explicit,
+ count=count,
+ ))
+ elif spec_section == 'label':
+ mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731
+ label='mylabel',
+ count=count,
+ ))
+ elif spec_section == 'host_pattern':
+ pattern = {
+ 'e': 'notfound',
+ '1': '1',
+ '12': '[1-2]',
+ '123': '*',
+ }[explicit_key]
+ mk_spec = lambda: ServiceSpec('mgr', placement=PlacementSpec( # noqa: E731
+ host_pattern=pattern,
+ count=count,
+ ))
+ else:
+ assert False
+
+ hosts = [
+ HostSpec(h, labels=['mylabel']) if h in explicit else HostSpec(h)
+ for h in hosts
+ ]
+
+ return mk_spec, hosts
+
+
+def run_scheduler_test(results, mk_spec, hosts, daemons, key_elems):
+ key = ' '.join('N' if e is None else str(e) for e in key_elems)
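+ # None elements (e.g. count=None) are encoded as 'N' so every key element is a single token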
+ try:
+ assert_res = get_result(k(key), results)
+ except IndexError:
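+ # no expectation matches this key: run the scheduler once anyway and fail
+ # with a message suggesting the results-table entry to add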
+ try:
+ spec = mk_spec()
+ host_res, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=hosts,
+ unreachable_hosts=[],
+ daemons=daemons,
+ ).place()
+ if isinstance(host_res, list):
+ e = ', '.join(repr(h.hostname) for h in host_res)
+ assert False, f'`(k("{key}"), exactly({e})),` not found'
+ assert False, f'`(k("{key}"), ...),` not found'
+ except OrchestratorError as e:
+ assert False, f'`(k("{key}"), error({type(e).__name__}, {repr(str(e))})),` not found'
+
+ for _ in range(10): # scheduler has a random component
+ try:
+ spec = mk_spec()
+ host_res, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=hosts,
+ unreachable_hosts=[],
+ daemons=daemons
+ ).place()
+
+ assert_res(sorted([h.hostname for h in host_res]))
+ except Exception as e:
+ assert_res(e)
+
+
+@pytest.mark.parametrize("dp,n,result",
+ [ # noqa: E128
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]),
+ 0,
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]),
+ ),
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80]),
+ 2,
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[82]),
+ ),
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[80, 90]),
+ 2,
+ DaemonPlacement(daemon_type='mgr', hostname='host1', ports=[82, 92]),
+ ),
+ ])
+def test_daemon_placement_renumber(dp, n, result):
+ assert dp.renumber_ports(n) == result
+
+
+@pytest.mark.parametrize(
+ 'dp,dd,result',
+ [
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1'),
+ DaemonDescription('mgr', 'a', 'host1'),
+ True
+ ),
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1', name='a'),
+ DaemonDescription('mgr', 'a', 'host1'),
+ True
+ ),
+ (
+ DaemonPlacement(daemon_type='mon', hostname='host1', name='a'),
+ DaemonDescription('mgr', 'a', 'host1'),
+ False
+ ),
+ (
+ DaemonPlacement(daemon_type='mgr', hostname='host1', name='a'),
+ DaemonDescription('mgr', 'b', 'host1'),
+ False
+ ),
+ ])
+def test_daemon_placement_match(dp, dd, result):
+ assert dp.matches_daemon(dd) == result
+
+
+# * first match from the top wins
+# * where e=[], *=any
+#
+# + list of known hosts available for scheduling (host_key)
+# | + hosts used for explicit placement (explicit_key)
+# | | + count
+# | | | + section (host, label, pattern)
+# | | | | + expected result
+# | | | | |
+test_explicit_scheduler_results = [
+ (k("* * 0 *"), error(SpecValidationError, 'num/count must be >= 1')),
+ (k("* e N l"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr>: No matching hosts for label mylabel')),
+ (k("* e N p"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr>: No matching hosts')),
+ (k("* e N h"), error(OrchestratorValidationError, 'placement spec is empty: no hosts, no label, no pattern, no count')),
+ (k("* e * *"), none),
+ (k("1 12 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 2: Unknown hosts")),
+ (k("1 123 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 2, 3: Unknown hosts")),
+ (k("1 * * *"), exactly('1')),
+ (k("12 1 * *"), exactly('1')),
+ (k("12 12 1 *"), one_of('1', '2')),
+ (k("12 12 * *"), exactly('1', '2')),
+ (k("12 123 * h"), error(OrchestratorValidationError, "Cannot place <ServiceSpec for service_name=mgr> on 3: Unknown hosts")),
+ (k("12 123 1 *"), one_of('1', '2', '3')),
+ (k("12 123 * *"), two_of('1', '2', '3')),
+ (k("123 1 * *"), exactly('1')),
+ (k("123 12 1 *"), one_of('1', '2')),
+ (k("123 12 * *"), exactly('1', '2')),
+ (k("123 123 1 *"), one_of('1', '2', '3')),
+ (k("123 123 2 *"), two_of('1', '2', '3')),
+ (k("123 123 * *"), exactly('1', '2', '3')),
+]
+
+
+@pytest.mark.parametrize("spec_section_key,spec_section",
+ [ # noqa: E128
+ ('h', 'hosts'),
+ ('l', 'label'),
+ ('p', 'host_pattern'),
+ ])
+@pytest.mark.parametrize("count",
+ [ # noqa: E128
+ None,
+ 0,
+ 1,
+ 2,
+ 3,
+ ])
+@pytest.mark.parametrize("explicit_key, explicit",
+ [ # noqa: E128
+ ('e', []),
+ ('1', ['1']),
+ ('12', ['1', '2']),
+ ('123', ['1', '2', '3']),
+ ])
+@pytest.mark.parametrize("host_key, hosts",
+ [ # noqa: E128
+ ('1', ['1']),
+ ('12', ['1', '2']),
+ ('123', ['1', '2', '3']),
+ ])
+def test_explicit_scheduler(host_key, hosts,
+ explicit_key, explicit,
+ count,
+ spec_section_key, spec_section):
+
+ mk_spec, hosts = mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count)
+ run_scheduler_test(
+ results=test_explicit_scheduler_results,
+ mk_spec=mk_spec,
+ hosts=hosts,
+ daemons=[],
+ key_elems=(host_key, explicit_key, count, spec_section_key)
+ )
+
+
+# * first match from the top wins
+# * where e=[], *=any
+#
+# + list of known hosts available for scheduling (host_key)
+# | + hosts used for explicit placement (explicit_key)
+# | | + count
+# | | | + existing daemons
+# | | | | + section (host, label, pattern)
+# | | | | | + expected result
+# | | | | | |
+test_scheduler_daemons_results = [
+ (k("* 1 * * *"), exactly('1')),
+ (k("1 123 * * h"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr> on 2, 3: Unknown hosts')),
+ (k("1 123 * * *"), exactly('1')),
+ (k("12 123 * * h"), error(OrchestratorValidationError, 'Cannot place <ServiceSpec for service_name=mgr> on 3: Unknown hosts')),
+ (k("12 123 N * *"), exactly('1', '2')),
+ (k("12 123 1 * *"), one_of('1', '2')),
+ (k("12 123 2 * *"), exactly('1', '2')),
+ (k("12 123 3 * *"), exactly('1', '2')),
+ (k("123 123 N * *"), exactly('1', '2', '3')),
+ (k("123 123 1 e *"), one_of('1', '2', '3')),
+ (k("123 123 1 1 *"), exactly('1')),
+ (k("123 123 1 3 *"), exactly('3')),
+ (k("123 123 1 12 *"), one_of('1', '2')),
+ (k("123 123 1 112 *"), one_of('1', '2')),
+ (k("123 123 1 23 *"), one_of('2', '3')),
+ (k("123 123 1 123 *"), one_of('1', '2', '3')),
+ (k("123 123 2 e *"), two_of('1', '2', '3')),
+ (k("123 123 2 1 *"), _or(exactly('1', '2'), exactly('1', '3'))),
+ (k("123 123 2 3 *"), _or(exactly('1', '3'), exactly('2', '3'))),
+ (k("123 123 2 12 *"), exactly('1', '2')),
+ (k("123 123 2 112 *"), exactly('1', '2')),
+ (k("123 123 2 23 *"), exactly('2', '3')),
+ (k("123 123 2 123 *"), two_of('1', '2', '3')),
+ (k("123 123 3 * *"), exactly('1', '2', '3')),
+]
+
+
+@pytest.mark.parametrize("spec_section_key,spec_section",
+ [ # noqa: E128
+ ('h', 'hosts'),
+ ('l', 'label'),
+ ('p', 'host_pattern'),
+ ])
+@pytest.mark.parametrize("daemons_key, daemons",
+ [ # noqa: E128
+ ('e', []),
+ ('1', ['1']),
+ ('3', ['3']),
+ ('12', ['1', '2']),
+ ('112', ['1', '1', '2']), # deal with existing co-located daemons
+ ('23', ['2', '3']),
+ ('123', ['1', '2', '3']),
+ ])
+@pytest.mark.parametrize("count",
+ [ # noqa: E128
+ None,
+ 1,
+ 2,
+ 3,
+ ])
+@pytest.mark.parametrize("explicit_key, explicit",
+ [ # noqa: E128
+ ('1', ['1']),
+ ('123', ['1', '2', '3']),
+ ])
+@pytest.mark.parametrize("host_key, hosts",
+ [ # noqa: E128
+ ('1', ['1']),
+ ('12', ['1', '2']),
+ ('123', ['1', '2', '3']),
+ ])
+def test_scheduler_daemons(host_key, hosts,
+ explicit_key, explicit,
+ count,
+ daemons_key, daemons,
+ spec_section_key, spec_section):
+ mk_spec, hosts = mk_spec_and_host(spec_section, hosts, explicit_key, explicit, count)
+ dds = [
+ DaemonDescription('mgr', d, d)
+ for d in daemons
+ ]
+ run_scheduler_test(
+ results=test_scheduler_daemons_results,
+ mk_spec=mk_spec,
+ hosts=hosts,
+ daemons=dds,
+ key_elems=(host_key, explicit_key, count, daemons_key, spec_section_key)
+ )
+
+
+# =========================
+
+
+class NodeAssignmentTest(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
+ post_rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
+ expected: List[str]
+ expected_add: List[str]
+ expected_remove: List[DaemonDescription]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,rank_map,post_rank_map,expected,expected_add,expected_remove",
+ [ # noqa: E128
+ # just hosts
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(hosts=['smithi060']),
+ ['smithi060'],
+ [],
+ None, None,
+ ['mgr:smithi060'], ['mgr:smithi060'], []
+ ),
+ # all_hosts
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(host_pattern='*'),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ ],
+ None, None,
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ ['mgr:host3'],
+ []
+ ),
+ # all_hosts + count_per_host
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(host_pattern='*', count_per_host=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mds', 'a', 'host1'),
+ DaemonDescription('mds', 'b', 'host2'),
+ ],
+ None, None,
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ ['mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ []
+ ),
+ # count that is bigger than the number of hosts; truncate to len(hosts).
+ # mgr daemons should not be co-located with each other.
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=4),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ []
+ ),
+ # count that is bigger than the number of hosts; wrap around.
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=6),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ []
+ ),
+ # count + partial host list
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=3, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ ],
+ None, None,
+ ['mgr:host3'],
+ ['mgr:host3'],
+ ['mgr.a', 'mgr.b']
+ ),
+ # count + partial host list (with colo)
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=3, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mds', 'a', 'host1'),
+ DaemonDescription('mds', 'b', 'host2'),
+ ],
+ None, None,
+ ['mds:host3', 'mds:host3', 'mds:host3'],
+ ['mds:host3', 'mds:host3', 'mds:host3'],
+ ['mds.a', 'mds.b']
+ ),
+ # count 1 + partial host list
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ ],
+ None, None,
+ ['mgr:host3'],
+ ['mgr:host3'],
+ ['mgr.a', 'mgr.b']
+ ),
+ # count + partial host list + existing
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ ],
+ None, None,
+ ['mgr:host3'],
+ ['mgr:host3'],
+ ['mgr.a']
+ ),
+ # count + partial host list + existing (deterministic)
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2, hosts=['host1']),
+ 'host1 host2'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ ],
+ None, None,
+ ['mgr:host1'],
+ [],
+ []
+ ),
+ # count + partial host list + existing (deterministic)
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2, hosts=['host1']),
+ 'host1 host2'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host2'),
+ ],
+ None, None,
+ ['mgr:host1'],
+ ['mgr:host1'],
+ ['mgr.a']
+ ),
+ # label only
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ []
+ ),
+ # label + count (truncate to host list)
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=4, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ ['mgr:host1', 'mgr:host2', 'mgr:host3'],
+ []
+ ),
+ # label + count (with colo)
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=6, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3'],
+ []
+ ),
+ # label only + count_per_host
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(label='foo', count_per_host=3),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3',
+ 'mds:host1', 'mds:host2', 'mds:host3'],
+ ['mds:host1', 'mds:host2', 'mds:host3', 'mds:host1', 'mds:host2', 'mds:host3',
+ 'mds:host1', 'mds:host2', 'mds:host3'],
+ []
+ ),
+ # host_pattern
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(host_pattern='mgr*'),
+ 'mgrhost1 mgrhost2 datahost'.split(),
+ [],
+ None, None,
+ ['mgr:mgrhost1', 'mgr:mgrhost2'],
+ ['mgr:mgrhost1', 'mgr:mgrhost2'],
+ []
+ ),
+ # host_pattern + count_per_host
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(host_pattern='mds*', count_per_host=3),
+ 'mdshost1 mdshost2 datahost'.split(),
+ [],
+ None, None,
+ ['mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2'],
+ ['mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2', 'mds:mdshost1', 'mds:mdshost2'],
+ []
+ ),
+ # label + count_per_host + ports
+ NodeAssignmentTest(
+ 'rgw',
+ PlacementSpec(count=6, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ None, None,
+ ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)',
+ 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'],
+ ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)',
+ 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'],
+ []
+ ),
+ # label + count_per_host + ports (+ existing)
+ NodeAssignmentTest(
+ 'rgw',
+ PlacementSpec(count=6, label='foo'),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('rgw', 'a', 'host1', ports=[81]),
+ DaemonDescription('rgw', 'b', 'host2', ports=[80]),
+ DaemonDescription('rgw', 'c', 'host1', ports=[82]),
+ ],
+ None, None,
+ ['rgw:host1(*:80)', 'rgw:host2(*:80)', 'rgw:host3(*:80)',
+ 'rgw:host1(*:81)', 'rgw:host2(*:81)', 'rgw:host3(*:81)'],
+ ['rgw:host1(*:80)', 'rgw:host3(*:80)',
+ 'rgw:host2(*:81)', 'rgw:host3(*:81)'],
+ ['rgw.c']
+ ),
+ # cephadm.py teuth case
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=3, hosts=['host1=y', 'host2=x']),
+ 'host1 host2'.split(),
+ [
+ DaemonDescription('mgr', 'y', 'host1'),
+ DaemonDescription('mgr', 'x', 'host2'),
+ ],
+ None, None,
+ ['mgr:host1(name=y)', 'mgr:host2(name=x)'],
+ [], []
+ ),
+
+ # note: host -> rank mapping is pseudo-random based on svc name, so these
+ # host/rank pairs may seem random but they match the nfs.mynfs seed used by
+ # the test.
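+ # rank_map maps rank -> {generation: daemon_id}; a None daemon_id marks a
+ # generation that has been assigned but whose daemon does not exist yet.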
+
+ # ranked, fresh
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [],
+ {},
+ {0: {0: None}, 1: {0: None}, 2: {0: None}},
+ ['nfs:host3(rank=0.0)', 'nfs:host2(rank=1.0)', 'nfs:host1(rank=2.0)'],
+ ['nfs:host3(rank=0.0)', 'nfs:host2(rank=1.0)', 'nfs:host1(rank=2.0)'],
+ []
+ ),
+ # 21: ranked, exist
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1),
+ ],
+ {0: {1: '0.1'}},
+ {0: {1: '0.1'}, 1: {0: None}, 2: {0: None}},
+ ['nfs:host1(rank=0.1)', 'nfs:host3(rank=1.0)', 'nfs:host2(rank=2.0)'],
+ ['nfs:host3(rank=1.0)', 'nfs:host2(rank=2.0)'],
+ []
+ ),
+ # ranked, exist, different ranks
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1),
+ DaemonDescription('nfs', '1.1', 'host2', rank=1, rank_generation=1),
+ ],
+ {0: {1: '0.1'}, 1: {1: '1.1'}},
+ {0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}},
+ ['nfs:host1(rank=0.1)', 'nfs:host2(rank=1.1)', 'nfs:host3(rank=2.0)'],
+ ['nfs:host3(rank=2.0)'],
+ []
+ ),
+ # ranked, exist, different ranks (2)
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.1', 'host1', rank=0, rank_generation=1),
+ DaemonDescription('nfs', '1.1', 'host3', rank=1, rank_generation=1),
+ ],
+ {0: {1: '0.1'}, 1: {1: '1.1'}},
+ {0: {1: '0.1'}, 1: {1: '1.1'}, 2: {0: None}},
+ ['nfs:host1(rank=0.1)', 'nfs:host3(rank=1.1)', 'nfs:host2(rank=2.0)'],
+ ['nfs:host2(rank=2.0)'],
+ []
+ ),
+ # ranked, exist, extra ranks
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5),
+ DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5),
+ DaemonDescription('nfs', '4.5', 'host2', rank=4, rank_generation=5),
+ ],
+ {0: {5: '0.5'}, 1: {5: '1.5'}},
+ {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {0: None}},
+ ['nfs:host1(rank=0.5)', 'nfs:host2(rank=1.5)', 'nfs:host3(rank=2.0)'],
+ ['nfs:host3(rank=2.0)'],
+ ['nfs.4.5']
+ ),
+ # 25: ranked, exist, extra ranks (scale down: kill off high rank)
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=2),
+ 'host3 host2 host1'.split(),
+ [
+ DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5),
+ DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5),
+ DaemonDescription('nfs', '2.5', 'host3', rank=2, rank_generation=5),
+ ],
+ {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
+ {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
+ ['nfs:host1(rank=0.5)', 'nfs:host2(rank=1.5)'],
+ [],
+ ['nfs.2.5']
+ ),
+ # ranked, exist, extra ranks (scale down hosts)
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=2),
+ 'host1 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.5', 'host1', rank=0, rank_generation=5),
+ DaemonDescription('nfs', '1.5', 'host2', rank=1, rank_generation=5),
+ DaemonDescription('nfs', '2.5', 'host3', rank=4, rank_generation=5),
+ ],
+ {0: {5: '0.5'}, 1: {5: '1.5'}, 2: {5: '2.5'}},
+ {0: {5: '0.5'}, 1: {5: '1.5', 6: None}, 2: {5: '2.5'}},
+ ['nfs:host1(rank=0.5)', 'nfs:host3(rank=1.6)'],
+ ['nfs:host3(rank=1.6)'],
+ ['nfs.2.5', 'nfs.1.5']
+ ),
+ # ranked, exist, duplicate rank
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.0', 'host1', rank=0, rank_generation=0),
+ DaemonDescription('nfs', '1.1', 'host2', rank=1, rank_generation=1),
+ DaemonDescription('nfs', '1.2', 'host3', rank=1, rank_generation=2),
+ ],
+ {0: {0: '0.0'}, 1: {2: '1.2'}},
+ {0: {0: '0.0'}, 1: {2: '1.2'}, 2: {0: None}},
+ ['nfs:host1(rank=0.0)', 'nfs:host3(rank=1.2)', 'nfs:host2(rank=2.0)'],
+ ['nfs:host2(rank=2.0)'],
+ ['nfs.1.1']
+ ),
+ # 28: ranked, all gens stale (failure during update cycle)
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2),
+ DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2),
+ ],
+ {0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3'}},
+ {0: {2: '0.2'}, 1: {2: '1.2', 3: '1.3', 4: None}},
+ ['nfs:host1(rank=0.2)', 'nfs:host3(rank=1.4)'],
+ ['nfs:host3(rank=1.4)'],
+ ['nfs.1.2']
+ ),
+ # ranked, not enough hosts
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(count=4),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2),
+ DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2),
+ ],
+ {0: {2: '0.2'}, 1: {2: '1.2'}},
+ {0: {2: '0.2'}, 1: {2: '1.2'}, 2: {0: None}},
+ ['nfs:host1(rank=0.2)', 'nfs:host2(rank=1.2)', 'nfs:host3(rank=2.0)'],
+ ['nfs:host3(rank=2.0)'],
+ []
+ ),
+ # ranked, scale down
+ NodeAssignmentTest(
+ 'nfs',
+ PlacementSpec(hosts=['host2']),
+ 'host1 host2'.split(),
+ [
+ DaemonDescription('nfs', '0.2', 'host1', rank=0, rank_generation=2),
+ DaemonDescription('nfs', '1.2', 'host2', rank=1, rank_generation=2),
+ DaemonDescription('nfs', '2.2', 'host3', rank=2, rank_generation=2),
+ ],
+ {0: {2: '0.2'}, 1: {2: '1.2'}, 2: {2: '2.2'}},
+ {0: {2: '0.2', 3: None}, 1: {2: '1.2'}, 2: {2: '2.2'}},
+ ['nfs:host2(rank=0.3)'],
+ ['nfs:host2(rank=0.3)'],
+ ['nfs.0.2', 'nfs.1.2', 'nfs.2.2']
+ ),
+
+ ])
+def test_node_assignment(service_type, placement, hosts, daemons, rank_map, post_rank_map,
+ expected, expected_add, expected_remove):
+ spec = None
+ service_id = None
+ allow_colo = False
+ if service_type == 'rgw':
+ service_id = 'realm.zone'
+ allow_colo = True
+ elif service_type == 'mds':
+ service_id = 'myfs'
+ allow_colo = True
+ elif service_type == 'nfs':
+ service_id = 'mynfs'
+ spec = ServiceSpec(service_type=service_type,
+ service_id=service_id,
+ placement=placement)
+
+ if not spec:
+ spec = ServiceSpec(service_type=service_type,
+ service_id=service_id,
+ placement=placement)
+
+ all_slots, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=[HostSpec(h, labels=['foo']) for h in hosts],
+ unreachable_hosts=[],
+ daemons=daemons,
+ allow_colo=allow_colo,
+ rank_map=rank_map,
+ ).place()
+
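+ # place() updates the passed-in rank_map in place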
+ assert rank_map == post_rank_map
+
+ got = [str(p) for p in all_slots]
+ num_wildcard = 0
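+ # a '*' entry in the expectation acts as a wildcard for any one remaining slot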
+ for i in expected:
+ if i == '*':
+ num_wildcard += 1
+ else:
+ assert i in got
+ got.remove(i)
+ assert num_wildcard == len(got)
+
+ got = [str(p) for p in to_add]
+ num_wildcard = 0
+ for i in expected_add:
+ if i == '*':
+ num_wildcard += 1
+ else:
+ assert i in got
+ got.remove(i)
+ assert num_wildcard == len(got)
+
+ assert sorted([d.name() for d in to_remove]) == sorted(expected_remove)
+
+
+class NodeAssignmentTest5(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ available_hosts: List[str]
+ candidates_hosts: List[str]
+
+
+@pytest.mark.parametrize("service_type, placement, available_hosts, expected_candidates",
+ [ # noqa: E128
+ NodeAssignmentTest5(
+ 'alertmanager',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host3 host1 host4 host2'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'prometheus',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host3 host2 host4 host1'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'grafana',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host1 host2 host4 host3'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'mgr',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host4 host2 host1 host3'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'mon',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host1 host3 host4 host2'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'rgw',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host1 host3 host2 host4'.split(),
+ ),
+ NodeAssignmentTest5(
+ 'cephfs-mirror',
+ PlacementSpec(hosts='host1 host2 host3 host4'.split()),
+ 'host1 host2 host3 host4'.split(),
+ 'host4 host3 host1 host2'.split(),
+ ),
+ ])
+def test_node_assignment_random_shuffle(service_type, placement, available_hosts, expected_candidates):
+ spec = None
+ service_id = None
+ allow_colo = False
+ spec = ServiceSpec(service_type=service_type,
+ service_id=service_id,
+ placement=placement)
+
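+ # the candidate shuffle is seeded by the service name, so the expected
+ # orderings above are stable for a given service_type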
+ candidates = HostAssignment(
+ spec=spec,
+ hosts=[HostSpec(h, labels=['foo']) for h in available_hosts],
+ unreachable_hosts=[],
+ daemons=[],
+ allow_colo=allow_colo,
+ ).get_candidates()
+
+ candidates_hosts = [h.hostname for h in candidates]
+ assert candidates_hosts == expected_candidates
+
+
+class NodeAssignmentTest2(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected_len: int
+ in_set: List[str]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,in_set",
+ [ # noqa: E128
+ # just count
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host1', 'host2', 'host3'],
+ ),
+
+ # hosts + (smaller) count
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=1, hosts='host1 host2'.split()),
+ 'host1 host2'.split(),
+ [],
+ 1,
+ ['host1', 'host2'],
+ ),
+ # hosts + (smaller) count, existing
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=1, hosts='host1 host2 host3'.split()),
+ 'host1 host2 host3'.split(),
+ [DaemonDescription('mgr', 'mgr.a', 'host1')],
+ 1,
+ ['host1', 'host2', 'host3'],
+ ),
+ # hosts + (smaller) count, (more) existing
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=1, hosts='host1 host2 host3'.split()),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ ],
+ 1,
+ ['host1', 'host2']
+ ),
+ # count + partial host list
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host1', 'host2', 'host3']
+ ),
+ # label + count
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=1, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host1', 'host2', 'host3']
+ ),
+ ])
+def test_node_assignment2(service_type, placement, hosts,
+ daemons, expected_len, in_set):
+ hosts, to_add, to_remove = HostAssignment(
+ spec=ServiceSpec(service_type, placement=placement),
+ hosts=[HostSpec(h, labels=['foo']) for h in hosts],
+ unreachable_hosts=[],
+ daemons=daemons,
+ ).place()
+ assert len(hosts) == expected_len
+ for h in [h.hostname for h in hosts]:
+ assert h in in_set
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,must_have",
+ [ # noqa: E128
+ # count larger than the explicit host list
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=3, hosts='host3'.split()),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host3']
+ ),
+ # count + partial host list
+ NodeAssignmentTest2(
+ 'mgr',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host3']
+ ),
+ ])
+def test_node_assignment3(service_type, placement, hosts,
+ daemons, expected_len, must_have):
+ hosts, to_add, to_remove = HostAssignment(
+ spec=ServiceSpec(service_type, placement=placement),
+ hosts=[HostSpec(h) for h in hosts],
+ unreachable_hosts=[],
+ daemons=daemons,
+ ).place()
+ assert len(hosts) == expected_len
+ for h in must_have:
+ assert h in [h.hostname for h in hosts]
+
+
+class NodeAssignmentTest4(NamedTuple):
+ spec: ServiceSpec
+ networks: Dict[str, Dict[str, Dict[str, List[str]]]]
+ daemons: List[DaemonDescription]
+ expected: List[str]
+ expected_add: List[str]
+ expected_remove: List[DaemonDescription]
+
+
+@pytest.mark.parametrize("spec,networks,daemons,expected,expected_add,expected_remove",
+ [ # noqa: E128
+ NodeAssignmentTest4(
+ ServiceSpec(
+ service_type='rgw',
+ service_id='foo',
+ placement=PlacementSpec(count=6, label='foo'),
+ networks=['10.0.0.0/8'],
+ ),
+ {
+ 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}},
+ 'host2': {'10.0.0.0/8': {'eth0': ['10.0.0.2']}},
+ 'host3': {'192.168.0.0/16': {'eth0': ['192.168.0.1']}},
+ },
+ [],
+ ['rgw:host1(10.0.0.1:80)', 'rgw:host2(10.0.0.2:80)',
+ 'rgw:host1(10.0.0.1:81)', 'rgw:host2(10.0.0.2:81)',
+ 'rgw:host1(10.0.0.1:82)', 'rgw:host2(10.0.0.2:82)'],
+ ['rgw:host1(10.0.0.1:80)', 'rgw:host2(10.0.0.2:80)',
+ 'rgw:host1(10.0.0.1:81)', 'rgw:host2(10.0.0.2:81)',
+ 'rgw:host1(10.0.0.1:82)', 'rgw:host2(10.0.0.2:82)'],
+ []
+ ),
+ NodeAssignmentTest4(
+ IngressSpec(
+ service_type='ingress',
+ service_id='rgw.foo',
+ frontend_port=443,
+ monitor_port=8888,
+ virtual_ip='10.0.0.20/8',
+ backend_service='rgw.foo',
+ placement=PlacementSpec(label='foo'),
+ networks=['10.0.0.0/8'],
+ ),
+ {
+ 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}},
+ 'host2': {'10.0.0.0/8': {'eth1': ['10.0.0.2']}},
+ 'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}},
+ },
+ [],
+ ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+ 'keepalived:host1', 'keepalived:host2'],
+ ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+ 'keepalived:host1', 'keepalived:host2'],
+ []
+ ),
+ NodeAssignmentTest4(
+ IngressSpec(
+ service_type='ingress',
+ service_id='rgw.foo',
+ frontend_port=443,
+ monitor_port=8888,
+ virtual_ip='10.0.0.20/8',
+ backend_service='rgw.foo',
+ placement=PlacementSpec(label='foo'),
+ networks=['10.0.0.0/8'],
+ ),
+ {
+ 'host1': {'10.0.0.0/8': {'eth0': ['10.0.0.1']}},
+ 'host2': {'10.0.0.0/8': {'eth1': ['10.0.0.2']}},
+ 'host3': {'192.168.0.0/16': {'eth2': ['192.168.0.1']}},
+ },
+ [
+ DaemonDescription('haproxy', 'a', 'host1', ip='10.0.0.1',
+ ports=[443, 8888]),
+ DaemonDescription('keepalived', 'b', 'host2'),
+ DaemonDescription('keepalived', 'c', 'host3'),
+ ],
+ ['haproxy:host1(10.0.0.1:443,8888)', 'haproxy:host2(10.0.0.2:443,8888)',
+ 'keepalived:host1', 'keepalived:host2'],
+ ['haproxy:host2(10.0.0.2:443,8888)',
+ 'keepalived:host1'],
+ ['keepalived.c']
+ ),
+ ])
+def test_node_assignment4(spec, networks, daemons,
+ expected, expected_add, expected_remove):
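+ # for ingress specs the primary daemon (haproxy) is placed per slot and one
+ # keepalived is added per selected host; plain rgw specs just place rgw daemons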
+ all_slots, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=[HostSpec(h, labels=['foo']) for h in networks.keys()],
+ unreachable_hosts=[],
+ daemons=daemons,
+ allow_colo=True,
+ networks=networks,
+ primary_daemon_type='haproxy' if spec.service_type == 'ingress' else spec.service_type,
+ per_host_daemon_type='keepalived' if spec.service_type == 'ingress' else None,
+ ).place()
+
+ got = [str(p) for p in all_slots]
+ num_wildcard = 0
+ for i in expected:
+ if i == '*':
+ num_wildcard += 1
+ else:
+ assert i in got
+ got.remove(i)
+ assert num_wildcard == len(got)
+
+ got = [str(p) for p in to_add]
+ num_wildcard = 0
+ for i in expected_add:
+ if i == '*':
+ num_wildcard += 1
+ else:
+ assert i in got
+ got.remove(i)
+ assert num_wildcard == len(got)
+
+ assert sorted([d.name() for d in to_remove]) == sorted(expected_remove)
+
+
+@pytest.mark.parametrize("placement",
+ [ # noqa: E128
+ ('1 *'),
+ ('* label:foo'),
+ ('* host1 host2'),
+ ('hostname12hostname12hostname12hostname12hostname12hostname12hostname12'), # > 63 chars
+ ])
+def test_bad_placements(placement):
+ try:
+ PlacementSpec.from_string(placement.split(' '))
+ assert False
+ except SpecValidationError:
+ pass
+
+
+class NodeAssignmentTestBadSpec(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected: str
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected",
+ [ # noqa: E128
+ # unknown host
+ NodeAssignmentTestBadSpec(
+ 'mgr',
+ PlacementSpec(hosts=['unknownhost']),
+ ['knownhost'],
+ [],
+ "Cannot place <ServiceSpec for service_name=mgr> on unknownhost: Unknown hosts"
+ ),
+ # unknown host pattern
+ NodeAssignmentTestBadSpec(
+ 'mgr',
+ PlacementSpec(host_pattern='unknownhost'),
+ ['knownhost'],
+ [],
+ "Cannot place <ServiceSpec for service_name=mgr>: No matching hosts"
+ ),
+ # unknown label
+ NodeAssignmentTestBadSpec(
+ 'mgr',
+ PlacementSpec(label='unknownlabel'),
+ [],
+ [],
+ "Cannot place <ServiceSpec for service_name=mgr>: No matching hosts for label unknownlabel"
+ ),
+ ])
+def test_bad_specs(service_type, placement, hosts, daemons, expected):
+ with pytest.raises(OrchestratorValidationError) as e:
+ hosts, to_add, to_remove = HostAssignment(
+ spec=ServiceSpec(service_type, placement=placement),
+ hosts=[HostSpec(h) for h in hosts],
+ unreachable_hosts=[],
+ daemons=daemons,
+ ).place()
+ assert str(e.value) == expected
+
+
+class ActiveAssignmentTest(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected: List[List[str]]
+ expected_add: List[List[str]]
+ expected_remove: List[List[str]]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected,expected_add,expected_remove",
+ [
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3'),
+ ],
+ [['host1', 'host2'], ['host1', 'host3']],
+ [[]],
+ [['mgr.b'], ['mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host1', 'host3'], ['host2', 'host3']],
+ [[]],
+ [['mgr.a'], ['mgr.b']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2', is_active=True),
+ DaemonDescription('mgr', 'c', 'host3'),
+ ],
+ [['host2']],
+ [[]],
+ [['mgr.a', 'mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host3']],
+ [[]],
+ [['mgr.a', 'mgr.b']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host1'], ['host3']],
+ [[]],
+ [['mgr.a', 'mgr.b'], ['mgr.b', 'mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2', is_active=True),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host2', 'host3']],
+ [[]],
+ [['mgr.a']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'b', 'host2', is_active=True),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host1'], ['host2'], ['host3']],
+ [[]],
+ [['mgr.a', 'mgr.b'], ['mgr.b', 'mgr.c'], ['mgr.a', 'mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'a2', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3'),
+ ],
+ [['host1']],
+ [[]],
+ [['mgr.a2', 'mgr.b', 'mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'a2', 'host1', is_active=True),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3'),
+ ],
+ [['host1']],
+ [[]],
+ [['mgr.a', 'mgr.b', 'mgr.c'], ['mgr.a2', 'mgr.b', 'mgr.c']]
+ ),
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1', is_active=True),
+ DaemonDescription('mgr', 'a2', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host1', 'host3']],
+ [[]],
+ [['mgr.a2', 'mgr.b']]
+ ),
+ # Explicit placement should override preference for active daemon
+ ActiveAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=1, hosts=['host1']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [['host1']],
+ [[]],
+ [['mgr.b', 'mgr.c']]
+ ),
+
+ ])
+def test_active_assignment(service_type, placement, hosts, daemons, expected, expected_add, expected_remove):
+
+ spec = ServiceSpec(service_type=service_type,
+ service_id=None,
+ placement=placement)
+
+ hosts, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=[HostSpec(h) for h in hosts],
+ unreachable_hosts=[],
+ daemons=daemons,
+ ).place()
+ assert sorted([h.hostname for h in hosts]) in expected
+ assert sorted([h.hostname for h in to_add]) in expected_add
+ assert sorted([h.name() for h in to_remove]) in expected_remove
+
+
+class UnreachableHostsTest(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ unreachable_hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected_add: List[List[str]]
+ expected_remove: List[List[str]]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,unreachable_hosts,daemons,expected_add,expected_remove",
+ [
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(count=3),
+ 'host1 host2 host3'.split(),
+ ['host2'],
+ [],
+ [['host1', 'host3']],
+ [[]],
+ ),
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ ['host1'],
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [[]],
+ [['mgr.b']],
+ ),
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(count=3),
+ 'host1 host2 host3 host4'.split(),
+ ['host1'],
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [[]],
+ [[]],
+ ),
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(count=1),
+ 'host1 host2 host3 host4'.split(),
+ 'host1 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ DaemonDescription('mgr', 'c', 'host3', is_active=True),
+ ],
+ [[]],
+ [['mgr.b']],
+ ),
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(count=3),
+ 'host1 host2 host3 host4'.split(),
+ ['host2'],
+ [],
+ [['host1', 'host3', 'host4']],
+ [[]],
+ ),
+ UnreachableHostsTest(
+ 'mgr',
+ PlacementSpec(count=3),
+ 'host1 host2 host3 host4'.split(),
+ 'host1 host4'.split(),
+ [],
+ [['host2', 'host3']],
+ [[]],
+ ),
+
+ ])
+def test_unreachable_host(service_type, placement, hosts, unreachable_hosts, daemons, expected_add, expected_remove):
+
+ spec = ServiceSpec(service_type=service_type,
+ service_id=None,
+ placement=placement)
+
+ hosts, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=[HostSpec(h) for h in hosts],
+ unreachable_hosts=[HostSpec(h) for h in unreachable_hosts],
+ daemons=daemons,
+ ).place()
+ assert sorted([h.hostname for h in to_add]) in expected_add
+ assert sorted([h.name() for h in to_remove]) in expected_remove
+
+
+class RescheduleFromOfflineTest(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ maintenance_hosts: List[str]
+ offline_hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected_add: List[List[str]]
+ expected_remove: List[List[str]]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,maintenance_hosts,offline_hosts,daemons,expected_add,expected_remove",
+ [
+ RescheduleFromOfflineTest(
+ 'nfs',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host2'],
+ [
+ DaemonDescription('nfs', 'a', 'host1'),
+ DaemonDescription('nfs', 'b', 'host2'),
+ ],
+ [['host3']],
+ [[]],
+ ),
+ RescheduleFromOfflineTest(
+ 'nfs',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ ['host2'],
+ [],
+ [
+ DaemonDescription('nfs', 'a', 'host1'),
+ DaemonDescription('nfs', 'b', 'host2'),
+ ],
+ [[]],
+ [[]],
+ ),
+ RescheduleFromOfflineTest(
+ 'mon',
+ PlacementSpec(count=2),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host2'],
+ [
+ DaemonDescription('mon', 'a', 'host1'),
+ DaemonDescription('mon', 'b', 'host2'),
+ ],
+ [[]],
+ [[]],
+ ),
+ ])
+def test_remove_from_offline(service_type, placement, hosts, maintenance_hosts, offline_hosts, daemons, expected_add, expected_remove):
+
+ spec = ServiceSpec(service_type=service_type,
+ service_id='test',
+ placement=placement)
+
+ host_specs = [HostSpec(h) for h in hosts]
+ for h in host_specs:
+ if h.hostname in offline_hosts:
+ h.status = 'offline'
+ if h.hostname in maintenance_hosts:
+ h.status = 'maintenance'
+
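+ # any host with a status set ('offline' or 'maintenance') is handed to the
+ # scheduler as unreachable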
+ hosts, to_add, to_remove = HostAssignment(
+ spec=spec,
+ hosts=host_specs,
+ unreachable_hosts=[h for h in host_specs if h.status],
+ daemons=daemons,
+ ).place()
+ assert sorted([h.hostname for h in to_add]) in expected_add
+ assert sorted([h.name() for h in to_remove]) in expected_remove
diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py
new file mode 100644
index 000000000..57cd12456
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_services.py
@@ -0,0 +1,1031 @@
+from textwrap import dedent
+import json
+import yaml
+
+import pytest
+
+from unittest.mock import MagicMock, call, patch, ANY
+
+from cephadm.serve import CephadmServe
+from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
+ RbdMirrorService, CrashService, CephadmDaemonDeploySpec
+from cephadm.services.iscsi import IscsiService
+from cephadm.services.nfs import NFSService
+from cephadm.services.osd import OSDService
+from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
+ NodeExporterService
+from cephadm.services.exporter import CephadmExporter
+from cephadm.module import CephadmOrchestrator
+from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, \
+ ServiceSpec, RGWSpec, GrafanaSpec, SNMPGatewaySpec, IngressSpec, PlacementSpec
+from cephadm.tests.fixtures import with_host, with_service, _run_cephadm
+
+from orchestrator import OrchestratorError
+from orchestrator._interface import DaemonDescription
+
+
+class FakeInventory:
+ def get_addr(self, name: str) -> str:
+ return '1.2.3.4'
+
+
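+# Minimal mgr stand-in: 'get-cmd'/'set-cmd' read and write a single fake
+# config value, which the dashboard service-URL tests below rely on.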
+class FakeMgr:
+ def __init__(self):
+ self.config = ''
+ self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
+ self.mon_command = MagicMock(side_effect=self._check_mon_command)
+ self.template = MagicMock()
+ self.log = MagicMock()
+ self.inventory = FakeInventory()
+
+ def _check_mon_command(self, cmd_dict, inbuf=None):
+ prefix = cmd_dict.get('prefix')
+ if prefix == 'get-cmd':
+ return 0, self.config, ''
+ if prefix == 'set-cmd':
+ self.config = cmd_dict.get('value')
+ return 0, 'value set', ''
+ return -1, '', 'error'
+
+ def get_minimal_ceph_conf(self) -> str:
+ return ''
+
+ def get_mgr_ip(self) -> str:
+ return '1.2.3.4'
+
+
+class TestCephadmService:
+ def test_set_service_url_on_dashboard(self):
+ # pylint: disable=protected-access
+ mgr = FakeMgr()
+ service_url = 'http://svc:1000'
+ service = GrafanaService(mgr)
+ service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
+ assert mgr.config == service_url
+
+ # set-cmd should not be called if value doesn't change
+ mgr.check_mon_command.reset_mock()
+ service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
+ mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'})
+
+ def _get_services(self, mgr):
+ # services:
+ osd_service = OSDService(mgr)
+ nfs_service = NFSService(mgr)
+ mon_service = MonService(mgr)
+ mgr_service = MgrService(mgr)
+ mds_service = MdsService(mgr)
+ rgw_service = RgwService(mgr)
+ rbd_mirror_service = RbdMirrorService(mgr)
+ grafana_service = GrafanaService(mgr)
+ alertmanager_service = AlertmanagerService(mgr)
+ prometheus_service = PrometheusService(mgr)
+ node_exporter_service = NodeExporterService(mgr)
+ crash_service = CrashService(mgr)
+ iscsi_service = IscsiService(mgr)
+ cephadm_exporter_service = CephadmExporter(mgr)
+ cephadm_services = {
+ 'mon': mon_service,
+ 'mgr': mgr_service,
+ 'osd': osd_service,
+ 'mds': mds_service,
+ 'rgw': rgw_service,
+ 'rbd-mirror': rbd_mirror_service,
+ 'nfs': nfs_service,
+ 'grafana': grafana_service,
+ 'alertmanager': alertmanager_service,
+ 'prometheus': prometheus_service,
+ 'node-exporter': node_exporter_service,
+ 'crash': crash_service,
+ 'iscsi': iscsi_service,
+ 'cephadm-exporter': cephadm_exporter_service,
+ }
+ return cephadm_services
+
+ def test_get_auth_entity(self):
+ mgr = FakeMgr()
+ cephadm_services = self._get_services(mgr)
+
+ for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1")
+
+ assert "client.crash.host" == \
+ cephadm_services["crash"].get_auth_entity("id1", "host")
+ with pytest.raises(OrchestratorError):
+ cephadm_services["crash"].get_auth_entity("id1", "")
+ cephadm_services["crash"].get_auth_entity("id1")
+
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host")
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "")
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1")
+
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host")
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "")
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1")
+
+ for daemon_type in ["osd", "mds"]:
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1")
+
+ # services based on CephadmService shouldn't have get_auth_entity
+ with pytest.raises(AttributeError):
+ for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'cephadm-exporter']:
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ cephadm_services[daemon_type].get_auth_entity("id1")
+
+
+class TestISCSIService:
+
+ mgr = FakeMgr()
+ iscsi_service = IscsiService(mgr)
+
+ iscsi_spec = IscsiServiceSpec(service_type='iscsi', service_id="a")
+ iscsi_spec.daemon_type = "iscsi"
+ iscsi_spec.daemon_id = "a"
+ iscsi_spec.spec = MagicMock()
+ iscsi_spec.spec.daemon_type = "iscsi"
+ iscsi_spec.spec.ssl_cert = ''
+ iscsi_spec.api_user = "user"
+ iscsi_spec.api_password = "password"
+ iscsi_spec.api_port = 5000
+ iscsi_spec.api_secure = False
+ iscsi_spec.ssl_cert = "cert"
+ iscsi_spec.ssl_key = "key"
+
+ mgr.spec_store = MagicMock()
+ mgr.spec_store.all_specs.get.return_value = iscsi_spec
+
+ def test_iscsi_client_caps(self):
+
+ iscsi_daemon_spec = CephadmDaemonDeploySpec(
+ host='host', daemon_id='a', service_name=self.iscsi_spec.service_name())
+
+ self.iscsi_service.prepare_create(iscsi_daemon_spec)
+
+ expected_caps = ['mon',
+ 'profile rbd, allow command "osd blocklist", allow command "config-key get" with "key" prefix "iscsi/"',
+ 'mgr', 'allow command "service status"',
+ 'osd', 'allow rwx']
+
+ expected_call = call({'prefix': 'auth get-or-create',
+ 'entity': 'client.iscsi.a',
+ 'caps': expected_caps})
+ expected_call2 = call({'prefix': 'auth caps',
+ 'entity': 'client.iscsi.a',
+ 'caps': expected_caps})
+
+ assert expected_call in self.mgr.mon_command.mock_calls
+ assert expected_call2 in self.mgr.mon_command.mock_calls
+
+ @patch('cephadm.utils.resolve_ip')
+ def test_iscsi_dashboard_config(self, mock_resolve_ip):
+
+ self.mgr.check_mon_command = MagicMock()
+ self.mgr.check_mon_command.return_value = ('', '{"gateways": {}}', '')
+
+ # Case 1: use IPV4 address
+ id1 = DaemonDescription(daemon_type='iscsi', hostname="testhost1",
+ daemon_id="a", ip='192.168.1.1')
+ daemon_list = [id1]
+ mock_resolve_ip.return_value = '192.168.1.1'
+
+ self.iscsi_service.config_dashboard(daemon_list)
+
+ dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add',
+ 'name': 'testhost1'},
+ 'http://user:password@192.168.1.1:5000')
+
+ assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls
+
+ # Case 2: use IPV6 address
+ self.mgr.check_mon_command.reset_mock()
+
+ id1 = DaemonDescription(daemon_type='iscsi', hostname="testhost1",
+ daemon_id="a", ip='FEDC:BA98:7654:3210:FEDC:BA98:7654:3210')
+ mock_resolve_ip.return_value = 'FEDC:BA98:7654:3210:FEDC:BA98:7654:3210'
+
+ self.iscsi_service.config_dashboard(daemon_list)
+
+ dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add',
+ 'name': 'testhost1'},
+ 'http://user:password@[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:5000')
+
+ assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls
+
+ # Case 3: IPV6 address, secure protocol
+ self.mgr.check_mon_command.reset_mock()
+
+ self.iscsi_spec.api_secure = True
+
+ self.iscsi_service.config_dashboard(daemon_list)
+
+ dashboard_expected_call = call({'prefix': 'dashboard iscsi-gateway-add',
+ 'name': 'testhost1'},
+ 'https://user:password@[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:5000')
+
+ assert dashboard_expected_call in self.mgr.check_mon_command.mock_calls
+
+
+class TestMonitoring:
+ def _get_config(self, url: str) -> str:
+ return f"""
+ # This file is generated by cephadm.
+ # See https://prometheus.io/docs/alerting/configuration/ for documentation.
+
+ global:
+ resolve_timeout: 5m
+ http_config:
+ tls_config:
+ insecure_skip_verify: true
+
+ route:
+ receiver: 'default'
+ routes:
+ - group_by: ['alertname']
+ group_wait: 10s
+ group_interval: 10s
+ repeat_interval: 1h
+ receiver: 'ceph-dashboard'
+
+ receivers:
+ - name: 'default'
+ webhook_configs:
+ - name: 'ceph-dashboard'
+ webhook_configs:
+ - url: '{url}/api/prometheus_receiver'
+ """
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("mgr_module.MgrModule.get")
+ def test_alertmanager_config(self, mock_get, _run_cephadm,
+ cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ mock_get.return_value = {"services": {"dashboard": "http://[::1]:8080"}}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, AlertManagerSpec()):
+ y = dedent(self._get_config('http://localhost:8080')).lstrip()
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'alertmanager.test',
+ 'deploy',
+ [
+ '--name', 'alertmanager.test',
+ '--meta-json', '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '9093 9094'
+ ],
+ stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}),
+ image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("mgr_module.MgrModule.get")
+ def test_alertmanager_config_v6(self, mock_get, _run_cephadm,
+ cephadm_module: CephadmOrchestrator):
+ dashboard_url = "http://[2001:db8:4321:0000:0000:0000:0000:0000]:8080"
+ _run_cephadm.return_value = ('{}', '', 0)
+ mock_get.return_value = {"services": {"dashboard": dashboard_url}}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, AlertManagerSpec()):
+ y = dedent(self._get_config(dashboard_url)).lstrip()
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'alertmanager.test',
+ 'deploy',
+ [
+ '--name', 'alertmanager.test',
+ '--meta-json',
+ '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '9093 9094'
+ ],
+ stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}),
+ image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("mgr_module.MgrModule.get")
+ @patch("socket.getfqdn")
+ def test_alertmanager_config_v6_fqdn(self, mock_getfqdn, mock_get, _run_cephadm,
+ cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ mock_getfqdn.return_value = "mgr.test.fqdn"
+ mock_get.return_value = {"services": {
+ "dashboard": "http://[2001:db8:4321:0000:0000:0000:0000:0000]:8080"}}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, AlertManagerSpec()):
+ y = dedent(self._get_config("http://mgr.test.fqdn:8080")).lstrip()
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'alertmanager.test',
+ 'deploy',
+ [
+ '--name', 'alertmanager.test',
+ '--meta-json',
+ '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '9093 9094'
+ ],
+ stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}),
+ image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("mgr_module.MgrModule.get")
+ def test_alertmanager_config_v4(self, mock_get, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ dashboard_url = "http://192.168.0.123:8080"
+ _run_cephadm.return_value = ('{}', '', 0)
+ mock_get.return_value = {"services": {"dashboard": dashboard_url}}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, AlertManagerSpec()):
+ y = dedent(self._get_config(dashboard_url)).lstrip()
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'alertmanager.test',
+ 'deploy',
+ [
+ '--name', 'alertmanager.test',
+ '--meta-json', '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '9093 9094'
+ ],
+ stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}),
+ image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("mgr_module.MgrModule.get")
+ @patch("socket.getfqdn")
+ def test_alertmanager_config_v4_fqdn(self, mock_getfqdn, mock_get, _run_cephadm,
+ cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ mock_getfqdn.return_value = "mgr.test.fqdn"
+ mock_get.return_value = {"services": {"dashboard": "http://192.168.0.123:8080"}}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, AlertManagerSpec()):
+ y = dedent(self._get_config("http://mgr.test.fqdn:8080")).lstrip()
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'alertmanager.test',
+ 'deploy',
+ [
+ '--name', 'alertmanager.test',
+ '--meta-json',
+ '{"service_name": "alertmanager", "ports": [9093, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '9093 9094'
+ ],
+ stdin=json.dumps({"files": {"alertmanager.yml": y}, "peers": []}),
+ image='')
+
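+ # The generated prometheus.yml should scrape both the mgr prometheus module
+ # endpoint and the deployed node-exporter daemon.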
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_prometheus_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
+ with_service(cephadm_module, MonitoringSpec('prometheus')) as _:
+
+ y = dedent("""
+ # This file is generated by cephadm.
+ global:
+ scrape_interval: 10s
+ evaluation_interval: 10s
+ rule_files:
+ - /etc/prometheus/alerting/*
+ scrape_configs:
+ - job_name: 'ceph'
+ honor_labels: true
+ static_configs:
+ - targets:
+ - '[::1]:9283'
+
+ - job_name: 'node'
+ static_configs:
+ - targets: ['[1::4]:9100']
+ labels:
+ instance: 'test'
+
+ """).lstrip()
+
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'prometheus.test',
+ 'deploy',
+ [
+ '--name', 'prometheus.test',
+ '--meta-json',
+ '{"service_name": "prometheus", "ports": [9095], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '9095'
+ ],
+ stdin=json.dumps({"files": {"prometheus.yml": y,
+ "/etc/prometheus/alerting/custom_alerts.yml": ""},
+ 'retention_time': '15d'}),
+ image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
+ @patch("cephadm.services.monitoring.verify_tls", lambda *_: None)
+ def test_grafana_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_store('test/grafana_crt', 'c')
+ cephadm_module.set_store('test/grafana_key', 'k')
+ with with_service(cephadm_module, MonitoringSpec('prometheus')) as _, \
+ with_service(cephadm_module, GrafanaSpec('grafana')) as _:
+ files = {
+ 'grafana.ini': dedent("""
+ # This file is generated by cephadm.
+ [users]
+ default_theme = light
+ [auth.anonymous]
+ enabled = true
+ org_name = 'Main Org.'
+ org_role = 'Viewer'
+ [server]
+ domain = 'bootstrap.storage.lab'
+ protocol = https
+ cert_file = /etc/grafana/certs/cert_file
+ cert_key = /etc/grafana/certs/cert_key
+ http_port = 3000
+ http_addr =
+ [security]
+ disable_initial_admin_creation = true
+ cookie_secure = true
+ cookie_samesite = none
+ allow_embedding = true""").lstrip(), # noqa: W291
+ 'provisioning/datasources/ceph-dashboard.yml': dedent("""
+ # This file is generated by cephadm.
+ deleteDatasources:
+ - name: 'Dashboard1'
+ orgId: 1
+
+ datasources:
+ - name: 'Dashboard1'
+ type: 'prometheus'
+ access: 'proxy'
+ orgId: 1
+ url: 'http://[1::4]:9095'
+ basicAuth: false
+ isDefault: true
+ editable: false
+ """).lstrip(),
+ 'certs/cert_file': dedent("""
+ # generated by cephadm
+ c""").lstrip(),
+ 'certs/cert_key': dedent("""
+ # generated by cephadm
+ k""").lstrip(),
+ }
+
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'grafana.test',
+ 'deploy',
+ [
+ '--name', 'grafana.test',
+ '--meta-json',
+ '{"service_name": "grafana", "ports": [3000], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-', '--tcp-ports', '3000'],
+ stdin=json.dumps({"files": files}),
+ image='')
+
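+ # With initial_admin_password set, grafana.ini should carry admin_user and
+ # admin_password under [security] instead of disable_initial_admin_creation.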
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_grafana_initial_admin_pw(self, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, GrafanaSpec(initial_admin_password='secure')):
+ out = cephadm_module.cephadm_services['grafana'].generate_config(
+ CephadmDaemonDeploySpec('test', 'daemon', 'grafana'))
+ assert out == (
+ {
+ 'files':
+ {
+ 'certs/cert_file': ANY,
+ 'certs/cert_key': ANY,
+ 'grafana.ini':
+ '# This file is generated by cephadm.\n'
+ '[users]\n'
+ ' default_theme = light\n'
+ '[auth.anonymous]\n'
+ ' enabled = true\n'
+ " org_name = 'Main Org.'\n"
+ " org_role = 'Viewer'\n"
+ '[server]\n'
+ " domain = 'bootstrap.storage.lab'\n"
+ ' protocol = https\n'
+ ' cert_file = /etc/grafana/certs/cert_file\n'
+ ' cert_key = /etc/grafana/certs/cert_key\n'
+ ' http_port = 3000\n'
+ ' http_addr = \n'
+ '[security]\n'
+ ' admin_user = admin\n'
+ ' admin_password = secure\n'
+ ' cookie_secure = true\n'
+ ' cookie_samesite = none\n'
+ ' allow_embedding = true',
+ 'provisioning/datasources/ceph-dashboard.yml':
+ '# This file is generated by cephadm.\n'
+ 'deleteDatasources:\n'
+ '\n'
+ 'datasources:\n'
+ }
+ },
+ [],
+ )
+
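+ # A custom alertmanager port in the spec should show up in both the
+ # meta-json ports list and the --tcp-ports argument of the deploy call.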
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_monitoring_ports(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+
+ yaml_str = """service_type: alertmanager
+service_name: alertmanager
+placement:
+ count: 1
+spec:
+ port: 4200
+"""
+ yaml_file = yaml.safe_load(yaml_str)
+ spec = ServiceSpec.from_json(yaml_file)
+
+ with patch("cephadm.services.monitoring.AlertmanagerService.generate_config", return_value=({}, [])):
+ with with_service(cephadm_module, spec):
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ _run_cephadm.assert_called_with(
+ 'test', 'alertmanager.test', 'deploy', [
+ '--name', 'alertmanager.test',
+ '--meta-json', '{"service_name": "alertmanager", "ports": [4200, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '4200 9094',
+ '--reconfig'
+ ],
+ stdin='{}',
+ image='')
+
+
+class TestRGWService:
+
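+ # Each case checks the rgw_frontends value generated for an RGW daemon bound
+ # to an IPv6 network, for beast and civetweb, with and without SSL.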
+ @pytest.mark.parametrize(
+ "frontend, ssl, expected",
+ [
+ ('beast', False, 'beast endpoint=[fd00:fd00:fd00:3000::1]:80'),
+ ('beast', True,
+ 'beast ssl_endpoint=[fd00:fd00:fd00:3000::1]:443 ssl_certificate=config://rgw/cert/rgw.foo'),
+ ('civetweb', False, 'civetweb port=[fd00:fd00:fd00:3000::1]:80'),
+ ('civetweb', True,
+ 'civetweb port=[fd00:fd00:fd00:3000::1]:443s ssl_certificate=config://rgw/cert/rgw.foo'),
+ ]
+ )
+ @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_rgw_update(self, frontend, ssl, expected, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.cache.update_host_devices_networks(
+ 'host1',
+ dls=cephadm_module.cache.devices['host1'],
+ nets={
+ 'fd00:fd00:fd00:3000::/64': {
+ 'if0': ['fd00:fd00:fd00:3000::1']
+ }
+ })
+ s = RGWSpec(service_id="foo",
+ networks=['fd00:fd00:fd00:3000::/64'],
+ ssl=ssl,
+ rgw_frontend_type=frontend)
+ with with_service(cephadm_module, s) as dds:
+ _, f, _ = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': f'client.{dds[0]}',
+ 'key': 'rgw_frontends',
+ })
+ assert f == expected
+
+
+class TestSNMPGateway:
+
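+ # The config passed to the snmp-gateway daemon on stdin mirrors the
+ # destination and credentials from the spec; the gateway port defaults to
+ # 9464 unless overridden in the spec.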
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_snmp_v2c_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ spec = SNMPGatewaySpec(
+ snmp_version='V2c',
+ snmp_destination='192.168.1.1:162',
+ credentials={
+ 'snmp_community': 'public'
+ })
+
+ config = {
+ "destination": spec.snmp_destination,
+ "snmp_version": spec.snmp_version,
+ "snmp_community": spec.credentials.get('snmp_community')
+ }
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'snmp-gateway.test',
+ 'deploy',
+ [
+ '--name', 'snmp-gateway.test',
+ '--meta-json',
+ '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '9464'
+ ],
+ stdin=json.dumps(config),
+ image=''
+ )
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_snmp_v2c_with_port(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ spec = SNMPGatewaySpec(
+ snmp_version='V2c',
+ snmp_destination='192.168.1.1:162',
+ credentials={
+ 'snmp_community': 'public'
+ },
+ port=9465)
+
+ config = {
+ "destination": spec.snmp_destination,
+ "snmp_version": spec.snmp_version,
+ "snmp_community": spec.credentials.get('snmp_community')
+ }
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'snmp-gateway.test',
+ 'deploy',
+ [
+ '--name', 'snmp-gateway.test',
+ '--meta-json',
+ '{"service_name": "snmp-gateway", "ports": [9465], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '9465'
+ ],
+ stdin=json.dumps(config),
+ image=''
+ )
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_snmp_v3nopriv_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ spec = SNMPGatewaySpec(
+ snmp_version='V3',
+ snmp_destination='192.168.1.1:162',
+ engine_id='8000C53F00000000',
+ credentials={
+ 'snmp_v3_auth_username': 'myuser',
+ 'snmp_v3_auth_password': 'mypassword'
+ })
+
+ config = {
+ 'destination': spec.snmp_destination,
+ 'snmp_version': spec.snmp_version,
+ 'snmp_v3_auth_protocol': 'SHA',
+ 'snmp_v3_auth_username': 'myuser',
+ 'snmp_v3_auth_password': 'mypassword',
+ 'snmp_v3_engine_id': '8000C53F00000000'
+ }
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'snmp-gateway.test',
+ 'deploy',
+ [
+ '--name', 'snmp-gateway.test',
+ '--meta-json',
+ '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '9464'
+ ],
+ stdin=json.dumps(config),
+ image=''
+ )
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_snmp_v3priv_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ spec = SNMPGatewaySpec(
+ snmp_version='V3',
+ snmp_destination='192.168.1.1:162',
+ engine_id='8000C53F00000000',
+ auth_protocol='MD5',
+ privacy_protocol='AES',
+ credentials={
+ 'snmp_v3_auth_username': 'myuser',
+ 'snmp_v3_auth_password': 'mypassword',
+ 'snmp_v3_priv_password': 'mysecret',
+ })
+
+ config = {
+ 'destination': spec.snmp_destination,
+ 'snmp_version': spec.snmp_version,
+ 'snmp_v3_auth_protocol': 'MD5',
+ 'snmp_v3_auth_username': spec.credentials.get('snmp_v3_auth_username'),
+ 'snmp_v3_auth_password': spec.credentials.get('snmp_v3_auth_password'),
+ 'snmp_v3_engine_id': '8000C53F00000000',
+ 'snmp_v3_priv_protocol': spec.privacy_protocol,
+ 'snmp_v3_priv_password': spec.credentials.get('snmp_v3_priv_password'),
+ }
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'snmp-gateway.test',
+ 'deploy',
+ [
+ '--name', 'snmp-gateway.test',
+ '--meta-json',
+ '{"service_name": "snmp-gateway", "ports": [9464], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null, "extra_container_args": null}',
+ '--config-json', '-',
+ '--tcp-ports', '9464'
+ ],
+ stdin=json.dumps(config),
+ image=''
+ )
+
+
+class TestIngressService:
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_ingress_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices_networks(
+ 'test',
+ cephadm_module.cache.devices['test'],
+ {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.4/32']
+ }
+ }
+ )
+
+ # the ingress backend
+ s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
+ rgw_frontend_type='beast')
+
+ ispec = IngressSpec(service_type='ingress',
+ service_id='test',
+ backend_service='rgw.foo',
+ frontend_port=8089,
+ monitor_port=8999,
+ monitor_user='admin',
+ monitor_password='12345',
+ keepalived_password='12345',
+ virtual_interface_networks=['1.2.3.0/24'],
+ virtual_ip="1.2.3.4/32")
+ with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
+ # generate the keepalived conf based on the specified spec
+ keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
+
+ keepalived_expected_conf = {
+ 'files':
+ {
+ 'keepalived.conf':
+ '# This file is generated by cephadm.\n'
+ 'vrrp_script check_backend {\n '
+ 'script "/usr/bin/curl http://localhost:8999/health"\n '
+ 'weight -20\n '
+ 'interval 2\n '
+ 'rise 2\n '
+ 'fall 2\n}\n\n'
+ 'vrrp_instance VI_0 {\n '
+ 'state MASTER\n '
+ 'priority 100\n '
+ 'interface if0\n '
+ 'virtual_router_id 50\n '
+ 'advert_int 1\n '
+ 'authentication {\n '
+ 'auth_type PASS\n '
+ 'auth_pass 12345\n '
+ '}\n '
+ 'unicast_src_ip 1::4\n '
+ 'unicast_peer {\n '
+ '}\n '
+ 'virtual_ipaddress {\n '
+ '1.2.3.4/32 dev if0\n '
+ '}\n '
+ 'track_script {\n '
+ 'check_backend\n }\n'
+ '}\n'
+ }
+ }
+
+ # check keepalived config
+ assert keepalived_generated_conf[0] == keepalived_expected_conf
+
+ # generate the haproxy conf based on the specified spec
+ haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
+
+ haproxy_expected_conf = {
+ 'files':
+ {
+ 'haproxy.cfg':
+ '# This file is generated by cephadm.'
+ '\nglobal\n log '
+ '127.0.0.1 local2\n '
+ 'chroot /var/lib/haproxy\n '
+ 'pidfile /var/lib/haproxy/haproxy.pid\n '
+ 'maxconn 8000\n '
+ 'daemon\n '
+ 'stats socket /var/lib/haproxy/stats\n'
+ '\ndefaults\n '
+ 'mode http\n '
+ 'log global\n '
+ 'option httplog\n '
+ 'option dontlognull\n '
+ 'option http-server-close\n '
+ 'option forwardfor except 127.0.0.0/8\n '
+ 'option redispatch\n '
+ 'retries 3\n '
+ 'timeout queue 20s\n '
+ 'timeout connect 5s\n '
+ 'timeout http-request 1s\n '
+ 'timeout http-keep-alive 5s\n '
+ 'timeout client 1s\n '
+ 'timeout server 1s\n '
+ 'timeout check 5s\n '
+ 'maxconn 8000\n'
+ '\nfrontend stats\n '
+ 'mode http\n '
+ 'bind 1.2.3.4:8999\n '
+ 'bind localhost:8999\n '
+ 'stats enable\n '
+ 'stats uri /stats\n '
+ 'stats refresh 10s\n '
+ 'stats auth admin:12345\n '
+ 'http-request use-service prometheus-exporter if { path /metrics }\n '
+ 'monitor-uri /health\n'
+ '\nfrontend frontend\n '
+ 'bind 1.2.3.4:8089\n '
+ 'default_backend backend\n\n'
+ 'backend backend\n '
+ 'option forwardfor\n '
+ 'balance static-rr\n '
+ 'option httpchk HEAD / HTTP/1.0\n '
+ 'server ' + haproxy_generated_conf[1][0] + ' 1::4:80 check weight 100\n'
+ }
+ }
+
+ assert haproxy_generated_conf[0] == haproxy_expected_conf
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_ingress_config_multi_vips(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices_networks('test', [], {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.4/32']
+ }
+ })
+
+ # Check the ingress with multiple VIPs
+ s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
+ rgw_frontend_type='beast')
+
+ ispec = IngressSpec(service_type='ingress',
+ service_id='test',
+ backend_service='rgw.foo',
+ frontend_port=8089,
+ monitor_port=8999,
+ monitor_user='admin',
+ monitor_password='12345',
+ keepalived_password='12345',
+ virtual_interface_networks=['1.2.3.0/24'],
+ virtual_ips_list=["1.2.3.4/32"])
+ with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
+ # generate the keepalived conf based on the specified spec
+ # Test with only one IP in the list, as it would fail with more VIPs but only one host.
+ keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
+
+ keepalived_expected_conf = {
+ 'files':
+ {
+ 'keepalived.conf':
+ '# This file is generated by cephadm.\n'
+ 'vrrp_script check_backend {\n '
+ 'script "/usr/bin/curl http://localhost:8999/health"\n '
+ 'weight -20\n '
+ 'interval 2\n '
+ 'rise 2\n '
+ 'fall 2\n}\n\n'
+ 'vrrp_instance VI_0 {\n '
+ 'state MASTER\n '
+ 'priority 100\n '
+ 'interface if0\n '
+ 'virtual_router_id 50\n '
+ 'advert_int 1\n '
+ 'authentication {\n '
+ 'auth_type PASS\n '
+ 'auth_pass 12345\n '
+ '}\n '
+ 'unicast_src_ip 1::4\n '
+ 'unicast_peer {\n '
+ '}\n '
+ 'virtual_ipaddress {\n '
+ '1.2.3.4/32 dev if0\n '
+ '}\n '
+ 'track_script {\n '
+ 'check_backend\n }\n'
+ '}\n'
+ }
+ }
+
+ # check keepalived config
+ assert keepalived_generated_conf[0] == keepalived_expected_conf
+
+ # generate the haproxy conf based on the specified spec
+ haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
+
+ haproxy_expected_conf = {
+ 'files':
+ {
+ 'haproxy.cfg':
+ '# This file is generated by cephadm.'
+ '\nglobal\n log '
+ '127.0.0.1 local2\n '
+ 'chroot /var/lib/haproxy\n '
+ 'pidfile /var/lib/haproxy/haproxy.pid\n '
+ 'maxconn 8000\n '
+ 'daemon\n '
+ 'stats socket /var/lib/haproxy/stats\n'
+ '\ndefaults\n '
+ 'mode http\n '
+ 'log global\n '
+ 'option httplog\n '
+ 'option dontlognull\n '
+ 'option http-server-close\n '
+ 'option forwardfor except 127.0.0.0/8\n '
+ 'option redispatch\n '
+ 'retries 3\n '
+ 'timeout queue 20s\n '
+ 'timeout connect 5s\n '
+ 'timeout http-request 1s\n '
+ 'timeout http-keep-alive 5s\n '
+ 'timeout client 1s\n '
+ 'timeout server 1s\n '
+ 'timeout check 5s\n '
+ 'maxconn 8000\n'
+ '\nfrontend stats\n '
+ 'mode http\n '
+ 'bind *:8999\n '
+ 'bind localhost:8999\n '
+ 'stats enable\n '
+ 'stats uri /stats\n '
+ 'stats refresh 10s\n '
+ 'stats auth admin:12345\n '
+ 'http-request use-service prometheus-exporter if { path /metrics }\n '
+ 'monitor-uri /health\n'
+ '\nfrontend frontend\n '
+ 'bind *:8089\n '
+ 'default_backend backend\n\n'
+ 'backend backend\n '
+ 'option forwardfor\n '
+ 'balance static-rr\n '
+ 'option httpchk HEAD / HTTP/1.0\n '
+ 'server '
+ + haproxy_generated_conf[1][0] + ' 1::4:80 check weight 100\n'
+ }
+ }
+
+ assert haproxy_generated_conf[0] == haproxy_expected_conf
+
+
+class TestCephFsMirror:
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec('cephfs-mirror')):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'mgr module enable',
+ 'module': 'mirroring'
+ })
diff --git a/src/pybind/mgr/cephadm/tests/test_spec.py b/src/pybind/mgr/cephadm/tests/test_spec.py
new file mode 100644
index 000000000..54aa0a7ab
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_spec.py
@@ -0,0 +1,600 @@
+# Disable autopep8 for this file:
+
+# fmt: off
+
+import json
+
+import pytest
+
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
+ IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
+from orchestrator import DaemonDescription, OrchestratorError
+
+
+@pytest.mark.parametrize(
+ "spec_json",
+ json.loads("""[
+{
+ "placement": {
+ "count": 1
+ },
+ "service_type": "alertmanager"
+},
+{
+ "placement": {
+ "host_pattern": "*"
+ },
+ "service_type": "crash"
+},
+{
+ "placement": {
+ "count": 1
+ },
+ "service_type": "grafana"
+},
+{
+ "placement": {
+ "count": 2
+ },
+ "service_type": "mgr"
+},
+{
+ "placement": {
+ "count": 5
+ },
+ "service_type": "mon"
+},
+{
+ "placement": {
+ "host_pattern": "*"
+ },
+ "service_type": "node-exporter"
+},
+{
+ "placement": {
+ "count": 1
+ },
+ "service_type": "prometheus"
+},
+{
+ "placement": {
+ "hosts": [
+ {
+ "hostname": "ceph-001",
+ "network": "",
+ "name": ""
+ }
+ ]
+ },
+ "service_type": "rgw",
+ "service_id": "default-rgw-realm.eu-central-1.1",
+ "rgw_realm": "default-rgw-realm",
+ "rgw_zone": "eu-central-1"
+},
+{
+ "service_type": "osd",
+ "service_id": "osd_spec_default",
+ "placement": {
+ "host_pattern": "*"
+ },
+ "data_devices": {
+ "model": "MC-55-44-XZ"
+ },
+ "db_devices": {
+ "model": "SSD-123-foo"
+ },
+ "wal_devices": {
+ "model": "NVME-QQQQ-987"
+ }
+}
+]
+""")
+)
+def test_spec_octopus(spec_json):
+ # https://tracker.ceph.com/issues/44934
+ # Those are real user data from early octopus.
+ # Please do not modify those JSON values.
+
+ spec = ServiceSpec.from_json(spec_json)
+
+ # just some verification that we can still read old octopus specs
+ def convert_to_old_style_json(j):
+ j_c = dict(j.copy())
+ j_c.pop('service_name', None)
+ if 'spec' in j_c:
+ spec = j_c.pop('spec')
+ j_c.update(spec)
+ if 'placement' in j_c:
+ if 'hosts' in j_c['placement']:
+ j_c['placement']['hosts'] = [
+ {
+ 'hostname': HostPlacementSpec.parse(h).hostname,
+ 'network': HostPlacementSpec.parse(h).network,
+ 'name': HostPlacementSpec.parse(h).name
+ }
+ for h in j_c['placement']['hosts']
+ ]
+ j_c.pop('objectstore', None)
+ j_c.pop('filter_logic', None)
+ return j_c
+
+ assert spec_json == convert_to_old_style_json(spec.to_json())
+
+
+@pytest.mark.parametrize(
+ "dd_json",
+ json.loads("""[
+ {
+ "hostname": "ceph-001",
+ "container_id": "d94d7969094d",
+ "container_image_id": "0881eb8f169f5556a292b4e2c01d683172b12830a62a9225a98a8e206bb734f0",
+ "container_image_name": "docker.io/prom/alertmanager:latest",
+ "daemon_id": "ceph-001",
+ "daemon_type": "alertmanager",
+ "version": "0.20.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725856",
+ "created": "2020-04-02T19:23:08.829543",
+ "started": "2020-04-03T07:29:16.932838",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "c4b036202241",
+ "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1",
+ "container_image_name": "docker.io/ceph/ceph:v15",
+ "daemon_id": "ceph-001",
+ "daemon_type": "crash",
+ "version": "15.2.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725903",
+ "created": "2020-04-02T19:23:11.390694",
+ "started": "2020-04-03T07:29:16.910897",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "5b7b94b48f31",
+ "container_image_id": "87a51ecf0b1c9a7b187b21c1b071425dafea0d765a96d5bc371c791169b3d7f4",
+ "container_image_name": "docker.io/ceph/ceph-grafana:latest",
+ "daemon_id": "ceph-001",
+ "daemon_type": "grafana",
+ "version": "6.6.2",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725950",
+ "created": "2020-04-02T19:23:52.025088",
+ "started": "2020-04-03T07:29:16.847972",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "9ca007280456",
+ "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1",
+ "container_image_name": "docker.io/ceph/ceph:v15",
+ "daemon_id": "ceph-001.gkjwqp",
+ "daemon_type": "mgr",
+ "version": "15.2.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725807",
+ "created": "2020-04-02T19:22:18.648584",
+ "started": "2020-04-03T07:29:16.856153",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "3d1ba9a2b697",
+ "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1",
+ "container_image_name": "docker.io/ceph/ceph:v15",
+ "daemon_id": "ceph-001",
+ "daemon_type": "mon",
+ "version": "15.2.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725715",
+ "created": "2020-04-02T19:22:13.863300",
+ "started": "2020-04-03T07:29:17.206024",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "36d026c68ba1",
+ "container_image_id": "e5a616e4b9cf68dfcad7782b78e118be4310022e874d52da85c55923fb615f87",
+ "container_image_name": "docker.io/prom/node-exporter:latest",
+ "daemon_id": "ceph-001",
+ "daemon_type": "node-exporter",
+ "version": "0.18.1",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.725996",
+ "created": "2020-04-02T19:23:53.880197",
+ "started": "2020-04-03T07:29:16.880044",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "faf76193cbfe",
+ "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1",
+ "container_image_name": "docker.io/ceph/ceph:v15",
+ "daemon_id": "0",
+ "daemon_type": "osd",
+ "version": "15.2.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.726088",
+ "created": "2020-04-02T20:35:02.991435",
+ "started": "2020-04-03T07:29:19.373956",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "f82505bae0f1",
+ "container_image_id": "204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1",
+ "container_image_name": "docker.io/ceph/ceph:v15",
+ "daemon_id": "1",
+ "daemon_type": "osd",
+ "version": "15.2.0",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.726134",
+ "created": "2020-04-02T20:35:17.142272",
+ "started": "2020-04-03T07:29:19.374002",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "container_id": "2708d84cd484",
+ "container_image_id": "358a0d2395fe711bb8258e8fb4b2d7865c0a9a6463969bcd1452ee8869ea6653",
+ "container_image_name": "docker.io/prom/prometheus:latest",
+ "daemon_id": "ceph-001",
+ "daemon_type": "prometheus",
+ "version": "2.17.1",
+ "status": 1,
+ "status_desc": "running",
+ "last_refresh": "2020-04-03T15:31:48.726042",
+ "created": "2020-04-02T19:24:10.281163",
+ "started": "2020-04-03T07:29:16.926292",
+ "is_active": false
+ },
+ {
+ "hostname": "ceph-001",
+ "daemon_id": "default-rgw-realm.eu-central-1.1.ceph-001.ytywjo",
+ "daemon_type": "rgw",
+ "status": 1,
+ "status_desc": "starting",
+ "is_active": false
+ }
+]""")
+)
+def test_dd_octopus(dd_json):
+ # https://tracker.ceph.com/issues/44934
+ # Those are real user data from early octopus.
+ # Please do not modify those JSON values.
+
+ # Convert datetime properties to old style.
+ # 2020-04-03T07:29:16.926292Z -> 2020-04-03T07:29:16.926292
+ def convert_to_old_style_json(j):
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if k in j:
+ j[k] = j[k].rstrip('Z')
+ del j['daemon_name']
+ return j
+
+ assert dd_json == convert_to_old_style_json(
+ DaemonDescription.from_json(dd_json).to_json())
+
+
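+# Each case pairs a ServiceSpec with a DaemonDescription; 'valid' states
+# whether the daemon id can be mapped back to the spec's service name.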
+@pytest.mark.parametrize("spec,dd,valid",
+[ # noqa: E128
+ # https://tracker.ceph.com/issues/44934
+ (
+ RGWSpec(
+ service_id="foo",
+ rgw_realm="default-rgw-realm",
+ rgw_zone="eu-central-1",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="foo.ceph-001.ytywjo",
+ hostname="ceph-001",
+ ),
+ True
+ ),
+ (
+ # no realm
+ RGWSpec(
+ service_id="foo.bar",
+ rgw_zone="eu-central-1",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="foo.bar.ceph-001.ytywjo",
+ hostname="ceph-001",
+ ),
+ True
+ ),
+ (
+ # no realm or zone
+ RGWSpec(
+ service_id="bar",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="bar.host.domain.tld.ytywjo",
+ hostname="host.domain.tld",
+ ),
+ True
+ ),
+ (
+ # explicit naming
+ RGWSpec(
+ service_id="realm.zone",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="realm.zone.a",
+ hostname="smithi028",
+ ),
+ True
+ ),
+ (
+ # without host
+ RGWSpec(
+ service_type='rgw',
+ service_id="foo",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="foo.hostname.ytywjo",
+ hostname=None,
+ ),
+ False
+ ),
+ (
+ # without host (2)
+ RGWSpec(
+ service_type='rgw',
+ service_id="default-rgw-realm.eu-central-1.1",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="default-rgw-realm.eu-central-1.1.hostname.ytywjo",
+ hostname=None,
+ ),
+ False
+ ),
+ (
+ # service_id contains hostname
+ # (sort of) https://tracker.ceph.com/issues/45294
+ RGWSpec(
+ service_id="default.rgw.realm.ceph.001",
+ ),
+ DaemonDescription(
+ daemon_type='rgw',
+ daemon_id="default.rgw.realm.ceph.001.ceph.001.ytywjo",
+ hostname="ceph.001",
+ ),
+ True
+ ),
+
+ # https://tracker.ceph.com/issues/45293
+ (
+ ServiceSpec(
+ service_type='mds',
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='mds',
+ daemon_id="a.host1.abc123",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # '.' char in service_id
+ ServiceSpec(
+ service_type='mds',
+ service_id="a.b.c",
+ ),
+ DaemonDescription(
+ daemon_type='mds',
+ daemon_id="a.b.c.host1.abc123",
+ hostname="host1",
+ ),
+ True
+ ),
+
+ # https://tracker.ceph.com/issues/45617
+ (
+ # daemon_id does not contain hostname
+ ServiceSpec(
+ service_type='mds',
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='mds',
+ daemon_id="a",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # daemon_id only contains hostname
+ ServiceSpec(
+ service_type='mds',
+ service_id="host1",
+ ),
+ DaemonDescription(
+ daemon_type='mds',
+ daemon_id="host1",
+ hostname="host1",
+ ),
+ True
+ ),
+
+ # https://tracker.ceph.com/issues/45399
+ (
+ # hostname is an FQDN; daemon_id only contains the short host name
+ ServiceSpec(
+ service_type='mds',
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='mds',
+ daemon_id="a.host1.abc123",
+ hostname="host1.site",
+ ),
+ True
+ ),
+ (
+ NFSServiceSpec(
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="a.host1",
+ hostname="host1.site",
+ ),
+ True
+ ),
+
+ # https://tracker.ceph.com/issues/45293
+ (
+ NFSServiceSpec(
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="a.host1",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # service_id contains a '.' char
+ NFSServiceSpec(
+ service_id="a.b.c",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="a.b.c.host1",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # trailing chars after hostname
+ NFSServiceSpec(
+ service_id="a.b.c",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="a.b.c.host1.abc123",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # chars after hostname without '.'
+ NFSServiceSpec(
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="a.host1abc123",
+ hostname="host1",
+ ),
+ False
+ ),
+ (
+ # chars before hostname without '.'
+ NFSServiceSpec(
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id="ahost1.abc123",
+ hostname="host1",
+ ),
+ False
+ ),
+
+ # https://tracker.ceph.com/issues/45293
+ (
+ IscsiServiceSpec(
+ service_type='iscsi',
+ service_id="a",
+ ),
+ DaemonDescription(
+ daemon_type='iscsi',
+ daemon_id="a.host1.abc123",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # '.' char in service_id
+ IscsiServiceSpec(
+ service_type='iscsi',
+ service_id="a.b.c",
+ ),
+ DaemonDescription(
+ daemon_type='iscsi',
+ daemon_id="a.b.c.host1.abc123",
+ hostname="host1",
+ ),
+ True
+ ),
+ (
+ # fixed daemon id for teuthology.
+ IscsiServiceSpec(
+ service_type='iscsi',
+ service_id='iscsi',
+ ),
+ DaemonDescription(
+ daemon_type='iscsi',
+ daemon_id="iscsi.a",
+ hostname="host1",
+ ),
+ True
+ ),
+
+ (
+ CustomContainerSpec(
+ service_type='container',
+ service_id='hello-world',
+ image='docker.io/library/hello-world:latest',
+ ),
+ DaemonDescription(
+ daemon_type='container',
+ daemon_id='hello-world.mgr0',
+ hostname='mgr0',
+ ),
+ True
+ ),
+
+ (
+ # daemon_id only contains hostname
+ ServiceSpec(
+ service_type='cephadm-exporter',
+ ),
+ DaemonDescription(
+ daemon_type='cephadm-exporter',
+ daemon_id="testhost",
+ hostname="testhost",
+ ),
+ True
+ ),
+])
+def test_daemon_description_service_name(spec: ServiceSpec,
+ dd: DaemonDescription,
+ valid: bool):
+ if valid:
+ assert spec.service_name() == dd.service_name()
+ else:
+ with pytest.raises(OrchestratorError):
+ dd.service_name()
diff --git a/src/pybind/mgr/cephadm/tests/test_template.py b/src/pybind/mgr/cephadm/tests/test_template.py
new file mode 100644
index 000000000..f67304348
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_template.py
@@ -0,0 +1,33 @@
+import pathlib
+
+import pytest
+
+from cephadm.template import TemplateMgr, UndefinedError, TemplateNotFoundError
+
+
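+# 'fs' is the pyfakefs fixture, so the fake template file only exists in an
+# in-memory filesystem.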
+def test_render(cephadm_module, fs):
+ template_base = (pathlib.Path(__file__).parent / '../templates').resolve()
+ fake_template = template_base / 'foo/bar'
+ fs.create_file(fake_template, contents='{{ cephadm_managed }}{{ var }}')
+
+ template_mgr = TemplateMgr(cephadm_module)
+ value = 'test'
+
+ # with base context
+ expected_text = '{}{}'.format(template_mgr.base_context['cephadm_managed'], value)
+ assert template_mgr.render('foo/bar', {'var': value}) == expected_text
+
+ # without base context
+ with pytest.raises(UndefinedError):
+ template_mgr.render('foo/bar', {'var': value}, managed_context=False)
+
+ # override the base context
+ context = {
+ 'cephadm_managed': 'abc',
+ 'var': value
+ }
+ assert template_mgr.render('foo/bar', context) == 'abc{}'.format(value)
+
+ # template not found
+ with pytest.raises(TemplateNotFoundError):
+ template_mgr.render('foo/bar/2', {})
diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py
new file mode 100644
index 000000000..9368f4dc3
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py
@@ -0,0 +1,322 @@
+import json
+from unittest import mock
+
+import pytest
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
+from cephadm import CephadmOrchestrator
+from cephadm.upgrade import CephadmUpgrade
+from cephadm.serve import CephadmServe
+from orchestrator import OrchestratorError, DaemonDescription
+from .fixtures import _run_cephadm, wait, with_host, with_service
+
+from typing import List, Tuple, Optional
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_start(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test2'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), status_running=True):
+ assert wait(cephadm_module, cephadm_module.upgrade_start(
+ 'image_id', None)) == 'Initiating upgrade to image_id'
+
+ assert wait(cephadm_module, cephadm_module.upgrade_status()
+ ).target_image == 'image_id'
+
+ assert wait(cephadm_module, cephadm_module.upgrade_pause()
+ ) == 'Paused upgrade to image_id'
+
+ assert wait(cephadm_module, cephadm_module.upgrade_resume()
+ ) == 'Resumed upgrade to image_id'
+
+ assert wait(cephadm_module, cephadm_module.upgrade_stop()
+ ) == 'Stopped upgrade to image_id'
+
+
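+# With use_repo_digest the container_image recorded in the config should be
+# the image's repo digest rather than the tag passed to the upgrade.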
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+@pytest.mark.parametrize("use_repo_digest",
+ [
+ False,
+ True
+ ])
+def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ cephadm_module.set_container_image('global', 'from_image')
+ cephadm_module.use_repo_digest = use_repo_digest
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*', count=2)),
+ CephadmOrchestrator.apply_mgr, '', status_running=True),\
+ mock.patch("cephadm.module.CephadmOrchestrator.lookup_release_name",
+ return_value='foo'),\
+ mock.patch("cephadm.module.CephadmOrchestrator.version",
+ new_callable=mock.PropertyMock) as version_mock,\
+ mock.patch("cephadm.module.CephadmOrchestrator.get",
+ return_value={
+ # capture fields in both mon and osd maps
+ "require_osd_release": "pacific",
+ "min_mon_release": 16,
+ }):
+ version_mock.return_value = 'ceph version 18.2.1 (somehash)'
+ assert wait(cephadm_module, cephadm_module.upgrade_start(
+ 'to_image', None)) == 'Initiating upgrade to to_image'
+
+ assert wait(cephadm_module, cephadm_module.upgrade_status()
+ ).target_image == 'to_image'
+
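+ # report an old mgr version so the upgrade logic sees daemons to upgrade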
+ def _versions_mock(cmd):
+ return json.dumps({
+ 'mgr': {
+ 'ceph version 1.2.3 (asdf) blah': 1
+ }
+ })
+
+ cephadm_module._mon_command_mock_versions = _versions_mock
+
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digests': ['to_image@repo_digest'],
+ 'ceph_version': 'ceph version 18.2.3 (hash)',
+ }))):
+
+ cephadm_module.upgrade._do_upgrade()
+
+ assert cephadm_module.upgrade_status is not None
+
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name=list(cephadm_module.cache.daemons['host1'].keys())[0],
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ container_image_id='image_id',
+ container_image_digests=['to_image@repo_digest'],
+ deployed_by=['to_image@repo_digest'],
+ version='version',
+ state='running',
+ )
+ ])
+ )):
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digests': ['to_image@repo_digest'],
+ 'ceph_version': 'ceph version 18.2.3 (hash)',
+ }))):
+ cephadm_module.upgrade._do_upgrade()
+
+ _, image, _ = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'global',
+ 'key': 'container_image',
+ })
+ if use_repo_digest:
+ assert image == 'to_image@repo_digest'
+ else:
+ assert image == 'to_image'
+
+
+def test_upgrade_state_null(cephadm_module: CephadmOrchestrator):
+ # This test validates https://tracker.ceph.com/issues/47580
+ cephadm_module.set_store('upgrade_state', 'null')
+ CephadmUpgrade(cephadm_module)
+ assert CephadmUpgrade(cephadm_module).upgrade_state is None
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''):
+ with pytest.raises(OrchestratorError):
+ wait(cephadm_module, cephadm_module.upgrade_start('image_id', None))
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.CephadmOrchestrator.check_mon_command")
+def test_enough_mons_for_ok_to_stop(check_mon_command, cephadm_module: CephadmOrchestrator):
+ # only 2 monitors, not enough for ok-to-stop to ever pass
+ check_mon_command.return_value = (
+ 0, '{"monmap": {"mons": [{"name": "mon.1"}, {"name": "mon.2"}]}}', '')
+ assert not cephadm_module.upgrade._enough_mons_for_ok_to_stop()
+
+ # 3 monitors, ok-to-stop should work fine
+ check_mon_command.return_value = (
+ 0, '{"monmap": {"mons": [{"name": "mon.1"}, {"name": "mon.2"}, {"name": "mon.3"}]}}', '')
+ assert cephadm_module.upgrade._enough_mons_for_ok_to_stop()
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+@mock.patch("cephadm.module.HostCache.get_daemons_by_service")
+@mock.patch("cephadm.CephadmOrchestrator.get")
+def test_enough_mds_for_ok_to_stop(get, get_daemons_by_service, cephadm_module: CephadmOrchestrator):
+ get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'test', 'max_mds': 1}}]}]
+ get_daemons_by_service.side_effect = [[DaemonDescription()]]
+ assert not cephadm_module.upgrade._enough_mds_for_ok_to_stop(
+ DaemonDescription(daemon_type='mds', daemon_id='test.host1.gfknd', service_name='mds.test'))
+
+ get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'myfs.test', 'max_mds': 2}}]}]
+ get_daemons_by_service.side_effect = [[DaemonDescription(), DaemonDescription()]]
+ assert not cephadm_module.upgrade._enough_mds_for_ok_to_stop(
+ DaemonDescription(daemon_type='mds', daemon_id='myfs.test.host1.gfknd', service_name='mds.myfs.test'))
+
+ get.side_effect = [{'filesystems': [{'mdsmap': {'fs_name': 'myfs.test', 'max_mds': 1}}]}]
+ get_daemons_by_service.side_effect = [[DaemonDescription(), DaemonDescription()]]
+ assert cephadm_module.upgrade._enough_mds_for_ok_to_stop(
+ DaemonDescription(daemon_type='mds', daemon_id='myfs.test.host1.gfknd', service_name='mds.myfs.test'))
+
+
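+# Staggered upgrade: filters that would upgrade non-mgr daemons while any mgr
+# daemon is still un-upgraded must be rejected with an OrchestratorError.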
+@pytest.mark.parametrize(
+ "upgraded, not_upgraded, daemon_types, hosts, services, should_block",
+ # [ ([(type, host, id), ... ], [...], [daemon types], [hosts], [services], True/False), ... ]
+ [
+ ( # valid, upgrade mgr daemons
+ [],
+ [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')],
+ ['mgr'],
+ None,
+ None,
+ False
+ ),
+ ( # invalid, can't upgrade mons until mgr is upgraded
+ [],
+ [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')],
+ ['mon'],
+ None,
+ None,
+ True
+ ),
+ ( # invalid, can't upgrade mon service until all mgr daemons are upgraded
+ [],
+ [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')],
+ None,
+ None,
+ ['mon'],
+ True
+ ),
+ ( # valid, upgrade mgr service
+ [],
+ [('mgr', 'a', 'a.x'), ('mon', 'a', 'a')],
+ None,
+ None,
+ ['mgr'],
+ False
+ ),
+ ( # valid, mgr is already upgraded so can upgrade mons
+ [('mgr', 'a', 'a.x')],
+ [('mon', 'a', 'a')],
+ ['mon'],
+ None,
+ None,
+ False
+ ),
+ ( # invalid, can't upgrade all daemons on a b/c un-upgraded mgr on b
+ [],
+ [('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ None,
+ ['a'],
+ None,
+ True
+ ),
+ ( # valid, only daemon on b is a mgr
+ [],
+ [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ None,
+ ['b'],
+ None,
+ False
+ ),
+ ( # invalid, can't upgrade mon on a while mgr on b is un-upgraded
+ [],
+ [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ None,
+ ['a'],
+ None,
+ True
+ ),
+ ( # valid, only upgrading the mgr on a
+ [],
+ [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ ['mgr'],
+ ['a'],
+ None,
+ False
+ ),
+ ( # valid, mgr daemons not on b are upgraded
+ [('mgr', 'a', 'a.x')],
+ [('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ None,
+ ['b'],
+ None,
+ False
+ ),
+ ( # valid, all the necessary hosts are covered, mgr on c is already upgraded
+ [('mgr', 'c', 'c.z')],
+ [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a'), ('osd', 'c', '0')],
+ None,
+ ['a', 'b'],
+ None,
+ False
+ ),
+ ( # invalid, can't upgrade mon on a while mgr on b is un-upgraded
+ [],
+ [('mgr', 'a', 'a.x'), ('mgr', 'b', 'b.y'), ('mon', 'a', 'a')],
+ ['mgr', 'mon'],
+ ['a'],
+ None,
+ True
+ ),
+ ( # valid, the only mon not on "b" is already upgraded. Case hit while writing a teuthology test
+ [('mon', 'a', 'a')],
+ [('mon', 'b', 'x'), ('mon', 'b', 'y'), ('osd', 'a', '1'), ('osd', 'b', '2')],
+ ['mon', 'osd'],
+ ['b'],
+ None,
+ False
+ ),
+ ]
+)
+@mock.patch("cephadm.module.HostCache.get_daemons")
+@mock.patch("cephadm.serve.CephadmServe._get_container_image_info")
+@mock.patch('cephadm.module.SpecStore.__getitem__')
+def test_staggered_upgrade_validation(
+ get_spec,
+ get_image_info,
+ get_daemons,
+ upgraded: List[Tuple[str, str, str]],
+ not_upgraded: List[Tuple[str, str, str]],
+ daemon_types: Optional[List[str]],
+ hosts: Optional[List[str]],
+ services: Optional[List[str]],
+ should_block: bool,
+ cephadm_module: CephadmOrchestrator,
+):
+ def to_dds(ts: List[Tuple[str, str, str]], upgraded: bool) -> List[DaemonDescription]:
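+ # expand (daemon_type, hostname, daemon_id) tuples into DaemonDescriptions
+ # tagged with the new or old image digest depending on upgrade state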
+ dds = []
+ digest = 'new_image@repo_digest' if upgraded else 'old_image@repo_digest'
+ for t in ts:
+ dds.append(DaemonDescription(daemon_type=t[0],
+ hostname=t[1],
+ daemon_id=t[2],
+ container_image_digests=[digest],
+ deployed_by=[digest],))
+ return dds
+ get_daemons.return_value = to_dds(upgraded, True) + to_dds(not_upgraded, False)
+ get_image_info.return_value = ('new_id', 'ceph version 99.99.99 (hash)', ['new_image@repo_digest'])
+
+ class FakeSpecDesc():
+ def __init__(self, spec):
+ self.spec = spec
+
+ def _get_spec(s):
+ return FakeSpecDesc(ServiceSpec(s))
+
+ get_spec.side_effect = _get_spec
+ if should_block:
+ with pytest.raises(OrchestratorError):
+ cephadm_module.upgrade._validate_upgrade_filters(
+ 'new_image_name', daemon_types, hosts, services)
+ else:
+ cephadm_module.upgrade._validate_upgrade_filters(
+ 'new_image_name', daemon_types, hosts, services)