From 830407e88f9d40d954356c3754f2647f91d5c06a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 17:26:00 +0200 Subject: Adding upstream version 5.6.0. Signed-off-by: Daniel Baumann --- tests/integration/deckard/tools/answer_checker.py | 84 +++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/integration/deckard/tools/answer_checker.py (limited to 'tests/integration/deckard/tools/answer_checker.py') diff --git a/tests/integration/deckard/tools/answer_checker.py b/tests/integration/deckard/tools/answer_checker.py new file mode 100644 index 0000000..3754ef3 --- /dev/null +++ b/tests/integration/deckard/tools/answer_checker.py @@ -0,0 +1,84 @@ +"""Functions for sending DNS queries and checking recieved answers checking""" +# pylint: disable=C0301 +# flake8: noqa + +from ipaddress import IPv4Address, IPv6Address +import random +from typing import Iterable, Optional, Set, Union + +import dns.message +import dns.flags + +import pydnstest.matchpart +import pydnstest.mock_client + +def unset_flag(message: dns.message.Message, flag: int) -> dns.message.Message: + """Unsets given flag in given DNS message.""" + message.flags &= ~flag + return message + + +def send_and_check(question: Union[dns.message.Message, bytes], # pylint: disable=R0913 + expected: dns.message.Message, + server: Union[IPv4Address, IPv6Address], + match_fields: Set[str], + port: int = 53, + tcp: bool = False, + timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT, + unset_flags: Iterable[int] = tuple()) -> bool: + """Checks if DNS answer recieved for a question from a server matches expected one in specified + field. See pydnstest.matchpart for more information on match fields + + Returns True on success, raises an exceptions on failure. + """ + print("Sending query:\n%s\n" % str(question)) + answer = get_answer(question, server, port, tcp, timeout=timeout) + + for flag in unset_flags: + answer = unset_flag(answer, flag) + + print("Got answer:\n%s\n" % answer) + print("Matching:\n%s\n%s\n" % (match_fields, expected)) + for field in match_fields: + pydnstest.matchpart.match_part(expected, answer, field) + + return True + + +def get_answer(question: Union[dns.message.Message, bytes], + server: Union[IPv4Address, IPv6Address], + port: int = 53, + tcp: bool = False, + timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT) -> dns.message.Message: + """Get an DNS message with answer with specific query""" + sock = pydnstest.mock_client.setup_socket(str(server), port, tcp=tcp) + with sock: + pydnstest.mock_client.send_query(sock, question) + return pydnstest.mock_client.get_dns_message(sock, timeout=timeout) + + +def string_answer(question: Union[dns.message.Message, bytes], + server: Union[IPv4Address, IPv6Address], + port: int = 53, + tcp: bool = False) -> str: + """Prints answer of a server. Good for generating tests.""" + return get_answer(question, server, port, tcp).to_text() + + +def randomize_case(label: bytes) -> bytes: + """Randomize case in a DNS name label""" + output = [] + for byte in label: + if random.randint(0, 1): + output.append(bytes([byte]).swapcase()) + else: + output.append(bytes([byte])) + return b''.join(output) + + +def make_random_case_query(name: str, *args, **kwargs) -> dns.message.Message: + """Proxy for dns.message.make_query with rANdoM-cASe""" + query = dns.message.make_query(name, *args, **kwargs) + for label in query.question[0].name.labels: + label = randomize_case(label) + return query -- cgit v1.2.3