import os import re import time from datetime import datetime, timedelta from io import SEEK_END from typing import List, Tuple, Any class HttpdErrorLog: """Checking the httpd error log for errors and warnings, including limiting checks from a last known position forward. """ RE_ERRLOG_ERROR = re.compile(r'.*\[(?P[^:]+):error].*') RE_ERRLOG_WARN = re.compile(r'.*\[(?P[^:]+):warn].*') RE_APLOGNO = re.compile(r'.*\[(?P[^:]+):(error|warn)].* (?PAH\d+): .+') RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P\S+):.+') def __init__(self, path: str): self._path = path self._ignored_modules = [] self._ignored_lognos = set() self._ignored_patterns = [] # remember the file position we started with self._start_pos = 0 if os.path.isfile(self._path): with open(self._path) as fd: self._start_pos = fd.seek(0, SEEK_END) self._last_pos = self._start_pos self._last_errors = [] self._last_warnings = [] self._observed_erros = set() self._observed_warnings = set() def __repr__(self): return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \ f"warnings: {' '.join(self._last_warnings)}]" @property def path(self) -> str: return self._path def clear_log(self): if os.path.isfile(self.path): os.remove(self.path) self._start_pos = 0 self._last_pos = self._start_pos self._last_errors = [] self._last_warnings = [] self._observed_erros = set() self._observed_warnings = set() def set_ignored_modules(self, modules: List[str]): self._ignored_modules = modules.copy() if modules else [] def set_ignored_lognos(self, lognos: List[str]): if lognos: for l in lognos: self._ignored_lognos.add(l) def add_ignored_patterns(self, patterns: List[Any]): self._ignored_patterns.extend(patterns) def _is_ignored(self, line: str) -> bool: for p in self._ignored_patterns: if p.match(line): return True m = self.RE_APLOGNO.match(line) if m and m.group('aplogno') in self._ignored_lognos: return True return False def get_recent(self, advance=True) -> Tuple[List[str], List[str]]: """Collect error and warning from the log since the last remembered position :param advance: advance the position to the end of the log afterwards :return: list of error and list of warnings as tuple """ self._last_errors = [] self._last_warnings = [] if os.path.isfile(self._path): with open(self._path) as fd: fd.seek(self._last_pos, os.SEEK_SET) for line in fd: if self._is_ignored(line): continue m = self.RE_ERRLOG_ERROR.match(line) if m and m.group('module') not in self._ignored_modules: self._last_errors.append(line) continue m = self.RE_ERRLOG_WARN.match(line) if m: if m and m.group('module') not in self._ignored_modules: self._last_warnings.append(line) continue if advance: self._last_pos = fd.tell() self._observed_erros.update(set(self._last_errors)) self._observed_warnings.update(set(self._last_warnings)) return self._last_errors, self._last_warnings def get_recent_count(self, advance=True): errors, warnings = self.get_recent(advance=advance) return len(errors), len(warnings) def ignore_recent(self): """After a test case triggered errors/warnings on purpose, add those to our 'observed' list so the do not get reported as 'missed'. """ self._last_errors = [] self._last_warnings = [] if os.path.isfile(self._path): with open(self._path) as fd: fd.seek(self._last_pos, os.SEEK_SET) for line in fd: if self._is_ignored(line): continue m = self.RE_ERRLOG_ERROR.match(line) if m and m.group('module') not in self._ignored_modules: self._observed_erros.add(line) continue m = self.RE_ERRLOG_WARN.match(line) if m: if m and m.group('module') not in self._ignored_modules: self._observed_warnings.add(line) continue self._last_pos = fd.tell() def get_missed(self) -> Tuple[List[str], List[str]]: errors = [] warnings = [] if os.path.isfile(self._path): with open(self._path) as fd: fd.seek(self._start_pos, os.SEEK_SET) for line in fd: if self._is_ignored(line): continue m = self.RE_ERRLOG_ERROR.match(line) if m and m.group('module') not in self._ignored_modules \ and line not in self._observed_erros: errors.append(line) continue m = self.RE_ERRLOG_WARN.match(line) if m: if m and m.group('module') not in self._ignored_modules \ and line not in self._observed_warnings: warnings.append(line) continue return errors, warnings def scan_recent(self, pattern: re, timeout=10): if not os.path.isfile(self.path): return False with open(self.path) as fd: end = datetime.now() + timedelta(seconds=timeout) while True: fd.seek(self._last_pos, os.SEEK_SET) for line in fd: if pattern.match(line): return True if datetime.now() > end: raise TimeoutError(f"pattern not found in error log after {timeout} seconds") time.sleep(.1) return False