diff options
Diffstat (limited to '')
-rw-r--r-- | tests/topotests/lib/common_config.py | 5130 |
1 files changed, 5130 insertions, 0 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py new file mode 100644 index 0000000..5f4c280 --- /dev/null +++ b/tests/topotests/lib/common_config.py @@ -0,0 +1,5130 @@ +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. +# ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +import ipaddress +import json +import os +import platform +import socket +import subprocess +import sys +import traceback +import functools +from collections import OrderedDict +from copy import deepcopy +from datetime import datetime, timedelta +from functools import wraps +from re import search as re_search +from time import sleep + +try: + # Imports from python2 + import ConfigParser as configparser +except ImportError: + # Imports from python3 + import configparser + +from lib.micronet import comm_error +from lib.topogen import TopoRouter, get_topogen +from lib.topolog import get_logger, logger +from lib.topotest import frr_unicode, interface_set_status, version_cmp +from lib import topotest + +FRRCFG_FILE = "frr_json.conf" +FRRCFG_BKUP_FILE = "frr_json_initial.conf" + +ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"] + +#### +CD = os.path.dirname(os.path.realpath(__file__)) +PYTESTINI_PATH = os.path.join(CD, "../pytest.ini") + +# NOTE: to save execution logs to log file frrtest_log_dir must be configured +# in `pytest.ini`. +config = configparser.ConfigParser() +config.read(PYTESTINI_PATH) + +config_section = "topogen" + +# Debug logs for daemons +DEBUG_LOGS = { + "pimd": [ + "debug msdp events", + "debug msdp packets", + "debug igmp events", + "debug igmp trace", + "debug mroute", + "debug mroute detail", + "debug pim events", + "debug pim packets", + "debug pim trace", + "debug pim zebra", + "debug pim bsm", + "debug pim packets joins", + "debug pim packets register", + "debug pim nht", + ], + "bgpd": [ + "debug bgp neighbor-events", + "debug bgp updates", + "debug bgp zebra", + "debug bgp nht", + "debug bgp neighbor-events", + "debug bgp graceful-restart", + "debug bgp update-groups", + "debug bgp vpn leak-from-vrf", + "debug bgp vpn leak-to-vrf", + "debug bgp zebr", + "debug bgp updates", + "debug bgp nht", + "debug bgp neighbor-events", + "debug vrf", + ], + "zebra": [ + "debug zebra events", + "debug zebra rib", + "debug zebra vxlan", + "debug zebra nht", + ], + "ospf": [ + "debug ospf event", + "debug ospf ism", + "debug ospf lsa", + "debug ospf nsm", + "debug ospf nssa", + "debug ospf packet all", + "debug ospf sr", + "debug ospf te", + "debug ospf zebra", + ], + "ospf6": [ + "debug ospf6 event", + "debug ospf6 ism", + "debug ospf6 lsa", + "debug ospf6 nsm", + "debug ospf6 nssa", + "debug ospf6 packet all", + "debug ospf6 sr", + "debug ospf6 te", + "debug ospf6 zebra", + ], +} + +g_iperf_client_procs = {} +g_iperf_server_procs = {} + + +def is_string(value): + try: + return isinstance(value, basestring) + except NameError: + return isinstance(value, str) + + +if config.has_option("topogen", "verbosity"): + loglevel = config.get("topogen", "verbosity") + loglevel = loglevel.lower() +else: + loglevel = "info" + +if config.has_option("topogen", "frrtest_log_dir"): + frrtest_log_dir = config.get("topogen", "frrtest_log_dir") + time_stamp = datetime.time(datetime.now()) + logfile_name = "frr_test_bgp_" + frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp) + print("frrtest_log_file..", frrtest_log_file) + + logger = get_logger( + "test_execution_logs", log_level=loglevel, target=frrtest_log_file + ) + print("Logs will be sent to logfile: {}".format(frrtest_log_file)) + +if config.has_option("topogen", "show_router_config"): + show_router_config = config.get("topogen", "show_router_config") +else: + show_router_config = False + +# env variable for setting what address type to test +ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES") + + +# Saves sequence id numbers +SEQ_ID = {"prefix_lists": {}, "route_maps": {}} + + +def get_seq_id(obj_type, router, obj_name): + """ + Generates and saves sequence number in interval of 10 + Parameters + ---------- + * `obj_type`: prefix_lists or route_maps + * `router`: router name + *` obj_name`: name of the prefix-list or route-map + Returns + -------- + Sequence number generated + """ + + router_data = SEQ_ID[obj_type].setdefault(router, {}) + obj_data = router_data.setdefault(obj_name, {}) + seq_id = obj_data.setdefault("seq_id", 0) + + seq_id = int(seq_id) + 10 + obj_data["seq_id"] = seq_id + + return seq_id + + +def set_seq_id(obj_type, router, id, obj_name): + """ + Saves sequence number if not auto-generated and given by user + Parameters + ---------- + * `obj_type`: prefix_lists or route_maps + * `router`: router name + *` obj_name`: name of the prefix-list or route-map + """ + router_data = SEQ_ID[obj_type].setdefault(router, {}) + obj_data = router_data.setdefault(obj_name, {}) + seq_id = obj_data.setdefault("seq_id", 0) + + seq_id = int(seq_id) + int(id) + obj_data["seq_id"] = seq_id + + +class InvalidCLIError(Exception): + """Raise when the CLI command is wrong""" + + +def run_frr_cmd(rnode, cmd, isjson=False): + """ + Execute frr show commands in privileged mode + * `rnode`: router node on which command needs to be executed + * `cmd`: Command to be executed on frr + * `isjson`: If command is to get json data or not + :return str: + """ + + if cmd: + ret_data = rnode.vtysh_cmd(cmd, isjson=isjson) + + if isjson: + rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False) + + return ret_data + + else: + raise InvalidCLIError("No actual cmd passed") + + +def apply_raw_config(tgen, input_dict): + + """ + API to configure raw configuration on device. This can be used for any cli + which has not been implemented in JSON. + + Parameters + ---------- + * `tgen`: tgen object + * `input_dict`: configuration that needs to be applied + + Usage + ----- + input_dict = { + "r2": { + "raw_config": [ + "router bgp", + "no bgp update-group-split-horizon" + ] + } + } + Returns + ------- + True or errormsg + """ + + rlist = [] + + for router_name in input_dict.keys(): + config_cmd = input_dict[router_name]["raw_config"] + + if not isinstance(config_cmd, list): + config_cmd = [config_cmd] + + frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE) + with open(frr_cfg_file, "w") as cfg: + for cmd in config_cmd: + cfg.write("{}\n".format(cmd)) + + rlist.append(router_name) + + # Load config on all routers + return load_config_to_routers(tgen, rlist) + + +def create_common_configurations( + tgen, config_dict, config_type=None, build=False, load_config=True +): + """ + API to create object of class FRRConfig and also create frr_json.conf + file. It will create interface and common configurations and save it to + frr_json.conf and load to router + Parameters + ---------- + * `tgen`: tgen object + * `config_dict`: Configuration data saved in a dict of { router: config-list } + * `routers` : list of router id to be configured. + * `config_type` : Syntactic information while writing configuration. Should + be one of the value as mentioned in the config_map below. + * `build` : Only for initial setup phase this is set as True + Returns + ------- + True or False + """ + + config_map = OrderedDict( + { + "general_config": "! FRR General Config\n", + "debug_log_config": "! Debug log Config\n", + "interface_config": "! Interfaces Config\n", + "static_route": "! Static Route Config\n", + "prefix_list": "! Prefix List Config\n", + "bgp_community_list": "! Community List Config\n", + "route_maps": "! Route Maps Config\n", + "bgp": "! BGP Config\n", + "vrf": "! VRF Config\n", + "ospf": "! OSPF Config\n", + "ospf6": "! OSPF Config\n", + "pim": "! PIM Config\n", + } + ) + + if build: + mode = "a" + elif not load_config: + mode = "a" + else: + mode = "w" + + routers = config_dict.keys() + for router in routers: + fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE) + try: + frr_cfg_fd = open(fname, mode) + if config_type: + frr_cfg_fd.write(config_map[config_type]) + for line in config_dict[router]: + frr_cfg_fd.write("{} \n".format(str(line))) + frr_cfg_fd.write("\n") + + except IOError as err: + logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err))) + return False + finally: + frr_cfg_fd.close() + + # If configuration applied from build, it will done at last + result = True + if not build and load_config: + result = load_config_to_routers(tgen, routers) + + return result + + +def create_common_configuration( + tgen, router, data, config_type=None, build=False, load_config=True +): + """ + API to create object of class FRRConfig and also create frr_json.conf + file. It will create interface and common configurations and save it to + frr_json.conf and load to router + Parameters + ---------- + * `tgen`: tgen object + * `data`: Configuration data saved in a list. + * `router` : router id to be configured. + * `config_type` : Syntactic information while writing configuration. Should + be one of the value as mentioned in the config_map below. + * `build` : Only for initial setup phase this is set as True + Returns + ------- + True or False + """ + return create_common_configurations( + tgen, {router: data}, config_type, build, load_config + ) + + +def kill_router_daemons(tgen, router, daemons, save_config=True): + """ + Router's current config would be saved to /etc/frr/ for each daemon + and daemon would be killed forcefully using SIGKILL. + * `tgen` : topogen object + * `router`: Device under test + * `daemons`: list of daemons to be killed + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + try: + router_list = tgen.routers() + + if save_config: + # Saving router config to /etc/frr, which will be loaded to router + # when it starts + router_list[router].vtysh_cmd("write memory") + + # Kill Daemons + result = router_list[router].killDaemons(daemons) + if len(result) > 0: + assert "Errors found post shutdown - details follow:" == 0, result + return result + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + +def start_router_daemons(tgen, router, daemons): + """ + Daemons defined by user would be started + * `tgen` : topogen object + * `router`: Device under test + * `daemons`: list of daemons to be killed + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + try: + router_list = tgen.routers() + + # Start daemons + res = router_list[router].startDaemons(daemons) + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + res = errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return res + + +def check_router_status(tgen): + """ + Check if all daemons are running for all routers in topology + * `tgen` : topogen object + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + try: + router_list = tgen.routers() + for router, rnode in router_list.items(): + + result = rnode.check_router_running() + if result != "": + daemons = [] + if "bgpd" in result: + daemons.append("bgpd") + if "zebra" in result: + daemons.append("zebra") + if "pimd" in result: + daemons.append("pimd") + if "pim6d" in result: + daemons.append("pim6d") + if "ospfd" in result: + daemons.append("ospfd") + if "ospf6d" in result: + daemons.append("ospf6d") + rnode.startDaemons(daemons) + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def save_initial_config_on_routers(tgen): + """Save current configuration on routers to FRRCFG_BKUP_FILE. + + FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()` + is called. + + Parameters + ---------- + * `tgen` : Topogen object + """ + router_list = tgen.routers() + target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf" + + # Get all running configs in parallel + procs = {} + for rname in router_list: + logger.info("Fetching running config for router %s", rname) + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"], + stdin=None, + stdout=open(target_cfg_fmt.format(rname), "w"), + stderr=subprocess.PIPE, + ) + for rname, p in procs.items(): + _, error = p.communicate() + if p.returncode: + logger.error( + "Get running config for %s failed %d: %s", rname, p.returncode, error + ) + raise InvalidCLIError( + "vtysh show running error on {}: {}".format(rname, error) + ) + + +def reset_config_on_routers(tgen, routerName=None): + """ + Resets configuration on routers to the snapshot created using input JSON + file. It replaces existing router configuration with FRRCFG_BKUP_FILE + + Parameters + ---------- + * `tgen` : Topogen object + * `routerName` : router config is to be reset + """ + + logger.debug("Entering API: reset_config_on_routers") + + tgen.cfg_gen += 1 + gen = tgen.cfg_gen + + # Trim the router list if needed + router_list = tgen.routers() + if routerName: + if routerName not in router_list: + logger.warning( + "Exiting API: reset_config_on_routers: no router %s", + routerName, + exc_info=True, + ) + return True + router_list = {routerName: router_list[routerName]} + + delta_fmt = tgen.logdir + "/{}/delta-{}.conf" + # FRRCFG_BKUP_FILE + target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf" + run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav" + + # + # Get all running configs in parallel + # + procs = {} + for rname in router_list: + logger.info("Fetching running config for router %s", rname) + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"], + stdin=None, + stdout=open(run_cfg_fmt.format(rname, gen), "w"), + stderr=subprocess.PIPE, + ) + for rname, p in procs.items(): + _, error = p.communicate() + if p.returncode: + logger.error( + "Get running config for %s failed %d: %s", rname, p.returncode, error + ) + raise InvalidCLIError( + "vtysh show running error on {}: {}".format(rname, error) + ) + + # + # Get all delta's in parallel + # + procs = {} + for rname in router_list: + logger.info( + "Generating delta for router %s to new configuration (gen %d)", rname, gen + ) + procs[rname] = tgen.net.popen( + [ + "/usr/lib/frr/frr-reload.py", + "--test-reset", + "--input", + run_cfg_fmt.format(rname, gen), + "--test", + target_cfg_fmt.format(rname), + ], + stdin=None, + stdout=open(delta_fmt.format(rname, gen), "w"), + stderr=subprocess.PIPE, + ) + for rname, p in procs.items(): + _, error = p.communicate() + if p.returncode: + logger.error( + "Delta file creation for %s failed %d: %s", rname, p.returncode, error + ) + raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error)) + + # + # Apply all the deltas in parallel + # + procs = {} + for rname in router_list: + logger.info("Applying delta config on router %s", rname) + + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)], + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + for rname, p in procs.items(): + output, _ = p.communicate() + vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen)) + if not p.returncode: + router_list[rname].logger.info( + '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format( + vtysh_command, output + ) + ) + else: + router_list[rname].logger.warning( + '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format( + vtysh_command, output + ) + ) + logger.error( + "Delta file apply for %s failed %d: %s", rname, p.returncode, output + ) + + # We really need to enable this failure; however, currently frr-reload.py + # producing invalid "no" commands as it just preprends "no", but some of the + # command forms lack matching values (e.g., final values). Until frr-reload + # is fixed to handle this (or all the CLI no forms are adjusted) we can't + # fail tests. + # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output)) + + # + # Optionally log all new running config if "show_router_config" is defined in + # "pytest.ini" + # + if show_router_config: + procs = {} + for rname in router_list: + logger.info("Fetching running config for router %s", rname) + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"], + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + for rname, p in procs.items(): + output, _ = p.communicate() + if p.returncode: + logger.warning( + "Get running config for %s failed %d: %s", + rname, + p.returncode, + output, + ) + else: + logger.info( + "Configuration on router %s after reset:\n%s", rname, output + ) + + logger.debug("Exiting API: reset_config_on_routers") + return True + + +def prep_load_config_to_routers(tgen, *config_name_list): + """Create common config for `load_config_to_routers`. + + The common config file is constructed from the list of sub-config files passed as + position arguments to this function. Each entry in `config_name_list` is looked for + under the router sub-directory in the test directory and those files are + concatenated together to create the common config. e.g., + + # Routers are "r1" and "r2", test file is `example/test_example_foo.py` + prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf") + + When the above call is made the files in + + example/r1/bgpd.conf + example/r1/ospfd.conf + + Are concat'd together into a single config file that will be loaded on r1, and + + example/r2/bgpd.conf + example/r2/ospfd.conf + + Are concat'd together into a single config file that will be loaded on r2 when + the call to `load_config_to_routers` is made. + """ + + routers = tgen.routers() + for rname, router in routers.items(): + destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE) + wmode = "w" + for cfbase in config_name_list: + script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"] + confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase)) + with open(confname, "r") as cf: + with open(destname, wmode) as df: + df.write(cf.read()) + wmode = "a" + + +def load_config_to_routers(tgen, routers, save_bkup=False): + """ + Loads configuration on routers from the file FRRCFG_FILE. + + Parameters + ---------- + * `tgen` : Topogen object + * `routers` : routers for which configuration is to be loaded + * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE + Returns + ------- + True or False + """ + + logger.debug("Entering API: load_config_to_routers") + + tgen.cfg_gen += 1 + gen = tgen.cfg_gen + + base_router_list = tgen.routers() + router_list = {} + for router in routers: + if router not in base_router_list: + continue + router_list[router] = base_router_list[router] + + frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE + frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE + frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE + + procs = {} + for rname in router_list: + router = router_list[rname] + try: + frr_cfg_file = frr_cfg_file_fmt.format(rname) + frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen) + frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname) + with open(frr_cfg_file, "r+") as cfg: + data = cfg.read() + logger.info( + "Applying following configuration on router %s (gen: %d):\n%s", + rname, + gen, + data, + ) + # Always save a copy of what we just did + with open(frr_cfg_save_file, "w") as bkup: + bkup.write(data) + if save_bkup: + with open(frr_cfg_bkup, "w") as bkup: + bkup.write(data) + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-f", frr_cfg_file], + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + except IOError as err: + logger.error( + "Unable to open config File. error(%s): %s", err.errno, err.strerror + ) + return False + except Exception as error: + logger.error("Unable to apply config on %s: %s", rname, str(error)) + return False + + errors = [] + for rname, p in procs.items(): + output, _ = p.communicate() + frr_cfg_file = frr_cfg_file_fmt.format(rname) + vtysh_command = "vtysh -f " + frr_cfg_file + if not p.returncode: + router_list[rname].logger.info( + '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format( + vtysh_command, output + ) + ) + else: + router_list[rname].logger.error( + '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format( + vtysh_command, output + ) + ) + logger.error( + "Config apply for %s failed %d: %s", rname, p.returncode, output + ) + # We can't thorw an exception here as we won't clear the config file. + errors.append( + InvalidCLIError( + "load_config_to_routers error for {}: {}".format(rname, output) + ) + ) + + # Empty the config file or we append to it next time through. + with open(frr_cfg_file, "r+") as cfg: + cfg.truncate(0) + + # Router current configuration to log file or console if + # "show_router_config" is defined in "pytest.ini" + if show_router_config: + procs = {} + for rname in router_list: + procs[rname] = router_list[rname].popen( + ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"], + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + for rname, p in procs.items(): + output, _ = p.communicate() + if p.returncode: + logger.warning( + "Get running config for %s failed %d: %s", + rname, + p.returncode, + output, + ) + else: + logger.info("New configuration for router %s:\n%s", rname, output) + + logger.debug("Exiting API: load_config_to_routers") + return not errors + + +def load_config_to_router(tgen, routerName, save_bkup=False): + """ + Loads configuration on router from the file FRRCFG_FILE. + + Parameters + ---------- + * `tgen` : Topogen object + * `routerName` : router for which configuration to be loaded + * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE + """ + return load_config_to_routers(tgen, [routerName], save_bkup) + + +def reset_with_new_configs(tgen, *cflist): + """Reset the router to initial config, then load new configs. + + Resets routers to the initial config state (see `save_initial_config_on_routers() + and `reset_config_on_routers()` `), then concat list of router sub-configs together + and load onto the routers (see `prep_load_config_to_routers()` and + `load_config_to_routers()`) + """ + routers = tgen.routers() + + reset_config_on_routers(tgen) + prep_load_config_to_routers(tgen, *cflist) + load_config_to_routers(tgen, tgen.routers(), save_bkup=False) + + +def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None): + """ + API to get the link local ipv6 address of a particular interface using + FRR command 'show interface' + + * `tgen`: tgen object + * `router` : router for which highest interface should be + calculated + * `intf` : interface for which link-local address needs to be taken + * `vrf` : VRF name + + Usage + ----- + linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A) + + Returns + ------- + 1) array of interface names to link local ips. + """ + + router_list = tgen.routers() + for rname, rnode in router_list.items(): + if rname != router: + continue + + linklocal = [] + + if vrf: + cmd = "show interface vrf {}".format(vrf) + else: + cmd = "show interface" + + linklocal = [] + if vrf: + cmd = "show interface vrf {}".format(vrf) + else: + cmd = "show interface" + for chk_ll in range(0, 60): + sleep(1 / 4) + ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd)) + # Fix newlines (make them all the same) + ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines() + + interface = None + ll_per_if_count = 0 + for line in ifaces: + # Interface name + m = re_search("Interface ([a-zA-Z0-9-]+) is", line) + if m: + interface = m.group(1).split(" ")[0] + ll_per_if_count = 0 + + # Interface ip + m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line) + if m1: + local = m1.group(1) + ll_per_if_count += 1 + if ll_per_if_count > 1: + linklocal += [["%s-%s" % (interface, ll_per_if_count), local]] + else: + linklocal += [[interface, local]] + + try: + if linklocal: + if intf: + return [ + _linklocal[1] + for _linklocal in linklocal + if _linklocal[0] == intf + ][0].split("/")[0] + return linklocal + except IndexError: + continue + + errormsg = "Link local ip missing on router {}".format(router) + return errormsg + + +def generate_support_bundle(): + """ + API to generate support bundle on any verification ste failure. + it runs a python utility, /usr/lib/frr/generate_support_bundle.py, + which basically runs defined CLIs and dumps the data to specified location + """ + + tgen = get_topogen() + router_list = tgen.routers() + test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0] + + bundle_procs = {} + for rname, rnode in router_list.items(): + logger.info("Spawn collection of support bundle for %s", rname) + dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name) + rnode.run("mkdir -p " + dst_bundle) + + gen_sup_cmd = [ + "/usr/lib/frr/generate_support_bundle.py", + "--log-dir=" + dst_bundle, + ] + bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None) + + for rname, rnode in router_list.items(): + logger.info("Waiting on support bundle for %s", rname) + output, error = bundle_procs[rname].communicate() + if output: + logger.info( + "Output from collecting support bundle for %s:\n%s", rname, output + ) + if error: + logger.warning( + "Error from collecting support bundle for %s:\n%s", rname, error + ) + + return True + + +def start_topology(tgen, daemon=None): + """ + Starting topology, create tmp files which are loaded to routers + to start daemons and then start routers + * `tgen` : topogen object + """ + + # Starting topology + tgen.start_topology() + + # Starting daemons + + router_list = tgen.routers() + routers_sorted = sorted( + router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0)) + ) + + linux_ver = "" + router_list = tgen.routers() + for rname in routers_sorted: + router = router_list[rname] + + # It will help in debugging the failures, will give more details on which + # specific kernel version tests are failing + if linux_ver == "": + linux_ver = router.run("uname -a") + logger.info("Logging platform related details: \n %s \n", linux_ver) + + try: + os.chdir(tgen.logdir) + + # # Creating router named dir and empty zebra.conf bgpd.conf files + # # inside the current directory + # if os.path.isdir("{}".format(rname)): + # os.system("rm -rf {}".format(rname)) + # os.mkdir("{}".format(rname)) + # os.system("chmod -R go+rw {}".format(rname)) + # os.chdir("{}/{}".format(tgen.logdir, rname)) + # os.system("touch zebra.conf bgpd.conf") + # else: + # os.mkdir("{}".format(rname)) + # os.system("chmod -R go+rw {}".format(rname)) + # os.chdir("{}/{}".format(tgen.logdir, rname)) + # os.system("touch zebra.conf bgpd.conf") + + except IOError as err: + logger.error("I/O error({0}): {1}".format(err.errno, err.strerror)) + + # Loading empty zebra.conf file to router, to start the zebra daemon + router.load_config( + TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname) + ) + + # Loading empty bgpd.conf file to router, to start the bgp daemon + router.load_config( + TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname) + ) + + if daemon and "ospfd" in daemon: + # Loading empty ospf.conf file to router, to start the bgp daemon + router.load_config( + TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname) + ) + + if daemon and "ospf6d" in daemon: + # Loading empty ospf.conf file to router, to start the bgp daemon + router.load_config( + TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname) + ) + + if daemon and "pimd" in daemon: + # Loading empty pimd.conf file to router, to start the pim deamon + router.load_config( + TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname) + ) + + if daemon and "pim6d" in daemon: + # Loading empty pimd.conf file to router, to start the pim6d deamon + router.load_config( + TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname) + ) + + # Starting routers + logger.info("Starting all routers once topology is created") + tgen.start_router() + + +def stop_router(tgen, router): + """ + Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon + and router and its daemons would be stopped. + + * `tgen` : topogen object + * `router`: Device under test + """ + + router_list = tgen.routers() + + # Saving router config to /etc/frr, which will be loaded to router + # when it starts + router_list[router].vtysh_cmd("write memory") + + # Stop router + router_list[router].stop() + + +def start_router(tgen, router): + """ + Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each + daemon + + * `tgen` : topogen object + * `router`: Device under test + """ + + logger.debug("Entering lib API: start_router") + + try: + router_list = tgen.routers() + + # Router and its daemons would be started and config would + # be loaded to router for each daemon from /etc/frr + router_list[router].start() + + # Waiting for router to come up + sleep(5) + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: start_router()") + return True + + +def number_to_row(routerName): + """ + Returns the number for the router. + Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23 + etc + """ + return int(routerName[1:]) + + +def number_to_column(routerName): + """ + Returns the number for the router. + Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1, + z23 = column 26 etc + """ + return ord(routerName[0]) - 97 + + +def topo_daemons(tgen, topo=None): + """ + Returns daemon list required for the suite based on topojson. + """ + daemon_list = [] + + if topo is None: + topo = tgen.json_topo + + router_list = tgen.routers() + routers_sorted = sorted( + router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0)) + ) + + for rtr in routers_sorted: + if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list: + daemon_list.append("ospfd") + + if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list: + daemon_list.append("ospf6d") + + for val in topo["routers"][rtr]["links"].values(): + if "pim" in val and "pimd" not in daemon_list: + daemon_list.append("pimd") + if "pim6" in val and "pim6d" not in daemon_list: + daemon_list.append("pim6d") + if "ospf" in val and "ospfd" not in daemon_list: + daemon_list.append("ospfd") + if "ospf6" in val and "ospf6d" not in daemon_list: + daemon_list.append("ospf6d") + break + + return daemon_list + + +def add_interfaces_to_vlan(tgen, input_dict): + """ + Add interfaces to VLAN, we need vlan pakcage to be installed on machine + + * `tgen`: tgen onject + * `input_dict` : interfaces to be added to vlans + + input_dict= { + "r1":{ + "vlan":{ + VLAN_1: [{ + intf_r1_s1: { + "ip": "10.1.1.1", + "subnet": "255.255.255.0 + } + }] + } + } + } + + add_interfaces_to_vlan(tgen, input_dict) + + """ + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = router_list[dut] + + if "vlan" in input_dict[dut]: + for vlan, interfaces in input_dict[dut]["vlan"].items(): + for intf_dict in interfaces: + for interface, data in intf_dict.items(): + # Adding interface to VLAN + vlan_intf = "{}.{}".format(interface, vlan) + cmd = "ip link add link {} name {} type vlan id {}".format( + interface, vlan_intf, vlan + ) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + result = rnode.run(cmd) + logger.info("result %s", result) + + # Bringing interface up + cmd = "ip link set {} up".format(vlan_intf) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + result = rnode.run(cmd) + logger.info("result %s", result) + + # Assigning IP address + ifaddr = ipaddress.ip_interface( + "{}/{}".format( + frr_unicode(data["ip"]), frr_unicode(data["subnet"]) + ) + ) + + cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format( + ifaddr.version, vlan_intf, ifaddr + ) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + result = rnode.run(cmd) + logger.info("result %s", result) + + +def tcpdump_capture_start( + tgen, + router, + intf, + protocol=None, + grepstr=None, + timeout=0, + options=None, + cap_file=None, + background=True, +): + """ + API to capture network packets using tcp dump. + + Packages used : + + Parameters + ---------- + * `tgen`: topogen object. + * `router`: router on which ping has to be performed. + * `intf` : interface for capture. + * `protocol` : protocol for which packet needs to be captured. + * `grepstr` : string to filter out tcp dump output. + * `timeout` : Time for which packet needs to be captured. + * `options` : options for TCP dump, all tcpdump options can be used. + * `cap_file` : filename to store capture dump. + * `background` : Make tcp dump run in back ground. + + Usage + ----- + tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20, + options='-A -vv -x > r2bgp.txt ') + Returns + ------- + 1) True for successful capture + 2) errormsg - when tcp dump fails + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + rnode = tgen.gears[router] + + if timeout > 0: + cmd = "timeout {}".format(timeout) + else: + cmd = "" + + cmdargs = "{} tcpdump".format(cmd) + + if intf: + cmdargs += " -i {}".format(str(intf)) + if protocol: + cmdargs += " {}".format(str(protocol)) + if options: + cmdargs += " -s 0 {}".format(str(options)) + + if cap_file: + file_name = os.path.join(tgen.logdir, router, cap_file) + cmdargs += " -w {}".format(str(file_name)) + # Remove existing capture file + rnode.run("rm -rf {}".format(file_name)) + + if grepstr: + cmdargs += ' | grep "{}"'.format(str(grepstr)) + + logger.info("Running tcpdump command: [%s]", cmdargs) + if not background: + rnode.run(cmdargs) + else: + # XXX this & is bogus doesn't work + # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs)) + rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs)) + + # Check if tcpdump process is running + if background: + result = rnode.run("pgrep tcpdump") + logger.debug("ps -ef | grep tcpdump \n {}".format(result)) + + if not result: + errormsg = "tcpdump is not running {}".format("tcpdump") + return errormsg + else: + logger.info("Packet capture started on %s: interface %s", router, intf) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def tcpdump_capture_stop(tgen, router): + """ + API to capture network packets using tcp dump. + + Packages used : + + Parameters + ---------- + * `tgen`: topogen object. + * `router`: router on which ping has to be performed. + * `intf` : interface for capture. + * `protocol` : protocol for which packet needs to be captured. + * `grepstr` : string to filter out tcp dump output. + * `timeout` : Time for which packet needs to be captured. + * `options` : options for TCP dump, all tcpdump options can be used. + * `cap2file` : filename to store capture dump. + * `bakgrnd` : Make tcp dump run in back ground. + + Usage + ----- + tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20, + options='-A -vv -x > r2bgp.txt ') + Returns + ------- + 1) True for successful capture + 2) errormsg - when tcp dump fails + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + rnode = tgen.gears[router] + + # Check if tcpdump process is running + result = rnode.run("ps -ef | grep tcpdump") + logger.debug("ps -ef | grep tcpdump \n {}".format(result)) + + if not re_search(r"{}".format("tcpdump"), result): + errormsg = "tcpdump is not running {}".format("tcpdump") + return errormsg + else: + # XXX this doesn't work with micronet + ppid = tgen.net.nameToNode[rnode.name].pid + rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid) + logger.info("Stopped tcpdump capture") + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def create_debug_log_config(tgen, input_dict, build=False): + """ + Enable/disable debug logs for any protocol with defined debug + options and logs would be saved to created log file + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : details to enable debug logs for protocols + * `build` : Only for initial setup phase this is set as True. + + + Usage: + ------ + input_dict = { + "r2": { + "debug":{ + "log_file" : "debug.log", + "enable": ["pimd", "zebra"], + "disable": { + "bgpd":[ + 'debug bgp neighbor-events', + 'debug bgp updates', + 'debug bgp zebra', + ] + } + } + } + } + + result = create_debug_log_config(tgen, input_dict) + + Returns + ------- + True or False + """ + + result = False + try: + debug_config_dict = {} + + for router in input_dict.keys(): + debug_config = [] + if "debug" in input_dict[router]: + debug_dict = input_dict[router]["debug"] + + disable_logs = debug_dict.setdefault("disable", None) + enable_logs = debug_dict.setdefault("enable", None) + log_file = debug_dict.setdefault("log_file", None) + + if log_file: + _log_file = os.path.join(tgen.logdir, log_file) + debug_config.append("log file {} \n".format(_log_file)) + + if type(enable_logs) is list: + for daemon in enable_logs: + for debug_log in DEBUG_LOGS[daemon]: + debug_config.append("{}".format(debug_log)) + elif type(enable_logs) is dict: + for daemon, debug_logs in enable_logs.items(): + for debug_log in debug_logs: + debug_config.append("{}".format(debug_log)) + + if type(disable_logs) is list: + for daemon in disable_logs: + for debug_log in DEBUG_LOGS[daemon]: + debug_config.append("no {}".format(debug_log)) + elif type(disable_logs) is dict: + for daemon, debug_logs in disable_logs.items(): + for debug_log in debug_logs: + debug_config.append("no {}".format(debug_log)) + if debug_config: + debug_config_dict[router] = debug_config + + result = create_common_configurations( + tgen, debug_config_dict, "debug_log_config", build=build + ) + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +############################################# +# Common APIs, will be used by all protocols +############################################# + + +def create_vrf_cfg(tgen, topo, input_dict=None, build=False): + """ + Create vrf configuration for created topology. VRF + configuration is provided in input json file. + + VRF config is done in Linux Kernel: + * Create VRF + * Attach interface to VRF + * Bring up VRF + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring + from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + input_dict={ + "r3": { + "links": { + "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"}, + "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"}, + "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"}, + "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"}, + }, + "vrfs":[ + { + "name": "RED_A", + "id": "1" + }, + { + "name": "RED_B", + "id": "2" + }, + { + "name": "BLUE_A", + "id": "3", + "delete": True + }, + { + "name": "BLUE_B", + "id": "4" + } + ] + } + } + result = create_vrf_cfg(tgen, topo, input_dict) + + Returns + ------- + True or False + """ + result = True + if not input_dict: + input_dict = deepcopy(topo) + else: + input_dict = deepcopy(input_dict) + + try: + config_data_dict = {} + + for c_router, c_data in input_dict.items(): + rnode = tgen.gears[c_router] + config_data = [] + if "vrfs" in c_data: + for vrf in c_data["vrfs"]: + name = vrf.setdefault("name", None) + table_id = vrf.setdefault("id", None) + del_action = vrf.setdefault("delete", False) + + if del_action: + # Kernel cmd- Add VRF and table + cmd = "ip link del {} type vrf table {}".format( + vrf["name"], vrf["id"] + ) + + logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) + rnode.run(cmd) + + # Kernel cmd - Bring down VRF + cmd = "ip link set dev {} down".format(name) + logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) + rnode.run(cmd) + + else: + if name and table_id: + # Kernel cmd- Add VRF and table + cmd = "ip link add {} type vrf table {}".format( + name, table_id + ) + logger.info( + "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd + ) + rnode.run(cmd) + + # Kernel cmd - Bring up VRF + cmd = "ip link set dev {} up".format(name) + logger.info( + "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd + ) + rnode.run(cmd) + + for vrf in c_data["vrfs"]: + vni = vrf.setdefault("vni", None) + del_vni = vrf.setdefault("no_vni", None) + + if "links" in c_data: + for destRouterLink, data in sorted(c_data["links"].items()): + # Loopback interfaces + if "type" in data and data["type"] == "loopback": + interface_name = destRouterLink + else: + interface_name = data["interface"] + + if "vrf" in data: + vrf_list = data["vrf"] + + if type(vrf_list) is not list: + vrf_list = [vrf_list] + + for _vrf in vrf_list: + cmd = "ip link set {} master {}".format( + interface_name, _vrf + ) + + logger.info( + "[DUT: %s]: Running" " kernel cmd [%s]", + c_router, + cmd, + ) + rnode.run(cmd) + + if vni: + config_data.append("vrf {}".format(vrf["name"])) + cmd = "vni {}".format(vni) + config_data.append(cmd) + + if del_vni: + config_data.append("vrf {}".format(vrf["name"])) + cmd = "no vni {}".format(del_vni) + config_data.append(cmd) + + if config_data: + config_data_dict[c_router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "vrf", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return result + + +def create_interface_in_kernel( + tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True +): + """ + Cretae interfaces in kernel for ipv4/ipv6 + Config is done in Linux Kernel: + + Parameters + ---------- + * `tgen` : Topogen object + * `dut` : Device for which interfaces to be added + * `name` : interface name + * `ip_addr` : ip address for interface + * `vrf` : VRF name, to which interface will be associated + * `netmask` : netmask value, default is None + * `create`: Create interface in kernel, if created then no need + to create + """ + + rnode = tgen.gears[dut] + + if create: + cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name) + rnode.run(cmd) + + if not netmask: + ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr)) + else: + ifaddr = ipaddress.ip_interface( + "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask)) + ) + cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format( + ifaddr.version, name, ifaddr + ) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + if vrf: + cmd = "ip link set {} master {}".format(name, vrf) + rnode.run(cmd) + + +def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False): + """ + Cretae interfaces in kernel for ipv4/ipv6 + Config is done in Linux Kernel: + + Parameters + ---------- + * `tgen` : Topogen object + * `dut` : Device for which interfaces to be added + * `intf_name` : interface name + * `ifaceaction` : False to shutdown and True to bringup the + ineterface + """ + + rnode = tgen.gears[dut] + + cmd = "ip link set dev" + if ifaceaction: + action = "up" + cmd = "{} {} {}".format(cmd, intf_name, action) + else: + action = "down" + cmd = "{} {} {}".format(cmd, intf_name, action) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + +def validate_ip_address(ip_address): + """ + Validates the type of ip address + Parameters + ---------- + * `ip_address`: IPv4/IPv6 address + Returns + ------- + Type of address as string + """ + + if "/" in ip_address: + ip_address = ip_address.split("/")[0] + + v4 = True + v6 = True + try: + socket.inet_aton(ip_address) + except socket.error as error: + logger.debug("Not a valid IPv4 address") + v4 = False + else: + return "ipv4" + + try: + socket.inet_pton(socket.AF_INET6, ip_address) + except socket.error as error: + logger.debug("Not a valid IPv6 address") + v6 = False + else: + return "ipv6" + + if not v4 and not v6: + raise Exception( + "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address + ) + + +def check_address_types(addr_type=None): + """ + Checks environment variable set and compares with the current address type + """ + + addr_types_env = os.environ.get("ADDRESS_TYPES") + if not addr_types_env: + addr_types_env = "dual" + + if addr_types_env == "dual": + addr_types = ["ipv4", "ipv6"] + elif addr_types_env == "ipv4": + addr_types = ["ipv4"] + elif addr_types_env == "ipv6": + addr_types = ["ipv6"] + + if addr_type is None: + return addr_types + + if addr_type not in addr_types: + logger.debug( + "{} not in supported/configured address types {}".format( + addr_type, addr_types + ) + ) + return False + + return True + + +def generate_ips(network, no_of_ips): + """ + Returns list of IPs. + based on start_ip and no_of_ips + + * `network` : from here the ip will start generating, + start_ip will be + * `no_of_ips` : these many IPs will be generated + """ + ipaddress_list = [] + if type(network) is not list: + network = [network] + + for start_ipaddr in network: + if "/" in start_ipaddr: + start_ip = start_ipaddr.split("/")[0] + mask = int(start_ipaddr.split("/")[1]) + else: + logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr)) + assert 0 + + addr_type = validate_ip_address(start_ip) + if addr_type == "ipv4": + if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1: + ipaddress_list.append("{}/{}".format(start_ip, mask)) + return ipaddress_list + start_ip = ipaddress.IPv4Address(frr_unicode(start_ip)) + step = 2 ** (32 - mask) + elif addr_type == "ipv6": + if start_ip == "0::0" and mask == 0 and no_of_ips == 1: + ipaddress_list.append("{}/{}".format(start_ip, mask)) + return ipaddress_list + start_ip = ipaddress.IPv6Address(frr_unicode(start_ip)) + step = 2 ** (128 - mask) + else: + return [] + + next_ip = start_ip + count = 0 + while count < no_of_ips: + ipaddress_list.append("{}/{}".format(next_ip, mask)) + if addr_type == "ipv6": + next_ip = ipaddress.IPv6Address(int(next_ip) + step) + else: + next_ip += step + count += 1 + + return ipaddress_list + + +def find_interface_with_greater_ip(topo, router, loopback=True, interface=True): + """ + Returns highest interface ip for ipv4/ipv6. If loopback is there then + it will return highest IP from loopback IPs otherwise from physical + interface IPs. + * `topo` : json file data + * `router` : router for which highest interface should be calculated + """ + + link_data = topo["routers"][router]["links"] + lo_list = [] + interfaces_list = [] + lo_exists = False + for destRouterLink, data in sorted(link_data.items()): + if loopback: + if "type" in data and data["type"] == "loopback": + lo_exists = True + ip_address = topo["routers"][router]["links"][destRouterLink][ + "ipv4" + ].split("/")[0] + lo_list.append(ip_address) + if interface: + ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split( + "/" + )[0] + interfaces_list.append(ip_address) + + if lo_exists: + return sorted(lo_list)[-1] + + return sorted(interfaces_list)[-1] + + +def write_test_header(tc_name): + """Display message at beginning of test case""" + count = 20 + logger.info("*" * (len(tc_name) + count)) + step("START -> Testcase : %s" % tc_name, reset=True) + logger.info("*" * (len(tc_name) + count)) + + +def write_test_footer(tc_name): + """Display message at end of test case""" + count = 21 + logger.info("=" * (len(tc_name) + count)) + logger.info("Testcase : %s -> PASSED", tc_name) + logger.info("=" * (len(tc_name) + count)) + + +def interface_status(tgen, topo, input_dict): + """ + Delete ip route maps from device + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : for which router, route map has to be deleted + Usage + ----- + input_dict = { + "r3": { + "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'], + "status": "down" + } + } + Returns + ------- + errormsg(str) or True + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + try: + rlist = [] + + for router in input_dict.keys(): + + interface_list = input_dict[router]["interface_list"] + status = input_dict[router].setdefault("status", "up") + for intf in interface_list: + rnode = tgen.gears[router] + interface_set_status(rnode, intf, status) + + rlist.append(router) + + # Load config to routers + load_config_to_routers(tgen, rlist) + + except Exception as e: + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75): + """ + Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception. + + * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds + * `initial_wait`: Sleeps for this many seconds before first executing function + * `expected`: if False then the return logic is inverted, except for exceptions, + (i.e., a False or errmsg (str) function return ends the retry loop, + and returns that False or str value) + * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have + been returned in order to see if a positive result comes after. This is an + important diagnostic tool, and normally should not be disabled. Calls to wrapped + functions though, can override the `diag_pct` value to make it larger in case more + diagnostic retrying is appropriate. + """ + + def _retry(func): + @wraps(func) + def func_retry(*args, **kwargs): + # We will continue to retry diag_pct of the timeout value to see if test would have passed with a + # longer retry timeout value. + saved_failure = None + + retry_sleep = 2 + + # Allow the wrapped function's args to override the fixtures + _retry_timeout = kwargs.pop("retry_timeout", retry_timeout) + _expected = kwargs.pop("expected", expected) + _initial_wait = kwargs.pop("initial_wait", initial_wait) + _diag_pct = kwargs.pop("diag_pct", diag_pct) + + start_time = datetime.now() + retry_until = datetime.now() + timedelta( + seconds=_retry_timeout + _initial_wait + ) + + if initial_wait > 0: + logger.info("Waiting for [%s]s as initial delay", initial_wait) + sleep(initial_wait) + + invert_logic = not _expected + while True: + seconds_left = (retry_until - datetime.now()).total_seconds() + try: + ret = func(*args, **kwargs) + logger.debug("Function returned %s", ret) + + negative_result = ret is False or is_string(ret) + if negative_result == invert_logic: + # Simple case, successful result in time + if not saved_failure: + return ret + + # Positive result, but happened after timeout failure, very important to + # note for fixing tests. + logger.warning( + "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing", + _retry_timeout, + (datetime.now() - start_time).total_seconds(), + ) + if isinstance(saved_failure, Exception): + raise saved_failure # pylint: disable=E0702 + return saved_failure + + except Exception as error: + logger.info("Function raised exception: %s", str(error)) + ret = error + + if seconds_left < 0 and saved_failure: + logger.info( + "RETRY DIAGNOSTIC: Retry timeout reached, still failing" + ) + if isinstance(saved_failure, Exception): + raise saved_failure # pylint: disable=E0702 + return saved_failure + + if seconds_left < 0: + logger.info("Retry timeout of %ds reached", _retry_timeout) + + saved_failure = ret + retry_extra_delta = timedelta( + seconds=seconds_left + _retry_timeout * _diag_pct + ) + retry_until = datetime.now() + retry_extra_delta + seconds_left = retry_extra_delta.total_seconds() + + # Generate bundle after setting remaining diagnostic retry time + generate_support_bundle() + + # If user has disabled diagnostic retries return now + if not _diag_pct: + if isinstance(saved_failure, Exception): + raise saved_failure + return saved_failure + + if saved_failure: + logger.info( + "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short", + retry_sleep, + seconds_left, + ) + else: + logger.info( + "Sleeping %ds until next retry with %.1f retry time left", + retry_sleep, + seconds_left, + ) + sleep(retry_sleep) + + func_retry._original = func + return func_retry + + return _retry + + +class Stepper: + """ + Prints step number for the test case step being executed + """ + + count = 1 + + def __call__(self, msg, reset): + if reset: + Stepper.count = 1 + logger.info(msg) + else: + logger.info("STEP %s: '%s'", Stepper.count, msg) + Stepper.count += 1 + + +def step(msg, reset=False): + """ + Call Stepper to print test steps. Need to reset at the beginning of test. + * ` msg` : Step message body. + * `reset` : Reset step count to 1 when set to True. + """ + _step = Stepper() + _step(msg, reset) + + +def do_countdown(secs): + """ + Countdown timer display + """ + for i in range(secs, 0, -1): + sys.stdout.write("{} ".format(str(i))) + sys.stdout.flush() + sleep(1) + return + + +############################################# +# These APIs, will used by testcase +############################################# +def create_interfaces_cfg(tgen, topo, build=False): + """ + Create interface configuration for created topology. Basic Interface + configuration is provided in input json file. + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `build` : Only for initial setup phase this is set as True. + + Returns + ------- + True or False + """ + + def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords): + interface_data = [] + ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf" + for keyword in ospf_keywords: + if keyword in data[ospf]: + intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword] + if "delete" in data and data["delete"]: + interface_data.append( + "no {} {}".format(ip_ospf, keyword.replace("_", "-")) + ) + else: + interface_data.append( + "{} {} {}".format( + ip_ospf, keyword.replace("_", "-"), intf_ospf_value + ) + ) + return interface_data + + result = False + topo = deepcopy(topo) + + try: + interface_data_dict = {} + + for c_router, c_data in topo.items(): + interface_data = [] + for destRouterLink, data in sorted(c_data["links"].items()): + # Loopback interfaces + if "type" in data and data["type"] == "loopback": + interface_name = destRouterLink + else: + interface_name = data["interface"] + + interface_data.append("interface {}".format(str(interface_name))) + + if "ipv4" in data: + intf_addr = c_data["links"][destRouterLink]["ipv4"] + + if "delete" in data and data["delete"]: + interface_data.append("no ip address {}".format(intf_addr)) + else: + interface_data.append("ip address {}".format(intf_addr)) + if "ipv6" in data: + intf_addr = c_data["links"][destRouterLink]["ipv6"] + + if "delete" in data and data["delete"]: + interface_data.append("no ipv6 address {}".format(intf_addr)) + else: + interface_data.append("ipv6 address {}".format(intf_addr)) + + # Wait for vrf interfaces to get link local address once they are up + if ( + not destRouterLink == "lo" + and "vrf" in topo[c_router]["links"][destRouterLink] + ): + vrf = topo[c_router]["links"][destRouterLink]["vrf"] + intf = topo[c_router]["links"][destRouterLink]["interface"] + ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf) + + if "ipv6-link-local" in data: + intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"] + + if "delete" in data and data["delete"]: + interface_data.append("no ipv6 address {}".format(intf_addr)) + else: + interface_data.append("ipv6 address {}\n".format(intf_addr)) + + ospf_keywords = [ + "hello_interval", + "dead_interval", + "network", + "priority", + "cost", + "mtu_ignore", + ] + if "ospf" in data: + interface_data += _create_interfaces_ospf_cfg( + "ospf", c_data, data, ospf_keywords + ["area"] + ) + if "ospf6" in data: + interface_data += _create_interfaces_ospf_cfg( + "ospf6", c_data, data, ospf_keywords + ["area"] + ) + if interface_data: + interface_data_dict[c_router] = interface_data + + result = create_common_configurations( + tgen, interface_data_dict, "interface_config", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return result + + +def create_static_routes(tgen, input_dict, build=False): + """ + Create static routes for given router as defined in input_dict + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + input_dict should be in the format below: + # static_routes: list of all routes + # network: network address + # no_of_ip: number of next-hop address that will be configured + # admin_distance: admin distance for route/routes. + # next_hop: starting next-hop address + # tag: tag id for static routes + # vrf: VRF name in which static routes needs to be created + # delete: True if config to be removed. Default False. + + Example: + "routers": { + "r1": { + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1", + "tag": 4001, + "vrf": "RED_A" + "delete": true + } + ] + } + } + + Returns + ------- + errormsg(str) or True + """ + result = False + logger.debug("Entering lib API: create_static_routes()") + input_dict = deepcopy(input_dict) + + try: + static_routes_list_dict = {} + + for router in input_dict.keys(): + if "static_routes" not in input_dict[router]: + errormsg = "static_routes not present in input_dict" + logger.info(errormsg) + continue + + static_routes_list = [] + + static_routes = input_dict[router]["static_routes"] + for static_route in static_routes: + del_action = static_route.setdefault("delete", False) + no_of_ip = static_route.setdefault("no_of_ip", 1) + network = static_route.setdefault("network", []) + if type(network) is not list: + network = [network] + + admin_distance = static_route.setdefault("admin_distance", None) + tag = static_route.setdefault("tag", None) + vrf = static_route.setdefault("vrf", None) + interface = static_route.setdefault("interface", None) + next_hop = static_route.setdefault("next_hop", None) + nexthop_vrf = static_route.setdefault("nexthop_vrf", None) + + ip_list = generate_ips(network, no_of_ip) + for ip in ip_list: + addr_type = validate_ip_address(ip) + + if addr_type == "ipv4": + cmd = "ip route {}".format(ip) + else: + cmd = "ipv6 route {}".format(ip) + + if interface: + cmd = "{} {}".format(cmd, interface) + + if next_hop: + cmd = "{} {}".format(cmd, next_hop) + + if nexthop_vrf: + cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf) + + if vrf: + cmd = "{} vrf {}".format(cmd, vrf) + + if tag: + cmd = "{} tag {}".format(cmd, str(tag)) + + if admin_distance: + cmd = "{} {}".format(cmd, admin_distance) + + if del_action: + cmd = "no {}".format(cmd) + + static_routes_list.append(cmd) + + if static_routes_list: + static_routes_list_dict[router] = static_routes_list + + result = create_common_configurations( + tgen, static_routes_list_dict, "static_route", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_static_routes()") + return result + + +def create_prefix_lists(tgen, input_dict, build=False): + """ + Create ip prefix lists as per the config provided in input + JSON or input_dict + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + Usage + ----- + # pf_lists_1: name of prefix-list, user defined + # seqid: prefix-list seqid, auto-generated if not given by user + # network: criteria for applying prefix-list + # action: permit/deny + # le: less than or equal number of bits + # ge: greater than or equal number of bits + Example + ------- + input_dict = { + "r1": { + "prefix_lists":{ + "ipv4": { + "pf_list_1": [ + { + "seqid": 10, + "network": "any", + "action": "permit", + "le": "32", + "ge": "30", + "delete": True + } + ] + } + } + } + } + Returns + ------- + errormsg or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + result = False + try: + config_data_dict = {} + + for router in input_dict.keys(): + if "prefix_lists" not in input_dict[router]: + errormsg = "prefix_lists not present in input_dict" + logger.debug(errormsg) + continue + + config_data = [] + prefix_lists = input_dict[router]["prefix_lists"] + for addr_type, prefix_data in prefix_lists.items(): + if not check_address_types(addr_type): + continue + + for prefix_name, prefix_list in prefix_data.items(): + for prefix_dict in prefix_list: + if "action" not in prefix_dict or "network" not in prefix_dict: + errormsg = "'action' or network' missing in" " input_dict" + return errormsg + + network_addr = prefix_dict["network"] + action = prefix_dict["action"] + le = prefix_dict.setdefault("le", None) + ge = prefix_dict.setdefault("ge", None) + seqid = prefix_dict.setdefault("seqid", None) + del_action = prefix_dict.setdefault("delete", False) + if seqid is None: + seqid = get_seq_id("prefix_lists", router, prefix_name) + else: + set_seq_id("prefix_lists", router, seqid, prefix_name) + + if addr_type == "ipv4": + protocol = "ip" + else: + protocol = "ipv6" + + cmd = "{} prefix-list {} seq {} {} {}".format( + protocol, prefix_name, seqid, action, network_addr + ) + if le: + cmd = "{} le {}".format(cmd, le) + if ge: + cmd = "{} ge {}".format(cmd, ge) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "prefix_list", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +def create_route_maps(tgen, input_dict, build=False): + """ + Create route-map on the devices as per the arguments passed + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + Usage + ----- + # route_maps: key, value pair for route-map name and its attribute + # rmap_match_prefix_list_1: user given name for route-map + # action: PERMIT/DENY + # match: key,value pair for match criteria. prefix_list, community-list, + large-community-list or tag. Only one option at a time. + # prefix_list: name of prefix list + # large-community-list: name of large community list + # community-ist: name of community list + # tag: tag id for static routes + # set: key, value pair for modifying route attributes + # localpref: preference value for the network + # med: metric value advertised for AS + # aspath: set AS path value + # weight: weight for the route + # community: standard community value to be attached + # large_community: large community value to be attached + # community_additive: if set to "additive", adds community/large-community + value to the existing values of the network prefix + Example: + -------- + input_dict = { + "r1": { + "route_maps": { + "rmap_match_prefix_list_1": [ + { + "action": "PERMIT", + "match": { + "ipv4": { + "prefix_list": "pf_list_1" + } + "ipv6": { + "prefix_list": "pf_list_1" + } + "large-community-list": { + "id": "community_1", + "exact_match": True + } + "community_list": { + "id": "community_2", + "exact_match": True + } + "tag": "tag_id" + }, + "set": { + "locPrf": 150, + "metric": 30, + "path": { + "num": 20000, + "action": "prepend", + }, + "weight": 500, + "community": { + "num": "1:2 2:3", + "action": additive + } + "large_community": { + "num": "1:2:3 4:5;6", + "action": additive + }, + } + } + ] + } + } + } + Returns + ------- + errormsg(str) or True + """ + + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + input_dict = deepcopy(input_dict) + try: + rmap_data_dict = {} + + for router in input_dict.keys(): + if "route_maps" not in input_dict[router]: + logger.debug("route_maps not present in input_dict") + continue + rmap_data = [] + for rmap_name, rmap_value in input_dict[router]["route_maps"].items(): + + for rmap_dict in rmap_value: + del_action = rmap_dict.setdefault("delete", False) + + if del_action: + rmap_data.append("no route-map {}".format(rmap_name)) + continue + + if "action" not in rmap_dict: + errormsg = "action not present in input_dict" + logger.error(errormsg) + return False + + rmap_action = rmap_dict.setdefault("action", "deny") + + seq_id = rmap_dict.setdefault("seq_id", None) + if seq_id is None: + seq_id = get_seq_id("route_maps", router, rmap_name) + else: + set_seq_id("route_maps", router, seq_id, rmap_name) + + rmap_data.append( + "route-map {} {} {}".format(rmap_name, rmap_action, seq_id) + ) + + if "continue" in rmap_dict: + continue_to = rmap_dict["continue"] + if continue_to: + rmap_data.append("on-match goto {}".format(continue_to)) + else: + logger.error( + "In continue, 'route-map entry " + "sequence number' is not provided" + ) + return False + + if "goto" in rmap_dict: + go_to = rmap_dict["goto"] + if go_to: + rmap_data.append("on-match goto {}".format(go_to)) + else: + logger.error( + "In goto, 'Goto Clause number' is not" " provided" + ) + return False + + if "call" in rmap_dict: + call_rmap = rmap_dict["call"] + if call_rmap: + rmap_data.append("call {}".format(call_rmap)) + else: + logger.error( + "In call, 'destination Route-Map' is" " not provided" + ) + return False + + # Verifying if SET criteria is defined + if "set" in rmap_dict: + set_data = rmap_dict["set"] + ipv4_data = set_data.setdefault("ipv4", {}) + ipv6_data = set_data.setdefault("ipv6", {}) + local_preference = set_data.setdefault("locPrf", None) + metric = set_data.setdefault("metric", None) + metric_type = set_data.setdefault("metric-type", None) + as_path = set_data.setdefault("path", {}) + weight = set_data.setdefault("weight", None) + community = set_data.setdefault("community", {}) + large_community = set_data.setdefault("large_community", {}) + large_comm_list = set_data.setdefault("large_comm_list", {}) + set_action = set_data.setdefault("set_action", None) + nexthop = set_data.setdefault("nexthop", None) + origin = set_data.setdefault("origin", None) + ext_comm_list = set_data.setdefault("extcommunity", {}) + metrictype = set_data.setdefault("metric-type", {}) + + # Local Preference + if local_preference: + rmap_data.append( + "set local-preference {}".format(local_preference) + ) + + # Metric-Type + if metrictype: + rmap_data.append("set metric-type {}\n".format(metrictype)) + + # Metric + if metric: + del_comm = set_data.setdefault("delete", None) + if del_comm: + rmap_data.append("no set metric {}".format(metric)) + else: + rmap_data.append("set metric {}".format(metric)) + + # Origin + if origin: + rmap_data.append("set origin {} \n".format(origin)) + + # AS Path Prepend + if as_path: + as_num = as_path.setdefault("as_num", None) + as_action = as_path.setdefault("as_action", None) + if as_action and as_num: + rmap_data.append( + "set as-path {} {}".format(as_action, as_num) + ) + + # Community + if community: + num = community.setdefault("num", None) + comm_action = community.setdefault("action", None) + if num: + cmd = "set community {}".format(num) + if comm_action: + cmd = "{} {}".format(cmd, comm_action) + rmap_data.append(cmd) + else: + logger.error("In community, AS Num not" " provided") + return False + + if large_community: + num = large_community.setdefault("num", None) + comm_action = large_community.setdefault("action", None) + if num: + cmd = "set large-community {}".format(num) + if comm_action: + cmd = "{} {}".format(cmd, comm_action) + + rmap_data.append(cmd) + else: + logger.error( + "In large_community, AS Num not" " provided" + ) + return False + if large_comm_list: + id = large_comm_list.setdefault("id", None) + del_comm = large_comm_list.setdefault("delete", None) + if id: + cmd = "set large-comm-list {}".format(id) + if del_comm: + cmd = "{} delete".format(cmd) + + rmap_data.append(cmd) + else: + logger.error("In large_comm_list 'id' not" " provided") + return False + + if ext_comm_list: + rt = ext_comm_list.setdefault("rt", None) + del_comm = ext_comm_list.setdefault("delete", None) + if rt: + cmd = "set extcommunity rt {}".format(rt) + if del_comm: + cmd = "{} delete".format(cmd) + + rmap_data.append(cmd) + else: + logger.debug("In ext_comm_list 'rt' not" " provided") + return False + + # Weight + if weight: + rmap_data.append("set weight {}".format(weight)) + if ipv6_data: + nexthop = ipv6_data.setdefault("nexthop", None) + if nexthop: + rmap_data.append("set ipv6 next-hop {}".format(nexthop)) + + # Adding MATCH and SET sequence to RMAP if defined + if "match" in rmap_dict: + match_data = rmap_dict["match"] + ipv4_data = match_data.setdefault("ipv4", {}) + ipv6_data = match_data.setdefault("ipv6", {}) + community = match_data.setdefault("community_list", {}) + large_community = match_data.setdefault("large_community", {}) + large_community_list = match_data.setdefault( + "large_community_list", {} + ) + + metric = match_data.setdefault("metric", None) + source_vrf = match_data.setdefault("source-vrf", None) + + if ipv4_data: + # fetch prefix list data from rmap + prefix_name = ipv4_data.setdefault("prefix_lists", None) + if prefix_name: + rmap_data.append( + "match ip address" + " prefix-list {}".format(prefix_name) + ) + + # fetch tag data from rmap + tag = ipv4_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv4_data.setdefault( + "large_community_list", {} + ) + large_community = match_data.setdefault( + "large_community", {} + ) + + if ipv6_data: + prefix_name = ipv6_data.setdefault("prefix_lists", None) + if prefix_name: + rmap_data.append( + "match ipv6 address" + " prefix-list {}".format(prefix_name) + ) + + # fetch tag data from rmap + tag = ipv6_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv6_data.setdefault( + "large_community_list", {} + ) + large_community = match_data.setdefault( + "large_community", {} + ) + + if community: + if "id" not in community: + logger.error( + "'id' is mandatory for " + "community-list in match" + " criteria" + ) + return False + cmd = "match community {}".format(community["id"]) + exact_match = community.setdefault("exact_match", False) + if exact_match: + cmd = "{} exact-match".format(cmd) + + rmap_data.append(cmd) + if large_community: + if "id" not in large_community: + logger.error( + "'id' is mandatory for " + "large-community-list in match " + "criteria" + ) + return False + cmd = "match large-community {}".format( + large_community["id"] + ) + exact_match = large_community.setdefault( + "exact_match", False + ) + if exact_match: + cmd = "{} exact-match".format(cmd) + rmap_data.append(cmd) + if large_community_list: + if "id" not in large_community_list: + logger.error( + "'id' is mandatory for " + "large-community-list in match " + "criteria" + ) + return False + cmd = "match large-community {}".format( + large_community_list["id"] + ) + exact_match = large_community_list.setdefault( + "exact_match", False + ) + if exact_match: + cmd = "{} exact-match".format(cmd) + rmap_data.append(cmd) + + if source_vrf: + cmd = "match source-vrf {}".format(source_vrf) + rmap_data.append(cmd) + + if metric: + cmd = "match metric {}".format(metric) + rmap_data.append(cmd) + + if rmap_data: + rmap_data_dict[router] = rmap_data + + result = create_common_configurations( + tgen, rmap_data_dict, "route_maps", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +def delete_route_maps(tgen, input_dict): + """ + Delete ip route maps from device + * `tgen` : Topogen object + * `input_dict` : for which router, + route map has to be deleted + Usage + ----- + # Delete route-map rmap_1 and rmap_2 from router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap__2"] + } + } + result = delete_route_maps("ipv4", input_dict) + Returns + ------- + errormsg(str) or True + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + for router in input_dict.keys(): + route_maps = input_dict[router]["route_maps"][:] + rmap_data = input_dict[router] + rmap_data["route_maps"] = {} + for route_map_name in route_maps: + rmap_data["route_maps"].update({route_map_name: [{"delete": True}]}) + + return create_route_maps(tgen, input_dict) + + +def create_bgp_community_lists(tgen, input_dict, build=False): + """ + Create bgp community-list or large-community-list on the devices as per + the arguments passed. Takes list of communities in input. + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + Usage + ----- + input_dict_1 = { + "r3": { + "bgp_community_lists": [ + { + "community_type": "standard", + "action": "permit", + "name": "rmap_lcomm_{}".format(addr_type), + "value": "1:1:1 1:2:3 2:1:1 2:2:2", + "large": True + } + ] + } + } + } + result = create_bgp_community_lists(tgen, input_dict_1) + """ + + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + input_dict = deepcopy(input_dict) + try: + config_data_dict = {} + + for router in input_dict.keys(): + if "bgp_community_lists" not in input_dict[router]: + errormsg = "bgp_community_lists not present in input_dict" + logger.debug(errormsg) + continue + + config_data = [] + + community_list = input_dict[router]["bgp_community_lists"] + for community_dict in community_list: + del_action = community_dict.setdefault("delete", False) + community_type = community_dict.setdefault("community_type", None) + action = community_dict.setdefault("action", None) + value = community_dict.setdefault("value", "") + large = community_dict.setdefault("large", None) + name = community_dict.setdefault("name", None) + if large: + cmd = "bgp large-community-list" + else: + cmd = "bgp community-list" + + if not large and not (community_type and action and value): + errormsg = ( + "community_type, action and value are " + "required in bgp_community_list" + ) + logger.error(errormsg) + return False + + cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + if config_data: + config_data_dict[router] = config_data + + result = create_common_configurations( + tgen, config_data_dict, "bgp_community_list", build=build + ) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False): + """ + Shutdown or bringup router's interface " + * `tgen` : Topogen object + * `dut` : Device under test + * `intf_name` : Interface name to be shut/no shut + * `ifaceaction` : Action, to shut/no shut interface, + by default is False + Usage + ----- + dut = "r3" + intf = "r3-r1-eth0" + # Shut down interface + shutdown_bringup_interface(tgen, dut, intf, False) + # Bring up interface + shutdown_bringup_interface(tgen, dut, intf, True) + Returns + ------- + errormsg(str) or True + """ + + router_list = tgen.routers() + if ifaceaction: + logger.info("Bringing up interface {} : {}".format(dut, intf_name)) + else: + logger.info("Shutting down interface {} : {}".format(dut, intf_name)) + + interface_set_status(router_list[dut], intf_name, ifaceaction) + + +def addKernelRoute( + tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None +): + """ + Add route to kernel + + Parameters: + ----------- + * `tgen` : Topogen object + * `router`: router for which kernel routes needs to be added + * `intf`: interface name, for which kernel routes needs to be added + * `bindToAddress`: bind to <host>, an interface or multicast + address + + returns: + -------- + errormsg or True + """ + + logger.debug("Entering lib API: addKernelRoute()") + + rnode = tgen.gears[router] + + if type(group_addr_range) is not list: + group_addr_range = [group_addr_range] + + for grp_addr in group_addr_range: + + addr_type = validate_ip_address(grp_addr) + if addr_type == "ipv4": + if next_hop is not None: + cmd = "ip route add {} via {}".format(grp_addr, next_hop) + else: + cmd = "ip route add {} dev {}".format(grp_addr, intf) + if del_action: + cmd = "ip route del {}".format(grp_addr) + verify_cmd = "ip route" + elif addr_type == "ipv6": + if intf and src: + cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src) + else: + cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop) + verify_cmd = "ip -6 route" + if del_action: + cmd = "ip -6 route del {}".format(grp_addr) + + logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd)) + output = rnode.run(cmd) + + def check_in_kernel(rnode, verify_cmd, grp_addr, router): + # Verifying if ip route added to kernel + errormsg = None + result = rnode.run(verify_cmd) + logger.debug("{}\n{}".format(verify_cmd, result)) + if "/" in grp_addr: + ip, mask = grp_addr.split("/") + if mask == "32" or mask == "128": + grp_addr = ip + else: + mask = "32" if addr_type == "ipv4" else "128" + + if not re_search(r"{}".format(grp_addr), result) and mask != "0": + errormsg = ( + "[DUT: {}]: Kernal route is not added for group" + " address {} Config output: {}".format( + router, grp_addr, output + ) + ) + + return errormsg + + test_func = functools.partial( + check_in_kernel, rnode, verify_cmd, grp_addr, router + ) + (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1) + assert result, out + + logger.debug("Exiting lib API: addKernelRoute()") + return True + + +def configure_vxlan(tgen, input_dict): + """ + Add and configure vxlan + + * `tgen`: tgen object + * `input_dict` : data for vxlan config + + Usage: + ------ + input_dict= { + "dcg2":{ + "vxlan":[{ + "vxlan_name": "vxlan75100", + "vxlan_id": "75100", + "dstport": 4789, + "local_addr": "120.0.0.1", + "learning": "no", + "delete": True + }] + } + } + + configure_vxlan(tgen, input_dict) + + Returns: + ------- + True or errormsg + + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = router_list[dut] + + if "vxlan" in input_dict[dut]: + for vxlan_dict in input_dict[dut]["vxlan"]: + cmd = "ip link " + + del_vxlan = vxlan_dict.setdefault("delete", None) + vxlan_names = vxlan_dict.setdefault("vxlan_name", []) + vxlan_ids = vxlan_dict.setdefault("vxlan_id", []) + dstport = vxlan_dict.setdefault("dstport", None) + local_addr = vxlan_dict.setdefault("local_addr", None) + learning = vxlan_dict.setdefault("learning", None) + + config_data = [] + if vxlan_names and vxlan_ids: + for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids): + cmd = "ip link" + + if del_vxlan: + cmd = "{} del {} type vxlan id {}".format( + cmd, vxlan_name, vxlan_id + ) + else: + cmd = "{} add {} type vxlan id {}".format( + cmd, vxlan_name, vxlan_id + ) + + if dstport: + cmd = "{} dstport {}".format(cmd, dstport) + + if local_addr: + ip_cmd = "ip addr add {} dev {}".format( + local_addr, vxlan_name + ) + if del_vxlan: + ip_cmd = "ip addr del {} dev {}".format( + local_addr, vxlan_name + ) + + config_data.append(ip_cmd) + + cmd = "{} local {}".format(cmd, local_addr) + + if learning == "no": + cmd = "{} nolearning".format(cmd) + + elif learning == "yes": + cmd = "{} learning".format(cmd) + + config_data.append(cmd) + + try: + for _cmd in config_data: + logger.info("[DUT: %s]: Running command: %s", dut, _cmd) + rnode.run(_cmd) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return True + + +def configure_brctl(tgen, topo, input_dict): + """ + Add and configure brctl + + * `tgen`: tgen object + * `input_dict` : data for brctl config + + Usage: + ------ + input_dict= { + dut:{ + "brctl": [{ + "brctl_name": "br100", + "addvxlan": "vxlan75100", + "vrf": "RED", + "stp": "off" + }] + } + } + + configure_brctl(tgen, topo, input_dict) + + Returns: + ------- + True or errormsg + + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = router_list[dut] + + if "brctl" in input_dict[dut]: + for brctl_dict in input_dict[dut]["brctl"]: + + brctl_names = brctl_dict.setdefault("brctl_name", []) + addvxlans = brctl_dict.setdefault("addvxlan", []) + stp_values = brctl_dict.setdefault("stp", []) + vrfs = brctl_dict.setdefault("vrf", []) + + ip_cmd = "ip link set" + for brctl_name, vxlan, vrf, stp in zip( + brctl_names, addvxlans, vrfs, stp_values + ): + + ip_cmd_list = [] + cmd = "ip link add name {} type bridge stp_state {}".format( + brctl_name, stp + ) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name)) + + if vxlan: + cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan)) + + if vrf: + ip_cmd_list.append( + "{} dev {} master {}".format(ip_cmd, brctl_name, vrf) + ) + + for intf_name, data in topo["routers"][dut]["links"].items(): + if "vrf" not in data: + continue + + if data["vrf"] == vrf: + ip_cmd_list.append( + "{} up dev {}".format(ip_cmd, data["interface"]) + ) + + try: + for _ip_cmd in ip_cmd_list: + logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd) + rnode.run(_ip_cmd) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def configure_interface_mac(tgen, input_dict): + """ + Add and configure brctl + + * `tgen`: tgen object + * `input_dict` : data for mac config + + input_mac= { + "edge1":{ + "br75100": "00:80:48:BA:d1:00, + "br75200": "00:80:48:BA:d1:00 + } + } + + configure_interface_mac(tgen, input_mac) + + Returns: + ------- + True or errormsg + + """ + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = router_list[dut] + + for intf, mac in input_dict[dut].items(): + cmd = "ip link set {} address {}".format(intf, mac) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + + try: + result = rnode.run(cmd) + if len(result) != 0: + return result + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return True + + +def socat_send_igmp_join_traffic( + tgen, + server, + protocol_option, + igmp_groups, + send_from_intf, + send_from_intf_ip=None, + port=12345, + reuseaddr=True, + join=False, + traffic=False, +): + """ + API to send IGMP join using SOCAT tool + + Parameters: + ----------- + * `tgen` : Topogen object + * `server`: iperf server, from where IGMP join would be sent + * `protocol_option`: Protocol options, ex: UDP6-RECV + * `igmp_groups`: IGMP group for which join has to be sent + * `send_from_intf`: Interface from which join would be sent + * `send_from_intf_ip`: Interface IP, default is None + * `port`: Port to be used, default is 12345 + * `reuseaddr`: True|False, bydefault True + * `join`: If join needs to be sent + * `traffic`: If traffic needs to be sent + + returns: + -------- + errormsg or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + rnode = tgen.routers()[server] + socat_cmd = "socat -u " + + # UDP4/TCP4/UDP6/UDP6-RECV + if protocol_option: + socat_cmd += "{}".format(protocol_option) + + if port: + socat_cmd += ":{},".format(port) + + if reuseaddr: + socat_cmd += "{},".format("reuseaddr") + + # Group address range to cover + if igmp_groups: + if not isinstance(igmp_groups, list): + igmp_groups = [igmp_groups] + + for igmp_group in igmp_groups: + if join: + join_traffic_option = "ipv6-join-group" + elif traffic: + join_traffic_option = "ipv6-join-group-source" + + if send_from_intf and not send_from_intf_ip: + socat_cmd += "{}='[{}]:{}'".format( + join_traffic_option, igmp_group, send_from_intf + ) + else: + socat_cmd += "{}='[{}]:{}:[{}]'".format( + join_traffic_option, igmp_group, send_from_intf, send_from_intf_ip + ) + + socat_cmd += " STDOUT" + + socat_cmd += " &>{}/socat.logs &".format(tgen.logdir) + + # Run socat command to send IGMP join + logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd)) + output = rnode.run("set +m; {} sleep 0.5".format(socat_cmd)) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +############################################# +# Verification APIs +############################################# +@retry(retry_timeout=40) +def verify_rib( + tgen, + addr_type, + dut, + input_dict, + next_hop=None, + protocol=None, + tag=None, + metric=None, + fib=None, + count_only=False, + admin_distance=None, +): + """ + Data will be read from input_dict or input JSON file, API will generate + same prefixes, which were redistributed by either create_static_routes() or + advertise_networks_using_network_command() and do will verify next_hop and + each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" + command o/p. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test, for which user wants to test the data + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default: static + * `protocol`[optional]: protocol, default = None + * `count_only`[optional]: count of nexthops only, not specific addresses, + default = False + + Usage + ----- + # RIB can be verified for static routes OR network advertised using + network command. Following are input_dicts to create static routes + and advertise networks using network command. Any one of the input_dict + can be passed to verify_rib() to verify routes in DUT"s RIB. + + # Creating static routes for r1 + input_dict = { + "r1": { + "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \ + "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}] + }} + # Advertising networks using network command in router r1 + input_dict = { + "r1": { + "advertise_networks": [{"start_ip": "20.0.0.0/32", + "no_of_network": 10}, + {"start_ip": "30.0.0.0/32"}] + }} + # Verifying ipv4 routes in router r1 learned via BGP + dut = "r2" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + additional_nexthops_in_required_nhs = [] + found_hops = [] + for routerInput in input_dict.keys(): + for router, rnode in router_list.items(): + if router != dut: + continue + + logger.info("Checking router %s RIB:", router) + + # Verifying RIB routes + if addr_type == "ipv4": + command = "show ip route" + else: + command = "show ipv6 route" + + found_routes = [] + missing_routes = [] + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + if "vrf" in static_route and static_route["vrf"] is not None: + + logger.info( + "[DUT: {}]: Verifying routes for VRF:" + " {}".format(router, static_route["vrf"]) + ) + + cmd = "{} vrf {}".format(command, static_route["vrf"]) + + else: + cmd = "{}".format(command) + + if protocol: + cmd = "{} {}".format(cmd, protocol) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + network = static_route["network"] + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + if "tag" in static_route: + _tag = static_route["tag"] + else: + _tag = None + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + st_found = False + nh_found = False + + for st_rt in ip_list: + st_rt = str( + ipaddress.ip_network(frr_unicode(st_rt), strict=False) + ) + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if "queued" in rib_routes_json[st_rt][0]: + errormsg = "Route {} is queued\n".format(st_rt) + return errormsg + + if fib and next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + for mnh in range(0, len(rib_routes_json[st_rt])): + if not "selected" in rib_routes_json[st_rt][mnh]: + continue + + if ( + "fib" + in rib_routes_json[st_rt][mnh]["nexthops"][0] + ): + found_hops.append( + [ + rib_r["ip"] + for rib_r in rib_routes_json[st_rt][ + mnh + ]["nexthops"] + ] + ) + + if found_hops[0]: + missing_list_of_nexthops = set( + found_hops[0] + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops[0]) + + if additional_nexthops_in_required_nhs: + logger.info( + "Nexthop " + "%s is not active for route %s in " + "RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is not active" + " for route {} in RIB of router" + " {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + + elif next_hop and fib is None: + if type(next_hop) is not list: + next_hop = [next_hop] + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json[st_rt][0]["nexthops"] + if "ip" in rib_r + ] + + # If somehow key "ip" is not found in nexthops JSON + # then found_hops would be 0, this particular + # situation will be handled here + if not len(found_hops): + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + next_hop, + st_rt, + dut, + ) + ) + return errormsg + + # Check only the count of nexthops + if count_only: + if len(next_hop) == len(found_hops): + nh_found = True + else: + errormsg = ( + "Nexthops are missing for " + "route {} in RIB of router {}: " + "expected {}, found {}\n".format( + st_rt, + dut, + len(next_hop), + len(found_hops), + ) + ) + return errormsg + + # Check the actual nexthops + elif found_hops: + missing_list_of_nexthops = set( + found_hops + ).difference(next_hop) + additional_nexthops_in_required_nhs = set( + next_hop + ).difference(found_hops) + + if additional_nexthops_in_required_nhs: + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) + return errormsg + else: + nh_found = True + + if tag: + if "tag" not in rib_routes_json[st_rt][0]: + errormsg = ( + "[DUT: {}]: tag is not" + " present for" + " route {} in RIB \n".format(dut, st_rt) + ) + return errormsg + + if _tag != rib_routes_json[st_rt][0]["tag"]: + errormsg = ( + "[DUT: {}]: tag value {}" + " is not matched for" + " route {} in RIB \n".format( + dut, + _tag, + st_rt, + ) + ) + return errormsg + + if admin_distance is not None: + if "distance" not in rib_routes_json[st_rt][0]: + errormsg = ( + "[DUT: {}]: admin distance is" + " not present for" + " route {} in RIB \n".format(dut, st_rt) + ) + return errormsg + + if ( + admin_distance + != rib_routes_json[st_rt][0]["distance"] + ): + errormsg = ( + "[DUT: {}]: admin distance value " + "{} is not matched for " + "route {} in RIB \n".format( + dut, + admin_distance, + st_rt, + ) + ) + return errormsg + + if metric is not None: + if "metric" not in rib_routes_json[st_rt][0]: + errormsg = ( + "[DUT: {}]: metric is" + " not present for" + " route {} in RIB \n".format(dut, st_rt) + ) + return errormsg + + if metric != rib_routes_json[st_rt][0]["metric"]: + errormsg = ( + "[DUT: {}]: metric value " + "{} is not matched for " + "route {} in RIB \n".format( + dut, + metric, + st_rt, + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "[DUT: {}]: Found next_hop {} for" + " RIB routes: {}".format(router, next_hop, found_routes) + ) + + if len(missing_routes) > 0: + errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format( + dut, missing_routes + ) + return errormsg + + if found_routes: + logger.info( + "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n", + dut, + found_routes, + ) + + continue + + if "bgp" in input_dict[routerInput]: + if ( + "advertise_networks" + not in input_dict[routerInput]["bgp"]["address_family"][addr_type][ + "unicast" + ] + ): + continue + + found_routes = [] + missing_routes = [] + advertise_network = input_dict[routerInput]["bgp"]["address_family"][ + addr_type + ]["unicast"]["advertise_networks"] + + # Continue if there are no network advertise + if len(advertise_network) == 0: + continue + + for advertise_network_dict in advertise_network: + if "vrf" in advertise_network_dict: + cmd = "{} vrf {} json".format( + command, advertise_network_dict["vrf"] + ) + else: + cmd = "{} json".format(command) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + st_found = False + nh_found = False + + for st_rt in ip_list: + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False)) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if "queued" in rib_routes_json[st_rt][0]: + errormsg = "Route {} is queued\n".format(st_rt) + return errormsg + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + count = 0 + for nh in next_hop: + for nh_dict in rib_routes_json[st_rt][0]["nexthops"]: + if nh_dict["ip"] != nh: + continue + else: + count += 1 + + if count == len(next_hop): + nh_found = True + else: + errormsg = ( + "Nexthop {} is Missing" + " for route {} in " + "RIB of router {}\n".format(next_hop, st_rt, dut) + ) + return errormsg + else: + missing_routes.append(st_rt) + + if nh_found: + logger.info( + "Found next_hop {} for all routes in RIB" + " of router {}\n".format(next_hop, dut) + ) + + if len(missing_routes) > 0: + errormsg = ( + "Missing {} route in RIB of router {}, " + "routes: {} \n".format(addr_type, dut, missing_routes) + ) + return errormsg + + if found_routes: + logger.info( + "Verified {} routes in router {} RIB, found" + " routes are: {}\n".format(addr_type, dut, found_routes) + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=12) +def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): + """ + Data will be read from input_dict or input JSON file, API will generate + same prefixes, which were redistributed by either create_static_routes() or + advertise_networks_using_network_command() and will verify next_hop and + each prefix/routes is present in "show ip/ipv6 fib json" + command o/p. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test, for which user wants to test the data + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default: static + + Usage + ----- + input_routes_r1 = { + "r1": { + "static_routes": [{ + "network": ["1.1.1.1/32], + "next_hop": "Null0", + "vrf": "RED" + }] + } + } + result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + if dut not in router_list: + return + + for routerInput in input_dict.keys(): + # XXX replace with router = dut; rnode = router_list[dut] + for router, rnode in router_list.items(): + if router != dut: + continue + + logger.info("Checking router %s FIB routes:", router) + + # Verifying RIB routes + if addr_type == "ipv4": + command = "show ip fib" + else: + command = "show ipv6 fib" + + found_routes = [] + missing_routes = [] + + if protocol: + command = "{} {}".format(command, protocol) + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + + for static_route in static_routes: + if "vrf" in static_route and static_route["vrf"] is not None: + + logger.info( + "[DUT: {}]: Verifying routes for VRF:" + " {}".format(router, static_route["vrf"]) + ) + + cmd = "{} vrf {}".format(command, static_route["vrf"]) + + else: + cmd = "{}".format(command) + + cmd = "{} json".format(cmd) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "[DUT: {}]: No route found in fib".format(router) + return errormsg + + network = static_route["network"] + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 1 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + st_found = False + nh_found = False + + for st_rt in ip_list: + st_rt = str( + ipaddress.ip_network(frr_unicode(st_rt), strict=False) + ) + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + count = 0 + for nh in next_hop: + for nh_dict in rib_routes_json[st_rt][0][ + "nexthops" + ]: + if nh_dict["ip"] != nh: + continue + else: + count += 1 + + if count == len(next_hop): + nh_found = True + else: + missing_routes.append(st_rt) + errormsg = ( + "Nexthop {} is Missing" + " for route {} in " + "RIB of router {}\n".format( + next_hop, st_rt, dut + ) + ) + return errormsg + + else: + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format( + dut, missing_routes + ) + return errormsg + + if nh_found: + logger.info( + "Found next_hop {} for all routes in RIB" + " of router {}\n".format(next_hop, dut) + ) + + if found_routes: + logger.info( + "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n", + dut, + found_routes, + ) + + continue + + if "bgp" in input_dict[routerInput]: + if ( + "advertise_networks" + not in input_dict[routerInput]["bgp"]["address_family"][addr_type][ + "unicast" + ] + ): + continue + + found_routes = [] + missing_routes = [] + advertise_network = input_dict[routerInput]["bgp"]["address_family"][ + addr_type + ]["unicast"]["advertise_networks"] + + # Continue if there are no network advertise + if len(advertise_network) == 0: + continue + + for advertise_network_dict in advertise_network: + if "vrf" in advertise_network_dict: + cmd = "{} vrf {} json".format(command, static_route["vrf"]) + else: + cmd = "{} json".format(command) + + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No route found in rib of router {}..".format(router) + return errormsg + + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + st_found = False + nh_found = False + + for st_rt in ip_list: + st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False)) + + _addr_type = validate_ip_address(st_rt) + if _addr_type != addr_type: + continue + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + count = 0 + for nh in next_hop: + for nh_dict in rib_routes_json[st_rt][0]["nexthops"]: + if nh_dict["ip"] != nh: + continue + else: + count += 1 + + if count == len(next_hop): + nh_found = True + else: + missing_routes.append(st_rt) + errormsg = ( + "Nexthop {} is Missing" + " for route {} in " + "RIB of router {}\n".format(next_hop, st_rt, dut) + ) + return errormsg + else: + missing_routes.append(st_rt) + + if len(missing_routes) > 0: + errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format( + dut, missing_routes + ) + return errormsg + + if nh_found: + logger.info( + "Found next_hop {} for all routes in RIB" + " of router {}\n".format(next_hop, dut) + ) + + if found_routes: + logger.info( + "[DUT: {}]: Verified routes FIB" + ", found routes are: {}\n".format(dut, found_routes) + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def verify_admin_distance_for_static_routes(tgen, input_dict): + """ + API to verify admin distance for static routes as defined in input_dict/ + input JSON by running show ip/ipv6 route json command. + Parameter + --------- + * `tgen` : topogen object + * `input_dict`: having details like - for which router and static routes + admin dsitance needs to be verified + Usage + ----- + # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop + 10.0.0.2 in router r1 + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = verify_admin_distance_for_static_routes(tgen, input_dict) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + rnode = router_list[router] + + for static_route in input_dict[router]["static_routes"]: + addr_type = validate_ip_address(static_route["network"]) + # Command to execute + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + show_ip_route_json = run_frr_cmd(rnode, command, isjson=True) + + logger.info( + "Verifying admin distance for static route %s" " under dut %s:", + static_route, + router, + ) + network = static_route["network"] + next_hop = static_route["next_hop"] + admin_distance = static_route["admin_distance"] + route_data = show_ip_route_json[network][0] + if network in show_ip_route_json: + if route_data["nexthops"][0]["ip"] == next_hop: + if route_data["distance"] != admin_distance: + errormsg = ( + "Verification failed: admin distance" + " for static route {} under dut {}," + " found:{} but expected:{}".format( + static_route, + router, + route_data["distance"], + admin_distance, + ) + ) + return errormsg + else: + logger.info( + "Verification successful: admin" + " distance for static route %s under" + " dut %s, found:%s", + static_route, + router, + route_data["distance"], + ) + + else: + errormsg = ( + "Static route {} not found in " + "show_ip_route_json for dut {}".format(network, router) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def verify_prefix_lists(tgen, input_dict): + """ + Running "show ip prefix-list" command and verifying given prefix-list + is present in router. + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify prefix lists + Usage + ----- + # To verify pf_list_1 is present in router r1 + input_dict = { + "r1": { + "prefix_lists": ["pf_list_1"] + }} + result = verify_prefix_lists("ipv4", input_dict, tgen) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + + rnode = router_list[router] + + # Show ip prefix list + show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list") + + # Verify Prefix list is deleted + prefix_lists_addr = input_dict[router]["prefix_lists"] + for addr_type in prefix_lists_addr: + if not check_address_types(addr_type): + continue + # show ip prefix list + if addr_type == "ipv4": + cmd = "show ip prefix-list" + else: + cmd = "show {} prefix-list".format(addr_type) + show_prefix_list = run_frr_cmd(rnode, cmd) + for prefix_list in prefix_lists_addr[addr_type].keys(): + if prefix_list in show_prefix_list: + errormsg = ( + "Prefix list {} is/are present in the router" + " {}".format(prefix_list, router) + ) + return errormsg + + logger.info( + "Prefix list %s is/are not present in the router" " from router %s", + prefix_list, + router, + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=12) +def verify_route_maps(tgen, input_dict): + """ + Running "show route-map" command and verifying given route-map + is present in router. + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify prefix lists + Usage + ----- + # To verify rmap_1 and rmap_2 are present in router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap_2"] + } + } + result = verify_route_maps(tgen, input_dict) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + + rnode = router_list[router] + # Show ip route-map + show_route_maps = rnode.vtysh_cmd("show route-map") + + # Verify route-map is deleted + route_maps = input_dict[router]["route_maps"] + for route_map in route_maps: + if route_map in show_route_maps: + errormsg = "Route map {} is not deleted from router" " {}".format( + route_map, router + ) + return errormsg + + logger.info( + "Route map %s is/are deleted successfully from" " router %s", + route_maps, + router, + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=16) +def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): + """ + API to veiryf BGP large community is attached in route for any given + DUT by running "show bgp ipv4/6 {route address} json" command. + Parameters + ---------- + * `tgen`: topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `network`: network for which set criteria needs to be verified + * `input_dict`: having details like - for which router, community and + values needs to be verified + Usage + ----- + networks = ["200.50.2.0/32"] + input_dict = { + "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5" + } + result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + router_list = tgen.routers() + if router not in router_list: + return False + + rnode = router_list[router] + + logger.debug( + "Verifying BGP community attributes on dut %s: for %s " "network %s", + router, + addr_type, + network, + ) + + for net in network: + cmd = "show bgp {} {} json".format(addr_type, net) + show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True) + logger.info(show_bgp_json) + if "paths" not in show_bgp_json: + return "Prefix {} not found in BGP table of router: {}".format(net, router) + + as_paths = show_bgp_json["paths"] + found = False + for i in range(len(as_paths)): + if ( + "largeCommunity" in show_bgp_json["paths"][i] + or "community" in show_bgp_json["paths"][i] + ): + found = True + logger.info( + "Large Community attribute is found for route:" " %s in router: %s", + net, + router, + ) + if input_dict is not None: + for criteria, comm_val in input_dict.items(): + show_val = show_bgp_json["paths"][i][criteria]["string"] + if comm_val == show_val: + logger.info( + "Verifying BGP %s for prefix: %s" + " in router: %s, found expected" + " value: %s", + criteria, + net, + router, + comm_val, + ) + else: + errormsg = ( + "Failed: Verifying BGP attribute" + " {} for route: {} in router: {}" + ", expected value: {} but found" + ": {}".format(criteria, net, router, comm_val, show_val) + ) + return errormsg + + if not found: + errormsg = ( + "Large Community attribute is not found for route: " + "{} in router: {} ".format(net, router) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def get_ipv6_linklocal_address(topo, node, intf): + """ + API to get the link local ipv6 address of a particular interface + + Parameters + ---------- + * `node`: node on which link local ip to be fetched. + * `intf` : interface for which link local ip needs to be returned. + * `topo` : base topo + + Usage + ----- + result = get_ipv6_linklocal_address(topo, 'r1', 'r2') + + Returns link local ip of interface between r1 and r2. + + Returns + ------- + 1) link local ipv6 address from the interface + 2) errormsg - when link local ip not found + """ + tgen = get_topogen() + ext_nh = tgen.net[node].get_ipv6_linklocal() + req_nh = topo[node]["links"][intf]["interface"] + llip = None + for llips in ext_nh: + if llips[0] == req_nh: + llip = llips[1] + logger.info("Link local ip found = %s", llip) + return llip + + errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format( + node, intf + ) + + return errormsg + + +def verify_create_community_list(tgen, input_dict): + """ + API is to verify if large community list is created for any given DUT in + input_dict by running "sh bgp large-community-list {"comm_name"} detail" + command. + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: having details like - for which router, large community + needs to be verified + Usage + ----- + input_dict = { + "r1": { + "large-community-list": { + "standard": { + "Test1": [{"action": "PERMIT", "attribute":\ + ""}] + }}}} + result = verify_create_community_list(tgen, input_dict) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + + rnode = router_list[router] + + logger.info("Verifying large-community is created for dut %s:", router) + + for comm_data in input_dict[router]["bgp_community_lists"]: + comm_name = comm_data["name"] + comm_type = comm_data["community_type"] + show_bgp_community = run_frr_cmd( + rnode, "show bgp large-community-list {} detail".format(comm_name) + ) + + # Verify community list and type + if comm_name in show_bgp_community and comm_type in show_bgp_community: + logger.info( + "BGP %s large-community-list %s is" " created", comm_type, comm_name + ) + else: + errormsg = "BGP {} large-community-list {} is not" " created".format( + comm_type, comm_name + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def verify_cli_json(tgen, input_dict): + """ + API to verify if JSON is available for clis + command. + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: CLIs for which JSON needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "cli": ["show evpn vni detail", show evpn rmac vni all] + } + } + + result = verify_cli_json(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.gears[dut] + + for cli in input_dict[dut]["cli"]: + logger.info( + "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli + ) + + test_cli = "{} json".format(cli) + ret_json = rnode.vtysh_cmd(test_cli, isjson=True) + if not bool(ret_json): + errormsg = "CLI: %s, JSON format is not available" % (cli) + return errormsg + elif "unknown" in ret_json or "Unknown" in ret_json: + errormsg = "CLI: %s, JSON format is not available" % (cli) + return errormsg + else: + logger.info( + "CLI : %s JSON format is available: " "\n %s", cli, ret_json + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return True + + +@retry(retry_timeout=12) +def verify_evpn_vni(tgen, input_dict): + """ + API to verify evpn vni details using "show evpn vni detail json" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: having details like - for which router, evpn details + needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "vni": [ + { + "75100":{ + "vrf": "RED", + "vxlanIntf": "vxlan75100", + "localVtepIp": "120.1.1.1", + "sviIntf": "br100" + } + } + ] + } + } + + result = verify_evpn_vni(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.gears[dut] + + logger.info("[DUT: %s]: Verifying evpn vni details :", dut) + + cmd = "show evpn vni detail json" + evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(evpn_all_vni_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if "vni" in input_dict[dut]: + for vni_dict in input_dict[dut]["vni"]: + found = False + vni = vni_dict["name"] + for evpn_vni_json in evpn_all_vni_json: + if "vni" in evpn_vni_json: + if evpn_vni_json["vni"] != int(vni): + continue + + for attribute in vni_dict.keys(): + if vni_dict[attribute] != evpn_vni_json[attribute]: + errormsg = ( + "[DUT: %s] Verifying " + "%s for VNI: %s [FAILED]||" + ", EXPECTED : %s " + " FOUND : %s" + % ( + dut, + attribute, + vni, + vni_dict[attribute], + evpn_vni_json[attribute], + ) + ) + return errormsg + + else: + found = True + logger.info( + "[DUT: %s] Verifying" + " %s for VNI: %s , " + "Found Expected : %s ", + dut, + attribute, + vni, + evpn_vni_json[attribute], + ) + + if evpn_vni_json["state"] != "Up": + errormsg = ( + "[DUT: %s] Failed: Verifying" + " State for VNI: %s is not Up" % (dut, vni) + ) + return errormsg + + else: + errormsg = ( + "[DUT: %s] Failed:" + " VNI: %s is not present in JSON" % (dut, vni) + ) + return errormsg + + if found: + logger.info( + "[DUT %s]: Verifying VNI : %s " + "details and state is Up [PASSED]!!", + dut, + vni, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Failed:" " vni details are not present in input data" % (dut) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False + + +@retry(retry_timeout=12) +def verify_vrf_vni(tgen, input_dict): + """ + API to verify vrf vni details using "show vrf vni json" + command. + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: having details like - for which router, evpn details + needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "vrfs": [ + { + "RED":{ + "vni": 75000, + "vxlanIntf": "vxlan75100", + "sviIntf": "br100", + "routerMac": "00:80:48:ba:d1:00", + "state": "Up" + } + } + ] + } + } + + result = verify_vrf_vni(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.gears[dut] + + logger.info("[DUT: %s]: Verifying vrf vni details :", dut) + + cmd = "show vrf vni json" + vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(vrf_all_vni_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if "vrfs" in input_dict[dut]: + for vrfs in input_dict[dut]["vrfs"]: + for vrf, vrf_dict in vrfs.items(): + found = False + for vrf_vni_json in vrf_all_vni_json["vrfs"]: + if "vrf" in vrf_vni_json: + if vrf_vni_json["vrf"] != vrf: + continue + + for attribute in vrf_dict.keys(): + if vrf_dict[attribute] == vrf_vni_json[attribute]: + found = True + logger.info( + "[DUT %s]: VRF: %s, " + "verifying %s " + ", Found Expected: %s " + "[PASSED]!!", + dut, + vrf, + attribute, + vrf_vni_json[attribute], + ) + else: + errormsg = ( + "[DUT: %s] VRF: %s, " + "verifying %s [FAILED!!] " + ", EXPECTED : %s " + ", FOUND : %s" + % ( + dut, + vrf, + attribute, + vrf_dict[attribute], + vrf_vni_json[attribute], + ) + ) + return errormsg + + else: + errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % ( + dut, + vrf, + ) + return errormsg + + if found: + logger.info( + "[DUT %s] Verifying VRF: %s " " details [PASSED]!!", + dut, + vrf, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False + + +def required_linux_kernel_version(required_version): + """ + This API is used to check linux version compatibility of the test suite. + If version mentioned in required_version is higher than the linux kernel + of the system, test suite will be skipped. This API returns true or errormsg. + + Parameters + ---------- + * `required_version` : Kernel version required for the suites to run. + + Usage + ----- + result = linux_kernel_version_lowerthan('4.15') + + Returns + ------- + errormsg(str) or True + """ + system_kernel = platform.release() + if version_cmp(system_kernel, required_version) < 0: + error_msg = ( + 'These tests will not run on kernel "{}", ' + "they require kernel >= {})".format(system_kernel, required_version) + ) + + logger.info(error_msg) + + return error_msg + return True + + +class HostApplicationHelper(object): + """Helper to track and cleanup per-host based test processes.""" + + def __init__(self, tgen=None, base_cmd=None): + self.base_cmd_str = "" + self.host_procs = {} + self.tgen = None + self.set_base_cmd(base_cmd if base_cmd else []) + if tgen is not None: + self.init(tgen) + + def __enter__(self): + self.init() + return self + + def __exit__(self, type, value, traceback): + self.cleanup() + + def __str__(self): + return "HostApplicationHelper({})".format(self.base_cmd_str) + + def set_base_cmd(self, base_cmd): + assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple) + self.base_cmd = base_cmd + if base_cmd: + self.base_cmd_str = " ".join(base_cmd) + else: + self.base_cmd_str = "" + + def init(self, tgen=None): + """Initialize the helper with tgen if needed. + + If overridden, need to handle multiple entries but one init. Will be called on + object creation if tgen is supplied. Will be called again on __enter__ so should + not re-init if already inited. + """ + if self.tgen: + assert tgen is None or self.tgen == tgen + else: + self.tgen = tgen + + def started_proc(self, host, p): + """Called after process started on host. + + Return value is passed to `stopping_proc` method.""" + logger.debug("%s: Doing nothing after starting process", self) + return False + + def stopping_proc(self, host, p, info): + """Called after process started on host.""" + logger.debug("%s: Doing nothing before stopping process", self) + + def _add_host_proc(self, host, p): + v = self.started_proc(host, p) + + if host not in self.host_procs: + self.host_procs[host] = [] + logger.debug("%s: %s: tracking process %s", self, host, p) + self.host_procs[host].append((p, v)) + + def stop_host(self, host): + """Stop the process on the host. + + Override to do additional cleanup.""" + if host in self.host_procs: + hlogger = self.tgen.net[host].logger + for p, v in self.host_procs[host]: + self.stopping_proc(host, p, v) + logger.debug("%s: %s: terminating process %s", self, host, p.pid) + hlogger.debug("%s: %s: terminating process %s", self, host, p.pid) + rc = p.poll() + if rc is not None: + logger.error( + "%s: %s: process early exit %s: %s", + self, + host, + p.pid, + comm_error(p), + ) + hlogger.error( + "%s: %s: process early exit %s: %s", + self, + host, + p.pid, + comm_error(p), + ) + else: + p.terminate() + p.wait() + logger.debug( + "%s: %s: terminated process %s: %s", + self, + host, + p.pid, + comm_error(p), + ) + hlogger.debug( + "%s: %s: terminated process %s: %s", + self, + host, + p.pid, + comm_error(p), + ) + + del self.host_procs[host] + + def stop_all_hosts(self): + hosts = set(self.host_procs) + for host in hosts: + self.stop_host(host) + + def cleanup(self): + self.stop_all_hosts() + + def run(self, host, cmd_args, **kwargs): + cmd = list(self.base_cmd) + cmd.extend(cmd_args) + p = self.tgen.gears[host].popen(cmd, **kwargs) + assert p.poll() is None + self._add_host_proc(host, p) + return p + + def check_procs(self): + """Check that all current processes are running, log errors if not. + + Returns: List of stopped processes.""" + procs = [] + + logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys()) + + for host in self.host_procs: + hlogger = self.tgen.net[host].logger + for p, _ in self.host_procs[host]: + logger.debug("%s: checking %s proc %s", self, host, p) + rc = p.poll() + if rc is None: + continue + logger.error( + "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True + ) + hlogger.error( + "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True + ) + procs.append(p) + return procs + + +class IPerfHelper(HostApplicationHelper): + def __str__(self): + return "IPerfHelper()" + + def run_join( + self, + host, + join_addr, + l4Type="UDP", + join_interval=1, + join_intf=None, + join_towards=None, + ): + """ + Use iperf to send IGMP join and listen to traffic + + Parameters: + ----------- + * `host`: iperf host from where IGMP join would be sent + * `l4Type`: string, one of [ TCP, UDP ] + * `join_addr`: multicast address (or addresses) to join to + * `join_interval`: seconds between periodic bandwidth reports + * `join_intf`: the interface to bind the join to + * `join_towards`: router whos interface to bind the join to + + returns: Success (bool) + """ + + iperf_path = self.tgen.net.get_exec_path("iperf") + + assert join_addr + if not isinstance(join_addr, list) and not isinstance(join_addr, tuple): + join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))] + + for bindTo in join_addr: + iperf_args = [iperf_path, "-s"] + + if l4Type == "UDP": + iperf_args.append("-u") + + iperf_args.append("-B") + if join_towards: + to_intf = frr_unicode( + self.tgen.json_topo["routers"][host]["links"][join_towards][ + "interface" + ] + ) + iperf_args.append("{}%{}".format(str(bindTo), to_intf)) + elif join_intf: + iperf_args.append("{}%{}".format(str(bindTo), join_intf)) + else: + iperf_args.append(str(bindTo)) + + if join_interval: + iperf_args.append("-i") + iperf_args.append(str(join_interval)) + + p = self.run(host, iperf_args) + if p.poll() is not None: + logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p)) + return False + return True + + def run_traffic( + self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None + ): + """ + Run iperf to send IGMP join and traffic + + Parameters: + ----------- + * `host`: iperf host to send traffic from + * `l4Type`: string, one of [ TCP, UDP ] + * `sentToAddress`: multicast address to send traffic to + * `ttl`: time to live + * `time`: time in seconds to transmit for + * `bind_towards`: Router who's interface the source ip address is got from + + returns: Success (bool) + """ + + iperf_path = self.tgen.net.get_exec_path("iperf") + + if sentToAddress and not isinstance(sentToAddress, list): + sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))] + + for sendTo in sentToAddress: + iperf_args = [iperf_path, "-c", sendTo] + + # Bind to Interface IP + if bind_towards: + ifaddr = frr_unicode( + self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"] + ) + ipaddr = ipaddress.IPv4Interface(ifaddr).ip + iperf_args.append("-B") + iperf_args.append(str(ipaddr)) + + # UDP/TCP + if l4Type == "UDP": + iperf_args.append("-u") + iperf_args.append("-b") + iperf_args.append("0.012m") + + # TTL + if ttl: + iperf_args.append("-T") + iperf_args.append(str(ttl)) + + # Time + if time: + iperf_args.append("-t") + iperf_args.append(str(time)) + + p = self.run(host, iperf_args) + if p.poll() is not None: + logger.error( + "mcast traffic send failed for %s: %s", sendTo, comm_error(p) + ) + return False + + return True + + +def verify_ip_nht(tgen, input_dict): + """ + Running "show ip nht" command and verifying given nexthop resolution + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify nexthop + Usage + ----- + input_dict_4 = { + "r1": { + nh: { + "Address": nh, + "resolvedVia": "connected", + "nexthops": { + "nexthop1": { + "Interface": intf + } + } + } + } + } + result = verify_ip_nht(tgen, input_dict_4) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_ip_nht()") + + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + + rnode = router_list[router] + nh_list = input_dict[router] + + if validate_ip_address(next(iter(nh_list))) == "ipv6": + show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht") + else: + show_ip_nht = run_frr_cmd(rnode, "show ip nht") + + for nh in nh_list: + if nh in show_ip_nht: + nht = run_frr_cmd(rnode, "show ip nht {}".format(nh)) + if "unresolved" in nht: + errormsg = "Nexthop {} became unresolved on {}".format(nh, router) + return errormsg + else: + logger.info("Nexthop %s is resolved on %s", nh, router) + return True + else: + errormsg = "Nexthop {} is resolved on {}".format(nh, router) + return errormsg + + logger.debug("Exiting lib API: verify_ip_nht()") + return False + + +def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None): + """ + Using scapy Raw() method to send BSR raw packet from one FRR + to other + + Parameters: + ----------- + * `tgen` : Topogen object + * `topo` : json file data + * `senderRouter` : Sender router + * `packet` : packet in raw format + + returns: + -------- + errormsg or True + """ + + global CD + result = "" + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + sender_interface = intf + rnode = tgen.routers()[senderRouter] + + for destLink, data in topo["routers"][senderRouter]["links"].items(): + if "type" in data and data["type"] == "loopback": + continue + + if not packet: + packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][ + "data" + ] + + python3_path = tgen.net.get_exec_path(["python3", "python"]) + script_path = os.path.join(CD, "send_bsr_packet.py") + cmd = "{} {} '{}' '{}' --interval=1 --count=1".format( + python3_path, script_path, packet, sender_interface + ) + + logger.info("Scapy cmd: \n %s", cmd) + result = rnode.run(cmd) + + if result == "": + return result + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True |