summaryrefslogtreecommitdiffstats
path: root/bin/tests/system/statschannel/generic_dnspython.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/tests/system/statschannel/generic_dnspython.py')
-rw-r--r--bin/tests/system/statschannel/generic_dnspython.py128
1 files changed, 0 insertions, 128 deletions
diff --git a/bin/tests/system/statschannel/generic_dnspython.py b/bin/tests/system/statschannel/generic_dnspython.py
deleted file mode 100644
index 34a0398..0000000
--- a/bin/tests/system/statschannel/generic_dnspython.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-from collections import defaultdict
-
-import dns.message
-import dns.query
-import dns.rcode
-
-
-TIMEOUT = 10
-
-
-def create_msg(qname, qtype):
- msg = dns.message.make_query(
- qname, qtype, want_dnssec=True, use_edns=0, payload=4096
- )
-
- return msg
-
-
-def udp_query(ip, port, msg):
- ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
-def tcp_query(ip, port, msg):
- ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
-def create_expected(data):
- expected = {
- "dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
- "dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
- "dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
- "dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
- "dns-udp-requests-sizes-received-ipv4": defaultdict(int),
- "dns-udp-requests-sizes-received-ipv6": defaultdict(int),
- "dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
- "dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
- }
-
- for k, v in data.items():
- for kk, vv in v.items():
- expected[k][kk] += vv
-
- return expected
-
-
-def update_expected(expected, key, msg):
- msg_len = len(msg.to_wire())
- bucket_num = (msg_len // 16) * 16
- bucket = "{}-{}".format(bucket_num, bucket_num + 15)
-
- expected[key][bucket] += 1
-
-
-def check_traffic(data, expected):
- def ordered(obj):
- if isinstance(obj, dict):
- return sorted((k, ordered(v)) for k, v in obj.items())
- if isinstance(obj, list):
- return sorted(ordered(x) for x in obj)
- return obj
-
- ordered_data = ordered(data)
- ordered_expected = ordered(expected)
-
- assert len(ordered_data) == 8
- assert len(ordered_expected) == 8
- assert len(data) == len(ordered_data)
- assert len(expected) == len(ordered_expected)
-
- assert ordered_data == ordered_expected
-
-
-def test_traffic(fetch_traffic, **kwargs):
- statsip = kwargs["statsip"]
- statsport = kwargs["statsport"]
- port = kwargs["port"]
-
- data = fetch_traffic(statsip, statsport)
- exp = create_expected(data)
-
- msg = create_msg("short.example.", "TXT")
- update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
- update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("long.example.", "TXT")
- update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
- update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("short.example.", "TXT")
- update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
- update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("long.example.", "TXT")
- update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
- update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)