1
0
Fork 0
knot-resolver/tests/integration/deckard/deckard_pytest.py
Daniel Baumann fbc604e215
Adding upstream version 5.7.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:56:17 +02:00

167 lines
5.7 KiB
Python
Executable file

import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
from ipaddress import ip_address
import dpkt
import pytest
import deckard
from contrib.namespaces import LinuxNamespace
from networking import InterfaceManager
def set_coverage_env(path, qmin):
"""Sets up enviroment variables so code coverage utility can work."""
if os.environ.get("COVERAGE"):
exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
os.environ["DAEMONSRCDIR"],
os.environ["COVERAGE_STATSDIR"],
path + "-qmin-" + str(qmin)]).decode()
for export in exports.split():
key, value = export.split("=", 1)
value = value.strip('"')
os.environ[key] = value
def check_platform():
if sys.platform == 'windows':
pytest.exit('Not supported at all on Windows')
# Suppress extensive Augeas logging
logging.getLogger("augeas").setLevel(logging.ERROR)
check_platform()
class DeckardUnderLoadError(Exception):
pass
class TCPDump:
"""This context manager captures a PCAP file and than checks it for obvious errors."""
DUMPCAP_CMD = ["dumpcap", "-i", "any", "-q", "-P", "-w"]
def __init__(self, config):
self.config = config
self.config["tmpdir"] = self.get_tmpdir()
self.tcpdump = None
self.config["pcap"] = os.path.join(self.config["tmpdir"], "deckard.pcap")
def __enter__(self):
cmd = self.DUMPCAP_CMD.copy()
cmd.append(self.config["pcap"])
# pylint: disable=consider-using-with
self.tcpdump = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
def __exit__(self, _, exc_value, __):
# Wait for the PCAP to be finalized
while not os.path.exists(self.config["pcap"]):
time.sleep(1)
self.tcpdump.terminate()
self.tcpdump.wait()
self.check_for_unknown_server()
if exc_value is None:
if self.config.get('noclean') or "DECKARD_NOCLEAN" in os.environ:
# Do not clear files if the server crashed (for analysis)
logging.getLogger('deckard.hint').info(
'test working directory %s', self.config["tmpdir"])
else:
shutil.rmtree(self.config["tmpdir"])
else:
if isinstance(exc_value, ValueError):
self.check_for_icmp()
raise
@staticmethod
def get_tmpdir():
if "DECKARD_DIR" in os.environ:
tmpdir = os.environ["DECKARD_DIR"]
if os.path.lexists(tmpdir):
raise ValueError(f'DECKARD_DIR "{tmpdir}" must not exist')
else:
tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard')
return tmpdir
def check_for_icmp(self):
""" Checks Deckards's PCAP for ICMP packets """
# Deckard's responses to resolvers might be delayed due to load which
# leads the resolver to close the port and to the test failing in the
# end. We partially detect these by checking the PCAP for ICMP packets.
udp_seen = False
with open(self.config["pcap"], "rb") as f:
pcap = dpkt.pcap.Reader(f)
for _, packet in pcap:
ip = dpkt.sll.SLL(packet).data
if isinstance(ip.data, dpkt.udp.UDP):
udp_seen = True
if udp_seen:
if isinstance(ip.data, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
raise DeckardUnderLoadError("Deckard is under load. "
"Other errors might be false negatives. "
"Consider retrying the job later.")
def check_for_unknown_server(self):
unknown_addresses = set()
with open(self.config["pcap"], "rb") as f:
pcap = dpkt.pcap.Reader(f)
for _, packet in pcap:
ip = dpkt.sll.SLL(packet).data
try:
if ip.p != dpkt.ip.IP_PROTO_TCP or ip.p != dpkt.ip.IP_PROTO_UDP:
continue
except AttributeError:
continue
dest = str(ip_address(ip.dst))
if dest not in self.config["if_manager"].added_addresses:
unknown_addresses.add(dest)
if unknown_addresses:
raise RuntimeError("Binary under test queried an IP address not present"
f" in scenario {unknown_addresses}")
def run_test(path, qmin, config, max_retries, retries=0):
set_coverage_env(path, qmin)
try:
with LinuxNamespace("net"):
config["if_manager"] = InterfaceManager()
with TCPDump(config):
deckard.process_file(path, qmin, config)
except deckard.DeckardUnderLoadError as e:
if retries < max_retries:
logging.error("Deckard under load. Retrying…")
# Exponential backoff
time.sleep((2 ** retries) + random.random())
run_test(path, qmin, config, max_retries, retries + 1)
else:
raise e
def test_passes_qmin_on(scenario, max_retries):
if scenario.qmin is True or scenario.qmin is None:
run_test(scenario.path, True, scenario.config, max_retries)
else:
pytest.skip("Query minimization is off in test config")
def test_passes_qmin_off(scenario, max_retries):
if scenario.qmin is False or scenario.qmin is None:
run_test(scenario.path, False, scenario.config, max_retries)
else:
pytest.skip("Query minimization is on in test config")