summaryrefslogtreecommitdiffstats
path: root/tests/pytests/test_rehandshake.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/pytests/test_rehandshake.py87
1 files changed, 87 insertions, 0 deletions
diff --git a/tests/pytests/test_rehandshake.py b/tests/pytests/test_rehandshake.py
new file mode 100644
index 0000000..ffbc10b
--- /dev/null
+++ b/tests/pytests/test_rehandshake.py
@@ -0,0 +1,87 @@
+"""TLS rehandshake test
+
+Test utilizes rehandshake/tls-proxy, which forwards queries to configured
+resolver, but when it sends the response back to the query source, it
+performs a rehandshake after every 8 bytes sent.
+
+It is expected the answer will be received by the source kresd instance
+and sent back to the client (this test).
+
+Make sure to run `make all` in `rehandshake/` to compile the proxy.
+"""
+
+import os
+import re
+import subprocess
+import time
+
+import dns
+import dns.rcode
+import pytest
+
+from kresd import CERTS_DIR, Forward, make_kresd, PYTESTS_DIR
+import utils
+
+
+REHANDSHAKE_PROXY = os.path.join(PYTESTS_DIR, 'rehandshake', 'tlsproxy')
+
+
+@pytest.mark.skipif(not os.path.exists(REHANDSHAKE_PROXY),
+ reason="tlsproxy not found (did you compile it?)")
+def test_rehandshake(tmpdir):
+ def resolve_hint(sock, qname):
+ buff, msgid = utils.get_msgbuff(qname)
+ sock.sendall(buff)
+ answer = utils.receive_parse_answer(sock)
+ assert answer.id == msgid
+ assert answer.rcode() == dns.rcode.NOERROR
+ assert answer.answer[0][0].address == '127.0.0.1'
+
+ hints = {
+ '0.foo.': '127.0.0.1',
+ '1.foo.': '127.0.0.1',
+ '2.foo.': '127.0.0.1',
+ '3.foo.': '127.0.0.1',
+ }
+ # run forward target instance
+ workdir = os.path.join(str(tmpdir), 'kresd_fwd_target')
+ os.makedirs(workdir)
+
+ with make_kresd(workdir, hints=hints, port=53910) as kresd_fwd_target:
+ sock = kresd_fwd_target.ip_tls_socket()
+ resolve_hint(sock, '0.foo.')
+
+ # run proxy
+ cwd, cmd = os.path.split(REHANDSHAKE_PROXY)
+ cmd = './' + cmd
+ ca_file = os.path.join(CERTS_DIR, 'tt.cert.pem')
+ try:
+ proxy = subprocess.Popen(
+ [cmd], cwd=cwd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+ # run test kresd instance
+ workdir2 = os.path.join(str(tmpdir), 'kresd')
+ os.makedirs(workdir2)
+ forward = Forward(proto='tls', ip='127.0.0.1', port=53921,
+ hostname='transport-test-server.com', ca_file=ca_file)
+ with make_kresd(workdir2, forward=forward) as kresd:
+ sock2 = kresd.ip_tcp_socket()
+ try:
+ for hint in hints:
+ resolve_hint(sock2, hint)
+ time.sleep(0.1)
+ finally:
+ # verify log
+ n_connecting_to = 0
+ n_rehandshake = 0
+ partial_log = kresd.partial_log()
+ print(partial_log)
+ for line in partial_log.splitlines():
+ if re.search(r"connecting to: .*", line) is not None:
+ n_connecting_to += 1
+ elif re.search(r"TLS rehandshake .* has started", line) is not None:
+ n_rehandshake += 1
+ assert n_connecting_to == 0 # shouldn't be present in partial log
+ assert n_rehandshake > 0
+ finally:
+ proxy.terminate()