From d835b2cae8abc71958b69362162e6a70c3d7ef63 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 08:48:59 +0200 Subject: Adding upstream version 4.6.0. Signed-off-by: Daniel Baumann --- test/unittests/test_report_utils.py | 862 ++++++++++++++++++++++++++++++++++++ 1 file changed, 862 insertions(+) create mode 100644 test/unittests/test_report_utils.py (limited to 'test/unittests/test_report_utils.py') diff --git a/test/unittests/test_report_utils.py b/test/unittests/test_report_utils.py new file mode 100644 index 0000000..aa28563 --- /dev/null +++ b/test/unittests/test_report_utils.py @@ -0,0 +1,862 @@ +import sys +import datetime +from crmsh import config +from crmsh import utils as crmutils +from crmsh.report import utils, constants +import crmsh.log + +import unittest +from unittest import mock + + +class TestPackage(unittest.TestCase): + + @mock.patch('crmsh.report.utils.get_pkg_mgr') + def setUp(self, mock_get_pkg_mgr): + mock_get_pkg_mgr.side_effect = [None, "rpm", "deb"] + self.inst_none = utils.Package("xxx1 xxx2") + self.inst = utils.Package("rpm1 rpm2") + self.inst_deb = utils.Package("deb1 deb2") + + def test_version_return(self): + res = self.inst_none.version() + self.assertEqual(res, "") + + @mock.patch('crmsh.report.utils.Package.pkg_ver_rpm') + def test_version(self, mock_ver_rpm): + mock_ver_rpm.return_value = "version1" + res = self.inst.version() + self.assertEqual(res, "version1") + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_version_rpm(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + data = "rpm1-4.5.0\nrpm2 not installed" + mock_run_inst.get_stdout_stderr.return_value = (0, data, None) + res = self.inst.pkg_ver_rpm() + self.assertEqual(res, "rpm1-4.5.0") + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_version_deb(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + data = "deb1-4.5.0\nno packages found" + mock_run_inst.get_stdout_stderr.return_value = (0, data, None) + res = self.inst_deb.pkg_ver_deb() + self.assertEqual(res, "deb1-4.5.0") + + def test_verify_return(self): + res = self.inst_none.verify() + self.assertEqual(res, "") + + @mock.patch('crmsh.report.utils.Package.verify_rpm') + def test_verify(self, mock_verify_rpm): + mock_verify_rpm.return_value = "" + res = self.inst.verify() + self.assertEqual(res, "") + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_verify_rpm(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_stderr.return_value = (0, "verify data\nThis is not installed","") + res = self.inst.verify_rpm() + self.assertEqual(res, "verify data") + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_verify_deb(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_stderr.return_value = (0, "verify data\nThis is not installed","") + res = self.inst_deb.verify_deb() + self.assertEqual(res, "verify data") + + +class TestSanitizer(unittest.TestCase): + + def setUp(self): + mock_ctx_inst_no_sanitize = mock.Mock(sanitize=False) + self.s_inst_no_sanitize = utils.Sanitizer(mock_ctx_inst_no_sanitize) + + mock_ctx_inst_no_sanitize_set = mock.Mock(sensitive_regex_list=[]) + self.s_inst_no_sanitize_set = utils.Sanitizer(mock_ctx_inst_no_sanitize_set) + + mock_ctx_inst = mock.Mock(sanitize=True, work_dir="/opt", sensitive_regex_list=["test_patt"]) + self.s_inst = utils.Sanitizer(mock_ctx_inst) + + @mock.patch('logging.Logger.warning') + @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') + @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') + @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') + @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') + def test_prepare_return(self, mock_load_cib, mock_parse, mock_extract, mock_include, mock_warning): + mock_include.return_value = True + self.s_inst_no_sanitize.prepare() + mock_load_cib.assert_called_once_with() + mock_parse.assert_called_once_with() + mock_warning.assert_has_calls([ + mock.call("Some PE/CIB/log files contain possibly sensitive data"), + mock.call("Using \"-s\" option can replace sensitive data") + ]) + + @mock.patch('crmsh.report.utils.Sanitizer._get_file_list_in_work_dir') + @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') + @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') + @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') + @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') + def test_prepare(self, mock_load_cib, mock_parse, mock_extract, mock_include, mock_get_file): + mock_include.return_value = True + self.s_inst.prepare() + mock_load_cib.assert_called_once_with() + mock_parse.assert_called_once_with() + mock_get_file.assert_called_once_with + + @mock.patch('crmsh.report.utils.Sanitizer._include_sensitive_data') + @mock.patch('crmsh.report.utils.Sanitizer._extract_sensitive_value_list') + @mock.patch('crmsh.report.utils.Sanitizer._parse_sensitive_set') + @mock.patch('crmsh.report.utils.Sanitizer._load_cib_from_work_dir') + def test_prepare_no_sensitive_data(self, mock_load_cib, mock_parse, mock_extract, mock_include): + mock_include.return_value = False + self.s_inst.prepare() + mock_load_cib.assert_called_once_with() + mock_parse.assert_called_once_with() + + def test_include_sensitive_data(self): + res = self.s_inst._include_sensitive_data() + self.assertEqual(res, []) + + @mock.patch('os.walk') + def test_get_file_list_in_work_dir(self, mock_walk): + mock_walk.return_value = [ + ("/opt", [], ["file1", "file2"]), + ("/opt/dir1", [], ["file3"]), + ] + self.s_inst._get_file_list_in_work_dir() + self.assertEqual(self.s_inst.file_list_in_workdir, ['/opt/file1', '/opt/file2', '/opt/dir1/file3']) + + @mock.patch('glob.glob') + def test_load_cib_from_work_dir_no_cib(self, mock_glob): + mock_glob.return_value = [] + with self.assertRaises(utils.ReportGenericError) as err: + self.s_inst._load_cib_from_work_dir() + self.assertEqual(f"CIB file {constants.CIB_F} was not collected", str(err.exception)) + + @mock.patch('glob.glob') + @mock.patch('crmsh.utils.read_from_file') + def test_load_cib_from_work_dir_empty(self, mock_read, mock_glob): + mock_glob.return_value = [f"/opt/node1/{constants.CIB_F}"] + mock_read.return_value = None + with self.assertRaises(utils.ReportGenericError) as err: + self.s_inst._load_cib_from_work_dir() + self.assertEqual(f"File /opt/node1/{constants.CIB_F} is empty", str(err.exception)) + mock_read.assert_called_once_with(f"/opt/node1/{constants.CIB_F}") + + @mock.patch('glob.glob') + @mock.patch('crmsh.utils.read_from_file') + def test_load_cib_from_work_dir(self, mock_read, mock_glob): + mock_glob.return_value = [f"/opt/node1/{constants.CIB_F}"] + mock_read.return_value = "data" + self.s_inst._load_cib_from_work_dir() + self.assertEqual(self.s_inst.cib_data, "data") + mock_read.assert_called_once_with(f"/opt/node1/{constants.CIB_F}") + + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + def test_parse_sensitive_set_no_set(self, mock_logger): + config.report.sanitize_rule = "" + self.s_inst_no_sanitize_set._parse_sensitive_set() + self.assertEqual(self.s_inst_no_sanitize_set.sensitive_regex_set, set(utils.Sanitizer.DEFAULT_RULE_LIST)) + mock_logger.debug2.assert_called_once_with(f"Regex set to match sensitive data: {set(utils.Sanitizer.DEFAULT_RULE_LIST)}") + + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + def test_parse_sensitive_set(self, mock_logger): + config.report.sanitize_rule = "passw.*" + self.s_inst._parse_sensitive_set() + self.assertEqual(self.s_inst.sensitive_regex_set, set(['test_patt', 'passw.*'])) + mock_logger.debug2.assert_called_once_with(f"Regex set to match sensitive data: {set(['test_patt', 'passw.*'])}") + + def test_sanitize_return(self): + self.s_inst_no_sanitize.sanitize() + + @mock.patch('crmsh.report.utils.write_to_file') + @mock.patch('logging.Logger.debug') + @mock.patch('crmsh.report.utils.Sanitizer._sub_sensitive_string') + @mock.patch('crmsh.utils.read_from_file') + def test_sanitize(self, mock_read, mock_sub, mock_debug, mock_write): + self.s_inst.file_list_in_workdir = ["file1", "file2"] + mock_read.side_effect = [None, "data"] + mock_sub.return_value = "replace_data" + self.s_inst.sanitize() + mock_debug.assert_called_once_with("Replace sensitive info for %s", "file2") + + def test_extract_from_cib(self): + self.s_inst.cib_data = """ + + + + """ + res = self.s_inst._extract_from_cib("passw.*") + self.assertEqual(res, ["qwertyui"]) + + def test_sub_sensitive_string(self): + data = """ + + + + + This my tel 13356789876 + """ + self.s_inst.sensitive_value_list_with_raw_option = ["13356789876"] + self.s_inst.sensitive_key_list = ["passw.*"] + self.s_inst.sensitive_value_list = ["qwertyui"] + res = self.s_inst._sub_sensitive_string(data) + expected_data = """ + + + + + This my tel ****** + """ + self.assertEqual(res, expected_data) + + @mock.patch('logging.Logger.warning') + def test_extract_sensitive_value_list_warn(self, mock_warn): + self.s_inst.sensitive_regex_set = set(["TEL:test"]) + self.s_inst._extract_sensitive_value_list() + mock_warn.assert_called_once_with("For sanitize pattern TEL:test, option should be \"raw\"") + + @mock.patch('crmsh.report.utils.Sanitizer._extract_from_cib') + def test_extract_sensitive_value_list(self, mock_extract): + mock_extract.side_effect = [["123456"], ["qwertyui"]] + self.s_inst.sensitive_regex_set = set(["TEL:raw", "passw.*"]) + self.s_inst._extract_sensitive_value_list() + +class TestUtils(unittest.TestCase): + + @mock.patch('builtins.sorted', side_effect=lambda x, *args, **kwargs: x[::-1]) + @mock.patch('crmsh.report.utils.get_timespan_str') + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + @mock.patch('glob.glob') + @mock.patch('crmsh.report.utils.is_our_log') + def test_arch_logs(self, mock_is_our_log, mock_glob, mock_logger, mock_timespan, mock_sorted): + mock_is_our_log.return_value = utils.LogType.GOOD + mock_glob.return_value = [] + mock_ctx_inst = mock.Mock() + mock_timespan.return_value = "0101-0202" + + return_list, log_type = utils.arch_logs(mock_ctx_inst, "file1") + + self.assertEqual(return_list, ["file1"]) + self.assertEqual(log_type, utils.LogType.GOOD) + mock_logger.debug2.assert_called_once_with("Found logs ['file1'] in 0101-0202") + + @mock.patch('sys.stdout.flush') + @mock.patch('traceback.print_exc') + def test_print_traceback(self, mock_trace, mock_flush): + utils.print_traceback() + mock_trace.assert_called_once_with() + + @mock.patch('crmsh.report.utils.ts_to_str') + def test_get_timespan_str(self, mock_ts_to_str): + mock_ctx_inst = mock.Mock(from_time=1691938980.0, to_time=1691982180.0) + mock_ts_to_str.side_effect = ["2023-08-13 23:03", "2023-08-14 11:03"] + res = utils.get_timespan_str(mock_ctx_inst) + self.assertEqual(res, "2023-08-13 23:03 - 2023-08-14 11:03") + mock_ts_to_str.assert_has_calls([ + mock.call(mock_ctx_inst.from_time), + mock.call(mock_ctx_inst.to_time) + ]) + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_get_cmd_output(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_stderr.return_value = (0, "stdout_data", "stderr_data") + res = utils.get_cmd_output("cmd") + self.assertEqual(res, "stdout_data\nstderr_data\n") + mock_run_inst.get_stdout_stderr.assert_called_once_with("cmd", timeout=None) + + @mock.patch('crmsh.utils.read_from_file') + def test_is_our_log_empty(self, mock_read): + mock_read.return_value = None + mock_ctx_inst = mock.Mock() + res = utils.is_our_log(mock_ctx_inst, "/opt/logfile") + self.assertEqual(res, utils.LogType.EMPTY) + mock_read.assert_called_once_with("/opt/logfile") + + @mock.patch('crmsh.report.utils.determin_log_format') + @mock.patch('crmsh.utils.read_from_file') + def test_is_our_log_irregular(self, mock_read, mock_log_format): + mock_read.return_value = "This is the log" + mock_ctx_inst = mock.Mock() + mock_log_format.return_value = None + res = utils.is_our_log(mock_ctx_inst, "/opt/logfile") + self.assertEqual(res, utils.LogType.IRREGULAR) + mock_read.assert_called_once_with("/opt/logfile") + mock_log_format.assert_called_once_with(mock_read.return_value) + + @mock.patch('crmsh.report.utils.find_first_timestamp') + @mock.patch('crmsh.report.utils.head') + @mock.patch('crmsh.report.utils.determin_log_format') + @mock.patch('crmsh.utils.read_from_file') + def test_is_our_log_before(self, mock_read, mock_determine, mock_head, mock_find_first): + mock_read.return_value = "data" + mock_determine.return_value = "rfc5424" + mock_find_first.side_effect = [1000, 1500] + mock_ctx_inst = mock.Mock(from_time=1600, to_time=1800) + res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") + self.assertEqual(res, utils.LogType.BEFORE_TIMESPAN) + + @mock.patch('crmsh.report.utils.find_first_timestamp') + @mock.patch('crmsh.report.utils.head') + @mock.patch('crmsh.report.utils.determin_log_format') + @mock.patch('crmsh.utils.read_from_file') + def test_is_our_log_good(self, mock_read, mock_determine, mock_head, mock_find_first): + mock_read.return_value = "data" + mock_determine.return_value = "rfc5424" + mock_find_first.side_effect = [1000, 1500] + mock_ctx_inst = mock.Mock(from_time=1200, to_time=1800) + res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") + self.assertEqual(res, utils.LogType.GOOD) + + @mock.patch('crmsh.report.utils.find_first_timestamp') + @mock.patch('crmsh.report.utils.head') + @mock.patch('crmsh.report.utils.determin_log_format') + @mock.patch('crmsh.utils.read_from_file') + def test_is_our_log_after(self, mock_read, mock_determine, mock_head, mock_find_first): + mock_read.return_value = "data" + mock_determine.return_value = "rfc5424" + mock_find_first.side_effect = [1000, 1500] + mock_ctx_inst = mock.Mock(from_time=200, to_time=800) + res = utils.is_our_log(mock_ctx_inst, "/var/log/pacemaker.log") + self.assertEqual(res, utils.LogType.AFTER_TIMESPAN) + + @mock.patch('logging.Logger.warning') + @mock.patch('shutil.which') + def test_get_pkg_mgr_unknown(self, mock_which, mock_warning): + mock_which.side_effect = [False, False] + self.assertEqual(utils.get_pkg_mgr(), "") + mock_warning.assert_called_once_with("Unknown package manager!") + + @mock.patch('shutil.which') + def test_get_pkg_mgr(self, mock_which): + mock_which.return_value = True + utils.get_pkg_mgr() + self.assertEqual(utils.get_pkg_mgr(), "rpm") + + @mock.patch('os.walk') + @mock.patch('os.stat') + @mock.patch('os.path.isdir') + def test_find_files_in_timespan(self, mock_isdir, mock_stat, mock_walk): + mock_isdir.side_effect = [True, False] + mock_stat.return_value = mock.Mock(st_ctime=1615) + mock_walk.return_value = [ + ('/mock_dir', [], ['file1.txt', 'file2.txt']) + ] + mock_ctx_inst = mock.Mock(from_time=1611, to_time=1620) + + res = utils.find_files_in_timespan(mock_ctx_inst, ['/mock_dir', '/not_exist']) + + expected_result = ['/mock_dir/file1.txt', '/mock_dir/file2.txt'] + self.assertEqual(res, expected_result) + + @mock.patch('crmsh.report.utils.get_timespan_str') + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + @mock.patch('crmsh.report.utils.arch_logs') + def test_dump_logset_return(self, mock_arch, mock_debug, mock_timespan): + mock_arch.return_value = [[], ""] + mock_ctx_inst = mock.Mock() + utils.dump_logset(mock_ctx_inst, "file") + + @mock.patch('crmsh.report.utils.real_path') + @mock.patch('logging.Logger.debug') + @mock.patch('crmsh.utils.str2file') + @mock.patch('os.path.basename') + @mock.patch('crmsh.report.utils.print_logseg') + @mock.patch('crmsh.report.utils.arch_logs') + def test_dump_logset_irrgular(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): + mock_real_path.return_value = "file1" + mock_arch.return_value = [["file1"], utils.LogType.IRREGULAR] + mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") + mock_basename.return_value = "file1" + mock_print.return_value = "data" + utils.dump_logset(mock_ctx_inst, "file1") + mock_print.assert_called_once_with("file1", 0, 0) + mock_str2file.assert_called_once_with("data", "/opt/work_dir/file1") + mock_debug.assert_called_once_with("Dump file1 into file1") + + @mock.patch('crmsh.report.utils.real_path') + @mock.patch('logging.Logger.debug') + @mock.patch('crmsh.utils.str2file') + @mock.patch('os.path.basename') + @mock.patch('crmsh.report.utils.print_logseg') + @mock.patch('crmsh.report.utils.arch_logs') + def test_dump_logset_one(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): + mock_real_path.return_value = "file1" + mock_arch.return_value = [["file1"], utils.LogType.GOOD] + mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir", from_time=10, to_time=20) + mock_basename.return_value = "file1" + mock_print.return_value = "data" + + utils.dump_logset(mock_ctx_inst, "file1") + + mock_print.assert_called_once_with("file1", 10, 20) + mock_str2file.assert_called_once_with("data", "/opt/work_dir/file1") + mock_debug.assert_called_once_with("Dump file1 into file1") + + @mock.patch('crmsh.report.utils.real_path') + @mock.patch('logging.Logger.debug') + @mock.patch('crmsh.utils.str2file') + @mock.patch('os.path.basename') + @mock.patch('crmsh.report.utils.print_logseg') + @mock.patch('crmsh.report.utils.arch_logs') + def test_dump_logset(self, mock_arch, mock_print, mock_basename, mock_str2file, mock_debug, mock_real_path): + mock_real_path.return_value = "file1" + mock_arch.return_value = [["file1", "file2", "file3"], utils.LogType.GOOD] + mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir", from_time=10, to_time=20) + mock_basename.return_value = "file1" + mock_print.side_effect = ["data1\n", "data2\n", "data3\n"] + + utils.dump_logset(mock_ctx_inst, "file1") + + mock_print.assert_has_calls([ + mock.call("file3", 10, 0), + mock.call("file2", 0, 0), + mock.call("file1", 0, 20), + ]) + mock_str2file.assert_called_once_with("data1\ndata2\ndata3", "/opt/work_dir/file1") + mock_debug.assert_called_once_with("Dump file1 into file1") + + @mock.patch('crmsh.utils.read_from_file') + @mock.patch('os.path.exists') + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + def test_get_distro_info(self, mock_debug2, mock_exists, mock_read): + mock_exists.return_value = True + mock_read.return_value = """ +VERSION_ID="20230629" +PRETTY_NAME="openSUSE Tumbleweed" +ANSI_COLOR="0;32" + """ + res = utils.get_distro_info() + self.assertEqual(res, "openSUSE Tumbleweed") + + @mock.patch('shutil.which') + @mock.patch('crmsh.report.utils.sh.LocalShell') + @mock.patch('os.path.exists') + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + def test_get_distro_info_lsb(self, mock_debug2, mock_exists, mock_sh, mock_which): + mock_which.return_value = True + mock_exists.return_value = False + mock_sh_inst = mock.Mock() + mock_sh.return_value = mock_sh_inst + mock_sh_inst.get_stdout_or_raise_error.return_value = "data" + res = utils.get_distro_info() + self.assertEqual(res, "Unknown") + + @mock.patch('crmsh.report.utils.get_timestamp') + def test_find_first_timestamp_none(self, mock_get_timestamp): + mock_get_timestamp.side_effect = [None, None] + data = ["line1", "line2"] + self.assertIsNone(utils.find_first_timestamp(data, "file1")) + mock_get_timestamp.assert_has_calls([ + mock.call("line1", "file1"), + mock.call("line2", "file1") + ]) + + @mock.patch('crmsh.report.utils.get_timestamp') + def test_find_first_timestamp(self, mock_get_timestamp): + mock_get_timestamp.return_value = 123456 + data = ["line1", "line2"] + res = utils.find_first_timestamp(data, "file1") + self.assertEqual(res, 123456) + mock_get_timestamp.assert_called_once_with("line1", "file1") + + def test_filter_lines(self): + data = """line1 +line2 +line3 +line4 +line5 + """ + res = utils.filter_lines(data, 2, 4) + self.assertEqual(res, 'line2\nline3\nline4\n') + + @mock.patch('crmsh.utils.parse_time') + @mock.patch('crmsh.report.utils.head') + def test_determin_log_format_none(self, mock_head, mock_parse): + mock_head.return_value = ["line1", "line2"] + mock_parse.side_effect = [None, None] + data = """line1 +line2 + """ + self.assertIsNone(utils.determin_log_format(data)) + + def test_determin_log_format_rfc5424(self): + data = """ +2003-10-11T22:14:15.003Z mymachine.example.com su + """ + res = utils.determin_log_format(data) + self.assertEqual(res, "rfc5424") + + def test_determin_log_format_syslog(self): + data = """ +Feb 12 18:30:08 15sp1-1 kernel: + """ + res = utils.determin_log_format(data) + self.assertEqual(res, "syslog") + + @mock.patch('crmsh.utils.parse_time') + @mock.patch('crmsh.report.utils.head') + def test_determin_log_format_legacy(self, mock_head, mock_parse): + mock_head.return_value = ["Legacy 2003-10-11T22:14:15.003Z log"] + mock_parse.side_effect = [None, None, 123456] + data = """ +Legacy 003-10-11T22:14:15.003Z log data log + """ + res = utils.determin_log_format(data) + self.assertEqual(res, "legacy") + mock_parse.assert_has_calls([ + mock.call("Legacy 2003-10-11T22:14:15.003Z log", quiet=True), + mock.call("Legacy", quiet=True), + mock.call("2003-10-11T22:14:15.003Z", quiet=True) + ]) + + def test_get_timestamp_none(self): + self.assertIsNone(utils.get_timestamp("", "file1")) + + @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') + def test_get_timespan_rfc5424(self, mock_get_timestamp): + constants.STAMP_TYPE = "rfc5424" + mock_get_timestamp.return_value = 12345 + res = utils.get_timestamp("2003-10-11T22:14:15.003Z mymachine.example.com su", "file1") + self.assertEqual(res, mock_get_timestamp.return_value) + mock_get_timestamp.assert_called_once_with("2003-10-11T22:14:15.003Z", "rfc5424", "file1") + + @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') + def test_get_timespan_syslog(self, mock_get_timestamp): + constants.STAMP_TYPE = "syslog" + mock_get_timestamp.return_value = 12345 + res = utils.get_timestamp("Feb 12 18:30:08 15sp1-1 kernel:", "file1") + self.assertEqual(res, mock_get_timestamp.return_value) + mock_get_timestamp.assert_called_once_with("Feb 12 18:30:08", "syslog", "file1") + + @mock.patch('crmsh.report.utils.get_timestamp_from_time_line') + def test_get_timespan_legacy(self, mock_get_timestamp): + constants.STAMP_TYPE = "legacy" + mock_get_timestamp.return_value = 12345 + res = utils.get_timestamp("legacy 2003-10-11T22:14:15.003Z log data", "file1") + self.assertEqual(res, mock_get_timestamp.return_value) + mock_get_timestamp.assert_called_once_with("2003-10-11T22:14:15.003Z", "legacy", "file1") + + @mock.patch('crmsh.report.utils.diff_check') + def test_do_compare(self, mock_diff): + mock_ctx_inst = mock.Mock(work_dir="/opt/workdir", node_list=["node1", "node2"]) + mock_diff.side_effect = [[0, ""], [0, ""]] + rc, out = utils.do_compare(mock_ctx_inst, "file1") + self.assertEqual(rc, 0) + self.assertEqual(out, "") + mock_diff.assert_called_once_with("/opt/workdir/node1/file1", "/opt/workdir/node2/file1") + + @mock.patch('os.path.isfile') + def test_diff_check_return(self, mock_isfile): + mock_isfile.return_value = False + rc, out = utils.diff_check("/opt/file1", "/opt/fil2") + self.assertEqual(rc, 1) + self.assertEqual(out, "/opt/file1 does not exist\n") + + @mock.patch('crmsh.report.utils.cib_diff') + @mock.patch('os.path.basename') + @mock.patch('os.path.isfile') + def test_diff_check(self, mock_isfile, mock_basename, mock_cib_diff): + mock_isfile.side_effect = [True, True] + mock_basename.return_value = "cib.xml" + mock_cib_diff.return_value = (0, "") + rc, out = utils.diff_check("/opt/node1/cib.xml", "/opt/node2/cib.xml") + self.assertEqual(rc, 0) + self.assertEqual(out, "") + + @mock.patch('crmsh.report.utils.ShellUtils') + def test_txt_diff(self, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_stderr.return_value = (0, "", None) + rc, out = utils.txt_diff("txt1", "txt2") + self.assertEqual(rc, 0) + self.assertEqual(out, "") + + @mock.patch('os.path.isfile') + def test_cib_diff_not_running(self, mock_isfile): + mock_isfile.side_effect = [True, False, False, True] + rc, out = utils.cib_diff("/opt/node1/cib.xml", "/opt/node2/cib.xml") + self.assertEqual(rc, 1) + self.assertEqual(out, "Can't compare cibs from running and stopped systems\n") + + @mock.patch('crmsh.report.utils.ShellUtils') + @mock.patch('os.path.isfile') + def test_cib_diff(self, mock_isfile, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_isfile.side_effect = [True, True] + mock_run_inst.get_stdout_stderr.return_value = (0, "", None) + rc, out = utils.cib_diff("/opt/node1/cib.xml", "/opt/node2/cib.xml") + self.assertEqual(rc, 0) + self.assertEqual(out, "") + mock_run_inst.get_stdout_stderr.assert_called_once_with("crm_diff -c -n /opt/node1/cib.xml -o /opt/node2/cib.xml") + + @mock.patch('os.symlink') + @mock.patch('shutil.move') + @mock.patch('os.remove') + @mock.patch('os.path.isfile') + def test_consolidate(self, mock_isfile, mock_remove, mock_move, mock_symlink): + mock_isfile.side_effect = [True, False] + mock_ctx_inst = mock.Mock(node_list=["node1", "node2"], work_dir="/opt/workdir") + utils.consolidate(mock_ctx_inst, "target_file") + mock_isfile.assert_has_calls([ + mock.call("/opt/workdir/target_file"), + mock.call("/opt/workdir/target_file") + ]) + mock_symlink.assert_has_calls([ + mock.call('../target_file', '/opt/workdir/node1/target_file'), + mock.call('../target_file', '/opt/workdir/node2/target_file') + ]) + + @mock.patch('crmsh.report.utils.Sanitizer') + def test_do_sanitize(self, mock_sanitizer): + mock_inst = mock.Mock() + mock_sanitizer.return_value = mock_inst + mock_ctx_inst = mock.Mock() + utils.do_sanitize(mock_ctx_inst) + mock_inst.prepare.assert_called_once_with() + mock_inst.sanitize.assert_called_once_with() + + @mock.patch('crmsh.utils.read_from_file') + def test_print_logseg_empty(self, mock_read): + mock_read.return_value = "" + res = utils.print_logseg("log1", 1234, 0) + self.assertEqual(res, "") + + @mock.patch('crmsh.report.utils.findln_by_timestamp') + @mock.patch('crmsh.utils.read_from_file') + def test_print_logseg_none(self, mock_read, mock_findln): + mock_read.return_value = "data" + mock_findln.return_value = None + res = utils.print_logseg("log1", 1234, 0) + self.assertEqual(res, "") + + @mock.patch('crmsh.report.utils.filter_lines') + @mock.patch('crmsh.report.utils.logger', spec=crmsh.log.DEBUG2Logger) + @mock.patch('crmsh.report.utils.findln_by_timestamp') + @mock.patch('crmsh.utils.read_from_file') + def test_print_logseg(self, mock_read, mock_findln, mock_logger, mock_filter): + mock_read.return_value = "line1\nline2\nline3" + mock_filter.return_value = "line1\nline2\nline3" + res = utils.print_logseg("log1", 0, 0) + self.assertEqual(res, mock_filter.return_value) + mock_logger.debug2.assert_called_once_with("Including segment [%d-%d] from %s", 1, 3, "log1") + + def test_head(self): + data = "line1\nline2\nline3" + res = utils.head(2, data) + self.assertEqual(res, ["line1", "line2"]) + + def test_tail(self): + data = "line1\nline2\nline3" + res = utils.tail(2, data) + self.assertEqual(res, ["line2", "line3"]) + + @mock.patch('crmsh.utils.get_open_method') + @mock.patch('builtins.open', create=True) + def test_write_to_file(self, mock_open, mock_method): + mock_method.return_value = mock_open + file_handle = mock_open.return_value.__enter__.return_value + utils.write_to_file('Hello', 'file.txt') + mock_open.assert_called_once_with('file.txt', 'w') + file_handle.write.assert_called_once_with('Hello') + + @mock.patch('gzip.open') + @mock.patch('crmsh.utils.get_open_method') + def test_write_to_file_encode(self, mock_method, mock_open): + mock_method.return_value = mock_open + file_handle = mock_open.return_value.__enter__.return_value + utils.write_to_file('Hello', 'file.txt') + mock_open.assert_called_once_with('file.txt', 'w') + file_handle.write.assert_called_once_with(b'Hello') + + @mock.patch('crmsh.report.utils.dt_to_str') + @mock.patch('crmsh.report.utils.ts_to_dt') + def test_ts_to_str(self, mock_ts_to_dt, mock_dt_to_str): + mock_ts_to_dt.return_value = datetime.datetime(2020, 2, 19, 21, 44, 7, 977355) + mock_dt_to_str.return_value = "2020-02-19 21:44" + res = utils.ts_to_str(1693519260.0) + self.assertEqual(res, mock_dt_to_str.return_value) + + def test_ts_to_dt(self): + res = utils.ts_to_dt(crmutils.parse_to_timestamp("2023-09-01 06:01")) + self.assertEqual(utils.dt_to_str(res), "2023-09-01 06:01:00") + + def test_now(self): + expected_res = datetime.datetime.now().strftime(constants.TIME_FORMAT) + res = utils.now() + self.assertEqual(res, expected_res) + + @mock.patch('crmsh.utils.str2file') + @mock.patch('crmsh.utils.read_from_file') + @mock.patch('os.path.isfile') + @mock.patch('crmsh.report.utils.now') + def test_create_description_template(self, mock_now, mock_isfile, mock_read, mock_str2file): + mock_now.return_value = "2023-09-01 06:01" + sys.argv = ["crm", "report", "option1"] + mock_ctx_inst = mock.Mock(node_list=["node1"], work_dir="/opt/workdir") + mock_isfile.return_value = True + mock_read.return_value = "data" + utils.create_description_template(mock_ctx_inst) + + @mock.patch('crmsh.utils.str2file') + @mock.patch('crmsh.report.utils.extract_critical_log') + @mock.patch('crmsh.report.utils.check_collected_files') + @mock.patch('crmsh.report.utils.compare_and_consolidate_files') + @mock.patch('glob.glob') + def test_analyze(self, mock_glob, mock_compare, mock_check_collected, mock_extract, mock_str2file): + mock_compare.return_value = "data" + mock_check_collected.return_value = "" + mock_extract.return_value = "" + mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") + utils.analyze(mock_ctx_inst) + mock_str2file.assert_called_once_with("data", f"/opt/work_dir/{constants.ANALYSIS_F}") + + @mock.patch('crmsh.report.utils.consolidate') + @mock.patch('crmsh.report.utils.do_compare') + @mock.patch('glob.glob') + def test_compare_and_consolidate_files(self, mock_glob, mock_compare, mock_consolidate): + mock_ctx_inst = mock.Mock(work_dir="/opt/work_dir") + mock_glob.side_effect = [False, True, True, True, True] + mock_compare.side_effect = [(0, ""), (0, ""), (0, ""), (0, "")] + res = utils.compare_and_consolidate_files(mock_ctx_inst) + self.assertEqual(f"Diff {constants.MEMBERSHIP_F}... no {constants.MEMBERSHIP_F} found in /opt/work_dir\nDiff {constants.CRM_MON_F}... OK\nDiff {constants.COROSYNC_F}... OK\nDiff {constants.SYSINFO_F}... OK\nDiff {constants.CIB_F}... OK\n\n", res) + + @mock.patch('crmsh.utils.read_from_file') + @mock.patch('crmsh.utils.file_is_empty') + @mock.patch('os.path.isfile') + def test_check_collected_files(self, mock_isfile, mock_is_empty, mock_read): + mock_ctx_inst = mock.Mock(node_list=["node1"], work_dir="/opt/work_dir") + mock_isfile.side_effect = [False, False, True] + mock_is_empty.return_value = False + mock_read.return_value = "data" + res = utils.check_collected_files(mock_ctx_inst) + self.assertEqual(res, ["Checking problems with permissions/ownership at node1:", "data"]) + + @mock.patch('logging.Logger.error') + @mock.patch('crmsh.utils.parse_to_timestamp') + def test_parse_to_timestamp_none(self, mock_parse, mock_error): + mock_parse.return_value = None + with self.assertRaises(utils.ReportGenericError) as err: + utils.parse_to_timestamp("xxxxx") + mock_error.assert_has_calls([ + mock.call(f"Invalid time string 'xxxxx'"), + mock.call('Try these formats like: 2pm; "2019/9/5 12:30"; "09-Sep-07 2:00"; "[1-9][0-9]*[YmdHM]"') + ]) + + @mock.patch('logging.Logger.error') + @mock.patch('crmsh.utils.parse_to_timestamp') + def test_parse_to_timestamp(self, mock_parse, mock_error): + mock_parse.return_value = 1234567 + res = utils.parse_to_timestamp("2023") + self.assertEqual(res, mock_parse.return_value) + + def test_parse_to_timestamp_delta(self): + timedelta = datetime.timedelta(days=10) + expected_timestamp = (datetime.datetime.now() - timedelta).timestamp() + res = utils.parse_to_timestamp("10d") + self.assertEqual(int(res), int(expected_timestamp)) + + @mock.patch('crmsh.utils.read_from_file') + @mock.patch('glob.glob') + def test_extract_critical_log(self, mock_glob, mock_read): + mock_glob.return_value = ["/opt/workdir/pacemaker.log"] + mock_read.return_value = """ +line1 +pacemaker-controld[5678]: warning: data +pacemaker-schedulerd[5677]: error: Resource +line4 + """ + mock_ctx_inst = mock.Mock(work_dir="/opt/workdir") + res = utils.extract_critical_log(mock_ctx_inst) + expected_data = """ +WARNINGS or ERRORS in pacemaker.log: +pacemaker-controld[5678]: warning: data +pacemaker-schedulerd[5677]: error: Resource""" + self.assertEqual('\n'.join(res), expected_data) + + def test_findln_by_timestamp_1(self): + pacemaker_file_path = "pacemaker.log.2" + with open(pacemaker_file_path) as f: + data = f.read() + data_list = data.split('\n') + constants.STAMP_TYPE = utils.determin_log_format(data) + first_timestamp = utils.get_timestamp(data_list[0], pacemaker_file_path) + middle_timestamp = utils.get_timestamp(data_list[1], pacemaker_file_path) + last_timestamp = utils.get_timestamp(data_list[2], pacemaker_file_path) + assert first_timestamp < middle_timestamp < last_timestamp + line_stamp = crmutils.parse_to_timestamp("Jan 03 11:03:41 2024") + result_line = utils.findln_by_timestamp(data, line_stamp, pacemaker_file_path) + assert result_line == 2 + line_stamp = crmutils.parse_to_timestamp("Jan 03 12:03:41 2024") + result_line = utils.findln_by_timestamp(data, line_stamp, pacemaker_file_path) + assert result_line == 3 + + def test_findln_by_timestamp_irregular(self): + data = """line1 + line2 + line3 + line4""" + target_time = "Apr 03 13:10" + target_time_stamp = crmutils.parse_to_timestamp(target_time) + result_line = utils.findln_by_timestamp(data, target_time_stamp, "file1") + self.assertIsNone(result_line) + + def test_findln_by_timestamp(self): + target_time = "Apr 03 13:10" + target_time_stamp = crmutils.parse_to_timestamp(target_time+' 2023') + with open('pacemaker.log') as f: + data = f.read() + constants.STAMP_TYPE = utils.determin_log_format(data) + pacemaker_file_path = "pacemaker.log" + result_line = utils.findln_by_timestamp(data, target_time_stamp, pacemaker_file_path) + result_line_stamp = utils.get_timestamp(data.split('\n')[result_line-1], pacemaker_file_path) + assert result_line_stamp > target_time_stamp + result_pre_line_stamp = utils.get_timestamp(data.split('\n')[result_line-2], pacemaker_file_path) + assert result_pre_line_stamp < target_time_stamp + + target_time = "Apr 03 11:01:19" + target_time_stamp = crmutils.parse_to_timestamp(target_time+' 2023') + result_line = utils.findln_by_timestamp(data, target_time_stamp, pacemaker_file_path) + result_time = ' '.join(data.split('\n')[result_line-1].split()[:3]) + self.assertEqual(result_time, target_time) + + @mock.patch('crmsh.utils.parse_to_timestamp') + def test_get_timestamp_from_time_line_not_syslog(self, mock_parse): + mock_parse.return_value = 123456 + res = utils.get_timestamp_from_time_line("line1", "rfc5424", "file1") + self.assertEqual(res, mock_parse.return_value) + + @mock.patch('os.path.getmtime') + @mock.patch('crmsh.report.utils.datetime') + @mock.patch('crmsh.utils.parse_to_timestamp') + def test_get_timestamp_from_time_line_next_year(self, mock_parse, mock_datetime, mock_getmtime): + mock_parse.return_value = 8888888888888 + mock_getmtime.return_value = 1691938980.0 + mock_datetime.datetime.now.return_value = datetime.datetime(2023, 9, 1, 6, 1) + mock_datetime.datetime.fromtimestamp.return_value = datetime.datetime(2024, 9, 1, 6, 1) + res = utils.get_timestamp_from_time_line("line1", "syslog", "file1") + self.assertIsNone(res) + + @mock.patch('os.path.getmtime') + @mock.patch('crmsh.report.utils.datetime') + @mock.patch('crmsh.utils.parse_to_timestamp') + def test_get_timestamp_from_time_line_that_year(self, mock_parse, mock_datetime, mock_getmtime): + mock_parse.return_value = 8888888888888 + mock_getmtime.return_value = 1691938980.0 + mock_datetime.datetime.now.return_value = datetime.datetime(2023, 9, 1, 6, 1) + mock_datetime.datetime.fromtimestamp.return_value = datetime.datetime(2022, 9, 1, 6, 1) + res = utils.get_timestamp_from_time_line("line1", "syslog", "file1") + self.assertEqual(res, mock_parse.return_value) -- cgit v1.2.3