summaryrefslogtreecommitdiffstats
path: root/mesonbuild/mlog.py
diff options
context:
space:
mode:
Diffstat (limited to 'mesonbuild/mlog.py')
-rw-r--r--mesonbuild/mlog.py465
1 files changed, 465 insertions, 0 deletions
diff --git a/mesonbuild/mlog.py b/mesonbuild/mlog.py
new file mode 100644
index 0000000..e9c4017
--- /dev/null
+++ b/mesonbuild/mlog.py
@@ -0,0 +1,465 @@
+# Copyright 2013-2014 The Meson development team
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import io
+import sys
+import time
+import platform
+import shlex
+import subprocess
+import shutil
+import typing as T
+from contextlib import contextmanager
+from pathlib import Path
+
+if T.TYPE_CHECKING:
+ from ._typing import StringProtocol, SizedStringProtocol
+
+"""This is (mostly) a standalone module used to write logging
+information about Meson runs. Some output goes to screen,
+some to logging dir and some goes to both."""
+
+def is_windows() -> bool:
+ platname = platform.system().lower()
+ return platname == 'windows'
+
+def _windows_ansi() -> bool:
+ # windll only exists on windows, so mypy will get mad
+ from ctypes import windll, byref # type: ignore
+ from ctypes.wintypes import DWORD
+
+ kernel = windll.kernel32
+ stdout = kernel.GetStdHandle(-11)
+ mode = DWORD()
+ if not kernel.GetConsoleMode(stdout, byref(mode)):
+ return False
+ # ENABLE_VIRTUAL_TERMINAL_PROCESSING == 0x4
+ # If the call to enable VT processing fails (returns 0), we fallback to
+ # original behavior
+ return bool(kernel.SetConsoleMode(stdout, mode.value | 0x4) or os.environ.get('ANSICON'))
+
+def colorize_console() -> bool:
+ _colorize_console = getattr(sys.stdout, 'colorize_console', None) # type: bool
+ if _colorize_console is not None:
+ return _colorize_console
+
+ try:
+ if is_windows():
+ _colorize_console = os.isatty(sys.stdout.fileno()) and _windows_ansi()
+ else:
+ _colorize_console = os.isatty(sys.stdout.fileno()) and os.environ.get('TERM', 'dumb') != 'dumb'
+ except Exception:
+ _colorize_console = False
+
+ sys.stdout.colorize_console = _colorize_console # type: ignore[attr-defined]
+ return _colorize_console
+
+def setup_console() -> None:
+ # on Windows, a subprocess might call SetConsoleMode() on the console
+ # connected to stdout and turn off ANSI escape processing. Call this after
+ # running a subprocess to ensure we turn it on again.
+ if is_windows():
+ try:
+ delattr(sys.stdout, 'colorize_console')
+ except AttributeError:
+ pass
+
+log_dir = None # type: T.Optional[str]
+log_file = None # type: T.Optional[T.TextIO]
+log_fname = 'meson-log.txt' # type: str
+log_depth = [] # type: T.List[str]
+log_timestamp_start = None # type: T.Optional[float]
+log_fatal_warnings = False # type: bool
+log_disable_stdout = False # type: bool
+log_errors_only = False # type: bool
+_in_ci = 'CI' in os.environ # type: bool
+_logged_once = set() # type: T.Set[T.Tuple[str, ...]]
+log_warnings_counter = 0 # type: int
+log_pager: T.Optional['subprocess.Popen'] = None
+
+def disable() -> None:
+ global log_disable_stdout # pylint: disable=global-statement
+ log_disable_stdout = True
+
+def enable() -> None:
+ global log_disable_stdout # pylint: disable=global-statement
+ log_disable_stdout = False
+
+def set_quiet() -> None:
+ global log_errors_only # pylint: disable=global-statement
+ log_errors_only = True
+
+def set_verbose() -> None:
+ global log_errors_only # pylint: disable=global-statement
+ log_errors_only = False
+
+def initialize(logdir: str, fatal_warnings: bool = False) -> None:
+ global log_dir, log_file, log_fatal_warnings # pylint: disable=global-statement
+ log_dir = logdir
+ log_file = open(os.path.join(logdir, log_fname), 'w', encoding='utf-8')
+ log_fatal_warnings = fatal_warnings
+
+def set_timestamp_start(start: float) -> None:
+ global log_timestamp_start # pylint: disable=global-statement
+ log_timestamp_start = start
+
+def shutdown() -> T.Optional[str]:
+ global log_file # pylint: disable=global-statement
+ if log_file is not None:
+ path = log_file.name
+ exception_around_goer = log_file
+ log_file = None
+ exception_around_goer.close()
+ return path
+ stop_pager()
+ return None
+
+class AnsiDecorator:
+ plain_code = "\033[0m"
+
+ def __init__(self, text: str, code: str, quoted: bool = False):
+ self.text = text
+ self.code = code
+ self.quoted = quoted
+
+ def get_text(self, with_codes: bool) -> str:
+ text = self.text
+ if with_codes and self.code:
+ text = self.code + self.text + AnsiDecorator.plain_code
+ if self.quoted:
+ text = f'"{text}"'
+ return text
+
+ def __len__(self) -> int:
+ return len(self.text)
+
+ def __str__(self) -> str:
+ return self.get_text(colorize_console())
+
+TV_Loggable = T.Union[str, AnsiDecorator, 'StringProtocol']
+TV_LoggableList = T.List[TV_Loggable]
+
+class AnsiText:
+ def __init__(self, *args: 'SizedStringProtocol'):
+ self.args = args
+
+ def __len__(self) -> int:
+ return sum(len(x) for x in self.args)
+
+ def __str__(self) -> str:
+ return ''.join(str(x) for x in self.args)
+
+
+def bold(text: str, quoted: bool = False) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1m", quoted=quoted)
+
+def italic(text: str, quoted: bool = False) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[3m", quoted=quoted)
+
+def plain(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "")
+
+def red(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1;31m")
+
+def green(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1;32m")
+
+def yellow(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1;33m")
+
+def blue(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1;34m")
+
+def cyan(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[1;36m")
+
+def normal_red(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[31m")
+
+def normal_green(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[32m")
+
+def normal_yellow(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[33m")
+
+def normal_blue(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[34m")
+
+def normal_cyan(text: str) -> AnsiDecorator:
+ return AnsiDecorator(text, "\033[36m")
+
+# This really should be AnsiDecorator or anything that implements
+# __str__(), but that requires protocols from typing_extensions
+def process_markup(args: T.Sequence[TV_Loggable], keep: bool) -> T.List[str]:
+ arr = [] # type: T.List[str]
+ if log_timestamp_start is not None:
+ arr = ['[{:.3f}]'.format(time.monotonic() - log_timestamp_start)]
+ for arg in args:
+ if arg is None:
+ continue
+ if isinstance(arg, str):
+ arr.append(arg)
+ elif isinstance(arg, AnsiDecorator):
+ arr.append(arg.get_text(keep))
+ else:
+ arr.append(str(arg))
+ return arr
+
+def force_print(*args: str, nested: bool, **kwargs: T.Any) -> None:
+ if log_disable_stdout:
+ return
+ iostr = io.StringIO()
+ kwargs['file'] = iostr
+ print(*args, **kwargs)
+
+ raw = iostr.getvalue()
+ if log_depth:
+ prepend = log_depth[-1] + '| ' if nested else ''
+ lines = []
+ for l in raw.split('\n'):
+ l = l.strip()
+ lines.append(prepend + l if l else '')
+ raw = '\n'.join(lines)
+
+ # _Something_ is going to get printed.
+ try:
+ output = log_pager.stdin if log_pager else None
+ print(raw, end='', file=output)
+ except UnicodeEncodeError:
+ cleaned = raw.encode('ascii', 'replace').decode('ascii')
+ print(cleaned, end='')
+
+# We really want a heterogeneous dict for this, but that's in typing_extensions
+def debug(*args: TV_Loggable, **kwargs: T.Any) -> None:
+ arr = process_markup(args, False)
+ if log_file is not None:
+ print(*arr, file=log_file, **kwargs)
+ log_file.flush()
+
+def _debug_log_cmd(cmd: str, args: T.List[str]) -> None:
+ if not _in_ci:
+ return
+ args = [f'"{x}"' for x in args] # Quote all args, just in case
+ debug('!meson_ci!/{} {}'.format(cmd, ' '.join(args)))
+
+def cmd_ci_include(file: str) -> None:
+ _debug_log_cmd('ci_include', [file])
+
+
+def log(*args: TV_Loggable, is_error: bool = False,
+ once: bool = False, **kwargs: T.Any) -> None:
+ if once:
+ log_once(*args, is_error=is_error, **kwargs)
+ else:
+ _log(*args, is_error=is_error, **kwargs)
+
+
+def _log(*args: TV_Loggable, is_error: bool = False,
+ **kwargs: T.Any) -> None:
+ nested = kwargs.pop('nested', True)
+ arr = process_markup(args, False)
+ if log_file is not None:
+ print(*arr, file=log_file, **kwargs)
+ log_file.flush()
+ if colorize_console():
+ arr = process_markup(args, True)
+ if not log_errors_only or is_error:
+ force_print(*arr, nested=nested, **kwargs)
+
+def log_once(*args: TV_Loggable, is_error: bool = False,
+ **kwargs: T.Any) -> None:
+ """Log variant that only prints a given message one time per meson invocation.
+
+ This considers ansi decorated values by the values they wrap without
+ regard for the AnsiDecorator itself.
+ """
+ def to_str(x: TV_Loggable) -> str:
+ if isinstance(x, str):
+ return x
+ if isinstance(x, AnsiDecorator):
+ return x.text
+ return str(x)
+ t = tuple(to_str(a) for a in args)
+ if t in _logged_once:
+ return
+ _logged_once.add(t)
+ _log(*args, is_error=is_error, **kwargs)
+
+# This isn't strictly correct. What we really want here is something like:
+# class StringProtocol(typing_extensions.Protocol):
+#
+# def __str__(self) -> str: ...
+#
+# This would more accurately embody what this function can handle, but we
+# don't have that yet, so instead we'll do some casting to work around it
+def get_error_location_string(fname: str, lineno: int) -> str:
+ return f'{fname}:{lineno}:'
+
+def _log_error(severity: str, *rargs: TV_Loggable,
+ once: bool = False, fatal: bool = True, **kwargs: T.Any) -> None:
+ from .mesonlib import MesonException, relpath
+
+ # The typing requirements here are non-obvious. Lists are invariant,
+ # therefore T.List[A] and T.List[T.Union[A, B]] are not able to be joined
+ if severity == 'notice':
+ label = [bold('NOTICE:')] # type: TV_LoggableList
+ elif severity == 'warning':
+ label = [yellow('WARNING:')]
+ elif severity == 'error':
+ label = [red('ERROR:')]
+ elif severity == 'deprecation':
+ label = [red('DEPRECATION:')]
+ else:
+ raise MesonException('Invalid severity ' + severity)
+ # rargs is a tuple, not a list
+ args = label + list(rargs)
+
+ location = kwargs.pop('location', None)
+ if location is not None:
+ location_file = relpath(location.filename, os.getcwd())
+ location_str = get_error_location_string(location_file, location.lineno)
+ # Unions are frankly awful, and we have to T.cast here to get mypy
+ # to understand that the list concatenation is safe
+ location_list = T.cast('TV_LoggableList', [location_str])
+ args = location_list + args
+
+ log(*args, once=once, **kwargs)
+
+ global log_warnings_counter # pylint: disable=global-statement
+ log_warnings_counter += 1
+
+ if log_fatal_warnings and fatal:
+ raise MesonException("Fatal warnings enabled, aborting")
+
+def error(*args: TV_Loggable, **kwargs: T.Any) -> None:
+ return _log_error('error', *args, **kwargs, is_error=True)
+
+def warning(*args: TV_Loggable, **kwargs: T.Any) -> None:
+ return _log_error('warning', *args, **kwargs, is_error=True)
+
+def deprecation(*args: TV_Loggable, **kwargs: T.Any) -> None:
+ return _log_error('deprecation', *args, **kwargs, is_error=True)
+
+def notice(*args: TV_Loggable, **kwargs: T.Any) -> None:
+ return _log_error('notice', *args, **kwargs, is_error=False)
+
+def get_relative_path(target: Path, current: Path) -> Path:
+ """Get the path to target from current"""
+ # Go up "current" until we find a common ancestor to target
+ acc = ['.']
+ for part in [current, *current.parents]:
+ try:
+ path = target.relative_to(part)
+ return Path(*acc, path)
+ except ValueError:
+ pass
+ acc += ['..']
+
+ # we failed, should not get here
+ return target
+
+def exception(e: Exception, prefix: T.Optional[AnsiDecorator] = None) -> None:
+ if prefix is None:
+ prefix = red('ERROR:')
+ log()
+ args = [] # type: T.List[T.Union[AnsiDecorator, str]]
+ if all(getattr(e, a, None) is not None for a in ['file', 'lineno', 'colno']):
+ # Mypy doesn't follow hasattr, and it's pretty easy to visually inspect
+ # that this is correct, so we'll just ignore it.
+ path = get_relative_path(Path(e.file), Path(os.getcwd())) # type: ignore
+ args.append(f'{path}:{e.lineno}:{e.colno}:') # type: ignore
+ if prefix:
+ args.append(prefix)
+ args.append(str(e))
+
+ restore = log_disable_stdout
+ if restore:
+ enable()
+ log(*args, is_error=True)
+ if restore:
+ disable()
+
+# Format a list for logging purposes as a string. It separates
+# all but the last item with commas, and the last with 'and'.
+def format_list(input_list: T.List[str]) -> str:
+ l = len(input_list)
+ if l > 2:
+ return ' and '.join([', '.join(input_list[:-1]), input_list[-1]])
+ elif l == 2:
+ return ' and '.join(input_list)
+ elif l == 1:
+ return input_list[0]
+ else:
+ return ''
+
+@contextmanager
+def nested(name: str = '') -> T.Generator[None, None, None]:
+ log_depth.append(name)
+ try:
+ yield
+ finally:
+ log_depth.pop()
+
+def start_pager() -> None:
+ if not colorize_console():
+ return
+ pager_cmd = []
+ if 'PAGER' in os.environ:
+ pager_cmd = shlex.split(os.environ['PAGER'])
+ else:
+ less = shutil.which('less')
+ if not less and is_windows():
+ git = shutil.which('git')
+ if git:
+ path = Path(git).parents[1] / 'usr' / 'bin'
+ less = shutil.which('less', path=str(path))
+ if less:
+ pager_cmd = [less]
+ if not pager_cmd:
+ return
+ global log_pager # pylint: disable=global-statement
+ assert log_pager is None
+ try:
+ # Set 'LESS' environment variable, rather than arguments in
+ # pager_cmd, to also support the case where the user has 'PAGER'
+ # set to 'less'. Arguments set are:
+ # "R" : support color
+ # "X" : do not clear the screen when leaving the pager
+ # "F" : skip the pager if content fits into the screen
+ env = os.environ.copy()
+ if 'LESS' not in env:
+ env['LESS'] = 'RXF'
+ # Set "-c" for lv to support color
+ if 'LV' not in env:
+ env['LV'] = '-c'
+ log_pager = subprocess.Popen(pager_cmd, stdin=subprocess.PIPE,
+ text=True, encoding='utf-8', env=env)
+ except Exception as e:
+ # Ignore errors, unless it is a user defined pager.
+ if 'PAGER' in os.environ:
+ from .mesonlib import MesonException
+ raise MesonException(f'Failed to start pager: {str(e)}')
+
+def stop_pager() -> None:
+ global log_pager # pylint: disable=global-statement
+ if log_pager:
+ try:
+ log_pager.stdin.flush()
+ log_pager.stdin.close()
+ except BrokenPipeError:
+ pass
+ log_pager.wait()
+ log_pager = None