Diffstat (limited to 'src/cephadm/tests/test_agent.py')
-rw-r--r--  src/cephadm/tests/test_agent.py  800
1 file changed, 800 insertions, 0 deletions
diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py
new file mode 100644
index 000000000..f9cf201e2
--- /dev/null
+++ b/src/cephadm/tests/test_agent.py
@@ -0,0 +1,800 @@
+from unittest import mock
+import copy, datetime, json, os, socket, threading
+
+import pytest
+
+from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm
+
+from typing import Dict, List, Optional, Set
+
+_cephadm = import_cephadm()
+
+
+FSID = "beefbeef-beef-beef-1234-beefbeefbeef"
+AGENT_ID = 'host1'
+AGENT_DIR = f'/var/lib/ceph/{FSID}/agent.{AGENT_ID}'
+
+
+def test_agent_validate():
+ required_files = _cephadm.CephadmAgent.required_files
+ with with_cephadm_ctx([]) as ctx:
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ for i in range(len(required_files)):
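+            # build a config dict containing every required file except the i-th one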
+ incomplete_files = {s: 'text' for s in [f for j, f in enumerate(required_files) if j != i]}
+ with pytest.raises(_cephadm.Error, match=f'required file missing from config: {required_files[i]}'):
+ agent.validate(incomplete_files)
+ all_files = {s: 'text' for s in required_files}
+ agent.validate(all_files)
+
+
+def _check_file(path, content):
+ assert os.path.exists(path)
+ with open(path) as f:
+ fcontent = f.read()
+ assert fcontent == content
+
+
+@mock.patch('cephadm.call_throws')
+def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs):
+ _call_throws.return_value = ('', '', 0)
+ agent_id = AGENT_ID
+
+ with with_cephadm_ctx([]) as ctx:
+ ctx.meta_json = json.dumps({'meta': 'data'})
+ agent = _cephadm.CephadmAgent(ctx, FSID, agent_id)
+ cephadm_fs.create_dir(AGENT_DIR)
+
+ with pytest.raises(_cephadm.Error, match='Agent needs a config'):
+ agent.deploy_daemon_unit()
+
+ config = {s: f'text for {s}' for s in _cephadm.CephadmAgent.required_files}
+ config['not-required-file.txt'] = 'don\'t write me'
+
+ agent.deploy_daemon_unit(config)
+
+        # check required config files were all created
+ for fname in _cephadm.CephadmAgent.required_files:
+ _check_file(f'{AGENT_DIR}/{fname}', f'text for {fname}')
+
+ # assert non-required file was not written
+ assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt')
+
+ # check unit.run file was created correctly
+ _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run())
+
+ # check unit.meta file created correctly
+ _check_file(f'{AGENT_DIR}/unit.meta', json.dumps({'meta': 'data'}, indent=4) + '\n')
+
+ # check unit file was created correctly
+ _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file())
+
+ expected_call_throws_calls = [
+ mock.call(ctx, ['systemctl', 'daemon-reload']),
+ mock.call(ctx, ['systemctl', 'enable', '--now', agent.unit_name()]),
+ ]
+ _call_throws.assert_has_calls(expected_call_throws_calls)
+
+ expected_call_calls = [
+ mock.call(ctx, ['systemctl', 'stop', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG),
+ mock.call(ctx, ['systemctl', 'reset-failed', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG),
+ ]
+ _cephadm.call.assert_has_calls(expected_call_calls)
+
+
+@mock.patch('threading.Thread.is_alive')
+def test_agent_shutdown(_is_alive):
+ with with_cephadm_ctx([]) as ctx:
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ _is_alive.return_value = True
+ assert agent.stop == False
+ assert agent.mgr_listener.stop == False
+ assert agent.ls_gatherer.stop == False
+ assert agent.volume_gatherer.stop == False
+ agent.shutdown()
+ assert agent.stop == True
+ assert agent.mgr_listener.stop == True
+ assert agent.ls_gatherer.stop == True
+ assert agent.volume_gatherer.stop == True
+
+
+def test_agent_wakeup():
+ with with_cephadm_ctx([]) as ctx:
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ assert agent.event.is_set() == False
+ agent.wakeup()
+ assert agent.event.is_set() == True
+
+
+@mock.patch("cephadm.CephadmAgent.shutdown")
+@mock.patch("cephadm.AgentGatherer.update_func")
+def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs):
+ target_ip = '192.168.0.0'
+ target_port = 9876
+ refresh_period = 20
+ listener_port = 5678
+ host = AGENT_ID
+ device_enhanced_scan = 'True'
+ with with_cephadm_ctx([]) as ctx:
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ full_config = {
+ 'target_ip': target_ip,
+ 'target_port': target_port,
+ 'refresh_period': refresh_period,
+ 'listener_port': listener_port,
+ 'host': host,
+ 'device_enhanced_scan': device_enhanced_scan
+ }
+ cephadm_fs.create_dir(AGENT_DIR)
+ with open(agent.config_path, 'w') as f:
+ f.write(json.dumps(full_config))
+
+ with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"):
+ agent.pull_conf_settings()
+ _shutdown.assert_called()
+ with open(agent.keyring_path, 'w') as f:
+ f.write('keyring')
+
+ assert agent.device_enhanced_scan == False
+ agent.pull_conf_settings()
+ assert agent.host == host
+ assert agent.target_ip == target_ip
+ assert agent.target_port == target_port
+ assert agent.loop_interval == refresh_period
+ assert agent.starting_port == listener_port
+ assert agent.device_enhanced_scan == True
+ assert agent.keyring == 'keyring'
+ _update_func.assert_called()
+
+ full_config.pop('target_ip')
+ with open(agent.config_path, 'w') as f:
+ f.write(json.dumps(full_config))
+ with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"):
+ agent.pull_conf_settings()
+
+
+@mock.patch("cephadm.command_ceph_volume")
+def test_agent_ceph_volume(_ceph_volume):
+
+ def _ceph_volume_outputter(_):
+ print("ceph-volume output")
+
+ def _ceph_volume_empty(_):
+ pass
+
+ with with_cephadm_ctx([]) as ctx:
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+
+ _ceph_volume.side_effect = _ceph_volume_outputter
+ out, _ = agent._ceph_volume(False)
+ assert ctx.command == ['inventory', '--format=json']
+ assert out == "ceph-volume output\n"
+
+ out, _ = agent._ceph_volume(True)
+ assert ctx.command == ['inventory', '--format=json', '--with-lsm']
+ assert out == "ceph-volume output\n"
+
+ _ceph_volume.side_effect = _ceph_volume_empty
+ with pytest.raises(Exception, match='ceph-volume returned empty value'):
+ out, _ = agent._ceph_volume(False)
+
+
+def test_agent_daemon_ls_subset(cephadm_fs):
+ # Basing part of this test on some actual sample output
+
+ # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output
+ # 3f2b31d19ecd,456.4MB / 41.96GB
+ # 5aca2499e0f8,7.082MB / 41.96GB
+ # fe0cef07d5f7,35.91MB / 41.96GB
+
+ # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers
+ # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1
+ # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho
+ # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1
+
+ # Some of the components from that output
+ mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f'
+ mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30'
+ crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a'
+ mgr_short_cid = mgr_cid[0:12]
+ mon_short_cid = mon_cid[0:12]
+ crash_short_cid = crash_cid[0:12]
+
+    # Rebuild the sample output using our testing FSID and components (so it can be altered later if needed)
+ mem_out = f"""{mgr_short_cid},456.4MB / 41.96GB
+{crash_short_cid},7.082MB / 41.96GB
+{mon_short_cid},35.91MB / 41.96GB"""
+
+ ps_out = f"""{mon_cid},ceph-{FSID}-mon-host1
+{mgr_cid},ceph-{FSID}-mgr-host1-pntmho
+{crash_cid},ceph-{FSID}-crash-host1"""
+
+ def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE, timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs):
+ if 'stats' in cmd:
+ return (mem_out, '', 0)
+ elif 'ps' in cmd:
+ return (ps_out, '', 0)
+ return ('out', 'err', 0)
+
+ cephadm_fs.create_dir(AGENT_DIR)
+ cephadm_fs.create_dir(f'/var/lib/ceph/mon/ceph-host1') # legacy daemon
+ cephadm_fs.create_dir(f'/var/lib/ceph/osd/nothing') # improper directory, should be skipped
+ cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/mgr.host1.pntmho') # cephadm daemon
+ cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/crash.host1') # cephadm daemon
+
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ _cephadm.call.side_effect = _fake_call
+ daemons = agent._daemon_ls_subset()
+
+ assert 'agent.host1' in daemons
+ assert 'mgr.host1.pntmho' in daemons
+ assert 'crash.host1' in daemons
+ assert 'mon.host1' in daemons
+
+ assert daemons['mon.host1']['style'] == 'legacy'
+ assert daemons['mgr.host1.pntmho']['style'] == 'cephadm:v1'
+ assert daemons['crash.host1']['style'] == 'cephadm:v1'
+ assert daemons['agent.host1']['style'] == 'cephadm:v1'
+
+ assert daemons['mgr.host1.pntmho']['systemd_unit'] == f'ceph-{FSID}@mgr.host1.pntmho'
+ assert daemons['agent.host1']['systemd_unit'] == f'ceph-{FSID}@agent.host1'
+ assert daemons['crash.host1']['systemd_unit'] == f'ceph-{FSID}@crash.host1'
+
+ assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
+ assert daemons['crash.host1']['container_id'] == crash_cid
+
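+        # memory figures correspond to the fake "podman stats" output above; the "MB" values
+        # convert as 1 MB == 1024 * 1024 bytes, e.g. int(456.4 * 1024**2) == 478570086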
+ assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086 # 456.4 MB
+ assert daemons['crash.host1']['memory_usage'] == 7426015 # 7.082 MB
+
+
+@mock.patch("cephadm.list_daemons")
+@mock.patch("cephadm.CephadmAgent._daemon_ls_subset")
+def test_agent_get_ls(_ls_subset, _ls, cephadm_fs):
+ ls_out = [{
+ "style": "cephadm:v1",
+ "name": "mgr.host1.pntmho",
+ "fsid": FSID,
+ "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
+ "enabled": True,
+ "state": "running",
+ "service_name": "mgr",
+ "memory_request": None,
+ "memory_limit": None,
+ "ports": [
+ 9283,
+ 8765
+ ],
+ "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f",
+ "container_image_name": "quay.io/ceph/ceph:testing",
+ "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164",
+ "container_image_digests": [
+ "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
+ ],
+ "memory_usage": 507510784,
+ "cpu_percentage": "5.95%",
+ "version": "18.0.0-556-gb4d1a199",
+ "started": "2022-10-27T14:19:36.086664Z",
+ "created": "2022-10-27T14:19:36.282281Z",
+ "deployed": "2022-10-27T14:19:35.377275Z",
+ "configured": "2022-10-27T14:22:40.316912Z"
+ },{
+ "style": "cephadm:v1",
+ "name": "agent.host1",
+ "fsid": FSID,
+ "systemd_unit": f"ceph-{FSID}@agent.host1",
+ "enabled": True,
+ "state": "running",
+ "service_name": "agent",
+ "ports": [],
+ "ip": None,
+ "deployed_by": [
+ "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
+ ],
+ "rank": None,
+ "rank_generation": None,
+ "extra_container_args": None,
+ "container_id": None,
+ "container_image_name": None,
+ "container_image_id": None,
+ "container_image_digests": None,
+ "version": None,
+ "started": None,
+ "created": "2022-10-27T19:46:49.751594Z",
+ "deployed": None,
+ "configured": "2022-10-27T19:46:49.751594Z"
+ }, {
+ "style": "legacy",
+ "name": "mon.host1",
+ "fsid": FSID,
+ "systemd_unit": "ceph-mon@host1",
+ "enabled": False,
+ "state": "stopped",
+ "host_version": None
+ }]
+
+ ls_subset_out = {
+ 'mgr.host1.pntmho': {
+ "style": "cephadm:v1",
+ "fsid": FSID,
+ "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
+ "enabled": True,
+ "state": "running",
+ "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f",
+ "memory_usage": 507510784,
+ },
+ 'agent.host1': {
+ "style": "cephadm:v1",
+ "fsid": FSID,
+ "systemd_unit": f"ceph-{FSID}@agent.host1",
+ "enabled": True,
+ "state": "running",
+ "container_id": None
+ }, 'mon.host1': {
+ "style": "legacy",
+ "name": "mon.host1",
+ "fsid": FSID,
+ "systemd_unit": "ceph-mon@host1",
+ "enabled": False,
+ "state": "stopped",
+ "host_version": None
+ }}
+
+ _ls.return_value = ls_out
+ _ls_subset.return_value = ls_subset_out
+
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+
+ # first pass, no cached daemon metadata
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out
+ assert changed
+
+ # second pass, should recognize that daemons have not changed and just keep cached values
+ daemons, changed = agent._get_ls()
+        assert daemons == ls_out
+ assert not changed
+
+ # change a container id so it needs to get more info
+ ls_subset_out2 = copy.deepcopy(ls_subset_out)
+ ls_out2 = copy.deepcopy(ls_out)
+ ls_subset_out2['mgr.host1.pntmho']['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'
+ ls_out2[0]['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'
+ _ls.return_value = ls_out2
+ _ls_subset.return_value = ls_subset_out2
+ assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f"
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out2
+ assert changed
+
+ # run again with the same data so it should use cached values
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out2
+ assert not changed
+
+ # change the state of a container so new daemon metadata is needed
+ ls_subset_out3 = copy.deepcopy(ls_subset_out2)
+ ls_out3 = copy.deepcopy(ls_out2)
+ ls_subset_out3['mgr.host1.pntmho']['enabled'] = False
+ ls_out3[0]['enabled'] = False
+ _ls.return_value = ls_out3
+ _ls_subset.return_value = ls_subset_out3
+ assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] == True
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out3
+ assert changed
+
+ # run again with the same data so it should use cached values
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out3
+ assert not changed
+
+        # remove a daemon so new metadata is needed
+ ls_subset_out4 = copy.deepcopy(ls_subset_out3)
+ ls_out4 = copy.deepcopy(ls_out3)
+ ls_subset_out4.pop('mon.host1')
+ ls_out4.pop()
+ _ls.return_value = ls_out4
+ _ls_subset.return_value = ls_subset_out4
+ assert 'mon.host1' in agent.cached_ls_values
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out4
+ assert changed
+
+ # run again with the same data so it should use cached values
+ daemons, changed = agent._get_ls()
+ assert daemons == ls_out4
+ assert not changed
+
+
+@mock.patch("threading.Event.clear")
+@mock.patch("threading.Event.wait")
+@mock.patch("urllib.request.Request.__init__")
+@mock.patch("cephadm.urlopen")
+@mock.patch("cephadm.list_networks")
+@mock.patch("cephadm.HostFacts.dump")
+@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None)
+@mock.patch("ssl.SSLContext.load_verify_locations")
+@mock.patch("threading.Thread.is_alive")
+@mock.patch("cephadm.MgrListener.start")
+@mock.patch("cephadm.AgentGatherer.start")
+@mock.patch("cephadm.port_in_use")
+@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
+def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
+ _listener_start, _is_alive, _load_verify_locations,
+ _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear):
+ target_ip = '192.168.0.0'
+ target_port = '9999'
+ refresh_period = 20
+ listener_port = 7770
+ open_listener_port = 7777
+ host = AGENT_ID
+ device_enhanced_scan = False
+
+ def _fake_port_in_use(ctx, endpoint):
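+        # report every port as in use except 7777 so the agent has to walk up from
+        # its starting port (7770) before finding one it can listen on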
+ if endpoint.port == open_listener_port:
+ return False
+ return True
+
+ network_data: Dict[str, Dict[str, Set[str]]] = {
+ "10.2.1.0/24": {
+ "eth1": set(["10.2.1.122"])
+ },
+ "192.168.122.0/24": {
+ "eth0": set(["192.168.122.221"])
+ },
+ "fe80::/64": {
+ "eth0": set(["fe80::5054:ff:fe3f:d94e"]),
+ "eth1": set(["fe80::5054:ff:fe3f:aa4a"]),
+ }
+ }
+
+ # the json serializable version of the networks data
+ # we expect the agent to actually send
+ network_data_no_sets: Dict[str, Dict[str, List[str]]] = {
+ "10.2.1.0/24": {
+ "eth1": ["10.2.1.122"]
+ },
+ "192.168.122.0/24": {
+ "eth0": ["192.168.122.221"]
+ },
+ "fe80::/64": {
+ "eth0": ["fe80::5054:ff:fe3f:d94e"],
+ "eth1": ["fe80::5054:ff:fe3f:aa4a"],
+ }
+ }
+
+ class FakeHTTPResponse():
+ def __init__(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, tb):
+ pass
+
+ def read(self):
+ return json.dumps({'valid': 'output', 'result': '400'})
+
+ _port_in_use.side_effect = _fake_port_in_use
+ _is_alive.return_value = False
+ _HF_dump.return_value = 'Host Facts'
+ _list_networks.return_value = network_data
+ _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse()
+ _RQ_init.side_effect = lambda *args, **kwargs: None
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ agent.keyring = 'agent keyring'
+ agent.ack = 7
+ agent.volume_gatherer.ack = 7
+ agent.volume_gatherer.data = 'ceph-volume inventory data'
+ agent.ls_gatherer.ack = 7
+ agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}]
+
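+        # pull_conf_settings is mocked out, so populate the agent's settings through its side effect instead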
+ def _set_conf():
+ agent.target_ip = target_ip
+ agent.target_port = target_port
+ agent.loop_interval = refresh_period
+ agent.starting_port = listener_port
+ agent.host = host
+ agent.device_enhanced_scan = device_enhanced_scan
+ _pull_conf_settings.side_effect = _set_conf
+
+ # technically the run function loops forever unless the agent
+ # is told to stop. To get around that we're going to have the
+ # event.wait() (which happens at the end of the loop) to throw
+ # a special exception type. If we catch this exception we can
+ # consider it as being a "success" run
+ class EventCleared(Exception):
+ pass
+
+ _clear.side_effect = EventCleared('SUCCESS')
+ with pytest.raises(EventCleared, match='SUCCESS'):
+ agent.run()
+
+ expected_data = {
+ 'host': host,
+ 'ls': [{'valid_daemon': 'valid_metadata'}],
+ 'networks': network_data_no_sets,
+ 'facts': 'Host Facts',
+ 'volume': 'ceph-volume inventory data',
+ 'ack': str(7),
+ 'keyring': 'agent keyring',
+ 'port': str(open_listener_port)
+ }
+ _RQ_init.assert_called_with(
+ f'https://{target_ip}:{target_port}/data/',
+ json.dumps(expected_data).encode('ascii'),
+ {'Content-Type': 'application/json'}
+ )
+ _listener_start.assert_called()
+ _gatherer_start.assert_called()
+ _urlopen.assert_called()
+
+ # agent should not go down if connections fail
+ _urlopen.side_effect = Exception()
+ with pytest.raises(EventCleared, match='SUCCESS'):
+ agent.run()
+
+ # should fail if no ports are open for listener
+ _port_in_use.side_effect = lambda _, __: True
+ agent.listener_port = None
+ with pytest.raises(Exception, match='Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'):
+ agent.run()
+
+
+@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
+@mock.patch("cephadm.CephadmAgent.wakeup")
+def test_mgr_listener_handle_json_payload(_agent_wakeup, _pull_conf_settings, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+
+ data_no_config = {
+ 'counter': 7
+ }
+ agent.mgr_listener.handle_json_payload(data_no_config)
+ _agent_wakeup.assert_not_called()
+ _pull_conf_settings.assert_not_called()
+ assert not any(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files)
+
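+        # payload carrying a config: all but one of the required files, plus a file the agent does not require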
+ data_with_config = {
+ 'counter': 7,
+ 'config': {
+ 'unrequired-file': 'unrequired-text'
+ }
+ }
+ data_with_config['config'].update({s: f'{s} text' for s in agent.required_files if s != agent.required_files[2]})
+ agent.mgr_listener.handle_json_payload(data_with_config)
+ _agent_wakeup.assert_called()
+ _pull_conf_settings.assert_called()
+ assert all(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files if s != agent.required_files[2])
+ assert not os.path.exists(os.path.join(AGENT_DIR, agent.required_files[2]))
+ assert not os.path.exists(os.path.join(AGENT_DIR, 'unrequired-file'))
+
+
+@mock.patch("socket.socket")
+@mock.patch("ssl.SSLContext.wrap_socket")
+@mock.patch("cephadm.MgrListener.handle_json_payload")
+@mock.patch("ssl.SSLContext.load_verify_locations")
+@mock.patch("ssl.SSLContext.load_cert_chain")
+def test_mgr_listener_run(_load_cert_chain, _load_verify_locations, _handle_json_payload,
+ _wrap_context, _socket, cephadm_fs):
+
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+
+ payload = json.dumps({'counter': 3,
+ 'config': {s: f'{s} text' for s in agent.required_files if s != agent.required_files[1]}})
+
+ class FakeSocket:
+
+ def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
+ self.family = family
+ self.type = type
+
+ def bind(*args, **kwargs):
+ return
+
+ def settimeout(*args, **kwargs):
+ return
+
+ def listen(*args, **kwargs):
+ return
+
+ class FakeSecureSocket:
+
+ def __init__(self, pload):
+ self.payload = pload
+ self._conn = FakeConn(self.payload)
+ self.accepted = False
+
+ def accept(self):
+ # to make mgr listener run loop stop running,
+ # set it to stop after accepting a "connection"
+ # on our fake socket so only one iteration of the loop
+ # actually happens
+ agent.mgr_listener.stop = True
+            self.accepted = True
+ return self._conn, None
+
+ def load_cert_chain(*args, **kwargs):
+ return
+
+ def load_verify_locations(*args, **kwargs):
+ return
+
+ class FakeConn:
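+        # fakes a connection whose message starts with a 10-character, zero-padded decimal
+        # length prefix followed by the JSON payload, matching the framing the mgr listener
+        # parses (it reads 10 bytes for the length before reading the payload itself)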
+
+ def __init__(self, payload: str = ''):
+ payload_len_str = str(len(payload.encode('utf-8')))
+ while len(payload_len_str.encode('utf-8')) < 10:
+ payload_len_str = '0' + payload_len_str
+ self.payload = (payload_len_str + payload).encode('utf-8')
+ self.buffer_len = len(self.payload)
+
+ def recv(self, len: Optional[int] = None):
+ if not len or len >= self.buffer_len:
+ ret = self.payload
+ self.payload = b''
+ self.buffer_len = 0
+ return ret
+ else:
+ ret = self.payload[:len]
+ self.payload = self.payload[len:]
+ self.buffer_len = self.buffer_len - len
+ return ret
+
+ FSS_good_data = FakeSecureSocket(payload)
+ FSS_bad_json = FakeSecureSocket('bad json')
+ _socket = FakeSocket
+ agent.listener_port = 7777
+
+ # first run, should successfully receive properly structured json payload
+ _wrap_context.side_effect = [FSS_good_data]
+ agent.mgr_listener.stop = False
+ FakeConn.send = mock.Mock(return_value=None)
+ agent.mgr_listener.run()
+
+ # verify payload was correctly extracted
+    _handle_json_payload.assert_called_with(json.loads(payload))
+ FakeConn.send.assert_called_once_with(b'ACK')
+
+ # second run, with bad json data received
+ _wrap_context.side_effect = [FSS_bad_json]
+ agent.mgr_listener.stop = False
+ FakeConn.send = mock.Mock(return_value=None)
+ agent.mgr_listener.run()
+ FakeConn.send.assert_called_once_with(b'Failed to extract json payload from message: Expecting value: line 1 column 1 (char 0)')
+
+    # third run, no proper length at the beginning of the payload
+ FSS_no_length = FakeSecureSocket(payload)
+ FSS_no_length.payload = FSS_no_length.payload[10:]
+ FSS_no_length._conn.payload = FSS_no_length._conn.payload[10:]
+ FSS_no_length._conn.buffer_len -= 10
+ _wrap_context.side_effect = [FSS_no_length]
+ agent.mgr_listener.stop = False
+ FakeConn.send = mock.Mock(return_value=None)
+ agent.mgr_listener.run()
+ FakeConn.send.assert_called_once_with(b'Failed to extract length of payload from message: invalid literal for int() with base 10: \'{"counter"\'')
+
+ # some exception handling for full coverage
+ FSS_exc_testing = FakeSecureSocket(payload)
+ FSS_exc_testing.accept = mock.MagicMock()
+
+ def _accept(*args, **kwargs):
+ if not FSS_exc_testing.accepted:
+ FSS_exc_testing.accepted = True
+ raise socket.timeout()
+ else:
+ agent.mgr_listener.stop = True
+ raise Exception()
+
+ FSS_exc_testing.accept.side_effect = _accept
+ _wrap_context.side_effect = [FSS_exc_testing]
+ agent.mgr_listener.stop = False
+ FakeConn.send = mock.Mock(return_value=None)
+ agent.mgr_listener.run()
+ FakeConn.send.assert_not_called()
+    assert FSS_exc_testing.accept.call_count >= 2
+
+
+@mock.patch("cephadm.CephadmAgent._get_ls")
+def test_gatherer_update_func(_get_ls, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+
+ def _sample_func():
+ return 7
+
+ agent.ls_gatherer.func()
+ _get_ls.assert_called()
+
+    _get_ls.reset_mock()
+ agent.ls_gatherer.update_func(_sample_func)
+ out = agent.ls_gatherer.func()
+ assert out == 7
+ _get_ls.assert_not_called()
+
+
+@mock.patch("cephadm.CephadmAgent.wakeup")
+@mock.patch("time.monotonic")
+@mock.patch("threading.Event.wait")
+def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+ agent.loop_interval = 30
+ agent.ack = 23
+
+ _sample_func = lambda *args, **kwargs: ('sample out', True)
+ agent.ls_gatherer.update_func(_sample_func)
+ agent.ls_gatherer.ack = 20
+ agent.ls_gatherer.stop = False
+
+ def _fake_clear(*args, **kwargs):
+ agent.ls_gatherer.stop = True
+
+ _time.side_effect = [0, 20, 0, 20, 0, 20] # start at time 0, complete at time 20
+ _wait.return_value = None
+
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = _fake_clear
+ agent.ls_gatherer.run()
+
+ _wait.assert_called_with(10) # agent loop_interval - run time
+ assert agent.ls_gatherer.data == 'sample out'
+ assert agent.ls_gatherer.ack == 23
+ _agent_wakeup.assert_called_once()
+ _clear.assert_called_once()
+
+ _exc_func = lambda *args, **kwargs: Exception()
+ agent.ls_gatherer.update_func(_exc_func)
+ agent.ls_gatherer.ack = 20
+ agent.ls_gatherer.stop = False
+
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = _fake_clear
+ agent.ls_gatherer.run()
+ assert agent.ls_gatherer.data is None
+ assert agent.ls_gatherer.ack == agent.ack
+ # should have run full loop despite exception
+ _clear.assert_called_once()
+
+ # test general exception for full coverage
+ _agent_wakeup.side_effect = [Exception()]
+ agent.ls_gatherer.update_func(_sample_func)
+ agent.ls_gatherer.stop = False
+ # just to force only one iteration
+ _time.side_effect = _fake_clear
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = Exception()
+ agent.ls_gatherer.run()
+ assert agent.ls_gatherer.data == 'sample out'
+ assert agent.ls_gatherer.ack == agent.ack
+ # should not have gotten to end of loop
+ _clear.assert_not_called()
+
+
+@mock.patch("cephadm.CephadmAgent.run")
+def test_command_agent(_agent_run, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ ctx.daemon_id = AGENT_ID
+
+ with pytest.raises(Exception, match=f"Agent daemon directory {AGENT_DIR} does not exist. Perhaps agent was never deployed?"):
+ _cephadm.command_agent(ctx)
+
+ cephadm_fs.create_dir(AGENT_DIR)
+ _cephadm.command_agent(ctx)
+ _agent_run.assert_called()