summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/bgprib.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgprib.py')
-rw-r--r--tests/topotests/lib/bgprib.py165
1 files changed, 165 insertions, 0 deletions
diff --git a/tests/topotests/lib/bgprib.py b/tests/topotests/lib/bgprib.py
new file mode 100644
index 0000000..699c7a4
--- /dev/null
+++ b/tests/topotests/lib/bgprib.py
@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+# Copyright 2018, LabN Consulting, L.L.C.
+
+#
+# want_rd_routes = [
+# {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': True},
+# {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': False},
+#
+# {'rd':'10:3', 'p':'5.1.0.0/24', 'n':'3.3.3.3'},
+# ]
+#
+# ribRequireVpnRoutes('r2','Customer routes',want_rd_routes)
+#
+# want_unicast_routes = [
+# {'p':'5.1.0.0/24', 'n':'1.1.1.1'},
+# ]
+#
+# ribRequireUnicastRoutes('r1','ipv4','r1-cust1','Customer routes in vrf',want_unicast_routes)
+# ribRequireUnicastRoutes('r1','ipv4','','Customer routes in default',want_unicast_routes)
+#
+
+from lib.lutil import luCommand, luResult, LUtil
+import json
+import re
+
+
+# gpz: get rib in json form and compare against desired routes
+class BgpRib:
+ def log(self, str):
+ LUtil.log("BgpRib: " + str)
+
+ def routes_include_wanted(self, pfxtbl, want, debug):
+ # helper function to RequireVpnRoutes
+ for pfx in pfxtbl.keys():
+ if debug:
+ self.log("trying pfx %s" % pfx)
+ if pfx != want["p"]:
+ if debug:
+ self.log("want pfx=" + want["p"] + ", not " + pfx)
+ continue
+ if debug:
+ self.log("have pfx=%s" % pfx)
+ for r in pfxtbl[pfx]:
+ bp = r.get("bestpath", False)
+ if debug:
+ self.log("trying route %s bp=%s" % (r, bp))
+ nexthops = r["nexthops"]
+ for nh in nexthops:
+ if debug:
+ self.log("trying nh %s" % nh["ip"])
+ if nh["ip"] == want["n"]:
+ if debug:
+ self.log("found %s" % want["n"])
+ if bp == want.get("bp", bp):
+ return 1
+ elif debug:
+ self.log("bestpath mismatch %s != %s" % (bp, want["bp"]))
+ else:
+ if debug:
+ self.log("want nh=" + want["n"] + ", not " + nh["ip"])
+ if debug:
+ self.log("missing route: pfx=" + want["p"] + ", nh=" + want["n"])
+ return 0
+
+ def RequireVpnRoutes(self, target, title, wantroutes, debug=0):
+ import json
+
+ logstr = "RequireVpnRoutes " + str(wantroutes)
+ # non json form for humans
+ luCommand(
+ target,
+ 'vtysh -c "show bgp ipv4 vpn"',
+ ".",
+ "None",
+ "Get VPN RIB (non-json)",
+ )
+ ret = luCommand(
+ target,
+ 'vtysh -c "show bgp ipv4 vpn json"',
+ ".*",
+ "None",
+ "Get VPN RIB (json)",
+ )
+ if re.search(r"^\s*$", ret):
+ # degenerate case: empty json means no routes
+ if len(wantroutes) > 0:
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+ rib = json.loads(ret)
+ rds = rib["routes"]["routeDistinguishers"]
+ for want in wantroutes:
+ found = 0
+ if debug:
+ self.log("want rd %s" % want["rd"])
+ for rd in rds.keys():
+ if rd != want["rd"]:
+ continue
+ if debug:
+ self.log("found rd %s" % rd)
+ table = rds[rd]
+ if self.routes_include_wanted(table, want, debug):
+ found = 1
+ break
+ if not found:
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+
+ def RequireUnicastRoutes(self, target, afi, vrf, title, wantroutes, debug=0):
+ logstr = "RequireUnicastRoutes %s" % str(wantroutes)
+ vrfstr = ""
+ if vrf != "":
+ vrfstr = "vrf %s" % (vrf)
+
+ if (afi != "ipv4") and (afi != "ipv6"):
+ self.log("ERROR invalid afi")
+
+ cmdstr = "show bgp %s %s unicast" % (vrfstr, afi)
+ # non json form for humans
+ cmd = 'vtysh -c "%s"' % cmdstr
+ luCommand(target, cmd, ".", "None", "Get %s %s RIB (non-json)" % (vrfstr, afi))
+ cmd = 'vtysh -c "%s json"' % cmdstr
+ ret = luCommand(
+ target, cmd, ".*", "None", "Get %s %s RIB (json)" % (vrfstr, afi)
+ )
+ if re.search(r"^\s*$", ret):
+ # degenerate case: empty json means no routes
+ if len(wantroutes) > 0:
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+ rib = json.loads(ret)
+ try:
+ table = rib["routes"]
+ # KeyError: 'routes' probably means missing/bad VRF
+ except KeyError as err:
+ if vrf != "":
+ errstr = "-script ERROR: check if wrong vrf (%s)" % (vrf)
+ else:
+ errstr = "-script ERROR: check if vrf missing"
+ luResult(target, False, title + errstr, logstr)
+ return
+ # if debug:
+ # self.log("table=%s" % table)
+ for want in wantroutes:
+ if debug:
+ self.log("want=%s" % want)
+ if not self.routes_include_wanted(table, want, debug):
+ luResult(target, False, title, logstr)
+ return
+ luResult(target, True, title, logstr)
+
+
+BgpRib = BgpRib()
+
+
+def bgpribRequireVpnRoutes(target, title, wantroutes, debug=0):
+ BgpRib.RequireVpnRoutes(target, title, wantroutes, debug)
+
+
+def bgpribRequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=0):
+ BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug)