summaryrefslogtreecommitdiffstats
path: root/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:53:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:53:30 +0000
commit2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 (patch)
treec05dc0f8e6aa3accc84e3e5cffc933ed94941383 /tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
parentInitial commit. (diff)
downloadfrr-2c7cac91ed6e7db0f6937923d2b57f97dbdbc337.tar.xz
frr-2c7cac91ed6e7db0f6937923d2b57f97dbdbc337.zip
Adding upstream version 8.4.4.upstream/8.4.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py')
-rw-r--r--tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py1173
1 files changed, 1173 insertions, 0 deletions
diff --git a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
new file mode 100644
index 0000000..8c9d2c9
--- /dev/null
+++ b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py
@@ -0,0 +1,1173 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test bgp aggregation functionality:
+
+1. Verify route summarisation with summary-only for redistributed as well as
+ locally generated routes.
+2. Verify route summarisation with as-set for redistributed routes.
+
+"""
+
+import os
+import sys
+import time
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ verify_rib,
+ create_static_routes,
+ check_address_types,
+ step,
+ create_route_maps,
+ create_prefix_lists,
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence,
+ create_router_bgp,
+ verify_bgp_rib,
+ verify_bgp_community,
+)
+from lib.topojson import build_config_from_json
+
+
+pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
+
+# Global variables
+BGP_CONVERGENCE = False
+ADDR_TYPES = check_address_types()
+
+NETWORK_1_1 = {"ipv4": "10.1.1.0/24", "ipv6": "10:1::1:0/120"}
+NETWORK_1_2 = {"ipv4": "10.1.2.0/24", "ipv6": "10:1::2:0/120"}
+NETWORK_1_3 = {"ipv4": "10.1.3.0/24", "ipv6": "10:1::3:0/120"}
+NETWORK_1_4 = {"ipv4": "10.1.4.0/24", "ipv6": "10:1::4:0/120"}
+NETWORK_1_5 = {"ipv4": "10.1.5.0/24", "ipv6": "10:1::5:0/120"}
+NETWORK_2_1 = {"ipv4": "10.1.1.100/32", "ipv6": "10:1::1:0/124"}
+NETWORK_2_2 = {"ipv4": "10.1.5.0/24", "ipv6": "10:1::5:0/120"}
+NETWORK_2_3 = {"ipv4": "10.1.6.0/24", "ipv6": "10:1::6:0/120"}
+NETWORK_2_4 = {"ipv4": "10.1.7.0/24", "ipv6": "10:1::7:0/120"}
+NETWORK_3_1 = {"ipv4": "10.1.8.0/24", "ipv6": "10:1::8:0/120"}
+NETWORK_4_1 = {"ipv4": "10.2.1.0/24", "ipv6": "10:2::1:0/120"}
+NEXT_HOP = {"ipv4": "Null0", "ipv6": "Null0"}
+AGGREGATE_NW = {"ipv4": "10.1.0.0/20", "ipv6": "10:1::/96"}
+
+COMMUNITY = [
+ "0:1 0:10 0:100",
+ "0:2 0:20 0:200",
+ "0:3 0:30 0:300",
+ "0:4 0:40 0:400",
+ "0:5 0:50 0:500",
+ "0:1 0:2 0:3 0:4 0:5 0:10 0:20 0:30 0:40 0:50 0:100 0:200 0:300 0:400 0:500",
+ "0:3 0:4 0:5 0:30 0:40 0:50 0:300 0:400 0:500",
+ "0:6 0:60 0:600",
+ "0:7 0:70 0:700",
+ "0:3 0:4 0:5 0:6 0:30 0:40 0:50 0:60 0:300 0:400 0:500 0:600",
+]
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ json_file = "{}/bgp_aggregation.json".format(CWD)
+ tgen = Topogen(json_file, mod.__name__)
+ global topo
+ topo = tgen.json_topo
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start daemons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ ADDR_TYPES = check_address_types()
+
+ for addr_type in ADDR_TYPES:
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
+ BGP_CONVERGENCE
+ )
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Tests starting
+#
+#####################################################
+
+
+def test_route_summarisation_with_summary_only_p1(request):
+ """
+ Verify route summarisation with summary-only for redistributed as well as
+ locally generated routes.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ reset_config_on_routers(tgen)
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Configure static routes on router R1 and redistribute in " "BGP process.")
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [
+ NETWORK_1_1[addr_type],
+ NETWORK_1_2[addr_type],
+ NETWORK_1_3[addr_type],
+ ],
+ "next_hop": NEXT_HOP[addr_type],
+ }
+ ]
+ }
+ }
+ input_redistribute = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
+ }
+ }
+ }
+ }
+ }
+
+ step("Configuring {} static routes on router R1 ".format(addr_type))
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configuring redistribute static for {} address-family on router R1 ".format(
+ addr_type
+ )
+ )
+
+ result = create_router_bgp(tgen, topo, input_redistribute)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Verify that Static routes are redistributed in BGP process")
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [
+ NETWORK_1_1[addr_type],
+ NETWORK_1_2[addr_type],
+ NETWORK_1_3[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+
+ result = verify_rib(tgen, addr_type, "r3", input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Advertise some prefixes using network command")
+ step(
+ "Additionally advertise 10.1.4.0/24 & 10.1.5.0/24 and "
+ "10:1::4:0/120 & 10:1::5:0/120 from R4 to R1."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_advertise = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_2_1[addr_type],
+ NETWORK_2_2[addr_type],
+ NETWORK_2_3[addr_type],
+ NETWORK_2_4[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r4": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_1_4[addr_type],
+ NETWORK_1_5[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ }
+
+ result = create_router_bgp(tgen, topo, input_advertise)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that advertised prefixes using network command are being "
+ "advertised in BGP process"
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_advertise = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_2_1[addr_type],
+ NETWORK_2_2[addr_type],
+ NETWORK_2_3[addr_type],
+ NETWORK_2_4[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = verify_rib(tgen, addr_type, "r3", input_advertise)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure aggregate-address to summarise all the advertised routes.")
+
+ for addr_type in ADDR_TYPES:
+ route_aggregate = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "aggregate_address": [
+ {
+ "network": AGGREGATE_NW[addr_type],
+ "summary": True,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, route_aggregate)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that we see 1 summarised route and remaining suppressed "
+ "routes on advertising router R1 and only 1 summarised route on "
+ "receiving router R3 for both AFIs."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static_agg = {
+ "r1": {"static_routes": [{"network": AGGREGATE_NW[addr_type]}]}
+ }
+
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [
+ NETWORK_1_1[addr_type],
+ NETWORK_1_2[addr_type],
+ NETWORK_1_3[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+
+ result = verify_rib(tgen, addr_type, "r3", input_static_agg, protocol="bgp")
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(
+ tgen, addr_type, "r3", input_static, protocol="bgp", expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_static_agg, protocol="bgp")
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for action, value in zip(["removed", "add"], [True, False]):
+
+ step(
+ "{} static routes as below: "
+ "(no) ip route 10.1.1.0/24 and (no) ip route 10.1.2.0/24"
+ "(no) ipv6 route 10:1::1:0/120 and (no) ip route 10:1::2:0/120".format(
+ action
+ )
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK_1_1[addr_type], NETWORK_1_2[addr_type]],
+ "next_hop": NEXT_HOP[addr_type],
+ "delete": value,
+ }
+ ]
+ }
+ }
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that there is no impact on R3, as summarised route remains "
+ "intact. However suppressed routes on R1 disappear and re-appear "
+ "based on {} static routes.".format(action)
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static_1 = {
+ "r1": {
+ "static_routes": [
+ {"network": [NETWORK_1_1[addr_type], NETWORK_1_2[addr_type]]}
+ ]
+ }
+ }
+
+ input_static_2 = {
+ "r1": {"static_routes": [{"network": AGGREGATE_NW[addr_type]}]}
+ }
+
+ if value:
+ result = verify_rib(
+ tgen, addr_type, "r1", input_static_1, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Routes are still present \n Error: {}".format(
+ tc_name, result
+ )
+ else:
+ result = verify_rib(tgen, addr_type, "r1", input_static_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r3", input_static_2, protocol="bgp")
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "{} prefixes using network command as below:"
+ "(no) network 10.1.6.1/24 and (no) network 10.1.7.1/24"
+ "(no) network 10:1::6:0/120 and (no) network 10:1::7:0/120".format(action)
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_advertise = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_2_3[addr_type],
+ NETWORK_2_4[addr_type],
+ ],
+ "delete": value,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_advertise)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that there is no impact on R3, as summarised route remains "
+ "intact. However suppressed routes on R1 disappear and re-appear "
+ "based on {} of network command.".format(action)
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_advertise_1 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_2_3[addr_type],
+ NETWORK_2_4[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ input_advertise_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {"network": AGGREGATE_NW[addr_type]}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if value:
+ result = verify_bgp_rib(
+ tgen, addr_type, "r1", input_advertise_1, expected=False
+ )
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Routes are still present \n Error: {}".format(tc_name, result)
+ )
+ else:
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_advertise_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r3", input_advertise_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Add a new network each one from out of aggregation range and "
+ "other within aggregation range. "
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {"network": NETWORK_3_1[addr_type], "next_hop": NEXT_HOP[addr_type]}
+ ]
+ }
+ }
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_advertise = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": NETWORK_4_1[addr_type],
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_advertise)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that when a network within aggregation range is added, "
+ "there is no impact on receiving router. However if a network "
+ "outside aggregation range is added/removed, R3 receives and "
+ "withdraws it accordingly."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static = {"r1": {"static_routes": [{"network": AGGREGATE_NW[addr_type]}]}}
+
+ result = verify_rib(tgen, addr_type, "r3", input_static, protocol="bgp")
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ input_advertise_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": [
+ NETWORK_4_1[addr_type],
+ AGGREGATE_NW[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = verify_rib(tgen, addr_type, "r3", input_advertise_2, protocol="bgp")
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for action, value in zip(["Delete", "Re-add"], [True, False]):
+ step("{} aggregation command from R1.".format(action))
+
+ for addr_type in ADDR_TYPES:
+ route_aggregate = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "aggregate_address": [
+ {
+ "network": AGGREGATE_NW[addr_type],
+ "summary": True,
+ "delete": value,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, route_aggregate)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify on both routers that summarised route is withdrawn from R1 "
+ "and R3 when aggregate-address command is removed and appears again "
+ "when aggregate-address command is re-added. Check for both AFIs."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static_agg = {
+ "r1": {"static_routes": [{"network": AGGREGATE_NW[addr_type]}]}
+ }
+
+ if value:
+ result = verify_rib(
+ tgen, addr_type, "r1", input_static_agg, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Aggregated route is still present \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(
+ tgen, addr_type, "r3", input_static_agg, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Aggregated route is still present \n Error: {}".format(
+ tc_name, result
+ )
+ else:
+ result = verify_rib(tgen, addr_type, "r1", input_static_agg)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r3", input_static_agg)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_route_summarisation_with_as_set_p1(request):
+ """
+ Verify route summarisation with as-set for redistributed routes.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ reset_config_on_routers(tgen)
+ write_test_header(tc_name)
+
+ # Don"t run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Configure static routes on router R1 and redistribute in " "BGP process.")
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [
+ NETWORK_1_1[addr_type],
+ NETWORK_1_2[addr_type],
+ NETWORK_1_3[addr_type],
+ NETWORK_1_4[addr_type],
+ NETWORK_1_5[addr_type],
+ ],
+ "next_hop": NEXT_HOP[addr_type],
+ }
+ ]
+ }
+ }
+ input_redistribute = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
+ }
+ }
+ }
+ }
+ }
+
+ step("Configuring {} static routes on router R1 ".format(addr_type))
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configuring redistribute static for {} address-family on router R1 ".format(
+ addr_type
+ )
+ )
+
+ result = create_router_bgp(tgen, topo, input_redistribute)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Verify that Static routes are redistributed in BGP process")
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [
+ NETWORK_1_1[addr_type],
+ NETWORK_1_2[addr_type],
+ NETWORK_1_3[addr_type],
+ NETWORK_1_4[addr_type],
+ NETWORK_1_5[addr_type],
+ ]
+ }
+ ]
+ }
+ }
+
+ result = verify_rib(tgen, addr_type, "r3", input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Configure a route-map to attach a unique community attribute value "
+ "to each of these prefixes, while re-distributing static."
+ )
+
+ for addr_type in ADDR_TYPES:
+ for pfx, seq_id, network, in zip(
+ [1, 2, 3, 4, 5],
+ [10, 20, 30, 40, 50],
+ [NETWORK_1_1, NETWORK_1_2, NETWORK_1_3, NETWORK_1_4, NETWORK_1_5],
+ ):
+ prefix_list = {
+ "r1": {
+ "prefix_lists": {
+ addr_type: {
+ "pf_list_{}_{}".format(addr_type, pfx): [
+ {
+ "seqid": seq_id,
+ "network": network[addr_type],
+ "action": "permit",
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, prefix_list)
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Create route-map for applying prefix-list on r1")
+
+ for addr_type in ADDR_TYPES:
+ for pfx, comm_id in zip([1, 2, 3, 4, 5], [0, 1, 2, 3, 4]):
+ route_map = {
+ "r1": {
+ "route_maps": {
+ "rmap_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "match": {
+ addr_type: {
+ "prefix_lists": "pf_list_{}_{}".format(
+ addr_type, pfx
+ )
+ }
+ },
+ "set": {"community": {"num": COMMUNITY[comm_id]}},
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_route_maps(tgen, route_map)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Re-configure redistribute static with route-map")
+
+ for addr_type in ADDR_TYPES:
+ input_redistribute = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [
+ {
+ "redist_type": "static",
+ "attribute": {
+ "route-map": "rmap_{}".format(addr_type)
+ },
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_redistribute)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure aggregate-address to summarise all the advertised routes.")
+
+ for addr_type in ADDR_TYPES:
+ route_aggregate = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "aggregate_address": [
+ {"network": AGGREGATE_NW[addr_type], "as_set": True}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, route_aggregate)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that we see summarised route on router R3 with all the "
+ "community attribute values combined with that aggregate route."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_dict = {"community": COMMUNITY[5]}
+ result = verify_bgp_community(
+ tgen, addr_type, "r3", [AGGREGATE_NW[addr_type]], input_dict
+ )
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Remove static routes as below: "
+ "(no) ip route 10.1.1.0/24 blackhole "
+ "(no) ip route 10.1.2.0/24 blackhole "
+ "(no) ipv6 route 10:1::1:0/120 blackhole "
+ "(no) ipv6 route 10:1::2:0/120 blackhole "
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK_1_1[addr_type], NETWORK_1_2[addr_type]],
+ "next_hop": NEXT_HOP[addr_type],
+ "delete": True,
+ }
+ ]
+ }
+ }
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify on R3 that whenever we remove the static routes, we still"
+ " see aggregated route however the corresponding community attribute"
+ "values are withdrawn."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_dict = {"community": COMMUNITY[6]}
+ result = verify_bgp_community(
+ tgen, addr_type, "r3", [AGGREGATE_NW[addr_type]], input_dict
+ )
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Add/remove a new network with community value, each one from out of "
+ "aggregation range and other within aggregation range. "
+ )
+
+ step(
+ "Add a new network each one from out of aggregation range and "
+ "other within aggregation range. "
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": [NETWORK_3_1[addr_type], NETWORK_4_1[addr_type]],
+ "next_hop": NEXT_HOP[addr_type],
+ }
+ ]
+ }
+ }
+
+ result = create_static_routes(tgen, input_static)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ for (
+ pfx,
+ seq_id,
+ network,
+ ) in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]):
+ prefix_list = {
+ "r1": {
+ "prefix_lists": {
+ addr_type: {
+ "pf_list_{}_{}".format(addr_type, pfx): [
+ {
+ "seqid": seq_id,
+ "network": network[addr_type],
+ "action": "permit",
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, prefix_list)
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Create route-map for applying prefix-list on r1")
+
+ for addr_type in ADDR_TYPES:
+ for pfx, comm_id in zip([6, 7], [7, 8]):
+ route_map = {
+ "r1": {
+ "route_maps": {
+ "rmap_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "match": {
+ addr_type: {
+ "prefix_lists": "pf_list_{}_{}".format(
+ addr_type, pfx
+ )
+ }
+ },
+ "set": {"community": {"num": COMMUNITY[comm_id]}},
+ }
+ ]
+ }
+ }
+ }
+
+ result = create_route_maps(tgen, route_map)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify on R3 when route is added within the summary range, aggregated"
+ " route also has associated community value added. However if the route"
+ " is beyond the summary range the aggregated route would have no impact"
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_dict = {"community": COMMUNITY[9]}
+ result = verify_bgp_community(
+ tgen, addr_type, "r3", [AGGREGATE_NW[addr_type]], input_dict
+ )
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ for action, value in zip(["Delete", "Re-add"], [True, False]):
+ step("{} aggregation command from R1.".format(action))
+
+ for addr_type in ADDR_TYPES:
+ route_aggregate = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "aggregate_address": [
+ {
+ "network": AGGREGATE_NW[addr_type],
+ "as_set": True,
+ "delete": value,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, route_aggregate)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Verify that when as-set command is removed, we do not see community "
+ "attribute added to summarised route on R3. However when as-set option "
+ "is re-added, all the community attribute values must appear with "
+ "summarised route."
+ )
+
+ for addr_type in ADDR_TYPES:
+ input_static_agg = {
+ "r1": {"static_routes": [{"network": AGGREGATE_NW[addr_type]}]}
+ }
+
+ if value:
+ result = verify_rib(
+ tgen, addr_type, "r1", input_static_agg, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Aggregated route is still present \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(
+ tgen, addr_type, "r3", input_static_agg, expected=False
+ )
+ assert (
+ result is not True
+ ), "Testcase {} : Failed \n Aggregated route is still present \n Error: {}".format(
+ tc_name, result
+ )
+ else:
+ result = verify_rib(tgen, addr_type, "r1", input_static_agg)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r3", input_static_agg)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ input_dict = {"community": COMMUNITY[9]}
+ result = verify_bgp_community(
+ tgen, addr_type, "r3", [AGGREGATE_NW[addr_type]], input_dict
+ )
+ assert result is True, "Test case {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))