#!/usr/bin/python3 # encoding=UTF-8 # Copyright © 2012, 2013, 2014 Jakub Wilk # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM # LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR # OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # PERFORMANCE OF THIS SOFTWARE. ''' simple DEP-8 test runner ''' import argparse import errno import os import queue as queuemod import re import shutil import stat import subprocess as ipc import sys import tempfile import threading import warnings from debian import deb822 def parse_relations(field): ''' wrap debian.deb822.PkgRelation.parse_relations() to suppress the UserWarning about the inability to parse something. See https://bugs.debian.org/712513 ''' warnings.simplefilter('ignore') parsed = deb822.PkgRelation.parse_relations(field) warnings.resetwarnings() return parsed def chmod_x(path): ''' chmod a+X ''' old_mode = stat.S_IMODE(os.stat(path).st_mode) new_mode = old_mode | ((old_mode & 0o444) >> 2) if old_mode != new_mode: os.chmod(path, new_mode) return old_mode def annotate_output(child): queue = queuemod.Queue() def reader(fd, tag): buf = b'' while True: assert b'\n' not in buf chunk = os.read(fd, 1024) if chunk == b'': break lines = (buf + chunk).split(b'\n') buf = lines.pop() for line in lines: queue.put((tag, line + b'\n')) if buf != b'': queue.put((tag, buf)) queue.put(None) queue = queuemod.Queue() threads = [] for pipe, tag in [(child.stdout, 'O'), (child.stderr, 'E')]: thread = threading.Thread(target=reader, args=(pipe.fileno(), tag)) thread.start() threads += [thread] nthreads = len(threads) while nthreads > 0: item = queue.get() if item is None: nthreads -= 1 continue yield item for thread in threads: thread.join() class Skip(Exception): pass class Flaky(Exception): pass class Fail(Exception): pass class Progress: @staticmethod def _write(s): sys.stdout.write(s) sys.stdout.flush() def start(self, name): pass def output(self, line): pass def skip(self, name, reason): raise NotImplementedError def fail(self, name, reason): raise NotImplementedError def ok(self, name): raise NotImplementedError def close(self): pass class DefaultProgress(Progress): _hourglass = r'/-\|' def __init__(self): self._counter = 0 self._output = False if sys.stdout.isatty(): self._back = '\b' else: self._back = '' def _reset(self): if self._output: self._write(self._back) def start(self, name): self._output = False def output(self, line): if not self._back: return hourglass = self._hourglass c = self._counter + 1 self._reset() self._write(hourglass[c % len(hourglass)]) self._counter = c self._output = True def skip(self, name, reason): self._write('S') def fail(self, name, reason): self._reset() self._write('F') def ok(self, name): self._reset() self._write('.') def close(self): self._write('\n') class VerboseProgress(Progress): def _separator(self): self._write('-' * 70 + '\n') def start(self, name): self._separator() self._write('{test}\n'.format(test=name)) self._separator() def output(self, line): self._write(line) def skip(self, name, reason): self._write('{test}: SKIP ({reason})\n'.format( test=name, reason=reason)) def fail(self, name, reason): self._separator() self._write('{test}: FAIL ({reason})\n'.format( test=name, reason=reason)) self._write('\n') def ok(self, name): self._separator() self._write('{test}: PASS\n'.format(test=name)) self._write('\n') class TestCommand: def __init__(self, group, command): self.group = group self.command = command def __str__(self): return self.command @property def name(self): return str(self) def get_command(self): return ['sh', '-c', self.command] def prepare(self, progress, rw_build_tree): pass def cleanup(self): pass def run(self, progress, options): # pylint: disable=too-many-locals command = self.get_command() tmpdir1 = tempfile.mkdtemp(prefix='sadt.') tmpdir2 = tempfile.mkdtemp(prefix='sadt.') environ = dict(os.environ) environ['AUTOPKGTEST_TMP'] = tmpdir1 # only for compatibility with old DEP-8 spec. environ['ADTTMP'] = tmpdir2 child = ipc.Popen(command, stdout=ipc.PIPE, stderr=ipc.PIPE, env=environ) output = [] stderr = False for tag, line in annotate_output(child): if tag == 'E': stderr = True this_line = '{tag}: {line}'.format( tag=tag, line=line.decode(sys.stdout.encoding, 'replace'), ) progress.output(this_line) output.append(this_line) for fp in child.stdout, child.stderr: fp.close() rc = child.wait() shutil.rmtree(tmpdir1) shutil.rmtree(tmpdir2) if rc == 77 and options.skippable: reason = 'exit status 77 and marked as skippable' progress.skip(self, reason) raise Skip(self, reason, ''.join(output)) fail_reason = None if rc == 0: if stderr and not options.allow_stderr: rc = -1 fail_reason = 'stderr non-empty' else: fail_reason = 'exit code: {rc}'.format(rc=rc) if rc != 0: if options.flaky: progress.skip(self, fail_reason) raise Flaky(self, fail_reason, ''.join(output)) progress.fail(self, fail_reason) raise Fail(self, fail_reason, ''.join(output)) progress.ok(self) class Test(TestCommand): def __init__(self, group, testname): self.testname = testname self.original_mode = None self.cwd = None super(Test, self).__init__(group, testname) @property def path(self): return os.path.join(self.group.tests_directory, self.testname) def __str__(self): return self.testname def get_command(self): return [self.path] def prepare(self, progress, rw_build_tree): if rw_build_tree: self.cwd = os.getcwd() os.chdir(rw_build_tree) chmod_x(self.path) else: if not os.access(self.path, os.X_OK): try: self.original_mode = chmod_x(self.path) except OSError as exc: progress.skip(self.testname, '{path} could not be made executable: {exc}' .format(path=self.path, exc=exc)) raise Skip def cleanup(self): if self.original_mode is not None: os.chmod(self.path, self.original_mode) if self.cwd is not None: os.chdir(self.cwd) class TestOptions: # pylint: disable=too-few-public-methods def __init__(self): self.allow_stderr = False self.flaky = False self.rw_build_tree_needed = False self.skippable = False class TestGroup: def __init__(self): self.tests = [] self.restrictions = frozenset() self.features = frozenset() self.depends = '@' self.tests_directory = 'debian/tests' self._depends_checked = False self._depends_cache = None def __iter__(self): return iter(self.tests) def expand_depends(self, packages, build_depends): if '@' not in self.depends: return or_clauses = [] parsed_depends = parse_relations(self.depends) for or_clause in parsed_depends: if len(or_clause) == 1 and or_clause[0]['name'] == '@builddeps@': or_clauses += build_depends or_clauses += parse_relations('make') continue stripped_or_clause = [r for r in or_clause if r['name'] != '@'] if len(stripped_or_clause) < len(or_clause): for package in packages: or_clauses += [ stripped_or_clause + [dict(name=package, version=None, arch=None)] ] else: or_clauses += [or_clause] self.depends = deb822.PkgRelation.str(or_clauses) def check_depends(self): if self._depends_checked: if isinstance(self._depends_cache, Exception): raise self._depends_cache # fpos, pylint: disable=raising-bad-type return child = ipc.Popen(['dpkg-checkbuilddeps', '-d', self.depends], stderr=ipc.PIPE, env={}) error = child.stderr.read().decode('ASCII') child.stderr.close() if child.wait() != 0: error = re.sub('^dpkg-checkbuilddeps: Unmet build dependencies', 'unmet dependencies', error) error = error.rstrip() skip = Skip(error) self._depends_cache = skip raise skip else: self._depends_checked = True def check_restrictions(self, ignored_restrictions): options = TestOptions() restrictions = self.restrictions - frozenset(ignored_restrictions) for r in restrictions: if r == 'rw-build-tree': options.rw_build_tree_needed = True elif r == 'needs-root': if os.getuid() != 0: raise Skip('this test needs root privileges') elif r == 'breaks-testbed': raise Skip( 'breaks-testbed restriction is not implemented; use adt-run') elif r == 'build-needed': raise Skip('source tree not built') elif r == 'allow-stderr': options.allow_stderr = True elif r == 'flaky': options.flaky = True elif r == 'skippable': options.skippable = True else: raise Skip('unknown restriction: {restr}'.format(restr=r)) return options def check(self, ignored_restrictions=()): options = self.check_restrictions(ignored_restrictions) self.check_depends() return options def run(self, test, progress, ignored_restrictions=(), rw_build_tree=None, built_source_tree=None): ignored_restrictions = set(ignored_restrictions) if rw_build_tree: ignored_restrictions.add('rw-build-tree') if built_source_tree: ignored_restrictions.add('build-needed') ignored_restrictions.add('needs-recommends') ignored_restrictions.add('superficial') try: options = self.check(ignored_restrictions) except Skip as exc: progress.skip(test, str(exc)) raise test.prepare(progress, rw_build_tree) try: progress.start(test) test.run(progress, options) finally: test.cleanup() def add_tests(self, tests): tests = [Test(self, t) for t in re.split(r'\s*,?\s+', tests)] self.tests = frozenset(tests) def add_test_command(self, test_command): self.tests = frozenset([TestCommand(self, test_command)]) def add_restrictions(self, restrictions): restrictions = re.split(r'\s*,?\s+', restrictions) self.restrictions = frozenset(restrictions) def add_features(self, features): features = re.split(r'\s*,?\s+', features) self.features = frozenset(features) def add_depends(self, depends): self.depends = depends def add_tests_directory(self, path): self.tests_directory = path def copy_build_tree(): rw_build_tree = tempfile.mkdtemp(prefix='sadt-rwbt.') print('sadt: info: copying build tree to {tree}'.format(tree=rw_build_tree), file=sys.stderr) ipc.check_call(['cp', '-a', '.', rw_build_tree]) return rw_build_tree def main(): # TODO: refactor main function # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-nested-blocks parser = argparse.ArgumentParser(description=__doc__.strip()) parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') parser.add_argument('-b', '--built-source-tree', action='store_true', help='assume built source tree') parser.add_argument('--ignore-restrictions', metavar='[,...]', help='ignore specified restrictions', default='') parser.add_argument('tests', metavar='', nargs='*', help='tests to run') options = parser.parse_args() options.tests = frozenset(options.tests) options.ignore_restrictions = frozenset( options.ignore_restrictions.split(',')) binary_packages = set() build_depends = [] try: file = open('debian/control', encoding='UTF-8') except IOError as exc: if exc.errno == errno.ENOENT: print('sadt: error: cannot find debian/control', file=sys.stderr) sys.exit(1) raise with file: for n, para in enumerate(deb822.Packages.iter_paragraphs(file)): if n == 0: # FIXME statement with no effect # para['Source'] for field in 'Build-Depends', 'Build-Depends-Indep': try: build_depends += parse_relations(para[field]) except KeyError: continue else: binary_packages.add(para['Package']) test_groups = [] try: file = open('debian/tests/control', encoding='UTF-8') except IOError as exc: if exc.errno == errno.ENOENT: print('sadt: error: cannot find debian/tests/control', file=sys.stderr) sys.exit(1) raise with file: for para in deb822.Packages.iter_paragraphs(file): group = TestGroup() for key, value in para.items(): lkey = key.lower().replace('-', '_') try: method = getattr(group, 'add_' + lkey) except AttributeError: print('sadt: warning: unknown field {field}, skipping the whole paragraph' .format(field=key), file=sys.stderr) group = None break method(value) if group is not None: group.expand_depends(binary_packages, build_depends) test_groups += [group] failures = [] flakes = [] n_skip = n_ok = 0 progress = VerboseProgress() if options.verbose else DefaultProgress() rw_build_tree = None try: for group in test_groups: for test in group: if options.tests and test.name not in options.tests: continue try: if rw_build_tree is None: try: group_options = group.check() except Skip: pass else: if group_options.rw_build_tree_needed: rw_build_tree = copy_build_tree() assert rw_build_tree is not None group.run( test, progress=progress, ignored_restrictions=options.ignore_restrictions, rw_build_tree=rw_build_tree, built_source_tree=options.built_source_tree ) except Skip: n_skip += 1 except Fail as exc: failures += [(test, exc)] except Flaky as exc: flakes += [(test, exc)] else: n_ok += 1 finally: progress.close() n_fail = len(failures) n_flake = len(flakes) n_test = n_fail + n_flake + n_skip + n_ok if failures: for name, exception in failures: print('=' * 70) print('FAIL: {test} ({reason})'.format( test=name, reason=exception.args[1])) if flakes: for name, exception in flakes: print('=' * 70) print('FLAKY: {test} ({reason})'.format( test=name, reason=exception.args[1])) print() fmt_message = ['tests={n}'.format(n=n_test)] if n_skip > 0: fmt_message += ['skipped={n}'.format(n=n_skip)] if n_fail > 0: fmt_message += ['failures={n}'.format(n=n_fail)] if n_flake > 0: fmt_message += ['flaky={n}'.format(n=n_flake)] if fmt_message: extra_message = ' ({msg})'.format(msg=', '.join(fmt_message)) else: extra_message = '' message = ('OK' if n_fail == 0 else 'FAILED') + extra_message print(message) if rw_build_tree is not None: shutil.rmtree(rw_build_tree) sys.exit(n_fail > 0) if __name__ == '__main__': main() # vim:ts=4 sw=4 et