1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
import fnmatch
import asyncio
import sys
from tempfile import NamedTemporaryFile
from contextlib import contextmanager
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_to_str, datetime_now
from cephadm.serve import CephadmServe, cephadmNoImage
try:
from typing import Any, Iterator, List, Callable, Dict
except ImportError:
pass
from cephadm import CephadmOrchestrator
from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
from tests import mock
def async_side_effect(result):
async def side_effect(*args, **kwargs):
return result
return side_effect
def get_ceph_option(_, key):
return __file__
def get_module_option_ex(_, module, key, default=None):
if module == 'prometheus':
if key == 'server_port':
return 9283
return None
def _run_cephadm(ret):
async def foo(s, host, entity, cmd, e, **kwargs):
if cmd == 'gather-facts':
return '{}', '', 0
return [ret], '', 0
return foo
def match_glob(val, pat):
ok = fnmatch.fnmatchcase(val, pat)
if not ok:
assert pat in val
class MockEventLoopThread:
def get_result(self, coro, timeout):
if sys.version_info >= (3, 7):
return asyncio.run(coro)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
asyncio.set_event_loop(None)
def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None:
to_update: Dict[str, Callable[[str, Any], None]] = {
'ls': m._process_ls_output,
'gather-facts': m.cache.update_host_facts,
'list-networks': m.cache.update_host_networks,
}
if ops:
for op in ops:
out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
to_update[op](host, out)
m.cache.last_daemon_update[host] = datetime_now()
m.cache.last_facts_update[host] = datetime_now()
m.cache.last_network_update[host] = datetime_now()
m.cache.metadata_up_to_date[host] = True
def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
for host in m.cache.get_hosts():
receive_agent_metadata(m, host)
@contextmanager
def with_cephadm_module(module_options=None, store=None):
"""
:param module_options: Set opts as if they were set before module.__init__ is called
:param store: Set the store before module.__init__ is called
"""
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option), \
mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'), \
mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
mock.patch('cephadm.http_server.CephadmHttpServer.run'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
for k, v in module_options.items():
m._ceph_set_module_option('cephadm', k, v)
if store is None:
store = {}
if '_ceph_get/mon_map' not in store:
m.mock_store_set('_ceph_get', 'mon_map', {
'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
if '_ceph_get/mgr_map' not in store:
m.mock_store_set('_ceph_get', 'mgr_map', {
'services': {
'dashboard': 'http://[::1]:8080',
'prometheus': 'http://[::1]:8081'
},
'modules': ['dashboard', 'prometheus'],
})
for k, v in store.items():
m._ceph_set_store(k, v)
m.__init__('cephadm', 0, 0)
m._cluster_fsid = "fsid"
m.event_loop = MockEventLoopThread()
m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-')
yield m
def wait(m: CephadmOrchestrator, c: OrchResult) -> Any:
return raise_if_exception(c)
@contextmanager
def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
wait(m, m.add_host(HostSpec(hostname=name)))
if refresh_hosts:
CephadmServe(m)._refresh_hosts_and_daemons()
receive_agent_metadata(m, name)
yield
wait(m, m.remove_host(name, force=rm_with_force))
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
if mon_or_mgr:
assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
return
assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
assert cephadm.spec_store[srv_name].deleted is not None
CephadmServe(cephadm)._check_daemons()
CephadmServe(cephadm)._apply_all_services()
assert cephadm.spec_store[srv_name].deleted
unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
CephadmServe(cephadm)._purge_deleted_services()
if not unmanaged: # cause then we're not deleting daemons
assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
if spec.placement.is_empty() and host:
spec.placement = PlacementSpec(hosts=[host], count=1)
if meth is not None:
c = meth(cephadm_module, spec)
assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
else:
c = cephadm_module.apply([spec])
assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...']
specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
assert spec in specs
CephadmServe(cephadm_module)._apply_all_services()
if status_running:
make_daemons_running(cephadm_module, spec.service_name())
dds = wait(cephadm_module, cephadm_module.list_daemons())
own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
if host and spec.service_type != 'osd':
assert own_dds
yield [dd.name() for dd in own_dds]
assert_rm_service(cephadm_module, spec.service_name())
def make_daemons_running(cephadm_module, service_name):
own_dds = cephadm_module.cache.get_daemons_by_service(service_name)
for dd in own_dds:
dd.status = DaemonDescriptionStatus.running # We're changing the reference
|