diff options
Diffstat (limited to 'test/pyhttpd/log.py')
-rw-r--r-- | test/pyhttpd/log.py | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/test/pyhttpd/log.py b/test/pyhttpd/log.py new file mode 100644 index 0000000..dff7623 --- /dev/null +++ b/test/pyhttpd/log.py @@ -0,0 +1,163 @@ +import os +import re +import time +from datetime import datetime, timedelta +from io import SEEK_END +from typing import List, Tuple, Any + + +class HttpdErrorLog: + """Checking the httpd error log for errors and warnings, including + limiting checks from a last known position forward. + """ + + RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*') + RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*') + RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+') + RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+') + + def __init__(self, path: str): + self._path = path + self._ignored_modules = [] + self._ignored_lognos = set() + self._ignored_patterns = [] + # remember the file position we started with + self._start_pos = 0 + if os.path.isfile(self._path): + with open(self._path) as fd: + self._start_pos = fd.seek(0, SEEK_END) + self._last_pos = self._start_pos + self._last_errors = [] + self._last_warnings = [] + self._observed_erros = set() + self._observed_warnings = set() + + def __repr__(self): + return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \ + f"warnings: {' '.join(self._last_warnings)}]" + + @property + def path(self) -> str: + return self._path + + def clear_log(self): + if os.path.isfile(self.path): + os.remove(self.path) + self._start_pos = 0 + self._last_pos = self._start_pos + self._last_errors = [] + self._last_warnings = [] + self._observed_erros = set() + self._observed_warnings = set() + + def set_ignored_modules(self, modules: List[str]): + self._ignored_modules = modules.copy() if modules else [] + + def set_ignored_lognos(self, lognos: List[str]): + if lognos: + for l in lognos: + self._ignored_lognos.add(l) + + def add_ignored_patterns(self, patterns: List[Any]): + self._ignored_patterns.extend(patterns) + + def _is_ignored(self, line: str) -> bool: + for p in self._ignored_patterns: + if p.match(line): + return True + m = self.RE_APLOGNO.match(line) + if m and m.group('aplogno') in self._ignored_lognos: + return True + return False + + def get_recent(self, advance=True) -> Tuple[List[str], List[str]]: + """Collect error and warning from the log since the last remembered position + :param advance: advance the position to the end of the log afterwards + :return: list of error and list of warnings as tuple + """ + self._last_errors = [] + self._last_warnings = [] + if os.path.isfile(self._path): + with open(self._path) as fd: + fd.seek(self._last_pos, os.SEEK_SET) + for line in fd: + if self._is_ignored(line): + continue + m = self.RE_ERRLOG_ERROR.match(line) + if m and m.group('module') not in self._ignored_modules: + self._last_errors.append(line) + continue + m = self.RE_ERRLOG_WARN.match(line) + if m: + if m and m.group('module') not in self._ignored_modules: + self._last_warnings.append(line) + continue + if advance: + self._last_pos = fd.tell() + self._observed_erros.update(set(self._last_errors)) + self._observed_warnings.update(set(self._last_warnings)) + return self._last_errors, self._last_warnings + + def get_recent_count(self, advance=True): + errors, warnings = self.get_recent(advance=advance) + return len(errors), len(warnings) + + def ignore_recent(self): + """After a test case triggered errors/warnings on purpose, add + those to our 'observed' list so the do not get reported as 'missed'. + """ + self._last_errors = [] + self._last_warnings = [] + if os.path.isfile(self._path): + with open(self._path) as fd: + fd.seek(self._last_pos, os.SEEK_SET) + for line in fd: + if self._is_ignored(line): + continue + m = self.RE_ERRLOG_ERROR.match(line) + if m and m.group('module') not in self._ignored_modules: + self._observed_erros.add(line) + continue + m = self.RE_ERRLOG_WARN.match(line) + if m: + if m and m.group('module') not in self._ignored_modules: + self._observed_warnings.add(line) + continue + self._last_pos = fd.tell() + + def get_missed(self) -> Tuple[List[str], List[str]]: + errors = [] + warnings = [] + if os.path.isfile(self._path): + with open(self._path) as fd: + fd.seek(self._start_pos, os.SEEK_SET) + for line in fd: + if self._is_ignored(line): + continue + m = self.RE_ERRLOG_ERROR.match(line) + if m and m.group('module') not in self._ignored_modules \ + and line not in self._observed_erros: + errors.append(line) + continue + m = self.RE_ERRLOG_WARN.match(line) + if m: + if m and m.group('module') not in self._ignored_modules \ + and line not in self._observed_warnings: + warnings.append(line) + continue + return errors, warnings + + def scan_recent(self, pattern: re, timeout=10): + if not os.path.isfile(self.path): + return False + with open(self.path) as fd: + end = datetime.now() + timedelta(seconds=timeout) + while True: + fd.seek(self._last_pos, os.SEEK_SET) + for line in fd: + if pattern.match(line): + return True + if datetime.now() > end: + raise TimeoutError(f"pattern not found in error log after {timeout} seconds") + time.sleep(.1) + return False |