Diffstat (limited to 'src/pybind/mgr/cephadm/tests/test_cephadm.py')
-rw-r--r-- | src/pybind/mgr/cephadm/tests/test_cephadm.py | 2709 |
1 file changed, 2709 insertions, 0 deletions
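The tests in the diff below lean heavily on context-manager fixtures: with_host and with_service come from .fixtures, while with_daemon and with_osd_daemon are defined at the top of the new file. A minimal sketch of that setup/teardown pattern follows; the body is an assumption modelled on how the fixtures are used in these tests, not the actual .fixtures code.

from contextlib import contextmanager

from orchestrator import HostSpec


@contextmanager
def with_host_sketch(cephadm, hostname: str, addr: str = '1::4'):
    # register the host, hand control to the test body, then remove the host
    # again so each test starts from a clean inventory (assumed to mirror with_host)
    cephadm._add_host(HostSpec(hostname, addr))
    try:
        yield
    finally:
        cephadm.remove_host(hostname)

with_daemon, defined a few lines into the diff, follows the same shape and additionally checks the "Deployed ..." and "Removed ..." messages via match_glob.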
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py new file mode 100644 index 000000000..24fcb0280 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -0,0 +1,2709 @@ +import asyncio +import json +import logging + +from contextlib import contextmanager + +import pytest + +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection +from cephadm.serve import CephadmServe +from cephadm.inventory import HostCacheStatus, ClientKeyringSpec +from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims +from cephadm.utils import SpecialHostLabels + +try: + from typing import List +except ImportError: + pass + +from ceph.deployment.service_spec import ( + CustomConfig, + CustomContainerSpec, + HostPlacementSpec, + IscsiServiceSpec, + MDSSpec, + NFSServiceSpec, + PlacementSpec, + RGWSpec, + ServiceSpec, +) +from ceph.deployment.drive_selection.selector import DriveSelection +from ceph.deployment.inventory import Devices, Device +from ceph.utils import datetime_to_str, datetime_now, str_to_datetime +from orchestrator import DaemonDescription, InventoryHost, \ + HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent +from tests import mock +from .fixtures import wait, _run_cephadm, match_glob, with_host, \ + with_cephadm_module, with_service, make_daemons_running, async_side_effect +from cephadm.module import CephadmOrchestrator + +""" +TODOs: + There is really room for improvement here. I just quickly assembled these tests. + In general, everything should be tested in Teuthology as well. The reason for + also testing this here is the development round-trip time. +""" + + +def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host): + dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host)) + d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)] + assert d_names + # there should only be one daemon (if not match_glob will throw mismatch) + assert len(d_names) == 1 + + c = cephadm.remove_daemons(d_names) + [out] = wait(cephadm, c) + # picking the 1st element is needed, rather than passing the list, when the daemon + # name contains a '-' char. If not, the '-' is treated as a range, i.e. cephadm-exporter + # is treated like an m-e range, which is invalid. rbd-mirror (d-m) and node-exporter (e-e) + # are valid, so they pass without incident! Also, match_glob acts on strings anyway!
+ match_glob(out, f"Removed {d_names[0]}* from host '{host}'") + + +@contextmanager +def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str): + spec.placement = PlacementSpec(hosts=[host], count=1) + + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'") + + dds = cephadm_module.cache.get_daemons_by_service(spec.service_name()) + for dd in dds: + if dd.hostname == host: + yield dd.daemon_id + assert_rm_daemon(cephadm_module, spec.service_name(), host) + return + + assert False, 'Daemon not found' + + +@contextmanager +def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None): + cephadm_module.mock_store_set('_ceph_get', 'osd_map', { + 'osds': [ + { + 'osd': 1, + 'up_from': 0, + 'up': True, + 'uuid': 'uuid' + } + ] + }) + + _run_cephadm.reset_mock(return_value=True, side_effect=True) + if ceph_volume_lvm_list: + _run_cephadm.side_effect = ceph_volume_lvm_list + else: + async def _ceph_volume_list(s, host, entity, cmd, **kwargs): + logging.info(f'ceph-volume cmd: {cmd}') + if 'raw' in cmd: + return json.dumps({ + "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": { + "ceph_fsid": cephadm_module._cluster_fsid, + "device": "/dev/loop0", + "osd_id": 21, + "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5", + "type": "bluestore" + }, + }), '', 0 + if 'lvm' in cmd: + return json.dumps({ + str(osd_id): [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }] + }), '', 0 + return '{}', '', 0 + + _run_cephadm.side_effect = _ceph_volume_list + + assert cephadm_module._osd_activate( + [host]).stdout == f"Created osd(s) 1 on host '{host}'" + assert _run_cephadm.mock_calls == [ + mock.call(host, 'osd', 'ceph-volume', + ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, error_ok=False, image='', log_output=True), + mock.call(host, f'osd.{osd_id}', ['_orch', 'deploy'], [], stdin=mock.ANY), + mock.call(host, 'osd', 'ceph-volume', + ['--', 'raw', 'list', '--format', 'json'], no_fsid=False, error_ok=False, image='', log_output=True), + ] + dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host) + assert dd.name() == f'osd.{osd_id}' + yield dd + cephadm_module._remove_daemons([(f'osd.{osd_id}', host)]) + + +class TestCephadm(object): + + def test_get_unique_name(self, cephadm_module): + # type: (CephadmOrchestrator) -> None + existing = [ + DaemonDescription(daemon_type='mon', daemon_id='a') + ] + new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing) + match_glob(new_mon, 'myhost') + new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing) + match_glob(new_mgr, 'myhost.*') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_host(self, cephadm_module): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + with with_host(cephadm_module, 'test'): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '1::4')] + + # Be careful with backward compatibility when changing things here: + assert json.loads(cephadm_module.get_store('inventory')) == \ + {"test": {"hostname": "test", "addr": "1::4", "labels": [], "status": ""}} + + with with_host(cephadm_module, 'second', '1.2.3.5'): + assert wait(cephadm_module, cephadm_module.get_hosts()) == [ + HostSpec('test', '1::4'), + HostSpec('second', '1.2.3.5') + ] + + assert wait(cephadm_module, cephadm_module.get_hosts()) == 
[HostSpec('test', '1::4')] + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + @mock.patch("cephadm.utils.resolve_ip") + def test_re_add_host_receive_loopback(self, resolve_ip, cephadm_module): + resolve_ip.side_effect = ['192.168.122.1', '127.0.0.1', '127.0.0.1'] + assert wait(cephadm_module, cephadm_module.get_hosts()) == [] + cephadm_module._add_host(HostSpec('test', '192.168.122.1')) + assert wait(cephadm_module, cephadm_module.get_hosts()) == [ + HostSpec('test', '192.168.122.1')] + cephadm_module._add_host(HostSpec('test')) + assert wait(cephadm_module, cephadm_module.get_hosts()) == [ + HostSpec('test', '192.168.122.1')] + with pytest.raises(OrchestratorError): + cephadm_module._add_host(HostSpec('test2')) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_service_ls(self, cephadm_module): + with with_host(cephadm_module, 'test'): + c = cephadm_module.list_daemons(refresh=True) + assert wait(cephadm_module, c) == [] + with with_service(cephadm_module, MDSSpec('mds', 'name', unmanaged=True)) as _, \ + with_daemon(cephadm_module, MDSSpec('mds', 'name'), 'test') as _: + + c = cephadm_module.list_daemons() + + def remove_id_events(dd): + out = dd.to_json() + del out['daemon_id'] + del out['events'] + del out['daemon_name'] + return out + + assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [ + { + 'service_name': 'mds.name', + 'daemon_type': 'mds', + 'hostname': 'test', + 'status': 2, + 'status_desc': 'starting', + 'is_active': False, + 'ports': [], + } + ] + + with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), + CephadmOrchestrator.apply_rgw, 'test', status_running=True): + make_daemons_running(cephadm_module, 'mds.name') + + c = cephadm_module.describe_service() + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_id': 'name', + 'service_name': 'mds.name', + 'service_type': 'mds', + 'status': {'created': mock.ANY, 'running': 1, 'size': 2}, + 'unmanaged': True + }, + { + 'placement': { + 'count': 1, + 'hosts': ["test"] + }, + 'service_id': 'r.z', + 'service_name': 'rgw.r.z', + 'service_type': 'rgw', + 'status': {'created': mock.ANY, 'running': 1, 'size': 1, + 'ports': [80]}, + } + ] + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_service_ls_service_type_flag(self, cephadm_module): + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), + CephadmOrchestrator.apply_mgr, '', status_running=True): + with with_service(cephadm_module, MDSSpec('mds', 'test-id', placement=PlacementSpec(count=2)), + CephadmOrchestrator.apply_mds, '', status_running=True): + + # with no service-type. 
Should provide info fot both services + c = cephadm_module.describe_service() + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_name': 'mgr', + 'service_type': 'mgr', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + { + 'placement': {'count': 2}, + 'service_id': 'test-id', + 'service_name': 'mds.test-id', + 'service_type': 'mds', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + ] + + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + # with service-type. Should provide info fot only mds + c = cephadm_module.describe_service(service_type='mds') + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + expected = [ + { + 'placement': {'count': 2}, + 'service_id': 'test-id', + 'service_name': 'mds.test-id', + 'service_type': 'mds', + 'status': {'created': mock.ANY, + 'running': 2, + 'size': 2} + }, + ] + + for o in out: + if 'events' in o: + del o['events'] # delete it, as it contains a timestamp + assert out == expected + + # service-type should not match with service names + c = cephadm_module.describe_service(service_type='mds.test-id') + out = [dict(o.to_json()) for o in wait(cephadm_module, c)] + assert out == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_device_ls(self, cephadm_module): + with with_host(cephadm_module, 'test'): + c = cephadm_module.get_inventory() + assert wait(cephadm_module, c) == [InventoryHost('test')] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='rgw.myrgw.foobar', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ), + dict( + name='something.foo.bar', + style='cephadm', + fsid='fsid', + ), + dict( + name='haproxy.test.bar', + style='cephadm', + fsid='fsid', + ), + + ]) + )) + def test_list_daemons(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + dds = wait(cephadm_module, cephadm_module.list_daemons()) + assert {d.name() for d in dds} == {'rgw.myrgw.foobar', 'haproxy.test.bar'} + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_daemon_action(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \ + with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id: + + d_name = 'rgw.' 
+ daemon_id + + c = cephadm_module.daemon_action('redeploy', d_name) + assert wait(cephadm_module, + c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'" + + for what in ('start', 'stop', 'restart'): + c = cephadm_module.daemon_action(what, d_name) + assert wait(cephadm_module, + c) == F"Scheduled to {what} {d_name} on host 'test'" + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module._store['_ceph_get/mon_map'] = { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + } + cephadm_module.notify('mon_map', None) + + CephadmServe(cephadm_module)._check_daemons() + + assert cephadm_module.events.get_for_daemon(d_name) == [ + OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO', + f"Deployed {d_name} on host \'test\'"), + OrchestratorEvent(mock.ANY, 'daemon', d_name, 'INFO', + f"stop {d_name} from host \'test\'"), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator): + cephadm_module.service_cache_timeout = 10 + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \ + with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id: + with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command: + + _ceph_send_command.side_effect = Exception("myerror") + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + + CephadmServe(cephadm_module)._check_daemons() + + evs = [e.message for e in cephadm_module.events.get_for_daemon( + f'rgw.{daemon_id}')] + + assert 'myerror' in ''.join(evs) + + @pytest.mark.parametrize( + "action", + [ + 'start', + 'stop', + 'restart', + 'reconfig', + 'redeploy' + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.module.HostCache.save_host") + def test_daemon_check(self, _save_host, cephadm_module: CephadmOrchestrator, action): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names: + [daemon_name] = d_names + + cephadm_module._schedule_daemon_action(daemon_name, action) + + assert cephadm_module.cache.get_scheduled_daemon_action( + 'test', daemon_name) == action + + CephadmServe(cephadm_module)._check_daemons() + + assert _save_host.called_with('test') + assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + + with with_host(cephadm_module, 'test'): + + # Also testing deploying mons without explicit network placement + cephadm_module.check_mon_command({ + 'prefix': 'config set', + 'who': 'mon', + 'name': 'public_network', + 'value': '127.0.0.0/8' + }) + + cephadm_module.cache.update_host_networks( + 'test', + { + "127.0.0.0/8": [ + "127.0.0.1" + ], + } + ) + + with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names: + [daemon_name] = d_names + + cephadm_module._set_extra_ceph_conf('[mon]\nk=v') + + 
CephadmServe(cephadm_module)._check_daemons() + + _run_cephadm.assert_called_with( + 'test', + 'mon.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "mon.test", + "image": '', + "deploy_arguments": [], + "params": { + 'reconfig': True, + }, + "meta": { + 'service_name': 'mon', + 'ports': [], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': None, + 'extra_entrypoint_args': None, + }, + "config_blobs": { + "config": "[mon]\nk=v\n[mon.test]\npublic network = 127.0.0.0/8\n", + "keyring": "", + "files": { + "config": "[mon.test]\npublic network = 127.0.0.0/8\n" + }, + }, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_mon_crush_location_deployment(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + + with with_host(cephadm_module, 'test'): + cephadm_module.check_mon_command({ + 'prefix': 'config set', + 'who': 'mon', + 'name': 'public_network', + 'value': '127.0.0.0/8' + }) + + cephadm_module.cache.update_host_networks( + 'test', + { + "127.0.0.0/8": [ + "127.0.0.1" + ], + } + ) + + with with_service(cephadm_module, ServiceSpec(service_type='mon', crush_locations={'test': ['datacenter=a', 'rack=2']}), CephadmOrchestrator.apply_mon, 'test'): + _run_cephadm.assert_called_with( + 'test', + 'mon.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "mon.test", + "image": '', + "deploy_arguments": [], + "params": {}, + "meta": { + 'service_name': 'mon', + 'ports': [], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': None, + 'extra_entrypoint_args': None, + }, + "config_blobs": { + "config": "[mon.test]\npublic network = 127.0.0.0/8\n", + "keyring": "", + "files": { + "config": "[mon.test]\npublic network = 127.0.0.0/8\n", + }, + "crush_location": "datacenter=a", + }, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_extra_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='crash', extra_container_args=['--cpus=2', '--quiet']), CephadmOrchestrator.apply_crash): + _run_cephadm.assert_called_with( + 'test', + 'crash.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "crash.test", + "image": '', + "deploy_arguments": [], + "params": { + 'extra_container_args': [ + "--cpus=2", + "--quiet", + ], + }, + "meta": { + 'service_name': 'crash', + 'ports': [], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': [ + "--cpus=2", + "--quiet", + ], + 'extra_entrypoint_args': None, + }, + "config_blobs": { + "config": "", + "keyring": "[client.crash.test]\nkey = None\n", + }, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_extra_entrypoint_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='node-exporter', + extra_entrypoint_args=['--collector.textfile.directory=/var/lib/node_exporter/textfile_collector', '--some-other-arg']), + CephadmOrchestrator.apply_node_exporter): + _run_cephadm.assert_called_with( + 'test', + 'node-exporter.test', + 
['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "node-exporter.test", + "image": '', + "deploy_arguments": [], + "params": { + 'tcp_ports': [9100], + 'extra_entrypoint_args': [ + "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector", + "--some-other-arg", + ], + }, + "meta": { + 'service_name': 'node-exporter', + 'ports': [9100], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': None, + 'extra_entrypoint_args': [ + "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector", + "--some-other-arg", + ], + }, + "config_blobs": {}, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_extra_entrypoint_and_container_args(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='node-exporter', + extra_entrypoint_args=['--collector.textfile.directory=/var/lib/node_exporter/textfile_collector', '--some-other-arg'], + extra_container_args=['--cpus=2', '--quiet']), + CephadmOrchestrator.apply_node_exporter): + _run_cephadm.assert_called_with( + 'test', + 'node-exporter.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "node-exporter.test", + "image": '', + "deploy_arguments": [], + "params": { + 'tcp_ports': [9100], + 'extra_container_args': [ + "--cpus=2", + "--quiet", + ], + 'extra_entrypoint_args': [ + "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector", + "--some-other-arg", + ], + }, + "meta": { + 'service_name': 'node-exporter', + 'ports': [9100], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': [ + "--cpus=2", + "--quiet", + ], + 'extra_entrypoint_args': [ + "--collector.textfile.directory=/var/lib/node_exporter/textfile_collector", + "--some-other-arg", + ], + }, + "config_blobs": {}, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_extra_entrypoint_and_container_args_with_spaces(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='node-exporter', + extra_entrypoint_args=['--entrypoint-arg-with-value value', '--some-other-arg 3'], + extra_container_args=['--cpus 2', '--container-arg-with-value value']), + CephadmOrchestrator.apply_node_exporter): + _run_cephadm.assert_called_with( + 'test', + 'node-exporter.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "node-exporter.test", + "image": '', + "deploy_arguments": [], + "params": { + 'tcp_ports': [9100], + 'extra_container_args': [ + "--cpus", + "2", + "--container-arg-with-value", + "value", + ], + 'extra_entrypoint_args': [ + "--entrypoint-arg-with-value", + "value", + "--some-other-arg", + "3", + ], + }, + "meta": { + 'service_name': 'node-exporter', + 'ports': [9100], + 'ip': None, + 'deployed_by': [], + 'rank': None, + 'rank_generation': None, + 'extra_container_args': [ + "--cpus 2", + "--container-arg-with-value value", + ], + 'extra_entrypoint_args': [ + "--entrypoint-arg-with-value value", + "--some-other-arg 3", + ], + }, + "config_blobs": {}, + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_custom_config(self, _run_cephadm, cephadm_module: 
CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + test_cert = ['-----BEGIN PRIVATE KEY-----', + 'YSBhbGlxdXlhbSBlcmF0LCBzZWQgZGlhbSB2b2x1cHR1YS4gQXQgdmVybyBlb3Mg', + 'ZXQgYWNjdXNhbSBldCBqdXN0byBkdW8=', + '-----END PRIVATE KEY-----', + '-----BEGIN CERTIFICATE-----', + 'YSBhbGlxdXlhbSBlcmF0LCBzZWQgZGlhbSB2b2x1cHR1YS4gQXQgdmVybyBlb3Mg', + 'ZXQgYWNjdXNhbSBldCBqdXN0byBkdW8=', + '-----END CERTIFICATE-----'] + configs = [ + CustomConfig(content='something something something', + mount_path='/etc/test.conf'), + CustomConfig(content='\n'.join(test_cert), mount_path='/usr/share/grafana/thing.crt') + ] + tc_joined = '\n'.join(test_cert) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='crash', custom_configs=configs), CephadmOrchestrator.apply_crash): + _run_cephadm( + 'test', + 'crash.test', + ['_orch', 'deploy'], + [], + stdin=json.dumps({ + "fsid": "fsid", + "name": "crash.test", + "image": "", + "deploy_arguments": [], + "params": {}, + "meta": { + "service_name": "crash", + "ports": [], + "ip": None, + "deployed_by": [], + "rank": None, + "rank_generation": None, + "extra_container_args": None, + "extra_entrypoint_args": None, + }, + "config_blobs": { + "config": "", + "keyring": "[client.crash.test]\nkey = None\n", + "custom_config_files": [ + { + "content": "something something something", + "mount_path": "/etc/test.conf", + }, + { + "content": tc_joined, + "mount_path": "/usr/share/grafana/thing.crt", + }, + ] + } + }), + ) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'): + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + cephadm_module.mock_store_set('_ceph_get', 'mgr_map', { + 'modules': ['dashboard'] + }) + + with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd: + CephadmServe(cephadm_module)._check_daemons() + _mon_cmd.assert_any_call( + {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://[1::4]:3000'}, + None) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1.2.3.4') + def test_iscsi_post_actions_with_missing_daemon_in_cache(self, cephadm_module: CephadmOrchestrator): + # https://tracker.ceph.com/issues/52866 + with with_host(cephadm_module, 'test1'): + with with_host(cephadm_module, 'test2'): + with with_service(cephadm_module, IscsiServiceSpec(service_id='foobar', pool='pool', placement=PlacementSpec(host_pattern='*')), CephadmOrchestrator.apply_iscsi, 'test'): + + CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('iscsi')) == 2 + + # get a daemons from postaction list (ARRGH sets!!) 
+ tempset = cephadm_module.requires_post_actions.copy() + tempdaemon1 = tempset.pop() + tempdaemon2 = tempset.pop() + + # make sure post actions has 2 daemons in it + assert len(cephadm_module.requires_post_actions) == 2 + + # replicate a host cache that is not in sync when check_daemons is called + tempdd1 = cephadm_module.cache.get_daemon(tempdaemon1) + tempdd2 = cephadm_module.cache.get_daemon(tempdaemon2) + host = 'test1' + if 'test1' not in tempdaemon1: + host = 'test2' + cephadm_module.cache.rm_daemon(host, tempdaemon1) + + # Make sure, _check_daemons does a redeploy due to monmap change: + cephadm_module.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + cephadm_module.notify('mon_map', None) + cephadm_module.mock_store_set('_ceph_get', 'mgr_map', { + 'modules': ['dashboard'] + }) + + with mock.patch("cephadm.module.IscsiService.config_dashboard") as _cfg_db: + CephadmServe(cephadm_module)._check_daemons() + _cfg_db.assert_called_once_with([tempdd2]) + + # post actions still has the other daemon in it and will run next _check_daemons + assert len(cephadm_module.requires_post_actions) == 1 + + # post actions was missed for a daemon + assert tempdaemon1 in cephadm_module.requires_post_actions + + # put the daemon back in the cache + cephadm_module.cache.add_daemon(host, tempdd1) + + _cfg_db.reset_mock() + # replicate serve loop running again + CephadmServe(cephadm_module)._check_daemons() + + # post actions should have been called again + _cfg_db.asset_called() + + # post actions is now empty + assert len(cephadm_module.requires_post_actions) == 0 + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_mon_add(self, cephadm_module): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps)) + assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"] + + with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"): + ps = PlacementSpec(hosts=['test'], count=1) + c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps)) + wait(cephadm_module, c) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_mgr_update(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert r + + assert_rm_daemon(cephadm_module, 'mgr.a', 'test') + + @mock.patch("cephadm.module.CephadmOrchestrator.mon_command") + def test_find_destroyed_osds(self, _mon_cmd, cephadm_module): + dict_out = { + "nodes": [ + { + "id": -1, + "name": "default", + "type": "root", + "type_id": 11, + "children": [ + -3 + ] + }, + { + "id": -3, + "name": "host1", + "type": "host", + "type_id": 1, + "pool_weights": {}, + "children": [ + 0 + ] + }, + { + "id": 0, + "device_class": "hdd", + "name": "osd.0", + "type": "osd", + "type_id": 0, + "crush_weight": 0.0243988037109375, + "depth": 2, + "pool_weights": {}, + "exists": 1, + "status": "destroyed", + "reweight": 1, + "primary_affinity": 1 + } + ], + "stray": [] + } + json_out = json.dumps(dict_out) + _mon_cmd.return_value = (0, json_out, '') + osd_claims = OsdIdClaims(cephadm_module) + assert osd_claims.get() == {'host1': ['0']} + assert 
osd_claims.filtered_by_host('host1') == ['0'] + assert osd_claims.filtered_by_host('host1.domain.com') == ['0'] + + @ pytest.mark.parametrize( + "ceph_services, cephadm_daemons, strays_expected, metadata", + # [ ([(daemon_type, daemon_id), ... ], [...], [...]), ... ] + [ + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [], + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + {}, + ), + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [], + {}, + ), + ( + [('mds', 'a'), ('osd', '0'), ('mgr', 'x')], + [('mds', 'a'), ('osd', '0')], + [('mgr', 'x')], + {}, + ), + # https://tracker.ceph.com/issues/49573 + ( + [('rgw-nfs', '14649')], + [], + [('nfs', 'foo-rgw.host1')], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}}, + ), + ( + [('rgw-nfs', '14649'), ('rgw-nfs', '14650')], + [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')], + [], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}}, + ), + ( + [('rgw-nfs', '14649'), ('rgw-nfs', '14650')], + [('nfs', 'foo-rgw.host1')], + [('nfs', 'foo2.host2')], + {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}}, + ), + ] + ) + def test_check_for_stray_daemons( + self, + cephadm_module, + ceph_services, + cephadm_daemons, + strays_expected, + metadata + ): + # mock ceph service-map + services = [] + for service in ceph_services: + s = {'type': service[0], 'id': service[1]} + services.append(s) + ls = [{'hostname': 'host1', 'services': services}] + + with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers: + list_servers.return_value = ls + list_servers.__iter__.side_effect = ls.__iter__ + + # populate cephadm daemon cache + dm = {} + for daemon_type, daemon_id in cephadm_daemons: + dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id) + dm[dd.name()] = dd + cephadm_module.cache.update_host_daemons('host1', dm) + + def get_metadata_mock(svc_type, svc_id, default): + return metadata[svc_id] + + with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock): + + # test + CephadmServe(cephadm_module)._check_for_strays() + + # verify + strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON') + if not strays: + assert len(strays_expected) == 0 + else: + for dt, di in strays_expected: + name = '%s.%s' % (dt, di) + for detail in strays['detail']: + if name in detail: + strays['detail'].remove(detail) + break + assert name in detail + assert len(strays['detail']) == 0 + assert strays['count'] == len(strays_expected) + + @mock.patch("cephadm.module.CephadmOrchestrator.mon_command") + def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module): + _mon_cmd.return_value = (1, "", "fail_msg") + with pytest.raises(OrchestratorError): + OsdIdClaims(cephadm_module) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + + spec = DriveGroupSpec( + service_id='foo', + placement=PlacementSpec( + host_pattern='*', + ), + data_devices=DeviceSelection( + all=True + ) + ) + + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.foo update...'] + + inventory = Devices([ + Device( + '/dev/sdb', + available=True + ), + ]) + + cephadm_module.cache.update_host_devices('test', inventory.devices) + + _run_cephadm.side_effect = async_side_effect((['{}'], '', 0)) + + 
assert CephadmServe(cephadm_module)._apply_all_services() is False + + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', + ['--config-json', '-', '--', 'lvm', 'batch', + '--no-auto', '/dev/sdb', '--yes', '--no-systemd'], + env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, + stdin='{"config": "", "keyring": ""}') + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True) + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + + spec = DriveGroupSpec( + service_id='noncollocated', + placement=PlacementSpec( + hosts=['test'] + ), + data_devices=DeviceSelection(paths=['/dev/sdb']), + db_devices=DeviceSelection(paths=['/dev/sdc']), + wal_devices=DeviceSelection(paths=['/dev/sdd']) + ) + + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...'] + + inventory = Devices([ + Device('/dev/sdb', available=True), + Device('/dev/sdc', available=True), + Device('/dev/sdd', available=True) + ]) + + cephadm_module.cache.update_host_devices('test', inventory.devices) + + _run_cephadm.side_effect = async_side_effect((['{}'], '', 0)) + + assert CephadmServe(cephadm_module)._apply_all_services() is False + + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', + ['--config-json', '-', '--', 'lvm', 'batch', + '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc', + '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'], + env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'], + error_ok=True, stdin='{"config": "", "keyring": ""}') + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True) + _run_cephadm.assert_any_call( + 'test', 'osd', 'ceph-volume', ['--', 'raw', 'list', '--format', 'json'], image='', no_fsid=False, error_ok=False, log_output=True) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.module.SpecStore.save") + def test_apply_osd_save_placement(self, _save_spec, cephadm_module): + with with_host(cephadm_module, 'test'): + json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, + 'service_id': 'foo', 'data_devices': {'all': True}} + spec = ServiceSpec.from_json(json_spec) + assert isinstance(spec, DriveGroupSpec) + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == ['Scheduled osd.foo update...'] + _save_spec.assert_called_with(spec) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_create_osds(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(dg) + out = wait(cephadm_module, c) + assert out == "Created no osd(s) on host test; already created?" 
+ bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_host'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(bad_dg) + out = wait(cephadm_module, c) + assert "Invalid 'host:device' spec: host not found in cluster" in out + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_create_noncollocated_osd(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + c = cephadm_module.create_osds(dg) + out = wait(cephadm_module, c) + assert out == "Created no osd(s) on host test; already created?" + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch('cephadm.services.osd.OSDService._run_ceph_volume_command') + @mock.patch('cephadm.services.osd.OSDService.driveselection_to_ceph_volume') + @mock.patch('cephadm.services.osd.OsdIdClaims.refresh', lambda _: None) + @mock.patch('cephadm.services.osd.OsdIdClaims.get', lambda _: {}) + def test_limit_not_reached(self, d_to_cv, _run_cv_cmd, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(limit=5, rotational=1), + service_id='not_enough') + + disks_found = [ + '[{"data": "/dev/vdb", "data_size": "50.00 GB", "encryption": "None"}, {"data": "/dev/vdc", "data_size": "50.00 GB", "encryption": "None"}]'] + d_to_cv.return_value = 'foo' + _run_cv_cmd.side_effect = async_side_effect((disks_found, '', 0)) + preview = cephadm_module.osd_service.generate_previews([dg], 'test') + + for osd in preview: + assert 'notes' in osd + assert osd['notes'] == [ + 'NOTE: Did not find enough disks matching filter on host test to reach data device limit (Found: 2 | Limit: 5)'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_prepare_drivegroup(self, cephadm_module): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), + data_devices=DeviceSelection(paths=[''])) + out = cephadm_module.osd_service.prepare_drivegroup(dg) + assert len(out) == 1 + f1 = out[0] + assert f1[0] == 'test' + assert isinstance(f1[1], DriveSelection) + + @pytest.mark.parametrize( + "devices, preview, exp_commands", + [ + # no preview and only one disk, prepare is used due the hack that is in place. 
+ (['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]), + # no preview and multiple disks, uses batch + (['/dev/sda', '/dev/sdb'], False, + ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]), + # preview and only one disk needs to use batch again to generate the preview + (['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]), + # preview and multiple disks work the same + (['/dev/sda', '/dev/sdb'], True, + ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec( + host_pattern='test'), data_devices=DeviceSelection(paths=devices)) + ds = DriveSelection(dg, Devices([Device(path) for path in devices])) + preview = preview + out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview) + assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) + for cmd in out), f'Expected cmds from f{out} in {exp_commands}' + + @pytest.mark.parametrize( + "devices, preview, exp_commands", + [ + # one data device, no preview + (['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]), + # multiple data devices, no preview + (['/dev/sda', '/dev/sdb'], False, + ["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]), + # one data device, preview + (['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]), + # multiple data devices, preview + (['/dev/sda', '/dev/sdb'], True, + ["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands): + with with_host(cephadm_module, 'test'): + dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec( + host_pattern='test'), data_devices=DeviceSelection(paths=devices)) + ds = DriveSelection(dg, Devices([Device(path) for path in devices])) + preview = preview + out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview) + assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) + for cmd in out), f'Expected cmds from f{out} in {exp_commands}' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='osd.0', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ) + ]) + )) + @mock.patch("cephadm.services.osd.OSD.exists", True) + @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0) + def test_remove_osds(self, cephadm_module): + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + c = cephadm_module.list_daemons() + wait(cephadm_module, c) + + c = cephadm_module.remove_daemons(['osd.0']) + out = wait(cephadm_module, c) + assert out == ["Removed osd.0 from host 'test'"] + + cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0, + replace=False, + force=False, + hostname='test', + process_started_at=datetime_now(), + 
remove_util=cephadm_module.to_remove_osds.rm_util + )) + cephadm_module.to_remove_osds.process_removal_queue() + assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module) + + c = cephadm_module.remove_osds_status() + out = wait(cephadm_module, c) + assert out == [] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_rgw_update(self, cephadm_module): + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)): + ps = PlacementSpec(hosts=['host1'], count=1) + c = cephadm_module.add_daemon( + RGWSpec(service_id="foo", placement=ps)) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed rgw.foo.* on host 'host1'") + + ps = PlacementSpec(hosts=['host1', 'host2'], count=2) + r = CephadmServe(cephadm_module)._apply_service( + RGWSpec(service_id="foo", placement=ps)) + assert r + + assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1') + assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm( + json.dumps([ + dict( + name='rgw.myrgw.myhost.myid', + style='cephadm', + fsid='fsid', + container_id='container_id', + version='version', + state='running', + ) + ]) + )) + def test_remove_daemon(self, cephadm_module): + with with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._refresh_host_daemons('test') + c = cephadm_module.list_daemons() + wait(cephadm_module, c) + c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid']) + out = wait(cephadm_module, c) + assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription + with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription + CephadmServe(cephadm_module)._check_for_moved_osds() + # both are in status "starting" + assert len(cephadm_module.cache.get_daemons()) == 2 + + dd1.status = DaemonDescriptionStatus.running + dd2.status = DaemonDescriptionStatus.error + cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1}) + cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2}) + CephadmServe(cephadm_module)._check_for_moved_osds() + assert len(cephadm_module.cache.get_daemons()) == 1 + + assert cephadm_module.events.get_for_daemon('osd.1') == [ + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Deployed osd.1 on host 'host1'"), + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Deployed osd.1 on host 'host2'"), + OrchestratorEvent(mock.ANY, 'daemon', 'osd.1', 'INFO', + "Removed duplicated daemon on host 'host2'"), + ] + + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + @pytest.mark.parametrize( + "spec", + [ + ServiceSpec('crash'), + ServiceSpec('prometheus'), + ServiceSpec('grafana'), + ServiceSpec('node-exporter'), + ServiceSpec('alertmanager'), + ServiceSpec('rbd-mirror'), + ServiceSpec('cephfs-mirror'), + ServiceSpec('mds', service_id='fsname'), + 
RGWSpec(rgw_realm='realm', rgw_zone='zone'), + RGWSpec(service_id="foo"), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_daemon_add(self, spec: ServiceSpec, cephadm_module): + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, unmanaged_spec): + with with_daemon(cephadm_module, spec, 'test'): + pass + + @pytest.mark.parametrize( + "entity,success,spec", + [ + ('mgr.x', True, ServiceSpec( + service_type='mgr', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('client.rgw.x', True, ServiceSpec( + service_type='rgw', + service_id='id', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('client.nfs.x', True, ServiceSpec( + service_type='nfs', + service_id='id', + placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ('mon.', False, ServiceSpec( + service_type='mon', + placement=PlacementSpec( + hosts=[HostPlacementSpec('test', '127.0.0.0/24', 'x')], count=1), + unmanaged=True) + ), # noqa: E124 + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_module): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec): + _run_cephadm.side_effect = OrchestratorError('fail') + with pytest.raises(OrchestratorError): + wait(cephadm_module, cephadm_module.add_daemon(spec)) + if success: + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': entity, + }) + else: + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': entity, + }) + assert cephadm_module.events.get_for_service(spec.service_name()) == [ + OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'INFO', + "service was created"), + OrchestratorEvent(mock.ANY, 'service', spec.service_name(), 'ERROR', + "fail"), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + _run_cephadm.side_effect = OrchestratorError('fail') + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert not r + assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None + assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1 + assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks[ + 'CEPHADM_DAEMON_PLACE_FAIL']['summary'] + assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks[ + 'CEPHADM_DAEMON_PLACE_FAIL']['detail'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with 
with_host(cephadm_module, 'test'): + CephadmServe(cephadm_module)._apply_all_services() + ps = PlacementSpec(hosts=['fail'], count=1) + r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps)) + assert not r + assert cephadm_module.apply_spec_fails + assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None + assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1 + assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks[ + 'CEPHADM_APPLY_SPEC_FAIL']['summary'] + + @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.HostCache.save_host_devices") + def test_invalid_config_option_health_warning(self, _save_devs, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator): + _save_devs.return_value = None + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) + get_foreign_ceph_option.side_effect = KeyError + CephadmServe(cephadm_module)._apply_service_config( + ServiceSpec('mgr', placement=ps, config={'test': 'foo'})) + assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None + assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1 + assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks[ + 'CEPHADM_INVALID_CONFIG_OPTION']['summary'] + assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[ + 'CEPHADM_INVALID_CONFIG_OPTION']['detail'] + + @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option") + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.CephadmOrchestrator.set_store") + def test_save_devices(self, _set_store, _run_cephadm, _get_foreign_ceph_option, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + entry_size = 65536 # default 64k size + _get_foreign_ceph_option.return_value = entry_size + + class FakeDev(): + def __init__(self, c: str = 'a'): + # using 1015 here makes the serialized string exactly 1024 bytes if c is one char + self.content = {c: c * 1015} + self.path = 'dev/vdc' + + def to_json(self): + return self.content + + def from_json(self, stuff): + return json.loads(stuff) + + def byte_len(s): + return len(s.encode('utf-8')) + + with with_host(cephadm_module, 'test'): + fake_devices = [FakeDev()] * 100 # should be ~100k + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2 + cephadm_module.cache.update_host_devices('test', fake_devices) + cephadm_module.cache.save_host_devices('test') + expected_calls = [ + mock.call('host.test.devices.0', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 34], 'entries': 3})), + mock.call('host.test.devices.1', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 34]})), + mock.call('host.test.devices.2', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 32]})), + ] + _set_store.assert_has_calls(expected_calls) + + fake_devices = [FakeDev()] * 300 # should be ~300k + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size * 4 + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 5 + cephadm_module.cache.update_host_devices('test', 
fake_devices) + cephadm_module.cache.save_host_devices('test') + expected_calls = [ + mock.call('host.test.devices.0', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50], 'entries': 6})), + mock.call('host.test.devices.1', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50]})), + mock.call('host.test.devices.2', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50]})), + mock.call('host.test.devices.3', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50]})), + mock.call('host.test.devices.4', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50]})), + mock.call('host.test.devices.5', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 50]})), + ] + _set_store.assert_has_calls(expected_calls) + + fake_devices = [FakeDev()] * 62 # should be ~62k, just under cache size + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size + cephadm_module.cache.update_host_devices('test', fake_devices) + cephadm_module.cache.save_host_devices('test') + expected_calls = [ + mock.call('host.test.devices.0', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 62], 'entries': 1})), + ] + _set_store.assert_has_calls(expected_calls) + + # should be ~64k but just over so it requires more entries + fake_devices = [FakeDev()] * 64 + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2 + cephadm_module.cache.update_host_devices('test', fake_devices) + cephadm_module.cache.save_host_devices('test') + expected_calls = [ + mock.call('host.test.devices.0', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 22], 'entries': 3})), + mock.call('host.test.devices.1', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 22]})), + mock.call('host.test.devices.2', json.dumps( + {'devices': [d.to_json() for d in [FakeDev()] * 20]})), + ] + _set_store.assert_has_calls(expected_calls) + + # test for actual content being correct using differing devices + entry_size = 3072 + _get_foreign_ceph_option.return_value = entry_size + fake_devices = [FakeDev('a'), FakeDev('b'), FakeDev('c'), FakeDev('d'), FakeDev('e')] + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size + assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2 + cephadm_module.cache.update_host_devices('test', fake_devices) + cephadm_module.cache.save_host_devices('test') + expected_calls = [ + mock.call('host.test.devices.0', json.dumps( + {'devices': [d.to_json() for d in [FakeDev('a'), FakeDev('b')]], 'entries': 3})), + mock.call('host.test.devices.1', json.dumps( + {'devices': [d.to_json() for d in [FakeDev('c'), FakeDev('d')]]})), + mock.call('host.test.devices.2', json.dumps( + {'devices': [d.to_json() for d in [FakeDev('e')]]})), + ] + _set_store.assert_has_calls(expected_calls) + + @mock.patch("cephadm.module.CephadmOrchestrator.get_store") + def test_load_devices(self, _get_store, cephadm_module: CephadmOrchestrator): + def _fake_store(key): + if key == 'host.test.devices.0': + return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 9], 'entries': 3}) + elif key == 'host.test.devices.1': + return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 7]}) + elif key == 'host.test.devices.2': + return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 4]}) + else: + raise Exception(f'Get store with unexpected value {key}') + + 
_get_store.side_effect = _fake_store + devs = cephadm_module.cache.load_host_devices('test') + assert devs == [Device('/path')] * 20 + + @mock.patch("cephadm.module.Inventory.__contains__") + def test_check_stray_host_cache_entry(self, _contains, cephadm_module: CephadmOrchestrator): + def _fake_inv(key): + if key in ['host1', 'node02', 'host.something.com']: + return True + return False + + _contains.side_effect = _fake_inv + assert cephadm_module.cache._get_host_cache_entry_status('host1') == HostCacheStatus.host + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.something.com') == HostCacheStatus.host + assert cephadm_module.cache._get_host_cache_entry_status( + 'node02.devices.37') == HostCacheStatus.devices + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.something.com.devices.0') == HostCacheStatus.devices + assert cephadm_module.cache._get_host_cache_entry_status('hostXXX') == HostCacheStatus.stray + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.nothing.com') == HostCacheStatus.stray + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + def test_nfs(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test'], count=1) + spec = NFSServiceSpec( + service_id='name', + placement=ps) + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_service(cephadm_module, unmanaged_spec): + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed nfs.name.* on host 'test'") + + assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("subprocess.run", None) + @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock()) + @mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4') + def test_iscsi(self, cephadm_module): + with with_host(cephadm_module, 'test'): + ps = PlacementSpec(hosts=['test'], count=1) + spec = IscsiServiceSpec( + service_id='name', + pool='pool', + api_user='user', + api_password='password', + placement=ps) + unmanaged_spec = ServiceSpec.from_json(spec.to_json()) + unmanaged_spec.unmanaged = True + with with_service(cephadm_module, unmanaged_spec): + + c = cephadm_module.add_daemon(spec) + [out] = wait(cephadm_module, c) + match_glob(out, "Deployed iscsi.name.* on host 'test'") + + assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test') + + @pytest.mark.parametrize( + "on_bool", + [ + True, + False + ] + ) + @pytest.mark.parametrize( + "fault_ident", + [ + 'fault', + 'ident' + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')]) + on_off = 'on' if on_bool else 'off' + assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}'] + _run_cephadm.assert_called_with('test', 'osd', 'shell', [ + '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True) + + 
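A side note on test_save_devices above: its expected_calls pin down a specific splitting rule for the per-host device cache, namely spread the serialized device list over ceil(size / entry_size) + 1 store keys, divide the devices evenly across them, and record the key count alongside the first chunk. Below is a minimal, hypothetical reconstruction of that rule inferred from the test alone; split_device_cache and its signature are assumptions, not the real HostCache.save_host_devices implementation.

import json
import math
from typing import Any, Dict, List


def split_device_cache(devices: List[Dict[str, Any]], entry_size: int) -> List[str]:
    # serialize once to decide whether the list fits into a single store entry
    total = len(json.dumps(devices).encode('utf-8'))
    if total <= entry_size:
        return [json.dumps({'devices': devices, 'entries': 1})]
    # otherwise spread the devices evenly over ceil(total / entry_size) + 1 keys;
    # the first key also records how many keys were written in total
    num_entries = math.ceil(total / entry_size) + 1
    per_chunk = math.ceil(len(devices) / num_entries)
    chunks = [devices[i:i + per_chunk] for i in range(0, len(devices), per_chunk)]
    out = [json.dumps({'devices': chunks[0], 'entries': len(chunks)})]
    out.extend(json.dumps({'devices': c}) for c in chunks[1:])
    return out

Under this rule, 100 devices of roughly 1 KiB each with the default 64 KiB entry size produce three chunks of 34, 34 and 32 devices, and 300 such devices produce six chunks of 50, which is exactly what the test's expected_calls assert.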
@mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light_custom(self, _run_cephadm, cephadm_module): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + cephadm_module.set_store('blink_device_light_cmd', 'echo hello') + c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')]) + assert wait(cephadm_module, c) == ['Set ident light for test: on'] + _run_cephadm.assert_called_with('test', 'osd', 'shell', [ + '--', 'echo', 'hello'], error_ok=True) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'mgr0'): + cephadm_module.set_store('mgr0/blink_device_light_cmd', + 'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'') + c = cephadm_module.blink_device_light( + 'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')]) + assert wait(cephadm_module, c) == [ + 'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on'] + _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [ + '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784' + ], error_ok=True) + + @pytest.mark.parametrize( + "spec, meth", + [ + (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr), + (ServiceSpec('crash'), CephadmOrchestrator.apply_crash), + (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus), + (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana), + (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter), + (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager), + (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror), + (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror), + (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds), + (ServiceSpec( + 'mds', service_id='fsname', + placement=PlacementSpec( + hosts=[HostPlacementSpec( + hostname='test', + name='fsname', + network='' + )] + ) + ), CephadmOrchestrator.apply_mds), + (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw), + (RGWSpec( + service_id='bar', + rgw_realm='realm', rgw_zone='zone', + placement=PlacementSpec( + hosts=[HostPlacementSpec( + hostname='test', + name='bar', + network='' + )] + ) + ), CephadmOrchestrator.apply_rgw), + (NFSServiceSpec( + service_id='name', + ), CephadmOrchestrator.apply_nfs), + (IscsiServiceSpec( + service_id='name', + pool='pool', + api_user='user', + api_password='password' + ), CephadmOrchestrator.apply_iscsi), + (CustomContainerSpec( + service_id='hello-world', + image='docker.io/library/hello-world:latest', + uid=65534, + gid=65534, + dirs=['foo/bar'], + files={ + 'foo/bar/xyz.conf': 'aaa\nbbb' + }, + bind_mounts=[[ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ]], + volume_mounts={ + 'foo/bar': '/foo/bar:Z' + }, + args=['--no-healthcheck'], + envs=['SECRET=password'], + ports=[8080, 8443] + ), CephadmOrchestrator.apply_container), + ] + ) + @mock.patch("subprocess.run", None) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) + @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) + 
@mock.patch("subprocess.run", mock.MagicMock()) + def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec, meth, 'test'): + pass + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator): + spec = MDSSpec('mds', service_id='fsname', config={'test': 'foo'}) + with with_host(cephadm_module, 'test'): + with with_service(cephadm_module, spec, host='test'): + ret, out, err = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': spec.service_name(), + 'key': 'mds_join_fs', + }) + assert out == 'fsname' + ret, out, err = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': spec.service_name(), + 'key': 'mds_join_fs', + }) + assert not out + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop") + def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator): + spec = MDSSpec( + 'mds', + service_id='fsname', + placement=PlacementSpec(hosts=['host1', 'host2']), + config={'test': 'foo'} + ) + with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'): + c = cephadm_module.apply_mds(spec) + out = wait(cephadm_module, c) + match_glob(out, "Scheduled mds.fsname update...") + CephadmServe(cephadm_module)._apply_all_services() + + [daemon] = cephadm_module.cache.daemons['host1'].keys() + + spec.placement.set_hosts(['host2']) + + ok_to_stop.side_effect = False + + c = cephadm_module.apply_mds(spec) + out = wait(cephadm_module, c) + match_glob(out, "Scheduled mds.fsname update...") + CephadmServe(cephadm_module)._apply_all_services() + + ok_to_stop.assert_called_with([daemon[4:]], force=True) + + assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop + assert_rm_daemon(cephadm_module, spec.service_name(), 'host2') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_dont_touch_offline_or_maintenance_host_daemons(self, cephadm_module): + # test daemons on offline/maint hosts not removed when applying specs + # test daemons not added to hosts in maint/offline state + with with_host(cephadm_module, 'test1'): + with with_host(cephadm_module, 'test2'): + with with_host(cephadm_module, 'test3'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*'))): + # should get a mgr on all 3 hosts + # CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3 + + # put one host in offline state and one host in maintenance state + cephadm_module.offline_hosts = {'test2'} + cephadm_module.inventory._inventory['test3']['status'] = 'maintenance' + cephadm_module.inventory.save() + + # being in offline/maint mode should disqualify hosts from being + # candidates for scheduling + assert cephadm_module.cache.is_host_schedulable('test2') + assert cephadm_module.cache.is_host_schedulable('test3') + + assert cephadm_module.cache.is_host_unreachable('test2') + assert cephadm_module.cache.is_host_unreachable('test3') + + with with_service(cephadm_module, ServiceSpec('crash', placement=PlacementSpec(host_pattern='*'))): + # re-apply services. 
No mgr should be removed from maint/offline hosts + # crash daemon should only be on host not in maint/offline mode + CephadmServe(cephadm_module)._apply_all_services() + assert len(cephadm_module.cache.get_daemons_by_type('mgr')) == 3 + assert len(cephadm_module.cache.get_daemons_by_type('crash')) == 1 + + cephadm_module.offline_hosts = {} + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_enter_success(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.side_effect = async_side_effect( + ([''], ['something\nsuccess - systemd target xxx disabled'], 0)) + _host_ok.return_value = 0, 'it is okay' + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname)) + # should not raise an error + retval = cephadm_module.enter_host_maintenance(hostname) + assert retval.result_str().startswith('Daemons for Ceph cluster') + assert not retval.exception_str + assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_enter_failure(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.side_effect = async_side_effect( + ([''], ['something\nfailed - disable the target'], 0)) + _host_ok.return_value = 0, 'it is okay' + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname)) + + with pytest.raises(OrchestratorError, match='Failed to place host1 into maintenance for cluster fsid'): + cephadm_module.enter_host_maintenance(hostname) + + assert not cephadm_module.inventory._inventory[hostname]['status'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.CephadmOrchestrator._host_ok_to_stop") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_enter_i_really_mean_it(self, _hosts, _get_daemon_types, _host_ok, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + err_str = 'some kind of error' + _run_cephadm.side_effect = async_side_effect( + ([''], ['something\nfailed - disable the target'], 0)) + _host_ok.return_value = 1, err_str + _get_daemon_types.return_value = ['mon'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname)) + + with pytest.raises(OrchestratorError, match=err_str): + cephadm_module.enter_host_maintenance(hostname) + assert not cephadm_module.inventory._inventory[hostname]['status'] + + with pytest.raises(OrchestratorError, match=err_str): + cephadm_module.enter_host_maintenance(hostname, force=True) + assert not cephadm_module.inventory._inventory[hostname]['status'] + + retval = cephadm_module.enter_host_maintenance(hostname, force=True, yes_i_really_mean_it=True) + assert retval.result_str().startswith('Daemons for Ceph cluster') + assert not retval.exception_str + assert 
cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_exit_success(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.side_effect = async_side_effect(([''], [ + 'something\nsuccess - systemd target xxx enabled and started'], 0)) + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance')) + # should not raise an error + retval = cephadm_module.exit_host_maintenance(hostname) + assert retval.result_str().startswith('Ceph cluster') + assert not retval.exception_str + assert not cephadm_module.inventory._inventory[hostname]['status'] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + @mock.patch("cephadm.module.HostCache.get_daemon_types") + @mock.patch("cephadm.module.HostCache.get_hosts") + def test_maintenance_exit_failure(self, _hosts, _get_daemon_types, _run_cephadm, cephadm_module: CephadmOrchestrator): + hostname = 'host1' + _run_cephadm.side_effect = async_side_effect( + ([''], ['something\nfailed - unable to enable the target'], 0)) + _get_daemon_types.return_value = ['crash'] + _hosts.return_value = [hostname, 'other_host'] + cephadm_module.inventory.add_host(HostSpec(hostname, status='maintenance')) + + with pytest.raises(OrchestratorError, match='Failed to exit maintenance state for host host1, cluster fsid'): + cephadm_module.exit_host_maintenance(hostname) + + assert cephadm_module.inventory._inventory[hostname]['status'] == 'maintenance' + + @mock.patch("cephadm.ssh.SSHManager._remote_connection") + @mock.patch("cephadm.ssh.SSHManager._execute_command") + @mock.patch("cephadm.ssh.SSHManager._check_execute_command") + @mock.patch("cephadm.ssh.SSHManager._write_remote_file") + def test_etc_ceph(self, _write_file, check_execute_command, execute_command, remote_connection, cephadm_module): + _write_file.side_effect = async_side_effect(None) + check_execute_command.side_effect = async_side_effect('') + execute_command.side_effect = async_side_effect(('{}', '', 0)) + remote_connection.side_effect = async_side_effect(mock.Mock()) + + assert cephadm_module.manage_etc_ceph_ceph_conf is False + + with with_host(cephadm_module, 'test'): + assert '/etc/ceph/ceph.conf' not in cephadm_module.cache.get_host_client_files('test') + + with with_host(cephadm_module, 'test'): + cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True) + cephadm_module.config_notify() + assert cephadm_module.manage_etc_ceph_ceph_conf is True + + CephadmServe(cephadm_module)._write_all_client_files() + # Make sure both ceph conf locations (default and per fsid) are called + _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'', + 0o644, 0, 0, None), + mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'', + 0o644, 0, 0, None)] + ) + ceph_conf_files = cephadm_module.cache.get_host_client_files('test') + assert len(ceph_conf_files) == 2 + assert '/etc/ceph/ceph.conf' in ceph_conf_files + assert '/var/lib/ceph/fsid/config/ceph.conf' in ceph_conf_files + + # set extra config and expect that we deploy another ceph.conf + cephadm_module._set_extra_ceph_conf('[mon]\nk=v') + CephadmServe(cephadm_module)._write_all_client_files() + _write_file.assert_has_calls([mock.call('test', + 
'/etc/ceph/ceph.conf', + b'[mon]\nk=v\n', 0o644, 0, 0, None), + mock.call('test', + '/var/lib/ceph/fsid/config/ceph.conf', + b'[mon]\nk=v\n', 0o644, 0, 0, None)]) + # reload + cephadm_module.cache.last_client_files = {} + cephadm_module.cache.load() + + ceph_conf_files = cephadm_module.cache.get_host_client_files('test') + assert len(ceph_conf_files) == 2 + assert '/etc/ceph/ceph.conf' in ceph_conf_files + assert '/var/lib/ceph/fsid/config/ceph.conf' in ceph_conf_files + + # Make sure, _check_daemons does a redeploy due to monmap change: + f1_before_digest = cephadm_module.cache.get_host_client_files('test')[ + '/etc/ceph/ceph.conf'][0] + f2_before_digest = cephadm_module.cache.get_host_client_files( + 'test')['/var/lib/ceph/fsid/config/ceph.conf'][0] + cephadm_module._set_extra_ceph_conf('[mon]\nk2=v2') + CephadmServe(cephadm_module)._write_all_client_files() + f1_after_digest = cephadm_module.cache.get_host_client_files('test')[ + '/etc/ceph/ceph.conf'][0] + f2_after_digest = cephadm_module.cache.get_host_client_files( + 'test')['/var/lib/ceph/fsid/config/ceph.conf'][0] + assert f1_before_digest != f1_after_digest + assert f2_before_digest != f2_after_digest + + @mock.patch("cephadm.inventory.HostCache.get_host_client_files") + def test_dont_write_client_files_to_unreachable_hosts(self, _get_client_files, cephadm_module): + cephadm_module.inventory.add_host(HostSpec('host1', '1.2.3.1')) # online + cephadm_module.inventory.add_host(HostSpec('host2', '1.2.3.2')) # maintenance + cephadm_module.inventory.add_host(HostSpec('host3', '1.2.3.3')) # offline + + # mark host2 as maintenance and host3 as offline + cephadm_module.inventory._inventory['host2']['status'] = 'maintenance' + cephadm_module.offline_hosts.add('host3') + + # verify host2 and host3 are correctly marked as unreachable but host1 is not + assert not cephadm_module.cache.is_host_unreachable('host1') + assert cephadm_module.cache.is_host_unreachable('host2') + assert cephadm_module.cache.is_host_unreachable('host3') + + _get_client_files.side_effect = Exception('Called _get_client_files') + + # with the online host, should call _get_client_files which + # we have setup to raise an Exception + with pytest.raises(Exception, match='Called _get_client_files'): + CephadmServe(cephadm_module)._write_client_files({}, 'host1') + + # for the maintenance and offline host, _get_client_files should + # not be called and it should just return immediately with nothing + # having been raised + CephadmServe(cephadm_module)._write_client_files({}, 'host2') + CephadmServe(cephadm_module)._write_client_files({}, 'host3') + + def test_etc_ceph_init(self): + with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m: + assert m.manage_etc_ceph_ceph_conf is True + + @mock.patch("cephadm.CephadmOrchestrator.check_mon_command") + @mock.patch("cephadm.CephadmOrchestrator.extra_ceph_conf") + def test_extra_ceph_conf(self, _extra_ceph_conf, _check_mon_cmd, cephadm_module: CephadmOrchestrator): + # settings put into the [global] section in the extra conf + # need to be appended to existing [global] section in given + # minimal ceph conf, but anything in another section (e.g. 
[mon]) + # needs to continue to be its own section + + # this is the conf "ceph generate-minimal-conf" will return in this test + _check_mon_cmd.return_value = (0, """[global] +global_k1 = global_v1 +global_k2 = global_v2 +[mon] +mon_k1 = mon_v1 +[osd] +osd_k1 = osd_v1 +osd_k2 = osd_v2 +""", '') + + # test with extra ceph conf that has some of the sections from minimal conf + _extra_ceph_conf.return_value = CephadmOrchestrator.ExtraCephConf(conf="""[mon] +mon_k2 = mon_v2 +[global] +global_k3 = global_v3 +""", last_modified=datetime_now()) + + expected_combined_conf = """[global] +global_k1 = global_v1 +global_k2 = global_v2 +global_k3 = global_v3 + +[mon] +mon_k1 = mon_v1 +mon_k2 = mon_v2 + +[osd] +osd_k1 = osd_v1 +osd_k2 = osd_v2 +""" + + assert cephadm_module.get_minimal_ceph_conf() == expected_combined_conf + + def test_client_keyrings_special_host_labels(self, cephadm_module): + cephadm_module.inventory.add_host(HostSpec('host1', labels=['keyring1'])) + cephadm_module.inventory.add_host(HostSpec('host2', labels=['keyring1', SpecialHostLabels.DRAIN_DAEMONS])) + cephadm_module.inventory.add_host(HostSpec('host3', labels=['keyring1', SpecialHostLabels.DRAIN_DAEMONS, SpecialHostLabels.DRAIN_CONF_KEYRING])) + # hosts need to be marked as having had refresh to be available for placement + # so "refresh" with empty daemon list + cephadm_module.cache.update_host_daemons('host1', {}) + cephadm_module.cache.update_host_daemons('host2', {}) + cephadm_module.cache.update_host_daemons('host3', {}) + + assert 'host1' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()] + assert 'host2' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()] + assert 'host3' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_available_hosts()] + + assert 'host1' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()] + assert 'host2' not in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()] + assert 'host3' in [h.hostname for h in cephadm_module.cache.get_conf_keyring_draining_hosts()] + + cephadm_module.keys.update(ClientKeyringSpec('keyring1', PlacementSpec(label='keyring1'))) + + with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd: + _mon_cmd.return_value = (0, 'real-keyring', '') + client_files = CephadmServe(cephadm_module)._calc_client_files() + assert 'host1' in client_files.keys() + assert '/etc/ceph/ceph.keyring1.keyring' in client_files['host1'].keys() + assert 'host2' in client_files.keys() + assert '/etc/ceph/ceph.keyring1.keyring' in client_files['host2'].keys() + assert 'host3' not in client_files.keys() + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + def check_registry_credentials(url, username, password): + assert json.loads(cephadm_module.get_store('registry_credentials')) == { + 'url': url, 'username': username, 'password': password} + + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test'): + # test successful login with valid args + code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password') + assert out == 'registry login scheduled' + assert err == '' + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test bad login attempt with invalid args + code, out, err = cephadm_module.registry_login('bad-args') + assert err == ("Invalid arguments. 
Please provide arguments <url> <username> <password> " + "or -i <login credentials json file>") + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test bad login using invalid json file + code, out, err = cephadm_module.registry_login( + None, None, None, '{"bad-json": "bad-json"}') + assert err == ("json provided for custom registry login did not include all necessary fields. " + "Please setup json file as\n" + "{\n" + " \"url\": \"REGISTRY_URL\",\n" + " \"username\": \"REGISTRY_USERNAME\",\n" + " \"password\": \"REGISTRY_PASSWORD\"\n" + "}\n") + check_registry_credentials('test-url', 'test-user', 'test-password') + + # test good login using valid json file + good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", " + " \"password\": \"" + "json-pass" + "\"}") + code, out, err = cephadm_module.registry_login(None, None, None, good_json) + assert out == 'registry login scheduled' + assert err == '' + check_registry_credentials('json-url', 'json-user', 'json-pass') + + # test bad login where args are valid but login command fails + _run_cephadm.side_effect = async_side_effect(('{}', 'error', 1)) + code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password') + assert err == 'Host test failed to login to fail-url as fail-user with given password' + check_registry_credentials('json-url', 'json-user', 'json-pass') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({ + 'image_id': 'image_id', + 'repo_digests': ['image@repo_digest'], + }))) + @pytest.mark.parametrize("use_repo_digest", + [ + False, + True + ]) + def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator): + cephadm_module.use_repo_digest = use_repo_digest + + with with_host(cephadm_module, 'test', refresh_hosts=False): + cephadm_module.set_container_image('global', 'image') + + if use_repo_digest: + + CephadmServe(cephadm_module).convert_tags_to_repo_digest() + + _, image, _ = cephadm_module.check_mon_command({ + 'prefix': 'config get', + 'who': 'global', + 'key': 'container_image', + }) + if use_repo_digest: + assert image == 'image@repo_digest' + else: + assert image == 'image' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + + error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch +Traceback (most recent call last): + File "<stdin>", line 6112, in <module> + File "<stdin>", line 1299, in _infer_fsid + File "<stdin>", line 1382, in _infer_image + File "<stdin>", line 3612, in command_ceph_volume + File "<stdin>", line 1061, in call_throws""" + + with with_host(cephadm_module, 'test'): + _run_cephadm.reset_mock() + _run_cephadm.side_effect = OrchestratorError(error_message) + + s = CephadmServe(cephadm_module)._refresh_host_devices('test') + assert s == 'host test `cephadm ceph-volume` failed: ' + error_message + + assert _run_cephadm.mock_calls == [ + mock.call('test', 'osd', 'ceph-volume', + ['--', 'inventory', '--format=json-pretty', '--filter-for-batch'], image='', + no_fsid=False, error_ok=False, log_output=False), + mock.call('test', 'osd', 'ceph-volume', + ['--', 'inventory', '--format=json-pretty'], image='', + 
no_fsid=False, error_ok=False, log_output=False), + ] + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test', refresh_hosts=False): + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1): + pass + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice_fail(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test', refresh_hosts=False): + cephadm_module.mock_store_set('_ceph_get', 'osd_map', { + 'osds': [ + { + 'osd': 1, + 'up_from': 0, + 'uuid': 'uuid' + } + ] + }) + + ceph_volume_lvm_list = { + '1': [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }] + } + _run_cephadm.reset_mock(return_value=True, side_effect=True) + + async def _r_c(*args, **kwargs): + if 'ceph-volume' in args: + return (json.dumps(ceph_volume_lvm_list), '', 0) + else: + assert ['_orch', 'deploy'] in args + raise OrchestratorError("let's fail somehow") + _run_cephadm.side_effect = _r_c + assert cephadm_module._osd_activate( + ['test']).stderr == "let's fail somehow" + with pytest.raises(AssertionError): + cephadm_module.assert_issued_mon_command({ + 'prefix': 'auth rm', + 'entity': 'osd.1', + }) + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_activate_datadevice_dbdevice(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + with with_host(cephadm_module, 'test', refresh_hosts=False): + + async def _ceph_volume_list(s, host, entity, cmd, **kwargs): + logging.info(f'ceph-volume cmd: {cmd}') + if 'raw' in cmd: + return json.dumps({ + "21a4209b-f51b-4225-81dc-d2dca5b8b2f5": { + "ceph_fsid": "64c84f19-fe1d-452a-a731-ab19dc144aa8", + "device": "/dev/loop0", + "osd_id": 21, + "osd_uuid": "21a4209b-f51b-4225-81dc-d2dca5b8b2f5", + "type": "bluestore" + }, + }), '', 0 + if 'lvm' in cmd: + return json.dumps({ + '1': [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }, { + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'db' + }] + }), '', 0 + return '{}', '', 0 + + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1, ceph_volume_lvm_list=_ceph_volume_list): + pass + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_osd_count(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) + dg = DriveGroupSpec(service_id='', data_devices=DeviceSelection(all=True)) + with with_host(cephadm_module, 'test', refresh_hosts=False): + with with_service(cephadm_module, dg, host='test'): + with with_osd_daemon(cephadm_module, _run_cephadm, 'test', 1): + assert wait(cephadm_module, cephadm_module.describe_service())[0].size == 1 + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_host_rm_last_admin(self, cephadm_module: CephadmOrchestrator): + with pytest.raises(OrchestratorError): + with with_host(cephadm_module, 'test', refresh_hosts=False, rm_with_force=False): + cephadm_module.inventory.add_label('test', SpecialHostLabels.ADMIN) + pass + assert False + with with_host(cephadm_module, 'test1', 
refresh_hosts=False, rm_with_force=True): + with with_host(cephadm_module, 'test2', refresh_hosts=False, rm_with_force=False): + cephadm_module.inventory.add_label('test2', SpecialHostLabels.ADMIN) + + @pytest.mark.parametrize("facts, settings, expected_value", + [ + # All options are available on all hosts + ( + { + "host1": + { + "sysctl_options": + { + 'opt1': 'val1', + 'opt2': 'val2', + } + }, + "host2": + { + "sysctl_options": + { + 'opt1': '', + 'opt2': '', + } + }, + }, + {'opt1', 'opt2'}, # settings + {'host1': [], 'host2': []} # expected_value + ), + # opt1 is missing on host 1, opt2 is missing on host2 + ({ + "host1": + { + "sysctl_options": + { + 'opt2': '', + 'optX': '', + } + }, + "host2": + { + "sysctl_options": + { + 'opt1': '', + 'opt3': '', + 'opt4': '', + } + }, + }, + {'opt1', 'opt2'}, # settings + {'host1': ['opt1'], 'host2': ['opt2']} # expected_value + ), + # All options are missing on all hosts + ({ + "host1": + { + "sysctl_options": + { + } + }, + "host2": + { + "sysctl_options": + { + } + }, + }, + {'opt1', 'opt2'}, # settings + {'host1': ['opt1', 'opt2'], 'host2': [ + 'opt1', 'opt2']} # expected_value + ), + ] + ) + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_tuned_profiles_settings_validation(self, facts, settings, expected_value, cephadm_module): + with with_host(cephadm_module, 'test'): + spec = mock.Mock() + spec.settings = sorted(settings) + spec.placement.filter_matching_hostspecs = mock.Mock() + spec.placement.filter_matching_hostspecs.return_value = ['host1', 'host2'] + cephadm_module.cache.facts = facts + assert cephadm_module._validate_tunedprofile_settings(spec) == expected_value + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]')) + def test_tuned_profiles_validation(self, cephadm_module): + with with_host(cephadm_module, 'test'): + + with pytest.raises(OrchestratorError, match="^Invalid placement specification.+"): + spec = mock.Mock() + spec.settings = {'a': 'b'} + spec.placement = PlacementSpec(hosts=[]) + cephadm_module._validate_tuned_profile_spec(spec) + + with pytest.raises(OrchestratorError, match="Invalid spec: settings section cannot be empty."): + spec = mock.Mock() + spec.settings = {} + spec.placement = PlacementSpec(hosts=['host1', 'host2']) + cephadm_module._validate_tuned_profile_spec(spec) + + with pytest.raises(OrchestratorError, match="^Placement 'count' field is no supported .+"): + spec = mock.Mock() + spec.settings = {'a': 'b'} + spec.placement = PlacementSpec(count=1) + cephadm_module._validate_tuned_profile_spec(spec) + + with pytest.raises(OrchestratorError, match="^Placement 'count_per_host' field is no supported .+"): + spec = mock.Mock() + spec.settings = {'a': 'b'} + spec.placement = PlacementSpec(count_per_host=1, label='foo') + cephadm_module._validate_tuned_profile_spec(spec) + + with pytest.raises(OrchestratorError, match="^Found invalid host"): + spec = mock.Mock() + spec.settings = {'a': 'b'} + spec.placement = PlacementSpec(hosts=['host1', 'host2']) + cephadm_module.inventory = mock.Mock() + cephadm_module.inventory.all_specs = mock.Mock( + return_value=[mock.Mock().hostname, mock.Mock().hostname]) + cephadm_module._validate_tuned_profile_spec(spec) + + def test_set_unmanaged(self, cephadm_module): + cephadm_module.spec_store._specs['crash'] = ServiceSpec('crash', unmanaged=False) + assert not cephadm_module.spec_store._specs['crash'].unmanaged + cephadm_module.spec_store.set_unmanaged('crash', True) + assert 
cephadm_module.spec_store._specs['crash'].unmanaged + cephadm_module.spec_store.set_unmanaged('crash', False) + assert not cephadm_module.spec_store._specs['crash'].unmanaged + + def test_inventory_known_hostnames(self, cephadm_module): + cephadm_module.inventory.add_host(HostSpec('host1', '1.2.3.1')) + cephadm_module.inventory.add_host(HostSpec('host2', '1.2.3.2')) + cephadm_module.inventory.add_host(HostSpec('host3.domain', '1.2.3.3')) + cephadm_module.inventory.add_host(HostSpec('host4.domain', '1.2.3.4')) + cephadm_module.inventory.add_host(HostSpec('host5', '1.2.3.5')) + + # update_known_hostname expects args to be <hostname, shortname, fqdn> + # as are gathered from cephadm gather-facts. Although, passing the + # names in the wrong order should actually have no effect on functionality + cephadm_module.inventory.update_known_hostnames('host1', 'host1', 'host1.domain') + cephadm_module.inventory.update_known_hostnames('host2.domain', 'host2', 'host2.domain') + cephadm_module.inventory.update_known_hostnames('host3', 'host3', 'host3.domain') + cephadm_module.inventory.update_known_hostnames('host4.domain', 'host4', 'host4.domain') + cephadm_module.inventory.update_known_hostnames('host5', 'host5', 'host5') + + assert 'host1' in cephadm_module.inventory + assert 'host1.domain' in cephadm_module.inventory + assert cephadm_module.inventory.get_addr('host1') == '1.2.3.1' + assert cephadm_module.inventory.get_addr('host1.domain') == '1.2.3.1' + + assert 'host2' in cephadm_module.inventory + assert 'host2.domain' in cephadm_module.inventory + assert cephadm_module.inventory.get_addr('host2') == '1.2.3.2' + assert cephadm_module.inventory.get_addr('host2.domain') == '1.2.3.2' + + assert 'host3' in cephadm_module.inventory + assert 'host3.domain' in cephadm_module.inventory + assert cephadm_module.inventory.get_addr('host3') == '1.2.3.3' + assert cephadm_module.inventory.get_addr('host3.domain') == '1.2.3.3' + + assert 'host4' in cephadm_module.inventory + assert 'host4.domain' in cephadm_module.inventory + assert cephadm_module.inventory.get_addr('host4') == '1.2.3.4' + assert cephadm_module.inventory.get_addr('host4.domain') == '1.2.3.4' + + assert 'host4.otherdomain' not in cephadm_module.inventory + with pytest.raises(OrchestratorError): + cephadm_module.inventory.get_addr('host4.otherdomain') + + assert 'host5' in cephadm_module.inventory + assert cephadm_module.inventory.get_addr('host5') == '1.2.3.5' + with pytest.raises(OrchestratorError): + cephadm_module.inventory.get_addr('host5.domain') + + def test_async_timeout_handler(self, cephadm_module): + cephadm_module.default_cephadm_command_timeout = 900 + + async def _timeout(): + raise asyncio.TimeoutError + + with pytest.raises(OrchestratorError, match=r'Command timed out \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler(): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command timed out on host hostA \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler('hostA'): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "testing" timed out \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler(cmd='testing'): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "testing" timed out on host hostB \(default 900 second timeout\)'): + with cephadm_module.async_timeout_handler('hostB', 'testing'): + cephadm_module.wait_async(_timeout()) + + 
with pytest.raises(OrchestratorError, match=r'Command timed out \(non-default 111 second timeout\)'): + with cephadm_module.async_timeout_handler(timeout=111): + cephadm_module.wait_async(_timeout()) + + with pytest.raises(OrchestratorError, match=r'Command "very slow" timed out on host hostC \(non-default 999 second timeout\)'): + with cephadm_module.async_timeout_handler('hostC', 'very slow', 999): + cephadm_module.wait_async(_timeout()) + + @mock.patch("cephadm.CephadmOrchestrator.remove_osds") + @mock.patch("cephadm.CephadmOrchestrator.add_host_label", lambda *a, **kw: None) + @mock.patch("cephadm.inventory.HostCache.get_daemons_by_host", lambda *a, **kw: []) + def test_host_drain_zap(self, _rm_osds, cephadm_module): + # pass force=true in these tests to bypass _admin label check + cephadm_module.drain_host('host1', force=True, zap_osd_devices=False) + assert _rm_osds.called_with([], zap=False) + + cephadm_module.drain_host('host1', force=True, zap_osd_devices=True) + assert _rm_osds.called_with([], zap=True) + + def test_process_ls_output(self, cephadm_module): + sample_ls_output = """[ + { + "style": "cephadm:v1", + "name": "mon.vm-00", + "fsid": "588f83ba-5995-11ee-9e94-52540057a206", + "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@mon.vm-00", + "enabled": true, + "state": "running", + "service_name": "mon", + "ports": [], + "ip": null, + "deployed_by": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "rank": null, + "rank_generation": null, + "extra_container_args": null, + "extra_entrypoint_args": null, + "memory_request": null, + "memory_limit": null, + "container_id": "b170b964a6e2918955362eb36195627c6086d3f859d4ebce2ee13f3ee4738733", + "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3", + "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55", + "container_image_digests": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "memory_usage": 56214159, + "cpu_percentage": "2.32%", + "version": "18.0.0-5185-g7b3a4f2b", + "started": "2023-09-22T22:31:11.752300Z", + "created": "2023-09-22T22:15:24.121387Z", + "deployed": "2023-09-22T22:31:10.383431Z", + "configured": "2023-09-22T22:31:11.859440Z" + }, + { + "style": "cephadm:v1", + "name": "mgr.vm-00.mpexeg", + "fsid": "588f83ba-5995-11ee-9e94-52540057a206", + "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@mgr.vm-00.mpexeg", + "enabled": true, + "state": "running", + "service_name": "mgr", + "ports": [ + 8443, + 9283, + 8765 + ], + "ip": null, + "deployed_by": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "rank": null, + "rank_generation": null, + "extra_container_args": null, + "extra_entrypoint_args": null, + "memory_request": null, + "memory_limit": null, + "container_id": "6e7756cef553a25a2a84227e8755d3d25046b9cd8758b23c698d34b3af895242", + "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3", + "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55", + "container_image_digests": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "memory_usage": 529740595, + "cpu_percentage": "8.35%", + "version": "18.0.0-5185-g7b3a4f2b", + "started": "2023-09-22T22:30:18.587021Z", + "created": 
"2023-09-22T22:15:29.101409Z", + "deployed": "2023-09-22T22:30:17.339114Z", + "configured": "2023-09-22T22:30:18.758122Z" + }, + { + "style": "cephadm:v1", + "name": "agent.vm-00", + "fsid": "588f83ba-5995-11ee-9e94-52540057a206", + "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@agent.vm-00", + "enabled": true, + "state": "running", + "service_name": "agent", + "ports": [], + "ip": null, + "deployed_by": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "rank": null, + "rank_generation": null, + "extra_container_args": null, + "extra_entrypoint_args": null, + "container_id": null, + "container_image_name": null, + "container_image_id": null, + "container_image_digests": null, + "version": null, + "started": null, + "created": "2023-09-22T22:33:34.708289Z", + "deployed": null, + "configured": "2023-09-22T22:33:34.722289Z" + }, + { + "style": "cephadm:v1", + "name": "osd.0", + "fsid": "588f83ba-5995-11ee-9e94-52540057a206", + "systemd_unit": "ceph-588f83ba-5995-11ee-9e94-52540057a206@osd.0", + "enabled": true, + "state": "running", + "service_name": "osd.foo", + "ports": [], + "ip": null, + "deployed_by": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "rank": null, + "rank_generation": null, + "extra_container_args": null, + "extra_entrypoint_args": null, + "memory_request": null, + "memory_limit": null, + "container_id": "93f71c60820b86901a45b3b1fe3dba3e3e677b37fd22310b7e7da3f67bb8ccd6", + "container_image_name": "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3", + "container_image_id": "674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55", + "container_image_digests": [ + "quay.io/adk3798/ceph@sha256:ff374767a4568f6d11a941ab763e7732cd7e071362328f7b6a7891bc4852a3a3" + ], + "memory_usage": 73410805, + "cpu_percentage": "6.54%", + "version": "18.0.0-5185-g7b3a4f2b", + "started": "2023-09-22T22:41:29.019587Z", + "created": "2023-09-22T22:41:03.615080Z", + "deployed": "2023-09-22T22:41:24.965222Z", + "configured": "2023-09-22T22:41:29.119250Z" + } +]""" + + now = str_to_datetime('2023-09-22T22:45:29.119250Z') + cephadm_module._cluster_fsid = '588f83ba-5995-11ee-9e94-52540057a206' + with mock.patch("cephadm.module.datetime_now", lambda: now): + cephadm_module._process_ls_output('vm-00', json.loads(sample_ls_output)) + assert 'vm-00' in cephadm_module.cache.daemons + assert 'mon.vm-00' in cephadm_module.cache.daemons['vm-00'] + assert 'mgr.vm-00.mpexeg' in cephadm_module.cache.daemons['vm-00'] + assert 'agent.vm-00' in cephadm_module.cache.daemons['vm-00'] + assert 'osd.0' in cephadm_module.cache.daemons['vm-00'] + + daemons = cephadm_module.cache.get_daemons_by_host('vm-00') + c_img_ids = [dd.container_image_id for dd in daemons if dd.daemon_type != 'agent'] + assert all(c_img_id == '674eb38037f1555bb7884ede5db47f1749486e7f12ecb416e34ada87c9934e55' for c_img_id in c_img_ids) + last_refreshes = [dd.last_refresh for dd in daemons] + assert all(lrf == now for lrf in last_refreshes) + versions = [dd.version for dd in daemons if dd.daemon_type != 'agent'] + assert all(version == '18.0.0-5185-g7b3a4f2b' for version in versions) + + osd = cephadm_module.cache.get_daemons_by_type('osd', 'vm-00')[0] + assert osd.cpu_percentage == '6.54%' + assert osd.memory_usage == 73410805 + assert osd.created == str_to_datetime('2023-09-22T22:41:03.615080Z') |