summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/lutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/lutil.py')
-rw-r--r--tests/topotests/lib/lutil.py499
1 files changed, 499 insertions, 0 deletions
diff --git a/tests/topotests/lib/lutil.py b/tests/topotests/lib/lutil.py
new file mode 100644
index 0000000..1eb88f2
--- /dev/null
+++ b/tests/topotests/lib/lutil.py
@@ -0,0 +1,499 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+# Copyright 2017, LabN Consulting, L.L.C.
+
+import os
+import re
+import sys
+import time
+import json
+import math
+import time
+from lib.topolog import logger
+from lib.topotest import json_cmp
+
+
+# L utility functions
+#
+# These functions are inteneted to provide support for CI testing within MiniNet
+# environments.
+
+
+class lUtil:
+ # to be made configurable in the future
+ base_script_dir = "."
+ base_log_dir = "."
+ fout_name = "output.log"
+ fsum_name = "summary.txt"
+ l_level = 6
+ CallOnFail = False
+
+ l_total = 0
+ l_pass = 0
+ l_fail = 0
+ l_filename = ""
+ l_last = None
+ l_line = 0
+ l_dotall_experiment = False
+ l_last_nl = None
+ l_wait_strict = 1
+
+ fout = ""
+ fsum = ""
+ net = ""
+
+ def log(self, str, level=6):
+ if self.l_level > 0:
+ if self.fout == "":
+ self.fout = open(self.fout_name, "w")
+ self.fout.write(str + "\n")
+ if level <= self.l_level:
+ print(str)
+
+ def summary(self, str):
+ if self.fsum == "":
+ self.fsum = open(self.fsum_name, "w")
+ self.fsum.write(
+ "\
+******************************************************************************\n"
+ )
+ self.fsum.write(
+ "\
+Test Target Summary Pass Fail\n"
+ )
+ self.fsum.write(
+ "\
+******************************************************************************\n"
+ )
+ self.fsum.write(str + "\n")
+
+ def result(self, target, success, str, logstr=None):
+ if success:
+ p = 1
+ f = 0
+ self.l_pass += 1
+ sstr = "PASS"
+ else:
+ f = 1
+ p = 0
+ self.l_fail += 1
+ sstr = "FAIL"
+ self.l_total += 1
+ if logstr != None:
+ self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
+ res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
+ self.log("R:" + res)
+ self.summary(res)
+ if f == 1 and self.CallOnFail != False:
+ self.CallOnFail()
+
+ def closeFiles(self):
+ ret = (
+ "\
+******************************************************************************\n\
+Total %-4d %-4d %d\n\
+******************************************************************************"
+ % (self.l_total, self.l_pass, self.l_fail)
+ )
+ if self.fsum != "":
+ self.fsum.write(ret + "\n")
+ self.fsum.close()
+ self.fsum = ""
+ if self.fout != "":
+ if os.path.isfile(self.fsum_name):
+ r = open(self.fsum_name, "r")
+ self.fout.write(r.read())
+ r.close()
+ self.fout.close()
+ self.fout = ""
+ return ret
+
+ def setFilename(self, name):
+ str = "FILE: " + name
+ self.log(str)
+ self.summary(str)
+ self.l_filename = name
+ self.line = 0
+
+ def getCallOnFail(self):
+ return self.CallOnFail
+
+ def setCallOnFail(self, CallOnFail):
+ self.CallOnFail = CallOnFail
+
+ def strToArray(self, string):
+ a = []
+ c = 0
+ end = ""
+ words = string.split()
+ if len(words) < 1 or words[0].startswith("#"):
+ return a
+ words = string.split()
+ for word in words:
+ if len(end) == 0:
+ a.append(word)
+ else:
+ a[c] += str(" " + word)
+ if end == "\\":
+ end = ""
+ if not word.endswith("\\"):
+ if end != '"':
+ if word.startswith('"'):
+ end = '"'
+ else:
+ c += 1
+ else:
+ if word.endswith('"'):
+ end = ""
+ c += 1
+ else:
+ c += 1
+ else:
+ end = "\\"
+ # if len(end) == 0:
+ # print('%d:%s:' % (c, a[c-1]))
+
+ return a
+
+ def execTestFile(self, tstFile):
+ if os.path.isfile(tstFile):
+ f = open(tstFile)
+ for line in f:
+ if len(line) > 1:
+ a = self.strToArray(line)
+ if len(a) >= 6:
+ luCommand(a[1], a[2], a[3], a[4], a[5])
+ else:
+ self.l_line += 1
+ self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
+ if len(a) >= 2:
+ if a[0] == "sleep":
+ time.sleep(int(a[1]))
+ elif a[0] == "include":
+ self.execTestFile(a[1])
+ f.close()
+ else:
+ self.log("unable to read: " + tstFile)
+ sys.exit(1)
+
+ def command(
+ self,
+ target,
+ command,
+ regexp,
+ op,
+ result,
+ returnJson,
+ startt=None,
+ force_result=False,
+ ):
+ global net
+ if op == "jsoncmp_pass" or op == "jsoncmp_fail":
+ returnJson = True
+
+ self.log(
+ "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
+ % (
+ time.asctime(),
+ self.l_total + 1,
+ self.l_filename,
+ self.l_line,
+ target,
+ command,
+ regexp,
+ op,
+ result,
+ )
+ )
+ if self.net == "":
+ return False
+ # self.log("Running %s %s" % (target, command))
+ js = None
+ out = self.net[target].cmd(command).rstrip()
+ if len(out) == 0:
+ report = "<no output>"
+ else:
+ report = out
+ if returnJson == True:
+ try:
+ js = json.loads(out)
+ except:
+ js = None
+ self.log(
+ "WARNING: JSON load failed -- confirm command output is in JSON format."
+ )
+ self.log("COMMAND OUTPUT:%s:" % report)
+
+ # JSON comparison
+ if op == "jsoncmp_pass" or op == "jsoncmp_fail":
+ try:
+ expect = json.loads(regexp)
+ except:
+ expect = None
+ self.log(
+ "WARNING: JSON load failed -- confirm regex input is in JSON format."
+ )
+ json_diff = json_cmp(js, expect)
+ if json_diff != None:
+ if op == "jsoncmp_fail":
+ success = True
+ else:
+ success = False
+ self.log("JSON DIFF:%s:" % json_diff)
+ ret = success
+ else:
+ if op == "jsoncmp_fail":
+ success = False
+ else:
+ success = True
+ self.result(target, success, result)
+ if js != None:
+ return js
+ return ret
+
+ # Experiment: can we achieve the same match behavior via DOTALL
+ # without converting newlines to spaces?
+ out_nl = out
+ search_nl = re.search(regexp, out_nl, re.DOTALL)
+ self.l_last_nl = search_nl
+ # Set up for comparison
+ if search_nl != None:
+ group_nl = search_nl.group()
+ group_nl_converted = " ".join(group_nl.splitlines())
+ else:
+ group_nl_converted = None
+
+ out = " ".join(out.splitlines())
+ search = re.search(regexp, out)
+ self.l_last = search
+ if search == None:
+ if op == "fail":
+ success = True
+ else:
+ success = False
+ ret = success
+ else:
+ ret = search.group()
+ if op != "fail":
+ success = True
+ level = 7
+ else:
+ success = False
+ level = 5
+ self.log("found:%s:" % ret, level)
+ # Experiment: compare matched strings obtained each way
+ if self.l_dotall_experiment and (group_nl_converted != ret):
+ self.log(
+ "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
+ % (group_nl_converted, ret),
+ 9,
+ )
+ if startt != None:
+ if js != None or ret is not False or force_result is not False:
+ delta = time.time() - startt
+ self.result(target, success, "%s +%4.2f secs" % (result, delta))
+ elif op == "pass" or op == "fail":
+ self.result(target, success, result)
+ if js != None:
+ return js
+ return ret
+
+ def wait(
+ self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
+ ):
+ self.log(
+ "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
+ % (
+ self.l_filename,
+ self.l_line,
+ target,
+ command,
+ regexp,
+ op,
+ result,
+ wait,
+ wait_time,
+ )
+ )
+ found = False
+ n = 0
+ startt = time.time()
+
+ if (op == "wait-strict") or ((op == "wait") and self.l_wait_strict):
+ strict = True
+ else:
+ strict = False
+
+ # Calculate the amount of `sleep`s we are going to peform.
+ wait_count = int(math.ceil(wait / wait_time)) + 1
+
+ force_result = False
+ while wait_count > 0:
+ n += 1
+
+ # log a failure on last iteration if we don't get desired regexp
+ if strict and (wait_count == 1):
+ force_result = True
+
+ found = self.command(
+ target, command, regexp, op, result, returnJson, startt, force_result
+ )
+ if found is not False:
+ break
+
+ wait_count -= 1
+ if wait_count > 0:
+ time.sleep(wait_time)
+
+ delta = time.time() - startt
+ self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
+ return found
+
+
+# initialized by luStart
+LUtil = None
+
+
+# entry calls
+def luStart(
+ baseScriptDir=".",
+ baseLogDir=".",
+ net="",
+ fout="output.log",
+ fsum="summary.txt",
+ level=None,
+):
+ global LUtil
+ # init class
+ LUtil = lUtil()
+ LUtil.base_script_dir = baseScriptDir
+ LUtil.base_log_dir = baseLogDir
+ LUtil.net = net
+ if fout != "":
+ LUtil.fout_name = baseLogDir + "/" + fout
+ if fsum != None:
+ LUtil.fsum_name = baseLogDir + "/" + fsum
+ if level != None:
+ LUtil.l_level = level
+ LUtil.l_dotall_experiment = False
+ LUtil.l_dotall_experiment = True
+
+
+def luCommand(
+ target,
+ command,
+ regexp=".",
+ op="none",
+ result="",
+ time=10,
+ returnJson=False,
+ wait_time=0.5,
+):
+ waitops = ["wait", "wait-strict", "wait-nostrict"]
+
+ if op in waitops:
+ return LUtil.wait(
+ target, command, regexp, op, result, time, returnJson, wait_time
+ )
+ else:
+ return LUtil.command(target, command, regexp, op, result, returnJson)
+
+
+def luLast(usenl=False):
+ if usenl:
+ if LUtil.l_last_nl != None:
+ LUtil.log("luLast:%s:" % LUtil.l_last_nl.group(), 7)
+ return LUtil.l_last_nl
+ else:
+ if LUtil.l_last != None:
+ LUtil.log("luLast:%s:" % LUtil.l_last.group(), 7)
+ return LUtil.l_last
+
+
+def luInclude(filename, CallOnFail=None):
+ tstFile = LUtil.base_script_dir + "/" + filename
+ LUtil.setFilename(filename)
+ if CallOnFail != None:
+ oldCallOnFail = LUtil.getCallOnFail()
+ LUtil.setCallOnFail(CallOnFail)
+ if filename.endswith(".py"):
+ LUtil.log("luInclude: execfile " + tstFile)
+ with open(tstFile) as infile:
+ exec(infile.read())
+ else:
+ LUtil.log("luInclude: execTestFile " + tstFile)
+ LUtil.execTestFile(tstFile)
+ if CallOnFail != None:
+ LUtil.setCallOnFail(oldCallOnFail)
+
+
+def luFinish():
+ global LUtil
+ ret = LUtil.closeFiles()
+ # done
+ LUtil = None
+ return ret
+
+
+def luNumFail():
+ return LUtil.l_fail
+
+
+def luNumPass():
+ return LUtil.l_pass
+
+
+def luResult(target, success, str, logstr=None):
+ return LUtil.result(target, success, str, logstr)
+
+
+def luShowResults(prFunction):
+ printed = 0
+ sf = open(LUtil.fsum_name, "r")
+ for line in sf:
+ printed += 1
+ prFunction(line.rstrip())
+ sf.close()
+
+
+def luShowFail():
+ printed = 0
+ sf = open(LUtil.fsum_name, "r")
+ for line in sf:
+ if line[-2] != "0":
+ printed += 1
+ logger.error(line.rstrip())
+ sf.close()
+ if printed > 0:
+ logger.error("See %s for details of errors" % LUtil.fout_name)
+
+
+#
+# Sets default wait type for luCommand(op="wait) (may be overridden by
+# specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
+#
+# "nostrict" is the historical default behavior, which is to ignore
+# failures to match the specified regexp in the specified time.
+#
+# "strict" means that failure to match the specified regexp in the
+# specified time yields an explicit, logged failure result
+#
+def luSetWaitType(waittype):
+ if waittype == "strict":
+ LUtil.l_wait_strict = 1
+ else:
+ if waittype == "nostrict":
+ LUtil.l_wait_strict = 0
+ else:
+ raise ValueError('waittype must be one of "strict" or "nostrict"')
+
+
+# for testing
+if __name__ == "__main__":
+ print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/lib")
+ luStart()
+ for arg in sys.argv[1:]:
+ luInclude(arg)
+ luFinish()
+ sys.exit(0)