1
0
Fork 0
apache2/test/pyhttpd/log.py
Daniel Baumann 7263481e48
Adding upstream version 2.4.63.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-22 11:01:26 +02:00

152 lines
5.5 KiB
Python

import os
import re
import time
from datetime import datetime, timedelta
from io import SEEK_END
from typing import List, Tuple, Any
class HttpdErrorLog:
"""Checking the httpd error log for errors and warnings, including
limiting checks from a recent known position forward.
"""
RE_ERRLOG_WARN = re.compile(r'.*\[[^:]+:warn].*')
RE_ERRLOG_ERROR = re.compile(r'.*\[[^:]+:error].*')
RE_APLOGNO = re.compile(r'.*\[[^:]+:(error|warn)].* (?P<aplogno>AH\d+): .+')
def __init__(self, path: str):
self._path = path
self._ignored_matches = []
self._ignored_lognos = set()
# remember the file position we started with
self._start_pos = 0
if os.path.isfile(self._path):
with open(self._path) as fd:
self._start_pos = fd.seek(0, SEEK_END)
self._recent_pos = self._start_pos
self._recent_errors = []
self._recent_warnings = []
self._caught_errors = set()
self._caught_warnings = set()
self._caught_matches = set()
def __repr__(self):
return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._recent_errors)}, " \
f"warnings: {' '.join(self._recent_warnings)}]"
@property
def path(self) -> str:
return self._path
def clear_log(self):
if os.path.isfile(self.path):
os.truncate(self.path, 0)
self._start_pos = self._recent_pos = 0
self._recent_errors = []
self._recent_warnings = []
self._caught_errors = set()
self._caught_warnings = set()
self._caught_matches = set()
def _lookup_matches(self, line: str, matches: List[str]) -> bool:
for m in matches:
if re.match(m, line):
return True
return False
def _lookup_lognos(self, line: str, lognos: set) -> bool:
if len(lognos) > 0:
m = self.RE_APLOGNO.match(line)
if m and m.group('aplogno') in lognos:
return True
return False
def clear_ignored_matches(self):
self._ignored_matches = []
def add_ignored_matches(self, matches: List[str]):
for m in matches:
self._ignored_matches.append(re.compile(m))
def clear_ignored_lognos(self):
self._ignored_lognos = set()
def add_ignored_lognos(self, lognos: List[str]):
for l in lognos:
self._ignored_lognos.add(l)
def _is_ignored(self, line: str) -> bool:
if self._lookup_matches(line, self._ignored_matches):
return True
if self._lookup_lognos(line, self._ignored_lognos):
return True
return False
def ignore_recent(self, lognos: List[str] = [], matches: List[str] = []):
"""After a test case triggered errors/warnings on purpose, add
those to our 'caught' list so the do not get reported as 'missed'.
"""
self._recent_errors = []
self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._recent_pos, os.SEEK_SET)
lognos_set = set(lognos)
for line in fd:
if self._is_ignored(line):
continue
if self._lookup_matches(line, matches):
self._caught_matches.add(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
if m and self._lookup_lognos(line, lognos_set):
self._caught_warnings.add(line)
continue
m = self.RE_ERRLOG_ERROR.match(line)
if m and self._lookup_lognos(line, lognos_set):
self._caught_errors.add(line)
continue
self._recent_pos = fd.tell()
def get_missed(self) -> Tuple[List[str], List[str]]:
errors = []
warnings = []
self._recent_errors = []
self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._start_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
if line in self._caught_matches:
continue
m = self.RE_ERRLOG_WARN.match(line)
if m and line not in self._caught_warnings:
warnings.append(line)
continue
m = self.RE_ERRLOG_ERROR.match(line)
if m and line not in self._caught_errors:
errors.append(line)
continue
self._start_pos = self._recent_pos = fd.tell()
self._caught_errors = set()
self._caught_warnings = set()
self._caught_matches = set()
return errors, warnings
def scan_recent(self, pattern: re.Pattern, timeout=10):
if not os.path.isfile(self.path):
return False
with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout)
while True:
fd.seek(self._recent_pos, os.SEEK_SET)
for line in fd:
if pattern.match(line):
return True
if datetime.now() > end:
raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
time.sleep(.1)
return False