From cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a Mon Sep 17 00:00:00 2001 From: Daniel Baumann <daniel.baumann@progress-linux.org> Date: Mon, 15 Apr 2024 19:25:40 +0200 Subject: Adding upstream version 7.2.6. Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org> --- sphinx/util/__init__.py | 297 +++++++++++++++ sphinx/util/_pathlib.py | 115 ++++++ sphinx/util/build_phase.py | 12 + sphinx/util/cfamily.py | 464 ++++++++++++++++++++++++ sphinx/util/console.py | 129 +++++++ sphinx/util/display.py | 94 +++++ sphinx/util/docfields.py | 408 +++++++++++++++++++++ sphinx/util/docstrings.py | 88 +++++ sphinx/util/docutils.py | 635 +++++++++++++++++++++++++++++++++ sphinx/util/exceptions.py | 67 ++++ sphinx/util/fileutil.py | 100 ++++++ sphinx/util/http_date.py | 39 ++ sphinx/util/i18n.py | 253 +++++++++++++ sphinx/util/images.py | 146 ++++++++ sphinx/util/index_entries.py | 27 ++ sphinx/util/inspect.py | 833 +++++++++++++++++++++++++++++++++++++++++++ sphinx/util/inventory.py | 172 +++++++++ sphinx/util/logging.py | 602 +++++++++++++++++++++++++++++++ sphinx/util/matching.py | 169 +++++++++ sphinx/util/math.py | 61 ++++ sphinx/util/nodes.py | 672 ++++++++++++++++++++++++++++++++++ sphinx/util/osutil.py | 217 +++++++++++ sphinx/util/parallel.py | 154 ++++++++ sphinx/util/png.py | 43 +++ sphinx/util/requests.py | 73 ++++ sphinx/util/rst.py | 110 ++++++ sphinx/util/tags.py | 88 +++++ sphinx/util/template.py | 135 +++++++ sphinx/util/texescape.py | 153 ++++++++ sphinx/util/typing.py | 402 +++++++++++++++++++++ 30 files changed, 6758 insertions(+) create mode 100644 sphinx/util/__init__.py create mode 100644 sphinx/util/_pathlib.py create mode 100644 sphinx/util/build_phase.py create mode 100644 sphinx/util/cfamily.py create mode 100644 sphinx/util/console.py create mode 100644 sphinx/util/display.py create mode 100644 sphinx/util/docfields.py create mode 100644 sphinx/util/docstrings.py create mode 100644 sphinx/util/docutils.py create mode 100644 sphinx/util/exceptions.py create mode 100644 sphinx/util/fileutil.py create mode 100644 sphinx/util/http_date.py create mode 100644 sphinx/util/i18n.py create mode 100644 sphinx/util/images.py create mode 100644 sphinx/util/index_entries.py create mode 100644 sphinx/util/inspect.py create mode 100644 sphinx/util/inventory.py create mode 100644 sphinx/util/logging.py create mode 100644 sphinx/util/matching.py create mode 100644 sphinx/util/math.py create mode 100644 sphinx/util/nodes.py create mode 100644 sphinx/util/osutil.py create mode 100644 sphinx/util/parallel.py create mode 100644 sphinx/util/png.py create mode 100644 sphinx/util/requests.py create mode 100644 sphinx/util/rst.py create mode 100644 sphinx/util/tags.py create mode 100644 sphinx/util/template.py create mode 100644 sphinx/util/texescape.py create mode 100644 sphinx/util/typing.py diff --git a/sphinx/util/__init__.py b/sphinx/util/__init__.py new file mode 100644 index 0000000..69b2848 --- /dev/null +++ b/sphinx/util/__init__.py @@ -0,0 +1,297 @@ +"""Utility functions for Sphinx.""" + +from __future__ import annotations + +import hashlib +import os +import posixpath +import re +from importlib import import_module +from os import path +from typing import IO, Any +from urllib.parse import parse_qsl, quote_plus, urlencode, urlsplit, urlunsplit + +from sphinx.errors import ExtensionError, FiletypeNotFoundError +from sphinx.locale import __ +from sphinx.util import display as _display +from sphinx.util import exceptions as _exceptions +from sphinx.util import http_date as _http_date +from sphinx.util import index_entries as
_index_entries +from sphinx.util import logging +from sphinx.util import osutil as _osutil +from sphinx.util.console import strip_colors # NoQA: F401 +from sphinx.util.matching import patfilter # noqa: F401 +from sphinx.util.nodes import ( # noqa: F401 + caption_ref_re, + explicit_title_re, + nested_parse_with_titles, + split_explicit_title, +) + +# import other utilities; partly for backwards compatibility, so don't +# prune unused ones indiscriminately +from sphinx.util.osutil import ( # noqa: F401 + SEP, + copyfile, + copytimes, + ensuredir, + make_filename, + mtimes_of_files, + os_path, + relative_uri, +) + +logger = logging.getLogger(__name__) + +# Generally useful regular expressions. +ws_re: re.Pattern[str] = re.compile(r'\s+') +url_re: re.Pattern[str] = re.compile(r'(?P<schema>.+)://.*') + + +# High-level utility functions. + +def docname_join(basedocname: str, docname: str) -> str: + return posixpath.normpath(posixpath.join('/' + basedocname, '..', docname))[1:] + +  +def get_filetype(source_suffix: dict[str, str], filename: str) -> str: + for suffix, filetype in source_suffix.items(): + if filename.endswith(suffix): + # The default filetype (None) is treated as restructuredtext. + return filetype or 'restructuredtext' + raise FiletypeNotFoundError + + +class FilenameUniqDict(dict): + """ + A dictionary that automatically generates unique names for its keys, + interpreted as filenames, and keeps track of a set of docnames they + appear in. Used for images and downloadable files in the environment. + """ + def __init__(self) -> None: + self._existing: set[str] = set() + + def add_file(self, docname: str, newfile: str) -> str: + if newfile in self: + self[newfile][0].add(docname) + return self[newfile][1] + uniquename = path.basename(newfile) + base, ext = path.splitext(uniquename) + i = 0 + while uniquename in self._existing: + i += 1 + uniquename = f'{base}{i}{ext}' + self[newfile] = ({docname}, uniquename) + self._existing.add(uniquename) + return uniquename + + def purge_doc(self, docname: str) -> None: + for filename, (docs, unique) in list(self.items()): + docs.discard(docname) + if not docs: + del self[filename] + self._existing.discard(unique) + + def merge_other(self, docnames: set[str], other: dict[str, tuple[set[str], Any]]) -> None: + for filename, (docs, _unique) in other.items(): + for doc in docs & set(docnames): + self.add_file(doc, filename) + + def __getstate__(self) -> set[str]: + return self._existing + + def __setstate__(self, state: set[str]) -> None: + self._existing = state + + +def _md5(data=b'', **_kw): + """Deprecated wrapper around hashlib.md5 + + To be removed in Sphinx 9.0 + """ + return hashlib.md5(data, usedforsecurity=False) + + +def _sha1(data=b'', **_kw): + """Deprecated wrapper around hashlib.sha1 + + To be removed in Sphinx 9.0 + """ + return hashlib.sha1(data, usedforsecurity=False) + + +class DownloadFiles(dict): + """A special dictionary for download files. + + .. important:: This class will be refactored in the near future. + Hence, avoid depending on its internal structure.
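+ + A usage sketch (the docname and file path are illustrative only):: + + files = DownloadFiles() + dest = files.add_file('index', 'files/data.csv') + # ``dest`` is '<digest>/data.csv'; 'index' is recorded as a referrer + files.purge_doc('index') # entry is dropped once no document references it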
+ """ + + def add_file(self, docname: str, filename: str) -> str: + if filename not in self: + digest = hashlib.md5(filename.encode(), usedforsecurity=False).hexdigest() + dest = f'{digest}/{os.path.basename(filename)}' + self[filename] = (set(), dest) + + self[filename][0].add(docname) + return self[filename][1] + + def purge_doc(self, docname: str) -> None: + for filename, (docs, _dest) in list(self.items()): + docs.discard(docname) + if not docs: + del self[filename] + + def merge_other(self, docnames: set[str], other: dict[str, tuple[set[str], Any]]) -> None: + for filename, (docs, _dest) in other.items(): + for docname in docs & set(docnames): + self.add_file(docname, filename) + + +# a regex to recognize coding cookies +_coding_re = re.compile(r'coding[:=]\s*([-\w.]+)') + + +class UnicodeDecodeErrorHandler: + """Custom error handler for open() that warns and replaces.""" + + def __init__(self, docname: str) -> None: + self.docname = docname + + def __call__(self, error: UnicodeDecodeError) -> tuple[str, int]: + linestart = error.object.rfind(b'\n', 0, error.start) + lineend = error.object.find(b'\n', error.start) + if lineend == -1: + lineend = len(error.object) + lineno = error.object.count(b'\n', 0, error.start) + 1 + logger.warning(__('undecodable source characters, replacing with "?": %r'), + (error.object[linestart + 1:error.start] + b'>>>' + + error.object[error.start:error.end] + b'<<<' + + error.object[error.end:lineend]), + location=(self.docname, lineno)) + return ('?', error.end) + + +# Low-level utility functions and classes. + +class Tee: + """ + File-like object writing to two streams. + """ + def __init__(self, stream1: IO, stream2: IO) -> None: + self.stream1 = stream1 + self.stream2 = stream2 + + def write(self, text: str) -> None: + self.stream1.write(text) + self.stream2.write(text) + + def flush(self) -> None: + if hasattr(self.stream1, 'flush'): + self.stream1.flush() + if hasattr(self.stream2, 'flush'): + self.stream2.flush() + + +def parselinenos(spec: str, total: int) -> list[int]: + """Parse a line number spec (such as "1,2,4-6") and return a list of + wanted line numbers. + """ + items = [] + parts = spec.split(',') + for part in parts: + try: + begend = part.strip().split('-') + if ['', ''] == begend: + raise ValueError + if len(begend) == 1: + items.append(int(begend[0]) - 1) + elif len(begend) == 2: + start = int(begend[0] or 1) # left half open (cf. -10) + end = int(begend[1] or max(start, total)) # right half open (cf. 10-) + if start > end: # invalid range (cf. 10-1) + raise ValueError + items.extend(range(start - 1, end)) + else: + raise ValueError + except ValueError as exc: + msg = f'invalid line number spec: {spec!r}' + raise ValueError(msg) from exc + + return items + + +def import_object(objname: str, source: str | None = None) -> Any: + """Import python object by qualname.""" + try: + objpath = objname.split('.') + modname = objpath.pop(0) + obj = import_module(modname) + for name in objpath: + modname += '.' 
+ name + try: + obj = getattr(obj, name) + except AttributeError: + obj = import_module(modname) + + return obj + except (AttributeError, ImportError) as exc: + if source: + raise ExtensionError('Could not import %s (needed for %s)' % + (objname, source), exc) from exc + raise ExtensionError('Could not import %s' % objname, exc) from exc + + +def encode_uri(uri: str) -> str: + split = list(urlsplit(uri)) + split[1] = split[1].encode('idna').decode('ascii') + split[2] = quote_plus(split[2].encode(), '/') + query = [(q, v.encode()) for (q, v) in parse_qsl(split[3])] + split[3] = urlencode(query) + return urlunsplit(split) + + +def isurl(url: str) -> bool: + """Check *url* is URL or not.""" + return bool(url) and '://' in url + + +def _xml_name_checker(): + # to prevent import cycles + from sphinx.builders.epub3 import _XML_NAME_PATTERN + + return _XML_NAME_PATTERN + + +# deprecated name -> (object to return, canonical path or empty string) +_DEPRECATED_OBJECTS = { + 'path_stabilize': (_osutil.path_stabilize, 'sphinx.util.osutil.path_stabilize'), + 'display_chunk': (_display.display_chunk, 'sphinx.util.display.display_chunk'), + 'status_iterator': (_display.status_iterator, 'sphinx.util.display.status_iterator'), + 'SkipProgressMessage': (_display.SkipProgressMessage, + 'sphinx.util.display.SkipProgressMessage'), + 'progress_message': (_display.progress_message, 'sphinx.util.display.progress_message'), + 'epoch_to_rfc1123': (_http_date.epoch_to_rfc1123, 'sphinx.http_date.epoch_to_rfc1123'), + 'rfc1123_to_epoch': (_http_date.rfc1123_to_epoch, 'sphinx.http_date.rfc1123_to_epoch'), + 'save_traceback': (_exceptions.save_traceback, 'sphinx.exceptions.save_traceback'), + 'format_exception_cut_frames': (_exceptions.format_exception_cut_frames, + 'sphinx.exceptions.format_exception_cut_frames'), + 'xmlname_checker': (_xml_name_checker, 'sphinx.builders.epub3._XML_NAME_PATTERN'), + 'split_index_msg': (_index_entries.split_index_msg, + 'sphinx.util.index_entries.split_index_msg'), + 'split_into': (_index_entries.split_index_msg, 'sphinx.util.index_entries.split_into'), + 'md5': (_md5, ''), + 'sha1': (_sha1, ''), +} + + +def __getattr__(name): + if name not in _DEPRECATED_OBJECTS: + msg = f'module {__name__!r} has no attribute {name!r}' + raise AttributeError(msg) + + from sphinx.deprecation import _deprecation_warning + + deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name] + _deprecation_warning(__name__, name, canonical_name, remove=(8, 0)) + return deprecated_object diff --git a/sphinx/util/_pathlib.py b/sphinx/util/_pathlib.py new file mode 100644 index 0000000..59980e9 --- /dev/null +++ b/sphinx/util/_pathlib.py @@ -0,0 +1,115 @@ +"""What follows is awful and will be gone in Sphinx 8""" + +from __future__ import annotations + +import sys +import warnings +from pathlib import Path, PosixPath, PurePath, WindowsPath + +from sphinx.deprecation import RemovedInSphinx80Warning + +_STR_METHODS = frozenset(str.__dict__) +_PATH_NAME = Path().__class__.__name__ + +_MSG = ( + 'Sphinx 8 will drop support for representing paths as strings. ' + 'Use "pathlib.Path" or "os.fspath" instead.' 
+) + +# https://docs.python.org/3/library/stdtypes.html#typesseq-common +# https://docs.python.org/3/library/stdtypes.html#string-methods + +if sys.platform == 'win32': + class _StrPath(WindowsPath): + def replace(self, old, new, count=-1, /): + # replace exists in both Path and str; + # in Path it makes filesystem changes, so we use the safer str version + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__().replace(old, new, count) + + def __getattr__(self, item): + if item in _STR_METHODS: + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return getattr(self.__str__(), item) + msg = f'{_PATH_NAME!r} has no attribute {item!r}' + raise AttributeError(msg) + + def __add__(self, other): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__() + other + + def __bool__(self): + if not self.__str__(): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return False + return True + + def __contains__(self, item): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return item in self.__str__() + + def __eq__(self, other): + if isinstance(other, PurePath): + return super().__eq__(other) + if isinstance(other, str): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__() == other + return NotImplemented + + def __hash__(self): + return super().__hash__() + + def __getitem__(self, item): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__()[item] + + def __len__(self): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return len(self.__str__()) +else: + class _StrPath(PosixPath): + def replace(self, old, new, count=-1, /): + # replace exists in both Path and str; + # in Path it makes filesystem changes, so we use the safer str version + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__().replace(old, new, count) + + def __getattr__(self, item): + if item in _STR_METHODS: + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return getattr(self.__str__(), item) + msg = f'{_PATH_NAME!r} has no attribute {item!r}' + raise AttributeError(msg) + + def __add__(self, other): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__() + other + + def __bool__(self): + if not self.__str__(): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return False + return True + + def __contains__(self, item): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return item in self.__str__() + + def __eq__(self, other): + if isinstance(other, PurePath): + return super().__eq__(other) + if isinstance(other, str): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__() == other + return NotImplemented + + def __hash__(self): + return super().__hash__() + + def __getitem__(self, item): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return self.__str__()[item] + + def __len__(self): + warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2) + return len(self.__str__()) diff --git a/sphinx/util/build_phase.py b/sphinx/util/build_phase.py new file mode 100644 index 0000000..7f80aa5 --- /dev/null +++ b/sphinx/util/build_phase.py @@ -0,0 +1,12 @@ +"""Build phase of Sphinx application.""" + +from enum import IntEnum + + +class BuildPhase(IntEnum): + """Build phase of Sphinx application.""" + INITIALIZATION = 1 + READING = 2 + CONSISTENCY_CHECK = 3 + RESOLVING = 3 + WRITING = 4 diff --git a/sphinx/util/cfamily.py 
b/sphinx/util/cfamily.py new file mode 100644 index 0000000..a3fdbe3 --- /dev/null +++ b/sphinx/util/cfamily.py @@ -0,0 +1,464 @@ +"""Utility functions common to the C and C++ domains.""" + +from __future__ import annotations + +import re +from copy import deepcopy +from typing import TYPE_CHECKING, Any, Callable + +from docutils import nodes + +from sphinx import addnodes +from sphinx.util import logging + +if TYPE_CHECKING: + from docutils.nodes import TextElement + + from sphinx.config import Config + +logger = logging.getLogger(__name__) + +StringifyTransform = Callable[[Any], str] + + +_whitespace_re = re.compile(r'\s+') +anon_identifier_re = re.compile(r'(@[a-zA-Z0-9_])[a-zA-Z0-9_]*\b') +identifier_re = re.compile(r''' + ( # This 'extends' _anon_identifier_re with the ordinary identifiers, + # make sure they are in sync. + (~?\b[a-zA-Z_]) # ordinary identifiers + | (@[a-zA-Z0-9_]) # our extension for names of anonymous entities + ) + [a-zA-Z0-9_]*\b +''', flags=re.VERBOSE) +integer_literal_re = re.compile(r'[1-9][0-9]*(\'[0-9]+)*') +octal_literal_re = re.compile(r'0[0-7]*(\'[0-7]+)*') +hex_literal_re = re.compile(r'0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*') +binary_literal_re = re.compile(r'0[bB][01]+(\'[01]+)*') +integers_literal_suffix_re = re.compile(r''' + # unsigned and/or (long) long, in any order, but at least one of them + ( + ([uU] ([lL] | (ll) | (LL))?) + | + (([lL] | (ll) | (LL)) [uU]?) + )\b + # the ending word boundary is important for distinguishing + # between suffixes and UDLs in C++ +''', flags=re.VERBOSE) +float_literal_re = re.compile(r''' + [+-]?( + # decimal + ([0-9]+(\'[0-9]+)*[eE][+-]?[0-9]+(\'[0-9]+)*) + | (([0-9]+(\'[0-9]+)*)?\.[0-9]+(\'[0-9]+)*([eE][+-]?[0-9]+(\'[0-9]+)*)?) + | ([0-9]+(\'[0-9]+)*\.([eE][+-]?[0-9]+(\'[0-9]+)*)?) + # hex + | (0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*[pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*) + | (0[xX]([0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?\. + [0-9a-fA-F]+(\'[0-9a-fA-F]+)*([pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?) + | (0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*\.([pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?) + ) +''', flags=re.VERBOSE) +float_literal_suffix_re = re.compile(r'[fFlL]\b') +# the ending word boundary is important for distinguishing between suffixes and UDLs in C++ +char_literal_re = re.compile(r''' + ((?:u8)|u|U|L)? + '( + (?:[^\\']) + | (\\( + (?:['"?\\abfnrtv]) + | (?:[0-7]{1,3}) + | (?:x[0-9a-fA-F]{2}) + | (?:u[0-9a-fA-F]{4}) + | (?:U[0-9a-fA-F]{8}) + )) + )' +''', flags=re.VERBOSE) + + +def verify_description_mode(mode: str) -> None: + if mode not in ('lastIsName', 'noneIsName', 'markType', 'markName', 'param', 'udl'): + raise Exception("Description mode '%s' is invalid." % mode) + + +class NoOldIdError(Exception): + # Used to avoid implementing unneeded id generation for old id schemes. + pass + + +class ASTBaseBase: + def __eq__(self, other: Any) -> bool: + if type(self) is not type(other): + return False + try: + for key, value in self.__dict__.items(): + if value != getattr(other, key): + return False + except AttributeError: + return False + return True + + # Defining __hash__ = None is not strictly needed when __eq__ is defined. 
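+ # (Python already sets ``__hash__`` to None when ``__eq__`` is defined; + # the explicit assignment below merely documents that instances are unhashable.)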
+ __hash__ = None # type: ignore[assignment] + + def clone(self) -> Any: + return deepcopy(self) + + def _stringify(self, transform: StringifyTransform) -> str: + raise NotImplementedError(repr(self)) + + def __str__(self) -> str: + return self._stringify(lambda ast: str(ast)) + + def get_display_string(self) -> str: + return self._stringify(lambda ast: ast.get_display_string()) + + def __repr__(self) -> str: + return '<%s>' % self.__class__.__name__ + + +################################################################################ +# Attributes +################################################################################ + +class ASTAttribute(ASTBaseBase): + def describe_signature(self, signode: TextElement) -> None: + raise NotImplementedError(repr(self)) + + +class ASTCPPAttribute(ASTAttribute): + def __init__(self, arg: str) -> None: + self.arg = arg + + def _stringify(self, transform: StringifyTransform) -> str: + return "[[" + self.arg + "]]" + + def describe_signature(self, signode: TextElement) -> None: + signode.append(addnodes.desc_sig_punctuation('[[', '[[')) + signode.append(nodes.Text(self.arg)) + signode.append(addnodes.desc_sig_punctuation(']]', ']]')) + + +class ASTGnuAttribute(ASTBaseBase): + def __init__(self, name: str, args: ASTBaseParenExprList | None) -> None: + self.name = name + self.args = args + + def _stringify(self, transform: StringifyTransform) -> str: + res = [self.name] + if self.args: + res.append(transform(self.args)) + return ''.join(res) + + +class ASTGnuAttributeList(ASTAttribute): + def __init__(self, attrs: list[ASTGnuAttribute]) -> None: + self.attrs = attrs + + def _stringify(self, transform: StringifyTransform) -> str: + res = ['__attribute__(('] + first = True + for attr in self.attrs: + if not first: + res.append(', ') + first = False + res.append(transform(attr)) + res.append('))') + return ''.join(res) + + def describe_signature(self, signode: TextElement) -> None: + txt = str(self) + signode.append(nodes.Text(txt)) + + +class ASTIdAttribute(ASTAttribute): + """For simple attributes defined by the user.""" + + def __init__(self, id: str) -> None: + self.id = id + + def _stringify(self, transform: StringifyTransform) -> str: + return self.id + + def describe_signature(self, signode: TextElement) -> None: + signode.append(nodes.Text(self.id)) + + +class ASTParenAttribute(ASTAttribute): + """For paren attributes defined by the user.""" + + def __init__(self, id: str, arg: str) -> None: + self.id = id + self.arg = arg + + def _stringify(self, transform: StringifyTransform) -> str: + return self.id + '(' + self.arg + ')' + + def describe_signature(self, signode: TextElement) -> None: + txt = str(self) + signode.append(nodes.Text(txt)) + + +class ASTAttributeList(ASTBaseBase): + def __init__(self, attrs: list[ASTAttribute]) -> None: + self.attrs = attrs + + def __len__(self) -> int: + return len(self.attrs) + + def __add__(self, other: ASTAttributeList) -> ASTAttributeList: + return ASTAttributeList(self.attrs + other.attrs) + + def _stringify(self, transform: StringifyTransform) -> str: + return ' '.join(transform(attr) for attr in self.attrs) + + def describe_signature(self, signode: TextElement) -> None: + if len(self.attrs) == 0: + return + self.attrs[0].describe_signature(signode) + if len(self.attrs) == 1: + return + for attr in self.attrs[1:]: + signode.append(addnodes.desc_sig_space()) + attr.describe_signature(signode) + + +################################################################################ + +class 
ASTBaseParenExprList(ASTBaseBase): + pass + + +################################################################################ + +class UnsupportedMultiCharacterCharLiteral(Exception): + pass + + +class DefinitionError(Exception): + pass + + +class BaseParser: + def __init__(self, definition: str, *, + location: nodes.Node | tuple[str, int] | str, + config: Config) -> None: + self.definition = definition.strip() + self.location = location # for warnings + self.config = config + + self.pos = 0 + self.end = len(self.definition) + self.last_match: re.Match[str] | None = None + self._previous_state: tuple[int, re.Match[str] | None] = (0, None) + self.otherErrors: list[DefinitionError] = [] + + # in our tests the following is set to False to capture bad parsing + self.allowFallbackExpressionParsing = True + + def _make_multi_error(self, errors: list[Any], header: str) -> DefinitionError: + if len(errors) == 1: + if len(header) > 0: + return DefinitionError(header + '\n' + str(errors[0][0])) + else: + return DefinitionError(str(errors[0][0])) + result = [header, '\n'] + for e in errors: + if len(e[1]) > 0: + indent = ' ' + result.append(e[1]) + result.append(':\n') + for line in str(e[0]).split('\n'): + if len(line) == 0: + continue + result.append(indent) + result.append(line) + result.append('\n') + else: + result.append(str(e[0])) + return DefinitionError(''.join(result)) + + @property + def language(self) -> str: + raise NotImplementedError + + def status(self, msg: str) -> None: + # for debugging + indicator = '-' * self.pos + '^' + logger.debug(f"{msg}\n{self.definition}\n{indicator}") # NoQA: G004 + + def fail(self, msg: str) -> None: + errors = [] + indicator = '-' * self.pos + '^' + exMain = DefinitionError( + 'Invalid %s declaration: %s [error at %d]\n %s\n %s' % + (self.language, msg, self.pos, self.definition, indicator)) + errors.append((exMain, "Main error")) + for err in self.otherErrors: + errors.append((err, "Potential other error")) + self.otherErrors = [] + raise self._make_multi_error(errors, '') + + def warn(self, msg: str) -> None: + logger.warning(msg, location=self.location) + + def match(self, regex: re.Pattern[str]) -> bool: + match = regex.match(self.definition, self.pos) + if match is not None: + self._previous_state = (self.pos, self.last_match) + self.pos = match.end() + self.last_match = match + return True + return False + + def skip_string(self, string: str) -> bool: + strlen = len(string) + if self.definition[self.pos:self.pos + strlen] == string: + self.pos += strlen + return True + return False + + def skip_word(self, word: str) -> bool: + return self.match(re.compile(r'\b%s\b' % re.escape(word))) + + def skip_ws(self) -> bool: + return self.match(_whitespace_re) + + def skip_word_and_ws(self, word: str) -> bool: + if self.skip_word(word): + self.skip_ws() + return True + return False + + def skip_string_and_ws(self, string: str) -> bool: + if self.skip_string(string): + self.skip_ws() + return True + return False + + @property + def eof(self) -> bool: + return self.pos >= self.end + + @property + def current_char(self) -> str: + try: + return self.definition[self.pos] + except IndexError: + return 'EOF' + + @property + def matched_text(self) -> str: + if self.last_match is not None: + return self.last_match.group() + return '' + + def read_rest(self) -> str: + rv = self.definition[self.pos:] + self.pos = self.end + return rv + + def assert_end(self, *, allowSemicolon: bool = False) -> None: + self.skip_ws() + if allowSemicolon: + if not self.eof and 
self.definition[self.pos:] != ';': + self.fail('Expected end of definition or ;.') + else: + if not self.eof: + self.fail('Expected end of definition.') + + ################################################################################ + + @property + def id_attributes(self): + raise NotImplementedError + + @property + def paren_attributes(self): + raise NotImplementedError + + def _parse_balanced_token_seq(self, end: list[str]) -> str: + # TODO: add handling of string literals and similar + brackets = {'(': ')', '[': ']', '{': '}'} + startPos = self.pos + symbols: list[str] = [] + while not self.eof: + if len(symbols) == 0 and self.current_char in end: + break + if self.current_char in brackets: + symbols.append(brackets[self.current_char]) + elif len(symbols) > 0 and self.current_char == symbols[-1]: + symbols.pop() + elif self.current_char in ")]}": + self.fail("Unexpected '%s' in balanced-token-seq." % self.current_char) + self.pos += 1 + if self.eof: + self.fail("Could not find end of balanced-token-seq starting at %d." + % startPos) + return self.definition[startPos:self.pos] + + def _parse_attribute(self) -> ASTAttribute | None: + self.skip_ws() + # try C++11 style + startPos = self.pos + if self.skip_string_and_ws('['): + if not self.skip_string('['): + self.pos = startPos + else: + # TODO: actually implement the correct grammar + arg = self._parse_balanced_token_seq(end=[']']) + if not self.skip_string_and_ws(']'): + self.fail("Expected ']' in end of attribute.") + if not self.skip_string_and_ws(']'): + self.fail("Expected ']' in end of attribute after [[...]") + return ASTCPPAttribute(arg) + + # try GNU style + if self.skip_word_and_ws('__attribute__'): + if not self.skip_string_and_ws('('): + self.fail("Expected '(' after '__attribute__'.") + if not self.skip_string_and_ws('('): + self.fail("Expected '(' after '__attribute__('.") + attrs = [] + while 1: + if self.match(identifier_re): + name = self.matched_text + exprs = self._parse_paren_expression_list() + attrs.append(ASTGnuAttribute(name, exprs)) + if self.skip_string_and_ws(','): + continue + if self.skip_string_and_ws(')'): + break + self.fail("Expected identifier, ')', or ',' in __attribute__.") + if not self.skip_string_and_ws(')'): + self.fail("Expected ')' after '__attribute__((...)'") + return ASTGnuAttributeList(attrs) + + # try the simple id attributes defined by the user + for id in self.id_attributes: + if self.skip_word_and_ws(id): + return ASTIdAttribute(id) + + # try the paren attributes defined by the user + for id in self.paren_attributes: + if not self.skip_string_and_ws(id): + continue + if not self.skip_string('('): + self.fail("Expected '(' after user-defined paren-attribute.") + arg = self._parse_balanced_token_seq(end=[')']) + if not self.skip_string(')'): + self.fail("Expected ')' to end user-defined paren-attribute.") + return ASTParenAttribute(id, arg) + + return None + + def _parse_attribute_list(self) -> ASTAttributeList: + res = [] + while True: + attr = self._parse_attribute() + if attr is None: + break + res.append(attr) + return ASTAttributeList(res) + + def _parse_paren_expression_list(self) -> ASTBaseParenExprList | None: + raise NotImplementedError diff --git a/sphinx/util/console.py b/sphinx/util/console.py new file mode 100644 index 0000000..0fc9450 --- /dev/null +++ b/sphinx/util/console.py @@ -0,0 +1,129 @@ +"""Format colored console output.""" + +from __future__ import annotations + +import os +import re +import shutil +import sys + +try: + # check if colorama is installed to support 
color on Windows + import colorama +except ImportError: + colorama = None + + +_ansi_re: re.Pattern[str] = re.compile('\x1b\\[(\\d\\d;){0,2}\\d\\dm') +codes: dict[str, str] = {} + + +def terminal_safe(s: str) -> str: + """Safely encode a string for printing to the terminal.""" + return s.encode('ascii', 'backslashreplace').decode('ascii') + + +def get_terminal_width() -> int: + """Return the width of the terminal in columns.""" + return shutil.get_terminal_size().columns - 1 + + +_tw: int = get_terminal_width() + + +def term_width_line(text: str) -> str: + if not codes: + # if no coloring, don't output fancy backspaces + return text + '\n' + else: + # codes are not displayed, this must be taken into account + return text.ljust(_tw + len(text) - len(_ansi_re.sub('', text))) + '\r' + + +def color_terminal() -> bool: + if 'NO_COLOR' in os.environ: + return False + if sys.platform == 'win32' and colorama is not None: + colorama.init() + return True + if 'FORCE_COLOR' in os.environ: + return True + if not hasattr(sys.stdout, 'isatty'): + return False + if not sys.stdout.isatty(): + return False + if 'COLORTERM' in os.environ: + return True + term = os.environ.get('TERM', 'dumb').lower() + if term in ('xterm', 'linux') or 'color' in term: + return True + return False + + +def nocolor() -> None: + if sys.platform == 'win32' and colorama is not None: + colorama.deinit() + codes.clear() + + +def coloron() -> None: + codes.update(_orig_codes) + + +def colorize(name: str, text: str, input_mode: bool = False) -> str: + def escseq(name: str) -> str: + # Wrap escape sequence with ``\1`` and ``\2`` to let readline know + # it is non-printable characters + # ref: https://tiswww.case.edu/php/chet/readline/readline.html + # + # Note: This hack does not work well in Windows (see #5059) + escape = codes.get(name, '') + if input_mode and escape and sys.platform != 'win32': + return '\1' + escape + '\2' + else: + return escape + + return escseq(name) + text + escseq('reset') + + +def strip_colors(s: str) -> str: + return re.compile('\x1b.*?m').sub('', s) + + +def create_color_func(name: str) -> None: + def inner(text: str) -> str: + return colorize(name, text) + globals()[name] = inner + + +_attrs = { + 'reset': '39;49;00m', + 'bold': '01m', + 'faint': '02m', + 'standout': '03m', + 'underline': '04m', + 'blink': '05m', +} + +for _name, _value in _attrs.items(): + codes[_name] = '\x1b[' + _value + +_colors = [ + ('black', 'darkgray'), + ('darkred', 'red'), + ('darkgreen', 'green'), + ('brown', 'yellow'), + ('darkblue', 'blue'), + ('purple', 'fuchsia'), + ('turquoise', 'teal'), + ('lightgray', 'white'), +] + +for i, (dark, light) in enumerate(_colors, 30): + codes[dark] = '\x1b[%im' % i + codes[light] = '\x1b[%im' % (i + 60) + +_orig_codes = codes.copy() + +for _name in codes: + create_color_func(_name) diff --git a/sphinx/util/display.py b/sphinx/util/display.py new file mode 100644 index 0000000..199119c --- /dev/null +++ b/sphinx/util/display.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import functools +from typing import Any, Callable, TypeVar + +from sphinx.locale import __ +from sphinx.util import logging +from sphinx.util.console import bold # type: ignore[attr-defined] + +if False: + from collections.abc import Iterable, Iterator + from types import TracebackType + +logger = logging.getLogger(__name__) + + +def display_chunk(chunk: Any) -> str: + if isinstance(chunk, (list, tuple)): + if len(chunk) == 1: + return str(chunk[0]) + return f'{chunk[0]} .. 
{chunk[-1]}' + return str(chunk) + + +T = TypeVar('T') + + +def status_iterator( + iterable: Iterable[T], + summary: str, + color: str = 'darkgreen', + length: int = 0, + verbosity: int = 0, + stringify_func: Callable[[Any], str] = display_chunk, +) -> Iterator[T]: + single_line = verbosity < 1 + bold_summary = bold(summary) + if length == 0: + logger.info(bold_summary, nonl=True) + for item in iterable: + logger.info(stringify_func(item) + ' ', nonl=True, color=color) + yield item + else: + for i, item in enumerate(iterable, start=1): + if single_line: + # clear the entire line ('Erase in Line') + logger.info('\x1b[2K', nonl=True) + logger.info(f'{bold_summary}[{i / length: >4.0%}] ', nonl=True) # NoQA: G004 + # Emit the string representation of ``item`` + logger.info(stringify_func(item), nonl=True, color=color) + # If in single-line mode, emit a carriage return to move the cursor + # to the start of the line. + # If not, emit a newline to move the cursor to the next line. + logger.info('\r' * single_line, nonl=single_line) + yield item + logger.info('') + + +class SkipProgressMessage(Exception): + pass + + +class progress_message: + def __init__(self, message: str) -> None: + self.message = message + + def __enter__(self) -> None: + logger.info(bold(self.message + '... '), nonl=True) + + def __exit__( + self, + typ: type[BaseException] | None, + val: BaseException | None, + tb: TracebackType | None, + ) -> bool: + if isinstance(val, SkipProgressMessage): + logger.info(__('skipped')) + if val.args: + logger.info(*val.args) + return True + elif val: + logger.info(__('failed')) + else: + logger.info(__('done')) + + return False + + def __call__(self, f: Callable) -> Callable: + @functools.wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> Any: + with self: + return f(*args, **kwargs) + + return wrapper diff --git a/sphinx/util/docfields.py b/sphinx/util/docfields.py new file mode 100644 index 0000000..c48c3be --- /dev/null +++ b/sphinx/util/docfields.py @@ -0,0 +1,408 @@ +"""Utility code for "Doc fields". + +"Doc fields" are reST field lists in object descriptions that will +be domain-specifically transformed to a more appealing presentation. +""" +from __future__ import annotations + +import contextlib +from typing import TYPE_CHECKING, Any, cast + +from docutils import nodes +from docutils.nodes import Element, Node + +from sphinx import addnodes +from sphinx.locale import __ +from sphinx.util import logging +from sphinx.util.nodes import get_node_line + +if TYPE_CHECKING: + from docutils.parsers.rst.states import Inliner + + from sphinx.directives import ObjectDescription + from sphinx.environment import BuildEnvironment + from sphinx.util.typing import TextlikeNode + +logger = logging.getLogger(__name__) + + +def _is_single_paragraph(node: nodes.field_body) -> bool: + """True if the node only contains one paragraph (and system messages).""" + if len(node) == 0: + return False + elif len(node) > 1: + for subnode in node[1:]: # type: Node + if not isinstance(subnode, nodes.system_message): + return False + if isinstance(node[0], nodes.paragraph): + return True + return False + + +class Field: + """A doc field that is never grouped. It can have an argument or not, the + argument can be linked using a specified *rolename*. Field should be used + for doc fields that usually don't occur more than once. + + The body can be linked using a specified *bodyrolename* if the content is + just a single inline or text node. 
+ + Example:: + + :returns: description of the return value + :rtype: description of the return type + """ + is_grouped = False + is_typed = False + + def __init__( + self, + name: str, + names: tuple[str, ...] = (), + label: str = '', + has_arg: bool = True, + rolename: str = '', + bodyrolename: str = '', + ) -> None: + self.name = name + self.names = names + self.label = label + self.has_arg = has_arg + self.rolename = rolename + self.bodyrolename = bodyrolename + + def make_xref(self, rolename: str, domain: str, target: str, + innernode: type[TextlikeNode] = addnodes.literal_emphasis, + contnode: Node | None = None, env: BuildEnvironment | None = None, + inliner: Inliner | None = None, location: Element | None = None) -> Node: + # note: for backwards compatibility env is last, but not optional + assert env is not None + assert (inliner is None) == (location is None), (inliner, location) + if not rolename: + return contnode or innernode(target, target) + # The domain is passed from DocFieldTransformer. So it surely exists. + # So we don't need to take care the env.get_domain() raises an exception. + role = env.get_domain(domain).role(rolename) + if role is None or inliner is None: + if role is None and inliner is not None: + msg = __("Problem in %s domain: field is supposed " + "to use role '%s', but that role is not in the domain.") + logger.warning(__(msg), domain, rolename, location=location) + refnode = addnodes.pending_xref('', refdomain=domain, refexplicit=False, + reftype=rolename, reftarget=target) + refnode += contnode or innernode(target, target) + env.get_domain(domain).process_field_xref(refnode) + return refnode + lineno = -1 + if location is not None: + with contextlib.suppress(ValueError): + lineno = get_node_line(location) + ns, messages = role(rolename, target, target, lineno, inliner, {}, []) + return nodes.inline(target, '', *ns) + + def make_xrefs(self, rolename: str, domain: str, target: str, + innernode: type[TextlikeNode] = addnodes.literal_emphasis, + contnode: Node | None = None, env: BuildEnvironment | None = None, + inliner: Inliner | None = None, location: Element | None = None, + ) -> list[Node]: + return [self.make_xref(rolename, domain, target, innernode, contnode, + env, inliner, location)] + + def make_entry(self, fieldarg: str, content: list[Node]) -> tuple[str, list[Node]]: + return (fieldarg, content) + + def make_field( + self, + types: dict[str, list[Node]], + domain: str, + item: tuple, + env: BuildEnvironment | None = None, + inliner: Inliner | None = None, + location: Element | None = None, + ) -> nodes.field: + fieldarg, content = item + fieldname = nodes.field_name('', self.label) + if fieldarg: + fieldname += nodes.Text(' ') + fieldname.extend(self.make_xrefs(self.rolename, domain, + fieldarg, nodes.Text, + env=env, inliner=inliner, location=location)) + + if len(content) == 1 and ( + isinstance(content[0], nodes.Text) or + (isinstance(content[0], nodes.inline) and len(content[0]) == 1 and + isinstance(content[0][0], nodes.Text))): + content = self.make_xrefs(self.bodyrolename, domain, + content[0].astext(), contnode=content[0], + env=env, inliner=inliner, location=location) + fieldbody = nodes.field_body('', nodes.paragraph('', '', *content)) + return nodes.field('', fieldname, fieldbody) + + +class GroupedField(Field): + """ + A doc field that is grouped; i.e., all fields of that type will be + transformed into one field with its body being a bulleted list. It always + has an argument. The argument can be linked using the given *rolename*. 
+ GroupedField should be used for doc fields that can occur more than once. + If *can_collapse* is true, this field will revert to a Field if only used + once. + + Example:: + + :raises ErrorClass: description when it is raised + """ + is_grouped = True + list_type = nodes.bullet_list + + def __init__(self, name: str, names: tuple[str, ...] = (), label: str = '', + rolename: str = '', can_collapse: bool = False) -> None: + super().__init__(name, names, label, True, rolename) + self.can_collapse = can_collapse + + def make_field( + self, + types: dict[str, list[Node]], + domain: str, + items: tuple, + env: BuildEnvironment | None = None, + inliner: Inliner | None = None, + location: Element | None = None, + ) -> nodes.field: + fieldname = nodes.field_name('', self.label) + listnode = self.list_type() + for fieldarg, content in items: + par = nodes.paragraph() + par.extend(self.make_xrefs(self.rolename, domain, fieldarg, + addnodes.literal_strong, + env=env, inliner=inliner, location=location)) + par += nodes.Text(' -- ') + par += content + listnode += nodes.list_item('', par) + + if len(items) == 1 and self.can_collapse: + list_item = cast(nodes.list_item, listnode[0]) + fieldbody = nodes.field_body('', list_item[0]) + return nodes.field('', fieldname, fieldbody) + + fieldbody = nodes.field_body('', listnode) + return nodes.field('', fieldname, fieldbody) + + +class TypedField(GroupedField): + """ + A doc field that is grouped and has type information for the arguments. It + always has an argument. The argument can be linked using the given + *rolename*, the type using the given *typerolename*. + + Two uses are possible: either parameter and type description are given + separately, using a field from *names* and one from *typenames*, + respectively, or both are given using a field from *names*, see the example. + + Example:: + + :param foo: description of parameter foo + :type foo: SomeClass + + -- or -- + + :param SomeClass foo: description of parameter foo + """ + is_typed = True + + def __init__( + self, + name: str, + names: tuple[str, ...] = (), + typenames: tuple[str, ...] 
= (), + label: str = '', + rolename: str = '', + typerolename: str = '', + can_collapse: bool = False, + ) -> None: + super().__init__(name, names, label, rolename, can_collapse) + self.typenames = typenames + self.typerolename = typerolename + + def make_field( + self, + types: dict[str, list[Node]], + domain: str, + items: tuple, + env: BuildEnvironment | None = None, + inliner: Inliner | None = None, + location: Element | None = None, + ) -> nodes.field: + def handle_item(fieldarg: str, content: str) -> nodes.paragraph: + par = nodes.paragraph() + par.extend(self.make_xrefs(self.rolename, domain, fieldarg, + addnodes.literal_strong, env=env)) + if fieldarg in types: + par += nodes.Text(' (') + # NOTE: using .pop() here to prevent a single type node to be + # inserted twice into the doctree, which leads to + # inconsistencies later when references are resolved + fieldtype = types.pop(fieldarg) + if len(fieldtype) == 1 and isinstance(fieldtype[0], nodes.Text): + typename = fieldtype[0].astext() + par.extend(self.make_xrefs(self.typerolename, domain, typename, + addnodes.literal_emphasis, env=env, + inliner=inliner, location=location)) + else: + par += fieldtype + par += nodes.Text(')') + par += nodes.Text(' -- ') + par += content + return par + + fieldname = nodes.field_name('', self.label) + if len(items) == 1 and self.can_collapse: + fieldarg, content = items[0] + bodynode: Node = handle_item(fieldarg, content) + else: + bodynode = self.list_type() + for fieldarg, content in items: + bodynode += nodes.list_item('', handle_item(fieldarg, content)) + fieldbody = nodes.field_body('', bodynode) + return nodes.field('', fieldname, fieldbody) + + +class DocFieldTransformer: + """ + Transforms field lists in "doc field" syntax into better-looking + equivalents, using the field type definitions given on a domain. + """ + typemap: dict[str, tuple[Field, bool]] + + def __init__(self, directive: ObjectDescription) -> None: + self.directive = directive + + self.typemap = directive.get_field_type_map() + + def transform_all(self, node: addnodes.desc_content) -> None: + """Transform all field list children of a node.""" + # don't traverse, only handle field lists that are immediate children + for child in node: + if isinstance(child, nodes.field_list): + self.transform(child) + + def transform(self, node: nodes.field_list) -> None: + """Transform a single field list *node*.""" + typemap = self.typemap + + entries: list[nodes.field | tuple[Field, Any, Element]] = [] + groupindices: dict[str, int] = {} + types: dict[str, dict] = {} + + # step 1: traverse all fields and collect field types and content + for field in cast(list[nodes.field], node): + assert len(field) == 2 + field_name = cast(nodes.field_name, field[0]) + field_body = cast(nodes.field_body, field[1]) + try: + # split into field type and argument + fieldtype_name, fieldarg = field_name.astext().split(None, 1) + except ValueError: + # maybe an argument-less field type? 
+ fieldtype_name, fieldarg = field_name.astext(), '' + typedesc, is_typefield = typemap.get(fieldtype_name, (None, None)) + + # collect the content, trying not to keep unnecessary paragraphs + if _is_single_paragraph(field_body): + paragraph = cast(nodes.paragraph, field_body[0]) + content = paragraph.children + else: + content = field_body.children + + # sort out unknown fields + if typedesc is None or typedesc.has_arg != bool(fieldarg): + # either the field name is unknown, or the argument doesn't + # match the spec; capitalize field name and be done with it + new_fieldname = fieldtype_name[0:1].upper() + fieldtype_name[1:] + if fieldarg: + new_fieldname += ' ' + fieldarg + field_name[0] = nodes.Text(new_fieldname) + entries.append(field) + + # but if this has a type then we can at least link it + if (typedesc and is_typefield and content and + len(content) == 1 and isinstance(content[0], nodes.Text)): + typed_field = cast(TypedField, typedesc) + target = content[0].astext() + xrefs = typed_field.make_xrefs( + typed_field.typerolename, + self.directive.domain or '', + target, + contnode=content[0], + env=self.directive.state.document.settings.env, + ) + if _is_single_paragraph(field_body): + paragraph = cast(nodes.paragraph, field_body[0]) + paragraph.clear() + paragraph.extend(xrefs) + else: + field_body.clear() + field_body += nodes.paragraph('', '', *xrefs) + + continue + + typename = typedesc.name + + # if the field specifies a type, put it in the types collection + if is_typefield: + # filter out only inline nodes; others will result in invalid + # markup being written out + content = [n for n in content if isinstance(n, (nodes.Inline, nodes.Text))] + if content: + types.setdefault(typename, {})[fieldarg] = content + continue + + # also support syntax like ``:param type name:`` + if typedesc.is_typed: + try: + argtype, argname = fieldarg.rsplit(None, 1) + except ValueError: + pass + else: + types.setdefault(typename, {})[argname] = \ + [nodes.Text(argtype)] + fieldarg = argname + + translatable_content = nodes.inline(field_body.rawsource, + translatable=True) + translatable_content.document = field_body.parent.document + translatable_content.source = field_body.parent.source + translatable_content.line = field_body.parent.line + translatable_content += content + + # grouped entries need to be collected in one entry, while others + # get one entry per field + if typedesc.is_grouped: + if typename in groupindices: + group = cast(tuple[Field, list, Node], entries[groupindices[typename]]) + else: + groupindices[typename] = len(entries) + group = (typedesc, [], field) + entries.append(group) + new_entry = typedesc.make_entry(fieldarg, [translatable_content]) + group[1].append(new_entry) + else: + new_entry = typedesc.make_entry(fieldarg, [translatable_content]) + entries.append((typedesc, new_entry, field)) + + # step 2: all entries are collected, construct the new field list + new_list = nodes.field_list() + for entry in entries: + if isinstance(entry, nodes.field): + # pass-through old field + new_list += entry + else: + fieldtype, items, location = entry + fieldtypes = types.get(fieldtype.name, {}) + env = self.directive.state.document.settings.env + inliner = self.directive.state.inliner + domain = self.directive.domain or '' + new_list += fieldtype.make_field(fieldtypes, domain, items, + env=env, inliner=inliner, location=location) + + node.replace_self(new_list) diff --git a/sphinx/util/docstrings.py b/sphinx/util/docstrings.py new file mode 100644 index 0000000..6ccc538 --- 
/dev/null +++ b/sphinx/util/docstrings.py @@ -0,0 +1,88 @@ +"""Utilities for docstring processing.""" + +from __future__ import annotations + +import re +import sys + +from docutils.parsers.rst.states import Body + +field_list_item_re = re.compile(Body.patterns['field_marker']) + + +def separate_metadata(s: str | None) -> tuple[str | None, dict[str, str]]: + """Separate docstring into metadata and others.""" + in_other_element = False + metadata: dict[str, str] = {} + lines = [] + + if not s: + return s, metadata + + for line in prepare_docstring(s): + if line.strip() == '': + in_other_element = False + lines.append(line) + else: + matched = field_list_item_re.match(line) + if matched and not in_other_element: + field_name = matched.group()[1:].split(':', 1)[0] + if field_name.startswith('meta '): + name = field_name[5:].strip() + metadata[name] = line[matched.end():].strip() + else: + lines.append(line) + else: + in_other_element = True + lines.append(line) + + return '\n'.join(lines), metadata + + +def prepare_docstring(s: str, tabsize: int = 8) -> list[str]: + """Convert a docstring into lines of parseable reST. Remove common leading + indentation, where the indentation of the first line is ignored. + + Return the docstring as a list of lines usable for inserting into a docutils + ViewList (used as argument of nested_parse().) An empty line is added to + act as a separator between this docstring and following content. + """ + lines = s.expandtabs(tabsize).splitlines() + # Find minimum indentation of any non-blank lines after ignored lines. + margin = sys.maxsize + for line in lines[1:]: + content = len(line.lstrip()) + if content: + indent = len(line) - content + margin = min(margin, indent) + # Remove indentation from the first line. + if len(lines): + lines[0] = lines[0].lstrip() + if margin < sys.maxsize: + for i in range(1, len(lines)): + lines[i] = lines[i][margin:] + # Remove any leading blank lines. + while lines and not lines[0]: + lines.pop(0) + # make sure there is an empty line at the end + if lines and lines[-1]: + lines.append('') + return lines + + +def prepare_commentdoc(s: str) -> list[str]: + """Extract documentation comment lines (starting with #:) and return them + as a list of lines. Returns an empty list if there is no documentation. 
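+ + A doctest-style sketch (the input strings are illustrative):: + + >>> prepare_commentdoc('#: the summary line') + ['the summary line', ''] + >>> prepare_commentdoc('x = 1 # an ordinary comment') + []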
+ """ + result = [] + lines = [line.strip() for line in s.expandtabs().splitlines()] + for line in lines: + if line.startswith('#:'): + line = line[2:] + # the first space after the comment is ignored + if line and line[0] == ' ': + line = line[1:] + result.append(line) + if result and result[-1]: + result.append('') + return result diff --git a/sphinx/util/docutils.py b/sphinx/util/docutils.py new file mode 100644 index 0000000..a862417 --- /dev/null +++ b/sphinx/util/docutils.py @@ -0,0 +1,635 @@ +"""Utility functions for docutils.""" + +from __future__ import annotations + +import os +import re +from collections.abc import Sequence # NoQA: TCH003 +from contextlib import contextmanager +from copy import copy +from os import path +from typing import IO, TYPE_CHECKING, Any, Callable, cast + +import docutils +from docutils import nodes +from docutils.io import FileOutput +from docutils.parsers.rst import Directive, directives, roles +from docutils.parsers.rst.states import Inliner # NoQA: TCH002 +from docutils.statemachine import State, StateMachine, StringList +from docutils.utils import Reporter, unescape +from docutils.writers._html_base import HTMLTranslator + +from sphinx.errors import SphinxError +from sphinx.locale import _, __ +from sphinx.util import logging + +logger = logging.getLogger(__name__) +report_re = re.compile('^(.+?:(?:\\d+)?): \\((DEBUG|INFO|WARNING|ERROR|SEVERE)/(\\d+)?\\) ') + +if TYPE_CHECKING: + from collections.abc import Generator + from types import ModuleType + + from docutils.frontend import Values + from docutils.nodes import Element, Node, system_message + + from sphinx.builders import Builder + from sphinx.config import Config + from sphinx.environment import BuildEnvironment + from sphinx.util.typing import RoleFunction + +# deprecated name -> (object to return, canonical path or empty string) +_DEPRECATED_OBJECTS = { + '__version_info__': (docutils.__version_info__, 'docutils.__version_info__'), +} + + +def __getattr__(name): + if name not in _DEPRECATED_OBJECTS: + msg = f'module {__name__!r} has no attribute {name!r}' + raise AttributeError(msg) + + from sphinx.deprecation import _deprecation_warning + + deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name] + _deprecation_warning(__name__, name, canonical_name, remove=(7, 0)) + return deprecated_object + + +additional_nodes: set[type[Element]] = set() + + +@contextmanager +def docutils_namespace() -> Generator[None, None, None]: + """Create namespace for reST parsers.""" + try: + _directives = copy(directives._directives) # type: ignore[attr-defined] + _roles = copy(roles._roles) # type: ignore[attr-defined] + + yield + finally: + directives._directives = _directives # type: ignore[attr-defined] + roles._roles = _roles # type: ignore[attr-defined] + + for node in list(additional_nodes): + unregister_node(node) + additional_nodes.discard(node) + + +def is_directive_registered(name: str) -> bool: + """Check the *name* directive is already registered.""" + return name in directives._directives # type: ignore[attr-defined] + + +def register_directive(name: str, directive: type[Directive]) -> None: + """Register a directive to docutils. + + This modifies global state of docutils. So it is better to use this + inside ``docutils_namespace()`` to prevent side-effects. 
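+ + A sketch of the intended pattern, where ``MyDirective`` stands in for any + ``Directive`` subclass:: + + with docutils_namespace(): + register_directive('my-directive', MyDirective) + # ... parse documents here ... + # the registration is rolled back when the context exits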
+ """ + directives.register_directive(name, directive) + + +def is_role_registered(name: str) -> bool: + """Check the *name* role is already registered.""" + return name in roles._roles # type: ignore[attr-defined] + + +def register_role(name: str, role: RoleFunction) -> None: + """Register a role to docutils. + + This modifies global state of docutils. So it is better to use this + inside ``docutils_namespace()`` to prevent side-effects. + """ + roles.register_local_role(name, role) + + +def unregister_role(name: str) -> None: + """Unregister a role from docutils.""" + roles._roles.pop(name, None) # type: ignore[attr-defined] + + +def is_node_registered(node: type[Element]) -> bool: + """Check the *node* is already registered.""" + return hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__) + + +def register_node(node: type[Element]) -> None: + """Register a node to docutils. + + This modifies global state of some visitors. So it is better to use this + inside ``docutils_namespace()`` to prevent side-effects. + """ + if not hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__): + nodes._add_node_class_names([node.__name__]) # type: ignore[attr-defined] + additional_nodes.add(node) + + +def unregister_node(node: type[Element]) -> None: + """Unregister a node from docutils. + + This is inverse of ``nodes._add_nodes_class_names()``. + """ + if hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__): + delattr(nodes.GenericNodeVisitor, "visit_" + node.__name__) + delattr(nodes.GenericNodeVisitor, "depart_" + node.__name__) + delattr(nodes.SparseNodeVisitor, 'visit_' + node.__name__) + delattr(nodes.SparseNodeVisitor, 'depart_' + node.__name__) + + +@contextmanager +def patched_get_language() -> Generator[None, None, None]: + """Patch docutils.languages.get_language() temporarily. + + This ignores the second argument ``reporter`` to suppress warnings. + refs: https://github.com/sphinx-doc/sphinx/issues/3788 + """ + from docutils.languages import get_language + + def patched_get_language(language_code: str, reporter: Reporter | None = None) -> Any: + return get_language(language_code) + + try: + docutils.languages.get_language = patched_get_language + yield + finally: + # restore original implementations + docutils.languages.get_language = get_language + + +@contextmanager +def patched_rst_get_language() -> Generator[None, None, None]: + """Patch docutils.parsers.rst.languages.get_language(). + Starting from docutils 0.17, get_language() in ``rst.languages`` + also has a reporter, which needs to be disabled temporarily. + + This should also work for old versions of docutils, + because reporter is none by default. 
+@contextmanager
+def patched_get_language() -> Generator[None, None, None]:
+    """Patch docutils.languages.get_language() temporarily.
+
+    This ignores the second argument ``reporter`` to suppress warnings.
+    refs: https://github.com/sphinx-doc/sphinx/issues/3788
+    """
+    from docutils.languages import get_language
+
+    def patched_get_language(language_code: str, reporter: Reporter | None = None) -> Any:
+        return get_language(language_code)
+
+    try:
+        docutils.languages.get_language = patched_get_language
+        yield
+    finally:
+        # restore original implementations
+        docutils.languages.get_language = get_language
+
+
+@contextmanager
+def patched_rst_get_language() -> Generator[None, None, None]:
+    """Patch docutils.parsers.rst.languages.get_language().
+
+    Starting from docutils 0.17, get_language() in ``rst.languages``
+    also has a reporter, which needs to be disabled temporarily.
+
+    This should also work for old versions of docutils,
+    because reporter is None by default.
+
+    refs: https://github.com/sphinx-doc/sphinx/issues/10179
+    """
+    from docutils.parsers.rst.languages import get_language
+
+    def patched_get_language(language_code: str, reporter: Reporter | None = None) -> Any:
+        return get_language(language_code)
+
+    try:
+        docutils.parsers.rst.languages.get_language = patched_get_language
+        yield
+    finally:
+        # restore original implementations
+        docutils.parsers.rst.languages.get_language = get_language
+
+
+@contextmanager
+def using_user_docutils_conf(confdir: str | None) -> Generator[None, None, None]:
+    """Let docutils know the location of ``docutils.conf`` for Sphinx."""
+    try:
+        docutilsconfig = os.environ.get('DOCUTILSCONFIG', None)
+        if confdir:
+            os.environ['DOCUTILSCONFIG'] = path.join(path.abspath(confdir), 'docutils.conf')
+
+        yield
+    finally:
+        if docutilsconfig is None:
+            os.environ.pop('DOCUTILSCONFIG', None)
+        else:
+            os.environ['DOCUTILSCONFIG'] = docutilsconfig
+
+
+@contextmanager
+def du19_footnotes() -> Generator[None, None, None]:
+    def visit_footnote(self, node):
+        label_style = self.settings.footnote_references
+        if not isinstance(node.previous_sibling(), type(node)):
+            self.body.append(f'<aside class="footnote-list {label_style}">\n')
+        self.body.append(self.starttag(node, 'aside',
+                                       classes=[node.tagname, label_style],
+                                       role='note'))
+
+    def depart_footnote(self, node):
+        self.body.append('</aside>\n')
+        if not isinstance(node.next_node(descend=False, siblings=True),
+                          type(node)):
+            self.body.append('</aside>\n')
+
+    old_visit_footnote = HTMLTranslator.visit_footnote
+    old_depart_footnote = HTMLTranslator.depart_footnote
+
+    # Only apply on Docutils 0.18 or 0.18.1, as 0.17 and earlier used a <table>-based
+    # approach, and 0.19 and later use the fixed approach by default.
+    if docutils.__version_info__[:2] == (0, 18):
+        HTMLTranslator.visit_footnote = visit_footnote  # type: ignore[method-assign]
+        HTMLTranslator.depart_footnote = depart_footnote  # type: ignore[method-assign]
+
+    try:
+        yield
+    finally:
+        if docutils.__version_info__[:2] == (0, 18):
+            HTMLTranslator.visit_footnote = old_visit_footnote  # type: ignore[method-assign]
+            HTMLTranslator.depart_footnote = old_depart_footnote  # type: ignore[method-assign]
+
+
+@contextmanager
+def patch_docutils(confdir: str | None = None) -> Generator[None, None, None]:
+    """Patch to docutils temporarily."""
+    with patched_get_language(), \
+         patched_rst_get_language(), \
+         using_user_docutils_conf(confdir), \
+         du19_footnotes():
+        yield
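For orientation, a minimal sketch of how these context managers combine (the confdir path is a placeholder); this mirrors the pattern used by Sphinx's build entry points, so all patches are active while documents are parsed and reverted afterwards:

```python
from sphinx.util.docutils import docutils_namespace, patch_docutils

confdir = '/path/to/project'  # placeholder
with patch_docutils(confdir), docutils_namespace():
    pass  # parse/build here; global docutils state is restored on exit
```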
+class CustomReSTDispatcher:
+    """Custom reST markup dispatcher.
+
+    This temporarily replaces the directive and role dispatch mechanism of
+    docutils' reST parser with a custom one; ``disable()`` restores the
+    original dispatchers.
+    """
+
+    def __init__(self) -> None:
+        self.directive_func: Callable = lambda *args: (None, [])
+        self.role_func: Callable = lambda *args: (None, [])
+
+    def __enter__(self) -> None:
+        self.enable()
+
+    def __exit__(
+        self, exc_type: type[Exception], exc_value: Exception, traceback: Any,
+    ) -> None:
+        self.disable()
+
+    def enable(self) -> None:
+        self.directive_func = directives.directive
+        self.role_func = roles.role
+
+        directives.directive = self.directive
+        roles.role = self.role
+
+    def disable(self) -> None:
+        directives.directive = self.directive_func
+        roles.role = self.role_func
+
+    def directive(self,
+                  directive_name: str, language_module: ModuleType, document: nodes.document,
+                  ) -> tuple[type[Directive] | None, list[system_message]]:
+        return self.directive_func(directive_name, language_module, document)
+
+    def role(
+        self, role_name: str, language_module: ModuleType, lineno: int, reporter: Reporter,
+    ) -> tuple[RoleFunction, list[system_message]]:
+        return self.role_func(role_name, language_module,  # type: ignore[return-value]
+                              lineno, reporter)
+
+
+class ElementLookupError(Exception):
+    pass
+
+
+class sphinx_domains(CustomReSTDispatcher):
+    """Monkey-patch directive and role dispatch, so that domain-specific
+    markup takes precedence.
+    """
+    def __init__(self, env: BuildEnvironment) -> None:
+        self.env = env
+        super().__init__()
+
+    def lookup_domain_element(self, type: str, name: str) -> Any:
+        """Look up a markup element (directive or role), given its name which can
+        be a full name (with domain).
+        """
+        name = name.lower()
+        # explicit domain given?
+        if ':' in name:
+            domain_name, name = name.split(':', 1)
+            if domain_name in self.env.domains:
+                domain = self.env.get_domain(domain_name)
+                element = getattr(domain, type)(name)
+                if element is not None:
+                    return element, []
+            else:
+                logger.warning(_('unknown directive or role name: %s:%s'), domain_name, name)
+        # else look in the default domain
+        else:
+            def_domain = self.env.temp_data.get('default_domain')
+            if def_domain is not None:
+                element = getattr(def_domain, type)(name)
+                if element is not None:
+                    return element, []
+
+        # always look in the std domain
+        element = getattr(self.env.get_domain('std'), type)(name)
+        if element is not None:
+            return element, []
+
+        raise ElementLookupError
+
+    def directive(self,
+                  directive_name: str, language_module: ModuleType, document: nodes.document,
+                  ) -> tuple[type[Directive] | None, list[system_message]]:
+        try:
+            return self.lookup_domain_element('directive', directive_name)
+        except ElementLookupError:
+            return super().directive(directive_name, language_module, document)
+
+    def role(
+        self, role_name: str, language_module: ModuleType, lineno: int, reporter: Reporter,
+    ) -> tuple[RoleFunction, list[system_message]]:
+        try:
+            return self.lookup_domain_element('role', role_name)
+        except ElementLookupError:
+            return super().role(role_name, language_module, lineno, reporter)
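Assuming a configured ``BuildEnvironment`` (``env`` below is a placeholder), the lookup order implemented above is: explicit domain prefix, then the current default domain, then ``std``, and finally the stock docutils dispatch:

```python
from sphinx.util.docutils import sphinx_domains

with sphinx_domains(env):  # env: a BuildEnvironment (placeholder)
    # while active, 'py:function' resolves via the Python domain, a bare
    # 'function' tries the default domain and then 'std', and anything
    # unresolved falls through to plain docutils lookup
    ...
```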
+class WarningStream:
+    def write(self, text: str) -> None:
+        matched = report_re.search(text)
+        if not matched:
+            logger.warning(text.rstrip("\r\n"))
+        else:
+            location, type, level = matched.groups()
+            message = report_re.sub('', text).rstrip()
+            logger.log(type, message, location=location)
+
+
+class LoggingReporter(Reporter):
+    @classmethod
+    def from_reporter(cls, reporter: Reporter) -> LoggingReporter:
+        """Create an instance of LoggingReporter from another reporter object."""
+        return cls(reporter.source, reporter.report_level, reporter.halt_level,
+                   reporter.debug_flag, reporter.error_handler)
+
+    def __init__(self, source: str, report_level: int = Reporter.WARNING_LEVEL,
+                 halt_level: int = Reporter.SEVERE_LEVEL, debug: bool = False,
+                 error_handler: str = 'backslashreplace') -> None:
+        stream = cast(IO, WarningStream())
+        super().__init__(source, report_level, halt_level,
+                         stream, debug, error_handler=error_handler)
+
+
+class NullReporter(Reporter):
+    """A dummy reporter; writes nothing."""
+
+    def __init__(self) -> None:
+        super().__init__('', 999, 4)
+
+
+@contextmanager
+def switch_source_input(state: State, content: StringList) -> Generator[None, None, None]:
+    """Switch current source input of state temporarily."""
+    try:
+        # remember the original ``get_source_and_line()`` method
+        gsal = state.memo.reporter.get_source_and_line  # type: ignore[attr-defined]
+
+        # replace it by new one
+        state_machine = StateMachine([], None)  # type: ignore[arg-type]
+        state_machine.input_lines = content
+        state.memo.reporter.get_source_and_line = state_machine.get_source_and_line  # type: ignore[attr-defined]  # noqa: E501
+
+        yield
+    finally:
+        # restore the method
+        state.memo.reporter.get_source_and_line = gsal  # type: ignore[attr-defined]
+
+
+class SphinxFileOutput(FileOutput):
+    """A FileOutput class for Sphinx that can skip writing unchanged files."""
+
+    def __init__(self, **kwargs: Any) -> None:
+        self.overwrite_if_changed = kwargs.pop('overwrite_if_changed', False)
+        kwargs.setdefault('encoding', 'utf-8')
+        super().__init__(**kwargs)
+
+    def write(self, data: str) -> str:
+        if (self.destination_path and self.autoclose and 'b' not in self.mode and
+                self.overwrite_if_changed and 
os.path.exists(self.destination_path)): + with open(self.destination_path, encoding=self.encoding) as f: + # skip writing: content not changed + if f.read() == data: + return data + + return super().write(data) + + +class SphinxDirective(Directive): + """A base class for Sphinx directives. + + This class provides helper methods for Sphinx directives. + + .. note:: The subclasses of this class might not work with docutils. + This class is strongly coupled with Sphinx. + """ + + @property + def env(self) -> BuildEnvironment: + """Reference to the :class:`.BuildEnvironment` object.""" + return self.state.document.settings.env + + @property + def config(self) -> Config: + """Reference to the :class:`.Config` object.""" + return self.env.config + + def get_source_info(self) -> tuple[str, int]: + """Get source and line number.""" + return self.state_machine.get_source_and_line(self.lineno) + + def set_source_info(self, node: Node) -> None: + """Set source and line number to the node.""" + node.source, node.line = self.get_source_info() + + def get_location(self) -> str: + """Get current location info for logging.""" + return ':'.join(str(s) for s in self.get_source_info()) + + +class SphinxRole: + """A base class for Sphinx roles. + + This class provides helper methods for Sphinx roles. + + .. note:: The subclasses of this class might not work with docutils. + This class is strongly coupled with Sphinx. + """ + name: str #: The role name actually used in the document. + rawtext: str #: A string containing the entire interpreted text input. + text: str #: The interpreted text content. + lineno: int #: The line number where the interpreted text begins. + inliner: Inliner #: The ``docutils.parsers.rst.states.Inliner`` object. + #: A dictionary of directive options for customisation + #: (from the "role" directive). + options: dict[str, Any] + #: A list of strings, the directive content for customisation + #: (from the "role" directive). + content: Sequence[str] + + def __call__(self, name: str, rawtext: str, text: str, lineno: int, + inliner: Inliner, options: dict | None = None, content: Sequence[str] = (), + ) -> tuple[list[Node], list[system_message]]: + self.rawtext = rawtext + self.text = unescape(text) + self.lineno = lineno + self.inliner = inliner + self.options = options if options is not None else {} + self.content = content + + # guess role type + if name: + self.name = name.lower() + else: + self.name = self.env.temp_data.get('default_role', '') + if not self.name: + self.name = self.env.config.default_role + if not self.name: + msg = 'cannot determine default role!' 
+            raise SphinxError(msg)
+
+        return self.run()
+
+    def run(self) -> tuple[list[Node], list[system_message]]:
+        raise NotImplementedError
+
+    @property
+    def env(self) -> BuildEnvironment:
+        """Reference to the :class:`.BuildEnvironment` object."""
+        return self.inliner.document.settings.env
+
+    @property
+    def config(self) -> Config:
+        """Reference to the :class:`.Config` object."""
+        return self.env.config
+
+    def get_source_info(self, lineno: int | None = None) -> tuple[str, int]:
+        if lineno is None:
+            lineno = self.lineno
+        return self.inliner.reporter.get_source_and_line(lineno)  # type: ignore[attr-defined]
+
+    def set_source_info(self, node: Node, lineno: int | None = None) -> None:
+        node.source, node.line = self.get_source_info(lineno)
+
+    def get_location(self) -> str:
+        """Get current location info for logging."""
+        return ':'.join(str(s) for s in self.get_source_info())
+
+
+class ReferenceRole(SphinxRole):
+    """A base class for reference roles.
+
+    The reference roles can accept ``link title <target>`` style as text for
+    the role.  The parsed result (link title and target) will be stored in
+    ``self.title`` and ``self.target``.
+    """
+    has_explicit_title: bool    #: A boolean indicating whether the role has an explicit title.
+    disabled: bool              #: A boolean indicating whether the reference is disabled.
+    title: str                  #: The link title for the interpreted text.
+    target: str                 #: The link target for the interpreted text.
+
+    # \x00 means the "<" was backslash-escaped
+    explicit_title_re = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL)
+
+    def __call__(self, name: str, rawtext: str, text: str, lineno: int,
+                 inliner: Inliner, options: dict | None = None, content: Sequence[str] = (),
+                 ) -> tuple[list[Node], list[system_message]]:
+        if options is None:
+            options = {}
+
+        # if the first character is a bang, don't cross-reference at all
+        self.disabled = text.startswith('!')
+
+        matched = self.explicit_title_re.match(text)
+        if matched:
+            self.has_explicit_title = True
+            self.title = unescape(matched.group(1))
+            self.target = unescape(matched.group(2))
+        else:
+            self.has_explicit_title = False
+            self.title = unescape(text)
+            self.target = unescape(text)
+
+        return super().__call__(name, rawtext, text, lineno, inliner, options, content)
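A hypothetical subclass shows what the parsing above buys: ``:issue:`bug <42>``` and ``:issue:`42``` both work without the subclass touching the title/target logic. The role name and URL are illustrative only:

```python
from docutils import nodes

from sphinx.util.docutils import ReferenceRole


class IssueRole(ReferenceRole):
    BASE_URL = 'https://example.org/issues/'  # assumed tracker URL

    def run(self):
        uri = self.BASE_URL + self.target
        title = self.title if self.has_explicit_title else f'issue {self.target}'
        return [nodes.reference('', title, refuri=uri)], []
```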
+class SphinxTranslator(nodes.NodeVisitor):
+    """A base class for Sphinx translators.
+
+    This class adds support for dispatching to a visitor/departure method of a
+    super node class when no visitor/departure method is found for the node
+    class itself.
+
+    It also provides helper methods for Sphinx translators.
+
+    .. note:: The subclasses of this class might not work with docutils.
+              This class is strongly coupled with Sphinx.
+    """
+
+    def __init__(self, document: nodes.document, builder: Builder) -> None:
+        super().__init__(document)
+        self.builder = builder
+        self.config = builder.config
+        self.settings = document.settings
+
+    def dispatch_visit(self, node: Node) -> None:
+        """
+        Dispatch node to appropriate visitor method.
+        The priority of visitor method is:
+
+        1. ``self.visit_{node_class}()``
+        2. ``self.visit_{super_node_class}()``
+        3. ``self.unknown_visit()``
+        """
+        for node_class in node.__class__.__mro__:
+            method = getattr(self, 'visit_%s' % (node_class.__name__), None)
+            if method:
+                method(node)
+                break
+        else:
+            super().dispatch_visit(node)
+
+    def dispatch_departure(self, node: Node) -> None:
+        """
+        Dispatch node to appropriate departure method.
+        The priority of departure method is:
+
+        1. ``self.depart_{node_class}()``
+        2. ``self.depart_{super_node_class}()``
+        3. ``self.unknown_departure()``
+        """
+        for node_class in node.__class__.__mro__:
+            method = getattr(self, 'depart_%s' % (node_class.__name__), None)
+            if method:
+                method(node)
+                break
+        else:
+            super().dispatch_departure(node)
+
+    def unknown_visit(self, node: Node) -> None:
+        logger.warning(__('unknown node type: %r'), node, location=node)
+
+
+# cache a vanilla instance of nodes.document
+# Used in new_document() function
+__document_cache__: tuple[Values, Reporter]
+
+
+def new_document(source_path: str, settings: Any = None) -> nodes.document:
+    """Return a new empty document object.  This is an alternative to
+    docutils' ``new_document()``.
+
+    This is a simple wrapper for ``docutils.utils.new_document()``.  It
+    caches the result of the docutils call and reuses it on subsequent calls
+    for instantiation.  This makes instantiation of document nodes much faster.
+    """
+    global __document_cache__
+    try:
+        cached_settings, reporter = __document_cache__
+    except NameError:
+        doc = docutils.utils.new_document(source_path)
+        __document_cache__ = cached_settings, reporter = doc.settings, doc.reporter
+
+    if settings is None:
+        # Make a copy of the cached settings to accelerate instantiation
+        settings = copy(cached_settings)
+
+    # Create a new instance of nodes.document using cached reporter
+    from sphinx import addnodes
+    document = addnodes.document(settings, reporter, source=source_path)
+    document.note_source(source_path, -1)
+    return document
diff --git a/sphinx/util/exceptions.py b/sphinx/util/exceptions.py
new file mode 100644
index 0000000..9e25695
--- /dev/null
+++ b/sphinx/util/exceptions.py
@@ -0,0 +1,67 @@
+from __future__ import annotations
+
+import sys
+import traceback
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING
+
+from sphinx.errors import SphinxParallelError
+from sphinx.util.console import strip_colors
+
+if TYPE_CHECKING:
+    from sphinx.application import Sphinx
+
+
+def save_traceback(app: Sphinx | None, exc: BaseException) -> str:
+    """Save the given exception's traceback in a temporary file."""
+    import platform
+
+    import docutils
+    import jinja2
+    import pygments
+
+    import sphinx
+
+    if isinstance(exc, SphinxParallelError):
+        exc_format = '(Error in parallel process)\n' + exc.traceback
+    else:
+        exc_format = traceback.format_exc()
+
+    if app is None:
+        last_msgs = exts_list = ''
+    else:
+        extensions = app.extensions.values()
+        last_msgs = '\n'.join(f'# {strip_colors(s).strip()}' for s in app.messagelog)
+        exts_list = '\n'.join(f'# {ext.name} ({ext.version})' for ext in extensions
+                              if ext.version != 'builtin')
+
+    with NamedTemporaryFile('w', suffix='.log', prefix='sphinx-err-', delete=False) as f:
+        f.write(f"""\
+# Platform: {sys.platform}; ({platform.platform()})
+# Sphinx version: {sphinx.__display_version__}
+# Python version: {platform.python_version()} ({platform.python_implementation()})
+# Docutils version: {docutils.__version__}
+# Jinja2 version: {jinja2.__version__}
+# Pygments version: {pygments.__version__}
+
+# Last messages:
+{last_msgs}
+
+# Loaded extensions:
+{exts_list}
+
+# Traceback:
+{exc_format}
+""")
+    return f.name
+
+
+def format_exception_cut_frames(x: int = 1) -> str:
+    """Format an exception with traceback, but only the last x frames."""
+    typ, val, tb = sys.exc_info()
+    # res = ['Traceback (most recent call last):\n']
+    res: list[str] = []
+    tbres = traceback.format_tb(tb)
+    res += tbres[-x:]
+    res += traceback.format_exception_only(typ, val)
+    return ''.join(res)
diff --git a/sphinx/util/fileutil.py b/sphinx/util/fileutil.py
new file mode 
100644 index 0000000..316ec39 --- /dev/null +++ b/sphinx/util/fileutil.py @@ -0,0 +1,100 @@ +"""File utility functions for Sphinx.""" + +from __future__ import annotations + +import os +import posixpath +from typing import TYPE_CHECKING, Callable + +from docutils.utils import relative_path + +from sphinx.util.osutil import copyfile, ensuredir + +if TYPE_CHECKING: + from sphinx.util.template import BaseRenderer + from sphinx.util.typing import PathMatcher + + +def copy_asset_file(source: str | os.PathLike[str], destination: str | os.PathLike[str], + context: dict | None = None, + renderer: BaseRenderer | None = None) -> None: + """Copy an asset file to destination. + + On copying, it expands the template variables if context argument is given and + the asset is a template file. + + :param source: The path to source file + :param destination: The path to destination file or directory + :param context: The template variables. If not given, template files are simply copied + :param renderer: The template engine. If not given, SphinxRenderer is used by default + """ + if not os.path.exists(source): + return + + if os.path.isdir(destination): + # Use source filename if destination points a directory + destination = os.path.join(destination, os.path.basename(source)) + else: + destination = str(destination) + + if os.path.basename(source).endswith(('_t', '_T')) and context is not None: + if renderer is None: + from sphinx.util.template import SphinxRenderer + renderer = SphinxRenderer() + + with open(source, encoding='utf-8') as fsrc: + if destination.endswith(('_t', '_T')): + destination = destination[:-2] + with open(destination, 'w', encoding='utf-8') as fdst: + fdst.write(renderer.render_string(fsrc.read(), context)) + else: + copyfile(source, destination) + + +def copy_asset(source: str | os.PathLike[str], destination: str | os.PathLike[str], + excluded: PathMatcher = lambda path: False, + context: dict | None = None, renderer: BaseRenderer | None = None, + onerror: Callable[[str, Exception], None] | None = None) -> None: + """Copy asset files to destination recursively. + + On copying, it expands the template variables if context argument is given and + the asset is a template file. + + :param source: The path to source file or directory + :param destination: The path to destination directory + :param excluded: The matcher to determine the given path should be copied or not + :param context: The template variables. If not given, template files are simply copied + :param renderer: The template engine. If not given, SphinxRenderer is used by default + :param onerror: The error handler. 
+ """ + if not os.path.exists(source): + return + + if renderer is None: + from sphinx.util.template import SphinxRenderer + renderer = SphinxRenderer() + + ensuredir(destination) + if os.path.isfile(source): + copy_asset_file(source, destination, context, renderer) + return + + for root, dirs, files in os.walk(source, followlinks=True): + reldir = relative_path(source, root) # type: ignore[arg-type] + for dir in dirs[:]: + if excluded(posixpath.join(reldir, dir)): + dirs.remove(dir) + else: + ensuredir(posixpath.join(destination, reldir, dir)) + + for filename in files: + if not excluded(posixpath.join(reldir, filename)): + try: + copy_asset_file(posixpath.join(root, filename), + posixpath.join(destination, reldir), + context, renderer) + except Exception as exc: + if onerror: + onerror(posixpath.join(root, filename), exc) + else: + raise diff --git a/sphinx/util/http_date.py b/sphinx/util/http_date.py new file mode 100644 index 0000000..8e245cb --- /dev/null +++ b/sphinx/util/http_date.py @@ -0,0 +1,39 @@ +"""Convert times to and from HTTP-date serialisations. + +Reference: https://www.rfc-editor.org/rfc/rfc7231#section-7.1.1.1 +""" + +import time +import warnings +from email.utils import formatdate, parsedate_tz + +from sphinx.deprecation import RemovedInSphinx90Warning + +_GMT_OFFSET = float(time.localtime().tm_gmtoff) + + +def epoch_to_rfc1123(epoch: float) -> str: + """Return HTTP-date string from epoch offset.""" + return formatdate(epoch, usegmt=True) + + +def rfc1123_to_epoch(rfc1123: str) -> float: + """Return epoch offset from HTTP-date string.""" + t = parsedate_tz(rfc1123) + if t is None: + raise ValueError + if not rfc1123.endswith(" GMT"): + warnings.warn( + "HTTP-date string does not meet RFC 7231 requirements " + f"(must end with 'GMT'): {rfc1123!r}", + RemovedInSphinx90Warning, stacklevel=3, + ) + epoch_secs = time.mktime(time.struct_time(t[:9])) + _GMT_OFFSET + if (gmt_offset := t[9]) != 0: + warnings.warn( + "HTTP-date string does not meet RFC 7231 requirements " + f"(must be GMT time): {rfc1123!r}", + RemovedInSphinx90Warning, stacklevel=3, + ) + return epoch_secs - (gmt_offset or 0) + return epoch_secs diff --git a/sphinx/util/i18n.py b/sphinx/util/i18n.py new file mode 100644 index 0000000..b820884 --- /dev/null +++ b/sphinx/util/i18n.py @@ -0,0 +1,253 @@ +"""Builder superclass for all builders.""" + +from __future__ import annotations + +import os +import re +from datetime import datetime, timezone +from os import path +from typing import TYPE_CHECKING, Callable, NamedTuple + +import babel.dates +from babel.messages.mofile import write_mo +from babel.messages.pofile import read_po + +from sphinx.errors import SphinxError +from sphinx.locale import __ +from sphinx.util import logging +from sphinx.util.osutil import SEP, canon_path, relpath + +if TYPE_CHECKING: + from collections.abc import Generator + + from sphinx.environment import BuildEnvironment + + +logger = logging.getLogger(__name__) + + +class LocaleFileInfoBase(NamedTuple): + base_dir: str + domain: str + charset: str + + +class CatalogInfo(LocaleFileInfoBase): + + @property + def po_file(self) -> str: + return self.domain + '.po' + + @property + def mo_file(self) -> str: + return self.domain + '.mo' + + @property + def po_path(self) -> str: + return path.join(self.base_dir, self.po_file) + + @property + def mo_path(self) -> str: + return path.join(self.base_dir, self.mo_file) + + def is_outdated(self) -> bool: + return ( + not path.exists(self.mo_path) or + path.getmtime(self.mo_path) < 
diff --git a/sphinx/util/i18n.py b/sphinx/util/i18n.py
new file mode 100644
index 0000000..b820884
--- /dev/null
+++ b/sphinx/util/i18n.py
@@ -0,0 +1,253 @@
+"""Utility functions for i18n (message catalogs and localised dates)."""
+
+from __future__ import annotations
+
+import os
+import re
+from datetime import datetime, timezone
+from os import path
+from typing import TYPE_CHECKING, Callable, NamedTuple
+
+import babel.dates
+from babel.messages.mofile import write_mo
+from babel.messages.pofile import read_po
+
+from sphinx.errors import SphinxError
+from sphinx.locale import __
+from sphinx.util import logging
+from sphinx.util.osutil import SEP, canon_path, relpath
+
+if TYPE_CHECKING:
+    from collections.abc import Generator
+
+    from sphinx.environment import BuildEnvironment
+
+
+logger = logging.getLogger(__name__)
+
+
+class LocaleFileInfoBase(NamedTuple):
+    base_dir: str
+    domain: str
+    charset: str
+
+
+class CatalogInfo(LocaleFileInfoBase):
+
+    @property
+    def po_file(self) -> str:
+        return self.domain + '.po'
+
+    @property
+    def mo_file(self) -> str:
+        return self.domain + '.mo'
+
+    @property
+    def po_path(self) -> str:
+        return path.join(self.base_dir, self.po_file)
+
+    @property
+    def mo_path(self) -> str:
+        return path.join(self.base_dir, self.mo_file)
+
+    def is_outdated(self) -> bool:
+        return (
+            not path.exists(self.mo_path) or
+            path.getmtime(self.mo_path) < path.getmtime(self.po_path))
+
+    def write_mo(self, locale: str, use_fuzzy: bool = False) -> None:
+        with open(self.po_path, encoding=self.charset) as file_po:
+            try:
+                po = read_po(file_po, locale)
+            except Exception as exc:
+                logger.warning(__('reading error: %s, %s'), self.po_path, exc)
+                return
+
+        with open(self.mo_path, 'wb') as file_mo:
+            try:
+                write_mo(file_mo, po, use_fuzzy)
+            except Exception as exc:
+                logger.warning(__('writing error: %s, %s'), self.mo_path, exc)
+
+
+class CatalogRepository:
+    """A repository for message catalogs."""
+
+    def __init__(self, basedir: str | os.PathLike[str], locale_dirs: list[str],
+                 language: str, encoding: str) -> None:
+        self.basedir = basedir
+        self._locale_dirs = locale_dirs
+        self.language = language
+        self.encoding = encoding
+
+    @property
+    def locale_dirs(self) -> Generator[str, None, None]:
+        if not self.language:
+            return
+
+        for locale_dir in self._locale_dirs:
+            locale_dir = path.join(self.basedir, locale_dir)
+            locale_path = path.join(locale_dir, self.language, 'LC_MESSAGES')
+            if path.exists(locale_path):
+                yield locale_dir
+            else:
+                logger.verbose(__('locale_dir %s does not exist'), locale_path)
+
+    @property
+    def pofiles(self) -> Generator[tuple[str, str], None, None]:
+        for locale_dir in self.locale_dirs:
+            basedir = path.join(locale_dir, self.language, 'LC_MESSAGES')
+            for root, dirnames, filenames in os.walk(basedir):
+                # skip dot-directories; iterate over a copy, as removing from
+                # the list while iterating over it would skip entries
+                for dirname in list(dirnames):
+                    if dirname.startswith('.'):
+                        dirnames.remove(dirname)
+
+                for filename in filenames:
+                    if filename.endswith('.po'):
+                        fullpath = path.join(root, filename)
+                        yield basedir, relpath(fullpath, basedir)
+
+    @property
+    def catalogs(self) -> Generator[CatalogInfo, None, None]:
+        for basedir, filename in self.pofiles:
+            domain = canon_path(path.splitext(filename)[0])
+            yield CatalogInfo(basedir, domain, self.encoding)
+
+
+def docname_to_domain(docname: str, compaction: bool | str) -> str:
+    """Convert docname to domain for catalogs."""
+    if isinstance(compaction, str):
+        return compaction
+    if compaction:
+        return docname.split(SEP, 1)[0]
+    else:
+        return docname
+
+
+# date_format mappings: ustrftime() to babel.dates.format_datetime()
+date_format_mappings = {
+    '%a': 'EEE',     # Weekday as locale’s abbreviated name.
+    '%A': 'EEEE',    # Weekday as locale’s full name.
+    '%b': 'MMM',     # Month as locale’s abbreviated name.
+    '%B': 'MMMM',    # Month as locale’s full name.
+    '%c': 'medium',  # Locale’s appropriate date and time representation.
+    '%-d': 'd',      # Day of the month as a decimal number.
+    '%d': 'dd',      # Day of the month as a zero-padded decimal number.
+    '%-H': 'H',      # Hour (24-hour clock) as a decimal number [0,23].
+    '%H': 'HH',      # Hour (24-hour clock) as a zero-padded decimal number [00,23].
+    '%-I': 'h',      # Hour (12-hour clock) as a decimal number [1,12].
+    '%I': 'hh',      # Hour (12-hour clock) as a zero-padded decimal number [01,12].
+    '%-j': 'D',      # Day of the year as a decimal number.
+    '%j': 'DDD',     # Day of the year as a zero-padded decimal number.
+    '%-m': 'M',      # Month as a decimal number.
+    '%m': 'MM',      # Month as a zero-padded decimal number.
+    '%-M': 'm',      # Minute as a decimal number [0,59].
+    '%M': 'mm',      # Minute as a zero-padded decimal number [00,59].
+    '%p': 'a',       # Locale’s equivalent of either AM or PM.
+    '%-S': 's',      # Second as a decimal number.
+    '%S': 'ss',      # Second as a zero-padded decimal number.
+    '%U': 'WW',      # Week number of the year (Sunday as the first day of the week)
+                     # as a zero padded decimal number. 
All days in a new year preceding + # the first Sunday are considered to be in week 0. + '%w': 'e', # Weekday as a decimal number, where 0 is Sunday and 6 is Saturday. + '%-W': 'W', # Week number of the year (Monday as the first day of the week) + # as a decimal number. All days in a new year preceding the first + # Monday are considered to be in week 0. + '%W': 'WW', # Week number of the year (Monday as the first day of the week) + # as a zero-padded decimal number. + '%x': 'medium', # Locale’s appropriate date representation. + '%X': 'medium', # Locale’s appropriate time representation. + '%y': 'YY', # Year without century as a zero-padded decimal number. + '%Y': 'yyyy', # Year with century as a decimal number. + '%Z': 'zzz', # Time zone name (no characters if no time zone exists). + '%z': 'ZZZ', # UTC offset in the form ±HHMM[SS[.ffffff]] + # (empty string if the object is naive). + '%%': '%', +} + +date_format_re = re.compile('(%s)' % '|'.join(date_format_mappings)) + + +def babel_format_date(date: datetime, format: str, locale: str, + formatter: Callable = babel.dates.format_date) -> str: + # Check if we have the tzinfo attribute. If not we cannot do any time + # related formats. + if not hasattr(date, 'tzinfo'): + formatter = babel.dates.format_date + + try: + return formatter(date, format, locale=locale) + except (ValueError, babel.core.UnknownLocaleError): + # fallback to English + return formatter(date, format, locale='en') + except AttributeError: + logger.warning(__('Invalid date format. Quote the string by single quote ' + 'if you want to output it directly: %s'), format) + return format + + +def format_date( + format: str, *, date: datetime | None = None, language: str, +) -> str: + if date is None: + # If time is not specified, try to use $SOURCE_DATE_EPOCH variable + # See https://wiki.debian.org/ReproducibleBuilds/TimestampsProposal + source_date_epoch = os.getenv('SOURCE_DATE_EPOCH') + if source_date_epoch is not None: + date = datetime.fromtimestamp(float(source_date_epoch), tz=timezone.utc) + else: + date = datetime.now(tz=timezone.utc).astimezone() + + result = [] + tokens = date_format_re.split(format) + for token in tokens: + if token in date_format_mappings: + babel_format = date_format_mappings.get(token, '') + + # Check if we have to use a different babel formatter then + # format_datetime, because we only want to format a date + # or a time. 
+ if token == '%x': + function = babel.dates.format_date + elif token == '%X': + function = babel.dates.format_time + else: + function = babel.dates.format_datetime + + result.append(babel_format_date(date, babel_format, locale=language, + formatter=function)) + else: + result.append(token) + + return "".join(result) + + +def get_image_filename_for_language( + filename: str | os.PathLike[str], + env: BuildEnvironment, +) -> str: + root, ext = path.splitext(filename) + dirname = path.dirname(root) + docpath = path.dirname(env.docname) + try: + return env.config.figure_language_filename.format( + root=root, + ext=ext, + path=dirname and dirname + SEP, + basename=path.basename(root), + docpath=docpath and docpath + SEP, + language=env.config.language, + ) + except KeyError as exc: + msg = f'Invalid figure_language_filename: {exc!r}' + raise SphinxError(msg) from exc + + +def search_image_for_language(filename: str, env: BuildEnvironment) -> str: + translated = get_image_filename_for_language(filename, env) + _, abspath = env.relfn2path(translated) + if path.exists(abspath): + return translated + else: + return filename diff --git a/sphinx/util/images.py b/sphinx/util/images.py new file mode 100644 index 0000000..ac0e7f4 --- /dev/null +++ b/sphinx/util/images.py @@ -0,0 +1,146 @@ +"""Image utility functions for Sphinx.""" + +from __future__ import annotations + +import base64 +from os import path +from typing import TYPE_CHECKING, NamedTuple, overload + +import imagesize + +if TYPE_CHECKING: + from os import PathLike + +try: + from PIL import Image +except ImportError: + Image = None + +mime_suffixes = { + '.gif': 'image/gif', + '.jpg': 'image/jpeg', + '.png': 'image/png', + '.pdf': 'application/pdf', + '.svg': 'image/svg+xml', + '.svgz': 'image/svg+xml', + '.ai': 'application/illustrator', +} +_suffix_from_mime = {v: k for k, v in reversed(mime_suffixes.items())} + + +class DataURI(NamedTuple): + mimetype: str + charset: str + data: bytes + + +def get_image_size(filename: str) -> tuple[int, int] | None: + try: + size = imagesize.get(filename) + if size[0] == -1: + size = None + elif isinstance(size[0], float) or isinstance(size[1], float): + size = (int(size[0]), int(size[1])) + + if size is None and Image: # fallback to Pillow + with Image.open(filename) as im: + size = im.size + + return size + except Exception: + return None + + +@overload +def guess_mimetype(filename: PathLike[str] | str, default: str) -> str: + ... + + +@overload +def guess_mimetype(filename: PathLike[str] | str, default: None = None) -> str | None: + ... 
+
+
+def guess_mimetype(
+    filename: PathLike[str] | str = '',
+    default: str | None = None,
+) -> str | None:
+    ext = path.splitext(filename)[1].lower()
+    if ext in mime_suffixes:
+        return mime_suffixes[ext]
+    if path.exists(filename):
+        try:
+            imgtype = _image_type_from_file(filename)
+        except ValueError:
+            pass
+        else:
+            return 'image/' + imgtype
+    return default
+
+
+def get_image_extension(mimetype: str) -> str | None:
+    return _suffix_from_mime.get(mimetype)
+
+
+def parse_data_uri(uri: str) -> DataURI | None:
+    if not uri.startswith('data:'):
+        return None
+
+    # data:[<MIME-type>][;charset=<encoding>][;base64],<data>
+    mimetype = 'text/plain'
+    charset = 'US-ASCII'
+
+    properties, data = uri[5:].split(',', 1)
+    for prop in properties.split(';'):
+        if prop == 'base64':
+            pass  # skip
+        elif prop.startswith('charset='):
+            charset = prop[8:]
+        elif prop:
+            mimetype = prop
+
+    image_data = base64.b64decode(data)
+    return DataURI(mimetype, charset, image_data)
+
+
+def _image_type_from_file(filename: PathLike[str] | str) -> str:
+    with open(filename, 'rb') as f:
+        header = f.read(32)  # 32 bytes
+
+    # Bitmap
+    # https://en.wikipedia.org/wiki/BMP_file_format#Bitmap_file_header
+    if header.startswith(b'BM'):
+        return 'bmp'
+
+    # GIF
+    # https://en.wikipedia.org/wiki/GIF#File_format
+    if header.startswith((b'GIF87a', b'GIF89a')):
+        return 'gif'
+
+    # JPEG data
+    # https://en.wikipedia.org/wiki/JPEG_File_Interchange_Format#File_format_structure
+    if header.startswith(b'\xFF\xD8'):
+        return 'jpeg'
+
+    # Portable Network Graphics
+    # https://en.wikipedia.org/wiki/PNG#File_header
+    if header.startswith(b'\x89PNG\r\n\x1A\n'):
+        return 'png'
+
+    # Scalable Vector Graphics
+    # https://svgwg.org/svg2-draft/struct.html
+    if b'<svg' in header.lower():
+        return 'svg+xml'
+
+    raise ValueError('Could not detect image type!')
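The fallback chain in ``guess_mimetype()`` is: suffix table first, then content sniffing via ``_image_type_from_file()``, then the caller's default. The file names here are hypothetical:

```python
from sphinx.util.images import guess_mimetype

guess_mimetype('logo.png')                      # 'image/png' from the suffix table
guess_mimetype('logo.dat')                      # sniffs the header bytes if the file exists
guess_mimetype('missing', default='image/png')  # falls back to the given default
```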
diff --git a/sphinx/util/index_entries.py b/sphinx/util/index_entries.py
new file mode 100644
--- /dev/null
+++ b/sphinx/util/index_entries.py
@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+
+def split_index_msg(entry_type: str, value: str) -> list[str]:
+    # new entry types must be listed in util/nodes.py!
+    if entry_type == 'single':
+        try:
+            return _split_into(2, 'single', value)
+        except ValueError:
+            return _split_into(1, 'single', value)
+    if entry_type == 'pair':
+        return _split_into(2, 'pair', value)
+    if entry_type == 'triple':
+        return _split_into(3, 'triple', value)
+    if entry_type in {'see', 'seealso'}:
+        return _split_into(2, 'see', value)
+    msg = f'invalid {entry_type} index entry {value!r}'
+    raise ValueError(msg)
+
+
+def _split_into(n: int, type: str, value: str) -> list[str]:
+    """Split an index entry into a given number of parts at semicolons."""
+    parts = [x.strip() for x in value.split(';', n - 1)]
+    if len(list(filter(None, parts))) < n:
+        msg = f'invalid {type} index entry {value!r}'
+        raise ValueError(msg)
+    return parts
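The splitter's behaviour on the standard entry types, for reference:

```python
from sphinx.util.index_entries import split_index_msg

split_index_msg('pair', 'module; sphinx.util')  # ['module', 'sphinx.util']
split_index_msg('single', 'installation')       # ['installation'] (1-part fallback)
split_index_msg('triple', 'a; b')               # raises ValueError: too few parts
```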
+ """ + __all__ = safe_getattr(obj, '__all__', None) + if __all__ is None: + return None + if isinstance(__all__, (list, tuple)) and all(isinstance(e, str) for e in __all__): + return __all__ + raise ValueError(__all__) + + +def getannotations(obj: Any) -> Mapping[str, Any]: + """Get __annotations__ from given *obj* safely.""" + __annotations__ = safe_getattr(obj, '__annotations__', None) + if isinstance(__annotations__, Mapping): + return __annotations__ + else: + return {} + + +def getglobals(obj: Any) -> Mapping[str, Any]: + """Get __globals__ from given *obj* safely.""" + __globals__ = safe_getattr(obj, '__globals__', None) + if isinstance(__globals__, Mapping): + return __globals__ + else: + return {} + + +def getmro(obj: Any) -> tuple[type, ...]: + """Get __mro__ from given *obj* safely.""" + __mro__ = safe_getattr(obj, '__mro__', None) + if isinstance(__mro__, tuple): + return __mro__ + else: + return () + + +def getorigbases(obj: Any) -> tuple[Any, ...] | None: + """Get __orig_bases__ from *obj* safely.""" + if not inspect.isclass(obj): + return None + + # Get __orig_bases__ from obj.__dict__ to avoid accessing the parent's __orig_bases__. + # refs: https://github.com/sphinx-doc/sphinx/issues/9607 + __dict__ = safe_getattr(obj, '__dict__', {}) + __orig_bases__ = __dict__.get('__orig_bases__') + if isinstance(__orig_bases__, tuple) and len(__orig_bases__) > 0: + return __orig_bases__ + else: + return None + + +def getslots(obj: Any) -> dict[str, Any] | None: + """Get __slots__ attribute of the class as dict. + + Return None if gienv *obj* does not have __slots__. + Raises TypeError if given *obj* is not a class. + Raises ValueError if given *obj* have invalid __slots__. + """ + if not inspect.isclass(obj): + raise TypeError + + __slots__ = safe_getattr(obj, '__slots__', None) + if __slots__ is None: + return None + elif isinstance(__slots__, dict): + return __slots__ + elif isinstance(__slots__, str): + return {__slots__: None} + elif isinstance(__slots__, (list, tuple)): + return dict.fromkeys(__slots__) + else: + raise ValueError + + +def isNewType(obj: Any) -> bool: + """Check the if object is a kind of NewType.""" + if sys.version_info[:2] >= (3, 10): + return isinstance(obj, typing.NewType) + __module__ = safe_getattr(obj, '__module__', None) + __qualname__ = safe_getattr(obj, '__qualname__', None) + return __module__ == 'typing' and __qualname__ == 'NewType..new_type' + + +def isenumclass(x: Any) -> bool: + """Check if the object is subclass of enum.""" + return inspect.isclass(x) and issubclass(x, enum.Enum) + + +def isenumattribute(x: Any) -> bool: + """Check if the object is attribute of enum.""" + return isinstance(x, enum.Enum) + + +def unpartial(obj: Any) -> Any: + """Get an original object from partial object. + + This returns given object itself if not partial. 
+ """ + while ispartial(obj): + obj = obj.func + + return obj + + +def ispartial(obj: Any) -> bool: + """Check if the object is partial.""" + return isinstance(obj, (partial, partialmethod)) + + +def isclassmethod(obj: Any, cls: Any = None, name: str | None = None) -> bool: + """Check if the object is classmethod.""" + if isinstance(obj, classmethod): + return True + if inspect.ismethod(obj) and obj.__self__ is not None and isclass(obj.__self__): + return True + if cls and name: + placeholder = object() + for basecls in getmro(cls): + meth = basecls.__dict__.get(name, placeholder) + if meth is not placeholder: + return isclassmethod(meth) + + return False + + +def isstaticmethod(obj: Any, cls: Any = None, name: str | None = None) -> bool: + """Check if the object is staticmethod.""" + if isinstance(obj, staticmethod): + return True + if cls and name: + # trace __mro__ if the method is defined in parent class + # + # .. note:: This only works well with new style classes. + for basecls in getattr(cls, '__mro__', [cls]): + meth = basecls.__dict__.get(name) + if meth: + return isinstance(meth, staticmethod) + return False + + +def isdescriptor(x: Any) -> bool: + """Check if the object is some kind of descriptor.""" + return any( + callable(safe_getattr(x, item, None)) + for item in ['__get__', '__set__', '__delete__'] + ) + + +def isabstractmethod(obj: Any) -> bool: + """Check if the object is an abstractmethod.""" + return safe_getattr(obj, '__isabstractmethod__', False) is True + + +def isboundmethod(method: MethodType) -> bool: + """Check if the method is a bound method.""" + return safe_getattr(method, '__self__', None) is not None + + +def is_cython_function_or_method(obj: Any) -> bool: + """Check if the object is a function or method in cython.""" + try: + return obj.__class__.__name__ == 'cython_function_or_method' + except AttributeError: + return False + + +def isattributedescriptor(obj: Any) -> bool: + """Check if the object is an attribute like descriptor.""" + if inspect.isdatadescriptor(obj): + # data descriptor is kind of attribute + return True + if isdescriptor(obj): + # non data descriptor + unwrapped = unwrap(obj) + if isfunction(unwrapped) or isbuiltin(unwrapped) or inspect.ismethod(unwrapped): + # attribute must not be either function, builtin and method + return False + if is_cython_function_or_method(unwrapped): + # attribute must not be either function and method (for cython) + return False + if inspect.isclass(unwrapped): + # attribute must not be a class + return False + if isinstance(unwrapped, (ClassMethodDescriptorType, + MethodDescriptorType, + WrapperDescriptorType)): + # attribute must not be a method descriptor + return False + if type(unwrapped).__name__ == "instancemethod": + # attribute must not be an instancemethod (C-API) + return False + return True + return False + + +def is_singledispatch_function(obj: Any) -> bool: + """Check if the object is singledispatch function.""" + return (inspect.isfunction(obj) and + hasattr(obj, 'dispatch') and + hasattr(obj, 'register') and + obj.dispatch.__module__ == 'functools') + + +def is_singledispatch_method(obj: Any) -> bool: + """Check if the object is singledispatch method.""" + return isinstance(obj, singledispatchmethod) + + +def isfunction(obj: Any) -> bool: + """Check if the object is function.""" + return inspect.isfunction(unpartial(obj)) + + +def isbuiltin(obj: Any) -> bool: + """Check if the object is function.""" + return inspect.isbuiltin(unpartial(obj)) + + +def isroutine(obj: Any) -> bool: + """Check is 
any kind of function or method.""" + return inspect.isroutine(unpartial(obj)) + + +def iscoroutinefunction(obj: Any) -> bool: + """Check if the object is coroutine-function.""" + def iswrappedcoroutine(obj: Any) -> bool: + """Check if the object is wrapped coroutine-function.""" + if isstaticmethod(obj) or isclassmethod(obj) or ispartial(obj): + # staticmethod, classmethod and partial method are not a wrapped coroutine-function + # Note: Since 3.10, staticmethod and classmethod becomes a kind of wrappers + return False + return hasattr(obj, '__wrapped__') + + obj = unwrap_all(obj, stop=iswrappedcoroutine) + return inspect.iscoroutinefunction(obj) + + +def isproperty(obj: Any) -> bool: + """Check if the object is property.""" + return isinstance(obj, (property, cached_property)) + + +def isgenericalias(obj: Any) -> bool: + """Check if the object is GenericAlias.""" + return isinstance( + obj, (types.GenericAlias, typing._BaseGenericAlias)) # type: ignore[attr-defined] + + +def safe_getattr(obj: Any, name: str, *defargs: Any) -> Any: + """A getattr() that turns all exceptions into AttributeErrors.""" + try: + return getattr(obj, name, *defargs) + except Exception as exc: + # sometimes accessing a property raises an exception (e.g. + # NotImplementedError), so let's try to read the attribute directly + try: + # In case the object does weird things with attribute access + # such that accessing `obj.__dict__` may raise an exception + return obj.__dict__[name] + except Exception: + pass + + # this is a catch-all for all the weird things that some modules do + # with attribute access + if defargs: + return defargs[0] + + raise AttributeError(name) from exc + + +def object_description(obj: Any, *, _seen: frozenset = frozenset()) -> str: + """A repr() implementation that returns text safe to use in reST context. + + Maintains a set of 'seen' object IDs to detect and avoid infinite recursion. 
+ """ + seen = _seen + if isinstance(obj, dict): + if id(obj) in seen: + return 'dict(...)' + seen |= {id(obj)} + try: + sorted_keys = sorted(obj) + except TypeError: + # Cannot sort dict keys, fall back to using descriptions as a sort key + sorted_keys = sorted(obj, key=lambda k: object_description(k, _seen=seen)) + + items = ((object_description(key, _seen=seen), + object_description(obj[key], _seen=seen)) for key in sorted_keys) + return '{%s}' % ', '.join(f'{key}: {value}' for (key, value) in items) + elif isinstance(obj, set): + if id(obj) in seen: + return 'set(...)' + seen |= {id(obj)} + try: + sorted_values = sorted(obj) + except TypeError: + # Cannot sort set values, fall back to using descriptions as a sort key + sorted_values = sorted(obj, key=lambda x: object_description(x, _seen=seen)) + return '{%s}' % ', '.join(object_description(x, _seen=seen) for x in sorted_values) + elif isinstance(obj, frozenset): + if id(obj) in seen: + return 'frozenset(...)' + seen |= {id(obj)} + try: + sorted_values = sorted(obj) + except TypeError: + # Cannot sort frozenset values, fall back to using descriptions as a sort key + sorted_values = sorted(obj, key=lambda x: object_description(x, _seen=seen)) + return 'frozenset({%s})' % ', '.join(object_description(x, _seen=seen) + for x in sorted_values) + elif isinstance(obj, enum.Enum): + return f'{obj.__class__.__name__}.{obj.name}' + elif isinstance(obj, tuple): + if id(obj) in seen: + return 'tuple(...)' + seen |= frozenset([id(obj)]) + return '(%s%s)' % ( + ', '.join(object_description(x, _seen=seen) for x in obj), + ',' * (len(obj) == 1), + ) + elif isinstance(obj, list): + if id(obj) in seen: + return 'list(...)' + seen |= {id(obj)} + return '[%s]' % ', '.join(object_description(x, _seen=seen) for x in obj) + + try: + s = repr(obj) + except Exception as exc: + raise ValueError from exc + # Strip non-deterministic memory addresses such as + # ``<__main__.A at 0x7f68cb685710>`` + s = memory_address_re.sub('', s) + return s.replace('\n', ' ') + + +def is_builtin_class_method(obj: Any, attr_name: str) -> bool: + """If attr_name is implemented at builtin class, return True. + + >>> is_builtin_class_method(int, '__init__') + True + + Why this function needed? CPython implements int.__init__ by Descriptor + but PyPy implements it by pure Python code. + """ + try: + mro = getmro(obj) + cls = next(c for c in mro if attr_name in safe_getattr(c, '__dict__', {})) + except StopIteration: + return False + + try: + name = safe_getattr(cls, '__name__') + except AttributeError: + return False + + return getattr(builtins, name, None) is cls + + +class DefaultValue: + """A simple wrapper for default value of the parameters of overload functions.""" + + def __init__(self, value: str) -> None: + self.value = value + + def __eq__(self, other: object) -> bool: + return self.value == other + + def __repr__(self) -> str: + return self.value + + +class TypeAliasForwardRef: + """Pseudo typing class for autodoc_type_aliases. + + This avoids the error on evaluating the type inside `get_type_hints()`. 
+ """ + def __init__(self, name: str) -> None: + self.name = name + + def __call__(self) -> None: + # Dummy method to imitate special typing classes + pass + + def __eq__(self, other: Any) -> bool: + return self.name == other + + def __hash__(self) -> int: + return hash(self.name) + + def __repr__(self) -> str: + return self.name + + +class TypeAliasModule: + """Pseudo module class for autodoc_type_aliases.""" + + def __init__(self, modname: str, mapping: dict[str, str]) -> None: + self.__modname = modname + self.__mapping = mapping + + self.__module: ModuleType | None = None + + def __getattr__(self, name: str) -> Any: + fullname = '.'.join(filter(None, [self.__modname, name])) + if fullname in self.__mapping: + # exactly matched + return TypeAliasForwardRef(self.__mapping[fullname]) + else: + prefix = fullname + '.' + nested = {k: v for k, v in self.__mapping.items() if k.startswith(prefix)} + if nested: + # sub modules or classes found + return TypeAliasModule(fullname, nested) + else: + # no sub modules or classes found. + try: + # return the real submodule if exists + return import_module(fullname) + except ImportError: + # return the real class + if self.__module is None: + self.__module = import_module(self.__modname) + + return getattr(self.__module, name) + + +class TypeAliasNamespace(dict[str, Any]): + """Pseudo namespace class for autodoc_type_aliases. + + This enables to look up nested modules and classes like `mod1.mod2.Class`. + """ + + def __init__(self, mapping: dict[str, str]) -> None: + self.__mapping = mapping + + def __getitem__(self, key: str) -> Any: + if key in self.__mapping: + # exactly matched + return TypeAliasForwardRef(self.__mapping[key]) + else: + prefix = key + '.' + nested = {k: v for k, v in self.__mapping.items() if k.startswith(prefix)} + if nested: + # sub modules or classes found + return TypeAliasModule(key, nested) + else: + raise KeyError + + +def _should_unwrap(subject: Callable) -> bool: + """Check the function should be unwrapped on getting signature.""" + __globals__ = getglobals(subject) + if (__globals__.get('__name__') == 'contextlib' and + __globals__.get('__file__') == contextlib.__file__): + # contextmanger should be unwrapped + return True + + return False + + +def signature(subject: Callable, bound_method: bool = False, type_aliases: dict | None = None, + ) -> inspect.Signature: + """Return a Signature object for the given *subject*. + + :param bound_method: Specify *subject* is a bound method or not + """ + if type_aliases is None: + type_aliases = {} + + try: + if _should_unwrap(subject): + signature = inspect.signature(subject) + else: + signature = inspect.signature(subject, follow_wrapped=True) + except ValueError: + # follow built-in wrappers up (ex. functools.lru_cache) + signature = inspect.signature(subject) + parameters = list(signature.parameters.values()) + return_annotation = signature.return_annotation + + try: + # Resolve annotations using ``get_type_hints()`` and type_aliases. 
+        localns = TypeAliasNamespace(type_aliases)
+        annotations = typing.get_type_hints(subject, None, localns)
+        for i, param in enumerate(parameters):
+            if param.name in annotations:
+                annotation = annotations[param.name]
+                if isinstance(annotation, TypeAliasForwardRef):
+                    annotation = annotation.name
+                parameters[i] = param.replace(annotation=annotation)
+        if 'return' in annotations:
+            if isinstance(annotations['return'], TypeAliasForwardRef):
+                return_annotation = annotations['return'].name
+            else:
+                return_annotation = annotations['return']
+    except Exception:
+        # ``get_type_hints()`` does not support some kind of objects like partial,
+        # ForwardRef and so on.
+        pass
+
+    if bound_method:
+        if inspect.ismethod(subject):
+            # ``inspect.signature()`` considers the subject is a bound method and removes
+            # first argument from signature.  Therefore no skips are needed here.
+            pass
+        else:
+            if len(parameters) > 0:
+                parameters.pop(0)
+
+    # To allow to create signature object correctly for pure python functions,
+    # pass an internal parameter __validate_parameters__=False to Signature
+    #
+    # For example, this helps a function having a default value `inspect._empty`.
+    # refs: https://github.com/sphinx-doc/sphinx/issues/7935
+    return inspect.Signature(parameters, return_annotation=return_annotation,
+                             __validate_parameters__=False)
+
+
+def evaluate_signature(sig: inspect.Signature, globalns: dict | None = None,
+                       localns: dict | None = None,
+                       ) -> inspect.Signature:
+    """Evaluate unresolved type annotations in a signature object."""
+    def evaluate_forwardref(ref: ForwardRef, globalns: dict, localns: dict) -> Any:
+        """Evaluate a forward reference."""
+        return ref._evaluate(globalns, localns, frozenset())
+
+    def evaluate(annotation: Any, globalns: dict, localns: dict) -> Any:
+        """Evaluate unresolved type annotation."""
+        try:
+            if isinstance(annotation, str):
+                ref = ForwardRef(annotation, True)
+                annotation = evaluate_forwardref(ref, globalns, localns)
+
+            if isinstance(annotation, ForwardRef):
+                annotation = evaluate_forwardref(annotation, globalns, localns)
+            elif isinstance(annotation, str):
+                # might be a ForwardRef'ed annotation in overloaded functions
+                ref = ForwardRef(annotation, True)
+                annotation = evaluate_forwardref(ref, globalns, localns)
+        except (NameError, TypeError):
+            # failed to evaluate type.  skipped.
+            pass
+
+        return annotation
+
+    if globalns is None:
+        globalns = {}
+    if localns is None:
+        localns = globalns
+
+    parameters = list(sig.parameters.values())
+    for i, param in enumerate(parameters):
+        if param.annotation:
+            annotation = evaluate(param.annotation, globalns, localns)
+            parameters[i] = param.replace(annotation=annotation)
+
+    return_annotation = sig.return_annotation
+    if return_annotation:
+        return_annotation = evaluate(return_annotation, globalns, localns)
+
+    return sig.replace(parameters=parameters, return_annotation=return_annotation)
+
+
+def stringify_signature(sig: inspect.Signature, show_annotation: bool = True,
+                        show_return_annotation: bool = True,
+                        unqualified_typehints: bool = False) -> str:
+    """Stringify a Signature object.
+
+    :param show_annotation: If enabled, show annotations on the signature
+    :param show_return_annotation: If enabled, show annotation of the return value
+    :param unqualified_typehints: If enabled, show annotations as unqualified
+                                  (ex. 
io.StringIO -> StringIO) + """ + if unqualified_typehints: + mode = 'smart' + else: + mode = 'fully-qualified' + + args = [] + last_kind = None + for param in sig.parameters.values(): + if param.kind != param.POSITIONAL_ONLY and last_kind == param.POSITIONAL_ONLY: + # PEP-570: Separator for Positional Only Parameter: / + args.append('/') + if param.kind == param.KEYWORD_ONLY and last_kind in (param.POSITIONAL_OR_KEYWORD, + param.POSITIONAL_ONLY, + None): + # PEP-3102: Separator for Keyword Only Parameter: * + args.append('*') + + arg = StringIO() + if param.kind == param.VAR_POSITIONAL: + arg.write('*' + param.name) + elif param.kind == param.VAR_KEYWORD: + arg.write('**' + param.name) + else: + arg.write(param.name) + + if show_annotation and param.annotation is not param.empty: + arg.write(': ') + arg.write(stringify_annotation(param.annotation, mode)) + if param.default is not param.empty: + if show_annotation and param.annotation is not param.empty: + arg.write(' = ') + else: + arg.write('=') + arg.write(object_description(param.default)) + + args.append(arg.getvalue()) + last_kind = param.kind + + if last_kind == Parameter.POSITIONAL_ONLY: + # PEP-570: Separator for Positional Only Parameter: / + args.append('/') + + concatenated_args = ', '.join(args) + if (sig.return_annotation is Parameter.empty or + show_annotation is False or + show_return_annotation is False): + return f'({concatenated_args})' + else: + annotation = stringify_annotation(sig.return_annotation, mode) + return f'({concatenated_args}) -> {annotation}' + + +def signature_from_str(signature: str) -> inspect.Signature: + """Create a Signature object from string.""" + code = 'def func' + signature + ': pass' + module = ast.parse(code) + function = cast(ast.FunctionDef, module.body[0]) + + return signature_from_ast(function, code) + + +def signature_from_ast(node: ast.FunctionDef, code: str = '') -> inspect.Signature: + """Create a Signature object from AST *node*.""" + args = node.args + defaults = list(args.defaults) + params = [] + if hasattr(args, "posonlyargs"): + posonlyargs = len(args.posonlyargs) + positionals = posonlyargs + len(args.args) + else: + posonlyargs = 0 + positionals = len(args.args) + + for _ in range(len(defaults), positionals): + defaults.insert(0, Parameter.empty) # type: ignore[arg-type] + + if hasattr(args, "posonlyargs"): + for i, arg in enumerate(args.posonlyargs): + if defaults[i] is Parameter.empty: + default = Parameter.empty + else: + default = DefaultValue( + ast_unparse(defaults[i], code)) # type: ignore[assignment] + + annotation = ast_unparse(arg.annotation, code) or Parameter.empty + params.append(Parameter(arg.arg, Parameter.POSITIONAL_ONLY, + default=default, annotation=annotation)) + + for i, arg in enumerate(args.args): + if defaults[i + posonlyargs] is Parameter.empty: + default = Parameter.empty + else: + default = DefaultValue( + ast_unparse(defaults[i + posonlyargs], code), # type: ignore[assignment] + ) + + annotation = ast_unparse(arg.annotation, code) or Parameter.empty + params.append(Parameter(arg.arg, Parameter.POSITIONAL_OR_KEYWORD, + default=default, annotation=annotation)) + + if args.vararg: + annotation = ast_unparse(args.vararg.annotation, code) or Parameter.empty + params.append(Parameter(args.vararg.arg, Parameter.VAR_POSITIONAL, + annotation=annotation)) + + for i, arg in enumerate(args.kwonlyargs): + if args.kw_defaults[i] is None: + default = Parameter.empty + else: + default = DefaultValue( + ast_unparse(args.kw_defaults[i], code)) # type: 
ignore[arg-type,assignment] + annotation = ast_unparse(arg.annotation, code) or Parameter.empty + params.append(Parameter(arg.arg, Parameter.KEYWORD_ONLY, default=default, + annotation=annotation)) + + if args.kwarg: + annotation = ast_unparse(args.kwarg.annotation, code) or Parameter.empty + params.append(Parameter(args.kwarg.arg, Parameter.VAR_KEYWORD, + annotation=annotation)) + + return_annotation = ast_unparse(node.returns, code) or Parameter.empty + + return inspect.Signature(params, return_annotation=return_annotation) + + +def getdoc( + obj: Any, + attrgetter: Callable = safe_getattr, + allow_inherited: bool = False, + cls: Any = None, + name: str | None = None, +) -> str | None: + """Get the docstring for the object. + + This tries to obtain the docstring for some kind of objects additionally: + + * partial functions + * inherited docstring + * inherited decorated methods + """ + def getdoc_internal(obj: Any, attrgetter: Callable = safe_getattr) -> str | None: + doc = attrgetter(obj, '__doc__', None) + if isinstance(doc, str): + return doc + else: + return None + + if cls and name and isclassmethod(obj, cls, name): + for basecls in getmro(cls): + meth = basecls.__dict__.get(name) + if meth and hasattr(meth, '__func__'): + doc: str | None = getdoc(meth.__func__) + if doc is not None or not allow_inherited: + return doc + + doc = getdoc_internal(obj) + if ispartial(obj) and doc == obj.__class__.__doc__: + return getdoc(obj.func) + elif doc is None and allow_inherited: + if cls and name: + # Check a docstring of the attribute or method from super classes. + for basecls in getmro(cls): + meth = safe_getattr(basecls, name, None) + if meth is not None: + doc = getdoc_internal(meth) + if doc is not None: + break + + if doc is None: + # retry using `inspect.getdoc()` + for basecls in getmro(cls): + meth = safe_getattr(basecls, name, None) + if meth is not None: + doc = inspect.getdoc(meth) + if doc is not None: + break + + if doc is None: + doc = inspect.getdoc(obj) + + return doc diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py new file mode 100644 index 0000000..89f0070 --- /dev/null +++ b/sphinx/util/inventory.py @@ -0,0 +1,172 @@ +"""Inventory utility functions for Sphinx.""" +from __future__ import annotations + +import os +import re +import zlib +from typing import IO, TYPE_CHECKING, Callable + +from sphinx.util import logging + +BUFSIZE = 16 * 1024 +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + from collections.abc import Iterator + + from sphinx.builders import Builder + from sphinx.environment import BuildEnvironment + from sphinx.util.typing import Inventory, InventoryItem + + +class InventoryFileReader: + """A file reader for an inventory file. + + This reader supports mixture of texts and compressed texts. 
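+
+    A minimal usage sketch (assuming *stream* is an ``objects.inv`` file
+    opened in binary mode; in the version-2 format the first four header
+    lines are plain text and the remainder is zlib-compressed)::
+
+        reader = InventoryFileReader(stream)
+        for _ in range(4):
+            header = reader.readline()
+        for entry in reader.read_compressed_lines():
+            ...  # one inventory entry per line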
+ """ + + def __init__(self, stream: IO) -> None: + self.stream = stream + self.buffer = b'' + self.eof = False + + def read_buffer(self) -> None: + chunk = self.stream.read(BUFSIZE) + if chunk == b'': + self.eof = True + self.buffer += chunk + + def readline(self) -> str: + pos = self.buffer.find(b'\n') + if pos != -1: + line = self.buffer[:pos].decode() + self.buffer = self.buffer[pos + 1:] + elif self.eof: + line = self.buffer.decode() + self.buffer = b'' + else: + self.read_buffer() + line = self.readline() + + return line + + def readlines(self) -> Iterator[str]: + while not self.eof: + line = self.readline() + if line: + yield line + + def read_compressed_chunks(self) -> Iterator[bytes]: + decompressor = zlib.decompressobj() + while not self.eof: + self.read_buffer() + yield decompressor.decompress(self.buffer) + self.buffer = b'' + yield decompressor.flush() + + def read_compressed_lines(self) -> Iterator[str]: + buf = b'' + for chunk in self.read_compressed_chunks(): + buf += chunk + pos = buf.find(b'\n') + while pos != -1: + yield buf[:pos].decode() + buf = buf[pos + 1:] + pos = buf.find(b'\n') + + +class InventoryFile: + @classmethod + def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory: + reader = InventoryFileReader(stream) + line = reader.readline().rstrip() + if line == '# Sphinx inventory version 1': + return cls.load_v1(reader, uri, joinfunc) + elif line == '# Sphinx inventory version 2': + return cls.load_v2(reader, uri, joinfunc) + else: + raise ValueError('invalid inventory header: %s' % line) + + @classmethod + def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory: + invdata: Inventory = {} + projname = stream.readline().rstrip()[11:] + version = stream.readline().rstrip()[11:] + for line in stream.readlines(): + name, type, location = line.rstrip().split(None, 2) + location = join(uri, location) + # version 1 did not add anchors to the location + if type == 'mod': + type = 'py:module' + location += '#module-' + name + else: + type = 'py:' + type + location += '#' + name + invdata.setdefault(type, {})[name] = (projname, version, location, '-') + return invdata + + @classmethod + def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory: + invdata: Inventory = {} + projname = stream.readline().rstrip()[11:] + version = stream.readline().rstrip()[11:] + line = stream.readline() + if 'zlib' not in line: + raise ValueError('invalid inventory header (not compressed): %s' % line) + + for line in stream.read_compressed_lines(): + # be careful to handle names with embedded spaces correctly + m = re.match(r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)', + line.rstrip(), flags=re.VERBOSE) + if not m: + continue + name, type, prio, location, dispname = m.groups() + if ':' not in type: + # wrong type value. 
type should be in the form of "{domain}:{objtype}" + # + # Note: To avoid the regex DoS, this is implemented in python (refs: #8175) + continue + if type == 'py:module' and type in invdata and name in invdata[type]: + # due to a bug in 1.1 and below, + # two inventory entries are created + # for Python modules, and the first + # one is correct + continue + if location.endswith('$'): + location = location[:-1] + name + location = join(uri, location) + inv_item: InventoryItem = projname, version, location, dispname + invdata.setdefault(type, {})[name] = inv_item + return invdata + + @classmethod + def dump(cls, filename: str, env: BuildEnvironment, builder: Builder) -> None: + def escape(string: str) -> str: + return re.sub("\\s+", " ", string) + + with open(os.path.join(filename), 'wb') as f: + # header + f.write(('# Sphinx inventory version 2\n' + '# Project: %s\n' + '# Version: %s\n' + '# The remainder of this file is compressed using zlib.\n' % + (escape(env.config.project), + escape(env.config.version))).encode()) + + # body + compressor = zlib.compressobj(9) + for domainname, domain in sorted(env.domains.items()): + for name, dispname, typ, docname, anchor, prio in \ + sorted(domain.get_objects()): + if anchor.endswith(name): + # this can shorten the inventory by as much as 25% + anchor = anchor[:-len(name)] + '$' + uri = builder.get_target_uri(docname) + if anchor: + uri += '#' + anchor + if dispname == name: + dispname = '-' + entry = ('%s %s:%s %s %s %s\n' % + (name, domainname, typ, prio, uri, dispname)) + f.write(compressor.compress(entry.encode())) + f.write(compressor.flush()) diff --git a/sphinx/util/logging.py b/sphinx/util/logging.py new file mode 100644 index 0000000..429018a --- /dev/null +++ b/sphinx/util/logging.py @@ -0,0 +1,602 @@ +"""Logging utility functions for Sphinx.""" + +from __future__ import annotations + +import logging +import logging.handlers +from collections import defaultdict +from contextlib import contextmanager +from typing import IO, TYPE_CHECKING, Any + +from docutils import nodes +from docutils.utils import get_source_line + +from sphinx.errors import SphinxWarning +from sphinx.util.console import colorize +from sphinx.util.osutil import abspath + +if TYPE_CHECKING: + from collections.abc import Generator + + from docutils.nodes import Node + + from sphinx.application import Sphinx + + +NAMESPACE = 'sphinx' +VERBOSE = 15 + +LEVEL_NAMES: defaultdict[str, int] = defaultdict(lambda: logging.WARNING, { + 'CRITICAL': logging.CRITICAL, + 'SEVERE': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'VERBOSE': VERBOSE, + 'DEBUG': logging.DEBUG, +}) + +VERBOSITY_MAP: defaultdict[int, int] = defaultdict(lambda: logging.NOTSET, { + 0: logging.INFO, + 1: VERBOSE, + 2: logging.DEBUG, +}) + +COLOR_MAP: defaultdict[int, str] = defaultdict(lambda: 'blue', { + logging.ERROR: 'darkred', + logging.WARNING: 'red', + logging.DEBUG: 'darkgray', +}) + + +def getLogger(name: str) -> SphinxLoggerAdapter: + """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`. + + Sphinx logger always uses ``sphinx.*`` namespace to be independent from + settings of root logger. It ensures logging is consistent even if a + third-party extension or imported application resets logger settings. + + Example usage:: + + >>> from sphinx.util import logging + >>> logger = logging.getLogger(__name__) + >>> logger.info('Hello, this is an extension!') + Hello, this is an extension! 
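+
+    The returned adapter also understands Sphinx-specific keywords such as
+    ``type``, ``subtype``, ``location`` and ``once`` (see
+    :class:`SphinxLoggerAdapter` below), for example::
+
+        >>> logger.warning('duplicated entry', type='index', subtype='duplicate')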
+ """ + # add sphinx prefix to name forcely + logger = logging.getLogger(NAMESPACE + '.' + name) + # Forcely enable logger + logger.disabled = False + # wrap logger by SphinxLoggerAdapter + return SphinxLoggerAdapter(logger, {}) + + +def convert_serializable(records: list[logging.LogRecord]) -> None: + """Convert LogRecord serializable.""" + for r in records: + # extract arguments to a message and clear them + r.msg = r.getMessage() + r.args = () + + location = getattr(r, 'location', None) + if isinstance(location, nodes.Node): + r.location = get_node_location(location) + + +class SphinxLogRecord(logging.LogRecord): + """Log record class supporting location""" + prefix = '' + location: Any = None + + def getMessage(self) -> str: + message = super().getMessage() + location = getattr(self, 'location', None) + if location: + message = f'{location}: {self.prefix}{message}' + elif self.prefix not in message: + message = self.prefix + message + + return message + + +class SphinxInfoLogRecord(SphinxLogRecord): + """Info log record class supporting location""" + prefix = '' # do not show any prefix for INFO messages + + +class SphinxWarningLogRecord(SphinxLogRecord): + """Warning log record class supporting location""" + @property + def prefix(self) -> str: # type: ignore[override] + if self.levelno >= logging.CRITICAL: + return 'CRITICAL: ' + elif self.levelno >= logging.ERROR: + return 'ERROR: ' + else: + return 'WARNING: ' + + +class SphinxLoggerAdapter(logging.LoggerAdapter): + """LoggerAdapter allowing ``type`` and ``subtype`` keywords.""" + KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once'] + + def log( # type: ignore[override] + self, level: int | str, msg: str, *args: Any, **kwargs: Any, + ) -> None: + if isinstance(level, int): + super().log(level, msg, *args, **kwargs) + else: + levelno = LEVEL_NAMES[level] + super().log(levelno, msg, *args, **kwargs) + + def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.log(VERBOSE, msg, *args, **kwargs) + + def process(self, msg: str, kwargs: dict) -> tuple[str, dict]: # type: ignore[override] + extra = kwargs.setdefault('extra', {}) + for keyword in self.KEYWORDS: + if keyword in kwargs: + extra[keyword] = kwargs.pop(keyword) + + return msg, kwargs + + def handle(self, record: logging.LogRecord) -> None: + self.logger.handle(record) + + +class WarningStreamHandler(logging.StreamHandler): + """StreamHandler for warnings.""" + pass + + +class NewLineStreamHandler(logging.StreamHandler): + """StreamHandler which switches line terminator by record.nonl flag.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + self.acquire() + if getattr(record, 'nonl', False): + # skip appending terminator when nonl=True + self.terminator = '' + super().emit(record) + finally: + self.terminator = '\n' + self.release() + + +class MemoryHandler(logging.handlers.BufferingHandler): + """Handler buffering all logs.""" + + buffer: list[logging.LogRecord] + + def __init__(self) -> None: + super().__init__(-1) + + def shouldFlush(self, record: logging.LogRecord) -> bool: + return False # never flush + + def flush(self) -> None: + # suppress any flushes triggered by importing packages that flush + # all handlers at initialization time + pass + + def flushTo(self, logger: logging.Logger) -> None: + self.acquire() + try: + for record in self.buffer: + logger.handle(record) + self.buffer = [] + finally: + self.release() + + def clear(self) -> list[logging.LogRecord]: + buffer, self.buffer = self.buffer, [] + return buffer + + 
+@contextmanager +def pending_warnings() -> Generator[logging.Handler, None, None]: + """Context manager to postpone logging warnings temporarily. + + Similar to :func:`pending_logging`. + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + memhandler.setLevel(logging.WARNING) + + try: + handlers = [] + for handler in logger.handlers[:]: + if isinstance(handler, WarningStreamHandler): + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + memhandler.flushTo(logger) + + +@contextmanager +def suppress_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to suppress logging all logs temporarily. + + For example:: + + >>> with suppress_logging(): + >>> logger.warning('Warning message!') # suppressed + >>> some_long_process() + >>> + """ + logger = logging.getLogger(NAMESPACE) + memhandler = MemoryHandler() + + try: + handlers = [] + for handler in logger.handlers[:]: + logger.removeHandler(handler) + handlers.append(handler) + + logger.addHandler(memhandler) + yield memhandler + finally: + logger.removeHandler(memhandler) + + for handler in handlers: + logger.addHandler(handler) + + +@contextmanager +def pending_logging() -> Generator[MemoryHandler, None, None]: + """Context manager to postpone logging all logs temporarily. + + For example:: + + >>> with pending_logging(): + >>> logger.warning('Warning message!') # not flushed yet + >>> some_long_process() + >>> + Warning message! # the warning is flushed here + """ + logger = logging.getLogger(NAMESPACE) + try: + with suppress_logging() as memhandler: + yield memhandler + finally: + memhandler.flushTo(logger) + + +@contextmanager +def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]: + """Context manager to skip WarningIsErrorFilter temporarily.""" + logger = logging.getLogger(NAMESPACE) + + if skip is False: + yield + else: + try: + disabler = DisableWarningIsErrorFilter() + for handler in logger.handlers: + # use internal method; filters.insert() directly to install disabler + # before WarningIsErrorFilter + handler.filters.insert(0, disabler) + yield + finally: + for handler in logger.handlers: + handler.removeFilter(disabler) + + +@contextmanager +def prefixed_warnings(prefix: str) -> Generator[None, None, None]: + """Context manager to prepend prefix to all warning log records temporarily. + + For example:: + + >>> with prefixed_warnings("prefix:"): + >>> logger.warning('Warning message!') # => prefix: Warning message! + + .. 
versionadded:: 2.0 + """ + logger = logging.getLogger(NAMESPACE) + warning_handler = None + for handler in logger.handlers: + if isinstance(handler, WarningStreamHandler): + warning_handler = handler + break + else: + # warning stream not found + yield + return + + prefix_filter = None + for _filter in warning_handler.filters: + if isinstance(_filter, MessagePrefixFilter): + prefix_filter = _filter + break + + if prefix_filter: + # already prefixed + try: + previous = prefix_filter.prefix + prefix_filter.prefix = prefix + yield + finally: + prefix_filter.prefix = previous + else: + # not prefixed yet + prefix_filter = MessagePrefixFilter(prefix) + try: + warning_handler.addFilter(prefix_filter) + yield + finally: + warning_handler.removeFilter(prefix_filter) + + +class LogCollector: + def __init__(self) -> None: + self.logs: list[logging.LogRecord] = [] + + @contextmanager + def collect(self) -> Generator[None, None, None]: + with pending_logging() as memhandler: + yield + + self.logs = memhandler.clear() + + +class InfoFilter(logging.Filter): + """Filter error and warning messages.""" + + def filter(self, record: logging.LogRecord) -> bool: + return record.levelno < logging.WARNING + + +def is_suppressed_warning(type: str, subtype: str, suppress_warnings: list[str]) -> bool: + """Check whether the warning is suppressed or not.""" + if type is None: + return False + + subtarget: str | None + + for warning_type in suppress_warnings: + if '.' in warning_type: + target, subtarget = warning_type.split('.', 1) + else: + target, subtarget = warning_type, None + + if target == type and subtarget in (None, subtype, "*"): + return True + + return False + + +class WarningSuppressor(logging.Filter): + """Filter logs by `suppress_warnings`.""" + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + type = getattr(record, 'type', '') + subtype = getattr(record, 'subtype', '') + + try: + suppress_warnings = self.app.config.suppress_warnings + except AttributeError: + # config is not initialized yet (ex. 
in conf.py) + suppress_warnings = [] + + if is_suppressed_warning(type, subtype, suppress_warnings): + return False + else: + self.app._warncount += 1 + return True + + +class WarningIsErrorFilter(logging.Filter): + """Raise exception if warning emitted.""" + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if getattr(record, 'skip_warningsiserror', False): + # disabled by DisableWarningIsErrorFilter + return True + elif self.app.warningiserror: + location = getattr(record, 'location', '') + try: + message = record.msg % record.args + except (TypeError, ValueError): + message = record.msg # use record.msg itself + + if location: + exc = SphinxWarning(location + ":" + str(message)) + else: + exc = SphinxWarning(message) + if record.exc_info is not None: + raise exc from record.exc_info[1] + else: + raise exc + else: + return True + + +class DisableWarningIsErrorFilter(logging.Filter): + """Disable WarningIsErrorFilter if this filter installed.""" + + def filter(self, record: logging.LogRecord) -> bool: + record.skip_warningsiserror = True + return True + + +class MessagePrefixFilter(logging.Filter): + """Prepend prefix to all log records.""" + + def __init__(self, prefix: str) -> None: + self.prefix = prefix + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + if self.prefix: + record.msg = self.prefix + ' ' + record.msg + return True + + +class OnceFilter(logging.Filter): + """Show the message only once.""" + + def __init__(self, name: str = '') -> None: + super().__init__(name) + self.messages: dict[str, list] = {} + + def filter(self, record: logging.LogRecord) -> bool: + once = getattr(record, 'once', '') + if not once: + return True + else: + params = self.messages.setdefault(record.msg, []) + if record.args in params: + return False + + params.append(record.args) + return True + + +class SphinxLogRecordTranslator(logging.Filter): + """Converts a log record to one Sphinx expects + + * Make a instance of SphinxLogRecord + * docname to path if location given + """ + LogRecordClass: type[logging.LogRecord] + + def __init__(self, app: Sphinx) -> None: + self.app = app + super().__init__() + + def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore[override] + if isinstance(record, logging.LogRecord): + # force subclassing to handle location + record.__class__ = self.LogRecordClass # type: ignore[assignment] + + location = getattr(record, 'location', None) + if isinstance(location, tuple): + docname, lineno = location + if docname: + if lineno: + record.location = f'{self.app.env.doc2path(docname)}:{lineno}' + else: + record.location = f'{self.app.env.doc2path(docname)}' + else: + record.location = None + elif isinstance(location, nodes.Node): + record.location = get_node_location(location) + elif location and ':' not in location: + record.location = f'{self.app.env.doc2path(location)}' + + return True + + +class InfoLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for INFO level log records.""" + LogRecordClass = SphinxInfoLogRecord + + +class WarningLogRecordTranslator(SphinxLogRecordTranslator): + """LogRecordTranslator for WARNING level log records.""" + LogRecordClass = SphinxWarningLogRecord + + +def get_node_location(node: Node) -> str | None: + source, line = get_source_line(node) + if source: + source = abspath(source) + if source and line: + return f"{source}:{line}" + if source: + return f"{source}:" + if line: + return 
f":{line}" + return None + + +class ColorizeFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + message = super().format(record) + color = getattr(record, 'color', None) + if color is None: + color = COLOR_MAP.get(record.levelno) + + if color: + return colorize(color, message) + else: + return message + + +class SafeEncodingWriter: + """Stream writer which ignores UnicodeEncodeError silently""" + def __init__(self, stream: IO) -> None: + self.stream = stream + self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' + + def write(self, data: str) -> None: + try: + self.stream.write(data) + except UnicodeEncodeError: + # stream accept only str, not bytes. So, we encode and replace + # non-encodable characters, then decode them. + self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding)) + + def flush(self) -> None: + if hasattr(self.stream, 'flush'): + self.stream.flush() + + +class LastMessagesWriter: + """Stream writer storing last 10 messages in memory to save trackback""" + def __init__(self, app: Sphinx, stream: IO) -> None: + self.app = app + + def write(self, data: str) -> None: + self.app.messagelog.append(data) + + +def setup(app: Sphinx, status: IO, warning: IO) -> None: + """Setup root logger for Sphinx""" + logger = logging.getLogger(NAMESPACE) + logger.setLevel(logging.DEBUG) + logger.propagate = False + + # clear all handlers + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + info_handler = NewLineStreamHandler(SafeEncodingWriter(status)) + info_handler.addFilter(InfoFilter()) + info_handler.addFilter(InfoLogRecordTranslator(app)) + info_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + info_handler.setFormatter(ColorizeFormatter()) + + warning_handler = WarningStreamHandler(SafeEncodingWriter(warning)) + warning_handler.addFilter(WarningSuppressor(app)) + warning_handler.addFilter(WarningLogRecordTranslator(app)) + warning_handler.addFilter(WarningIsErrorFilter(app)) + warning_handler.addFilter(OnceFilter()) + warning_handler.setLevel(logging.WARNING) + warning_handler.setFormatter(ColorizeFormatter()) + + messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status)) + messagelog_handler.addFilter(InfoFilter()) + messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity]) + + logger.addHandler(info_handler) + logger.addHandler(warning_handler) + logger.addHandler(messagelog_handler) diff --git a/sphinx/util/matching.py b/sphinx/util/matching.py new file mode 100644 index 0000000..dd91905 --- /dev/null +++ b/sphinx/util/matching.py @@ -0,0 +1,169 @@ +"""Pattern-matching utility functions for Sphinx.""" + +from __future__ import annotations + +import os.path +import re +from typing import TYPE_CHECKING, Callable + +from sphinx.util.osutil import canon_path, path_stabilize + +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator + + +def _translate_pattern(pat: str) -> str: + """Translate a shell-style glob pattern to a regular expression. + + Adapted from the fnmatch module, but enhanced so that single stars don't + match slashes. 
+ """ + i, n = 0, len(pat) + res = '' + while i < n: + c = pat[i] + i += 1 + if c == '*': + if i < n and pat[i] == '*': + # double star matches slashes too + i += 1 + res = res + '.*' + else: + # single star doesn't match slashes + res = res + '[^/]*' + elif c == '?': + # question mark doesn't match slashes too + res = res + '[^/]' + elif c == '[': + j = i + if j < n and pat[j] == '!': + j += 1 + if j < n and pat[j] == ']': + j += 1 + while j < n and pat[j] != ']': + j += 1 + if j >= n: + res = res + '\\[' + else: + stuff = pat[i:j].replace('\\', '\\\\') + i = j + 1 + if stuff[0] == '!': + # negative pattern mustn't match slashes too + stuff = '^/' + stuff[1:] + elif stuff[0] == '^': + stuff = '\\' + stuff + res = f'{res}[{stuff}]' + else: + res += re.escape(c) + return res + '$' + + +def compile_matchers( + patterns: Iterable[str], +) -> list[Callable[[str], re.Match[str] | None]]: + return [re.compile(_translate_pattern(pat)).match for pat in patterns] + + +class Matcher: + """A pattern matcher for Multiple shell-style glob patterns. + + Note: this modifies the patterns to work with copy_asset(). + For example, "**/index.rst" matches with "index.rst" + """ + + def __init__(self, exclude_patterns: Iterable[str]) -> None: + expanded = [pat[3:] for pat in exclude_patterns if pat.startswith('**/')] + self.patterns = compile_matchers(list(exclude_patterns) + expanded) + + def __call__(self, string: str) -> bool: + return self.match(string) + + def match(self, string: str) -> bool: + string = canon_path(string) + return any(pat(string) for pat in self.patterns) + + +DOTFILES = Matcher(['**/.*']) + + +_pat_cache: dict[str, re.Pattern[str]] = {} + + +def patmatch(name: str, pat: str) -> re.Match[str] | None: + """Return if name matches the regular expression (pattern) + ``pat```. Adapted from fnmatch module.""" + if pat not in _pat_cache: + _pat_cache[pat] = re.compile(_translate_pattern(pat)) + return _pat_cache[pat].match(name) + + +def patfilter(names: Iterable[str], pat: str) -> list[str]: + """Return the subset of the list ``names`` that match + the regular expression (pattern) ``pat``. + + Adapted from fnmatch module. + """ + if pat not in _pat_cache: + _pat_cache[pat] = re.compile(_translate_pattern(pat)) + match = _pat_cache[pat].match + return list(filter(match, names)) + + +def get_matching_files( + dirname: str | os.PathLike[str], + include_patterns: Iterable[str] = ("**",), + exclude_patterns: Iterable[str] = (), +) -> Iterator[str]: + """Get all file names in a directory, recursively. + + Filter file names by the glob-style include_patterns and exclude_patterns. + The default values include all files ("**") and exclude nothing (""). + + Only files matching some pattern in *include_patterns* are included, and + exclusions from *exclude_patterns* take priority over inclusions. + + """ + # dirname is a normalized absolute path. 
+ dirname = os.path.normpath(os.path.abspath(dirname)) + + exclude_matchers = compile_matchers(exclude_patterns) + include_matchers = compile_matchers(include_patterns) + + for root, dirs, files in os.walk(dirname, followlinks=True): + relative_root = os.path.relpath(root, dirname) + if relative_root == ".": + relative_root = "" # suppress dirname for files on the target dir + + # Filter files + included_files = [] + for entry in sorted(files): + entry = path_stabilize(os.path.join(relative_root, entry)) + keep = False + for matcher in include_matchers: + if matcher(entry): + keep = True + break # break the inner loop + + for matcher in exclude_matchers: + if matcher(entry): + keep = False + break # break the inner loop + + if keep: + included_files.append(entry) + + # Filter directories + filtered_dirs = [] + for dir_name in sorted(dirs): + normalised = path_stabilize(os.path.join(relative_root, dir_name)) + for matcher in exclude_matchers: + if matcher(normalised): + break # break the inner loop + else: + # if the loop didn't break + filtered_dirs.append(dir_name) + + dirs[:] = filtered_dirs + + # Yield filtered files + yield from included_files diff --git a/sphinx/util/math.py b/sphinx/util/math.py new file mode 100644 index 0000000..ef0eb39 --- /dev/null +++ b/sphinx/util/math.py @@ -0,0 +1,61 @@ +"""Utility functions for math.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from docutils import nodes + + from sphinx.builders.html import HTML5Translator + + +def get_node_equation_number(writer: HTML5Translator, node: nodes.math_block) -> str: + if writer.builder.config.math_numfig and writer.builder.config.numfig: + figtype = 'displaymath' + if writer.builder.name == 'singlehtml': + key = f"{writer.docnames[-1]}/{figtype}" # type: ignore[has-type] + else: + key = figtype + + id = node['ids'][0] + number = writer.builder.fignumbers.get(key, {}).get(id, ()) + return '.'.join(map(str, number)) + else: + return node['number'] + + +def wrap_displaymath(text: str, label: str | None, numbering: bool) -> str: + def is_equation(part: str) -> str: + return part.strip() + + if label is None: + labeldef = '' + else: + labeldef = r'\label{%s}' % label + numbering = True + + parts = list(filter(is_equation, text.split('\n\n'))) + equations = [] + if len(parts) == 0: + return '' + elif len(parts) == 1: + if numbering: + begin = r'\begin{equation}' + labeldef + end = r'\end{equation}' + else: + begin = r'\begin{equation*}' + labeldef + end = r'\end{equation*}' + equations.append('\\begin{split}%s\\end{split}\n' % parts[0]) + else: + if numbering: + begin = r'\begin{align}%s\!\begin{aligned}' % labeldef + end = r'\end{aligned}\end{align}' + else: + begin = r'\begin{align*}%s\!\begin{aligned}' % labeldef + end = r'\end{aligned}\end{align*}' + for part in parts: + equations.append('%s\\\\\n' % part.strip()) + + concatenated_equations = ''.join(equations) + return f'{begin}\n{concatenated_equations}{end}' diff --git a/sphinx/util/nodes.py b/sphinx/util/nodes.py new file mode 100644 index 0000000..b68b7fd --- /dev/null +++ b/sphinx/util/nodes.py @@ -0,0 +1,672 @@ +"""Docutils node-related utility functions for Sphinx.""" + +from __future__ import annotations + +import contextlib +import re +import unicodedata +from typing import TYPE_CHECKING, Any, Callable + +from docutils import nodes + +from sphinx import addnodes +from sphinx.locale import __ +from sphinx.util import logging + +if TYPE_CHECKING: + from collections.abc import Iterable + + from 
docutils.nodes import Element, Node + from docutils.parsers.rst import Directive + from docutils.parsers.rst.states import Inliner + from docutils.statemachine import StringList + + from sphinx.builders import Builder + from sphinx.environment import BuildEnvironment + from sphinx.util.tags import Tags + +logger = logging.getLogger(__name__) + + +# \x00 means the "<" was backslash-escaped +explicit_title_re = re.compile(r'^(.+?)\s*(?$', re.DOTALL) +caption_ref_re = explicit_title_re # b/w compat alias + + +class NodeMatcher: + """A helper class for Node.findall(). + + It checks that the given node is an instance of the specified node-classes and + has the specified node-attributes. + + For example, following example searches ``reference`` node having ``refdomain`` + and ``reftype`` attributes:: + + matcher = NodeMatcher(nodes.reference, refdomain='std', reftype='citation') + doctree.findall(matcher) + # => [, , ...] + + A special value ``typing.Any`` matches any kind of node-attributes. For example, + following example searches ``reference`` node having ``refdomain`` attributes:: + + from __future__ import annotations +from typing import TYPE_CHECKING, Any + matcher = NodeMatcher(nodes.reference, refdomain=Any) + doctree.findall(matcher) + # => [, , ...] + """ + + def __init__(self, *node_classes: type[Node], **attrs: Any) -> None: + self.classes = node_classes + self.attrs = attrs + + def match(self, node: Node) -> bool: + try: + if self.classes and not isinstance(node, self.classes): + return False + + if self.attrs: + if not isinstance(node, nodes.Element): + return False + + for key, value in self.attrs.items(): + if key not in node: + return False + elif value is Any: + continue + elif node.get(key) != value: + return False + + return True + except Exception: + # for non-Element nodes + return False + + def __call__(self, node: Node) -> bool: + return self.match(node) + + +def get_full_module_name(node: Node) -> str: + """ + Return full module dotted path like: 'docutils.nodes.paragraph' + + :param nodes.Node node: target node + :return: full module dotted path + """ + return f'{node.__module__}.{node.__class__.__name__}' + + +def repr_domxml(node: Node, length: int = 80) -> str: + """ + return DOM XML representation of the specified node like: + 'New in version...' + + :param nodes.Node node: target node + :param int length: + length of return value to be striped. if false-value is specified, repr_domxml + returns full of DOM XML representation. + :return: DOM XML representation + """ + try: + text = node.asdom().toxml() + except Exception: + text = str(node) + if length and len(text) > length: + text = text[:length] + '...' + return text + + +def apply_source_workaround(node: Element) -> None: + # workaround: nodes.term have wrong rawsource if classifier is specified. 
+ # The behavior of docutils-0.11, 0.12 is: + # * when ``term text : classifier1 : classifier2`` is specified, + # * rawsource of term node will have: ``term text : classifier1 : classifier2`` + # * rawsource of classifier node will be None + if isinstance(node, nodes.classifier) and not node.rawsource: + logger.debug('[i18n] PATCH: %r to have source, line and rawsource: %s', + get_full_module_name(node), repr_domxml(node)) + definition_list_item = node.parent + node.source = definition_list_item.source + node.line = definition_list_item.line - 1 + node.rawsource = node.astext() # set 'classifier1' (or 'classifier2') + elif isinstance(node, nodes.classifier) and not node.source: + # docutils-0.15 fills in rawsource attribute, but not in source. + node.source = node.parent.source + if isinstance(node, nodes.image) and node.source is None: + logger.debug('[i18n] PATCH: %r to have source, line: %s', + get_full_module_name(node), repr_domxml(node)) + node.source, node.line = node.parent.source, node.parent.line + if isinstance(node, nodes.title) and node.source is None: + logger.debug('[i18n] PATCH: %r to have source: %s', + get_full_module_name(node), repr_domxml(node)) + node.source, node.line = node.parent.source, node.parent.line + if isinstance(node, nodes.term): + logger.debug('[i18n] PATCH: %r to have rawsource: %s', + get_full_module_name(node), repr_domxml(node)) + # strip classifier from rawsource of term + for classifier in reversed(list(node.parent.findall(nodes.classifier))): + node.rawsource = re.sub(r'\s*:\s*%s' % re.escape(classifier.astext()), + '', node.rawsource) + if isinstance(node, nodes.topic) and node.source is None: + # docutils-0.18 does not fill the source attribute of topic + logger.debug('[i18n] PATCH: %r to have source, line: %s', + get_full_module_name(node), repr_domxml(node)) + node.source, node.line = node.parent.source, node.parent.line + + # workaround: literal_block under bullet list (#4913) + if isinstance(node, nodes.literal_block) and node.source is None: + with contextlib.suppress(ValueError): + node.source = get_node_source(node) + + # workaround: recommonmark-0.2.0 doesn't set rawsource attribute + if not node.rawsource: + node.rawsource = node.astext() + + if node.source and node.rawsource: + return + + # workaround: some docutils nodes doesn't have source, line. 
+ if (isinstance(node, ( + nodes.rubric, # #1305 rubric directive + nodes.line, # #1477 line node + nodes.image, # #3093 image directive in substitution + nodes.field_name, # #3335 field list syntax + ))): + logger.debug('[i18n] PATCH: %r to have source and line: %s', + get_full_module_name(node), repr_domxml(node)) + try: + node.source = get_node_source(node) + except ValueError: + node.source = '' + node.line = 0 # need fix docutils to get `node.line` + return + + +IGNORED_NODES = ( + nodes.Invisible, + nodes.literal_block, + nodes.doctest_block, + addnodes.versionmodified, + # XXX there are probably more +) + + +def is_translatable(node: Node) -> bool: + if isinstance(node, addnodes.translatable): + return True + + # image node marked as translatable or having alt text + if isinstance(node, nodes.image) and (node.get('translatable') or node.get('alt')): + return True + + if isinstance(node, nodes.Inline) and 'translatable' not in node: # type: ignore[operator] + # inline node must not be translated if 'translatable' is not set + return False + + if isinstance(node, nodes.TextElement): + if not node.source: + logger.debug('[i18n] SKIP %r because no node.source: %s', + get_full_module_name(node), repr_domxml(node)) + return False # built-in message + if isinstance(node, IGNORED_NODES) and 'translatable' not in node: + logger.debug("[i18n] SKIP %r because node is in IGNORED_NODES " + "and no node['translatable']: %s", + get_full_module_name(node), repr_domxml(node)) + return False + if not node.get('translatable', True): + # not(node['translatable'] == True or node['translatable'] is None) + logger.debug("[i18n] SKIP %r because not node['translatable']: %s", + get_full_module_name(node), repr_domxml(node)) + return False + # orphan + # XXX ignore all metadata (== docinfo) + if isinstance(node, nodes.field_name) and node.children[0] == 'orphan': + logger.debug('[i18n] SKIP %r because orphan node: %s', + get_full_module_name(node), repr_domxml(node)) + return False + return True + + if isinstance(node, nodes.meta): # type: ignore[attr-defined] + return True + + return False + + +LITERAL_TYPE_NODES = ( + nodes.literal_block, + nodes.doctest_block, + nodes.math_block, + nodes.raw, +) +IMAGE_TYPE_NODES = ( + nodes.image, +) + + +def extract_messages(doctree: Element) -> Iterable[tuple[Element, str]]: + """Extract translatable messages from a document tree.""" + for node in doctree.findall(is_translatable): # type: Element + if isinstance(node, addnodes.translatable): + for msg in node.extract_original_messages(): + yield node, msg + continue + if isinstance(node, LITERAL_TYPE_NODES): + msg = node.rawsource + if not msg: + msg = node.astext() + elif isinstance(node, nodes.image): + if node.get('alt'): + yield node, node['alt'] + if node.get('translatable'): + image_uri = node.get('original_uri', node['uri']) + msg = f'.. 
image:: {image_uri}' + else: + msg = '' + elif isinstance(node, nodes.meta): # type: ignore[attr-defined] + msg = node["content"] + else: + msg = node.rawsource.replace('\n', ' ').strip() + + # XXX nodes rendering empty are likely a bug in sphinx.addnodes + if msg: + yield node, msg + + +def get_node_source(node: Element) -> str: + for pnode in traverse_parent(node): + if pnode.source: + return pnode.source + msg = 'node source not found' + raise ValueError(msg) + + +def get_node_line(node: Element) -> int: + for pnode in traverse_parent(node): + if pnode.line: + return pnode.line + msg = 'node line not found' + raise ValueError(msg) + + +def traverse_parent(node: Element, cls: Any = None) -> Iterable[Element]: + while node: + if cls is None or isinstance(node, cls): + yield node + node = node.parent + + +def get_prev_node(node: Node) -> Node | None: + pos = node.parent.index(node) + if pos > 0: + return node.parent[pos - 1] + else: + return None + + +def traverse_translatable_index( + doctree: Element, +) -> Iterable[tuple[Element, list[tuple[str, str, str, str, str | None]]]]: + """Traverse translatable index node from a document tree.""" + matcher = NodeMatcher(addnodes.index, inline=False) + for node in doctree.findall(matcher): # type: addnodes.index + if 'raw_entries' in node: + entries = node['raw_entries'] + else: + entries = node['entries'] + yield node, entries + + +def nested_parse_with_titles(state: Any, content: StringList, node: Node, + content_offset: int = 0) -> str: + """Version of state.nested_parse() that allows titles and does not require + titles to have the same decoration as the calling document. + + This is useful when the parsed content comes from a completely different + context, such as docstrings. + """ + # hack around title style bookkeeping + surrounding_title_styles = state.memo.title_styles + surrounding_section_level = state.memo.section_level + state.memo.title_styles = [] + state.memo.section_level = 0 + try: + return state.nested_parse(content, content_offset, node, match_titles=1) + finally: + state.memo.title_styles = surrounding_title_styles + state.memo.section_level = surrounding_section_level + + +def clean_astext(node: Element) -> str: + """Like node.astext(), but ignore images.""" + node = node.deepcopy() + for img in node.findall(nodes.image): + img['alt'] = '' + for raw in list(node.findall(nodes.raw)): + raw.parent.remove(raw) + return node.astext() + + +def split_explicit_title(text: str) -> tuple[bool, str, str]: + """Split role content into title and target, if given.""" + match = explicit_title_re.match(text) + if match: + return True, match.group(1), match.group(2) + return False, text, text + + +indextypes = [ + 'single', 'pair', 'double', 'triple', 'see', 'seealso', +] + + +def process_index_entry(entry: str, targetid: str, + ) -> list[tuple[str, str, str, str, str | None]]: + from sphinx.domains.python import pairindextypes + + indexentries: list[tuple[str, str, str, str, str | None]] = [] + entry = entry.strip() + oentry = entry + main = '' + if entry.startswith('!'): + main = 'main' + entry = entry[1:].lstrip() + for index_type in pairindextypes: + if entry.startswith(f'{index_type}:'): + value = entry[len(index_type) + 1:].strip() + value = f'{pairindextypes[index_type]}; {value}' + # xref RemovedInSphinx90Warning + logger.warning(__('%r is deprecated for index entries (from entry %r). 
' + "Use 'pair: %s' instead."), + index_type, entry, value, type='index') + indexentries.append(('pair', value, targetid, main, None)) + break + else: + for index_type in indextypes: + if entry.startswith(f'{index_type}:'): + value = entry[len(index_type) + 1:].strip() + if index_type == 'double': + index_type = 'pair' + indexentries.append((index_type, value, targetid, main, None)) + break + # shorthand notation for single entries + else: + for value in oentry.split(','): + value = value.strip() + main = '' + if value.startswith('!'): + main = 'main' + value = value[1:].lstrip() + if not value: + continue + indexentries.append(('single', value, targetid, main, None)) + return indexentries + + +def inline_all_toctrees(builder: Builder, docnameset: set[str], docname: str, + tree: nodes.document, colorfunc: Callable, traversed: list[str], + ) -> nodes.document: + """Inline all toctrees in the *tree*. + + Record all docnames in *docnameset*, and output docnames with *colorfunc*. + """ + tree = tree.deepcopy() + for toctreenode in list(tree.findall(addnodes.toctree)): + newnodes = [] + includefiles = map(str, toctreenode['includefiles']) + for includefile in includefiles: + if includefile not in traversed: + try: + traversed.append(includefile) + logger.info(colorfunc(includefile) + " ", nonl=True) + subtree = inline_all_toctrees(builder, docnameset, includefile, + builder.env.get_doctree(includefile), + colorfunc, traversed) + docnameset.add(includefile) + except Exception: + logger.warning(__('toctree contains ref to nonexisting file %r'), + includefile, location=docname) + else: + sof = addnodes.start_of_file(docname=includefile) + sof.children = subtree.children + for sectionnode in sof.findall(nodes.section): + if 'docname' not in sectionnode: + sectionnode['docname'] = includefile + newnodes.append(sof) + toctreenode.parent.replace(toctreenode, newnodes) + return tree + + +def _make_id(string: str) -> str: + """Convert `string` into an identifier and return it. + + This function is a modified version of ``docutils.nodes.make_id()`` of + docutils-0.16. + + Changes: + + * Allow to use capital alphabet characters + * Allow to use dots (".") and underscores ("_") for an identifier + without a leading character. + + # Author: David Goodger + # Maintainer: docutils-develop@lists.sourceforge.net + # Copyright: This module has been placed in the public domain. + """ + id = string.translate(_non_id_translate_digraphs) + id = id.translate(_non_id_translate) + # get rid of non-ascii characters. + # 'ascii' lowercase to prevent problems with turkish locale. 
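+    # (illustrative) 'Äpfel & Birnen' ends up as 'Apfel-Birnen' after the
+    # normalisation, hyphenation and trimming steps below.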
+ id = unicodedata.normalize('NFKD', id).encode('ascii', 'ignore').decode('ascii') + # shrink runs of whitespace and replace by hyphen + id = _non_id_chars.sub('-', ' '.join(id.split())) + id = _non_id_at_ends.sub('', id) + return str(id) + + +_non_id_chars = re.compile('[^a-zA-Z0-9._]+') +_non_id_at_ends = re.compile('^[-0-9._]+|-+$') +_non_id_translate = { + 0x00f8: 'o', # o with stroke + 0x0111: 'd', # d with stroke + 0x0127: 'h', # h with stroke + 0x0131: 'i', # dotless i + 0x0142: 'l', # l with stroke + 0x0167: 't', # t with stroke + 0x0180: 'b', # b with stroke + 0x0183: 'b', # b with topbar + 0x0188: 'c', # c with hook + 0x018c: 'd', # d with topbar + 0x0192: 'f', # f with hook + 0x0199: 'k', # k with hook + 0x019a: 'l', # l with bar + 0x019e: 'n', # n with long right leg + 0x01a5: 'p', # p with hook + 0x01ab: 't', # t with palatal hook + 0x01ad: 't', # t with hook + 0x01b4: 'y', # y with hook + 0x01b6: 'z', # z with stroke + 0x01e5: 'g', # g with stroke + 0x0225: 'z', # z with hook + 0x0234: 'l', # l with curl + 0x0235: 'n', # n with curl + 0x0236: 't', # t with curl + 0x0237: 'j', # dotless j + 0x023c: 'c', # c with stroke + 0x023f: 's', # s with swash tail + 0x0240: 'z', # z with swash tail + 0x0247: 'e', # e with stroke + 0x0249: 'j', # j with stroke + 0x024b: 'q', # q with hook tail + 0x024d: 'r', # r with stroke + 0x024f: 'y', # y with stroke +} +_non_id_translate_digraphs = { + 0x00df: 'sz', # ligature sz + 0x00e6: 'ae', # ae + 0x0153: 'oe', # ligature oe + 0x0238: 'db', # db digraph + 0x0239: 'qp', # qp digraph +} + + +def make_id(env: BuildEnvironment, document: nodes.document, + prefix: str = '', term: str | None = None) -> str: + """Generate an appropriate node_id for given *prefix* and *term*.""" + node_id = None + if prefix: + idformat = prefix + "-%s" + else: + idformat = (document.settings.id_prefix or "id") + "%s" + + # try to generate node_id by *term* + if prefix and term: + node_id = _make_id(idformat % term) + if node_id == prefix: + # *term* is not good to generate a node_id. 
+ node_id = None + elif term: + node_id = _make_id(term) + if node_id == '': + node_id = None # fallback to None + + while node_id is None or node_id in document.ids: + node_id = idformat % env.new_serialno(prefix) + + return node_id + + +def find_pending_xref_condition(node: addnodes.pending_xref, condition: str, + ) -> Element | None: + """Pick matched pending_xref_condition node up from the pending_xref.""" + for subnode in node: + if (isinstance(subnode, addnodes.pending_xref_condition) and + subnode.get('condition') == condition): + return subnode + return None + + +def make_refnode(builder: Builder, fromdocname: str, todocname: str, targetid: str | None, + child: Node | list[Node], title: str | None = None, + ) -> nodes.reference: + """Shortcut to create a reference node.""" + node = nodes.reference('', '', internal=True) + if fromdocname == todocname and targetid: + node['refid'] = targetid + else: + if targetid: + node['refuri'] = (builder.get_relative_uri(fromdocname, todocname) + + '#' + targetid) + else: + node['refuri'] = builder.get_relative_uri(fromdocname, todocname) + if title: + node['reftitle'] = title + node += child + return node + + +def set_source_info(directive: Directive, node: Node) -> None: + node.source, node.line = \ + directive.state_machine.get_source_and_line(directive.lineno) + + +def set_role_source_info(inliner: Inliner, lineno: int, node: Node) -> None: + gsal = inliner.reporter.get_source_and_line # type: ignore[attr-defined] + node.source, node.line = gsal(lineno) + + +def copy_source_info(src: Element, dst: Element) -> None: + with contextlib.suppress(ValueError): + dst.source = get_node_source(src) + dst.line = get_node_line(src) + + +NON_SMARTQUOTABLE_PARENT_NODES = ( + nodes.FixedTextElement, + nodes.literal, + nodes.math, + nodes.image, + nodes.raw, + nodes.problematic, + addnodes.not_smartquotable, +) + + +def is_smartquotable(node: Node) -> bool: + """Check whether the node is smart-quotable or not.""" + for pnode in traverse_parent(node.parent): + if isinstance(pnode, NON_SMARTQUOTABLE_PARENT_NODES): + return False + if pnode.get('support_smartquotes', None) is False: + return False + + if getattr(node, 'support_smartquotes', None) is False: + return False + + return True + + +def process_only_nodes(document: Node, tags: Tags) -> None: + """Filter ``only`` nodes which do not match *tags*.""" + for node in document.findall(addnodes.only): + if _only_node_keep_children(node, tags): + node.replace_self(node.children or nodes.comment()) + else: + # A comment on the comment() nodes being inserted: replacing by [] would + # result in a "Losing ids" exception if there is a target node before + # the only node, so we make sure docutils can transfer the id to + # something, even if it's just a comment and will lose the id anyway... + node.replace_self(nodes.comment()) + + +def _only_node_keep_children(node: addnodes.only, tags: Tags) -> bool: + """Keep children if tags match or error.""" + try: + return tags.eval_condition(node['expr']) + except Exception as err: + logger.warning( + __('exception while evaluating only directive expression: %s'), + err, + location=node) + return True + + +def _copy_except__document(el: Element) -> Element: + """Monkey-patch ```nodes.Element.copy``` to not copy the ``_document`` + attribute. 
+ + xref: https://github.com/sphinx-doc/sphinx/issues/11116#issuecomment-1376767086 + """ + newnode = object.__new__(el.__class__) + # set in Element.__init__() + newnode.children = [] + newnode.rawsource = el.rawsource + newnode.tagname = el.tagname + # copied in Element.copy() + newnode.attributes = {k: (v + if k not in {'ids', 'classes', 'names', 'dupnames', 'backrefs'} + else v[:]) + for k, v in el.attributes.items()} + newnode.line = el.line + newnode.source = el.source + return newnode + + +nodes.Element.copy = _copy_except__document # type: ignore[assignment] + + +def _deepcopy(el: Element) -> Element: + """Monkey-patch ```nodes.Element.deepcopy``` for speed.""" + newnode = el.copy() + newnode.children = [child.deepcopy() for child in el.children] + for child in newnode.children: + child.parent = newnode + if el.document: + child.document = el.document + if child.source is None: + child.source = el.document.current_source + if child.line is None: + child.line = el.document.current_line + return newnode + + +nodes.Element.deepcopy = _deepcopy # type: ignore[assignment] diff --git a/sphinx/util/osutil.py b/sphinx/util/osutil.py new file mode 100644 index 0000000..c6adbe4 --- /dev/null +++ b/sphinx/util/osutil.py @@ -0,0 +1,217 @@ +"""Operating system-related utility functions for Sphinx.""" + +from __future__ import annotations + +import contextlib +import filecmp +import os +import re +import shutil +import sys +import unicodedata +from io import StringIO +from os import path +from typing import TYPE_CHECKING, Any + +from sphinx.deprecation import _deprecation_warning + +if TYPE_CHECKING: + from collections.abc import Iterator + +# SEP separates path elements in the canonical file names +# +# Define SEP as a manifest constant, not so much because we expect it to change +# in the future as to avoid the suspicion that a stray "/" in the code is a +# hangover from more *nix-oriented origins. +SEP = "/" + + +def os_path(canonical_path: str, /) -> str: + return canonical_path.replace(SEP, path.sep) + + +def canon_path(native_path: str | os.PathLike[str], /) -> str: + """Return path in OS-independent form""" + return os.fspath(native_path).replace(path.sep, SEP) + + +def path_stabilize(filepath: str | os.PathLike[str], /) -> str: + "Normalize path separator and unicode string" + new_path = canon_path(filepath) + return unicodedata.normalize('NFC', new_path) + + +def relative_uri(base: str, to: str) -> str: + """Return a relative URL from ``base`` to ``to``.""" + if to.startswith(SEP): + return to + b2 = base.split('#')[0].split(SEP) + t2 = to.split('#')[0].split(SEP) + # remove common segments (except the last segment) + for x, y in zip(b2[:-1], t2[:-1]): + if x != y: + break + b2.pop(0) + t2.pop(0) + if b2 == t2: + # Special case: relative_uri('f/index.html','f/index.html') + # returns '', not 'index.html' + return '' + if len(b2) == 1 and t2 == ['']: + # Special case: relative_uri('f/index.html','f/') should + # return './', not '' + return '.' + SEP + return ('..' 
+ SEP) * (len(b2) - 1) + SEP.join(t2) + + +def ensuredir(file: str | os.PathLike[str]) -> None: + """Ensure that a path exists.""" + os.makedirs(file, exist_ok=True) + + +def mtimes_of_files(dirnames: list[str], suffix: str) -> Iterator[float]: + for dirname in dirnames: + for root, _dirs, files in os.walk(dirname): + for sfile in files: + if sfile.endswith(suffix): + with contextlib.suppress(OSError): + yield path.getmtime(path.join(root, sfile)) + + +def copytimes(source: str | os.PathLike[str], dest: str | os.PathLike[str]) -> None: + """Copy a file's modification times.""" + st = os.stat(source) + if hasattr(os, 'utime'): + os.utime(dest, (st.st_atime, st.st_mtime)) + + +def copyfile(source: str | os.PathLike[str], dest: str | os.PathLike[str]) -> None: + """Copy a file and its modification times, if possible. + + Note: ``copyfile`` skips copying if the file has not been changed""" + if not path.exists(dest) or not filecmp.cmp(source, dest): + shutil.copyfile(source, dest) + with contextlib.suppress(OSError): + # don't do full copystat because the source may be read-only + copytimes(source, dest) + + +no_fn_re = re.compile(r'[^a-zA-Z0-9_-]') +project_suffix_re = re.compile(' Documentation$') + + +def make_filename(string: str) -> str: + return no_fn_re.sub('', string) or 'sphinx' + + +def make_filename_from_project(project: str) -> str: + return make_filename(project_suffix_re.sub('', project)).lower() + + +def relpath(path: str | os.PathLike[str], + start: str | os.PathLike[str] | None = os.curdir) -> str: + """Return a relative filepath to *path* either from the current directory or + from an optional *start* directory. + + This is an alternative of ``os.path.relpath()``. This returns original path + if *path* and *start* are on different drives (for Windows platform). + """ + try: + return os.path.relpath(path, start) + except ValueError: + return str(path) + + +safe_relpath = relpath # for compatibility +fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding() + + +abspath = path.abspath + + +class _chdir: + """Remove this fall-back once support for Python 3.10 is removed.""" + def __init__(self, target_dir: str, /): + self.path = target_dir + self._dirs: list[str] = [] + + def __enter__(self): + self._dirs.append(os.getcwd()) + os.chdir(self.path) + + def __exit__(self, _exc_type, _exc_value, _traceback, /): + os.chdir(self._dirs.pop()) + + +@contextlib.contextmanager +def cd(target_dir: str) -> Iterator[None]: + if sys.version_info[:2] >= (3, 11): + _deprecation_warning(__name__, 'cd', 'contextlib.chdir', remove=(8, 0)) + with _chdir(target_dir): + yield + + +class FileAvoidWrite: + """File-like object that buffers output and only writes if content changed. + + Use this class like when writing to a file to avoid touching the original + file if the content hasn't changed. This is useful in scenarios where file + mtime is used to invalidate caches or trigger new behavior. + + When writing to this file handle, all writes are buffered until the object + is closed. + + Objects can be used as context managers. + """ + def __init__(self, path: str) -> None: + self._path = path + self._io: StringIO | None = None + + def write(self, data: str) -> None: + if not self._io: + self._io = StringIO() + self._io.write(data) + + def close(self) -> None: + """Stop accepting writes and write file, if needed.""" + if not self._io: + msg = 'FileAvoidWrite does not support empty files.' 
+ raise Exception(msg) + + buf = self.getvalue() + self._io.close() + + try: + with open(self._path, encoding='utf-8') as old_f: + old_content = old_f.read() + if old_content == buf: + return + except OSError: + pass + + with open(self._path, 'w', encoding='utf-8') as f: + f.write(buf) + + def __enter__(self) -> FileAvoidWrite: + return self + + def __exit__( + self, exc_type: type[Exception], exc_value: Exception, traceback: Any, + ) -> bool: + self.close() + return True + + def __getattr__(self, name: str) -> Any: + # Proxy to _io instance. + if not self._io: + msg = 'Must write to FileAvoidWrite before other methods can be used' + raise Exception(msg) + + return getattr(self._io, name) + + +def rmtree(path: str) -> None: + if os.path.isdir(path): + shutil.rmtree(path) + else: + os.remove(path) diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py new file mode 100644 index 0000000..0afdff9 --- /dev/null +++ b/sphinx/util/parallel.py @@ -0,0 +1,154 @@ +"""Parallel building utilities.""" + +from __future__ import annotations + +import os +import time +import traceback +from math import sqrt +from typing import TYPE_CHECKING, Any, Callable + +try: + import multiprocessing + HAS_MULTIPROCESSING = True +except ImportError: + HAS_MULTIPROCESSING = False + +from sphinx.errors import SphinxParallelError +from sphinx.util import logging + +if TYPE_CHECKING: + from collections.abc import Sequence + +logger = logging.getLogger(__name__) + +# our parallel functionality only works for the forking Process +parallel_available = multiprocessing and os.name == 'posix' + + +class SerialTasks: + """Has the same interface as ParallelTasks, but executes tasks directly.""" + + def __init__(self, nproc: int = 1) -> None: + pass + + def add_task( + self, task_func: Callable, arg: Any = None, result_func: Callable | None = None, + ) -> None: + if arg is not None: + res = task_func(arg) + else: + res = task_func() + if result_func: + result_func(res) + + def join(self) -> None: + pass + + +class ParallelTasks: + """Executes *nproc* tasks in parallel after forking.""" + + def __init__(self, nproc: int) -> None: + self.nproc = nproc + # (optional) function performed by each task on the result of main task + self._result_funcs: dict[int, Callable] = {} + # task arguments + self._args: dict[int, list[Any] | None] = {} + # list of subprocesses (both started and waiting) + self._procs: dict[int, Any] = {} + # list of receiving pipe connections of running subprocesses + self._precvs: dict[int, Any] = {} + # list of receiving pipe connections of waiting subprocesses + self._precvsWaiting: dict[int, Any] = {} + # number of working subprocesses + self._pworking = 0 + # task number of each subprocess + self._taskid = 0 + + def _process(self, pipe: Any, func: Callable, arg: Any) -> None: + try: + collector = logging.LogCollector() + with collector.collect(): + if arg is None: + ret = func() + else: + ret = func(arg) + failed = False + except BaseException as err: + failed = True + errmsg = traceback.format_exception_only(err.__class__, err)[0].strip() + ret = (errmsg, traceback.format_exc()) + logging.convert_serializable(collector.logs) + pipe.send((failed, collector.logs, ret)) + + def add_task( + self, task_func: Callable, arg: Any = None, result_func: Callable | None = None, + ) -> None: + tid = self._taskid + self._taskid += 1 + self._result_funcs[tid] = result_func or (lambda arg, result: None) + self._args[tid] = arg + precv, psend = multiprocessing.Pipe(False) + context: Any = 
multiprocessing.get_context('fork') + proc = context.Process(target=self._process, args=(psend, task_func, arg)) + self._procs[tid] = proc + self._precvsWaiting[tid] = precv + self._join_one() + + def join(self) -> None: + try: + while self._pworking: + if not self._join_one(): + time.sleep(0.02) + finally: + # shutdown other child processes on failure + self.terminate() + + def terminate(self) -> None: + for tid in list(self._precvs): + self._procs[tid].terminate() + self._result_funcs.pop(tid) + self._procs.pop(tid) + self._precvs.pop(tid) + self._pworking -= 1 + + def _join_one(self) -> bool: + joined_any = False + for tid, pipe in self._precvs.items(): + if pipe.poll(): + exc, logs, result = pipe.recv() + if exc: + raise SphinxParallelError(*result) + for log in logs: + logger.handle(log) + self._result_funcs.pop(tid)(self._args.pop(tid), result) + self._procs[tid].join() + self._precvs.pop(tid) + self._pworking -= 1 + joined_any = True + break + + while self._precvsWaiting and self._pworking < self.nproc: + newtid, newprecv = self._precvsWaiting.popitem() + self._precvs[newtid] = newprecv + self._procs[newtid].start() + self._pworking += 1 + + return joined_any + + +def make_chunks(arguments: Sequence[str], nproc: int, maxbatch: int = 10) -> list[Any]: + # determine how many documents to read in one go + nargs = len(arguments) + chunksize = nargs // nproc + if chunksize >= maxbatch: + # try to improve batch size vs. number of batches + chunksize = int(sqrt(nargs / nproc * maxbatch)) + if chunksize == 0: + chunksize = 1 + nchunks, rest = divmod(nargs, chunksize) + if rest: + nchunks += 1 + # partition documents in "chunks" that will be written by one Process + return [arguments[i * chunksize:(i + 1) * chunksize] for i in range(nchunks)] diff --git a/sphinx/util/png.py b/sphinx/util/png.py new file mode 100644 index 0000000..6c94219 --- /dev/null +++ b/sphinx/util/png.py @@ -0,0 +1,43 @@ +"""PNG image manipulation helpers.""" + +from __future__ import annotations + +import binascii +import struct + +LEN_IEND = 12 +LEN_DEPTH = 22 + +DEPTH_CHUNK_LEN = struct.pack('!i', 10) +DEPTH_CHUNK_START = b'tEXtDepth\x00' +IEND_CHUNK = b'\x00\x00\x00\x00IEND\xAE\x42\x60\x82' + + +def read_png_depth(filename: str) -> int | None: + """Read the special tEXt chunk indicating the depth from a PNG file.""" + with open(filename, 'rb') as f: + f.seek(- (LEN_IEND + LEN_DEPTH), 2) + depthchunk = f.read(LEN_DEPTH) + if not depthchunk.startswith(DEPTH_CHUNK_LEN + DEPTH_CHUNK_START): + # either not a PNG file or not containing the depth chunk + return None + else: + return struct.unpack('!i', depthchunk[14:18])[0] + + +def write_png_depth(filename: str, depth: int) -> None: + """Write the special tEXt chunk indicating the depth to a PNG file. + + The chunk is placed immediately before the special IEND chunk. 
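+
+    A usage sketch (hypothetical file name)::
+
+        write_png_depth('formula.png', 12)
+        assert read_png_depth('formula.png') == 12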
+ """ + data = struct.pack('!i', depth) + with open(filename, 'r+b') as f: + # seek to the beginning of the IEND chunk + f.seek(-LEN_IEND, 2) + # overwrite it with the depth chunk + f.write(DEPTH_CHUNK_LEN + DEPTH_CHUNK_START + data) + # calculate the checksum over chunk name and data + crc = binascii.crc32(DEPTH_CHUNK_START + data) & 0xffffffff + f.write(struct.pack('!I', crc)) + # replace the IEND chunk + f.write(IEND_CHUNK) diff --git a/sphinx/util/requests.py b/sphinx/util/requests.py new file mode 100644 index 0000000..ec3d8d2 --- /dev/null +++ b/sphinx/util/requests.py @@ -0,0 +1,73 @@ +"""Simple requests package loader""" + +from __future__ import annotations + +import warnings +from typing import Any +from urllib.parse import urlsplit + +import requests +from urllib3.exceptions import InsecureRequestWarning + +import sphinx + +_USER_AGENT = (f'Mozilla/5.0 (X11; Linux x86_64; rv:100.0) Gecko/20100101 Firefox/100.0 ' + f'Sphinx/{sphinx.__version__}') + + +def _get_tls_cacert(url: str, certs: str | dict[str, str] | None) -> str | bool: + """Get additional CA cert for a specific URL.""" + if not certs: + return True + elif isinstance(certs, (str, tuple)): + return certs + else: + hostname = urlsplit(url).netloc + if '@' in hostname: + _, hostname = hostname.split('@', 1) + + return certs.get(hostname, True) + + +def get(url: str, **kwargs: Any) -> requests.Response: + """Sends a GET request like requests.get(). + + This sets up User-Agent header and TLS verification automatically.""" + with _Session() as session: + return session.get(url, **kwargs) + + +def head(url: str, **kwargs: Any) -> requests.Response: + """Sends a HEAD request like requests.head(). + + This sets up User-Agent header and TLS verification automatically.""" + with _Session() as session: + return session.head(url, **kwargs) + + +class _Session(requests.Session): + def request( # type: ignore[override] + self, method: str, url: str, + _user_agent: str = '', + _tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment] + **kwargs: Any, + ) -> requests.Response: + """Sends a request with an HTTP verb and url. 
+ + This sets up User-Agent header and TLS verification automatically.""" + headers = kwargs.setdefault('headers', {}) + headers.setdefault('User-Agent', _user_agent or _USER_AGENT) + if _tls_info: + tls_verify, tls_cacerts = _tls_info + verify = bool(kwargs.get('verify', tls_verify)) + kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts)) + else: + verify = kwargs.get('verify', True) + + if verify: + return super().request(method, url, **kwargs) + + with warnings.catch_warnings(): + # ignore InsecureRequestWarning if verify=False + warnings.filterwarnings("ignore", category=InsecureRequestWarning) + return super().request(method, url, **kwargs) diff --git a/sphinx/util/rst.py b/sphinx/util/rst.py new file mode 100644 index 0000000..1e8fd66 --- /dev/null +++ b/sphinx/util/rst.py @@ -0,0 +1,110 @@ +"""reST helper functions.""" + +from __future__ import annotations + +import re +from collections import defaultdict +from contextlib import contextmanager +from typing import TYPE_CHECKING +from unicodedata import east_asian_width + +from docutils.parsers.rst import roles +from docutils.parsers.rst.languages import en as english +from docutils.parsers.rst.states import Body +from docutils.utils import Reporter +from jinja2 import Environment, pass_environment + +from sphinx.locale import __ +from sphinx.util import docutils, logging + +if TYPE_CHECKING: + from collections.abc import Generator + + from docutils.statemachine import StringList + +logger = logging.getLogger(__name__) + +FIELD_NAME_RE = re.compile(Body.patterns['field_marker']) +symbols_re = re.compile(r'([!-\-/:-@\[-`{-~])') # symbols without dot(0x2e) +SECTIONING_CHARS = ['=', '-', '~'] + +# width of characters +WIDECHARS: dict[str, str] = defaultdict(lambda: "WF") # WF: Wide + Full-width +WIDECHARS["ja"] = "WFA" # In Japanese, Ambiguous characters also have double width + + +def escape(text: str) -> str: + text = symbols_re.sub(r'\\\1', text) + text = re.sub(r'^\.', r'\.', text) # escape a dot at top + return text + + +def textwidth(text: str, widechars: str = 'WF') -> int: + """Get width of text.""" + def charwidth(char: str, widechars: str) -> int: + if east_asian_width(char) in widechars: + return 2 + else: + return 1 + + return sum(charwidth(c, widechars) for c in text) + + +@pass_environment +def heading(env: Environment, text: str, level: int = 1) -> str: + """Create a heading for *level*.""" + assert level <= 3 + width = textwidth(text, WIDECHARS[env.language]) + sectioning_char = SECTIONING_CHARS[level - 1] + return f'{text}\n{sectioning_char * width}' + + +@contextmanager +def default_role(docname: str, name: str) -> Generator[None, None, None]: + if name: + dummy_reporter = Reporter('', 4, 4) + role_fn, _ = roles.role(name, english, 0, dummy_reporter) + if role_fn: # type: ignore[truthy-function] + docutils.register_role('', role_fn) # type: ignore[arg-type] + else: + logger.warning(__('default role %s not found'), name, location=docname) + + yield + + docutils.unregister_role('') + + +def prepend_prolog(content: StringList, prolog: str) -> None: + """Prepend a string to content body as prolog.""" + if prolog: + pos = 0 + for line in content: + if FIELD_NAME_RE.match(line): + pos += 1 + else: + break + + if pos > 0: + # insert a blank line after docinfo + content.insert(pos, '', '', 0) + pos += 1 + + # insert prolog (after docinfo if exists) + lineno = 0 + for lineno, line in enumerate(prolog.splitlines()): + content.insert(pos + lineno, line, '', lineno) + + content.insert(pos + lineno + 1, '', '', 
0) + + +def append_epilog(content: StringList, epilog: str) -> None: + """Append a string to content body as epilog.""" + if epilog: + if len(content) > 0: + source, lineno = content.info(-1) + else: + source = '' + lineno = 0 + content.append('', source, lineno + 1) + for lineno, line in enumerate(epilog.splitlines()): + content.append(line, '', lineno) diff --git a/sphinx/util/tags.py b/sphinx/util/tags.py new file mode 100644 index 0000000..73e1a83 --- /dev/null +++ b/sphinx/util/tags.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from jinja2 import nodes +from jinja2.environment import Environment +from jinja2.parser import Parser + +if TYPE_CHECKING: + from collections.abc import Iterator + + from jinja2.nodes import Node + + +env = Environment() + + +class BooleanParser(Parser): + """ + Only allow condition exprs and/or/not operations. + """ + + def parse_compare(self) -> Node: + node: Node + token = self.stream.current + if token.type == 'name': + if token.value in ('true', 'false', 'True', 'False'): + node = nodes.Const(token.value in ('true', 'True'), + lineno=token.lineno) + elif token.value in ('none', 'None'): + node = nodes.Const(None, lineno=token.lineno) + else: + node = nodes.Name(token.value, 'load', lineno=token.lineno) + next(self.stream) + elif token.type == 'lparen': + next(self.stream) + node = self.parse_expression() + self.stream.expect('rparen') + else: + self.fail(f"unexpected token '{token}'", token.lineno) + return node + + +class Tags: + def __init__(self, tags: list[str] | None = None) -> None: + self.tags = dict.fromkeys(tags or [], True) + + def has(self, tag: str) -> bool: + return tag in self.tags + + __contains__ = has + + def __iter__(self) -> Iterator[str]: + return iter(self.tags) + + def add(self, tag: str) -> None: + self.tags[tag] = True + + def remove(self, tag: str) -> None: + self.tags.pop(tag, None) + + def eval_condition(self, condition: str) -> bool: + # exceptions are handled by the caller + parser = BooleanParser(env, condition, state='variable') + expr = parser.parse_expression() + if not parser.stream.eos: + msg = 'chunk after expression' + raise ValueError(msg) + + def eval_node(node: Node) -> bool: + if isinstance(node, nodes.CondExpr): + if eval_node(node.test): + return eval_node(node.expr1) + else: + return eval_node(node.expr2) + elif isinstance(node, nodes.And): + return eval_node(node.left) and eval_node(node.right) + elif isinstance(node, nodes.Or): + return eval_node(node.left) or eval_node(node.right) + elif isinstance(node, nodes.Not): + return not eval_node(node.node) + elif isinstance(node, nodes.Name): + return self.tags.get(node.name, False) + else: + msg = 'invalid node, check parsing' + raise ValueError(msg) + + return eval_node(expr) diff --git a/sphinx/util/template.py b/sphinx/util/template.py new file mode 100644 index 0000000..a16a7a1 --- /dev/null +++ b/sphinx/util/template.py @@ -0,0 +1,135 @@ +"""Templates utility functions for Sphinx.""" + +from __future__ import annotations + +import os +from functools import partial +from os import path +from typing import TYPE_CHECKING, Any, Callable + +from jinja2 import TemplateNotFound +from jinja2.loaders import BaseLoader +from jinja2.sandbox import SandboxedEnvironment + +from sphinx import package_dir +from sphinx.jinja2glue import SphinxFileSystemLoader +from sphinx.locale import get_translator +from sphinx.util import rst, texescape + +if TYPE_CHECKING: + from collections.abc import Sequence + + from jinja2.environment 
import Environment
+
+
+class BaseRenderer:
+    def __init__(self, loader: BaseLoader | None = None) -> None:
+        self.env = SandboxedEnvironment(loader=loader, extensions=['jinja2.ext.i18n'])
+        self.env.filters['repr'] = repr
+        self.env.install_gettext_translations(get_translator())
+
+    def render(self, template_name: str, context: dict[str, Any]) -> str:
+        return self.env.get_template(template_name).render(context)
+
+    def render_string(self, source: str, context: dict[str, Any]) -> str:
+        return self.env.from_string(source).render(context)
+
+
+class FileRenderer(BaseRenderer):
+    def __init__(self, search_path: Sequence[str | os.PathLike[str]]) -> None:
+        if isinstance(search_path, (str, os.PathLike)):
+            search_path = [search_path]
+        else:
+            # filter "None" paths
+            search_path = list(filter(None, search_path))
+
+        loader = SphinxFileSystemLoader(search_path)
+        super().__init__(loader)
+
+    @classmethod
+    def render_from_file(cls, filename: str, context: dict[str, Any]) -> str:
+        dirname = os.path.dirname(filename)
+        basename = os.path.basename(filename)
+        return cls(dirname).render(basename, context)
+
+
+class SphinxRenderer(FileRenderer):
+    def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None) -> None:
+        if template_path is None:
+            template_path = os.path.join(package_dir, 'templates')
+        super().__init__(template_path)
+
+    @classmethod
+    def render_from_file(cls, filename: str, context: dict[str, Any]) -> str:
+        return FileRenderer.render_from_file(filename, context)
+
+
+class LaTeXRenderer(SphinxRenderer):
+    def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None,
+                 latex_engine: str | None = None) -> None:
+        if template_path is None:
+            template_path = [os.path.join(package_dir, 'templates', 'latex')]
+        super().__init__(template_path)
+
+        # use texescape as escape filter
+        escape = partial(texescape.escape, latex_engine=latex_engine)
+        self.env.filters['e'] = escape
+        self.env.filters['escape'] = escape
+        self.env.filters['eabbr'] = texescape.escape_abbr
+
+        # use JSP/eRuby-like tagging instead of curly brackets; the default
+        # tagging of jinja2 does not work well for LaTeX sources.
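+        # e.g. a template line ``<%= body %>`` interpolates the ``body``
+        # variable and ``<% if x %>...<% endif %>`` delimits a block, while
+        # literal ``{`` and ``}`` pass through to the LaTeX source untouched
+        # (illustrative snippet, not taken from a shipped template)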
+ self.env.variable_start_string = '<%=' + self.env.variable_end_string = '%>' + self.env.block_start_string = '<%' + self.env.block_end_string = '%>' + self.env.comment_start_string = '<#' + self.env.comment_end_string = '#>' + + +class ReSTRenderer(SphinxRenderer): + def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None, + language: str | None = None) -> None: + super().__init__(template_path) + + # add language to environment + self.env.extend(language=language) + + # use texescape as escape filter + self.env.filters['e'] = rst.escape + self.env.filters['escape'] = rst.escape + self.env.filters['heading'] = rst.heading + + +class SphinxTemplateLoader(BaseLoader): + """A loader supporting template inheritance""" + + def __init__(self, confdir: str | os.PathLike[str], + templates_paths: Sequence[str | os.PathLike[str]], + system_templates_paths: Sequence[str | os.PathLike[str]]) -> None: + self.loaders = [] + self.sysloaders = [] + + for templates_path in templates_paths: + loader = SphinxFileSystemLoader(path.join(confdir, templates_path)) + self.loaders.append(loader) + + for templates_path in system_templates_paths: + loader = SphinxFileSystemLoader(templates_path) + self.loaders.append(loader) + self.sysloaders.append(loader) + + def get_source(self, environment: Environment, template: str) -> tuple[str, str, Callable]: + if template.startswith('!'): + # search a template from ``system_templates_paths`` + loaders = self.sysloaders + template = template[1:] + else: + loaders = self.loaders + + for loader in loaders: + try: + return loader.get_source(environment, template) + except TemplateNotFound: + pass + raise TemplateNotFound(template) diff --git a/sphinx/util/texescape.py b/sphinx/util/texescape.py new file mode 100644 index 0000000..8527441 --- /dev/null +++ b/sphinx/util/texescape.py @@ -0,0 +1,153 @@ +"""TeX escaping helper.""" + +from __future__ import annotations + +import re + +tex_replacements = [ + # map TeX special chars + ('$', r'\$'), + ('%', r'\%'), + ('&', r'\&'), + ('#', r'\#'), + ('_', r'\_'), + ('{', r'\{'), + ('}', r'\}'), + ('\\', r'\textbackslash{}'), + ('~', r'\textasciitilde{}'), + ('^', r'\textasciicircum{}'), + # map chars to avoid mis-interpretation in LaTeX + ('[', r'{[}'), + (']', r'{]}'), + # map special Unicode characters to TeX commands + ('✓', r'\(\checkmark\)'), + ('✔', r'\(\pmb{\checkmark}\)'), + ('✕', r'\(\times\)'), + ('✖', r'\(\pmb{\times}\)'), + # used to separate -- in options + ('', r'{}'), + # map some special Unicode characters to similar ASCII ones + # (even for Unicode LaTeX as may not be supported by OpenType font) + ('⎽', r'\_'), + ('ℯ', r'e'), + ('ⅈ', r'i'), + # Greek alphabet not escaped: pdflatex handles it via textalpha and inputenc + # OHM SIGN U+2126 is handled by LaTeX textcomp package +] + +# A map to avoid TeX ligatures or character replacements in PDF output +# xelatex/lualatex/uplatex are handled differently (#5790, #6888) +ascii_tex_replacements = [ + # Note: the " renders curly in OT1 encoding but straight in T1, T2A, LY1... + # escaping it to \textquotedbl would break documents using OT1 + # Sphinx does \shorthandoff{"} to avoid problems with some languages + # There is no \text... 
LaTeX escape for the hyphen character -
+    ('-', r'\sphinxhyphen{}'),  # -- and --- are TeX ligatures
+    # ,, is a TeX ligature in T1 encoding, but escaping the comma adds
+    # complications (whether by {}, or a macro) and is not done
+    # the next two require textcomp package
+    ("'", r'\textquotesingle{}'),  # else ' renders curly, and '' is a ligature
+    ('`', r'\textasciigrave{}'),  # else \` and \`\` render curly
+    ('<', r'\textless{}'),   # < is inv. exclam in OT1, << is a T1-ligature
+    ('>', r'\textgreater{}'),  # > is inv. quest. mark in OT1, >> a T1-ligature
+]
+
+# A map of Unicode characters to their LaTeX representation
+# (for LaTeX engines which don't support unicode)
+unicode_tex_replacements = [
+    # map some more common Unicode characters to TeX commands
+    ('¶', r'\P{}'),
+    ('§', r'\S{}'),
+    ('€', r'\texteuro{}'),
+    ('∞', r'\(\infty\)'),
+    ('±', r'\(\pm\)'),
+    ('→', r'\(\rightarrow\)'),
+    ('‣', r'\(\rightarrow\)'),
+    ('–', r'\textendash{}'),
+    # superscript
+    ('⁰', r'\(\sp{\text{0}}\)'),
+    ('¹', r'\(\sp{\text{1}}\)'),
+    ('²', r'\(\sp{\text{2}}\)'),
+    ('³', r'\(\sp{\text{3}}\)'),
+    ('⁴', r'\(\sp{\text{4}}\)'),
+    ('⁵', r'\(\sp{\text{5}}\)'),
+    ('⁶', r'\(\sp{\text{6}}\)'),
+    ('⁷', r'\(\sp{\text{7}}\)'),
+    ('⁸', r'\(\sp{\text{8}}\)'),
+    ('⁹', r'\(\sp{\text{9}}\)'),
+    # subscript
+    ('₀', r'\(\sb{\text{0}}\)'),
+    ('₁', r'\(\sb{\text{1}}\)'),
+    ('₂', r'\(\sb{\text{2}}\)'),
+    ('₃', r'\(\sb{\text{3}}\)'),
+    ('₄', r'\(\sb{\text{4}}\)'),
+    ('₅', r'\(\sb{\text{5}}\)'),
+    ('₆', r'\(\sb{\text{6}}\)'),
+    ('₇', r'\(\sb{\text{7}}\)'),
+    ('₈', r'\(\sb{\text{8}}\)'),
+    ('₉', r'\(\sb{\text{9}}\)'),
+]
+
+# TODO: this should be called tex_idescape_map because its only use is in
+# sphinx.writers.latex.LaTeXTranslator.idescape()
+# %, {, }, \, #, and ~ are the only ones which must be replaced by _ character
+# It would be simpler to define it entirely here rather than in init().
+# Unicode replacements are superfluous, as idescape() uses backslashreplace
+tex_replace_map: dict[int, str] = {}
+
+_tex_escape_map: dict[int, str] = {}
+_tex_escape_map_without_unicode: dict[int, str] = {}
+_tex_hlescape_map: dict[int, str] = {}
+_tex_hlescape_map_without_unicode: dict[int, str] = {}
+
+
+def escape(s: str, latex_engine: str | None = None) -> str:
+    """Escape text for LaTeX output."""
+    if latex_engine in ('lualatex', 'xelatex'):
+        # unicode based LaTeX engine
+        return s.translate(_tex_escape_map_without_unicode)
+    else:
+        return s.translate(_tex_escape_map)
+
+
+def hlescape(s: str, latex_engine: str | None = None) -> str:
+    """Escape text for the LaTeX highlighter."""
+    if latex_engine in ('lualatex', 'xelatex'):
+        # unicode based LaTeX engine
+        return s.translate(_tex_hlescape_map_without_unicode)
+    else:
+        return s.translate(_tex_hlescape_map)
+
+
+def escape_abbr(text: str) -> str:
+    """Adjust spacing after abbreviations.
Works with the LaTeX ``\@`` command."""
+    return re.sub(r'\.(?=\s|$)', r'.\@{}', text)
+
+
+def init() -> None:
+    for a, b in tex_replacements:
+        _tex_escape_map[ord(a)] = b
+        _tex_escape_map_without_unicode[ord(a)] = b
+        tex_replace_map[ord(a)] = '_'
+
+    # no reason to do this for _tex_escape_map_without_unicode
+    for a, b in ascii_tex_replacements:
+        _tex_escape_map[ord(a)] = b
+
+    # but the hyphen has a specific PDF bookmark problem
+    # https://github.com/latex3/hyperref/issues/112
+    _tex_escape_map_without_unicode[ord('-')] = r'\sphinxhyphen{}'
+
+    for a, b in unicode_tex_replacements:
+        _tex_escape_map[ord(a)] = b
+        # This is actually unneeded:
+        tex_replace_map[ord(a)] = '_'
+
+    for a, b in tex_replacements:
+        if a in '[]{}\\':
+            continue
+        _tex_hlescape_map[ord(a)] = b
+        _tex_hlescape_map_without_unicode[ord(a)] = b
+
+    for a, b in unicode_tex_replacements:
+        _tex_hlescape_map[ord(a)] = b
diff --git a/sphinx/util/typing.py b/sphinx/util/typing.py
new file mode 100644
index 0000000..171420d
--- /dev/null
+++ b/sphinx/util/typing.py
@@ -0,0 +1,402 @@
+"""The composite types for Sphinx."""
+
+from __future__ import annotations
+
+import sys
+import typing
+from collections.abc import Sequence
+from struct import Struct
+from types import TracebackType
+from typing import TYPE_CHECKING, Any, Callable, ForwardRef, TypeVar, Union
+
+from docutils import nodes
+from docutils.parsers.rst.states import Inliner
+
+if TYPE_CHECKING:
+    import enum
+
+try:
+    from types import UnionType  # type: ignore[attr-defined]  # python 3.10 or above
+except ImportError:
+    UnionType = None
+
+# classes that have an incorrect __module__
+INVALID_BUILTIN_CLASSES = {
+    Struct: 'struct.Struct',  # Struct.__module__ == '_struct'
+    TracebackType: 'types.TracebackType',  # TracebackType.__module__ == 'builtins'
+}
+
+
+def is_invalid_builtin_class(obj: Any) -> bool:
+    """Check whether *obj* is an invalid built-in class."""
+    try:
+        return obj in INVALID_BUILTIN_CLASSES
+    except TypeError:  # unhashable type
+        return False
+
+
+# Text-like nodes which are initialized with text and rawsource
+TextlikeNode = Union[nodes.Text, nodes.TextElement]
+
+# type of None
+NoneType = type(None)
+
+# path matcher
+PathMatcher = Callable[[str], bool]
+
+# common role functions
+RoleFunction = Callable[[str, str, str, int, Inliner, dict[str, Any], Sequence[str]],
+                        tuple[list[nodes.Node], list[nodes.system_message]]]
+
+# an option spec for a directive
+OptionSpec = dict[str, Callable[[str], Any]]
+
+# title getter functions for enumerable nodes (see sphinx.domains.std)
+TitleGetter = Callable[[nodes.Node], str]
+
+# inventory data in memory
+InventoryItem = tuple[
+    str,  # project name
+    str,  # project version
+    str,  # URL
+    str,  # display name
+]
+Inventory = dict[str, dict[str, InventoryItem]]
+
+
+def get_type_hints(
+    obj: Any, globalns: dict[str, Any] | None = None, localns: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+    """Return a dictionary containing type hints for a function, method, module or class
+    object.
+
+    This is a simple wrapper of `typing.get_type_hints()` that does not raise an error at
+    runtime.
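+
+    A minimal sketch of the fallback behaviour (hypothetical class)::
+
+        class Point:
+            x: int
+            y: 'NotYetDefined'  # unresolvable forward reference
+
+        # typing.get_type_hints() would raise NameError here; this wrapper
+        # returns the raw __annotations__ mapping instead
+        get_type_hints(Point)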
+ """ + from sphinx.util.inspect import safe_getattr # lazy loading + + try: + return typing.get_type_hints(obj, globalns, localns) + except NameError: + # Failed to evaluate ForwardRef (maybe TYPE_CHECKING) + return safe_getattr(obj, '__annotations__', {}) + except AttributeError: + # Failed to evaluate ForwardRef (maybe not runtime checkable) + return safe_getattr(obj, '__annotations__', {}) + except TypeError: + # Invalid object is given. But try to get __annotations__ as a fallback. + return safe_getattr(obj, '__annotations__', {}) + except KeyError: + # a broken class found (refs: https://github.com/sphinx-doc/sphinx/issues/8084) + return {} + + +def is_system_TypeVar(typ: Any) -> bool: + """Check *typ* is system defined TypeVar.""" + modname = getattr(typ, '__module__', '') + return modname == 'typing' and isinstance(typ, TypeVar) + + +def restify(cls: type | None, mode: str = 'fully-qualified-except-typing') -> str: + """Convert python class to a reST reference. + + :param mode: Specify a method how annotations will be stringified. + + 'fully-qualified-except-typing' + Show the module name and qualified name of the annotation except + the "typing" module. + 'smart' + Show the name of the annotation. + """ + from sphinx.ext.autodoc.mock import ismock, ismockmodule # lazy loading + from sphinx.util import inspect # lazy loading + + if mode == 'smart': + modprefix = '~' + else: + modprefix = '' + + try: + if cls is None or cls is NoneType: + return ':py:obj:`None`' + elif cls is Ellipsis: + return '...' + elif isinstance(cls, str): + return cls + elif ismockmodule(cls): + return f':py:class:`{modprefix}{cls.__name__}`' + elif ismock(cls): + return f':py:class:`{modprefix}{cls.__module__}.{cls.__name__}`' + elif is_invalid_builtin_class(cls): + return f':py:class:`{modprefix}{INVALID_BUILTIN_CLASSES[cls]}`' + elif inspect.isNewType(cls): + if sys.version_info[:2] >= (3, 10): + # newtypes have correct module info since Python 3.10+ + return f':py:class:`{modprefix}{cls.__module__}.{cls.__name__}`' + else: + return ':py:class:`%s`' % cls.__name__ + elif UnionType and isinstance(cls, UnionType): + if len(cls.__args__) > 1 and None in cls.__args__: + args = ' | '.join(restify(a, mode) for a in cls.__args__ if a) + return 'Optional[%s]' % args + else: + return ' | '.join(restify(a, mode) for a in cls.__args__) + elif cls.__module__ in ('__builtin__', 'builtins'): + if hasattr(cls, '__args__'): + if not cls.__args__: # Empty tuple, list, ... 
+                    return fr':py:class:`{cls.__name__}`\ [{cls.__args__!r}]'
+
+                concatenated_args = ', '.join(restify(arg, mode) for arg in cls.__args__)
+                return fr':py:class:`{cls.__name__}`\ [{concatenated_args}]'
+            else:
+                return ':py:class:`%s`' % cls.__name__
+        elif (inspect.isgenericalias(cls)
+              and cls.__module__ == 'typing'
+              and cls.__origin__ is Union):  # type: ignore[attr-defined]
+            if (len(cls.__args__) > 1  # type: ignore[attr-defined]
+                    and cls.__args__[-1] is NoneType):  # type: ignore[attr-defined]
+                if len(cls.__args__) > 2:  # type: ignore[attr-defined]
+                    args = ', '.join(restify(a, mode)
+                                     for a in cls.__args__[:-1])  # type: ignore[attr-defined]
+                    return ':py:obj:`~typing.Optional`\\ [:obj:`~typing.Union`\\ [%s]]' % args
+                else:
+                    return ':py:obj:`~typing.Optional`\\ [%s]' % restify(
+                        cls.__args__[0], mode)  # type: ignore[attr-defined]
+            else:
+                args = ', '.join(restify(a, mode)
+                                 for a in cls.__args__)  # type: ignore[attr-defined]
+                return ':py:obj:`~typing.Union`\\ [%s]' % args
+        elif inspect.isgenericalias(cls):
+            if isinstance(cls.__origin__, typing._SpecialForm):  # type: ignore[attr-defined]
+                text = restify(cls.__origin__, mode)  # type: ignore[attr-defined,arg-type]
+            elif getattr(cls, '_name', None):
+                cls_name = cls._name  # type: ignore[attr-defined]
+                if cls.__module__ == 'typing':
+                    text = f':py:class:`~{cls.__module__}.{cls_name}`'
+                else:
+                    text = f':py:class:`{modprefix}{cls.__module__}.{cls_name}`'
+            else:
+                text = restify(cls.__origin__, mode)  # type: ignore[attr-defined]
+
+            origin = getattr(cls, '__origin__', None)
+            if not hasattr(cls, '__args__'):  # NoQA: SIM114
+                pass
+            elif all(is_system_TypeVar(a) for a in cls.__args__):
+                # Suppress arguments if all system defined TypeVars (ex. Dict[KT, VT])
+                pass
+            elif (cls.__module__ == 'typing'
+                    and cls._name == 'Callable'):  # type: ignore[attr-defined]
+                args = ', '.join(restify(a, mode) for a in cls.__args__[:-1])
+                text += fr"\ [[{args}], {restify(cls.__args__[-1], mode)}]"
+            elif cls.__module__ == 'typing' and getattr(origin, '_name', None) == 'Literal':
+                literal_args = []
+                for a in cls.__args__:
+                    if inspect.isenumattribute(a):
+                        literal_args.append(_format_literal_enum_arg(a, mode=mode))
+                    else:
+                        literal_args.append(repr(a))
+                text += r"\ [%s]" % ', '.join(literal_args)
+                del literal_args
+            elif cls.__args__:
+                text += r"\ [%s]" % ", ".join(restify(a, mode) for a in cls.__args__)
+
+            return text
+        elif isinstance(cls, typing._SpecialForm):
+            return f':py:obj:`~{cls.__module__}.{cls._name}`'  # type: ignore[attr-defined]
+        elif sys.version_info[:2] >= (3, 11) and cls is typing.Any:
+            # handle bpo-46998
+            return f':py:obj:`~{cls.__module__}.{cls.__name__}`'
+        elif hasattr(cls, '__qualname__'):
+            if cls.__module__ == 'typing':
+                return f':py:class:`~{cls.__module__}.{cls.__qualname__}`'
+            else:
+                return f':py:class:`{modprefix}{cls.__module__}.{cls.__qualname__}`'
+        elif isinstance(cls, ForwardRef):
+            return ':py:class:`%s`' % cls.__forward_arg__
+        else:
+            # not a class (ex. TypeVar)
+            if cls.__module__ == 'typing':
+                return f':py:obj:`~{cls.__module__}.{cls.__name__}`'
+            else:
+                return f':py:obj:`{modprefix}{cls.__module__}.{cls.__name__}`'
+    except (AttributeError, TypeError):
+        return inspect.object_description(cls)
+
+
+def stringify_annotation(
+    annotation: Any,
+    /,
+    mode: str = 'fully-qualified-except-typing',
+) -> str:
+    """Stringify a type annotation object.
+
+    :param annotation: The annotation to stringify.
+    :param mode: Specify how annotations are stringified.
+ + 'fully-qualified-except-typing' + Show the module name and qualified name of the annotation except + the "typing" module. + 'smart' + Show the name of the annotation. + 'fully-qualified' + Show the module name and qualified name of the annotation. + """ + from sphinx.ext.autodoc.mock import ismock, ismockmodule # lazy loading + from sphinx.util.inspect import isNewType # lazy loading + + if mode not in {'fully-qualified-except-typing', 'fully-qualified', 'smart'}: + msg = ("'mode' must be one of 'fully-qualified-except-typing', " + f"'fully-qualified', or 'smart'; got {mode!r}.") + raise ValueError(msg) + + if mode == 'smart': + module_prefix = '~' + else: + module_prefix = '' + + annotation_qualname = getattr(annotation, '__qualname__', '') + annotation_module = getattr(annotation, '__module__', '') + annotation_name = getattr(annotation, '__name__', '') + annotation_module_is_typing = annotation_module == 'typing' + + if isinstance(annotation, str): + if annotation.startswith("'") and annotation.endswith("'"): + # might be a double Forward-ref'ed type. Go unquoting. + return annotation[1:-1] + else: + return annotation + elif isinstance(annotation, TypeVar): + if annotation_module_is_typing and mode in {'fully-qualified-except-typing', 'smart'}: + return annotation_name + else: + return module_prefix + f'{annotation_module}.{annotation_name}' + elif isNewType(annotation): + if sys.version_info[:2] >= (3, 10): + # newtypes have correct module info since Python 3.10+ + return module_prefix + f'{annotation_module}.{annotation_name}' + else: + return annotation_name + elif not annotation: + return repr(annotation) + elif annotation is NoneType: + return 'None' + elif ismockmodule(annotation): + return module_prefix + annotation_name + elif ismock(annotation): + return module_prefix + f'{annotation_module}.{annotation_name}' + elif is_invalid_builtin_class(annotation): + return module_prefix + INVALID_BUILTIN_CLASSES[annotation] + elif str(annotation).startswith('typing.Annotated'): # for py310+ + pass + elif annotation_module == 'builtins' and annotation_qualname: + if (args := getattr(annotation, '__args__', None)) is not None: # PEP 585 generic + if not args: # Empty tuple, list, ... + return repr(annotation) + + concatenated_args = ', '.join(stringify_annotation(arg, mode) for arg in args) + return f'{annotation_qualname}[{concatenated_args}]' + else: + return annotation_qualname + elif annotation is Ellipsis: + return '...' + + module_prefix = f'{annotation_module}.' + annotation_forward_arg = getattr(annotation, '__forward_arg__', None) + if annotation_qualname or (annotation_module_is_typing and not annotation_forward_arg): + if mode == 'smart': + module_prefix = '~' + module_prefix + if annotation_module_is_typing and mode == 'fully-qualified-except-typing': + module_prefix = '' + else: + module_prefix = '' + + if annotation_module_is_typing: + if annotation_forward_arg: + # handle ForwardRefs + qualname = annotation_forward_arg + else: + _name = getattr(annotation, '_name', '') + if _name: + qualname = _name + elif annotation_qualname: + qualname = annotation_qualname + else: + qualname = stringify_annotation( + annotation.__origin__, 'fully-qualified-except-typing', + ).replace('typing.', '') # ex. 
Union + elif annotation_qualname: + qualname = annotation_qualname + elif hasattr(annotation, '__origin__'): + # instantiated generic provided by a user + qualname = stringify_annotation(annotation.__origin__, mode) + elif UnionType and isinstance(annotation, UnionType): # types.UnionType (for py3.10+) + qualname = 'types.UnionType' + else: + # we weren't able to extract the base type, appending arguments would + # only make them appear twice + return repr(annotation) + + annotation_args = getattr(annotation, '__args__', None) + if annotation_args: + if not isinstance(annotation_args, (list, tuple)): + # broken __args__ found + pass + elif qualname in {'Optional', 'Union', 'types.UnionType'}: + return ' | '.join(stringify_annotation(a, mode) for a in annotation_args) + elif qualname == 'Callable': + args = ', '.join(stringify_annotation(a, mode) for a in annotation_args[:-1]) + returns = stringify_annotation(annotation_args[-1], mode) + return f'{module_prefix}Callable[[{args}], {returns}]' + elif qualname == 'Literal': + from sphinx.util.inspect import isenumattribute # lazy loading + + def format_literal_arg(arg): + if isenumattribute(arg): + enumcls = arg.__class__ + + if mode == 'smart': + # MyEnum.member + return f'{enumcls.__qualname__}.{arg.name}' + + # module.MyEnum.member + return f'{enumcls.__module__}.{enumcls.__qualname__}.{arg.name}' + return repr(arg) + + args = ', '.join(map(format_literal_arg, annotation_args)) + return f'{module_prefix}Literal[{args}]' + elif str(annotation).startswith('typing.Annotated'): # for py39+ + return stringify_annotation(annotation_args[0], mode) + elif all(is_system_TypeVar(a) for a in annotation_args): + # Suppress arguments if all system defined TypeVars (ex. Dict[KT, VT]) + return module_prefix + qualname + else: + args = ', '.join(stringify_annotation(a, mode) for a in annotation_args) + return f'{module_prefix}{qualname}[{args}]' + + return module_prefix + qualname + + +def _format_literal_enum_arg(arg: enum.Enum, /, *, mode: str) -> str: + enum_cls = arg.__class__ + if mode == 'smart' or enum_cls.__module__ == 'typing': + return f':py:attr:`~{enum_cls.__module__}.{enum_cls.__qualname__}.{arg.name}`' + else: + return f':py:attr:`{enum_cls.__module__}.{enum_cls.__qualname__}.{arg.name}`' + + +# deprecated name -> (object to return, canonical path or empty string) +_DEPRECATED_OBJECTS = { + 'stringify': (stringify_annotation, 'sphinx.util.typing.stringify_annotation'), +} + + +def __getattr__(name): + if name not in _DEPRECATED_OBJECTS: + msg = f'module {__name__!r} has no attribute {name!r}' + raise AttributeError(msg) + + from sphinx.deprecation import _deprecation_warning + + deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name] + _deprecation_warning(__name__, name, canonical_name, remove=(8, 0)) + return deprecated_object -- cgit v1.2.3