summaryrefslogtreecommitdiffstats
path: root/test/pyhttpd/log.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/pyhttpd/log.py')
-rw-r--r--test/pyhttpd/log.py163
1 files changed, 163 insertions, 0 deletions
diff --git a/test/pyhttpd/log.py b/test/pyhttpd/log.py
new file mode 100644
index 0000000..dff7623
--- /dev/null
+++ b/test/pyhttpd/log.py
@@ -0,0 +1,163 @@
+import os
+import re
+import time
+from datetime import datetime, timedelta
+from io import SEEK_END
+from typing import List, Tuple, Any
+
+
+class HttpdErrorLog:
+ """Checking the httpd error log for errors and warnings, including
+ limiting checks from a last known position forward.
+ """
+
+ RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
+ RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
+ RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
+ RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
+
+ def __init__(self, path: str):
+ self._path = path
+ self._ignored_modules = []
+ self._ignored_lognos = set()
+ self._ignored_patterns = []
+ # remember the file position we started with
+ self._start_pos = 0
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ self._start_pos = fd.seek(0, SEEK_END)
+ self._last_pos = self._start_pos
+ self._last_errors = []
+ self._last_warnings = []
+ self._observed_erros = set()
+ self._observed_warnings = set()
+
+ def __repr__(self):
+ return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
+ f"warnings: {' '.join(self._last_warnings)}]"
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+ def clear_log(self):
+ if os.path.isfile(self.path):
+ os.remove(self.path)
+ self._start_pos = 0
+ self._last_pos = self._start_pos
+ self._last_errors = []
+ self._last_warnings = []
+ self._observed_erros = set()
+ self._observed_warnings = set()
+
+ def set_ignored_modules(self, modules: List[str]):
+ self._ignored_modules = modules.copy() if modules else []
+
+ def set_ignored_lognos(self, lognos: List[str]):
+ if lognos:
+ for l in lognos:
+ self._ignored_lognos.add(l)
+
+ def add_ignored_patterns(self, patterns: List[Any]):
+ self._ignored_patterns.extend(patterns)
+
+ def _is_ignored(self, line: str) -> bool:
+ for p in self._ignored_patterns:
+ if p.match(line):
+ return True
+ m = self.RE_APLOGNO.match(line)
+ if m and m.group('aplogno') in self._ignored_lognos:
+ return True
+ return False
+
+ def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
+ """Collect error and warning from the log since the last remembered position
+ :param advance: advance the position to the end of the log afterwards
+ :return: list of error and list of warnings as tuple
+ """
+ self._last_errors = []
+ self._last_warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._last_errors.append(line)
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m:
+ if m and m.group('module') not in self._ignored_modules:
+ self._last_warnings.append(line)
+ continue
+ if advance:
+ self._last_pos = fd.tell()
+ self._observed_erros.update(set(self._last_errors))
+ self._observed_warnings.update(set(self._last_warnings))
+ return self._last_errors, self._last_warnings
+
+ def get_recent_count(self, advance=True):
+ errors, warnings = self.get_recent(advance=advance)
+ return len(errors), len(warnings)
+
+ def ignore_recent(self):
+ """After a test case triggered errors/warnings on purpose, add
+ those to our 'observed' list so the do not get reported as 'missed'.
+ """
+ self._last_errors = []
+ self._last_warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._observed_erros.add(line)
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m:
+ if m and m.group('module') not in self._ignored_modules:
+ self._observed_warnings.add(line)
+ continue
+ self._last_pos = fd.tell()
+
+ def get_missed(self) -> Tuple[List[str], List[str]]:
+ errors = []
+ warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._start_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules \
+ and line not in self._observed_erros:
+ errors.append(line)
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m:
+ if m and m.group('module') not in self._ignored_modules \
+ and line not in self._observed_warnings:
+ warnings.append(line)
+ continue
+ return errors, warnings
+
+ def scan_recent(self, pattern: re, timeout=10):
+ if not os.path.isfile(self.path):
+ return False
+ with open(self.path) as fd:
+ end = datetime.now() + timedelta(seconds=timeout)
+ while True:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if pattern.match(line):
+ return True
+ if datetime.now() > end:
+ raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
+ time.sleep(.1)
+ return False