167 lines
5.7 KiB
Python
Executable file
167 lines
5.7 KiB
Python
Executable file
import logging
|
|
import os
|
|
import random
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from ipaddress import ip_address
|
|
|
|
import dpkt
|
|
import pytest
|
|
|
|
import deckard
|
|
from contrib.namespaces import LinuxNamespace
|
|
from networking import InterfaceManager
|
|
|
|
|
|
def set_coverage_env(path, qmin):
|
|
"""Sets up enviroment variables so code coverage utility can work."""
|
|
if os.environ.get("COVERAGE"):
|
|
exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
|
|
os.environ["DAEMONSRCDIR"],
|
|
os.environ["COVERAGE_STATSDIR"],
|
|
path + "-qmin-" + str(qmin)]).decode()
|
|
for export in exports.split():
|
|
key, value = export.split("=", 1)
|
|
value = value.strip('"')
|
|
os.environ[key] = value
|
|
|
|
|
|
def check_platform():
|
|
if sys.platform == 'windows':
|
|
pytest.exit('Not supported at all on Windows')
|
|
|
|
|
|
# Suppress extensive Augeas logging
|
|
logging.getLogger("augeas").setLevel(logging.ERROR)
|
|
|
|
|
|
check_platform()
|
|
|
|
|
|
class DeckardUnderLoadError(Exception):
|
|
pass
|
|
|
|
|
|
class TCPDump:
|
|
"""This context manager captures a PCAP file and than checks it for obvious errors."""
|
|
|
|
DUMPCAP_CMD = ["dumpcap", "-i", "any", "-q", "-P", "-w"]
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.config["tmpdir"] = self.get_tmpdir()
|
|
self.tcpdump = None
|
|
self.config["pcap"] = os.path.join(self.config["tmpdir"], "deckard.pcap")
|
|
|
|
def __enter__(self):
|
|
cmd = self.DUMPCAP_CMD.copy()
|
|
cmd.append(self.config["pcap"])
|
|
# pylint: disable=consider-using-with
|
|
self.tcpdump = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
|
|
|
def __exit__(self, _, exc_value, __):
|
|
# Wait for the PCAP to be finalized
|
|
while not os.path.exists(self.config["pcap"]):
|
|
time.sleep(1)
|
|
|
|
self.tcpdump.terminate()
|
|
self.tcpdump.wait()
|
|
|
|
self.check_for_unknown_server()
|
|
|
|
if exc_value is None:
|
|
if self.config.get('noclean') or "DECKARD_NOCLEAN" in os.environ:
|
|
# Do not clear files if the server crashed (for analysis)
|
|
logging.getLogger('deckard.hint').info(
|
|
'test working directory %s', self.config["tmpdir"])
|
|
else:
|
|
shutil.rmtree(self.config["tmpdir"])
|
|
else:
|
|
if isinstance(exc_value, ValueError):
|
|
self.check_for_icmp()
|
|
raise
|
|
|
|
@staticmethod
|
|
def get_tmpdir():
|
|
if "DECKARD_DIR" in os.environ:
|
|
tmpdir = os.environ["DECKARD_DIR"]
|
|
if os.path.lexists(tmpdir):
|
|
raise ValueError(f'DECKARD_DIR "{tmpdir}" must not exist')
|
|
else:
|
|
tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard')
|
|
|
|
return tmpdir
|
|
|
|
def check_for_icmp(self):
|
|
""" Checks Deckards's PCAP for ICMP packets """
|
|
# Deckard's responses to resolvers might be delayed due to load which
|
|
# leads the resolver to close the port and to the test failing in the
|
|
# end. We partially detect these by checking the PCAP for ICMP packets.
|
|
udp_seen = False
|
|
with open(self.config["pcap"], "rb") as f:
|
|
pcap = dpkt.pcap.Reader(f)
|
|
for _, packet in pcap:
|
|
ip = dpkt.sll.SLL(packet).data
|
|
|
|
if isinstance(ip.data, dpkt.udp.UDP):
|
|
udp_seen = True
|
|
|
|
if udp_seen:
|
|
if isinstance(ip.data, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
|
|
raise DeckardUnderLoadError("Deckard is under load. "
|
|
"Other errors might be false negatives. "
|
|
"Consider retrying the job later.")
|
|
|
|
def check_for_unknown_server(self):
|
|
unknown_addresses = set()
|
|
with open(self.config["pcap"], "rb") as f:
|
|
pcap = dpkt.pcap.Reader(f)
|
|
for _, packet in pcap:
|
|
ip = dpkt.sll.SLL(packet).data
|
|
try:
|
|
if ip.p != dpkt.ip.IP_PROTO_TCP or ip.p != dpkt.ip.IP_PROTO_UDP:
|
|
continue
|
|
except AttributeError:
|
|
continue
|
|
dest = str(ip_address(ip.dst))
|
|
if dest not in self.config["if_manager"].added_addresses:
|
|
unknown_addresses.add(dest)
|
|
|
|
if unknown_addresses:
|
|
raise RuntimeError("Binary under test queried an IP address not present"
|
|
f" in scenario {unknown_addresses}")
|
|
|
|
|
|
def run_test(path, qmin, config, max_retries, retries=0):
|
|
set_coverage_env(path, qmin)
|
|
|
|
try:
|
|
with LinuxNamespace("net"):
|
|
config["if_manager"] = InterfaceManager()
|
|
with TCPDump(config):
|
|
deckard.process_file(path, qmin, config)
|
|
except deckard.DeckardUnderLoadError as e:
|
|
if retries < max_retries:
|
|
logging.error("Deckard under load. Retrying…")
|
|
# Exponential backoff
|
|
time.sleep((2 ** retries) + random.random())
|
|
run_test(path, qmin, config, max_retries, retries + 1)
|
|
else:
|
|
raise e
|
|
|
|
|
|
def test_passes_qmin_on(scenario, max_retries):
|
|
if scenario.qmin is True or scenario.qmin is None:
|
|
run_test(scenario.path, True, scenario.config, max_retries)
|
|
else:
|
|
pytest.skip("Query minimization is off in test config")
|
|
|
|
|
|
def test_passes_qmin_off(scenario, max_retries):
|
|
if scenario.qmin is False or scenario.qmin is None:
|
|
run_test(scenario.path, False, scenario.config, max_retries)
|
|
else:
|
|
pytest.skip("Query minimization is on in test config")
|