diff options
Diffstat (limited to 'sphinx/util/logging.py')
-rw-r--r-- | sphinx/util/logging.py | 602 |
1 files changed, 602 insertions, 0 deletions
diff --git a/sphinx/util/logging.py b/sphinx/util/logging.py new file mode 100644 index 0000000..9b38aab --- /dev/null +++ b/sphinx/util/logging.py @@ -0,0 +1,602 @@ +"""Logging utility functions for Sphinx.""" + +import logging +import logging.handlers +from collections import defaultdict +from contextlib import contextmanager +from typing import IO, TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union + +from docutils import nodes +from docutils.nodes import Node +from docutils.utils import get_source_line + +from sphinx.errors import SphinxWarning +from sphinx.util.console import colorize +from sphinx.util.osutil import abspath + +if TYPE_CHECKING: + from sphinx.application import Sphinx + + +NAMESPACE = 'sphinx' +VERBOSE = 15 + +LEVEL_NAMES: Dict[str, int] = defaultdict(lambda: logging.WARNING) +LEVEL_NAMES.update({ + 'CRITICAL': logging.CRITICAL, + 'SEVERE': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'VERBOSE': VERBOSE, + 'DEBUG': logging.DEBUG, +}) + +VERBOSITY_MAP: Dict[int, int] = defaultdict(lambda: 0) +VERBOSITY_MAP.update({ + 0: logging.INFO, + 1: VERBOSE, + 2: logging.DEBUG, +}) + +COLOR_MAP = defaultdict(lambda: 'blue', + { + logging.ERROR: 'darkred', + logging.WARNING: 'red', + logging.DEBUG: 'darkgray' + }) + + +def getLogger(name: str) -> "SphinxLoggerAdapter": + """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`. + + Sphinx logger always uses ``sphinx.*`` namespace to be independent from + settings of root logger. It ensures logging is consistent even if a + third-party extension or imported application resets logger settings. + + Example usage:: + + >>> from sphinx.util import logging + >>> logger = logging.getLogger(__name__) + >>> logger.info('Hello, this is an extension!') + Hello, this is an extension! + """ + # add sphinx prefix to name forcely + logger = logging.getLogger(NAMESPACE + '.' + name) + # Forcely enable logger + logger.disabled = False + # wrap logger by SphinxLoggerAdapter + return SphinxLoggerAdapter(logger, {}) + + +def convert_serializable(records: List[logging.LogRecord]) -> None: + """Convert LogRecord serializable.""" + for r in records: + # extract arguments to a message and clear them + r.msg = r.getMessage() + r.args = () + + location = getattr(r, 'location', None) + if isinstance(location, nodes.Node): + r.location = get_node_location(location) + + +class SphinxLogRecord(logging.LogRecord): + """Log record class supporting location""" + prefix = '' + location: Any = None + + def getMessage(self) -> str: + message = super().getMessage() + location = getattr(self, 'location', None) + if location: + message = '%s: %s%s' % (location, self.prefix, message) + elif self.prefix not in message: + message = self.prefix + message + + return message + + +class SphinxInfoLogRecord(SphinxLogRecord): + """Info log record class supporting location""" + prefix = '' # do not show any prefix for INFO messages + + +class SphinxWarningLogRecord(SphinxLogRecord): + """Warning log record class supporting location""" + @property + def prefix(self) -> str: # type: ignore + if self.levelno >= logging.CRITICAL: + return 'CRITICAL: ' + elif self.levelno >= logging.ERROR: + return 'ERROR: ' + else: + return 'WARNING: ' + + +class SphinxLoggerAdapter(logging.LoggerAdapter): + """LoggerAdapter allowing ``type`` and ``subtype`` keywords.""" + KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once'] + + def log(self, level: Union[int, str], msg: str, *args: Any, **kwargs: Any) -> None: # type: ignore # NOQA + if isinstance(level, int): + super().log(level, msg, *args, **kwargs) + else: + levelno = LEVEL_NAMES[level] + super().log(levelno, msg, *args, **kwargs) + + def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.log(VERBOSE, msg, *args, **kwargs) + + def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]: # type: ignore + extra = kwargs.setdefault('extra', {}) + for keyword in self.KEYWORDS: + if keyword in kwargs: + extra[keyword] = kwargs.pop(keyword) + + return msg, kwargs + + def handle(self, record: logging.LogRecord) -> None: + self.logger.handle(record) + + +class WarningStreamHandler(logging.StreamHandler): + """StreamHandler for warnings.""" + pass + + +class NewLineStreamHandler(logging.StreamHandler): + """StreamHandler which switches line terminator by record.nonl flag.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + self.acquire() + if getattr(record, 'nonl', False): + # skip appending terminator when nonl=True + self.terminator = '' + super().emit(record) + finally: + self.terminator = '\n' + self.release() + + +class MemoryHandler(logging.handlers.BufferingHandler): + """Handler buffering all logs.""" + + buffer: List[logging.LogRecord] + + def __init__(self) -> None: + super().__init__(-1) + + def shouldFlush(self, record: logging.LogRecord) -> bool: + return False # never flush + + def flush(self) -> None: + # suppress any flushes triggered by importing packages that flush + # all handlers at initialization time + pass + + def flushTo(self, logger: logging.Logger) -> None: + self.acquire() + try: + for record in self.buffer: + logger.handle(record) + self.buffer = [] + finally: + self.release() + + def clear(self) -> List[logging.LogRecord]: + buffer, self.buffer = self.buffer, [] + return buffer + + +@contextmanager +def pending_warnings() -> Generator[logging.Handler, None, None]: + """Context manager to postpone logging warnings temporarily. + + Similar to :func:`pending_logging`. + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + memhandler.setLevel(logging.WARNING) + + try: + handlers = [] + for handler in logger.handlers[:]: + if isinstance(handler, WarningStreamHandler): + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + memhandler.flushTo(logger) + + +@contextmanager +def suppress_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to suppress logging all logs temporarily. + + For example:: + + >>> with suppress_logging(): + >>> logger.warning('Warning message!') # suppressed + >>> some_long_process() + >>> + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + + try: + handlers = [] + for handler in logger.handlers[:]: + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + +@contextmanager +def pending_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to postpone logging all logs temporarily. + + For example:: + + >>> with pending_logging(): + >>> logger.warning('Warning message!') # not flushed yet + >>> some_long_process() + >>> + Warning message! # the warning is flushed here + """ + logger = logging.getLogger(NAMESPACE) + try: + with suppress_logging() as memhandler: + yield memhandler + finally: + memhandler.flushTo(logger) + + +@contextmanager +def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]: + """Context manager to skip WarningIsErrorFilter temporarily.""" + logger = logging.getLogger(NAMESPACE) + + if skip is False: + yield + else: + try: + disabler = DisableWarningIsErrorFilter() + for handler in logger.handlers: + # use internal method; filters.insert() directly to install disabler + # before WarningIsErrorFilter + handler.filters.insert(0, disabler) + yield + finally: + for handler in logger.handlers: + handler.removeFilter(disabler) + + +@contextmanager +def prefixed_warnings(prefix: str) -> Generator[None, None, None]: + """Context manager to prepend prefix to all warning log records temporarily. + + For example:: + + >>> with prefixed_warnings("prefix:"): + >>> logger.warning('Warning message!') # => prefix: Warning message! + + .. versionadded:: 2.0 + """ + logger = logging.getLogger(NAMESPACE) + warning_handler = None + for handler in logger.handlers: + if isinstance(handler, WarningStreamHandler): + warning_handler = handler + break + else: + # warning stream not found + yield + return + + prefix_filter = None + for _filter in warning_handler.filters: + if isinstance(_filter, MessagePrefixFilter): + prefix_filter = _filter + break + + if prefix_filter: + # already prefixed + try: + previous = prefix_filter.prefix + prefix_filter.prefix = prefix + yield + finally: + prefix_filter.prefix = previous + else: + # not prefixed yet + prefix_filter = MessagePrefixFilter(prefix) + try: + warning_handler.addFilter(prefix_filter) + yield + finally: + warning_handler.removeFilter(prefix_filter) + + +class LogCollector: + def __init__(self) -> None: + self.logs: List[logging.LogRecord] = [] + + @contextmanager + def collect(self) -> Generator[None, None, None]: + with pending_logging() as memhandler: + yield + + self.logs = memhandler.clear() + + +class InfoFilter(logging.Filter): + """Filter error and warning messages.""" + + def filter(self, record: logging.LogRecord) -> bool: + if record.levelno < logging.WARNING: + return True + else: + return False + + +def is_suppressed_warning(type: str, subtype: str, suppress_warnings: List[str]) -> bool: + """Check whether the warning is suppressed or not.""" + if type is None: + return False + + subtarget: Optional[str] + + for warning_type in suppress_warnings: + if '.' in warning_type: + target, subtarget = warning_type.split('.', 1) + else: + target, subtarget = warning_type, None + + if target == type and subtarget in (None, subtype, "*"): + return True + + return False + + +class WarningSuppressor(logging.Filter): + """Filter logs by `suppress_warnings`.""" + + def __init__(self, app: "Sphinx") -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + type = getattr(record, 'type', '') + subtype = getattr(record, 'subtype', '') + + try: + suppress_warnings = self.app.config.suppress_warnings + except AttributeError: + # config is not initialized yet (ex. in conf.py) + suppress_warnings = [] + + if is_suppressed_warning(type, subtype, suppress_warnings): + return False + else: + self.app._warncount += 1 + return True + + +class WarningIsErrorFilter(logging.Filter): + """Raise exception if warning emitted.""" + + def __init__(self, app: "Sphinx") -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if getattr(record, 'skip_warningsiserror', False): + # disabled by DisableWarningIsErrorFilter + return True + elif self.app.warningiserror: + location = getattr(record, 'location', '') + try: + message = record.msg % record.args + except (TypeError, ValueError): + message = record.msg # use record.msg itself + + if location: + exc = SphinxWarning(location + ":" + str(message)) + else: + exc = SphinxWarning(message) + if record.exc_info is not None: + raise exc from record.exc_info[1] + else: + raise exc + else: + return True + + +class DisableWarningIsErrorFilter(logging.Filter): + """Disable WarningIsErrorFilter if this filter installed.""" + + def filter(self, record: logging.LogRecord) -> bool: + record.skip_warningsiserror = True + return True + + +class MessagePrefixFilter(logging.Filter): + """Prepend prefix to all log records.""" + + def __init__(self, prefix: str) -> None: + self.prefix = prefix + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if self.prefix: + record.msg = self.prefix + ' ' + record.msg + return True + + +class OnceFilter(logging.Filter): + """Show the message only once.""" + + def __init__(self, name: str = '') -> None: + super().__init__(name) + self.messages: Dict[str, List] = {} + + def filter(self, record: logging.LogRecord) -> bool: + once = getattr(record, 'once', '') + if not once: + return True + else: + params = self.messages.setdefault(record.msg, []) + if record.args in params: + return False + + params.append(record.args) + return True + + +class SphinxLogRecordTranslator(logging.Filter): + """Converts a log record to one Sphinx expects + + * Make a instance of SphinxLogRecord + * docname to path if location given + """ + LogRecordClass: Type[logging.LogRecord] + + def __init__(self, app: "Sphinx") -> None: + self.app = app + super().__init__() + + def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore + if isinstance(record, logging.LogRecord): + # force subclassing to handle location + record.__class__ = self.LogRecordClass # type: ignore + + location = getattr(record, 'location', None) + if isinstance(location, tuple): + docname, lineno = location + if docname and lineno: + record.location = '%s:%s' % (self.app.env.doc2path(docname), lineno) + elif docname: + record.location = '%s' % self.app.env.doc2path(docname) + else: + record.location = None + elif isinstance(location, nodes.Node): + record.location = get_node_location(location) + elif location and ':' not in location: + record.location = '%s' % self.app.env.doc2path(location) + + return True + + +class InfoLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for INFO level log records.""" + LogRecordClass = SphinxInfoLogRecord + + +class WarningLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for WARNING level log records.""" + LogRecordClass = SphinxWarningLogRecord + + +def get_node_location(node: Node) -> Optional[str]: + (source, line) = get_source_line(node) + if source: + source = abspath(source) + if source and line: + return "%s:%s" % (source, line) + elif source: + return "%s:" % source + elif line: + return "<unknown>:%s" % line + else: + return None + + +class ColorizeFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + message = super().format(record) + color = getattr(record, 'color', None) + if color is None: + color = COLOR_MAP.get(record.levelno) + + if color: + return colorize(color, message) + else: + return message + + +class SafeEncodingWriter: + """Stream writer which ignores UnicodeEncodeError silently""" + def __init__(self, stream: IO) -> None: + self.stream = stream + self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' + + def write(self, data: str) -> None: + try: + self.stream.write(data) + except UnicodeEncodeError: + # stream accept only str, not bytes. So, we encode and replace + # non-encodable characters, then decode them. + self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding)) + + def flush(self) -> None: + if hasattr(self.stream, 'flush'): + self.stream.flush() + + +class LastMessagesWriter: + """Stream writer storing last 10 messages in memory to save trackback""" + def __init__(self, app: "Sphinx", stream: IO) -> None: + self.app = app + + def write(self, data: str) -> None: + self.app.messagelog.append(data) + + +def setup(app: "Sphinx", status: IO, warning: IO) -> None: + """Setup root logger for Sphinx""" + logger = logging.getLogger(NAMESPACE) + logger.setLevel(logging.DEBUG) + logger.propagate = False + + # clear all handlers + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + info_handler = NewLineStreamHandler(SafeEncodingWriter(status)) + info_handler.addFilter(InfoFilter()) + info_handler.addFilter(InfoLogRecordTranslator(app)) + info_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + info_handler.setFormatter(ColorizeFormatter()) + + warning_handler = WarningStreamHandler(SafeEncodingWriter(warning)) + warning_handler.addFilter(WarningSuppressor(app)) + warning_handler.addFilter(WarningLogRecordTranslator(app)) + warning_handler.addFilter(WarningIsErrorFilter(app)) + warning_handler.addFilter(OnceFilter()) + warning_handler.setLevel(logging.WARNING) + warning_handler.setFormatter(ColorizeFormatter()) + + messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status)) + messagelog_handler.addFilter(InfoFilter()) + messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + messagelog_handler.setFormatter(ColorizeFormatter()) + + logger.addHandler(info_handler) + logger.addHandler(warning_handler) + logger.addHandler(messagelog_handler) |