summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/ospf.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-09 13:16:35 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-09 13:16:35 +0000
commite2bbf175a2184bd76f6c54ccf8456babeb1a46fc (patch)
treef0b76550d6e6f500ada964a3a4ee933a45e5a6f1 /tests/topotests/lib/ospf.py
parentInitial commit. (diff)
downloadfrr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.tar.xz
frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.zip
Adding upstream version 9.1.upstream/9.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/topotests/lib/ospf.py')
-rw-r--r--tests/topotests/lib/ospf.py3028
1 files changed, 3028 insertions, 0 deletions
diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py
new file mode 100644
index 0000000..5b18f8b
--- /dev/null
+++ b/tests/topotests/lib/ospf.py
@@ -0,0 +1,3028 @@
+# SPDX-License-Identifier: ISC
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+
+import ipaddress
+import sys
+from copy import deepcopy
+from time import sleep
+
+# Import common_config to use commomnly used APIs
+from lib.common_config import (
+ create_common_configurations,
+ InvalidCLIError,
+ generate_ips,
+ retry,
+ run_frr_cmd,
+ validate_ip_address,
+)
+from lib.topolog import logger
+from lib.topotest import frr_unicode
+
+################################
+# Configure procs
+################################
+
+
+def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
+ """
+ API to configure ospf on router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "ospf": {
+ "router_id": "22.22.22.22",
+ "area": [{ "id": "0.0.0.0", "type": "nssa"}]
+ }
+ }
+
+ result = create_router_ospf(tgen, topo, input_dict)
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: create_router_ospf()")
+ result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+
+ for ospf in ["ospf", "ospf6"]:
+ config_data_dict = {}
+
+ for router in input_dict.keys():
+ if ospf not in input_dict[router]:
+ logger.debug("Router %s: %s not present in input_dict", router, ospf)
+ continue
+
+ config_data = __create_ospf_global(
+ tgen, input_dict, router, build, load_config, ospf
+ )
+ if config_data:
+ if router not in config_data_dict:
+ config_data_dict[router] = config_data
+ else:
+ config_data_dict[router].extend(config_data)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, ospf, build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_router_ospf (ipv4)", exc_info=True)
+ result = False
+
+ logger.debug("Exiting lib API: create_router_ospf()")
+ return result
+
+
+def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
+ """
+ Helper API to create ospf global configuration.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+ * `ospf` : either 'ospf' or 'ospf6'
+
+ Usage
+ -----
+ input_dict = {
+ "routers": {
+ "r1": {
+ "links": {
+ "r3": {
+ "ipv6": "2013:13::1/64",
+ "ospf6": {
+ "hello_interval": 1,
+ "dead_interval": 4,
+ "network": "point-to-point"
+ }
+ }
+ },
+ "ospf6": {
+ "router_id": "1.1.1.1",
+ "neighbors": {
+ "r3": {
+ "area": "1.1.1.1"
+ }
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ list of configuration commands
+ """
+
+ config_data = []
+
+ if ospf not in input_dict[router]:
+ return config_data
+
+ logger.debug("Entering lib API: __create_ospf_global()")
+
+ ospf_data = input_dict[router][ospf]
+ del_ospf_action = ospf_data.setdefault("delete", False)
+ if del_ospf_action:
+ config_data = ["no router {}".format(ospf)]
+ return config_data
+
+ cmd = "router {}".format(ospf)
+
+ config_data.append(cmd)
+
+ # router id
+ router_id = ospf_data.setdefault("router_id", None)
+ del_router_id = ospf_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no {} router-id".format(ospf))
+ if router_id:
+ config_data.append("{} router-id {}".format(ospf, router_id))
+
+ # log-adjacency-changes
+ log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
+ del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
+ if del_log_adj_changes:
+ config_data.append("no log-adjacency-changes detail")
+ if log_adj_changes:
+ config_data.append("log-adjacency-changes {}".format(log_adj_changes))
+
+ # aggregation timer
+ aggr_timer = ospf_data.setdefault("aggr_timer", None)
+ del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
+ if del_aggr_timer:
+ config_data.append("no aggregation timer")
+ if aggr_timer:
+ config_data.append("aggregation timer {}".format(aggr_timer))
+
+ # maximum path information
+ ecmp_data = ospf_data.setdefault("maximum-paths", {})
+ if ecmp_data:
+ cmd = "maximum-paths {}".format(ecmp_data)
+ del_action = ospf_data.setdefault("del_max_path", False)
+ if del_action:
+ cmd = "no maximum-paths"
+ config_data.append(cmd)
+
+ # Flood reduction.
+ flood_data = ospf_data.setdefault("flood-reduction", {})
+ if flood_data:
+ cmd = "flood-reduction"
+ del_action = ospf_data.setdefault("del_flood_reduction", False)
+ if del_action:
+ cmd = "no flood-reduction"
+ config_data.append(cmd)
+
+ # LSA refresh timer - A hidden command.
+ refresh_data = ospf_data.setdefault("lsa-refresh", {})
+ if refresh_data:
+ cmd = "ospf lsa-refresh {}".format(refresh_data)
+ del_action = ospf_data.setdefault("del_lsa_refresh", False)
+ if del_action:
+ cmd = "no ospf lsa-refresh"
+ config_data.append(cmd)
+
+ # redistribute command
+ redistribute_data = ospf_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.debug(
+ "Router %s: 'redist_type' not present in " "input_dict", router
+ )
+ else:
+ cmd = "redistribute {}".format(redistribute["redist_type"])
+ for red_type in redistribute_data:
+ if "route_map" in red_type:
+ cmd = cmd + " route-map {}".format(red_type["route_map"])
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # area information
+ area_data = ospf_data.setdefault("area", {})
+ if area_data:
+ for area in area_data:
+ if "id" not in area:
+ logger.debug(
+ "Router %s: 'area id' not present in " "input_dict", router
+ )
+ else:
+ cmd = "area {}".format(area["id"])
+
+ if "type" in area:
+ cmd = cmd + " {}".format(area["type"])
+
+ if "flood-reduction" in area:
+ cmd = cmd + " flood-reduction"
+
+ del_action = area.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # def route information
+ def_rte_data = ospf_data.setdefault("default-information", {})
+ if def_rte_data:
+ if "originate" not in def_rte_data:
+ logger.debug(
+ "Router %s: 'originate key' not present in " "input_dict", router
+ )
+ else:
+ cmd = "default-information originate"
+
+ if "always" in def_rte_data:
+ cmd = cmd + " always"
+
+ if "metric" in def_rte_data:
+ cmd = cmd + " metric {}".format(def_rte_data["metric"])
+
+ if "metric-type" in def_rte_data:
+ cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
+
+ if "route-map" in def_rte_data:
+ cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
+
+ del_action = def_rte_data.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # summary information
+ summary_data = ospf_data.setdefault("summary-address", {})
+ if summary_data:
+ for summary in summary_data:
+ if "prefix" not in summary:
+ logger.debug(
+ "Router %s: 'summary-address' not present in " "input_dict",
+ router,
+ )
+ else:
+ cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
+
+ _tag = summary.setdefault("tag", None)
+ if _tag:
+ cmd = "{} tag {}".format(cmd, _tag)
+
+ _advertise = summary.setdefault("advertise", True)
+ if not _advertise:
+ cmd = "{} no-advertise".format(cmd)
+
+ del_action = summary.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # ospf gr information
+ gr_data = ospf_data.setdefault("graceful-restart", {})
+ if gr_data:
+ if "opaque" in gr_data and gr_data["opaque"]:
+ cmd = "capability opaque"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper enable" in gr_data and not gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable"
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
+ for rtrs in gr_data["helper enable"]:
+ cmd = "graceful-restart helper enable {}".format(rtrs)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "helper" in gr_data:
+ if type(gr_data["helper"]) is not list:
+ gr_data["helper"] = list(gr_data["helper"])
+ for helper_role in gr_data["helper"]:
+ cmd = "graceful-restart helper {}".format(helper_role)
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "supported-grace-time" in gr_data:
+ cmd = "graceful-restart helper supported-grace-time {}".format(
+ gr_data["supported-grace-time"]
+ )
+ if gr_data.setdefault("delete", False):
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ config_data.append("exit")
+ logger.debug("Exiting lib API: create_ospf_global()")
+
+ return config_data
+
+
+def config_ospf_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
+ """
+ API to configure ospf on router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Usage
+ -----
+ r1_ospf_auth = {
+ "r1": {
+ "links": {
+ "r2": {
+ "ospf": {
+ "authentication": "message-digest",
+ "authentication-key": "ospf",
+ "message-digest-key": "10"
+ }
+ }
+ }
+ }
+ }
+ result = config_ospf_interface(tgen, topo, r1_ospf_auth)
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Enter lib config_ospf_interface")
+ result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
+ for router in input_dict.keys():
+ config_data = []
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf config is not present in" "input_dict", router
+ )
+ continue
+ ospf_data = input_dict[router]["links"][lnk]["ospf"]
+ data_ospf_area = ospf_data.setdefault("area", None)
+ data_ospf_auth = ospf_data.setdefault("authentication", None)
+ data_ospf_dr_priority = ospf_data.setdefault("priority", None)
+ data_ospf_cost = ospf_data.setdefault("cost", None)
+ data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
+
+ try:
+ intf = topo["routers"][router]["links"][lnk]["interface"]
+ except KeyError:
+ intf = topo["switches"][router]["links"][lnk]["interface"]
+
+ # interface
+ cmd = "interface {}".format(intf)
+
+ config_data.append(cmd)
+ # interface area config
+ if data_ospf_area:
+ cmd = "ip ospf area {}".format(data_ospf_area)
+ config_data.append(cmd)
+
+ # interface ospf auth
+ if data_ospf_auth:
+ if data_ospf_auth == "null":
+ cmd = "ip ospf authentication null"
+ elif data_ospf_auth == "message-digest":
+ cmd = "ip ospf authentication message-digest"
+ elif data_ospf_auth == "key-chain":
+ cmd = "ip ospf authentication key-chain {}".format(
+ ospf_data["keychain"]
+ )
+ else:
+ cmd = "ip ospf authentication"
+
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "message-digest-key" in ospf_data:
+ cmd = "ip ospf message-digest-key {} md5 {}".format(
+ ospf_data["message-digest-key"], ospf_data["authentication-key"]
+ )
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if (
+ "authentication-key" in ospf_data
+ and "message-digest-key" not in ospf_data
+ ):
+ cmd = "ip ospf authentication-key {}".format(
+ ospf_data["authentication-key"]
+ )
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf dr priority
+ if data_ospf_dr_priority:
+ cmd = "ip ospf priority {}".format(ospf_data["priority"])
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf cost
+ if data_ospf_cost:
+ cmd = "ip ospf cost {}".format(ospf_data["cost"])
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf mtu
+ if data_ospf_mtu:
+ cmd = "ip ospf mtu-ignore"
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if build:
+ return config_data
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
+ logger.debug("Exiting lib API: config_ospf_interface()")
+ return result
+
+
+def clear_ospf(tgen, router, ospf=None):
+ """
+ This API is to clear ospf neighborship by running
+ clear ip ospf interface * command,
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `router`: device under test
+
+ Usage
+ -----
+ clear_ospf(tgen, "r1")
+ """
+
+ logger.debug("Entering lib API: clear_ospf()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+ # Clearing OSPF
+ if ospf:
+ version = "ipv6"
+ else:
+ version = "ip"
+
+ cmd = "clear {} ospf interface".format(version)
+ logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd)
+ run_frr_cmd(rnode, cmd)
+
+ logger.debug("Exiting lib API: clear_ospf()")
+
+
+def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
+ """
+ Redstribution of routes inside ospf.
+
+ Parameters
+ ----------
+ * `tgen`: Topogen object
+ * `topo` : json file data
+ * `dut`: device under test
+ * `route_type`: "static" or "connected" or ....
+ * `kwargs`: pass extra information (see below)
+
+ Usage
+ -----
+ redistribute_ospf(tgen, topo, "r0", "static", delete=True)
+ redistribute_ospf(tgen, topo, "r0", "static", route_map="rmap_ipv4")
+ """
+
+ ospf_red = {dut: {"ospf": {"redistribute": [{"redist_type": route_type}]}}}
+ for k, v in kwargs.items():
+ ospf_red[dut]["ospf"]["redistribute"][0][k] = v
+
+ result = create_router_ospf(tgen, topo, ospf_red)
+ assert result is True, "Testcase : Failed \n Error: {}".format(result)
+
+
+################################
+# Verification procs
+################################
+@retry(retry_timeout=80)
+def verify_ospf_neighbor(
+ tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
+):
+ """
+ This API is to verify ospf neighborship by running
+ show ip ospf neighbour command,
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `lan` : verify neighbors in lan topology
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ 1. To check FULL neighbors.
+ verify_ospf_neighbor(tgen, topo, dut=dut)
+
+ 2. To check neighbors with their roles.
+ input_dict = {
+ "r0": {
+ "ospf": {
+ "neighbors": {
+ "r1": {
+ "nbrState": "Full",
+ "role": "DR"
+ },
+ "r2": {
+ "nbrState": "Full",
+ "role": "DROther"
+ },
+ "r3": {
+ "nbrState": "Full",
+ "role": "DROther"
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+ logger.debug("Entering lib API: verify_ospf_neighbor()")
+ result = False
+ if topo is None:
+ topo = tgen.json_topo
+
+ if input_dict:
+ for router, rnode in tgen.routers().items():
+ if "ospf" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf neighbor all json", isjson=True
+ )
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ ospf_data_list = input_dict[router]["ospf"]
+ ospf_nbr_list = ospf_data_list["neighbors"]
+
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ data_ip = topo["routers"][ospf_nbr]["links"]
+ data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo["switches"]:
+ if "ospf" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+ try:
+ nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
+ intf_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[1]
+ except KeyError:
+ errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
+ return errormsg
+
+ nbr_state = nbr_data.setdefault("nbrState", None)
+ nbr_role = nbr_data.setdefault("role", None)
+
+ if nbr_state:
+ if nbr_state == nh_state:
+ logger.info(
+ "[DUT: {}] OSPF Nbr is {}:{} State {}".format(
+ router, ospf_nbr, nbr_rid, nh_state
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF is not Converged, neighbor"
+ " state is {}".format(router, nh_state)
+ )
+ return errormsg
+ if nbr_role:
+ if nbr_role == intf_state:
+ logger.info(
+ "[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
+ router, ospf_nbr, nbr_rid, nbr_role
+ )
+ )
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF is not Converged with rid"
+ "{}, role is {}".format(router, nbr_rid, intf_state)
+ )
+ return errormsg
+ continue
+ else:
+ for router, rnode in tgen.routers().items():
+ if "ospf" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf neighbor all json", isjson=True
+ )
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ ospf_data_list = topo["routers"][router]["ospf"]
+ ospf_neighbors = ospf_data_list["neighbors"]
+ total_peer = 0
+ total_peer = len(ospf_neighbors.keys())
+ no_of_ospf_nbr = 0
+ ospf_nbr_list = ospf_data_list["neighbors"]
+ no_of_peer = 0
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ if nbr_data:
+ data_ip = topo["routers"][nbr_data["nbr"]]["links"]
+ data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"]
+ else:
+ data_ip = topo["routers"][ospf_nbr]["links"]
+ data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
+ logger.info("ospf neighbor %s: router-id: %s", router, data_rid)
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo["switches"]:
+ if "ospf" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+
+ try:
+ nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
+ except KeyError:
+ errormsg = (
+ "[DUT: {}] missing OSPF neighbor {} with router-id {}".format(
+ router, ospf_nbr, nbr_rid
+ )
+ )
+ return errormsg
+
+ if nh_state == "Full":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("[DUT: {}] OSPF is Converged".format(router))
+ result = True
+ else:
+ errormsg = "[DUT: {}] OSPF is not Converged".format(router)
+ return errormsg
+
+ logger.debug("Exiting API: verify_ospf_neighbor()")
+ return result
+
+
+@retry(retry_timeout=50)
+def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
+ """
+ This API is to verify ospf neighborship by running
+ show ipv6 ospf neighbour command,
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `lan` : verify neighbors in lan topology
+
+ Usage
+ -----
+ 1. To check FULL neighbors.
+ verify_ospf_neighbor(tgen, topo, dut=dut)
+
+ 2. To check neighbors with their roles.
+ input_dict = {
+ "r0": {
+ "ospf6": {
+ "neighbors": {
+ "r1": {
+ "state": "Full",
+ "role": "DR"
+ },
+ "r2": {
+ "state": "Full",
+ "role": "DROther"
+ },
+ "r3": {
+ "state": "Full",
+ "role": "DROther"
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf6_neighbor(tgen, topo, dut, input_dict, lan=True)
+
+ 3. To check there are no neighbors.
+ input_dict = {
+ "r0": {
+ "ospf6": {
+ "neighbors": []
+ }
+ }
+ }
+ result = verify_ospf6_neighbor(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
+ if input_dict:
+ for router, rnode in tgen.routers().items():
+ if "ospf6" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf neighbor json", isjson=True
+ )
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF6 is not running"
+ return errormsg
+
+ ospf_data_list = input_dict[router]["ospf6"]
+ ospf_nbr_list = ospf_data_list["neighbors"]
+
+ # Check if looking for no neighbors
+ if ospf_nbr_list == []:
+ if show_ospf_json["neighbors"] == []:
+ logger.info("[DUT: {}] OSPF6 no neighbors found".format(router))
+ return True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF6 active neighbors found, expected None".format(
+ router
+ )
+ )
+ return errormsg
+
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ try:
+ data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
+ except KeyError:
+ data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
+ "router_id"
+ ]
+
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo["switches"]:
+ if "ospf6" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip[router]["ipv6"].split("/")[0]
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+ get_index_val = dict(
+ (d["neighborId"], dict(d, index=index))
+ for (index, d) in enumerate(show_ospf_json["neighbors"])
+ )
+ try:
+ nh_state = get_index_val.get(neighbor_ip)["state"]
+ intf_state = get_index_val.get(neighbor_ip)["ifState"]
+ except TypeError:
+ errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
+ router, nbr_rid, ospf_nbr
+ )
+ return errormsg
+
+ nbr_state = nbr_data.setdefault("state", None)
+ nbr_role = nbr_data.setdefault("role", None)
+
+ if nbr_state:
+ if nbr_state == nh_state:
+ logger.info(
+ "[DUT: {}] OSPF6 Nbr is {}:{} State {}".format(
+ router, ospf_nbr, nbr_rid, nh_state
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF6 is not Converged, neighbor"
+ " state is {} , Expected state is {}".format(
+ router, nh_state, nbr_state
+ )
+ )
+ return errormsg
+ if nbr_role:
+ if nbr_role == intf_state:
+ logger.info(
+ "[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
+ router, ospf_nbr, nbr_rid, nbr_role
+ )
+ )
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF6 is not Converged with rid"
+ "{}, role is {}, Expected role is {}".format(
+ router, nbr_rid, intf_state, nbr_role
+ )
+ )
+ return errormsg
+ continue
+ else:
+ for router, rnode in tgen.routers().items():
+ if "ospf6" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF6 neighborship on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf neighbor json", isjson=True
+ )
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF6 is not running"
+ return errormsg
+
+ ospf_data_list = topo["routers"][router]["ospf6"]
+ ospf_neighbors = ospf_data_list["neighbors"]
+ total_peer = 0
+ total_peer = len(ospf_neighbors.keys())
+ no_of_ospf_nbr = 0
+ ospf_nbr_list = ospf_data_list["neighbors"]
+ no_of_peer = 0
+ for ospf_nbr, nbr_data in ospf_nbr_list.items():
+ try:
+ data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
+ except KeyError:
+ data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
+ "router_id"
+ ]
+ logger.info("ospf neighbor %s: router-id: %s", ospf_nbr, data_rid)
+ if ospf_nbr in data_ip:
+ nbr_details = nbr_data[ospf_nbr]
+ elif lan:
+ for switch in topo["switches"]:
+ if "ospf6" in topo["switches"][switch]["links"][router]:
+ neighbor_ip = data_ip
+ else:
+ continue
+ else:
+ neighbor_ip = data_ip
+
+ nh_state = None
+ neighbor_ip = neighbor_ip.lower()
+ nbr_rid = data_rid
+ get_index_val = dict(
+ (d["neighborId"], dict(d, index=index))
+ for (index, d) in enumerate(show_ospf_json["neighbors"])
+ )
+ try:
+ nh_state = get_index_val.get(neighbor_ip)["state"]
+ intf_state = get_index_val.get(neighbor_ip)["ifState"]
+ except TypeError:
+ errormsg = (
+ "[DUT: {}] missing OSPF neighbor {} with router-id {}".format(
+ router, ospf_nbr, nbr_rid
+ )
+ )
+ return errormsg
+
+ if nh_state == "Full":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("[DUT: {}] OSPF6 is Converged".format(router))
+ result = True
+ else:
+ errormsg = "[DUT: {}] OSPF6 is not Converged".format(router)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=40)
+def verify_ospf_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None, expected=True
+):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf route command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `next_hop` : next to be verified
+ * `tag` : tag to be verified
+ * `metric` : metric to be verified
+ * `fib` : True if the route is installed in FIB.
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": ip_net,
+ "no_of_ip": 1,
+ "routeType": "N"
+ }
+ ]
+ }
+ }
+
+ result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.info("Entering lib API: verify_ospf_rib()")
+ result = False
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ found_hops = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.items():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s RIB:", router)
+
+ # Verifying RIB routes
+ command = "show ip ospf route"
+
+ found_routes = []
+ missing_routes = []
+
+ if (
+ "static_routes" in input_dict[routerInput]
+ or "prefix" in input_dict[routerInput]
+ ):
+ if "prefix" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["prefix"]
+ else:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary ospf_rib_json is not empty
+ if bool(ospf_rib_json) is False:
+ errormsg = (
+ "[DUT: {}] No routes found in OSPF route "
+ "table".format(router)
+ )
+ return errormsg
+
+ network = static_route["network"]
+ no_of_ip = static_route.setdefault("no_of_ip", 1)
+ _tag = static_route.setdefault("tag", None)
+ _rtype = static_route.setdefault("routeType", None)
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ st_found = False
+ nh_found = False
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != "ipv4":
+ continue
+
+ if st_rt in ospf_rib_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if fib and next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ for mnh in range(0, len(ospf_rib_json[st_rt])):
+ if (
+ "fib"
+ in ospf_rib_json[st_rt][mnh]["nexthops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt][mnh][
+ "nexthops"
+ ]
+ ]
+ )
+
+ if found_hops[0]:
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Nexthop "
+ "%s is not active for route %s in "
+ "RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is not active"
+ " for route {} in RIB of router"
+ " {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+
+ elif next_hop and fib is None:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt]["nexthops"]
+ ]
+
+ if found_hops:
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if _rtype:
+ if "routeType" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: routeType missing"
+ " for route {} in OSPF RIB \n".format(
+ dut, st_rt
+ )
+ )
+ return errormsg
+ elif _rtype != ospf_rib_json[st_rt]["routeType"]:
+ errormsg = (
+ "[DUT: {}]: routeType mismatch"
+ " for route {} in OSPF RIB \n".format(
+ dut, st_rt
+ )
+ )
+ return errormsg
+ else:
+ logger.info(
+ "[DUT: {}]: Found routeType {}"
+ " for route {}".format(dut, _rtype, st_rt)
+ )
+ if tag:
+ if "tag" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if _tag != ospf_rib_json[st_rt]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
+ )
+ return errormsg
+
+ if metric is not None:
+ if "type2cost" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if metric != ospf_rib_json[st_rt]["type2cost"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "[DUT: {}]: Found next_hop {} for all OSPF"
+ " routes in RIB".format(router, next_hop)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
+ result = True
+
+ logger.info("Exiting lib API: verify_ospf_rib()")
+ return result
+
+
+@retry(retry_timeout=20)
+def verify_ospf_interface(
+ tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
+):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf interface command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * `dut`: device under test
+ * `lan`: if set to true this interface belongs to LAN.
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict= {
+ 'r0': {
+ 'links':{
+ 's1': {
+ 'ospf':{
+ 'priority':98,
+ 'timerDeadSecs': 4,
+ 'area': '0.0.0.3',
+ 'mcastMemberOspfDesignatedRouters': True,
+ 'mcastMemberOspfAllRouters': True,
+ 'ospfEnabled': True,
+
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: verify_ospf_interface()")
+ result = False
+ if topo is None:
+ topo = tgen.json_topo
+
+ for router, rnode in tgen.routers().items():
+ if "ospf" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF interface on router %s:", router)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ ospf_intf_data = input_dict[router]["links"]
+ for ospf_intf, intf_data in ospf_intf_data.items():
+ intf = topo["routers"][router]["links"][ospf_intf]["interface"]
+ if intf in show_ospf_json["interfaces"]:
+ for intf_attribute in intf_data["ospf"]:
+ if (
+ intf_data["ospf"][intf_attribute]
+ == show_ospf_json["interfaces"][intf][intf_attribute]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF interface %s: %s is %s",
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf"][intf_attribute],
+ )
+ else:
+ errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf"][intf_attribute],
+ show_ospf_json["interfaces"][intf][intf_attribute],
+ )
+ return errormsg
+ result = True
+ logger.debug("Exiting API: verify_ospf_interface()")
+ return result
+
+
+@retry(retry_timeout=40)
+def verify_ospf_database(
+ tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True
+):
+ """
+ This API is to verify ospf lsa's by running
+ show ip ospf database command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `topo` : next to be verified
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict = {
+ "areas": {
+ "0.0.0.0": {
+ "Router Link States": {
+ "100.1.1.0-100.1.1.0": {
+ "LSID": "100.1.1.0",
+ "Advertised router": "100.1.1.0",
+ "LSA Age": 130,
+ "Sequence Number": "80000006",
+ "Checksum": "a703",
+ "Router links": 3
+ }
+ },
+ "Net Link States": {
+ "10.0.0.2-100.1.1.1": {
+ "LSID": "10.0.0.2",
+ "Advertised router": "100.1.1.1",
+ "LSA Age": 137,
+ "Sequence Number": "80000001",
+ "Checksum": "9583"
+ }
+ },
+ },
+ }
+ }
+ result = verify_ospf_database(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ result = False
+ router = dut
+ logger.debug("Entering lib API: verify_ospf_database()")
+
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("Verifying OSPF interface on router %s:", dut)
+
+ if not rid:
+ rid = "self-originate"
+ if lsatype:
+ if vrf is None:
+ command = "show ip ospf database {} {} json".format(lsatype, rid)
+ else:
+ command = "show ip ospf database {} {} vrf {} json".format(
+ lsatype, rid, vrf
+ )
+ else:
+ if vrf is None:
+ command = "show ip ospf database json"
+ else:
+ command = "show ip ospf database vrf {} json".format(vrf)
+
+ show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # for inter and inter lsa's
+ ospf_db_data = input_dict.setdefault("areas", None)
+ ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
+ # import pdb; pdb.set_trace()
+ if ospf_db_data:
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if ospf_area in show_ospf_json["routerLinkStates"]["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+
+ if (
+ _options
+ and lsa["lsaId"]
+ == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["linkStateId"]
+ and lsa["options"]
+ == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["options"]
+ ):
+ result = True
+ break
+ else:
+ errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format(
+ dut,
+ show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["options"],
+ _options,
+ lsa["options"],
+ show_ospf_json["routerLinkStates"]["areas"][ospf_area][
+ 0
+ ]["linkStateId"],
+ lsa["lsaId"],
+ )
+ return errormsg
+ if "Net Link States" in area_lsa:
+ for lsa in area_lsa["Net Link States"]:
+ if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+ if "Summary Link States" in area_lsa:
+ for lsa in area_lsa["Summary Link States"]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area]["Summary Link States"]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+ if "ASBR-Summary Link States" in area_lsa:
+ for lsa in area_lsa["ASBR-Summary Link States"]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "ASBR-Summary Link States"
+ ]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+ if ospf_external_lsa:
+ for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
+ if ospf_ext_lsa in show_ospf_json["AS External Link States"]:
+ logger.info(
+ "[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, ospf_ext_lsa)
+ )
+ return errormsg
+
+ logger.debug("Exiting API: verify_ospf_database()")
+ return result
+
+
+@retry(retry_timeout=20)
+def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf interface command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+
+ Usage
+ -----
+ input_dict = {
+ "11.0.0.0/8": {
+ "summaryAddress": "11.0.0.0/8",
+ "metricType": "E2",
+ "metric": 20,
+ "tag": 0,
+ "externalRouteCount": 5
+ }
+ }
+ result = verify_ospf_summary(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ router = dut
+
+ logger.info("Verifying OSPF summary on router %s:", router)
+
+ rnode = tgen.routers()[dut]
+
+ if ospf:
+ if "ospf6" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
+ return errormsg
+
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf summary detail json", isjson=True
+ )
+ else:
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
+ return errormsg
+
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf summary detail json", isjson=True
+ )
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ ospf_summary_data = input_dict
+
+ if ospf:
+ show_ospf_json = show_ospf_json["default"]
+
+ for ospf_summ, summ_data in ospf_summary_data.items():
+ if ospf_summ not in show_ospf_json:
+ continue
+ summary = ospf_summary_data[ospf_summ]["summaryAddress"]
+
+ if summary in show_ospf_json:
+ for summ in summ_data:
+ if summ_data[summ] == show_ospf_json[summary][summ]:
+ logger.info(
+ "[DUT: %s] OSPF summary %s:%s is %s",
+ router,
+ summary,
+ summ,
+ summ_data[summ],
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF summary {} : {} is {}, "
+ "Expected is {}".format(
+ router,
+ summary,
+ summ,
+ show_ospf_json[summary][summ],
+ summ_data[summ],
+ )
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=30)
+def verify_ospf6_rib(
+ tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
+):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf route command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `next_hop` : next to be verified
+ * `tag` : tag to be verified
+ * `metric` : metric to be verified
+ * `fib` : True if the route is installed in FIB.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": ip_net,
+ "no_of_ip": 1,
+ "routeType": "N"
+ }
+ ]
+ }
+ }
+
+ result = verify_ospf6_rib(tgen, dut, input_dict,next_hop=nh)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ found_hops = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.items():
+ if router != dut:
+ continue
+
+ logger.info("Checking router %s RIB:", router)
+
+ # Verifying RIB routes
+ command = "show ipv6 ospf route detail"
+
+ found_routes = []
+ missing_routes = []
+
+ if (
+ "static_routes" in input_dict[routerInput]
+ or "prefix" in input_dict[routerInput]
+ ):
+ if "prefix" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["prefix"]
+ else:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ cmd = "{}".format(command)
+
+ cmd = "{} json".format(cmd)
+
+ ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Fix for PR 2644182
+ try:
+ ospf_rib_json = ospf_rib_json["routes"]
+ except KeyError:
+ pass
+
+ # Verifying output dictionary ospf_rib_json is not empty
+ if bool(ospf_rib_json) is False:
+ errormsg = (
+ "[DUT: {}] No routes found in OSPF6 route "
+ "table".format(router)
+ )
+ return errormsg
+
+ network = static_route["network"]
+ no_of_ip = static_route.setdefault("no_of_ip", 1)
+ _tag = static_route.setdefault("tag", None)
+ _rtype = static_route.setdefault("routeType", None)
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ if len(ip_list) == 1:
+ ip_list = [network]
+ st_found = False
+ nh_found = False
+ for st_rt in ip_list:
+ st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != "ipv6":
+ continue
+
+ if st_rt in ospf_rib_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if fib and next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ for mnh in range(0, len(ospf_rib_json[st_rt])):
+ if (
+ "fib"
+ in ospf_rib_json[st_rt][mnh]["nextHops"][0]
+ ):
+ found_hops.append(
+ [
+ rib_r["ip"]
+ for rib_r in ospf_rib_json[st_rt][mnh][
+ "nextHops"
+ ]
+ ]
+ )
+
+ if found_hops[0]:
+ missing_list_of_nexthops = set(
+ found_hops[0]
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops[0])
+
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Nexthop "
+ "%s is not active for route %s in "
+ "RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is not active"
+ " for route {} in RIB of router"
+ " {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+
+ elif next_hop and fib is None:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+ found_hops = [
+ rib_r["nextHop"]
+ for rib_r in ospf_rib_json[st_rt]["nextHops"]
+ ]
+
+ if found_hops:
+ missing_list_of_nexthops = set(
+ found_hops
+ ).difference(next_hop)
+ additional_nexthops_in_required_nhs = set(
+ next_hop
+ ).difference(found_hops)
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if _rtype:
+ if "destinationType" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: destinationType missing"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+ elif _rtype != ospf_rib_json[st_rt]["destinationType"]:
+ errormsg = (
+ "[DUT: {}]: destinationType mismatch"
+ "for route {} in OSPF RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+ else:
+ logger.info(
+ "DUT: {}]: Found destinationType {}"
+ "for route {}".format(dut, _rtype, st_rt)
+ )
+ if tag:
+ if "tag" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: tag is not"
+ " present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if _tag != ospf_rib_json[st_rt]["tag"]:
+ errormsg = (
+ "[DUT: {}]: tag value {}"
+ " is not matched for"
+ " route {} in RIB \n".format(
+ dut,
+ _tag,
+ st_rt,
+ )
+ )
+ return errormsg
+
+ if metric is not None:
+ if "metricCostE2" not in ospf_rib_json[st_rt]:
+ errormsg = (
+ "[DUT: {}]: metric is"
+ " not present for"
+ " route {} in RIB \n".format(dut, st_rt)
+ )
+ return errormsg
+
+ if metric != ospf_rib_json[st_rt]["metricCostE2"]:
+ errormsg = (
+ "[DUT: {}]: metric value "
+ "{} is not matched for "
+ "route {} in RIB \n".format(
+ dut,
+ metric,
+ st_rt,
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "[DUT: {}]: Found next_hop {} for all OSPF"
+ " routes in RIB".format(router, next_hop)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
+ dut, missing_routes
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
+ dut,
+ found_routes,
+ )
+ result = True
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=6)
+def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
+ """
+ This API is to verify ospf routes by running
+ show ip ospf interface command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * `dut`: device under test
+ * `lan`: if set to true this interface belongs to LAN.
+ * `input_dict` : Input dict data, required when configuring from testcase
+
+ Usage
+ -----
+ input_dict= {
+ 'r0': {
+ 'links':{
+ 's1': {
+ 'ospf6':{
+ 'priority':98,
+ 'timerDeadSecs': 4,
+ 'area': '0.0.0.3',
+ 'mcastMemberOspfDesignatedRouters': True,
+ 'mcastMemberOspfAllRouters': True,
+ 'ospfEnabled': True,
+
+ }
+ }
+ }
+ }
+ }
+ result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ logger.debug("Entering lib API: verify_ospf6_interface")
+ result = False
+
+ if topo is None:
+ topo = tgen.json_topo
+
+ for router, rnode in tgen.routers().items():
+ if "ospf6" not in topo["routers"][router]:
+ continue
+
+ if dut is not None and dut != router:
+ continue
+
+ logger.info("Verifying OSPF interface on router %s:", router)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ipv6 ospf interface json", isjson=True
+ )
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF6 is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ ospf_intf_data = input_dict[router]["links"]
+ for ospf_intf, intf_data in ospf_intf_data.items():
+ intf = topo["routers"][router]["links"][ospf_intf]["interface"]
+ if intf in show_ospf_json:
+ for intf_attribute in intf_data["ospf6"]:
+ if intf_data["ospf6"][intf_attribute] is not list:
+ if (
+ intf_data["ospf6"][intf_attribute]
+ == show_ospf_json[intf][intf_attribute]
+ ):
+ logger.info(
+ "[DUT: %s] OSPF6 interface %s: %s is %s",
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ )
+ elif intf_data["ospf6"][intf_attribute] is list:
+ for addr_list in len(show_ospf_json[intf][intf_attribute]):
+ if (
+ show_ospf_json[intf][intf_attribute][addr_list][
+ "address"
+ ].split("/")[0]
+ == intf_data["ospf6"]["internetAddress"][0]["address"]
+ ):
+ break
+ else:
+ errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ intf_data["ospf6"][intf_attribute],
+ )
+ return errormsg
+ else:
+ errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
+ Expected is {}".format(
+ router,
+ intf,
+ intf_attribute,
+ intf_data["ospf6"][intf_attribute],
+ intf_data["ospf6"][intf_attribute],
+ )
+ return errormsg
+ result = True
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=20)
+def verify_ospf6_database(tgen, topo, dut, input_dict):
+ """
+ This API is to verify ospf lsa's by running
+ show ip ospf database command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `topo` : next to be verified
+
+ Usage
+ -----
+ input_dict = {
+ "areas": {
+ "0.0.0.0": {
+ "routerLinkStates": {
+ "100.1.1.0-100.1.1.0": {
+ "LSID": "100.1.1.0",
+ "Advertised router": "100.1.1.0",
+ "LSA Age": 130,
+ "Sequence Number": "80000006",
+ "Checksum": "a703",
+ "Router links": 3
+ }
+ },
+ "networkLinkStates": {
+ "10.0.0.2-100.1.1.1": {
+ "LSID": "10.0.0.2",
+ "Advertised router": "100.1.1.1",
+ "LSA Age": 137,
+ "Sequence Number": "80000001",
+ "Checksum": "9583"
+ }
+ },
+ },
+ }
+ }
+ result = verify_ospf_database(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ result = False
+ router = dut
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("Verifying OSPF interface on router %s:", dut)
+ show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # for inter and inter lsa's
+ ospf_db_data = input_dict.setdefault("areas", None)
+ ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
+
+ if ospf_db_data:
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if ospf_area in show_ospf_json["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["areas"][ospf_area][
+ "routerLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == rtrlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Router LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Type7 LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
+
+ if ospf_external_lsa:
+ for lsa in ospf_external_lsa:
+ try:
+ for t5lsa in show_ospf_json["asExternalLinkStates"]:
+ if (
+ lsa["lsaId"] == t5lsa["lsaId"]
+ and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ except KeyError:
+ result = False
+ if result:
+ logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, lsa)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def config_ospf6_interface(
+ tgen, topo=None, input_dict=None, build=False, load_config=True
+):
+ """
+ API to configure ospf on router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+ * `load_config` : Loading the config to router this is set as True.
+
+ Usage
+ -----
+ r1_ospf_auth = {
+ "r1": {
+ "links": {
+ "r2": {
+ "ospf": {
+ "authentication": 'message-digest',
+ "authentication-key": "ospf",
+ "message-digest-key": "10"
+ }
+ }
+ }
+ }
+ }
+ result = config_ospf6_interface(tgen, topo, r1_ospf_auth)
+
+ Returns
+ -------
+ True or False
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ if topo is None:
+ topo = tgen.json_topo
+
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
+ for router in input_dict.keys():
+ config_data = []
+ for lnk in input_dict[router]["links"].keys():
+ if "ospf6" not in input_dict[router]["links"][lnk]:
+ logger.debug(
+ "Router %s: ospf6 config is not present in"
+ "input_dict, passed input_dict %s",
+ router,
+ str(input_dict),
+ )
+ continue
+ ospf_data = input_dict[router]["links"][lnk]["ospf6"]
+ data_ospf_area = ospf_data.setdefault("area", None)
+ data_ospf_auth = ospf_data.setdefault("hash-algo", None)
+ data_ospf_keychain = ospf_data.setdefault("keychain", None)
+ data_ospf_dr_priority = ospf_data.setdefault("priority", None)
+ data_ospf_cost = ospf_data.setdefault("cost", None)
+ data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
+
+ try:
+ intf = topo["routers"][router]["links"][lnk]["interface"]
+ except KeyError:
+ intf = topo["switches"][router]["links"][lnk]["interface"]
+
+ # interface
+ cmd = "interface {}".format(intf)
+
+ config_data.append(cmd)
+ # interface area config
+ if data_ospf_area:
+ cmd = "ipv6 ospf area {}".format(data_ospf_area)
+ config_data.append(cmd)
+
+ # interface ospf auth
+ if data_ospf_auth:
+ cmd = "ipv6 ospf6 authentication"
+
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+
+ if "hash-algo" in ospf_data:
+ cmd = "{} key-id {} hash-algo {} key {}".format(
+ cmd,
+ ospf_data["key-id"],
+ ospf_data["hash-algo"],
+ ospf_data["key"],
+ )
+ config_data.append(cmd)
+
+ # interface ospf auth with keychain
+ if data_ospf_keychain:
+ cmd = "ipv6 ospf6 authentication"
+
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+
+ if "keychain" in ospf_data:
+ cmd = "{} keychain {}".format(cmd, ospf_data["keychain"])
+ config_data.append(cmd)
+
+ # interface ospf dr priority
+ if data_ospf_dr_priority:
+ cmd = "ipv6 ospf priority {}".format(ospf_data["priority"])
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf cost
+ if data_ospf_cost:
+ cmd = "ipv6 ospf cost {}".format(ospf_data["cost"])
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # interface ospf mtu
+ if data_ospf_mtu:
+ cmd = "ipv6 ospf mtu-ignore"
+ if "del_action" in ospf_data:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if build:
+ return config_data
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=20)
+def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
+ """
+ This API is used to vreify gr helper using command
+ show ip ospf graceful-restart helper
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : topology descriptions
+ * 'dut' : router
+ * 'input_dict' - values to be verified
+
+ Usage:
+ -------
+ input_dict = {
+ "helperSupport":"Disabled",
+ "strictLsaCheck":"Enabled",
+ "restartSupport":"Planned and Unplanned Restarts",
+ "supportedGracePeriod":1800
+ }
+ result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
+
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+ logger.info("Verifying OSPF GR details on router %s:", dut)
+ show_ospf_json = run_frr_cmd(
+ rnode, "show ip ospf graceful-restart helper json", isjson=True
+ )
+
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ raise ValueError(errormsg)
+ return errormsg
+
+ for ospf_gr, gr_data in input_dict.items():
+ try:
+ if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
+ logger.info(
+ "[DUT: FRR] OSPF GR Helper: %s is %s",
+ ospf_gr,
+ show_ospf_json[ospf_gr],
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
+ "is {}".format(
+ ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+
+ except KeyError:
+ errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None):
+ """
+ This API is to return ospf lsa's by running
+ show ip ospf database command.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `dut`: device under test
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `topo` : next to be verified
+ * `vrf` : vrf to be checked
+ * `lsatype` : type of lsa to be checked
+ * `rid` : router id for lsa to be checked
+ Usage
+ -----
+ input_dict = {
+ "areas": {
+ "0.0.0.0": {
+ "routerLinkStates": {
+ "100.1.1.0-100.1.1.0": {
+ "LSID": "100.1.1.0",
+ "Advertised router": "100.1.1.0",
+ "LSA Age": 130,
+ "Sequence Number": "80000006",
+ "Checksum": "a703",
+ "Router links": 3
+ }
+ },
+ "networkLinkStates": {
+ "10.0.0.2-100.1.1.1": {
+ "LSID": "10.0.0.2",
+ "Advertised router": "100.1.1.1",
+ "LSA Age": 137,
+ "Sequence Number": "80000001",
+ "Checksum": "9583"
+ }
+ },
+ },
+ }
+ }
+ result = get_ospf_database(tgen, topo, dut, input_dict)
+
+ Returns
+ -------
+ True or False (Error Message)
+ """
+
+ result = False
+ router = dut
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ sleep(10)
+ if "ospf" not in topo["routers"][dut]:
+ errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("Verifying OSPF interface on router %s:", dut)
+ if not rid:
+ rid = "self-originate"
+ if lsatype:
+ if vrf is None:
+ command = "show ip ospf database {} {} json".format(lsatype, rid)
+ else:
+ command = "show ip ospf database {} {} vrf {} json".format(
+ lsatype, rid, vrf
+ )
+ else:
+ if vrf is None:
+ command = "show ip ospf database json"
+ else:
+ command = "show ip ospf database vrf {} json".format(vrf)
+
+ show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
+ # Verifying output dictionary show_ospf_json is empty or not
+ if not bool(show_ospf_json):
+ errormsg = "OSPF is not running"
+ return errormsg
+
+ # for inter and inter lsa's
+ ospf_db_data = input_dict.setdefault("areas", None)
+ ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
+
+ if ospf_db_data:
+ for ospf_area, area_lsa in ospf_db_data.items():
+ if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["areas"][ospf_area][
+ "routerLinkStates"
+ ]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+ if (
+ _advrtr
+ and lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == rtrlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if (
+ _options
+ and lsa["lsaId"] == rtrlsa["lsaId"]
+ and lsa["options"] == rtrlsa["options"]
+ ):
+ result = True
+ break
+
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Router LSA is {}\n found Router LSA: {}".format(
+ router, ospf_area, lsa, rtrlsa
+ )
+ )
+ return errormsg
+
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Type7 LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
+ else:
+ if "routerLinkStates" in area_lsa:
+ for lsa in area_lsa["routerLinkStates"]:
+ for rtrlsa in show_ospf_json["routerLinkStates"]:
+ _advrtr = lsa.setdefault("advertisedRouter", None)
+ _options = lsa.setdefault("options", None)
+ _age = lsa.setdefault("lsaAge", None)
+ if (
+ _options
+ and lsa["options"]
+ == show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["options"]
+ ):
+ result = True
+ break
+ if (
+ _age != "get"
+ and lsa["lsaAge"]
+ == show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["lsaAge"]
+ ):
+ result = True
+ break
+
+ if _age == "get":
+ return "{}".format(
+ show_ospf_json["routerLinkStates"][rtrlsa][
+ ospf_area
+ ][0]["lsaAge"]
+ )
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Router LSA is {}\n found Router LSA: {}".format(
+ router,
+ ospf_area,
+ lsa,
+ show_ospf_json["routerLinkStates"],
+ )
+ )
+ return errormsg
+
+ if "networkLinkStates" in area_lsa:
+ for lsa in area_lsa["networkLinkStates"]:
+ for netlsa in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]:
+ if (
+ lsa
+ in show_ospf_json["areas"][ospf_area][
+ "networkLinkStates"
+ ]
+ ):
+ if (
+ lsa["lsaId"] == netlsa["lsaId"]
+ and lsa["advertisedRouter"]
+ == netlsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Network LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "summaryLinkStates" in area_lsa:
+ for lsa in area_lsa["summaryLinkStates"]:
+ for t3lsa in show_ospf_json["areas"][ospf_area][
+ "summaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t3lsa["lsaId"]
+ and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "nssaExternalLinkStates" in area_lsa:
+ for lsa in area_lsa["nssaExternalLinkStates"]:
+ for t7lsa in show_ospf_json["areas"][ospf_area][
+ "nssaExternalLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t7lsa["lsaId"]
+ and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ break
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " Type7 LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "asbrSummaryLinkStates" in area_lsa:
+ for lsa in area_lsa["asbrSummaryLinkStates"]:
+ for t4lsa in show_ospf_json["areas"][ospf_area][
+ "asbrSummaryLinkStates"
+ ]:
+ if (
+ lsa["lsaId"] == t4lsa["lsaId"]
+ and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ if result:
+ logger.info(
+ "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
+ router,
+ ospf_area,
+ lsa,
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB area {}: expected"
+ " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
+ )
+ return errormsg
+
+ if "linkLocalOpaqueLsa" in area_lsa:
+ for lsa in area_lsa["linkLocalOpaqueLsa"]:
+ try:
+ for lnklsa in show_ospf_json["areas"][ospf_area][
+ "linkLocalOpaqueLsa"
+ ]:
+ if (
+ lsa["lsaId"] in lnklsa["lsaId"]
+ and "linkLocalOpaqueLsa"
+ in show_ospf_json["areas"][ospf_area]
+ ):
+ logger.info(
+ (
+ "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
+ "%s",
+ ospf_area,
+ lsa,
+ )
+ )
+ result = True
+ else:
+ errormsg = (
+ "[DUT: FRR] OSPF LSDB area: {} "
+ "expected Opaque-LSA is {}, Found is {}".format(
+ ospf_area, lsa, show_ospf_json
+ )
+ )
+ raise ValueError(errormsg)
+ return errormsg
+ except KeyError:
+ errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
+ return errormsg
+
+ if ospf_external_lsa:
+ for lsa in ospf_external_lsa:
+ try:
+ for t5lsa in show_ospf_json["asExternalLinkStates"]:
+ if (
+ lsa["lsaId"] == t5lsa["lsaId"]
+ and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
+ ):
+ result = True
+ break
+ except KeyError:
+ result = False
+ if result:
+ logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
+ result = True
+ else:
+ errormsg = (
+ "[DUT: {}] OSPF LSDB : expected"
+ " External LSA is {}".format(router, lsa)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result