summaryrefslogtreecommitdiffstats
path: root/test/pyhttpd/log.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-05 10:00:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-05 10:00:10 +0000
commit3204e211a1e248154ff95b90b6a7e29cfa92069c (patch)
tree79f901498145b63bf34e9981a013f3d9b52eafc2 /test/pyhttpd/log.py
parentAdding upstream version 2.4.61. (diff)
downloadapache2-3204e211a1e248154ff95b90b6a7e29cfa92069c.tar.xz
apache2-3204e211a1e248154ff95b90b6a7e29cfa92069c.zip
Adding upstream version 2.4.62.upstream/2.4.62upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/pyhttpd/log.py')
-rw-r--r--test/pyhttpd/log.py165
1 files changed, 77 insertions, 88 deletions
diff --git a/test/pyhttpd/log.py b/test/pyhttpd/log.py
index dff7623..17b0502 100644
--- a/test/pyhttpd/log.py
+++ b/test/pyhttpd/log.py
@@ -8,33 +8,32 @@ from typing import List, Tuple, Any
class HttpdErrorLog:
"""Checking the httpd error log for errors and warnings, including
- limiting checks from a last known position forward.
+ limiting checks from a recent known position forward.
"""
- RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
- RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
- RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
- RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
+ RE_ERRLOG_WARN = re.compile(r'.*\[[^:]+:warn].*')
+ RE_ERRLOG_ERROR = re.compile(r'.*\[[^:]+:error].*')
+ RE_APLOGNO = re.compile(r'.*\[[^:]+:(error|warn)].* (?P<aplogno>AH\d+): .+')
def __init__(self, path: str):
self._path = path
- self._ignored_modules = []
+ self._ignored_matches = []
self._ignored_lognos = set()
- self._ignored_patterns = []
# remember the file position we started with
self._start_pos = 0
if os.path.isfile(self._path):
with open(self._path) as fd:
self._start_pos = fd.seek(0, SEEK_END)
- self._last_pos = self._start_pos
- self._last_errors = []
- self._last_warnings = []
- self._observed_erros = set()
- self._observed_warnings = set()
+ self._recent_pos = self._start_pos
+ self._recent_errors = []
+ self._recent_warnings = []
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
def __repr__(self):
- return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
- f"warnings: {' '.join(self._last_warnings)}]"
+ return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._recent_errors)}, " \
+ f"warnings: {' '.join(self._recent_warnings)}]"
@property
def path(self) -> str:
@@ -42,118 +41,108 @@ class HttpdErrorLog:
def clear_log(self):
if os.path.isfile(self.path):
- os.remove(self.path)
- self._start_pos = 0
- self._last_pos = self._start_pos
- self._last_errors = []
- self._last_warnings = []
- self._observed_erros = set()
- self._observed_warnings = set()
+ os.truncate(self.path, 0)
+ self._start_pos = self._recent_pos = 0
+ self._recent_errors = []
+ self._recent_warnings = []
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
+
+ def _lookup_matches(self, line: str, matches: List[str]) -> bool:
+ for m in matches:
+ if re.match(m, line):
+ return True
+ return False
+
+ def _lookup_lognos(self, line: str, lognos: set) -> bool:
+ if len(lognos) > 0:
+ m = self.RE_APLOGNO.match(line)
+ if m and m.group('aplogno') in lognos:
+ return True
+ return False
- def set_ignored_modules(self, modules: List[str]):
- self._ignored_modules = modules.copy() if modules else []
+ def clear_ignored_matches(self):
+ self._ignored_matches = []
- def set_ignored_lognos(self, lognos: List[str]):
- if lognos:
- for l in lognos:
- self._ignored_lognos.add(l)
+ def add_ignored_matches(self, matches: List[str]):
+ for m in matches:
+ self._ignored_matches.append(re.compile(m))
- def add_ignored_patterns(self, patterns: List[Any]):
- self._ignored_patterns.extend(patterns)
+ def clear_ignored_lognos(self):
+ self._ignored_lognos = set()
+
+ def add_ignored_lognos(self, lognos: List[str]):
+ for l in lognos:
+ self._ignored_lognos.add(l)
def _is_ignored(self, line: str) -> bool:
- for p in self._ignored_patterns:
- if p.match(line):
- return True
- m = self.RE_APLOGNO.match(line)
- if m and m.group('aplogno') in self._ignored_lognos:
+ if self._lookup_matches(line, self._ignored_matches):
+ return True
+ if self._lookup_lognos(line, self._ignored_lognos):
return True
return False
- def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
- """Collect error and warning from the log since the last remembered position
- :param advance: advance the position to the end of the log afterwards
- :return: list of error and list of warnings as tuple
- """
- self._last_errors = []
- self._last_warnings = []
+ def ignore_recent(self, lognos: List[str] = [], matches: List[str] = []):
+ """After a test case triggered errors/warnings on purpose, add
+ those to our 'caught' list so the do not get reported as 'missed'.
+ """
+ self._recent_errors = []
+ self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
- fd.seek(self._last_pos, os.SEEK_SET)
+ fd.seek(self._recent_pos, os.SEEK_SET)
+ lognos_set = set(lognos)
for line in fd:
if self._is_ignored(line):
continue
- m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules:
- self._last_errors.append(line)
+ if self._lookup_matches(line, matches):
+ self._caught_matches.add(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules:
- self._last_warnings.append(line)
- continue
- if advance:
- self._last_pos = fd.tell()
- self._observed_erros.update(set(self._last_errors))
- self._observed_warnings.update(set(self._last_warnings))
- return self._last_errors, self._last_warnings
-
- def get_recent_count(self, advance=True):
- errors, warnings = self.get_recent(advance=advance)
- return len(errors), len(warnings)
-
- def ignore_recent(self):
- """After a test case triggered errors/warnings on purpose, add
- those to our 'observed' list so the do not get reported as 'missed'.
- """
- self._last_errors = []
- self._last_warnings = []
- if os.path.isfile(self._path):
- with open(self._path) as fd:
- fd.seek(self._last_pos, os.SEEK_SET)
- for line in fd:
- if self._is_ignored(line):
+ if m and self._lookup_lognos(line, lognos_set):
+ self._caught_warnings.add(line)
continue
m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules:
- self._observed_erros.add(line)
+ if m and self._lookup_lognos(line, lognos_set):
+ self._caught_errors.add(line)
continue
- m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules:
- self._observed_warnings.add(line)
- continue
- self._last_pos = fd.tell()
+ self._recent_pos = fd.tell()
def get_missed(self) -> Tuple[List[str], List[str]]:
errors = []
warnings = []
+ self._recent_errors = []
+ self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._start_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
+ if line in self._caught_matches:
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m and line not in self._caught_warnings:
+ warnings.append(line)
+ continue
m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules \
- and line not in self._observed_erros:
+ if m and line not in self._caught_errors:
errors.append(line)
continue
- m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules \
- and line not in self._observed_warnings:
- warnings.append(line)
- continue
+ self._start_pos = self._recent_pos = fd.tell()
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
return errors, warnings
- def scan_recent(self, pattern: re, timeout=10):
+ def scan_recent(self, pattern: re.Pattern, timeout=10):
if not os.path.isfile(self.path):
return False
with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout)
while True:
- fd.seek(self._last_pos, os.SEEK_SET)
+ fd.seek(self._recent_pos, os.SEEK_SET)
for line in fd:
if pattern.match(line):
return True