summaryrefslogtreecommitdiffstats
path: root/test/modules/http2/env.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/modules/http2/env.py')
-rw-r--r--test/modules/http2/env.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/test/modules/http2/env.py b/test/modules/http2/env.py
new file mode 100644
index 0000000..537d3bf
--- /dev/null
+++ b/test/modules/http2/env.py
@@ -0,0 +1,168 @@
+import inspect
+import logging
+import os
+import re
+import subprocess
+from typing import Dict, Any
+
+from pyhttpd.certs import CertificateSpec
+from pyhttpd.conf import HttpdConf
+from pyhttpd.env import HttpdTestEnv, HttpdTestSetup
+
+log = logging.getLogger(__name__)
+
+
+class H2TestSetup(HttpdTestSetup):
+
+ def __init__(self, env: 'HttpdTestEnv'):
+ super().__init__(env=env)
+ self.add_source_dir(os.path.dirname(inspect.getfile(H2TestSetup)))
+ self.add_modules(["http2", "proxy_http2", "cgid", "autoindex", "ssl"])
+
+ def make(self):
+ super().make()
+ self._add_h2test()
+ self._setup_data_1k_1m()
+
+ def _add_h2test(self):
+ local_dir = os.path.dirname(inspect.getfile(H2TestSetup))
+ p = subprocess.run([self.env.apxs, '-c', 'mod_h2test.c'],
+ capture_output=True,
+ cwd=os.path.join(local_dir, 'mod_h2test'))
+ rv = p.returncode
+ if rv != 0:
+ log.error(f"compiling md_h2test failed: {p.stderr}")
+ raise Exception(f"compiling md_h2test failed: {p.stderr}")
+
+ modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf')
+ with open(modules_conf, 'a') as fd:
+ # load our test module which is not installed
+ fd.write(f"LoadModule h2test_module \"{local_dir}/mod_h2test/.libs/mod_h2test.so\"\n")
+
+ def _setup_data_1k_1m(self):
+ s90 = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n"
+ with open(os.path.join(self.env.gen_dir, "data-1k"), 'w') as f:
+ for i in range(10):
+ f.write(f"{i:09d}-{s90}")
+ with open(os.path.join(self.env.gen_dir, "data-10k"), 'w') as f:
+ for i in range(100):
+ f.write(f"{i:09d}-{s90}")
+ with open(os.path.join(self.env.gen_dir, "data-100k"), 'w') as f:
+ for i in range(1000):
+ f.write(f"{i:09d}-{s90}")
+ with open(os.path.join(self.env.gen_dir, "data-1m"), 'w') as f:
+ for i in range(10000):
+ f.write(f"{i:09d}-{s90}")
+
+
+class H2TestEnv(HttpdTestEnv):
+
+ @classmethod
+ @property
+ def is_unsupported(cls):
+ mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event'
+ return mpm_module == 'mpm_prefork'
+
+ def __init__(self, pytestconfig=None):
+ super().__init__(pytestconfig=pytestconfig)
+ self.add_httpd_conf([
+ "H2MinWorkers 1",
+ "H2MaxWorkers 64",
+ "Protocols h2 http/1.1 h2c",
+ ])
+ self.add_httpd_log_modules(["http2", "proxy_http2", "h2test", "proxy", "proxy_http"])
+ self.add_cert_specs([
+ CertificateSpec(domains=[
+ f"push.{self._http_tld}",
+ f"hints.{self._http_tld}",
+ f"ssl.{self._http_tld}",
+ f"pad0.{self._http_tld}",
+ f"pad1.{self._http_tld}",
+ f"pad2.{self._http_tld}",
+ f"pad3.{self._http_tld}",
+ f"pad8.{self._http_tld}",
+ ]),
+ CertificateSpec(domains=[f"noh2.{self.http_tld}"], key_type='rsa2048'),
+ ])
+
+ self.httpd_error_log.set_ignored_lognos([
+ 'AH02032',
+ 'AH01276',
+ 'AH01630',
+ 'AH00135',
+ 'AH02261', # Re-negotiation handshake failed (our test_101)
+ 'AH03490', # scoreboard full, happens on limit tests
+ 'AH02429', # invalid chars in response header names, see test_h2_200
+ 'AH02430', # invalid chars in response header values, see test_h2_200
+ 'AH10373', # SSL errors on uncompleted handshakes, see test_h2_105
+ 'AH01247', # mod_cgid sometimes freaks out on load tests
+ 'AH01110', # error by proxy reading response
+ 'AH10400', # warning that 'enablereuse' has not effect in certain configs test_h2_600
+ ])
+ self.httpd_error_log.add_ignored_patterns([
+ re.compile(r'.*malformed header from script \'hecho.py\': Bad header: x.*'),
+ re.compile(r'.*:tls_post_process_client_hello:.*'),
+ # OSSL 3 dropped the function name from the error description. Use the code instead:
+ # 0A0000C1 = no shared cipher -- Too restrictive SSLCipherSuite or using DSA server certificate?
+ re.compile(r'.*SSL Library Error: error:0A0000C1:.*'),
+ re.compile(r'.*:tls_process_client_certificate:.*'),
+ # OSSL 3 dropped the function name from the error description. Use the code instead:
+ # 0A0000C7 = peer did not return a certificate -- No CAs known to server for verification?
+ re.compile(r'.*SSL Library Error: error:0A0000C7:.*'),
+ re.compile(r'.*have incompatible TLS configurations.'),
+ ])
+
+ def setup_httpd(self, setup: HttpdTestSetup = None):
+ super().setup_httpd(setup=H2TestSetup(env=self))
+
+
+class H2Conf(HttpdConf):
+
+ def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None):
+ super().__init__(env=env, extras=HttpdConf.merge_extras(extras, {
+ f"cgi.{env.http_tld}": [
+ "SSLOptions +StdEnvVars",
+ "AddHandler cgi-script .py",
+ "<Location \"/h2test/echo\">",
+ " SetHandler h2test-echo",
+ "</Location>",
+ "<Location \"/h2test/delay\">",
+ " SetHandler h2test-delay",
+ "</Location>",
+ "<Location \"/h2test/error\">",
+ " SetHandler h2test-error",
+ "</Location>",
+ ]
+ }))
+
+ def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None,
+ ssl_module=None, with_certificates=None):
+ super().start_vhost(domains=domains, port=port, doc_root=doc_root,
+ with_ssl=with_ssl, ssl_module=ssl_module,
+ with_certificates=with_certificates)
+ if f"noh2.{self.env.http_tld}" in domains:
+ protos = ["http/1.1"]
+ elif port == self.env.https_port or with_ssl is True:
+ protos = ["h2", "http/1.1"]
+ else:
+ protos = ["h2c", "http/1.1"]
+ if f"test2.{self.env.http_tld}" in domains:
+ protos = reversed(protos)
+ self.add(f"Protocols {' '.join(protos)}")
+ return self
+
+ def add_vhost_noh2(self):
+ domains = [f"noh2.{self.env.http_tld}", f"noh2-alias.{self.env.http_tld}"]
+ self.start_vhost(domains=domains, port=self.env.https_port, doc_root="htdocs/noh2")
+ self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"])
+ self.end_vhost()
+ self.start_vhost(domains=domains, port=self.env.http_port, doc_root="htdocs/noh2")
+ self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"])
+ self.end_vhost()
+ return self
+
+ def add_vhost_test1(self, proxy_self=False, h2proxy_self=False):
+ return super().add_vhost_test1(proxy_self=proxy_self, h2proxy_self=h2proxy_self)
+
+ def add_vhost_test2(self):
+ return super().add_vhost_test2()