diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:53:30 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:53:30 +0000 |
commit | 2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 (patch) | |
tree | c05dc0f8e6aa3accc84e3e5cffc933ed94941383 /tests/topotests/lib/snmptest.py | |
parent | Initial commit. (diff) | |
download | frr-upstream.tar.xz frr-upstream.zip |
Adding upstream version 8.4.4.upstream/8.4.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | tests/topotests/lib/snmptest.py | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/tests/topotests/lib/snmptest.py b/tests/topotests/lib/snmptest.py new file mode 100644 index 0000000..fe5ff28 --- /dev/null +++ b/tests/topotests/lib/snmptest.py @@ -0,0 +1,155 @@ +# +# topogen.py +# Library of helper functions for NetDEF Topology Tests +# +# Copyright (c) 2020 by Volta Networks +# +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +SNMP library to test snmp walks and gets + +Basic usage instructions: + +* define an SnmpTester class giving a router, address, community and version +* use test_oid or test_walk to check values in MIBS +* see tests/topotest/simple-snmp-test/test_simple_snmp.py for example +""" + +from lib.topolog import logger + + +class SnmpTester(object): + "A helper class for testing SNMP" + + def __init__(self, router, iface, community, version): + self.community = community + self.version = version + self.router = router + self.iface = iface + logger.info( + "created SNMP tester: SNMPv{0} community:{1}".format( + self.version, self.community + ) + ) + + def _snmp_config(self): + """ + Helper function to build a string with SNMP + configuration for commands. + """ + return "-v {0} -c {1} {2}".format(self.version, self.community, self.iface) + + @staticmethod + def _get_snmp_value(snmp_output): + tokens = snmp_output.strip().split() + + num_value_tokens = len(tokens) - 3 + + # this copes with the emptys string return + if num_value_tokens == 0: + return tokens[2] + + if num_value_tokens > 1: + output = "" + index = 3 + while index < len(tokens) - 1: + output += "{} ".format(tokens[index]) + index += 1 + output += "{}".format(tokens[index]) + return output + # third token is the value of the object + return tokens[3] + + @staticmethod + def _get_snmp_oid(snmp_output): + tokens = snmp_output.strip().split() + + # third token onwards is the value of the object + return tokens[0].split(".", 1)[1] + + @staticmethod + def _get_snmp_oid(snmp_output): + tokens = snmp_output.strip().split() + + # if len(tokens) > 5: + # return None + + # third token is the value of the object + return tokens[0].split(".", 1)[1] + + def _parse_multiline(self, snmp_output): + results = snmp_output.strip().split("\n") + + out_dict = {} + out_list = [] + for response in results: + out_dict[self._get_snmp_oid(response)] = self._get_snmp_value(response) + out_list.append(self._get_snmp_value(response)) + + return out_dict, out_list + + def get(self, oid): + cmd = "snmpget {0} {1}".format(self._snmp_config(), oid) + + result = self.router.cmd(cmd) + if "not found" in result: + return None + return self._get_snmp_value(result) + + def get_next(self, oid): + cmd = "snmpgetnext {0} {1}".format(self._snmp_config(), oid) + + result = self.router.cmd(cmd) + print("get_next: {}".format(result)) + if "not found" in result: + return None + return self._get_snmp_value(result) + + def walk(self, oid): + cmd = "snmpwalk {0} {1}".format(self._snmp_config(), oid) + + result = self.router.cmd(cmd) + return self._parse_multiline(result) + + def test_oid(self, oid, value): + print("oid: {}".format(self.get_next(oid))) + return self.get_next(oid) == value + + def test_oid_walk(self, oid, values, oids=None): + results_dict, results_list = self.walk(oid) + print("test_oid_walk: {} {}".format(oid, results_dict)) + if oids is not None: + index = 0 + for oid in oids: + # avoid key error for missing keys + if not oid in results_dict.keys(): + print("FAIL: missing oid key {}".format(oid)) + return False + if results_dict[oid] != values[index]: + print( + "FAIL{} {} |{}| == |{}|".format( + oid, index, results_dict[oid], values[index] + ) + ) + return False + index += 1 + return True + + # Return true if 'values' is a subset of 'results_list' + print("test {} == {}".format(results_list[: len(values)], values)) + return results_list[: len(values)] == values |