diff options
Diffstat (limited to 'examples/tests/ngtcp2test/client.py')
-rw-r--r-- | examples/tests/ngtcp2test/client.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/examples/tests/ngtcp2test/client.py b/examples/tests/ngtcp2test/client.py new file mode 100644 index 0000000..2676343 --- /dev/null +++ b/examples/tests/ngtcp2test/client.py @@ -0,0 +1,187 @@ +import logging +import os +import re +import subprocess +from typing import List + +import pytest + +from .server import ExampleServer, ServerRun +from .certs import Credentials +from .tls import HandShake, HSRecord +from .env import Env, CryptoLib +from .log import LogFile, HexDumpScanner + + +log = logging.getLogger(__name__) + + +class ClientRun: + + def __init__(self, env: Env, returncode, logfile: LogFile, srun: ServerRun): + self.env = env + self.returncode = returncode + self.logfile = logfile + self.log_lines = logfile.get_recent() + self._data_recs = None + self._hs_recs = None + self._srun = srun + if self.env.verbose > 1: + log.debug(f'read {len(self.log_lines)} lines from {logfile.path}') + + @property + def handshake(self) -> List[HSRecord]: + if self._data_recs is None: + crypto_line = re.compile(r'Ordered CRYPTO data in \S+ crypto level') + scanner = HexDumpScanner(source=self.log_lines, + leading_regex=crypto_line) + self._data_recs = [data for data in scanner] + if self.env.verbose > 1: + log.debug(f'detected {len(self._data_recs)} crypto hexdumps ' + f'in {self.logfile.path}') + if self._hs_recs is None: + self._hs_recs = [hrec for hrec in HandShake(source=self._data_recs, + verbose=self.env.verbose)] + if self.env.verbose > 1: + log.debug(f'detected {len(self._hs_recs)} crypto ' + f'records in {self.logfile.path}') + return self._hs_recs + + @property + def hs_stripe(self) -> str: + return ":".join([hrec.name for hrec in self.handshake]) + + @property + def early_data_rejected(self) -> bool: + for l in self.log_lines: + if re.match(r'^Early data was rejected by server.*', l): + return True + return False + + @property + def server(self) -> ServerRun: + return self._srun + + def norm_exp(self, c_hs, s_hs, allow_hello_retry=True): + if allow_hello_retry and self.hs_stripe.startswith('HelloRetryRequest:'): + c_hs = "HelloRetryRequest:" + c_hs + s_hs = "ClientHello:" + s_hs + return c_hs, s_hs + + def _assert_hs(self, c_hs, s_hs): + if not self.hs_stripe.startswith(c_hs): + # what happened? + if self.hs_stripe == '': + # server send nothing + if self.server.hs_stripe == '': + # client send nothing + pytest.fail(f'client did not send a ClientHello"') + else: + # client send sth, but server did not respond + pytest.fail(f'server did not respond to ClientHello: ' + f'{self.server.handshake[0].to_text()}"') + else: + pytest.fail(f'Expected "{c_hs}", got "{self.hs_stripe}"') + assert self.server.hs_stripe == s_hs, \ + f'Expected "{s_hs}", got "{self.server.hs_stripe}"\n' + + def assert_non_resume_handshake(self, allow_hello_retry=True): + # for client/server where KEY_SHARE do not match, the hello is retried + c_hs, s_hs = self.norm_exp( + "ServerHello:EncryptedExtensions:Certificate:CertificateVerify:Finished", + "ClientHello:Finished", allow_hello_retry=allow_hello_retry) + self._assert_hs(c_hs, s_hs) + + def assert_resume_handshake(self): + # for client/server where KEY_SHARE do not match, the hello is retried + c_hs, s_hs = self.norm_exp("ServerHello:EncryptedExtensions:Finished", + "ClientHello:Finished") + self._assert_hs(c_hs, s_hs) + + def assert_verify_null_handshake(self): + c_hs, s_hs = self.norm_exp( + "ServerHello:EncryptedExtensions:CertificateRequest:Certificate:CertificateVerify:Finished", + "ClientHello:Certificate:Finished") + self._assert_hs(c_hs, s_hs) + + def assert_verify_cert_handshake(self): + c_hs, s_hs = self.norm_exp( + "ServerHello:EncryptedExtensions:CertificateRequest:Certificate:CertificateVerify:Finished", + "ClientHello:Certificate:CertificateVerify:Finished") + self._assert_hs(c_hs, s_hs) + + +class ExampleClient: + + def __init__(self, env: Env, crypto_lib: str): + self.env = env + self._crypto_lib = crypto_lib + self._path = env.client_path(self._crypto_lib) + self._log_path = f'{self.env.gen_dir}/{self._crypto_lib}-client.log' + self._qlog_path = f'{self.env.gen_dir}/{self._crypto_lib}-client.qlog' + self._session_path = f'{self.env.gen_dir}/{self._crypto_lib}-client.session' + self._tp_path = f'{self.env.gen_dir}/{self._crypto_lib}-client.tp' + self._data_path = f'{self.env.gen_dir}/{self._crypto_lib}-client.data' + + @property + def path(self): + return self._path + + @property + def crypto_lib(self): + return self._crypto_lib + + @property + def uses_cipher_config(self): + return CryptoLib.uses_cipher_config(self.crypto_lib) + + def supports_cipher(self, cipher): + return CryptoLib.supports_cipher(self.crypto_lib, cipher) + + def exists(self): + return os.path.isfile(self.path) + + def clear_session(self): + if os.path.isfile(self._session_path): + os.remove(self._session_path) + if os.path.isfile(self._tp_path): + os.remove(self._tp_path) + + def http_get(self, server: ExampleServer, url: str, extra_args: List[str] = None, + use_session=False, data=None, + credentials: Credentials = None, + ciphers: str = None): + args = [ + self.path, '--exit-on-all-streams-close', + f'--qlog-file={self._qlog_path}' + ] + if use_session: + args.append(f'--session-file={self._session_path}') + args.append(f'--tp-file={self._tp_path}') + if data is not None: + with open(self._data_path, 'w') as fd: + fd.write(data) + args.append(f'--data={self._data_path}') + if ciphers is not None: + ciphers = CryptoLib.adjust_ciphers(self.crypto_lib, ciphers) + args.append(f'--ciphers={ciphers}') + if credentials is not None: + args.append(f'--key={credentials.pkey_file}') + args.append(f'--cert={credentials.cert_file}') + if extra_args is not None: + args.extend(extra_args) + args.extend([ + 'localhost', str(self.env.examples_port), + url + ]) + if os.path.isfile(self._qlog_path): + os.remove(self._qlog_path) + with open(self._log_path, 'w') as log_file: + logfile = LogFile(path=self._log_path) + server.log.advance() + process = subprocess.Popen(args=args, text=True, + stdout=log_file, stderr=log_file) + process.wait() + return ClientRun(env=self.env, returncode=process.returncode, + logfile=logfile, srun=server.get_run()) + |