summaryrefslogtreecommitdiffstats
path: root/bin/tests/system/doth/tests_gnutls.py
blob: 5ddb708fd2a3d40d61fbc068816397ffcea17ad5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python3

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

import selectors
import struct
import subprocess
import time

import pytest

pytest.importorskip("dns")
import dns.exception
import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype


def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
    """Send an example/SOA query over TLS using gnutls-cli and check the reply.

    The query is written, prefixed with its two-byte length, to the standard
    input of a gnutls-cli process connected to 10.53.0.1 with the "dot" ALPN
    token.  Whatever gnutls-cli prints on standard output is collected until
    it parses as a DNS message, gnutls-cli closes its output, or ten seconds
    elapse.  The raw output is saved to "gnutls-cli.out.bin" and, when a
    response was parsed, its text form to "gnutls-cli.out.txt".

    Arguments (pytest fixtures):
        gnutls_cli_executable: the gnutls-cli program to run (first element
            of the argument vector).
        named_tlsport: integer port number to connect to.
    """
    # Prepare the example/SOA query which will be sent over TLS.
    query = dns.message.make_query("example.", dns.rdatatype.SOA)
    query_wire = query.to_wire()
    query_with_length = struct.pack(">H", len(query_wire)) + query_wire

    # Run gnutls-cli.
    gnutls_cli_args = [
        gnutls_cli_executable,
        "--no-ca-verification",
        "-V",
        "--no-ocsp",
        "--alpn=dot",
        "--logfile=gnutls-cli.log",
        "--port=%d" % named_tlsport,
        "10.53.0.1",
    ]
    gnutls_cli_output = b""
    response = None
    with open("gnutls-cli.err", "wb") as gnutls_cli_stderr, subprocess.Popen(
        gnutls_cli_args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=gnutls_cli_stderr,
        bufsize=0,
    ) as gnutls_cli:
        # Send the example/SOA query to the standard input of gnutls-cli.  Do
        # not close standard input yet because that causes gnutls-cli to close
        # the TLS connection immediately, preventing the response from being
        # read.
        gnutls_cli.stdin.write(query_with_length)
        gnutls_cli.stdin.flush()

        # Keep reading data from the standard output of gnutls-cli until a full
        # DNS message is received or a timeout is exceeded or gnutls-cli exits.
        # Popen.communicate() cannot be used here because: a) it closes
        # standard input after sending data to the process (see above why this
        # is a problem), b) gnutls-cli is not DNS-aware, so it does not exit
        # upon receiving a DNS response.
        with selectors.DefaultSelector() as selector:
            selector.register(gnutls_cli.stdout, selectors.EVENT_READ)
            # A monotonic clock keeps the deadline immune to wall-clock jumps.
            deadline = time.monotonic() + 10
            while not response:
                # select() treats a timeout <= 0 as "poll without blocking",
                # so the deadline has to be enforced explicitly; otherwise a
                # pipe that stays readable would keep the loop alive forever.
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not selector.select(timeout=remaining):
                    break
                chunk = gnutls_cli.stdout.read(512)
                if not chunk:
                    # End of file: gnutls-cli closed its standard output
                    # (typically because it exited, with any exit code).
                    # Detecting the exit this way, rather than via poll(),
                    # ensures all output written before the exit is consumed.
                    break
                gnutls_cli_output += chunk
                try:
                    # Ignore TCP length, just try to parse a DNS message from
                    # the rest of the data received.
                    response = dns.message.from_wire(gnutls_cli_output[2:])
                except dns.exception.FormError:
                    # Not a complete DNS message yet; keep reading.
                    continue

        # At this point either a DNS response was received or a timeout fired
        # or gnutls-cli exited prematurely.  Close the standard input of
        # gnutls-cli.  Terminate it if that does not cause it to shut down
        # gracefully.
        gnutls_cli.stdin.close()
        try:
            gnutls_cli.wait(5)
        except subprocess.TimeoutExpired:
            # The process is reaped when the Popen context manager exits.
            gnutls_cli.kill()

    # Store the response received for diagnostic purposes.
    with open("gnutls-cli.out.bin", "wb") as response_bin:
        response_bin.write(gnutls_cli_output)
    if response:
        with open("gnutls-cli.out.txt", "w", encoding="utf-8") as response_txt:
            response_txt.write(response.to_text())

    # Check whether a response was received and whether it is sane.
    assert response
    assert query.id == response.id
    assert len(response.answer) == 1
    assert response.answer[0].match(
        dns.name.from_text("example."),
        dns.rdataclass.IN,
        dns.rdatatype.SOA,
        dns.rdatatype.NONE,
    )