summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/topotests/lib/pim.py3979
1 files changed, 3979 insertions, 0 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
new file mode 100644
index 0000000..03ab024
--- /dev/null
+++ b/tests/topotests/lib/pim.py
@@ -0,0 +1,3979 @@
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+
+import datetime
+import os
+import re
+import sys
+import traceback
+import functools
+from copy import deepcopy
+from time import sleep
+from lib import topotest
+
+
+# Import common_config to use commomnly used APIs
+from lib.common_config import (
+ create_common_configurations,
+ HostApplicationHelper,
+ InvalidCLIError,
+ create_common_configuration,
+ InvalidCLIError,
+ retry,
+ run_frr_cmd,
+ validate_ip_address,
+)
+from lib.micronet import get_exec_path
+from lib.topolog import logger
+from lib.topotest import frr_unicode
+
+####
+CWD = os.path.dirname(os.path.realpath(__file__))
+
+
+def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True):
+ """
+ API to configure pim/pimv6 on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from
+ testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "pim": {
+ "join-prune-interval": "5",
+ "rp": [{
+ "rp_addr" : "1.0.3.17".
+ "keep-alive-timer": "100"
+ "group_addr_range": ["224.1.1.0/24", "225.1.1.0/24"]
+ "prefix-list": "pf_list_1"
+ "delete": True
+ }]
+ },
+ "pim6": {
+ "disable" : ["l1-i1-eth1"],
+ "rp": [{
+ "rp_addr" : "2001:db8:f::5:17".
+ "keep-alive-timer": "100"
+ "group_addr_range": ["FF00::/8"]
+ "prefix-list": "pf_list_1"
+ "delete": True
+ }]
+ }
+ }
+ }
+
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
+ for router in input_dict.keys():
+ config_data = _enable_disable_pim_config(tgen, topo, input_dict, router, build)
+
+ if config_data:
+ config_data_dict[router] = config_data
+
+ # Now add RP config to all routers
+ for router in input_dict.keys():
+ if "pim" in input_dict[router] or "pim6" in input_dict[router]:
+ _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict)
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "pim", build, load_config
+ )
+ except InvalidCLIError:
+ logger.error("create_pim_config", exc_info=True)
+ result = False
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
+ """
+ Helper API to create pim RP configurations.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+ * `config_data_dict` : OUT: adds `router` config to dictinary
+ Returns
+ -------
+ None
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ rp_data = []
+
+ # PIMv4
+ pim_data = None
+ if "pim" in input_dict[router]:
+ pim_data = input_dict[router]["pim"]
+ if "rp" in input_dict[router]["pim"]:
+ rp_data += pim_data["rp"]
+
+ # PIMv6
+ pim6_data = None
+ if "pim6" in input_dict[router]:
+ pim6_data = input_dict[router]["pim6"]
+ if "rp" in input_dict[router]["pim6"]:
+ rp_data += pim6_data["rp"]
+
+ # Configure this RP on every router.
+ for dut in tgen.routers():
+ # At least one interface must be enabled for PIM on the router
+ pim_if_enabled = False
+ pim6_if_enabled = False
+ for destLink, data in topo[dut]["links"].items():
+ if "pim" in data:
+ pim_if_enabled = True
+ if "pim6" in data:
+ pim6_if_enabled = True
+ if not pim_if_enabled and pim_data:
+ continue
+ if not pim6_if_enabled and pim6_data:
+ continue
+
+ config_data = []
+
+ if rp_data:
+ for rp_dict in deepcopy(rp_data):
+ # ip address of RP
+ if "rp_addr" not in rp_dict and build:
+ logger.error(
+ "Router %s: 'ip address of RP' not "
+ "present in input_dict/JSON",
+ router,
+ )
+
+ return False
+ rp_addr = rp_dict.setdefault("rp_addr", None)
+ if rp_addr:
+ addr_type = validate_ip_address(rp_addr)
+ # Keep alive Timer
+ keep_alive_timer = rp_dict.setdefault("keep_alive_timer", None)
+
+ # Group Address range to cover
+ if "group_addr_range" not in rp_dict and build:
+ logger.error(
+ "Router %s:'Group Address range to cover'"
+ " not present in input_dict/JSON",
+ router,
+ )
+
+ return False
+ group_addr_range = rp_dict.setdefault("group_addr_range", None)
+
+ # Group prefix-list filter
+ prefix_list = rp_dict.setdefault("prefix_list", None)
+
+ # Delete rp config
+ del_action = rp_dict.setdefault("delete", False)
+
+ if keep_alive_timer:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp keep-alive-timer {}".format(keep_alive_timer)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp keep-alive-timer {}".format(keep_alive_timer)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if rp_addr:
+ if group_addr_range:
+ if type(group_addr_range) is not list:
+ group_addr_range = [group_addr_range]
+
+ for grp_addr in group_addr_range:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp {} {}".format(rp_addr, grp_addr)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp {} {}".format(rp_addr, grp_addr)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if prefix_list:
+ if addr_type == "ipv4":
+ cmd = "ip pim rp {} prefix-list {}".format(
+ rp_addr, prefix_list
+ )
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+ if addr_type == "ipv6":
+ cmd = "ipv6 pim rp {} prefix-list {}".format(
+ rp_addr, prefix_list
+ )
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if config_data:
+ if dut not in config_data_dict:
+ config_data_dict[dut] = config_data
+ else:
+ config_data_dict[dut].extend(config_data)
+
+
+def create_igmp_config(tgen, topo, input_dict=None, build=False):
+ """
+ API to configure igmp on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from
+ testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "igmp": {
+ "interfaces": {
+ "r1-r0-eth0" :{
+ "igmp":{
+ "version": "2",
+ "delete": True
+ "query": {
+ "query-interval" : 100,
+ "query-max-response-time": 200
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+
+ config_data_dict = {}
+
+ for router in input_dict.keys():
+ if "igmp" not in input_dict[router]:
+ logger.debug("Router %s: 'igmp' is not present in " "input_dict", router)
+ continue
+
+ igmp_data = input_dict[router]["igmp"]
+
+ if "interfaces" in igmp_data:
+ config_data = []
+ intf_data = igmp_data["interfaces"]
+
+ for intf_name in intf_data.keys():
+ cmd = "interface {}".format(intf_name)
+ config_data.append(cmd)
+ protocol = "igmp"
+ del_action = intf_data[intf_name]["igmp"].setdefault("delete", False)
+ del_attr = intf_data[intf_name]["igmp"].setdefault("delete_attr", False)
+ cmd = "ip igmp"
+ if del_action:
+ cmd = "no {}".format(cmd)
+ if not del_attr:
+ config_data.append(cmd)
+
+ for attribute, data in intf_data[intf_name]["igmp"].items():
+ if attribute == "version":
+ cmd = "ip {} {} {}".format(protocol, attribute, data)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ if not del_attr:
+ config_data.append(cmd)
+
+ if attribute == "join":
+ for group in data:
+ cmd = "ip {} {} {}".format(protocol, attribute, group)
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if attribute == "query":
+ for query, value in data.items():
+ if query != "delete":
+ cmd = "ip {} {} {}".format(protocol, query, value)
+
+ if "delete" in intf_data[intf_name][protocol]["query"]:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+ if config_data:
+ config_data_dict[router] = config_data
+
+ try:
+ result = create_common_configurations(
+ tgen, config_data_dict, "interface_config", build=build
+ )
+ except InvalidCLIError:
+ logger.error("create_igmp_config", exc_info=True)
+ result = False
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def create_mld_config(tgen, topo, input_dict=None, build=False):
+ """
+ API to configure mld for PIMv6 on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from
+ testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "mld": {
+ "interfaces": {
+ "r1-r0-eth0" :{
+ "mld":{
+ "version": "2",
+ "delete": True
+ "query": {
+ "query-interval" : 100,
+ "query-max-response-time": 200
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ result = False
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ input_dict = deepcopy(input_dict)
+ for router in input_dict.keys():
+ if "mld" not in input_dict[router]:
+ logger.debug("Router %s: 'mld' is not present in " "input_dict", router)
+ continue
+
+ mld_data = input_dict[router]["mld"]
+
+ if "interfaces" in mld_data:
+ config_data = []
+ intf_data = mld_data["interfaces"]
+
+ for intf_name in intf_data.keys():
+ cmd = "interface {}".format(intf_name)
+ config_data.append(cmd)
+ protocol = "mld"
+ del_action = intf_data[intf_name]["mld"].setdefault("delete", False)
+ cmd = "ipv6 mld"
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ del_attr = intf_data[intf_name]["mld"].setdefault("delete_attr", False)
+ join = intf_data[intf_name]["mld"].setdefault("join", None)
+ source = intf_data[intf_name]["mld"].setdefault("source", None)
+ version = intf_data[intf_name]["mld"].setdefault("version", False)
+ query = intf_data[intf_name]["mld"].setdefault("query", {})
+
+ if version:
+ cmd = "ipv6 {} version {}".format(protocol, version)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if source and join:
+ for group in join:
+ cmd = "ipv6 {} join {} {}".format(protocol, group, source)
+
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ elif join:
+ for group in join:
+ cmd = "ipv6 {} join {}".format(protocol, group)
+
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if query:
+ for _query, value in query.items():
+ if _query != "delete":
+ cmd = "ipv6 {} {} {}".format(protocol, _query, value)
+
+ if "delete" in intf_data[intf_name][protocol]["query"]:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+ try:
+ result = create_common_configuration(
+ tgen, router, config_data, "interface_config", build=build
+ )
+ except InvalidCLIError:
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def _enable_disable_pim_config(tgen, topo, input_dict, router, build=False):
+ """
+ Helper API to enable or disable pim on interfaces
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+
+ Returns
+ -------
+ list of config
+ """
+
+ config_data = []
+
+ # Enable pim/pim6 on interfaces
+ for destRouterLink, data in sorted(topo[router]["links"].items()):
+ if "pim" in data and data["pim"] == "enable":
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ cmd = "interface {}".format(interface_name)
+ config_data.append(cmd)
+ config_data.append("ip pim")
+
+ if "pim6" in data and data["pim6"] == "enable":
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+
+ cmd = "interface {}".format(interface_name)
+ config_data.append(cmd)
+ config_data.append("ipv6 pim")
+
+ # pim global config
+ if "pim" in input_dict[router]:
+ pim_data = input_dict[router]["pim"]
+ del_action = pim_data.setdefault("delete", False)
+ for t in [
+ "join-prune-interval",
+ "keep-alive-timer",
+ "register-suppress-time",
+ ]:
+ if t in pim_data:
+ cmd = "ip pim {} {}".format(t, pim_data[t])
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ # pim6 global config
+ if "pim6" in input_dict[router]:
+ pim6_data = input_dict[router]["pim6"]
+ del_action = pim6_data.setdefault("delete", False)
+ for t in [
+ "join-prune-interval",
+ "keep-alive-timer",
+ "register-suppress-time",
+ ]:
+ if t in pim6_data:
+ cmd = "ipv6 pim {} {}".format(t, pim6_data[t])
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ return config_data
+
+
+def find_rp_details(tgen, topo):
+ """
+ Find who is RP in topology and returns list of RPs
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ rp_details = {}
+
+ router_list = tgen.routers()
+ topo_data = topo["routers"]
+
+ for router in router_list.keys():
+
+ if "pim" not in topo_data[router]:
+ continue
+
+ pim_data = topo_data[router]["pim"]
+ if "rp" in pim_data:
+ rp_data = pim_data["rp"]
+ for rp_dict in rp_data:
+ # ip address of RP
+ rp_addr = rp_dict["rp_addr"]
+
+ for link, data in topo["routers"][router]["links"].items():
+ if data["ipv4"].split("/")[0] == rp_addr:
+ rp_details[router] = rp_addr
+
+ return rp_details
+
+
+def configure_pim_force_expire(tgen, topo, input_dict, build=False):
+ """
+ Helper API to create pim configuration.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "pim": {
+ "force_expire":{
+ "10.0.10.1": ["255.1.1.1"]
+ }
+ }
+ }
+ }
+
+ result = create_pim_config(tgen, topo, input_dict)
+
+ Returns
+ -------
+ True or False
+ """
+
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ try:
+ config_data_dict = {}
+
+ for dut in input_dict.keys():
+ if "pim" not in input_dict[dut]:
+ continue
+
+ pim_data = input_dict[dut]["pim"]
+
+ config_data = []
+ if "force_expire" in pim_data:
+ force_expire_data = pim_data["force_expire"]
+
+ for source, groups in force_expire_data.items():
+ if type(groups) is not list:
+ groups = [groups]
+
+ for group in groups:
+ cmd = "ip pim force-expire source {} group {}".format(
+ source, group
+ )
+ config_data.append(cmd)
+
+ if config_data:
+ config_data_dict[dut] = config_data
+
+ result = create_common_configurations(
+ tgen, config_data_dict, "pim", build=build
+ )
+ except InvalidCLIError:
+ logger.error("configure_pim_force_expire", exc_info=True)
+ result = False
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+#############################################
+# Verification APIs
+#############################################
+@retry(retry_timeout=12)
+def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected=True):
+ """
+ Verify all PIM neighbors are up and running, config is verified
+ using "show ip pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : dut info
+ * `iface` : link for which PIM nbr need to check
+ * `nbr_ip` : neighbor ip of interface
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim_neighbors(tgen, topo, dut, iface=ens192, nbr_ip=20.1.1.2)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if dut is not None and dut != router:
+ continue
+
+ rnode = tgen.routers()[router]
+ show_ip_pim_neighbor_json = rnode.vtysh_cmd(
+ "show ip pim neighbor json", isjson=True
+ )
+
+ for destLink, data in topo["routers"][router]["links"].items():
+ if iface is not None and iface != data["interface"]:
+ continue
+
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" not in data:
+ continue
+
+ if "pim" in data and data["pim"] == "disable":
+ continue
+
+ if "pim" in data and data["pim"] == "enable":
+ local_interface = data["interface"]
+
+ if "-" in destLink:
+ # Spliting and storing destRouterLink data in tempList
+ tempList = destLink.split("-")
+
+ # destRouter
+ destLink = tempList.pop(0)
+
+ # Current Router Link
+ tempList.insert(0, router)
+ curRouter = "-".join(tempList)
+ else:
+ curRouter = router
+ if destLink not in topo["routers"]:
+ continue
+ data = topo["routers"][destLink]["links"][curRouter]
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" not in data:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM neighbor status:", router)
+
+ if "pim" in data and data["pim"] == "enable":
+ pim_nh_intf_ip = data["ipv4"].split("/")[0]
+
+ # Verifying PIM neighbor
+ if local_interface in show_ip_pim_neighbor_json:
+ if show_ip_pim_neighbor_json[local_interface]:
+ if (
+ show_ip_pim_neighbor_json[local_interface][pim_nh_intf_ip][
+ "neighbor"
+ ]
+ != pim_nh_intf_ip
+ ):
+ errormsg = (
+ "[DUT %s]: Local interface: %s, PIM"
+ " neighbor check failed "
+ "Expected neighbor: %s, Found neighbor:"
+ " %s"
+ % (
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ show_ip_pim_neighbor_json[local_interface][
+ pim_nh_intf_ip
+ ]["neighbor"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Local interface: %s, Found"
+ " expected PIM neighbor %s",
+ router,
+ local_interface,
+ pim_nh_intf_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, and"
+ "interface ip: %s is not found in "
+ "PIM neighbor " % (router, local_interface, pim_nh_intf_ip)
+ )
+ return errormsg
+ else:
+ errormsg = (
+ "[DUT %s]: Local interface: %s, is not "
+ "present in PIM neighbor " % (router, local_interface)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_igmp_groups(tgen, dut, interface, group_addresses, expected=True):
+ """
+ Verify IGMP groups are received from an intended interface
+ by running "show ip igmp groups" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which IGMP groups would be received
+ * `group_addresses`: IGMP group address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_igmp_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying IGMP groups received:", dut)
+ show_ip_igmp_json = run_frr_cmd(rnode, "show ip igmp groups json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface in show_ip_igmp_json:
+ show_ip_igmp_json = show_ip_igmp_json[interface]["groups"]
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying IGMP group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ found = False
+ for grp_addr in group_addresses:
+ for index in show_ip_igmp_json:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if found is not True:
+ errormsg = (
+ "[DUT %s]: Verifying IGMP group received"
+ " from interface %s [FAILED]!! "
+ " Expected not found: %s" % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying IGMP group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_upstream_iif(
+ tgen,
+ dut,
+ iif,
+ src_address,
+ group_addresses,
+ joinState=None,
+ refCount=1,
+ expected=True,
+):
+ """
+ Verify upstream inbound interface is updated correctly
+ by running "show ip pim upstream" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `iif`: inbound interface
+ * `src_address`: source address
+ * `group_addresses`: IGMP group address
+ * `joinState`: upstream join state
+ * `refCount`: refCount value
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ iif = "r1-r0-eth0"
+ src_address = "*"
+ group_address = "225.1.1.1"
+ result = verify_upstream_iif(tgen, dut, iif, src_address, group_address,
+ state, refCount)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info(
+ "[DUT: %s]: Verifying upstream Inbound Interface" " for IGMP groups received:",
+ dut,
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if type(iif) is not list:
+ iif = [iif]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ cmd = "show {} pim upstream json".format(ip_cmd)
+ show_ip_pim_upstream_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ for grp_addr in group_addresses:
+ # Verify group address
+ if grp_addr not in show_ip_pim_upstream_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for group %s [FAILED]!!" % (
+ dut,
+ grp_addr,
+ )
+ return errormsg
+ group_addr_json = show_ip_pim_upstream_json[grp_addr]
+
+ # Verify source address
+ if src_address not in group_addr_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for (%s,%s) [FAILED]!!" % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+
+ # Verify Inbound Interface
+ found = False
+ for in_interface in iif:
+ if group_addr_json[src_address]["inboundInterface"] == in_interface:
+ if refCount > 0:
+ logger.info(
+ "[DUT %s]: Verifying refCount "
+ "for (%s,%s) [PASSED]!! "
+ " Found Expected: %s",
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["refCount"],
+ )
+ found = True
+ if found:
+ if joinState is None:
+ if group_addr_json[src_address]["joinState"] != "Joined":
+ errormsg = (
+ "[DUT %s]: Verifying iif "
+ "(Inbound Interface) for (%s,%s) and"
+ " joinState :%s [FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["joinState"],
+ in_interface,
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ )
+ return errormsg
+
+ elif group_addr_json[src_address]["joinState"] != joinState:
+ errormsg = (
+ "[DUT %s]: Verifying iif "
+ "(Inbound Interface) for (%s,%s) and"
+ " joinState :%s [FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["joinState"],
+ in_interface,
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying iif(Inbound Interface)"
+ " for (%s,%s) and joinState is %s [PASSED]!! "
+ " Found Expected: (%s)",
+ dut,
+ src_address,
+ grp_addr,
+ group_addr_json[src_address]["joinState"],
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying iif "
+ "(Inbound Interface) for (%s, %s) "
+ "[FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ in_interface,
+ group_addr_json[src_address]["inboundInterface"],
+ )
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=12)
+def verify_join_state_and_timer(
+ tgen, dut, iif, src_address, group_addresses, expected=True
+):
+ """
+ Verify join state is updated correctly and join timer is
+ running with the help of "show ip pim upstream" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `iif`: inbound interface
+ * `src_address`: source address
+ * `group_addresses`: IGMP group address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ iif = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_join_state_and_timer(tgen, dut, iif, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ errormsg = ""
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info(
+ "[DUT: %s]: Verifying Join state and Join Timer" " for IGMP groups received:",
+ dut,
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ cmd = "show ip pim upstream json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 pim upstream json"
+ show_ip_pim_upstream_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ for grp_addr in group_addresses:
+ # Verify group address
+ if grp_addr not in show_ip_pim_upstream_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for group %s [FAILED]!!" % (
+ dut,
+ grp_addr,
+ )
+ return errormsg
+
+ group_addr_json = show_ip_pim_upstream_json[grp_addr]
+
+ # Verify source address
+ if src_address not in group_addr_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for (%s,%s) [FAILED]!!" % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+
+ # Verify join state
+ joinState = group_addr_json[src_address]["joinState"]
+ if joinState != "Joined":
+ error = (
+ "[DUT %s]: Verifying join state for"
+ " (%s,%s) [FAILED]!! "
+ " Expected: %s, Found: %s"
+ % (dut, src_address, grp_addr, "Joined", joinState)
+ )
+ errormsg = errormsg + "\n" + str(error)
+ else:
+ logger.info(
+ "[DUT %s]: Verifying join state for"
+ " (%s,%s) [PASSED]!! "
+ " Found Expected: %s",
+ dut,
+ src_address,
+ grp_addr,
+ joinState,
+ )
+
+ # Verify join timer
+ joinTimer = group_addr_json[src_address]["joinTimer"]
+ if not re.match(r"(\d{2}):(\d{2}):(\d{2})", joinTimer):
+ error = (
+ "[DUT %s]: Verifying join timer for"
+ " (%s,%s) [FAILED]!! "
+ " Expected: %s, Found: %s"
+ ) % (
+ dut,
+ src_address,
+ grp_addr,
+ "join timer should be running",
+ joinTimer,
+ )
+ errormsg = errormsg + "\n" + str(error)
+ else:
+ logger.info(
+ "[DUT %s]: Verifying join timer is running"
+ " for (%s,%s) [PASSED]!! "
+ " Found Expected: %s",
+ dut,
+ src_address,
+ grp_addr,
+ joinTimer,
+ )
+
+ if errormsg != "":
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=120, diag_pct=0)
+def verify_mroutes(
+ tgen,
+ dut,
+ src_address,
+ group_addresses,
+ iif,
+ oil,
+ return_uptime=False,
+ mwait=0,
+ expected=True,
+):
+ """
+ Verify ip mroutes and make sure (*, G)/(S, G) is present in mroutes
+ by running "show ip/ipv6 mroute" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `src_address`: source address
+ * `group_addresses`: IGMP group address
+ * `iif`: Incoming interface
+ * `oil`: Outgoing interface
+ * `return_uptime`: If True, return uptime dict, default is False
+ * `mwait`: Wait time, default is 0
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ group_address = "225.1.1.1"
+ result = verify_mroutes(tgen, dut, src_address, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ if not isinstance(group_addresses, list):
+ group_addresses = [group_addresses]
+
+ if not isinstance(iif, list) and iif != "none":
+ iif = [iif]
+
+ if not isinstance(oil, list) and oil != "none":
+ oil = [oil]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ if return_uptime:
+ logger.info("Sleeping for %s sec..", mwait)
+ sleep(mwait)
+
+ logger.info("[DUT: %s]: Verifying ip mroutes", dut)
+ show_ip_mroute_json = run_frr_cmd(
+ rnode, "show {} mroute json".format(ip_cmd), isjson=True
+ )
+
+ if return_uptime:
+ uptime_dict = {}
+
+ if bool(show_ip_mroute_json) == False:
+ error_msg = "[DUT %s]: mroutes are not present or flushed out !!" % (dut)
+ return error_msg
+
+ for grp_addr in group_addresses:
+ if grp_addr not in show_ip_mroute_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+ else:
+ if return_uptime:
+ uptime_dict[grp_addr] = {}
+
+ group_addr_json = show_ip_mroute_json[grp_addr]
+
+ if src_address not in group_addr_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+ else:
+ if return_uptime:
+ uptime_dict[grp_addr][src_address] = {}
+
+ mroutes = group_addr_json[src_address]
+
+ if mroutes["installed"] != 0:
+ logger.info(
+ "[DUT %s]: mroute (%s,%s) is installed", dut, src_address, grp_addr
+ )
+
+ if "oil" not in mroutes:
+ if oil == "none" and mroutes["iif"] in iif:
+ logger.info(
+ "[DUT %s]: Verifying (%s, %s) mroute,"
+ " [PASSED]!! Found Expected: "
+ "(iif: %s, oil: %s, installed: (%s,%s))",
+ dut,
+ src_address,
+ grp_addr,
+ mroutes["iif"],
+ oil,
+ src_address,
+ grp_addr,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying (%s, %s) mroute,"
+ " [FAILED]!! "
+ "Expected: (oil: %s, installed:"
+ " (%s,%s)) Found: ( oil: none, "
+ "installed: (%s,%s))"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ oil,
+ src_address,
+ grp_addr,
+ src_address,
+ grp_addr,
+ )
+ )
+
+ return errormsg
+
+ else:
+ found = False
+ for route, data in mroutes["oil"].items():
+ if route in oil and route != "pimreg":
+ if (
+ data["source"] == src_address
+ and data["group"] == grp_addr
+ and data["inboundInterface"] in iif
+ and data["outboundInterface"] in oil
+ ):
+ if return_uptime:
+
+ uptime_dict[grp_addr][src_address] = data["upTime"]
+
+ logger.info(
+ "[DUT %s]: Verifying (%s, %s)"
+ " mroute, [PASSED]!! "
+ "Found Expected: "
+ "(iif: %s, oil: %s, installed:"
+ " (%s,%s)",
+ dut,
+ src_address,
+ grp_addr,
+ data["inboundInterface"],
+ data["outboundInterface"],
+ data["source"],
+ data["group"],
+ )
+ found = True
+ break
+ else:
+ continue
+
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying (%s, %s)"
+ " mroute [FAILED]!! "
+ "Expected in: (iif: %s, oil: %s,"
+ " installed: (%s,%s)) Found: "
+ "(iif: %s, oil: %s, "
+ "installed: (%s,%s))"
+ % (
+ dut,
+ src_address,
+ grp_addr,
+ iif,
+ oil,
+ src_address,
+ grp_addr,
+ data["inboundInterface"],
+ data["outboundInterface"],
+ data["source"],
+ data["group"],
+ )
+ )
+ return errormsg
+
+ else:
+ errormsg = "[DUT %s]: mroute (%s,%s) is not installed" % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if return_uptime == False else uptime_dict
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_rp_info(
+ tgen,
+ topo,
+ dut,
+ group_addresses,
+ oif=None,
+ rp=None,
+ source=None,
+ iamrp=None,
+ expected=True,
+):
+ """
+ Verify pim rp info by running "show ip pim rp-info" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: JSON file handler
+ * `dut`: device under test
+ * `group_addresses`: IGMP group address
+ * `oif`: outbound interface name
+ * `rp`: RP address
+ * `source`: Source of RP
+ * `iamrp`: User defined RP
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ result = verify_pim_rp_info(tgen, topo, dut, group_address,
+ rp=rp, source="BSR")
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if type(oif) is not list:
+ oif = [oif]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ for grp_addr in group_addresses:
+ if rp is None:
+ rp_details = find_rp_details(tgen, topo)
+
+ if dut in rp_details:
+ iamRP = True
+ else:
+ iamRP = False
+ else:
+ if addr_type == "ipv4":
+ show_ip_route_json = run_frr_cmd(
+ rnode, "show ip route connected json", isjson=True
+ )
+ elif addr_type == "ipv6":
+ show_ip_route_json = run_frr_cmd(
+ rnode, "show ipv6 route connected json", isjson=True
+ )
+ for _rp in show_ip_route_json.keys():
+ if rp == _rp.split("/")[0]:
+ iamRP = True
+ break
+ else:
+ iamRP = False
+
+ logger.info("[DUT: %s]: Verifying ip rp info", dut)
+ cmd = "show {} pim rp-info json".format(ip_cmd)
+ show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if rp not in show_ip_rp_info_json:
+ errormsg = (
+ "[DUT %s]: Verifying rp-info "
+ "for rp_address %s [FAILED]!! " % (dut, rp)
+ )
+ return errormsg
+ else:
+ group_addr_json = show_ip_rp_info_json[rp]
+
+ for rp_json in group_addr_json:
+ if "rpAddress" not in rp_json:
+ errormsg = "[DUT %s]: %s key not " "present in rp-info " % (
+ dut,
+ "rpAddress",
+ )
+ return errormsg
+
+ if oif is not None:
+ found = False
+ if rp_json["outboundInterface"] not in oif:
+ errormsg = (
+ "[DUT %s]: Verifying OIF "
+ "for group %s and RP %s [FAILED]!! "
+ "Expected interfaces: (%s),"
+ " Found: (%s)"
+ % (dut, grp_addr, rp, oif, rp_json["outboundInterface"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying OIF "
+ "for group %s and RP %s [PASSED]!! "
+ "Found Expected: (%s)"
+ % (dut, grp_addr, rp, rp_json["outboundInterface"])
+ )
+
+ if source is not None:
+ if rp_json["source"] != source:
+ errormsg = (
+ "[DUT %s]: Verifying SOURCE "
+ "for group %s and RP %s [FAILED]!! "
+ "Expected: (%s),"
+ " Found: (%s)" % (dut, grp_addr, rp, source, rp_json["source"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying SOURCE "
+ "for group %s and RP %s [PASSED]!! "
+ "Found Expected: (%s)" % (dut, grp_addr, rp, rp_json["source"])
+ )
+
+ if rp_json["group"] == grp_addr and iamrp is not None:
+ if iamRP:
+ if rp_json["iAmRP"]:
+ logger.info(
+ "[DUT %s]: Verifying group "
+ "and iAmRP [PASSED]!!"
+ " Found Expected: (%s, %s:%s)",
+ dut,
+ grp_addr,
+ "iAmRP",
+ rp_json["iAmRP"],
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying group"
+ "%s and iAmRP [FAILED]!! "
+ "Expected: (iAmRP: %s),"
+ " Found: (iAmRP: %s)"
+ % (dut, grp_addr, "true", rp_json["iAmRP"])
+ )
+ return errormsg
+
+ if not iamRP:
+ if rp_json["iAmRP"] == False:
+ logger.info(
+ "[DUT %s]: Verifying group "
+ "and iAmNotRP [PASSED]!!"
+ " Found Expected: (%s, %s:%s)",
+ dut,
+ grp_addr,
+ "iAmRP",
+ rp_json["iAmRP"],
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying group"
+ "%s and iAmRP [FAILED]!! "
+ "Expected: (iAmRP: %s),"
+ " Found: (iAmRP: %s)"
+ % (dut, grp_addr, "false", rp_json["iAmRP"])
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_state(
+ tgen,
+ dut,
+ iif,
+ oil,
+ group_addresses,
+ src_address=None,
+ installed_fl=None,
+ expected=True,
+):
+ """
+ Verify pim state by running "show ip pim state" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `iif`: inbound interface
+ * `oil`: outbound interface
+ * `group_addresses`: IGMP group address
+ * `src_address`: source address, default = None
+ * installed_fl` : Installed flag
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ iif = "r1-r3-eth1"
+ oil = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_pim_state(tgen, dut, iif, oil, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim state", dut)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp in group_addresses:
+ addr_type = validate_ip_address(grp)
+
+ if addr_type == "ipv4":
+ ip_cmd = "ip"
+ elif addr_type == "ipv6":
+ ip_cmd = "ipv6"
+
+ logger.info("[DUT: %s]: Verifying pim state", dut)
+ show_pim_state_json = run_frr_cmd(
+ rnode, "show {} pim state json".format(ip_cmd), isjson=True
+ )
+
+ if installed_fl is None:
+ installed_fl = 1
+
+ for grp_addr in group_addresses:
+ if src_address is None:
+ src_address = "*"
+ pim_state_json = show_pim_state_json[grp_addr][src_address]
+ else:
+ pim_state_json = show_pim_state_json[grp_addr][src_address]
+
+ if pim_state_json["Installed"] == installed_fl:
+ logger.info(
+ "[DUT %s]: group %s is installed flag: %s",
+ dut,
+ grp_addr,
+ pim_state_json["Installed"],
+ )
+ for interface, data in pim_state_json[iif].items():
+ if interface != oil:
+ continue
+
+ # Verify iif, oil and installed state
+ if (
+ data["group"] == grp_addr
+ and data["installed"] == installed_fl
+ and data["inboundInterface"] == iif
+ and data["outboundInterface"] == oil
+ ):
+ logger.info(
+ "[DUT %s]: Verifying pim state for group"
+ " %s [PASSED]!! Found Expected: "
+ "(iif: %s, oil: %s, installed: %s) ",
+ dut,
+ grp_addr,
+ data["inboundInterface"],
+ data["outboundInterface"],
+ data["installed"],
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying pim state for group"
+ " %s, [FAILED]!! Expected: "
+ "(iif: %s, oil: %s, installed: %s) "
+ % (dut, grp_addr, iif, oil, "1"),
+ "Found: (iif: %s, oil: %s, installed: %s)"
+ % (
+ data["inboundInterface"],
+ data["outboundInterface"],
+ data["installed"],
+ ),
+ )
+ return errormsg
+ else:
+ errormsg = "[DUT %s]: %s install flag value not as expected" % (
+ dut,
+ grp_addr,
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def get_pim_interface_traffic(tgen, input_dict):
+ """
+ get ip pim interface traffice by running
+ "show ip pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict(dict)`: defines DUT, what and from which interfaces
+ traffic needs to be retrieved
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "r1-r0-eth0": {
+ "helloRx": 0,
+ "helloTx": 1,
+ "joinRx": 0,
+ "joinTx": 0
+ }
+ }
+ }
+
+ result = get_pim_interface_traffic(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ output_dict = {}
+ for dut in input_dict.keys():
+ if dut not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim interface traffic", dut)
+
+ def show_pim_intf_traffic(rnode, dut, input_dict, output_dict):
+ show_pim_intf_traffic_json = run_frr_cmd(
+ rnode, "show ip pim interface traffic json", isjson=True
+ )
+
+ output_dict[dut] = {}
+ for intf, data in input_dict[dut].items():
+ interface_json = show_pim_intf_traffic_json[intf]
+ for state in data:
+
+ # Verify Tx/Rx
+ if state in interface_json:
+ output_dict[dut][state] = interface_json[state]
+ else:
+ errormsg = (
+ "[DUT %s]: %s is not present"
+ "for interface %s [FAILED]!! " % (dut, state, intf)
+ )
+ return errormsg
+ return None
+
+ test_func = functools.partial(
+ show_pim_intf_traffic, rnode, dut, input_dict, output_dict
+ )
+ (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
+ if not result:
+ return out
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return output_dict
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_pim_interface(
+ tgen, topo, dut, interface=None, interface_ip=None, expected=True
+):
+ """
+ Verify all PIM interface are up and running, config is verified
+ using "show ip pim interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `interface` : interface name
+ * `interface_ip` : interface ip address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim_interfacetgen, topo, dut, interface=ens192, interface_ip=20.1.1.1)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
+
+ rnode = tgen.routers()[dut]
+ show_ip_pim_interface_json = rnode.vtysh_cmd(
+ "show ip pim interface json", isjson=True
+ )
+
+ logger.info("show_ip_pim_interface_json: \n %s", show_ip_pim_interface_json)
+
+ if interface_ip:
+ if interface in show_ip_pim_interface_json:
+ pim_intf_json = show_ip_pim_interface_json[interface]
+ if pim_intf_json["address"] != interface_ip:
+ errormsg = (
+ "[DUT %s]: PIM interface "
+ "ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, pim_intf_json["address"], interface_ip)
+ )
+ return errormsg
+ else:
+ logger.info(
+ "[DUT %s]: PIM interface "
+ "ip is correct "
+ "[Passed]!! Expected : %s, Found : %s"
+ % (dut, pim_intf_json["address"], interface_ip)
+ )
+ return True
+ else:
+ for destLink, data in topo["routers"][dut]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" in data and data["pim"] == "enable":
+ pim_interface = data["interface"]
+ pim_intf_ip = data["ipv4"].split("/")[0]
+
+ if pim_interface in show_ip_pim_interface_json:
+ pim_intf_json = show_ip_pim_interface_json[pim_interface]
+ else:
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ "PIM interface ip: %s, not Found"
+ % (dut, pim_interface, pim_intf_ip)
+ )
+ return errormsg
+
+ # Verifying PIM interface
+ if (
+ pim_intf_json["address"] != pim_intf_ip
+ and pim_intf_json["state"] != "up"
+ ):
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ "PIM interface ip: %s, status check "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (
+ dut,
+ pim_interface,
+ pim_intf_ip,
+ pim_interface,
+ pim_intf_json["state"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM interface: %s, "
+ "interface ip: %s, status: %s"
+ " [PASSED]!!",
+ dut,
+ pim_interface,
+ pim_intf_ip,
+ pim_intf_json["state"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def clear_pim_interface_traffic(tgen, topo):
+ """
+ Clear ip/ipv6 pim interface traffice by running
+ "clear ip/ipv6 pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ Usage
+ -----
+
+ result = clear_pim_interface_traffic(tgen, topo)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in tgen.routers():
+ if "pim" not in topo["routers"][dut]:
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Clearing pim interface traffic", dut)
+ result = run_frr_cmd(rnode, "clear ip pim interface traffic")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def clear_pim_interfaces(tgen, dut):
+ """
+ Clear ip/ipv6 pim interface by running
+ "clear ip/ipv6 pim interfaces" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: Device Under Test
+ Usage
+ -----
+
+ result = clear_pim_interfaces(tgen, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ nh_before_clear = {}
+ nh_after_clear = {}
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verify pim neighbor before pim" " neighbor clear", dut)
+ # To add uptime initially
+ sleep(10)
+ run_json_before = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
+
+ for key, value in run_json_before.items():
+ if bool(value):
+ for _key, _value in value.items():
+ nh_before_clear[key] = _value["upTime"]
+
+ # Clearing PIM neighbors
+ logger.info("[DUT: %s]: Clearing pim interfaces", dut)
+ run_frr_cmd(rnode, "clear ip pim interfaces")
+
+ logger.info("[DUT: %s]: Verify pim neighbor after pim" " neighbor clear", dut)
+
+ found = False
+
+ # Waiting for maximum 60 sec
+ fail_intf = []
+ for retry in range(1, 13):
+ logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
+ sleep(5)
+ run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
+ found = True
+ for pim_intf in nh_before_clear.keys():
+ if pim_intf not in run_json_after or not run_json_after[pim_intf]:
+ found = False
+ fail_intf.append(pim_intf)
+
+ if found is True:
+ break
+ else:
+ errormsg = (
+ "[DUT: %s]: pim neighborship is not formed for %s"
+ "after clear_ip_pim_interfaces %s [FAILED!!]",
+ dut,
+ fail_intf,
+ )
+ return errormsg
+
+ for key, value in run_json_after.items():
+ if bool(value):
+ for _key, _value in value.items():
+ nh_after_clear[key] = _value["upTime"]
+
+ # Verify uptime for neighbors
+ for pim_intf in nh_before_clear.keys():
+ d1 = datetime.datetime.strptime(nh_before_clear[pim_intf], "%H:%M:%S")
+ d2 = datetime.datetime.strptime(nh_after_clear[pim_intf], "%H:%M:%S")
+ if d2 >= d1:
+ errormsg = (
+ "[DUT: %s]: PIM neighborship is not cleared for",
+ " interface %s [FAILED!!]",
+ dut,
+ pim_intf,
+ )
+
+ logger.info("[DUT: %s]: PIM neighborship is cleared [PASSED!!]")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def clear_igmp_interfaces(tgen, dut):
+ """
+ Clear ip/ipv6 igmp interfaces by running
+ "clear ip/ipv6 igmp interfaces" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+
+ Usage
+ -----
+ dut = "r1"
+ result = clear_igmp_interfaces(tgen, dut)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ group_before_clear = {}
+ group_after_clear = {}
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: IGMP group uptime before clear" " igmp groups:", dut)
+ igmp_json = run_frr_cmd(rnode, "show ip igmp groups json", isjson=True)
+
+ total_groups_before_clear = igmp_json["totalGroups"]
+
+ for key, value in igmp_json.items():
+ if type(value) is not dict:
+ continue
+
+ groups = value["groups"]
+ group = groups[0]["group"]
+ uptime = groups[0]["uptime"]
+ group_before_clear[group] = uptime
+
+ logger.info("[DUT: %s]: Clearing ip igmp interfaces", dut)
+ result = run_frr_cmd(rnode, "clear ip igmp interfaces")
+
+ # Waiting for maximum 60 sec
+ for retry in range(1, 13):
+ logger.info(
+ "[DUT: %s]: Waiting for 5 sec for igmp interfaces" " to come up", dut
+ )
+ sleep(5)
+ igmp_json = run_frr_cmd(rnode, "show ip igmp groups json", isjson=True)
+
+ total_groups_after_clear = igmp_json["totalGroups"]
+
+ if total_groups_before_clear == total_groups_after_clear:
+ break
+
+ for key, value in igmp_json.items():
+ if type(value) is not dict:
+ continue
+
+ groups = value["groups"]
+ group = groups[0]["group"]
+ uptime = groups[0]["uptime"]
+ group_after_clear[group] = uptime
+
+ # Verify uptime for groups
+ for group in group_before_clear.keys():
+ d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S")
+ d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S")
+ if d2 >= d1:
+ errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut)
+
+ logger.info("[DUT: %s]: IGMP group is cleared [PASSED!!]")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+@retry(retry_timeout=20)
+def clear_mroute_verify(tgen, dut, expected=True):
+ """
+ Clear ip/ipv6 mroute by running "clear ip/ipv6 mroute" cli and verify
+ mroutes are up again after mroute clear
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: Device Under Test
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+
+ result = clear_mroute_verify(tgen, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ mroute_before_clear = {}
+ mroute_after_clear = {}
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: IP mroutes uptime before clear", dut)
+ mroute_json_1 = run_frr_cmd(rnode, "show ip mroute json", isjson=True)
+
+ for group in mroute_json_1.keys():
+ mroute_before_clear[group] = {}
+ for key in mroute_json_1[group].keys():
+ for _key, _value in mroute_json_1[group][key]["oil"].items():
+ if _key != "pimreg":
+ mroute_before_clear[group][key] = _value["upTime"]
+
+ logger.info("[DUT: %s]: Clearing ip mroute", dut)
+ result = run_frr_cmd(rnode, "clear ip mroute")
+
+ # RFC 3376: 8.2. Query Interval - Default: 125 seconds
+ # So waiting for maximum 130 sec to get the igmp report
+ for retry in range(1, 26):
+ logger.info("[DUT: %s]: Waiting for 2 sec for mroutes" " to come up", dut)
+ sleep(5)
+ keys_json1 = mroute_json_1.keys()
+ mroute_json_2 = run_frr_cmd(rnode, "show ip mroute json", isjson=True)
+
+ if bool(mroute_json_2):
+ keys_json2 = mroute_json_2.keys()
+
+ for group in mroute_json_2.keys():
+ flag = False
+ for key in mroute_json_2[group].keys():
+ if "oil" not in mroute_json_2[group]:
+ continue
+
+ for _key, _value in mroute_json_2[group][key]["oil"].items():
+ if _key != "pimreg" and keys_json1 == keys_json2:
+ break
+ flag = True
+ if flag:
+ break
+ else:
+ continue
+
+ for group in mroute_json_2.keys():
+ mroute_after_clear[group] = {}
+ for key in mroute_json_2[group].keys():
+ for _key, _value in mroute_json_2[group][key]["oil"].items():
+ if _key != "pimreg":
+ mroute_after_clear[group][key] = _value["upTime"]
+
+ # Verify uptime for mroute
+ for group in mroute_before_clear.keys():
+ for source in mroute_before_clear[group].keys():
+ if set(mroute_before_clear[group]) != set(mroute_after_clear[group]):
+ errormsg = (
+ "[DUT: %s]: mroute (%s, %s) has not come"
+ " up after mroute clear [FAILED!!]" % (dut, source, group)
+ )
+ return errormsg
+
+ d1 = datetime.datetime.strptime(
+ mroute_before_clear[group][source], "%H:%M:%S"
+ )
+ d2 = datetime.datetime.strptime(
+ mroute_after_clear[group][source], "%H:%M:%S"
+ )
+ if d2 >= d1:
+ errormsg = "[DUT: %s]: IP mroute is not cleared" " [FAILED!!]" % (dut)
+
+ logger.info("[DUT: %s]: IP mroute is cleared [PASSED!!]", dut)
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return True
+
+
+def clear_mroute(tgen, dut=None):
+ """
+ Clear ip/ipv6 mroute by running "clear ip mroute" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test, default None
+
+ Usage
+ -----
+ clear_mroute(tgen, dut)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ router_list = tgen.routers()
+ for router, rnode in router_list.items():
+ if dut is not None and router != dut:
+ continue
+
+ logger.debug("[DUT: %s]: Clearing ip mroute", router)
+ rnode.vtysh_cmd("clear ip mroute")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+
+def reconfig_interfaces(tgen, topo, senderRouter, receiverRouter, packet=None):
+ """
+ Configure interface ip for sender and receiver routers
+ as per bsr packet
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `senderRouter` : Sender router
+ * `receiverRouter` : Receiver router
+ * `packet` : BSR packet in raw format
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ try:
+ config_data = []
+
+ src_ip = topo["routers"][senderRouter]["bsm"]["bsr_packets"][packet]["src_ip"]
+ dest_ip = topo["routers"][senderRouter]["bsm"]["bsr_packets"][packet]["dest_ip"]
+
+ for destLink, data in topo["routers"][senderRouter]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" in data and data["pim"] == "enable":
+ sender_interface = data["interface"]
+ sender_interface_ip = data["ipv4"]
+
+ config_data.append("interface {}".format(sender_interface))
+ config_data.append("no ip address {}".format(sender_interface_ip))
+ config_data.append("ip address {}".format(src_ip))
+
+ result = create_common_configuration(
+ tgen, senderRouter, config_data, "interface_config"
+ )
+ if result is not True:
+ return False
+
+ config_data = []
+ links = topo["routers"][destLink]["links"]
+ pim_neighbor = {key: links[key] for key in [senderRouter]}
+
+ data = pim_neighbor[senderRouter]
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" in data and data["pim"] == "enable":
+ receiver_interface = data["interface"]
+ receiver_interface_ip = data["ipv4"]
+
+ config_data.append("interface {}".format(receiver_interface))
+ config_data.append("no ip address {}".format(receiver_interface_ip))
+ config_data.append("ip address {}".format(dest_ip))
+
+ result = create_common_configuration(
+ tgen, receiverRouter, config_data, "interface_config"
+ )
+ if result is not True:
+ return False
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: reconfig_interfaces()")
+ return result
+
+
+def add_rp_interfaces_and_pim_config(tgen, topo, interface, rp, rp_mapping):
+ """
+ Add physical interfaces tp RP for all the RPs
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `interface` : RP interface
+ * `rp` : rp for given topology
+ * `rp_mapping` : dictionary of all groups and RPs
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ try:
+ config_data = []
+
+ for group, rp_list in rp_mapping.items():
+ for _rp in rp_list:
+ config_data.append("interface {}".format(interface))
+ config_data.append("ip address {}".format(_rp))
+ config_data.append("ip pim")
+
+ # Why not config just once, why per group?
+ result = create_common_configuration(
+ tgen, rp, config_data, "interface_config"
+ )
+ if result is not True:
+ return False
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=None):
+ """
+ Using scapy Raw() method to send BSR raw packet from one FRR
+ to other
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `senderRouter` : Sender router
+ * `receiverRouter` : Receiver router
+ * `packet` : BSR packet in raw format
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ global CWD
+ result = ""
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ python3_path = tgen.net.get_exec_path(["python3", "python"])
+ script_path = os.path.join(CWD, "send_bsr_packet.py")
+ node = tgen.net[senderRouter]
+
+ for destLink, data in topo["routers"][senderRouter]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" in data and data["pim"] == "enable":
+ sender_interface = data["interface"]
+
+ packet = topo["routers"][senderRouter]["bsm"]["bsr_packets"][packet]["data"]
+
+ cmd = [
+ python3_path,
+ script_path,
+ packet,
+ sender_interface,
+ "--interval=1",
+ "--count=1",
+ ]
+ logger.info("Scapy cmd: \n %s", cmd)
+ node.cmd_raises(cmd)
+
+ logger.debug("Exiting lib API: scapy_send_bsr_raw_packet")
+ return True
+
+
+def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
+ """
+ Find which RP is having lowest prioriy and returns rp IP
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `bsr`: BSR address
+ * 'grp': Group Address
+
+ Usage
+ -----
+ dut = "r1"
+ result = verify_pim_rp_info(tgen, dut, bsr)
+
+ Returns:
+ dictionary: group and RP, which has to be installed as per
+ lowest priority or highest priority
+ """
+
+ rp_details = {}
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Fetching rp details from bsrp-info", dut)
+ bsrp_json = run_frr_cmd(rnode, "show ip pim bsrp-info json", isjson=True)
+
+ if grp not in bsrp_json:
+ return {}
+
+ for group, rp_data in bsrp_json.items():
+ if group == "BSR Address" and bsrp_json["BSR Address"] == bsr:
+ continue
+
+ if group != grp:
+ continue
+
+ rp_priority = {}
+ rp_hash = {}
+
+ for rp, value in rp_data.items():
+ if rp == "Pending RP count":
+ continue
+ rp_priority[value["Rp Address"]] = value["Rp Priority"]
+ rp_hash[value["Rp Address"]] = value["Hash Val"]
+
+ priority_dict = dict(zip(rp_priority.values(), rp_priority.keys()))
+ hash_dict = dict(zip(rp_hash.values(), rp_hash.keys()))
+
+ # RP with lowest priority
+ if len(priority_dict) != 1:
+ rp_p, lowest_priority = sorted(rp_priority.items(), key=lambda x: x[1])[0]
+ rp_details[group] = rp_p
+
+ # RP with highest hash value
+ if len(priority_dict) == 1:
+ rp_h, highest_hash = sorted(rp_hash.items(), key=lambda x: x[1])[-1]
+ rp_details[group] = rp_h
+
+ # RP with highest IP address
+ if len(priority_dict) == 1 and len(hash_dict) == 1:
+ rp_details[group] = sorted(rp_priority.keys())[-1]
+
+ return rp_details
+
+
+@retry(retry_timeout=12)
+def verify_pim_grp_rp_source(
+ tgen, topo, dut, grp_addr, rp_source, rpadd=None, expected=True
+):
+ """
+ Verify pim rp info by running "show ip pim rp-info" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: JSON file handler
+ * `dut`: device under test
+ * `grp_addr`: IGMP group address
+ * 'rp_source': source from which rp installed
+ * 'rpadd': rp address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ group_address = "225.1.1.1"
+ rp_source = "BSR"
+ result = verify_pim_rp_and_source(tgen, topo, dut, group_address, rp_source)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying ip rp info", dut)
+ show_ip_rp_info_json = run_frr_cmd(rnode, "show ip pim rp-info json", isjson=True)
+
+ if rpadd != None:
+ rp_json = show_ip_rp_info_json[rpadd]
+ if rp_json[0]["group"] == grp_addr:
+ if rp_json[0]["source"] == rp_source:
+ logger.info(
+ "[DUT %s]: Verifying Group and rp_source [PASSED]"
+ "Found Expected: %s, %s"
+ % (dut, rp_json[0]["group"], rp_json[0]["source"])
+ )
+ return True
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying Group and rp_source [FAILED]"
+ "Expected (%s, %s) "
+ "Found (%s, %s)"
+ % (
+ dut,
+ grp_addr,
+ rp_source,
+ rp_json[0]["group"],
+ rp_json[0]["source"],
+ )
+ )
+ return errormsg
+ errormsg = (
+ "[DUT %s]: Verifying Group and rp_source [FAILED]"
+ "Expected: %s, %s but not found" % (dut, grp_addr, rp_source)
+ )
+ return errormsg
+
+ for rp in show_ip_rp_info_json:
+ rp_json = show_ip_rp_info_json[rp]
+ logger.info("%s", rp_json)
+ if rp_json[0]["group"] == grp_addr:
+ if rp_json[0]["source"] == rp_source:
+ logger.info(
+ "[DUT %s]: Verifying Group and rp_source [PASSED]"
+ "Found Expected: %s, %s"
+ % (dut, rp_json[0]["group"], rp_json[0]["source"])
+ )
+ return True
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying Group and rp_source [FAILED]"
+ "Expected (%s, %s) "
+ "Found (%s, %s)"
+ % (
+ dut,
+ grp_addr,
+ rp_source,
+ rp_json[0]["group"],
+ rp_json[0]["source"],
+ )
+ )
+ return errormsg
+
+ errormsg = (
+ "[DUT %s]: Verifying Group and rp_source [FAILED]"
+ "Expected: %s, %s but not found" % (dut, grp_addr, rp_source)
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return errormsg
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_bsr(tgen, topo, dut, bsr_ip, expected=True):
+ """
+ Verify all PIM interface are up and running, config is verified
+ using "show ip pim interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * 'bsr' : bsr ip to be verified
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim_bsr(tgen, topo, dut, bsr_ip)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM bsr status:", dut)
+
+ rnode = tgen.routers()[dut]
+ pim_bsr_json = rnode.vtysh_cmd("show ip pim bsr json", isjson=True)
+
+ logger.info("show_ip_pim_bsr_json: \n %s", pim_bsr_json)
+
+ # Verifying PIM bsr
+ if pim_bsr_json["bsr"] != bsr_ip:
+ errormsg = (
+ "[DUT %s]:"
+ "bsr status: not found"
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, bsr_ip, pim_bsr_json["bsr"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]:" " bsr status: found, Address :%s" " [PASSED]!!",
+ dut,
+ pim_bsr_json["bsr"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_upstream_rpf(
+ tgen, topo, dut, interface, group_addresses, rp=None, expected=True
+):
+ """
+ Verify IP/IPv6 PIM upstream rpf, config is verified
+ using "show ip/ipv6 pim neighbor" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : devuce under test
+ * `interface` : upstream interface
+ * `group_addresses` : list of group address for which upstream info
+ needs to be checked
+ * `rp` : RP address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_pim_upstream_rpf(gen, topo, dut, interface,
+ group_addresses, rp=None)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if "pim" in topo["routers"][dut]:
+
+ logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut)
+
+ rnode = tgen.routers()[dut]
+ show_ip_pim_upstream_rpf_json = rnode.vtysh_cmd(
+ "show ip pim upstream-rpf json", isjson=True
+ )
+
+ logger.info(
+ "show_ip_pim_upstream_rpf_json: \n %s", show_ip_pim_upstream_rpf_json
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp_addr in group_addresses:
+ for destLink, data in topo["routers"][dut]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
+
+ if "pim" not in topo["routers"][destLink]:
+ continue
+
+ # Verify RP info
+ if rp is None:
+ rp_details = find_rp_details(tgen, topo)
+ else:
+ rp_details = {dut: rp}
+
+ if dut in rp_details:
+ pim_nh_intf_ip = topo["routers"][dut]["links"]["lo"]["ipv4"].split(
+ "/"
+ )[0]
+ else:
+ if destLink not in interface:
+ continue
+
+ links = topo["routers"][destLink]["links"]
+ pim_neighbor = {key: links[key] for key in [dut]}
+
+ data = pim_neighbor[dut]
+ if "pim" in data and data["pim"] == "enable":
+ pim_nh_intf_ip = data["ipv4"].split("/")[0]
+
+ upstream_rpf_json = show_ip_pim_upstream_rpf_json[grp_addr]["*"]
+
+ # Verifying ip pim upstream rpf
+ if (
+ upstream_rpf_json["rpfInterface"] == interface
+ and upstream_rpf_json["ribNexthop"] != pim_nh_intf_ip
+ ):
+ errormsg = (
+ "[DUT %s]: Verifying group: %s, "
+ "rpf interface: %s, "
+ " rib Nexthop check [FAILED]!!"
+ "Expected: %s, Found: %s"
+ % (
+ dut,
+ grp_addr,
+ interface,
+ pim_nh_intf_ip,
+ upstream_rpf_json["ribNexthop"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying group: %s,"
+ " rpf interface: %s, "
+ " rib Nexthop: %s [PASSED]!!",
+ dut,
+ grp_addr,
+ interface,
+ pim_nh_intf_ip,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def enable_disable_pim_unicast_bsm(tgen, router, intf, enable=True):
+ """
+ Helper API to enable or disable pim bsm on interfaces
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `router` : router id to be configured.
+ * `intf` : Interface to be configured
+ * `enable` : this flag denotes if config should be enabled or disabled
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ try:
+ config_data = []
+ cmd = "interface {}".format(intf)
+ config_data.append(cmd)
+
+ if enable == True:
+ config_data.append("ip pim unicast-bsm")
+ else:
+ config_data.append("no ip pim unicast-bsm")
+
+ result = create_common_configuration(
+ tgen, router, config_data, "interface_config", build=False
+ )
+ if result is not True:
+ return False
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+def enable_disable_pim_bsm(tgen, router, intf, enable=True):
+ """
+ Helper API to enable or disable pim bsm on interfaces
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `router` : router id to be configured.
+ * `intf` : Interface to be configured
+ * `enable` : this flag denotes if config should be enabled or disabled
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ try:
+ config_data = []
+ cmd = "interface {}".format(intf)
+ config_data.append(cmd)
+
+ if enable is True:
+ config_data.append("ip pim bsm")
+ else:
+ config_data.append("no ip pim bsm")
+
+ result = create_common_configuration(
+ tgen, router, config_data, "interface_config", build=False
+ )
+ if result is not True:
+ return False
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_join(
+ tgen, topo, dut, interface, group_addresses, src_address=None, expected=True
+):
+ """
+ Verify ip/ipv6 pim join by running "show ip/ipv6 pim join" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: JSON file handler
+ * `dut`: device under test
+ * `interface`: interface name, from which PIM join would come
+ * `group_addresses`: IGMP group address
+ * `src_address`: Source address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_pim_join(tgen, dut, star, group_address, interface)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim join", dut)
+ show_pim_join_json = run_frr_cmd(rnode, "show ip pim join json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp_addr in group_addresses:
+ # Verify if IGMP is enabled in DUT
+ if "igmp" not in topo["routers"][dut]:
+ pim_join = True
+ else:
+ pim_join = False
+
+ interface_json = show_pim_join_json[interface]
+
+ grp_addr = grp_addr.split("/")[0]
+ for source, data in interface_json[grp_addr].items():
+
+ # Verify pim join
+ if pim_join:
+ if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
+ logger.info(
+ "[DUT %s]: Verifying pim join for group: %s"
+ "[PASSED]!! Found Expected: (%s)",
+ dut,
+ grp_addr,
+ data["channelJoinName"],
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying pim join for group: %s"
+ "[FAILED]!! Expected: (%s) "
+ "Found: (%s)" % (dut, grp_addr, "JOIN", data["channelJoinName"])
+ )
+ return errormsg
+
+ if not pim_join:
+ if data["group"] == grp_addr and data["channelJoinName"] == "NOINFO":
+ logger.info(
+ "[DUT %s]: Verifying pim join for group: %s"
+ "[PASSED]!! Found Expected: (%s)",
+ dut,
+ grp_addr,
+ data["channelJoinName"],
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Verifying pim join for group: %s"
+ "[FAILED]!! Expected: (%s) "
+ "Found: (%s)"
+ % (dut, grp_addr, "NOINFO", data["channelJoinName"])
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_igmp_config(tgen, input_dict, stats_return=False, expected=True):
+ """
+ Verify igmp interface details, verifying following configs:
+ timerQueryInterval
+ timerQueryResponseIntervalMsec
+ lastMemberQueryCount
+ timerLastMemberQueryMsec
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict` : Input dict data, required to verify
+ timer
+ * `stats_return`: If user wants API to return statistics
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "igmp": {
+ "interfaces": {
+ "l1-i1-eth1": {
+ "igmp": {
+ "query": {
+ "query-interval" : 200,
+ "query-max-response-time" : 100
+ },
+ "statistics": {
+ "queryV2" : 2,
+ "reportV2" : 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_igmp_config(tgen, input_dict, stats_return)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
+
+ statistics = False
+ report = False
+ if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
+ statistics = True
+ cmd = "show ip igmp statistics"
+ else:
+ cmd = "show ip igmp"
+
+ logger.info(
+ "[DUT: %s]: Verifying IGMP interface %s detail:", dut, interface
+ )
+
+ if statistics:
+ if (
+ "report"
+ in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"][
+ "statistics"
+ ]
+ ):
+ report = True
+
+ if statistics and report:
+ show_ip_igmp_intf_json = run_frr_cmd(
+ rnode, "{} json".format(cmd), isjson=True
+ )
+ intf_detail_json = show_ip_igmp_intf_json["global"]
+ else:
+ show_ip_igmp_intf_json = run_frr_cmd(
+ rnode, "{} interface {} json".format(cmd, interface), isjson=True
+ )
+
+ if not report:
+ if interface not in show_ip_igmp_intf_json:
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ " is not present in CLI output "
+ "[FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ else:
+ intf_detail_json = show_ip_igmp_intf_json[interface]
+
+ if stats_return:
+ igmp_stats = {}
+
+ if "statistics" in data["igmp"]:
+ if stats_return:
+ igmp_stats["statistics"] = {}
+ for query, value in data["igmp"]["statistics"].items():
+ if query == "queryV2":
+ # Verifying IGMP interface queryV2 statistics
+ if stats_return:
+ igmp_stats["statistics"][query] = intf_detail_json[
+ "queryV2"
+ ]
+
+ else:
+ if intf_detail_json["queryV2"] != value:
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ " queryV2 statistics verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["queryV2"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s "
+ "queryV2 statistics is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "reportV2":
+ # Verifying IGMP interface timerV2 statistics
+ if stats_return:
+ igmp_stats["statistics"][query] = intf_detail_json[
+ "reportV2"
+ ]
+
+ else:
+ if intf_detail_json["reportV2"] <= value:
+ errormsg = (
+ "[DUT %s]: IGMP reportV2 "
+ "statistics verification "
+ "[FAILED]!! Expected : %s "
+ "or more, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP reportV2 " "statistics is %s",
+ dut,
+ intf_detail_json["reportV2"],
+ )
+
+ if "query" in data["igmp"]:
+ for query, value in data["igmp"]["query"].items():
+ if query == "query-interval":
+ # Verifying IGMP interface query interval timer
+ if intf_detail_json["timerQueryInterval"] != value:
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ " query-interval verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["timerQueryInterval"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s " "query-interval is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if query == "query-max-response-time":
+ # Verifying IGMP interface query max response timer
+ if (
+ intf_detail_json["timerQueryResponseIntervalMsec"]
+ != value * 100
+ ):
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ "query-max-response-time "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerQueryResponseIntervalMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s "
+ "query-max-response-time is %s ms",
+ dut,
+ interface,
+ value * 100,
+ )
+
+ if query == "last-member-query-count":
+ # Verifying IGMP interface last member query count
+ if intf_detail_json["lastMemberQueryCount"] != value:
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ "last-member-query-count "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value,
+ intf_detail_json["lastMemberQueryCount"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s "
+ "last-member-query-count is %s ms",
+ dut,
+ interface,
+ value * 1000,
+ )
+
+ if query == "last-member-query-interval":
+ # Verifying IGMP interface last member query interval
+ if (
+ intf_detail_json["timerLastMemberQueryMsec"]
+ != value * 100 * intf_detail_json["lastMemberQueryCount"]
+ ):
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ "last-member-query-interval "
+ "verification [FAILED]!!"
+ " Expected : %s, Found : %s"
+ % (
+ dut,
+ interface,
+ value * 1000,
+ intf_detail_json["timerLastMemberQueryMsec"],
+ )
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s "
+ "last-member-query-interval is %s ms",
+ dut,
+ interface,
+ value * intf_detail_json["lastMemberQueryCount"] * 100,
+ )
+
+ if "version" in data["igmp"]:
+ # Verifying IGMP interface state is up
+ if intf_detail_json["state"] != "up":
+ errormsg = (
+ "[DUT %s]: IGMP interface: %s "
+ " state: %s verification "
+ "[FAILED]!!" % (dut, interface, intf_detail_json["state"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: IGMP interface: %s " "state: %s",
+ dut,
+ interface,
+ intf_detail_json["state"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if stats_return == False else igmp_stats
+
+
+@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_config(tgen, input_dict, expected=True):
+ """
+ Verify pim interface details, verifying following configs:
+ drPriority
+ helloPeriod
+ helloReceived
+ helloSend
+ drAddress
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict` : Input dict data, required to verify
+ timer
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict ={
+ "l1": {
+ "igmp": {
+ "interfaces": {
+ "l1-i1-eth1": {
+ "pim": {
+ "drPriority" : 10,
+ "helloPeriod" : 5
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = verify_pim_config(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ for interface, data in input_dict[dut]["pim"]["interfaces"].items():
+
+ logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface)
+
+ show_ip_igmp_intf_json = run_frr_cmd(
+ rnode, "show ip pim interface {} json".format(interface), isjson=True
+ )
+
+ if interface not in show_ip_igmp_intf_json:
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ " is not present in CLI output "
+ "[FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ intf_detail_json = show_ip_igmp_intf_json[interface]
+
+ for config, value in data.items():
+ if config == "helloPeriod":
+ # Verifying PIM interface helloPeriod
+ if intf_detail_json["helloPeriod"] != value:
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ " helloPeriod verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["helloPeriod"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM interface: %s " "helloPeriod is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if config == "drPriority":
+ # Verifying PIM interface drPriority
+ if intf_detail_json["drPriority"] != value:
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ " drPriority verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["drPriority"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM interface: %s " "drPriority is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ if config == "drAddress":
+ # Verifying PIM interface drAddress
+ if intf_detail_json["drAddress"] != value:
+ errormsg = (
+ "[DUT %s]: PIM interface: %s "
+ " drAddress verification "
+ "[FAILED]!! Expected : %s,"
+ " Found : %s"
+ % (dut, interface, value, intf_detail_json["drAddress"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: PIM interface: %s " "drAddress is %s",
+ dut,
+ interface,
+ value,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=20, diag_pct=0)
+def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=True):
+ """
+ Verify multicast traffic by running
+ "show multicast traffic count json" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict(dict)`: defines DUT, what and for which interfaces
+ traffic needs to be verified
+ * `return_traffic`: returns traffic stats
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "traffic_received": ["r1-r0-eth0"],
+ "traffic_sent": ["r1-r0-eth0"]
+ }
+ }
+
+ result = verify_multicast_traffic(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ traffic_dict = {}
+ for dut in input_dict.keys():
+ if dut not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying multicast " "traffic", dut)
+
+ show_multicast_traffic_json = run_frr_cmd(
+ rnode, "show ip multicast count json", isjson=True
+ )
+
+ for traffic_type, interfaces in input_dict[dut].items():
+ traffic_dict[traffic_type] = {}
+ if traffic_type == "traffic_received":
+ for interface in interfaces:
+ traffic_dict[traffic_type][interface] = {}
+ interface_json = show_multicast_traffic_json[interface]
+
+ if interface_json["pktsIn"] == 0 and interface_json["bytesIn"] == 0:
+ errormsg = (
+ "[DUT %s]: Multicast traffic is "
+ "not received on interface %s "
+ "PktsIn: %s, BytesIn: %s "
+ "[FAILED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsIn"],
+ interface_json["bytesIn"],
+ )
+ )
+ return errormsg
+
+ elif (
+ interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0
+ ):
+
+ traffic_dict[traffic_type][interface][
+ "pktsIn"
+ ] = interface_json["pktsIn"]
+ traffic_dict[traffic_type][interface][
+ "bytesIn"
+ ] = interface_json["bytesIn"]
+
+ logger.info(
+ "[DUT %s]: Multicast traffic is "
+ "received on interface %s "
+ "PktsIn: %s, BytesIn: %s "
+ "[PASSED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsIn"],
+ interface_json["bytesIn"],
+ )
+ )
+
+ else:
+ errormsg = (
+ "[DUT %s]: Multicast traffic interface %s:"
+ " Miss-match in "
+ "PktsIn: %s, BytesIn: %s"
+ "[FAILED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsIn"],
+ interface_json["bytesIn"],
+ )
+ )
+ return errormsg
+
+ if traffic_type == "traffic_sent":
+ traffic_dict[traffic_type] = {}
+ for interface in interfaces:
+ traffic_dict[traffic_type][interface] = {}
+ interface_json = show_multicast_traffic_json[interface]
+
+ if (
+ interface_json["pktsOut"] == 0
+ and interface_json["bytesOut"] == 0
+ ):
+ errormsg = (
+ "[DUT %s]: Multicast traffic is "
+ "not received on interface %s "
+ "PktsIn: %s, BytesIn: %s"
+ "[FAILED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsOut"],
+ interface_json["bytesOut"],
+ )
+ )
+ return errormsg
+
+ elif (
+ interface_json["pktsOut"] != 0
+ and interface_json["bytesOut"] != 0
+ ):
+
+ traffic_dict[traffic_type][interface][
+ "pktsOut"
+ ] = interface_json["pktsOut"]
+ traffic_dict[traffic_type][interface][
+ "bytesOut"
+ ] = interface_json["bytesOut"]
+
+ logger.info(
+ "[DUT %s]: Multicast traffic is "
+ "received on interface %s "
+ "PktsOut: %s, BytesOut: %s "
+ "[PASSED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsOut"],
+ interface_json["bytesOut"],
+ )
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: Multicast traffic interface %s:"
+ " Miss-match in "
+ "PktsOut: %s, BytesOut: %s "
+ "[FAILED]!!"
+ % (
+ dut,
+ interface,
+ interface_json["pktsOut"],
+ interface_json["bytesOut"],
+ )
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if return_traffic == False else traffic_dict
+
+
+def get_refCount_for_mroute(tgen, dut, iif, src_address, group_addresses):
+ """
+ Verify upstream inbound interface is updated correctly
+ by running "show ip pim upstream" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `iif`: inbound interface
+ * `src_address`: source address
+ * `group_addresses`: IGMP group address
+
+ Usage
+ -----
+ dut = "r1"
+ iif = "r1-r0-eth0"
+ src_address = "*"
+ group_address = "225.1.1.1"
+ result = get_refCount_for_mroute(tgen, dut, iif, src_address,
+ group_address)
+
+ Returns
+ -------
+ refCount(int)
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ refCount = 0
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying refCount for mroutes: ", dut)
+ show_ip_pim_upstream_json = run_frr_cmd(
+ rnode, "show ip pim upstream json", isjson=True
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp_addr in group_addresses:
+ # Verify group address
+ if grp_addr not in show_ip_pim_upstream_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for group %s [FAILED]!!" % (
+ dut,
+ grp_addr,
+ )
+ return errormsg
+ group_addr_json = show_ip_pim_upstream_json[grp_addr]
+
+ # Verify source address
+ if src_address not in group_addr_json:
+ errormsg = "[DUT %s]: Verifying upstream" " for (%s,%s) [FAILED]!!" % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+
+ # Verify Inbound Interface
+ if group_addr_json[src_address]["inboundInterface"] == iif:
+ refCount = group_addr_json[src_address]["refCount"]
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return refCount
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_multicast_flag_state(
+ tgen, dut, src_address, group_addresses, flag, expected=True
+):
+ """
+ Verify flag state for mroutes and make sure (*, G)/(S, G) are having
+ coorect flags by running "show ip mroute" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `src_address`: source address
+ * `group_addresses`: IGMP group address
+ * `flag`: flag state, needs to be verified
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ dut = "r1"
+ flag = "SC"
+ group_address = "225.1.1.1"
+ result = verify_multicast_flag_state(tgen, dut, src_address,
+ group_address, flag)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying flag state for mroutes", dut)
+ show_ip_mroute_json = run_frr_cmd(rnode, "show ip mroute json", isjson=True)
+
+ if bool(show_ip_mroute_json) == False:
+ error_msg = "[DUT %s]: mroutes are not present or flushed out !!" % (dut)
+ return error_msg
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ for grp_addr in group_addresses:
+ if grp_addr not in show_ip_mroute_json:
+ errormsg = (
+ "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! ",
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+ else:
+ group_addr_json = show_ip_mroute_json[grp_addr]
+
+ if src_address not in group_addr_json:
+ errormsg = "[DUT %s]: Verifying (%s, %s) mroute," "[FAILED]!! " % (
+ dut,
+ src_address,
+ grp_addr,
+ )
+ return errormsg
+ else:
+ mroutes = group_addr_json[src_address]
+
+ if mroutes["installed"] != 0:
+ logger.info(
+ "[DUT %s]: mroute (%s,%s) is installed", dut, src_address, grp_addr
+ )
+
+ if mroutes["flags"] != flag:
+ errormsg = (
+ "[DUT %s]: Verifying flag for (%s, %s) "
+ "mroute [FAILED]!! "
+ "Expected: %s Found: %s"
+ % (dut, src_address, grp_addr, flag, mroutes["flags"])
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying flag for (%s, %s)"
+ " mroute, [PASSED]!! "
+ "Found Expected: %s",
+ dut,
+ src_address,
+ grp_addr,
+ mroutes["flags"],
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=40, diag_pct=0)
+def verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip, expected=True):
+ """
+ Verify all IGMP interface are up and running, config is verified
+ using "show ip igmp interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `igmp_iface` : interface name
+ * `interface_ip` : interface ip address
+ * `expected` : expected results from API, by-default True
+
+ Usage
+ -----
+ result = verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
+
+ rnode = tgen.routers()[dut]
+ show_ip_igmp_interface_json = run_frr_cmd(
+ rnode, "show ip igmp interface json", isjson=True
+ )
+
+ if igmp_iface in show_ip_igmp_interface_json:
+ igmp_intf_json = show_ip_igmp_interface_json[igmp_iface]
+ # Verifying igmp interface
+ if igmp_intf_json["address"] != interface_ip:
+ errormsg = (
+ "[DUT %s]: igmp interface ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ % (dut, igmp_intf_json["address"], interface_ip)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: igmp interface: %s, " "interface ip: %s" " [PASSED]!!",
+ dut,
+ igmp_iface,
+ interface_ip,
+ )
+ else:
+ errormsg = (
+ "[DUT %s]: igmp interface: %s "
+ "igmp interface ip: %s, is not present "
+ % (dut, igmp_iface, interface_ip)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+class McastTesterHelper(HostApplicationHelper):
+ def __init__(self, tgen=None):
+ self.script_path = os.path.join(CWD, "mcast-tester.py")
+ self.host_conn = {}
+ self.listen_sock = None
+
+ # # Get a temporary file for socket path
+ # (fd, sock_path) = tempfile.mkstemp("-mct.sock", "tmp" + str(os.getpid()))
+ # os.close(fd)
+ # os.remove(sock_path)
+ # self.app_sock_path = sock_path
+
+ # # Listen on unix socket
+ # logger.debug("%s: listening on socket %s", self, self.app_sock_path)
+ # self.listen_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+ # self.listen_sock.settimeout(10)
+ # self.listen_sock.bind(self.app_sock_path)
+ # self.listen_sock.listen(10)
+
+ python3_path = get_exec_path(["python3", "python"])
+ super(McastTesterHelper, self).__init__(
+ tgen,
+ # [python3_path, self.script_path, self.app_sock_path]
+ [python3_path, self.script_path],
+ )
+
+ def __str__(self):
+ return "McastTesterHelper({})".format(self.script_path)
+
+ def run_join(self, host, join_addrs, join_towards=None, join_intf=None):
+ """
+ Join a UDP multicast group.
+
+ One of join_towards or join_intf MUST be set.
+
+ Parameters:
+ -----------
+ * `host`: host from where IGMP join would be sent
+ * `join_addrs`: multicast address (or addresses) to join to
+ * `join_intf`: the interface to bind the join[s] to
+ * `join_towards`: router whos interface to bind the join[s] to
+ """
+ if not isinstance(join_addrs, list) and not isinstance(join_addrs, tuple):
+ join_addrs = [join_addrs]
+
+ if join_towards:
+ join_intf = frr_unicode(
+ self.tgen.json_topo["routers"][host]["links"][join_towards]["interface"]
+ )
+ else:
+ assert join_intf
+
+ for join in join_addrs:
+ self.run(host, [join, join_intf])
+
+ return True
+
+ def run_traffic(self, host, send_to_addrs, bind_towards=None, bind_intf=None):
+ """
+ Send UDP multicast traffic.
+
+ One of bind_towards or bind_intf MUST be set.
+
+ Parameters:
+ -----------
+ * `host`: host to send traffic from
+ * `send_to_addrs`: multicast address (or addresses) to send traffic to
+ * `bind_towards`: Router who's interface the source ip address is got from
+ """
+ if bind_towards:
+ bind_intf = frr_unicode(
+ self.tgen.json_topo["routers"][host]["links"][bind_towards]["interface"]
+ )
+ else:
+ assert bind_intf
+
+ if not isinstance(send_to_addrs, list) and not isinstance(send_to_addrs, tuple):
+ send_to_addrs = [send_to_addrs]
+
+ for send_to in send_to_addrs:
+ self.run(host, ["--send=0.7", send_to, bind_intf])
+
+ return True
+
+
+@retry(retry_timeout=62)
+def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
+ """
+ Verify local IGMP groups are received from an intended interface
+ by running "show ip igmp join json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which IGMP groups are configured
+ * `group_addresses`: IGMP group address
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_local_igmp_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying local IGMP groups received:", dut)
+ show_ip_local_igmp_json = run_frr_cmd(rnode, "show ip igmp join json", isjson=True)
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface not in show_ip_local_igmp_json:
+
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ for grp_addr in group_addresses:
+ found = False
+ for index in show_ip_local_igmp_json[interface]["groups"]:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP group received"
+ " from interface %s [FAILED]!! "
+ " Expected: %s " % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying local IGMP group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
+ """
+ Verify ip pim interface traffice by running
+ "show ip pim interface traffic" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `input_dict(dict)`: defines DUT, what and from which interfaces
+ traffic needs to be verified
+ * [optional]`addr_type`: specify address-family, default is ipv4
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "r1-r0-eth0": {
+ "helloRx": 0,
+ "helloTx": 1,
+ "joinRx": 0,
+ "joinTx": 0
+ }
+ }
+ }
+
+ result = verify_pim_interface_traffic(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ output_dict = {}
+ for dut in input_dict.keys():
+ if dut not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying pim interface traffic", dut)
+
+ if addr_type == "ipv4":
+ cmd = "show ip pim interface traffic json"
+ elif addr_type == "ipv6":
+ cmd = "show ipv6 pim interface traffic json"
+
+ show_pim_intf_traffic_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ output_dict[dut] = {}
+ for intf, data in input_dict[dut].items():
+ interface_json = show_pim_intf_traffic_json[intf]
+ for state in data:
+
+ # Verify Tx/Rx
+ if state in interface_json:
+ output_dict[dut][state] = interface_json[state]
+ else:
+ errormsg = (
+ "[DUT %s]: %s is not present"
+ "for interface %s [FAILED]!! " % (dut, state, intf)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True if return_stats == False else output_dict
+
+ # def cleanup(self):
+ # super(McastTesterHelper, self).cleanup()
+
+ # if not self.listen_sock:
+ # return
+
+ # logger.debug("%s: closing listen socket %s", self, self.app_sock_path)
+ # self.listen_sock.close()
+ # self.listen_sock = None
+
+ # if os.path.exists(self.app_sock_path):
+ # os.remove(self.app_sock_path)
+
+ # def started_proc(self, host, p):
+ # logger.debug("%s: %s: accepting on socket %s", self, host, self.app_sock_path)
+ # try:
+ # conn = self.listen_sock.accept()
+ # return conn
+ # except Exception as error:
+ # logger.error("%s: %s: accept on socket failed: %s", self, host, error)
+ # if p.poll() is not None:
+ # logger.error("%s: %s: helper app quit: %s", self, host, comm_error(p))
+ # raise
+
+ # def stopping_proc(self, host, p, conn):
+ # logger.debug("%s: %s: closing socket %s", self, host, conn)
+ # conn[0].close()