summaryrefslogtreecommitdiffstats
path: root/test/pyhttpd/log.py
blob: dff7623b246ad51ee1d3ef03a9575fe698424d08 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import re
import time
from datetime import datetime, timedelta
from io import SEEK_END
from typing import List, Tuple, Any


class HttpdErrorLog:
    """Checking the httpd error log for errors and warnings, including
       limiting checks from a last known position forward.

    Three positions/collections are tracked:

    * ``_start_pos``: file offset at construction time (end of the file
      as it existed then, or 0 when there was no file / after
      ``clear_log()``). ``get_missed()`` scans from here.
    * ``_last_pos``: offset up to which the log has been consumed by
      ``get_recent()`` / ``ignore_recent()``.
    * ``_observed_erros`` / ``_observed_warnings``: every error/warning
      line that was already handed out or deliberately ignored, so that
      ``get_missed()`` only reports lines nobody looked at.

    Lines are filtered by ignored patterns, ignored ``AH`` log numbers and
    ignored module names before being classified.
    """

    RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
    RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
    RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
    RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')

    def __init__(self, path: str):
        """Remember `path` and start tracking at the current end of the file.

        :param path: location of the error log; the file need not exist yet
        """
        self._path = path
        self._ignored_modules: List[str] = []
        self._ignored_lognos = set()
        self._ignored_patterns: List[Any] = []
        # remember the file position we started with
        self._start_pos = 0
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                self._start_pos = fd.seek(0, SEEK_END)
        self._last_pos = self._start_pos
        self._last_errors: List[str] = []
        self._last_warnings: List[str] = []
        # NOTE: attribute name (including its spelling) is kept as-is so
        # that nothing relying on it breaks.
        self._observed_erros = set()
        self._observed_warnings = set()

    def __repr__(self):
        return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
               f"warnings: {' '.join(self._last_warnings)}]"

    @property
    def path(self) -> str:
        """The path of the error log file given at construction."""
        return self._path

    def clear_log(self):
        """Remove the log file (if present) and reset all tracking state,
        so that scanning starts again from offset 0.
        """
        if os.path.isfile(self.path):
            os.remove(self.path)
        self._start_pos = 0
        self._last_pos = self._start_pos
        self._last_errors = []
        self._last_warnings = []
        self._observed_erros = set()
        self._observed_warnings = set()

    def set_ignored_modules(self, modules: List[str]):
        """Replace the list of module names whose errors/warnings are skipped.

        :param modules: module names as they appear in ``[module:level]``;
            ``None`` or an empty sequence clears the list
        """
        # list() instead of .copy() so that any sequence (e.g. a tuple) works
        self._ignored_modules = list(modules) if modules else []

    def set_ignored_lognos(self, lognos: List[str]):
        """Add log numbers (e.g. ``AH01234``) whose lines are skipped.

        Despite the name this is additive: earlier entries are kept.

        :param lognos: log numbers to ignore; ``None``/empty is a no-op
        """
        if lognos:
            self._ignored_lognos.update(lognos)

    def add_ignored_patterns(self, patterns: List[Any]):
        """Add compiled regular expressions; a line is skipped when any of
        them ``match()``es it.

        :param patterns: objects offering ``match(line)``; ``None``/empty
            is a no-op
        """
        if patterns:
            self._ignored_patterns.extend(patterns)

    def _is_ignored(self, line: str) -> bool:
        """Tell if `line` is excluded by an ignored pattern or log number."""
        if any(p.match(line) for p in self._ignored_patterns):
            return True
        m = self.RE_APLOGNO.match(line)
        return bool(m) and m.group('aplogno') in self._ignored_lognos

    def _scan_from(self, pos: int) -> Tuple[List[str], List[str], int]:
        """Read the log from offset `pos` to its end and classify the lines.

        :param pos: file offset to start reading at
        :return: (error lines, warning lines, offset after the last line
            read); when the log file does not exist, both lists are empty
            and `pos` is returned unchanged
        """
        errors: List[str] = []
        warnings: List[str] = []
        if not os.path.isfile(self._path):
            return errors, warnings, pos
        with open(self._path) as fd:
            fd.seek(pos, os.SEEK_SET)
            for line in fd:
                if self._is_ignored(line):
                    continue
                m = self.RE_ERRLOG_ERROR.match(line)
                if m and m.group('module') not in self._ignored_modules:
                    errors.append(line)
                    continue
                m = self.RE_ERRLOG_WARN.match(line)
                if m and m.group('module') not in self._ignored_modules:
                    warnings.append(line)
            # the iteration above ran to EOF, so this is the end of the
            # data consumed
            end_pos = fd.tell()
        return errors, warnings, end_pos

    def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
        """Collect error and warning from the log since the last remembered position
        :param advance: advance the position to the end of the log afterwards
        :return: list of error and list of warnings as tuple
        """
        errors, warnings, end_pos = self._scan_from(self._last_pos)
        self._last_errors = errors
        self._last_warnings = warnings
        if advance:
            self._last_pos = end_pos
        # whatever was handed to the caller counts as observed
        self._observed_erros.update(errors)
        self._observed_warnings.update(warnings)
        return self._last_errors, self._last_warnings

    def get_recent_count(self, advance=True):
        """Like `get_recent()`, but return only the number of errors and
        the number of warnings as a tuple.
        """
        errors, warnings = self.get_recent(advance=advance)
        return len(errors), len(warnings)

    def ignore_recent(self):
        """After a test case triggered errors/warnings on purpose, add
           those to our 'observed' list so they do not get reported as 'missed'.

        Always advances the remembered position to the end of the log and
        clears the lists of last seen errors/warnings.
        """
        self._last_errors = []
        self._last_warnings = []
        errors, warnings, self._last_pos = self._scan_from(self._last_pos)
        self._observed_erros.update(errors)
        self._observed_warnings.update(warnings)

    def get_missed(self) -> Tuple[List[str], List[str]]:
        """Collect errors and warnings logged since the start position that
        were never returned by `get_recent()` nor covered by
        `ignore_recent()`.

        Does not change any remembered position.

        :return: list of errors and list of warnings as tuple
        """
        errors, warnings, _ = self._scan_from(self._start_pos)
        missed_errors = [line for line in errors
                         if line not in self._observed_erros]
        missed_warnings = [line for line in warnings
                           if line not in self._observed_warnings]
        return missed_errors, missed_warnings

    def scan_recent(self, pattern: 're.Pattern', timeout=10) -> bool:
        """Wait until a line matching `pattern` shows up in the log after
        the last remembered position. The position is not advanced.

        :param pattern: compiled regular expression, applied with
            ``match()`` to each line
        :param timeout: seconds to keep polling before giving up
        :return: True when a matching line was found, False when the log
            file does not exist
        :raises TimeoutError: when no line matched within `timeout` seconds
        """
        if not os.path.isfile(self.path):
            return False
        # monotonic clock: not affected by wall clock adjustments
        deadline = time.monotonic() + timeout
        with open(self.path) as fd:
            while True:
                # always rescan from the remembered position, so a line
                # that was only partially written on the previous pass is
                # seen in full on this one
                fd.seek(self._last_pos, os.SEEK_SET)
                if any(pattern.match(line) for line in fd):
                    return True
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
                time.sleep(.1)