1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
import logging
import os
import re
import subprocess
import time
from datetime import datetime, timedelta
from threading import Thread
from .tls import HandShake
from .env import Env, CryptoLib
from .log import LogFile, HexDumpScanner
log = logging.getLogger(__name__)
class ServerRun:
def __init__(self, env: Env, logfile: LogFile):
self.env = env
self._logfile = logfile
self.log_lines = self._logfile.get_recent()
self._data_recs = None
self._hs_recs = None
if self.env.verbose > 1:
log.debug(f'read {len(self.log_lines)} lines from {logfile.path}')
@property
def handshake(self):
if self._data_recs is None:
self._data_recs = [data for data in HexDumpScanner(source=self.log_lines)]
if self.env.verbose > 1:
log.debug(f'detected {len(self._data_recs)} hexdumps '
f'in {self._logfile.path}')
if self._hs_recs is None:
self._hs_recs = [hrec for hrec in HandShake(source=self._data_recs,
verbose=self.env.verbose)]
if self.env.verbose > 1:
log.debug(f'detected {len(self._hs_recs)} crypto records '
f'in {self._logfile.path}')
return self._hs_recs
@property
def hs_stripe(self):
return ":".join([hrec.name for hrec in self.handshake])
def monitor_proc(env: Env, proc):
_env = env
proc.wait()
class ExampleServer:
def __init__(self, env: Env, crypto_lib: str, verify_client=False):
self.env = env
self._crypto_lib = crypto_lib
self._path = env.server_path(self._crypto_lib)
self._logpath = f'{self.env.gen_dir}/{self._crypto_lib}-server.log'
self._log = LogFile(path=self._logpath)
self._logfile = None
self._process = None
self._verify_client = verify_client
@property
def path(self):
return self._path
@property
def crypto_lib(self):
return self._crypto_lib
@property
def uses_cipher_config(self):
return CryptoLib.uses_cipher_config(self.crypto_lib)
def supports_cipher(self, cipher):
return CryptoLib.supports_cipher(self.crypto_lib, cipher)
@property
def log(self):
return self._log
def exists(self):
return os.path.isfile(self.path)
def start(self):
if self._process is not None:
return False
creds = self.env.get_server_credentials()
assert creds
args = [
self.path,
f'--htdocs={self.env.htdocs_dir}',
]
if self._verify_client:
args.append('--verify-client')
args.extend([
'*', str(self.env.examples_port),
creds.pkey_file, creds.cert_file
])
self._logfile = open(self._logpath, 'w')
self._process = subprocess.Popen(args=args, text=True,
stdout=self._logfile, stderr=self._logfile)
t = Thread(target=monitor_proc, daemon=True, args=(self.env, self._process))
t.start()
timeout = 5
end = datetime.now() + timedelta(seconds=timeout)
while True:
if self._process.poll():
return False
try:
if self.log.scan_recent(pattern=re.compile(r'^Using document root'), timeout=0.5):
break
except TimeoutError:
pass
if datetime.now() > end:
raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
self.log.advance()
return True
def stop(self):
if self._process:
self._process.terminate()
self._process = None
if self._logfile:
self._logfile.close()
self._logfile = None
return True
def restart(self):
self.stop()
self._log.reset()
return self.start()
def get_run(self) -> ServerRun:
return ServerRun(env=self.env, logfile=self.log)
|