# cgit navigation: summary | refs | log | tree | commit | diff | stats
# path: root/test/modules/http2/env.py
# blob: 34d196d6bd63a6aa99a730070a087efcab9f024a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import inspect
import logging
import os
import re
import subprocess
from typing import Dict, Any

from pyhttpd.certs import CertificateSpec
from pyhttpd.conf import HttpdConf
from pyhttpd.env import HttpdTestEnv, HttpdTestSetup

log = logging.getLogger(__name__)


class H2TestSetup(HttpdTestSetup):
    """Test setup for the HTTP/2 test suite.

    On top of the base setup this registers the directory of this file as a
    source directory, requests the httpd modules the suite uses, compiles and
    loads the ``mod_h2test`` helper module and generates data files of
    several sizes in the environment's ``gen_dir``.
    """

    def __init__(self, env: 'HttpdTestEnv'):
        """Register this file's directory and the required modules.

        :param env: the test environment this setup works on.
        """
        super().__init__(env=env)
        self.add_source_dir(os.path.dirname(inspect.getfile(H2TestSetup)))
        self.add_modules(["http2", "proxy_http2", "cgid", "autoindex", "ssl", "include"])

    def make(self):
        """Run the base setup, then build mod_h2test and the data files."""
        super().make()
        self._add_h2test()
        self._setup_data_1k_1m()

    def _add_h2test(self):
        """Compile ``mod_h2test.c`` with apxs and load it via modules.conf.

        :raises Exception: when the apxs run exits with a non-zero status.
        """
        local_dir = os.path.dirname(inspect.getfile(H2TestSetup))
        p = subprocess.run([self.env.apxs, '-c', 'mod_h2test.c'],
                           capture_output=True,
                           cwd=os.path.join(local_dir, 'mod_h2test'))
        if p.returncode != 0:
            # stderr is captured as bytes; decode it so the compiler output
            # is readable instead of showing up as a b'...' repr.
            stderr = p.stderr.decode(errors='replace')
            msg = f"compiling mod_h2test failed: {stderr}"
            log.error(msg)
            raise Exception(msg)

        modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf')
        with open(modules_conf, 'a') as fd:
            # load our test module which is not installed
            fd.write(f"LoadModule h2test_module   \"{local_dir}/mod_h2test/.libs/mod_h2test.so\"\n")

    def _setup_data_1k_1m(self):
        """Write the ``data-1k`` .. ``data-1m`` files into ``gen_dir``.

        Every line is a 9-digit, zero-padded line index, a dash and the
        fixed payload below, so each file has a predictable content.
        """
        # 89 payload characters plus the newline; together with the
        # 10 character "<index>-" prefix every line is 100 characters long.
        s90 = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n"
        # file name -> number of lines to write
        line_counts = {
            "data-1k": 10,
            "data-10k": 100,
            "data-100k": 1000,
            "data-1m": 10000,
        }
        for fname, count in line_counts.items():
            with open(os.path.join(self.env.gen_dir, fname), 'w') as f:
                f.writelines(f"{i:09d}-{s90}" for i in range(count))


class _ClassProperty:
    """Read-only property that is computed from the class, not an instance.

    Stacking ``@classmethod`` on top of ``@property`` was deprecated in
    Python 3.11 and stopped working in Python 3.13. This descriptor keeps
    attribute-style access (``SomeClass.name``) working on all versions.
    """

    def __init__(self, fget):
        self._fget = fget
        self.__doc__ = getattr(fget, '__doc__', None)

    def __get__(self, instance, owner=None):
        # Accessed through an instance without an explicit owner:
        # fall back to the instance's type.
        if owner is None:
            owner = type(instance)
        return self._fget(owner)


class H2TestEnv(HttpdTestEnv):
    """Test environment for the HTTP/2 test suite.

    Adds HTTP/2 related server configuration, log modules, certificate
    specifications and the error log entries the suite expects and
    therefore ignores.
    """

    @_ClassProperty
    def is_unsupported(cls):
        """True when the configured MPM is prefork.

        The MPM is taken from the ``MPM`` environment variable and defaults
        to ``event`` when the variable is not set.
        """
        mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event'
        return mpm_module == 'mpm_prefork'

    def __init__(self, pytestconfig=None):
        """Create the environment and register all HTTP/2 specifics.

        :param pytestconfig: passed through to the base environment.
        """
        super().__init__(pytestconfig=pytestconfig)
        self.add_httpd_conf([
            "H2MinWorkers 1",
            "H2MaxWorkers 64",
            "Protocols h2 http/1.1 h2c",
        ])
        self.add_httpd_log_modules(["http2", "proxy_http2", "h2test", "proxy", "proxy_http"])
        # NOTE(review): the first spec reads the private `_http_tld` while the
        # second one uses the public `http_tld` - presumably the same value;
        # confirm against HttpdTestEnv before unifying.
        self.add_cert_specs([
            CertificateSpec(domains=[
                f"push.{self._http_tld}",
                f"hints.{self._http_tld}",
                f"ssl.{self._http_tld}",
                f"pad0.{self._http_tld}",
                f"pad1.{self._http_tld}",
                f"pad2.{self._http_tld}",
                f"pad3.{self._http_tld}",
                f"pad8.{self._http_tld}",
            ]),
            CertificateSpec(domains=[f"noh2.{self.http_tld}"], key_type='rsa2048'),
        ])

        # Log numbers that may show up in the error log during the tests.
        self.httpd_error_log.set_ignored_lognos([
            'AH02032',
            'AH01276',
            'AH01630',
            'AH00135',
            'AH02261',  # Re-negotiation handshake failed (our test_101)
            'AH03490',  # scoreboard full, happens on limit tests
            'AH02429',  # invalid chars in response header names, see test_h2_200
            'AH02430',  # invalid chars in response header values, see test_h2_200
            'AH10373',  # SSL errors on uncompleted handshakes, see test_h2_105
            'AH01247',  # mod_cgid sometimes freaks out on load tests
            'AH01110',  # error by proxy reading response
            'AH10400',  # warning that 'enablereuse' has not effect in certain configs test_h2_600
            'AH00045',  # child did not exit in time, SIGTERM was sent
        ])
        # Error log lines without a usable log number are matched by pattern.
        self.httpd_error_log.add_ignored_patterns([
            re.compile(r'.*malformed header from script \'hecho.py\': Bad header: x.*'),
            re.compile(r'.*:tls_post_process_client_hello:.*'),
            # OSSL 3 dropped the function name from the error description. Use the code instead:
            # 0A0000C1 = no shared cipher -- Too restrictive SSLCipherSuite or using DSA server certificate?
            re.compile(r'.*SSL Library Error: error:0A0000C1:.*'),
            re.compile(r'.*:tls_process_client_certificate:.*'),
            # OSSL 3 dropped the function name from the error description. Use the code instead:
            # 0A0000C7 = peer did not return a certificate -- No CAs known to server for verification?
            re.compile(r'.*SSL Library Error: error:0A0000C7:.*'),
            re.compile(r'.*have incompatible TLS configurations.'),
        ])

    def setup_httpd(self, setup: HttpdTestSetup = None):
        """Set up httpd using an :class:`H2TestSetup` for this environment.

        :param setup: ignored; a fresh ``H2TestSetup`` is always used.
        """
        super().setup_httpd(setup=H2TestSetup(env=self))


class H2Conf(HttpdConf):
    """Configuration builder that adds HTTP/2 specifics to ``HttpdConf``."""

    def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None):
        """Create the configuration with extra directives for the cgi host.

        :param env: the test environment the configuration is built for.
        :param extras: additional directives, merged with the ones for
            ``cgi.<http_tld>`` defined here.
        """
        cgi_host = f"cgi.{env.http_tld}"
        # Directives for the cgi host: CGI handling plus the handlers
        # registered for the /h2test/* locations.
        cgi_directives = [
            'SSLOptions +StdEnvVars',
            'AddHandler cgi-script .py',
            '<Location "/h2test/echo">',
            '    SetHandler h2test-echo',
            '</Location>',
            '<Location "/h2test/delay">',
            '    SetHandler h2test-delay',
            '</Location>',
            '<Location "/h2test/error">',
            '    SetHandler h2test-error',
            '</Location>',
        ]
        merged = HttpdConf.merge_extras(extras, {cgi_host: cgi_directives})
        super().__init__(env=env, extras=merged)

    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None,
                    ssl_module=None, with_certificates=None):
        """Open a virtual host and add a matching ``Protocols`` directive.

        Hosts containing the ``noh2`` domain only get ``http/1.1``. Hosts on
        the https port, or with ``with_ssl`` being True, get ``h2`` before
        ``http/1.1``; all others get ``h2c`` before ``http/1.1``. For hosts
        containing the ``test2`` domain the order is reversed.

        :return: this configuration, to allow chaining.
        """
        super().start_vhost(domains=domains, port=port, doc_root=doc_root,
                            with_ssl=with_ssl, ssl_module=ssl_module,
                            with_certificates=with_certificates)
        if f"noh2.{self.env.http_tld}" in domains:
            offered = ["http/1.1"]
        else:
            secure = port == self.env.https_port or with_ssl is True
            offered = ["h2" if secure else "h2c", "http/1.1"]
        if f"test2.{self.env.http_tld}" in domains:
            offered.reverse()
        self.add("Protocols " + " ".join(offered))
        return self

    def add_vhost_noh2(self):
        """Add the ``noh2`` virtual hosts, first on the https then on the http port.

        :return: this configuration, to allow chaining.
        """
        tld = self.env.http_tld
        domains = [f"noh2.{tld}", f"noh2-alias.{tld}"]
        for secure in (True, False):
            # look the port up only when its vhost is about to be started
            vhost_port = self.env.https_port if secure else self.env.http_port
            self.start_vhost(domains=domains, port=vhost_port, doc_root="htdocs/noh2")
            self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"])
            self.end_vhost()
        return self

    def add_vhost_test1(self, proxy_self=False, h2proxy_self=False):
        """Add the ``test1`` virtual host exactly as the base class does."""
        conf = super().add_vhost_test1(proxy_self=proxy_self, h2proxy_self=h2proxy_self)
        return conf

    def add_vhost_test2(self):
        """Add the ``test2`` virtual host exactly as the base class does."""
        conf = super().add_vhost_test2()
        return conf