summaryrefslogtreecommitdiffstats
path: root/tests/topotests/ospf_topo1/test_ospf_topo1.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/ospf_topo1/test_ospf_topo1.py')
-rw-r--r--tests/topotests/ospf_topo1/test_ospf_topo1.py598
1 files changed, 598 insertions, 0 deletions
diff --git a/tests/topotests/ospf_topo1/test_ospf_topo1.py b/tests/topotests/ospf_topo1/test_ospf_topo1.py
new file mode 100644
index 0000000..37facfc
--- /dev/null
+++ b/tests/topotests/ospf_topo1/test_ospf_topo1.py
@@ -0,0 +1,598 @@
+#!/usr/bin/env python
+
+#
+# test_ospf_topo1.py
+# Part of NetDEF Topology Tests
+#
+# Copyright (c) 2017 by
+# Network Device Education Foundation, Inc. ("NetDEF")
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+test_ospf_topo1.py: Test the FRR OSPF routing daemon.
+"""
+
+import os
+import re
+import sys
+from functools import partial
+from time import sleep
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+from lib.topolog import logger
+
+# Required to instantiate the topology builder class.
+
+pytestmark = [pytest.mark.ospfd]
+
+
+def build_topo(tgen):
+ "Build function"
+
+ # Create 4 routers
+ for routern in range(1, 5):
+ tgen.add_router("r{}".format(routern))
+
+ # Create a empty network for router 1
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["r1"])
+
+ # Create a empty network for router 2
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["r2"])
+
+ # Interconect router 1, 2 and 3
+ switch = tgen.add_switch("s3")
+ switch.add_link(tgen.gears["r1"])
+ switch.add_link(tgen.gears["r2"])
+ switch.add_link(tgen.gears["r3"])
+
+ # Create empty netowrk for router3
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["r3"])
+
+ # Interconect router 3 and 4
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["r3"])
+ switch.add_link(tgen.gears["r4"])
+
+ # Create a empty network for router 4
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["r4"])
+
+
+def setup_module(mod):
+ "Sets up the pytest environment"
+ tgen = Topogen(build_topo, mod.__name__)
+ tgen.start_topology()
+
+ ospf6_config = "ospf6d.conf"
+
+ router_list = tgen.routers()
+ for rname, router in router_list.items():
+ router.load_config(
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
+ )
+ router.load_config(
+ TopoRouter.RD_OSPF, os.path.join(CWD, "{}/ospfd.conf".format(rname))
+ )
+ router.load_config(
+ TopoRouter.RD_OSPF6, os.path.join(CWD, "{}/{}".format(rname, ospf6_config))
+ )
+
+ # Initialize all routers.
+ tgen.start_router()
+
+
+def teardown_module(mod):
+ "Teardown the pytest environment"
+ tgen = get_topogen()
+ tgen.stop_topology()
+
+
+def test_wait_protocol_convergence():
+ "Wait for OSPFv2/OSPFv3 to converge"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("waiting for protocols to converge")
+
+ def expect_ospfv2_neighbor_full(router, neighbor):
+ "Wait until OSPFv2 convergence."
+ logger.info("waiting OSPFv2 router '{}'".format(router))
+
+ def run_command_and_expect():
+ """
+ Function that runs command and expect the following outcomes:
+ * Full/DR
+ * Full/DROther
+ * Full/Backup
+ """
+ result = tgen.gears[router].vtysh_cmd(
+ "show ip ospf neighbor json", isjson=True
+ )
+ if (
+ topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
+ )
+ is None
+ ):
+ return None
+
+ if (
+ topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
+ )
+ is None
+ ):
+ return None
+
+ return topotest.json_cmp(
+ result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
+ )
+
+ _, result = topotest.run_and_expect(
+ run_command_and_expect, None, count=130, wait=1
+ )
+ assertmsg = '"{}" convergence failure'.format(router)
+ assert result is None, assertmsg
+
+ def expect_ospfv3_neighbor_full(router, neighbor):
+ "Wait until OSPFv3 convergence."
+ logger.info("waiting OSPFv3 router '{}'".format(router))
+ test_func = partial(
+ topotest.router_json_cmp,
+ tgen.gears[router],
+ "show ipv6 ospf6 neighbor json",
+ {"neighbors": [{"neighborId": neighbor, "state": "Full"}]},
+ )
+ _, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
+ assertmsg = '"{}" convergence failure'.format(router)
+ assert result is None, assertmsg
+
+ # Wait for OSPFv2 convergence
+ expect_ospfv2_neighbor_full("r1", "10.0.255.2")
+ expect_ospfv2_neighbor_full("r1", "10.0.255.3")
+ expect_ospfv2_neighbor_full("r2", "10.0.255.1")
+ expect_ospfv2_neighbor_full("r2", "10.0.255.3")
+ expect_ospfv2_neighbor_full("r3", "10.0.255.1")
+ expect_ospfv2_neighbor_full("r3", "10.0.255.2")
+ expect_ospfv2_neighbor_full("r3", "10.0.255.4")
+ expect_ospfv2_neighbor_full("r4", "10.0.255.3")
+
+ # Wait for OSPFv3 convergence
+ expect_ospfv3_neighbor_full("r1", "10.0.255.2")
+ expect_ospfv3_neighbor_full("r1", "10.0.255.3")
+ expect_ospfv3_neighbor_full("r2", "10.0.255.1")
+ expect_ospfv3_neighbor_full("r2", "10.0.255.3")
+ expect_ospfv3_neighbor_full("r3", "10.0.255.1")
+ expect_ospfv3_neighbor_full("r3", "10.0.255.2")
+ expect_ospfv3_neighbor_full("r3", "10.0.255.4")
+ expect_ospfv3_neighbor_full("r4", "10.0.255.3")
+
+
+def compare_show_ipv6_ospf6(rname, expected):
+ """
+ Calls 'show ipv6 ospf6 route' for router `rname` and compare the obtained
+ result with the expected output.
+ """
+ tgen = get_topogen()
+ current = tgen.gears[rname].vtysh_cmd("show ipv6 ospf6 route")
+
+ # Remove the link addresses
+ current = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", current)
+ expected = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", expected)
+
+ # Remove the time
+ current = re.sub(r"\d+:\d{2}:\d{2}", "", current)
+ expected = re.sub(r"\d+:\d{2}:\d{2}", "", expected)
+
+ return topotest.difflines(
+ topotest.normalize_text(current),
+ topotest.normalize_text(expected),
+ title1="Current output",
+ title2="Expected output",
+ )
+
+
+def test_ospf_convergence():
+ "Test OSPF daemon convergence"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ for router, rnode in tgen.routers().items():
+ logger.info('Waiting for router "%s" convergence', router)
+
+ # Load expected results from the command
+ reffile = os.path.join(CWD, "{}/ospfroute.txt".format(router))
+ expected = open(reffile).read()
+
+ # Run test function until we get an result. Wait at most 80 seconds.
+ test_func = partial(
+ topotest.router_output_cmp, rnode, "show ip ospf route", expected
+ )
+ result, diff = topotest.run_and_expect(test_func, "", count=160, wait=0.5)
+ assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
+
+
+def test_ospf_kernel_route():
+ "Test OSPF kernel route installation"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ rlist = tgen.routers().values()
+ for router in rlist:
+ logger.info('Checking OSPF IPv4 kernel routes in "%s"', router.name)
+
+ routes = topotest.ip4_route(router)
+ expected = {
+ "10.0.1.0/24": {},
+ "10.0.2.0/24": {},
+ "10.0.3.0/24": {},
+ "10.0.10.0/24": {},
+ "172.16.0.0/24": {},
+ "172.16.1.0/24": {},
+ }
+ assertmsg = 'OSPF IPv4 route mismatch in router "{}"'.format(router.name)
+ assert topotest.json_cmp(routes, expected) is None, assertmsg
+
+
+def test_ospf6_convergence():
+ "Test OSPF6 daemon convergence"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ ospf6route_file = "{}/ospf6route_ecmp.txt"
+ for rnum in range(1, 5):
+ router = "r{}".format(rnum)
+
+ logger.info('Waiting for router "%s" IPv6 OSPF convergence', router)
+
+ # Load expected results from the command
+ reffile = os.path.join(CWD, ospf6route_file.format(router))
+ expected = open(reffile).read()
+
+ # Run test function until we get an result. Wait at most 60 seconds.
+ test_func = partial(compare_show_ipv6_ospf6, router, expected)
+ result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
+ if (not result) and (rnum == 1):
+ # Didn't match the new ECMP version - try the old pre-ECMP format
+ ospf6route_file = "{}/ospf6route.txt"
+
+ # Load expected results from the command
+ reffile = os.path.join(CWD, ospf6route_file.format(router))
+ expected = open(reffile).read()
+
+ test_func = partial(compare_show_ipv6_ospf6, router, expected)
+ result, diff = topotest.run_and_expect(test_func, "", count=1, wait=3)
+ if not result:
+ # Didn't match the old version - switch back to new ECMP version
+ # and fail
+ ospf6route_file = "{}/ospf6route_ecmp.txt"
+
+ # Load expected results from the command
+ reffile = os.path.join(CWD, ospf6route_file.format(router))
+ expected = open(reffile).read()
+
+ test_func = partial(compare_show_ipv6_ospf6, router, expected)
+ result, diff = topotest.run_and_expect(test_func, "", count=1, wait=3)
+
+ assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
+
+
+def test_ospf6_kernel_route():
+ "Test OSPF kernel route installation"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ rlist = tgen.routers().values()
+ for router in rlist:
+ logger.info('Checking OSPF IPv6 kernel routes in "%s"', router.name)
+
+ def _routes_in_fib6():
+ routes = topotest.ip6_route(router)
+ expected = {
+ "2001:db8:1::/64": {},
+ "2001:db8:2::/64": {},
+ "2001:db8:3::/64": {},
+ "2001:db8:100::/64": {},
+ "2001:db8:200::/64": {},
+ "2001:db8:300::/64": {},
+ }
+ logger.info("Routes:")
+ logger.info(routes)
+ logger.info(topotest.json_cmp(routes, expected))
+ logger.info("ENd:")
+ return topotest.json_cmp(routes, expected)
+
+ _, result = topotest.run_and_expect(_routes_in_fib6, None, count=20, wait=1)
+
+ assertmsg = 'OSPF IPv6 route mismatch in router "{}"'.format(router.name)
+ assert result is None, assertmsg
+
+
+def test_ospf_json():
+ "Test 'show ip ospf json' output for coherency."
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ for rnum in range(1, 5):
+ router = tgen.gears["r{}".format(rnum)]
+ logger.info(router.vtysh_cmd("show ip ospf database"))
+ logger.info('Comparing router "%s" "show ip ospf json" output', router.name)
+ expected = {
+ "routerId": "10.0.255.{}".format(rnum),
+ "tosRoutesOnly": True,
+ "rfc2328Conform": True,
+ "spfScheduleDelayMsecs": 0,
+ "holdtimeMinMsecs": 50,
+ "holdtimeMaxMsecs": 5000,
+ "lsaMinIntervalMsecs": 5000,
+ "lsaMinArrivalMsecs": 1000,
+ "writeMultiplier": 20,
+ "refreshTimerMsecs": 10000,
+ "asbrRouter": "injectingExternalRoutingInformation",
+ "attachedAreaCounter": 1,
+ "areas": {},
+ }
+ # Area specific additional checks
+ if router.name == "r1" or router.name == "r2" or router.name == "r3":
+ expected["areas"]["0.0.0.0"] = {
+ "areaIfActiveCounter": 2,
+ "areaIfTotalCounter": 2,
+ "authentication": "authenticationNone",
+ "backbone": True,
+ "lsaAsbrNumber": 1,
+ "lsaNetworkNumber": 1,
+ "lsaNssaNumber": 0,
+ "lsaNumber": 7,
+ "lsaOpaqueAreaNumber": 0,
+ "lsaOpaqueLinkNumber": 0,
+ "lsaRouterNumber": 3,
+ "lsaSummaryNumber": 2,
+ "nbrFullAdjacentCounter": 2,
+ }
+ if router.name == "r3" or router.name == "r4":
+ expected["areas"]["0.0.0.1"] = {
+ "areaIfActiveCounter": 1,
+ "areaIfTotalCounter": 1,
+ "authentication": "authenticationNone",
+ "lsaAsbrNumber": 2,
+ "lsaNetworkNumber": 1,
+ "lsaNssaNumber": 0,
+ "lsaNumber": 9,
+ "lsaOpaqueAreaNumber": 0,
+ "lsaOpaqueLinkNumber": 0,
+ "lsaRouterNumber": 2,
+ "lsaSummaryNumber": 4,
+ "nbrFullAdjacentCounter": 1,
+ }
+ # r4 has more interfaces for area 0.0.0.1
+ if router.name == "r4":
+ expected["areas"]["0.0.0.1"].update(
+ {
+ "areaIfActiveCounter": 2,
+ "areaIfTotalCounter": 2,
+ }
+ )
+
+ # router 3 has an additional area
+ if router.name == "r3":
+ expected["attachedAreaCounter"] = 2
+
+ output = router.vtysh_cmd("show ip ospf json", isjson=True)
+ result = topotest.json_cmp(output, expected)
+ assert result is None, '"{}" JSON output mismatches the expected result'.format(
+ router.name
+ )
+
+
+def test_ospf_link_down():
+ "Test OSPF convergence after a link goes down"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ # Simulate a network down event on router3 switch3 interface.
+ router3 = tgen.gears["r3"]
+ router3.peer_link_enable("r3-eth0", False)
+
+ # Expect convergence on all routers
+ for router, rnode in tgen.routers().items():
+ logger.info('Waiting for router "%s" convergence after link failure', router)
+ # Load expected results from the command
+ reffile = os.path.join(CWD, "{}/ospfroute_down.txt".format(router))
+ expected = open(reffile).read()
+
+ # Run test function until we get an result. Wait at most 80 seconds.
+ test_func = partial(
+ topotest.router_output_cmp, rnode, "show ip ospf route", expected
+ )
+ result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5)
+ assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
+
+
+def test_ospf_link_down_kernel_route():
+ "Test OSPF kernel route installation"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ rlist = tgen.routers().values()
+ for router in rlist:
+ logger.info(
+ 'Checking OSPF IPv4 kernel routes in "%s" after link down', router.name
+ )
+
+ routes = topotest.ip4_route(router)
+ expected = {
+ "10.0.1.0/24": {},
+ "10.0.2.0/24": {},
+ "10.0.3.0/24": {},
+ "10.0.10.0/24": {},
+ "172.16.0.0/24": {},
+ "172.16.1.0/24": {},
+ }
+ if router.name == "r1" or router.name == "r2":
+ expected.update(
+ {
+ "10.0.10.0/24": None,
+ "172.16.0.0/24": None,
+ "172.16.1.0/24": None,
+ }
+ )
+ elif router.name == "r3" or router.name == "r4":
+ expected.update(
+ {
+ "10.0.1.0/24": None,
+ "10.0.2.0/24": None,
+ }
+ )
+ # Route '10.0.3.0' is no longer available for r4 since it is down.
+ if router.name == "r4":
+ expected.update(
+ {
+ "10.0.3.0/24": None,
+ }
+ )
+ assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(
+ router.name
+ )
+ count = 0
+ not_found = True
+ while not_found and count < 10:
+ not_found = topotest.json_cmp(routes, expected)
+ if not_found:
+ sleep(1)
+ routes = topotest.ip4_route(router)
+ count += 1
+ else:
+ not_found = False
+ break
+ assert not_found is False, assertmsg
+
+
+def test_ospf6_link_down():
+ "Test OSPF6 daemon convergence after link goes down"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ for rnum in range(1, 5):
+ router = "r{}".format(rnum)
+
+ logger.info(
+ 'Waiting for router "%s" IPv6 OSPF convergence after link down', router
+ )
+
+ # Load expected results from the command
+ reffile = os.path.join(CWD, "{}/ospf6route_down.txt".format(router))
+ expected = open(reffile).read()
+
+ # Run test function until we get an result. Wait at most 60 seconds.
+ test_func = partial(compare_show_ipv6_ospf6, router, expected)
+ result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
+ assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
+
+
+def test_ospf6_link_down_kernel_route():
+ "Test OSPF kernel route installation"
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip("skipped because of router(s) failure")
+
+ rlist = tgen.routers().values()
+ for router in rlist:
+ logger.info(
+ 'Checking OSPF IPv6 kernel routes in "%s" after link down', router.name
+ )
+
+ routes = topotest.ip6_route(router)
+ expected = {
+ "2001:db8:1::/64": {},
+ "2001:db8:2::/64": {},
+ "2001:db8:3::/64": {},
+ "2001:db8:100::/64": {},
+ "2001:db8:200::/64": {},
+ "2001:db8:300::/64": {},
+ }
+ if router.name == "r1" or router.name == "r2":
+ expected.update(
+ {
+ "2001:db8:100::/64": None,
+ "2001:db8:200::/64": None,
+ "2001:db8:300::/64": None,
+ }
+ )
+ elif router.name == "r3" or router.name == "r4":
+ expected.update(
+ {
+ "2001:db8:1::/64": None,
+ "2001:db8:2::/64": None,
+ }
+ )
+ # Route '2001:db8:3::/64' is no longer available for r4 since it is down.
+ if router.name == "r4":
+ expected.update(
+ {
+ "2001:db8:3::/64": None,
+ }
+ )
+ assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(
+ router.name
+ )
+ count = 0
+ not_found = True
+ while not_found and count < 10:
+ not_found = topotest.json_cmp(routes, expected)
+ if not_found:
+ sleep(1)
+ routes = topotest.ip6_route(router)
+ count += 1
+ else:
+ not_found = False
+ break
+
+ assert not_found is False, assertmsg
+
+
+def test_memory_leak():
+ "Run the memory leak test and report results."
+ tgen = get_topogen()
+ if not tgen.is_memleak_enabled():
+ pytest.skip("Memory leak test/report is disabled")
+
+ tgen.report_memory_leaks()
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))