summaryrefslogtreecommitdiffstats
path: root/scripts/sadt
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/sadt')
-rwxr-xr-xscripts/sadt647
1 files changed, 647 insertions, 0 deletions
diff --git a/scripts/sadt b/scripts/sadt
new file mode 100755
index 0000000..b874330
--- /dev/null
+++ b/scripts/sadt
@@ -0,0 +1,647 @@
+#!/usr/bin/python3
+# encoding=UTF-8
+
+# Copyright © 2012, 2013, 2014 Jakub Wilk <jwilk@debian.org>
+
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+"""
+simple DEP-8 test runner
+"""
+
+import argparse
+import errno
+import os
+import queue as queuemod
+import re
+import shutil
+import stat
+import subprocess as ipc
+import sys
+import tempfile
+import threading
+import warnings
+
+from debian import deb822
+
+
+def parse_relations(field):
+ """
+ wrap debian.deb822.PkgRelation.parse_relations() to suppress the
+ UserWarning about the inability to parse something.
+ See https://bugs.debian.org/712513
+ """
+ warnings.simplefilter("ignore")
+ parsed = deb822.PkgRelation.parse_relations(field)
+ warnings.resetwarnings()
+ return parsed
+
+
+def chmod_x(path):
+ """
+ chmod a+X <path>
+ """
+ old_mode = stat.S_IMODE(os.stat(path).st_mode)
+ new_mode = old_mode | ((old_mode & 0o444) >> 2)
+ if old_mode != new_mode:
+ os.chmod(path, new_mode)
+ return old_mode
+
+
+def annotate_output(child):
+ queue = queuemod.Queue()
+
+ def reader(fd, tag):
+ buf = b""
+ while True:
+ assert b"\n" not in buf
+ chunk = os.read(fd, 1024)
+ if chunk == b"":
+ break
+ lines = (buf + chunk).split(b"\n")
+ buf = lines.pop()
+ for line in lines:
+ queue.put((tag, line + b"\n"))
+ if buf != b"":
+ queue.put((tag, buf))
+ queue.put(None)
+
+ queue = queuemod.Queue()
+ threads = []
+ for pipe, tag in [(child.stdout, "O"), (child.stderr, "E")]:
+ thread = threading.Thread(target=reader, args=(pipe.fileno(), tag))
+ thread.start()
+ threads += [thread]
+ nthreads = len(threads)
+ while nthreads > 0:
+ item = queue.get()
+ if item is None:
+ nthreads -= 1
+ continue
+ yield item
+ for thread in threads:
+ thread.join()
+
+
+class Skip(Exception):
+ pass
+
+
+class Flaky(Exception):
+ pass
+
+
+class Fail(Exception):
+ pass
+
+
+class Progress:
+ @staticmethod
+ def _write(text):
+ sys.stdout.write(text)
+ sys.stdout.flush()
+
+ def start(self, name):
+ pass
+
+ def output(self, line):
+ pass
+
+ def skip(self, name, reason):
+ raise NotImplementedError
+
+ def fail(self, name, reason):
+ raise NotImplementedError
+
+ def ok(self, name):
+ raise NotImplementedError
+
+ def close(self):
+ pass
+
+
+class DefaultProgress(Progress):
+ _hourglass = r"/-\|"
+
+ def __init__(self):
+ self._counter = 0
+ self._output = False
+ if sys.stdout.isatty():
+ self._back = "\b"
+ else:
+ self._back = ""
+
+ def _reset(self):
+ if self._output:
+ self._write(self._back)
+
+ def start(self, name):
+ self._output = False
+
+ def output(self, line):
+ if not self._back:
+ return
+ hourglass = self._hourglass
+ counter = self._counter + 1
+ self._reset()
+ self._write(hourglass[counter % len(hourglass)])
+ self._counter = counter
+ self._output = True
+
+ def skip(self, name, reason):
+ self._write("S")
+
+ def fail(self, name, reason):
+ self._reset()
+ self._write("F")
+
+ def ok(self, name):
+ self._reset()
+ self._write(".")
+
+ def close(self):
+ self._write("\n")
+
+
+class VerboseProgress(Progress):
+ def _separator(self):
+ self._write("-" * 70 + "\n")
+
+ def start(self, name):
+ self._separator()
+ self._write(f"{name}\n")
+ self._separator()
+
+ def output(self, line):
+ self._write(line)
+
+ def skip(self, name, reason):
+ self._write(f"{name}: SKIP ({reason})\n")
+
+ def fail(self, name, reason):
+ self._separator()
+ self._write(f"{name}: FAIL ({reason})\n")
+ self._write("\n")
+
+ def ok(self, name):
+ self._separator()
+ self._write(f"{name}: PASS\n")
+ self._write("\n")
+
+
+class TestCommand:
+ def __init__(self, group, command):
+ self.group = group
+ self.command = command
+
+ def __str__(self):
+ return self.command
+
+ @property
+ def name(self):
+ return str(self)
+
+ def get_command(self):
+ return ["sh", "-c", self.command]
+
+ def prepare(self, progress, rw_build_tree):
+ pass
+
+ def cleanup(self):
+ pass
+
+ def run(self, progress, options): # pylint: disable=too-many-locals
+ command = self.get_command()
+ tmpdir1 = tempfile.mkdtemp(prefix="sadt.")
+ tmpdir2 = tempfile.mkdtemp(prefix="sadt.")
+ environ = dict(os.environ)
+ environ["AUTOPKGTEST_TMP"] = tmpdir1
+ # only for compatibility with old DEP-8 spec.
+ environ["ADTTMP"] = tmpdir2
+ child = ipc.Popen( # pylint: disable=consider-using-with
+ command, stdout=ipc.PIPE, stderr=ipc.PIPE, env=environ
+ )
+ output = []
+ stderr = False
+ for tag, line in annotate_output(child):
+ if tag == "E":
+ stderr = True
+ this_line = f"{tag}: {line.decode(sys.stdout.encoding, 'replace')}"
+ progress.output(this_line)
+ output.append(this_line)
+ for fp in child.stdout, child.stderr:
+ fp.close()
+ returncode = child.wait()
+ shutil.rmtree(tmpdir1)
+ shutil.rmtree(tmpdir2)
+
+ if returncode == 77 and options.skippable:
+ reason = "exit status 77 and marked as skippable"
+ progress.skip(self, reason)
+ raise Skip(self, reason, "".join(output))
+
+ fail_reason = None
+
+ if returncode == 0:
+ if stderr and not options.allow_stderr:
+ returncode = -1
+ fail_reason = "stderr non-empty"
+ else:
+ fail_reason = f"exit code: {returncode}"
+
+ if returncode != 0:
+ if options.flaky:
+ progress.skip(self, fail_reason)
+ raise Flaky(self, fail_reason, "".join(output))
+
+ progress.fail(self, fail_reason)
+ raise Fail(self, fail_reason, "".join(output))
+
+ progress.ok(self)
+
+
+class Test(TestCommand):
+ def __init__(self, group, testname):
+ self.testname = testname
+ self.original_mode = None
+ self.cwd = None
+ super().__init__(group, testname)
+
+ @property
+ def path(self):
+ return os.path.join(self.group.tests_directory, self.testname)
+
+ def __str__(self):
+ return self.testname
+
+ def get_command(self):
+ return [self.path]
+
+ def prepare(self, progress, rw_build_tree):
+ if rw_build_tree:
+ self.cwd = os.getcwd()
+ os.chdir(rw_build_tree)
+ chmod_x(self.path)
+ else:
+ if not os.access(self.path, os.X_OK):
+ try:
+ self.original_mode = chmod_x(self.path)
+ except OSError as exc:
+ progress.skip(
+ self.testname,
+ f"{self.path} could not be made executable: {exc}",
+ )
+ raise Skip from exc
+
+ def cleanup(self):
+ if self.original_mode is not None:
+ os.chmod(self.path, self.original_mode)
+ if self.cwd is not None:
+ os.chdir(self.cwd)
+
+
+class TestOptions: # pylint: disable=too-few-public-methods
+ def __init__(self):
+ self.allow_stderr = False
+ self.flaky = False
+ self.rw_build_tree_needed = False
+ self.skippable = False
+
+
+class TestGroup:
+ def __init__(self):
+ self.tests = []
+ self.restrictions = frozenset()
+ self.features = frozenset()
+ self.depends = "@"
+ self.tests_directory = "debian/tests"
+ self._depends_checked = False
+ self._depends_cache = None
+
+ def __iter__(self):
+ return iter(self.tests)
+
+ def expand_depends(self, packages, build_depends):
+ if "@" not in self.depends:
+ return
+ or_clauses = []
+ parsed_depends = parse_relations(self.depends)
+ for or_clause in parsed_depends:
+ if len(or_clause) == 1 and or_clause[0]["name"] == "@builddeps@":
+ or_clauses += build_depends
+ or_clauses += parse_relations("make")
+ continue
+ stripped_or_clause = [r for r in or_clause if r["name"] != "@"]
+ if len(stripped_or_clause) < len(or_clause):
+ for package in packages:
+ or_clauses += [
+ stripped_or_clause
+ + [{"name": package, "version": None, "arch": None}]
+ ]
+ else:
+ or_clauses += [or_clause]
+ self.depends = deb822.PkgRelation.str(or_clauses)
+
+ def check_depends(self):
+ if self._depends_checked:
+ if isinstance(self._depends_cache, Exception):
+ raise self._depends_cache # fpos, pylint: disable=raising-bad-type
+ return
+ child = ipc.Popen( # pylint: disable=consider-using-with
+ ["dpkg-checkbuilddeps", "-d", self.depends], stderr=ipc.PIPE, env={}
+ )
+ error = child.stderr.read().decode("ASCII")
+ child.stderr.close()
+ if child.wait() != 0:
+ error = re.sub(
+ "^dpkg-checkbuilddeps: Unmet build dependencies",
+ "unmet dependencies",
+ error,
+ )
+ error = error.rstrip()
+ skip = Skip(error)
+ self._depends_cache = skip
+ raise skip
+ self._depends_checked = True
+
+ def check_restrictions(self, ignored_restrictions):
+ options = TestOptions()
+ restrictions = self.restrictions - frozenset(ignored_restrictions)
+
+ for restriction in restrictions:
+ if restriction == "rw-build-tree":
+ options.rw_build_tree_needed = True
+ elif restriction == "needs-root":
+ if os.getuid() != 0:
+ raise Skip("this test needs root privileges")
+ elif restriction == "breaks-testbed":
+ raise Skip("breaks-testbed restriction is not implemented; use adt-run")
+ elif restriction == "build-needed":
+ raise Skip("source tree not built")
+ elif restriction == "allow-stderr":
+ options.allow_stderr = True
+ elif restriction == "flaky":
+ options.flaky = True
+ elif restriction == "skippable":
+ options.skippable = True
+ else:
+ raise Skip(f"unknown restriction: {restriction}")
+ return options
+
+ def check(self, ignored_restrictions=()):
+ options = self.check_restrictions(ignored_restrictions)
+ self.check_depends()
+ return options
+
+ def run(
+ self,
+ test,
+ progress,
+ ignored_restrictions=(),
+ rw_build_tree=None,
+ built_source_tree=None,
+ ):
+ ignored_restrictions = set(ignored_restrictions)
+ if rw_build_tree:
+ ignored_restrictions.add("rw-build-tree")
+ if built_source_tree:
+ ignored_restrictions.add("build-needed")
+ ignored_restrictions.add("needs-recommends")
+ ignored_restrictions.add("superficial")
+ try:
+ options = self.check(ignored_restrictions)
+ except Skip as exc:
+ progress.skip(test, str(exc))
+ raise
+ test.prepare(progress, rw_build_tree)
+ try:
+ progress.start(test)
+ test.run(progress, options)
+ finally:
+ test.cleanup()
+
+ def add_tests(self, tests):
+ tests = [Test(self, t) for t in re.split(r"\s*,?\s+", tests)]
+ self.tests = frozenset(tests)
+
+ def add_test_command(self, test_command):
+ self.tests = frozenset([TestCommand(self, test_command)])
+
+ def add_restrictions(self, restrictions):
+ restrictions = re.split(r"\s*,?\s+", restrictions)
+ self.restrictions = frozenset(restrictions)
+
+ def add_features(self, features):
+ features = re.split(r"\s*,?\s+", features)
+ self.features = frozenset(features)
+
+ def add_depends(self, depends):
+ self.depends = depends
+
+ def add_tests_directory(self, path):
+ self.tests_directory = path
+
+
+def copy_build_tree():
+ rw_build_tree = tempfile.mkdtemp(prefix="sadt-rwbt.")
+ print(f"sadt: info: copying build tree to {rw_build_tree}", file=sys.stderr)
+ ipc.check_call(["cp", "-a", ".", rw_build_tree])
+ return rw_build_tree
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description=__doc__.strip())
+ parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
+ parser.add_argument(
+ "-b",
+ "--built-source-tree",
+ action="store_true",
+ help="assume built source tree",
+ )
+ parser.add_argument(
+ "--run-autodep8", action="store_true", help="Run autodep8 (default)"
+ )
+ parser.add_argument(
+ "--no-run-autodep8", action="store_false", help="Don't run autodep8"
+ )
+ parser.set_defaults(run_autodep8=True)
+ parser.add_argument(
+ "--ignore-restrictions",
+ metavar="<restr>[,<restr>...]",
+ help="ignore specified restrictions",
+ default="",
+ )
+ parser.add_argument("tests", metavar="<test-name>", nargs="*", help="tests to run")
+ options = parser.parse_args()
+ options.tests = frozenset(options.tests)
+ options.ignore_restrictions = frozenset(options.ignore_restrictions.split(","))
+ return options
+
+
+def get_test_groups(
+ binary_packages, build_depends, run_autodep8: bool
+) -> list[TestGroup]:
+ test_groups = []
+ try:
+ ipc.check_call(["which", "autodep8"], stdout=ipc.DEVNULL)
+ autodep8_available = True
+ except ipc.CalledProcessError:
+ autodep8_available = False
+ try:
+ if run_autodep8 and autodep8_available:
+ file = tempfile.TemporaryFile("w+")
+ ipc.check_call(["autodep8"], stdout=file)
+ file.seek(0)
+ else:
+ file = open("debian/tests/control", encoding="UTF-8")
+ except IOError as exc:
+ if exc.errno == errno.ENOENT:
+ print("sadt: error: cannot find debian/tests/control", file=sys.stderr)
+ sys.exit(1)
+ raise
+ with file:
+ for para in deb822.Packages.iter_paragraphs(file):
+ group = TestGroup()
+ for key, value in para.items():
+ lkey = key.lower().replace("-", "_")
+ try:
+ method = getattr(group, "add_" + lkey)
+ except AttributeError:
+ print(
+ f"sadt: warning: unknown field {key},"
+ f" skipping the whole paragraph",
+ file=sys.stderr,
+ )
+ group = None
+ break
+ method(value)
+ if group is not None:
+ group.expand_depends(binary_packages, build_depends)
+ test_groups += [group]
+ return test_groups
+
+
+def test_summary(failures, flakes, n_skip: int, n_ok: int) -> str:
+ n_fail = len(failures)
+ n_flake = len(flakes)
+ n_test = n_fail + n_flake + n_skip + n_ok
+ if failures:
+ for name, exception in failures:
+ print("=" * 70)
+ print(f"FAIL: {name} ({exception.args[1]})")
+ if flakes:
+ for name, exception in flakes:
+ print("=" * 70)
+ print(f"FLAKY: {name} ({exception.args[1]})")
+ print()
+ fmt_message = [f"tests={n_test}"]
+ if n_skip > 0:
+ fmt_message += [f"skipped={n_skip}"]
+ if n_fail > 0:
+ fmt_message += [f"failures={n_fail}"]
+ if n_flake > 0:
+ fmt_message += [f"flaky={n_flake}"]
+ if fmt_message:
+ extra_message = f" ({', '.join(fmt_message)})"
+ else:
+ extra_message = ""
+ message = ("OK" if n_fail == 0 else "FAILED") + extra_message
+ return message
+
+
+def run_tests(test_groups: list[TestGroup], options: argparse.Namespace) -> None:
+ # TODO: refactor run_tests function
+ # pylint: disable=too-many-branches,too-many-nested-blocks
+ failures = []
+ flakes = []
+ n_skip = n_ok = 0
+ progress = VerboseProgress() if options.verbose else DefaultProgress()
+ rw_build_tree = None
+ try:
+ for group in test_groups:
+ for test in group:
+ if options.tests and test.name not in options.tests:
+ continue
+ try:
+ if rw_build_tree is None:
+ try:
+ group_options = group.check()
+ except Skip:
+ pass
+ else:
+ if group_options.rw_build_tree_needed:
+ rw_build_tree = copy_build_tree()
+ assert rw_build_tree is not None
+ group.run(
+ test,
+ progress=progress,
+ ignored_restrictions=options.ignore_restrictions,
+ rw_build_tree=rw_build_tree,
+ built_source_tree=options.built_source_tree,
+ )
+ except Skip:
+ n_skip += 1
+ except Fail as exc:
+ failures += [(test, exc)]
+ except Flaky as exc:
+ flakes += [(test, exc)]
+ else:
+ n_ok += 1
+ finally:
+ progress.close()
+ n_fail = len(failures)
+ print(test_summary(failures, flakes, n_skip, n_ok))
+ if rw_build_tree is not None:
+ shutil.rmtree(rw_build_tree)
+ sys.exit(n_fail > 0)
+
+
+def main() -> None:
+ options = parse_args()
+ binary_packages = set()
+ build_depends = []
+ try:
+ file = open("debian/control", encoding="UTF-8")
+ except IOError as exc:
+ if exc.errno == errno.ENOENT:
+ print("sadt: error: cannot find debian/control", file=sys.stderr)
+ sys.exit(1)
+ raise
+ with file:
+ for i, para in enumerate(deb822.Packages.iter_paragraphs(file)):
+ if i == 0:
+ # FIXME statement with no effect
+ # para['Source']
+ for field in (
+ "Build-Depends",
+ "Build-Depends-Indep",
+ "Build-Depends-Arch",
+ ):
+ try:
+ build_depends += parse_relations(para[field])
+ except KeyError:
+ continue
+ else:
+ if para.get("Package-Type") == "udeb":
+ # udebs can't be tested
+ continue
+ binary_packages.add(para["Package"])
+
+ test_groups = get_test_groups(binary_packages, build_depends, options.run_autodep8)
+ run_tests(test_groups, options)
+
+
+if __name__ == "__main__":
+ main()
+
+# vim:ts=4 sw=4 et