summaryrefslogtreecommitdiffstats
path: root/bin/tests/system/statschannel/generic_dnspython.py
blob: 34a0398108ae49971dd2e811d24437e1fcb637de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

from collections import defaultdict

import dns.message
import dns.query
import dns.rcode


# Timeout handed to dns.query.udp() and dns.query.tcp() by the query helpers
# in this module (interpreted as seconds by dnspython).
TIMEOUT = 10


def create_msg(qname, qtype):
    """Return a dnspython query message for *qname* / *qtype*.

    The query is built with EDNS version 0, the DNSSEC OK bit requested and
    an advertised payload size of 4096.
    """
    return dns.message.make_query(
        qname,
        qtype,
        use_edns=0,
        payload=4096,
        want_dnssec=True,
    )


def udp_query(ip, port, msg):
    """Send *msg* to *ip*:*port* over UDP and return the response.

    The query uses the module-level TIMEOUT.  The response must carry
    rcode NOERROR; anything else fails the assertion.
    """
    response = dns.query.udp(msg, ip, timeout=TIMEOUT, port=port)
    assert response.rcode() == dns.rcode.NOERROR
    return response


def tcp_query(ip, port, msg):
    """Send *msg* to *ip*:*port* over TCP and return the response.

    The query uses the module-level TIMEOUT.  The response must carry
    rcode NOERROR; anything else fails the assertion.
    """
    response = dns.query.tcp(msg, ip, timeout=TIMEOUT, port=port)
    assert response.rcode() == dns.rcode.NOERROR
    return response


def create_expected(data):
    """Build the baseline of expected traffic counters from *data*.

    *data* is a mapping of counter name -> {bucket label -> count}.  The
    result always contains the eight known counter names, each mapped to a
    ``defaultdict(int)`` holding a copy of the counts found in *data*, so
    buckets that have not been seen yet can be incremented directly.

    Raises KeyError if *data* carries counts under a name that is not one
    of the eight known counters.
    """
    counter_names = (
        "dns-tcp-requests-sizes-received-ipv4",
        "dns-tcp-responses-sizes-sent-ipv4",
        "dns-tcp-requests-sizes-received-ipv6",
        "dns-tcp-responses-sizes-sent-ipv6",
        "dns-udp-requests-sizes-received-ipv4",
        "dns-udp-requests-sizes-received-ipv6",
        "dns-udp-responses-sizes-sent-ipv4",
        "dns-udp-responses-sizes-sent-ipv6",
    )
    expected = {name: defaultdict(int) for name in counter_names}

    # Flatten the two-level mapping into (name, bucket, count) triples; the
    # counter name is only looked up when there is a count to add.
    increments = (
        (name, bucket, count)
        for name, buckets in data.items()
        for bucket, count in buckets.items()
    )
    for name, bucket, count in increments:
        expected[name][bucket] += count

    return expected


def update_expected(expected, key, msg):
    """Count one message of *msg*'s wire size under ``expected[key]``.

    The size is the length of ``msg.to_wire()``.  Sizes are grouped into
    16-byte wide buckets labelled ``"<low>-<high>"`` (for example
    ``"32-47"``); the matching bucket is incremented by one, in place.
    """
    wire_size = len(msg.to_wire())
    # Round down to the nearest multiple of 16 to find the bucket's lower edge.
    low = wire_size - wire_size % 16
    label = "{}-{}".format(low, low + 15)

    expected[key][label] += 1


def check_traffic(data, expected):
    """Assert that *data* and *expected* hold the same traffic counters.

    Both arguments are normalised first (dicts become sorted lists of
    ``(key, value)`` pairs, lists are sorted, recursively) so the comparison
    does not depend on ordering.  Each side must contain exactly eight
    top-level entries.
    """

    def canonical(node):
        # Recursively turn dicts and lists into sorted lists; leave every
        # other value untouched.
        if isinstance(node, dict):
            pairs = [(key, canonical(value)) for key, value in node.items()]
            pairs.sort()
            return pairs
        if isinstance(node, list):
            items = [canonical(item) for item in node]
            items.sort()
            return items
        return node

    actual = canonical(data)
    wanted = canonical(expected)

    assert len(actual) == 8
    assert len(wanted) == 8
    assert len(data) == len(actual)
    assert len(expected) == len(wanted)

    assert actual == wanted


def test_traffic(fetch_traffic, **kwargs):
    """Check that the traffic-size counters follow the queries that are sent.

    *fetch_traffic* is called as ``fetch_traffic(statsip, statsport)`` and
    must return the current counters as a mapping of counter name ->
    {bucket label -> count}.

    Required keyword arguments: ``statsip``, ``statsport`` and ``port``.
    DNS queries are sent to ``statsip`` on ``port``.

    Starting from the counters fetched up front, a short and a long TXT
    query are sent over UDP and then over TCP.  After every single query
    the counters are fetched again and compared with the expectation.
    """
    statsip = kwargs["statsip"]
    statsport = kwargs["statsport"]
    port = kwargs["port"]

    exp = create_expected(fetch_traffic(statsip, statsport))

    def exchange(send_query, qname, request_key, response_key):
        # One round: account for the request, send it, account for the
        # response, then verify the freshly fetched counters.
        msg = create_msg(qname, "TXT")
        update_expected(exp, request_key, msg)
        ans = send_query(statsip, port, msg)
        update_expected(exp, response_key, ans)
        check_traffic(fetch_traffic(statsip, statsport), exp)

    exchange(
        udp_query,
        "short.example.",
        "dns-udp-requests-sizes-received-ipv4",
        "dns-udp-responses-sizes-sent-ipv4",
    )
    exchange(
        udp_query,
        "long.example.",
        "dns-udp-requests-sizes-received-ipv4",
        "dns-udp-responses-sizes-sent-ipv4",
    )
    exchange(
        tcp_query,
        "short.example.",
        "dns-tcp-requests-sizes-received-ipv4",
        "dns-tcp-responses-sizes-sent-ipv4",
    )
    exchange(
        tcp_query,
        "long.example.",
        "dns-tcp-requests-sizes-received-ipv4",
        "dns-tcp-responses-sizes-sent-ipv4",
    )