diff options
Diffstat (limited to 'tests/topotests/analyze.py')
-rwxr-xr-x | tests/topotests/analyze.py | 433 |
1 files changed, 433 insertions, 0 deletions
diff --git a/tests/topotests/analyze.py b/tests/topotests/analyze.py new file mode 100755 index 0000000..690786a --- /dev/null +++ b/tests/topotests/analyze.py @@ -0,0 +1,433 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# July 9 2021, Christian Hopps <chopps@labn.net> +# +# Copyright (c) 2021, LabN Consulting, L.L.C. +# +import argparse +import atexit +import logging +import os +import re +import subprocess +import sys +import tempfile +from collections import OrderedDict + +import xmltodict + + +def get_range_list(rangestr): + result = [] + for e in rangestr.split(","): + e = e.strip() + if not e: + continue + if e.find("-") == -1: + result.append(int(e)) + else: + start, end = e.split("-") + result.extend(list(range(int(start), int(end) + 1))) + return result + + +def dict_range_(dct, rangestr, dokeys): + keys = list(dct.keys()) + if not rangestr or rangestr == "all": + for key in keys: + if dokeys: + yield key + else: + yield dct[key] + return + + dlen = len(keys) + for index in get_range_list(rangestr): + if index >= dlen: + break + key = keys[index] + if dokeys: + yield key + else: + yield dct[key] + + +def dict_range_keys(dct, rangestr): + return dict_range_(dct, rangestr, True) + + +def dict_range_values(dct, rangestr): + return dict_range_(dct, rangestr, False) + + +def get_summary(results): + ntest = int(results["@tests"]) + nfail = int(results["@failures"]) + nerror = int(results["@errors"]) + nskip = int(results["@skipped"]) + npass = ntest - nfail - nskip - nerror + return ntest, npass, nfail, nerror, nskip + + +def print_summary(results, args): + ntest, npass, nfail, nerror, nskip = (0, 0, 0, 0, 0) + for group in results: + _ntest, _npass, _nfail, _nerror, _nskip = get_summary(results[group]) + if args.verbose: + print( + f"Group: {group} Total: {_ntest} PASSED: {_npass}" + " FAIL: {_nfail} ERROR: {_nerror} SKIP: {_nskip}" + ) + ntest += _ntest + npass += _npass + nfail += _nfail + nerror += _nerror + nskip += _nskip + print(f"Total: {ntest} PASSED: {npass} FAIL: {nfail} ERROR: {nerror} SKIP: {nskip}") + + +def get_global_testcase(results): + for group in results: + for testcase in results[group]["testcase"]: + if "@file" not in testcase: + return testcase + return None + + +def get_filtered(tfilters, results, args): + if isinstance(tfilters, str) or tfilters is None: + tfilters = [tfilters] + found_files = OrderedDict() + for group in results: + if isinstance(results[group]["testcase"], list): + tlist = results[group]["testcase"] + else: + tlist = [results[group]["testcase"]] + for testcase in tlist: + for tfilter in tfilters: + if tfilter is None: + if ( + "failure" not in testcase + and "error" not in testcase + and "skipped" not in testcase + ): + break + elif tfilter in testcase: + break + else: + continue + # cname = testcase["@classname"] + fname = testcase.get("@file", "") + cname = testcase.get("@classname", "") + if not fname and not cname: + name = testcase.get("@name", "") + if not name: + continue + # If we had a failure at the module level we could be here. + fname = name.replace(".", "/") + ".py" + tcname = fname + else: + if not fname: + fname = cname.replace(".", "/") + ".py" + if "@name" not in testcase: + tcname = fname + else: + tcname = fname + "::" + testcase["@name"] + found_files[tcname] = testcase + return found_files + + +def search_testcase(testcase, regexp): + for key, val in testcase.items(): + if regexp.search(str(val)): + return True + return False + + +def dump_testcase(testcase): + s = "" + for key, val in testcase.items(): + if isinstance(val, str) or isinstance(val, float) or isinstance(val, int): + s += "{}: {}\n".format(key, val) + elif isinstance(val, list): + for k2, v2 in enumerate(val): + s += "{}: {}\n".format(k2, v2) + else: + for k2, v2 in val.items(): + s += "{}: {}\n".format(k2, v2) + return s + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-a", + "--save-xml", + action="store_true", + help=( + "Move [container:]/tmp/topotests/topotests.xml " + "to --results value if --results does not exist yet" + ), + ) + parser.add_argument( + "-A", + "--save", + action="store_true", + help=( + "Move [container:]/tmp/topotests{,.xml} " + "to --results value if --results does not exist yet" + ), + ) + parser.add_argument( + "-C", + "--container", + help="specify docker/podman container of the run", + ) + parser.add_argument( + "--use-podman", + action="store_true", + help="Use `podman` instead of `docker` for saving container data", + ) + parser.add_argument( + "-S", + "--select", + help=( + "select results combination of letters: " + "'e'rrored 'f'ailed 'p'assed 's'kipped. " + "Default is 'fe', unless --search or --time which default to 'efps'" + ), + ) + parser.add_argument( + "-R", + "--search", + help=( + "filter results to those which match a regex. " + "All test text is search unless restricted by --errmsg or --errtext" + ), + ) + parser.add_argument( + "-r", + "--results", + help="xml results file or directory containing xml results file", + ) + parser.add_argument("--rundir", help=argparse.SUPPRESS) + parser.add_argument( + "-E", + "--enumerate", + action="store_true", + help="enumerate each item (results scoped)", + ) + parser.add_argument( + "-T", "--test", help="select testcase at given ordinal from the enumerated list" + ) + parser.add_argument( + "--errmsg", action="store_true", help="print testcase error message" + ) + parser.add_argument( + "--errtext", action="store_true", help="print testcase error text" + ) + parser.add_argument( + "--full", action="store_true", help="print all logging for selected testcases" + ) + parser.add_argument("--time", action="store_true", help="print testcase run times") + + parser.add_argument("-s", "--summary", action="store_true", help="print summary") + parser.add_argument("-v", "--verbose", action="store_true", help="be verbose") + args = parser.parse_args() + + if args.save and args.save_xml: + logging.critical("Only one of --save or --save-xml allowed") + sys.exit(1) + + scount = bool(args.save) + bool(args.save_xml) + + # + # Saving/Archiving results + # + + docker_bin = "podman" if args.use_podman else "docker" + contid = "" + if args.container: + # check for container existence + contid = args.container + try: + # p = + subprocess.run( + f"{docker_bin} inspect {contid}", + check=True, + shell=True, + errors="ignore", + capture_output=True, + ) + except subprocess.CalledProcessError: + print(f"{docker_bin} container '{contid}' does not exist") + sys.exit(1) + # If you need container info someday... + # cont_info = json.loads(p.stdout) + + cppath = "/tmp/topotests" + if args.save_xml or scount == 0: + cppath += "/topotests.xml" + if contid: + cppath = contid + ":" + cppath + + tresfile = None + + if scount and args.results and not os.path.exists(args.results): + if not contid: + if not os.path.exists(cppath): + print(f"'{cppath}' doesn't exist to save") + sys.exit(1) + if args.save_xml: + subprocess.run(["cp", cppath, args.results]) + else: + subprocess.run(["mv", cppath, args.results]) + else: + try: + subprocess.run( + f"{docker_bin} cp {cppath} {args.results}", + check=True, + shell=True, + errors="ignore", + capture_output=True, + ) + except subprocess.CalledProcessError as error: + print(f"Can't {docker_bin} cp '{cppath}': %s", str(error)) + sys.exit(1) + + if "SUDO_USER" in os.environ: + subprocess.run(["chown", "-R", os.environ["SUDO_USER"], args.results]) + elif not args.results: + # User doesn't want to save results just use them inplace + if not contid: + if not os.path.exists(cppath): + print(f"'{cppath}' doesn't exist") + sys.exit(1) + args.results = cppath + else: + tresfile, tresname = tempfile.mkstemp( + suffix=".xml", prefix="topotests-", text=True + ) + atexit.register(lambda: os.unlink(tresname)) + os.close(tresfile) + try: + subprocess.run( + f"{docker_bin} cp {cppath} {tresname}", + check=True, + shell=True, + errors="ignore", + capture_output=True, + ) + except subprocess.CalledProcessError as error: + print(f"Can't {docker_bin} cp '{cppath}': %s", str(error)) + sys.exit(1) + args.results = tresname + + # + # Result option validation + # + + count = 0 + if args.errmsg: + count += 1 + if args.errtext: + count += 1 + if args.full: + count += 1 + if count > 1: + logging.critical("Only one of --full, --errmsg or --errtext allowed") + sys.exit(1) + + if args.time and count: + logging.critical("Can't use --full, --errmsg or --errtext with --time") + sys.exit(1) + + if args.enumerate and (count or args.time or args.test): + logging.critical( + "Can't use --enumerate with --errmsg, --errtext, --full, --test or --time" + ) + sys.exit(1) + + results = {} + ttfiles = [] + + if os.path.exists(os.path.join(args.results, "topotests.xml")): + args.results = os.path.join(args.results, "topotests.xml") + if not os.path.exists(args.results): + logging.critical("%s doesn't exist", args.results) + sys.exit(1) + + ttfiles = [args.results] + + for f in ttfiles: + m = re.match(r"tt-group-(\d+)/topotests.xml", f) + group = int(m.group(1)) if m else 0 + with open(f) as xml_file: + results[group] = xmltodict.parse(xml_file.read())["testsuites"]["testsuite"] + + search_re = re.compile(args.search) if args.search else None + + if args.select is None: + if search_re or args.time: + args.select = "efsp" + else: + args.select = "fe" + + filters = [] + if "e" in args.select: + filters.append("error") + if "f" in args.select: + filters.append("failure") + if "s" in args.select: + filters.append("skipped") + if "p" in args.select: + filters.append(None) + + found_files = get_filtered(filters, results, args) + + if search_re: + found_files = { + k: v for k, v in found_files.items() if search_testcase(v, search_re) + } + + if args.enumerate: + # print the selected test names with ordinal + print("\n".join(["{} {}".format(i, x) for i, x in enumerate(found_files)])) + elif args.test is None and count == 0 and not args.time: + # print the selected test names + print("\n".join([str(x) for x in found_files])) + else: + rangestr = args.test if args.test else "all" + for key in dict_range_keys(found_files, rangestr): + testcase = found_files[key] + if args.time: + text = testcase["@time"] + s = "{}: {}".format(text, key) + elif args.errtext: + if "error" in testcase: + errmsg = testcase["error"]["#text"] + elif "failure" in testcase: + errmsg = testcase["failure"]["#text"] + else: + errmsg = "none found" + s = "{}: {}".format(key, errmsg) + elif args.errmsg: + if "error" in testcase: + errmsg = testcase["error"]["@message"] + elif "failure" in testcase: + errmsg = testcase["failure"]["@message"] + else: + errmsg = "none found" + s = "{}: {}".format(key, errmsg) + else: + s = dump_testcase(testcase) + print(s) + + if args.summary: + print_summary(results, args) + + +if __name__ == "__main__": + main() |