summaryrefslogtreecommitdiffstats
path: root/mesonbuild/mtest.py
diff options
context:
space:
mode:
Diffstat (limited to 'mesonbuild/mtest.py')
-rw-r--r--mesonbuild/mtest.py2120
1 files changed, 2120 insertions, 0 deletions
diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py
new file mode 100644
index 0000000..50a5f97
--- /dev/null
+++ b/mesonbuild/mtest.py
@@ -0,0 +1,2120 @@
+# Copyright 2016-2017 The Meson development team
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# A tool to run tests in many different ways.
+from __future__ import annotations
+
+from pathlib import Path
+from collections import deque
+from contextlib import suppress
+from copy import deepcopy
+import argparse
+import asyncio
+import datetime
+import enum
+import json
+import multiprocessing
+import os
+import pickle
+import platform
+import random
+import re
+import signal
+import subprocess
+import shlex
+import sys
+import textwrap
+import time
+import typing as T
+import unicodedata
+import xml.etree.ElementTree as et
+
+from . import build
+from . import environment
+from . import mlog
+from .coredata import major_versions_differ, MesonVersionMismatchException
+from .coredata import version as coredata_version
+from .mesonlib import (MesonException, OrderedSet, RealPathAction,
+ get_wine_shortpath, join_args, split_args, setup_vsenv)
+from .mintro import get_infodir, load_info_file
+from .programs import ExternalProgram
+from .backend.backends import TestProtocol, TestSerialisation
+
+if T.TYPE_CHECKING:
+ TYPE_TAPResult = T.Union['TAPParser.Test',
+ 'TAPParser.Error',
+ 'TAPParser.Version',
+ 'TAPParser.Plan',
+ 'TAPParser.UnknownLine',
+ 'TAPParser.Bailout']
+
+
+# GNU autotools interprets a return code of 77 from tests it executes to
+# mean that the test should be skipped.
+GNU_SKIP_RETURNCODE = 77
+
+# GNU autotools interprets a return code of 99 from tests it executes to
+# mean that the test failed even before testing what it is supposed to test.
+GNU_ERROR_RETURNCODE = 99
+
+# Exit if 3 Ctrl-C's are received within one second
+MAX_CTRLC = 3
+
+def is_windows() -> bool:
+ platname = platform.system().lower()
+ return platname == 'windows'
+
+def is_cygwin() -> bool:
+ return sys.platform == 'cygwin'
+
+UNIWIDTH_MAPPING = {'F': 2, 'H': 1, 'W': 2, 'Na': 1, 'N': 1, 'A': 1}
+def uniwidth(s: str) -> int:
+ result = 0
+ for c in s:
+ w = unicodedata.east_asian_width(c)
+ result += UNIWIDTH_MAPPING[w]
+ return result
+
+def determine_worker_count() -> int:
+ varname = 'MESON_TESTTHREADS'
+ if varname in os.environ:
+ try:
+ num_workers = int(os.environ[varname])
+ except ValueError:
+ print(f'Invalid value in {varname}, using 1 thread.')
+ num_workers = 1
+ else:
+ try:
+ # Fails in some weird environments such as Debian
+ # reproducible build.
+ num_workers = multiprocessing.cpu_count()
+ except Exception:
+ num_workers = 1
+ return num_workers
+
+def add_arguments(parser: argparse.ArgumentParser) -> None:
+ parser.add_argument('--maxfail', default=0, type=int,
+ help='Number of failing tests before aborting the '
+ 'test run. (default: 0, to disable aborting on failure)')
+ parser.add_argument('--repeat', default=1, dest='repeat', type=int,
+ help='Number of times to run the tests.')
+ parser.add_argument('--no-rebuild', default=False, action='store_true',
+ help='Do not rebuild before running tests.')
+ parser.add_argument('--gdb', default=False, dest='gdb', action='store_true',
+ help='Run test under gdb.')
+ parser.add_argument('--gdb-path', default='gdb', dest='gdb_path',
+ help='Path to the gdb binary (default: gdb).')
+ parser.add_argument('--list', default=False, dest='list', action='store_true',
+ help='List available tests.')
+ parser.add_argument('--wrapper', default=None, dest='wrapper', type=split_args,
+ help='wrapper to run tests with (e.g. Valgrind)')
+ parser.add_argument('-C', dest='wd', action=RealPathAction,
+ # https://github.com/python/typeshed/issues/3107
+ # https://github.com/python/mypy/issues/7177
+ type=os.path.abspath, # type: ignore
+ help='directory to cd into before running')
+ parser.add_argument('--suite', default=[], dest='include_suites', action='append', metavar='SUITE',
+ help='Only run tests belonging to the given suite.')
+ parser.add_argument('--no-suite', default=[], dest='exclude_suites', action='append', metavar='SUITE',
+ help='Do not run tests belonging to the given suite.')
+ parser.add_argument('--no-stdsplit', default=True, dest='split', action='store_false',
+ help='Do not split stderr and stdout in test logs.')
+ parser.add_argument('--print-errorlogs', default=False, action='store_true',
+ help="Whether to print failing tests' logs.")
+ parser.add_argument('--benchmark', default=False, action='store_true',
+ help="Run benchmarks instead of tests.")
+ parser.add_argument('--logbase', default='testlog',
+ help="Base name for log file.")
+ parser.add_argument('--num-processes', default=determine_worker_count(), type=int,
+ help='How many parallel processes to use.')
+ parser.add_argument('-v', '--verbose', default=False, action='store_true',
+ help='Do not redirect stdout and stderr')
+ parser.add_argument('-q', '--quiet', default=False, action='store_true',
+ help='Produce less output to the terminal.')
+ parser.add_argument('-t', '--timeout-multiplier', type=float, default=None,
+ help='Define a multiplier for test timeout, for example '
+ ' when running tests in particular conditions they might take'
+ ' more time to execute. (<= 0 to disable timeout)')
+ parser.add_argument('--setup', default=None, dest='setup',
+ help='Which test setup to use.')
+ parser.add_argument('--test-args', default=[], type=split_args,
+ help='Arguments to pass to the specified test(s) or all tests')
+ parser.add_argument('args', nargs='*',
+ help='Optional list of test names to run. "testname" to run all tests with that name, '
+ '"subprojname:testname" to specifically run "testname" from "subprojname", '
+ '"subprojname:" to run all tests defined by "subprojname".')
+
+
+def print_safe(s: str) -> None:
+ end = '' if s[-1] == '\n' else '\n'
+ try:
+ print(s, end=end)
+ except UnicodeEncodeError:
+ s = s.encode('ascii', errors='backslashreplace').decode('ascii')
+ print(s, end=end)
+
+def join_lines(a: str, b: str) -> str:
+ if not a:
+ return b
+ if not b:
+ return a
+ return a + '\n' + b
+
+def dashes(s: str, dash: str, cols: int) -> str:
+ if not s:
+ return dash * cols
+ s = ' ' + s + ' '
+ width = uniwidth(s)
+ first = (cols - width) // 2
+ s = dash * first + s
+ return s + dash * (cols - first - width)
+
+def returncode_to_status(retcode: int) -> str:
+ # Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
+ # functions here because the status returned by subprocess is munged. It
+ # returns a negative value if the process was killed by a signal rather than
+ # the raw status returned by `wait()`. Also, If a shell sits between Meson
+ # the the actual unit test that shell is likely to convert a termination due
+ # to a signal into an exit status of 128 plus the signal number.
+ if retcode < 0:
+ signum = -retcode
+ try:
+ signame = signal.Signals(signum).name
+ except ValueError:
+ signame = 'SIGinvalid'
+ return f'killed by signal {signum} {signame}'
+
+ if retcode <= 128:
+ return f'exit status {retcode}'
+
+ signum = retcode - 128
+ try:
+ signame = signal.Signals(signum).name
+ except ValueError:
+ signame = 'SIGinvalid'
+ return f'(exit status {retcode} or signal {signum} {signame})'
+
+# TODO for Windows
+sh_quote: T.Callable[[str], str] = lambda x: x
+if not is_windows():
+ sh_quote = shlex.quote
+
+def env_tuple_to_str(env: T.Iterable[T.Tuple[str, str]]) -> str:
+ return ''.join(["{}={} ".format(k, sh_quote(v)) for k, v in env])
+
+
+class TestException(MesonException):
+ pass
+
+
+@enum.unique
+class ConsoleUser(enum.Enum):
+
+ # the logger can use the console
+ LOGGER = 0
+
+ # the console is used by gdb
+ GDB = 1
+
+ # the console is used to write stdout/stderr
+ STDOUT = 2
+
+
+@enum.unique
+class TestResult(enum.Enum):
+
+ PENDING = 'PENDING'
+ RUNNING = 'RUNNING'
+ OK = 'OK'
+ TIMEOUT = 'TIMEOUT'
+ INTERRUPT = 'INTERRUPT'
+ SKIP = 'SKIP'
+ FAIL = 'FAIL'
+ EXPECTEDFAIL = 'EXPECTEDFAIL'
+ UNEXPECTEDPASS = 'UNEXPECTEDPASS'
+ ERROR = 'ERROR'
+
+ @staticmethod
+ def maxlen() -> int:
+ return 14 # len(UNEXPECTEDPASS)
+
+ def is_ok(self) -> bool:
+ return self in {TestResult.OK, TestResult.EXPECTEDFAIL}
+
+ def is_bad(self) -> bool:
+ return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT,
+ TestResult.UNEXPECTEDPASS, TestResult.ERROR}
+
+ def is_finished(self) -> bool:
+ return self not in {TestResult.PENDING, TestResult.RUNNING}
+
+ def was_killed(self) -> bool:
+ return self in (TestResult.TIMEOUT, TestResult.INTERRUPT)
+
+ def colorize(self, s: str) -> mlog.AnsiDecorator:
+ if self.is_bad():
+ decorator = mlog.red
+ elif self in (TestResult.SKIP, TestResult.EXPECTEDFAIL):
+ decorator = mlog.yellow
+ elif self.is_finished():
+ decorator = mlog.green
+ else:
+ decorator = mlog.blue
+ return decorator(s)
+
+ def get_text(self, colorize: bool) -> str:
+ result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen())
+ return self.colorize(result_str).get_text(colorize)
+
+ def get_command_marker(self) -> str:
+ return str(self.colorize('>>> '))
+
+
+class TAPParser:
+ class Plan(T.NamedTuple):
+ num_tests: int
+ late: bool
+ skipped: bool
+ explanation: T.Optional[str]
+
+ class Bailout(T.NamedTuple):
+ message: str
+
+ class Test(T.NamedTuple):
+ number: int
+ name: str
+ result: TestResult
+ explanation: T.Optional[str]
+
+ def __str__(self) -> str:
+ return f'{self.number} {self.name}'.strip()
+
+ class Error(T.NamedTuple):
+ message: str
+
+ class UnknownLine(T.NamedTuple):
+ message: str
+ lineno: int
+
+ class Version(T.NamedTuple):
+ version: int
+
+ _MAIN = 1
+ _AFTER_TEST = 2
+ _YAML = 3
+
+ _RE_BAILOUT = re.compile(r'Bail out!\s*(.*)')
+ _RE_DIRECTIVE = re.compile(r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?')
+ _RE_PLAN = re.compile(r'1\.\.([0-9]+)' + _RE_DIRECTIVE.pattern)
+ _RE_TEST = re.compile(r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE.pattern)
+ _RE_VERSION = re.compile(r'TAP version ([0-9]+)')
+ _RE_YAML_START = re.compile(r'(\s+)---.*')
+ _RE_YAML_END = re.compile(r'\s+\.\.\.\s*')
+
+ found_late_test = False
+ bailed_out = False
+ plan: T.Optional[Plan] = None
+ lineno = 0
+ num_tests = 0
+ yaml_lineno: T.Optional[int] = None
+ yaml_indent = ''
+ state = _MAIN
+ version = 12
+
+ def parse_test(self, ok: bool, num: int, name: str, directive: T.Optional[str], explanation: T.Optional[str]) -> \
+ T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
+ name = name.strip()
+ explanation = explanation.strip() if explanation else None
+ if directive is not None:
+ directive = directive.upper()
+ if directive.startswith('SKIP'):
+ if ok:
+ yield self.Test(num, name, TestResult.SKIP, explanation)
+ return
+ elif directive == 'TODO':
+ yield self.Test(num, name, TestResult.UNEXPECTEDPASS if ok else TestResult.EXPECTEDFAIL, explanation)
+ return
+ else:
+ yield self.Error(f'invalid directive "{directive}"')
+
+ yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation)
+
+ async def parse_async(self, lines: T.AsyncIterator[str]) -> T.AsyncIterator[TYPE_TAPResult]:
+ async for line in lines:
+ for event in self.parse_line(line):
+ yield event
+ for event in self.parse_line(None):
+ yield event
+
+ def parse(self, io: T.Iterator[str]) -> T.Iterator[TYPE_TAPResult]:
+ for line in io:
+ yield from self.parse_line(line)
+ yield from self.parse_line(None)
+
+ def parse_line(self, line: T.Optional[str]) -> T.Iterator[TYPE_TAPResult]:
+ if line is not None:
+ self.lineno += 1
+ line = line.rstrip()
+
+ # YAML blocks are only accepted after a test
+ if self.state == self._AFTER_TEST:
+ if self.version >= 13:
+ m = self._RE_YAML_START.match(line)
+ if m:
+ self.state = self._YAML
+ self.yaml_lineno = self.lineno
+ self.yaml_indent = m.group(1)
+ return
+ self.state = self._MAIN
+
+ elif self.state == self._YAML:
+ if self._RE_YAML_END.match(line):
+ self.state = self._MAIN
+ return
+ if line.startswith(self.yaml_indent):
+ return
+ yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})')
+ self.state = self._MAIN
+
+ assert self.state == self._MAIN
+ if not line or line.startswith('#'):
+ return
+
+ m = self._RE_TEST.match(line)
+ if m:
+ if self.plan and self.plan.late and not self.found_late_test:
+ yield self.Error('unexpected test after late plan')
+ self.found_late_test = True
+ self.num_tests += 1
+ num = self.num_tests if m.group(2) is None else int(m.group(2))
+ if num != self.num_tests:
+ yield self.Error('out of order test numbers')
+ yield from self.parse_test(m.group(1) == 'ok', num,
+ m.group(3), m.group(4), m.group(5))
+ self.state = self._AFTER_TEST
+ return
+
+ m = self._RE_PLAN.match(line)
+ if m:
+ if self.plan:
+ yield self.Error('more than one plan found')
+ else:
+ num_tests = int(m.group(1))
+ skipped = num_tests == 0
+ if m.group(2):
+ if m.group(2).upper().startswith('SKIP'):
+ if num_tests > 0:
+ yield self.Error('invalid SKIP directive for plan')
+ skipped = True
+ else:
+ yield self.Error('invalid directive for plan')
+ self.plan = self.Plan(num_tests=num_tests, late=(self.num_tests > 0),
+ skipped=skipped, explanation=m.group(3))
+ yield self.plan
+ return
+
+ m = self._RE_BAILOUT.match(line)
+ if m:
+ yield self.Bailout(m.group(1))
+ self.bailed_out = True
+ return
+
+ m = self._RE_VERSION.match(line)
+ if m:
+ # The TAP version is only accepted as the first line
+ if self.lineno != 1:
+ yield self.Error('version number must be on the first line')
+ return
+ self.version = int(m.group(1))
+ if self.version < 13:
+ yield self.Error('version number should be at least 13')
+ else:
+ yield self.Version(version=self.version)
+ return
+
+ # unknown syntax
+ yield self.UnknownLine(line, self.lineno)
+ else:
+ # end of file
+ if self.state == self._YAML:
+ yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})')
+
+ if not self.bailed_out and self.plan and self.num_tests != self.plan.num_tests:
+ if self.num_tests < self.plan.num_tests:
+ yield self.Error(f'Too few tests run (expected {self.plan.num_tests}, got {self.num_tests})')
+ else:
+ yield self.Error(f'Too many tests run (expected {self.plan.num_tests}, got {self.num_tests})')
+
+class TestLogger:
+ def flush(self) -> None:
+ pass
+
+ def start(self, harness: 'TestHarness') -> None:
+ pass
+
+ def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None:
+ pass
+
+ def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, res: TestResult) -> None:
+ pass
+
+ def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
+ pass
+
+ async def finish(self, harness: 'TestHarness') -> None:
+ pass
+
+ def close(self) -> None:
+ pass
+
+
+class TestFileLogger(TestLogger):
+ def __init__(self, filename: str, errors: str = 'replace') -> None:
+ self.filename = filename
+ self.file = open(filename, 'w', encoding='utf-8', errors=errors)
+
+ def close(self) -> None:
+ if self.file:
+ self.file.close()
+ self.file = None
+
+
+class ConsoleLogger(TestLogger):
+ ASCII_SPINNER = ['..', ':.', '.:']
+ SPINNER = ["\U0001f311", "\U0001f312", "\U0001f313", "\U0001f314",
+ "\U0001f315", "\U0001f316", "\U0001f317", "\U0001f318"]
+
+ SCISSORS = "\u2700 "
+ HLINE = "\u2015"
+ RTRI = "\u25B6 "
+
+ def __init__(self) -> None:
+ self.running_tests = OrderedSet() # type: OrderedSet['TestRun']
+ self.progress_test = None # type: T.Optional['TestRun']
+ self.progress_task = None # type: T.Optional[asyncio.Future]
+ self.max_left_width = 0 # type: int
+ self.stop = False
+ self.update = asyncio.Event()
+ self.should_erase_line = ''
+ self.test_count = 0
+ self.started_tests = 0
+ self.spinner_index = 0
+ try:
+ self.cols, _ = os.get_terminal_size(1)
+ self.is_tty = True
+ except OSError:
+ self.cols = 80
+ self.is_tty = False
+
+ self.output_start = dashes(self.SCISSORS, self.HLINE, self.cols - 2)
+ self.output_end = dashes('', self.HLINE, self.cols - 2)
+ self.sub = self.RTRI
+ self.spinner = self.SPINNER
+ try:
+ self.output_start.encode(sys.stdout.encoding or 'ascii')
+ except UnicodeEncodeError:
+ self.output_start = dashes('8<', '-', self.cols - 2)
+ self.output_end = dashes('', '-', self.cols - 2)
+ self.sub = '| '
+ self.spinner = self.ASCII_SPINNER
+
+ def flush(self) -> None:
+ if self.should_erase_line:
+ print(self.should_erase_line, end='')
+ self.should_erase_line = ''
+
+ def print_progress(self, line: str) -> None:
+ print(self.should_erase_line, line, sep='', end='\r')
+ self.should_erase_line = '\x1b[K'
+
+ def request_update(self) -> None:
+ self.update.set()
+
+ def emit_progress(self, harness: 'TestHarness') -> None:
+ if self.progress_test is None:
+ self.flush()
+ return
+
+ if len(self.running_tests) == 1:
+ count = f'{self.started_tests}/{self.test_count}'
+ else:
+ count = '{}-{}/{}'.format(self.started_tests - len(self.running_tests) + 1,
+ self.started_tests, self.test_count)
+
+ left = '[{}] {} '.format(count, self.spinner[self.spinner_index])
+ self.spinner_index = (self.spinner_index + 1) % len(self.spinner)
+
+ right = '{spaces} {dur:{durlen}}'.format(
+ spaces=' ' * TestResult.maxlen(),
+ dur=int(time.time() - self.progress_test.starttime),
+ durlen=harness.duration_max_len)
+ if self.progress_test.timeout:
+ right += '/{timeout:{durlen}}'.format(
+ timeout=self.progress_test.timeout,
+ durlen=harness.duration_max_len)
+ right += 's'
+ details = self.progress_test.get_details()
+ if details:
+ right += ' ' + details
+
+ line = harness.format(self.progress_test, colorize=True,
+ max_left_width=self.max_left_width,
+ left=left, right=right)
+ self.print_progress(line)
+
+ def start(self, harness: 'TestHarness') -> None:
+ async def report_progress() -> None:
+ loop = asyncio.get_event_loop()
+ next_update = 0.0
+ self.request_update()
+ while not self.stop:
+ await self.update.wait()
+ self.update.clear()
+ # We may get here simply because the progress line has been
+ # overwritten, so do not always switch. Only do so every
+ # second, or if the printed test has finished
+ if loop.time() >= next_update:
+ self.progress_test = None
+ next_update = loop.time() + 1
+ loop.call_at(next_update, self.request_update)
+
+ if (self.progress_test and
+ self.progress_test.res is not TestResult.RUNNING):
+ self.progress_test = None
+
+ if not self.progress_test:
+ if not self.running_tests:
+ continue
+ # Pick a test in round robin order
+ self.progress_test = self.running_tests.pop(last=False)
+ self.running_tests.add(self.progress_test)
+
+ self.emit_progress(harness)
+ self.flush()
+
+ self.test_count = harness.test_count
+ self.cols = max(self.cols, harness.max_left_width + 30)
+
+ if self.is_tty and not harness.need_console:
+ # Account for "[aa-bb/cc] OO " in the progress report
+ self.max_left_width = 3 * len(str(self.test_count)) + 8
+ self.progress_task = asyncio.ensure_future(report_progress())
+
+ def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None:
+ if test.verbose and test.cmdline:
+ self.flush()
+ print(harness.format(test, mlog.colorize_console(),
+ max_left_width=self.max_left_width,
+ right=test.res.get_text(mlog.colorize_console())))
+ print(test.res.get_command_marker() + test.cmdline)
+ if test.direct_stdout:
+ print(self.output_start, flush=True)
+ elif not test.needs_parsing:
+ print(flush=True)
+
+ self.started_tests += 1
+ self.running_tests.add(test)
+ self.running_tests.move_to_end(test, last=False)
+ self.request_update()
+
+ def shorten_log(self, harness: 'TestHarness', result: 'TestRun') -> str:
+ if not result.verbose and not harness.options.print_errorlogs:
+ return ''
+
+ log = result.get_log(mlog.colorize_console(),
+ stderr_only=result.needs_parsing)
+ if result.verbose:
+ return log
+
+ lines = log.splitlines()
+ if len(lines) < 100:
+ return log
+ else:
+ return str(mlog.bold('Listing only the last 100 lines from a long log.\n')) + '\n'.join(lines[-100:])
+
+ def print_log(self, harness: 'TestHarness', result: 'TestRun') -> None:
+ if not result.verbose:
+ cmdline = result.cmdline
+ if not cmdline:
+ print(result.res.get_command_marker() + result.stdo)
+ return
+ print(result.res.get_command_marker() + cmdline)
+
+ log = self.shorten_log(harness, result)
+ if log:
+ print(self.output_start)
+ print_safe(log)
+ print(self.output_end)
+
+ def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, result: TestResult) -> None:
+ if test.verbose or (harness.options.print_errorlogs and result.is_bad()):
+ self.flush()
+ print(harness.format(test, mlog.colorize_console(), max_left_width=self.max_left_width,
+ prefix=self.sub,
+ middle=s,
+ right=result.get_text(mlog.colorize_console())), flush=True)
+
+ self.request_update()
+
+ def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
+ self.running_tests.remove(result)
+ if result.res is TestResult.TIMEOUT and (result.verbose or
+ harness.options.print_errorlogs):
+ self.flush()
+ print(f'{result.name} time out (After {result.timeout} seconds)')
+
+ if not harness.options.quiet or not result.res.is_ok():
+ self.flush()
+ if result.cmdline and result.direct_stdout:
+ print(self.output_end)
+ print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width))
+ else:
+ print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width),
+ flush=True)
+ if result.verbose or result.res.is_bad():
+ self.print_log(harness, result)
+ if result.warnings:
+ print(flush=True)
+ for w in result.warnings:
+ print(w, flush=True)
+ print(flush=True)
+ if result.verbose or result.res.is_bad():
+ print(flush=True)
+
+ self.request_update()
+
+ async def finish(self, harness: 'TestHarness') -> None:
+ self.stop = True
+ self.request_update()
+ if self.progress_task:
+ await self.progress_task
+
+ if harness.collected_failures and \
+ (harness.options.print_errorlogs or harness.options.verbose):
+ print("\nSummary of Failures:\n")
+ for i, result in enumerate(harness.collected_failures, 1):
+ print(harness.format(result, mlog.colorize_console()))
+
+ print(harness.summary())
+
+
+class TextLogfileBuilder(TestFileLogger):
+ def start(self, harness: 'TestHarness') -> None:
+ self.file.write(f'Log of Meson test suite run on {datetime.datetime.now().isoformat()}\n\n')
+ inherit_env = env_tuple_to_str(os.environ.items())
+ self.file.write(f'Inherited environment: {inherit_env}\n\n')
+
+ def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
+ title = f'{result.num}/{harness.test_count}'
+ self.file.write(dashes(title, '=', 78) + '\n')
+ self.file.write('test: ' + result.name + '\n')
+ starttime_str = time.strftime("%H:%M:%S", time.gmtime(result.starttime))
+ self.file.write('start time: ' + starttime_str + '\n')
+ self.file.write('duration: ' + '%.2fs' % result.duration + '\n')
+ self.file.write('result: ' + result.get_exit_status() + '\n')
+ if result.cmdline:
+ self.file.write('command: ' + result.cmdline + '\n')
+ if result.stdo:
+ name = 'stdout' if harness.options.split else 'output'
+ self.file.write(dashes(name, '-', 78) + '\n')
+ self.file.write(result.stdo)
+ if result.stde:
+ self.file.write(dashes('stderr', '-', 78) + '\n')
+ self.file.write(result.stde)
+ self.file.write(dashes('', '=', 78) + '\n\n')
+
+ async def finish(self, harness: 'TestHarness') -> None:
+ if harness.collected_failures:
+ self.file.write("\nSummary of Failures:\n\n")
+ for i, result in enumerate(harness.collected_failures, 1):
+ self.file.write(harness.format(result, False) + '\n')
+ self.file.write(harness.summary())
+
+ print(f'Full log written to {self.filename}')
+
+
+class JsonLogfileBuilder(TestFileLogger):
+ def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
+ jresult = {'name': result.name,
+ 'stdout': result.stdo,
+ 'result': result.res.value,
+ 'starttime': result.starttime,
+ 'duration': result.duration,
+ 'returncode': result.returncode,
+ 'env': result.env,
+ 'command': result.cmd} # type: T.Dict[str, T.Any]
+ if result.stde:
+ jresult['stderr'] = result.stde
+ self.file.write(json.dumps(jresult) + '\n')
+
+
+class JunitBuilder(TestLogger):
+
+ """Builder for Junit test results.
+
+ Junit is impossible to stream out, it requires attributes counting the
+ total number of tests, failures, skips, and errors in the root element
+ and in each test suite. As such, we use a builder class to track each
+ test case, and calculate all metadata before writing it out.
+
+ For tests with multiple results (like from a TAP test), we record the
+ test as a suite with the project_name.test_name. This allows us to track
+ each result separately. For tests with only one result (such as exit-code
+ tests) we record each one into a suite with the name project_name. The use
+ of the project_name allows us to sort subproject tests separately from
+ the root project.
+ """
+
+ def __init__(self, filename: str) -> None:
+ self.filename = filename
+ self.root = et.Element(
+ 'testsuites', tests='0', errors='0', failures='0')
+ self.suites = {} # type: T.Dict[str, et.Element]
+
+ def log(self, harness: 'TestHarness', test: 'TestRun') -> None:
+ """Log a single test case."""
+ if test.junit is not None:
+ for suite in test.junit.findall('.//testsuite'):
+ # Assume that we don't need to merge anything here...
+ suite.attrib['name'] = '{}.{}.{}'.format(test.project, test.name, suite.attrib['name'])
+
+ # GTest can inject invalid attributes
+ for case in suite.findall('.//testcase[@result]'):
+ del case.attrib['result']
+ for case in suite.findall('.//testcase[@timestamp]'):
+ del case.attrib['timestamp']
+ for case in suite.findall('.//testcase[@file]'):
+ del case.attrib['file']
+ for case in suite.findall('.//testcase[@line]'):
+ del case.attrib['line']
+ self.root.append(suite)
+ return
+
+ # In this case we have a test binary with multiple results.
+ # We want to record this so that each result is recorded
+ # separately
+ if test.results:
+ suitename = f'{test.project}.{test.name}'
+ assert suitename not in self.suites or harness.options.repeat > 1, 'duplicate suite'
+
+ suite = self.suites[suitename] = et.Element(
+ 'testsuite',
+ name=suitename,
+ tests=str(len(test.results)),
+ errors=str(sum(1 for r in test.results if r.result in
+ {TestResult.INTERRUPT, TestResult.ERROR})),
+ failures=str(sum(1 for r in test.results if r.result in
+ {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})),
+ skipped=str(sum(1 for r in test.results if r.result is TestResult.SKIP)),
+ time=str(test.duration),
+ )
+
+ for subtest in test.results:
+ # Both name and classname are required. Use the suite name as
+ # the class name, so that e.g. GitLab groups testcases correctly.
+ testcase = et.SubElement(suite, 'testcase', name=str(subtest), classname=suitename)
+ if subtest.result is TestResult.SKIP:
+ et.SubElement(testcase, 'skipped')
+ elif subtest.result is TestResult.ERROR:
+ et.SubElement(testcase, 'error')
+ elif subtest.result is TestResult.FAIL:
+ et.SubElement(testcase, 'failure')
+ elif subtest.result is TestResult.UNEXPECTEDPASS:
+ fail = et.SubElement(testcase, 'failure')
+ fail.text = 'Test unexpected passed.'
+ elif subtest.result is TestResult.INTERRUPT:
+ fail = et.SubElement(testcase, 'error')
+ fail.text = 'Test was interrupted by user.'
+ elif subtest.result is TestResult.TIMEOUT:
+ fail = et.SubElement(testcase, 'error')
+ fail.text = 'Test did not finish before configured timeout.'
+ if subtest.explanation:
+ et.SubElement(testcase, 'system-out').text = subtest.explanation
+ if test.stdo:
+ out = et.SubElement(suite, 'system-out')
+ out.text = test.stdo.rstrip()
+ if test.stde:
+ err = et.SubElement(suite, 'system-err')
+ err.text = test.stde.rstrip()
+ else:
+ if test.project not in self.suites:
+ suite = self.suites[test.project] = et.Element(
+ 'testsuite', name=test.project, tests='1', errors='0',
+ failures='0', skipped='0', time=str(test.duration))
+ else:
+ suite = self.suites[test.project]
+ suite.attrib['tests'] = str(int(suite.attrib['tests']) + 1)
+
+ testcase = et.SubElement(suite, 'testcase', name=test.name,
+ classname=test.project, time=str(test.duration))
+ if test.res is TestResult.SKIP:
+ et.SubElement(testcase, 'skipped')
+ suite.attrib['skipped'] = str(int(suite.attrib['skipped']) + 1)
+ elif test.res is TestResult.ERROR:
+ et.SubElement(testcase, 'error')
+ suite.attrib['errors'] = str(int(suite.attrib['errors']) + 1)
+ elif test.res is TestResult.FAIL:
+ et.SubElement(testcase, 'failure')
+ suite.attrib['failures'] = str(int(suite.attrib['failures']) + 1)
+ if test.stdo:
+ out = et.SubElement(testcase, 'system-out')
+ out.text = test.stdo.rstrip()
+ if test.stde:
+ err = et.SubElement(testcase, 'system-err')
+ err.text = test.stde.rstrip()
+
+ async def finish(self, harness: 'TestHarness') -> None:
+ """Calculate total test counts and write out the xml result."""
+ for suite in self.suites.values():
+ self.root.append(suite)
+ # Skipped is really not allowed in the "testsuits" element
+ for attr in ['tests', 'errors', 'failures']:
+ self.root.attrib[attr] = str(int(self.root.attrib[attr]) + int(suite.attrib[attr]))
+
+ tree = et.ElementTree(self.root)
+ with open(self.filename, 'wb') as f:
+ tree.write(f, encoding='utf-8', xml_declaration=True)
+
+
+class TestRun:
+ TEST_NUM = 0
+ PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {}
+
+ def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any:
+ return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol])
+
+ def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str],
+ name: str, timeout: T.Optional[int], is_parallel: bool, verbose: bool):
+ self.res = TestResult.PENDING
+ self.test = test
+ self._num = None # type: T.Optional[int]
+ self.name = name
+ self.timeout = timeout
+ self.results = [] # type: T.List[TAPParser.Test]
+ self.returncode = None # type: T.Optional[int]
+ self.starttime = None # type: T.Optional[float]
+ self.duration = None # type: T.Optional[float]
+ self.stdo = ''
+ self.stde = ''
+ self.additional_error = ''
+ self.cmd = None # type: T.Optional[T.List[str]]
+ self.env = test_env # type: T.Dict[str, str]
+ self.should_fail = test.should_fail
+ self.project = test.project_name
+ self.junit = None # type: T.Optional[et.ElementTree]
+ self.is_parallel = is_parallel
+ self.verbose = verbose
+ self.warnings = [] # type: T.List[str]
+
+ def start(self, cmd: T.List[str]) -> None:
+ self.res = TestResult.RUNNING
+ self.starttime = time.time()
+ self.cmd = cmd
+
+ @property
+ def num(self) -> int:
+ if self._num is None:
+ TestRun.TEST_NUM += 1
+ self._num = TestRun.TEST_NUM
+ return self._num
+
+ @property
+ def direct_stdout(self) -> bool:
+ return self.verbose and not self.is_parallel and not self.needs_parsing
+
+ def get_results(self) -> str:
+ if self.results:
+ # running or succeeded
+ passed = sum(x.result.is_ok() for x in self.results)
+ ran = sum(x.result is not TestResult.SKIP for x in self.results)
+ if passed == ran:
+ return f'{passed} subtests passed'
+ else:
+ return f'{passed}/{ran} subtests passed'
+ return ''
+
+ def get_exit_status(self) -> str:
+ return returncode_to_status(self.returncode)
+
+ def get_details(self) -> str:
+ if self.res is TestResult.PENDING:
+ return ''
+ if self.returncode:
+ return self.get_exit_status()
+ return self.get_results()
+
+ def _complete(self) -> None:
+ if self.res == TestResult.RUNNING:
+ self.res = TestResult.OK
+ assert isinstance(self.res, TestResult)
+ if self.should_fail and self.res in (TestResult.OK, TestResult.FAIL):
+ self.res = TestResult.UNEXPECTEDPASS if self.res is TestResult.OK else TestResult.EXPECTEDFAIL
+ if self.stdo and not self.stdo.endswith('\n'):
+ self.stdo += '\n'
+ if self.stde and not self.stde.endswith('\n'):
+ self.stde += '\n'
+ self.duration = time.time() - self.starttime
+
+ @property
+ def cmdline(self) -> T.Optional[str]:
+ if not self.cmd:
+ return None
+ test_only_env = set(self.env.items()) - set(os.environ.items())
+ return env_tuple_to_str(test_only_env) + \
+ ' '.join(sh_quote(x) for x in self.cmd)
+
+ def complete_skip(self) -> None:
+ self.starttime = time.time()
+ self.returncode = GNU_SKIP_RETURNCODE
+ self.res = TestResult.SKIP
+ self._complete()
+
+ def complete(self) -> None:
+ self._complete()
+
+ def get_log(self, colorize: bool = False, stderr_only: bool = False) -> str:
+ stdo = '' if stderr_only else self.stdo
+ if self.stde or self.additional_error:
+ res = ''
+ if stdo:
+ res += mlog.cyan('stdout:').get_text(colorize) + '\n'
+ res += stdo
+ if res[-1:] != '\n':
+ res += '\n'
+ res += mlog.cyan('stderr:').get_text(colorize) + '\n'
+ res += join_lines(self.stde, self.additional_error)
+ else:
+ res = stdo
+ if res and res[-1:] != '\n':
+ res += '\n'
+ return res
+
+ @property
+ def needs_parsing(self) -> bool:
+ return False
+
+ async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
+ async for l in lines:
+ pass
+
+
+class TestRunExitCode(TestRun):
+
+ def complete(self) -> None:
+ if self.res != TestResult.RUNNING:
+ pass
+ elif self.returncode == GNU_SKIP_RETURNCODE:
+ self.res = TestResult.SKIP
+ elif self.returncode == GNU_ERROR_RETURNCODE:
+ self.res = TestResult.ERROR
+ else:
+ self.res = TestResult.FAIL if bool(self.returncode) else TestResult.OK
+ super().complete()
+
+TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode
+
+
+class TestRunGTest(TestRunExitCode):
+ def complete(self) -> None:
+ filename = f'{self.test.name}.xml'
+ if self.test.workdir:
+ filename = os.path.join(self.test.workdir, filename)
+
+ try:
+ self.junit = et.parse(filename)
+ except FileNotFoundError:
+ # This can happen if the test fails to run or complete for some
+ # reason, like the rpath for libgtest isn't properly set. ExitCode
+ # will handle the failure, don't generate a stacktrace.
+ pass
+
+ super().complete()
+
+TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest
+
+
+class TestRunTAP(TestRun):
+ @property
+ def needs_parsing(self) -> bool:
+ return True
+
+ def complete(self) -> None:
+ if self.returncode != 0 and not self.res.was_killed():
+ self.res = TestResult.ERROR
+ self.stde = self.stde or ''
+ self.stde += f'\n(test program exited with status code {self.returncode})'
+ super().complete()
+
+ async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
+ res = None
+ warnings = [] # type: T.List[TAPParser.UnknownLine]
+ version = 12
+
+ async for i in TAPParser().parse_async(lines):
+ if isinstance(i, TAPParser.Version):
+ version = i.version
+ elif isinstance(i, TAPParser.Bailout):
+ res = TestResult.ERROR
+ harness.log_subtest(self, i.message, res)
+ elif isinstance(i, TAPParser.Test):
+ self.results.append(i)
+ if i.result.is_bad():
+ res = TestResult.FAIL
+ harness.log_subtest(self, i.name or f'subtest {i.number}', i.result)
+ elif isinstance(i, TAPParser.UnknownLine):
+ warnings.append(i)
+ elif isinstance(i, TAPParser.Error):
+ self.additional_error += 'TAP parsing error: ' + i.message
+ res = TestResult.ERROR
+
+ if warnings:
+ unknown = str(mlog.yellow('UNKNOWN'))
+ width = len(str(max(i.lineno for i in warnings)))
+ for w in warnings:
+ self.warnings.append(f'stdout: {w.lineno:{width}}: {unknown}: {w.message}')
+ if version > 13:
+ self.warnings.append('Unknown TAP output lines have been ignored. Please open a feature request to\n'
+ 'implement them, or prefix them with a # if they are not TAP syntax.')
+ else:
+ self.warnings.append(str(mlog.red('ERROR')) + ': Unknown TAP output lines for a supported TAP version.\n'
+ 'This is probably a bug in the test; if they are not TAP syntax, prefix them with a #')
+ if all(t.result is TestResult.SKIP for t in self.results):
+ # This includes the case where self.results is empty
+ res = TestResult.SKIP
+
+ if res and self.res == TestResult.RUNNING:
+ self.res = res
+
+TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP
+
+
+class TestRunRust(TestRun):
+ @property
+ def needs_parsing(self) -> bool:
+ return True
+
+ async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
+ def parse_res(n: int, name: str, result: str) -> TAPParser.Test:
+ if result == 'ok':
+ return TAPParser.Test(n, name, TestResult.OK, None)
+ elif result == 'ignored':
+ return TAPParser.Test(n, name, TestResult.SKIP, None)
+ elif result == 'FAILED':
+ return TAPParser.Test(n, name, TestResult.FAIL, None)
+ return TAPParser.Test(n, name, TestResult.ERROR,
+ f'Unsupported output from rust test: {result}')
+
+ n = 1
+ async for line in lines:
+ if line.startswith('test ') and not line.startswith('test result'):
+ _, name, _, result = line.rstrip().split(' ')
+ name = name.replace('::', '.')
+ t = parse_res(n, name, result)
+ self.results.append(t)
+ harness.log_subtest(self, name, t.result)
+ n += 1
+
+ res = None
+
+ if all(t.result is TestResult.SKIP for t in self.results):
+ # This includes the case where self.results is empty
+ res = TestResult.SKIP
+ elif any(t.result is TestResult.ERROR for t in self.results):
+ res = TestResult.ERROR
+ elif any(t.result is TestResult.FAIL for t in self.results):
+ res = TestResult.FAIL
+
+ if res and self.res == TestResult.RUNNING:
+ self.res = res
+
+TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust
+
+
+def decode(stream: T.Union[None, bytes]) -> str:
+ if stream is None:
+ return ''
+ try:
+ return stream.decode('utf-8')
+ except UnicodeDecodeError:
+ return stream.decode('iso-8859-1', errors='ignore')
+
+async def read_decode(reader: asyncio.StreamReader,
+ queue: T.Optional['asyncio.Queue[T.Optional[str]]'],
+ console_mode: ConsoleUser) -> str:
+ stdo_lines = []
+ try:
+ while not reader.at_eof():
+ # Prefer splitting by line, as that produces nicer output
+ try:
+ line_bytes = await reader.readuntil(b'\n')
+ except asyncio.IncompleteReadError as e:
+ line_bytes = e.partial
+ except asyncio.LimitOverrunError as e:
+ line_bytes = await reader.readexactly(e.consumed)
+ if line_bytes:
+ line = decode(line_bytes)
+ stdo_lines.append(line)
+ if console_mode is ConsoleUser.STDOUT:
+ print(line, end='', flush=True)
+ if queue:
+ await queue.put(line)
+ return ''.join(stdo_lines)
+ except asyncio.CancelledError:
+ return ''.join(stdo_lines)
+ finally:
+ if queue:
+ await queue.put(None)
+
+def run_with_mono(fname: str) -> bool:
+ return fname.endswith('.exe') and not (is_windows() or is_cygwin())
+
+def check_testdata(objs: T.List[TestSerialisation]) -> T.List[TestSerialisation]:
+ if not isinstance(objs, list):
+ raise MesonVersionMismatchException('<unknown>', coredata_version)
+ for obj in objs:
+ if not isinstance(obj, TestSerialisation):
+ raise MesonVersionMismatchException('<unknown>', coredata_version)
+ if not hasattr(obj, 'version'):
+ raise MesonVersionMismatchException('<unknown>', coredata_version)
+ if major_versions_differ(obj.version, coredata_version):
+ raise MesonVersionMismatchException(obj.version, coredata_version)
+ return objs
+
+# Custom waiting primitives for asyncio
+
+async def queue_iter(q: 'asyncio.Queue[T.Optional[str]]') -> T.AsyncIterator[str]:
+ while True:
+ item = await q.get()
+ q.task_done()
+ if item is None:
+ break
+ yield item
+
+async def complete(future: asyncio.Future) -> None:
+ """Wait for completion of the given future, ignoring cancellation."""
+ try:
+ await future
+ except asyncio.CancelledError:
+ pass
+
+async def complete_all(futures: T.Iterable[asyncio.Future],
+ timeout: T.Optional[T.Union[int, float]] = None) -> None:
+ """Wait for completion of all the given futures, ignoring cancellation.
+ If timeout is not None, raise an asyncio.TimeoutError after the given
+ time has passed. asyncio.TimeoutError is only raised if some futures
+ have not completed and none have raised exceptions, even if timeout
+ is zero."""
+
+ def check_futures(futures: T.Iterable[asyncio.Future]) -> None:
+ # Raise exceptions if needed
+ left = False
+ for f in futures:
+ if not f.done():
+ left = True
+ elif not f.cancelled():
+ f.result()
+ if left:
+ raise asyncio.TimeoutError
+
+ # Python is silly and does not have a variant of asyncio.wait with an
+ # absolute time as deadline.
+ deadline = None if timeout is None else asyncio.get_event_loop().time() + timeout
+ while futures and (timeout is None or timeout > 0):
+ done, futures = await asyncio.wait(futures, timeout=timeout,
+ return_when=asyncio.FIRST_EXCEPTION)
+ check_futures(done)
+ if deadline:
+ timeout = deadline - asyncio.get_event_loop().time()
+
+ check_futures(futures)
+
+
+class TestSubprocess:
+ def __init__(self, p: asyncio.subprocess.Process,
+ stdout: T.Optional[int], stderr: T.Optional[int],
+ postwait_fn: T.Callable[[], None] = None):
+ self._process = p
+ self.stdout = stdout
+ self.stderr = stderr
+ self.stdo_task: T.Optional[asyncio.Task[None]] = None
+ self.stde_task: T.Optional[asyncio.Task[None]] = None
+ self.postwait_fn = postwait_fn # type: T.Callable[[], None]
+ self.all_futures = [] # type: T.List[asyncio.Future]
+ self.queue = None # type: T.Optional[asyncio.Queue[T.Optional[str]]]
+
+ def stdout_lines(self) -> T.AsyncIterator[str]:
+ self.queue = asyncio.Queue()
+ return queue_iter(self.queue)
+
+ def communicate(self,
+ test: 'TestRun',
+ console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]],
+ T.Optional[T.Awaitable[str]]]:
+ async def collect_stdo(test: 'TestRun',
+ reader: asyncio.StreamReader,
+ console_mode: ConsoleUser) -> None:
+ test.stdo = await read_decode(reader, self.queue, console_mode)
+
+ async def collect_stde(test: 'TestRun',
+ reader: asyncio.StreamReader,
+ console_mode: ConsoleUser) -> None:
+ test.stde = await read_decode(reader, None, console_mode)
+
+ # asyncio.ensure_future ensures that printing can
+ # run in the background, even before it is awaited
+ if self.stdo_task is None and self.stdout is not None:
+ decode_coro = collect_stdo(test, self._process.stdout, console_mode)
+ self.stdo_task = asyncio.ensure_future(decode_coro)
+ self.all_futures.append(self.stdo_task)
+ if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT:
+ decode_coro = collect_stde(test, self._process.stderr, console_mode)
+ self.stde_task = asyncio.ensure_future(decode_coro)
+ self.all_futures.append(self.stde_task)
+
+ return self.stdo_task, self.stde_task
+
+ async def _kill(self) -> T.Optional[str]:
+ # Python does not provide multiplatform support for
+ # killing a process and all its children so we need
+ # to roll our own.
+ p = self._process
+ try:
+ if is_windows():
+ subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)])
+ else:
+ # Send a termination signal to the process group that setsid()
+ # created - giving it a chance to perform any cleanup.
+ os.killpg(p.pid, signal.SIGTERM)
+
+ # Make sure the termination signal actually kills the process
+ # group, otherwise retry with a SIGKILL.
+ with suppress(asyncio.TimeoutError):
+ await asyncio.wait_for(p.wait(), timeout=0.5)
+ if p.returncode is not None:
+ return None
+
+ os.killpg(p.pid, signal.SIGKILL)
+
+ with suppress(asyncio.TimeoutError):
+ await asyncio.wait_for(p.wait(), timeout=1)
+ if p.returncode is not None:
+ return None
+
+ # An earlier kill attempt has not worked for whatever reason.
+ # Try to kill it one last time with a direct call.
+ # If the process has spawned children, they will remain around.
+ p.kill()
+ with suppress(asyncio.TimeoutError):
+ await asyncio.wait_for(p.wait(), timeout=1)
+ if p.returncode is not None:
+ return None
+ return 'Test process could not be killed.'
+ except ProcessLookupError:
+ # Sometimes (e.g. with Wine) this happens. There's nothing
+ # we can do, probably the process already died so just wait
+ # for the event loop to pick that up.
+ await p.wait()
+ return None
+ finally:
+ if self.stdo_task:
+ self.stdo_task.cancel()
+ if self.stde_task:
+ self.stde_task.cancel()
+
+ async def wait(self, test: 'TestRun') -> None:
+ p = self._process
+
+ self.all_futures.append(asyncio.ensure_future(p.wait()))
+ try:
+ await complete_all(self.all_futures, timeout=test.timeout)
+ except asyncio.TimeoutError:
+ test.additional_error += await self._kill() or ''
+ test.res = TestResult.TIMEOUT
+ except asyncio.CancelledError:
+ # The main loop must have seen Ctrl-C.
+ test.additional_error += await self._kill() or ''
+ test.res = TestResult.INTERRUPT
+ finally:
+ if self.postwait_fn:
+ self.postwait_fn()
+
+ test.returncode = p.returncode or 0
+
+class SingleTestRunner:
+
+ def __init__(self, test: TestSerialisation, env: T.Dict[str, str], name: str,
+ options: argparse.Namespace):
+ self.test = test
+ self.options = options
+ self.cmd = self._get_cmd()
+
+ if self.cmd and self.test.extra_paths:
+ env['PATH'] = os.pathsep.join(self.test.extra_paths + ['']) + env['PATH']
+ winecmd = []
+ for c in self.cmd:
+ winecmd.append(c)
+ if os.path.basename(c).startswith('wine'):
+ env['WINEPATH'] = get_wine_shortpath(
+ winecmd,
+ ['Z:' + p for p in self.test.extra_paths] + env.get('WINEPATH', '').split(';'),
+ self.test.workdir
+ )
+ break
+
+ # If MALLOC_PERTURB_ is not set, or if it is set to an empty value,
+ # (i.e., the test or the environment don't explicitly set it), set
+ # it ourselves. We do this unconditionally for regular tests
+ # because it is extremely useful to have.
+ # Setting MALLOC_PERTURB_="0" will completely disable this feature.
+ if ('MALLOC_PERTURB_' not in env or not env['MALLOC_PERTURB_']) and not options.benchmark:
+ env['MALLOC_PERTURB_'] = str(random.randint(1, 255))
+
+ if self.options.gdb or self.test.timeout is None or self.test.timeout <= 0:
+ timeout = None
+ elif self.options.timeout_multiplier is None:
+ timeout = self.test.timeout
+ elif self.options.timeout_multiplier <= 0:
+ timeout = None
+ else:
+ timeout = self.test.timeout * self.options.timeout_multiplier
+
+ is_parallel = test.is_parallel and self.options.num_processes > 1 and not self.options.gdb
+ verbose = (test.verbose or self.options.verbose) and not self.options.quiet
+ self.runobj = TestRun(test, env, name, timeout, is_parallel, verbose)
+
+ if self.options.gdb:
+ self.console_mode = ConsoleUser.GDB
+ elif self.runobj.direct_stdout:
+ self.console_mode = ConsoleUser.STDOUT
+ else:
+ self.console_mode = ConsoleUser.LOGGER
+
+ def _get_test_cmd(self) -> T.Optional[T.List[str]]:
+ testentry = self.test.fname[0]
+ if self.options.no_rebuild and self.test.cmd_is_built and not os.path.isfile(testentry):
+ raise TestException(f'The test program {testentry!r} does not exist. Cannot run tests before building them.')
+ if testentry.endswith('.jar'):
+ return ['java', '-jar'] + self.test.fname
+ elif not self.test.is_cross_built and run_with_mono(testentry):
+ return ['mono'] + self.test.fname
+ elif self.test.cmd_is_exe and self.test.is_cross_built and self.test.needs_exe_wrapper:
+ if self.test.exe_wrapper is None:
+ # Can not run test on cross compiled executable
+ # because there is no execute wrapper.
+ return None
+ elif self.test.cmd_is_exe:
+ # If the command is not built (ie, its a python script),
+ # then we don't check for the exe-wrapper
+ if not self.test.exe_wrapper.found():
+ msg = ('The exe_wrapper defined in the cross file {!r} was not '
+ 'found. Please check the command and/or add it to PATH.')
+ raise TestException(msg.format(self.test.exe_wrapper.name))
+ return self.test.exe_wrapper.get_command() + self.test.fname
+ return self.test.fname
+
+ def _get_cmd(self) -> T.Optional[T.List[str]]:
+ test_cmd = self._get_test_cmd()
+ if not test_cmd:
+ return None
+ return TestHarness.get_wrapper(self.options) + test_cmd
+
+ @property
+ def is_parallel(self) -> bool:
+ return self.runobj.is_parallel
+
+ @property
+ def visible_name(self) -> str:
+ return self.runobj.name
+
+ @property
+ def timeout(self) -> T.Optional[int]:
+ return self.runobj.timeout
+
+ async def run(self, harness: 'TestHarness') -> TestRun:
+ if self.cmd is None:
+ self.stdo = 'Not run because can not execute cross compiled binaries.'
+ harness.log_start_test(self.runobj)
+ self.runobj.complete_skip()
+ else:
+ cmd = self.cmd + self.test.cmd_args + self.options.test_args
+ self.runobj.start(cmd)
+ harness.log_start_test(self.runobj)
+ await self._run_cmd(harness, cmd)
+ return self.runobj
+
+ async def _run_subprocess(self, args: T.List[str], *,
+ stdout: T.Optional[int], stderr: T.Optional[int],
+ env: T.Dict[str, str], cwd: T.Optional[str]) -> TestSubprocess:
+ # Let gdb handle ^C instead of us
+ if self.options.gdb:
+ previous_sigint_handler = signal.getsignal(signal.SIGINT)
+ # Make the meson executable ignore SIGINT while gdb is running.
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ def preexec_fn() -> None:
+ if self.options.gdb:
+ # Restore the SIGINT handler for the child process to
+ # ensure it can handle it.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ else:
+ # We don't want setsid() in gdb because gdb needs the
+ # terminal in order to handle ^C and not show tcsetpgrp()
+ # errors avoid not being able to use the terminal.
+ os.setsid()
+
+ def postwait_fn() -> None:
+ if self.options.gdb:
+ # Let us accept ^C again
+ signal.signal(signal.SIGINT, previous_sigint_handler)
+
+ p = await asyncio.create_subprocess_exec(*args,
+ stdout=stdout,
+ stderr=stderr,
+ env=env,
+ cwd=cwd,
+ preexec_fn=preexec_fn if not is_windows() else None)
+ return TestSubprocess(p, stdout=stdout, stderr=stderr,
+ postwait_fn=postwait_fn if not is_windows() else None)
+
+ async def _run_cmd(self, harness: 'TestHarness', cmd: T.List[str]) -> None:
+ if self.console_mode is ConsoleUser.GDB:
+ stdout = None
+ stderr = None
+ else:
+ stdout = asyncio.subprocess.PIPE
+ stderr = asyncio.subprocess.STDOUT \
+ if not self.options.split and not self.runobj.needs_parsing \
+ else asyncio.subprocess.PIPE
+
+ extra_cmd = [] # type: T.List[str]
+ if self.test.protocol is TestProtocol.GTEST:
+ gtestname = self.test.name
+ if self.test.workdir:
+ gtestname = os.path.join(self.test.workdir, self.test.name)
+ extra_cmd.append(f'--gtest_output=xml:{gtestname}.xml')
+
+ p = await self._run_subprocess(cmd + extra_cmd,
+ stdout=stdout,
+ stderr=stderr,
+ env=self.runobj.env,
+ cwd=self.test.workdir)
+
+ if self.runobj.needs_parsing:
+ parse_coro = self.runobj.parse(harness, p.stdout_lines())
+ parse_task = asyncio.ensure_future(parse_coro)
+ else:
+ parse_task = None
+
+ stdo_task, stde_task = p.communicate(self.runobj, self.console_mode)
+ await p.wait(self.runobj)
+
+ if parse_task:
+ await parse_task
+ if stdo_task:
+ await stdo_task
+ if stde_task:
+ await stde_task
+
+ self.runobj.complete()
+
+
+class TestHarness:
+ def __init__(self, options: argparse.Namespace):
+ self.options = options
+ self.collected_failures = [] # type: T.List[TestRun]
+ self.fail_count = 0
+ self.expectedfail_count = 0
+ self.unexpectedpass_count = 0
+ self.success_count = 0
+ self.skip_count = 0
+ self.timeout_count = 0
+ self.test_count = 0
+ self.name_max_len = 0
+ self.is_run = False
+ self.loggers = [] # type: T.List[TestLogger]
+ self.console_logger = ConsoleLogger()
+ self.loggers.append(self.console_logger)
+ self.need_console = False
+ self.ninja = None # type: T.List[str]
+
+ self.logfile_base = None # type: T.Optional[str]
+ if self.options.logbase and not self.options.gdb:
+ namebase = None
+ self.logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase)
+
+ if self.options.wrapper:
+ namebase = os.path.basename(self.get_wrapper(self.options)[0])
+ elif self.options.setup:
+ namebase = self.options.setup.replace(":", "_")
+
+ if namebase:
+ self.logfile_base += '-' + namebase.replace(' ', '_')
+
+ self.prepare_build()
+ self.load_metadata()
+
+ ss = set()
+ for t in self.tests:
+ for s in t.suite:
+ ss.add(s)
+ self.suites = list(ss)
+
+ def get_console_logger(self) -> 'ConsoleLogger':
+ assert self.console_logger
+ return self.console_logger
+
+ def prepare_build(self) -> None:
+ if self.options.no_rebuild:
+ return
+
+ if not (Path(self.options.wd) / 'build.ninja').is_file():
+ print('Only ninja backend is supported to rebuild tests before running them.')
+ # Disable, no point in trying to build anything later
+ self.options.no_rebuild = True
+ return
+
+ self.ninja = environment.detect_ninja()
+ if not self.ninja:
+ print("Can't find ninja, can't rebuild test.")
+ # If ninja can't be found return exit code 127, indicating command
+ # not found for shell, which seems appropriate here. This works
+ # nicely for `git bisect run`, telling it to abort - no point in
+ # continuing if there's no ninja.
+ sys.exit(127)
+
+ def load_metadata(self) -> None:
+ startdir = os.getcwd()
+ try:
+ os.chdir(self.options.wd)
+
+ # Before loading build / test data, make sure that the build
+ # configuration does not need to be regenerated. This needs to
+ # happen before rebuild_deps(), because we need the correct list of
+ # tests and their dependencies to compute
+ if not self.options.no_rebuild:
+ ret = subprocess.run(self.ninja + ['build.ninja']).returncode
+ if ret != 0:
+ raise TestException(f'Could not configure {self.options.wd!r}')
+
+ self.build_data = build.load(os.getcwd())
+ if not self.options.setup:
+ self.options.setup = self.build_data.test_setup_default_name
+ if self.options.benchmark:
+ self.tests = self.load_tests('meson_benchmark_setup.dat')
+ else:
+ self.tests = self.load_tests('meson_test_setup.dat')
+ finally:
+ os.chdir(startdir)
+
+ def load_tests(self, file_name: str) -> T.List[TestSerialisation]:
+ datafile = Path('meson-private') / file_name
+ if not datafile.is_file():
+ raise TestException(f'Directory {self.options.wd!r} does not seem to be a Meson build directory.')
+ with datafile.open('rb') as f:
+ objs = check_testdata(pickle.load(f))
+ return objs
+
+ def __enter__(self) -> 'TestHarness':
+ return self
+
+ def __exit__(self, exc_type: T.Any, exc_value: T.Any, traceback: T.Any) -> None:
+ self.close_logfiles()
+
+ def close_logfiles(self) -> None:
+ for l in self.loggers:
+ l.close()
+ self.console_logger = None
+
+ def get_test_setup(self, test: T.Optional[TestSerialisation]) -> build.TestSetup:
+ if ':' in self.options.setup:
+ if self.options.setup not in self.build_data.test_setups:
+ sys.exit(f"Unknown test setup '{self.options.setup}'.")
+ return self.build_data.test_setups[self.options.setup]
+ else:
+ full_name = test.project_name + ":" + self.options.setup
+ if full_name not in self.build_data.test_setups:
+ sys.exit(f"Test setup '{self.options.setup}' not found from project '{test.project_name}'.")
+ return self.build_data.test_setups[full_name]
+
+ def merge_setup_options(self, options: argparse.Namespace, test: TestSerialisation) -> T.Dict[str, str]:
+ current = self.get_test_setup(test)
+ if not options.gdb:
+ options.gdb = current.gdb
+ if options.gdb:
+ options.verbose = True
+ if options.timeout_multiplier is None:
+ options.timeout_multiplier = current.timeout_multiplier
+ # if options.env is None:
+ # options.env = current.env # FIXME, should probably merge options here.
+ if options.wrapper is None:
+ options.wrapper = current.exe_wrapper
+ elif current.exe_wrapper:
+ sys.exit('Conflict: both test setup and command line specify an exe wrapper.')
+ return current.env.get_env(os.environ.copy())
+
+ def get_test_runner(self, test: TestSerialisation) -> SingleTestRunner:
+ name = self.get_pretty_suite(test)
+ options = deepcopy(self.options)
+ if self.options.setup:
+ env = self.merge_setup_options(options, test)
+ else:
+ env = os.environ.copy()
+ test_env = test.env.get_env(env)
+ env.update(test_env)
+ if (test.is_cross_built and test.needs_exe_wrapper and
+ test.exe_wrapper and test.exe_wrapper.found()):
+ env['MESON_EXE_WRAPPER'] = join_args(test.exe_wrapper.get_command())
+ return SingleTestRunner(test, env, name, options)
+
+ def process_test_result(self, result: TestRun) -> None:
+ if result.res is TestResult.TIMEOUT:
+ self.timeout_count += 1
+ elif result.res is TestResult.SKIP:
+ self.skip_count += 1
+ elif result.res is TestResult.OK:
+ self.success_count += 1
+ elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}:
+ self.fail_count += 1
+ elif result.res is TestResult.EXPECTEDFAIL:
+ self.expectedfail_count += 1
+ elif result.res is TestResult.UNEXPECTEDPASS:
+ self.unexpectedpass_count += 1
+ else:
+ sys.exit(f'Unknown test result encountered: {result.res}')
+
+ if result.res.is_bad():
+ self.collected_failures.append(result)
+ for l in self.loggers:
+ l.log(self, result)
+
+ @property
+ def numlen(self) -> int:
+ return len(str(self.test_count))
+
+ @property
+ def max_left_width(self) -> int:
+ return 2 * self.numlen + 2
+
+ def get_test_num_prefix(self, num: int) -> str:
+ return '{num:{numlen}}/{testcount} '.format(numlen=self.numlen,
+ num=num,
+ testcount=self.test_count)
+
+ def format(self, result: TestRun, colorize: bool,
+ max_left_width: int = 0,
+ prefix: str = '',
+ left: T.Optional[str] = None,
+ middle: T.Optional[str] = None,
+ right: T.Optional[str] = None) -> str:
+ if left is None:
+ left = self.get_test_num_prefix(result.num)
+
+ # A non-default max_left_width lets the logger print more stuff before the
+ # name, while ensuring that the rightmost columns remain aligned.
+ max_left_width = max(max_left_width, self.max_left_width)
+
+ if middle is None:
+ middle = result.name
+ extra_mid_width = max_left_width + self.name_max_len + 1 - uniwidth(middle) - uniwidth(left) - uniwidth(prefix)
+ middle += ' ' * max(1, extra_mid_width)
+
+ if right is None:
+ right = '{res} {dur:{durlen}.2f}s'.format(
+ res=result.res.get_text(colorize),
+ dur=result.duration,
+ durlen=self.duration_max_len + 3)
+ details = result.get_details()
+ if details:
+ right += ' ' + details
+ return prefix + left + middle + right
+
+ def summary(self) -> str:
+ return textwrap.dedent('''
+ Ok: {:<4}
+ Expected Fail: {:<4}
+ Fail: {:<4}
+ Unexpected Pass: {:<4}
+ Skipped: {:<4}
+ Timeout: {:<4}
+ ''').format(self.success_count, self.expectedfail_count, self.fail_count,
+ self.unexpectedpass_count, self.skip_count, self.timeout_count)
+
+ def total_failure_count(self) -> int:
+ return self.fail_count + self.unexpectedpass_count + self.timeout_count
+
+ def doit(self) -> int:
+ if self.is_run:
+ raise RuntimeError('Test harness object can only be used once.')
+ self.is_run = True
+ tests = self.get_tests()
+ if not tests:
+ return 0
+ if not self.options.no_rebuild and not rebuild_deps(self.ninja, self.options.wd, tests):
+ # We return 125 here in case the build failed.
+ # The reason is that exit code 125 tells `git bisect run` that the current
+ # commit should be skipped. Thus users can directly use `meson test` to
+ # bisect without needing to handle the does-not-build case separately in a
+ # wrapper script.
+ sys.exit(125)
+
+ self.name_max_len = max(uniwidth(self.get_pretty_suite(test)) for test in tests)
+ self.options.num_processes = min(self.options.num_processes,
+ len(tests) * self.options.repeat)
+ startdir = os.getcwd()
+ try:
+ os.chdir(self.options.wd)
+ runners = [] # type: T.List[SingleTestRunner]
+ for i in range(self.options.repeat):
+ runners.extend(self.get_test_runner(test) for test in tests)
+ if i == 0:
+ self.duration_max_len = max(len(str(int(runner.timeout or 99)))
+ for runner in runners)
+ # Disable the progress report if it gets in the way
+ self.need_console = any(runner.console_mode is not ConsoleUser.LOGGER
+ for runner in runners)
+
+ self.test_count = len(runners)
+ self.run_tests(runners)
+ finally:
+ os.chdir(startdir)
+ return self.total_failure_count()
+
+ @staticmethod
+ def split_suite_string(suite: str) -> T.Tuple[str, str]:
+ if ':' in suite:
+ split = suite.split(':', 1)
+ assert len(split) == 2
+ return split[0], split[1]
+ else:
+ return suite, ""
+
+ @staticmethod
+ def test_in_suites(test: TestSerialisation, suites: T.List[str]) -> bool:
+ for suite in suites:
+ (prj_match, st_match) = TestHarness.split_suite_string(suite)
+ for prjst in test.suite:
+ (prj, st) = TestHarness.split_suite_string(prjst)
+
+ # the SUITE can be passed as
+ # suite_name
+ # or
+ # project_name:suite_name
+ # so we need to select only the test belonging to project_name
+
+ # this if handle the first case (i.e., SUITE == suite_name)
+
+ # in this way we can run tests belonging to different
+ # (sub)projects which share the same suite_name
+ if not st_match and st == prj_match:
+ return True
+
+ # these two conditions are needed to handle the second option
+ # i.e., SUITE == project_name:suite_name
+
+ # in this way we select the only the tests of
+ # project_name with suite_name
+ if prj_match and prj != prj_match:
+ continue
+ if st_match and st != st_match:
+ continue
+ return True
+ return False
+
+ def test_suitable(self, test: TestSerialisation) -> bool:
+ if TestHarness.test_in_suites(test, self.options.exclude_suites):
+ return False
+
+ if self.options.include_suites:
+ # Both force inclusion (overriding add_test_setup) and exclude
+ # everything else
+ return TestHarness.test_in_suites(test, self.options.include_suites)
+
+ if self.options.setup:
+ setup = self.get_test_setup(test)
+ if TestHarness.test_in_suites(test, setup.exclude_suites):
+ return False
+
+ return True
+
+ def tests_from_args(self, tests: T.List[TestSerialisation]) -> T.Generator[TestSerialisation, None, None]:
+ '''
+ Allow specifying test names like "meson test foo1 foo2", where test('foo1', ...)
+
+ Also support specifying the subproject to run tests from like
+ "meson test subproj:" (all tests inside subproj) or "meson test subproj:foo1"
+ to run foo1 inside subproj. Coincidentally also "meson test :foo1" to
+ run all tests with that name across all subprojects, which is
+ identical to "meson test foo1"
+ '''
+ for arg in self.options.args:
+ if ':' in arg:
+ subproj, name = arg.split(':', maxsplit=1)
+ else:
+ subproj, name = '', arg
+ for t in tests:
+ if subproj and t.project_name != subproj:
+ continue
+ if name and t.name != name:
+ continue
+ yield t
+
+ def get_tests(self) -> T.List[TestSerialisation]:
+ if not self.tests:
+ print('No tests defined.')
+ return []
+
+ tests = [t for t in self.tests if self.test_suitable(t)]
+ if self.options.args:
+ tests = list(self.tests_from_args(tests))
+
+ if not tests:
+ print('No suitable tests defined.')
+ return []
+
+ return tests
+
+ def flush_logfiles(self) -> None:
+ for l in self.loggers:
+ l.flush()
+
+ def open_logfiles(self) -> None:
+ if not self.logfile_base:
+ return
+
+ self.loggers.append(JunitBuilder(self.logfile_base + '.junit.xml'))
+ self.loggers.append(JsonLogfileBuilder(self.logfile_base + '.json'))
+ self.loggers.append(TextLogfileBuilder(self.logfile_base + '.txt', errors='surrogateescape'))
+
+ @staticmethod
+ def get_wrapper(options: argparse.Namespace) -> T.List[str]:
+ wrap = [] # type: T.List[str]
+ if options.gdb:
+ wrap = [options.gdb_path, '--quiet']
+ if options.repeat > 1:
+ wrap += ['-ex', 'run', '-ex', 'quit']
+ # Signal the end of arguments to gdb
+ wrap += ['--args']
+ if options.wrapper:
+ wrap += options.wrapper
+ return wrap
+
+ def get_pretty_suite(self, test: TestSerialisation) -> str:
+ if len(self.suites) > 1 and test.suite:
+ rv = TestHarness.split_suite_string(test.suite[0])[0]
+ s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite)
+ if s:
+ rv += ":"
+ return rv + s + " / " + test.name
+ else:
+ return test.name
+
+ def run_tests(self, runners: T.List[SingleTestRunner]) -> None:
+ try:
+ self.open_logfiles()
+ # Replace with asyncio.run once we can require Python 3.7
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(self._run_tests(runners))
+ finally:
+ self.close_logfiles()
+
+ def log_subtest(self, test: TestRun, s: str, res: TestResult) -> None:
+ for l in self.loggers:
+ l.log_subtest(self, test, s, res)
+
+ def log_start_test(self, test: TestRun) -> None:
+ for l in self.loggers:
+ l.start_test(self, test)
+
+ async def _run_tests(self, runners: T.List[SingleTestRunner]) -> None:
+ semaphore = asyncio.Semaphore(self.options.num_processes)
+ futures = deque() # type: T.Deque[asyncio.Future]
+ running_tests = {} # type: T.Dict[asyncio.Future, str]
+ interrupted = False
+ ctrlc_times = deque(maxlen=MAX_CTRLC) # type: T.Deque[float]
+
+ async def run_test(test: SingleTestRunner) -> None:
+ async with semaphore:
+ if interrupted or (self.options.repeat > 1 and self.fail_count):
+ return
+ res = await test.run(self)
+ self.process_test_result(res)
+ maxfail = self.options.maxfail
+ if maxfail and self.fail_count >= maxfail and res.res.is_bad():
+ cancel_all_tests()
+
+ def test_done(f: asyncio.Future) -> None:
+ if not f.cancelled():
+ f.result()
+ futures.remove(f)
+ try:
+ del running_tests[f]
+ except KeyError:
+ pass
+
+ def cancel_one_test(warn: bool) -> None:
+ future = futures.popleft()
+ futures.append(future)
+ if warn:
+ self.flush_logfiles()
+ mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future]))
+ del running_tests[future]
+ future.cancel()
+
+ def cancel_all_tests() -> None:
+ nonlocal interrupted
+ interrupted = True
+ while running_tests:
+ cancel_one_test(False)
+
+ def sigterm_handler() -> None:
+ if interrupted:
+ return
+ self.flush_logfiles()
+ mlog.warning('Received SIGTERM, exiting')
+ cancel_all_tests()
+
+ def sigint_handler() -> None:
+ # We always pick the longest-running future that has not been cancelled
+ # If all the tests have been CTRL-C'ed, just stop
+ nonlocal interrupted
+ if interrupted:
+ return
+ ctrlc_times.append(asyncio.get_event_loop().time())
+ if len(ctrlc_times) == MAX_CTRLC and ctrlc_times[-1] - ctrlc_times[0] < 1:
+ self.flush_logfiles()
+ mlog.warning('CTRL-C detected, exiting')
+ cancel_all_tests()
+ elif running_tests:
+ cancel_one_test(True)
+ else:
+ self.flush_logfiles()
+ mlog.warning('CTRL-C detected, exiting')
+ interrupted = True
+
+ for l in self.loggers:
+ l.start(self)
+
+ if sys.platform != 'win32':
+ if os.getpgid(0) == os.getpid():
+ asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler)
+ else:
+ asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigterm_handler)
+ asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler)
+ try:
+ for runner in runners:
+ if not runner.is_parallel:
+ await complete_all(futures)
+ future = asyncio.ensure_future(run_test(runner))
+ futures.append(future)
+ running_tests[future] = runner.visible_name
+ future.add_done_callback(test_done)
+ if not runner.is_parallel:
+ await complete(future)
+ if self.options.repeat > 1 and self.fail_count:
+ break
+
+ await complete_all(futures)
+ finally:
+ if sys.platform != 'win32':
+ asyncio.get_event_loop().remove_signal_handler(signal.SIGINT)
+ asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM)
+ for l in self.loggers:
+ await l.finish(self)
+
+def list_tests(th: TestHarness) -> bool:
+ tests = th.get_tests()
+ for t in tests:
+ print(th.get_pretty_suite(t))
+ return not tests
+
+def rebuild_deps(ninja: T.List[str], wd: str, tests: T.List[TestSerialisation]) -> bool:
+ def convert_path_to_target(path: str) -> str:
+ path = os.path.relpath(path, wd)
+ if os.sep != '/':
+ path = path.replace(os.sep, '/')
+ return path
+
+ assert len(ninja) > 0
+
+ depends = set() # type: T.Set[str]
+ targets = set() # type: T.Set[str]
+ intro_targets = {} # type: T.Dict[str, T.List[str]]
+ for target in load_info_file(get_infodir(wd), kind='targets'):
+ intro_targets[target['id']] = [
+ convert_path_to_target(f)
+ for f in target['filename']]
+ for t in tests:
+ for d in t.depends:
+ if d in depends:
+ continue
+ depends.update(d)
+ targets.update(intro_targets[d])
+
+ ret = subprocess.run(ninja + ['-C', wd] + sorted(targets)).returncode
+ if ret != 0:
+ print(f'Could not rebuild {wd}')
+ return False
+
+ return True
+
+def run(options: argparse.Namespace) -> int:
+ if options.benchmark:
+ options.num_processes = 1
+
+ if options.verbose and options.quiet:
+ print('Can not be both quiet and verbose at the same time.')
+ return 1
+
+ check_bin = None
+ if options.gdb:
+ options.verbose = True
+ if options.wrapper:
+ print('Must not specify both a wrapper and gdb at the same time.')
+ return 1
+ check_bin = 'gdb'
+
+ if options.wrapper:
+ check_bin = options.wrapper[0]
+
+ if sys.platform == 'win32':
+ loop = asyncio.ProactorEventLoop()
+ asyncio.set_event_loop(loop)
+
+ if check_bin is not None:
+ exe = ExternalProgram(check_bin, silent=True)
+ if not exe.found():
+ print(f'Could not find requested program: {check_bin!r}')
+ return 1
+
+ b = build.load(options.wd)
+ setup_vsenv(b.need_vsenv)
+
+ with TestHarness(options) as th:
+ try:
+ if options.list:
+ return list_tests(th)
+ return th.doit()
+ except TestException as e:
+ print('Meson test encountered an error:\n')
+ if os.environ.get('MESON_FORCE_BACKTRACE'):
+ raise e
+ else:
+ print(e)
+ return 1
+
+def run_with_args(args: T.List[str]) -> int:
+ parser = argparse.ArgumentParser(prog='meson test')
+ add_arguments(parser)
+ options = parser.parse_args(args)
+ return run(options)