summaryrefslogtreecommitdiffstats
path: root/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py')
-rw-r--r--tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py626
1 files changed, 626 insertions, 0 deletions
diff --git a/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py b/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py
new file mode 100644
index 0000000..48584f0
--- /dev/null
+++ b/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py
@@ -0,0 +1,626 @@
+#!/usr/bin/env python
+
+#
+# test_ldp_isis_topo1.py
+# Part of NetDEF Topology Tests
+#
+# Copyright (c) 2020 by Volta Networks
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+test_ldp_vpls_topo1.py:
+
+ +---------+ +---------+
+ | | | |
+ | CE1 | | CE2 |
+ | | | |
+ +---------+ +---------+
+ce1-eth0 (172.16.1.1/24)| |ce2-eth0 (172.16.1.2/24)
+ | |
+ | |
+ rt1-eth0| |rt2-eth0
+ +---------+ 10.0.1.0/24 +---------+
+ | |rt1-eth1 | |
+ | RT1 +----------------+ RT2 |
+ | 1.1.1.1 | rt2-eth1| 2.2.2.2 |
+ | | | |
+ +---------+ +---------+
+ rt1-eth2| |rt2-eth2
+ | |
+ | |
+ 10.0.2.0/24| +---------+ |10.0.3.0/24
+ | | | |
+ | | RT3 | |
+ +--------+ 3.3.3.3 +-------+
+ rt3-eth2| |rt3-eth1
+ +---------+
+ |rt3-eth0
+ |
+ |
+ ce3-eth0 (172.16.1.3/24)|
+ +---------+
+ | |
+ | CE3 |
+ | |
+ +---------+
+"""
+
+import os
+import re
+import sys
+import pytest
+import json
+from functools import partial
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+from lib.topolog import logger
+
+# Required to instantiate the topology builder class.
+
+pytestmark = [pytest.mark.isisd, pytest.mark.ldpd]
+
+
+def build_topo(tgen):
+ "Build function"
+
+ #
+ # Define FRR Routers
+ #
+ for router in ["ce1", "ce2", "ce3", "r1", "r2", "r3"]:
+ tgen.add_router(router)
+
+ #
+ # Define connections
+ #
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["ce1"])
+ switch.add_link(tgen.gears["r1"])
+
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["ce2"])
+ switch.add_link(tgen.gears["r2"])
+
+ switch = tgen.add_switch("s3")
+ switch.add_link(tgen.gears["ce3"])
+ switch.add_link(tgen.gears["r3"])
+
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["r1"])
+ switch.add_link(tgen.gears["r2"])
+
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["r1"])
+ switch.add_link(tgen.gears["r3"])
+
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["r2"])
+ switch.add_link(tgen.gears["r3"])
+
+
+def setup_module(mod):
+ "Sets up the pytest environment"
+ tgen = Topogen(build_topo, mod.__name__)
+ tgen.start_topology()
+
+ router_list = tgen.routers()
+
+ # For all registered routers, load the zebra configuration file
+ for rname, router in router_list.items():
+ router.load_config(
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
+ )
+ # Don't start isisd and ldpd in the CE nodes
+ if router.name[0] == "r":
+ router.load_config(
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
+ )
+ router.load_config(
+ TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname))
+ )
+
+ tgen.start_router()
+
+
+def teardown_module(mod):
+ "Teardown the pytest environment"
+ tgen = get_topogen()
+
+ # This function tears down the whole topology.
+ tgen.stop_topology()
+
+
+def router_compare_json_output(rname, command, reference):
+ "Compare router JSON output"
+
+ logger.info('Comparing router "%s" "%s" output', rname, command)
+
+ tgen = get_topogen()
+ filename = "{}/{}/{}".format(CWD, rname, reference)
+ expected = json.loads(open(filename).read())
+
+ # Run test function until we get an result.
+ test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)
+ _, diff = topotest.run_and_expect(test_func, None, count=320, wait=0.5)
+ assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
+ assert diff is None, assertmsg
+
+
+def test_isis_convergence():
+ logger.info("Test: check ISIS adjacencies")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname,
+ "show yang operational-data /frr-interface:lib isisd",
+ "show_yang_interface_isis_adjacencies.ref",
+ )
+
+
+def test_rib():
+ logger.info("Test: verify RIB")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(rname, "show ip route json", "show_ip_route.ref")
+
+
+def test_ldp_adjacencies():
+ logger.info("Test: verify LDP adjacencies")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp discovery json", "show_ldp_discovery.ref"
+ )
+
+
+def test_ldp_neighbors():
+ logger.info("Test: verify LDP neighbors")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp neighbor json", "show_ldp_neighbor.ref"
+ )
+
+
+def test_ldp_bindings():
+ logger.info("Test: verify LDP bindings")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp binding json", "show_ldp_binding.ref"
+ )
+
+
+def test_ldp_pwid_bindings():
+ logger.info("Test: verify LDP PW-ID bindings")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show l2vpn atom binding json", "show_l2vpn_binding.ref"
+ )
+
+
+def test_ldp_pseudowires():
+ logger.info("Test: verify LDP pseudowires")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show l2vpn atom vc json", "show_l2vpn_vc.ref"
+ )
+
+
+def test_ldp_igp_sync():
+ logger.info("Test: verify LDP igp-sync")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref"
+ )
+
+
+def test_isis_ldp_sync():
+ logger.info("Test: verify ISIS igp-sync")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref")
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_interface_detail(
+ rname, "show_isis_interface_detail.ref"
+ )
+ assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff)
+
+
+def test_r1_eth1_shutdown():
+ logger.info("Test: verify behaviour after r1-eth1 is shutdown")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Shut down r1-r2 link */
+ tgen = get_topogen()
+ tgen.gears["r1"].peer_link_enable("r1-eth1", False)
+ topotest.sleep(5, "Waiting for the network to reconverge")
+
+ # check if the pseudowire is still up (using an alternate path for nexthop resolution)
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show l2vpn atom vc json", "show_l2vpn_vc.ref"
+ )
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname,
+ "show mpls ldp igp-sync json",
+ "show_ldp_igp_sync_r1_eth1_shutdown.ref",
+ )
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_ldp_sync(
+ rname, "show_isis_ldp_sync_r1_eth1_shutdown.ref"
+ )
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_interface_detail(
+ rname, "show_isis_interface_detail_r1_eth1_shutdown.ref"
+ )
+ assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff)
+
+
+def test_r1_eth1_no_shutdown():
+ logger.info("Test: verify behaviour after r1-eth1 is no shutdown")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Run no shutdown on r1-eth1 interface */
+ tgen = get_topogen()
+ tgen.gears["r1"].peer_link_enable("r1-eth1", True)
+ topotest.sleep(5, "Waiting for the network to reconverge")
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref"
+ )
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref")
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_interface_detail(
+ rname, "show_isis_interface_detail.ref"
+ )
+ assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff)
+
+
+def test_r2_eth1_shutdown():
+ logger.info("Test: verify behaviour after r2-eth1 is shutdown")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Shut down r1-r2 link */
+ tgen = get_topogen()
+ tgen.gears["r2"].peer_link_enable("r2-eth1", False)
+ topotest.sleep(5, "Waiting for the network to reconverge")
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname,
+ "show mpls ldp igp-sync json",
+ "show_ldp_igp_sync_r1_eth1_shutdown.ref",
+ )
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_ldp_sync(
+ rname, "show_isis_ldp_sync_r2_eth1_shutdown.ref"
+ )
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_interface_detail(
+ rname, "show_isis_interface_detail_r2_eth1_shutdown.ref"
+ )
+ assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff)
+
+
+def test_r2_eth1_no_shutdown():
+ logger.info("Test: verify behaviour after r2-eth1 is no shutdown")
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Run no shutdown on r2-eth1 interface */
+ tgen = get_topogen()
+ tgen.gears["r2"].peer_link_enable("r2-eth1", True)
+ topotest.sleep(5, "Waiting for the network to reconverge")
+
+ for rname in ["r1", "r2", "r3"]:
+ router_compare_json_output(
+ rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref"
+ )
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref")
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+ for rname in ["r1", "r2", "r3"]:
+ (result, diff) = validate_show_isis_interface_detail(
+ rname, "show_isis_interface_detail.ref"
+ )
+ assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff)
+
+
+# Memory leak test template
+def test_memory_leak():
+ "Run the memory leak test and report results."
+ tgen = get_topogen()
+ if not tgen.is_memleak_enabled():
+ pytest.skip("Memory leak test/report is disabled")
+
+ tgen.report_memory_leaks()
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
+
+
+#
+# Auxiliary functions
+#
+
+
+def parse_show_isis_ldp_sync(lines, rname):
+ """
+ Parse the output of 'show isis mpls ldp sync' into a Python dict.
+ """
+ interfaces = {}
+
+ it = iter(lines)
+
+ while True:
+ try:
+ interface = {}
+ interface_name = None
+
+ line = next(it)
+
+ if line.startswith(rname + "-eth"):
+ interface_name = line
+
+ line = next(it)
+
+ if line.startswith(" LDP-IGP Synchronization enabled: "):
+ interface["ldpIgpSyncEnabled"] = line.endswith("yes")
+ line = next(it)
+
+ if line.startswith(" holddown timer in seconds: "):
+ interface["holdDownTimeInSec"] = int(line.split(": ")[-1])
+ line = next(it)
+
+ if line.startswith(" State: "):
+ interface["ldpIgpSyncState"] = line.split(": ")[-1]
+
+ elif line.startswith(" Interface "):
+ interface["Interface"] = line.endswith("down")
+
+ interfaces[interface_name] = interface
+
+ except StopIteration:
+ break
+
+ return interfaces
+
+
+def show_isis_ldp_sync(router, rname):
+ """
+ Get the show isis mpls ldp-sync info in a dictionary format.
+
+ """
+ out = topotest.normalize_text(
+ router.vtysh_cmd("show isis mpls ldp-sync")
+ ).splitlines()
+
+ parsed = parse_show_isis_ldp_sync(out, rname)
+
+ return parsed
+
+
+def validate_show_isis_ldp_sync(rname, fname):
+ tgen = get_topogen()
+
+ filename = "{0}/{1}/{2}".format(CWD, rname, fname)
+ expected = json.loads(open(filename).read())
+
+ router = tgen.gears[rname]
+
+ def compare_isis_ldp_sync(router, expected):
+ "Helper function to test show isis mpls ldp-sync"
+ actual = show_isis_ldp_sync(router, rname)
+ return topotest.json_cmp(actual, expected)
+
+ test_func = partial(compare_isis_ldp_sync, router, expected)
+ (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=160)
+
+ return (result, diff)
+
+
+def parse_show_isis_interface_detail(lines, rname):
+ """
+ Parse the output of 'show isis interface detail' into a Python dict.
+ """
+ areas = {}
+ area_id = None
+
+ it = iter(lines)
+
+ while True:
+ try:
+ line = next(it)
+
+ area_match = re.match(r"Area (.+):", line)
+ if not area_match:
+ continue
+
+ area_id = area_match.group(1)
+ area = {}
+
+ line = next(it)
+
+ while line.startswith(" Interface: "):
+ interface_name = re.split(":|,", line)[1].lstrip()
+
+ area[interface_name] = []
+
+ # Look for keyword: Level-1 or Level-2
+ while not line.startswith(" Level-"):
+ line = next(it)
+
+ while line.startswith(" Level-"):
+
+ level = {}
+
+ level_name = line.split()[0]
+ level["level"] = level_name
+
+ line = next(it)
+
+ if line.startswith(" Metric:"):
+ level["metric"] = re.split(":|,", line)[1].lstrip()
+
+ area[interface_name].append(level)
+
+ # Look for keyword: Level-1 or Level-2 or Interface:
+ while not line.startswith(" Level-") and not line.startswith(
+ " Interface: "
+ ):
+ line = next(it)
+
+ if line.startswith(" Level-"):
+ continue
+
+ if line.startswith(" Interface: "):
+ break
+
+ areas[area_id] = area
+
+ except StopIteration:
+
+ areas[area_id] = area
+ break
+
+ return areas
+
+
+def show_isis_interface_detail(router, rname):
+ """
+ Get the show isis mpls ldp-sync info in a dictionary format.
+
+ """
+ out = topotest.normalize_text(
+ router.vtysh_cmd("show isis interface detail")
+ ).splitlines()
+
+ logger.warning(out)
+
+ parsed = parse_show_isis_interface_detail(out, rname)
+
+ logger.warning(parsed)
+
+ return parsed
+
+
+def validate_show_isis_interface_detail(rname, fname):
+ tgen = get_topogen()
+
+ filename = "{0}/{1}/{2}".format(CWD, rname, fname)
+ expected = json.loads(open(filename).read())
+
+ router = tgen.gears[rname]
+
+ def compare_isis_interface_detail(router, expected):
+ "Helper function to test show isis interface detail"
+ actual = show_isis_interface_detail(router, rname)
+ return topotest.json_cmp(actual, expected)
+
+ test_func = partial(compare_isis_interface_detail, router, expected)
+ (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=160)
+
+ return (result, diff)