import sys import datetime from crmsh import config from crmsh import utils as crmutils from crmsh.report import utils, constants import crmsh.log import unittest from unittest import mock class TestPackage(unittest.TestCase): @mock.patch('crmsh.report.utils.get_pkg_mgr') def setUp(self, mock_get_pkg_mgr): mock_get_pkg_mgr.side_effect = [None, "rpm", "deb"] self.inst_none = utils.Package("xxx1 xxx2") self.inst = utils.Package("rpm1 rpm2") self.inst_deb = utils.Package("deb1 deb2") def test_version_return(self): res = self.inst_none.version() self.assertEqual(res, "") @mock.patch('crmsh.report.utils.Package.pkg_ver_rpm') def test_version(self, mock_ver_rpm): mock_ver_rpm.return_value = "version1" res = self.inst.version() self.assertEqual(res, "version1") @mock.patch('crmsh.report.utils.ShellUtils') def test_version_rpm(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst data = "rpm1-4.5.0\nrpm2 not installed" mock_run_inst.get_stdout_stderr.return_value = (0, data, None) res = self.inst.pkg_ver_rpm() self.assertEqual(res, "rpm1-4.5.0") @mock.patch('crmsh.report.utils.ShellUtils') def test_version_deb(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst data = "deb1-4.5.0\nno packages found" mock_run_inst.get_stdout_stderr.return_value = (0, data, None) res = self.inst_deb.pkg_ver_deb() self.assertEqual(res, "deb1-4.5.0") def test_verify_return(self): res = self.inst_none.verify() self.assertEqual(res, "") @mock.patch('crmsh.report.utils.Package.verify_rpm') def test_verify(self, mock_verify_rpm): mock_verify_rpm.return_value = "" res = self.inst.verify() self.assertEqual(res, "") @mock.patch('crmsh.report.utils.ShellUtils') def test_verify_rpm(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst mock_run_inst.get_stdout_stderr.return_value = (0, "verify data\nThis is not installed","") res = self.inst.verify_rpm() self.assertEqual(res, "verify data") @mock.patch('crmsh.report.utils.ShellUtils') def test_verify_deb(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst mock_run_inst.get_stdout_stderr.return_value = (0, "verify data\nThis is not installed","") res = self.inst_deb.verify_deb() self.assertEqual(res, "verify data") class TestSanitizer(unittest.TestCase): def setUp(self): mock_ctx_inst_no_sanitize = mock.Mock(sanitize=False) self.s_inst_no_sanitize = utils.Sanitizer(mock_ctx_inst_no_sanitize) mock_ctx_inst_no_sanitize_set = mock.Mock(sensitive_regex_list=[]) self.s_inst_no_sanitize_set = utils.Sanitizer(mock_ctx_inst_no_sanitize_set) mock_ctx_inst = mock.Mock(sanitize=True, work_dir="/opt", sensitive_regex_list=["test_patt"]) self.s_inst = utils.Sanitizer(mock_ctx_inst) @mock.patch('logging.Logger.warning') @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') def test_prepare_return(self, mock_load_cib, mock_parse, mock_extract, mock_include, mock_warning): mock_include.return_value = True self.s_inst_no_sanitize.prepare() mock_load_cib.assert_called_once_with() mock_parse.assert_called_once_with() mock_warning.assert_has_calls([ mock.call("Some PE/CIB/log files contain possibly sensitive data"), mock.call("Using \"-s\" option can replace sensitive data") ]) @mock.patch('crmsh.report.utils.Sanitizer._get_file_list_in_work_dir') @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') def test_prepare(self, mock_load_cib, mock_parse, mock_extract, mock_include, mock_get_file): mock_include.return_value = True self.s_inst.prepare() mock_load_cib.assert_called_once_with() mock_parse.assert_called_once_with() mock_get_file.assert_called_once_with @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') def test_prepare_no_sensitive_data(self, mock_load_cib, mock_parse, mock_extract, mock_include): mock_include.return_value = False self.s_inst.prepare() mock_load_cib.assert_called_once_with() mock_parse.assert_called_once_with() def test_include_sensitive_data(self): res = self.s_inst._include_sensitive_data() self.assertEqual(res, []) @mock.patch('os.walk') def test_get_file_list_in_work_dir(self, mock_walk): mock_walk.return_value = [ ("/opt", [], ["file1", "file2"]), ("/opt/dir1", [], ["file3"]), ] self.s_inst._get_file_list_in_work_dir() self.assertEqual(self.s_inst.file_list_in_workdir, ['/opt/file1', '/opt/file2', '/opt/dir1/file3']) @mock.patch('glob.glob') def test_load_cib_from_work_dir_no_cib(self, mock_glob): mock_glob.return_value = [] with self.assertRaises(utils.ReportGenericError) as err: self.s_inst._load_cib_from_work_dir() self.assertEqual(f"CIB file {constants.CIB_F} was not collected", str(err.exception)) @mock.patch('glob.glob') @mock.patch('crmsh.utils.read_from_file') def test_load_cib_from_work_dir_empty(self, mock_read, mock_glob): mock_glob.return_value = [f"/opt/node1/{constants.CIB_F}"] mock_read.return_value = None with self.assertRaises(utils.ReportGenericError) as err: self.s_inst._load_cib_from_work_dir() self.assertEqual(f"File /opt/node1/{constants.CIB_F} is empty", str(err.exception)) mock_read.assert_called_once_with(f"/opt/node1/{constants.CIB_F}") @mock.patch('glob.glob') @mock.patch('crmsh.utils.read_from_file') def test_load_cib_from_work_dir(self, mock_read, mock_glob): mock_glob.return_value = [f"/opt/node1/{constants.CIB_F}"] mock_read.return_value = "data" self.s_inst._load_cib_from_work_dir() self.assertEqual(self.s_inst.cib_data, "data") mock_read.assert_called_once_with(f"/opt/node1/{constants.CIB_F}") @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) def test_parse_sensitive_set_no_set(self, mock_logger): config.report.sanitize_rule = "" self.s_inst_no_sanitize_set._parse_sensitive_set() self.assertEqual(self.s_inst_no_sanitize_set.sensitive_regex_set, set(utils.Sanitizer.DEFAULT_RULE_LIST)) mock_logger.debug2.assert_called_once_with(f"Regex set to match sensitive data: {set(utils.Sanitizer.DEFAULT_RULE_LIST)}") @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) def test_parse_sensitive_set(self, mock_logger): config.report.sanitize_rule = "passw.*" self.s_inst._parse_sensitive_set() self.assertEqual(self.s_inst.sensitive_regex_set, set(['test_patt', 'passw.*'])) mock_logger.debug2.assert_called_once_with(f"Regex set to match sensitive data: {set(['test_patt', 'passw.*'])}") def test_sanitize_return(self): self.s_inst_no_sanitize.sanitize() @mock.patch('crmsh.report.utils.write_to_file') @mock.patch('logging.Logger.debug') @mock.patch('crmsh.report.utils.Sanitizer._sub_sensitive_string') @mock.patch('crmsh.utils.read_from_file') def test_sanitize(self, mock_read, mock_sub, mock_debug, mock_write): self.s_inst.file_list_in_workdir = ["file1", "file2"] mock_read.side_effect = [None, "data"] mock_sub.return_value = "replace_data" self.s_inst.sanitize() mock_debug.assert_called_once_with("Replace sensitive info for %s", "file2") def test_extract_from_cib(self): self.s_inst.cib_data = """ """ res = self.s_inst._extract_from_cib("passw.*") self.assertEqual(res, ["qwertyui"]) def test_sub_sensitive_string(self): data = """ This my tel 13356789876 """ self.s_inst.sensitive_value_list_with_raw_option = ["13356789876"] self.s_inst.sensitive_key_list = ["passw.*"] self.s_inst.sensitive_value_list = ["qwertyui"] res = self.s_inst._sub_sensitive_string(data) expected_data = """ This my tel ****** """ self.assertEqual(res, expected_data) @mock.patch('logging.Logger.warning') def test_extract_sensitive_value_list_warn(self, mock_warn): self.s_inst.sensitive_regex_set = set(["TEL:test"]) self.s_inst._extract_sensitive_value_list() mock_warn.assert_called_once_with("For sanitize pattern TEL:test, option should be \"raw\"") @mock.patch('crmsh.report.utils.Sanitizer._extract_from_cib') def test_extract_sensitive_value_list(self, mock_extract): mock_extract.side_effect = [["123456"], ["qwertyui"]] self.s_inst.sensitive_regex_set = set(["TEL:raw", "passw.*"]) self.s_inst._extract_sensitive_value_list() class TestUtils(unittest.TestCase): @mock.patch('builtins.sorted', side_effect=lambda x, *args, **kwargs: x[::-1]) @mock.patch('crmsh.report.utils.get_timespan_str') @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) @mock.patch('glob.glob') @mock.patch('crmsh.report.utils.is_our_log') def test_arch_logs(self, mock_is_our_log, mock_glob, mock_logger, mock_timespan, mock_sorted): mock_is_our_log.return_value = utils.LogType.GOOD mock_glob.return_value = [] mock_ctx_inst = mock.Mock() mock_timespan.return_value = "0101-0202" return_list, log_type = utils.arch_logs(mock_ctx_inst, "file1") self.assertEqual(return_list, ["file1"]) self.assertEqual(log_type, utils.LogType.GOOD) mock_logger.debug2.assert_called_once_with("Found logs ['file1'] in 0101-0202") @mock.patch('sys.stdout.flush') @mock.patch('traceback.print_exc') def test_print_traceback(self, mock_trace, mock_flush): utils.print_traceback() mock_trace.assert_called_once_with() @mock.patch('crmsh.report.utils.ts_to_str') def test_get_timespan_str(self, mock_ts_to_str): mock_ctx_inst = mock.Mock(from_time=1691938980.0, to_time=1691982180.0) mock_ts_to_str.side_effect = ["2023-08-13 23:03", "2023-08-14 11:03"] res = utils.get_timespan_str(mock_ctx_inst) self.assertEqual(res, "2023-08-13 23:03 - 2023-08-14 11:03") mock_ts_to_str.assert_has_calls([ mock.call(mock_ctx_inst.from_time), mock.call(mock_ctx_inst.to_time) ]) @mock.patch('crmsh.report.utils.ShellUtils') def test_get_cmd_output(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst mock_run_inst.get_stdout_stderr.return_value = (0, "stdout_data", "stderr_data") res = utils.get_cmd_output("cmd") self.assertEqual(res, "stdout_data\nstderr_data\n") mock_run_inst.get_stdout_stderr.assert_called_once_with("cmd", timeout=None) @mock.patch('crmsh.utils.read_from_file') def test_is_our_log_empty(self, mock_read): mock_read.return_value = None mock_ctx_inst = mock.Mock() res = utils.is_our_log(mock_ctx_inst, "/opt/logfile") self.assertEqual(res, utils.LogType.EMPTY) mock_read.assert_called_once_with("/opt/logfile") @mock.patch('crmsh.report.utils.determin_log_format') @mock.patch('crmsh.utils.read_from_file') def test_is_our_log_irregular(self, mock_read, mock_log_format): mock_read.return_value = "This is the log" mock_ctx_inst = mock.Mock() mock_log_format.return_value = None res = utils.is_our_log(mock_ctx_inst, "/opt/logfile") self.assertEqual(res, utils.LogType.IRREGULAR) mock_read.assert_called_once_with("/opt/logfile") mock_log_format.assert_called_once_with(mock_read.return_value) @mock.patch('crmsh.report.utils.find_first_timestamp') @mock.patch('crmsh.report.utils.head') @mock.patch('crmsh.report.utils.determin_log_format') @mock.patch('crmsh.utils.read_from_file') def test_is_our_log_before(self, mock_read, mock_determine, mock_head, mock_find_first): mock_read.return_value = "data" mock_determine.return_value = "rfc5424" mock_find_first.side_effect = [1000, 1500] mock_ctx_inst = mock.Mock(from_time=1600, to_time=1800) res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") self.assertEqual(res, utils.LogType.BEFORE_TIMESPAN) @mock.patch('crmsh.report.utils.find_first_timestamp') @mock.patch('crmsh.report.utils.head') @mock.patch('crmsh.report.utils.determin_log_format') @mock.patch('crmsh.utils.read_from_file') def test_is_our_log_good(self, mock_read, mock_determine, mock_head, mock_find_first): mock_read.return_value = "data" mock_determine.return_value = "rfc5424" mock_find_first.side_effect = [1000, 1500] mock_ctx_inst = mock.Mock(from_time=1200, to_time=1800) res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") self.assertEqual(res, utils.LogType.GOOD) @mock.patch('crmsh.report.utils.find_first_timestamp') @mock.patch('crmsh.report.utils.head') @mock.patch('crmsh.report.utils.determin_log_format') @mock.patch('crmsh.utils.read_from_file') def test_is_our_log_after(self, mock_read, mock_determine, mock_head, mock_find_first): mock_read.return_value = "data" mock_determine.return_value = "rfc5424" mock_find_first.side_effect = [1000, 1500] mock_ctx_inst = mock.Mock(from_time=200, to_time=800) res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") self.assertEqual(res, utils.LogType.AFTER_TIMESPAN) @mock.patch('logging.Logger.warning') @mock.patch('shutil.which') def test_get_pkg_mgr_unknown(self, mock_which, mock_warning): mock_which.side_effect = [False, False] self.assertEqual(utils.get_pkg_mgr(), "") mock_warning.assert_called_once_with("Unknown package manager!") @mock.patch('shutil.which') def test_get_pkg_mgr(self, mock_which): mock_which.return_value = True utils.get_pkg_mgr() self.assertEqual(utils.get_pkg_mgr(), "rpm") @mock.patch('os.walk') @mock.patch('os.stat') @mock.patch('os.path.isdir') def test_find_files_in_timespan(self, mock_isdir, mock_stat, mock_walk): mock_isdir.side_effect = [True, False] mock_stat.return_value = mock.Mock(st_ctime=1615) mock_walk.return_value = [ ('/mock_dir', [], ['file1.txt', 'file2.txt']) ] mock_ctx_inst = mock.Mock(from_time=1611, to_time=1620) res = utils.find_files_in_timespan(mock_ctx_inst, ['/mock_dir', '/not_exist']) expected_result = ['/mock_dir/file1.txt', '/mock_dir/file2.txt'] self.assertEqual(res, expected_result) @mock.patch('crmsh.report.utils.get_timespan_str') @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) @mock.patch('crmsh.report.utils.arch_logs') def test_dump_logset_return(self, mock_arch, mock_debug, mock_timespan): mock_arch.return_value = [[], ""] mock_ctx_inst = mock.Mock() utils.dump_logset(mock_ctx_inst, "file") @mock.patch('crmsh.report.utils.real_path') @mock.patch('logging.Logger.debug') @mock.patch('crmsh.utils.str2file') @mock.patch('os.path.basename') @mock.patch('crmsh.report.utils.print_logseg') @mock.patch('crmsh.report.utils.arch_logs') def test_dump_logset_irrgular(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): mock_real_path.return_value = "file1" mock_arch.return_value = [["file1"], utils.LogType.IRREGULAR] mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") mock_basename.return_value = "file1" mock_print.return_value = "data" utils.dump_logset(mock_ctx_inst, "file1") mock_print.assert_called_once_with("file1", 0, 0) mock_str2file.assert_called_once_with("data", "/opt/work_dir/file1") mock_debug.assert_called_once_with("Dump file1 into file1") @mock.patch('crmsh.report.utils.real_path') @mock.patch('logging.Logger.debug') @mock.patch('crmsh.utils.str2file') @mock.patch('os.path.basename') @mock.patch('crmsh.report.utils.print_logseg') @mock.patch('crmsh.report.utils.arch_logs') def test_dump_logset_one(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): mock_real_path.return_value = "file1" mock_arch.return_value = [["file1"], utils.LogType.GOOD] mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir", from_time=10, to_time=20) mock_basename.return_value = "file1" mock_print.return_value = "data" utils.dump_logset(mock_ctx_inst, "file1") mock_print.assert_called_once_with("file1", 10, 20) mock_str2file.assert_called_once_with("data", "/opt/work_dir/file1") mock_debug.assert_called_once_with("Dump file1 into file1") @mock.patch('crmsh.report.utils.real_path') @mock.patch('logging.Logger.debug') @mock.patch('crmsh.utils.str2file') @mock.patch('os.path.basename') @mock.patch('crmsh.report.utils.print_logseg') @mock.patch('crmsh.report.utils.arch_logs') def test_dump_logset(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): mock_real_path.return_value = "file1" mock_arch.return_value = [["file1", "file2", "file3"], utils.LogType.GOOD] mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir", from_time=10, to_time=20) mock_basename.return_value = "file1" mock_print.side_effect = ["data1\n", "data2\n", "data3\n"] utils.dump_logset(mock_ctx_inst, "file1") mock_print.assert_has_calls([ mock.call("file3", 10, 0), mock.call("file2", 0, 0), mock.call("file1", 0, 20), ]) mock_str2file.assert_called_once_with("data1\ndata2\ndata3", "/opt/work_dir/file1") mock_debug.assert_called_once_with("Dump file1 into file1") @mock.patch('crmsh.utils.read_from_file') @mock.patch('os.path.exists') @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) def test_get_distro_info(self, mock_debug2, mock_exists, mock_read): mock_exists.return_value = True mock_read.return_value = """ VERSION_ID="20230629" PRETTY_NAME="openSUSE Tumbleweed" ANSI_COLOR="0;32" """ res = utils.get_distro_info() self.assertEqual(res, "openSUSE Tumbleweed") @mock.patch('shutil.which') @mock.patch('crmsh.report.utils.sh.LocalShell') @mock.patch('os.path.exists') @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) def test_get_distro_info_lsb(self, mock_debug2, mock_exists, mock_sh, mock_which): mock_which.return_value = True mock_exists.return_value = False mock_sh_inst = mock.Mock() mock_sh.return_value = mock_sh_inst mock_sh_inst.get_stdout_or_raise_error.return_value = "data" res = utils.get_distro_info() self.assertEqual(res, "Unknown") @mock.patch('crmsh.report.utils.get_timestamp') def test_find_first_timestamp_none(self, mock_get_timestamp): mock_get_timestamp.side_effect = [None, None] data = ["line1", "line2"] self.assertIsNone(utils.find_first_timestamp(data, "file1")) mock_get_timestamp.assert_has_calls([ mock.call("line1", "file1"), mock.call("line2", "file1") ]) @mock.patch('crmsh.report.utils.get_timestamp') def test_find_first_timestamp(self, mock_get_timestamp): mock_get_timestamp.return_value = 123456 data = ["line1", "line2"] res = utils.find_first_timestamp(data, "file1") self.assertEqual(res, 123456) mock_get_timestamp.assert_called_once_with("line1", "file1") def test_filter_lines(self): data = """line1 line2 line3 line4 line5 """ res = utils.filter_lines(data, 2, 4) self.assertEqual(res, 'line2\nline3\nline4\n') @mock.patch('crmsh.utils.parse_time') @mock.patch('crmsh.report.utils.head') def test_determin_log_format_none(self, mock_head, mock_parse): mock_head.return_value = ["line1", "line2"] mock_parse.side_effect = [None, None] data = """line1 line2 """ self.assertIsNone(utils.determin_log_format(data)) def test_determin_log_format_rfc5424(self): data = """ 2003-10-11T22:14:15.003Z mymachine.example.com su """ res = utils.determin_log_format(data) self.assertEqual(res, "rfc5424") def test_determin_log_format_syslog(self): data = """ Feb 12 18:30:08 15sp1-1 kernel: """ res = utils.determin_log_format(data) self.assertEqual(res, "syslog") @mock.patch('crmsh.utils.parse_time') @mock.patch('crmsh.report.utils.head') def test_determin_log_format_legacy(self, mock_head, mock_parse): mock_head.return_value = ["Legacy 2003-10-11T22:14:15.003Z log"] mock_parse.side_effect = [None, None, 123456] data = """ Legacy 003-10-11T22:14:15.003Z log data log """ res = utils.determin_log_format(data) self.assertEqual(res, "legacy") mock_parse.assert_has_calls([ mock.call("Legacy 2003-10-11T22:14:15.003Z log", quiet=True), mock.call("Legacy", quiet=True), mock.call("2003-10-11T22:14:15.003Z", quiet=True) ]) def test_get_timestamp_none(self): self.assertIsNone(utils.get_timestamp("", "file1")) @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') def test_get_timespan_rfc5424(self, mock_get_timestamp): constants.STAMP_TYPE = "rfc5424" mock_get_timestamp.return_value = 12345 res = utils.get_timestamp("2003-10-11T22:14:15.003Z mymachine.example.com su", "file1") self.assertEqual(res, mock_get_timestamp.return_value) mock_get_timestamp.assert_called_once_with("2003-10-11T22:14:15.003Z", "rfc5424", "file1") @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') def test_get_timespan_syslog(self, mock_get_timestamp): constants.STAMP_TYPE = "syslog" mock_get_timestamp.return_value = 12345 res = utils.get_timestamp("Feb 12 18:30:08 15sp1-1 kernel:", "file1") self.assertEqual(res, mock_get_timestamp.return_value) mock_get_timestamp.assert_called_once_with("Feb 12 18:30:08", "syslog", "file1") @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') def test_get_timespan_legacy(self, mock_get_timestamp): constants.STAMP_TYPE = "legacy" mock_get_timestamp.return_value = 12345 res = utils.get_timestamp("legacy 2003-10-11T22:14:15.003Z log data", "file1") self.assertEqual(res, mock_get_timestamp.return_value) mock_get_timestamp.assert_called_once_with("2003-10-11T22:14:15.003Z", "legacy", "file1") @mock.patch('crmsh.report.utils.diff_check') def test_do_compare(self, mock_diff): mock_ctx_inst = mock.Mock(work_dir="/opt/workdir", node_list=["node1", "node2"]) mock_diff.side_effect = [[0, ""], [0, ""]] rc, out = utils.do_compare(mock_ctx_inst, "file1") self.assertEqual(rc, 0) self.assertEqual(out, "") mock_diff.assert_called_once_with("/opt/workdir/node1/file1", "/opt/workdir/node2/file1") @mock.patch('os.path.isfile') def test_diff_check_return(self, mock_isfile): mock_isfile.return_value = False rc, out = utils.diff_check("/opt/file1", "/opt/fil2") self.assertEqual(rc, 1) self.assertEqual(out, "/opt/file1 does not exist\n") @mock.patch('crmsh.report.utils.cib_diff') @mock.patch('os.path.basename') @mock.patch('os.path.isfile') def test_diff_check(self, mock_isfile, mock_basename, mock_cib_diff): mock_isfile.side_effect = [True, True] mock_basename.return_value = "cib.xml" mock_cib_diff.return_value = (0, "") rc, out = utils.diff_check("/opt/node1/cib.xml", "/opt/node2/cib.xml") self.assertEqual(rc, 0) self.assertEqual(out, "") @mock.patch('crmsh.report.utils.ShellUtils') def test_txt_diff(self, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst mock_run_inst.get_stdout_stderr.return_value = (0, "", None) rc, out = utils.txt_diff("txt1", "txt2") self.assertEqual(rc, 0) self.assertEqual(out, "") @mock.patch('os.path.isfile') def test_cib_diff_not_running(self, mock_isfile): mock_isfile.side_effect = [True, False, False, True] rc, out = utils.cib_diff("/opt/node1/cib.xml", "/opt/node2/cib.xml") self.assertEqual(rc, 1) self.assertEqual(out, "Can't compare cibs from running and stopped systems\n") @mock.patch('crmsh.report.utils.ShellUtils') @mock.patch('os.path.isfile') def test_cib_diff(self, mock_isfile, mock_run): mock_run_inst = mock.Mock() mock_run.return_value = mock_run_inst mock_isfile.side_effect = [True, True] mock_run_inst.get_stdout_stderr.return_value = (0, "", None) rc, out = utils.cib_diff("/opt/node1/cib.xml", "/opt/node2/cib.xml") self.assertEqual(rc, 0) self.assertEqual(out, "") mock_run_inst.get_stdout_stderr.assert_called_once_with("crm_diff -c -n /opt/node1/cib.xml -o /opt/node2/cib.xml") @mock.patch('os.symlink') @mock.patch('shutil.move') @mock.patch('os.remove') @mock.patch('os.path.isfile') def test_consolidate(self, mock_isfile, mock_remove, mock_move, mock_symlink): mock_isfile.side_effect = [True, False] mock_ctx_inst = mock.Mock(node_list=["node1", "node2"], work_dir="/opt/workdir") utils.consolidate(mock_ctx_inst, "target_file") mock_isfile.assert_has_calls([ mock.call("/opt/workdir/target_file"), mock.call("/opt/workdir/target_file") ]) mock_symlink.assert_has_calls([ mock.call('../target_file', '/opt/workdir/node1/target_file'), mock.call('../target_file', '/opt/workdir/node2/target_file') ]) @mock.patch('crmsh.report.utils.Sanitizer') def test_do_sanitize(self, mock_sanitizer): mock_inst = mock.Mock() mock_sanitizer.return_value = mock_inst mock_ctx_inst = mock.Mock() utils.do_sanitize(mock_ctx_inst) mock_inst.prepare.assert_called_once_with() mock_inst.sanitize.assert_called_once_with() @mock.patch('crmsh.utils.read_from_file') def test_print_logseg_empty(self, mock_read): mock_read.return_value = "" res = utils.print_logseg("log1", 1234, 0) self.assertEqual(res, "") @mock.patch('crmsh.report.utils.findln_by_timestamp') @mock.patch('crmsh.utils.read_from_file') def test_print_logseg_none(self, mock_read, mock_findln): mock_read.return_value = "data" mock_findln.return_value = None res = utils.print_logseg("log1", 1234, 0) self.assertEqual(res, "") @mock.patch('crmsh.report.utils.filter_lines') @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) @mock.patch('crmsh.report.utils.findln_by_timestamp') @mock.patch('crmsh.utils.read_from_file') def test_print_logseg(self, mock_read, mock_findln, mock_logger, mock_filter): mock_read.return_value = "line1\nline2\nline3" mock_filter.return_value = "line1\nline2\nline3" res = utils.print_logseg("log1", 0, 0) self.assertEqual(res, mock_filter.return_value) mock_logger.debug2.assert_called_once_with("Including segment [%d-%d] from %s", 1, 3, "log1") def test_head(self): data = "line1\nline2\nline3" res = utils.head(2, data) self.assertEqual(res, ["line1", "line2"]) def test_tail(self): data = "line1\nline2\nline3" res = utils.tail(2, data) self.assertEqual(res, ["line2", "line3"]) @mock.patch('crmsh.utils.get_open_method') @mock.patch('builtins.open', create=True) def test_write_to_file(self, mock_open, mock_method): mock_method.return_value = mock_open file_handle = mock_open.return_value.__enter__.return_value utils.write_to_file('Hello', 'file.txt') mock_open.assert_called_once_with('file.txt', 'w') file_handle.write.assert_called_once_with('Hello') @mock.patch('gzip.open') @mock.patch('crmsh.utils.get_open_method') def test_write_to_file_encode(self, mock_method, mock_open): mock_method.return_value = mock_open file_handle = mock_open.return_value.__enter__.return_value utils.write_to_file('Hello', 'file.txt') mock_open.assert_called_once_with('file.txt', 'w') file_handle.write.assert_called_once_with(b'Hello') @mock.patch('crmsh.report.utils.dt_to_str') @mock.patch('crmsh.report.utils.ts_to_dt') def test_ts_to_str(self, mock_ts_to_dt, mock_dt_to_str): mock_ts_to_dt.return_value = datetime.datetime(2020, 2, 19, 21, 44, 7, 977355) mock_dt_to_str.return_value = "2020-02-19 21:44" res = utils.ts_to_str(1693519260.0) self.assertEqual(res, mock_dt_to_str.return_value) def test_ts_to_dt(self): res = utils.ts_to_dt(crmutils.parse_to_timestamp("2023-09-01 06:01")) self.assertEqual(utils.dt_to_str(res), "2023-09-01 06:01:00") def test_now(self): expected_res = datetime.datetime.now().strftime(constants.TIME_FORMAT) res = utils.now() self.assertEqual(res, expected_res) @mock.patch('crmsh.utils.str2file') @mock.patch('crmsh.utils.read_from_file') @mock.patch('os.path.isfile') @mock.patch('crmsh.report.utils.now') def test_create_description_template(self, mock_now, mock_isfile, mock_read, mock_str2file): mock_now.return_value = "2023-09-01 06:01" sys.argv = ["crm", "report", "option1"] mock_ctx_inst = mock.Mock(node_list=["node1"], work_dir="/opt/workdir") mock_isfile.return_value = True mock_read.return_value = "data" utils.create_description_template(mock_ctx_inst) @mock.patch('crmsh.utils.str2file') @mock.patch('crmsh.report.utils.extract_critical_log') @mock.patch('crmsh.report.utils.check_collected_files') @mock.patch('crmsh.report.utils.compare_and_consolidate_files') @mock.patch('glob.glob') def test_analyze(self, mock_glob, mock_compare, mock_check_collected, mock_extract, mock_str2file): mock_compare.return_value = "data" mock_check_collected.return_value = "" mock_extract.return_value = "" mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") utils.analyze(mock_ctx_inst) mock_str2file.assert_called_once_with("data", f"/opt/work_dir/{constants.ANALYSIS_F}") @mock.patch('crmsh.report.utils.consolidate') @mock.patch('crmsh.report.utils.do_compare') @mock.patch('glob.glob') def test_compare_and_consolidate_files(self, mock_glob, mock_compare, mock_consolidate): mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") mock_glob.side_effect = [False, True, True, True, True] mock_compare.side_effect = [(0, ""), (0, ""), (0, ""), (0, "")] res = utils.compare_and_consolidate_files(mock_ctx_inst) self.assertEqual(f"Diff {constants.MEMBERSHIP_F}... no {constants.MEMBERSHIP_F} found in /opt/work_dir\nDiff {constants.CRM_MON_F}... OK\nDiff {constants.COROSYNC_F}... OK\nDiff {constants.SYSINFO_F}... OK\nDiff {constants.CIB_F}... OK\n\n", res) @mock.patch('crmsh.utils.read_from_file') @mock.patch('crmsh.utils.file_is_empty') @mock.patch('os.path.isfile') def test_check_collected_files(self, mock_isfile, mock_is_empty, mock_read): mock_ctx_inst = mock.Mock(node_list=["node1"], work_dir="/opt/work_dir") mock_isfile.side_effect = [False, False, True] mock_is_empty.return_value = False mock_read.return_value = "data" res = utils.check_collected_files(mock_ctx_inst) self.assertEqual(res, ["Checking problems with permissions/ownership at node1:", "data"]) @mock.patch('logging.Logger.error') @mock.patch('crmsh.utils.parse_to_timestamp') def test_parse_to_timestamp_none(self, mock_parse, mock_error): mock_parse.return_value = None with self.assertRaises(utils.ReportGenericError) as err: utils.parse_to_timestamp("xxxxx") mock_error.assert_has_calls([ mock.call(f"Invalid time string 'xxxxx'"), mock.call('Try these formats like: 2pm; "2019/9/5 12:30"; "09-Sep-07 2:00"; "[1-9][0-9]*[YmdHM]"') ]) @mock.patch('logging.Logger.error') @mock.patch('crmsh.utils.parse_to_timestamp') def test_parse_to_timestamp(self, mock_parse, mock_error): mock_parse.return_value = 1234567 res = utils.parse_to_timestamp("2023") self.assertEqual(res, mock_parse.return_value) def test_parse_to_timestamp_delta(self): timedelta = datetime.timedelta(days=10) expected_timestamp = (datetime.datetime.now() - timedelta).timestamp() res = utils.parse_to_timestamp("10d") self.assertEqual(int(res), int(expected_timestamp)) @mock.patch('crmsh.utils.read_from_file') @mock.patch('glob.glob') def test_extract_critical_log(self, mock_glob, mock_read): mock_glob.return_value = ["/opt/workdir/pacemaker.log"] mock_read.return_value = """ line1 pacemaker-controld[5678]: warning: data pacemaker-schedulerd[5677]: error: Resource line4 """ mock_ctx_inst = mock.Mock(work_dir="/opt/workdir") res = utils.extract_critical_log(mock_ctx_inst) expected_data = """ WARNINGS or ERRORS in pacemaker.log: pacemaker-controld[5678]: warning: data pacemaker-schedulerd[5677]: error: Resource""" self.assertEqual('\n'.join(res), expected_data) def test_findln_by_timestamp_1(self): pacemaker_file_path = "pacemaker.log.2" with open(pacemaker_file_path) as f: data = f.read() data_list = data.split('\n') constants.STAMP_TYPE = utils.determin_log_format(data) first_timestamp = utils.get_timestamp(data_list[0], pacemaker_file_path) middle_timestamp = utils.get_timestamp(data_list[1], pacemaker_file_path) last_timestamp = utils.get_timestamp(data_list[2], pacemaker_file_path) assert first_timestamp < middle_timestamp < last_timestamp line_stamp = crmutils.parse_to_timestamp("Jan 03 11:03:41 2024") result_line = utils.findln_by_timestamp(data, line_stamp, pacemaker_file_path) assert result_line == 2 line_stamp = crmutils.parse_to_timestamp("Jan 03 12:03:41 2024") result_line = utils.findln_by_timestamp(data, line_stamp, pacemaker_file_path) assert result_line == 3 def test_findln_by_timestamp_irregular(self): data = """line1 line2 line3 line4""" target_time = "Apr 03 13:10" target_time_stamp = crmutils.parse_to_timestamp(target_time) result_line = utils.findln_by_timestamp(data, target_time_stamp, "file1") self.assertIsNone(result_line) def test_findln_by_timestamp(self): target_time = "Apr 03 13:10" target_time_stamp = crmutils.parse_to_timestamp(target_time+' 2023') with open('pacemaker.log') as f: data = f.read() constants.STAMP_TYPE = utils.determin_log_format(data) pacemaker_file_path = "pacemaker.log" result_line = utils.findln_by_timestamp(data, target_time_stamp, pacemaker_file_path) result_line_stamp = utils.get_timestamp(data.split('\n')[result_line-1], pacemaker_file_path) assert result_line_stamp > target_time_stamp result_pre_line_stamp = utils.get_timestamp(data.split('\n')[result_line-2], pacemaker_file_path) assert result_pre_line_stamp < target_time_stamp target_time = "Apr 03 11:01:19" target_time_stamp = crmutils.parse_to_timestamp(target_time+' 2023') result_line = utils.findln_by_timestamp(data, target_time_stamp, pacemaker_file_path) result_time = ' '.join(data.split('\n')[result_line-1].split()[:3]) self.assertEqual(result_time, target_time) @mock.patch('crmsh.utils.parse_to_timestamp') def test_get_timestamp_from_time_line_not_syslog(self, mock_parse): mock_parse.return_value = 123456 res = utils.get_timestamp_from_time_line("line1", "rfc5424", "file1") self.assertEqual(res, mock_parse.return_value) @mock.patch('os.path.getmtime') @mock.patch('crmsh.report.utils.datetime') @mock.patch('crmsh.utils.parse_to_timestamp') def test_get_timestamp_from_time_line_next_year(self, mock_parse, mock_datetime, mock_getmtime): mock_parse.return_value = 8888888888888 mock_getmtime.return_value = 1691938980.0 mock_datetime.datetime.now.return_value = datetime.datetime(2023, 9, 1, 6, 1) mock_datetime.datetime.fromtimestamp.return_value = datetime.datetime(2024, 9, 1, 6, 1) res = utils.get_timestamp_from_time_line("line1", "syslog", "file1") self.assertIsNone(res) @mock.patch('os.path.getmtime') @mock.patch('crmsh.report.utils.datetime') @mock.patch('crmsh.utils.parse_to_timestamp') def test_get_timestamp_from_time_line_that_year(self, mock_parse, mock_datetime, mock_getmtime): mock_parse.return_value = 8888888888888 mock_getmtime.return_value = 1691938980.0 mock_datetime.datetime.now.return_value = datetime.datetime(2023, 9, 1, 6, 1) mock_datetime.datetime.fromtimestamp.return_value = datetime.datetime(2022, 9, 1, 6, 1) res = utils.get_timestamp_from_time_line("line1", "syslog", "file1") self.assertEqual(res, mock_parse.return_value)