diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-09 13:16:35 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-09 13:16:35 +0000 |
commit | e2bbf175a2184bd76f6c54ccf8456babeb1a46fc (patch) | |
tree | f0b76550d6e6f500ada964a3a4ee933a45e5a6f1 /tests/topotests/ospf_topo2/test_ospf_topo2.py | |
parent | Initial commit. (diff) | |
download | frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.tar.xz frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.zip |
Adding upstream version 9.1.upstream/9.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/topotests/ospf_topo2/test_ospf_topo2.py')
-rw-r--r-- | tests/topotests/ospf_topo2/test_ospf_topo2.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/tests/topotests/ospf_topo2/test_ospf_topo2.py b/tests/topotests/ospf_topo2/test_ospf_topo2.py new file mode 100644 index 0000000..8be06e4 --- /dev/null +++ b/tests/topotests/ospf_topo2/test_ospf_topo2.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: ISC +# +# test_ospf_topo2.py +# Part of NetDEF Topology Tests +# +# Copyright (c) 2017 by +# Network Device Education Foundation, Inc. ("NetDEF") +# + +""" +test_ospf_topo2.py: Test correct route removal. + +Proofs the following issue: +https://github.com/FRRouting/frr/issues/14488 + +""" + +import ipaddress +import json +import pytest +import sys +import time + +from lib.topogen import Topogen + + +pytestmark = [ + pytest.mark.ospf6d, + pytest.mark.ospfd, +] + + +def build_topo(tgen): + """Build the topology used by all tests below.""" + + # Create 4 routers + r1 = tgen.add_router("r1") + r2 = tgen.add_router("r2") + r3 = tgen.add_router("r3") + r4 = tgen.add_router("r4") + + # The r1/r2 and r3/r4 router pairs have two connections each + tgen.add_link(r1, r2, ifname1="eth1", ifname2="eth1") + tgen.add_link(r1, r2, ifname1="eth2", ifname2="eth2") + tgen.add_link(r3, r4, ifname1="eth2", ifname2="eth2") + tgen.add_link(r3, r4, ifname1="eth3", ifname2="eth3") + + # The r1/r4 and r2/r3 router pairs have one connection each + tgen.add_link(r1, r4, ifname1="eth3", ifname2="eth1") + tgen.add_link(r2, r3, ifname1="eth3", ifname2="eth1") + + +@pytest.fixture(scope="function") +def tgen(request): + """Setup/Teardown the environment and provide tgen argument to tests. + + Do this once per function as some of the tests will leave the router + in an unclean state. + + """ + + tgen = Topogen(build_topo, request.module.__name__) + tgen.start_topology() + + router_list = tgen.routers() + + for rname, router in router_list.items(): + router.load_frr_config("frr.conf") + + tgen.start_router() + + yield tgen + + tgen.stop_topology() + + +def ospf_neighbors(router, ip_version): + """List the OSPF neighbors for the given router and IP version.""" + + if ip_version == 4: + cmd = "show ip ospf neighbor json" + else: + cmd = "show ipv6 ospf neighbor json" + + output = router.vtysh_cmd(cmd) + + if ip_version == 4: + return [v for n in json.loads(output)["neighbors"].values() for v in n] + else: + return json.loads(output)["neighbors"] + + +def ospf_neighbor_uptime(router, interface, ip_version): + """Uptime of the neighbor with the given interface name in seconds.""" + + for neighbor in ospf_neighbors(router, ip_version): + if ip_version == 4: + if not neighbor["ifaceName"].startswith("{}:".format(interface)): + continue + + return neighbor["upTimeInMsec"] / 1000 + else: + if neighbor["interfaceName"] != interface: + continue + + h, m, s = [int(d) for d in neighbor["duration"].split(":")] + return h * 3600 + m * 60 + s + + raise KeyError( + "No IPv{} neighbor with interface name {} on {}".format( + ip_version, interface, router.name + ) + ) + + +def ospf_routes(router, prefix): + """List the OSPF routes for the given router and prefix.""" + + if ipaddress.ip_interface(prefix).ip.version == 4: + cmd = "show ip route {} json" + else: + cmd = "show ipv6 route {} json" + + output = router.vtysh_cmd(cmd.format(prefix)) + return json.loads(output)[prefix] + + +def ospf_nexthops(router, prefix, protocol): + """List the OSPF nexthops for the given prefix.""" + + for route in ospf_routes(router, prefix): + if route["protocol"] != protocol: + continue + + for nexthop in route["nexthops"]: + yield nexthop + + +def ospf_directly_connected_interfaces(router, ip_version): + """The names of the directly connected interfaces, as discovered + through the OSPF nexthops. + + """ + + if ip_version == 4: + prefix = "192.0.2.{}/32".format(router.name.strip("r")) + else: + prefix = "fe80::/64" + + hops = ospf_nexthops(router, prefix, protocol="connected") + return sorted([n["interfaceName"] for n in hops if n["directlyConnected"]]) + + +def wait_for_ospf(router, ip_version, neighbors, timeout=60): + """Wait until the router has the given number of neighbors that are + fully converged. + + Note that this checks for the exact number of neighbors, so if one neighbor + is requested and three are converged, the wait continues. + + """ + + until = time.monotonic() + timeout + + if ip_version == 4: + filter = {"converged": "Full"} + else: + filter = {"state": "Full"} + + def is_match(neighbor): + for k, v in filter.items(): + if neighbor[k] != v: + return False + + return True + + while time.monotonic() < until: + found = sum(1 for n in ospf_neighbors(router, ip_version) if is_match(n)) + + if neighbors == found: + return + + raise TimeoutError( + "Waited over {}s for {} neighbors to reach {}".format( + timeout, neighbors, filter + ) + ) + + +@pytest.mark.parametrize("ip_version", [4, 6]) +def test_interface_up(tgen, ip_version): + """Verify the initial routing table, before any changes.""" + + # Wait for the routers to be ready + routers = {id: tgen.gears[id] for id in ("r1", "r2", "r3", "r4")} + + for router in routers.values(): + wait_for_ospf(router, ip_version=ip_version, neighbors=3) + + # Verify that the link-local routes are correct + for router in routers.values(): + connected = ospf_directly_connected_interfaces(router, ip_version) + + if ip_version == 4: + expected = ["eth1", "eth2", "eth3", "lo"] + else: + expected = ["eth1", "eth2", "eth3"] + + assert ( + connected == expected + ), "Expected all interfaces to be connected on {}".format(router.name) + + +@pytest.mark.parametrize("ip_version", [4, 6]) +def test_interface_down(tgen, ip_version): + """Verify the routing table after taking interfaces down.""" + + # Wait for the routers to be ready + routers = {id: tgen.gears[id] for id in ("r1", "r2", "r3", "r4")} + + for id, router in routers.items(): + wait_for_ospf(router, ip_version=ip_version, neighbors=3) + + # Keep track of the uptime of the eth3 neighbor + uptime = ospf_neighbor_uptime(routers["r1"], "eth3", ip_version) + before = time.monotonic() + + # Take the links between r1 and r2 down + routers["r1"].cmd_raises("ip link set down dev eth1") + routers["r1"].cmd_raises("ip link set down dev eth2") + + # Wait for OSPF to converge + wait_for_ospf(routers["r1"], ip_version=ip_version, neighbors=1) + + # The uptime of the unaffected eth3 neighbor should be monotonic + new_uptime = ospf_neighbor_uptime(routers["r1"], "eth3", ip_version) + took = round(time.monotonic() - before, 3) + + # IPv6 has a resolution of 1s, for IPv4 some slack is necesssary. + if ip_version == 4: + offset = 0.25 + else: + offset = 1 + + assert ( + new_uptime + offset >= uptime + took + ), "The eth3 neighbor uptime must not decrease" + + # We should only find eth3 once OSPF has converged + connected = ospf_directly_connected_interfaces(routers["r1"], ip_version) + + if ip_version == 4: + expected = ["eth3", "lo"] + else: + expected = ["eth3"] + + assert connected == expected, "Expected only eth1 and eth2 to be disconnected" + + +@pytest.mark.parametrize("ip_version", [4, 6]) +def test_interface_flap(tgen, ip_version): + """Verify the routing table after enabling an interface that was down.""" + + # Wait for the routers to be ready + routers = {id: tgen.gears[id] for id in ("r1", "r2", "r3", "r4")} + + for id, router in routers.items(): + wait_for_ospf(router, ip_version=ip_version, neighbors=3) + + # Keep track of the uptime of the eth3 neighbor + uptime = ospf_neighbor_uptime(routers["r1"], "eth3", ip_version) + before = time.monotonic() + + # Take the links between r1 and r2 down + routers["r1"].cmd_raises("ip link set down dev eth1") + routers["r2"].cmd_raises("ip link set down dev eth2") + + # Wait for OSPF to converge + wait_for_ospf(routers["r1"], ip_version=ip_version, neighbors=1) + + # Take the links between r1 and r2 up + routers["r1"].cmd_raises("ip link set up dev eth1") + routers["r2"].cmd_raises("ip link set up dev eth2") + + # Wait for OSPF to converge + wait_for_ospf(routers["r1"], ip_version=ip_version, neighbors=3) + + # The uptime of the unaffected eth3 neighbor should be monotonic + new_uptime = ospf_neighbor_uptime(routers["r1"], "eth3", ip_version) + took = round(time.monotonic() - before, 3) + + # IPv6 has a resolution of 1s, for IPv4 some slack is necesssary. + if ip_version == 4: + offset = 0.25 + else: + offset = 1 + + assert ( + new_uptime + offset >= uptime + took + ), "The eth3 neighbor uptime must not decrease" + + # We should find all interfaces again + connected = ospf_directly_connected_interfaces(routers["r1"], ip_version) + + if ip_version == 4: + expected = ["eth1", "eth2", "eth3", "lo"] + else: + expected = ["eth1", "eth2", "eth3"] + + assert connected == expected, "Expected all interfaces to be connected" + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) |