Diffstat (limited to 'src/pybind/mgr/cephadm/tests/test_cephadm.py')
-rw-r--r--  src/pybind/mgr/cephadm/tests/test_cephadm.py  2709
1 file changed, 2709 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
new file mode 100644
index 000000000..24fcb0280
--- /dev/null
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -0,0 +1,2709 @@
+import asyncio
+import json
+import logging
+
+from contextlib import contextmanager
+
+import pytest
+
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
+from cephadm.serve import CephadmServe
+from cephadm.inventory import HostCacheStatus, ClientKeyringSpec
+from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
+from cephadm.utils import SpecialHostLabels
+
+try:
+ from typing import List
+except ImportError:
+ pass
+
+from ceph.deployment.service_spec import (
+ CustomConfig,
+ CustomContainerSpec,
+ HostPlacementSpec,
+ IscsiServiceSpec,
+ MDSSpec,
+ NFSServiceSpec,
+ PlacementSpec,
+ RGWSpec,
+ ServiceSpec,
+)
+from ceph.deployment.drive_selection.selector import DriveSelection
+from ceph.deployment.inventory import Devices, Device
+from ceph.utils import datetime_to_str, datetime_now, str_to_datetime
+from orchestrator import DaemonDescription, InventoryHost, \
+ HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent
+from tests import mock
+from .fixtures import wait, _run_cephadm, match_glob, with_host, \
+ with_cephadm_module, with_service, make_daemons_running, async_side_effect
+from cephadm.module import CephadmOrchestrator
+
+"""
+TODOs:
+    There is really room for improvement here. I just quickly assembled these tests.
+    In general, everything should be tested in Teuthology as well. The reason for
+    also testing this here is the development round-trip time.
+"""
+
+
+def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
+ dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
+ d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
+ assert d_names
+    # there should only be one daemon (if not, match_glob will throw a mismatch)
+ assert len(d_names) == 1
+
+ c = cephadm.remove_daemons(d_names)
+ [out] = wait(cephadm, c)
+    # picking the 1st element is needed, rather than passing the whole list: if the list
+    # were used, a '-' in a daemon name would be treated as a range, i.e. cephadm-exporter
+    # is treated like an m-e range which is invalid. rbd-mirror (d-m) and node-exporter (e-e)
+    # happen to be valid, so they pass without incident! Also, match_glob acts on strings anyway!
+ match_glob(out, f"Removed {d_names[0]}* from host '{host}'")
+
+
+@contextmanager
+def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str):
+ spec.placement = PlacementSpec(hosts=[host], count=1)
+
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")
+
+ dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
+ for dd in dds:
+ if dd.hostname == host:
+ yield dd.daemon_id
+ assert_rm_daemon(cephadm_module, spec.service_name(), host)
+ return
+
+ assert False, 'Daemon not found'
+
+
+@contextmanager
+def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {
+ 'osd': 1,
+ 'up_from': 0,
+ 'up': True,
+ 'uuid': 'uuid'
+ }
+ ]
+ })
+
+ _run_cephadm.reset_mock(return_value=True, side_effect=True)
+ if ceph_volume_lvm_list:
+ _run_cephadm.side_effect = ceph_volume_lvm_list
+ else:
+ async def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ logging.info(f'ceph-volume cmd: {cmd}')
+ if 'raw' in cmd:
+ return json.dumps({
+ "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": {
+ "ceph_fsid": cephadm_module._cluster_fsid,
+ "device": "/dev/loop0",
+ "osd_id": 21,
+ "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5",
+ "type": "bluestore"
+ },
+ }), '', 0
+ if 'lvm' in cmd:
+ return json.dumps({
+ str(osd_id): [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }]
+ }), '', 0
+ return '{}', '', 0
+
+ _run_cephadm.side_effect = _ceph_volume_list
+
+ assert cephadm_module._osd_activate(
+ [host]).stdout == f"Created osd(s) 1 on host '{host}'"
+ assert _run_cephadm.mock_calls == [
+ mock.call(host, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, error_ok=False, image='', log_output=True),
+ mock.call(host, f'osd.{osd_id}', ['_orch', 'deploy'], [], stdin=mock.ANY),
+ mock.call(host, 'osd', 'ceph-volume',
+ ['--', 'raw', 'list', '--format', 'json'], no_fsid=False, error_ok=False, image='', log_output=True),
+ ]
+ dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host)
+ assert dd.name() == f'osd.{osd_id}'
+ yield dd
+ cephadm_module._remove_daemons([(f'osd.{osd_id}', host)])
+
+
+class TestCephadm(object):
+
+ def test_get_unique_name(self, cephadm_module):
+ # type: (CephadmOrchestrator) -> None
+ existing = [
+ DaemonDescription(daemon_type='mon', daemon_id='a')
+ ]
+ new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing)
+ match_glob(new_mon, 'myhost')
+ new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
+ match_glob(new_mgr, 'myhost.*')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_host(self, cephadm_module):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+ with with_host(cephadm_module, 'test'):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')]
+
+ # Be careful with backward compatibility when changing things here:
+ assert json.loads(cephadm_module.get_store('inventory')) == \
+ {"test": {"hostname": "test", "addr": "1::4", "labels": [], "status": ""}}
+
+ with with_host(cephadm_module, 'second', '1.2.3.5'):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+ HostSpec('test', '1::4'),
+ HostSpec('second', '1.2.3.5')
+ ]
+
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')]
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.utils.resolve_ip")
+ def test_re_add_host_receive_loopback(self, resolve_ip, cephadm_module):
+ resolve_ip.side_effect = ['192.168.122.1', '127.0.0.1', '127.0.0.1']
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
+ cephadm_module._add_host(HostSpec('test', '192.168.122.1'))
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+ HostSpec('test', '192.168.122.1')]
+ cephadm_module._add_host(HostSpec('test'))
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+ HostSpec('test', '192.168.122.1')]
+ with pytest.raises(OrchestratorError):
+ cephadm_module._add_host(HostSpec('test2'))
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_service_ls(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.list_daemons(refresh=True)
+ assert wait(cephadm_module, c) == []
+ with with_service(cephadm_module, MDSSpec('mds', 'name', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, MDSSpec('mds', 'name'), 'test') as _:
+
+ c = cephadm_module.list_daemons()
+
+ def remove_id_events(dd):
+ out = dd.to_json()
+ del out['daemon_id']
+ del out['events']
+ del out['daemon_name']
+ return out
+
+ assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
+ {
+ 'service_name': 'mds.name',
+ 'daemon_type': 'mds',
+ 'hostname': 'test',
+ 'status': 2,
+ 'status_desc': 'starting',
+ 'is_active': False,
+ 'ports': [],
+ }
+ ]
+
+ with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'),
+ CephadmOrchestrator.apply_rgw, 'test', status_running=True):
+ make_daemons_running(cephadm_module, 'mds.name')
+
+ c = cephadm_module.describe_service()
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'name',
+ 'service_name': 'mds.name',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 2},
+ 'unmanaged': True
+ },
+ {
+ 'placement': {
+ 'count': 1,
+ 'hosts': ["test"]
+ },
+ 'service_id': 'r.z',
+ 'service_name': 'rgw.r.z',
+ 'service_type': 'rgw',
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 1,
+ 'ports': [80]},
+ }
+ ]
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_service_ls_service_type_flag(self, cephadm_module):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)),
+ CephadmOrchestrator.apply_mgr, '', status_running=True):
+ with with_service(cephadm_module, MDSSpec('mds', 'test-id', placement=PlacementSpec(count=2)),
+ CephadmOrchestrator.apply_mds, '', status_running=True):
+
+                    # with no service-type, should provide info for both services
+ c = cephadm_module.describe_service()
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_name': 'mgr',
+ 'service_type': 'mgr',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'test-id',
+ 'service_name': 'mds.test-id',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ ]
+
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+                        # with service-type, should provide info for only mds
+ c = cephadm_module.describe_service(service_type='mds')
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'count': 2},
+ 'service_id': 'test-id',
+ 'service_name': 'mds.test-id',
+ 'service_type': 'mds',
+ 'status': {'created': mock.ANY,
+ 'running': 2,
+ 'size': 2}
+ },
+ ]
+
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
+
+ # service-type should not match with service names
+ c = cephadm_module.describe_service(service_type='mds.test-id')
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ assert out == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_device_ls(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.get_inventory()
+ assert wait(cephadm_module, c) == [InventoryHost('test')]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='rgw.myrgw.foobar',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ ),
+ dict(
+ name='something.foo.bar',
+ style='cephadm',
+ fsid='fsid',
+ ),
+ dict(
+ name='haproxy.test.bar',
+ style='cephadm',
+ fsid='fsid',
+ ),
+
+ ])
+ ))
+ def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ assert {d.name() for d in dds} == {'rgw.myrgw.foobar', 'haproxy.test.bar'}
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
+
+ d_name = 'rgw.' + daemon_id
+
+ c = cephadm_module.daemon_action('redeploy', d_name)
+ assert wait(cephadm_module,
+ c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'"
+
+ for what in ('start', 'stop', 'restart'):
+ c = cephadm_module.daemon_action(what, d_name)
+ assert wait(cephadm_module,
+ c) == F"Scheduled to {what} {d_name} on host 'test'"
+
+                # Make sure _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ }
+ cephadm_module.notify('mon_map', None)
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ assert cephadm_module.events.get_for_daemon(d_name) == [
+ OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO',
+ f"Deployed {d_name} on host \'test\'"),
+ OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO',
+ f"stop {d_name} from host \'test\'"),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
+ with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
+
+ _ceph_send_command.side_effect = Exception("myerror")
+
+                    # Make sure _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ evs = [e.message for e in cephadm_module.events.get_for_daemon(
+ f'rgw.{daemon_id}')]
+
+ assert 'myerror' in ''.join(evs)
+
+ @pytest.mark.parametrize(
+ "action",
+ [
+ 'start',
+ 'stop',
+ 'restart',
+ 'reconfig',
+ 'redeploy'
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.HostCache.save_host")
+ def test_daemon_check(self, _save_host, cephadm_module: CephadmOrchestrator, action):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
+ [daemon_name] = d_names
+
+ cephadm_module._schedule_daemon_action(daemon_name, action)
+
+ assert cephadm_module.cache.get_scheduled_daemon_action(
+ 'test', daemon_name) == action
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+                _save_host.assert_any_call('test')
+ assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ with with_host(cephadm_module, 'test'):
+
+ # Also testing deploying mons without explicit network placement
+ cephadm_module.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mon',
+ 'name': 'public_network',
+ 'value': '127.0.0.0/8'
+ })
+
+ cephadm_module.cache.update_host_networks(
+ 'test',
+ {
+ "127.0.0.0/8": [
+ "127.0.0.1"
+ ],
+ }
+ )
+
+ with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names:
+ [daemon_name] = d_names
+
+ cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'mon.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "mon.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'reconfig': True,
+ },
+ "meta": {
+ 'service_name': 'mon',
+ 'ports': [],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': None,
+ 'extra_entrypoint_args': None,
+ },
+ "config_blobs": {
+ "config": "[mon]\nk=v\n[mon.test]\npublic network = 127.0.0.0/8\n",
+ "keyring": "",
+ "files": {
+ "config": "[mon.test]\npublic network = 127.0.0.0/8\n"
+ },
+ },
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_mon_crush_location_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mon',
+ 'name': 'public_network',
+ 'value': '127.0.0.0/8'
+ })
+
+ cephadm_module.cache.update_host_networks(
+ 'test',
+ {
+ "127.0.0.0/8": [
+ "127.0.0.1"
+ ],
+ }
+ )
+
+ with with_service(cephadm_module, ServiceSpec(service_type='mon', crush_locations={'test': ['datacenter=a', 'rack=2']}), CephadmOrchestrator.apply_mon, 'test'):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'mon.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "mon.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {},
+ "meta": {
+ 'service_name': 'mon',
+ 'ports': [],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': None,
+ 'extra_entrypoint_args': None,
+ },
+ "config_blobs": {
+ "config": "[mon.test]\npublic network = 127.0.0.0/8\n",
+ "keyring": "",
+ "files": {
+ "config": "[mon.test]\npublic network = 127.0.0.0/8\n",
+ },
+ "crush_location": "datacenter=a",
+ },
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_extra_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='crash', extra_container_args=['--cpus=2', '--quiet']), CephadmOrchestrator.apply_crash):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'crash.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "crash.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'extra_container_args': [
+ "--cpus=2",
+ "--quiet",
+ ],
+ },
+ "meta": {
+ 'service_name': 'crash',
+ 'ports': [],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': [
+ "--cpus=2",
+ "--quiet",
+ ],
+ 'extra_entrypoint_args': None,
+ },
+ "config_blobs": {
+ "config": "",
+ "keyring": "[client.crash.test]\nkey = None\n",
+ },
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_extra_entrypoint_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='node-exporter',
+ extra_entrypoint_args=['--collector.textfile.directory=/var/lib/node_exporter/textfile_collector', '--some-other-arg']),
+ CephadmOrchestrator.apply_node_exporter):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'node-exporter.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "node-exporter.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'tcp_ports': [9100],
+ 'extra_entrypoint_args': [
+ "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector",
+ "--some-other-arg",
+ ],
+ },
+ "meta": {
+ 'service_name': 'node-exporter',
+ 'ports': [9100],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': None,
+ 'extra_entrypoint_args': [
+ "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector",
+ "--some-other-arg",
+ ],
+ },
+ "config_blobs": {},
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_extra_entrypoint_and_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='node-exporter',
+ extra_entrypoint_args=['--collector.textfile.directory=/var/lib/node_exporter/textfile_collector', '--some-other-arg'],
+ extra_container_args=['--cpus=2', '--quiet']),
+ CephadmOrchestrator.apply_node_exporter):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'node-exporter.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "node-exporter.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'tcp_ports': [9100],
+ 'extra_container_args': [
+ "--cpus=2",
+ "--quiet",
+ ],
+ 'extra_entrypoint_args': [
+ "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector",
+ "--some-other-arg",
+ ],
+ },
+ "meta": {
+ 'service_name': 'node-exporter',
+ 'ports': [9100],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': [
+ "--cpus=2",
+ "--quiet",
+ ],
+ 'extra_entrypoint_args': [
+ "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector",
+ "--some-other-arg",
+ ],
+ },
+ "config_blobs": {},
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_extra_entrypoint_and_container_args_with_spaces(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='node-exporter',
+ extra_entrypoint_args=['--entrypoint-arg-with-value value', '--some-other-arg 3'],
+ extra_container_args=['--cpus 2', '--container-arg-with-value value']),
+ CephadmOrchestrator.apply_node_exporter):
+ _run_cephadm.assert_called_with(
+ 'test',
+ 'node-exporter.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "node-exporter.test",
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'tcp_ports': [9100],
+ 'extra_container_args': [
+ "--cpus",
+ "2",
+ "--container-arg-with-value",
+ "value",
+ ],
+ 'extra_entrypoint_args': [
+ "--entrypoint-arg-with-value",
+ "value",
+ "--some-other-arg",
+ "3",
+ ],
+ },
+ "meta": {
+ 'service_name': 'node-exporter',
+ 'ports': [9100],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': [
+ "--cpus 2",
+ "--container-arg-with-value value",
+ ],
+ 'extra_entrypoint_args': [
+ "--entrypoint-arg-with-value value",
+ "--some-other-arg 3",
+ ],
+ },
+ "config_blobs": {},
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_custom_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ test_cert = ['-----BEGIN PRIVATE KEY-----',
+ 'YSBhbGlxdXlhbSBlcmF0LCBzZWQgZGlhbSB2b2x1cHR1YS4gQXQgdmVybyBlb3Mg',
+ 'ZXQgYWNjdXNhbSBldCBqdXN0byBkdW8=',
+ '-----END PRIVATE KEY-----',
+ '-----BEGIN CERTIFICATE-----',
+ 'YSBhbGlxdXlhbSBlcmF0LCBzZWQgZGlhbSB2b2x1cHR1YS4gQXQgdmVybyBlb3Mg',
+ 'ZXQgYWNjdXNhbSBldCBqdXN0byBkdW8=',
+ '-----END CERTIFICATE-----']
+ configs = [
+ CustomConfig(content='something something something',
+ mount_path='/etc/test.conf'),
+ CustomConfig(content='\n'.join(test_cert), mount_path='/usr/share/grafana/thing.crt')
+ ]
+ tc_joined = '\n'.join(test_cert)
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='crash', custom_configs=configs), CephadmOrchestrator.apply_crash):
+ _run_cephadm(
+ 'test',
+ 'crash.test',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": "crash.test",
+ "image": "",
+ "deploy_arguments": [],
+ "params": {},
+ "meta": {
+ "service_name": "crash",
+ "ports": [],
+ "ip": None,
+ "deployed_by": [],
+ "rank": None,
+ "rank_generation": None,
+ "extra_container_args": None,
+ "extra_entrypoint_args": None,
+ },
+ "config_blobs": {
+ "config": "",
+ "keyring": "[client.crash.test]\nkey = None\n",
+ "custom_config_files": [
+ {
+ "content": "something something something",
+ "mount_path": "/etc/test.conf",
+ },
+ {
+ "content": tc_joined,
+ "mount_path": "/usr/share/grafana/thing.crt",
+ },
+ ]
+ }
+ }),
+ )
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
+
+                # Make sure _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+ 'modules': ['dashboard']
+ })
+
+ with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
+ CephadmServe(cephadm_module)._check_daemons()
+ _mon_cmd.assert_any_call(
+ {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://[1::4]:3000'},
+ None)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1.2.3.4')
+ def test_iscsi_post_actions_with_missing_daemon_in_cache(self, cephadm_module: CephadmOrchestrator):
+ # https://tracker.ceph.com/issues/52866
+ with with_host(cephadm_module, 'test1'):
+ with with_host(cephadm_module, 'test2'):
+ with with_service(cephadm_module, IscsiServiceSpec(service_id='foobar', pool='pool', placement=PlacementSpec(host_pattern='*')), CephadmOrchestrator.apply_iscsi, 'test'):
+
+ CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('iscsi')) == 2
+
+                    # get daemons from the post-action set (ARRGH sets!!)
+ tempset = cephadm_module.requires_post_actions.copy()
+ tempdaemon1 = tempset.pop()
+ tempdaemon2 = tempset.pop()
+
+ # make sure post actions has 2 daemons in it
+ assert len(cephadm_module.requires_post_actions) == 2
+
+ # replicate a host cache that is not in sync when check_daemons is called
+ tempdd1 = cephadm_module.cache.get_daemon(tempdaemon1)
+ tempdd2 = cephadm_module.cache.get_daemon(tempdaemon2)
+ host = 'test1'
+ if 'test1' not in tempdaemon1:
+ host = 'test2'
+ cephadm_module.cache.rm_daemon(host, tempdaemon1)
+
+                    # Make sure _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime_to_str(datetime_now()),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+ 'modules': ['dashboard']
+ })
+
+ with mock.patch("cephadm.module.IscsiService.config_dashboard") as _cfg_db:
+ CephadmServe(cephadm_module)._check_daemons()
+ _cfg_db.assert_called_once_with([tempdd2])
+
+                        # post actions still has the other daemon in it and will run it on the next _check_daemons
+ assert len(cephadm_module.requires_post_actions) == 1
+
+                        # a post action was missed for a daemon
+ assert tempdaemon1 in cephadm_module.requires_post_actions
+
+ # put the daemon back in the cache
+ cephadm_module.cache.add_daemon(host, tempdd1)
+
+ _cfg_db.reset_mock()
+ # replicate serve loop running again
+ CephadmServe(cephadm_module)._check_daemons()
+
+ # post actions should have been called again
+                        _cfg_db.assert_called()
+
+ # post actions is now empty
+ assert len(cephadm_module.requires_post_actions) == 0
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_mon_add(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+ assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+
+ with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+ wait(cephadm_module, c)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_mgr_update(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert r
+
+ assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
+ def test_find_destroyed_osds(self, _mon_cmd, cephadm_module):
+ dict_out = {
+ "nodes": [
+ {
+ "id": -1,
+ "name": "default",
+ "type": "root",
+ "type_id": 11,
+ "children": [
+ -3
+ ]
+ },
+ {
+ "id": -3,
+ "name": "host1",
+ "type": "host",
+ "type_id": 1,
+ "pool_weights": {},
+ "children": [
+ 0
+ ]
+ },
+ {
+ "id": 0,
+ "device_class": "hdd",
+ "name": "osd.0",
+ "type": "osd",
+ "type_id": 0,
+ "crush_weight": 0.0243988037109375,
+ "depth": 2,
+ "pool_weights": {},
+ "exists": 1,
+ "status": "destroyed",
+ "reweight": 1,
+ "primary_affinity": 1
+ }
+ ],
+ "stray": []
+ }
+ json_out = json.dumps(dict_out)
+ _mon_cmd.return_value = (0, json_out, '')
+ osd_claims = OsdIdClaims(cephadm_module)
+ assert osd_claims.get() == {'host1': ['0']}
+ assert osd_claims.filtered_by_host('host1') == ['0']
+ assert osd_claims.filtered_by_host('host1.domain.com') == ['0']
+
+    @pytest.mark.parametrize(
+ "ceph_services, cephadm_daemons, strays_expected, metadata",
+        # [ ([(daemon_type, daemon_id), ...], [...], [...], {metadata}), ... ]
+ [
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ {},
+ ),
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ {},
+ ),
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0')],
+ [('mgr', 'x')],
+ {},
+ ),
+ # https://tracker.ceph.com/issues/49573
+ (
+ [('rgw-nfs', '14649')],
+ [],
+ [('nfs', 'foo-rgw.host1')],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}},
+ ),
+ (
+ [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+ [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')],
+ [],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+ ),
+ (
+ [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+ [('nfs', 'foo-rgw.host1')],
+ [('nfs', 'foo2.host2')],
+ {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+ ),
+ ]
+ )
+ def test_check_for_stray_daemons(
+ self,
+ cephadm_module,
+ ceph_services,
+ cephadm_daemons,
+ strays_expected,
+ metadata
+ ):
+ # mock ceph service-map
+ services = []
+ for service in ceph_services:
+ s = {'type': service[0], 'id': service[1]}
+ services.append(s)
+ ls = [{'hostname': 'host1', 'services': services}]
+
+ with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
+ list_servers.return_value = ls
+ list_servers.__iter__.side_effect = ls.__iter__
+
+ # populate cephadm daemon cache
+ dm = {}
+ for daemon_type, daemon_id in cephadm_daemons:
+ dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
+ dm[dd.name()] = dd
+ cephadm_module.cache.update_host_daemons('host1', dm)
+
+ def get_metadata_mock(svc_type, svc_id, default):
+ return metadata[svc_id]
+
+ with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock):
+
+ # test
+ CephadmServe(cephadm_module)._check_for_strays()
+
+ # verify
+ strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
+ if not strays:
+ assert len(strays_expected) == 0
+ else:
+ for dt, di in strays_expected:
+ name = '%s.%s' % (dt, di)
+ for detail in strays['detail']:
+ if name in detail:
+ strays['detail'].remove(detail)
+ break
+ assert name in detail
+ assert len(strays['detail']) == 0
+ assert strays['count'] == len(strays_expected)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
+ def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
+ _mon_cmd.return_value = (1, "", "fail_msg")
+ with pytest.raises(OrchestratorError):
+ OsdIdClaims(cephadm_module)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+
+ spec = DriveGroupSpec(
+ service_id='foo',
+ placement=PlacementSpec(
+ host_pattern='*',
+ ),
+ data_devices=DeviceSelection(
+ all=True
+ )
+ )
+
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
+
+ inventory = Devices([
+ Device(
+ '/dev/sdb',
+ available=True
+ ),
+ ])
+
+ cephadm_module.cache.update_host_devices('test', inventory.devices)
+
+ _run_cephadm.side_effect = async_side_effect((['{}'], '', 0))
+
+ assert CephadmServe(cephadm_module)._apply_all_services() is False
+
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True,
+ stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True)
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+
+ spec = DriveGroupSpec(
+ service_id='noncollocated',
+ placement=PlacementSpec(
+ hosts=['test']
+ ),
+ data_devices=DeviceSelection(paths=['/dev/sdb']),
+ db_devices=DeviceSelection(paths=['/dev/sdc']),
+ wal_devices=DeviceSelection(paths=['/dev/sdd'])
+ )
+
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']
+
+ inventory = Devices([
+ Device('/dev/sdb', available=True),
+ Device('/dev/sdc', available=True),
+ Device('/dev/sdd', available=True)
+ ])
+
+ cephadm_module.cache.update_host_devices('test', inventory.devices)
+
+ _run_cephadm.side_effect = async_side_effect((['{}'], '', 0))
+
+ assert CephadmServe(cephadm_module)._apply_all_services() is False
+
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
+ '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
+ error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True)
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.SpecStore.save")
+ def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'},
+ 'service_id': 'foo', 'data_devices': {'all': True}}
+ spec = ServiceSpec.from_json(json_spec)
+ assert isinstance(spec, DriveGroupSpec)
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
+ _save_spec.assert_called_with(spec)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_create_osds(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
+ bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_host'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(bad_dg)
+ out = wait(cephadm_module, c)
+ assert "Invalid 'host:device' spec: host not found in cluster" in out
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_create_noncollocated_osd(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch('cephadm.services.osd.OSDService._run_ceph_volume_command')
+ @mock.patch('cephadm.services.osd.OSDService.driveselection_to_ceph_volume')
+ @mock.patch('cephadm.services.osd.OsdIdClaims.refresh', lambda _: None)
+ @mock.patch('cephadm.services.osd.OsdIdClaims.get', lambda _: {})
+ def test_limit_not_reached(self, d_to_cv, _run_cv_cmd, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(limit=5, rotational=1),
+ service_id='not_enough')
+
+ disks_found = [
+ '[{"data": "/dev/vdb", "data_size": "50.00 GB", "encryption": "None"}, {"data": "/dev/vdc", "data_size": "50.00 GB", "encryption": "None"}]']
+ d_to_cv.return_value = 'foo'
+ _run_cv_cmd.side_effect = async_side_effect((disks_found, '', 0))
+ preview = cephadm_module.osd_service.generate_previews([dg], 'test')
+
+ for osd in preview:
+ assert 'notes' in osd
+ assert osd['notes'] == [
+ 'NOTE: Did not find enough disks matching filter on host test to reach data device limit (Found: 2 | Limit: 5)']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_prepare_drivegroup(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ out = cephadm_module.osd_service.prepare_drivegroup(dg)
+ assert len(out) == 1
+ f1 = out[0]
+ assert f1[0] == 'test'
+ assert isinstance(f1[1], DriveSelection)
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_commands",
+ [
+            # no preview and only one disk, prepare is used due to the hack that is in place.
+ (['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]),
+ # no preview and multiple disks, uses batch
+ (['/dev/sda', '/dev/sdb'], False,
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]),
+ # preview and only one disk needs to use batch again to generate the preview
+ (['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]),
+ # preview and multiple disks work the same
+ (['/dev/sda', '/dev/sdb'], True,
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ preview = preview
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands)
+                       for cmd in out), f'Expected cmds from {out} in {exp_commands}'
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_commands",
+ [
+ # one data device, no preview
+ (['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]),
+ # multiple data devices, no preview
+ (['/dev/sda', '/dev/sdb'], False,
+ ["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]),
+ # one data device, preview
+ (['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]),
+ # multiple data devices, preview
+ (['/dev/sda', '/dev/sdb'], True,
+ ["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ preview = preview
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands)
+                       for cmd in out), f'Expected cmds from {out} in {exp_commands}'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='osd.0',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ )
+ ])
+ ))
+ @mock.patch("cephadm.services.osd.OSD.exists", True)
+ @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+ def test_remove_osds(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
+ wait(cephadm_module, c)
+
+ c = cephadm_module.remove_daemons(['osd.0'])
+ out = wait(cephadm_module, c)
+ assert out == ["Removed osd.0 from host 'test'"]
+
+ cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
+ replace=False,
+ force=False,
+ hostname='test',
+ process_started_at=datetime_now(),
+ remove_util=cephadm_module.to_remove_osds.rm_util
+ ))
+ cephadm_module.to_remove_osds.process_removal_queue()
+ assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
+
+ c = cephadm_module.remove_osds_status()
+ out = wait(cephadm_module, c)
+ assert out == []
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_rgw_update(self, cephadm_module):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)):
+ ps = PlacementSpec(hosts=['host1'], count=1)
+ c = cephadm_module.add_daemon(
+ RGWSpec(service_id="foo", placement=ps))
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed rgw.foo.* on host 'host1'")
+
+ ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
+ r = CephadmServe(cephadm_module)._apply_service(
+ RGWSpec(service_id="foo", placement=ps))
+ assert r
+
+ assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1')
+ assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='rgw.myrgw.myhost.myid',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ )
+ ])
+ ))
+ def test_remove_daemon(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
+ wait(cephadm_module, c)
+ c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
+ out = wait(cephadm_module, c)
+ assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription
+ CephadmServe(cephadm_module)._check_for_moved_osds()
+ # both are in status "starting"
+ assert len(cephadm_module.cache.get_daemons()) == 2
+
+ dd1.status = DaemonDescriptionStatus.running
+ dd2.status = DaemonDescriptionStatus.error
+ cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
+ cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+ CephadmServe(cephadm_module)._check_for_moved_osds()
+ assert len(cephadm_module.cache.get_daemons()) == 1
+
+ assert cephadm_module.events.get_for_daemon('osd.1') == [
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Deployed osd.1 on host 'host1'"),
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Deployed osd.1 on host 'host2'"),
+ OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO',
+ "Removed duplicated daemon on host 'host2'"),
+ ]
+
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ @pytest.mark.parametrize(
+ "spec",
+ [
+ ServiceSpec('crash'),
+ ServiceSpec('prometheus'),
+ ServiceSpec('grafana'),
+ ServiceSpec('node-exporter'),
+ ServiceSpec('alertmanager'),
+ ServiceSpec('rbd-mirror'),
+ ServiceSpec('cephfs-mirror'),
+ ServiceSpec('mds', service_id='fsname'),
+ RGWSpec(rgw_realm='realm', rgw_zone='zone'),
+ RGWSpec(service_id="foo"),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, unmanaged_spec):
+ with with_daemon(cephadm_module, spec, 'test'):
+ pass
+
+ @pytest.mark.parametrize(
+ "entity,success,spec",
+ [
+ ('mgr.x', True, ServiceSpec(
+ service_type='mgr',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('client.rgw.x', True, ServiceSpec(
+ service_type='rgw',
+ service_id='id',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('client.nfs.x', True, ServiceSpec(
+ service_type='nfs',
+ service_id='id',
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ('mon.', False, ServiceSpec(
+ service_type='mon',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec('test', '127.0.0.0/24', 'x')], count=1),
+ unmanaged=True)
+ ), # noqa: E124
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_module):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec):
+ _run_cephadm.side_effect = OrchestratorError('fail')
+ with pytest.raises(OrchestratorError):
+ wait(cephadm_module, cephadm_module.add_daemon(spec))
+ if success:
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+ else:
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': entity,
+ })
+ assert cephadm_module.events.get_for_service(spec.service_name()) == [
+ OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'INFO',
+ "service was created"),
+ OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'ERROR',
+ "fail"),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ _run_cephadm.side_effect = OrchestratorError('fail')
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert not r
+ assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None
+ assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1
+ assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks[
+ 'CEPHADM_DAEMON_PLACE_FAIL']['summary']
+ assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks[
+ 'CEPHADM_DAEMON_PLACE_FAIL']['detail']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ CephadmServe(cephadm_module)._apply_all_services()
+ ps = PlacementSpec(hosts=['fail'], count=1)
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+ assert not r
+ assert cephadm_module.apply_spec_fails
+ assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None
+ assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1
+ assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks[
+ 'CEPHADM_APPLY_SPEC_FAIL']['summary']
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.HostCache.save_host_devices")
+ def test_invalid_config_option_health_warning(self, _save_devs, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ _save_devs.return_value = None
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+ get_foreign_ceph_option.side_effect = KeyError
+ CephadmServe(cephadm_module)._apply_service_config(
+ ServiceSpec('mgr', placement=ps, config={'test': 'foo'}))
+ assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None
+ assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1
+ assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['summary']
+ assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['detail']
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_save_devices(self, _set_store, _run_cephadm, _get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ entry_size = 65536 # default 64k size
+ _get_foreign_ceph_option.return_value = entry_size
+
+ class FakeDev():
+ def __init__(self, c: str = 'a'):
+ # using 1015 here makes the serialized string exactly 1024 bytes if c is one char
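+                # (json.dumps({'a': 'a' * 1015}) serializes to '{"a": "aaa...a"}':
+                # 9 bytes of JSON framing plus the 1015-byte payload = 1024 bytes)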
+ self.content = {c: c * 1015}
+ self.path = 'dev/vdc'
+
+ def to_json(self):
+ return self.content
+
+ def from_json(self, stuff):
+ return json.loads(stuff)
+
+ def byte_len(s):
+ return len(s.encode('utf-8'))
+
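+        # The expected_calls below pin down the chunking behaviour of
+        # HostCache.save_host_devices(): the serialized device list is split across
+        # several 'host.<hostname>.devices.<n>' store keys, each kept under the mon
+        # KV entry size, and the first key also records the total chunk count in its
+        # 'entries' field. (This summarizes the assertions, not the implementation.)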
+ with with_host(cephadm_module, 'test'):
+ fake_devices = [FakeDev()] * 100 # should be ~100k
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 34], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 34]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 32]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ fake_devices = [FakeDev()] * 300 # should be ~300k
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size * 4
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 5
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50], 'entries': 6})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.3', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.4', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.5', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ fake_devices = [FakeDev()] * 62 # should be ~62k, just under cache size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 62], 'entries': 1})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ # should be ~64k but just over so it requires more entries
+ fake_devices = [FakeDev()] * 64
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 22], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 22]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 20]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ # test for actual content being correct using differing devices
+ entry_size = 3072
+ _get_foreign_ceph_option.return_value = entry_size
+ fake_devices = [FakeDev('a'), FakeDev('b'), FakeDev('c'), FakeDev('d'), FakeDev('e')]
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('a'), FakeDev('b')]], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('c'), FakeDev('d')]]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('e')]]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_store")
+ def test_load_devices(self, _get_store, cephadm_module: CephadmOrchestrator):
+ def _fake_store(key):
+ if key == 'host.test.devices.0':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 9], 'entries': 3})
+ elif key == 'host.test.devices.1':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 7]})
+ elif key == 'host.test.devices.2':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 4]})
+ else:
+ raise Exception(f'Get store with unexpected value {key}')
+
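+        # chunk 0 advertises 'entries': 3, so the loader is expected to read the .0, .1
+        # and .2 keys and concatenate their device lists (9 + 7 + 4 = 20 devices).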
+ _get_store.side_effect = _fake_store
+ devs = cephadm_module.cache.load_host_devices('test')
+ assert devs == [Device('/path')] * 20
+
+ @mock.patch("cephadm.module.Inventory.__contains__")
+ def test_check_stray_host_cache_entry(self, _contains, cephadm_module: CephadmOrchestrator):
+ def _fake_inv(key):
+ if key in ['host1', 'node02', 'host.something.com']:
+ return True
+ return False
+
+ _contains.side_effect = _fake_inv
+ assert cephadm_module.cache._get_host_cache_entry_status('host1') == HostCacheStatus.host
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.something.com') == HostCacheStatus.host
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'node02.devices.37') == HostCacheStatus.devices
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.something.com.devices.0') == HostCacheStatus.devices
+ assert cephadm_module.cache._get_host_cache_entry_status('hostXXX') == HostCacheStatus.stray
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.nothing.com') == HostCacheStatus.stray
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ def test_nfs(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = NFSServiceSpec(
+ service_id='name',
+ placement=ps)
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed nfs.name.* on host 'test'")
+
+ assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("subprocess.run", None)
+ @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
+ def test_iscsi(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ spec = IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password',
+ placement=ps)
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
+
+ c = cephadm_module.add_daemon(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed iscsi.name.* on host 'test'")
+
+ assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
+
+ @pytest.mark.parametrize(
+ "on_bool",
+ [
+ True,
+ False
+ ]
+ )
+ @pytest.mark.parametrize(
+ "fault_ident",
+ [
+ 'fault',
+ 'ident'
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
+ on_off = 'on' if on_bool else 'off'
+ assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}']
+ _run_cephadm.assert_called_with('test', 'osd', 'shell', [
+ '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
+ c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
+ assert wait(cephadm_module, c) == ['Set ident light for test: on']
+ _run_cephadm.assert_called_with('test', 'osd', 'shell', [
+ '--', 'echo', 'hello'], error_ok=True)
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'mgr0'):
+ cephadm_module.set_store('mgr0/blink_device_light_cmd',
+ 'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
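+ # with an empty path, the '{{ path or dev }}' placeholder should fall back
+ # to the device id, which ends up as the last argument of the rendered
+ # command asserted below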
+ c = cephadm_module.blink_device_light(
+ 'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')])
+ assert wait(cephadm_module, c) == [
+ 'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on']
+ _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [
+ '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784'
+ ], error_ok=True)
+
+ @pytest.mark.parametrize(
+ "spec, meth",
+ [
+ (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr),
+ (ServiceSpec('crash'), CephadmOrchestrator.apply_crash),
+ (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus),
+ (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana),
+ (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter),
+ (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
+ (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
+ (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror),
+ (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
+ (ServiceSpec(
+ 'mds', service_id='fsname',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='fsname',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_mds),
+ (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw),
+ (RGWSpec(
+ service_id='bar',
+ rgw_realm='realm', rgw_zone='zone',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='bar',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_rgw),
+ (NFSServiceSpec(
+ service_id='name',
+ ), CephadmOrchestrator.apply_nfs),
+ (IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password'
+ ), CephadmOrchestrator.apply_iscsi),
+ (CustomContainerSpec(
+ service_id='hello-world',
+ image='docker.io/library/hello-world:latest',
+ uid=65534,
+ gid=65534,
+ dirs=['foo/bar'],
+ files={
+ 'foo/bar/xyz.conf': 'aaa\nbbb'
+ },
+ bind_mounts=[[
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]],
+ volume_mounts={
+ 'foo/bar': '/foo/bar:Z'
+ },
+ args=['--no-healthcheck'],
+ envs=['SECRET=password'],
+ ports=[8080, 8443]
+ ), CephadmOrchestrator.apply_container),
+ ]
+ )
+ @mock.patch("subprocess.run", None)
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
+ @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
+ @mock.patch("subprocess.run", mock.MagicMock())
+ def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, meth, 'test'):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
+ spec = MDSSpec('mds', service_id='fsname', config={'test': 'foo'})
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, host='test'):
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert out == 'fsname'
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert not out
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
+ def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
+ spec = MDSSpec(
+ 'mds',
+ service_id='fsname',
+ placement=PlacementSpec(hosts=['host1', 'host2']),
+ config={'test': 'foo'}
+ )
+ with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
+ CephadmServe(cephadm_module)._apply_all_services()
+
+ [daemon] = cephadm_module.cache.daemons['host1'].keys()
+
+ spec.placement.set_hosts(['host2'])
+
+ ok_to_stop.side_effect = False
+
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
+ CephadmServe(cephadm_module)._apply_all_services()
+
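+ # daemon names look like 'mds.fsname.host1.<random>'; stripping the
+ # leading 'mds.' (4 chars) yields the daemon id that ok_to_stop receives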
+ ok_to_stop.assert_called_with([daemon[4:]], force=True)
+
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_dont_touch_offline_or_maintenance_host_daemons(self, cephadm_module):
+ # test daemons on offline/maint hosts not removed when applying specs
+ # test daemons not added to hosts in maint/offline state
+ with with_host(cephadm_module, 'test1'):
+ with with_host(cephadm_module, 'test2'):
+ with with_host(cephadm_module, 'test3'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*'))):
+ # should get a mgr on all 3 hosts
+ # CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3
+
+ # put one host in offline state and one host in maintenance state
+ cephadm_module.offline_hosts = {'test2'}
+ cephadm_module.inventory._inventory['test3']['status'] = 'maintenance'
+ cephadm_module.inventory.save()
+
+ # offline/maint hosts still count as schedulable (so their existing
+ # daemons are left alone), but they are flagged unreachable, which keeps
+ # new daemons from being placed on them
+ assert cephadm_module.cache.is_host_schedulable('test2')
+ assert cephadm_module.cache.is_host_schedulable('test3')
+
+ assert cephadm_module.cache.is_host_unreachable('test2')
+ assert cephadm_module.cache.is_host_unreachable('test3')
+
+ with with_service(cephadm_module, ServiceSpec('crash', placement=PlacementSpec(host_pattern='*'))):
+ # re-apply services. No mgr should be removed from maint/offline hosts
+ # crash daemon should only be on host not in maint/offline mode
+ CephadmServe(cephadm_module)._apply_all_services()
+ assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3
+ assert len(cephadm_module.cache.get_daemons_by_type('crash')) == 1
+
+ cephadm_module.offline_hosts = {}
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_enter_success(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nsuccess - systemd target xxx disabled'], 0))
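+ # the maintenance enter path appears to key off the 'success'/'failed'
+ # marker in the command output; here we feed it a success message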
+ _host_ok.return_value = 0, 'it is okay'
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname))
+ # should not raise an error
+ retval = cephadm_module.enter_host_maintenance(hostname)
+ assert retval.result_str().startswith('Daemons for Ceph cluster')
+ assert not retval.exception_str
+ assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_enter_failure(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nfailed - disable the target'], 0))
+ _host_ok.return_value = 0, 'it is okay'
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname))
+
+ with pytest.raises(OrchestratorError, match='Failed to place host1 into maintenance for cluster fsid'):
+ cephadm_module.enter_host_maintenance(hostname)
+
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_enter_i_really_mean_it(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ err_str = 'some kind of error'
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nfailed - disable the target'], 0))
+ _host_ok.return_value = 1, err_str
+ _get_daemon_types.return_value = ['mon']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname))
+
+ with pytest.raises(OrchestratorError, match=err_str):
+ cephadm_module.enter_host_maintenance(hostname)
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ with pytest.raises(OrchestratorError, match=err_str):
+ cephadm_module.enter_host_maintenance(hostname, force=True)
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ retval = cephadm_module.enter_host_maintenance(hostname, force=True, yes_i_really_mean_it=True)
+ assert retval.result_str().startswith('Daemons for Ceph cluster')
+ assert not retval.exception_str
+ assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_exit_success(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.side_effect = async_side_effect(([''], [
+ 'something\nsuccess - systemd target xxx enabled and started'], 0))
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
+ # should not raise an error
+ retval = cephadm_module.exit_host_maintenance(hostname)
+ assert retval.result_str().startswith('Ceph cluster')
+ assert not retval.exception_str
+ assert not cephadm_module.inventory._inventory[hostname]['status']
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.HostCache.get_daemon_types")
+ @mock.patch("cephadm.module.HostCache.get_hosts")
+ def test_maintenance_exit_failure(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ hostname = 'host1'
+ _run_cephadm.side_effect = async_side_effect(
+ ([''], ['something\nfailed - unable to enable the target'], 0))
+ _get_daemon_types.return_value = ['crash']
+ _hosts.return_value = [hostname, 'other_host']
+ cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance'))
+
+ with pytest.raises(OrchestratorError, match='Failed to exit maintenance state for host host1, cluster fsid'):
+ cephadm_module.exit_host_maintenance(hostname)
+
+ assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance'
+
+ @mock.patch("cephadm.ssh.SSHManager._remote_connection")
+ @mock.patch("cephadm.ssh.SSHManager._execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._check_execute_command")
+ @mock.patch("cephadm.ssh.SSHManager._write_remote_file")
+ def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module):
+ _write_file.side_effect = async_side_effect(None)
+ check_execute_command.side_effect = async_side_effect('')
+ execute_command.side_effect = async_side_effect(('{}', '', 0))
+ remote_connection.side_effect = async_side_effect(mock.Mock())
+
+ assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
+ with with_host(cephadm_module, 'test'):
+ assert '/etc/ceph/ceph.conf' not in cephadm_module.cache.get_host_client_files('test')
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
+ cephadm_module.config_notify()
+ assert cephadm_module.manage_etc_ceph_ceph_conf is True
+
+ CephadmServe(cephadm_module)._write_all_client_files()
+ # Make sure both ceph.conf locations (default and per-fsid) get written
+ _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'',
+ 0o644, 0, 0, None),
+ mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'',
+ 0o644, 0, 0, None)]
+ )
+ ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
+ assert len(ceph_conf_files) == 2
+ assert '/etc/ceph/ceph.conf' in ceph_conf_files
+ assert '/var/lib/ceph/fsid/config/ceph.conf' in ceph_conf_files
+
+ # set extra config and expect that we deploy another ceph.conf
+ cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
+ CephadmServe(cephadm_module)._write_all_client_files()
+ _write_file.assert_has_calls([mock.call('test',
+ '/etc/ceph/ceph.conf',
+ b'[mon]\nk=v\n', 0o644, 0, 0, None),
+ mock.call('test',
+ '/var/lib/ceph/fsid/config/ceph.conf',
+ b'[mon]\nk=v\n', 0o644, 0, 0, None)])
+ # reload
+ cephadm_module.cache.last_client_files = {}
+ cephadm_module.cache.load()
+
+ ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
+ assert len(ceph_conf_files) == 2
+ assert '/etc/ceph/ceph.conf' in ceph_conf_files
+ assert '/var/lib/ceph/fsid/config/ceph.conf' in ceph_conf_files
+
+ # Make sure _check_daemons would do a redeploy when the ceph.conf
+ # contents change (e.g. due to a monmap change):
+ f1_before_digest = cephadm_module.cache.get_host_client_files('test')[
+ '/etc/ceph/ceph.conf'][0]
+ f2_before_digest = cephadm_module.cache.get_host_client_files(
+ 'test')['/var/lib/ceph/fsid/config/ceph.conf'][0]
+ cephadm_module._set_extra_ceph_conf('[mon]\nk2=v2')
+ CephadmServe(cephadm_module)._write_all_client_files()
+ f1_after_digest = cephadm_module.cache.get_host_client_files('test')[
+ '/etc/ceph/ceph.conf'][0]
+ f2_after_digest = cephadm_module.cache.get_host_client_files(
+ 'test')['/var/lib/ceph/fsid/config/ceph.conf'][0]
+ assert f1_before_digest != f1_after_digest
+ assert f2_before_digest != f2_after_digest
+
+ @mock.patch("cephadm.inventory.HostCache.get_host_client_files")
+ def test_dont_write_client_files_to_unreachable_hosts(self, _get_client_files, cephadm_module):
+ cephadm_module.inventory.add_host(HostSpec('host1', '1.2.3.1')) # online
+ cephadm_module.inventory.add_host(HostSpec('host2', '1.2.3.2')) # maintenance
+ cephadm_module.inventory.add_host(HostSpec('host3', '1.2.3.3')) # offline
+
+ # mark host2 as maintenance and host3 as offline
+ cephadm_module.inventory._inventory['host2']['status'] = 'maintenance'
+ cephadm_module.offline_hosts.add('host3')
+
+ # verify host2 and host3 are correctly marked as unreachable but host1 is not
+ assert not cephadm_module.cache.is_host_unreachable('host1')
+ assert cephadm_module.cache.is_host_unreachable('host2')
+ assert cephadm_module.cache.is_host_unreachable('host3')
+
+ _get_client_files.side_effect = Exception('Called _get_client_files')
+
+ # with the online host, should call _get_client_files which
+ # we have setup to raise an Exception
+ with pytest.raises(Exception, match='Called _get_client_files'):
+ CephadmServe(cephadm_module)._write_client_files({}, 'host1')
+
+ # for the maintenance and offline host, _get_client_files should
+ # not be called and it should just return immediately with nothing
+ # having been raised
+ CephadmServe(cephadm_module)._write_client_files({}, 'host2')
+ CephadmServe(cephadm_module)._write_client_files({}, 'host3')
+
+ def test_etc_ceph_init(self):
+ with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
+ assert m.manage_etc_ceph_ceph_conf is True
+
+ @mock.patch("cephadm.CephadmOrchestrator.check_mon_command")
+ @mock.patch("cephadm.CephadmOrchestrator.extra_ceph_conf")
+ def test_extra_ceph_conf(self, _extra_ceph_conf, _check_mon_cmd, cephadm_module: CephadmOrchestrator):
+ # settings put into the [global] section of the extra conf need to be
+ # appended to the existing [global] section of the given minimal ceph
+ # conf, but anything in another section (e.g. [mon]) must remain in its
+ # own section
+
+ # this is the conf "ceph generate-minimal-conf" will return in this test
+ _check_mon_cmd.return_value = (0, """[global]
+global_k1 = global_v1
+global_k2 = global_v2
+[mon]
+mon_k1 = mon_v1
+[osd]
+osd_k1 = osd_v1
+osd_k2 = osd_v2
+""", '')
+
+ # test with extra ceph conf that has some of the sections from minimal conf
+ _extra_ceph_conf.return_value = CephadmOrchestrator.ExtraCephConf(conf="""[mon]
+mon_k2 = mon_v2
+[global]
+global_k3 = global_v3
+""", last_modified=datetime_now())
+
+ expected_combined_conf = """[global]
+global_k1 = global_v1
+global_k2 = global_v2
+global_k3 = global_v3
+
+[mon]
+mon_k1 = mon_v1
+mon_k2 = mon_v2
+
+[osd]
+osd_k1 = osd_v1
+osd_k2 = osd_v2
+"""
+
+ assert cephadm_module.get_minimal_ceph_conf() == expected_combined_conf
+
+ def test_client_keyrings_special_host_labels(self, cephadm_module):
+ cephadm_module.inventory.add_host(HostSpec('host1', labels=['keyring1']))
+ cephadm_module.inventory.add_host(HostSpec('host2', labels=['keyring1', SpecialHostLabels.DRAIN_DAEMONS]))
+ cephadm_module.inventory.add_host(HostSpec('host3', labels=['keyring1', SpecialHostLabels.DRAIN_DAEMONS, SpecialHostLabels.DRAIN_CONF_KEYRING]))
+ # hosts need to be marked as having had a daemon refresh to be available
+ # for placement, so "refresh" them here with an empty daemon list
+ cephadm_module.cache.update_host_daemons('host1', {})
+ cephadm_module.cache.update_host_daemons('host2', {})
+ cephadm_module.cache.update_host_daemons('host3', {})
+
+ assert 'host1' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()]
+ assert 'host2' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()]
+ assert 'host3' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()]
+
+ assert 'host1' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()]
+ assert 'host2' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()]
+ assert 'host3' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()]
+
+ cephadm_module.keys.update(ClientKeyringSpec('keyring1', PlacementSpec(label='keyring1')))
+
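+ # host3 carries the conf/keyring drain label, so it should be skipped when
+ # the client keyring files are calculated below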
+ with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
+ _mon_cmd.return_value = (0, 'real-keyring', '')
+ client_files = CephadmServe(cephadm_module)._calc_client_files()
+ assert 'host1' in client_files.keys()
+ assert '/etc/ceph/ceph.keyring1.keyring' in client_files['host1'].keys()
+ assert 'host2' in client_files.keys()
+ assert '/etc/ceph/ceph.keyring1.keyring' in client_files['host2'].keys()
+ assert 'host3' not in client_files.keys()
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ def check_registry_credentials(url, username, password):
+ assert json.loads(cephadm_module.get_store('registry_credentials')) == {
+ 'url': url, 'username': username, 'password': password}
+
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test'):
+ # test successful login with valid args
+ code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login attempt with invalid args
+ code, out, err = cephadm_module.registry_login('bad-args')
+ assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
+ "or -i <login credentials json file>")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login using invalid json file
+ code, out, err = cephadm_module.registry_login(
+ None, None, None, '{"bad-json": "bad-json"}')
+ assert err == ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test good login using valid json file
+ good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
+ " \"password\": \"" + "json-pass" + "\"}")
+ code, out, err = cephadm_module.registry_login(None, None, None, good_json)
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ # test bad login where args are valid but login command fails
+ _run_cephadm.side_effect = async_side_effect(('{}', 'error', 1))
+ code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
+ assert err == 'Host test failed to login to fail-url as fail-user with given password'
+ check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digests': ['image@repo_digest'],
+ })))
+ @pytest.mark.parametrize("use_repo_digest",
+ [
+ False,
+ True
+ ])
+ def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
+ cephadm_module.use_repo_digest = use_repo_digest
+
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ cephadm_module.set_container_image('global', 'image')
+
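+ # when use_repo_digest is set, converting tags should rewrite the global
+ # container_image from the 'image' tag to the pinned 'image@repo_digest'
+ # reported by the mocked cephadm call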
+ if use_repo_digest:
+
+ CephadmServe(cephadm_module).convert_tags_to_repo_digest()
+
+ _, image, _ = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'global',
+ 'key': 'container_image',
+ })
+ if use_repo_digest:
+ assert image == 'image@repo_digest'
+ else:
+ assert image == 'image'
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
+Traceback (most recent call last):
+ File "<stdin>", line 6112, in <module>
+ File "<stdin>", line 1299, in _infer_fsid
+ File "<stdin>", line 1382, in _infer_image
+ File "<stdin>", line 3612, in command_ceph_volume
+ File "<stdin>", line 1061, in call_throws"""
+
+ with with_host(cephadm_module, 'test'):
+ _run_cephadm.reset_mock()
+ _run_cephadm.side_effect = OrchestratorError(error_message)
+
+ s = CephadmServe(cephadm_module)._refresh_host_devices('test')
+ assert s == 'host test `cephadm ceph-volume` failed: ' + error_message
+
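+ # when ceph-volume rejects --filter-for-batch, _refresh_host_devices is
+ # expected to retry the inventory call without the flag, hence the two
+ # calls recorded below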
+ assert _run_cephadm.mock_calls == [
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json-pretty', '--filter-for-batch'], image='',
+ no_fsid=False, error_ok=False, log_output=False),
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json-pretty'], image='',
+ no_fsid=False, error_ok=False, log_output=False),
+ ]
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice_fail(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {
+ 'osd': 1,
+ 'up_from': 0,
+ 'uuid': 'uuid'
+ }
+ ]
+ })
+
+ ceph_volume_lvm_list = {
+ '1': [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }]
+ }
+ _run_cephadm.reset_mock(return_value=True, side_effect=True)
+
+ async def _r_c(*args, **kwargs):
+ if 'ceph-volume' in args:
+ return (json.dumps(ceph_volume_lvm_list), '', 0)
+ else:
+ assert ['_orch', 'deploy'] in args
+ raise OrchestratorError("let's fail somehow")
+ _run_cephadm.side_effect = _r_c
+ assert cephadm_module._osd_activate(
+ ['test']).stderr == "let's fail somehow"
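+ # since the deploy failed, the osd.1 auth entry should not have been
+ # cleaned up, so asserting that 'auth rm' was issued must itself fail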
+ with pytest.raises(AssertionError):
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'osd.1',
+ })
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_activate_datadevice_dbdevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+
+ async def _ceph_volume_list(s, host, entity, cmd, **kwargs):
+ logging.info(f'ceph-volume cmd: {cmd}')
+ if 'raw' in cmd:
+ return json.dumps({
+ "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": {
+ "ceph_fsid": "64c84f19-fe1d-452a-a731-ab19dc144aa8",
+ "device": "/dev/loop0",
+ "osd_id": 21,
+ "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5",
+ "type": "bluestore"
+ },
+ }), '', 0
+ if 'lvm' in cmd:
+ return json.dumps({
+ '1': [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }, {
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'db'
+ }]
+ }), '', 0
+ return '{}', '', 0
+
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1, ceph_volume_lvm_list=_ceph_volume_list):
+ pass
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_osd_count(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ dg = DriveGroupSpec(service_id='', data_devices=DeviceSelection(all=True))
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
+ with with_service(cephadm_module, dg, host='test'):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1):
+ assert wait(cephadm_module, cephadm_module.describe_service())[0].size == 1
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_host_rm_last_admin(self, cephadm_module: CephadmOrchestrator):
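+ # removing the only host carrying the admin label without force should be
+ # refused with an OrchestratorError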
+ with pytest.raises(OrchestratorError):
+ with with_host(cephadm_module, 'test', refresh_hosts=False, rm_with_force=False):
+ cephadm_module.inventory.add_label('test', SpecialHostLabels.ADMIN)
+ pass
+ assert False
+ with with_host(cephadm_module, 'test1', refresh_hosts=False, rm_with_force=True):
+ with with_host(cephadm_module, 'test2', refresh_hosts=False, rm_with_force=False):
+ cephadm_module.inventory.add_label('test2', SpecialHostLabels.ADMIN)
+
+ @pytest.mark.parametrize("facts, settings, expected_value",
+ [
+ # All options are available on all hosts
+ (
+ {
+ "host1":
+ {
+ "sysctl_options":
+ {
+ 'opt1': 'val1',
+ 'opt2': 'val2',
+ }
+ },
+ "host2":
+ {
+ "sysctl_options":
+ {
+ 'opt1': '',
+ 'opt2': '',
+ }
+ },
+ },
+ {'opt1', 'opt2'}, # settings
+ {'host1': [], 'host2': []} # expected_value
+ ),
+ # opt1 is missing on host 1, opt2 is missing on host2
+ ({
+ "host1":
+ {
+ "sysctl_options":
+ {
+ 'opt2': '',
+ 'optX': '',
+ }
+ },
+ "host2":
+ {
+ "sysctl_options":
+ {
+ 'opt1': '',
+ 'opt3': '',
+ 'opt4': '',
+ }
+ },
+ },
+ {'opt1', 'opt2'}, # settings
+ {'host1': ['opt1'], 'host2': ['opt2']} # expected_value
+ ),
+ # All options are missing on all hosts
+ ({
+ "host1":
+ {
+ "sysctl_options":
+ {
+ }
+ },
+ "host2":
+ {
+ "sysctl_options":
+ {
+ }
+ },
+ },
+ {'opt1', 'opt2'}, # settings
+ {'host1': ['opt1', 'opt2'], 'host2': [
+ 'opt1', 'opt2']} # expected_value
+ ),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_tuned_profiles_settings_validation(self, facts, settings, expected_value, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ spec = mock.Mock()
+ spec.settings = sorted(settings)
+ spec.placement.filter_matching_hostspecs = mock.Mock()
+ spec.placement.filter_matching_hostspecs.return_value = ['host1', 'host2']
+ cephadm_module.cache.facts = facts
+ assert cephadm_module._validate_tunedprofile_settings(spec) == expected_value
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+ def test_tuned_profiles_validation(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+
+ with pytest.raises(OrchestratorError, match="^Invalid placement specification.+"):
+ spec = mock.Mock()
+ spec.settings = {'a': 'b'}
+ spec.placement = PlacementSpec(hosts=[])
+ cephadm_module._validate_tuned_profile_spec(spec)
+
+ with pytest.raises(OrchestratorError, match="Invalid spec: settings section cannot be empty."):
+ spec = mock.Mock()
+ spec.settings = {}
+ spec.placement = PlacementSpec(hosts=['host1', 'host2'])
+ cephadm_module._validate_tuned_profile_spec(spec)
+
+ with pytest.raises(OrchestratorError, match="^Placement 'count' field is no supported .+"):
+ spec = mock.Mock()
+ spec.settings = {'a': 'b'}
+ spec.placement = PlacementSpec(count=1)
+ cephadm_module._validate_tuned_profile_spec(spec)
+
+ with pytest.raises(OrchestratorError, match="^Placement 'count_per_host' field is no supported .+"):
+ spec = mock.Mock()
+ spec.settings = {'a': 'b'}
+ spec.placement = PlacementSpec(count_per_host=1, label='foo')
+ cephadm_module._validate_tuned_profile_spec(spec)
+
+ with pytest.raises(OrchestratorError, match="^Found invalid host"):
+ spec = mock.Mock()
+ spec.settings = {'a': 'b'}
+ spec.placement = PlacementSpec(hosts=['host1', 'host2'])
+ cephadm_module.inventory = mock.Mock()
+ cephadm_module.inventory.all_specs = mock.Mock(
+ return_value=[mock.Mock().hostname, mock.Mock().hostname])
+ cephadm_module._validate_tuned_profile_spec(spec)
+
+ def test_set_unmanaged(self, cephadm_module):
+ cephadm_module.spec_store._specs['crash'] = ServiceSpec('crash', unmanaged=False)
+ assert not cephadm_module.spec_store._specs['crash'].unmanaged
+ cephadm_module.spec_store.set_unmanaged('crash', True)
+ assert cephadm_module.spec_store._specs['crash'].unmanaged
+ cephadm_module.spec_store.set_unmanaged('crash', False)
+ assert not cephadm_module.spec_store._specs['crash'].unmanaged
+
+ def test_inventory_known_hostnames(self, cephadm_module):
+ cephadm_module.inventory.add_host(HostSpec('host1', '1.2.3.1'))
+ cephadm_module.inventory.add_host(HostSpec('host2', '1.2.3.2'))
+ cephadm_module.inventory.add_host(HostSpec('host3.domain', '1.2.3.3'))
+ cephadm_module.inventory.add_host(HostSpec('host4.domain', '1.2.3.4'))
+ cephadm_module.inventory.add_host(HostSpec('host5', '1.2.3.5'))
+
+ # update_known_hostnames expects its args as <hostname, shortname, fqdn>,
+ # as gathered by cephadm gather-facts. Passing the names in the wrong
+ # order should have no effect on functionality, however.
+ cephadm_module.inventory.update_known_hostnames('host1', 'host1', 'host1.domain')
+ cephadm_module.inventory.update_known_hostnames('host2.domain', 'host2', 'host2.domain')
+ cephadm_module.inventory.update_known_hostnames('host3', 'host3', 'host3.domain')
+ cephadm_module.inventory.update_known_hostnames('host4.domain', 'host4', 'host4.domain')
+ cephadm_module.inventory.update_known_hostnames('host5', 'host5', 'host5')
+
+ assert 'host1' in cephadm_module.inventory
+ assert 'host1.domain' in cephadm_module.inventory
+ assert cephadm_module.inventory.get_addr('host1') == '1.2.3.1'
+ assert cephadm_module.inventory.get_addr('host1.domain') == '1.2.3.1'
+
+ assert 'host2' in cephadm_module.inventory
+ assert 'host2.domain' in cephadm_module.inventory
+ assert cephadm_module.inventory.get_addr('host2') == '1.2.3.2'
+ assert cephadm_module.inventory.get_addr('host2.domain') == '1.2.3.2'
+
+ assert 'host3' in cephadm_module.inventory
+ assert 'host3.domain' in cephadm_module.inventory
+ assert cephadm_module.inventory.get_addr('host3') == '1.2.3.3'
+ assert cephadm_module.inventory.get_addr('host3.domain') == '1.2.3.3'
+
+ assert 'host4' in cephadm_module.inventory
+ assert 'host4.domain' in cephadm_module.inventory
+ assert cephadm_module.inventory.get_addr('host4') == '1.2.3.4'
+ assert cephadm_module.inventory.get_addr('host4.domain') == '1.2.3.4'
+
+ assert 'host4.otherdomain' not in cephadm_module.inventory
+ with pytest.raises(OrchestratorError):
+ cephadm_module.inventory.get_addr('host4.otherdomain')
+
+ assert 'host5' in cephadm_module.inventory
+ assert cephadm_module.inventory.get_addr('host5') == '1.2.3.5'
+ with pytest.raises(OrchestratorError):
+ cephadm_module.inventory.get_addr('host5.domain')
+
+ def test_async_timeout_handler(self, cephadm_module):
+ cephadm_module.default_cephadm_command_timeout = 900
+
+ async def _timeout():
+ raise asyncio.TimeoutError
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler():
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out on host hostA \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostA'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "testing" timed out \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler(cmd='testing'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "testing" timed out on host hostB \(default 900 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostB', 'testing'):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command timed out \(non-default 111 second timeout\)'):
+ with cephadm_module.async_timeout_handler(timeout=111):
+ cephadm_module.wait_async(_timeout())
+
+ with pytest.raises(OrchestratorError, match=r'Command "very slow" timed out on host hostC \(non-default 999 second timeout\)'):
+ with cephadm_module.async_timeout_handler('hostC', 'very slow', 999):
+ cephadm_module.wait_async(_timeout())
+
+ @mock.patch("cephadm.CephadmOrchestrator.remove_osds")
+ @mock.patch("cephadm.CephadmOrchestrator.add_host_label", lambda *a, **kw: None)
+ @mock.patch("cephadm.inventory.HostCache.get_daemons_by_host", lambda *a, **kw: [])
+ def test_host_drain_zap(self, _rm_osds, cephadm_module):
+ # pass force=true in these tests to bypass _admin label check
+ cephadm_module.drain_host('host1', force=True, zap_osd_devices=False)
+ _rm_osds.assert_called_with([], zap=False)
+
+ cephadm_module.drain_host('host1', force=True, zap_osd_devices=True)
+ _rm_osds.assert_called_with([], zap=True)
+
+ def test_process_ls_output(self, cephadm_module):
+ sample_ls_output = """[
+ {
+ "style": "cephadm:v1",
+ "name": "mon.vm-00",
+ "fsid": "588f83ba-5995-11ee-9e94-52540057a206",
+ "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@mon.vm-00",
+ "enabled": true,
+ "state": "running",
+ "service_name": "mon",
+ "ports": [],
+ "ip": null,
+ "deployed_by": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "rank": null,
+ "rank_generation": null,
+ "extra_container_args": null,
+ "extra_entrypoint_args": null,
+ "memory_request": null,
+ "memory_limit": null,
+ "container_id": "b170b964a6e2918955362eb36195627c6086d3f859d4ebce2ee13f3ee4738733",
+ "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3",
+ "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55",
+ "container_image_digests": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "memory_usage": 56214159,
+ "cpu_percentage": "2.32%",
+ "version": "18.0.0-5185-g7b3a4f2b",
+ "started": "2023-09-22T22:31:11.752300Z",
+ "created": "2023-09-22T22:15:24.121387Z",
+ "deployed": "2023-09-22T22:31:10.383431Z",
+ "configured": "2023-09-22T22:31:11.859440Z"
+ },
+ {
+ "style": "cephadm:v1",
+ "name": "mgr.vm-00.mpexeg",
+ "fsid": "588f83ba-5995-11ee-9e94-52540057a206",
+ "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@mgr.vm-00.mpexeg",
+ "enabled": true,
+ "state": "running",
+ "service_name": "mgr",
+ "ports": [
+ 8443,
+ 9283,
+ 8765
+ ],
+ "ip": null,
+ "deployed_by": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "rank": null,
+ "rank_generation": null,
+ "extra_container_args": null,
+ "extra_entrypoint_args": null,
+ "memory_request": null,
+ "memory_limit": null,
+ "container_id": "6e7756cef553a25a2a84227e8755d3d25046b9cd8758b23c698d34b3af895242",
+ "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3",
+ "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55",
+ "container_image_digests": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "memory_usage": 529740595,
+ "cpu_percentage": "8.35%",
+ "version": "18.0.0-5185-g7b3a4f2b",
+ "started": "2023-09-22T22:30:18.587021Z",
+ "created": "2023-09-22T22:15:29.101409Z",
+ "deployed": "2023-09-22T22:30:17.339114Z",
+ "configured": "2023-09-22T22:30:18.758122Z"
+ },
+ {
+ "style": "cephadm:v1",
+ "name": "agent.vm-00",
+ "fsid": "588f83ba-5995-11ee-9e94-52540057a206",
+ "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@agent.vm-00",
+ "enabled": true,
+ "state": "running",
+ "service_name": "agent",
+ "ports": [],
+ "ip": null,
+ "deployed_by": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "rank": null,
+ "rank_generation": null,
+ "extra_container_args": null,
+ "extra_entrypoint_args": null,
+ "container_id": null,
+ "container_image_name": null,
+ "container_image_id": null,
+ "container_image_digests": null,
+ "version": null,
+ "started": null,
+ "created": "2023-09-22T22:33:34.708289Z",
+ "deployed": null,
+ "configured": "2023-09-22T22:33:34.722289Z"
+ },
+ {
+ "style": "cephadm:v1",
+ "name": "osd.0",
+ "fsid": "588f83ba-5995-11ee-9e94-52540057a206",
+ "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@osd.0",
+ "enabled": true,
+ "state": "running",
+ "service_name": "osd.foo",
+ "ports": [],
+ "ip": null,
+ "deployed_by": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "rank": null,
+ "rank_generation": null,
+ "extra_container_args": null,
+ "extra_entrypoint_args": null,
+ "memory_request": null,
+ "memory_limit": null,
+ "container_id": "93f71c60820b86901a45b3b1fe3dba3e3e677b37fd22310b7e7da3f67bb8ccd6",
+ "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3",
+ "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55",
+ "container_image_digests": [
+ "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3"
+ ],
+ "memory_usage": 73410805,
+ "cpu_percentage": "6.54%",
+ "version": "18.0.0-5185-g7b3a4f2b",
+ "started": "2023-09-22T22:41:29.019587Z",
+ "created": "2023-09-22T22:41:03.615080Z",
+ "deployed": "2023-09-22T22:41:24.965222Z",
+ "configured": "2023-09-22T22:41:29.119250Z"
+ }
+]"""
+
+ now = str_to_datetime('2023-09-22T22:45:29.119250Z')
+ cephadm_module._cluster_fsid = '588f83ba-5995-11ee-9e94-52540057a206'
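+ # freeze 'now' so the last_refresh timestamps assigned while processing
+ # the ls output can be compared exactly below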
+ with mock.patch("cephadm.module.datetime_now", lambda: now):
+ cephadm_module._process_ls_output('vm-00', json.loads(sample_ls_output))
+ assert 'vm-00' in cephadm_module.cache.daemons
+ assert 'mon.vm-00' in cephadm_module.cache.daemons['vm-00']
+ assert 'mgr.vm-00.mpexeg' in cephadm_module.cache.daemons['vm-00']
+ assert 'agent.vm-00' in cephadm_module.cache.daemons['vm-00']
+ assert 'osd.0' in cephadm_module.cache.daemons['vm-00']
+
+ daemons = cephadm_module.cache.get_daemons_by_host('vm-00')
+ c_img_ids = [dd.container_image_id for dd in daemons if dd.daemon_type != 'agent']
+ assert all(c_img_id == '674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55' for c_img_id in c_img_ids)
+ last_refreshes = [dd.last_refresh for dd in daemons]
+ assert all(lrf == now for lrf in last_refreshes)
+ versions = [dd.version for dd in daemons if dd.daemon_type != 'agent']
+ assert all(version == '18.0.0-5185-g7b3a4f2b' for version in versions)
+
+ osd = cephadm_module.cache.get_daemons_by_type('osd', 'vm-00')[0]
+ assert osd.cpu_percentage == '6.54%'
+ assert osd.memory_usage == 73410805
+ assert osd.created == str_to_datetime('2023-09-22T22:41:03.615080Z')