#!/usr/bin/env python3
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# July 9 2021, Christian Hopps
#
# Copyright (c) 2021, LabN Consulting, L.L.C.
#
"""Select, search, print and summarize testcases from a topotests.xml file.

The results file can be used in place, copied out of a docker/podman
container, or archived to the ``--results`` location first.
"""
import argparse
import atexit
import logging
import os
import re
import subprocess
import sys
import tempfile
from collections import OrderedDict


def get_range_list(rangestr):
    """Expand a comma separated list of integers and ranges into a list.

    Each item is either a single integer (``"3"``) or an inclusive range
    (``"3-5"``); blank items are ignored.  ``"1, 3-5"`` yields
    ``[1, 3, 4, 5]``.

    Raises:
        ValueError: if an item is not an integer or a ``start-end`` pair.
    """
    result = []
    for item in rangestr.split(","):
        item = item.strip()
        if not item:
            continue
        if "-" not in item:
            result.append(int(item))
        else:
            start, end = item.split("-")
            result.extend(range(int(start), int(end) + 1))
    return result


def dict_range_(dct, rangestr, dokeys):
    """Yield keys (or values) of `dct` selected by ordinal position.

    Args:
        dct: mapping to select from; ordinals are 0-based positions in its
            key iteration order.
        rangestr: range string as accepted by `get_range_list`; a false value
            or ``"all"`` selects every entry.
        dokeys: yield keys when true, values otherwise.
    """
    keys = list(dct.keys())
    if not rangestr or rangestr == "all":
        for key in keys:
            yield key if dokeys else dct[key]
        return

    dlen = len(keys)
    for index in get_range_list(rangestr):
        # Skip (rather than stop at) an out-of-range ordinal so that valid
        # ordinals listed after it are still honored.
        if index >= dlen:
            continue
        key = keys[index]
        yield key if dokeys else dct[key]


def dict_range_keys(dct, rangestr):
    """Yield the keys of `dct` selected by `rangestr` (see `dict_range_`)."""
    return dict_range_(dct, rangestr, True)


def dict_range_values(dct, rangestr):
    """Yield the values of `dct` selected by `rangestr` (see `dict_range_`)."""
    return dict_range_(dct, rangestr, False)


def get_summary(results):
    """Return ``(ntest, npass, nfail, nerror, nskip)`` for one test suite.

    The counts are read from the ``@tests``, ``@failures``, ``@errors`` and
    ``@skipped`` entries of `results`; the passed count is whatever remains.
    """
    ntest = int(results["@tests"])
    nfail = int(results["@failures"])
    nerror = int(results["@errors"])
    nskip = int(results["@skipped"])
    npass = ntest - nfail - nskip - nerror
    return ntest, npass, nfail, nerror, nskip


def print_summary(results, args):
    """Print the accumulated totals for all groups in `results`.

    When ``args.verbose`` is set a line is also printed for each group.
    """
    ntest, npass, nfail, nerror, nskip = (0, 0, 0, 0, 0)
    for group in results:
        _ntest, _npass, _nfail, _nerror, _nskip = get_summary(results[group])
        if args.verbose:
            # Both halves must be f-strings or the second half would print
            # the placeholder names literally.
            print(
                f"Group: {group} Total: {_ntest} PASSED: {_npass}"
                f" FAIL: {_nfail} ERROR: {_nerror} SKIP: {_nskip}"
            )
        ntest += _ntest
        npass += _npass
        nfail += _nfail
        nerror += _nerror
        nskip += _nskip
    print(f"Total: {ntest} PASSED: {npass} FAIL: {nfail} ERROR: {nerror} SKIP: {nskip}")


def _testcases(suite):
    """Return the testcases of `suite` as a list.

    The ``testcase`` entry may be a list, a single testcase mapping, or
    absent; all three are normalized to a (possibly empty) list.
    """
    cases = suite.get("testcase")
    if cases is None:
        return []
    return cases if isinstance(cases, list) else [cases]


def get_global_testcase(results):
    """Return the first testcase lacking an ``@file`` entry, or None."""
    for group in results:
        for testcase in _testcases(results[group]):
            if "@file" not in testcase:
                return testcase
    return None


def get_filtered(tfilters, results, args):
    """Return an OrderedDict of testcases matching any of `tfilters`.

    Args:
        tfilters: a filter or list of filters.  A string filter matches a
            testcase containing that key (e.g. ``"failure"``); None matches
            a testcase with none of ``failure``, ``error`` or ``skipped``.
        results: mapping of group to test suite.
        args: unused; kept for interface compatibility.

    Returns:
        Testcases keyed by ``<file>::<name>`` (or just ``<file>`` when the
        testcase has no ``@name``, or has neither ``@file`` nor
        ``@classname``), in the order they were encountered.
    """
    if isinstance(tfilters, str) or tfilters is None:
        tfilters = [tfilters]

    found_files = OrderedDict()
    for group in results:
        for testcase in _testcases(results[group]):
            for tfilter in tfilters:
                if tfilter is None:
                    if (
                        "failure" not in testcase
                        and "error" not in testcase
                        and "skipped" not in testcase
                    ):
                        break
                elif tfilter in testcase:
                    break
            else:
                # No filter matched this testcase.
                continue

            fname = testcase.get("@file", "")
            cname = testcase.get("@classname", "")
            if not fname and not cname:
                name = testcase.get("@name", "")
                if not name:
                    continue
                # If we had a failure at the module level we could be here.
                fname = name.replace(".", "/") + ".py"
                tcname = fname
            else:
                if not fname:
                    fname = cname.replace(".", "/") + ".py"
                if "@name" not in testcase:
                    tcname = fname
                else:
                    tcname = fname + "::" + testcase["@name"]
            found_files[tcname] = testcase
    return found_files


def search_testcase(testcase, regexp):
    """Return True if `regexp` matches the string form of any value."""
    return any(regexp.search(str(val)) for val in testcase.values())


def dump_testcase(testcase):
    """Return a ``key: value`` line-per-entry text dump of `testcase`.

    List values are expanded to ``index: item`` lines and mapping values to
    one line per nested entry; anything else (including None) is printed as
    a plain ``key: value`` line.
    """
    lines = []
    for key, val in testcase.items():
        if isinstance(val, list):
            lines.extend("{}: {}\n".format(k2, v2) for k2, v2 in enumerate(val))
        elif isinstance(val, dict):
            lines.extend("{}: {}\n".format(k2, v2) for k2, v2 in val.items())
        else:
            lines.append("{}: {}\n".format(key, val))
    return "".join(lines)


def _error_detail(testcase, field):
    """Return `field` of the testcase's error, else of its failure.

    Returns ``"none found"`` when the testcase has neither.
    """
    for kind in ("error", "failure"):
        if kind in testcase:
            return testcase[kind][field]
    return "none found"


def _run_container_cmd(argv):
    """Run a container CLI command given as an argv list.

    An argv list (no shell) is used so container names and paths are passed
    through verbatim.  Raises CalledProcessError on a non-zero exit and
    OSError if the binary cannot be executed.
    """
    return subprocess.run(argv, check=True, errors="ignore", capture_output=True)


def _copy_from_container(docker_bin, src, dst):
    """Copy `src` (``container:path``) to `dst`, exiting on failure."""
    try:
        _run_container_cmd([docker_bin, "cp", src, dst])
    except (subprocess.CalledProcessError, OSError) as error:
        logging.critical("Can't %s cp '%s': %s", docker_bin, src, error)
        sys.exit(1)


def _build_parser():
    """Return the command line argument parser."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-a",
        "--save-xml",
        action="store_true",
        help=(
            "Move [container:]/tmp/topotests/topotests.xml "
            "to --results value if --results does not exist yet"
        ),
    )
    parser.add_argument(
        "-A",
        "--save",
        action="store_true",
        help=(
            "Move [container:]/tmp/topotests{,.xml} "
            "to --results value if --results does not exist yet"
        ),
    )
    parser.add_argument(
        "-C",
        "--container",
        help="specify docker/podman container of the run",
    )
    parser.add_argument(
        "--use-podman",
        action="store_true",
        help="Use `podman` instead of `docker` for saving container data",
    )
    parser.add_argument(
        "-S",
        "--select",
        help=(
            "select results combination of letters: "
            "'e'rrored 'f'ailed 'p'assed 's'kipped. "
            "Default is 'fe', unless --search or --time which default to 'efps'"
        ),
    )
    parser.add_argument(
        "-R",
        "--search",
        help=(
            "filter results to those which match a regex. "
            "All test text is search unless restricted by --errmsg or --errtext"
        ),
    )
    parser.add_argument(
        "-r",
        "--results",
        help="xml results file or directory containing xml results file",
    )
    parser.add_argument("--rundir", help=argparse.SUPPRESS)
    parser.add_argument(
        "-E",
        "--enumerate",
        action="store_true",
        help="enumerate each item (results scoped)",
    )
    parser.add_argument(
        "-T", "--test", help="select testcase at given ordinal from the enumerated list"
    )
    parser.add_argument(
        "--errmsg", action="store_true", help="print testcase error message"
    )
    parser.add_argument(
        "--errtext", action="store_true", help="print testcase error text"
    )
    parser.add_argument(
        "--full", action="store_true", help="print all logging for selected testcases"
    )
    parser.add_argument("--time", action="store_true", help="print testcase run times")
    parser.add_argument("-s", "--summary", action="store_true", help="print summary")
    parser.add_argument("-v", "--verbose", action="store_true", help="be verbose")
    return parser


def main():
    """Command line entry point: locate the results, then filter and print."""
    # Imported here, where it is used, so the helper functions above stay
    # importable without the third-party package installed.
    import xmltodict

    args = _build_parser().parse_args()

    if args.save and args.save_xml:
        logging.critical("Only one of --save or --save-xml allowed")
        sys.exit(1)

    scount = bool(args.save) + bool(args.save_xml)

    #
    # Saving/Archiving results
    #

    docker_bin = "podman" if args.use_podman else "docker"
    contid = ""
    if args.container:
        # check for container existence
        contid = args.container
        try:
            _run_container_cmd([docker_bin, "inspect", contid])
        except (subprocess.CalledProcessError, OSError):
            logging.critical("%s container '%s' does not exist", docker_bin, contid)
            sys.exit(1)

    cppath = "/tmp/topotests"
    if args.save_xml or scount == 0:
        cppath += "/topotests.xml"
    if contid:
        cppath = contid + ":" + cppath

    if scount and args.results and not os.path.exists(args.results):
        if not contid:
            if not os.path.exists(cppath):
                logging.critical("'%s' doesn't exist to save", cppath)
                sys.exit(1)
            if args.save_xml:
                subprocess.run(["cp", cppath, args.results])
            else:
                subprocess.run(["mv", cppath, args.results])
        else:
            _copy_from_container(docker_bin, cppath, args.results)

        if "SUDO_USER" in os.environ:
            subprocess.run(["chown", "-R", os.environ["SUDO_USER"], args.results])
    elif not args.results:
        # User doesn't want to save results just use them inplace
        if not contid:
            if not os.path.exists(cppath):
                logging.critical("'%s' doesn't exist", cppath)
                sys.exit(1)
            args.results = cppath
        else:
            tresfile, tresname = tempfile.mkstemp(
                suffix=".xml", prefix="topotests-", text=True
            )
            atexit.register(os.unlink, tresname)
            # Only the name is needed; the container copy rewrites the file.
            os.close(tresfile)
            _copy_from_container(docker_bin, cppath, tresname)
            args.results = tresname

    #
    # Result option validation
    #

    count = sum(map(bool, (args.errmsg, args.errtext, args.full)))
    if count > 1:
        logging.critical("Only one of --full, --errmsg or --errtext allowed")
        sys.exit(1)

    if args.time and count:
        logging.critical("Can't use --full, --errmsg or --errtext with --time")
        sys.exit(1)

    if args.enumerate and (count or args.time or args.test):
        logging.critical(
            "Can't use --enumerate with --errmsg, --errtext, --full, --test or --time"
        )
        sys.exit(1)

    results = {}

    # --results may name the directory holding topotests.xml.
    if os.path.exists(os.path.join(args.results, "topotests.xml")):
        args.results = os.path.join(args.results, "topotests.xml")
    if not os.path.exists(args.results):
        logging.critical("%s doesn't exist", args.results)
        sys.exit(1)

    ttfiles = [args.results]

    for f in ttfiles:
        m = re.match(r"tt-group-(\d+)/topotests.xml", f)
        group = int(m.group(1)) if m else 0
        with open(f, encoding="utf-8") as xml_file:
            results[group] = xmltodict.parse(xml_file.read())["testsuites"]["testsuite"]

    search_re = re.compile(args.search) if args.search else None

    if args.select is None:
        if search_re or args.time:
            args.select = "efsp"
        else:
            args.select = "fe"

    filters = []
    if "e" in args.select:
        filters.append("error")
    if "f" in args.select:
        filters.append("failure")
    if "s" in args.select:
        filters.append("skipped")
    if "p" in args.select:
        filters.append(None)

    found_files = get_filtered(filters, results, args)

    if search_re:
        found_files = {
            k: v for k, v in found_files.items() if search_testcase(v, search_re)
        }

    if args.enumerate:
        # print the selected test names with ordinal
        print("\n".join("{} {}".format(i, x) for i, x in enumerate(found_files)))
    elif args.test is None and count == 0 and not args.time:
        # print the selected test names
        print("\n".join(str(x) for x in found_files))
    else:
        rangestr = args.test if args.test else "all"
        for key in dict_range_keys(found_files, rangestr):
            testcase = found_files[key]
            if args.time:
                s = "{}: {}".format(testcase["@time"], key)
            elif args.errtext:
                s = "{}: {}".format(key, _error_detail(testcase, "#text"))
            elif args.errmsg:
                s = "{}: {}".format(key, _error_detail(testcase, "@message"))
            else:
                s = dump_testcase(testcase)
            print(s)

    if args.summary:
        print_summary(results, args)


if __name__ == "__main__":
    main()