summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/networking.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/integration/deckard/networking.py93
1 files changed, 93 insertions, 0 deletions
diff --git a/tests/integration/deckard/networking.py b/tests/integration/deckard/networking.py
new file mode 100644
index 0000000..6b76273
--- /dev/null
+++ b/tests/integration/deckard/networking.py
@@ -0,0 +1,93 @@
+import errno
+from ipaddress import IPv4Network, IPv6Network, ip_address
+from socket import AF_INET, AF_INET6
+
+# pylint: disable=no-name-in-module,import-error
+from pyroute2 import IPRoute
+from pyroute2.netlink.rtnl import ndmsg
+from pyroute2.netlink.exceptions import NetlinkError
+# pylint: enable=no-name-in-module,import-error
+
+
+class InterfaceManager:
+ """Wrapper for the `ip` command."""
+
+ def __init__(self,
+ interface="deckard",
+ ip4_range=IPv4Network('127.127.0.0/16'),
+ ip6_range=IPv6Network('fd00:dec::/32')):
+ self.ip4_internal_range = ip4_range
+ self.ip6_internal_range = ip6_range
+ self.ip4_iterator = (host for host in ip4_range)
+ self.ip6_iterator = (host for host in ip6_range)
+ self.added_addresses = set()
+ self.interface = interface
+
+ self._ip = IPRoute()
+ try:
+ self._dev = self._setup_interface()
+ except NetlinkError as ex:
+ raise RuntimeError(f"Couldn't set interface `{self.interface}` up.") from ex
+
+ def _setup_interface(self):
+ """Set up a dummy interface with default route as well as loopback.
+ This is done so the resulting PCAP contains as much of the communication
+ as possible (including ICMP Destination unreachable packets etc.)."""
+
+ # Create and set the interface up.
+ self._ip.link("add", ifname=self.interface, kind="dummy")
+ dev = self._ip.link_lookup(ifname=self.interface)[0]
+ self._ip.link("set", index=dev, state="up")
+
+ # Set up default route for both IPv6 and IPv4
+ self._ip.neigh("add", dst='169.254.1.1', lladdr='21:21:21:21:21:21',
+ state=ndmsg.states['permanent'], ifindex=dev)
+ self._ip.neigh("add", family=AF_INET6, dst='fe80::1', lladdr='21:21:21:21:21:21',
+ state=ndmsg.states['permanent'], ifindex=dev)
+ self._ip.addr("add", index=dev, address="169.254.1.2", mask=24)
+ self._ip.route("add", gateway="169.254.1.1", oif=dev)
+ self._ip.route("add", family=AF_INET6, gateway='fe80::1', oif=dev)
+
+ # Set the loopback up as well since some of the packets go through there.
+ lo = self._ip.link_lookup(ifname="lo")[0]
+ self._ip.link("set", index=lo, state="up")
+
+ # Return internal interface ID for later use
+ return dev
+
+ def assign_internal_address(self, sockfamily) -> str:
+ """Add and return new address from the internal range"""
+ try:
+ if sockfamily == AF_INET:
+ a = str(next(self.ip4_iterator))
+ elif sockfamily == AF_INET6:
+ a = str(next(self.ip6_iterator))
+ else:
+ raise ValueError(f"Unknown sockfamily {sockfamily}")
+ except StopIteration as ex:
+ raise RuntimeError("Out of addresses.") from ex
+
+ self._add_address(a)
+ return a
+
+ def add_address(self, address: str, check_duplicate=False):
+ """Add an arbitrary new address to the interface"""
+ if address in self.added_addresses and check_duplicate:
+ raise ValueError(f"Tried to add duplicate address {address}")
+ if ip_address(address) in self.ip4_internal_range or \
+ ip_address(address) in self.ip6_internal_range:
+ raise ValueError(f"Address {address} in the internally reserved range.")
+ self._add_address(address)
+
+ def _add_address(self, address):
+ if ":" in address:
+ mask = 128
+ else:
+ mask = 32
+ try:
+ self._ip.addr("add", index=self._dev, address=address, mask=mask, nodad=True)
+ except NetlinkError as ex:
+ if ex.code != errno.EEXIST: # 'RTNETLINK answers: File exists' is OK here
+ raise ValueError(f"Couldn't add {address}") from ex
+
+ self.added_addresses.add(address)