diff options
Diffstat (limited to '')
-rw-r--r-- | tests/topotests/lib/bgprib.py | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/tests/topotests/lib/bgprib.py b/tests/topotests/lib/bgprib.py new file mode 100644 index 0000000..699c7a4 --- /dev/null +++ b/tests/topotests/lib/bgprib.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# SPDX-License-Identifier: GPL-2.0-or-later + +# Copyright 2018, LabN Consulting, L.L.C. + +# +# want_rd_routes = [ +# {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': True}, +# {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': False}, +# +# {'rd':'10:3', 'p':'5.1.0.0/24', 'n':'3.3.3.3'}, +# ] +# +# ribRequireVpnRoutes('r2','Customer routes',want_rd_routes) +# +# want_unicast_routes = [ +# {'p':'5.1.0.0/24', 'n':'1.1.1.1'}, +# ] +# +# ribRequireUnicastRoutes('r1','ipv4','r1-cust1','Customer routes in vrf',want_unicast_routes) +# ribRequireUnicastRoutes('r1','ipv4','','Customer routes in default',want_unicast_routes) +# + +from lib.lutil import luCommand, luResult, LUtil +import json +import re + + +# gpz: get rib in json form and compare against desired routes +class BgpRib: + def log(self, str): + LUtil.log("BgpRib: " + str) + + def routes_include_wanted(self, pfxtbl, want, debug): + # helper function to RequireVpnRoutes + for pfx in pfxtbl.keys(): + if debug: + self.log("trying pfx %s" % pfx) + if pfx != want["p"]: + if debug: + self.log("want pfx=" + want["p"] + ", not " + pfx) + continue + if debug: + self.log("have pfx=%s" % pfx) + for r in pfxtbl[pfx]: + bp = r.get("bestpath", False) + if debug: + self.log("trying route %s bp=%s" % (r, bp)) + nexthops = r["nexthops"] + for nh in nexthops: + if debug: + self.log("trying nh %s" % nh["ip"]) + if nh["ip"] == want["n"]: + if debug: + self.log("found %s" % want["n"]) + if bp == want.get("bp", bp): + return 1 + elif debug: + self.log("bestpath mismatch %s != %s" % (bp, want["bp"])) + else: + if debug: + self.log("want nh=" + want["n"] + ", not " + nh["ip"]) + if debug: + self.log("missing route: pfx=" + want["p"] + ", nh=" + want["n"]) + return 0 + + def RequireVpnRoutes(self, target, title, wantroutes, debug=0): + import json + + logstr = "RequireVpnRoutes " + str(wantroutes) + # non json form for humans + luCommand( + target, + 'vtysh -c "show bgp ipv4 vpn"', + ".", + "None", + "Get VPN RIB (non-json)", + ) + ret = luCommand( + target, + 'vtysh -c "show bgp ipv4 vpn json"', + ".*", + "None", + "Get VPN RIB (json)", + ) + if re.search(r"^\s*$", ret): + # degenerate case: empty json means no routes + if len(wantroutes) > 0: + luResult(target, False, title, logstr) + return + luResult(target, True, title, logstr) + rib = json.loads(ret) + rds = rib["routes"]["routeDistinguishers"] + for want in wantroutes: + found = 0 + if debug: + self.log("want rd %s" % want["rd"]) + for rd in rds.keys(): + if rd != want["rd"]: + continue + if debug: + self.log("found rd %s" % rd) + table = rds[rd] + if self.routes_include_wanted(table, want, debug): + found = 1 + break + if not found: + luResult(target, False, title, logstr) + return + luResult(target, True, title, logstr) + + def RequireUnicastRoutes(self, target, afi, vrf, title, wantroutes, debug=0): + logstr = "RequireUnicastRoutes %s" % str(wantroutes) + vrfstr = "" + if vrf != "": + vrfstr = "vrf %s" % (vrf) + + if (afi != "ipv4") and (afi != "ipv6"): + self.log("ERROR invalid afi") + + cmdstr = "show bgp %s %s unicast" % (vrfstr, afi) + # non json form for humans + cmd = 'vtysh -c "%s"' % cmdstr + luCommand(target, cmd, ".", "None", "Get %s %s RIB (non-json)" % (vrfstr, afi)) + cmd = 'vtysh -c "%s json"' % cmdstr + ret = luCommand( + target, cmd, ".*", "None", "Get %s %s RIB (json)" % (vrfstr, afi) + ) + if re.search(r"^\s*$", ret): + # degenerate case: empty json means no routes + if len(wantroutes) > 0: + luResult(target, False, title, logstr) + return + luResult(target, True, title, logstr) + rib = json.loads(ret) + try: + table = rib["routes"] + # KeyError: 'routes' probably means missing/bad VRF + except KeyError as err: + if vrf != "": + errstr = "-script ERROR: check if wrong vrf (%s)" % (vrf) + else: + errstr = "-script ERROR: check if vrf missing" + luResult(target, False, title + errstr, logstr) + return + # if debug: + # self.log("table=%s" % table) + for want in wantroutes: + if debug: + self.log("want=%s" % want) + if not self.routes_include_wanted(table, want, debug): + luResult(target, False, title, logstr) + return + luResult(target, True, title, logstr) + + +BgpRib = BgpRib() + + +def bgpribRequireVpnRoutes(target, title, wantroutes, debug=0): + BgpRib.RequireVpnRoutes(target, title, wantroutes, debug) + + +def bgpribRequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=0): + BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug) |