summaryrefslogtreecommitdiffstats
path: root/coverage.py
diff options
context:
space:
mode:
Diffstat (limited to 'coverage.py')
-rwxr-xr-xcoverage.py457
1 files changed, 457 insertions, 0 deletions
diff --git a/coverage.py b/coverage.py
new file mode 100755
index 0000000..7e911cf
--- /dev/null
+++ b/coverage.py
@@ -0,0 +1,457 @@
+#!/usr/bin/env python3
+
+from debian.deb822 import Deb822, Release
+import email.utils
+import os
+import sys
+import shutil
+import subprocess
+import argparse
+import time
+from datetime import timedelta
+from collections import defaultdict
+from itertools import product
+
+have_qemu = os.getenv("HAVE_QEMU", "yes") == "yes"
+have_binfmt = os.getenv("HAVE_BINFMT", "yes") == "yes"
+run_ma_same_tests = os.getenv("RUN_MA_SAME_TESTS", "yes") == "yes"
+use_host_apt_config = os.getenv("USE_HOST_APT_CONFIG", "no") == "yes"
+cmd = os.getenv("CMD", "./mmdebstrap")
+
+default_dist = os.getenv("DEFAULT_DIST", "unstable")
+all_dists = ["oldstable", "stable", "testing", "unstable"]
+default_mode = "auto"
+all_modes = ["auto", "root", "unshare", "fakechroot", "chrootless"]
+default_variant = "apt"
+all_variants = [
+ "extract",
+ "custom",
+ "essential",
+ "apt",
+ "minbase",
+ "buildd",
+ "-",
+ "standard",
+]
+default_format = "auto"
+all_formats = ["auto", "directory", "tar", "squashfs", "ext2", "null"]
+
+mirror = os.getenv("mirror", "http://127.0.0.1/debian")
+hostarch = subprocess.check_output(["dpkg", "--print-architecture"]).decode().strip()
+
+release_path = f"./shared/cache/debian/dists/{default_dist}/InRelease"
+if not os.path.exists(release_path):
+ print("path doesn't exist:", release_path, file=sys.stderr)
+ print("run ./make_mirror.sh first", file=sys.stderr)
+ exit(1)
+if os.getenv("SOURCE_DATE_EPOCH") is not None:
+ s_d_e = os.getenv("SOURCE_DATE_EPOCH")
+else:
+ with open(release_path) as f:
+ rel = Release(f)
+ s_d_e = str(email.utils.mktime_tz(email.utils.parsedate_tz(rel["Date"])))
+
+separator = (
+ "------------------------------------------------------------------------------"
+)
+
+
+def skip(condition, dist, mode, variant, fmt):
+ if not condition:
+ return ""
+ for line in condition.splitlines():
+ if not line:
+ continue
+ if eval(line):
+ return line.strip()
+ return ""
+
+
+def parse_config(confname):
+ config_dict = defaultdict(dict)
+ config_order = list()
+ all_vals = {
+ "Dists": all_dists,
+ "Modes": all_modes,
+ "Variants": all_variants,
+ "Formats": all_formats,
+ }
+ with open(confname) as f:
+ for test in Deb822.iter_paragraphs(f):
+ if "Test" not in test.keys():
+ print("Test without name", file=sys.stderr)
+ exit(1)
+ name = test["Test"]
+ config_order.append(name)
+ for k in test.keys():
+ v = test[k]
+ if k not in [
+ "Test",
+ "Dists",
+ "Modes",
+ "Variants",
+ "Formats",
+ "Skip-If",
+ "Needs-QEMU",
+ "Needs-Root",
+ "Needs-APT-Config",
+ ]:
+ print(f"Unknown field name {k} in test {name}")
+ exit(1)
+ if k in all_vals.keys():
+ if v == "default":
+ print(
+ f"Setting {k} to default in Test {name} is redundant",
+ file=sys.stderr,
+ )
+ exit(1)
+ if v == "any":
+ v = all_vals[k]
+ else:
+ # else, split the value by whitespace
+ v = v.split()
+ for i in v:
+ if i not in all_vals[k]:
+ print(
+ f"{i} is not a valid value for {k}", file=sys.stderr
+ )
+ exit(1)
+ config_dict[name][k] = v
+ return config_order, config_dict
+
+
+def format_test(num, total, name, dist, mode, variant, fmt, config_dict):
+ ret = f"({num}/{total}) {name}"
+ if len(config_dict[name].get("Dists", [])) > 1:
+ ret += f" --dist={dist}"
+ if len(config_dict[name].get("Modes", [])) > 1:
+ ret += f" --mode={mode}"
+ if len(config_dict[name].get("Variants", [])) > 1:
+ ret += f" --variant={variant}"
+ if len(config_dict[name].get("Formats", [])) > 1:
+ ret += f" --format={fmt}"
+ return ret
+
+
+def print_time_per_test(time_per_test, name="test"):
+ print(
+ f"average time per {name}:",
+ sum(time_per_test.values(), start=timedelta()) / len(time_per_test),
+ file=sys.stderr,
+ )
+ print(
+ f"median time per {name}:",
+ sorted(time_per_test.values())[len(time_per_test) // 2],
+ file=sys.stderr,
+ )
+ head_tail_num = 10
+ print(f"{head_tail_num} fastests {name}s:", file=sys.stderr)
+ for k, v in sorted(time_per_test.items(), key=lambda i: i[1])[
+ : min(head_tail_num, len(time_per_test))
+ ]:
+ print(f" {k}: {v}", file=sys.stderr)
+ print(f"{head_tail_num} slowest {name}s:", file=sys.stderr)
+ for k, v in sorted(time_per_test.items(), key=lambda i: i[1], reverse=True)[
+ : min(head_tail_num, len(time_per_test))
+ ]:
+ print(f" {k}: {v}", file=sys.stderr)
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("test", nargs="*", help="only run these tests")
+ parser.add_argument(
+ "-x",
+ "--exitfirst",
+ action="store_const",
+ dest="maxfail",
+ const=1,
+ help="exit instantly on first error or failed test.",
+ )
+ parser.add_argument(
+ "--maxfail",
+ metavar="num",
+ action="store",
+ type=int,
+ dest="maxfail",
+ default=0,
+ help="exit after first num failures or errors.",
+ )
+ parser.add_argument(
+ "--mode",
+ metavar="mode",
+ help=f"only run tests with this mode (Default = {default_mode})",
+ )
+ parser.add_argument(
+ "--dist",
+ metavar="dist",
+ help=f"only run tests with this dist (Default = {default_dist})",
+ )
+ parser.add_argument(
+ "--variant",
+ metavar="variant",
+ help=f"only run tests with this variant (Default = {default_variant})",
+ )
+ parser.add_argument(
+ "--format",
+ metavar="format",
+ help=f"only run tests with this format (Default = {default_format})",
+ )
+ parser.add_argument(
+ "--skip", metavar="test", action="append", help="skip this test"
+ )
+ args = parser.parse_args()
+
+ # copy over files from git or as distributed
+ for git, dist, target in [
+ ("./mmdebstrap", "/usr/bin/mmdebstrap", "mmdebstrap"),
+ ("./tarfilter", "/usr/bin/mmtarfilter", "tarfilter"),
+ (
+ "./proxysolver",
+ "/usr/lib/apt/solvers/mmdebstrap-dump-solution",
+ "proxysolver",
+ ),
+ (
+ "./ldconfig.fakechroot",
+ "/usr/libexec/mmdebstrap/ldconfig.fakechroot",
+ "ldconfig.fakechroot",
+ ),
+ ]:
+ if os.path.exists(git):
+ shutil.copy(git, f"shared/{target}")
+ else:
+ shutil.copy(dist, f"shared/{target}")
+ # copy over hooks from git or as distributed
+ if os.path.exists("hooks"):
+ shutil.copytree("hooks", "shared/hooks", dirs_exist_ok=True)
+ else:
+ shutil.copytree(
+ "/usr/share/mmdebstrap/hooks", "shared/hooks", dirs_exist_ok=True
+ )
+
+ # parse coverage.txt
+ config_order, config_dict = parse_config("coverage.txt")
+
+ indirbutnotcovered = set(
+ [d for d in os.listdir("tests") if not d.startswith(".")]
+ ) - set(config_order)
+ if indirbutnotcovered:
+ print(
+ "test(s) missing from coverage.txt: %s"
+ % (", ".join(sorted(indirbutnotcovered))),
+ file=sys.stderr,
+ )
+ exit(1)
+ coveredbutnotindir = set(config_order) - set(
+ [d for d in os.listdir("tests") if not d.startswith(".")]
+ )
+ if coveredbutnotindir:
+ print(
+ "test(s) missing from ./tests: %s"
+ % (", ".join(sorted(coveredbutnotindir))),
+ file=sys.stderr,
+ )
+
+ exit(1)
+
+ # produce the list of tests using the cartesian product of all allowed
+ # dists, modes, variants and formats of a given test
+ tests = []
+ for name in config_order:
+ test = config_dict[name]
+ for dist, mode, variant, fmt in product(
+ test.get("Dists", [default_dist]),
+ test.get("Modes", [default_mode]),
+ test.get("Variants", [default_variant]),
+ test.get("Formats", [default_format]),
+ ):
+ skipreason = skip(test.get("Skip-If"), dist, mode, variant, fmt)
+ if skipreason:
+ tt = ("skip", skipreason)
+ elif (
+ test.get("Needs-APT-Config", "false") == "true" and use_host_apt_config
+ ):
+ tt = ("skip", "test cannot use host apt config")
+ elif have_qemu:
+ tt = "qemu"
+ elif test.get("Needs-QEMU", "false") == "true":
+ tt = ("skip", "test needs QEMU")
+ elif test.get("Needs-Root", "false") == "true":
+ tt = "sudo"
+ elif mode == "root":
+ tt = "sudo"
+ else:
+ tt = "null"
+ tests.append((tt, name, dist, mode, variant, fmt))
+
+ torun = []
+ num_tests = len(tests)
+ if args.test:
+ # check if all given tests are either a valid name or a valid number
+ for test in args.test:
+ if test in [name for (_, name, _, _, _, _) in tests]:
+ continue
+ if not test.isdigit():
+ print(f"cannot find test named {test}", file=sys.stderr)
+ exit(1)
+ if int(test) >= len(tests) or int(test) <= 0 or str(int(test)) != test:
+ print(f"test number {test} doesn't exist", file=sys.stderr)
+ exit(1)
+
+ for i, (_, name, _, _, _, _) in enumerate(tests):
+ # if either the number or test name matches, then we use this test,
+ # otherwise we skip it
+ if name in args.test:
+ torun.append(i)
+ if str(i + 1) in args.test:
+ torun.append(i)
+ num_tests = len(torun)
+
+ starttime = time.time()
+ skipped = defaultdict(list)
+ failed = []
+ num_success = 0
+ num_finished = 0
+ time_per_test = {}
+ acc_time_per_test = defaultdict(list)
+ for i, (test, name, dist, mode, variant, fmt) in enumerate(tests):
+ if torun and i not in torun:
+ continue
+ print(separator, file=sys.stderr)
+ print("(%d/%d) %s" % (i + 1, len(tests), name), file=sys.stderr)
+ print("dist: %s" % dist, file=sys.stderr)
+ print("mode: %s" % mode, file=sys.stderr)
+ print("variant: %s" % variant, file=sys.stderr)
+ print("format: %s" % fmt, file=sys.stderr)
+ if num_finished > 0:
+ currenttime = time.time()
+ timeleft = timedelta(
+ seconds=int(
+ (num_tests - num_finished)
+ * (currenttime - starttime)
+ / num_finished
+ )
+ )
+ print("time left: %s" % timeleft, file=sys.stderr)
+ if failed:
+ print("failed: %d" % len(failed), file=sys.stderr)
+ num_finished += 1
+ with open("tests/" + name) as fin, open("shared/test.sh", "w") as fout:
+ for line in fin:
+ line = line.replace("{{ CMD }}", cmd)
+ line = line.replace("{{ SOURCE_DATE_EPOCH }}", s_d_e)
+ line = line.replace("{{ DIST }}", dist)
+ line = line.replace("{{ MIRROR }}", mirror)
+ line = line.replace("{{ MODE }}", mode)
+ line = line.replace("{{ VARIANT }}", variant)
+ line = line.replace("{{ FORMAT }}", fmt)
+ line = line.replace("{{ HOSTARCH }}", hostarch)
+ fout.write(line)
+ # ignore:
+ # SC2016 Expressions don't expand in single quotes, use double quotes for that.
+ # SC2050 This expression is constant. Did you forget the $ on a variable?
+ # SC2194 This word is constant. Did you forget the $ on a variable?
+ shellcheck = subprocess.run(
+ [
+ "shellcheck",
+ "--exclude=SC2050,SC2194,SC2016",
+ "-f",
+ "gcc",
+ "shared/test.sh",
+ ],
+ check=False,
+ stdout=subprocess.PIPE,
+ ).stdout.decode()
+ argv = None
+ match test:
+ case "qemu":
+ argv = ["./run_qemu.sh"]
+ case "sudo":
+ argv = ["./run_null.sh", "SUDO"]
+ case "null":
+ argv = ["./run_null.sh"]
+ case ("skip", reason):
+ skipped[reason].append(
+ format_test(
+ i + 1, len(tests), name, dist, mode, variant, fmt, config_dict
+ )
+ )
+ print(f"skipped because of {reason}", file=sys.stderr)
+ continue
+ print(separator, file=sys.stderr)
+ if args.skip and name in args.skip:
+ print(f"skipping because of --skip={name}", file=sys.stderr)
+ continue
+ if args.dist and args.dist != dist:
+ print(f"skipping because of --dist={args.dist}", file=sys.stderr)
+ continue
+ if args.mode and args.mode != mode:
+ print(f"skipping because of --mode={args.mode}", file=sys.stderr)
+ continue
+ if args.variant and args.variant != variant:
+ print(f"skipping because of --variant={args.variant}", file=sys.stderr)
+ continue
+ if args.format and args.format != fmt:
+ print(f"skipping because of --format={args.format}", file=sys.stderr)
+ continue
+ before = time.time()
+ proc = subprocess.Popen(argv)
+ try:
+ proc.wait()
+ except KeyboardInterrupt:
+ proc.terminate()
+ proc.wait()
+ break
+ after = time.time()
+ walltime = timedelta(seconds=int(after - before))
+ formated_test_name = format_test(
+ i + 1, len(tests), name, dist, mode, variant, fmt, config_dict
+ )
+ time_per_test[formated_test_name] = walltime
+ acc_time_per_test[name].append(walltime)
+ print(separator, file=sys.stderr)
+ print(f"duration: {walltime}", file=sys.stderr)
+ if proc.returncode != 0 or shellcheck != "":
+ if shellcheck != "":
+ print(shellcheck)
+ failed.append(formated_test_name)
+ print("result: FAILURE", file=sys.stderr)
+ else:
+ print("result: SUCCESS", file=sys.stderr)
+ num_success += 1
+ if args.maxfail and len(failed) >= args.maxfail:
+ break
+ print(separator, file=sys.stderr)
+ print(
+ "successfully ran %d tests" % num_success,
+ file=sys.stderr,
+ )
+ if skipped:
+ print("skipped %d:" % sum([len(v) for v in skipped.values()]), file=sys.stderr)
+ for reason, l in skipped.items():
+ print(f"skipped because of {reason}:", file=sys.stderr)
+ for t in l:
+ print(f" {t}", file=sys.stderr)
+ if len(time_per_test) > 1:
+ print_time_per_test(time_per_test)
+ if len(acc_time_per_test) > 1:
+ print_time_per_test(
+ {
+ f"{len(v)}x {k}": sum(v, start=timedelta())
+ for k, v in acc_time_per_test.items()
+ },
+ "accumulated test",
+ )
+ if failed:
+ print("failed %d:" % len(failed), file=sys.stderr)
+ for f in failed:
+ print(f, file=sys.stderr)
+ currenttime = time.time()
+ walltime = timedelta(seconds=int(currenttime - starttime))
+ print(f"total runtime: {walltime}", file=sys.stderr)
+ if failed:
+ exit(1)
+
+
+if __name__ == "__main__":
+ main()