#!/usr/bin/python3 # encoding=UTF-8 # Copyright © 2012, 2013, 2014 Jakub Wilk # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM # LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR # OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # PERFORMANCE OF THIS SOFTWARE. """ simple DEP-8 test runner """ import argparse import errno import os import queue as queuemod import re import shutil import stat import subprocess as ipc import sys import tempfile import threading import warnings from debian import deb822 def parse_relations(field): """ wrap debian.deb822.PkgRelation.parse_relations() to suppress the UserWarning about the inability to parse something. See https://bugs.debian.org/712513 """ warnings.simplefilter("ignore") parsed = deb822.PkgRelation.parse_relations(field) warnings.resetwarnings() return parsed def chmod_x(path): """ chmod a+X """ old_mode = stat.S_IMODE(os.stat(path).st_mode) new_mode = old_mode | ((old_mode & 0o444) >> 2) if old_mode != new_mode: os.chmod(path, new_mode) return old_mode def annotate_output(child): queue = queuemod.Queue() def reader(fd, tag): buf = b"" while True: assert b"\n" not in buf chunk = os.read(fd, 1024) if chunk == b"": break lines = (buf + chunk).split(b"\n") buf = lines.pop() for line in lines: queue.put((tag, line + b"\n")) if buf != b"": queue.put((tag, buf)) queue.put(None) queue = queuemod.Queue() threads = [] for pipe, tag in [(child.stdout, "O"), (child.stderr, "E")]: thread = threading.Thread(target=reader, args=(pipe.fileno(), tag)) thread.start() threads += [thread] nthreads = len(threads) while nthreads > 0: item = queue.get() if item is None: nthreads -= 1 continue yield item for thread in threads: thread.join() class Skip(Exception): pass class Flaky(Exception): pass class Fail(Exception): pass class Progress: @staticmethod def _write(text): sys.stdout.write(text) sys.stdout.flush() def start(self, name): pass def output(self, line): pass def skip(self, name, reason): raise NotImplementedError def fail(self, name, reason): raise NotImplementedError def ok(self, name): raise NotImplementedError def close(self): pass class DefaultProgress(Progress): _hourglass = r"/-\|" def __init__(self): self._counter = 0 self._output = False if sys.stdout.isatty(): self._back = "\b" else: self._back = "" def _reset(self): if self._output: self._write(self._back) def start(self, name): self._output = False def output(self, line): if not self._back: return hourglass = self._hourglass counter = self._counter + 1 self._reset() self._write(hourglass[counter % len(hourglass)]) self._counter = counter self._output = True def skip(self, name, reason): self._write("S") def fail(self, name, reason): self._reset() self._write("F") def ok(self, name): self._reset() self._write(".") def close(self): self._write("\n") class VerboseProgress(Progress): def _separator(self): self._write("-" * 70 + "\n") def start(self, name): self._separator() self._write(f"{name}\n") self._separator() def output(self, line): self._write(line) def skip(self, name, reason): self._write(f"{name}: SKIP ({reason})\n") def fail(self, name, reason): self._separator() self._write(f"{name}: FAIL ({reason})\n") self._write("\n") def ok(self, name): self._separator() self._write(f"{name}: PASS\n") self._write("\n") class TestCommand: def __init__(self, group, command): self.group = group self.command = command def __str__(self): return self.command @property def name(self): return str(self) def get_command(self): return ["sh", "-c", self.command] def prepare(self, progress, rw_build_tree): pass def cleanup(self): pass def run(self, progress, options): # pylint: disable=too-many-locals command = self.get_command() tmpdir1 = tempfile.mkdtemp(prefix="sadt.") tmpdir2 = tempfile.mkdtemp(prefix="sadt.") environ = dict(os.environ) environ["AUTOPKGTEST_TMP"] = tmpdir1 # only for compatibility with old DEP-8 spec. environ["ADTTMP"] = tmpdir2 child = ipc.Popen( # pylint: disable=consider-using-with command, stdout=ipc.PIPE, stderr=ipc.PIPE, env=environ ) output = [] stderr = False for tag, line in annotate_output(child): if tag == "E": stderr = True this_line = f"{tag}: {line.decode(sys.stdout.encoding, 'replace')}" progress.output(this_line) output.append(this_line) for fp in child.stdout, child.stderr: fp.close() returncode = child.wait() shutil.rmtree(tmpdir1) shutil.rmtree(tmpdir2) if returncode == 77 and options.skippable: reason = "exit status 77 and marked as skippable" progress.skip(self, reason) raise Skip(self, reason, "".join(output)) fail_reason = None if returncode == 0: if stderr and not options.allow_stderr: returncode = -1 fail_reason = "stderr non-empty" else: fail_reason = f"exit code: {returncode}" if returncode != 0: if options.flaky: progress.skip(self, fail_reason) raise Flaky(self, fail_reason, "".join(output)) progress.fail(self, fail_reason) raise Fail(self, fail_reason, "".join(output)) progress.ok(self) class Test(TestCommand): def __init__(self, group, testname): self.testname = testname self.original_mode = None self.cwd = None super().__init__(group, testname) @property def path(self): return os.path.join(self.group.tests_directory, self.testname) def __str__(self): return self.testname def get_command(self): return [self.path] def prepare(self, progress, rw_build_tree): if rw_build_tree: self.cwd = os.getcwd() os.chdir(rw_build_tree) chmod_x(self.path) else: if not os.access(self.path, os.X_OK): try: self.original_mode = chmod_x(self.path) except OSError as exc: progress.skip( self.testname, f"{self.path} could not be made executable: {exc}", ) raise Skip from exc def cleanup(self): if self.original_mode is not None: os.chmod(self.path, self.original_mode) if self.cwd is not None: os.chdir(self.cwd) class TestOptions: # pylint: disable=too-few-public-methods def __init__(self): self.allow_stderr = False self.flaky = False self.rw_build_tree_needed = False self.skippable = False class TestGroup: def __init__(self): self.tests = [] self.restrictions = frozenset() self.features = frozenset() self.depends = "@" self.tests_directory = "debian/tests" self._depends_checked = False self._depends_cache = None def __iter__(self): return iter(self.tests) def expand_depends(self, packages, build_depends): if "@" not in self.depends: return or_clauses = [] parsed_depends = parse_relations(self.depends) for or_clause in parsed_depends: if len(or_clause) == 1 and or_clause[0]["name"] == "@builddeps@": or_clauses += build_depends or_clauses += parse_relations("make") continue stripped_or_clause = [r for r in or_clause if r["name"] != "@"] if len(stripped_or_clause) < len(or_clause): for package in packages: or_clauses += [ stripped_or_clause + [{"name": package, "version": None, "arch": None}] ] else: or_clauses += [or_clause] self.depends = deb822.PkgRelation.str(or_clauses) def check_depends(self): if self._depends_checked: if isinstance(self._depends_cache, Exception): raise self._depends_cache # fpos, pylint: disable=raising-bad-type return child = ipc.Popen( # pylint: disable=consider-using-with ["dpkg-checkbuilddeps", "-d", self.depends], stderr=ipc.PIPE, env={} ) error = child.stderr.read().decode("ASCII") child.stderr.close() if child.wait() != 0: error = re.sub( "^dpkg-checkbuilddeps: Unmet build dependencies", "unmet dependencies", error, ) error = error.rstrip() skip = Skip(error) self._depends_cache = skip raise skip self._depends_checked = True def check_restrictions(self, ignored_restrictions): options = TestOptions() restrictions = self.restrictions - frozenset(ignored_restrictions) for restriction in restrictions: if restriction == "rw-build-tree": options.rw_build_tree_needed = True elif restriction == "needs-root": if os.getuid() != 0: raise Skip("this test needs root privileges") elif restriction == "breaks-testbed": raise Skip("breaks-testbed restriction is not implemented; use adt-run") elif restriction == "build-needed": raise Skip("source tree not built") elif restriction == "allow-stderr": options.allow_stderr = True elif restriction == "flaky": options.flaky = True elif restriction == "skippable": options.skippable = True else: raise Skip(f"unknown restriction: {restriction}") return options def check(self, ignored_restrictions=()): options = self.check_restrictions(ignored_restrictions) self.check_depends() return options def run( self, test, progress, ignored_restrictions=(), rw_build_tree=None, built_source_tree=None, ): ignored_restrictions = set(ignored_restrictions) if rw_build_tree: ignored_restrictions.add("rw-build-tree") if built_source_tree: ignored_restrictions.add("build-needed") ignored_restrictions.add("needs-recommends") ignored_restrictions.add("superficial") try: options = self.check(ignored_restrictions) except Skip as exc: progress.skip(test, str(exc)) raise test.prepare(progress, rw_build_tree) try: progress.start(test) test.run(progress, options) finally: test.cleanup() def add_tests(self, tests): tests = [Test(self, t) for t in re.split(r"\s*,?\s+", tests)] self.tests = frozenset(tests) def add_test_command(self, test_command): self.tests = frozenset([TestCommand(self, test_command)]) def add_restrictions(self, restrictions): restrictions = re.split(r"\s*,?\s+", restrictions) self.restrictions = frozenset(restrictions) def add_features(self, features): features = re.split(r"\s*,?\s+", features) self.features = frozenset(features) def add_depends(self, depends): self.depends = depends def add_tests_directory(self, path): self.tests_directory = path def copy_build_tree(): rw_build_tree = tempfile.mkdtemp(prefix="sadt-rwbt.") print(f"sadt: info: copying build tree to {rw_build_tree}", file=sys.stderr) ipc.check_call(["cp", "-a", ".", rw_build_tree]) return rw_build_tree def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__.strip()) parser.add_argument("-v", "--verbose", action="store_true", help="verbose output") parser.add_argument( "-b", "--built-source-tree", action="store_true", help="assume built source tree", ) parser.add_argument( "--run-autodep8", action="store_true", help="Run autodep8 (default)" ) parser.add_argument( "--no-run-autodep8", action="store_false", help="Don't run autodep8" ) parser.set_defaults(run_autodep8=True) parser.add_argument( "--ignore-restrictions", metavar="[,...]", help="ignore specified restrictions", default="", ) parser.add_argument("tests", metavar="", nargs="*", help="tests to run") options = parser.parse_args() options.tests = frozenset(options.tests) options.ignore_restrictions = frozenset(options.ignore_restrictions.split(",")) return options def get_test_groups( binary_packages, build_depends, run_autodep8: bool ) -> list[TestGroup]: test_groups = [] try: ipc.check_call(["which", "autodep8"], stdout=ipc.DEVNULL) autodep8_available = True except ipc.CalledProcessError: autodep8_available = False try: if run_autodep8 and autodep8_available: file = tempfile.TemporaryFile("w+") ipc.check_call(["autodep8"], stdout=file) file.seek(0) else: file = open("debian/tests/control", encoding="UTF-8") except IOError as exc: if exc.errno == errno.ENOENT: print("sadt: error: cannot find debian/tests/control", file=sys.stderr) sys.exit(1) raise with file: for para in deb822.Packages.iter_paragraphs(file): group = TestGroup() for key, value in para.items(): lkey = key.lower().replace("-", "_") try: method = getattr(group, "add_" + lkey) except AttributeError: print( f"sadt: warning: unknown field {key}," f" skipping the whole paragraph", file=sys.stderr, ) group = None break method(value) if group is not None: group.expand_depends(binary_packages, build_depends) test_groups += [group] return test_groups def test_summary(failures, flakes, n_skip: int, n_ok: int) -> str: n_fail = len(failures) n_flake = len(flakes) n_test = n_fail + n_flake + n_skip + n_ok if failures: for name, exception in failures: print("=" * 70) print(f"FAIL: {name} ({exception.args[1]})") if flakes: for name, exception in flakes: print("=" * 70) print(f"FLAKY: {name} ({exception.args[1]})") print() fmt_message = [f"tests={n_test}"] if n_skip > 0: fmt_message += [f"skipped={n_skip}"] if n_fail > 0: fmt_message += [f"failures={n_fail}"] if n_flake > 0: fmt_message += [f"flaky={n_flake}"] if fmt_message: extra_message = f" ({', '.join(fmt_message)})" else: extra_message = "" message = ("OK" if n_fail == 0 else "FAILED") + extra_message return message def run_tests(test_groups: list[TestGroup], options: argparse.Namespace) -> None: # TODO: refactor run_tests function # pylint: disable=too-many-branches,too-many-nested-blocks failures = [] flakes = [] n_skip = n_ok = 0 progress = VerboseProgress() if options.verbose else DefaultProgress() rw_build_tree = None try: for group in test_groups: for test in group: if options.tests and test.name not in options.tests: continue try: if rw_build_tree is None: try: group_options = group.check() except Skip: pass else: if group_options.rw_build_tree_needed: rw_build_tree = copy_build_tree() assert rw_build_tree is not None group.run( test, progress=progress, ignored_restrictions=options.ignore_restrictions, rw_build_tree=rw_build_tree, built_source_tree=options.built_source_tree, ) except Skip: n_skip += 1 except Fail as exc: failures += [(test, exc)] except Flaky as exc: flakes += [(test, exc)] else: n_ok += 1 finally: progress.close() n_fail = len(failures) print(test_summary(failures, flakes, n_skip, n_ok)) if rw_build_tree is not None: shutil.rmtree(rw_build_tree) sys.exit(n_fail > 0) def main() -> None: options = parse_args() binary_packages = set() build_depends = [] try: file = open("debian/control", encoding="UTF-8") except IOError as exc: if exc.errno == errno.ENOENT: print("sadt: error: cannot find debian/control", file=sys.stderr) sys.exit(1) raise with file: for i, para in enumerate(deb822.Packages.iter_paragraphs(file)): if i == 0: # FIXME statement with no effect # para['Source'] for field in ( "Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch", ): try: build_depends += parse_relations(para[field]) except KeyError: continue else: if para.get("Package-Type") == "udeb": # udebs can't be tested continue binary_packages.add(para["Package"]) test_groups = get_test_groups(binary_packages, build_depends, options.run_autodep8) run_tests(test_groups, options) if __name__ == "__main__": main() # vim:ts=4 sw=4 et