diff options
Diffstat (limited to 'src/cephadm/tests/test_agent.py')
-rw-r--r-- | src/cephadm/tests/test_agent.py | 800 |
1 files changed, 800 insertions, 0 deletions
diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py new file mode 100644 index 000000000..f9cf201e2 --- /dev/null +++ b/src/cephadm/tests/test_agent.py @@ -0,0 +1,800 @@ +from unittest import mock +import copy, datetime, json, os, socket, threading + +import pytest + +from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm + +from typing import Optional + +_cephadm = import_cephadm() + + +FSID = "beefbeef-beef-beef-1234-beefbeefbeef" +AGENT_ID = 'host1' +AGENT_DIR = f'/var/lib/ceph/{FSID}/agent.{AGENT_ID}' + + +def test_agent_validate(): + required_files = _cephadm.CephadmAgent.required_files + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + for i in range(len(required_files)): + incomplete_files = {s: 'text' for s in [f for j, f in enumerate(required_files) if j != i]} + with pytest.raises(_cephadm.Error, match=f'required file missing from config: {required_files[i]}'): + agent.validate(incomplete_files) + all_files = {s: 'text' for s in required_files} + agent.validate(all_files) + + +def _check_file(path, content): + assert os.path.exists(path) + with open(path) as f: + fcontent = f.read() + assert fcontent == content + + +@mock.patch('cephadm.call_throws') +def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs): + _call_throws.return_value = ('', '', 0) + agent_id = AGENT_ID + + with with_cephadm_ctx([]) as ctx: + ctx.meta_json = json.dumps({'meta': 'data'}) + agent = _cephadm.CephadmAgent(ctx, FSID, agent_id) + cephadm_fs.create_dir(AGENT_DIR) + + with pytest.raises(_cephadm.Error, match='Agent needs a config'): + agent.deploy_daemon_unit() + + config = {s: f'text for {s}' for s in _cephadm.CephadmAgent.required_files} + config['not-required-file.txt'] = 'don\'t write me' + + agent.deploy_daemon_unit(config) + + # check required config file were all created + for fname in _cephadm.CephadmAgent.required_files: + _check_file(f'{AGENT_DIR}/{fname}', f'text for {fname}') + + # assert non-required file was not written + assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt') + + # check unit.run file was created correctly + _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run()) + + # check unit.meta file created correctly + _check_file(f'{AGENT_DIR}/unit.meta', json.dumps({'meta': 'data'}, indent=4) + '\n') + + # check unit file was created correctly + _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file()) + + expected_call_throws_calls = [ + mock.call(ctx, ['systemctl', 'daemon-reload']), + mock.call(ctx, ['systemctl', 'enable', '--now', agent.unit_name()]), + ] + _call_throws.assert_has_calls(expected_call_throws_calls) + + expected_call_calls = [ + mock.call(ctx, ['systemctl', 'stop', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG), + mock.call(ctx, ['systemctl', 'reset-failed', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG), + ] + _cephadm.call.assert_has_calls(expected_call_calls) + + +@mock.patch('threading.Thread.is_alive') +def test_agent_shutdown(_is_alive): + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + _is_alive.return_value = True + assert agent.stop == False + assert agent.mgr_listener.stop == False + assert agent.ls_gatherer.stop == False + assert agent.volume_gatherer.stop == False + agent.shutdown() + assert agent.stop == True + assert agent.mgr_listener.stop == True + assert agent.ls_gatherer.stop == True + assert agent.volume_gatherer.stop == True + + +def test_agent_wakeup(): + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + assert agent.event.is_set() == False + agent.wakeup() + assert agent.event.is_set() == True + + +@mock.patch("cephadm.CephadmAgent.shutdown") +@mock.patch("cephadm.AgentGatherer.update_func") +def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs): + target_ip = '192.168.0.0' + target_port = 9876 + refresh_period = 20 + listener_port = 5678 + host = AGENT_ID + device_enhanced_scan = 'True' + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + full_config = { + 'target_ip': target_ip, + 'target_port': target_port, + 'refresh_period': refresh_period, + 'listener_port': listener_port, + 'host': host, + 'device_enhanced_scan': device_enhanced_scan + } + cephadm_fs.create_dir(AGENT_DIR) + with open(agent.config_path, 'w') as f: + f.write(json.dumps(full_config)) + + with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"): + agent.pull_conf_settings() + _shutdown.assert_called() + with open(agent.keyring_path, 'w') as f: + f.write('keyring') + + assert agent.device_enhanced_scan == False + agent.pull_conf_settings() + assert agent.host == host + assert agent.target_ip == target_ip + assert agent.target_port == target_port + assert agent.loop_interval == refresh_period + assert agent.starting_port == listener_port + assert agent.device_enhanced_scan == True + assert agent.keyring == 'keyring' + _update_func.assert_called() + + full_config.pop('target_ip') + with open(agent.config_path, 'w') as f: + f.write(json.dumps(full_config)) + with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"): + agent.pull_conf_settings() + + +@mock.patch("cephadm.command_ceph_volume") +def test_agent_ceph_volume(_ceph_volume): + + def _ceph_volume_outputter(_): + print("ceph-volume output") + + def _ceph_volume_empty(_): + pass + + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + + _ceph_volume.side_effect = _ceph_volume_outputter + out, _ = agent._ceph_volume(False) + assert ctx.command == ['inventory', '--format=json'] + assert out == "ceph-volume output\n" + + out, _ = agent._ceph_volume(True) + assert ctx.command == ['inventory', '--format=json', '--with-lsm'] + assert out == "ceph-volume output\n" + + _ceph_volume.side_effect = _ceph_volume_empty + with pytest.raises(Exception, match='ceph-volume returned empty value'): + out, _ = agent._ceph_volume(False) + + +def test_agent_daemon_ls_subset(cephadm_fs): + # Basing part of this test on some actual sample output + + # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output + # 3f2b31d19ecd,456.4MB / 41.96GB + # 5aca2499e0f8,7.082MB / 41.96GB + # fe0cef07d5f7,35.91MB / 41.96GB + + # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers + # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1 + # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho + # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1 + + # Some of the components from that output + mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f' + mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30' + crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a' + mgr_short_cid = mgr_cid[0:12] + mon_short_cid = mon_cid[0:12] + crash_short_cid = crash_cid[0:12] + + #Rebuilding the output but with our testing FSID and components (to allow alteration later for whatever reason) + mem_out = f"""{mgr_short_cid},456.4MB / 41.96GB +{crash_short_cid},7.082MB / 41.96GB +{mon_short_cid},35.91MB / 41.96GB""" + + ps_out = f"""{mon_cid},ceph-{FSID}-mon-host1 +{mgr_cid},ceph-{FSID}-mgr-host1-pntmho +{crash_cid},ceph-{FSID}-crash-host1""" + + def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE, timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs): + if 'stats' in cmd: + return (mem_out, '', 0) + elif 'ps' in cmd: + return (ps_out, '', 0) + return ('out', 'err', 0) + + cephadm_fs.create_dir(AGENT_DIR) + cephadm_fs.create_dir(f'/var/lib/ceph/mon/ceph-host1') # legacy daemon + cephadm_fs.create_dir(f'/var/lib/ceph/osd/nothing') # improper directory, should be skipped + cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/mgr.host1.pntmho') # cephadm daemon + cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/crash.host1') # cephadm daemon + + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + _cephadm.call.side_effect = _fake_call + daemons = agent._daemon_ls_subset() + + assert 'agent.host1' in daemons + assert 'mgr.host1.pntmho' in daemons + assert 'crash.host1' in daemons + assert 'mon.host1' in daemons + + assert daemons['mon.host1']['style'] == 'legacy' + assert daemons['mgr.host1.pntmho']['style'] == 'cephadm:v1' + assert daemons['crash.host1']['style'] == 'cephadm:v1' + assert daemons['agent.host1']['style'] == 'cephadm:v1' + + assert daemons['mgr.host1.pntmho']['systemd_unit'] == f'ceph-{FSID}@mgr.host1.pntmho' + assert daemons['agent.host1']['systemd_unit'] == f'ceph-{FSID}@agent.host1' + assert daemons['crash.host1']['systemd_unit'] == f'ceph-{FSID}@crash.host1' + + assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid + assert daemons['crash.host1']['container_id'] == crash_cid + + assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086 # 456.4 MB + assert daemons['crash.host1']['memory_usage'] == 7426015 # 7.082 MB + + +@mock.patch("cephadm.list_daemons") +@mock.patch("cephadm.CephadmAgent._daemon_ls_subset") +def test_agent_get_ls(_ls_subset, _ls, cephadm_fs): + ls_out = [{ + "style": "cephadm:v1", + "name": "mgr.host1.pntmho", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho", + "enabled": True, + "state": "running", + "service_name": "mgr", + "memory_request": None, + "memory_limit": None, + "ports": [ + 9283, + 8765 + ], + "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f", + "container_image_name": "quay.io/ceph/ceph:testing", + "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164", + "container_image_digests": [ + "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30" + ], + "memory_usage": 507510784, + "cpu_percentage": "5.95%", + "version": "18.0.0-556-gb4d1a199", + "started": "2022-10-27T14:19:36.086664Z", + "created": "2022-10-27T14:19:36.282281Z", + "deployed": "2022-10-27T14:19:35.377275Z", + "configured": "2022-10-27T14:22:40.316912Z" + },{ + "style": "cephadm:v1", + "name": "agent.host1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@agent.host1", + "enabled": True, + "state": "running", + "service_name": "agent", + "ports": [], + "ip": None, + "deployed_by": [ + "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30" + ], + "rank": None, + "rank_generation": None, + "extra_container_args": None, + "container_id": None, + "container_image_name": None, + "container_image_id": None, + "container_image_digests": None, + "version": None, + "started": None, + "created": "2022-10-27T19:46:49.751594Z", + "deployed": None, + "configured": "2022-10-27T19:46:49.751594Z" + }, { + "style": "legacy", + "name": "mon.host1", + "fsid": FSID, + "systemd_unit": "ceph-mon@host1", + "enabled": False, + "state": "stopped", + "host_version": None + }] + + ls_subset_out = { + 'mgr.host1.pntmho': { + "style": "cephadm:v1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho", + "enabled": True, + "state": "running", + "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f", + "memory_usage": 507510784, + }, + 'agent.host1': { + "style": "cephadm:v1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@agent.host1", + "enabled": True, + "state": "running", + "container_id": None + }, 'mon.host1': { + "style": "legacy", + "name": "mon.host1", + "fsid": FSID, + "systemd_unit": "ceph-mon@host1", + "enabled": False, + "state": "stopped", + "host_version": None + }} + + _ls.return_value = ls_out + _ls_subset.return_value = ls_subset_out + + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + + # first pass, no cached daemon metadata + daemons, changed = agent._get_ls() + assert daemons == ls_out + assert changed + + # second pass, should recognize that daemons have not changed and just keep cached values + daemons, changed = agent._get_ls() + assert daemons == daemons + assert not changed + + # change a container id so it needs to get more info + ls_subset_out2 = copy.deepcopy(ls_subset_out) + ls_out2 = copy.deepcopy(ls_out) + ls_subset_out2['mgr.host1.pntmho']['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa' + ls_out2[0]['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa' + _ls.return_value = ls_out2 + _ls_subset.return_value = ls_subset_out2 + assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f" + daemons, changed = agent._get_ls() + assert daemons == ls_out2 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out2 + assert not changed + + # change the state of a container so new daemon metadata is needed + ls_subset_out3 = copy.deepcopy(ls_subset_out2) + ls_out3 = copy.deepcopy(ls_out2) + ls_subset_out3['mgr.host1.pntmho']['enabled'] = False + ls_out3[0]['enabled'] = False + _ls.return_value = ls_out3 + _ls_subset.return_value = ls_subset_out3 + assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] == True + daemons, changed = agent._get_ls() + assert daemons == ls_out3 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out3 + assert not changed + + # remove a daemon so new metadats is needed + ls_subset_out4 = copy.deepcopy(ls_subset_out3) + ls_out4 = copy.deepcopy(ls_out3) + ls_subset_out4.pop('mon.host1') + ls_out4.pop() + _ls.return_value = ls_out4 + _ls_subset.return_value = ls_subset_out4 + assert 'mon.host1' in agent.cached_ls_values + daemons, changed = agent._get_ls() + assert daemons == ls_out4 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out4 + assert not changed + + +@mock.patch("threading.Event.clear") +@mock.patch("threading.Event.wait") +@mock.patch("urllib.request.Request.__init__") +@mock.patch("cephadm.urlopen") +@mock.patch("cephadm.list_networks") +@mock.patch("cephadm.HostFacts.dump") +@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None) +@mock.patch("ssl.SSLContext.load_verify_locations") +@mock.patch("threading.Thread.is_alive") +@mock.patch("cephadm.MgrListener.start") +@mock.patch("cephadm.AgentGatherer.start") +@mock.patch("cephadm.port_in_use") +@mock.patch("cephadm.CephadmAgent.pull_conf_settings") +def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, + _listener_start, _is_alive, _load_verify_locations, + _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear): + target_ip = '192.168.0.0' + target_port = '9999' + refresh_period = 20 + listener_port = 7770 + open_listener_port = 7777 + host = AGENT_ID + device_enhanced_scan = False + + def _fake_port_in_use(ctx, endpoint): + if endpoint.port == open_listener_port: + return False + return True + + network_data: Dict[str, Dict[str, Set[str]]] = { + "10.2.1.0/24": { + "eth1": set(["10.2.1.122"]) + }, + "192.168.122.0/24": { + "eth0": set(["192.168.122.221"]) + }, + "fe80::/64": { + "eth0": set(["fe80::5054:ff:fe3f:d94e"]), + "eth1": set(["fe80::5054:ff:fe3f:aa4a"]), + } + } + + # the json serializable version of the networks data + # we expect the agent to actually send + network_data_no_sets: Dict[str, Dict[str, List[str]]] = { + "10.2.1.0/24": { + "eth1": ["10.2.1.122"] + }, + "192.168.122.0/24": { + "eth0": ["192.168.122.221"] + }, + "fe80::/64": { + "eth0": ["fe80::5054:ff:fe3f:d94e"], + "eth1": ["fe80::5054:ff:fe3f:aa4a"], + } + } + + class FakeHTTPResponse(): + def __init__(self): + pass + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + pass + + def read(self): + return json.dumps({'valid': 'output', 'result': '400'}) + + _port_in_use.side_effect = _fake_port_in_use + _is_alive.return_value = False + _HF_dump.return_value = 'Host Facts' + _list_networks.return_value = network_data + _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse() + _RQ_init.side_effect = lambda *args, **kwargs: None + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + agent.keyring = 'agent keyring' + agent.ack = 7 + agent.volume_gatherer.ack = 7 + agent.volume_gatherer.data = 'ceph-volume inventory data' + agent.ls_gatherer.ack = 7 + agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}] + + def _set_conf(): + agent.target_ip = target_ip + agent.target_port = target_port + agent.loop_interval = refresh_period + agent.starting_port = listener_port + agent.host = host + agent.device_enhanced_scan = device_enhanced_scan + _pull_conf_settings.side_effect = _set_conf + + # technically the run function loops forever unless the agent + # is told to stop. To get around that we're going to have the + # event.wait() (which happens at the end of the loop) to throw + # a special exception type. If we catch this exception we can + # consider it as being a "success" run + class EventCleared(Exception): + pass + + _clear.side_effect = EventCleared('SUCCESS') + with pytest.raises(EventCleared, match='SUCCESS'): + agent.run() + + expected_data = { + 'host': host, + 'ls': [{'valid_daemon': 'valid_metadata'}], + 'networks': network_data_no_sets, + 'facts': 'Host Facts', + 'volume': 'ceph-volume inventory data', + 'ack': str(7), + 'keyring': 'agent keyring', + 'port': str(open_listener_port) + } + _RQ_init.assert_called_with( + f'https://{target_ip}:{target_port}/data/', + json.dumps(expected_data).encode('ascii'), + {'Content-Type': 'application/json'} + ) + _listener_start.assert_called() + _gatherer_start.assert_called() + _urlopen.assert_called() + + # agent should not go down if connections fail + _urlopen.side_effect = Exception() + with pytest.raises(EventCleared, match='SUCCESS'): + agent.run() + + # should fail if no ports are open for listener + _port_in_use.side_effect = lambda _, __: True + agent.listener_port = None + with pytest.raises(Exception, match='Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'): + agent.run() + + +@mock.patch("cephadm.CephadmAgent.pull_conf_settings") +@mock.patch("cephadm.CephadmAgent.wakeup") +def test_mgr_listener_handle_json_payload(_agent_wakeup, _pull_conf_settings, cephadm_fs): + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + cephadm_fs.create_dir(AGENT_DIR) + + data_no_config = { + 'counter': 7 + } + agent.mgr_listener.handle_json_payload(data_no_config) + _agent_wakeup.assert_not_called() + _pull_conf_settings.assert_not_called() + assert not any(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files) + + data_with_config = { + 'counter': 7, + 'config': { + 'unrequired-file': 'unrequired-text' + } + } + data_with_config['config'].update({s: f'{s} text' for s in agent.required_files if s != agent.required_files[2]}) + agent.mgr_listener.handle_json_payload(data_with_config) + _agent_wakeup.assert_called() + _pull_conf_settings.assert_called() + assert all(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files if s != agent.required_files[2]) + assert not os.path.exists(os.path.join(AGENT_DIR, agent.required_files[2])) + assert not os.path.exists(os.path.join(AGENT_DIR, 'unrequired-file')) + + +@mock.patch("socket.socket") +@mock.patch("ssl.SSLContext.wrap_socket") +@mock.patch("cephadm.MgrListener.handle_json_payload") +@mock.patch("ssl.SSLContext.load_verify_locations") +@mock.patch("ssl.SSLContext.load_cert_chain") +def test_mgr_listener_run(_load_cert_chain, _load_verify_locations, _handle_json_payload, + _wrap_context, _socket, cephadm_fs): + + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + cephadm_fs.create_dir(AGENT_DIR) + + payload = json.dumps({'counter': 3, + 'config': {s: f'{s} text' for s in agent.required_files if s != agent.required_files[1]}}) + + class FakeSocket: + + def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None): + self.family = family + self.type = type + + def bind(*args, **kwargs): + return + + def settimeout(*args, **kwargs): + return + + def listen(*args, **kwargs): + return + + class FakeSecureSocket: + + def __init__(self, pload): + self.payload = pload + self._conn = FakeConn(self.payload) + self.accepted = False + + def accept(self): + # to make mgr listener run loop stop running, + # set it to stop after accepting a "connection" + # on our fake socket so only one iteration of the loop + # actually happens + agent.mgr_listener.stop = True + accepted = True + return self._conn, None + + def load_cert_chain(*args, **kwargs): + return + + def load_verify_locations(*args, **kwargs): + return + + class FakeConn: + + def __init__(self, payload: str = ''): + payload_len_str = str(len(payload.encode('utf-8'))) + while len(payload_len_str.encode('utf-8')) < 10: + payload_len_str = '0' + payload_len_str + self.payload = (payload_len_str + payload).encode('utf-8') + self.buffer_len = len(self.payload) + + def recv(self, len: Optional[int] = None): + if not len or len >= self.buffer_len: + ret = self.payload + self.payload = b'' + self.buffer_len = 0 + return ret + else: + ret = self.payload[:len] + self.payload = self.payload[len:] + self.buffer_len = self.buffer_len - len + return ret + + FSS_good_data = FakeSecureSocket(payload) + FSS_bad_json = FakeSecureSocket('bad json') + _socket = FakeSocket + agent.listener_port = 7777 + + # first run, should successfully receive properly structured json payload + _wrap_context.side_effect = [FSS_good_data] + agent.mgr_listener.stop = False + FakeConn.send = mock.Mock(return_value=None) + agent.mgr_listener.run() + + # verify payload was correctly extracted + assert _handle_json_payload.called_with(json.loads(payload)) + FakeConn.send.assert_called_once_with(b'ACK') + + # second run, with bad json data received + _wrap_context.side_effect = [FSS_bad_json] + agent.mgr_listener.stop = False + FakeConn.send = mock.Mock(return_value=None) + agent.mgr_listener.run() + FakeConn.send.assert_called_once_with(b'Failed to extract json payload from message: Expecting value: line 1 column 1 (char 0)') + + # third run, no proper length as beginning og payload + FSS_no_length = FakeSecureSocket(payload) + FSS_no_length.payload = FSS_no_length.payload[10:] + FSS_no_length._conn.payload = FSS_no_length._conn.payload[10:] + FSS_no_length._conn.buffer_len -= 10 + _wrap_context.side_effect = [FSS_no_length] + agent.mgr_listener.stop = False + FakeConn.send = mock.Mock(return_value=None) + agent.mgr_listener.run() + FakeConn.send.assert_called_once_with(b'Failed to extract length of payload from message: invalid literal for int() with base 10: \'{"counter"\'') + + # some exception handling for full coverage + FSS_exc_testing = FakeSecureSocket(payload) + FSS_exc_testing.accept = mock.MagicMock() + + def _accept(*args, **kwargs): + if not FSS_exc_testing.accepted: + FSS_exc_testing.accepted = True + raise socket.timeout() + else: + agent.mgr_listener.stop = True + raise Exception() + + FSS_exc_testing.accept.side_effect = _accept + _wrap_context.side_effect = [FSS_exc_testing] + agent.mgr_listener.stop = False + FakeConn.send = mock.Mock(return_value=None) + agent.mgr_listener.run() + FakeConn.send.assert_not_called() + FSS_exc_testing.accept.call_count == 3 + + +@mock.patch("cephadm.CephadmAgent._get_ls") +def test_gatherer_update_func(_get_ls, cephadm_fs): + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + cephadm_fs.create_dir(AGENT_DIR) + + def _sample_func(): + return 7 + + agent.ls_gatherer.func() + _get_ls.assert_called() + + _get_ls = mock.MagicMock() + agent.ls_gatherer.update_func(_sample_func) + out = agent.ls_gatherer.func() + assert out == 7 + _get_ls.assert_not_called() + + +@mock.patch("cephadm.CephadmAgent.wakeup") +@mock.patch("time.monotonic") +@mock.patch("threading.Event.wait") +def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs): + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + cephadm_fs.create_dir(AGENT_DIR) + agent.loop_interval = 30 + agent.ack = 23 + + _sample_func = lambda *args, **kwargs: ('sample out', True) + agent.ls_gatherer.update_func(_sample_func) + agent.ls_gatherer.ack = 20 + agent.ls_gatherer.stop = False + + def _fake_clear(*args, **kwargs): + agent.ls_gatherer.stop = True + + _time.side_effect = [0, 20, 0, 20, 0, 20] # start at time 0, complete at time 20 + _wait.return_value = None + + with mock.patch("threading.Event.clear") as _clear: + _clear.side_effect = _fake_clear + agent.ls_gatherer.run() + + _wait.assert_called_with(10) # agent loop_interval - run time + assert agent.ls_gatherer.data == 'sample out' + assert agent.ls_gatherer.ack == 23 + _agent_wakeup.assert_called_once() + _clear.assert_called_once() + + _exc_func = lambda *args, **kwargs: Exception() + agent.ls_gatherer.update_func(_exc_func) + agent.ls_gatherer.ack = 20 + agent.ls_gatherer.stop = False + + with mock.patch("threading.Event.clear") as _clear: + _clear.side_effect = _fake_clear + agent.ls_gatherer.run() + assert agent.ls_gatherer.data is None + assert agent.ls_gatherer.ack == agent.ack + # should have run full loop despite exception + _clear.assert_called_once() + + # test general exception for full coverage + _agent_wakeup.side_effect = [Exception()] + agent.ls_gatherer.update_func(_sample_func) + agent.ls_gatherer.stop = False + # just to force only one iteration + _time.side_effect = _fake_clear + with mock.patch("threading.Event.clear") as _clear: + _clear.side_effect = Exception() + agent.ls_gatherer.run() + assert agent.ls_gatherer.data == 'sample out' + assert agent.ls_gatherer.ack == agent.ack + # should not have gotten to end of loop + _clear.assert_not_called() + + +@mock.patch("cephadm.CephadmAgent.run") +def test_command_agent(_agent_run, cephadm_fs): + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + ctx.daemon_id = AGENT_ID + + with pytest.raises(Exception, match=f"Agent daemon directory {AGENT_DIR} does not exist. Perhaps agent was never deployed?"): + _cephadm.command_agent(ctx) + + cephadm_fs.create_dir(AGENT_DIR) + _cephadm.command_agent(ctx) + _agent_run.assert_called() |