summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/tests/fixtures.py
blob: 8c2e1cfbfbb621b734e87606474237445e5e336a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import fnmatch
from contextlib import contextmanager

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_to_str, datetime_now
from cephadm.serve import CephadmServe

try:
    from typing import Any, Iterator, List
except ImportError:
    pass

from cephadm import CephadmOrchestrator
from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
from tests import mock


def get_ceph_option(_, key):
    """
    Stand-in for ``CephadmOrchestrator.get_ceph_option`` used by the fixtures.

    :param _: the module instance (ignored)
    :param key: the option name (ignored)
    :return: the path of this fixtures file, whatever option is requested
    """
    # NOTE(review): presumably a real file path is returned so that options
    # naming files point at something that exists -- confirm with callers.
    this_file = __file__
    return this_file


def get_module_option_ex(_, module, key, default=None):
    """
    Stand-in for ``CephadmOrchestrator.get_module_option_ex``.

    Only the prometheus ``server_port`` option is known; every other lookup
    yields ``None``.

    :param _: the module instance (ignored)
    :param module: name of the mgr module whose option is requested
    :param key: name of the requested option
    :param default: accepted for signature compatibility; it is not used
    :return: ``9283`` for prometheus/server_port, otherwise ``None``
    """
    wants_prometheus_port = module == 'prometheus' and key == 'server_port'
    return 9283 if wants_prometheus_port else None


def _run_cephadm(ret):
    """
    Build a replacement for a cephadm "run" call that returns canned output.

    :param ret: value handed back (wrapped in a one-element list) as the
        output of every command except ``gather-facts``
    :return: a callable producing an ``(out, err, code)`` triple; ``err`` is
        always ``''`` and ``code`` is always ``0``
    """
    def _fake_run(s, host, entity, cmd, e, **kwargs):
        # 'gather-facts' gets an empty JSON object as a string; any other
        # command gets the canned value in a fresh list.
        out = '{}' if cmd == 'gather-facts' else [ret]
        return out, '', 0
    return _fake_run


def match_glob(val, pat):
    """
    Assert that ``val`` matches ``pat``.

    The check passes when ``pat`` matches ``val`` as a case-sensitive glob,
    or, failing that, when ``pat`` occurs in ``val`` as a plain substring.

    :param val: the string under test
    :param pat: glob pattern (or literal fragment) expected to match
    :raises AssertionError: if neither form of match succeeds
    """
    # The substring test is only evaluated when the glob did not match.
    assert fnmatch.fnmatchcase(val, pat) or pat in val


@contextmanager
def with_cephadm_module(module_options=None, store=None):
    """
    Yield a CephadmOrchestrator instance built with several of its
    collaborators patched out.

    The instance is allocated first, seeded with options and store entries,
    and only then initialised, so ``__init__`` sees the seeded state.

    :param module_options: Set opts as if they were set before module.__init__ is called
    :param store: Set the store before module.__init__ is called
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
            mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
            mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex),\
            mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):

        # Allocate without running __init__ so state can be injected first.
        orch = CephadmOrchestrator.__new__(CephadmOrchestrator)

        if module_options is not None:
            for opt_name, opt_value in module_options.items():
                orch._ceph_set_module_option('cephadm', opt_name, opt_value)

        if store is None:
            store = {}

        # Provide default mon/mgr maps unless the caller supplied their own.
        needs_mon_map = '_ceph_get/mon_map' not in store
        if needs_mon_map:
            mon_map = {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            }
            orch.mock_store_set('_ceph_get', 'mon_map', mon_map)

        needs_mgr_map = '_ceph_get/mgr_map' not in store
        if needs_mgr_map:
            mgr_map = {
                'services': {
                    'dashboard': 'http://[::1]:8080',
                    'prometheus': 'http://[::1]:8081'
                },
                'modules': ['dashboard', 'prometheus'],
            }
            orch.mock_store_set('_ceph_get', 'mgr_map', mgr_map)

        for store_key, store_value in store.items():
            orch._ceph_set_store(store_key, store_value)

        orch.__init__('cephadm', 0, 0)
        orch._cluster_fsid = "fsid"
        yield orch


def wait(m: CephadmOrchestrator, c: OrchResult) -> Any:
    """
    Hand ``c`` to ``raise_if_exception`` and return whatever it returns.

    :param m: the orchestrator module (not used here)
    :param c: the orchestrator result to unwrap
    """
    outcome = raise_if_exception(c)
    return outcome


@contextmanager
def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
    """
    Add host ``name`` to ``m`` for the duration of the ``with`` body, then
    remove it again.

    :param m: the orchestrator module to add the host to
    :param name: hostname to register
    :param addr: value returned by the patched ``cephadm.utils.resolve_ip``
    :param refresh_hosts: when true, refresh hosts and daemons right after adding
    :param rm_with_force: passed as ``force`` to ``remove_host`` on exit
    """
    resolve_ip_patch = mock.patch("cephadm.utils.resolve_ip", return_value=addr)
    with resolve_ip_patch:
        host_spec = HostSpec(hostname=name)
        wait(m, m.add_host(host_spec))

        if refresh_hosts:
            serve = CephadmServe(m)
            serve._refresh_hosts_and_daemons()

        yield

        # Removal happens while resolve_ip is still patched.
        removal = m.remove_host(name, force=rm_with_force)
        wait(m, removal)


def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
    """
    Remove service ``srv_name`` and assert the removal proceeds as expected.

    For ``mon`` and ``mgr`` services the removal result must contain
    ``'Unable'`` and nothing further is checked. For any other service the
    spec must be marked deleted, and after a purge it must be gone from the
    spec store unless the spec is unmanaged.

    :param cephadm: the orchestrator module under test
    :param srv_name: name of the service to remove
    """
    is_mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
    outcome = wait(cephadm, cephadm.remove_service(srv_name))

    if is_mon_or_mgr:
        assert 'Unable' in outcome
        return

    assert outcome == f'Removed service {srv_name}'
    assert cephadm.spec_store[srv_name].deleted is not None

    CephadmServe(cephadm)._check_daemons()
    CephadmServe(cephadm)._apply_all_services()
    assert cephadm.spec_store[srv_name].deleted

    # Read the flag before purging; the entry is looked up again afterwards
    # only for the membership check.
    is_unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
    CephadmServe(cephadm)._purge_deleted_services()

    if is_unmanaged:
        # cause then we're not deleting daemons
        return
    assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'


@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
    """
    Apply ``spec`` for the duration of the ``with`` body and remove the
    service afterwards.

    :param cephadm_module: the orchestrator module under test
    :param spec: the service spec to apply; its placement is replaced with a
        single-host placement when it is empty and ``host`` is given
    :param meth: optional callable ``meth(cephadm_module, spec)`` used to apply
        the spec instead of ``cephadm_module.apply([spec])``
    :param host: host to place the service on when the spec has no placement
    :param status_running: when true, mark the service's cached daemons running
    :return: yields the names of the daemons belonging to the service
    """
    if host and spec.placement.is_empty():
        spec.placement = PlacementSpec(hosts=[host], count=1)

    if meth is None:
        completion = cephadm_module.apply([spec])
        applied = wait(cephadm_module, completion)
        assert applied == [f'Scheduled {spec.service_name()} update...']
    else:
        completion = meth(cephadm_module, spec)
        applied = wait(cephadm_module, completion)
        assert applied == f'Scheduled {spec.service_name()} update...'

    described = wait(cephadm_module, cephadm_module.describe_service())
    known_specs = [description.spec for description in described]
    assert spec in known_specs

    CephadmServe(cephadm_module)._apply_all_services()

    if status_running:
        make_daemons_running(cephadm_module, spec.service_name())

    all_daemons = wait(cephadm_module, cephadm_module.list_daemons())
    own_daemons = []
    for daemon in all_daemons:
        if daemon.service_name() == spec.service_name():
            own_daemons.append(daemon)

    # OSD services are exempt from the "at least one daemon" check.
    if host and spec.service_type != 'osd':
        assert own_daemons

    yield [daemon.name() for daemon in own_daemons]

    assert_rm_service(cephadm_module, spec.service_name())


def make_daemons_running(cephadm_module, service_name):
    """
    Mark every cached daemon of ``service_name`` as running.

    :param cephadm_module: module whose ``cache`` is queried for the daemons
    :param service_name: name of the service whose daemons are updated
    """
    for daemon in cephadm_module.cache.get_daemons_by_service(service_name):
        # We're changing the reference: the objects returned by the cache
        # are modified in place.
        daemon.status = DaemonDescriptionStatus.running


def _deploy_cephadm_binary(host):
    """
    Build a stand-in callable that accepts any arguments and returns ``True``.

    :param host: accepted but not used
    :return: a callable taking arbitrary positional and keyword arguments
    """
    return lambda *args, **kwargs: True