path: root/test/pyhttpd
Diffstat
-rw-r--r--  test/pyhttpd/conf.py    87
-rw-r--r--  test/pyhttpd/curl.py     2
-rw-r--r--  test/pyhttpd/env.py     29
-rw-r--r--  test/pyhttpd/log.py    165
4 files changed, 187 insertions, 96 deletions
diff --git a/test/pyhttpd/conf.py b/test/pyhttpd/conf.py
index cd3363f..e1c6bf5 100644
--- a/test/pyhttpd/conf.py
+++ b/test/pyhttpd/conf.py
@@ -26,15 +26,96 @@ class HttpdConf(object):
def install(self):
self.env.install_test_conf(self._lines)
+ def replacetlsstr(self, line):
+ l = line.replace("TLS_", "")
+ l = l.replace("\n", " ")
+ l = l.replace("\\", " ")
+ l = " ".join(l.split())
+ l = l.replace(" ", ":")
+ l = l.replace("_", "-")
+ l = l.replace("-WITH", "")
+ l = l.replace("AES-", "AES")
+ l = l.replace("POLY1305-SHA256", "POLY1305")
+ return l
+
+ def replaceinstr(self, line):
+ if line.startswith("TLSCiphersPrefer"):
+ # the "TLS_" prefixes are removed.
+ l = self.replacetlsstr(line)
+ l = l.replace("TLSCiphersPrefer:", "SSLCipherSuite ")
+ elif line.startswith("TLSCiphersSuppress"):
+ # also maps to SSLCipherSuite, but each cipher gets a "!" prefix
+ l = self.replacetlsstr(line)
+ l = l.replace("TLSCiphersSuppress:", "SSLCipherSuite !")
+ l = l.replace(":", ":!")
+ elif line.startswith("TLSCertificate"):
+ l = line.replace("TLSCertificate", "SSLCertificateFile")
+ elif line.startswith("TLSProtocol"):
+ # mod_ssl syntax differs: "+" is not supported and 0x protocol codes have to be translated
+ l = line.replace("TLSProtocol", "SSLProtocol")
+ l = l.replace("+", "")
+ l = l.replace("default", "all")
+ l = l.replace("0x0303", "1.2") # need to check 1.3 and 1.1
+ elif line.startswith("SSLProtocol"):
+ l = line # we have that in test/modules/tls/test_05_proto.py
+ elif line.startswith("TLSHonorClientOrder"):
+ # mod_ssl's SSLHonorCipherOrder: "on" = use server order, "off" = use client order.
+ l = line.lower()
+ if "on" in l:
+ l = "SSLHonorCipherOrder off"
+ else:
+ l = "SSLHonorCipherOrder on"
+ elif line.startswith("TLSEngine"):
+ # In fact it should go in the corresponding VirtualHost... Not sure how to do that.
+ l = "SSLEngine On"
+ else:
+ if line != "":
+ l = line.replace("TLS", "SSL")
+ else:
+ l = line
+ return l
+
def add(self, line: Any):
+ # make sure we transform the TLS directives to SSL ones if we are using mod_ssl
if isinstance(line, str):
+ if not HttpdTestEnv.has_shared_module("tls"):
+ line = self.replaceinstr(line)
if self._indents > 0:
line = f"{' ' * self._indents}{line}"
self._lines.append(line)
else:
- if self._indents > 0:
- line = [f"{' ' * self._indents}{l}" for l in line]
- self._lines.extend(line)
+ if not HttpdTestEnv.has_shared_module("tls"):
+ new = []
+ previous = ""
+ for l in line:
+ if previous.startswith("SSLCipherSuite"):
+ if l.startswith("TLSCiphersPrefer") or l.startswith("TLSCiphersSuppress"):
+ # merge it into the previous SSLCipherSuite line
+ l = self.replaceinstr(l)
+ l = l.replace("SSLCipherSuite ", ":")
+ previous = previous + l
+ continue
+ else:
+ if self._indents > 0:
+ previous = f"{' ' * self._indents}{previous}"
+ new.append(previous)
+ previous = ""
+ l = self.replaceinstr(l)
+ if l.startswith("SSLCipherSuite"):
+ previous = l
+ continue
+ if self._indents > 0:
+ l = f"{' ' * self._indents}{l}"
+ new.append(l)
+ if previous != "":
+ if self._indents > 0:
+ previous = f"{' ' * self._indents}{previous}"
+ new.append(previous)
+ self._lines.extend(new)
+ else:
+ if self._indents > 0:
+ line = [f"{' ' * self._indents}{l}" for l in line]
+ self._lines.extend(line)
return self
def add_certificate(self, cert_file, key_file, ssl_module=None):
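
When mod_tls is not available as a shared module, the new replacetlsstr()/replaceinstr() helpers rewrite mod_tls directives in the test configuration into their mod_ssl equivalents. A minimal standalone sketch of the cipher-name translation, mirroring the string replacements above (the tls_to_ssl name and the sample directives are illustrative, not part of the patch):

    def tls_to_ssl(line: str) -> str:
        # mirrors HttpdConf.replacetlsstr()/replaceinstr() for the two cipher directives
        l = line.replace("TLS_", "")
        l = " ".join(l.replace("\n", " ").replace("\\", " ").split())
        l = l.replace(" ", ":").replace("_", "-")
        l = l.replace("-WITH", "").replace("AES-", "AES").replace("POLY1305-SHA256", "POLY1305")
        if line.startswith("TLSCiphersPrefer"):
            return l.replace("TLSCiphersPrefer:", "SSLCipherSuite ")
        if line.startswith("TLSCiphersSuppress"):
            return l.replace("TLSCiphersSuppress:", "SSLCipherSuite !").replace(":", ":!")
        return line

    assert tls_to_ssl("TLSCiphersPrefer TLS_AES_256_GCM_SHA384") == "SSLCipherSuite AES256-GCM-SHA384"
    assert tls_to_ssl("TLSCiphersSuppress TLS_CHACHA20_POLY1305_SHA256") == "SSLCipherSuite !CHACHA20-POLY1305"

When add() receives a list, consecutive prefer/suppress lines are additionally merged into one directive, so the two inputs above would end up as a single line: SSLCipherSuite AES256-GCM-SHA384:!CHACHA20-POLY1305.
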
diff --git a/test/pyhttpd/curl.py b/test/pyhttpd/curl.py
index 3d7993f..7dcc25b 100644
--- a/test/pyhttpd/curl.py
+++ b/test/pyhttpd/curl.py
@@ -131,8 +131,6 @@ class CurlPiper:
recv_deltas.append(datetime.timedelta(microseconds=delta_mics))
last_mics = mics
stutter_td = datetime.timedelta(seconds=stutter.total_seconds() * 0.75) # 25% leeway
- # TODO: the first two chunks are often close together, it seems
- # there still is a little buffering delay going on
for idx, td in enumerate(recv_deltas[1:]):
assert stutter_td < td, \
f"chunk {idx} arrived too early \n{recv_deltas}\nafter {td}\n{recv_err}"
diff --git a/test/pyhttpd/env.py b/test/pyhttpd/env.py
index 1d4e8b1..8a20d92 100644
--- a/test/pyhttpd/env.py
+++ b/test/pyhttpd/env.py
@@ -93,6 +93,7 @@ class HttpdTestSetup:
self._make_modules_conf()
self._make_htdocs()
self._add_aptest()
+ self._build_clients()
self.env.clear_curl_headerfiles()
def _make_dirs(self):
@@ -196,6 +197,16 @@ class HttpdTestSetup:
# load our test module which is not installed
fd.write(f"LoadModule aptest_module \"{local_dir}/mod_aptest/.libs/mod_aptest.so\"\n")
+ def _build_clients(self):
+ clients_dir = os.path.join(
+ os.path.dirname(os.path.dirname(inspect.getfile(HttpdTestSetup))),
+ 'clients')
+ p = subprocess.run(['make'], capture_output=True, cwd=clients_dir)
+ rv = p.returncode
+ if rv != 0:
+ log.error(f"compiling test clients failed: {p.stderr}")
+ raise Exception(f"compiling test clients failed: {p.stderr}")
+
class HttpdTestEnv:
@@ -324,6 +335,12 @@ class HttpdTestEnv:
for name in self._httpd_log_modules:
self._log_interesting += f" {name}:{log_level}"
+ def check_error_log(self):
+ errors, warnings = self._error_log.get_missed()
+ assert (len(errors), len(warnings)) == (0, 0),\
+ f"apache logged {len(errors)} errors and {len(warnings)} warnings: \n"\
+ "{0}\n{1}\n".format("\n".join(errors), "\n".join(warnings))
+
@property
def curl(self) -> str:
return self._curl
@@ -572,16 +589,22 @@ class HttpdTestEnv:
return f"{scheme}://{hostname}.{self.http_tld}:{port}{path}"
def install_test_conf(self, lines: List[str]):
+ self.apache_stop()
with open(self._test_conf, 'w') as fd:
fd.write('\n'.join(self._httpd_base_conf))
fd.write('\n')
fd.write(f"CoreDumpDirectory {self._server_dir}\n")
- if self._verbosity >= 2:
- fd.write(f"LogLevel core:trace5 {self.mpm_module}:trace5 http:trace5\n")
+ fd.write('\n')
if self._verbosity >= 3:
- fd.write(f"LogLevel dumpio:trace7\n")
+ fd.write(f"LogLevel trace7 ssl:trace6\n")
fd.write(f"DumpIoOutput on\n")
fd.write(f"DumpIoInput on\n")
+ elif self._verbosity >= 2:
+ fd.write(f"LogLevel debug core:trace5 {self.mpm_module}:trace5 ssl:trace5 http:trace5\n")
+ elif self._verbosity >= 1:
+ fd.write(f"LogLevel info\n")
+ else:
+ fd.write(f"LogLevel warn\n")
if self._log_interesting:
fd.write(self._log_interesting)
fd.write('\n\n')
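
The rewritten install_test_conf() now stops the server before writing the config and emits exactly one LogLevel directive per verbosity tier instead of stacking extra lines on top of the defaults. A small sketch of the directive produced for each tier, mirroring the branches above (the loglevel_line helper and the mpm_event default are illustrative assumptions):

    def loglevel_line(verbosity: int, mpm_module: str = "mpm_event") -> str:
        # mirrors the verbosity branches in HttpdTestEnv.install_test_conf();
        # verbosity >= 3 additionally enables DumpIoOutput/DumpIoInput
        if verbosity >= 3:
            return "LogLevel trace7 ssl:trace6"
        elif verbosity >= 2:
            return f"LogLevel debug core:trace5 {mpm_module}:trace5 ssl:trace5 http:trace5"
        elif verbosity >= 1:
            return "LogLevel info"
        return "LogLevel warn"

    assert loglevel_line(0) == "LogLevel warn"
    assert loglevel_line(2) == "LogLevel debug core:trace5 mpm_event:trace5 ssl:trace5 http:trace5"
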
diff --git a/test/pyhttpd/log.py b/test/pyhttpd/log.py
index dff7623..17b0502 100644
--- a/test/pyhttpd/log.py
+++ b/test/pyhttpd/log.py
@@ -8,33 +8,32 @@ from typing import List, Tuple, Any
class HttpdErrorLog:
"""Checking the httpd error log for errors and warnings, including
- limiting checks from a last known position forward.
+ limiting checks from a recent known position forward.
"""
- RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
- RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
- RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
- RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
+ RE_ERRLOG_WARN = re.compile(r'.*\[[^:]+:warn].*')
+ RE_ERRLOG_ERROR = re.compile(r'.*\[[^:]+:error].*')
+ RE_APLOGNO = re.compile(r'.*\[[^:]+:(error|warn)].* (?P<aplogno>AH\d+): .+')
def __init__(self, path: str):
self._path = path
- self._ignored_modules = []
+ self._ignored_matches = []
self._ignored_lognos = set()
- self._ignored_patterns = []
# remember the file position we started with
self._start_pos = 0
if os.path.isfile(self._path):
with open(self._path) as fd:
self._start_pos = fd.seek(0, SEEK_END)
- self._last_pos = self._start_pos
- self._last_errors = []
- self._last_warnings = []
- self._observed_erros = set()
- self._observed_warnings = set()
+ self._recent_pos = self._start_pos
+ self._recent_errors = []
+ self._recent_warnings = []
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
def __repr__(self):
- return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
- f"warnings: {' '.join(self._last_warnings)}]"
+ return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._recent_errors)}, " \
+ f"warnings: {' '.join(self._recent_warnings)}]"
@property
def path(self) -> str:
@@ -42,118 +41,108 @@ class HttpdErrorLog:
def clear_log(self):
if os.path.isfile(self.path):
- os.remove(self.path)
- self._start_pos = 0
- self._last_pos = self._start_pos
- self._last_errors = []
- self._last_warnings = []
- self._observed_erros = set()
- self._observed_warnings = set()
+ os.truncate(self.path, 0)
+ self._start_pos = self._recent_pos = 0
+ self._recent_errors = []
+ self._recent_warnings = []
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
+
+ def _lookup_matches(self, line: str, matches: List[str]) -> bool:
+ for m in matches:
+ if re.match(m, line):
+ return True
+ return False
+
+ def _lookup_lognos(self, line: str, lognos: set) -> bool:
+ if len(lognos) > 0:
+ m = self.RE_APLOGNO.match(line)
+ if m and m.group('aplogno') in lognos:
+ return True
+ return False
- def set_ignored_modules(self, modules: List[str]):
- self._ignored_modules = modules.copy() if modules else []
+ def clear_ignored_matches(self):
+ self._ignored_matches = []
- def set_ignored_lognos(self, lognos: List[str]):
- if lognos:
- for l in lognos:
- self._ignored_lognos.add(l)
+ def add_ignored_matches(self, matches: List[str]):
+ for m in matches:
+ self._ignored_matches.append(re.compile(m))
- def add_ignored_patterns(self, patterns: List[Any]):
- self._ignored_patterns.extend(patterns)
+ def clear_ignored_lognos(self):
+ self._ignored_lognos = set()
+
+ def add_ignored_lognos(self, lognos: List[str]):
+ for l in lognos:
+ self._ignored_lognos.add(l)
def _is_ignored(self, line: str) -> bool:
- for p in self._ignored_patterns:
- if p.match(line):
- return True
- m = self.RE_APLOGNO.match(line)
- if m and m.group('aplogno') in self._ignored_lognos:
+ if self._lookup_matches(line, self._ignored_matches):
+ return True
+ if self._lookup_lognos(line, self._ignored_lognos):
return True
return False
- def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
- """Collect error and warning from the log since the last remembered position
- :param advance: advance the position to the end of the log afterwards
- :return: list of error and list of warnings as tuple
- """
- self._last_errors = []
- self._last_warnings = []
+ def ignore_recent(self, lognos: List[str] = [], matches: List[str] = []):
+ """After a test case triggered errors/warnings on purpose, add
+ those to our 'caught' list so they do not get reported as 'missed'.
+ """
+ self._recent_errors = []
+ self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
- fd.seek(self._last_pos, os.SEEK_SET)
+ fd.seek(self._recent_pos, os.SEEK_SET)
+ lognos_set = set(lognos)
for line in fd:
if self._is_ignored(line):
continue
- m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules:
- self._last_errors.append(line)
+ if self._lookup_matches(line, matches):
+ self._caught_matches.add(line)
continue
m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules:
- self._last_warnings.append(line)
- continue
- if advance:
- self._last_pos = fd.tell()
- self._observed_erros.update(set(self._last_errors))
- self._observed_warnings.update(set(self._last_warnings))
- return self._last_errors, self._last_warnings
-
- def get_recent_count(self, advance=True):
- errors, warnings = self.get_recent(advance=advance)
- return len(errors), len(warnings)
-
- def ignore_recent(self):
- """After a test case triggered errors/warnings on purpose, add
- those to our 'observed' list so the do not get reported as 'missed'.
- """
- self._last_errors = []
- self._last_warnings = []
- if os.path.isfile(self._path):
- with open(self._path) as fd:
- fd.seek(self._last_pos, os.SEEK_SET)
- for line in fd:
- if self._is_ignored(line):
+ if m and self._lookup_lognos(line, lognos_set):
+ self._caught_warnings.add(line)
continue
m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules:
- self._observed_erros.add(line)
+ if m and self._lookup_lognos(line, lognos_set):
+ self._caught_errors.add(line)
continue
- m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules:
- self._observed_warnings.add(line)
- continue
- self._last_pos = fd.tell()
+ self._recent_pos = fd.tell()
def get_missed(self) -> Tuple[List[str], List[str]]:
errors = []
warnings = []
+ self._recent_errors = []
+ self._recent_warnings = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._start_pos, os.SEEK_SET)
for line in fd:
if self._is_ignored(line):
continue
+ if line in self._caught_matches:
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m and line not in self._caught_warnings:
+ warnings.append(line)
+ continue
m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in self._ignored_modules \
- and line not in self._observed_erros:
+ if m and line not in self._caught_errors:
errors.append(line)
continue
- m = self.RE_ERRLOG_WARN.match(line)
- if m:
- if m and m.group('module') not in self._ignored_modules \
- and line not in self._observed_warnings:
- warnings.append(line)
- continue
+ self._start_pos = self._recent_pos = fd.tell()
+ self._caught_errors = set()
+ self._caught_warnings = set()
+ self._caught_matches = set()
return errors, warnings
- def scan_recent(self, pattern: re, timeout=10):
+ def scan_recent(self, pattern: re.Pattern, timeout=10):
if not os.path.isfile(self.path):
return False
with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout)
while True:
- fd.seek(self._last_pos, os.SEEK_SET)
+ fd.seek(self._recent_pos, os.SEEK_SET)
for line in fd:
if pattern.match(line):
return True
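
Overall, HttpdErrorLog now distinguishes entries a test provoked on purpose ('caught', acknowledged via ignore_recent()) from everything else ('missed', returned by get_missed() and asserted empty by the new HttpdTestEnv.check_error_log()). A hedged usage sketch, assuming the class is importable as pyhttpd.log from the test directory and using made-up log lines and AH numbers:

    import os
    import tempfile
    from pyhttpd.log import HttpdErrorLog  # assumed import path

    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "error_log")
        elog = HttpdErrorLog(path)  # file does not exist yet, so positions start at 0
        with open(path, "w") as fd:
            fd.write("[core:error] [pid 1] AH10001: provoked failure\n")
            fd.write("[ssl:warn] [pid 1] AH10002: unexpected warning\n")
        # the test provoked AH10001 on purpose: record it as 'caught'
        elog.ignore_recent(lognos=["AH10001"])
        errors, warnings = elog.get_missed()
        assert errors == []          # the acknowledged error is not reported
        assert len(warnings) == 1    # the unacknowledged warning still is
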