summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/deckard_pytest.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-08 20:37:50 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-08 20:37:50 +0000
commitc1f743ab2e4a7046d5500875a47d1f62c8624603 (patch)
tree709946d52f5f3bbaeb38be9e3f1d56d11f058237 /tests/integration/deckard/deckard_pytest.py
parentInitial commit. (diff)
downloadknot-resolver-upstream/5.7.1.tar.xz
knot-resolver-upstream/5.7.1.zip
Adding upstream version 5.7.1.upstream/5.7.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/integration/deckard/deckard_pytest.py')
-rwxr-xr-xtests/integration/deckard/deckard_pytest.py167
1 files changed, 167 insertions, 0 deletions
diff --git a/tests/integration/deckard/deckard_pytest.py b/tests/integration/deckard/deckard_pytest.py
new file mode 100755
index 0000000..b7b7a7e
--- /dev/null
+++ b/tests/integration/deckard/deckard_pytest.py
@@ -0,0 +1,167 @@
+import logging
+import os
+import random
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+from ipaddress import ip_address
+
+import dpkt
+import pytest
+
+import deckard
+from contrib.namespaces import LinuxNamespace
+from networking import InterfaceManager
+
+
+def set_coverage_env(path, qmin):
+ """Sets up enviroment variables so code coverage utility can work."""
+ if os.environ.get("COVERAGE"):
+ exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
+ os.environ["DAEMONSRCDIR"],
+ os.environ["COVERAGE_STATSDIR"],
+ path + "-qmin-" + str(qmin)]).decode()
+ for export in exports.split():
+ key, value = export.split("=", 1)
+ value = value.strip('"')
+ os.environ[key] = value
+
+
+def check_platform():
+ if sys.platform == 'windows':
+ pytest.exit('Not supported at all on Windows')
+
+
+# Suppress extensive Augeas logging
+logging.getLogger("augeas").setLevel(logging.ERROR)
+
+
+check_platform()
+
+
+class DeckardUnderLoadError(Exception):
+ pass
+
+
+class TCPDump:
+ """This context manager captures a PCAP file and than checks it for obvious errors."""
+
+ DUMPCAP_CMD = ["dumpcap", "-i", "any", "-q", "-P", "-w"]
+
+ def __init__(self, config):
+ self.config = config
+ self.config["tmpdir"] = self.get_tmpdir()
+ self.tcpdump = None
+ self.config["pcap"] = os.path.join(self.config["tmpdir"], "deckard.pcap")
+
+ def __enter__(self):
+ cmd = self.DUMPCAP_CMD.copy()
+ cmd.append(self.config["pcap"])
+ # pylint: disable=consider-using-with
+ self.tcpdump = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ def __exit__(self, _, exc_value, __):
+ # Wait for the PCAP to be finalized
+ while not os.path.exists(self.config["pcap"]):
+ time.sleep(1)
+
+ self.tcpdump.terminate()
+ self.tcpdump.wait()
+
+ self.check_for_unknown_server()
+
+ if exc_value is None:
+ if self.config.get('noclean') or "DECKARD_NOCLEAN" in os.environ:
+ # Do not clear files if the server crashed (for analysis)
+ logging.getLogger('deckard.hint').info(
+ 'test working directory %s', self.config["tmpdir"])
+ else:
+ shutil.rmtree(self.config["tmpdir"])
+ else:
+ if isinstance(exc_value, ValueError):
+ self.check_for_icmp()
+ raise
+
+ @staticmethod
+ def get_tmpdir():
+ if "DECKARD_DIR" in os.environ:
+ tmpdir = os.environ["DECKARD_DIR"]
+ if os.path.lexists(tmpdir):
+ raise ValueError('DECKARD_DIR "%s" must not exist' % tmpdir)
+ else:
+ tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard')
+
+ return tmpdir
+
+ def check_for_icmp(self):
+ """ Checks Deckards's PCAP for ICMP packets """
+ # Deckard's responses to resolvers might be delayed due to load which
+ # leads the resolver to close the port and to the test failing in the
+ # end. We partially detect these by checking the PCAP for ICMP packets.
+ udp_seen = False
+ with open(self.config["pcap"], "rb") as f:
+ pcap = dpkt.pcap.Reader(f)
+ for _, packet in pcap:
+ ip = dpkt.sll.SLL(packet).data
+
+ if isinstance(ip.data, dpkt.udp.UDP):
+ udp_seen = True
+
+ if udp_seen:
+ if isinstance(ip.data, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
+ raise DeckardUnderLoadError("Deckard is under load. "
+ "Other errors might be false negatives. "
+ "Consider retrying the job later.")
+
+ def check_for_unknown_server(self):
+ unknown_addresses = set()
+ with open(self.config["pcap"], "rb") as f:
+ pcap = dpkt.pcap.Reader(f)
+ for _, packet in pcap:
+ ip = dpkt.sll.SLL(packet).data
+ try:
+ if ip.p != dpkt.ip.IP_PROTO_TCP or ip.p != dpkt.ip.IP_PROTO_UDP:
+ continue
+ except AttributeError:
+ continue
+ dest = str(ip_address(ip.dst))
+ if dest not in self.config["if_manager"].added_addresses:
+ unknown_addresses.add(dest)
+
+ if unknown_addresses:
+ raise RuntimeError("Binary under test queried an IP address not present"
+ " in scenario %s" % unknown_addresses)
+
+
+def run_test(path, qmin, config, max_retries, retries=0):
+ set_coverage_env(path, qmin)
+
+ try:
+ with LinuxNamespace("net"):
+ config["if_manager"] = InterfaceManager()
+ with TCPDump(config):
+ deckard.process_file(path, qmin, config)
+ except deckard.DeckardUnderLoadError as e:
+ if retries < max_retries:
+ logging.error("Deckard under load. Retrying…")
+ # Exponential backoff
+ time.sleep((2 ** retries) + random.random())
+ run_test(path, qmin, config, max_retries, retries + 1)
+ else:
+ raise e
+
+
+def test_passes_qmin_on(scenario, max_retries):
+ if scenario.qmin is True or scenario.qmin is None:
+ run_test(scenario.path, True, scenario.config, max_retries)
+ else:
+ pytest.skip("Query minimization is off in test config")
+
+
+def test_passes_qmin_off(scenario, max_retries):
+ if scenario.qmin is False or scenario.qmin is None:
+ run_test(scenario.path, False, scenario.config, max_retries)
+ else:
+ pytest.skip("Query minimization is on in test config")