summaryrefslogtreecommitdiffstats
path: root/tests/topotests/mgmt_oper/oper.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/mgmt_oper/oper.py')
-rw-r--r--tests/topotests/mgmt_oper/oper.py258
1 files changed, 258 insertions, 0 deletions
diff --git a/tests/topotests/mgmt_oper/oper.py b/tests/topotests/mgmt_oper/oper.py
new file mode 100644
index 0000000..0f6c3cd
--- /dev/null
+++ b/tests/topotests/mgmt_oper/oper.py
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: ISC
+#
+# October 29 2023, Christian Hopps <chopps@labn.net>
+#
+# Copyright (c) 2023, LabN Consulting, L.L.C.
+#
+
+import datetime
+import ipaddress
+import json
+import logging
+import math
+import os
+import pprint
+import re
+
+from lib.common_config import retry, step
+from lib.topolog import logger
+from lib.topotest import json_cmp as tt_json_cmp
+
+try:
+ from deepdiff import DeepDiff as dd_json_cmp
+except ImportError:
+ dd_json_cmp = None
+
+
+def json_cmp(got, expect, exact_match):
+ if dd_json_cmp:
+ if exact_match:
+ deep_diff = dd_json_cmp(expect, got)
+ # Convert DeepDiff completely into dicts or lists at all levels
+ json_diff = json.loads(deep_diff.to_json())
+ else:
+ json_diff = dd_json_cmp(expect, got, ignore_order=True)
+ # Convert DeepDiff completely into dicts or lists at all levels
+ # json_diff = json.loads(deep_diff.to_json())
+ # Remove new fields in json object from diff
+ if json_diff.get("dictionary_item_added") is not None:
+ del json_diff["dictionary_item_added"]
+ # Remove new json objects in json array from diff
+ if (new_items := json_diff.get("iterable_item_added")) is not None:
+ new_item_paths = list(new_items.keys())
+ for path in new_item_paths:
+ if type(new_items[path]) is dict:
+ del new_items[path]
+ if len(new_items) == 0:
+ del json_diff["iterable_item_added"]
+ if not json_diff:
+ json_diff = None
+ else:
+ json_diff = tt_json_cmp(got, expect, exact_match)
+ json_diff = str(json_diff)
+ return json_diff
+
+
+def enable_debug(router):
+ router.vtysh_cmd("debug northbound callbacks configuration")
+
+
+def disable_debug(router):
+ router.vtysh_cmd("no debug northbound callbacks configuration")
+
+
+def do_oper_test(tgen, query_results):
+ r1 = tgen.gears["r1"].net
+
+ qcmd = (
+ r"vtysh -c 'show mgmt get-data {} {}' "
+ r"""| sed -e 's/"phy-address": ".*"/"phy-address": "rubout"/'"""
+ r"""| sed -e 's/"uptime": ".*"/"uptime": "rubout"/'"""
+ r"""| sed -e 's/"vrf": "[0-9]*"/"vrf": "rubout"/'"""
+ r"""| sed -e 's/"if-index": [0-9][0-9]*/"if-index": "rubout"/'"""
+ r"""| sed -e 's/"id": [0-9][0-9]*/"id": "rubout"/'"""
+ )
+
+ doreset = True
+ dd_json_cmp = None
+ for qr in query_results:
+ step(f"Perform query '{qr[0]}'", reset=doreset)
+ if doreset:
+ doreset = False
+ expected = open(qr[1], encoding="ascii").read()
+ output = r1.cmd_nostatus(qcmd.format(qr[0], qr[2] if len(qr) > 2 else ""))
+
+ try:
+ ojson = json.loads(output)
+ except json.decoder.JSONDecodeError as error:
+ logging.error("Error decoding json: %s\noutput:\n%s", error, output)
+ raise
+
+ try:
+ ejson = json.loads(expected)
+ except json.decoder.JSONDecodeError as error:
+ logging.error(
+ "Error decoding json exp result: %s\noutput:\n%s", error, expected
+ )
+ raise
+
+ if dd_json_cmp:
+ cmpout = json_cmp(ojson, ejson, exact_match=True)
+ if cmpout:
+ logging.warning(
+ "-------DIFF---------\n%s\n---------DIFF----------",
+ pprint.pformat(cmpout),
+ )
+ else:
+ cmpout = tt_json_cmp(ojson, ejson, exact=True)
+ if cmpout:
+ logging.warning(
+ "-------EXPECT--------\n%s\n------END-EXPECT------",
+ json.dumps(ejson, indent=4),
+ )
+ logging.warning(
+ "--------GOT----------\n%s\n-------END-GOT--------",
+ json.dumps(ojson, indent=4),
+ )
+
+ assert cmpout is None
+
+
+def get_ip_networks(super_prefix, count):
+ count_log2 = math.log(count, 2)
+ if count_log2 != int(count_log2):
+ count_log2 = int(count_log2) + 1
+ else:
+ count_log2 = int(count_log2)
+ network = ipaddress.ip_network(super_prefix)
+ return tuple(network.subnets(count_log2))[0:count]
+
+
+@retry(retry_timeout=30, initial_wait=0.1)
+def check_kernel(r1, super_prefix, count, add, is_blackhole, vrf, matchvia):
+ network = ipaddress.ip_network(super_prefix)
+ vrfstr = f" vrf {vrf}" if vrf else ""
+ if network.version == 6:
+ kernel = r1.cmd_raises(f"ip -6 route show{vrfstr}")
+ else:
+ kernel = r1.cmd_raises(f"ip -4 route show{vrfstr}")
+
+ # logger.debug("checking kernel routing table%s:\n%s", vrfstr, kernel)
+
+ for i, net in enumerate(get_ip_networks(super_prefix, count)):
+ if not add:
+ assert str(net) not in kernel
+ continue
+
+ if is_blackhole:
+ route = f"blackhole {str(net)} proto (static|196) metric 20"
+ else:
+ route = (
+ f"{str(net)}(?: nhid [0-9]+)? {matchvia} "
+ "proto (static|196) metric 20"
+ )
+ assert re.search(route, kernel), f"Failed to find \n'{route}'\n in \n'{kernel}'"
+
+
+def addrgen(a, count, step=1):
+ for _ in range(0, count, step):
+ yield a
+ a += step
+
+
+@retry(retry_timeout=30, initial_wait=0.1)
+def check_kernel_32(r1, start_addr, count, vrf, step=1):
+ start = ipaddress.ip_address(start_addr)
+ vrfstr = f" vrf {vrf}" if vrf else ""
+ if start.version == 6:
+ kernel = r1.cmd_raises(f"ip -6 route show{vrfstr}")
+ else:
+ kernel = r1.cmd_raises(f"ip -4 route show{vrfstr}")
+
+ nentries = len(re.findall("\n", kernel))
+ logging.info("checking kernel routing table%s: (%s entries)", vrfstr, nentries)
+
+ for addr in addrgen(start, count, step):
+ assert str(addr) in kernel, f"Failed to find '{addr}' in {nentries} entries"
+
+
+def do_config(
+ r1,
+ count,
+ add=True,
+ do_ipv6=False,
+ via=None,
+ vrf=None,
+ use_cli=False,
+):
+ optype = "adding" if add else "removing"
+ iptype = "IPv6" if do_ipv6 else "IPv4"
+
+ #
+ # Set the route details
+ #
+
+ if vrf:
+ super_prefix = "2111::/48" if do_ipv6 else "111.0.0.0/8"
+ else:
+ super_prefix = "2055::/48" if do_ipv6 else "55.0.0.0/8"
+
+ matchvia = ""
+ if via == "blackhole":
+ pass
+ elif via:
+ matchvia = f"dev {via}"
+ else:
+ if vrf:
+ via = "2102::2" if do_ipv6 else "3.3.3.2"
+ matchvia = f"via {via} dev r1-eth1"
+ else:
+ via = "2101::2" if do_ipv6 else "1.1.1.2"
+ matchvia = f"via {via} dev r1-eth0"
+
+ vrfdbg = " in vrf {}".format(vrf) if vrf else ""
+ logger.debug("{} {} static {} routes{}".format(optype, count, iptype, vrfdbg))
+
+ #
+ # Generate config file in a retrievable place
+ #
+
+ config_file = os.path.join(
+ r1.logdir, r1.name, "{}-routes-{}.conf".format(iptype.lower(), optype)
+ )
+ with open(config_file, "w") as f:
+ if use_cli:
+ f.write("configure terminal\n")
+ if vrf:
+ f.write("vrf {}\n".format(vrf))
+
+ for i, net in enumerate(get_ip_networks(super_prefix, count)):
+ if add:
+ f.write("ip route {} {}\n".format(net, via))
+ else:
+ f.write("no ip route {} {}\n".format(net, via))
+
+ #
+ # Load config file.
+ #
+
+ if use_cli:
+ load_command = 'vtysh < "{}"'.format(config_file)
+ else:
+ load_command = 'vtysh -f "{}"'.format(config_file)
+ tstamp = datetime.datetime.now()
+ output = r1.cmd_raises(load_command)
+ delta = (datetime.datetime.now() - tstamp).total_seconds()
+
+ #
+ # Verify the results are in the kernel
+ #
+ check_kernel(r1, super_prefix, count, add, via == "blackhole", vrf, matchvia)
+
+ optyped = "added" if add else "removed"
+ logger.debug(
+ "{} {} {} static routes under {}{} in {}s".format(
+ optyped, count, iptype.lower(), super_prefix, vrfdbg, delta
+ )
+ )