From 2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 11:53:30 +0200 Subject: Adding upstream version 8.4.4. Signed-off-by: Daniel Baumann --- .../test_ldp_sync_isis_topo1.py | 626 +++++++++++++++++++++ 1 file changed, 626 insertions(+) create mode 100644 tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py (limited to 'tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py') diff --git a/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py b/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py new file mode 100644 index 0000000..48584f0 --- /dev/null +++ b/tests/topotests/ldp_sync_isis_topo1/test_ldp_sync_isis_topo1.py @@ -0,0 +1,626 @@ +#!/usr/bin/env python + +# +# test_ldp_isis_topo1.py +# Part of NetDEF Topology Tests +# +# Copyright (c) 2020 by Volta Networks +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +test_ldp_vpls_topo1.py: + + +---------+ +---------+ + | | | | + | CE1 | | CE2 | + | | | | + +---------+ +---------+ +ce1-eth0 (172.16.1.1/24)| |ce2-eth0 (172.16.1.2/24) + | | + | | + rt1-eth0| |rt2-eth0 + +---------+ 10.0.1.0/24 +---------+ + | |rt1-eth1 | | + | RT1 +----------------+ RT2 | + | 1.1.1.1 | rt2-eth1| 2.2.2.2 | + | | | | + +---------+ +---------+ + rt1-eth2| |rt2-eth2 + | | + | | + 10.0.2.0/24| +---------+ |10.0.3.0/24 + | | | | + | | RT3 | | + +--------+ 3.3.3.3 +-------+ + rt3-eth2| |rt3-eth1 + +---------+ + |rt3-eth0 + | + | + ce3-eth0 (172.16.1.3/24)| + +---------+ + | | + | CE3 | + | | + +---------+ +""" + +import os +import re +import sys +import pytest +import json +from functools import partial + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib import topotest +from lib.topogen import Topogen, TopoRouter, get_topogen +from lib.topolog import logger + +# Required to instantiate the topology builder class. + +pytestmark = [pytest.mark.isisd, pytest.mark.ldpd] + + +def build_topo(tgen): + "Build function" + + # + # Define FRR Routers + # + for router in ["ce1", "ce2", "ce3", "r1", "r2", "r3"]: + tgen.add_router(router) + + # + # Define connections + # + switch = tgen.add_switch("s1") + switch.add_link(tgen.gears["ce1"]) + switch.add_link(tgen.gears["r1"]) + + switch = tgen.add_switch("s2") + switch.add_link(tgen.gears["ce2"]) + switch.add_link(tgen.gears["r2"]) + + switch = tgen.add_switch("s3") + switch.add_link(tgen.gears["ce3"]) + switch.add_link(tgen.gears["r3"]) + + switch = tgen.add_switch("s4") + switch.add_link(tgen.gears["r1"]) + switch.add_link(tgen.gears["r2"]) + + switch = tgen.add_switch("s5") + switch.add_link(tgen.gears["r1"]) + switch.add_link(tgen.gears["r3"]) + + switch = tgen.add_switch("s6") + switch.add_link(tgen.gears["r2"]) + switch.add_link(tgen.gears["r3"]) + + +def setup_module(mod): + "Sets up the pytest environment" + tgen = Topogen(build_topo, mod.__name__) + tgen.start_topology() + + router_list = tgen.routers() + + # For all registered routers, load the zebra configuration file + for rname, router in router_list.items(): + router.load_config( + TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)) + ) + # Don't start isisd and ldpd in the CE nodes + if router.name[0] == "r": + router.load_config( + TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)) + ) + router.load_config( + TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)) + ) + + tgen.start_router() + + +def teardown_module(mod): + "Teardown the pytest environment" + tgen = get_topogen() + + # This function tears down the whole topology. + tgen.stop_topology() + + +def router_compare_json_output(rname, command, reference): + "Compare router JSON output" + + logger.info('Comparing router "%s" "%s" output', rname, command) + + tgen = get_topogen() + filename = "{}/{}/{}".format(CWD, rname, reference) + expected = json.loads(open(filename).read()) + + # Run test function until we get an result. + test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected) + _, diff = topotest.run_and_expect(test_func, None, count=320, wait=0.5) + assertmsg = '"{}" JSON output mismatches the expected result'.format(rname) + assert diff is None, assertmsg + + +def test_isis_convergence(): + logger.info("Test: check ISIS adjacencies") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, + "show yang operational-data /frr-interface:lib isisd", + "show_yang_interface_isis_adjacencies.ref", + ) + + +def test_rib(): + logger.info("Test: verify RIB") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output(rname, "show ip route json", "show_ip_route.ref") + + +def test_ldp_adjacencies(): + logger.info("Test: verify LDP adjacencies") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp discovery json", "show_ldp_discovery.ref" + ) + + +def test_ldp_neighbors(): + logger.info("Test: verify LDP neighbors") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp neighbor json", "show_ldp_neighbor.ref" + ) + + +def test_ldp_bindings(): + logger.info("Test: verify LDP bindings") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp binding json", "show_ldp_binding.ref" + ) + + +def test_ldp_pwid_bindings(): + logger.info("Test: verify LDP PW-ID bindings") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show l2vpn atom binding json", "show_l2vpn_binding.ref" + ) + + +def test_ldp_pseudowires(): + logger.info("Test: verify LDP pseudowires") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show l2vpn atom vc json", "show_l2vpn_vc.ref" + ) + + +def test_ldp_igp_sync(): + logger.info("Test: verify LDP igp-sync") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref" + ) + + +def test_isis_ldp_sync(): + logger.info("Test: verify ISIS igp-sync") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref") + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_interface_detail( + rname, "show_isis_interface_detail.ref" + ) + assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff) + + +def test_r1_eth1_shutdown(): + logger.info("Test: verify behaviour after r1-eth1 is shutdown") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Shut down r1-r2 link */ + tgen = get_topogen() + tgen.gears["r1"].peer_link_enable("r1-eth1", False) + topotest.sleep(5, "Waiting for the network to reconverge") + + # check if the pseudowire is still up (using an alternate path for nexthop resolution) + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show l2vpn atom vc json", "show_l2vpn_vc.ref" + ) + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, + "show mpls ldp igp-sync json", + "show_ldp_igp_sync_r1_eth1_shutdown.ref", + ) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_ldp_sync( + rname, "show_isis_ldp_sync_r1_eth1_shutdown.ref" + ) + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_interface_detail( + rname, "show_isis_interface_detail_r1_eth1_shutdown.ref" + ) + assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff) + + +def test_r1_eth1_no_shutdown(): + logger.info("Test: verify behaviour after r1-eth1 is no shutdown") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Run no shutdown on r1-eth1 interface */ + tgen = get_topogen() + tgen.gears["r1"].peer_link_enable("r1-eth1", True) + topotest.sleep(5, "Waiting for the network to reconverge") + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref" + ) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref") + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_interface_detail( + rname, "show_isis_interface_detail.ref" + ) + assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff) + + +def test_r2_eth1_shutdown(): + logger.info("Test: verify behaviour after r2-eth1 is shutdown") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Shut down r1-r2 link */ + tgen = get_topogen() + tgen.gears["r2"].peer_link_enable("r2-eth1", False) + topotest.sleep(5, "Waiting for the network to reconverge") + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, + "show mpls ldp igp-sync json", + "show_ldp_igp_sync_r1_eth1_shutdown.ref", + ) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_ldp_sync( + rname, "show_isis_ldp_sync_r2_eth1_shutdown.ref" + ) + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_interface_detail( + rname, "show_isis_interface_detail_r2_eth1_shutdown.ref" + ) + assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff) + + +def test_r2_eth1_no_shutdown(): + logger.info("Test: verify behaviour after r2-eth1 is no shutdown") + tgen = get_topogen() + + # Skip if previous fatal error condition is raised + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Run no shutdown on r2-eth1 interface */ + tgen = get_topogen() + tgen.gears["r2"].peer_link_enable("r2-eth1", True) + topotest.sleep(5, "Waiting for the network to reconverge") + + for rname in ["r1", "r2", "r3"]: + router_compare_json_output( + rname, "show mpls ldp igp-sync json", "show_ldp_igp_sync.ref" + ) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_ldp_sync(rname, "show_isis_ldp_sync.ref") + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + for rname in ["r1", "r2", "r3"]: + (result, diff) = validate_show_isis_interface_detail( + rname, "show_isis_interface_detail.ref" + ) + assert result, "ISIS interface did not converge on {}:\n{}".format(rname, diff) + + +# Memory leak test template +def test_memory_leak(): + "Run the memory leak test and report results." + tgen = get_topogen() + if not tgen.is_memleak_enabled(): + pytest.skip("Memory leak test/report is disabled") + + tgen.report_memory_leaks() + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) + + +# +# Auxiliary functions +# + + +def parse_show_isis_ldp_sync(lines, rname): + """ + Parse the output of 'show isis mpls ldp sync' into a Python dict. + """ + interfaces = {} + + it = iter(lines) + + while True: + try: + interface = {} + interface_name = None + + line = next(it) + + if line.startswith(rname + "-eth"): + interface_name = line + + line = next(it) + + if line.startswith(" LDP-IGP Synchronization enabled: "): + interface["ldpIgpSyncEnabled"] = line.endswith("yes") + line = next(it) + + if line.startswith(" holddown timer in seconds: "): + interface["holdDownTimeInSec"] = int(line.split(": ")[-1]) + line = next(it) + + if line.startswith(" State: "): + interface["ldpIgpSyncState"] = line.split(": ")[-1] + + elif line.startswith(" Interface "): + interface["Interface"] = line.endswith("down") + + interfaces[interface_name] = interface + + except StopIteration: + break + + return interfaces + + +def show_isis_ldp_sync(router, rname): + """ + Get the show isis mpls ldp-sync info in a dictionary format. + + """ + out = topotest.normalize_text( + router.vtysh_cmd("show isis mpls ldp-sync") + ).splitlines() + + parsed = parse_show_isis_ldp_sync(out, rname) + + return parsed + + +def validate_show_isis_ldp_sync(rname, fname): + tgen = get_topogen() + + filename = "{0}/{1}/{2}".format(CWD, rname, fname) + expected = json.loads(open(filename).read()) + + router = tgen.gears[rname] + + def compare_isis_ldp_sync(router, expected): + "Helper function to test show isis mpls ldp-sync" + actual = show_isis_ldp_sync(router, rname) + return topotest.json_cmp(actual, expected) + + test_func = partial(compare_isis_ldp_sync, router, expected) + (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=160) + + return (result, diff) + + +def parse_show_isis_interface_detail(lines, rname): + """ + Parse the output of 'show isis interface detail' into a Python dict. + """ + areas = {} + area_id = None + + it = iter(lines) + + while True: + try: + line = next(it) + + area_match = re.match(r"Area (.+):", line) + if not area_match: + continue + + area_id = area_match.group(1) + area = {} + + line = next(it) + + while line.startswith(" Interface: "): + interface_name = re.split(":|,", line)[1].lstrip() + + area[interface_name] = [] + + # Look for keyword: Level-1 or Level-2 + while not line.startswith(" Level-"): + line = next(it) + + while line.startswith(" Level-"): + + level = {} + + level_name = line.split()[0] + level["level"] = level_name + + line = next(it) + + if line.startswith(" Metric:"): + level["metric"] = re.split(":|,", line)[1].lstrip() + + area[interface_name].append(level) + + # Look for keyword: Level-1 or Level-2 or Interface: + while not line.startswith(" Level-") and not line.startswith( + " Interface: " + ): + line = next(it) + + if line.startswith(" Level-"): + continue + + if line.startswith(" Interface: "): + break + + areas[area_id] = area + + except StopIteration: + + areas[area_id] = area + break + + return areas + + +def show_isis_interface_detail(router, rname): + """ + Get the show isis mpls ldp-sync info in a dictionary format. + + """ + out = topotest.normalize_text( + router.vtysh_cmd("show isis interface detail") + ).splitlines() + + logger.warning(out) + + parsed = parse_show_isis_interface_detail(out, rname) + + logger.warning(parsed) + + return parsed + + +def validate_show_isis_interface_detail(rname, fname): + tgen = get_topogen() + + filename = "{0}/{1}/{2}".format(CWD, rname, fname) + expected = json.loads(open(filename).read()) + + router = tgen.gears[rname] + + def compare_isis_interface_detail(router, expected): + "Helper function to test show isis interface detail" + actual = show_isis_interface_detail(router, rname) + return topotest.json_cmp(actual, expected) + + test_func = partial(compare_isis_interface_detail, router, expected) + (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=160) + + return (result, diff) -- cgit v1.2.3