diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-25 04:41:26 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-25 04:41:26 +0000 |
commit | 7b31d4f4901cdb89a79f2f7de4a6b8bb637b523b (patch) | |
tree | fdeb0b5ff80273f95ce61607fc3613dff0b9a235 /test/pyhttpd/env.py | |
parent | Adding upstream version 2.4.38. (diff) | |
download | apache2-upstream.tar.xz apache2-upstream.zip |
Adding upstream version 2.4.59.upstream/2.4.59upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/pyhttpd/env.py')
-rw-r--r-- | test/pyhttpd/env.py | 898 |
1 files changed, 898 insertions, 0 deletions
diff --git a/test/pyhttpd/env.py b/test/pyhttpd/env.py new file mode 100644 index 0000000..1d4e8b1 --- /dev/null +++ b/test/pyhttpd/env.py @@ -0,0 +1,898 @@ +import importlib +import inspect +import logging +import re +import os +import shutil +import stat +import subprocess +import sys +import time +from datetime import datetime, timedelta +from string import Template +from typing import List, Optional + +from configparser import ConfigParser, ExtendedInterpolation +from urllib.parse import urlparse + +from .certs import Credentials, HttpdTestCA, CertificateSpec +from .log import HttpdErrorLog +from .nghttp import Nghttp +from .result import ExecResult + + +log = logging.getLogger(__name__) + + +class Dummy: + pass + + +class HttpdTestSetup: + + # the modules we want to load + MODULES = [ + "log_config", + "logio", + "unixd", + "version", + "watchdog", + "authn_core", + "authz_host", + "authz_groupfile", + "authz_user", + "authz_core", + "access_compat", + "auth_basic", + "cache", + "cache_disk", + "cache_socache", + "socache_shmcb", + "dumpio", + "reqtimeout", + "filter", + "mime", + "env", + "headers", + "setenvif", + "slotmem_shm", + "status", + "dir", + "alias", + "rewrite", + "deflate", + "proxy", + "proxy_http", + ] + + CURL_STDOUT_SEPARATOR = "===CURL_STDOUT_SEPARATOR===" + + def __init__(self, env: 'HttpdTestEnv'): + self.env = env + self._source_dirs = [os.path.dirname(inspect.getfile(HttpdTestSetup))] + self._modules = HttpdTestSetup.MODULES.copy() + self._optional_modules = [] + + def add_source_dir(self, source_dir): + self._source_dirs.append(source_dir) + + def add_modules(self, modules: List[str]): + self._modules.extend(modules) + + def add_optional_modules(self, modules: List[str]): + self._optional_modules.extend(modules) + + def make(self): + self._make_dirs() + self._make_conf() + if self.env.mpm_module is not None \ + and self.env.mpm_module in self.env.mpm_modules: + self.add_modules([self.env.mpm_module]) + if self.env.ssl_module is not None: + self.add_modules([self.env.ssl_module]) + self._make_modules_conf() + self._make_htdocs() + self._add_aptest() + self.env.clear_curl_headerfiles() + + def _make_dirs(self): + if not os.path.exists(self.env.gen_dir): + os.makedirs(self.env.gen_dir) + if not os.path.exists(self.env.server_logs_dir): + os.makedirs(self.env.server_logs_dir) + + def _make_conf(self): + # remove anything from another run/test suite + conf_dest_dir = os.path.join(self.env.server_dir, 'conf') + if os.path.isdir(conf_dest_dir): + shutil.rmtree(conf_dest_dir) + for d in self._source_dirs: + conf_src_dir = os.path.join(d, 'conf') + if os.path.isdir(conf_src_dir): + if not os.path.exists(conf_dest_dir): + os.makedirs(conf_dest_dir) + for name in os.listdir(conf_src_dir): + src_path = os.path.join(conf_src_dir, name) + m = re.match(r'(.+).template', name) + if m: + self._make_template(src_path, os.path.join(conf_dest_dir, m.group(1))) + elif os.path.isfile(src_path): + shutil.copy(src_path, os.path.join(conf_dest_dir, name)) + + def _make_template(self, src, dest): + var_map = dict() + for name, value in HttpdTestEnv.__dict__.items(): + if isinstance(value, property): + var_map[name] = value.fget(self.env) + t = Template(''.join(open(src).readlines())) + with open(dest, 'w') as fd: + fd.write(t.substitute(var_map)) + + def _make_modules_conf(self): + loaded = set() + modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf') + with open(modules_conf, 'w') as fd: + # issue load directives for all modules we want that are shared + missing_mods = list() + for m in self._modules: + match = re.match(r'^mod_(.+)$', m) + if match: + m = match.group(1) + if m in loaded: + continue + mod_path = os.path.join(self.env.libexec_dir, f"mod_{m}.so") + if os.path.isfile(mod_path): + fd.write(f"LoadModule {m}_module \"{mod_path}\"\n") + elif m in self.env.dso_modules: + missing_mods.append(m) + else: + fd.write(f"#built static: LoadModule {m}_module \"{mod_path}\"\n") + loaded.add(m) + for m in self._optional_modules: + match = re.match(r'^mod_(.+)$', m) + if match: + m = match.group(1) + if m in loaded: + continue + mod_path = os.path.join(self.env.libexec_dir, f"mod_{m}.so") + if os.path.isfile(mod_path): + fd.write(f"LoadModule {m}_module \"{mod_path}\"\n") + loaded.add(m) + if len(missing_mods) > 0: + raise Exception(f"Unable to find modules: {missing_mods} " + f"DSOs: {self.env.dso_modules}") + + def _make_htdocs(self): + if not os.path.exists(self.env.server_docs_dir): + os.makedirs(self.env.server_docs_dir) + dest_dir = os.path.join(self.env.server_dir, 'htdocs') + # remove anything from another run/test suite + if os.path.isdir(dest_dir): + shutil.rmtree(dest_dir) + for d in self._source_dirs: + srcdocs = os.path.join(d, 'htdocs') + if os.path.isdir(srcdocs): + shutil.copytree(srcdocs, dest_dir, dirs_exist_ok=True) + # make all contained .py scripts executable + for dirpath, _dirnames, filenames in os.walk(dest_dir): + for fname in filenames: + if re.match(r'.+\.py', fname): + py_file = os.path.join(dirpath, fname) + st = os.stat(py_file) + os.chmod(py_file, st.st_mode | stat.S_IEXEC) + + def _add_aptest(self): + local_dir = os.path.dirname(inspect.getfile(HttpdTestSetup)) + p = subprocess.run([self.env.apxs, '-c', 'mod_aptest.c'], + capture_output=True, + cwd=os.path.join(local_dir, 'mod_aptest')) + rv = p.returncode + if rv != 0: + log.error(f"compiling mod_aptest failed: {p.stderr}") + raise Exception(f"compiling mod_aptest failed: {p.stderr}") + + modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf') + with open(modules_conf, 'a') as fd: + # load our test module which is not installed + fd.write(f"LoadModule aptest_module \"{local_dir}/mod_aptest/.libs/mod_aptest.so\"\n") + + +class HttpdTestEnv: + + LIBEXEC_DIR = None + + @classmethod + def has_python_package(cls, name: str) -> bool: + if name in sys.modules: + # already loaded + return True + elif (spec := importlib.util.find_spec(name)) is not None: + module = importlib.util.module_from_spec(spec) + sys.modules[name] = module + spec.loader.exec_module(module) + return True + else: + return False + + @classmethod + def get_ssl_module(cls): + return os.environ['SSL'] if 'SSL' in os.environ else 'mod_ssl' + + @classmethod + def has_shared_module(cls, name): + if cls.LIBEXEC_DIR is None: + env = HttpdTestEnv() # will initialized it + path = os.path.join(cls.LIBEXEC_DIR, f"mod_{name}.so") + return os.path.isfile(path) + + def __init__(self, pytestconfig=None): + self._our_dir = os.path.dirname(inspect.getfile(Dummy)) + self.config = ConfigParser(interpolation=ExtendedInterpolation()) + self.config.read(os.path.join(self._our_dir, 'config.ini')) + + self._bin_dir = self.config.get('global', 'bindir') + self._apxs = self.config.get('global', 'apxs') + self._prefix = self.config.get('global', 'prefix') + self._apachectl = self.config.get('global', 'apachectl') + if HttpdTestEnv.LIBEXEC_DIR is None: + HttpdTestEnv.LIBEXEC_DIR = self._libexec_dir = self.get_apxs_var('LIBEXECDIR') + self._curl = self.config.get('global', 'curl_bin') + if 'CURL' in os.environ: + self._curl = os.environ['CURL'] + self._nghttp = self.config.get('global', 'nghttp') + if self._nghttp is None: + self._nghttp = 'nghttp' + self._h2load = self.config.get('global', 'h2load') + if self._h2load is None: + self._h2load = 'h2load' + + self._http_port = int(self.config.get('test', 'http_port')) + self._http_port2 = int(self.config.get('test', 'http_port2')) + self._https_port = int(self.config.get('test', 'https_port')) + self._proxy_port = int(self.config.get('test', 'proxy_port')) + self._ws_port = int(self.config.get('test', 'ws_port')) + self._http_tld = self.config.get('test', 'http_tld') + self._test_dir = self.config.get('test', 'test_dir') + self._clients_dir = os.path.join(os.path.dirname(self._test_dir), 'clients') + self._gen_dir = self.config.get('test', 'gen_dir') + self._server_dir = os.path.join(self._gen_dir, 'apache') + self._server_conf_dir = os.path.join(self._server_dir, "conf") + self._server_docs_dir = os.path.join(self._server_dir, "htdocs") + self._server_logs_dir = os.path.join(self.server_dir, "logs") + self._server_access_log = os.path.join(self._server_logs_dir, "access_log") + self._error_log = HttpdErrorLog(os.path.join(self._server_logs_dir, "error_log")) + self._apachectl_stderr = None + + self._dso_modules = self.config.get('httpd', 'dso_modules').split(' ') + self._mpm_modules = self.config.get('httpd', 'mpm_modules').split(' ') + self._mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event' + self._ssl_module = self.get_ssl_module() + if len(self._ssl_module.strip()) == 0: + self._ssl_module = None + + self._httpd_addr = "127.0.0.1" + self._http_base = f"http://{self._httpd_addr}:{self.http_port}" + self._https_base = f"https://{self._httpd_addr}:{self.https_port}" + + self._verbosity = pytestconfig.option.verbose if pytestconfig is not None else 0 + self._test_conf = os.path.join(self._server_conf_dir, "test.conf") + self._httpd_base_conf = [] + self._httpd_log_modules = ['aptest'] + self._log_interesting = None + self._setup = None + + self._ca = None + self._cert_specs = [CertificateSpec(domains=[ + f"test1.{self._http_tld}", + f"test2.{self._http_tld}", + f"test3.{self._http_tld}", + f"cgi.{self._http_tld}", + ], key_type='rsa4096')] + + self._verify_certs = False + self._curl_headerfiles_n = 0 + self._curl_version = None + self._h2load_version = None + self._current_test = None + + def add_httpd_conf(self, lines: List[str]): + self._httpd_base_conf.extend(lines) + + def add_httpd_log_modules(self, modules: List[str]): + self._httpd_log_modules.extend(modules) + + def issue_certs(self): + if self._ca is None: + self._ca = HttpdTestCA.create_root(name=self.http_tld, + store_dir=os.path.join(self.server_dir, 'ca'), + key_type="rsa4096") + self._ca.issue_certs(self._cert_specs) + + def setup_httpd(self, setup: HttpdTestSetup = None): + """Create the server environment with config, htdocs and certificates""" + self._setup = setup if setup is not None else HttpdTestSetup(env=self) + self._setup.make() + self.issue_certs() + if self._httpd_log_modules: + if self._verbosity >= 2: + log_level = "trace2" + elif self._verbosity >= 1: + log_level = "debug" + else: + log_level = "info" + self._log_interesting = "LogLevel" + for name in self._httpd_log_modules: + self._log_interesting += f" {name}:{log_level}" + + @property + def curl(self) -> str: + return self._curl + + @property + def apxs(self) -> str: + return self._apxs + + @property + def verbosity(self) -> int: + return self._verbosity + + @property + def prefix(self) -> str: + return self._prefix + + @property + def mpm_module(self) -> str: + return self._mpm_module + + @property + def ssl_module(self) -> str: + return self._ssl_module + + @property + def http_addr(self) -> str: + return self._httpd_addr + + @property + def http_port(self) -> int: + return self._http_port + + @property + def http_port2(self) -> int: + return self._http_port2 + + @property + def https_port(self) -> int: + return self._https_port + + @property + def proxy_port(self) -> int: + return self._proxy_port + + @property + def ws_port(self) -> int: + return self._ws_port + + @property + def http_tld(self) -> str: + return self._http_tld + + @property + def http_base_url(self) -> str: + return self._http_base + + @property + def https_base_url(self) -> str: + return self._https_base + + @property + def bin_dir(self) -> str: + return self._bin_dir + + @property + def gen_dir(self) -> str: + return self._gen_dir + + @property + def test_dir(self) -> str: + return self._test_dir + + @property + def clients_dir(self) -> str: + return self._clients_dir + + @property + def server_dir(self) -> str: + return self._server_dir + + @property + def server_logs_dir(self) -> str: + return self._server_logs_dir + + @property + def libexec_dir(self) -> str: + return HttpdTestEnv.LIBEXEC_DIR + + @property + def dso_modules(self) -> List[str]: + return self._dso_modules + + @property + def mpm_modules(self) -> List[str]: + return self._mpm_modules + + @property + def server_conf_dir(self) -> str: + return self._server_conf_dir + + @property + def server_docs_dir(self) -> str: + return self._server_docs_dir + + @property + def httpd_error_log(self) -> HttpdErrorLog: + return self._error_log + + def htdocs_src(self, path): + return os.path.join(self._our_dir, 'htdocs', path) + + @property + def h2load(self) -> str: + return self._h2load + + @property + def ca(self) -> Credentials: + return self._ca + + @property + def current_test_name(self) -> str: + return self._current_test + + def set_current_test_name(self, val) -> None: + self._current_test = val + + @property + def apachectl_stderr(self): + return self._apachectl_stderr + + def add_cert_specs(self, specs: List[CertificateSpec]): + self._cert_specs.extend(specs) + + def get_credentials_for_name(self, dns_name) -> List['Credentials']: + for spec in [s for s in self._cert_specs if s.domains is not None]: + if dns_name in spec.domains: + return self.ca.get_credentials_for_name(spec.domains[0]) + return [] + + def _versiontuple(self, v): + v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v) + return tuple(map(int, v.split('.'))) + + def httpd_is_at_least(self, minv): + hv = self._versiontuple(self.get_httpd_version()) + return hv >= self._versiontuple(minv) + + def has_h2load(self): + return self._h2load != "" + + def h2load_is_at_least(self, minv): + if not self.has_h2load(): + return False + if self._h2load_version is None: + p = subprocess.run([self._h2load, '--version'], capture_output=True, text=True) + if p.returncode != 0: + return False + s = p.stdout.strip() + m = re.match(r'h2load nghttp2/(\S+)', s) + if m: + self._h2load_version = self._versiontuple(m.group(1)) + if self._h2load_version is not None: + return self._h2load_version >= self._versiontuple(minv) + return False + + def curl_is_at_least(self, minv): + if self._curl_version is None: + p = subprocess.run([self._curl, '-V'], capture_output=True, text=True) + if p.returncode != 0: + return False + for l in p.stdout.splitlines(): + m = re.match(r'curl ([0-9.]+)[- ].*', l) + if m: + self._curl_version = self._versiontuple(m.group(1)) + break + if self._curl_version is not None: + return self._curl_version >= self._versiontuple(minv) + return False + + def curl_is_less_than(self, version): + if self._curl_version is None: + p = subprocess.run([self._curl, '-V'], capture_output=True, text=True) + if p.returncode != 0: + return False + for l in p.stdout.splitlines(): + m = re.match(r'curl ([0-9.]+)[- ].*', l) + if m: + self._curl_version = self._versiontuple(m.group(1)) + break + if self._curl_version is not None: + return self._curl_version < self._versiontuple(version) + return False + + def has_nghttp(self): + return self._nghttp != "" + + def has_nghttp_get_assets(self): + if not self.has_nghttp(): + return False + args = [self._nghttp, "-a"] + p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + rv = p.returncode + if rv != 0: + return False + return p.stderr == "" + + def get_apxs_var(self, name: str) -> str: + p = subprocess.run([self._apxs, "-q", name], capture_output=True, text=True) + if p.returncode != 0: + return "" + return p.stdout.strip() + + def get_httpd_version(self) -> str: + return self.get_apxs_var("HTTPD_VERSION") + + def mkpath(self, path): + if not os.path.exists(path): + return os.makedirs(path) + + def run(self, args, stdout_list=False, intext=None, inbytes=None, debug_log=True): + if debug_log: + log.debug(f"run: {args}") + start = datetime.now() + if intext is not None: + inbytes = intext.encode() + p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, + input=inbytes) + stdout_as_list = None + if stdout_list: + try: + out = p.stdout.decode() + if HttpdTestSetup.CURL_STDOUT_SEPARATOR in out: + stdout_as_list = out.split(HttpdTestSetup.CURL_STDOUT_SEPARATOR) + if not stdout_as_list[len(stdout_as_list) - 1]: + stdout_as_list.pop() + p.stdout.replace(HttpdTestSetup.CURL_STDOUT_SEPARATOR.encode(), b'') + except: + pass + return ExecResult(args=args, exit_code=p.returncode, + stdout=p.stdout, stderr=p.stderr, + stdout_as_list=stdout_as_list, + duration=datetime.now() - start) + + def mkurl(self, scheme, hostname, path='/'): + port = self.https_port if scheme == 'https' else self.http_port + return f"{scheme}://{hostname}.{self.http_tld}:{port}{path}" + + def install_test_conf(self, lines: List[str]): + with open(self._test_conf, 'w') as fd: + fd.write('\n'.join(self._httpd_base_conf)) + fd.write('\n') + fd.write(f"CoreDumpDirectory {self._server_dir}\n") + if self._verbosity >= 2: + fd.write(f"LogLevel core:trace5 {self.mpm_module}:trace5 http:trace5\n") + if self._verbosity >= 3: + fd.write(f"LogLevel dumpio:trace7\n") + fd.write(f"DumpIoOutput on\n") + fd.write(f"DumpIoInput on\n") + if self._log_interesting: + fd.write(self._log_interesting) + fd.write('\n\n') + fd.write('\n'.join(lines)) + fd.write('\n') + + def is_live(self, url: str = None, timeout: timedelta = None): + if url is None: + url = self._http_base + if timeout is None: + timeout = timedelta(seconds=5) + try_until = datetime.now() + timeout + last_err = "" + while datetime.now() < try_until: + # noinspection PyBroadException + try: + r = self.curl_get(url, insecure=True) + if r.exit_code == 0: + return True + time.sleep(.1) + except ConnectionRefusedError: + log.debug("connection refused") + time.sleep(.1) + except: + if last_err != str(sys.exc_info()[0]): + last_err = str(sys.exc_info()[0]) + log.debug("Unexpected error: %s", last_err) + time.sleep(.1) + log.debug(f"Unable to contact server after {timeout}") + return False + + def is_dead(self, url: str = None, timeout: timedelta = None): + if url is None: + url = self._http_base + if timeout is None: + timeout = timedelta(seconds=5) + try_until = datetime.now() + timeout + last_err = None + while datetime.now() < try_until: + # noinspection PyBroadException + try: + r = self.curl_get(url) + if r.exit_code != 0: + return True + time.sleep(.1) + except ConnectionRefusedError: + log.debug("connection refused") + return True + except: + if last_err != str(sys.exc_info()[0]): + last_err = str(sys.exc_info()[0]) + log.debug("Unexpected error: %s", last_err) + time.sleep(.1) + log.debug(f"Server still responding after {timeout}") + return False + + def _run_apachectl(self, cmd) -> ExecResult: + conf_file = 'stop.conf' if cmd == 'stop' else 'httpd.conf' + args = [self._apachectl, + "-d", self.server_dir, + "-f", os.path.join(self._server_dir, f'conf/{conf_file}'), + "-k", cmd] + r = self.run(args) + self._apachectl_stderr = r.stderr + if r.exit_code != 0: + log.warning(f"failed: {r}") + return r + + def apache_reload(self): + r = self._run_apachectl("graceful") + if r.exit_code == 0: + timeout = timedelta(seconds=10) + return 0 if self.is_live(self._http_base, timeout=timeout) else -1 + return r.exit_code + + def apache_restart(self): + self.apache_stop() + r = self._run_apachectl("start") + if r.exit_code == 0: + timeout = timedelta(seconds=10) + return 0 if self.is_live(self._http_base, timeout=timeout) else -1 + return r.exit_code + + def apache_stop(self): + r = self._run_apachectl("stop") + if r.exit_code == 0: + timeout = timedelta(seconds=10) + return 0 if self.is_dead(self._http_base, timeout=timeout) else -1 + return r + + def apache_graceful_stop(self): + log.debug("stop apache") + self._run_apachectl("graceful-stop") + return 0 if self.is_dead() else -1 + + def apache_fail(self): + log.debug("expect apache fail") + self._run_apachectl("stop") + rv = self._run_apachectl("start") + if rv == 0: + rv = 0 if self.is_dead() else -1 + else: + rv = 0 + return rv + + def apache_access_log_clear(self): + if os.path.isfile(self._server_access_log): + os.remove(self._server_access_log) + + def get_ca_pem_file(self, hostname: str) -> Optional[str]: + if len(self.get_credentials_for_name(hostname)) > 0: + return self.ca.cert_file + return None + + def clear_curl_headerfiles(self): + for fname in os.listdir(path=self.gen_dir): + if re.match(r'curl\.headers\.\d+', fname): + os.remove(os.path.join(self.gen_dir, fname)) + self._curl_headerfiles_n = 0 + + def curl_resolve_args(self, url, insecure=False, force_resolve=True, options=None): + u = urlparse(url) + + args = [ + ] + if u.scheme == 'http': + pass + elif insecure: + args.append('--insecure') + elif options and "--cacert" in options: + pass + elif u.hostname: + ca_pem = self.get_ca_pem_file(u.hostname) + if ca_pem: + args.extend(["--cacert", ca_pem]) + + if force_resolve and u.hostname and u.hostname != 'localhost' \ + and u.hostname != self._httpd_addr \ + and not re.match(r'^(\d+|\[|:).*', u.hostname): + assert u.port, f"port not in url: {url}" + args.extend(["--resolve", f"{u.hostname}:{u.port}:{self._httpd_addr}"]) + return args + + def curl_complete_args(self, urls, stdout_list=False, + timeout=None, options=None, + insecure=False, force_resolve=True): + headerfile = f"{self.gen_dir}/curl.headers.{self._curl_headerfiles_n}" + self._curl_headerfiles_n += 1 + + args = [ + self._curl, "-s", "--path-as-is", "-D", headerfile, + ] + args.extend(self.curl_resolve_args(urls[0], insecure=insecure, + force_resolve=force_resolve, + options=options)) + if stdout_list: + args.extend(['-w', '%{stdout}' + HttpdTestSetup.CURL_STDOUT_SEPARATOR]) + if self._current_test is not None: + args.extend(["-H", f'AP-Test-Name: {self._current_test}']) + if timeout is not None and int(timeout) > 0: + args.extend(["--connect-timeout", str(int(timeout))]) + if options: + args.extend(options) + return args, headerfile + + def curl_parse_headerfile(self, headerfile: str, r: ExecResult = None) -> ExecResult: + lines = open(headerfile).readlines() + if r is None: + r = ExecResult(args=[], exit_code=0, stdout=b'', stderr=b'') + + response = None + def fin_response(response): + if response: + r.add_response(response) + + expected = ['status'] + for line in lines: + if re.match(r'^$', line): + if 'trailer' in expected: + # end of trailers + fin_response(response) + response = None + expected = ['status'] + elif 'header' in expected: + # end of header, another status or trailers might follow + expected = ['status', 'trailer'] + else: + assert False, f"unexpected line: {line}" + continue + if 'status' in expected: + log.debug("reading 1st response line: %s", line) + m = re.match(r'^(\S+) (\d+) (.*)$', line) + if m: + fin_response(response) + response = { + "protocol": m.group(1), + "status": int(m.group(2)), + "description": m.group(3), + "header": {}, + "trailer": {}, + "body": r.outraw + } + expected = ['header'] + continue + if 'trailer' in expected: + m = re.match(r'^([^:]+):\s*(.*)$', line) + if m: + response['trailer'][m.group(1).lower()] = m.group(2) + continue + if 'header' in expected: + m = re.match(r'^([^:]+):\s*(.*)$', line) + if m: + response['header'][m.group(1).lower()] = m.group(2) + continue + assert False, f"unexpected line: {line}" + + fin_response(response) + return r + + def curl_raw(self, urls, timeout=10, options=None, insecure=False, + force_resolve=True, no_stdout_list=False): + if not isinstance(urls, list): + urls = [urls] + stdout_list = False + if len(urls) > 1 and not no_stdout_list: + stdout_list = True + args, headerfile = self.curl_complete_args( + urls=urls, stdout_list=stdout_list, + timeout=timeout, options=options, insecure=insecure, + force_resolve=force_resolve) + args += urls + r = self.run(args, stdout_list=stdout_list) + if r.exit_code == 0: + self.curl_parse_headerfile(headerfile, r=r) + if r.json: + r.response["json"] = r.json + if os.path.isfile(headerfile): + os.remove(headerfile) + return r + + def curl_get(self, url, insecure=False, options=None): + return self.curl_raw([url], insecure=insecure, options=options) + + def curl_upload(self, url, fpath, timeout=5, options=None): + if not options: + options = [] + options.extend([ + "--form", ("file=@%s" % fpath) + ]) + return self.curl_raw(urls=[url], timeout=timeout, options=options) + + def curl_post_data(self, url, data="", timeout=5, options=None): + if not options: + options = [] + options.extend(["--data", "%s" % data]) + return self.curl_raw(url, timeout, options) + + def curl_post_value(self, url, key, value, timeout=5, options=None): + if not options: + options = [] + options.extend(["--form", "{0}={1}".format(key, value)]) + return self.curl_raw(url, timeout, options) + + def curl_protocol_version(self, url, timeout=5, options=None): + if not options: + options = [] + options.extend(["-w", "%{http_version}\n", "-o", "/dev/null"]) + r = self.curl_raw(url, timeout=timeout, options=options) + if r.exit_code == 0 and r.response: + return r.response["body"].decode('utf-8').rstrip() + return -1 + + def nghttp(self): + return Nghttp(self._nghttp, connect_addr=self._httpd_addr, + tmp_dir=self.gen_dir, test_name=self._current_test) + + def h2load_status(self, run: ExecResult): + stats = {} + m = re.search( + r'requests: (\d+) total, (\d+) started, (\d+) done, (\d+) succeeded' + r', (\d+) failed, (\d+) errored, (\d+) timeout', run.stdout) + if m: + stats["requests"] = { + "total": int(m.group(1)), + "started": int(m.group(2)), + "done": int(m.group(3)), + "succeeded": int(m.group(4)) + } + m = re.search(r'status codes: (\d+) 2xx, (\d+) 3xx, (\d+) 4xx, (\d+) 5xx', + run.stdout) + if m: + stats["status"] = { + "2xx": int(m.group(1)), + "3xx": int(m.group(2)), + "4xx": int(m.group(3)), + "5xx": int(m.group(4)) + } + run.add_results({"h2load": stats}) + return run + + def make_data_file(self, indir: str, fname: str, fsize: int) -> str: + fpath = os.path.join(indir, fname) + s10 = "0123456789" + s = (101 * s10) + s10[0:3] + with open(fpath, 'w') as fd: + for i in range(int(fsize / 1024)): + fd.write(f"{i:09d}-{s}\n") + remain = int(fsize % 1024) + if remain != 0: + i = int(fsize / 1024) + 1 + s = f"{i:09d}-{s}\n" + fd.write(s[0:remain]) + return fpath + |