summaryrefslogtreecommitdiffstats
path: root/test/modules/md/md_acme.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-25 04:41:26 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-25 04:41:26 +0000
commit7b31d4f4901cdb89a79f2f7de4a6b8bb637b523b (patch)
treefdeb0b5ff80273f95ce61607fc3613dff0b9a235 /test/modules/md/md_acme.py
parentAdding upstream version 2.4.38. (diff)
downloadapache2-upstream.tar.xz
apache2-upstream.zip
Adding upstream version 2.4.59.upstream/2.4.59upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/modules/md/md_acme.py')
-rwxr-xr-xtest/modules/md/md_acme.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/test/modules/md/md_acme.py b/test/modules/md/md_acme.py
new file mode 100755
index 0000000..36be347
--- /dev/null
+++ b/test/modules/md/md_acme.py
@@ -0,0 +1,125 @@
+import logging
+import os
+import shutil
+import subprocess
+import time
+from abc import ABCMeta, abstractmethod
+from datetime import datetime, timedelta
+from threading import Thread
+from typing import Dict
+
+from .md_env import MDTestEnv
+
+
+log = logging.getLogger(__name__)
+
+
+def monitor_proc(env: MDTestEnv, proc):
+ _env = env
+ proc.wait()
+
+
+class ACMEServer:
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def start(self):
+ raise NotImplementedError
+
+ @abstractmethod
+ def stop(self):
+ raise NotImplementedError
+
+ @abstractmethod
+ def install_ca_bundle(self, dest):
+ raise NotImplementedError
+
+
+class MDPebbleRunner(ACMEServer):
+
+ def __init__(self, env: MDTestEnv, configs: Dict[str, str]):
+ self.env = env
+ self.configs = configs
+ self._current = 'default'
+ self._pebble = None
+ self._challtestsrv = None
+ self._log = None
+
+ def start(self, config: str = None):
+ if config is not None and config != self._current:
+ # change, tear down and start again
+ assert config in self.configs
+ self.stop()
+ self._current = config
+ elif self._pebble is not None:
+ # already running
+ return
+ args = ['pebble', '-config', self.configs[self._current], '-dnsserver', ':8053']
+ env = {}
+ env.update(os.environ)
+ env['PEBBLE_VA_NOSLEEP'] = '1'
+ self._log = open(f'{self.env.gen_dir}/pebble.log', 'w')
+ self._pebble = subprocess.Popen(args=args, env=env,
+ stdout=self._log, stderr=self._log)
+ t = Thread(target=monitor_proc, args=(self.env, self._pebble))
+ t.start()
+
+ args = ['pebble-challtestsrv', '-http01', '', '-https01', '', '-tlsalpn01', '']
+ self._challtestsrv = subprocess.Popen(args, stdout=self._log, stderr=self._log)
+ t = Thread(target=monitor_proc, args=(self.env, self._challtestsrv))
+ t.start()
+ self.install_ca_bundle(self.env.acme_ca_pemfile)
+ # disable ipv6 default address, this gives trouble inside docker
+ end = datetime.now() + timedelta(seconds=5)
+ while True:
+ r = self.env.run(['curl', 'localhost:8055/'])
+ if r.exit_code == 0:
+ break
+ if datetime.now() > end:
+ raise TimeoutError(f'unable to contact pebble-challtestsrv on localhost:8055')
+ time.sleep(.1)
+ r = self.env.run(['curl', '-d', f'{{"ip":""}}',
+ 'localhost:8055/set-default-ipv6'])
+ assert r.exit_code == 0, f"{r}"
+
+ def stop(self):
+ if self._pebble:
+ self._pebble.terminate()
+ self._pebble = None
+ if self._challtestsrv:
+ self._challtestsrv.terminate()
+ self._challtestsrv = None
+ if self._log:
+ self._log.close()
+ self._log = None
+
+ def install_ca_bundle(self, dest):
+ shutil.copyfile(self.env.ca.cert_file, dest)
+ end = datetime.now() + timedelta(seconds=20)
+ while datetime.now() < end:
+ r = self.env.curl_get('https://localhost:15000/roots/0', insecure=True)
+ if r.exit_code == 0:
+ with open(dest, 'a') as fd:
+ fd.write(r.stdout)
+ break
+
+
+class MDBoulderRunner(ACMEServer):
+
+ def __init__(self, env: MDTestEnv):
+ self.env = env
+ self.install_ca_bundle(self.env.acme_ca_pemfile)
+
+ def start(self, config=None):
+ pass
+
+ def stop(self):
+ pass
+
+ def install_ca_bundle(self, dest):
+ r = self.env.run([
+ 'docker', 'exec', 'boulder_boulder_1', 'bash', '-c', "cat /tmp/root*.pem"
+ ])
+ assert r.exit_code == 0
+ with open(dest, 'w') as fd:
+ fd.write(r.stdout)