from typing import Dict, Any, Optional

from pyhttpd.env import HttpdTestEnv


class HttpdConf:
    """Incrementally build the lines of an httpd test configuration.

    Lines are collected via :meth:`add` and handed to the test environment
    by :meth:`install`. When the ``tls`` shared module is not available,
    ``TLS*`` directives are rewritten to their mod_ssl counterparts as they
    are added. Most methods return ``self`` so calls can be chained.
    """

    def __init__(self, env: HttpdTestEnv, extras: Optional[Dict[str, Any]] = None):
        """
        Create a new httpd configuration.
        :param env: the environment this operates in
        :param extras: extra configuration directives with ServerName as key and
                       'base' as special key for global configuration additions.
        """
        self.env = env
        self._indents = 0
        self._lines = []
        # ports for which a "TLSEngine" line was already emitted
        self._tls_engine_ports = set()
        # shallow copy, so adding/removing keys does not affect the caller
        self._extras = extras.copy() if extras else {}
        if 'base' in self._extras:
            self.add(self._extras['base'])

    def __repr__(self):
        s = '\n'.join(self._lines)
        return f"HttpdConf[{s}]"

    def install(self):
        """Pass the collected lines to the environment's test configuration."""
        self.env.install_test_conf(self._lines)

    def replacetlsstr(self, line):
        """Convert a ``TLS_*`` cipher directive into colon-separated cipher names.

        Strips the ``TLS_`` prefixes, joins all whitespace separated tokens
        (including the directive name) with ``:`` and rewrites the cipher
        names as done by the replacements below.
        """
        text = line.replace("TLS_", "")
        # line continuations: treat newlines and backslashes as whitespace
        text = text.replace("\n", " ")
        text = text.replace("\\", " ")
        text = ":".join(text.split())
        text = text.replace("_", "-")
        text = text.replace("-WITH", "")
        text = text.replace("AES-", "AES")
        text = text.replace("POLY1305-SHA256", "POLY1305")
        return text

    def replaceinstr(self, line):
        """Translate a single ``TLS*`` directive line into a mod_ssl one.

        :param line: one configuration line, without indentation
        :return: the translated line; lines that match no special case get
                 every "TLS" replaced by "SSL".
        """
        if line.startswith("TLSCiphersPrefer"):
            # the "TLS_" are changed into "".
            ciphers = self.replacetlsstr(line)
            return ciphers.replace("TLSCiphersPrefer:", "SSLCipherSuite ")
        if line.startswith("TLSCiphersSuppress"):
            # like SSLCipherSuite but with :!
            ciphers = self.replacetlsstr(line)
            ciphers = ciphers.replace("TLSCiphersSuppress:", "SSLCipherSuite !")
            return ciphers.replace(":", ":!")
        if line.startswith("TLSCertificate"):
            return line.replace("TLSCertificate", "SSLCertificateFile")
        if line.startswith("TLSProtocol"):
            # mod_ssl is different (+ no supported and 0x code have to be translated)
            proto = line.replace("TLSProtocol", "SSLProtocol")
            proto = proto.replace("+", "")
            proto = proto.replace("default", "all")
            # need to check 1.3 and 1.1
            return proto.replace("0x0303", "1.2")
        if line.startswith("SSLProtocol"):
            # we have that in test/modules/tls/test_05_proto.py
            return line
        if line.startswith("TLSHonorClientOrder"):
            # mod_ssl has SSLHonorCipherOrder on = use server off = use client.
            # Look at the argument only: the directive name itself contains
            # "on" (in "Honor"), so a substring test on the whole line would
            # always match.
            args = line.split()[1:]
            client_order = bool(args) and args[0].lower() == "on"
            return "SSLHonorCipherOrder off" if client_order else "SSLHonorCipherOrder on"
        if line.startswith("TLSEngine"):
            # In fact it should go in the corresponding VirtualHost...
            # Not sure how to do that.
            return "SSLEngine On"
        return line.replace("TLS", "SSL")

    def _indented(self, line):
        """Return `line` prefixed with the current indentation, if any."""
        if self._indents > 0:
            return f"{' ' * self._indents}{line}"
        return line

    def add(self, line: Any):
        """Append one line (str) or several lines (iterable of str).

        When the ``tls`` shared module is not available, lines are translated
        via :meth:`replaceinstr` first; in an iterable, a ``TLSCiphersPrefer``
        or ``TLSCiphersSuppress`` entry that directly follows a cipher suite
        line is merged into that single ``SSLCipherSuite`` line.

        :return: self
        """
        # make sure we transform the TLS to SSL if we are using mod_ssl
        translate = not HttpdTestEnv.has_shared_module("tls")
        if isinstance(line, str):
            if translate:
                line = self.replaceinstr(line)
            self._lines.append(self._indented(line))
            return self
        if not translate:
            self._lines.extend([self._indented(entry) for entry in line])
            return self

        translated = []
        pending = ""  # SSLCipherSuite line that may still absorb more ciphers
        for entry in line:
            if pending.startswith("SSLCipherSuite"):
                if entry.startswith(("TLSCiphersPrefer", "TLSCiphersSuppress")):
                    # we need to merge it
                    suite = self.replaceinstr(entry)
                    pending += suite.replace("SSLCipherSuite ", ":")
                    continue
                translated.append(self._indented(pending))
                pending = ""
            entry = self.replaceinstr(entry)
            if entry.startswith("SSLCipherSuite"):
                pending = entry
                continue
            translated.append(self._indented(entry))
        if pending != "":
            translated.append(self._indented(pending))
        self._lines.extend(translated)
        return self

    def add_certificate(self, cert_file, key_file, ssl_module=None):
        """Add the certificate/key directives for the given SSL module.

        :param cert_file: path of the certificate file
        :param key_file: path of the key file; when falsy, mod_ssl and
                         mod_gnutls use `cert_file` as key file, mod_tls
                         gets no key argument
        :param ssl_module: 'mod_ssl', 'mod_tls' or 'mod_gnutls'; defaults to
                           the environment's ssl_module
        :raises ValueError: for any other module name
        """
        if ssl_module is None:
            ssl_module = self.env.ssl_module
        if ssl_module == 'mod_ssl':
            self.add([
                f"SSLCertificateFile {cert_file}",
                f"SSLCertificateKeyFile {key_file if key_file else cert_file}",
            ])
        elif ssl_module == 'mod_tls':
            self.add(f"TLSCertificate {cert_file} {key_file if key_file else ''}")
        elif ssl_module == 'mod_gnutls':
            self.add([
                f"GnuTLSCertificateFile {cert_file}",
                f"GnuTLSKeyFile {key_file if key_file else cert_file}",
            ])
        else:
            raise ValueError(f"unsupported ssl module: {ssl_module}")

    def add_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None,
                  with_certificates=None, ssl_module=None):
        """Add a complete virtual host, see :meth:`start_vhost` for parameters."""
        self.start_vhost(domains=domains, port=port, doc_root=doc_root,
                         with_ssl=with_ssl, with_certificates=with_certificates,
                         ssl_module=ssl_module)
        self.end_vhost()
        return self

    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None,
                    ssl_module=None, with_certificates=None):
        """Open a virtual host and add its basic directives.

        :param domains: a name or list of names; the first becomes the
                        ServerName, the others ServerAlias entries
        :param port: port of the vhost, defaults to the environment's https_port
        :param doc_root: value of the DocumentRoot directive
        :param with_ssl: enable SSL; defaults to True exactly when `port` is
                         the environment's https_port
        :param ssl_module: SSL module to configure, defaults to the
                           environment's ssl_module
        :param with_certificates: pass False to skip adding the certificates
                                  of the first domain
        :return: self, with indentation increased until :meth:`end_vhost`
        """
        if not isinstance(domains, list):
            domains = [domains]
        if port is None:
            port = self.env.https_port
        if ssl_module is None:
            ssl_module = self.env.ssl_module
        if with_ssl is None:
            with_ssl = self.env.https_port == port
        if with_ssl and ssl_module == 'mod_tls' and port not in self._tls_engine_ports:
            # mod_tls is enabled per port, outside the vhost and only once
            self.add(f"TLSEngine {port}")
            self._tls_engine_ports.add(port)
        self.add("")
        self.add(f"<VirtualHost *:{port}>")
        self._indents += 1
        self.add(f"ServerName {domains[0]}")
        for alias in domains[1:]:
            self.add(f"ServerAlias {alias}")
        self.add(f"DocumentRoot {doc_root}")
        if with_ssl:
            if ssl_module == 'mod_ssl':
                self.add("SSLEngine on")
            elif ssl_module == 'mod_gnutls':
                self.add("GnuTLSEnable on")
            if with_certificates is not False:
                for cred in self.env.get_credentials_for_name(domains[0]):
                    self.add_certificate(cred.cert_file, cred.pkey_file,
                                         ssl_module=ssl_module)
        # per-vhost extras are keyed by the ServerName
        if domains[0] in self._extras:
            self.add(self._extras[domains[0]])
        return self

    def end_vhost(self):
        """Close the virtual host opened by :meth:`start_vhost`."""
        self._indents -= 1
        self.add("</VirtualHost>")
        self.add("")
        return self

    def add_proxies(self, host, proxy_self=False, h2proxy_self=False):
        """Add proxy directives that point back at this server.

        :param host: host name without the environment's http_tld
        :param proxy_self: proxy /proxy/ to the http port via http://
        :param h2proxy_self: proxy /h2proxy/ to the https port via h2://
        :return: self
        """
        if proxy_self or h2proxy_self:
            self.add("ProxyPreserveHost on")
        if proxy_self:
            self.add([
                f"ProxyPass /proxy/ http://127.0.0.1:{self.env.http_port}/",
                f"ProxyPassReverse /proxy/ http://{host}.{self.env.http_tld}:{self.env.http_port}/",
            ])
        if h2proxy_self:
            self.add([
                f"ProxyPass /h2proxy/ h2://127.0.0.1:{self.env.https_port}/",
                f"ProxyPassReverse /h2proxy/ https://{host}.{self.env.http_tld}:{self.env.https_port}/",
            ])
        return self

    def add_vhost_test1(self, proxy_self=False, h2proxy_self=False):
        """Add the http and https virtual hosts for test1/www1."""
        domain = f"test1.{self.env.http_tld}"
        self.start_vhost(domains=[domain, f"www1.{self.env.http_tld}"],
                         port=self.env.http_port, doc_root="htdocs/test1")
        self.end_vhost()
        self.start_vhost(domains=[domain, f"www1.{self.env.http_tld}"],
                         port=self.env.https_port, doc_root="htdocs/test1")
        # NOTE(review): the empty entries around the Options line look like
        # placeholders for an enclosing container directive - confirm.
        self.add([
            "",
            " Options +Indexes",
            "",
        ])
        self.add_proxies("test1", proxy_self, h2proxy_self)
        self.end_vhost()
        return self

    def add_vhost_test2(self):
        """Add the http and https virtual hosts for test2/www2."""
        domain = f"test2.{self.env.http_tld}"
        self.start_vhost(domains=[domain, f"www2.{self.env.http_tld}"],
                         port=self.env.http_port, doc_root="htdocs/test2")
        self.end_vhost()
        self.start_vhost(domains=[domain, f"www2.{self.env.http_tld}"],
                         port=self.env.https_port, doc_root="htdocs/test2")
        # NOTE(review): the empty entries around the Options line look like
        # placeholders for an enclosing container directive - confirm.
        self.add([
            "",
            " Options +Indexes",
            "",
        ])
        self.end_vhost()
        return self

    def add_vhost_cgi(self, proxy_self=False, h2proxy_self=False):
        """Add the https and http virtual hosts for cgi/cgi-alias.

        Global proxy settings are added before the vhosts when `proxy_self`
        or `h2proxy_self` is set.
        """
        domain = f"cgi.{self.env.http_tld}"
        if proxy_self:
            self.add(["ProxyStatus on", "ProxyTimeout 5",
                      "SSLProxyEngine on", "SSLProxyVerify none"])
        if h2proxy_self:
            self.add(["SSLProxyEngine on", "SSLProxyCheckPeerName off"])
        self.start_vhost(domains=[domain, f"cgi-alias.{self.env.http_tld}"],
                         port=self.env.https_port, doc_root="htdocs/cgi")
        self.add_proxies("cgi", proxy_self=proxy_self, h2proxy_self=h2proxy_self)
        self.end_vhost()
        self.start_vhost(domains=[domain, f"cgi-alias.{self.env.http_tld}"],
                         port=self.env.http_port, doc_root="htdocs/cgi")
        self.add("AddHandler cgi-script .py")
        self.add_proxies("cgi", proxy_self=proxy_self, h2proxy_self=h2proxy_self)
        self.end_vhost()
        return self

    @staticmethod
    def merge_extras(e1: Optional[Dict[str, Any]],
                     e2: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Merge two extras dictionaries into a new one.

        Values present under the same key in both are concatenated into a
        new list (`e1` entries first); a str value counts as a single line.
        Neither input dictionary nor its list values are modified.

        :return: the merged dict; a copy of the other one when one is None;
                 None when `e1` is None and `e2` is None or empty
        """
        def _as_list(value):
            # always a fresh list, so the inputs are never mutated
            return [value] if isinstance(value, str) else list(value)

        if e1 is None:
            return e2.copy() if e2 else None
        if e2 is None:
            return e1.copy()
        merged = e1.copy()
        for name, val in e2.items():
            if name in merged:
                merged[name] = _as_list(merged[name]) + _as_list(val)
            else:
                merged[name] = val
        return merged