From 2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 11:53:30 +0200 Subject: Adding upstream version 8.4.4. Signed-off-by: Daniel Baumann --- .../bgp_basic_functionality_topo1/__init__.py | 0 .../bgp_basic_functionality.json | 115 ++ .../test_bgp_basic_functionality.py | 1182 ++++++++++++++++++++ 3 files changed, 1297 insertions(+) create mode 100644 tests/topotests/bgp_basic_functionality_topo1/__init__.py create mode 100644 tests/topotests/bgp_basic_functionality_topo1/bgp_basic_functionality.json create mode 100644 tests/topotests/bgp_basic_functionality_topo1/test_bgp_basic_functionality.py (limited to 'tests/topotests/bgp_basic_functionality_topo1') diff --git a/tests/topotests/bgp_basic_functionality_topo1/__init__.py b/tests/topotests/bgp_basic_functionality_topo1/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/topotests/bgp_basic_functionality_topo1/bgp_basic_functionality.json b/tests/topotests/bgp_basic_functionality_topo1/bgp_basic_functionality.json new file mode 100644 index 0000000..ee1f1b7 --- /dev/null +++ b/tests/topotests/bgp_basic_functionality_topo1/bgp_basic_functionality.json @@ -0,0 +1,115 @@ +{ + "address_types": ["ipv4", "ipv6"], + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": {"ipv4": "10.0.0.0", "v4mask": 30, "ipv6": "fd00::", "v6mask": 64}, + "lo_prefix": {"ipv4": "1.0.", "v4mask": 32, "ipv6": "2001:db8:f::", "v6mask": 128}, + "routers": { + "r1": { + "links": { + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2": {"ipv4": "auto", "ipv6": "auto"}, + "r3": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": {"dest_link": {"r1": {}}}, + "r3": {"dest_link": {"r1": {}}} + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": {"dest_link": {"r1": {}}}, + "r3": {"dest_link": {"r1": {}}} + } + } + } + } + } + }, + "r2": { + "links": { + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1": {"ipv4": "auto", "ipv6": "auto"}, + "r3": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": {"dest_link": {"r2": {}}}, + "r3": {"dest_link": {"r2": {}}} + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": {"dest_link": {"r2": {}}}, + "r3": {"dest_link": {"r2": {}}} + } + } + } + } + } + }, + "r3": { + "links": { + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1": {"ipv4": "auto", "ipv6": "auto"}, + "r2": {"ipv4": "auto", "ipv6": "auto"}, + "r4": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": {"dest_link": {"r3": {}}}, + "r2": {"dest_link": {"r3": {}}}, + "r4": {"dest_link": {"r3": {}}} + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": {"dest_link": {"r3": {}}}, + "r2": {"dest_link": {"r3": {}}}, + "r4": {"dest_link": {"r3": {}}} + } + } + } + } + } + }, + "r4": { + "links": { + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r3": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": {"neighbor": {"r3": {"dest_link": {"r4": {}}}}} + }, + "ipv6": { + "unicast": {"neighbor": {"r3": {"dest_link": {"r4": {}}}}} + } + } + } + } + } +} diff --git a/tests/topotests/bgp_basic_functionality_topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp_basic_functionality_topo1/test_bgp_basic_functionality.py new file mode 100644 index 0000000..b18e32f --- /dev/null +++ b/tests/topotests/bgp_basic_functionality_topo1/test_bgp_basic_functionality.py @@ -0,0 +1,1182 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test BGP basic functionality: + +Test steps +- Create topology (setup module) + Creating 4 routers topology, r1, r2, r3 are in IBGP and + r3, r4 are in EBGP +- Bring up topology +- Verify for bgp to converge +- Modify/Delete and verify router-id +- Modify and verify bgp timers +- Create and verify static routes +- Modify and verify admin distance for existing static routes +- Test advertise network using network command +- Verify clear bgp +- Test bgp convergence with loopback interface +- Test advertise network using network command +- Verify routes not installed in zebra when /32 routes received + with loopback BGP session subnet +""" +# XXX clean up in later commit to avoid conflict on rebase +# pylint: disable=C0413 + +import os +import sys +import time +import pytest +from copy import deepcopy + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) +sys.path.append(os.path.join(CWD, "../lib/")) + +# Required to instantiate the topology builder class. + +from lib.bgp import ( + clear_bgp_and_verify, + create_router_bgp, + modify_as_number, + verify_as_numbers, + verify_bgp_convergence, + verify_bgp_rib, + verify_bgp_timers_and_functionality, + verify_router_id, +) +from lib.common_config import ( + addKernelRoute, + apply_raw_config, + check_address_types, + create_prefix_lists, + create_route_maps, + create_static_routes, + required_linux_kernel_version, + reset_config_on_routers, + start_topology, + step, + verify_admin_distance_for_static_routes, + verify_bgp_community, + verify_fib_routes, + verify_rib, + write_test_footer, + write_test_header, +) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from lib.topojson import build_config_from_json +from lib.topolog import logger + +pytestmark = [pytest.mark.bgpd, pytest.mark.staticd] + + +# Global Variable +KEEPALIVETIMER = 2 +HOLDDOWNTIMER = 6 +r1_ipv4_loopback = "1.0.1.0/24" +r2_ipv4_loopback = "1.0.2.0/24" +r3_ipv4_loopback = "1.0.3.0/24" +r4_ipv4_loopback = "1.0.4.0/24" +r1_ipv6_loopback = "2001:db8:f::1:0/120" +r2_ipv6_loopback = "2001:db8:f::2:0/120" +r3_ipv6_loopback = "2001:db8:f::3:0/120" +r4_ipv6_loopback = "2001:db8:f::4:0/120" +NETWORK = { + "ipv4": ["100.1.1.1/32", "100.1.1.2/32"], + "ipv6": ["100::1/128", "100::2/128"], +} + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + # Required linux kernel version for this suite to run. + result = required_linux_kernel_version("4.15") + if result is not True: + pytest.skip("Kernel requirements are not met") + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + json_file = "{}/bgp_basic_functionality.json".format(CWD) + tgen = Topogen(json_file, mod.__name__) + global topo + topo = tgen.json_topo + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start daemons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + global ADDR_TYPES + global BGP_CONVERGENCE + ADDR_TYPES = check_address_types() + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format( + BGP_CONVERGENCE + ) + + logger.info("Running setup_module() done") + + +def teardown_module(): + """Teardown the pytest environment""" + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + + +##################################################### +# +# Testcases +# +##################################################### + + +def test_modify_and_delete_router_id(request): + """Test to modify, delete and verify router-id.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Modify router id + input_dict = { + "r1": {"bgp": {"router_id": "12.12.12.12"}}, + "r2": {"bgp": {"router_id": "22.22.22.22"}}, + "r3": {"bgp": {"router_id": "33.33.33.33"}}, + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Verifying router id once modified + result = verify_router_id(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Delete router id + input_dict = { + "r1": {"bgp": {"del_router_id": True}}, + "r2": {"bgp": {"del_router_id": True}}, + "r3": {"bgp": {"del_router_id": True}}, + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Verifying router id once deleted + # Once router-id is deleted, highest interface ip should become + # router-id + result = verify_router_id(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_config_with_4byte_as_number(request): + """ + Configure BGP with 4 byte ASN and verify it works fine + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + input_dict = { + "r1": {"bgp": {"local_as": 131079}}, + "r2": {"bgp": {"local_as": 131079}}, + "r3": {"bgp": {"local_as": 131079}}, + "r4": {"bgp": {"local_as": 131080}}, + } + result = modify_as_number(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + result = verify_as_numbers(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_BGP_config_with_invalid_ASN_p2(request): + """ + Configure BGP with invalid ASN(ex - 0, reserved ASN) and verify test case + ended up with error + """ + + tgen = get_topogen() + global BGP_CONVERGENCE + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to modify AS number + input_dict = { + "r1": { + "bgp": { + "local_as": 0, + } + }, + "r2": { + "bgp": { + "local_as": 0, + } + }, + "r3": { + "bgp": { + "local_as": 0, + } + }, + "r4": { + "bgp": { + "local_as": 64000, + } + }, + } + result = modify_as_number(tgen, topo, input_dict) + assert ( + result is not True + ), "Expected BGP config is not created because of invalid ASNs: {}".format(result) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + result = verify_bgp_convergence(tgen, topo) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + write_test_footer(tc_name) + + +def test_BGP_config_with_2byteAS_and_4byteAS_number_p1(request): + """ + Configure BGP with 4 byte and 2 byte ASN and verify BGP is converged + """ + + tgen = get_topogen() + global BGP_CONVERGENCE + + if BGP_CONVERGENCE != True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + result = verify_bgp_convergence(tgen, topo) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + # Api call to modify AS number + input_dict = { + "r1": {"bgp": {"local_as": 131079}}, + "r2": {"bgp": {"local_as": 131079}}, + "r3": {"bgp": {"local_as": 131079}}, + "r4": {"bgp": {"local_as": 111}}, + } + result = modify_as_number(tgen, topo, input_dict) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + result = verify_as_numbers(tgen, topo, input_dict) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + # Api call verify whether BGP is converged + result = verify_bgp_convergence(tgen, topo) + if result != True: + assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result) + + write_test_footer(tc_name) + + +def test_bgp_timers_functionality(request): + """ + Test to modify bgp timers and verify timers functionality. + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to modify BGP timerse + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": { + "keepalivetimer": KEEPALIVETIMER, + "holddowntimer": HOLDDOWNTIMER, + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, deepcopy(input_dict)) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Api call to clear bgp, so timer modification would take place + clear_bgp_and_verify(tgen, topo, "r1") + + # Verifying bgp timers functionality + result = verify_bgp_timers_and_functionality(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_static_routes(request): + """Test to create and verify static routes.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to create static routes + input_dict = { + "r1": { + "static_routes": [ + { + "network": "10.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.2", + } + ] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + next_hop = ["10.0.0.2", "10.0.0.5"] + result = verify_rib( + tgen, "ipv4", dut, input_dict, next_hop=next_hop, protocol=protocol + ) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_admin_distance_for_existing_static_routes(request): + """Test to modify and verify admin distance for existing static routes.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + input_dict = { + "r1": { + "static_routes": [ + { + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2", + } + ] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Verifying admin distance once modified + result = verify_admin_distance_for_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_advertise_network_using_network_command(request): + """Test advertise networks using network command.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + {"network": "20.0.0.0/32", "no_of_network": 10}, + {"network": "30.0.0.0/32", "no_of_network": 10}, + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Verifying RIB routes + dut = "r2" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_clear_bgp_and_verify(request): + """ + Created few static routes and verified all routes are learned via BGP + cleared BGP and verified all routes are intact + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # clear ip bgp + result = clear_bgp_and_verify(tgen, topo, "r1") + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_BGP_attributes_with_vrf_default_keyword_p0(request): + """ + TC_9: + Verify BGP functionality for default vrf with + "vrf default" keyword. + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + step("Configure static routes and redistribute in BGP on R3") + for addr_type in ADDR_TYPES: + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "no_of_ip": 4, + "next_hop": "Null0", + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + input_dict_2 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}}, + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "Create a route-map to match a specific prefix and modify" + "BGP attributes for matched prefix" + ) + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "ABC": [ + { + "seqid": 10, + "action": "permit", + "network": NETWORK["ipv4"][0], + } + ] + }, + "ipv6": { + "XYZ": [ + { + "seqid": 100, + "action": "permit", + "network": NETWORK["ipv6"][0], + } + ] + }, + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + if addr_type == "ipv4": + pf_list = "ABC" + else: + pf_list = "XYZ" + + input_dict_6 = { + "r3": { + "route_maps": { + "BGP_ATTR_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": 10, + "match": {addr_type: {"prefix_lists": pf_list}}, + "set": { + "aspath": {"as_num": 500, "as_action": "prepend"}, + "localpref": 500, + "origin": "egp", + "community": {"num": "500:500", "action": "additive"}, + "large_community": { + "num": "500:500:500", + "action": "additive", + }, + }, + }, + {"action": "permit", "seq_id": 20}, + ] + }, + "BGP_ATTR_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": 100, + "match": {addr_type: {"prefix_lists": pf_list}}, + "set": { + "aspath": {"as_num": 500, "as_action": "prepend"}, + "localpref": 500, + "origin": "egp", + "community": {"num": "500:500", "action": "additive"}, + "large_community": { + "num": "500:500:500", + "action": "additive", + }, + }, + }, + {"action": "permit", "seq_id": 200}, + ], + } + } + + result = create_route_maps(tgen, input_dict_6) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Apply the route-map on R3 in outbound direction for peer R4") + + input_dict_7 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + { + "name": "BGP_ATTR_ipv4", + "direction": "out", + } + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + { + "name": "BGP_ATTR_ipv6", + "direction": "out", + } + ] + } + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step( + "verify modified attributes for specific prefix with 'vrf default'" + "keyword on R4" + ) + for addr_type in ADDR_TYPES: + dut = "r4" + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "vrf": "default", + "largeCommunity": "500:500:500", + } + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + dut = "r4" + input_dict = { + "r3": { + "static_routes": [ + { + "network": NETWORK[addr_type][0], + "vrf": "default", + "community": "500:500", + } + ] + } + } + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + result = verify_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + input_dict_4 = {"largeCommunity": "500:500:500", "community": "500:500"} + + result = verify_bgp_community( + tgen, addr_type, dut, [NETWORK[addr_type][0]], input_dict_4 + ) + assert result is True, "Test case {} : Should fail \n Error: {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_bgp_with_loopback_interface(request): + """ + Test BGP with loopback interface + + Adding keys:value pair "dest_link": "lo" and "source_link": "lo" + peer dict of input json file for all router's creating config using + loopback interface. Once BGP neighboship is up then verifying BGP + convergence + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for routerN in sorted(topo["routers"].keys()): + for bgp_neighbor in topo["routers"][routerN]["bgp"]["address_family"]["ipv4"][ + "unicast" + ]["neighbor"].keys(): + + # Adding ['source_link'] = 'lo' key:value pair + topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][ + "neighbor" + ][bgp_neighbor]["dest_link"] = { + "lo": { + "source_link": "lo", + } + } + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + input_dict = { + "r1": { + "static_routes": [ + {"network": "1.0.2.17/32", "next_hop": "10.0.0.2"}, + {"network": "1.0.3.17/32", "next_hop": "10.0.0.6"}, + ] + }, + "r2": { + "static_routes": [ + {"network": "1.0.1.17/32", "next_hop": "10.0.0.1"}, + {"network": "1.0.3.17/32", "next_hop": "10.0.0.10"}, + ] + }, + "r3": { + "static_routes": [ + {"network": "1.0.1.17/32", "next_hop": "10.0.0.5"}, + {"network": "1.0.2.17/32", "next_hop": "10.0.0.9"}, + {"network": "1.0.4.17/32", "next_hop": "10.0.0.14"}, + ] + }, + "r4": {"static_routes": [{"network": "1.0.3.17/32", "next_hop": "10.0.0.13"}]}, + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + # Api call verify whether BGP is converged + result = verify_bgp_convergence(tgen, topo) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_with_loopback_with_same_subnet_p1(request): + """ + Verify routes not installed in zebra when /32 routes received + with loopback BGP session subnet + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + step("Delete BGP seesion created initially") + input_dict_r1 = { + "r1": {"bgp": {"delete": True}}, + "r2": {"bgp": {"delete": True}}, + "r3": {"bgp": {"delete": True}}, + "r4": {"bgp": {"delete": True}}, + } + result = create_router_bgp(tgen, topo, input_dict_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Create BGP session over loop address") + topo_modify = deepcopy(topo) + + for routerN in sorted(topo["routers"].keys()): + for addr_type in ADDR_TYPES: + for bgp_neighbor in topo_modify["routers"][routerN]["bgp"][ + "address_family" + ][addr_type]["unicast"]["neighbor"].keys(): + + # Adding ['source_link'] = 'lo' key:value pair + topo_modify["routers"][routerN]["bgp"]["address_family"][addr_type][ + "unicast" + ]["neighbor"][bgp_neighbor]["dest_link"] = { + "lo": {"source_link": "lo", "ebgp_multihop": 2} + } + + result = create_router_bgp(tgen, topo_modify["routers"]) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Disable IPv6 BGP nbr from ipv4 address family") + raw_config = { + "r1": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r2": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r2"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r3": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0] + ), + "no neighbor {} activate".format( + topo["routers"]["r4"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + "r4": { + "raw_config": [ + "router bgp {}".format(topo["routers"]["r4"]["bgp"]["local_as"]), + "address-family ipv4 unicast", + "no neighbor {} activate".format( + topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0] + ), + ] + }, + } + + step("Configure kernel routes") + result = apply_raw_config(tgen, raw_config) + assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result) + + r1_ipv4_lo = topo["routers"]["r1"]["links"]["lo"]["ipv4"] + r1_ipv6_lo = topo["routers"]["r1"]["links"]["lo"]["ipv6"] + r2_ipv4_lo = topo["routers"]["r2"]["links"]["lo"]["ipv4"] + r2_ipv6_lo = topo["routers"]["r2"]["links"]["lo"]["ipv6"] + r3_ipv4_lo = topo["routers"]["r3"]["links"]["lo"]["ipv4"] + r3_ipv6_lo = topo["routers"]["r3"]["links"]["lo"]["ipv6"] + r4_ipv4_lo = topo["routers"]["r4"]["links"]["lo"]["ipv4"] + r4_ipv6_lo = topo["routers"]["r4"]["links"]["lo"]["ipv6"] + + r1_r2 = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0] + r2_r1 = topo["routers"]["r2"]["links"]["r1"]["ipv6"].split("/")[0] + r1_r3 = topo["routers"]["r1"]["links"]["r3"]["ipv6"].split("/")[0] + r3_r1 = topo["routers"]["r3"]["links"]["r1"]["ipv6"].split("/")[0] + r2_r3 = topo["routers"]["r2"]["links"]["r3"]["ipv6"].split("/")[0] + r3_r2 = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0] + r3_r4 = topo["routers"]["r3"]["links"]["r4"]["ipv6"].split("/")[0] + r4_r3 = topo["routers"]["r4"]["links"]["r3"]["ipv6"].split("/")[0] + + r1_r2_ipv4 = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0] + r2_r1_ipv4 = topo["routers"]["r2"]["links"]["r1"]["ipv4"].split("/")[0] + r1_r3_ipv4 = topo["routers"]["r1"]["links"]["r3"]["ipv4"].split("/")[0] + r3_r1_ipv4 = topo["routers"]["r3"]["links"]["r1"]["ipv4"].split("/")[0] + r2_r3_ipv4 = topo["routers"]["r2"]["links"]["r3"]["ipv4"].split("/")[0] + r3_r2_ipv4 = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0] + r3_r4_ipv4 = topo["routers"]["r3"]["links"]["r4"]["ipv4"].split("/")[0] + r4_r3_ipv4 = topo["routers"]["r4"]["links"]["r3"]["ipv4"].split("/")[0] + + r1_r2_intf = topo["routers"]["r1"]["links"]["r2"]["interface"] + r2_r1_intf = topo["routers"]["r2"]["links"]["r1"]["interface"] + r1_r3_intf = topo["routers"]["r1"]["links"]["r3"]["interface"] + r3_r1_intf = topo["routers"]["r3"]["links"]["r1"]["interface"] + r2_r3_intf = topo["routers"]["r2"]["links"]["r3"]["interface"] + r3_r2_intf = topo["routers"]["r3"]["links"]["r2"]["interface"] + r3_r4_intf = topo["routers"]["r3"]["links"]["r4"]["interface"] + r4_r3_intf = topo["routers"]["r4"]["links"]["r3"]["interface"] + + ipv4_list = [ + ("r1", r1_r2_intf, r2_ipv4_loopback), + ("r1", r1_r3_intf, r3_ipv4_loopback), + ("r2", r2_r1_intf, r1_ipv4_loopback), + ("r2", r2_r3_intf, r3_ipv4_loopback), + ("r3", r3_r1_intf, r1_ipv4_loopback), + ("r3", r3_r2_intf, r2_ipv4_loopback), + ("r3", r3_r4_intf, r4_ipv4_loopback), + ("r4", r4_r3_intf, r3_ipv4_loopback), + ] + + ipv6_list = [ + ("r1", r1_r2_intf, r2_ipv6_loopback, r2_r1), + ("r1", r1_r3_intf, r3_ipv6_loopback, r3_r1), + ("r2", r2_r1_intf, r1_ipv6_loopback, r1_r2), + ("r2", r2_r3_intf, r3_ipv6_loopback, r3_r2), + ("r3", r3_r1_intf, r1_ipv6_loopback, r1_r3), + ("r3", r3_r2_intf, r2_ipv6_loopback, r2_r3), + ("r3", r3_r4_intf, r4_ipv6_loopback, r4_r3), + ("r4", r4_r3_intf, r3_ipv6_loopback, r3_r4), + ] + + for dut, intf, loop_addr in ipv4_list: + result = addKernelRoute(tgen, dut, intf, loop_addr) + assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result) + + for dut, intf, loop_addr, next_hop in ipv6_list: + result = addKernelRoute(tgen, dut, intf, loop_addr, next_hop) + assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result) + + step("Configure static routes") + + input_dict = { + "r1": { + "static_routes": [ + {"network": r2_ipv4_loopback, "next_hop": r2_r1_ipv4}, + {"network": r3_ipv4_loopback, "next_hop": r3_r1_ipv4}, + {"network": r2_ipv6_loopback, "next_hop": r2_r1}, + {"network": r3_ipv6_loopback, "next_hop": r3_r1}, + ] + }, + "r2": { + "static_routes": [ + {"network": r1_ipv4_loopback, "next_hop": r1_r2_ipv4}, + {"network": r3_ipv4_loopback, "next_hop": r3_r2_ipv4}, + {"network": r1_ipv6_loopback, "next_hop": r1_r2}, + {"network": r3_ipv6_loopback, "next_hop": r3_r2}, + ] + }, + "r3": { + "static_routes": [ + {"network": r1_ipv4_loopback, "next_hop": r1_r3_ipv4}, + {"network": r2_ipv4_loopback, "next_hop": r2_r3_ipv4}, + {"network": r4_ipv4_loopback, "next_hop": r4_r3_ipv4}, + {"network": r1_ipv6_loopback, "next_hop": r1_r3}, + {"network": r2_ipv6_loopback, "next_hop": r2_r3}, + {"network": r4_ipv6_loopback, "next_hop": r4_r3}, + ] + }, + "r4": { + "static_routes": [ + {"network": r3_ipv4_loopback, "next_hop": r3_r4_ipv4}, + {"network": r3_ipv6_loopback, "next_hop": r3_r4}, + ] + }, + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Verify BGP session convergence") + + result = verify_bgp_convergence(tgen, topo_modify) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Configure redistribute connected on R2 and R4") + input_dict_1 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + "ipv6": { + "unicast": {"redistribute": [{"redist_type": "connected"}]} + }, + } + } + }, + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Verify Ipv4 and Ipv6 network installed in R1 RIB but not in FIB") + input_dict_r1 = { + "r1": { + "static_routes": [ + {"network": "1.0.2.17/32"}, + {"network": "2001:db8:f::2:17/128"}, + ] + } + } + + dut = "r1" + protocol = "bgp" + for addr_type in ADDR_TYPES: + result = verify_fib_routes( + tgen, addr_type, dut, input_dict_r1, expected=False + ) # pylint: disable=E1123 + assert result is not True, ( + "Testcase {} : Failed \n".format(tc_name) + + "Expected behavior: routes should not present in fib \n" + + "Error: {}".format(result) + ) + + step("Verify Ipv4 and Ipv6 network installed in r3 RIB but not in FIB") + input_dict_r3 = { + "r3": { + "static_routes": [ + {"network": "1.0.4.17/32"}, + {"network": "2001:db8:f::4:17/128"}, + ] + } + } + dut = "r3" + protocol = "bgp" + for addr_type in ADDR_TYPES: + result = verify_fib_routes( + tgen, addr_type, dut, input_dict_r1, expected=False + ) # pylint: disable=E1123 + assert result is not True, ( + "Testcase {} : Failed \n".format(tc_name) + + "Expected behavior: routes should not present in fib \n" + + "Error: {}".format(result) + ) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) -- cgit v1.2.3