From d835b2cae8abc71958b69362162e6a70c3d7ef63 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 08:48:59 +0200 Subject: Adding upstream version 4.6.0. Signed-off-by: Daniel Baumann --- test/unittests/test_crashtest_task.py | 777 ++++++++++++++++++++++++++++++++++ 1 file changed, 777 insertions(+) create mode 100644 test/unittests/test_crashtest_task.py (limited to 'test/unittests/test_crashtest_task.py') diff --git a/test/unittests/test_crashtest_task.py b/test/unittests/test_crashtest_task.py new file mode 100644 index 0000000..3b4c092 --- /dev/null +++ b/test/unittests/test_crashtest_task.py @@ -0,0 +1,777 @@ +import os +import sys + +try: + from unittest import mock, TestCase +except ImportError: + import mock +from datetime import datetime + +from crmsh import utils as crmshutils +from crmsh.crash_test import utils, main, config, task + + +class TestTaskKill(TestCase): + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + @mock.patch('crmsh.crash_test.utils.msg_info') + def setUp(self, mock_msg_info): + """ + Test setUp. + """ + ctx = mock.Mock(current_case="sbd", loop=False) + self.task_kill_inst = task.TaskKill(ctx) + ctx2 = mock.Mock(current_case="sbd", loop=True) + self.task_kill_inst_loop = task.TaskKill(ctx2) + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + @mock.patch('os.path.isdir') + def test_enable_report_error(self, mock_isdir): + mock_isdir.return_value = False + main.ctx = mock.Mock(report_path="/path") + with self.assertRaises(task.TaskError) as error: + self.task_kill_inst.enable_report() + self.assertEqual("/path is not a directory", str(error.exception)) + mock_isdir.assert_called_once_with("/path") + + @mock.patch('crmsh.crash_test.utils.this_node') + @mock.patch('crmsh.crash_test.utils.now') + @mock.patch('os.path.isdir') + def test_enable_report_looping(self, mock_isdir, mock_now, mock_this_node): + main.ctx = mock.Mock(report_path="/path", process_name="cpc") + mock_now.return_value = "20210119-12345" + mock_this_node.return_value = "node1" + self.task_kill_inst_loop.enable_report() + + @mock.patch('crmsh.crash_test.utils.this_node') + @mock.patch('crmsh.crash_test.utils.now') + @mock.patch('os.path.isdir') + def test_enable_report(self, mock_isdir, mock_now, mock_this_node): + main.ctx = mock.Mock(report_path="/path", process_name="cpc") + mock_now.return_value = "20210119-12345" + mock_this_node.return_value = "node1" + self.task_kill_inst.enable_report() + + def test_header(self): + expected_res = """============================================== +Testcase: Force kill sbd +Looping Kill: False +Expected State: a) sbd process restarted + b) Or, this node fenced. +""" + res = self.task_kill_inst.header() + self.assertEqual(res, expected_res) + + @mock.patch('crmsh.crash_test.utils.json_dumps') + def test_to_json(self, mock_dumps): + self.task_kill_inst.build_base_result = mock.Mock() + self.task_kill_inst.result = {} + self.task_kill_inst.prev_task_list = [] + self.task_kill_inst.to_json() + self.task_kill_inst.build_base_result.assert_called_once_with() + mock_dumps.assert_called_once_with() + + def test_to_report_return(self): + self.task_kill_inst.report = False + self.task_kill_inst.to_report() + + @mock.patch('os.fsync') + @mock.patch('builtins.open', create=True) + @mock.patch('crmsh.crash_test.task.TaskKill.header') + def test_to_report(self, mock_header, mock_open_file, mock_fsync): + mock_header.return_value = "#### header" + self.task_kill_inst.report = True + self.task_kill_inst.messages = [["info", "data", "2021"]] + self.task_kill_inst.explain = "explain" + self.task_kill_inst.report_file = "report_file1" + file_handle = mock_open_file.return_value.__enter__.return_value + + self.task_kill_inst.to_report() + + file_handle.write.assert_has_calls([ + mock.call("#### header"), + mock.call("\nLog:\n"), + mock.call("2021 INFO:data\n"), + mock.call("\nTestcase Explained:\n"), + mock.call("explain\n") + ]) + + @mock.patch('crmsh.crash_test.utils.get_process_status') + @mock.patch('crmsh.crash_test.task.Task.task_pre_check') + def test_pre_check(self, mock_pre_check, mock_status): + mock_status.return_value = (False, 100) + with self.assertRaises(task.TaskError) as err: + self.task_kill_inst.pre_check() + self.assertEqual("Process sbd is not running!", str(err.exception)) + mock_pre_check.assert_called_once_with() + mock_status.assert_called_once_with("sbd") + + @mock.patch('crmsh.crash_test.task.TaskKill.process_monitor') + @mock.patch('crmsh.crash_test.task.Task.fence_action_monitor') + @mock.patch('threading.Thread') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.crash_test.utils.get_process_status') + def test_run(self, mock_status, mock_info, mock_run, mock_thread, mock_fence_monitor, mock_process_monitor): + mock_status.side_effect = [(False, -1), (True, 100)] + mock_thread_fence_inst = mock.Mock() + mock_thread_restart_inst = mock.Mock() + mock_thread.side_effect = [mock_thread_fence_inst, mock_thread_restart_inst] + + self.task_kill_inst.run() + + mock_status.assert_has_calls([mock.call("sbd"), mock.call("sbd")]) + mock_info.assert_has_calls([ + mock.call('Process sbd(100) is running...'), + mock.call('Trying to run "killall -9 sbd"') + ]) + mock_run.assert_called_once_with("killall -9 sbd") + mock_thread.assert_has_calls([ + mock.call(target=mock_fence_monitor), + mock.call(target=mock_process_monitor), + ]) + mock_thread_fence_inst.start.assert_called_once_with() + mock_thread_restart_inst.start.assert_called_once_with() + + def test_wait_exception(self): + self.task_kill_inst.fence_start_event = mock.Mock() + self.task_kill_inst.restart_happen_event = mock.Mock() + self.task_kill_inst.thread_stop_event = mock.Mock() + self.task_kill_inst.fence_start_event.wait.return_value = True + self.task_kill_inst.restart_happen_event.is_set.return_value = False + + with self.assertRaises(task.TaskError) as err: + self.task_kill_inst.wait() + self.assertEqual("Process sbd is not restarted!", str(err.exception)) + + def test_wait(self): + self.task_kill_inst.fence_start_event = mock.Mock() + self.task_kill_inst.restart_happen_event = mock.Mock() + self.task_kill_inst.thread_stop_event = mock.Mock() + self.task_kill_inst.fence_start_event.wait.return_value = True + self.task_kill_inst.restart_happen_event.is_set.return_value = True + + self.task_kill_inst.wait() + + self.task_kill_inst.thread_stop_event.set.assert_called_once_with() + + @mock.patch('time.sleep') + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.crash_test.utils.get_process_status') + def test_process_monitor(self, mock_status, mock_info, mock_sleep): + self.task_kill_inst.thread_stop_event = mock.Mock() + self.task_kill_inst.thread_stop_event.is_set.side_effect = [False, False] + self.task_kill_inst.restart_happen_event = mock.Mock() + mock_status.side_effect = [(False, -1), (True, 100)] + + self.task_kill_inst.process_monitor() + + self.task_kill_inst.thread_stop_event.is_set.assert_has_calls([ + mock.call(), + mock.call() + ]) + mock_status.assert_has_calls([ + mock.call("sbd"), + mock.call("sbd") + ]) + mock_info.assert_called_once_with("Process sbd(100) is restarted!") + self.task_kill_inst.restart_happen_event.set.assert_called_once_with() + mock_sleep.assert_called_once_with(1) + + +class TestTaskCheck(TestCase): + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + @mock.patch('crmsh.crash_test.utils.msg_info') + @mock.patch('crmsh.crash_test.utils.now') + def setUp(self, mock_now, mock_msg_info): + """ + Test setUp. + """ + mock_now.return_value = "2019/07/10 01:15:15" + main.ctx = mock.Mock(task_list=[{"process_name": "xin", "age": 38}]) + self.task_check_inst = task.TaskCheck("task check job1", quiet=False) + self.task_check_inst_quiet = task.TaskCheck("task check job1", quiet=True) + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + @mock.patch('crmsh.crash_test.utils.MyLoggingFormatter') + @mock.patch('crmsh.crash_test.utils.get_handler') + @mock.patch('crmsh.crash_test.utils.manage_handler') + def test_to_stdout(self, mock_manage_handler, mock_get_handler, mock_myformatter): + mock_manage_handler.return_value.__enter__ = mock.Mock() + mock_manage_handler.return_value.__exit__ = mock.Mock() + + task.logger = mock.Mock() + task.logger.info = mock.Mock() + task.logger.log = mock.Mock() + + get_handler_inst1 = mock.Mock() + get_handler_inst1.setFormatter = mock.Mock() + get_handler_inst2 = mock.Mock() + get_handler_inst2.setFormatter = mock.Mock() + mock_get_handler.side_effect = [get_handler_inst1, get_handler_inst2] + + myformatter_inst1 = mock.Mock() + myformatter_inst2 = mock.Mock() + mock_myformatter.side_effect = [myformatter_inst1, myformatter_inst2] + + self.task_check_inst.messages = [("info", "info message"), ("warn", "warn message")] + utils.CGREEN = "" + utils.CEND = "" + utils.CRED = "" + + self.task_check_inst.to_stdout() + + mock_manage_handler.assert_called_once_with("file", keep=False) + mock_get_handler.assert_has_calls([ + mock.call(task.logger, "stream"), + mock.call(task.logger, "stream") + ]) + get_handler_inst1.setFormatter.assert_called_once_with(myformatter_inst1) + get_handler_inst2.setFormatter.assert_called_once_with(myformatter_inst2) + mock_myformatter.assert_has_calls([ + mock.call(flush=False), + mock.call() + ]) + task.logger.info.assert_called_once_with('task check job1 [Pass]', extra={'timestamp': '[2019/07/10 01:15:15]'}) + task.logger.log.assert_has_calls([ + mock.call(20, 'info message', extra={'timestamp': ' '}). + mock.call(30, 'warn message', extra={'timestamp': ' '}) + ]) + + @mock.patch('crmsh.crash_test.utils.json_dumps') + def test_to_json(self, mock_dumps): + self.task_check_inst.build_base_result = mock.Mock() + self.task_check_inst.result = {} + self.task_check_inst.to_json() + self.task_check_inst.build_base_result.assert_called_once_with() + mock_dumps.assert_called_once_with() + + def test_print_result(self): + self.task_check_inst.to_stdout = mock.Mock() + self.task_check_inst.to_json = mock.Mock() + self.task_check_inst.print_result() + self.task_check_inst.to_stdout.assert_called_once_with() + self.task_check_inst.to_json.assert_called_once_with() + + def test_print_result_quiet(self): + self.task_check_inst.quiet = True + self.task_check_inst.to_stdout = mock.Mock() + self.task_check_inst.print_result() + self.task_check_inst.to_stdout.assert_not_called() + + def test_run(self): + self.task_check_inst.print_result = mock.Mock() + with self.task_check_inst.run(): + pass + self.task_check_inst.print_result.assert_called_once_with() + + +class TestTaskSplitBrain(TestCase): + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + @mock.patch('crmsh.crash_test.utils.msg_info') + def setUp(self, mock_msg_info): + """ + Test setUp. + """ + self.task_sp_inst = task.TaskSplitBrain() + self.task_sp_inst.fence_action = "reboot" + self.task_sp_inst.fence_timeout = 60 + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + def test_header(self): + expected_res = """============================================== +Testcase: Simulate split brain by blocking traffic between cluster nodes +Expected Result: One of nodes get fenced +Fence action: reboot +Fence timeout: 60 +""" + res = self.task_sp_inst.header() + self.assertEqual(res, expected_res) + + @mock.patch('crmsh.crash_test.utils.json_dumps') + @mock.patch('crmsh.crash_test.task.Task.build_base_result') + def test_to_json(self, mock_result, mock_json): + self.task_sp_inst.result = {} + self.task_sp_inst.to_json() + mock_result.assert_called_once_with() + mock_json.assert_called_once_with() + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.task_pre_check') + def test_pre_check_no_cmd(self, mock_pre_check, mock_run): + mock_run.return_value = (1, None, "error") + with self.assertRaises(task.TaskError) as err: + self.task_sp_inst.pre_check() + self.assertEqual("error", str(err.exception)) + mock_run.assert_called_once_with("which iptables") + mock_pre_check.assert_called_once_with() + + @mock.patch('crmsh.crash_test.utils.online_nodes') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.task_pre_check') + def test_pre_check_error(self, mock_pre_check, mock_run, mock_online_nodes): + mock_run.return_value = (0, None, None) + mock_online_nodes.return_value = ["node1"] + with self.assertRaises(task.TaskError) as err: + self.task_sp_inst.pre_check() + self.assertEqual("At least two nodes online!", str(err.exception)) + mock_run.assert_called_once_with("which iptables") + mock_online_nodes.assert_called_once_with() + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.crmshutils.get_iplist_from_name') + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.crash_test.utils.peer_node_list') + def test_do_block_iptables(self, mock_peer_list, mock_info, mock_get_iplist, mock_run): + mock_peer_list.return_value = ["node1", "node2"] + mock_get_iplist.side_effect = [["10.10.10.1", "20.20.20.1"], ["10.10.10.2", "20.20.20.2"]] + self.task_sp_inst.do_block_iptables() + mock_peer_list.assert_called_once_with() + mock_info.assert_has_calls([ + mock.call("Trying to temporarily block node1 communication ip"), + mock.call("Trying to temporarily block node2 communication ip") + ]) + mock_get_iplist.assert_has_calls([ + mock.call("node1"), + mock.call("node2") + ]) + mock_run.assert_has_calls([ + mock.call(config.BLOCK_IP.format(action='I', peer_ip="10.10.10.1")), + mock.call(config.BLOCK_IP.format(action='I', peer_ip="20.20.20.1")), + mock.call(config.BLOCK_IP.format(action='I', peer_ip="10.10.10.2")), + mock.call(config.BLOCK_IP.format(action='I', peer_ip="20.20.20.2")) + ]) + + @mock.patch('crmsh.crash_test.task.TaskSplitBrain.un_block_iptables') + def test_un_block(self, mock_unblock_iptables): + self.task_sp_inst.un_block() + mock_unblock_iptables.assert_called_once_with() + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.crmshutils.get_iplist_from_name') + @mock.patch('crmsh.crash_test.task.Task.info') + def test_un_block_iptables(self, mock_info, mock_get_iplist, mock_run): + mock_get_iplist.side_effect = [["10.10.10.1", "20.20.20.1"], ["10.10.10.2", "20.20.20.2"]] + self.task_sp_inst.peer_nodelist = ["node1", "node2"] + self.task_sp_inst.un_block_iptables() + mock_info.assert_has_calls([ + mock.call("Trying to recover node1 communication ip"), + mock.call("Trying to recover node2 communication ip") + ]) + mock_get_iplist.assert_has_calls([ + mock.call("node1"), + mock.call("node2") + ]) + mock_run.assert_has_calls([ + mock.call(config.BLOCK_IP.format(action='D', peer_ip="10.10.10.1")), + mock.call(config.BLOCK_IP.format(action='D', peer_ip="20.20.20.1")), + mock.call(config.BLOCK_IP.format(action='D', peer_ip="10.10.10.2")), + mock.call(config.BLOCK_IP.format(action='D', peer_ip="20.20.20.2")) + ]) + + @mock.patch('crmsh.crash_test.task.Task.fence_action_monitor') + @mock.patch('threading.Thread') + def test_run(self, mock_thread, mock_monitor): + mock_thread_inst = mock.Mock() + mock_thread.return_value = mock_thread_inst + self.task_sp_inst.run() + mock_thread.assert_called_once_with(target=mock_monitor) + mock_thread_inst.start.assert_called_once_with() + + def test_wait(self): + self.task_sp_inst.fence_finish_event = mock.Mock() + self.task_sp_inst.fence_finish_event.wait.return_value = False + self.task_sp_inst.thread_stop_event = mock.Mock() + self.task_sp_inst.wait() + self.task_sp_inst.fence_finish_event.wait.assert_called_once_with(60) + self.task_sp_inst.thread_stop_event.set.assert_called_once_with() + + +class TestFence(TestCase): + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + @mock.patch('crmsh.crash_test.utils.msg_info') + def setUp(self, mock_msg_info): + """ + Test setUp. + """ + ctx = mock.Mock(fence_node="node1", yes=False) + self.task_fence_inst = task.TaskFence(ctx) + self.task_fence_inst.fence_action = "reboot" + self.task_fence_inst.fence_timeout = 60 + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + def test_header(self): + expected_res = """============================================== +Testcase: Fence node node1 +Fence action: reboot +Fence timeout: 60 +""" + res = self.task_fence_inst.header() + self.assertEqual(res, expected_res) + + @mock.patch('crmsh.crash_test.utils.json_dumps') + @mock.patch('crmsh.crash_test.task.Task.build_base_result') + def test_to_json(self, mock_result, mock_json): + self.task_fence_inst.result = {} + self.task_fence_inst.to_json() + mock_result.assert_called_once_with() + mock_json.assert_called_once_with() + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.task_pre_check') + def test_pre_check_no_cmd(self, mock_pre_check, mock_run): + mock_run.return_value = (1, None, "error") + with self.assertRaises(task.TaskError) as err: + self.task_fence_inst.pre_check() + self.assertEqual("error", str(err.exception)) + mock_run.assert_called_once_with("which crm_node") + mock_pre_check.assert_called_once_with() + + @mock.patch('crmsh.crash_test.utils.check_node_status') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.task_pre_check') + def test_pre_check_error(self, mock_pre_check, mock_run, mock_node_status): + mock_run.side_effect = [(0, None, None), (0, None, None), (0, None, None)] + mock_node_status.return_value = False + with self.assertRaises(task.TaskError) as err: + self.task_fence_inst.pre_check() + self.assertEqual("Node \"node1\" not in cluster!", str(err.exception)) + mock_run.assert_has_calls([ + mock.call("which crm_node"), + mock.call("which stonith_admin"), + mock.call("which crm_attribute") + ]) + mock_node_status.assert_called_once_with("node1", "member") + + @mock.patch('crmsh.crash_test.task.Task.fence_action_monitor') + @mock.patch('threading.Thread') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + @mock.patch('crmsh.crash_test.task.Task.info') + def test_run(self, mock_info, mock_run, mock_thread, mock_monitor): + mock_thread_inst = mock.Mock() + mock_thread.return_value = mock_thread_inst + self.task_fence_inst.run() + mock_info.assert_called_once_with("Trying to fence node \"node1\"") + mock_run.assert_called_once_with("crm_attribute -t status -N 'node1' -n terminate -v true") + mock_thread.assert_called_once_with(target=mock_monitor) + mock_thread_inst.start.assert_called_once_with() + + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.crash_test.utils.this_node') + def test_wait_this_node(self, mock_this_node, mock_info): + mock_this_node.return_value = "node1" + self.task_fence_inst.fence_finish_event = mock.Mock() + self.task_fence_inst.thread_stop_event = mock.Mock() + self.task_fence_inst.fence_finish_event.wait.return_value = True + + self.task_fence_inst.wait() + + mock_this_node.assert_called_once_with() + mock_info.assert_called_once_with("Waiting 60s for self reboot...") + self.task_fence_inst.fence_finish_event.wait.assert_called_once_with(60) + + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.crash_test.utils.this_node') + def test_wait(self, mock_this_node, mock_info): + mock_this_node.return_value = "node2" + self.task_fence_inst.fence_finish_event = mock.Mock() + self.task_fence_inst.thread_stop_event = mock.Mock() + self.task_fence_inst.fence_finish_event.wait.return_value = None + + with self.assertRaises(task.TaskError) as err: + self.task_fence_inst.wait() + self.assertEqual("Target fence node \"node1\" still alive", str(err.exception)) + + mock_this_node.assert_called_once_with() + mock_info.assert_called_once_with("Waiting 60s for node \"node1\" reboot...") + self.task_fence_inst.fence_finish_event.wait.assert_called_once_with(60) + + +class TestTask(TestCase): + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + @mock.patch('crmsh.crash_test.utils.msg_info') + @mock.patch('crmsh.crash_test.utils.now') + def setUp(self, mock_now, mock_info): + """ + Test setUp. + """ + mock_now.return_value = "2019/07/10 01:15:15" + main.ctx = mock.Mock(task_list={"process_name": "xin", "age": 38}) + self.task_inst = task.Task("task description", flush=True) + mock_now.assert_called_once_with() + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + def test_header(self): + self.task_inst.header() + + def test_to_report(self): + self.task_inst.to_report() + + def test_to_json(self): + self.task_inst.to_json() + + @mock.patch('crmsh.service_manager.ServiceManager.service_is_active') + def test_task_pre_check_exception(self, mock_active): + mock_active.return_value = False + with self.assertRaises(task.TaskError) as err: + self.task_inst.task_pre_check() + self.assertEqual("Cluster not running!", str(err.exception)) + mock_active.assert_called_once_with("pacemaker.service") + + @mock.patch('crmsh.service_manager.ServiceManager.service_is_active') + def test_task_pre_check_exception_no_fence(self, mock_active): + mock_active.return_value = True + self.task_inst.get_fence_info = mock.Mock() + self.task_inst.fence_enabled = False + with self.assertRaises(task.TaskError) as err: + self.task_inst.task_pre_check() + self.assertEqual("Require stonith enabled", str(err.exception)) + mock_active.assert_called_once_with("pacemaker.service") + self.task_inst.get_fence_info.assert_called_once_with() + + @mock.patch('crmsh.crash_test.utils.FenceInfo') + def test_get_fence_info(self, mock_fence_info): + mock_fence_info_inst = mock.Mock() + mock_fence_info.return_value = mock_fence_info_inst + self.task_inst.get_fence_info() + + @mock.patch('crmsh.crash_test.utils.msg_info') + def test_info(self, mock_info): + self.task_inst.msg_append = mock.Mock() + self.task_inst.info("info message") + self.task_inst.msg_append.assert_called_once_with("info", "info message") + mock_info.assert_called_once_with("info message", to_stdout=True) + + @mock.patch('crmsh.crash_test.utils.msg_warn') + def test_warn(self, mock_warn): + self.task_inst.msg_append = mock.Mock() + self.task_inst.warn("warn message") + self.task_inst.msg_append.assert_called_once_with("warn", "warn message") + mock_warn.assert_called_once_with("warn message", to_stdout=True) + + @mock.patch('crmsh.crash_test.utils.msg_error') + def test_error(self, mock_error): + self.task_inst.msg_append = mock.Mock() + self.task_inst.error("error message") + self.task_inst.msg_append.assert_called_once_with("error", "error message") + mock_error.assert_called_once_with("error message", to_stdout=True) + + @mock.patch('crmsh.crash_test.utils.now') + def test_msg_append(self, mock_now): + self.task_inst.to_json = mock.Mock() + self.task_inst.to_report = mock.Mock() + self.task_inst.msg_append("error", "warn message") + mock_now.assert_called_once_with() + self.task_inst.to_json.assert_called_once_with() + self.task_inst.to_report.assert_called_once_with() + + def test_build_base_result(self): + self.task_inst.build_base_result() + expected_result = { + "Timestamp": self.task_inst.timestamp, + "Description": self.task_inst.description, + "Messages": [] + } + self.assertDictEqual(expected_result, self.task_inst.result) + + @mock.patch('crmsh.crash_test.utils.warning_ask') + def test_print_header(self, mock_ask): + self.task_inst.header = mock.Mock() + self.task_inst.info = mock.Mock() + mock_ask.return_value = False + + with self.assertRaises(crmshutils.TerminateSubCommand): + self.task_inst.print_header() + + self.task_inst.header.assert_called_once_with() + mock_ask.assert_called_once_with(task.Task.REBOOT_WARNING) + self.task_inst.info.assert_called_once_with("Testcase cancelled") + + @mock.patch('crmsh.crash_test.utils.str_to_datetime') + @mock.patch('time.sleep') + @mock.patch('crmsh.crash_test.task.Task.info') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + def test_fence_action_monitor(self, mock_run, mock_info, mock_sleep, mock_datetime): + self.task_inst.thread_stop_event = mock.Mock() + self.task_inst.thread_stop_event.is_set.side_effect = [False, False, False, False] + self.task_inst.fence_start_event = mock.Mock() + self.task_inst.fence_finish_event = mock.Mock() + output = "Pending Fencing Actions:\n * reboot of 15sp2-2 pending: client=pacemaker-controld.2430, origin=15sp2-1" + output2 = "Node 15sp2-2 last fenced at: Tue Jan 19 16:08:37 2021" + mock_run.side_effect = [(1, None, None), (0, output, None), (1, None, None), (0, output2, None)] + self.task_inst.timestamp = "2021/01/19 16:08:24" + mock_datetime.side_effect = [ + datetime.strptime(self.task_inst.timestamp, '%Y/%m/%d %H:%M:%S'), + datetime.strptime("Tue Jan 19 16:08:37 2021", '%a %b %d %H:%M:%S %Y') + ] + + self.task_inst.fence_action_monitor() + + self.task_inst.thread_stop_event.is_set.assert_has_calls([ + mock.call(), + mock.call(), + mock.call(), + mock.call() + ]) + mock_run.assert_has_calls([ + mock.call("crm_mon -1|grep -A1 \"Fencing Actions:\""), + mock.call("crm_mon -1|grep -A1 \"Fencing Actions:\""), + mock.call(config.FENCE_HISTORY.format(node="15sp2-2")), + mock.call(config.FENCE_HISTORY.format(node="15sp2-2")) + ]) + mock_info.assert_has_calls([ + mock.call("Node \"15sp2-2\" will be fenced by \"15sp2-1\"!"), + mock.call("Node \"15sp2-2\" was successfully fenced by \"15sp2-1\"") + ]) + self.task_inst.fence_start_event.set.assert_called_once_with() + self.task_inst.fence_finish_event.set.assert_called_once_with() + +class TestFixSBD(TestCase): + """ + Class to test TaskFixSBD of task.py + All tested in test_crash_test.py except verify() + """ + + @mock.patch('builtins.open') + @mock.patch('os.path.isfile') + @mock.patch('tempfile.mkstemp') + @mock.patch('crmsh.crash_test.utils.msg_info') + def setUp(self, mock_msg_info, mock_mkstemp, mock_isfile, mock_open): + """ + Test setUp. + """ + dev = "/dev/disk/by-id/scsi-SATA_ST2000LM007-1R81_WDZ5J42A" + bak = "/tmp/tmpmby3ty9g" + edit = "/tmp/tmpnic4t30s" + mock_isfile.return_value = True + mock_open.return_value = mock.mock_open(read_data="SBD_DEVICE={}". + format(dev)).return_value + mock_mkstemp.side_effect = [(1, bak), (2, edit)] + + self.task_fixsbd = task.TaskFixSBD(dev, force=False) + mock_msg_info.assert_called_once_with('Replace SBD_DEVICE with candidate {}'. + format(dev), to_stdout=False) + + def tearDown(self): + """ + Test tearDown. + """ + pass + + @mock.patch('os.fsync') + @mock.patch('builtins.open') + @mock.patch('os.path.isfile') + @mock.patch('crmsh.crash_test.utils.msg_info') + def test_verify_succeed(self, mock_msg_info, mock_isfile, mock_open, mock_fsync): + """ + Test verify successful. + """ + dev = "/dev/disk/by-id/scsi-SATA_ST2000LM007-1R81_WDZ5J42A" + mock_isfile.return_value = True + mock_open.return_value = mock.mock_open(read_data="SBD_DEVICE={}". + format(dev)).return_value + self.task_fixsbd.prev_task_list = [] + + self.task_fixsbd.verify() + mock_isfile.assert_called_once_with(config.SBD_CONF) + mock_msg_info.assert_called_once_with('SBD DEVICE change succeed', + to_stdout=True) + mock_fsync.assert_called() + + @mock.patch('builtins.open') + @mock.patch('os.path.isfile') + def test_verify_fail(self, mock_isfile, mock_open): + """ + Test verify failed. + """ + dev = "/dev/disk/by-id/scsi-SATA_ST2000LM007-1R81_WDZ5J42A" + dev_cur = "/dev/disk/by-id/scsi-SATA_ST2000LM007-no_change" + mock_isfile.return_value = True + mock_open.return_value = mock.mock_open(read_data="SBD_DEVICE={}". + format(dev_cur)).return_value + self.task_fixsbd.prev_task_list = [] + + with self.assertRaises(task.TaskError) as err: + self.task_fixsbd.verify() + mock_isfile.assert_called_once_with(config.SBD_CONF) + self.assertEqual("Fail to replace SBD device {} in {}!". + format(dev, config.SBD_CONF), str(err.exception)) -- cgit v1.2.3