summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/networking.py
blob: 6b76273f950096d52ecabb170021cfa8f93c00d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import errno
from ipaddress import IPv4Network, IPv6Network, ip_address
from socket import AF_INET, AF_INET6

# pylint: disable=no-name-in-module,import-error
from pyroute2 import IPRoute
from pyroute2.netlink.rtnl import ndmsg
from pyroute2.netlink.exceptions import NetlinkError
# pylint: enable=no-name-in-module,import-error


class InterfaceManager:
    """Netlink-based equivalent of a handful of `ip` commands.

    On construction a dummy interface is created and brought up, default
    routes for IPv4 and IPv6 are installed on it and the loopback interface
    is brought up as well. Afterwards addresses can be attached to the dummy
    interface, either taken from the internally reserved ranges
    (`assign_internal_address`) or given explicitly (`add_address`).
    """

    def __init__(self,
                 interface="deckard",
                 ip4_range=IPv4Network('127.127.0.0/16'),
                 ip6_range=IPv6Network('fd00:dec::/32')):
        """Create the dummy interface and prepare the internal address pools.

        Args:
            interface: Name of the dummy interface to create.
            ip4_range: IPv4 network reserved for internally assigned addresses.
            ip6_range: IPv6 network reserved for internally assigned addresses.

        Raises:
            RuntimeError: If any netlink operation fails while the interface
                is being set up.
        """
        self.ip4_internal_range = ip4_range
        self.ip6_internal_range = ip6_range
        # Lazily walk the reserved networks; each call to
        # `assign_internal_address` consumes the next address.
        self.ip4_iterator = iter(ip4_range)
        self.ip6_iterator = iter(ip6_range)
        self.added_addresses = set()
        self.interface = interface

        self._ip = IPRoute()
        try:
            self._dev = self._setup_interface()
        except NetlinkError as ex:
            # The object is unusable after a failed setup, so release the
            # netlink socket instead of leaking it to the garbage collector.
            self._ip.close()
            raise RuntimeError(f"Couldn't set interface `{self.interface}` up.") from ex

    def _setup_interface(self):
        """Set up a dummy interface with default route as well as loopback.

        This is done so the resulting PCAP contains as much of the communication
        as possible (including ICMP Destination unreachable packets etc.).

        Returns:
            The interface index of the newly created dummy interface.
        """

        # Create and set the interface up.
        self._ip.link("add", ifname=self.interface, kind="dummy")
        dev = self._ip.link_lookup(ifname=self.interface)[0]
        self._ip.link("set", index=dev, state="up")

        # Set up default route for both IPv6 and IPv4. The gateways get
        # permanent neighbour entries with a fixed link-layer address.
        self._ip.neigh("add", dst='169.254.1.1', lladdr='21:21:21:21:21:21',
                       state=ndmsg.states['permanent'], ifindex=dev)
        self._ip.neigh("add", family=AF_INET6, dst='fe80::1', lladdr='21:21:21:21:21:21',
                       state=ndmsg.states['permanent'], ifindex=dev)
        self._ip.addr("add", index=dev, address="169.254.1.2", mask=24)
        self._ip.route("add", gateway="169.254.1.1", oif=dev)
        self._ip.route("add", family=AF_INET6, gateway='fe80::1', oif=dev)

        # Set the loopback up as well since some of the packets go through there.
        lo = self._ip.link_lookup(ifname="lo")[0]
        self._ip.link("set", index=lo, state="up")

        # Return internal interface ID for later use
        return dev

    def assign_internal_address(self, sockfamily) -> str:
        """Add and return new address from the internal range.

        Args:
            sockfamily: `AF_INET` or `AF_INET6`, selecting which reserved
                range the address is taken from.

        Returns:
            The newly assigned address as a string.

        Raises:
            ValueError: If `sockfamily` is neither `AF_INET` nor `AF_INET6`,
                or if the address could not be added to the interface.
            RuntimeError: If the selected range has no addresses left.
        """
        try:
            if sockfamily == AF_INET:
                a = str(next(self.ip4_iterator))
            elif sockfamily == AF_INET6:
                a = str(next(self.ip6_iterator))
            else:
                raise ValueError(f"Unknown sockfamily {sockfamily}")
        except StopIteration as ex:
            raise RuntimeError("Out of addresses.") from ex

        self._add_address(a)
        return a

    def add_address(self, address: str, check_duplicate=False):
        """Add an arbitrary new address to the interface.

        Args:
            address: IPv4 or IPv6 address in textual form.
            check_duplicate: When true, refuse an address string that was
                already added through this object.

        Raises:
            ValueError: If `address` is not a valid IP address, lies inside
                one of the internally reserved ranges, is a duplicate while
                `check_duplicate` is set, or could not be added.
        """
        if check_duplicate and address in self.added_addresses:
            raise ValueError(f"Tried to add duplicate address {address}")
        # Parse once; `ip_address` raises ValueError for malformed input.
        parsed = ip_address(address)
        if parsed in self.ip4_internal_range or parsed in self.ip6_internal_range:
            raise ValueError(f"Address {address} in the internally reserved range.")
        self._add_address(address)

    def _add_address(self, address):
        """Attach `address` to the dummy interface as a single-host address.

        Addresses containing ":" are treated as IPv6 and get a /128 prefix,
        everything else gets a /32 prefix. An address which already exists
        on the interface is not considered an error.

        Raises:
            ValueError: If netlink rejects the address for any reason other
                than it already being present.
        """
        mask = 128 if ":" in address else 32
        try:
            # NOTE(review): `nodad` presumably disables IPv6 duplicate address
            # detection for the new address -- confirm against pyroute2 docs.
            self._ip.addr("add", index=self._dev, address=address, mask=mask, nodad=True)
        except NetlinkError as ex:
            if ex.code != errno.EEXIST:  # 'RTNETLINK answers: File exists' is OK here
                raise ValueError(f"Couldn't add {address}") from ex

        self.added_addresses.add(address)