From 2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 11:53:30 +0200 Subject: Adding upstream version 8.4.4. Signed-off-by: Daniel Baumann --- .../bgp_as_allow_in/test_bgp_as_allow_in.py | 982 +++++++++++++++++++++ 1 file changed, 982 insertions(+) create mode 100644 tests/topotests/bgp_as_allow_in/test_bgp_as_allow_in.py (limited to 'tests/topotests/bgp_as_allow_in/test_bgp_as_allow_in.py') diff --git a/tests/topotests/bgp_as_allow_in/test_bgp_as_allow_in.py b/tests/topotests/bgp_as_allow_in/test_bgp_as_allow_in.py new file mode 100644 index 0000000..ec66c8c --- /dev/null +++ b/tests/topotests/bgp_as_allow_in/test_bgp_as_allow_in.py @@ -0,0 +1,982 @@ +#!/usr/bin/python + +# +# Copyright (c) 2020 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test bgp allowas-in functionality: + +- Verify that routes coming from same AS are accepted only when + '"allowas-in" is configuerd. +- Verify that "allowas-in" feature works per address-family/VRF + 'basis and doesn't impact the other AFIs. +- Verify that the if number of occurrences of AS number in path is + 'more than the configured allowas-in value then we do not accept + 'the route. +- Verify that when we advertise a network, learned from the same AS + 'via allowas-in command, to an iBGP neighbor we see multiple + 'occurrences. +- Verify that when we advertise a network, learned from the same AS + 'via allowas-in command, to an eBGP neighbor we see multiple + 'occurrences of our own AS based on configured value+1. +""" + +import os +import sys +import time +import json +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../lib/")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, + write_test_header, + write_test_footer, + reset_config_on_routers, + verify_rib, + create_static_routes, + create_route_maps, + check_address_types, + step, + required_linux_kernel_version, +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, + create_router_bgp, + verify_bgp_rib, +) +from lib.topojson import build_topo_from_json, build_config_from_json + +pytestmark = [pytest.mark.bgpd, pytest.mark.staticd] + + +# Reading the data from JSON File for topology creation +jsonFile = "{}/bgp_as_allow_in.json".format(CWD) +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +BGP_CONVERGENCE = False +ADDR_TYPES = check_address_types() +NETWORK = {"ipv4": "2.2.2.2/32", "ipv6": "22:22::2/128"} +NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"} + + +def build_topo(tgen): + """Build function""" + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + # Required linux kernel version for this suite to run. + result = required_linux_kernel_version("4.15") + if result is not True: + pytest.skip("Kernel requirements are not met") + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(build_topo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start daemons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Checking BGP convergence + global BGP_CONVERGENCE + global ADDR_TYPES + + # Api call verify whether BGP is converged + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format( + BGP_CONVERGENCE + ) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + + +##################################################### +# +# Tests starting +# +##################################################### + + +def test_bgp_allowas_in_p0(request): + """ + Verify that routes coming from same AS are accepted only when + "allowas-in" is configuerd. + + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + step("Advertise prefix 2.2.2.2/32 from Router-1(AS-200).") + step("Advertise an ipv6 prefix 22:22::2/128 from Router-1(AS-200).") + # configure static routes + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + input_dict_4 = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static in Router BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": {"redistribute": [{"redist_type": "static"}]} + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + step( + 'Check BGP table of router R3 using "sh bgp ipv4" and "sh bgp ' + 'ipv6" command.' + ) + step( + "We should not see prefix advertised from R1 in R3's BGP " + "table without allowas-in." + ) + logger.info("Verifying %s routes on r3, route should not be present", addr_type) + result = verify_rib( + tgen, + addr_type, + dut, + input_dict_4, + next_hop=NEXT_HOP_IP[addr_type], + protocol=protocol, + expected=False, + ) + assert result is not True, ( + "Testcase {} : Failed \n".format(tc_name) + + "Expected behavior: routes should not present in rib \n" + + "Error: {}".format(result) + ) + + step("Configure allowas-in on R3 for R2.") + step("We should see the prefix advertised from R1 in R3's BGP table.") + # Api call to enable allowas-in in bgp process. + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": {"number_occurences": 1} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_bgp_allowas_in_per_addr_family_p0(request): + """ + Verify that "allowas-in" feature works per address-family/VRF + basis and doesn't impact the other AFIs. + + """ + + # This test is applicable only for dual stack. + if "ipv4" not in ADDR_TYPES or "ipv6" not in ADDR_TYPES: + pytest.skip("NOT APPLICABLE") + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + step("Advertise prefix 2.2.2.2/32 from Router-1(AS-200).") + step("Advertise an ipv6 prefix 22:22::2/128 from Router-1(AS-200).") + # configure static routes routes + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + input_dict_4 = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static in Router BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": {"redistribute": [{"redist_type": "static"}]} + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure allowas-in on R3 for R2 under IPv4 addr-family only") + # Api call to enable allowas-in in bgp process. + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {"allowas-in": {"number_occurences": 1}} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + static_route_ipv4 = { + "r1": { + "static_routes": [ + {"network": NETWORK["ipv4"], "next_hop": NEXT_HOP_IP["ipv4"]} + ] + } + } + + static_route_ipv6 = { + "r1": { + "static_routes": [ + {"network": NETWORK["ipv6"], "next_hop": NEXT_HOP_IP["ipv6"]} + ] + } + } + step("We should see R1 advertised prefix only in IPv4 AFI " "not in IPv6 AFI.") + result = verify_rib(tgen, "ipv4", dut, static_route_ipv4, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + result = verify_rib( + tgen, "ipv6", dut, static_route_ipv6, protocol=protocol, expected=False + ) + assert result is not True, ( + "Testcase {} : Failed \n".format(tc_name) + + "Expected behavior: routes are should not be present in ipv6 rib\n" + + " Error: {}".format(result) + ) + + step("Repeat the same test for IPv6 AFI.") + step("Configure allowas-in on R3 for R2 under IPv6 addr-family only") + # Api call to enable allowas-in in bgp process. + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": { + "number_occurences": 2, + "delete": True, + } + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {"allowas-in": {"number_occurences": 2}} + } + } + } + } + }, + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + step("We should see R1 advertised prefix only in IPv6 AFI " "not in IPv4 AFI.") + result = verify_rib( + tgen, "ipv4", dut, static_route_ipv4, protocol=protocol, expected=False + ) + assert result is not True, ( + "Testcase {} : Failed \n".format(tc_name) + + "Expected behavior: routes should not be present in ipv4 rib\n" + + " Error: {}".format(result) + ) + result = verify_rib(tgen, "ipv6", dut, static_route_ipv6, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_allowas_in_no_of_occurrences_p0(request): + """ + Verify that the if number of occurrences of AS number in path is + more than the configured allowas-in value then we do not accept + the route. + + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, static_routes) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static in Router BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": {"redistribute": [{"redist_type": "static"}]} + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure a route-map on R1 to prepend AS 4 times.") + for addr_type in ADDR_TYPES: + input_dict_4 = { + "r1": { + "route_maps": { + "ASP_{}".format(addr_type): [ + { + "action": "permit", + "set": { + "path": { + "as_num": "200 200 200 200", + "as_action": "prepend", + } + }, + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure route map in out direction on R1") + # Configure neighbor for route map + input_dict_7 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": { + "route_maps": [ + { + "name": "ASP_{}".format( + addr_type + ), + "direction": "out", + } + ] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + step('Configure "allowas-in 4" on R3 for R2.') + # Api call to enable allowas-in in bgp process. + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": {"number_occurences": 4} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_rib( + tgen, addr_type, dut, static_routes, protocol=protocol, expected=False + ) + assert result is not True, ( + "Testcase {} : Failed \n ".format(tc_name) + + "Expected behavior: routes are should not be present in rib\n" + + "Error: {}".format(result) + ) + + for addr_type in ADDR_TYPES: + step('Configure "allowas-in 5" on R3 for R2.') + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": {"number_occurences": 5} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + result = verify_rib(tgen, addr_type, dut, static_routes, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_bgp_allowas_in_sameastoibgp_p1(request): + """ + Verify that when we advertise a network, learned from the same AS + via allowas-in command, to an iBGP neighbor we see multiple + occurrences. + + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, static_routes) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static in Router BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": {"redistribute": [{"redist_type": "static"}]} + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure a route-map on R2 to prepend AS 2 times.") + for addr_type in ADDR_TYPES: + input_dict_4 = { + "r2": { + "route_maps": { + "ASP_{}".format(addr_type): [ + { + "action": "permit", + "set": { + "path": {"as_num": "200 200", "as_action": "prepend"} + }, + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure route map in out direction on R2") + # Configure neighbor for route map + input_dict_7 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "ASP_{}".format( + addr_type + ), + "direction": "out", + } + ] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step('Configure "allowas-in 3" on R3 for R1.') + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": {"number_occurences": 3} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + input_dict_1 = { + "r4": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": { + "allowas-in": {"number_occurences": 3} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + dut = "r4" + path = "100 200 200 200" + result = verify_bgp_rib(tgen, addr_type, dut, static_routes, aspath=path) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_bgp_allowas_in_sameastoebgp_p1(request): + """ + Verify that when we advertise a network, learned from the same AS + via allowas-in command, to an eBGP neighbor we see multiple + occurrences of our own AS based on configured value+1. + + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, static_routes) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static in Router BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": {"redistribute": [{"redist_type": "static"}]} + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure a route-map on R2 to prepend AS 2 times.") + for addr_type in ADDR_TYPES: + input_dict_4 = { + "r2": { + "route_maps": { + "ASP_{}".format(addr_type): [ + { + "action": "permit", + "set": { + "path": {"as_num": "200 200", "as_action": "prepend"} + }, + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure route map in out direction on R2") + # Configure neighbor for route map + input_dict_7 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "ASP_{}".format( + addr_type + ), + "direction": "out", + } + ] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + step('Configure "allowas-in 3" on R3 for R1.') + input_dict_1 = { + "r3": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": { + "allowas-in": {"number_occurences": 3} + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + static_routes = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + dut = "r5" + path = "200 100 200 200 200" + result = verify_bgp_rib(tgen, addr_type, dut, static_routes, aspath=path) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) -- cgit v1.2.3