From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/cephadm/tests/fixtures.py | 200 +++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 src/pybind/mgr/cephadm/tests/fixtures.py (limited to 'src/pybind/mgr/cephadm/tests/fixtures.py') diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py new file mode 100644 index 000000000..6281283d7 --- /dev/null +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -0,0 +1,200 @@ +import fnmatch +import asyncio +import sys +from tempfile import NamedTemporaryFile +from contextlib import contextmanager + +from ceph.deployment.service_spec import PlacementSpec, ServiceSpec +from ceph.utils import datetime_to_str, datetime_now +from cephadm.serve import CephadmServe, cephadmNoImage + +try: + from typing import Any, Iterator, List, Callable, Dict +except ImportError: + pass + +from cephadm import CephadmOrchestrator +from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus +from tests import mock + + +def async_side_effect(result): + async def side_effect(*args, **kwargs): + return result + return side_effect + + +def get_ceph_option(_, key): + return __file__ + + +def get_module_option_ex(_, module, key, default=None): + if module == 'prometheus': + if key == 'server_port': + return 9283 + return None + + +def _run_cephadm(ret): + async def foo(s, host, entity, cmd, e, **kwargs): + if cmd == 'gather-facts': + return '{}', '', 0 + return [ret], '', 0 + return foo + + +def match_glob(val, pat): + ok = fnmatch.fnmatchcase(val, pat) + if not ok: + assert pat in val + + +class MockEventLoopThread: + def get_result(self, coro, timeout): + if sys.version_info >= (3, 7): + return asyncio.run(coro) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + loop.close() + asyncio.set_event_loop(None) + + +def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None: + to_update: Dict[str, Callable[[str, Any], None]] = { + 'ls': m._process_ls_output, + 'gather-facts': m.cache.update_host_facts, + 'list-networks': m.cache.update_host_networks, + } + if ops: + for op in ops: + out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])) + to_update[op](host, out) + m.cache.last_daemon_update[host] = datetime_now() + m.cache.last_facts_update[host] = datetime_now() + m.cache.last_network_update[host] = datetime_now() + m.cache.metadata_up_to_date[host] = True + + +def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None: + for host in m.cache.get_hosts(): + receive_agent_metadata(m, host) + + +@contextmanager +def with_cephadm_module(module_options=None, store=None): + """ + :param module_options: Set opts as if they were set before module.__init__ is called + :param store: Set the store before module.__init__ is called + """ + with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option), \ + mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \ + mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex), \ + mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \ + mock.patch("cephadm.module.CephadmOrchestrator.remote"), \ + mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \ + mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \ + mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \ + mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \ + mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'), \ + mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \ + mock.patch('cephadm.http_server.CephadmHttpServer.run'): + + m = CephadmOrchestrator.__new__(CephadmOrchestrator) + if module_options is not None: + for k, v in module_options.items(): + m._ceph_set_module_option('cephadm', k, v) + if store is None: + store = {} + if '_ceph_get/mon_map' not in store: + m.mock_store_set('_ceph_get', 'mon_map', { + 'modified': datetime_to_str(datetime_now()), + 'fsid': 'foobar', + }) + if '_ceph_get/mgr_map' not in store: + m.mock_store_set('_ceph_get', 'mgr_map', { + 'services': { + 'dashboard': 'http://[::1]:8080', + 'prometheus': 'http://[::1]:8081' + }, + 'modules': ['dashboard', 'prometheus'], + }) + for k, v in store.items(): + m._ceph_set_store(k, v) + + m.__init__('cephadm', 0, 0) + m._cluster_fsid = "fsid" + + m.event_loop = MockEventLoopThread() + m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-') + + yield m + + +def wait(m: CephadmOrchestrator, c: OrchResult) -> Any: + return raise_if_exception(c) + + +@contextmanager +def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True): + with mock.patch("cephadm.utils.resolve_ip", return_value=addr): + wait(m, m.add_host(HostSpec(hostname=name))) + if refresh_hosts: + CephadmServe(m)._refresh_hosts_and_daemons() + receive_agent_metadata(m, name) + yield + wait(m, m.remove_host(name, force=rm_with_force)) + + +def assert_rm_service(cephadm: CephadmOrchestrator, srv_name): + mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr') + if mon_or_mgr: + assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name)) + return + assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}' + assert cephadm.spec_store[srv_name].deleted is not None + CephadmServe(cephadm)._check_daemons() + CephadmServe(cephadm)._apply_all_services() + assert cephadm.spec_store[srv_name].deleted + unmanaged = cephadm.spec_store[srv_name].spec.unmanaged + CephadmServe(cephadm)._purge_deleted_services() + if not unmanaged: # cause then we're not deleting daemons + assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}' + + +@contextmanager +def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]: + if spec.placement.is_empty() and host: + spec.placement = PlacementSpec(hosts=[host], count=1) + if meth is not None: + c = meth(cephadm_module, spec) + assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...' + else: + c = cephadm_module.apply([spec]) + assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...'] + + specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] + assert spec in specs + + CephadmServe(cephadm_module)._apply_all_services() + + if status_running: + make_daemons_running(cephadm_module, spec.service_name()) + + dds = wait(cephadm_module, cephadm_module.list_daemons()) + own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()] + if host and spec.service_type != 'osd': + assert own_dds + + yield [dd.name() for dd in own_dds] + + assert_rm_service(cephadm_module, spec.service_name()) + + +def make_daemons_running(cephadm_module, service_name): + own_dds = cephadm_module.cache.get_daemons_by_service(service_name) + for dd in own_dds: + dd.status = DaemonDescriptionStatus.running # We're changing the reference -- cgit v1.2.3