diff options
Diffstat (limited to 'scripts/sadt')
-rwxr-xr-x | scripts/sadt | 647 |
1 files changed, 647 insertions, 0 deletions
diff --git a/scripts/sadt b/scripts/sadt new file mode 100755 index 0000000..b874330 --- /dev/null +++ b/scripts/sadt @@ -0,0 +1,647 @@ +#!/usr/bin/python3 +# encoding=UTF-8 + +# Copyright © 2012, 2013, 2014 Jakub Wilk <jwilk@debian.org> + +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. + +""" +simple DEP-8 test runner +""" + +import argparse +import errno +import os +import queue as queuemod +import re +import shutil +import stat +import subprocess as ipc +import sys +import tempfile +import threading +import warnings + +from debian import deb822 + + +def parse_relations(field): + """ + wrap debian.deb822.PkgRelation.parse_relations() to suppress the + UserWarning about the inability to parse something. + See https://bugs.debian.org/712513 + """ + warnings.simplefilter("ignore") + parsed = deb822.PkgRelation.parse_relations(field) + warnings.resetwarnings() + return parsed + + +def chmod_x(path): + """ + chmod a+X <path> + """ + old_mode = stat.S_IMODE(os.stat(path).st_mode) + new_mode = old_mode | ((old_mode & 0o444) >> 2) + if old_mode != new_mode: + os.chmod(path, new_mode) + return old_mode + + +def annotate_output(child): + queue = queuemod.Queue() + + def reader(fd, tag): + buf = b"" + while True: + assert b"\n" not in buf + chunk = os.read(fd, 1024) + if chunk == b"": + break + lines = (buf + chunk).split(b"\n") + buf = lines.pop() + for line in lines: + queue.put((tag, line + b"\n")) + if buf != b"": + queue.put((tag, buf)) + queue.put(None) + + queue = queuemod.Queue() + threads = [] + for pipe, tag in [(child.stdout, "O"), (child.stderr, "E")]: + thread = threading.Thread(target=reader, args=(pipe.fileno(), tag)) + thread.start() + threads += [thread] + nthreads = len(threads) + while nthreads > 0: + item = queue.get() + if item is None: + nthreads -= 1 + continue + yield item + for thread in threads: + thread.join() + + +class Skip(Exception): + pass + + +class Flaky(Exception): + pass + + +class Fail(Exception): + pass + + +class Progress: + @staticmethod + def _write(text): + sys.stdout.write(text) + sys.stdout.flush() + + def start(self, name): + pass + + def output(self, line): + pass + + def skip(self, name, reason): + raise NotImplementedError + + def fail(self, name, reason): + raise NotImplementedError + + def ok(self, name): + raise NotImplementedError + + def close(self): + pass + + +class DefaultProgress(Progress): + _hourglass = r"/-\|" + + def __init__(self): + self._counter = 0 + self._output = False + if sys.stdout.isatty(): + self._back = "\b" + else: + self._back = "" + + def _reset(self): + if self._output: + self._write(self._back) + + def start(self, name): + self._output = False + + def output(self, line): + if not self._back: + return + hourglass = self._hourglass + counter = self._counter + 1 + self._reset() + self._write(hourglass[counter % len(hourglass)]) + self._counter = counter + self._output = True + + def skip(self, name, reason): + self._write("S") + + def fail(self, name, reason): + self._reset() + self._write("F") + + def ok(self, name): + self._reset() + self._write(".") + + def close(self): + self._write("\n") + + +class VerboseProgress(Progress): + def _separator(self): + self._write("-" * 70 + "\n") + + def start(self, name): + self._separator() + self._write(f"{name}\n") + self._separator() + + def output(self, line): + self._write(line) + + def skip(self, name, reason): + self._write(f"{name}: SKIP ({reason})\n") + + def fail(self, name, reason): + self._separator() + self._write(f"{name}: FAIL ({reason})\n") + self._write("\n") + + def ok(self, name): + self._separator() + self._write(f"{name}: PASS\n") + self._write("\n") + + +class TestCommand: + def __init__(self, group, command): + self.group = group + self.command = command + + def __str__(self): + return self.command + + @property + def name(self): + return str(self) + + def get_command(self): + return ["sh", "-c", self.command] + + def prepare(self, progress, rw_build_tree): + pass + + def cleanup(self): + pass + + def run(self, progress, options): # pylint: disable=too-many-locals + command = self.get_command() + tmpdir1 = tempfile.mkdtemp(prefix="sadt.") + tmpdir2 = tempfile.mkdtemp(prefix="sadt.") + environ = dict(os.environ) + environ["AUTOPKGTEST_TMP"] = tmpdir1 + # only for compatibility with old DEP-8 spec. + environ["ADTTMP"] = tmpdir2 + child = ipc.Popen( # pylint: disable=consider-using-with + command, stdout=ipc.PIPE, stderr=ipc.PIPE, env=environ + ) + output = [] + stderr = False + for tag, line in annotate_output(child): + if tag == "E": + stderr = True + this_line = f"{tag}: {line.decode(sys.stdout.encoding, 'replace')}" + progress.output(this_line) + output.append(this_line) + for fp in child.stdout, child.stderr: + fp.close() + returncode = child.wait() + shutil.rmtree(tmpdir1) + shutil.rmtree(tmpdir2) + + if returncode == 77 and options.skippable: + reason = "exit status 77 and marked as skippable" + progress.skip(self, reason) + raise Skip(self, reason, "".join(output)) + + fail_reason = None + + if returncode == 0: + if stderr and not options.allow_stderr: + returncode = -1 + fail_reason = "stderr non-empty" + else: + fail_reason = f"exit code: {returncode}" + + if returncode != 0: + if options.flaky: + progress.skip(self, fail_reason) + raise Flaky(self, fail_reason, "".join(output)) + + progress.fail(self, fail_reason) + raise Fail(self, fail_reason, "".join(output)) + + progress.ok(self) + + +class Test(TestCommand): + def __init__(self, group, testname): + self.testname = testname + self.original_mode = None + self.cwd = None + super().__init__(group, testname) + + @property + def path(self): + return os.path.join(self.group.tests_directory, self.testname) + + def __str__(self): + return self.testname + + def get_command(self): + return [self.path] + + def prepare(self, progress, rw_build_tree): + if rw_build_tree: + self.cwd = os.getcwd() + os.chdir(rw_build_tree) + chmod_x(self.path) + else: + if not os.access(self.path, os.X_OK): + try: + self.original_mode = chmod_x(self.path) + except OSError as exc: + progress.skip( + self.testname, + f"{self.path} could not be made executable: {exc}", + ) + raise Skip from exc + + def cleanup(self): + if self.original_mode is not None: + os.chmod(self.path, self.original_mode) + if self.cwd is not None: + os.chdir(self.cwd) + + +class TestOptions: # pylint: disable=too-few-public-methods + def __init__(self): + self.allow_stderr = False + self.flaky = False + self.rw_build_tree_needed = False + self.skippable = False + + +class TestGroup: + def __init__(self): + self.tests = [] + self.restrictions = frozenset() + self.features = frozenset() + self.depends = "@" + self.tests_directory = "debian/tests" + self._depends_checked = False + self._depends_cache = None + + def __iter__(self): + return iter(self.tests) + + def expand_depends(self, packages, build_depends): + if "@" not in self.depends: + return + or_clauses = [] + parsed_depends = parse_relations(self.depends) + for or_clause in parsed_depends: + if len(or_clause) == 1 and or_clause[0]["name"] == "@builddeps@": + or_clauses += build_depends + or_clauses += parse_relations("make") + continue + stripped_or_clause = [r for r in or_clause if r["name"] != "@"] + if len(stripped_or_clause) < len(or_clause): + for package in packages: + or_clauses += [ + stripped_or_clause + + [{"name": package, "version": None, "arch": None}] + ] + else: + or_clauses += [or_clause] + self.depends = deb822.PkgRelation.str(or_clauses) + + def check_depends(self): + if self._depends_checked: + if isinstance(self._depends_cache, Exception): + raise self._depends_cache # fpos, pylint: disable=raising-bad-type + return + child = ipc.Popen( # pylint: disable=consider-using-with + ["dpkg-checkbuilddeps", "-d", self.depends], stderr=ipc.PIPE, env={} + ) + error = child.stderr.read().decode("ASCII") + child.stderr.close() + if child.wait() != 0: + error = re.sub( + "^dpkg-checkbuilddeps: Unmet build dependencies", + "unmet dependencies", + error, + ) + error = error.rstrip() + skip = Skip(error) + self._depends_cache = skip + raise skip + self._depends_checked = True + + def check_restrictions(self, ignored_restrictions): + options = TestOptions() + restrictions = self.restrictions - frozenset(ignored_restrictions) + + for restriction in restrictions: + if restriction == "rw-build-tree": + options.rw_build_tree_needed = True + elif restriction == "needs-root": + if os.getuid() != 0: + raise Skip("this test needs root privileges") + elif restriction == "breaks-testbed": + raise Skip("breaks-testbed restriction is not implemented; use adt-run") + elif restriction == "build-needed": + raise Skip("source tree not built") + elif restriction == "allow-stderr": + options.allow_stderr = True + elif restriction == "flaky": + options.flaky = True + elif restriction == "skippable": + options.skippable = True + else: + raise Skip(f"unknown restriction: {restriction}") + return options + + def check(self, ignored_restrictions=()): + options = self.check_restrictions(ignored_restrictions) + self.check_depends() + return options + + def run( + self, + test, + progress, + ignored_restrictions=(), + rw_build_tree=None, + built_source_tree=None, + ): + ignored_restrictions = set(ignored_restrictions) + if rw_build_tree: + ignored_restrictions.add("rw-build-tree") + if built_source_tree: + ignored_restrictions.add("build-needed") + ignored_restrictions.add("needs-recommends") + ignored_restrictions.add("superficial") + try: + options = self.check(ignored_restrictions) + except Skip as exc: + progress.skip(test, str(exc)) + raise + test.prepare(progress, rw_build_tree) + try: + progress.start(test) + test.run(progress, options) + finally: + test.cleanup() + + def add_tests(self, tests): + tests = [Test(self, t) for t in re.split(r"\s*,?\s+", tests)] + self.tests = frozenset(tests) + + def add_test_command(self, test_command): + self.tests = frozenset([TestCommand(self, test_command)]) + + def add_restrictions(self, restrictions): + restrictions = re.split(r"\s*,?\s+", restrictions) + self.restrictions = frozenset(restrictions) + + def add_features(self, features): + features = re.split(r"\s*,?\s+", features) + self.features = frozenset(features) + + def add_depends(self, depends): + self.depends = depends + + def add_tests_directory(self, path): + self.tests_directory = path + + +def copy_build_tree(): + rw_build_tree = tempfile.mkdtemp(prefix="sadt-rwbt.") + print(f"sadt: info: copying build tree to {rw_build_tree}", file=sys.stderr) + ipc.check_call(["cp", "-a", ".", rw_build_tree]) + return rw_build_tree + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__.strip()) + parser.add_argument("-v", "--verbose", action="store_true", help="verbose output") + parser.add_argument( + "-b", + "--built-source-tree", + action="store_true", + help="assume built source tree", + ) + parser.add_argument( + "--run-autodep8", action="store_true", help="Run autodep8 (default)" + ) + parser.add_argument( + "--no-run-autodep8", action="store_false", help="Don't run autodep8" + ) + parser.set_defaults(run_autodep8=True) + parser.add_argument( + "--ignore-restrictions", + metavar="<restr>[,<restr>...]", + help="ignore specified restrictions", + default="", + ) + parser.add_argument("tests", metavar="<test-name>", nargs="*", help="tests to run") + options = parser.parse_args() + options.tests = frozenset(options.tests) + options.ignore_restrictions = frozenset(options.ignore_restrictions.split(",")) + return options + + +def get_test_groups( + binary_packages, build_depends, run_autodep8: bool +) -> list[TestGroup]: + test_groups = [] + try: + ipc.check_call(["which", "autodep8"], stdout=ipc.DEVNULL) + autodep8_available = True + except ipc.CalledProcessError: + autodep8_available = False + try: + if run_autodep8 and autodep8_available: + file = tempfile.TemporaryFile("w+") + ipc.check_call(["autodep8"], stdout=file) + file.seek(0) + else: + file = open("debian/tests/control", encoding="UTF-8") + except IOError as exc: + if exc.errno == errno.ENOENT: + print("sadt: error: cannot find debian/tests/control", file=sys.stderr) + sys.exit(1) + raise + with file: + for para in deb822.Packages.iter_paragraphs(file): + group = TestGroup() + for key, value in para.items(): + lkey = key.lower().replace("-", "_") + try: + method = getattr(group, "add_" + lkey) + except AttributeError: + print( + f"sadt: warning: unknown field {key}," + f" skipping the whole paragraph", + file=sys.stderr, + ) + group = None + break + method(value) + if group is not None: + group.expand_depends(binary_packages, build_depends) + test_groups += [group] + return test_groups + + +def test_summary(failures, flakes, n_skip: int, n_ok: int) -> str: + n_fail = len(failures) + n_flake = len(flakes) + n_test = n_fail + n_flake + n_skip + n_ok + if failures: + for name, exception in failures: + print("=" * 70) + print(f"FAIL: {name} ({exception.args[1]})") + if flakes: + for name, exception in flakes: + print("=" * 70) + print(f"FLAKY: {name} ({exception.args[1]})") + print() + fmt_message = [f"tests={n_test}"] + if n_skip > 0: + fmt_message += [f"skipped={n_skip}"] + if n_fail > 0: + fmt_message += [f"failures={n_fail}"] + if n_flake > 0: + fmt_message += [f"flaky={n_flake}"] + if fmt_message: + extra_message = f" ({', '.join(fmt_message)})" + else: + extra_message = "" + message = ("OK" if n_fail == 0 else "FAILED") + extra_message + return message + + +def run_tests(test_groups: list[TestGroup], options: argparse.Namespace) -> None: + # TODO: refactor run_tests function + # pylint: disable=too-many-branches,too-many-nested-blocks + failures = [] + flakes = [] + n_skip = n_ok = 0 + progress = VerboseProgress() if options.verbose else DefaultProgress() + rw_build_tree = None + try: + for group in test_groups: + for test in group: + if options.tests and test.name not in options.tests: + continue + try: + if rw_build_tree is None: + try: + group_options = group.check() + except Skip: + pass + else: + if group_options.rw_build_tree_needed: + rw_build_tree = copy_build_tree() + assert rw_build_tree is not None + group.run( + test, + progress=progress, + ignored_restrictions=options.ignore_restrictions, + rw_build_tree=rw_build_tree, + built_source_tree=options.built_source_tree, + ) + except Skip: + n_skip += 1 + except Fail as exc: + failures += [(test, exc)] + except Flaky as exc: + flakes += [(test, exc)] + else: + n_ok += 1 + finally: + progress.close() + n_fail = len(failures) + print(test_summary(failures, flakes, n_skip, n_ok)) + if rw_build_tree is not None: + shutil.rmtree(rw_build_tree) + sys.exit(n_fail > 0) + + +def main() -> None: + options = parse_args() + binary_packages = set() + build_depends = [] + try: + file = open("debian/control", encoding="UTF-8") + except IOError as exc: + if exc.errno == errno.ENOENT: + print("sadt: error: cannot find debian/control", file=sys.stderr) + sys.exit(1) + raise + with file: + for i, para in enumerate(deb822.Packages.iter_paragraphs(file)): + if i == 0: + # FIXME statement with no effect + # para['Source'] + for field in ( + "Build-Depends", + "Build-Depends-Indep", + "Build-Depends-Arch", + ): + try: + build_depends += parse_relations(para[field]) + except KeyError: + continue + else: + if para.get("Package-Type") == "udeb": + # udebs can't be tested + continue + binary_packages.add(para["Package"]) + + test_groups = get_test_groups(binary_packages, build_depends, options.run_autodep8) + run_tests(test_groups, options) + + +if __name__ == "__main__": + main() + +# vim:ts=4 sw=4 et |