From cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 19:25:40 +0200 Subject: Adding upstream version 7.2.6. Signed-off-by: Daniel Baumann --- sphinx/util/logging.py | 602 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 602 insertions(+) create mode 100644 sphinx/util/logging.py (limited to 'sphinx/util/logging.py') diff --git a/sphinx/util/logging.py b/sphinx/util/logging.py new file mode 100644 index 0000000..429018a --- /dev/null +++ b/sphinx/util/logging.py @@ -0,0 +1,602 @@ +"""Logging utility functions for Sphinx.""" + +from __future__ import annotations + +import logging +import logging.handlers +from collections import defaultdict +from contextlib import contextmanager +from typing import IO, TYPE_CHECKING, Any + +from docutils import nodes +from docutils.utils import get_source_line + +from sphinx.errors import SphinxWarning +from sphinx.util.console import colorize +from sphinx.util.osutil import abspath + +if TYPE_CHECKING: + from collections.abc import Generator + + from docutils.nodes import Node + + from sphinx.application import Sphinx + + +NAMESPACE = 'sphinx' +VERBOSE = 15 + +LEVEL_NAMES: defaultdict[str, int] = defaultdict(lambda: logging.WARNING, { + 'CRITICAL': logging.CRITICAL, + 'SEVERE': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'VERBOSE': VERBOSE, + 'DEBUG': logging.DEBUG, +}) + +VERBOSITY_MAP: defaultdict[int, int] = defaultdict(lambda: logging.NOTSET, { + 0: logging.INFO, + 1: VERBOSE, + 2: logging.DEBUG, +}) + +COLOR_MAP: defaultdict[int, str] = defaultdict(lambda: 'blue', { + logging.ERROR: 'darkred', + logging.WARNING: 'red', + logging.DEBUG: 'darkgray', +}) + + +def getLogger(name: str) -> SphinxLoggerAdapter: + """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`. + + Sphinx logger always uses ``sphinx.*`` namespace to be independent from + settings of root logger. It ensures logging is consistent even if a + third-party extension or imported application resets logger settings. + + Example usage:: + + >>> from sphinx.util import logging + >>> logger = logging.getLogger(__name__) + >>> logger.info('Hello, this is an extension!') + Hello, this is an extension! + """ + # add sphinx prefix to name forcely + logger = logging.getLogger(NAMESPACE + '.' + name) + # Forcely enable logger + logger.disabled = False + # wrap logger by SphinxLoggerAdapter + return SphinxLoggerAdapter(logger, {}) + + +def convert_serializable(records: list[logging.LogRecord]) -> None: + """Convert LogRecord serializable.""" + for r in records: + # extract arguments to a message and clear them + r.msg = r.getMessage() + r.args = () + + location = getattr(r, 'location', None) + if isinstance(location, nodes.Node): + r.location = get_node_location(location) + + +class SphinxLogRecord(logging.LogRecord): + """Log record class supporting location""" + prefix = '' + location: Any = None + + def getMessage(self) -> str: + message = super().getMessage() + location = getattr(self, 'location', None) + if location: + message = f'{location}: {self.prefix}{message}' + elif self.prefix not in message: + message = self.prefix + message + + return message + + +class SphinxInfoLogRecord(SphinxLogRecord): + """Info log record class supporting location""" + prefix = '' # do not show any prefix for INFO messages + + +class SphinxWarningLogRecord(SphinxLogRecord): + """Warning log record class supporting location""" + @property + def prefix(self) -> str: # type: ignore[override] + if self.levelno >= logging.CRITICAL: + return 'CRITICAL: ' + elif self.levelno >= logging.ERROR: + return 'ERROR: ' + else: + return 'WARNING: ' + + +class SphinxLoggerAdapter(logging.LoggerAdapter): + """LoggerAdapter allowing ``type`` and ``subtype`` keywords.""" + KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once'] + + def log( # type: ignore[override] + self, level: int | str, msg: str, *args: Any, **kwargs: Any, + ) -> None: + if isinstance(level, int): + super().log(level, msg, *args, **kwargs) + else: + levelno = LEVEL_NAMES[level] + super().log(levelno, msg, *args, **kwargs) + + def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.log(VERBOSE, msg, *args, **kwargs) + + def process(self, msg: str, kwargs: dict) -> tuple[str, dict]: # type: ignore[override] + extra = kwargs.setdefault('extra', {}) + for keyword in self.KEYWORDS: + if keyword in kwargs: + extra[keyword] = kwargs.pop(keyword) + + return msg, kwargs + + def handle(self, record: logging.LogRecord) -> None: + self.logger.handle(record) + + +class WarningStreamHandler(logging.StreamHandler): + """StreamHandler for warnings.""" + pass + + +class NewLineStreamHandler(logging.StreamHandler): + """StreamHandler which switches line terminator by record.nonl flag.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + self.acquire() + if getattr(record, 'nonl', False): + # skip appending terminator when nonl=True + self.terminator = '' + super().emit(record) + finally: + self.terminator = '\n' + self.release() + + +class MemoryHandler(logging.handlers.BufferingHandler): + """Handler buffering all logs.""" + + buffer: list[logging.LogRecord] + + def __init__(self) -> None: + super().__init__(-1) + + def shouldFlush(self, record: logging.LogRecord) -> bool: + return False # never flush + + def flush(self) -> None: + # suppress any flushes triggered by importing packages that flush + # all handlers at initialization time + pass + + def flushTo(self, logger: logging.Logger) -> None: + self.acquire() + try: + for record in self.buffer: + logger.handle(record) + self.buffer = [] + finally: + self.release() + + def clear(self) -> list[logging.LogRecord]: + buffer, self.buffer = self.buffer, [] + return buffer + + +@contextmanager +def pending_warnings() -> Generator[logging.Handler, None, None]: + """Context manager to postpone logging warnings temporarily. + + Similar to :func:`pending_logging`. + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + memhandler.setLevel(logging.WARNING) + + try: + handlers = [] + for handler in logger.handlers[:]: + if isinstance(handler, WarningStreamHandler): + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + memhandler.flushTo(logger) + + +@contextmanager +def suppress_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to suppress logging all logs temporarily. + + For example:: + + >>> with suppress_logging(): + >>> logger.warning('Warning message!') # suppressed + >>> some_long_process() + >>> + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + + try: + handlers = [] + for handler in logger.handlers[:]: + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + +@contextmanager +def pending_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to postpone logging all logs temporarily. + + For example:: + + >>> with pending_logging(): + >>> logger.warning('Warning message!') # not flushed yet + >>> some_long_process() + >>> + Warning message! # the warning is flushed here + """ + logger = logging.getLogger(NAMESPACE) + try: + with suppress_logging() as memhandler: + yield memhandler + finally: + memhandler.flushTo(logger) + + +@contextmanager +def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]: + """Context manager to skip WarningIsErrorFilter temporarily.""" + logger = logging.getLogger(NAMESPACE) + + if skip is False: + yield + else: + try: + disabler = DisableWarningIsErrorFilter() + for handler in logger.handlers: + # use internal method; filters.insert() directly to install disabler + # before WarningIsErrorFilter + handler.filters.insert(0, disabler) + yield + finally: + for handler in logger.handlers: + handler.removeFilter(disabler) + + +@contextmanager +def prefixed_warnings(prefix: str) -> Generator[None, None, None]: + """Context manager to prepend prefix to all warning log records temporarily. + + For example:: + + >>> with prefixed_warnings("prefix:"): + >>> logger.warning('Warning message!') # => prefix: Warning message! + + .. versionadded:: 2.0 + """ + logger = logging.getLogger(NAMESPACE) + warning_handler = None + for handler in logger.handlers: + if isinstance(handler, WarningStreamHandler): + warning_handler = handler + break + else: + # warning stream not found + yield + return + + prefix_filter = None + for _filter in warning_handler.filters: + if isinstance(_filter, MessagePrefixFilter): + prefix_filter = _filter + break + + if prefix_filter: + # already prefixed + try: + previous = prefix_filter.prefix + prefix_filter.prefix = prefix + yield + finally: + prefix_filter.prefix = previous + else: + # not prefixed yet + prefix_filter = MessagePrefixFilter(prefix) + try: + warning_handler.addFilter(prefix_filter) + yield + finally: + warning_handler.removeFilter(prefix_filter) + + +class LogCollector: + def __init__(self) -> None: + self.logs: list[logging.LogRecord] = [] + + @contextmanager + def collect(self) -> Generator[None, None, None]: + with pending_logging() as memhandler: + yield + + self.logs = memhandler.clear() + + +class InfoFilter(logging.Filter): + """Filter error and warning messages.""" + + def filter(self, record: logging.LogRecord) -> bool: + return record.levelno < logging.WARNING + + +def is_suppressed_warning(type: str, subtype: str, suppress_warnings: list[str]) -> bool: + """Check whether the warning is suppressed or not.""" + if type is None: + return False + + subtarget: str | None + + for warning_type in suppress_warnings: + if '.' in warning_type: + target, subtarget = warning_type.split('.', 1) + else: + target, subtarget = warning_type, None + + if target == type and subtarget in (None, subtype, "*"): + return True + + return False + + +class WarningSuppressor(logging.Filter): + """Filter logs by `suppress_warnings`.""" + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + type = getattr(record, 'type', '') + subtype = getattr(record, 'subtype', '') + + try: + suppress_warnings = self.app.config.suppress_warnings + except AttributeError: + # config is not initialized yet (ex. in conf.py) + suppress_warnings = [] + + if is_suppressed_warning(type, subtype, suppress_warnings): + return False + else: + self.app._warncount += 1 + return True + + +class WarningIsErrorFilter(logging.Filter): + """Raise exception if warning emitted.""" + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if getattr(record, 'skip_warningsiserror', False): + # disabled by DisableWarningIsErrorFilter + return True + elif self.app.warningiserror: + location = getattr(record, 'location', '') + try: + message = record.msg % record.args + except (TypeError, ValueError): + message = record.msg # use record.msg itself + + if location: + exc = SphinxWarning(location + ":" + str(message)) + else: + exc = SphinxWarning(message) + if record.exc_info is not None: + raise exc from record.exc_info[1] + else: + raise exc + else: + return True + + +class DisableWarningIsErrorFilter(logging.Filter): + """Disable WarningIsErrorFilter if this filter installed.""" + + def filter(self, record: logging.LogRecord) -> bool: + record.skip_warningsiserror = True + return True + + +class MessagePrefixFilter(logging.Filter): + """Prepend prefix to all log records.""" + + def __init__(self, prefix: str) -> None: + self.prefix = prefix + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if self.prefix: + record.msg = self.prefix + ' ' + record.msg + return True + + +class OnceFilter(logging.Filter): + """Show the message only once.""" + + def __init__(self, name: str = '') -> None: + super().__init__(name) + self.messages: dict[str, list] = {} + + def filter(self, record: logging.LogRecord) -> bool: + once = getattr(record, 'once', '') + if not once: + return True + else: + params = self.messages.setdefault(record.msg, []) + if record.args in params: + return False + + params.append(record.args) + return True + + +class SphinxLogRecordTranslator(logging.Filter): + """Converts a log record to one Sphinx expects + + * Make a instance of SphinxLogRecord + * docname to path if location given + """ + LogRecordClass: type[logging.LogRecord] + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore[override] + if isinstance(record, logging.LogRecord): + # force subclassing to handle location + record.__class__ = self.LogRecordClass # type: ignore[assignment] + + location = getattr(record, 'location', None) + if isinstance(location, tuple): + docname, lineno = location + if docname: + if lineno: + record.location = f'{self.app.env.doc2path(docname)}:{lineno}' + else: + record.location = f'{self.app.env.doc2path(docname)}' + else: + record.location = None + elif isinstance(location, nodes.Node): + record.location = get_node_location(location) + elif location and ':' not in location: + record.location = f'{self.app.env.doc2path(location)}' + + return True + + +class InfoLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for INFO level log records.""" + LogRecordClass = SphinxInfoLogRecord + + +class WarningLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for WARNING level log records.""" + LogRecordClass = SphinxWarningLogRecord + + +def get_node_location(node: Node) -> str | None: + source, line = get_source_line(node) + if source: + source = abspath(source) + if source and line: + return f"{source}:{line}" + if source: + return f"{source}:" + if line: + return f":{line}" + return None + + +class ColorizeFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + message = super().format(record) + color = getattr(record, 'color', None) + if color is None: + color = COLOR_MAP.get(record.levelno) + + if color: + return colorize(color, message) + else: + return message + + +class SafeEncodingWriter: + """Stream writer which ignores UnicodeEncodeError silently""" + def __init__(self, stream: IO) -> None: + self.stream = stream + self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' + + def write(self, data: str) -> None: + try: + self.stream.write(data) + except UnicodeEncodeError: + # stream accept only str, not bytes. So, we encode and replace + # non-encodable characters, then decode them. + self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding)) + + def flush(self) -> None: + if hasattr(self.stream, 'flush'): + self.stream.flush() + + +class LastMessagesWriter: + """Stream writer storing last 10 messages in memory to save trackback""" + def __init__(self, app: Sphinx, stream: IO) -> None: + self.app = app + + def write(self, data: str) -> None: + self.app.messagelog.append(data) + + +def setup(app: Sphinx, status: IO, warning: IO) -> None: + """Setup root logger for Sphinx""" + logger = logging.getLogger(NAMESPACE) + logger.setLevel(logging.DEBUG) + logger.propagate = False + + # clear all handlers + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + info_handler = NewLineStreamHandler(SafeEncodingWriter(status)) + info_handler.addFilter(InfoFilter()) + info_handler.addFilter(InfoLogRecordTranslator(app)) + info_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + info_handler.setFormatter(ColorizeFormatter()) + + warning_handler = WarningStreamHandler(SafeEncodingWriter(warning)) + warning_handler.addFilter(WarningSuppressor(app)) + warning_handler.addFilter(WarningLogRecordTranslator(app)) + warning_handler.addFilter(WarningIsErrorFilter(app)) + warning_handler.addFilter(OnceFilter()) + warning_handler.setLevel(logging.WARNING) + warning_handler.setFormatter(ColorizeFormatter()) + + messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status)) + messagelog_handler.addFilter(InfoFilter()) + messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + + logger.addHandler(info_handler) + logger.addHandler(warning_handler) + logger.addHandler(messagelog_handler) -- cgit v1.2.3