summaryrefslogtreecommitdiffstats
path: root/tests/pytests/test_rehandshake.py
blob: ffbc10bdb63acfcd1d7b066cb491a4634bb8e2d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""TLS rehandshake test

Test utilizes rehandshake/tls-proxy, which forwards queries to configured
resolver, but when it sends the response back to the query source, it
performs a rehandshake after every 8 bytes sent.

It is expected the answer will be received by the source kresd instance
and sent back to the client (this test).

Make sure to run `make all` in `rehandshake/` to compile the proxy.
"""

import os
import re
import subprocess
import time

import dns
import dns.rcode
import pytest

from kresd import CERTS_DIR, Forward, make_kresd, PYTESTS_DIR
import utils


REHANDSHAKE_PROXY = os.path.join(PYTESTS_DIR, 'rehandshake', 'tlsproxy')


@pytest.mark.skipif(not os.path.exists(REHANDSHAKE_PROXY),
                    reason="tlsproxy not found (did you compile it?)")
def test_rehandshake(tmpdir):
    def resolve_hint(sock, qname):
        buff, msgid = utils.get_msgbuff(qname)
        sock.sendall(buff)
        answer = utils.receive_parse_answer(sock)
        assert answer.id == msgid
        assert answer.rcode() == dns.rcode.NOERROR
        assert answer.answer[0][0].address == '127.0.0.1'

    hints = {
        '0.foo.': '127.0.0.1',
        '1.foo.': '127.0.0.1',
        '2.foo.': '127.0.0.1',
        '3.foo.': '127.0.0.1',
    }
    # run forward target instance
    workdir = os.path.join(str(tmpdir), 'kresd_fwd_target')
    os.makedirs(workdir)

    with make_kresd(workdir, hints=hints, port=53910) as kresd_fwd_target:
        sock = kresd_fwd_target.ip_tls_socket()
        resolve_hint(sock, '0.foo.')

        # run proxy
        cwd, cmd = os.path.split(REHANDSHAKE_PROXY)
        cmd = './' + cmd
        ca_file = os.path.join(CERTS_DIR, 'tt.cert.pem')
        try:
            proxy = subprocess.Popen(
                [cmd], cwd=cwd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # run test kresd instance
            workdir2 = os.path.join(str(tmpdir), 'kresd')
            os.makedirs(workdir2)
            forward = Forward(proto='tls', ip='127.0.0.1', port=53921,
                              hostname='transport-test-server.com', ca_file=ca_file)
            with make_kresd(workdir2, forward=forward) as kresd:
                sock2 = kresd.ip_tcp_socket()
                try:
                    for hint in hints:
                        resolve_hint(sock2, hint)
                        time.sleep(0.1)
                finally:
                    # verify log
                    n_connecting_to = 0
                    n_rehandshake = 0
                    partial_log = kresd.partial_log()
                    print(partial_log)
                    for line in partial_log.splitlines():
                        if re.search(r"connecting to: .*", line) is not None:
                            n_connecting_to += 1
                        elif re.search(r"TLS rehandshake .* has started", line) is not None:
                            n_rehandshake += 1
                    assert n_connecting_to == 0  # shouldn't be present in partial log
                    assert n_rehandshake > 0
        finally:
            proxy.terminate()