author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-15 17:25:40 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-15 17:25:40 +0000
commit     cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a (patch)
tree       18dcde1a8d1f5570a77cd0c361de3b490d02c789 /sphinx/util
parent     Initial commit. (diff)
download   sphinx-cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a.tar.xz
           sphinx-cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a.zip

Adding upstream version 7.2.6. (upstream/7.2.6, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sphinx/util')
-rw-r--r--  sphinx/util/__init__.py      | 297
-rw-r--r--  sphinx/util/_pathlib.py      | 115
-rw-r--r--  sphinx/util/build_phase.py   |  12
-rw-r--r--  sphinx/util/cfamily.py       | 464
-rw-r--r--  sphinx/util/console.py       | 129
-rw-r--r--  sphinx/util/display.py       |  94
-rw-r--r--  sphinx/util/docfields.py     | 408
-rw-r--r--  sphinx/util/docstrings.py    |  88
-rw-r--r--  sphinx/util/docutils.py      | 635
-rw-r--r--  sphinx/util/exceptions.py    |  67
-rw-r--r--  sphinx/util/fileutil.py      | 100
-rw-r--r--  sphinx/util/http_date.py     |  39
-rw-r--r--  sphinx/util/i18n.py          | 253
-rw-r--r--  sphinx/util/images.py        | 146
-rw-r--r--  sphinx/util/index_entries.py |  27
-rw-r--r--  sphinx/util/inspect.py       | 833
-rw-r--r--  sphinx/util/inventory.py     | 172
-rw-r--r--  sphinx/util/logging.py       | 602
-rw-r--r--  sphinx/util/matching.py      | 169
-rw-r--r--  sphinx/util/math.py          |  61
-rw-r--r--  sphinx/util/nodes.py         | 672
-rw-r--r--  sphinx/util/osutil.py        | 217
-rw-r--r--  sphinx/util/parallel.py      | 154
-rw-r--r--  sphinx/util/png.py           |  43
-rw-r--r--  sphinx/util/requests.py      |  73
-rw-r--r--  sphinx/util/rst.py           | 110
-rw-r--r--  sphinx/util/tags.py          |  88
-rw-r--r--  sphinx/util/template.py      | 135
-rw-r--r--  sphinx/util/texescape.py     | 153
-rw-r--r--  sphinx/util/typing.py        | 402
30 files changed, 6758 insertions, 0 deletions
diff --git a/sphinx/util/__init__.py b/sphinx/util/__init__.py
new file mode 100644
index 0000000..69b2848
--- /dev/null
+++ b/sphinx/util/__init__.py
@@ -0,0 +1,297 @@
+"""Utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import hashlib
+import os
+import posixpath
+import re
+from importlib import import_module
+from os import path
+from typing import IO, Any
+from urllib.parse import parse_qsl, quote_plus, urlencode, urlsplit, urlunsplit
+
+from sphinx.errors import ExtensionError, FiletypeNotFoundError
+from sphinx.locale import __
+from sphinx.util import display as _display
+from sphinx.util import exceptions as _exceptions
+from sphinx.util import http_date as _http_date
+from sphinx.util import index_entries as _index_entries
+from sphinx.util import logging
+from sphinx.util import osutil as _osutil
+from sphinx.util.console import strip_colors # NoQA: F401
+from sphinx.util.matching import patfilter # noqa: F401
+from sphinx.util.nodes import ( # noqa: F401
+ caption_ref_re,
+ explicit_title_re,
+ nested_parse_with_titles,
+ split_explicit_title,
+)
+
+# import other utilities; partly for backwards compatibility, so don't
+# prune unused ones indiscriminately
+from sphinx.util.osutil import ( # noqa: F401
+ SEP,
+ copyfile,
+ copytimes,
+ ensuredir,
+ make_filename,
+ mtimes_of_files,
+ os_path,
+ relative_uri,
+)
+
+logger = logging.getLogger(__name__)
+
+# Generally useful regular expressions.
+ws_re: re.Pattern[str] = re.compile(r'\s+')
+url_re: re.Pattern[str] = re.compile(r'(?P<schema>.+)://.*')
+
+
+# High-level utility functions.
+
+def docname_join(basedocname: str, docname: str) -> str:
+ return posixpath.normpath(posixpath.join('/' + basedocname, '..', docname))[1:]
+
+
+def get_filetype(source_suffix: dict[str, str], filename: str) -> str:
+ for suffix, filetype in source_suffix.items():
+ if filename.endswith(suffix):
+            # If the filetype is the default (None), treat it as reStructuredText.
+ return filetype or 'restructuredtext'
+ raise FiletypeNotFoundError
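
Illustrative example (not part of this commit): docname_join resolves one
docname relative to another, and get_filetype maps a filename to its source
type via the source_suffix mapping.

    >>> docname_join('dir/page', 'other')
    'dir/other'
    >>> get_filetype({'.rst': 'restructuredtext', '.md': 'markdown'}, 'index.rst')
    'restructuredtext'
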
+
+
+class FilenameUniqDict(dict):
+ """
+ A dictionary that automatically generates unique names for its keys,
+ interpreted as filenames, and keeps track of a set of docnames they
+ appear in. Used for images and downloadable files in the environment.
+ """
+ def __init__(self) -> None:
+ self._existing: set[str] = set()
+
+ def add_file(self, docname: str, newfile: str) -> str:
+ if newfile in self:
+ self[newfile][0].add(docname)
+ return self[newfile][1]
+ uniquename = path.basename(newfile)
+ base, ext = path.splitext(uniquename)
+ i = 0
+ while uniquename in self._existing:
+ i += 1
+ uniquename = f'{base}{i}{ext}'
+ self[newfile] = ({docname}, uniquename)
+ self._existing.add(uniquename)
+ return uniquename
+
+ def purge_doc(self, docname: str) -> None:
+ for filename, (docs, unique) in list(self.items()):
+ docs.discard(docname)
+ if not docs:
+ del self[filename]
+ self._existing.discard(unique)
+
+ def merge_other(self, docnames: set[str], other: dict[str, tuple[set[str], Any]]) -> None:
+ for filename, (docs, _unique) in other.items():
+ for doc in docs & set(docnames):
+ self.add_file(doc, filename)
+
+ def __getstate__(self) -> set[str]:
+ return self._existing
+
+ def __setstate__(self, state: set[str]) -> None:
+ self._existing = state
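
Illustrative example (not part of this commit): colliding basenames get
numbered suffixes, while re-adding a known path just records the new docname.

    >>> files = FilenameUniqDict()
    >>> files.add_file('doc1', '/src/a/logo.png')
    'logo.png'
    >>> files.add_file('doc2', '/src/b/logo.png')
    'logo1.png'
    >>> files.add_file('doc3', '/src/a/logo.png')  # path already known
    'logo.png'
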
+
+
+def _md5(data=b'', **_kw):
+ """Deprecated wrapper around hashlib.md5
+
+ To be removed in Sphinx 9.0
+ """
+ return hashlib.md5(data, usedforsecurity=False)
+
+
+def _sha1(data=b'', **_kw):
+ """Deprecated wrapper around hashlib.sha1
+
+ To be removed in Sphinx 9.0
+ """
+ return hashlib.sha1(data, usedforsecurity=False)
+
+
+class DownloadFiles(dict):
+ """A special dictionary for download files.
+
+    .. important:: This class will be refactored in the near future.
+                   Hence, do not rely on its implementation details.
+ """
+
+ def add_file(self, docname: str, filename: str) -> str:
+ if filename not in self:
+ digest = hashlib.md5(filename.encode(), usedforsecurity=False).hexdigest()
+ dest = f'{digest}/{os.path.basename(filename)}'
+ self[filename] = (set(), dest)
+
+ self[filename][0].add(docname)
+ return self[filename][1]
+
+ def purge_doc(self, docname: str) -> None:
+ for filename, (docs, _dest) in list(self.items()):
+ docs.discard(docname)
+ if not docs:
+ del self[filename]
+
+ def merge_other(self, docnames: set[str], other: dict[str, tuple[set[str], Any]]) -> None:
+ for filename, (docs, _dest) in other.items():
+ for docname in docs & set(docnames):
+ self.add_file(docname, filename)
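
Illustrative example (not part of this commit): the destination is derived
from an MD5 digest of the source path, so the same file always maps to the
same target.

    downloads = DownloadFiles()
    dest = downloads.add_file('index', 'files/report.pdf')
    # dest looks like '<32-char hex digest>/report.pdf'
    assert downloads.add_file('other', 'files/report.pdf') == dest
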
+
+
+# a regex to recognize coding cookies
+_coding_re = re.compile(r'coding[:=]\s*([-\w.]+)')
+
+
+class UnicodeDecodeErrorHandler:
+ """Custom error handler for open() that warns and replaces."""
+
+ def __init__(self, docname: str) -> None:
+ self.docname = docname
+
+ def __call__(self, error: UnicodeDecodeError) -> tuple[str, int]:
+ linestart = error.object.rfind(b'\n', 0, error.start)
+ lineend = error.object.find(b'\n', error.start)
+ if lineend == -1:
+ lineend = len(error.object)
+ lineno = error.object.count(b'\n', 0, error.start) + 1
+ logger.warning(__('undecodable source characters, replacing with "?": %r'),
+ (error.object[linestart + 1:error.start] + b'>>>' +
+ error.object[error.start:error.end] + b'<<<' +
+ error.object[error.end:lineend]),
+ location=(self.docname, lineno))
+ return ('?', error.end)
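
A sketch (not part of this commit) of wiring the handler into Python's
standard codec machinery; the handler name 'sphinx-demo' is made up for the
example.

    import codecs

    codecs.register_error('sphinx-demo', UnicodeDecodeErrorHandler('index'))
    # the truncated UTF-8 byte is replaced with '?' and a warning is logged
    print(b'caf\xe9'.decode('utf-8', errors='sphinx-demo'))  # caf?
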
+
+
+# Low-level utility functions and classes.
+
+class Tee:
+ """
+ File-like object writing to two streams.
+ """
+ def __init__(self, stream1: IO, stream2: IO) -> None:
+ self.stream1 = stream1
+ self.stream2 = stream2
+
+ def write(self, text: str) -> None:
+ self.stream1.write(text)
+ self.stream2.write(text)
+
+ def flush(self) -> None:
+ if hasattr(self.stream1, 'flush'):
+ self.stream1.flush()
+ if hasattr(self.stream2, 'flush'):
+ self.stream2.flush()
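
Illustrative example (not part of this commit): every write goes to both
streams.

    import io, sys

    buffer = io.StringIO()
    tee = Tee(sys.stdout, buffer)
    tee.write('hello\n')   # printed to stdout and captured in buffer
    tee.flush()
    assert buffer.getvalue() == 'hello\n'
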
+
+
+def parselinenos(spec: str, total: int) -> list[int]:
+ """Parse a line number spec (such as "1,2,4-6") and return a list of
+ wanted line numbers.
+ """
+ items = []
+ parts = spec.split(',')
+ for part in parts:
+ try:
+ begend = part.strip().split('-')
+ if ['', ''] == begend:
+ raise ValueError
+ if len(begend) == 1:
+ items.append(int(begend[0]) - 1)
+ elif len(begend) == 2:
+ start = int(begend[0] or 1) # left half open (cf. -10)
+ end = int(begend[1] or max(start, total)) # right half open (cf. 10-)
+ if start > end: # invalid range (cf. 10-1)
+ raise ValueError
+ items.extend(range(start - 1, end))
+ else:
+ raise ValueError
+ except ValueError as exc:
+ msg = f'invalid line number spec: {spec!r}'
+ raise ValueError(msg) from exc
+
+ return items
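
Illustrative example (not part of this commit): line numbers are returned
0-based, and half-open specs fill in the missing bound.

    >>> parselinenos('1,2,4-6', 10)
    [0, 1, 3, 4, 5]
    >>> parselinenos('-3,8-', 10)
    [0, 1, 2, 7, 8, 9]
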
+
+
+def import_object(objname: str, source: str | None = None) -> Any:
+ """Import python object by qualname."""
+ try:
+ objpath = objname.split('.')
+ modname = objpath.pop(0)
+ obj = import_module(modname)
+ for name in objpath:
+ modname += '.' + name
+ try:
+ obj = getattr(obj, name)
+ except AttributeError:
+ obj = import_module(modname)
+
+ return obj
+ except (AttributeError, ImportError) as exc:
+ if source:
+ raise ExtensionError('Could not import %s (needed for %s)' %
+ (objname, source), exc) from exc
+ raise ExtensionError('Could not import %s' % objname, exc) from exc
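
Illustrative example (not part of this commit): dotted names are resolved
module by module, falling back to attribute access.

    >>> import os
    >>> import_object('os.path.join') is os.path.join
    True
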
+
+
+def encode_uri(uri: str) -> str:
+ split = list(urlsplit(uri))
+ split[1] = split[1].encode('idna').decode('ascii')
+ split[2] = quote_plus(split[2].encode(), '/')
+ query = [(q, v.encode()) for (q, v) in parse_qsl(split[3])]
+ split[3] = urlencode(query)
+ return urlunsplit(split)
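
Illustrative example (not part of this commit): the host is IDNA-encoded and
the path and query are percent-encoded (quote_plus turns the space into '+').

    >>> encode_uri('https://example.org/some path?q=ä')
    'https://example.org/some+path?q=%C3%A4'
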
+
+
+def isurl(url: str) -> bool:
+ """Check *url* is URL or not."""
+ return bool(url) and '://' in url
+
+
+def _xml_name_checker():
+ # to prevent import cycles
+ from sphinx.builders.epub3 import _XML_NAME_PATTERN
+
+ return _XML_NAME_PATTERN
+
+
+# deprecated name -> (object to return, canonical path or empty string)
+_DEPRECATED_OBJECTS = {
+ 'path_stabilize': (_osutil.path_stabilize, 'sphinx.util.osutil.path_stabilize'),
+ 'display_chunk': (_display.display_chunk, 'sphinx.util.display.display_chunk'),
+ 'status_iterator': (_display.status_iterator, 'sphinx.util.display.status_iterator'),
+ 'SkipProgressMessage': (_display.SkipProgressMessage,
+ 'sphinx.util.display.SkipProgressMessage'),
+ 'progress_message': (_display.progress_message, 'sphinx.util.display.progress_message'),
+ 'epoch_to_rfc1123': (_http_date.epoch_to_rfc1123, 'sphinx.http_date.epoch_to_rfc1123'),
+ 'rfc1123_to_epoch': (_http_date.rfc1123_to_epoch, 'sphinx.http_date.rfc1123_to_epoch'),
+ 'save_traceback': (_exceptions.save_traceback, 'sphinx.exceptions.save_traceback'),
+ 'format_exception_cut_frames': (_exceptions.format_exception_cut_frames,
+ 'sphinx.exceptions.format_exception_cut_frames'),
+ 'xmlname_checker': (_xml_name_checker, 'sphinx.builders.epub3._XML_NAME_PATTERN'),
+ 'split_index_msg': (_index_entries.split_index_msg,
+ 'sphinx.util.index_entries.split_index_msg'),
+ 'split_into': (_index_entries.split_index_msg, 'sphinx.util.index_entries.split_into'),
+ 'md5': (_md5, ''),
+ 'sha1': (_sha1, ''),
+}
+
+
+def __getattr__(name):
+ if name not in _DEPRECATED_OBJECTS:
+ msg = f'module {__name__!r} has no attribute {name!r}'
+ raise AttributeError(msg)
+
+ from sphinx.deprecation import _deprecation_warning
+
+ deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name]
+ _deprecation_warning(__name__, name, canonical_name, remove=(8, 0))
+ return deprecated_object
diff --git a/sphinx/util/_pathlib.py b/sphinx/util/_pathlib.py
new file mode 100644
index 0000000..59980e9
--- /dev/null
+++ b/sphinx/util/_pathlib.py
@@ -0,0 +1,115 @@
+"""What follows is awful and will be gone in Sphinx 8"""
+
+from __future__ import annotations
+
+import sys
+import warnings
+from pathlib import Path, PosixPath, PurePath, WindowsPath
+
+from sphinx.deprecation import RemovedInSphinx80Warning
+
+_STR_METHODS = frozenset(str.__dict__)
+_PATH_NAME = Path().__class__.__name__
+
+_MSG = (
+ 'Sphinx 8 will drop support for representing paths as strings. '
+ 'Use "pathlib.Path" or "os.fspath" instead.'
+)
+
+# https://docs.python.org/3/library/stdtypes.html#typesseq-common
+# https://docs.python.org/3/library/stdtypes.html#string-methods
+
+if sys.platform == 'win32':
+ class _StrPath(WindowsPath):
+ def replace(self, old, new, count=-1, /):
+ # replace exists in both Path and str;
+ # in Path it makes filesystem changes, so we use the safer str version
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__().replace(old, new, count)
+
+ def __getattr__(self, item):
+ if item in _STR_METHODS:
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return getattr(self.__str__(), item)
+ msg = f'{_PATH_NAME!r} has no attribute {item!r}'
+ raise AttributeError(msg)
+
+ def __add__(self, other):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__() + other
+
+ def __bool__(self):
+ if not self.__str__():
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return False
+ return True
+
+ def __contains__(self, item):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return item in self.__str__()
+
+ def __eq__(self, other):
+ if isinstance(other, PurePath):
+ return super().__eq__(other)
+ if isinstance(other, str):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__() == other
+ return NotImplemented
+
+ def __hash__(self):
+ return super().__hash__()
+
+ def __getitem__(self, item):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__()[item]
+
+ def __len__(self):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return len(self.__str__())
+else:
+ class _StrPath(PosixPath):
+ def replace(self, old, new, count=-1, /):
+ # replace exists in both Path and str;
+ # in Path it makes filesystem changes, so we use the safer str version
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__().replace(old, new, count)
+
+ def __getattr__(self, item):
+ if item in _STR_METHODS:
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return getattr(self.__str__(), item)
+ msg = f'{_PATH_NAME!r} has no attribute {item!r}'
+ raise AttributeError(msg)
+
+ def __add__(self, other):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__() + other
+
+ def __bool__(self):
+ if not self.__str__():
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return False
+ return True
+
+ def __contains__(self, item):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return item in self.__str__()
+
+ def __eq__(self, other):
+ if isinstance(other, PurePath):
+ return super().__eq__(other)
+ if isinstance(other, str):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__() == other
+ return NotImplemented
+
+ def __hash__(self):
+ return super().__hash__()
+
+ def __getitem__(self, item):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return self.__str__()[item]
+
+ def __len__(self):
+ warnings.warn(_MSG, RemovedInSphinx80Warning, stacklevel=2)
+ return len(self.__str__())
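
A sketch (not part of this commit) of the deprecation behaviour: str-only
operations still work but emit RemovedInSphinx80Warning.

    import warnings

    p = _StrPath('docs/index.rst')
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        n = len(p)  # __len__ delegates to str and warns
    assert caught[0].category is RemovedInSphinx80Warning
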
diff --git a/sphinx/util/build_phase.py b/sphinx/util/build_phase.py
new file mode 100644
index 0000000..7f80aa5
--- /dev/null
+++ b/sphinx/util/build_phase.py
@@ -0,0 +1,12 @@
+"""Build phase of Sphinx application."""
+
+from enum import IntEnum
+
+
+class BuildPhase(IntEnum):
+ """Build phase of Sphinx application."""
+ INITIALIZATION = 1
+ READING = 2
+ CONSISTENCY_CHECK = 3
+ RESOLVING = 3
+ WRITING = 4
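
Illustrative example (not part of this commit): as an IntEnum, phases are
ordered, and RESOLVING is an alias of CONSISTENCY_CHECK because both share
the value 3.

    >>> BuildPhase.READING < BuildPhase.WRITING
    True
    >>> BuildPhase.RESOLVING is BuildPhase.CONSISTENCY_CHECK
    True
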
diff --git a/sphinx/util/cfamily.py b/sphinx/util/cfamily.py
new file mode 100644
index 0000000..a3fdbe3
--- /dev/null
+++ b/sphinx/util/cfamily.py
@@ -0,0 +1,464 @@
+"""Utility functions common to the C and C++ domains."""
+
+from __future__ import annotations
+
+import re
+from copy import deepcopy
+from typing import TYPE_CHECKING, Any, Callable
+
+from docutils import nodes
+
+from sphinx import addnodes
+from sphinx.util import logging
+
+if TYPE_CHECKING:
+ from docutils.nodes import TextElement
+
+ from sphinx.config import Config
+
+logger = logging.getLogger(__name__)
+
+StringifyTransform = Callable[[Any], str]
+
+
+_whitespace_re = re.compile(r'\s+')
+anon_identifier_re = re.compile(r'(@[a-zA-Z0-9_])[a-zA-Z0-9_]*\b')
+identifier_re = re.compile(r'''
+ ( # This 'extends' _anon_identifier_re with the ordinary identifiers,
+ # make sure they are in sync.
+ (~?\b[a-zA-Z_]) # ordinary identifiers
+ | (@[a-zA-Z0-9_]) # our extension for names of anonymous entities
+ )
+ [a-zA-Z0-9_]*\b
+''', flags=re.VERBOSE)
+integer_literal_re = re.compile(r'[1-9][0-9]*(\'[0-9]+)*')
+octal_literal_re = re.compile(r'0[0-7]*(\'[0-7]+)*')
+hex_literal_re = re.compile(r'0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*')
+binary_literal_re = re.compile(r'0[bB][01]+(\'[01]+)*')
+integers_literal_suffix_re = re.compile(r'''
+ # unsigned and/or (long) long, in any order, but at least one of them
+ (
+ ([uU] ([lL] | (ll) | (LL))?)
+ |
+ (([lL] | (ll) | (LL)) [uU]?)
+ )\b
+ # the ending word boundary is important for distinguishing
+ # between suffixes and UDLs in C++
+''', flags=re.VERBOSE)
+float_literal_re = re.compile(r'''
+ [+-]?(
+ # decimal
+ ([0-9]+(\'[0-9]+)*[eE][+-]?[0-9]+(\'[0-9]+)*)
+ | (([0-9]+(\'[0-9]+)*)?\.[0-9]+(\'[0-9]+)*([eE][+-]?[0-9]+(\'[0-9]+)*)?)
+ | ([0-9]+(\'[0-9]+)*\.([eE][+-]?[0-9]+(\'[0-9]+)*)?)
+ # hex
+ | (0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*[pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*)
+ | (0[xX]([0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?\.
+ [0-9a-fA-F]+(\'[0-9a-fA-F]+)*([pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?)
+ | (0[xX][0-9a-fA-F]+(\'[0-9a-fA-F]+)*\.([pP][+-]?[0-9a-fA-F]+(\'[0-9a-fA-F]+)*)?)
+ )
+''', flags=re.VERBOSE)
+float_literal_suffix_re = re.compile(r'[fFlL]\b')
+# the ending word boundary is important for distinguishing between suffixes and UDLs in C++
+char_literal_re = re.compile(r'''
+ ((?:u8)|u|U|L)?
+ '(
+ (?:[^\\'])
+ | (\\(
+ (?:['"?\\abfnrtv])
+ | (?:[0-7]{1,3})
+ | (?:x[0-9a-fA-F]{2})
+ | (?:u[0-9a-fA-F]{4})
+ | (?:U[0-9a-fA-F]{8})
+ ))
+ )'
+''', flags=re.VERBOSE)
+
+
+def verify_description_mode(mode: str) -> None:
+ if mode not in ('lastIsName', 'noneIsName', 'markType', 'markName', 'param', 'udl'):
+ raise Exception("Description mode '%s' is invalid." % mode)
+
+
+class NoOldIdError(Exception):
+ # Used to avoid implementing unneeded id generation for old id schemes.
+ pass
+
+
+class ASTBaseBase:
+ def __eq__(self, other: Any) -> bool:
+ if type(self) is not type(other):
+ return False
+ try:
+ for key, value in self.__dict__.items():
+ if value != getattr(other, key):
+ return False
+ except AttributeError:
+ return False
+ return True
+
+ # Defining __hash__ = None is not strictly needed when __eq__ is defined.
+ __hash__ = None # type: ignore[assignment]
+
+ def clone(self) -> Any:
+ return deepcopy(self)
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ raise NotImplementedError(repr(self))
+
+ def __str__(self) -> str:
+ return self._stringify(lambda ast: str(ast))
+
+ def get_display_string(self) -> str:
+ return self._stringify(lambda ast: ast.get_display_string())
+
+ def __repr__(self) -> str:
+ return '<%s>' % self.__class__.__name__
+
+
+################################################################################
+# Attributes
+################################################################################
+
+class ASTAttribute(ASTBaseBase):
+ def describe_signature(self, signode: TextElement) -> None:
+ raise NotImplementedError(repr(self))
+
+
+class ASTCPPAttribute(ASTAttribute):
+ def __init__(self, arg: str) -> None:
+ self.arg = arg
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ return "[[" + self.arg + "]]"
+
+ def describe_signature(self, signode: TextElement) -> None:
+ signode.append(addnodes.desc_sig_punctuation('[[', '[['))
+ signode.append(nodes.Text(self.arg))
+ signode.append(addnodes.desc_sig_punctuation(']]', ']]'))
+
+
+class ASTGnuAttribute(ASTBaseBase):
+ def __init__(self, name: str, args: ASTBaseParenExprList | None) -> None:
+ self.name = name
+ self.args = args
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ res = [self.name]
+ if self.args:
+ res.append(transform(self.args))
+ return ''.join(res)
+
+
+class ASTGnuAttributeList(ASTAttribute):
+ def __init__(self, attrs: list[ASTGnuAttribute]) -> None:
+ self.attrs = attrs
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ res = ['__attribute__((']
+ first = True
+ for attr in self.attrs:
+ if not first:
+ res.append(', ')
+ first = False
+ res.append(transform(attr))
+ res.append('))')
+ return ''.join(res)
+
+ def describe_signature(self, signode: TextElement) -> None:
+ txt = str(self)
+ signode.append(nodes.Text(txt))
+
+
+class ASTIdAttribute(ASTAttribute):
+ """For simple attributes defined by the user."""
+
+ def __init__(self, id: str) -> None:
+ self.id = id
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ return self.id
+
+ def describe_signature(self, signode: TextElement) -> None:
+ signode.append(nodes.Text(self.id))
+
+
+class ASTParenAttribute(ASTAttribute):
+ """For paren attributes defined by the user."""
+
+ def __init__(self, id: str, arg: str) -> None:
+ self.id = id
+ self.arg = arg
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ return self.id + '(' + self.arg + ')'
+
+ def describe_signature(self, signode: TextElement) -> None:
+ txt = str(self)
+ signode.append(nodes.Text(txt))
+
+
+class ASTAttributeList(ASTBaseBase):
+ def __init__(self, attrs: list[ASTAttribute]) -> None:
+ self.attrs = attrs
+
+ def __len__(self) -> int:
+ return len(self.attrs)
+
+ def __add__(self, other: ASTAttributeList) -> ASTAttributeList:
+ return ASTAttributeList(self.attrs + other.attrs)
+
+ def _stringify(self, transform: StringifyTransform) -> str:
+ return ' '.join(transform(attr) for attr in self.attrs)
+
+ def describe_signature(self, signode: TextElement) -> None:
+ if len(self.attrs) == 0:
+ return
+ self.attrs[0].describe_signature(signode)
+ if len(self.attrs) == 1:
+ return
+ for attr in self.attrs[1:]:
+ signode.append(addnodes.desc_sig_space())
+ attr.describe_signature(signode)
+
+
+################################################################################
+
+class ASTBaseParenExprList(ASTBaseBase):
+ pass
+
+
+################################################################################
+
+class UnsupportedMultiCharacterCharLiteral(Exception):
+ pass
+
+
+class DefinitionError(Exception):
+ pass
+
+
+class BaseParser:
+ def __init__(self, definition: str, *,
+ location: nodes.Node | tuple[str, int] | str,
+ config: Config) -> None:
+ self.definition = definition.strip()
+ self.location = location # for warnings
+ self.config = config
+
+ self.pos = 0
+ self.end = len(self.definition)
+ self.last_match: re.Match[str] | None = None
+ self._previous_state: tuple[int, re.Match[str] | None] = (0, None)
+ self.otherErrors: list[DefinitionError] = []
+
+ # in our tests the following is set to False to capture bad parsing
+ self.allowFallbackExpressionParsing = True
+
+ def _make_multi_error(self, errors: list[Any], header: str) -> DefinitionError:
+ if len(errors) == 1:
+ if len(header) > 0:
+ return DefinitionError(header + '\n' + str(errors[0][0]))
+ else:
+ return DefinitionError(str(errors[0][0]))
+ result = [header, '\n']
+ for e in errors:
+ if len(e[1]) > 0:
+ indent = ' '
+ result.append(e[1])
+ result.append(':\n')
+ for line in str(e[0]).split('\n'):
+ if len(line) == 0:
+ continue
+ result.append(indent)
+ result.append(line)
+ result.append('\n')
+ else:
+ result.append(str(e[0]))
+ return DefinitionError(''.join(result))
+
+ @property
+ def language(self) -> str:
+ raise NotImplementedError
+
+ def status(self, msg: str) -> None:
+ # for debugging
+ indicator = '-' * self.pos + '^'
+ logger.debug(f"{msg}\n{self.definition}\n{indicator}") # NoQA: G004
+
+ def fail(self, msg: str) -> None:
+ errors = []
+ indicator = '-' * self.pos + '^'
+ exMain = DefinitionError(
+ 'Invalid %s declaration: %s [error at %d]\n %s\n %s' %
+ (self.language, msg, self.pos, self.definition, indicator))
+ errors.append((exMain, "Main error"))
+ for err in self.otherErrors:
+ errors.append((err, "Potential other error"))
+ self.otherErrors = []
+ raise self._make_multi_error(errors, '')
+
+ def warn(self, msg: str) -> None:
+ logger.warning(msg, location=self.location)
+
+ def match(self, regex: re.Pattern[str]) -> bool:
+ match = regex.match(self.definition, self.pos)
+ if match is not None:
+ self._previous_state = (self.pos, self.last_match)
+ self.pos = match.end()
+ self.last_match = match
+ return True
+ return False
+
+ def skip_string(self, string: str) -> bool:
+ strlen = len(string)
+ if self.definition[self.pos:self.pos + strlen] == string:
+ self.pos += strlen
+ return True
+ return False
+
+ def skip_word(self, word: str) -> bool:
+ return self.match(re.compile(r'\b%s\b' % re.escape(word)))
+
+ def skip_ws(self) -> bool:
+ return self.match(_whitespace_re)
+
+ def skip_word_and_ws(self, word: str) -> bool:
+ if self.skip_word(word):
+ self.skip_ws()
+ return True
+ return False
+
+ def skip_string_and_ws(self, string: str) -> bool:
+ if self.skip_string(string):
+ self.skip_ws()
+ return True
+ return False
+
+ @property
+ def eof(self) -> bool:
+ return self.pos >= self.end
+
+ @property
+ def current_char(self) -> str:
+ try:
+ return self.definition[self.pos]
+ except IndexError:
+ return 'EOF'
+
+ @property
+ def matched_text(self) -> str:
+ if self.last_match is not None:
+ return self.last_match.group()
+ return ''
+
+ def read_rest(self) -> str:
+ rv = self.definition[self.pos:]
+ self.pos = self.end
+ return rv
+
+ def assert_end(self, *, allowSemicolon: bool = False) -> None:
+ self.skip_ws()
+ if allowSemicolon:
+ if not self.eof and self.definition[self.pos:] != ';':
+ self.fail('Expected end of definition or ;.')
+ else:
+ if not self.eof:
+ self.fail('Expected end of definition.')
+
+ ################################################################################
+
+ @property
+ def id_attributes(self):
+ raise NotImplementedError
+
+ @property
+ def paren_attributes(self):
+ raise NotImplementedError
+
+ def _parse_balanced_token_seq(self, end: list[str]) -> str:
+ # TODO: add handling of string literals and similar
+ brackets = {'(': ')', '[': ']', '{': '}'}
+ startPos = self.pos
+ symbols: list[str] = []
+ while not self.eof:
+ if len(symbols) == 0 and self.current_char in end:
+ break
+ if self.current_char in brackets:
+ symbols.append(brackets[self.current_char])
+ elif len(symbols) > 0 and self.current_char == symbols[-1]:
+ symbols.pop()
+ elif self.current_char in ")]}":
+ self.fail("Unexpected '%s' in balanced-token-seq." % self.current_char)
+ self.pos += 1
+ if self.eof:
+ self.fail("Could not find end of balanced-token-seq starting at %d."
+ % startPos)
+ return self.definition[startPos:self.pos]
+
+ def _parse_attribute(self) -> ASTAttribute | None:
+ self.skip_ws()
+ # try C++11 style
+ startPos = self.pos
+ if self.skip_string_and_ws('['):
+ if not self.skip_string('['):
+ self.pos = startPos
+ else:
+ # TODO: actually implement the correct grammar
+ arg = self._parse_balanced_token_seq(end=[']'])
+ if not self.skip_string_and_ws(']'):
+ self.fail("Expected ']' in end of attribute.")
+ if not self.skip_string_and_ws(']'):
+ self.fail("Expected ']' in end of attribute after [[...]")
+ return ASTCPPAttribute(arg)
+
+ # try GNU style
+ if self.skip_word_and_ws('__attribute__'):
+ if not self.skip_string_and_ws('('):
+ self.fail("Expected '(' after '__attribute__'.")
+ if not self.skip_string_and_ws('('):
+ self.fail("Expected '(' after '__attribute__('.")
+ attrs = []
+ while 1:
+ if self.match(identifier_re):
+ name = self.matched_text
+ exprs = self._parse_paren_expression_list()
+ attrs.append(ASTGnuAttribute(name, exprs))
+ if self.skip_string_and_ws(','):
+ continue
+ if self.skip_string_and_ws(')'):
+ break
+ self.fail("Expected identifier, ')', or ',' in __attribute__.")
+ if not self.skip_string_and_ws(')'):
+ self.fail("Expected ')' after '__attribute__((...)'")
+ return ASTGnuAttributeList(attrs)
+
+ # try the simple id attributes defined by the user
+ for id in self.id_attributes:
+ if self.skip_word_and_ws(id):
+ return ASTIdAttribute(id)
+
+ # try the paren attributes defined by the user
+ for id in self.paren_attributes:
+ if not self.skip_string_and_ws(id):
+ continue
+ if not self.skip_string('('):
+ self.fail("Expected '(' after user-defined paren-attribute.")
+ arg = self._parse_balanced_token_seq(end=[')'])
+ if not self.skip_string(')'):
+ self.fail("Expected ')' to end user-defined paren-attribute.")
+ return ASTParenAttribute(id, arg)
+
+ return None
+
+ def _parse_attribute_list(self) -> ASTAttributeList:
+ res = []
+ while True:
+ attr = self._parse_attribute()
+ if attr is None:
+ break
+ res.append(attr)
+ return ASTAttributeList(res)
+
+ def _parse_paren_expression_list(self) -> ASTBaseParenExprList | None:
+ raise NotImplementedError
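
A sketch (not part of this commit) of the low-level scanning helpers; *config*
is only stored by ``__init__``, so ``None`` suffices for this illustration.

    p = BaseParser('__attribute__((unused)) int x', location='<demo>', config=None)
    assert p.skip_word_and_ws('__attribute__')
    assert p.skip_string_and_ws('(')
    assert p.skip_string('(')
    arg = p._parse_balanced_token_seq(end=[')'])  # 'unused'
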
diff --git a/sphinx/util/console.py b/sphinx/util/console.py
new file mode 100644
index 0000000..0fc9450
--- /dev/null
+++ b/sphinx/util/console.py
@@ -0,0 +1,129 @@
+"""Format colored console output."""
+
+from __future__ import annotations
+
+import os
+import re
+import shutil
+import sys
+
+try:
+ # check if colorama is installed to support color on Windows
+ import colorama
+except ImportError:
+ colorama = None
+
+
+_ansi_re: re.Pattern[str] = re.compile('\x1b\\[(\\d\\d;){0,2}\\d\\dm')
+codes: dict[str, str] = {}
+
+
+def terminal_safe(s: str) -> str:
+ """Safely encode a string for printing to the terminal."""
+ return s.encode('ascii', 'backslashreplace').decode('ascii')
+
+
+def get_terminal_width() -> int:
+ """Return the width of the terminal in columns."""
+ return shutil.get_terminal_size().columns - 1
+
+
+_tw: int = get_terminal_width()
+
+
+def term_width_line(text: str) -> str:
+ if not codes:
+ # if no coloring, don't output fancy backspaces
+ return text + '\n'
+ else:
+ # codes are not displayed, this must be taken into account
+ return text.ljust(_tw + len(text) - len(_ansi_re.sub('', text))) + '\r'
+
+
+def color_terminal() -> bool:
+ if 'NO_COLOR' in os.environ:
+ return False
+ if sys.platform == 'win32' and colorama is not None:
+ colorama.init()
+ return True
+ if 'FORCE_COLOR' in os.environ:
+ return True
+ if not hasattr(sys.stdout, 'isatty'):
+ return False
+ if not sys.stdout.isatty():
+ return False
+ if 'COLORTERM' in os.environ:
+ return True
+ term = os.environ.get('TERM', 'dumb').lower()
+ if term in ('xterm', 'linux') or 'color' in term:
+ return True
+ return False
+
+
+def nocolor() -> None:
+ if sys.platform == 'win32' and colorama is not None:
+ colorama.deinit()
+ codes.clear()
+
+
+def coloron() -> None:
+ codes.update(_orig_codes)
+
+
+def colorize(name: str, text: str, input_mode: bool = False) -> str:
+ def escseq(name: str) -> str:
+ # Wrap escape sequence with ``\1`` and ``\2`` to let readline know
+        # it contains non-printable characters
+ # ref: https://tiswww.case.edu/php/chet/readline/readline.html
+ #
+ # Note: This hack does not work well in Windows (see #5059)
+ escape = codes.get(name, '')
+ if input_mode and escape and sys.platform != 'win32':
+ return '\1' + escape + '\2'
+ else:
+ return escape
+
+ return escseq(name) + text + escseq('reset')
+
+
+def strip_colors(s: str) -> str:
+ return re.compile('\x1b.*?m').sub('', s)
+
+
+def create_color_func(name: str) -> None:
+ def inner(text: str) -> str:
+ return colorize(name, text)
+ globals()[name] = inner
+
+
+_attrs = {
+ 'reset': '39;49;00m',
+ 'bold': '01m',
+ 'faint': '02m',
+ 'standout': '03m',
+ 'underline': '04m',
+ 'blink': '05m',
+}
+
+for _name, _value in _attrs.items():
+ codes[_name] = '\x1b[' + _value
+
+_colors = [
+ ('black', 'darkgray'),
+ ('darkred', 'red'),
+ ('darkgreen', 'green'),
+ ('brown', 'yellow'),
+ ('darkblue', 'blue'),
+ ('purple', 'fuchsia'),
+ ('turquoise', 'teal'),
+ ('lightgray', 'white'),
+]
+
+for i, (dark, light) in enumerate(_colors, 30):
+ codes[dark] = '\x1b[%im' % i
+ codes[light] = '\x1b[%im' % (i + 60)
+
+_orig_codes = codes.copy()
+
+for _name in codes:
+ create_color_func(_name)
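
Illustrative example (not part of this commit): colorize() wraps text in the
named escape code plus the 'reset' code, and strip_colors() removes any such
codes again.

    >>> strip_colors(colorize('red', 'error'))
    'error'
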
diff --git a/sphinx/util/display.py b/sphinx/util/display.py
new file mode 100644
index 0000000..199119c
--- /dev/null
+++ b/sphinx/util/display.py
@@ -0,0 +1,94 @@
+from __future__ import annotations
+
+import functools
+from typing import Any, Callable, TypeVar
+
+from sphinx.locale import __
+from sphinx.util import logging
+from sphinx.util.console import bold # type: ignore[attr-defined]
+
+if False:
+ from collections.abc import Iterable, Iterator
+ from types import TracebackType
+
+logger = logging.getLogger(__name__)
+
+
+def display_chunk(chunk: Any) -> str:
+ if isinstance(chunk, (list, tuple)):
+ if len(chunk) == 1:
+ return str(chunk[0])
+ return f'{chunk[0]} .. {chunk[-1]}'
+ return str(chunk)
+
+
+T = TypeVar('T')
+
+
+def status_iterator(
+ iterable: Iterable[T],
+ summary: str,
+ color: str = 'darkgreen',
+ length: int = 0,
+ verbosity: int = 0,
+ stringify_func: Callable[[Any], str] = display_chunk,
+) -> Iterator[T]:
+ single_line = verbosity < 1
+ bold_summary = bold(summary)
+ if length == 0:
+ logger.info(bold_summary, nonl=True)
+ for item in iterable:
+ logger.info(stringify_func(item) + ' ', nonl=True, color=color)
+ yield item
+ else:
+ for i, item in enumerate(iterable, start=1):
+ if single_line:
+ # clear the entire line ('Erase in Line')
+ logger.info('\x1b[2K', nonl=True)
+ logger.info(f'{bold_summary}[{i / length: >4.0%}] ', nonl=True) # NoQA: G004
+ # Emit the string representation of ``item``
+ logger.info(stringify_func(item), nonl=True, color=color)
+ # If in single-line mode, emit a carriage return to move the cursor
+ # to the start of the line.
+ # If not, emit a newline to move the cursor to the next line.
+ logger.info('\r' * single_line, nonl=single_line)
+ yield item
+ logger.info('')
+
+
+class SkipProgressMessage(Exception):
+ pass
+
+
+class progress_message:
+ def __init__(self, message: str) -> None:
+ self.message = message
+
+ def __enter__(self) -> None:
+ logger.info(bold(self.message + '... '), nonl=True)
+
+ def __exit__(
+ self,
+ typ: type[BaseException] | None,
+ val: BaseException | None,
+ tb: TracebackType | None,
+ ) -> bool:
+ if isinstance(val, SkipProgressMessage):
+ logger.info(__('skipped'))
+ if val.args:
+ logger.info(*val.args)
+ return True
+ elif val:
+ logger.info(__('failed'))
+ else:
+ logger.info(__('done'))
+
+ return False
+
+ def __call__(self, f: Callable) -> Callable:
+ @functools.wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ with self:
+ return f(*args, **kwargs)
+
+ return wrapper
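
A sketch (not part of this commit): used as a decorator or context manager,
progress_message logs '... done', '... failed', or '... skipped' (when
SkipProgressMessage is raised, the exception is swallowed).

    @progress_message('building demo')
    def build() -> None:
        ...

    build()  # logs 'building demo... ' then 'done'

    with progress_message('checking demo'):
        raise SkipProgressMessage()  # logs 'skipped'
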
diff --git a/sphinx/util/docfields.py b/sphinx/util/docfields.py
new file mode 100644
index 0000000..c48c3be
--- /dev/null
+++ b/sphinx/util/docfields.py
@@ -0,0 +1,408 @@
+"""Utility code for "Doc fields".
+
+"Doc fields" are reST field lists in object descriptions that will
+be domain-specifically transformed to a more appealing presentation.
+"""
+from __future__ import annotations
+
+import contextlib
+from typing import TYPE_CHECKING, Any, cast
+
+from docutils import nodes
+from docutils.nodes import Element, Node
+
+from sphinx import addnodes
+from sphinx.locale import __
+from sphinx.util import logging
+from sphinx.util.nodes import get_node_line
+
+if TYPE_CHECKING:
+ from docutils.parsers.rst.states import Inliner
+
+ from sphinx.directives import ObjectDescription
+ from sphinx.environment import BuildEnvironment
+ from sphinx.util.typing import TextlikeNode
+
+logger = logging.getLogger(__name__)
+
+
+def _is_single_paragraph(node: nodes.field_body) -> bool:
+ """True if the node only contains one paragraph (and system messages)."""
+ if len(node) == 0:
+ return False
+ elif len(node) > 1:
+ for subnode in node[1:]: # type: Node
+ if not isinstance(subnode, nodes.system_message):
+ return False
+ if isinstance(node[0], nodes.paragraph):
+ return True
+ return False
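
Illustrative example (not part of this commit): a body whose only
non-system-message child is a paragraph counts as a single paragraph.

    from docutils import nodes

    body = nodes.field_body('', nodes.paragraph(text='only child'))
    assert _is_single_paragraph(body)
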
+
+
+class Field:
+ """A doc field that is never grouped. It can have an argument or not, the
+ argument can be linked using a specified *rolename*. Field should be used
+ for doc fields that usually don't occur more than once.
+
+ The body can be linked using a specified *bodyrolename* if the content is
+ just a single inline or text node.
+
+ Example::
+
+ :returns: description of the return value
+ :rtype: description of the return type
+ """
+ is_grouped = False
+ is_typed = False
+
+ def __init__(
+ self,
+ name: str,
+ names: tuple[str, ...] = (),
+ label: str = '',
+ has_arg: bool = True,
+ rolename: str = '',
+ bodyrolename: str = '',
+ ) -> None:
+ self.name = name
+ self.names = names
+ self.label = label
+ self.has_arg = has_arg
+ self.rolename = rolename
+ self.bodyrolename = bodyrolename
+
+ def make_xref(self, rolename: str, domain: str, target: str,
+ innernode: type[TextlikeNode] = addnodes.literal_emphasis,
+ contnode: Node | None = None, env: BuildEnvironment | None = None,
+ inliner: Inliner | None = None, location: Element | None = None) -> Node:
+ # note: for backwards compatibility env is last, but not optional
+ assert env is not None
+ assert (inliner is None) == (location is None), (inliner, location)
+ if not rolename:
+ return contnode or innernode(target, target)
+        # The domain is passed from DocFieldTransformer, so it surely exists,
+        # and env.get_domain() will not raise an exception here.
+ role = env.get_domain(domain).role(rolename)
+ if role is None or inliner is None:
+ if role is None and inliner is not None:
+ msg = __("Problem in %s domain: field is supposed "
+ "to use role '%s', but that role is not in the domain.")
+                logger.warning(msg, domain, rolename, location=location)
+ refnode = addnodes.pending_xref('', refdomain=domain, refexplicit=False,
+ reftype=rolename, reftarget=target)
+ refnode += contnode or innernode(target, target)
+ env.get_domain(domain).process_field_xref(refnode)
+ return refnode
+ lineno = -1
+ if location is not None:
+ with contextlib.suppress(ValueError):
+ lineno = get_node_line(location)
+ ns, messages = role(rolename, target, target, lineno, inliner, {}, [])
+ return nodes.inline(target, '', *ns)
+
+ def make_xrefs(self, rolename: str, domain: str, target: str,
+ innernode: type[TextlikeNode] = addnodes.literal_emphasis,
+ contnode: Node | None = None, env: BuildEnvironment | None = None,
+ inliner: Inliner | None = None, location: Element | None = None,
+ ) -> list[Node]:
+ return [self.make_xref(rolename, domain, target, innernode, contnode,
+ env, inliner, location)]
+
+ def make_entry(self, fieldarg: str, content: list[Node]) -> tuple[str, list[Node]]:
+ return (fieldarg, content)
+
+ def make_field(
+ self,
+ types: dict[str, list[Node]],
+ domain: str,
+ item: tuple,
+ env: BuildEnvironment | None = None,
+ inliner: Inliner | None = None,
+ location: Element | None = None,
+ ) -> nodes.field:
+ fieldarg, content = item
+ fieldname = nodes.field_name('', self.label)
+ if fieldarg:
+ fieldname += nodes.Text(' ')
+ fieldname.extend(self.make_xrefs(self.rolename, domain,
+ fieldarg, nodes.Text,
+ env=env, inliner=inliner, location=location))
+
+ if len(content) == 1 and (
+ isinstance(content[0], nodes.Text) or
+ (isinstance(content[0], nodes.inline) and len(content[0]) == 1 and
+ isinstance(content[0][0], nodes.Text))):
+ content = self.make_xrefs(self.bodyrolename, domain,
+ content[0].astext(), contnode=content[0],
+ env=env, inliner=inliner, location=location)
+ fieldbody = nodes.field_body('', nodes.paragraph('', '', *content))
+ return nodes.field('', fieldname, fieldbody)
+
+
+class GroupedField(Field):
+ """
+ A doc field that is grouped; i.e., all fields of that type will be
+ transformed into one field with its body being a bulleted list. It always
+ has an argument. The argument can be linked using the given *rolename*.
+ GroupedField should be used for doc fields that can occur more than once.
+ If *can_collapse* is true, this field will revert to a Field if only used
+ once.
+
+ Example::
+
+ :raises ErrorClass: description when it is raised
+ """
+ is_grouped = True
+ list_type = nodes.bullet_list
+
+ def __init__(self, name: str, names: tuple[str, ...] = (), label: str = '',
+ rolename: str = '', can_collapse: bool = False) -> None:
+ super().__init__(name, names, label, True, rolename)
+ self.can_collapse = can_collapse
+
+ def make_field(
+ self,
+ types: dict[str, list[Node]],
+ domain: str,
+ items: tuple,
+ env: BuildEnvironment | None = None,
+ inliner: Inliner | None = None,
+ location: Element | None = None,
+ ) -> nodes.field:
+ fieldname = nodes.field_name('', self.label)
+ listnode = self.list_type()
+ for fieldarg, content in items:
+ par = nodes.paragraph()
+ par.extend(self.make_xrefs(self.rolename, domain, fieldarg,
+ addnodes.literal_strong,
+ env=env, inliner=inliner, location=location))
+ par += nodes.Text(' -- ')
+ par += content
+ listnode += nodes.list_item('', par)
+
+ if len(items) == 1 and self.can_collapse:
+ list_item = cast(nodes.list_item, listnode[0])
+ fieldbody = nodes.field_body('', list_item[0])
+ return nodes.field('', fieldname, fieldbody)
+
+ fieldbody = nodes.field_body('', listnode)
+ return nodes.field('', fieldname, fieldbody)
+
+
+class TypedField(GroupedField):
+ """
+ A doc field that is grouped and has type information for the arguments. It
+ always has an argument. The argument can be linked using the given
+ *rolename*, the type using the given *typerolename*.
+
+ Two uses are possible: either parameter and type description are given
+ separately, using a field from *names* and one from *typenames*,
+ respectively, or both are given using a field from *names*, see the example.
+
+ Example::
+
+ :param foo: description of parameter foo
+ :type foo: SomeClass
+
+ -- or --
+
+ :param SomeClass foo: description of parameter foo
+ """
+ is_typed = True
+
+ def __init__(
+ self,
+ name: str,
+ names: tuple[str, ...] = (),
+ typenames: tuple[str, ...] = (),
+ label: str = '',
+ rolename: str = '',
+ typerolename: str = '',
+ can_collapse: bool = False,
+ ) -> None:
+ super().__init__(name, names, label, rolename, can_collapse)
+ self.typenames = typenames
+ self.typerolename = typerolename
+
+ def make_field(
+ self,
+ types: dict[str, list[Node]],
+ domain: str,
+ items: tuple,
+ env: BuildEnvironment | None = None,
+ inliner: Inliner | None = None,
+ location: Element | None = None,
+ ) -> nodes.field:
+ def handle_item(fieldarg: str, content: str) -> nodes.paragraph:
+ par = nodes.paragraph()
+ par.extend(self.make_xrefs(self.rolename, domain, fieldarg,
+ addnodes.literal_strong, env=env))
+ if fieldarg in types:
+ par += nodes.Text(' (')
+                # NOTE: using .pop() here to prevent a single type node from being
+                #       inserted twice into the doctree, which leads to
+                #       inconsistencies later when references are resolved
+ fieldtype = types.pop(fieldarg)
+ if len(fieldtype) == 1 and isinstance(fieldtype[0], nodes.Text):
+ typename = fieldtype[0].astext()
+ par.extend(self.make_xrefs(self.typerolename, domain, typename,
+ addnodes.literal_emphasis, env=env,
+ inliner=inliner, location=location))
+ else:
+ par += fieldtype
+ par += nodes.Text(')')
+ par += nodes.Text(' -- ')
+ par += content
+ return par
+
+ fieldname = nodes.field_name('', self.label)
+ if len(items) == 1 and self.can_collapse:
+ fieldarg, content = items[0]
+ bodynode: Node = handle_item(fieldarg, content)
+ else:
+ bodynode = self.list_type()
+ for fieldarg, content in items:
+ bodynode += nodes.list_item('', handle_item(fieldarg, content))
+ fieldbody = nodes.field_body('', bodynode)
+ return nodes.field('', fieldname, fieldbody)
+
+
+class DocFieldTransformer:
+ """
+ Transforms field lists in "doc field" syntax into better-looking
+ equivalents, using the field type definitions given on a domain.
+ """
+ typemap: dict[str, tuple[Field, bool]]
+
+ def __init__(self, directive: ObjectDescription) -> None:
+ self.directive = directive
+
+ self.typemap = directive.get_field_type_map()
+
+ def transform_all(self, node: addnodes.desc_content) -> None:
+ """Transform all field list children of a node."""
+ # don't traverse, only handle field lists that are immediate children
+ for child in node:
+ if isinstance(child, nodes.field_list):
+ self.transform(child)
+
+ def transform(self, node: nodes.field_list) -> None:
+ """Transform a single field list *node*."""
+ typemap = self.typemap
+
+ entries: list[nodes.field | tuple[Field, Any, Element]] = []
+ groupindices: dict[str, int] = {}
+ types: dict[str, dict] = {}
+
+ # step 1: traverse all fields and collect field types and content
+ for field in cast(list[nodes.field], node):
+ assert len(field) == 2
+ field_name = cast(nodes.field_name, field[0])
+ field_body = cast(nodes.field_body, field[1])
+ try:
+ # split into field type and argument
+ fieldtype_name, fieldarg = field_name.astext().split(None, 1)
+ except ValueError:
+ # maybe an argument-less field type?
+ fieldtype_name, fieldarg = field_name.astext(), ''
+ typedesc, is_typefield = typemap.get(fieldtype_name, (None, None))
+
+ # collect the content, trying not to keep unnecessary paragraphs
+ if _is_single_paragraph(field_body):
+ paragraph = cast(nodes.paragraph, field_body[0])
+ content = paragraph.children
+ else:
+ content = field_body.children
+
+ # sort out unknown fields
+ if typedesc is None or typedesc.has_arg != bool(fieldarg):
+ # either the field name is unknown, or the argument doesn't
+ # match the spec; capitalize field name and be done with it
+ new_fieldname = fieldtype_name[0:1].upper() + fieldtype_name[1:]
+ if fieldarg:
+ new_fieldname += ' ' + fieldarg
+ field_name[0] = nodes.Text(new_fieldname)
+ entries.append(field)
+
+ # but if this has a type then we can at least link it
+ if (typedesc and is_typefield and content and
+ len(content) == 1 and isinstance(content[0], nodes.Text)):
+ typed_field = cast(TypedField, typedesc)
+ target = content[0].astext()
+ xrefs = typed_field.make_xrefs(
+ typed_field.typerolename,
+ self.directive.domain or '',
+ target,
+ contnode=content[0],
+ env=self.directive.state.document.settings.env,
+ )
+ if _is_single_paragraph(field_body):
+ paragraph = cast(nodes.paragraph, field_body[0])
+ paragraph.clear()
+ paragraph.extend(xrefs)
+ else:
+ field_body.clear()
+ field_body += nodes.paragraph('', '', *xrefs)
+
+ continue
+
+ typename = typedesc.name
+
+ # if the field specifies a type, put it in the types collection
+ if is_typefield:
+ # filter out only inline nodes; others will result in invalid
+ # markup being written out
+ content = [n for n in content if isinstance(n, (nodes.Inline, nodes.Text))]
+ if content:
+ types.setdefault(typename, {})[fieldarg] = content
+ continue
+
+ # also support syntax like ``:param type name:``
+ if typedesc.is_typed:
+ try:
+ argtype, argname = fieldarg.rsplit(None, 1)
+ except ValueError:
+ pass
+ else:
+ types.setdefault(typename, {})[argname] = \
+ [nodes.Text(argtype)]
+ fieldarg = argname
+
+ translatable_content = nodes.inline(field_body.rawsource,
+ translatable=True)
+ translatable_content.document = field_body.parent.document
+ translatable_content.source = field_body.parent.source
+ translatable_content.line = field_body.parent.line
+ translatable_content += content
+
+ # grouped entries need to be collected in one entry, while others
+ # get one entry per field
+ if typedesc.is_grouped:
+ if typename in groupindices:
+ group = cast(tuple[Field, list, Node], entries[groupindices[typename]])
+ else:
+ groupindices[typename] = len(entries)
+ group = (typedesc, [], field)
+ entries.append(group)
+ new_entry = typedesc.make_entry(fieldarg, [translatable_content])
+ group[1].append(new_entry)
+ else:
+ new_entry = typedesc.make_entry(fieldarg, [translatable_content])
+ entries.append((typedesc, new_entry, field))
+
+ # step 2: all entries are collected, construct the new field list
+ new_list = nodes.field_list()
+ for entry in entries:
+ if isinstance(entry, nodes.field):
+ # pass-through old field
+ new_list += entry
+ else:
+ fieldtype, items, location = entry
+ fieldtypes = types.get(fieldtype.name, {})
+ env = self.directive.state.document.settings.env
+ inliner = self.directive.state.inliner
+ domain = self.directive.domain or ''
+ new_list += fieldtype.make_field(fieldtypes, domain, items,
+ env=env, inliner=inliner, location=location)
+
+ node.replace_self(new_list)
diff --git a/sphinx/util/docstrings.py b/sphinx/util/docstrings.py
new file mode 100644
index 0000000..6ccc538
--- /dev/null
+++ b/sphinx/util/docstrings.py
@@ -0,0 +1,88 @@
+"""Utilities for docstring processing."""
+
+from __future__ import annotations
+
+import re
+import sys
+
+from docutils.parsers.rst.states import Body
+
+field_list_item_re = re.compile(Body.patterns['field_marker'])
+
+
+def separate_metadata(s: str | None) -> tuple[str | None, dict[str, str]]:
+ """Separate docstring into metadata and others."""
+ in_other_element = False
+ metadata: dict[str, str] = {}
+ lines = []
+
+ if not s:
+ return s, metadata
+
+ for line in prepare_docstring(s):
+ if line.strip() == '':
+ in_other_element = False
+ lines.append(line)
+ else:
+ matched = field_list_item_re.match(line)
+ if matched and not in_other_element:
+ field_name = matched.group()[1:].split(':', 1)[0]
+ if field_name.startswith('meta '):
+ name = field_name[5:].strip()
+ metadata[name] = line[matched.end():].strip()
+ else:
+ lines.append(line)
+ else:
+ in_other_element = True
+ lines.append(line)
+
+ return '\n'.join(lines), metadata
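
Illustrative example (not part of this commit): ':meta ...:' fields are
stripped from the docstring and returned separately.

    >>> text, meta = separate_metadata('Summary.\n\n:meta author: me\n')
    >>> meta
    {'author': 'me'}
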
+
+
+def prepare_docstring(s: str, tabsize: int = 8) -> list[str]:
+ """Convert a docstring into lines of parseable reST. Remove common leading
+ indentation, where the indentation of the first line is ignored.
+
+ Return the docstring as a list of lines usable for inserting into a docutils
+    ViewList (used as the argument of nested_parse()). An empty line is added
+    to act as a separator between this docstring and the following content.
+ """
+ lines = s.expandtabs(tabsize).splitlines()
+ # Find minimum indentation of any non-blank lines after ignored lines.
+ margin = sys.maxsize
+ for line in lines[1:]:
+ content = len(line.lstrip())
+ if content:
+ indent = len(line) - content
+ margin = min(margin, indent)
+ # Remove indentation from the first line.
+ if len(lines):
+ lines[0] = lines[0].lstrip()
+ if margin < sys.maxsize:
+ for i in range(1, len(lines)):
+ lines[i] = lines[i][margin:]
+ # Remove any leading blank lines.
+ while lines and not lines[0]:
+ lines.pop(0)
+ # make sure there is an empty line at the end
+ if lines and lines[-1]:
+ lines.append('')
+ return lines
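
Illustrative example (not part of this commit): common indentation is removed
and a trailing blank line is appended.

    >>> prepare_docstring('Summary line.\n\n    Indented detail.\n')
    ['Summary line.', '', 'Indented detail.', '']
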
+
+
+def prepare_commentdoc(s: str) -> list[str]:
+ """Extract documentation comment lines (starting with #:) and return them
+ as a list of lines. Returns an empty list if there is no documentation.
+ """
+ result = []
+ lines = [line.strip() for line in s.expandtabs().splitlines()]
+ for line in lines:
+ if line.startswith('#:'):
+ line = line[2:]
+ # the first space after the comment is ignored
+ if line and line[0] == ' ':
+ line = line[1:]
+ result.append(line)
+ if result and result[-1]:
+ result.append('')
+ return result
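
Illustrative example (not part of this commit): only '#:' lines are kept,
with one leading space stripped.

    >>> prepare_commentdoc('#: A documented attribute.\nx = 1\n')
    ['A documented attribute.', '']
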
diff --git a/sphinx/util/docutils.py b/sphinx/util/docutils.py
new file mode 100644
index 0000000..a862417
--- /dev/null
+++ b/sphinx/util/docutils.py
@@ -0,0 +1,635 @@
+"""Utility functions for docutils."""
+
+from __future__ import annotations
+
+import os
+import re
+from collections.abc import Sequence # NoQA: TCH003
+from contextlib import contextmanager
+from copy import copy
+from os import path
+from typing import IO, TYPE_CHECKING, Any, Callable, cast
+
+import docutils
+from docutils import nodes
+from docutils.io import FileOutput
+from docutils.parsers.rst import Directive, directives, roles
+from docutils.parsers.rst.states import Inliner # NoQA: TCH002
+from docutils.statemachine import State, StateMachine, StringList
+from docutils.utils import Reporter, unescape
+from docutils.writers._html_base import HTMLTranslator
+
+from sphinx.errors import SphinxError
+from sphinx.locale import _, __
+from sphinx.util import logging
+
+logger = logging.getLogger(__name__)
+report_re = re.compile('^(.+?:(?:\\d+)?): \\((DEBUG|INFO|WARNING|ERROR|SEVERE)/(\\d+)?\\) ')
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+ from types import ModuleType
+
+ from docutils.frontend import Values
+ from docutils.nodes import Element, Node, system_message
+
+ from sphinx.builders import Builder
+ from sphinx.config import Config
+ from sphinx.environment import BuildEnvironment
+ from sphinx.util.typing import RoleFunction
+
+# deprecated name -> (object to return, canonical path or empty string)
+_DEPRECATED_OBJECTS = {
+ '__version_info__': (docutils.__version_info__, 'docutils.__version_info__'),
+}
+
+
+def __getattr__(name):
+ if name not in _DEPRECATED_OBJECTS:
+ msg = f'module {__name__!r} has no attribute {name!r}'
+ raise AttributeError(msg)
+
+ from sphinx.deprecation import _deprecation_warning
+
+ deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name]
+ _deprecation_warning(__name__, name, canonical_name, remove=(7, 0))
+ return deprecated_object
+
+
+additional_nodes: set[type[Element]] = set()
+
+
+@contextmanager
+def docutils_namespace() -> Generator[None, None, None]:
+ """Create namespace for reST parsers."""
+ try:
+ _directives = copy(directives._directives) # type: ignore[attr-defined]
+ _roles = copy(roles._roles) # type: ignore[attr-defined]
+
+ yield
+ finally:
+ directives._directives = _directives # type: ignore[attr-defined]
+ roles._roles = _roles # type: ignore[attr-defined]
+
+ for node in list(additional_nodes):
+ unregister_node(node)
+ additional_nodes.discard(node)
+
+
+def is_directive_registered(name: str) -> bool:
+ """Check the *name* directive is already registered."""
+ return name in directives._directives # type: ignore[attr-defined]
+
+
+def register_directive(name: str, directive: type[Directive]) -> None:
+ """Register a directive to docutils.
+
+    This modifies the global state of docutils, so it is better to use it
+    inside ``docutils_namespace()`` to prevent side effects.
+ """
+ directives.register_directive(name, directive)
+
+
+def is_role_registered(name: str) -> bool:
+ """Check the *name* role is already registered."""
+ return name in roles._roles # type: ignore[attr-defined]
+
+
+def register_role(name: str, role: RoleFunction) -> None:
+ """Register a role to docutils.
+
+    This modifies the global state of docutils, so it is better to use it
+    inside ``docutils_namespace()`` to prevent side effects.
+ """
+ roles.register_local_role(name, role)
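
A sketch (not part of this commit) of the intended pattern: registrations
made inside docutils_namespace() are rolled back on exit. MyDirective is a
made-up placeholder.

    from docutils.parsers.rst import Directive

    class MyDirective(Directive):
        def run(self):
            return []

    with docutils_namespace():
        register_directive('my-directive', MyDirective)
        assert is_directive_registered('my-directive')
    assert not is_directive_registered('my-directive')  # restored on exit
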
+
+
+def unregister_role(name: str) -> None:
+ """Unregister a role from docutils."""
+ roles._roles.pop(name, None) # type: ignore[attr-defined]
+
+
+def is_node_registered(node: type[Element]) -> bool:
+ """Check the *node* is already registered."""
+ return hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__)
+
+
+def register_node(node: type[Element]) -> None:
+ """Register a node to docutils.
+
+    This modifies the global state of some visitors, so it is better to use it
+    inside ``docutils_namespace()`` to prevent side effects.
+ """
+ if not hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__):
+ nodes._add_node_class_names([node.__name__]) # type: ignore[attr-defined]
+ additional_nodes.add(node)
+
+
+def unregister_node(node: type[Element]) -> None:
+ """Unregister a node from docutils.
+
+    This is the inverse of ``nodes._add_node_class_names()``.
+ """
+ if hasattr(nodes.GenericNodeVisitor, 'visit_' + node.__name__):
+ delattr(nodes.GenericNodeVisitor, "visit_" + node.__name__)
+ delattr(nodes.GenericNodeVisitor, "depart_" + node.__name__)
+ delattr(nodes.SparseNodeVisitor, 'visit_' + node.__name__)
+ delattr(nodes.SparseNodeVisitor, 'depart_' + node.__name__)
+
+
+@contextmanager
+def patched_get_language() -> Generator[None, None, None]:
+ """Patch docutils.languages.get_language() temporarily.
+
+ This ignores the second argument ``reporter`` to suppress warnings.
+ refs: https://github.com/sphinx-doc/sphinx/issues/3788
+ """
+ from docutils.languages import get_language
+
+ def patched_get_language(language_code: str, reporter: Reporter | None = None) -> Any:
+ return get_language(language_code)
+
+ try:
+ docutils.languages.get_language = patched_get_language
+ yield
+ finally:
+ # restore original implementations
+ docutils.languages.get_language = get_language
+
+
+@contextmanager
+def patched_rst_get_language() -> Generator[None, None, None]:
+ """Patch docutils.parsers.rst.languages.get_language().
+ Starting from docutils 0.17, get_language() in ``rst.languages``
+ also has a reporter, which needs to be disabled temporarily.
+
+ This should also work for old versions of docutils,
+    because *reporter* is ``None`` by default.
+
+ refs: https://github.com/sphinx-doc/sphinx/issues/10179
+ """
+ from docutils.parsers.rst.languages import get_language
+
+ def patched_get_language(language_code: str, reporter: Reporter | None = None) -> Any:
+ return get_language(language_code)
+
+ try:
+ docutils.parsers.rst.languages.get_language = patched_get_language
+ yield
+ finally:
+ # restore original implementations
+ docutils.parsers.rst.languages.get_language = get_language
+
+
+@contextmanager
+def using_user_docutils_conf(confdir: str | None) -> Generator[None, None, None]:
+ """Let docutils know the location of ``docutils.conf`` for Sphinx."""
+ try:
+ docutilsconfig = os.environ.get('DOCUTILSCONFIG', None)
+ if confdir:
+ os.environ['DOCUTILSCONFIG'] = path.join(path.abspath(confdir), 'docutils.conf')
+
+ yield
+ finally:
+ if docutilsconfig is None:
+ os.environ.pop('DOCUTILSCONFIG', None)
+ else:
+ os.environ['DOCUTILSCONFIG'] = docutilsconfig
+
+
+@contextmanager
+def du19_footnotes() -> Generator[None, None, None]:
+    """Use docutils 0.19's <aside>-based footnote markup on docutils 0.18.x."""
+    def visit_footnote(self, node):
+ label_style = self.settings.footnote_references
+ if not isinstance(node.previous_sibling(), type(node)):
+ self.body.append(f'<aside class="footnote-list {label_style}">\n')
+ self.body.append(self.starttag(node, 'aside',
+ classes=[node.tagname, label_style],
+ role="note"))
+
+ def depart_footnote(self, node):
+ self.body.append('</aside>\n')
+ if not isinstance(node.next_node(descend=False, siblings=True),
+ type(node)):
+ self.body.append('</aside>\n')
+
+ old_visit_footnote = HTMLTranslator.visit_footnote
+ old_depart_footnote = HTMLTranslator.depart_footnote
+
+ # Only apply on Docutils 0.18 or 0.18.1, as 0.17 and earlier used a <dl> based
+ # approach, and 0.19 and later use the fixed approach by default.
+ if docutils.__version_info__[:2] == (0, 18):
+ HTMLTranslator.visit_footnote = visit_footnote # type: ignore[method-assign]
+ HTMLTranslator.depart_footnote = depart_footnote # type: ignore[method-assign]
+
+ try:
+ yield
+ finally:
+ if docutils.__version_info__[:2] == (0, 18):
+ HTMLTranslator.visit_footnote = old_visit_footnote # type: ignore[method-assign]
+ HTMLTranslator.depart_footnote = old_depart_footnote # type: ignore[method-assign]
+
+
+@contextmanager
+def patch_docutils(confdir: str | None = None) -> Generator[None, None, None]:
+ """Patch to docutils temporarily."""
+ with patched_get_language(), \
+ patched_rst_get_language(), \
+ using_user_docutils_conf(confdir), \
+ du19_footnotes():
+ yield
+
+
+class CustomReSTDispatcher:
+    """A custom dispatcher for reST markup.
+
+    This temporarily replaces the directive and role dispatch mechanism of
+    the docutils reST parser with its own one.
+ """
+
+ def __init__(self) -> None:
+ self.directive_func: Callable = lambda *args: (None, [])
+        self.role_func: Callable = lambda *args: (None, [])
+
+ def __enter__(self) -> None:
+ self.enable()
+
+ def __exit__(
+ self, exc_type: type[Exception], exc_value: Exception, traceback: Any,
+ ) -> None:
+ self.disable()
+
+ def enable(self) -> None:
+ self.directive_func = directives.directive
+ self.role_func = roles.role
+
+ directives.directive = self.directive
+ roles.role = self.role
+
+ def disable(self) -> None:
+ directives.directive = self.directive_func
+ roles.role = self.role_func
+
+ def directive(self,
+ directive_name: str, language_module: ModuleType, document: nodes.document,
+ ) -> tuple[type[Directive] | None, list[system_message]]:
+ return self.directive_func(directive_name, language_module, document)
+
+ def role(
+ self, role_name: str, language_module: ModuleType, lineno: int, reporter: Reporter,
+ ) -> tuple[RoleFunction, list[system_message]]:
+ return self.role_func(role_name, language_module, # type: ignore[return-value]
+ lineno, reporter)
+
+
+class ElementLookupError(Exception):
+ pass
+
+
+class sphinx_domains(CustomReSTDispatcher):
+ """Monkey-patch directive and role dispatch, so that domain-specific
+ markup takes precedence.
+ """
+ def __init__(self, env: BuildEnvironment) -> None:
+ self.env = env
+ super().__init__()
+
+ def lookup_domain_element(self, type: str, name: str) -> Any:
+        """Look up a markup element (directive or role), given a name which may
+        be a fully-qualified name (with a domain prefix).
+ """
+ name = name.lower()
+ # explicit domain given?
+ if ':' in name:
+ domain_name, name = name.split(':', 1)
+ if domain_name in self.env.domains:
+ domain = self.env.get_domain(domain_name)
+ element = getattr(domain, type)(name)
+ if element is not None:
+ return element, []
+ else:
+ logger.warning(_('unknown directive or role name: %s:%s'), domain_name, name)
+ # else look in the default domain
+ else:
+ def_domain = self.env.temp_data.get('default_domain')
+ if def_domain is not None:
+ element = getattr(def_domain, type)(name)
+ if element is not None:
+ return element, []
+
+ # always look in the std domain
+ element = getattr(self.env.get_domain('std'), type)(name)
+ if element is not None:
+ return element, []
+
+ raise ElementLookupError
+
+ def directive(self,
+ directive_name: str, language_module: ModuleType, document: nodes.document,
+ ) -> tuple[type[Directive] | None, list[system_message]]:
+ try:
+ return self.lookup_domain_element('directive', directive_name)
+ except ElementLookupError:
+ return super().directive(directive_name, language_module, document)
+
+ def role(
+ self, role_name: str, language_module: ModuleType, lineno: int, reporter: Reporter,
+ ) -> tuple[RoleFunction, list[system_message]]:
+ try:
+ return self.lookup_domain_element('role', role_name)
+ except ElementLookupError:
+ return super().role(role_name, language_module, lineno, reporter)
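+
+
+# Lookup order implemented above, sketched for a role reference:
+#
+#     :py:func:`open`  -> the explicitly named domain ('py')
+#     :func:`open`     -> the current default domain, if any,
+#                         then the 'std' domain as a fallback,
+#                         then plain docutils dispatch via the superclass.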
+
+
+class WarningStream:
+ def write(self, text: str) -> None:
+ matched = report_re.search(text)
+ if not matched:
+ logger.warning(text.rstrip("\r\n"))
+ else:
+ location, type, level = matched.groups()
+ message = report_re.sub('', text).rstrip()
+ logger.log(type, message, location=location)
+
+
+class LoggingReporter(Reporter):
+ @classmethod
+ def from_reporter(cls, reporter: Reporter) -> LoggingReporter:
+ """Create an instance of LoggingReporter from other reporter object."""
+ return cls(reporter.source, reporter.report_level, reporter.halt_level,
+ reporter.debug_flag, reporter.error_handler)
+
+ def __init__(self, source: str, report_level: int = Reporter.WARNING_LEVEL,
+ halt_level: int = Reporter.SEVERE_LEVEL, debug: bool = False,
+ error_handler: str = 'backslashreplace') -> None:
+ stream = cast(IO, WarningStream())
+ super().__init__(source, report_level, halt_level,
+ stream, debug, error_handler=error_handler)
+
+
+class NullReporter(Reporter):
+    """A dummy reporter that writes nothing."""
+
+ def __init__(self) -> None:
+ super().__init__('', 999, 4)
+
+
+@contextmanager
+def switch_source_input(state: State, content: StringList) -> Generator[None, None, None]:
+ """Switch current source input of state temporarily."""
+ try:
+ # remember the original ``get_source_and_line()`` method
+ gsal = state.memo.reporter.get_source_and_line # type: ignore[attr-defined]
+
+ # replace it by new one
+ state_machine = StateMachine([], None) # type: ignore[arg-type]
+ state_machine.input_lines = content
+ state.memo.reporter.get_source_and_line = state_machine.get_source_and_line # type: ignore[attr-defined] # noqa: E501
+
+ yield
+ finally:
+ # restore the method
+ state.memo.reporter.get_source_and_line = gsal # type: ignore[attr-defined]
+
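+# A common usage sketch for ``switch_source_input()``: a directive that
+# generates its own content and nested-parses it.  ``self`` stands for a
+# hypothetical SphinxDirective instance and ``content`` for a prepared
+# ``StringList``:
+#
+#     with switch_source_input(self.state, content):
+#         node = nodes.section()
+#         nested_parse_with_titles(self.state, content, node)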
+
+class SphinxFileOutput(FileOutput):
+    """A FileOutput class that can skip writing when the content is unchanged."""
+
+ def __init__(self, **kwargs: Any) -> None:
+ self.overwrite_if_changed = kwargs.pop('overwrite_if_changed', False)
+ kwargs.setdefault('encoding', 'utf-8')
+ super().__init__(**kwargs)
+
+ def write(self, data: str) -> str:
+ if (self.destination_path and self.autoclose and 'b' not in self.mode and
+ self.overwrite_if_changed and os.path.exists(self.destination_path)):
+ with open(self.destination_path, encoding=self.encoding) as f:
+ # skip writing: content not changed
+ if f.read() == data:
+ return data
+
+ return super().write(data)
+
+
+class SphinxDirective(Directive):
+ """A base class for Sphinx directives.
+
+ This class provides helper methods for Sphinx directives.
+
+ .. note:: The subclasses of this class might not work with docutils.
+ This class is strongly coupled with Sphinx.
+ """
+
+ @property
+ def env(self) -> BuildEnvironment:
+ """Reference to the :class:`.BuildEnvironment` object."""
+ return self.state.document.settings.env
+
+ @property
+ def config(self) -> Config:
+ """Reference to the :class:`.Config` object."""
+ return self.env.config
+
+ def get_source_info(self) -> tuple[str, int]:
+ """Get source and line number."""
+ return self.state_machine.get_source_and_line(self.lineno)
+
+ def set_source_info(self, node: Node) -> None:
+ """Set source and line number to the node."""
+ node.source, node.line = self.get_source_info()
+
+ def get_location(self) -> str:
+ """Get current location info for logging."""
+ return ':'.join(str(s) for s in self.get_source_info())
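+
+
+# A minimal sketch of a directive built on the helpers above (the directive
+# class and its behaviour are hypothetical):
+#
+#     class HelloDirective(SphinxDirective):
+#         def run(self):
+#             logger.info('running at %s', self.get_location())
+#             para = nodes.paragraph(text=self.config.project)
+#             self.set_source_info(para)
+#             return [para]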
+
+
+class SphinxRole:
+ """A base class for Sphinx roles.
+
+ This class provides helper methods for Sphinx roles.
+
+ .. note:: The subclasses of this class might not work with docutils.
+ This class is strongly coupled with Sphinx.
+ """
+ name: str #: The role name actually used in the document.
+ rawtext: str #: A string containing the entire interpreted text input.
+ text: str #: The interpreted text content.
+ lineno: int #: The line number where the interpreted text begins.
+ inliner: Inliner #: The ``docutils.parsers.rst.states.Inliner`` object.
+ #: A dictionary of directive options for customisation
+ #: (from the "role" directive).
+ options: dict[str, Any]
+ #: A list of strings, the directive content for customisation
+ #: (from the "role" directive).
+ content: Sequence[str]
+
+ def __call__(self, name: str, rawtext: str, text: str, lineno: int,
+ inliner: Inliner, options: dict | None = None, content: Sequence[str] = (),
+ ) -> tuple[list[Node], list[system_message]]:
+ self.rawtext = rawtext
+ self.text = unescape(text)
+ self.lineno = lineno
+ self.inliner = inliner
+ self.options = options if options is not None else {}
+ self.content = content
+
+ # guess role type
+ if name:
+ self.name = name.lower()
+ else:
+ self.name = self.env.temp_data.get('default_role', '')
+ if not self.name:
+ self.name = self.env.config.default_role
+ if not self.name:
+ msg = 'cannot determine default role!'
+ raise SphinxError(msg)
+
+ return self.run()
+
+ def run(self) -> tuple[list[Node], list[system_message]]:
+ raise NotImplementedError
+
+ @property
+ def env(self) -> BuildEnvironment:
+ """Reference to the :class:`.BuildEnvironment` object."""
+ return self.inliner.document.settings.env
+
+ @property
+ def config(self) -> Config:
+ """Reference to the :class:`.Config` object."""
+ return self.env.config
+
+ def get_source_info(self, lineno: int | None = None) -> tuple[str, int]:
+ if lineno is None:
+ lineno = self.lineno
+ return self.inliner.reporter.get_source_and_line(lineno) # type: ignore[attr-defined]
+
+ def set_source_info(self, node: Node, lineno: int | None = None) -> None:
+ node.source, node.line = self.get_source_info(lineno)
+
+ def get_location(self) -> str:
+ """Get current location info for logging."""
+ return ':'.join(str(s) for s in self.get_source_info())
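+
+
+# A minimal sketch of a role built on SphinxRole (hypothetical class name):
+#
+#     class ProjectRole(SphinxRole):
+#         def run(self):
+#             node = nodes.Text(f'{self.text} ({self.config.project})')
+#             return [node], []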
+
+
+class ReferenceRole(SphinxRole):
+ """A base class for reference roles.
+
+    Reference roles can accept text in the ``link title <target>`` style.
+    The parsed link title and target are stored in ``self.title`` and
+    ``self.target``.
+ """
+    has_explicit_title: bool    #: A boolean indicating whether the role has an explicit title.
+    disabled: bool              #: A boolean indicating whether the reference is disabled.
+ title: str #: The link title for the interpreted text.
+ target: str #: The link target for the interpreted text.
+
+ # \x00 means the "<" was backslash-escaped
+ explicit_title_re = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL)
+
+ def __call__(self, name: str, rawtext: str, text: str, lineno: int,
+ inliner: Inliner, options: dict | None = None, content: Sequence[str] = (),
+ ) -> tuple[list[Node], list[system_message]]:
+ if options is None:
+ options = {}
+
+ # if the first character is a bang, don't cross-reference at all
+ self.disabled = text.startswith('!')
+
+ matched = self.explicit_title_re.match(text)
+ if matched:
+ self.has_explicit_title = True
+ self.title = unescape(matched.group(1))
+ self.target = unescape(matched.group(2))
+ else:
+ self.has_explicit_title = False
+ self.title = unescape(text)
+ self.target = unescape(text)
+
+ return super().__call__(name, rawtext, text, lineno, inliner, options, content)
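+
+
+# How the explicit-title parsing above behaves (illustrative role name):
+#
+#     :role:`Some title <target>`  ->  title='Some title', target='target'
+#     :role:`target`               ->  title = target = 'target'
+#     :role:`!target`              ->  self.disabled is True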
+
+
+class SphinxTranslator(nodes.NodeVisitor):
+ """A base class for Sphinx translators.
+
+    This class adds support for dispatching to a visitor/departure method of a
+    super node class when no method for the node class itself is found.
+
+ It also provides helper methods for Sphinx translators.
+
+ .. note:: The subclasses of this class might not work with docutils.
+ This class is strongly coupled with Sphinx.
+ """
+
+ def __init__(self, document: nodes.document, builder: Builder) -> None:
+ super().__init__(document)
+ self.builder = builder
+ self.config = builder.config
+ self.settings = document.settings
+
+ def dispatch_visit(self, node: Node) -> None:
+ """
+ Dispatch node to appropriate visitor method.
+ The priority of visitor method is:
+
+ 1. ``self.visit_{node_class}()``
+ 2. ``self.visit_{super_node_class}()``
+ 3. ``self.unknown_visit()``
+ """
+ for node_class in node.__class__.__mro__:
+ method = getattr(self, 'visit_%s' % (node_class.__name__), None)
+ if method:
+ method(node)
+ break
+ else:
+ super().dispatch_visit(node)
+
+ def dispatch_departure(self, node: Node) -> None:
+ """
+ Dispatch node to appropriate departure method.
+ The priority of departure method is:
+
+ 1. ``self.depart_{node_class}()``
+ 2. ``self.depart_{super_node_class}()``
+ 3. ``self.unknown_departure()``
+ """
+ for node_class in node.__class__.__mro__:
+ method = getattr(self, 'depart_%s' % (node_class.__name__), None)
+ if method:
+ method(node)
+ break
+ else:
+ super().dispatch_departure(node)
+
+ def unknown_visit(self, node: Node) -> None:
+ logger.warning(__('unknown node type: %r'), node, location=node)
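+
+
+# Dispatch sketch: for a hypothetical ``mynode(nodes.admonition)`` element,
+# ``dispatch_visit()`` above walks the MRO and tries, roughly:
+#
+#     visit_mynode() -> visit_admonition() -> visit_Admonition()
+#     -> visit_Element() -> ... -> unknown_visit() if nothing matched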
+
+
+# cache a vanilla instance of nodes.document
+# Used in new_document() function
+__document_cache__: tuple[Values, Reporter]
+
+
+def new_document(source_path: str, settings: Any = None) -> nodes.document:
+    """Return a new empty document object.  This is an alternative to docutils'.
+
+    This is a simple wrapper for ``docutils.utils.new_document()``.  It caches
+    the settings and reporter created on the first call and reuses them on
+    subsequent calls, which makes instantiation of document nodes much faster.
+ """
+ global __document_cache__
+ try:
+ cached_settings, reporter = __document_cache__
+ except NameError:
+ doc = docutils.utils.new_document(source_path)
+ __document_cache__ = cached_settings, reporter = doc.settings, doc.reporter
+
+ if settings is None:
+ # Make a copy of the cached settings to accelerate instantiation
+ settings = copy(cached_settings)
+
+ # Create a new instance of nodes.document using cached reporter
+ from sphinx import addnodes
+ document = addnodes.document(settings, reporter, source=source_path)
+ document.note_source(source_path, -1)
+ return document
diff --git a/sphinx/util/exceptions.py b/sphinx/util/exceptions.py
new file mode 100644
index 0000000..9e25695
--- /dev/null
+++ b/sphinx/util/exceptions.py
@@ -0,0 +1,67 @@
+from __future__ import annotations
+
+import sys
+import traceback
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING
+
+from sphinx.errors import SphinxParallelError
+from sphinx.util.console import strip_colors
+
+if TYPE_CHECKING:
+ from sphinx.application import Sphinx
+
+
+def save_traceback(app: Sphinx | None, exc: BaseException) -> str:
+ """Save the given exception's traceback in a temporary file."""
+ import platform
+
+ import docutils
+ import jinja2
+ import pygments
+
+ import sphinx
+
+ if isinstance(exc, SphinxParallelError):
+ exc_format = '(Error in parallel process)\n' + exc.traceback
+ else:
+ exc_format = traceback.format_exc()
+
+ if app is None:
+ last_msgs = exts_list = ''
+ else:
+ extensions = app.extensions.values()
+ last_msgs = '\n'.join(f'# {strip_colors(s).strip()}' for s in app.messagelog)
+ exts_list = '\n'.join(f'# {ext.name} ({ext.version})' for ext in extensions
+ if ext.version != 'builtin')
+
+ with NamedTemporaryFile('w', suffix='.log', prefix='sphinx-err-', delete=False) as f:
+ f.write(f"""\
+# Platform: {sys.platform}; ({platform.platform()})
+# Sphinx version: {sphinx.__display_version__}
+# Python version: {platform.python_version()} ({platform.python_implementation()})
+# Docutils version: {docutils.__version__}
+# Jinja2 version: {jinja2.__version__}
+# Pygments version: {pygments.__version__}
+
+# Last messages:
+{last_msgs}
+
+# Loaded extensions:
+{exts_list}
+
+# Traceback:
+{exc_format}
+""")
+ return f.name
+
+
+def format_exception_cut_frames(x: int = 1) -> str:
+ """Format an exception with traceback, but only the last x frames."""
+ typ, val, tb = sys.exc_info()
+ # res = ['Traceback (most recent call last):\n']
+ res: list[str] = []
+ tbres = traceback.format_tb(tb)
+ res += tbres[-x:]
+ res += traceback.format_exception_only(typ, val)
+ return ''.join(res)
diff --git a/sphinx/util/fileutil.py b/sphinx/util/fileutil.py
new file mode 100644
index 0000000..316ec39
--- /dev/null
+++ b/sphinx/util/fileutil.py
@@ -0,0 +1,100 @@
+"""File utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import os
+import posixpath
+from typing import TYPE_CHECKING, Callable
+
+from docutils.utils import relative_path
+
+from sphinx.util.osutil import copyfile, ensuredir
+
+if TYPE_CHECKING:
+ from sphinx.util.template import BaseRenderer
+ from sphinx.util.typing import PathMatcher
+
+
+def copy_asset_file(source: str | os.PathLike[str], destination: str | os.PathLike[str],
+ context: dict | None = None,
+ renderer: BaseRenderer | None = None) -> None:
+ """Copy an asset file to destination.
+
+    On copying, it expands the template variables if the *context* argument is
+    given and the asset is a template file.
+
+ :param source: The path to source file
+ :param destination: The path to destination file or directory
+ :param context: The template variables. If not given, template files are simply copied
+ :param renderer: The template engine. If not given, SphinxRenderer is used by default
+ """
+ if not os.path.exists(source):
+ return
+
+ if os.path.isdir(destination):
+        # Use the source filename if destination points to a directory
+ destination = os.path.join(destination, os.path.basename(source))
+ else:
+ destination = str(destination)
+
+ if os.path.basename(source).endswith(('_t', '_T')) and context is not None:
+ if renderer is None:
+ from sphinx.util.template import SphinxRenderer
+ renderer = SphinxRenderer()
+
+ with open(source, encoding='utf-8') as fsrc:
+ if destination.endswith(('_t', '_T')):
+ destination = destination[:-2]
+ with open(destination, 'w', encoding='utf-8') as fdst:
+ fdst.write(renderer.render_string(fsrc.read(), context))
+ else:
+ copyfile(source, destination)
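+
+
+# Usage sketch (hypothetical paths).  Because the source name ends with
+# ``_t`` and a context is given, the template is rendered and the suffix
+# is stripped from the destination:
+#
+#     copy_asset_file('templates/layout.css_t', outdir,
+#                     context={'theme': 'alabaster'})
+#     # -> writes <outdir>/layout.css with the template variables expanded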
+
+
+def copy_asset(source: str | os.PathLike[str], destination: str | os.PathLike[str],
+ excluded: PathMatcher = lambda path: False,
+ context: dict | None = None, renderer: BaseRenderer | None = None,
+ onerror: Callable[[str, Exception], None] | None = None) -> None:
+ """Copy asset files to destination recursively.
+
+    On copying, it expands the template variables if the *context* argument is
+    given and the asset is a template file.
+
+ :param source: The path to source file or directory
+ :param destination: The path to destination directory
+    :param excluded: The matcher to determine whether the given path should be excluded
+ :param context: The template variables. If not given, template files are simply copied
+ :param renderer: The template engine. If not given, SphinxRenderer is used by default
+ :param onerror: The error handler.
+ """
+ if not os.path.exists(source):
+ return
+
+ if renderer is None:
+ from sphinx.util.template import SphinxRenderer
+ renderer = SphinxRenderer()
+
+ ensuredir(destination)
+ if os.path.isfile(source):
+ copy_asset_file(source, destination, context, renderer)
+ return
+
+ for root, dirs, files in os.walk(source, followlinks=True):
+ reldir = relative_path(source, root) # type: ignore[arg-type]
+ for dir in dirs[:]:
+ if excluded(posixpath.join(reldir, dir)):
+ dirs.remove(dir)
+ else:
+ ensuredir(posixpath.join(destination, reldir, dir))
+
+ for filename in files:
+ if not excluded(posixpath.join(reldir, filename)):
+ try:
+ copy_asset_file(posixpath.join(root, filename),
+ posixpath.join(destination, reldir),
+ context, renderer)
+ except Exception as exc:
+ if onerror:
+ onerror(posixpath.join(root, filename), exc)
+ else:
+ raise
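+
+
+# Usage sketch with an exclusion matcher (hypothetical paths):
+#
+#     copy_asset('assets/', outdir, excluded=lambda p: p.endswith('.map'))
+#     # copies everything under assets/ except files matching *.map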
diff --git a/sphinx/util/http_date.py b/sphinx/util/http_date.py
new file mode 100644
index 0000000..8e245cb
--- /dev/null
+++ b/sphinx/util/http_date.py
@@ -0,0 +1,39 @@
+"""Convert times to and from HTTP-date serialisations.
+
+Reference: https://www.rfc-editor.org/rfc/rfc7231#section-7.1.1.1
+"""
+
+import time
+import warnings
+from email.utils import formatdate, parsedate_tz
+
+from sphinx.deprecation import RemovedInSphinx90Warning
+
+_GMT_OFFSET = float(time.localtime().tm_gmtoff)
+
+
+def epoch_to_rfc1123(epoch: float) -> str:
+ """Return HTTP-date string from epoch offset."""
+ return formatdate(epoch, usegmt=True)
+
+
+def rfc1123_to_epoch(rfc1123: str) -> float:
+ """Return epoch offset from HTTP-date string."""
+ t = parsedate_tz(rfc1123)
+ if t is None:
+ raise ValueError
+ if not rfc1123.endswith(" GMT"):
+ warnings.warn(
+ "HTTP-date string does not meet RFC 7231 requirements "
+ f"(must end with 'GMT'): {rfc1123!r}",
+ RemovedInSphinx90Warning, stacklevel=3,
+ )
+ epoch_secs = time.mktime(time.struct_time(t[:9])) + _GMT_OFFSET
+ if (gmt_offset := t[9]) != 0:
+ warnings.warn(
+ "HTTP-date string does not meet RFC 7231 requirements "
+ f"(must be GMT time): {rfc1123!r}",
+ RemovedInSphinx90Warning, stacklevel=3,
+ )
+ return epoch_secs - (gmt_offset or 0)
+ return epoch_secs
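+
+
+# Round-trip sketch for the two helpers above:
+#
+#     epoch_to_rfc1123(0)                                # -> 'Thu, 01 Jan 1970 00:00:00 GMT'
+#     rfc1123_to_epoch('Thu, 01 Jan 1970 00:00:00 GMT')  # -> 0.0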
diff --git a/sphinx/util/i18n.py b/sphinx/util/i18n.py
new file mode 100644
index 0000000..b820884
--- /dev/null
+++ b/sphinx/util/i18n.py
@@ -0,0 +1,253 @@
+"""Builder superclass for all builders."""
+
+from __future__ import annotations
+
+import os
+import re
+from datetime import datetime, timezone
+from os import path
+from typing import TYPE_CHECKING, Callable, NamedTuple
+
+import babel.dates
+from babel.messages.mofile import write_mo
+from babel.messages.pofile import read_po
+
+from sphinx.errors import SphinxError
+from sphinx.locale import __
+from sphinx.util import logging
+from sphinx.util.osutil import SEP, canon_path, relpath
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+
+ from sphinx.environment import BuildEnvironment
+
+
+logger = logging.getLogger(__name__)
+
+
+class LocaleFileInfoBase(NamedTuple):
+ base_dir: str
+ domain: str
+ charset: str
+
+
+class CatalogInfo(LocaleFileInfoBase):
+
+ @property
+ def po_file(self) -> str:
+ return self.domain + '.po'
+
+ @property
+ def mo_file(self) -> str:
+ return self.domain + '.mo'
+
+ @property
+ def po_path(self) -> str:
+ return path.join(self.base_dir, self.po_file)
+
+ @property
+ def mo_path(self) -> str:
+ return path.join(self.base_dir, self.mo_file)
+
+ def is_outdated(self) -> bool:
+ return (
+ not path.exists(self.mo_path) or
+ path.getmtime(self.mo_path) < path.getmtime(self.po_path))
+
+ def write_mo(self, locale: str, use_fuzzy: bool = False) -> None:
+ with open(self.po_path, encoding=self.charset) as file_po:
+ try:
+ po = read_po(file_po, locale)
+ except Exception as exc:
+ logger.warning(__('reading error: %s, %s'), self.po_path, exc)
+ return
+
+ with open(self.mo_path, 'wb') as file_mo:
+ try:
+ write_mo(file_mo, po, use_fuzzy)
+ except Exception as exc:
+ logger.warning(__('writing error: %s, %s'), self.mo_path, exc)
+
+
+class CatalogRepository:
+ """A repository for message catalogs."""
+
+ def __init__(self, basedir: str | os.PathLike[str], locale_dirs: list[str],
+ language: str, encoding: str) -> None:
+ self.basedir = basedir
+ self._locale_dirs = locale_dirs
+ self.language = language
+ self.encoding = encoding
+
+ @property
+ def locale_dirs(self) -> Generator[str, None, None]:
+ if not self.language:
+ return
+
+ for locale_dir in self._locale_dirs:
+ locale_dir = path.join(self.basedir, locale_dir)
+ locale_path = path.join(locale_dir, self.language, 'LC_MESSAGES')
+ if path.exists(locale_path):
+ yield locale_dir
+ else:
+ logger.verbose(__('locale_dir %s does not exist'), locale_path)
+
+ @property
+ def pofiles(self) -> Generator[tuple[str, str], None, None]:
+ for locale_dir in self.locale_dirs:
+ basedir = path.join(locale_dir, self.language, 'LC_MESSAGES')
+ for root, dirnames, filenames in os.walk(basedir):
+ # skip dot-directories
+ for dirname in dirnames:
+ if dirname.startswith('.'):
+ dirnames.remove(dirname)
+
+ for filename in filenames:
+ if filename.endswith('.po'):
+ fullpath = path.join(root, filename)
+ yield basedir, relpath(fullpath, basedir)
+
+ @property
+ def catalogs(self) -> Generator[CatalogInfo, None, None]:
+ for basedir, filename in self.pofiles:
+ domain = canon_path(path.splitext(filename)[0])
+ yield CatalogInfo(basedir, domain, self.encoding)
+
+
+def docname_to_domain(docname: str, compaction: bool | str) -> str:
+ """Convert docname to domain for catalogs."""
+ if isinstance(compaction, str):
+ return compaction
+ if compaction:
+ return docname.split(SEP, 1)[0]
+ else:
+ return docname
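+
+
+# Behaviour sketch for ``docname_to_domain()``:
+#
+#     docname_to_domain('usage/installation', compaction=True)    # -> 'usage'
+#     docname_to_domain('usage/installation', compaction=False)   # -> 'usage/installation'
+#     docname_to_domain('usage/installation', compaction='docs')  # -> 'docs'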
+
+
+# date_format mappings: ustrftime() to babel.dates.format_datetime()
+date_format_mappings = {
+ '%a': 'EEE', # Weekday as locale’s abbreviated name.
+ '%A': 'EEEE', # Weekday as locale’s full name.
+ '%b': 'MMM', # Month as locale’s abbreviated name.
+ '%B': 'MMMM', # Month as locale’s full name.
+ '%c': 'medium', # Locale’s appropriate date and time representation.
+ '%-d': 'd', # Day of the month as a decimal number.
+ '%d': 'dd', # Day of the month as a zero-padded decimal number.
+ '%-H': 'H', # Hour (24-hour clock) as a decimal number [0,23].
+ '%H': 'HH', # Hour (24-hour clock) as a zero-padded decimal number [00,23].
+ '%-I': 'h', # Hour (12-hour clock) as a decimal number [1,12].
+ '%I': 'hh', # Hour (12-hour clock) as a zero-padded decimal number [01,12].
+ '%-j': 'D', # Day of the year as a decimal number.
+ '%j': 'DDD', # Day of the year as a zero-padded decimal number.
+ '%-m': 'M', # Month as a decimal number.
+ '%m': 'MM', # Month as a zero-padded decimal number.
+ '%-M': 'm', # Minute as a decimal number [0,59].
+ '%M': 'mm', # Minute as a zero-padded decimal number [00,59].
+ '%p': 'a', # Locale’s equivalent of either AM or PM.
+ '%-S': 's', # Second as a decimal number.
+ '%S': 'ss', # Second as a zero-padded decimal number.
+ '%U': 'WW', # Week number of the year (Sunday as the first day of the week)
+ # as a zero padded decimal number. All days in a new year preceding
+ # the first Sunday are considered to be in week 0.
+ '%w': 'e', # Weekday as a decimal number, where 0 is Sunday and 6 is Saturday.
+ '%-W': 'W', # Week number of the year (Monday as the first day of the week)
+ # as a decimal number. All days in a new year preceding the first
+ # Monday are considered to be in week 0.
+ '%W': 'WW', # Week number of the year (Monday as the first day of the week)
+ # as a zero-padded decimal number.
+ '%x': 'medium', # Locale’s appropriate date representation.
+ '%X': 'medium', # Locale’s appropriate time representation.
+ '%y': 'YY', # Year without century as a zero-padded decimal number.
+ '%Y': 'yyyy', # Year with century as a decimal number.
+ '%Z': 'zzz', # Time zone name (no characters if no time zone exists).
+ '%z': 'ZZZ', # UTC offset in the form ±HHMM[SS[.ffffff]]
+ # (empty string if the object is naive).
+ '%%': '%',
+}
+
+date_format_re = re.compile('(%s)' % '|'.join(date_format_mappings))
+
+
+def babel_format_date(date: datetime, format: str, locale: str,
+ formatter: Callable = babel.dates.format_date) -> str:
+    # Check if we have the tzinfo attribute.  If not, we cannot do any
+    # time-related formatting.
+ if not hasattr(date, 'tzinfo'):
+ formatter = babel.dates.format_date
+
+ try:
+ return formatter(date, format, locale=locale)
+ except (ValueError, babel.core.UnknownLocaleError):
+ # fallback to English
+ return formatter(date, format, locale='en')
+ except AttributeError:
+ logger.warning(__('Invalid date format. Quote the string by single quote '
+ 'if you want to output it directly: %s'), format)
+ return format
+
+
+def format_date(
+ format: str, *, date: datetime | None = None, language: str,
+) -> str:
+ if date is None:
+ # If time is not specified, try to use $SOURCE_DATE_EPOCH variable
+ # See https://wiki.debian.org/ReproducibleBuilds/TimestampsProposal
+ source_date_epoch = os.getenv('SOURCE_DATE_EPOCH')
+ if source_date_epoch is not None:
+ date = datetime.fromtimestamp(float(source_date_epoch), tz=timezone.utc)
+ else:
+ date = datetime.now(tz=timezone.utc).astimezone()
+
+ result = []
+ tokens = date_format_re.split(format)
+ for token in tokens:
+ if token in date_format_mappings:
+ babel_format = date_format_mappings.get(token, '')
+
+            # Check if we have to use a different babel formatter than
+ # format_datetime, because we only want to format a date
+ # or a time.
+ if token == '%x':
+ function = babel.dates.format_date
+ elif token == '%X':
+ function = babel.dates.format_time
+ else:
+ function = babel.dates.format_datetime
+
+ result.append(babel_format_date(date, babel_format, locale=language,
+ formatter=function))
+ else:
+ result.append(token)
+
+ return "".join(result)
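+
+
+# Usage sketch: each strftime token is mapped to a babel pattern via
+# ``date_format_mappings`` above ('%B' -> 'MMMM', '%d' -> 'dd',
+# '%Y' -> 'yyyy'), while literal text is kept as-is:
+#
+#     format_date('%B %d, %Y', language='en')
+#     # -> e.g. 'January 01, 2024'; the date comes from $SOURCE_DATE_EPOCH
+#     #    if set, otherwise from the current time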
+
+
+def get_image_filename_for_language(
+ filename: str | os.PathLike[str],
+ env: BuildEnvironment,
+) -> str:
+ root, ext = path.splitext(filename)
+ dirname = path.dirname(root)
+ docpath = path.dirname(env.docname)
+ try:
+ return env.config.figure_language_filename.format(
+ root=root,
+ ext=ext,
+ path=dirname and dirname + SEP,
+ basename=path.basename(root),
+ docpath=docpath and docpath + SEP,
+ language=env.config.language,
+ )
+ except KeyError as exc:
+ msg = f'Invalid figure_language_filename: {exc!r}'
+ raise SphinxError(msg) from exc
+
+
+def search_image_for_language(filename: str, env: BuildEnvironment) -> str:
+ translated = get_image_filename_for_language(filename, env)
+ _, abspath = env.relfn2path(translated)
+ if path.exists(abspath):
+ return translated
+ else:
+ return filename
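+
+
+# Behaviour sketch for the two helpers above, assuming the default
+# ``figure_language_filename`` of '{root}.{language}{ext}' (projects may
+# override this):
+#
+#     # with language = 'de' and a document referencing 'image.png':
+#     get_image_filename_for_language('image.png', env)  # -> 'image.de.png'
+#     # search_image_for_language() returns the translated name only when
+#     # that file exists, and falls back to the original filename otherwise.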
diff --git a/sphinx/util/images.py b/sphinx/util/images.py
new file mode 100644
index 0000000..ac0e7f4
--- /dev/null
+++ b/sphinx/util/images.py
@@ -0,0 +1,146 @@
+"""Image utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import base64
+from os import path
+from typing import TYPE_CHECKING, NamedTuple, overload
+
+import imagesize
+
+if TYPE_CHECKING:
+ from os import PathLike
+
+try:
+ from PIL import Image
+except ImportError:
+ Image = None
+
+mime_suffixes = {
+ '.gif': 'image/gif',
+ '.jpg': 'image/jpeg',
+ '.png': 'image/png',
+ '.pdf': 'application/pdf',
+ '.svg': 'image/svg+xml',
+ '.svgz': 'image/svg+xml',
+ '.ai': 'application/illustrator',
+}
+_suffix_from_mime = {v: k for k, v in reversed(mime_suffixes.items())}
+
+
+class DataURI(NamedTuple):
+ mimetype: str
+ charset: str
+ data: bytes
+
+
+def get_image_size(filename: str) -> tuple[int, int] | None:
+ try:
+ size = imagesize.get(filename)
+ if size[0] == -1:
+ size = None
+ elif isinstance(size[0], float) or isinstance(size[1], float):
+ size = (int(size[0]), int(size[1]))
+
+ if size is None and Image: # fallback to Pillow
+ with Image.open(filename) as im:
+ size = im.size
+
+ return size
+ except Exception:
+ return None
+
+
+@overload
+def guess_mimetype(filename: PathLike[str] | str, default: str) -> str:
+ ...
+
+
+@overload
+def guess_mimetype(filename: PathLike[str] | str, default: None = None) -> str | None:
+ ...
+
+
+def guess_mimetype(
+ filename: PathLike[str] | str = '',
+ default: str | None = None,
+) -> str | None:
+ ext = path.splitext(filename)[1].lower()
+ if ext in mime_suffixes:
+ return mime_suffixes[ext]
+ if path.exists(filename):
+ try:
+ imgtype = _image_type_from_file(filename)
+ except ValueError:
+ pass
+ else:
+ return 'image/' + imgtype
+ return default
+
+
+def get_image_extension(mimetype: str) -> str | None:
+ return _suffix_from_mime.get(mimetype)
+
+
+def parse_data_uri(uri: str) -> DataURI | None:
+ if not uri.startswith('data:'):
+ return None
+
+ # data:[<MIME-type>][;charset=<encoding>][;base64],<data>
+ mimetype = 'text/plain'
+ charset = 'US-ASCII'
+
+ properties, data = uri[5:].split(',', 1)
+ for prop in properties.split(';'):
+ if prop == 'base64':
+ pass # skip
+ elif prop.startswith('charset='):
+ charset = prop[8:]
+ elif prop:
+ mimetype = prop
+
+ image_data = base64.b64decode(data)
+ return DataURI(mimetype, charset, image_data)
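+
+
+# Parsing sketch for ``parse_data_uri()`` (the payload is a PNG header):
+#
+#     parse_data_uri('data:image/png;base64,iVBORw0KGgo=')
+#     # -> DataURI(mimetype='image/png', charset='US-ASCII',
+#     #            data=b'\x89PNG\r\n\x1a\n')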
+
+
+def _image_type_from_file(filename: PathLike[str] | str) -> str:
+ with open(filename, 'rb') as f:
+ header = f.read(32) # 32 bytes
+
+ # Bitmap
+ # https://en.wikipedia.org/wiki/BMP_file_format#Bitmap_file_header
+ if header.startswith(b'BM'):
+ return 'bmp'
+
+ # GIF
+ # https://en.wikipedia.org/wiki/GIF#File_format
+ if header.startswith((b'GIF87a', b'GIF89a')):
+ return 'gif'
+
+ # JPEG data
+ # https://en.wikipedia.org/wiki/JPEG_File_Interchange_Format#File_format_structure
+ if header.startswith(b'\xFF\xD8'):
+ return 'jpeg'
+
+ # Portable Network Graphics
+ # https://en.wikipedia.org/wiki/PNG#File_header
+ if header.startswith(b'\x89PNG\r\n\x1A\n'):
+ return 'png'
+
+ # Scalable Vector Graphics
+ # https://svgwg.org/svg2-draft/struct.html
+ if b'<svg' in header.lower():
+ return 'svg+xml'
+
+ # TIFF
+ # https://en.wikipedia.org/wiki/TIFF#Byte_order
+ if header.startswith((b'MM', b'II')):
+ return 'tiff'
+
+ # WebP
+ # https://en.wikipedia.org/wiki/WebP#Technology
+ if header.startswith(b'RIFF') and header[8:12] == b'WEBP':
+ return 'webp'
+
+ msg = 'Could not detect image type!'
+ raise ValueError(msg)
diff --git a/sphinx/util/index_entries.py b/sphinx/util/index_entries.py
new file mode 100644
index 0000000..1004684
--- /dev/null
+++ b/sphinx/util/index_entries.py
@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+
+def split_index_msg(entry_type: str, value: str) -> list[str]:
+ # new entry types must be listed in util/nodes.py!
+ if entry_type == 'single':
+ try:
+ return _split_into(2, 'single', value)
+ except ValueError:
+ return _split_into(1, 'single', value)
+ if entry_type == 'pair':
+ return _split_into(2, 'pair', value)
+ if entry_type == 'triple':
+ return _split_into(3, 'triple', value)
+ if entry_type in {'see', 'seealso'}:
+ return _split_into(2, 'see', value)
+ msg = f'invalid {entry_type} index entry {value!r}'
+ raise ValueError(msg)
+
+
+def _split_into(n: int, type: str, value: str) -> list[str]:
+ """Split an index entry into a given number of parts at semicolons."""
+ parts = [x.strip() for x in value.split(';', n - 1)]
+ if len(list(filter(None, parts))) < n:
+ msg = f'invalid {type} index entry {value!r}'
+ raise ValueError(msg)
+ return parts
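+
+
+# Behaviour sketch for ``split_index_msg()``:
+#
+#     split_index_msg('single', 'pickle; in module os')  # -> ['pickle', 'in module os']
+#     split_index_msg('pair', 'loop; statement')         # -> ['loop', 'statement']
+#     split_index_msg('triple', 'a; b; c')               # -> ['a', 'b', 'c']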
diff --git a/sphinx/util/inspect.py b/sphinx/util/inspect.py
new file mode 100644
index 0000000..7d7fbb8
--- /dev/null
+++ b/sphinx/util/inspect.py
@@ -0,0 +1,833 @@
+"""Helpers for inspecting Python modules."""
+
+from __future__ import annotations
+
+import ast
+import builtins
+import contextlib
+import enum
+import inspect
+import re
+import sys
+import types
+import typing
+from collections.abc import Mapping, Sequence
+from functools import cached_property, partial, partialmethod, singledispatchmethod
+from importlib import import_module
+from inspect import ( # noqa: F401
+ Parameter,
+ isasyncgenfunction,
+ isclass,
+ ismethod,
+ ismethoddescriptor,
+ ismodule,
+)
+from io import StringIO
+from types import (
+ ClassMethodDescriptorType,
+ MethodDescriptorType,
+ MethodType,
+ ModuleType,
+ WrapperDescriptorType,
+)
+from typing import Any, Callable, cast
+
+from sphinx.pycode.ast import unparse as ast_unparse
+from sphinx.util import logging
+from sphinx.util.typing import ForwardRef, stringify_annotation
+
+logger = logging.getLogger(__name__)
+
+memory_address_re = re.compile(r' at 0x[0-9a-f]{8,16}(?=>)', re.IGNORECASE)
+
+
+def unwrap(obj: Any) -> Any:
+    """Get the original object from a wrapped object (wrapped functions)."""
+ if hasattr(obj, '__sphinx_mock__'):
+ # Skip unwrapping mock object to avoid RecursionError
+ return obj
+ try:
+ return inspect.unwrap(obj)
+ except ValueError:
+ # might be a mock object
+ return obj
+
+
+def unwrap_all(obj: Any, *, stop: Callable | None = None) -> Any:
+ """
+    Get the original object from a wrapped object (unwrapping partials, wrapped
+    functions, and other decorators).
+ """
+ while True:
+ if stop and stop(obj):
+ return obj
+ if ispartial(obj):
+ obj = obj.func
+ elif inspect.isroutine(obj) and hasattr(obj, '__wrapped__'):
+ obj = obj.__wrapped__
+ elif isclassmethod(obj) or isstaticmethod(obj):
+ obj = obj.__func__
+ else:
+ return obj
+
+
+def getall(obj: Any) -> Sequence[str] | None:
+    """Get the ``__all__`` attribute of the module as a sequence of strings.
+
+    Return None if the given *obj* does not have ``__all__``.
+    Raises ValueError if the given *obj* has an invalid ``__all__``.
+ """
+ __all__ = safe_getattr(obj, '__all__', None)
+ if __all__ is None:
+ return None
+ if isinstance(__all__, (list, tuple)) and all(isinstance(e, str) for e in __all__):
+ return __all__
+ raise ValueError(__all__)
+
+
+def getannotations(obj: Any) -> Mapping[str, Any]:
+ """Get __annotations__ from given *obj* safely."""
+ __annotations__ = safe_getattr(obj, '__annotations__', None)
+ if isinstance(__annotations__, Mapping):
+ return __annotations__
+ else:
+ return {}
+
+
+def getglobals(obj: Any) -> Mapping[str, Any]:
+ """Get __globals__ from given *obj* safely."""
+ __globals__ = safe_getattr(obj, '__globals__', None)
+ if isinstance(__globals__, Mapping):
+ return __globals__
+ else:
+ return {}
+
+
+def getmro(obj: Any) -> tuple[type, ...]:
+ """Get __mro__ from given *obj* safely."""
+ __mro__ = safe_getattr(obj, '__mro__', None)
+ if isinstance(__mro__, tuple):
+ return __mro__
+ else:
+ return ()
+
+
+def getorigbases(obj: Any) -> tuple[Any, ...] | None:
+ """Get __orig_bases__ from *obj* safely."""
+ if not inspect.isclass(obj):
+ return None
+
+ # Get __orig_bases__ from obj.__dict__ to avoid accessing the parent's __orig_bases__.
+ # refs: https://github.com/sphinx-doc/sphinx/issues/9607
+ __dict__ = safe_getattr(obj, '__dict__', {})
+ __orig_bases__ = __dict__.get('__orig_bases__')
+ if isinstance(__orig_bases__, tuple) and len(__orig_bases__) > 0:
+ return __orig_bases__
+ else:
+ return None
+
+
+def getslots(obj: Any) -> dict[str, Any] | None:
+    """Get the ``__slots__`` attribute of the class as a dict.
+
+    Return None if the given *obj* does not have ``__slots__``.
+    Raises TypeError if the given *obj* is not a class.
+    Raises ValueError if the given *obj* has an invalid ``__slots__``.
+ """
+ if not inspect.isclass(obj):
+ raise TypeError
+
+ __slots__ = safe_getattr(obj, '__slots__', None)
+ if __slots__ is None:
+ return None
+ elif isinstance(__slots__, dict):
+ return __slots__
+ elif isinstance(__slots__, str):
+ return {__slots__: None}
+ elif isinstance(__slots__, (list, tuple)):
+ return dict.fromkeys(__slots__)
+ else:
+ raise ValueError
+
+
+def isNewType(obj: Any) -> bool:
+    """Check if the object is a kind of ``NewType``."""
+ if sys.version_info[:2] >= (3, 10):
+ return isinstance(obj, typing.NewType)
+ __module__ = safe_getattr(obj, '__module__', None)
+ __qualname__ = safe_getattr(obj, '__qualname__', None)
+ return __module__ == 'typing' and __qualname__ == 'NewType.<locals>.new_type'
+
+
+def isenumclass(x: Any) -> bool:
+    """Check if the object is a subclass of ``enum.Enum``."""
+ return inspect.isclass(x) and issubclass(x, enum.Enum)
+
+
+def isenumattribute(x: Any) -> bool:
+    """Check if the object is an attribute (i.e. a member) of an enum."""
+ return isinstance(x, enum.Enum)
+
+
+def unpartial(obj: Any) -> Any:
+ """Get an original object from partial object.
+
+ This returns given object itself if not partial.
+ """
+ while ispartial(obj):
+ obj = obj.func
+
+ return obj
+
+
+def ispartial(obj: Any) -> bool:
+ """Check if the object is partial."""
+ return isinstance(obj, (partial, partialmethod))
+
+
+def isclassmethod(obj: Any, cls: Any = None, name: str | None = None) -> bool:
+    """Check if the object is a classmethod."""
+ if isinstance(obj, classmethod):
+ return True
+ if inspect.ismethod(obj) and obj.__self__ is not None and isclass(obj.__self__):
+ return True
+ if cls and name:
+ placeholder = object()
+ for basecls in getmro(cls):
+ meth = basecls.__dict__.get(name, placeholder)
+ if meth is not placeholder:
+ return isclassmethod(meth)
+
+ return False
+
+
+def isstaticmethod(obj: Any, cls: Any = None, name: str | None = None) -> bool:
+    """Check if the object is a staticmethod."""
+ if isinstance(obj, staticmethod):
+ return True
+ if cls and name:
+ # trace __mro__ if the method is defined in parent class
+ #
+ # .. note:: This only works well with new style classes.
+ for basecls in getattr(cls, '__mro__', [cls]):
+ meth = basecls.__dict__.get(name)
+ if meth:
+ return isinstance(meth, staticmethod)
+ return False
+
+
+def isdescriptor(x: Any) -> bool:
+ """Check if the object is some kind of descriptor."""
+ return any(
+ callable(safe_getattr(x, item, None))
+ for item in ['__get__', '__set__', '__delete__']
+ )
+
+
+def isabstractmethod(obj: Any) -> bool:
+ """Check if the object is an abstractmethod."""
+ return safe_getattr(obj, '__isabstractmethod__', False) is True
+
+
+def isboundmethod(method: MethodType) -> bool:
+ """Check if the method is a bound method."""
+ return safe_getattr(method, '__self__', None) is not None
+
+
+def is_cython_function_or_method(obj: Any) -> bool:
+ """Check if the object is a function or method in cython."""
+ try:
+ return obj.__class__.__name__ == 'cython_function_or_method'
+ except AttributeError:
+ return False
+
+
+def isattributedescriptor(obj: Any) -> bool:
+ """Check if the object is an attribute like descriptor."""
+ if inspect.isdatadescriptor(obj):
+ # data descriptor is kind of attribute
+ return True
+ if isdescriptor(obj):
+ # non data descriptor
+ unwrapped = unwrap(obj)
+ if isfunction(unwrapped) or isbuiltin(unwrapped) or inspect.ismethod(unwrapped):
+            # an attribute must not be a function, a builtin, or a method
+ return False
+ if is_cython_function_or_method(unwrapped):
+            # an attribute must not be a function or a method (for Cython)
+ return False
+ if inspect.isclass(unwrapped):
+ # attribute must not be a class
+ return False
+ if isinstance(unwrapped, (ClassMethodDescriptorType,
+ MethodDescriptorType,
+ WrapperDescriptorType)):
+ # attribute must not be a method descriptor
+ return False
+ if type(unwrapped).__name__ == "instancemethod":
+ # attribute must not be an instancemethod (C-API)
+ return False
+ return True
+ return False
+
+
+def is_singledispatch_function(obj: Any) -> bool:
+ """Check if the object is singledispatch function."""
+ return (inspect.isfunction(obj) and
+ hasattr(obj, 'dispatch') and
+ hasattr(obj, 'register') and
+ obj.dispatch.__module__ == 'functools')
+
+
+def is_singledispatch_method(obj: Any) -> bool:
+ """Check if the object is singledispatch method."""
+ return isinstance(obj, singledispatchmethod)
+
+
+def isfunction(obj: Any) -> bool:
+ """Check if the object is function."""
+    """Check if the object is a function."""
+
+
+def isbuiltin(obj: Any) -> bool:
+    """Check if the object is a builtin function."""
+ return inspect.isbuiltin(unpartial(obj))
+
+
+def isroutine(obj: Any) -> bool:
+    """Check if the object is any kind of function or method."""
+ return inspect.isroutine(unpartial(obj))
+
+
+def iscoroutinefunction(obj: Any) -> bool:
+    """Check if the object is a coroutine function."""
+ def iswrappedcoroutine(obj: Any) -> bool:
+        """Check if the object is a wrapped coroutine function."""
+ if isstaticmethod(obj) or isclassmethod(obj) or ispartial(obj):
+            # staticmethods, classmethods, and partials are never wrapped coroutine functions.
+            # Note: since Python 3.10, staticmethod and classmethod have become a kind of wrapper
+ return False
+ return hasattr(obj, '__wrapped__')
+
+ obj = unwrap_all(obj, stop=iswrappedcoroutine)
+ return inspect.iscoroutinefunction(obj)
+
+
+def isproperty(obj: Any) -> bool:
+    """Check if the object is a property (including ``cached_property``)."""
+ return isinstance(obj, (property, cached_property))
+
+
+def isgenericalias(obj: Any) -> bool:
+    """Check if the object is a generic alias."""
+ return isinstance(
+ obj, (types.GenericAlias, typing._BaseGenericAlias)) # type: ignore[attr-defined]
+
+
+def safe_getattr(obj: Any, name: str, *defargs: Any) -> Any:
+ """A getattr() that turns all exceptions into AttributeErrors."""
+ try:
+ return getattr(obj, name, *defargs)
+ except Exception as exc:
+ # sometimes accessing a property raises an exception (e.g.
+ # NotImplementedError), so let's try to read the attribute directly
+ try:
+ # In case the object does weird things with attribute access
+ # such that accessing `obj.__dict__` may raise an exception
+ return obj.__dict__[name]
+ except Exception:
+ pass
+
+ # this is a catch-all for all the weird things that some modules do
+ # with attribute access
+ if defargs:
+ return defargs[0]
+
+ raise AttributeError(name) from exc
+
+
+def object_description(obj: Any, *, _seen: frozenset = frozenset()) -> str:
+ """A repr() implementation that returns text safe to use in reST context.
+
+ Maintains a set of 'seen' object IDs to detect and avoid infinite recursion.
+ """
+ seen = _seen
+ if isinstance(obj, dict):
+ if id(obj) in seen:
+ return 'dict(...)'
+ seen |= {id(obj)}
+ try:
+ sorted_keys = sorted(obj)
+ except TypeError:
+ # Cannot sort dict keys, fall back to using descriptions as a sort key
+ sorted_keys = sorted(obj, key=lambda k: object_description(k, _seen=seen))
+
+ items = ((object_description(key, _seen=seen),
+ object_description(obj[key], _seen=seen)) for key in sorted_keys)
+ return '{%s}' % ', '.join(f'{key}: {value}' for (key, value) in items)
+ elif isinstance(obj, set):
+ if id(obj) in seen:
+ return 'set(...)'
+ seen |= {id(obj)}
+ try:
+ sorted_values = sorted(obj)
+ except TypeError:
+ # Cannot sort set values, fall back to using descriptions as a sort key
+ sorted_values = sorted(obj, key=lambda x: object_description(x, _seen=seen))
+ return '{%s}' % ', '.join(object_description(x, _seen=seen) for x in sorted_values)
+ elif isinstance(obj, frozenset):
+ if id(obj) in seen:
+ return 'frozenset(...)'
+ seen |= {id(obj)}
+ try:
+ sorted_values = sorted(obj)
+ except TypeError:
+ # Cannot sort frozenset values, fall back to using descriptions as a sort key
+ sorted_values = sorted(obj, key=lambda x: object_description(x, _seen=seen))
+ return 'frozenset({%s})' % ', '.join(object_description(x, _seen=seen)
+ for x in sorted_values)
+ elif isinstance(obj, enum.Enum):
+ return f'{obj.__class__.__name__}.{obj.name}'
+ elif isinstance(obj, tuple):
+ if id(obj) in seen:
+ return 'tuple(...)'
+ seen |= frozenset([id(obj)])
+ return '(%s%s)' % (
+ ', '.join(object_description(x, _seen=seen) for x in obj),
+ ',' * (len(obj) == 1),
+ )
+ elif isinstance(obj, list):
+ if id(obj) in seen:
+ return 'list(...)'
+ seen |= {id(obj)}
+ return '[%s]' % ', '.join(object_description(x, _seen=seen) for x in obj)
+
+ try:
+ s = repr(obj)
+ except Exception as exc:
+ raise ValueError from exc
+ # Strip non-deterministic memory addresses such as
+ # ``<__main__.A at 0x7f68cb685710>``
+ s = memory_address_re.sub('', s)
+ return s.replace('\n', ' ')
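+
+
+# Behaviour sketch for ``object_description()``: mapping/set elements are
+# sorted for reproducible output, and memory addresses are stripped:
+#
+#     object_description({'b': 1, 'a': 2})  # -> "{'a': 2, 'b': 1}"
+#     object_description(object())          # -> '<object object>'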
+
+
+def is_builtin_class_method(obj: Any, attr_name: str) -> bool:
+    """If *attr_name* is implemented on a builtin class, return True.
+
+ >>> is_builtin_class_method(int, '__init__')
+ True
+
+    Why is this function needed?  CPython implements ``int.__init__`` as a
+    descriptor, but PyPy implements it in pure Python code.
+ """
+ try:
+ mro = getmro(obj)
+ cls = next(c for c in mro if attr_name in safe_getattr(c, '__dict__', {}))
+ except StopIteration:
+ return False
+
+ try:
+ name = safe_getattr(cls, '__name__')
+ except AttributeError:
+ return False
+
+ return getattr(builtins, name, None) is cls
+
+
+class DefaultValue:
+    """A simple wrapper for default values of parameters of overloaded functions."""
+
+ def __init__(self, value: str) -> None:
+ self.value = value
+
+ def __eq__(self, other: object) -> bool:
+ return self.value == other
+
+ def __repr__(self) -> str:
+ return self.value
+
+
+class TypeAliasForwardRef:
+ """Pseudo typing class for autodoc_type_aliases.
+
+    This avoids errors when evaluating the type inside ``get_type_hints()``.
+ """
+ def __init__(self, name: str) -> None:
+ self.name = name
+
+ def __call__(self) -> None:
+ # Dummy method to imitate special typing classes
+ pass
+
+ def __eq__(self, other: Any) -> bool:
+ return self.name == other
+
+ def __hash__(self) -> int:
+ return hash(self.name)
+
+ def __repr__(self) -> str:
+ return self.name
+
+
+class TypeAliasModule:
+ """Pseudo module class for autodoc_type_aliases."""
+
+ def __init__(self, modname: str, mapping: dict[str, str]) -> None:
+ self.__modname = modname
+ self.__mapping = mapping
+
+ self.__module: ModuleType | None = None
+
+ def __getattr__(self, name: str) -> Any:
+ fullname = '.'.join(filter(None, [self.__modname, name]))
+ if fullname in self.__mapping:
+ # exactly matched
+ return TypeAliasForwardRef(self.__mapping[fullname])
+ else:
+ prefix = fullname + '.'
+ nested = {k: v for k, v in self.__mapping.items() if k.startswith(prefix)}
+ if nested:
+ # sub modules or classes found
+ return TypeAliasModule(fullname, nested)
+ else:
+ # no sub modules or classes found.
+ try:
+ # return the real submodule if exists
+ return import_module(fullname)
+ except ImportError:
+ # return the real class
+ if self.__module is None:
+ self.__module = import_module(self.__modname)
+
+ return getattr(self.__module, name)
+
+
+class TypeAliasNamespace(dict[str, Any]):
+ """Pseudo namespace class for autodoc_type_aliases.
+
+    This enables looking up nested modules and classes, e.g. ``mod1.mod2.Class``.
+ """
+
+ def __init__(self, mapping: dict[str, str]) -> None:
+ self.__mapping = mapping
+
+ def __getitem__(self, key: str) -> Any:
+ if key in self.__mapping:
+ # exactly matched
+ return TypeAliasForwardRef(self.__mapping[key])
+ else:
+ prefix = key + '.'
+ nested = {k: v for k, v in self.__mapping.items() if k.startswith(prefix)}
+ if nested:
+ # sub modules or classes found
+ return TypeAliasModule(key, nested)
+ else:
+ raise KeyError
+
+
+def _should_unwrap(subject: Callable) -> bool:
+    """Check whether the function should be unwrapped when getting its signature."""
+ __globals__ = getglobals(subject)
+ if (__globals__.get('__name__') == 'contextlib' and
+ __globals__.get('__file__') == contextlib.__file__):
+        # contextmanager should be unwrapped
+ return True
+
+ return False
+
+
+def signature(subject: Callable, bound_method: bool = False, type_aliases: dict | None = None,
+ ) -> inspect.Signature:
+ """Return a Signature object for the given *subject*.
+
+    :param bound_method: Specify whether *subject* is a bound method or not
+ """
+ if type_aliases is None:
+ type_aliases = {}
+
+ try:
+ if _should_unwrap(subject):
+ signature = inspect.signature(subject)
+ else:
+ signature = inspect.signature(subject, follow_wrapped=True)
+ except ValueError:
+ # follow built-in wrappers up (ex. functools.lru_cache)
+ signature = inspect.signature(subject)
+ parameters = list(signature.parameters.values())
+ return_annotation = signature.return_annotation
+
+ try:
+ # Resolve annotations using ``get_type_hints()`` and type_aliases.
+ localns = TypeAliasNamespace(type_aliases)
+ annotations = typing.get_type_hints(subject, None, localns)
+ for i, param in enumerate(parameters):
+ if param.name in annotations:
+ annotation = annotations[param.name]
+ if isinstance(annotation, TypeAliasForwardRef):
+ annotation = annotation.name
+ parameters[i] = param.replace(annotation=annotation)
+ if 'return' in annotations:
+ if isinstance(annotations['return'], TypeAliasForwardRef):
+ return_annotation = annotations['return'].name
+ else:
+ return_annotation = annotations['return']
+ except Exception:
+        # ``get_type_hints()`` does not support some kinds of objects, such as
+        # partial, ForwardRef, and so on.
+ pass
+
+ if bound_method:
+ if inspect.ismethod(subject):
+            # ``inspect.signature()`` considers the subject to be a bound method
+            # and removes the first argument from the signature, so no skips are
+            # needed here.
+ pass
+ else:
+ if len(parameters) > 0:
+ parameters.pop(0)
+
+ # To allow to create signature object correctly for pure python functions,
+ # pass an internal parameter __validate_parameters__=False to Signature
+ #
+    # For example, this helps when a function has a default value of `inspect._empty`.
+ # refs: https://github.com/sphinx-doc/sphinx/issues/7935
+ return inspect.Signature(parameters, return_annotation=return_annotation,
+ __validate_parameters__=False)
+
+
+def evaluate_signature(sig: inspect.Signature, globalns: dict | None = None,
+ localns: dict | None = None,
+ ) -> inspect.Signature:
+ """Evaluate unresolved type annotations in a signature object."""
+ def evaluate_forwardref(ref: ForwardRef, globalns: dict, localns: dict) -> Any:
+ """Evaluate a forward reference."""
+ return ref._evaluate(globalns, localns, frozenset())
+
+ def evaluate(annotation: Any, globalns: dict, localns: dict) -> Any:
+ """Evaluate unresolved type annotation."""
+ try:
+ if isinstance(annotation, str):
+ ref = ForwardRef(annotation, True)
+ annotation = evaluate_forwardref(ref, globalns, localns)
+
+ if isinstance(annotation, ForwardRef):
+                annotation = evaluate_forwardref(annotation, globalns, localns)
+ elif isinstance(annotation, str):
+ # might be a ForwardRef'ed annotation in overloaded functions
+ ref = ForwardRef(annotation, True)
+ annotation = evaluate_forwardref(ref, globalns, localns)
+ except (NameError, TypeError):
+ # failed to evaluate type. skipped.
+ pass
+
+ return annotation
+
+ if globalns is None:
+ globalns = {}
+ if localns is None:
+ localns = globalns
+
+ parameters = list(sig.parameters.values())
+ for i, param in enumerate(parameters):
+ if param.annotation:
+ annotation = evaluate(param.annotation, globalns, localns)
+ parameters[i] = param.replace(annotation=annotation)
+
+ return_annotation = sig.return_annotation
+ if return_annotation:
+ return_annotation = evaluate(return_annotation, globalns, localns)
+
+ return sig.replace(parameters=parameters, return_annotation=return_annotation)
+
+
+def stringify_signature(sig: inspect.Signature, show_annotation: bool = True,
+ show_return_annotation: bool = True,
+ unqualified_typehints: bool = False) -> str:
+ """Stringify a Signature object.
+
+ :param show_annotation: If enabled, show annotations on the signature
+ :param show_return_annotation: If enabled, show annotation of the return value
+ :param unqualified_typehints: If enabled, show annotations as unqualified
+ (ex. io.StringIO -> StringIO)
+ """
+ if unqualified_typehints:
+ mode = 'smart'
+ else:
+ mode = 'fully-qualified'
+
+ args = []
+ last_kind = None
+ for param in sig.parameters.values():
+ if param.kind != param.POSITIONAL_ONLY and last_kind == param.POSITIONAL_ONLY:
+ # PEP-570: Separator for Positional Only Parameter: /
+ args.append('/')
+ if param.kind == param.KEYWORD_ONLY and last_kind in (param.POSITIONAL_OR_KEYWORD,
+ param.POSITIONAL_ONLY,
+ None):
+ # PEP-3102: Separator for Keyword Only Parameter: *
+ args.append('*')
+
+ arg = StringIO()
+ if param.kind == param.VAR_POSITIONAL:
+ arg.write('*' + param.name)
+ elif param.kind == param.VAR_KEYWORD:
+ arg.write('**' + param.name)
+ else:
+ arg.write(param.name)
+
+ if show_annotation and param.annotation is not param.empty:
+ arg.write(': ')
+ arg.write(stringify_annotation(param.annotation, mode))
+ if param.default is not param.empty:
+ if show_annotation and param.annotation is not param.empty:
+ arg.write(' = ')
+ else:
+ arg.write('=')
+ arg.write(object_description(param.default))
+
+ args.append(arg.getvalue())
+ last_kind = param.kind
+
+ if last_kind == Parameter.POSITIONAL_ONLY:
+ # PEP-570: Separator for Positional Only Parameter: /
+ args.append('/')
+
+ concatenated_args = ', '.join(args)
+ if (sig.return_annotation is Parameter.empty or
+ show_annotation is False or
+ show_return_annotation is False):
+ return f'({concatenated_args})'
+ else:
+ annotation = stringify_annotation(sig.return_annotation, mode)
+ return f'({concatenated_args}) -> {annotation}'
+
+
+def signature_from_str(signature: str) -> inspect.Signature:
+ """Create a Signature object from string."""
+ code = 'def func' + signature + ': pass'
+ module = ast.parse(code)
+ function = cast(ast.FunctionDef, module.body[0])
+
+ return signature_from_ast(function, code)
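+
+
+# Usage sketch combining ``signature_from_str()`` with
+# ``stringify_signature()`` above:
+#
+#     sig = signature_from_str('(a, b=1, *, c)')
+#     stringify_signature(sig)  # -> '(a, b=1, *, c)'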
+
+
+def signature_from_ast(node: ast.FunctionDef, code: str = '') -> inspect.Signature:
+ """Create a Signature object from AST *node*."""
+ args = node.args
+ defaults = list(args.defaults)
+ params = []
+ if hasattr(args, "posonlyargs"):
+ posonlyargs = len(args.posonlyargs)
+ positionals = posonlyargs + len(args.args)
+ else:
+ posonlyargs = 0
+ positionals = len(args.args)
+
+ for _ in range(len(defaults), positionals):
+ defaults.insert(0, Parameter.empty) # type: ignore[arg-type]
+
+ if hasattr(args, "posonlyargs"):
+ for i, arg in enumerate(args.posonlyargs):
+ if defaults[i] is Parameter.empty:
+ default = Parameter.empty
+ else:
+ default = DefaultValue(
+ ast_unparse(defaults[i], code)) # type: ignore[assignment]
+
+ annotation = ast_unparse(arg.annotation, code) or Parameter.empty
+ params.append(Parameter(arg.arg, Parameter.POSITIONAL_ONLY,
+ default=default, annotation=annotation))
+
+ for i, arg in enumerate(args.args):
+ if defaults[i + posonlyargs] is Parameter.empty:
+ default = Parameter.empty
+ else:
+ default = DefaultValue(
+ ast_unparse(defaults[i + posonlyargs], code), # type: ignore[assignment]
+ )
+
+ annotation = ast_unparse(arg.annotation, code) or Parameter.empty
+ params.append(Parameter(arg.arg, Parameter.POSITIONAL_OR_KEYWORD,
+ default=default, annotation=annotation))
+
+ if args.vararg:
+ annotation = ast_unparse(args.vararg.annotation, code) or Parameter.empty
+ params.append(Parameter(args.vararg.arg, Parameter.VAR_POSITIONAL,
+ annotation=annotation))
+
+ for i, arg in enumerate(args.kwonlyargs):
+ if args.kw_defaults[i] is None:
+ default = Parameter.empty
+ else:
+ default = DefaultValue(
+ ast_unparse(args.kw_defaults[i], code)) # type: ignore[arg-type,assignment]
+ annotation = ast_unparse(arg.annotation, code) or Parameter.empty
+ params.append(Parameter(arg.arg, Parameter.KEYWORD_ONLY, default=default,
+ annotation=annotation))
+
+ if args.kwarg:
+ annotation = ast_unparse(args.kwarg.annotation, code) or Parameter.empty
+ params.append(Parameter(args.kwarg.arg, Parameter.VAR_KEYWORD,
+ annotation=annotation))
+
+ return_annotation = ast_unparse(node.returns, code) or Parameter.empty
+
+ return inspect.Signature(params, return_annotation=return_annotation)
+
+
+def getdoc(
+ obj: Any,
+ attrgetter: Callable = safe_getattr,
+ allow_inherited: bool = False,
+ cls: Any = None,
+ name: str | None = None,
+) -> str | None:
+ """Get the docstring for the object.
+
+    This additionally tries to obtain docstrings for these kinds of objects:
+
+ * partial functions
+ * inherited docstring
+ * inherited decorated methods
+ """
+ def getdoc_internal(obj: Any, attrgetter: Callable = safe_getattr) -> str | None:
+ doc = attrgetter(obj, '__doc__', None)
+ if isinstance(doc, str):
+ return doc
+ else:
+ return None
+
+ if cls and name and isclassmethod(obj, cls, name):
+ for basecls in getmro(cls):
+ meth = basecls.__dict__.get(name)
+ if meth and hasattr(meth, '__func__'):
+ doc: str | None = getdoc(meth.__func__)
+ if doc is not None or not allow_inherited:
+ return doc
+
+ doc = getdoc_internal(obj)
+ if ispartial(obj) and doc == obj.__class__.__doc__:
+ return getdoc(obj.func)
+ elif doc is None and allow_inherited:
+ if cls and name:
+ # Check a docstring of the attribute or method from super classes.
+ for basecls in getmro(cls):
+ meth = safe_getattr(basecls, name, None)
+ if meth is not None:
+ doc = getdoc_internal(meth)
+ if doc is not None:
+ break
+
+ if doc is None:
+ # retry using `inspect.getdoc()`
+ for basecls in getmro(cls):
+ meth = safe_getattr(basecls, name, None)
+ if meth is not None:
+ doc = inspect.getdoc(meth)
+ if doc is not None:
+ break
+
+ if doc is None:
+ doc = inspect.getdoc(obj)
+
+ return doc
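A sketch of the inherited-docstring behaviour; the classes are invented for the
example::

    class Base:
        def ping(self) -> None:
            """Return a liveness probe."""

    class Child(Base):
        def ping(self) -> None:   # overridden without a docstring
            ...

    getdoc(Child.ping)                      # => None
    getdoc(Child.ping, allow_inherited=True,
           cls=Child, name='ping')          # => 'Return a liveness probe.'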
diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py
new file mode 100644
index 0000000..89f0070
--- /dev/null
+++ b/sphinx/util/inventory.py
@@ -0,0 +1,172 @@
+"""Inventory utility functions for Sphinx."""
+from __future__ import annotations
+
+import os
+import re
+import zlib
+from typing import IO, TYPE_CHECKING, Callable
+
+from sphinx.util import logging
+
+BUFSIZE = 16 * 1024
+logger = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from sphinx.builders import Builder
+ from sphinx.environment import BuildEnvironment
+ from sphinx.util.typing import Inventory, InventoryItem
+
+
+class InventoryFileReader:
+ """A file reader for an inventory file.
+
+    This reader supports a mixture of plain text and zlib-compressed text.
+ """
+
+ def __init__(self, stream: IO) -> None:
+ self.stream = stream
+ self.buffer = b''
+ self.eof = False
+
+ def read_buffer(self) -> None:
+ chunk = self.stream.read(BUFSIZE)
+ if chunk == b'':
+ self.eof = True
+ self.buffer += chunk
+
+ def readline(self) -> str:
+ pos = self.buffer.find(b'\n')
+ if pos != -1:
+ line = self.buffer[:pos].decode()
+ self.buffer = self.buffer[pos + 1:]
+ elif self.eof:
+ line = self.buffer.decode()
+ self.buffer = b''
+ else:
+ self.read_buffer()
+ line = self.readline()
+
+ return line
+
+ def readlines(self) -> Iterator[str]:
+ while not self.eof:
+ line = self.readline()
+ if line:
+ yield line
+
+ def read_compressed_chunks(self) -> Iterator[bytes]:
+ decompressor = zlib.decompressobj()
+ while not self.eof:
+ self.read_buffer()
+ yield decompressor.decompress(self.buffer)
+ self.buffer = b''
+ yield decompressor.flush()
+
+ def read_compressed_lines(self) -> Iterator[str]:
+ buf = b''
+ for chunk in self.read_compressed_chunks():
+ buf += chunk
+ pos = buf.find(b'\n')
+ while pos != -1:
+ yield buf[:pos].decode()
+ buf = buf[pos + 1:]
+ pos = buf.find(b'\n')
+
+
+class InventoryFile:
+ @classmethod
+ def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory:
+ reader = InventoryFileReader(stream)
+ line = reader.readline().rstrip()
+ if line == '# Sphinx inventory version 1':
+ return cls.load_v1(reader, uri, joinfunc)
+ elif line == '# Sphinx inventory version 2':
+ return cls.load_v2(reader, uri, joinfunc)
+ else:
+ raise ValueError('invalid inventory header: %s' % line)
+
+ @classmethod
+ def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
+ invdata: Inventory = {}
+ projname = stream.readline().rstrip()[11:]
+ version = stream.readline().rstrip()[11:]
+ for line in stream.readlines():
+ name, type, location = line.rstrip().split(None, 2)
+ location = join(uri, location)
+ # version 1 did not add anchors to the location
+ if type == 'mod':
+ type = 'py:module'
+ location += '#module-' + name
+ else:
+ type = 'py:' + type
+ location += '#' + name
+ invdata.setdefault(type, {})[name] = (projname, version, location, '-')
+ return invdata
+
+ @classmethod
+ def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
+ invdata: Inventory = {}
+ projname = stream.readline().rstrip()[11:]
+ version = stream.readline().rstrip()[11:]
+ line = stream.readline()
+ if 'zlib' not in line:
+ raise ValueError('invalid inventory header (not compressed): %s' % line)
+
+ for line in stream.read_compressed_lines():
+ # be careful to handle names with embedded spaces correctly
+ m = re.match(r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)',
+ line.rstrip(), flags=re.VERBOSE)
+ if not m:
+ continue
+ name, type, prio, location, dispname = m.groups()
+ if ':' not in type:
+ # wrong type value. type should be in the form of "{domain}:{objtype}"
+ #
+ # Note: To avoid the regex DoS, this is implemented in python (refs: #8175)
+ continue
+ if type == 'py:module' and type in invdata and name in invdata[type]:
+ # due to a bug in 1.1 and below,
+ # two inventory entries are created
+ # for Python modules, and the first
+ # one is correct
+ continue
+ if location.endswith('$'):
+ location = location[:-1] + name
+ location = join(uri, location)
+ inv_item: InventoryItem = projname, version, location, dispname
+ invdata.setdefault(type, {})[name] = inv_item
+ return invdata
+
+ @classmethod
+ def dump(cls, filename: str, env: BuildEnvironment, builder: Builder) -> None:
+ def escape(string: str) -> str:
+ return re.sub("\\s+", " ", string)
+
+        with open(filename, 'wb') as f:
+ # header
+ f.write(('# Sphinx inventory version 2\n'
+ '# Project: %s\n'
+ '# Version: %s\n'
+ '# The remainder of this file is compressed using zlib.\n' %
+ (escape(env.config.project),
+ escape(env.config.version))).encode())
+
+ # body
+ compressor = zlib.compressobj(9)
+ for domainname, domain in sorted(env.domains.items()):
+ for name, dispname, typ, docname, anchor, prio in \
+ sorted(domain.get_objects()):
+ if anchor.endswith(name):
+ # this can shorten the inventory by as much as 25%
+ anchor = anchor[:-len(name)] + '$'
+ uri = builder.get_target_uri(docname)
+ if anchor:
+ uri += '#' + anchor
+ if dispname == name:
+ dispname = '-'
+ entry = ('%s %s:%s %s %s %s\n' %
+ (name, domainname, typ, prio, uri, dispname))
+ f.write(compressor.compress(entry.encode()))
+ f.write(compressor.flush())
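A sketch of loading an inventory with this class; ``posixpath.join`` stands in
for the builder's URI-join callable, and the URI and object name are made up::

    import posixpath

    with open('objects.inv', 'rb') as f:
        inv = InventoryFile.load(f, 'https://example.org/docs', posixpath.join)

    # inv maps '{domain}:{objtype}' to {name: (project, version, uri, dispname)}
    project, version, uri, dispname = inv['py:function']['mypkg.run']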
diff --git a/sphinx/util/logging.py b/sphinx/util/logging.py
new file mode 100644
index 0000000..429018a
--- /dev/null
+++ b/sphinx/util/logging.py
@@ -0,0 +1,602 @@
+"""Logging utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import logging
+import logging.handlers
+from collections import defaultdict
+from contextlib import contextmanager
+from typing import IO, TYPE_CHECKING, Any
+
+from docutils import nodes
+from docutils.utils import get_source_line
+
+from sphinx.errors import SphinxWarning
+from sphinx.util.console import colorize
+from sphinx.util.osutil import abspath
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+
+ from docutils.nodes import Node
+
+ from sphinx.application import Sphinx
+
+
+NAMESPACE = 'sphinx'
+VERBOSE = 15
+
+LEVEL_NAMES: defaultdict[str, int] = defaultdict(lambda: logging.WARNING, {
+ 'CRITICAL': logging.CRITICAL,
+ 'SEVERE': logging.CRITICAL,
+ 'ERROR': logging.ERROR,
+ 'WARNING': logging.WARNING,
+ 'INFO': logging.INFO,
+ 'VERBOSE': VERBOSE,
+ 'DEBUG': logging.DEBUG,
+})
+
+VERBOSITY_MAP: defaultdict[int, int] = defaultdict(lambda: logging.NOTSET, {
+ 0: logging.INFO,
+ 1: VERBOSE,
+ 2: logging.DEBUG,
+})
+
+COLOR_MAP: defaultdict[int, str] = defaultdict(lambda: 'blue', {
+ logging.ERROR: 'darkred',
+ logging.WARNING: 'red',
+ logging.DEBUG: 'darkgray',
+})
+
+
+def getLogger(name: str) -> SphinxLoggerAdapter:
+ """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`.
+
+    The Sphinx logger always uses the ``sphinx.*`` namespace to be independent
+    of the root logger's settings.  This ensures logging stays consistent even
+    if a third-party extension or imported application resets the logger
+    settings.
+
+ Example usage::
+
+ >>> from sphinx.util import logging
+ >>> logger = logging.getLogger(__name__)
+ >>> logger.info('Hello, this is an extension!')
+ Hello, this is an extension!
+ """
+    # forcibly add the sphinx namespace prefix to the name
+    logger = logging.getLogger(NAMESPACE + '.' + name)
+    # forcibly enable the logger
+    logger.disabled = False
+    # wrap the logger in a SphinxLoggerAdapter
+ return SphinxLoggerAdapter(logger, {})
+
+
+def convert_serializable(records: list[logging.LogRecord]) -> None:
+    """Make the given LogRecords serializable."""
+ for r in records:
+ # extract arguments to a message and clear them
+ r.msg = r.getMessage()
+ r.args = ()
+
+ location = getattr(r, 'location', None)
+ if isinstance(location, nodes.Node):
+ r.location = get_node_location(location)
+
+
+class SphinxLogRecord(logging.LogRecord):
+ """Log record class supporting location"""
+ prefix = ''
+ location: Any = None
+
+ def getMessage(self) -> str:
+ message = super().getMessage()
+ location = getattr(self, 'location', None)
+ if location:
+ message = f'{location}: {self.prefix}{message}'
+ elif self.prefix not in message:
+ message = self.prefix + message
+
+ return message
+
+
+class SphinxInfoLogRecord(SphinxLogRecord):
+ """Info log record class supporting location"""
+ prefix = '' # do not show any prefix for INFO messages
+
+
+class SphinxWarningLogRecord(SphinxLogRecord):
+ """Warning log record class supporting location"""
+ @property
+ def prefix(self) -> str: # type: ignore[override]
+ if self.levelno >= logging.CRITICAL:
+ return 'CRITICAL: '
+ elif self.levelno >= logging.ERROR:
+ return 'ERROR: '
+ else:
+ return 'WARNING: '
+
+
+class SphinxLoggerAdapter(logging.LoggerAdapter):
+ """LoggerAdapter allowing ``type`` and ``subtype`` keywords."""
+ KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once']
+
+ def log( # type: ignore[override]
+ self, level: int | str, msg: str, *args: Any, **kwargs: Any,
+ ) -> None:
+ if isinstance(level, int):
+ super().log(level, msg, *args, **kwargs)
+ else:
+ levelno = LEVEL_NAMES[level]
+ super().log(levelno, msg, *args, **kwargs)
+
+ def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None:
+ self.log(VERBOSE, msg, *args, **kwargs)
+
+ def process(self, msg: str, kwargs: dict) -> tuple[str, dict]: # type: ignore[override]
+ extra = kwargs.setdefault('extra', {})
+ for keyword in self.KEYWORDS:
+ if keyword in kwargs:
+ extra[keyword] = kwargs.pop(keyword)
+
+ return msg, kwargs
+
+ def handle(self, record: logging.LogRecord) -> None:
+ self.logger.handle(record)
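The extra keywords are consumed by the filters and handlers installed in
``setup()`` below; a typical extension call looks like this (sketch, message
and values invented)::

    logger = getLogger(__name__)
    logger.warning('duplicate label: %s', 'intro',
                   type='ref', subtype='duplicate',  # checked by suppress_warnings
                   location=('index', 10),           # (docname, lineno)
                   once=True)                        # de-duplicated by OnceFilter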
+
+
+class WarningStreamHandler(logging.StreamHandler):
+ """StreamHandler for warnings."""
+ pass
+
+
+class NewLineStreamHandler(logging.StreamHandler):
+ """StreamHandler which switches line terminator by record.nonl flag."""
+
+ def emit(self, record: logging.LogRecord) -> None:
+ try:
+ self.acquire()
+ if getattr(record, 'nonl', False):
+ # skip appending terminator when nonl=True
+ self.terminator = ''
+ super().emit(record)
+ finally:
+ self.terminator = '\n'
+ self.release()
+
+
+class MemoryHandler(logging.handlers.BufferingHandler):
+ """Handler buffering all logs."""
+
+ buffer: list[logging.LogRecord]
+
+ def __init__(self) -> None:
+ super().__init__(-1)
+
+ def shouldFlush(self, record: logging.LogRecord) -> bool:
+ return False # never flush
+
+ def flush(self) -> None:
+ # suppress any flushes triggered by importing packages that flush
+ # all handlers at initialization time
+ pass
+
+ def flushTo(self, logger: logging.Logger) -> None:
+ self.acquire()
+ try:
+ for record in self.buffer:
+ logger.handle(record)
+ self.buffer = []
+ finally:
+ self.release()
+
+ def clear(self) -> list[logging.LogRecord]:
+ buffer, self.buffer = self.buffer, []
+ return buffer
+
+
+@contextmanager
+def pending_warnings() -> Generator[logging.Handler, None, None]:
+ """Context manager to postpone logging warnings temporarily.
+
+ Similar to :func:`pending_logging`.
+ """
+ logger = logging.getLogger(NAMESPACE)
+ memhandler = MemoryHandler()
+ memhandler.setLevel(logging.WARNING)
+
+ try:
+ handlers = []
+ for handler in logger.handlers[:]:
+ if isinstance(handler, WarningStreamHandler):
+ logger.removeHandler(handler)
+ handlers.append(handler)
+
+ logger.addHandler(memhandler)
+ yield memhandler
+ finally:
+ logger.removeHandler(memhandler)
+
+ for handler in handlers:
+ logger.addHandler(handler)
+
+ memhandler.flushTo(logger)
+
+
+@contextmanager
+def suppress_logging() -> Generator[MemoryHandler, None, None]:
+ """Context manager to suppress logging all logs temporarily.
+
+ For example::
+
+ >>> with suppress_logging():
+ >>> logger.warning('Warning message!') # suppressed
+ >>> some_long_process()
+ >>>
+ """
+ logger = logging.getLogger(NAMESPACE)
+ memhandler = MemoryHandler()
+
+ try:
+ handlers = []
+ for handler in logger.handlers[:]:
+ logger.removeHandler(handler)
+ handlers.append(handler)
+
+ logger.addHandler(memhandler)
+ yield memhandler
+ finally:
+ logger.removeHandler(memhandler)
+
+ for handler in handlers:
+ logger.addHandler(handler)
+
+
+@contextmanager
+def pending_logging() -> Generator[MemoryHandler, None, None]:
+ """Context manager to postpone logging all logs temporarily.
+
+ For example::
+
+ >>> with pending_logging():
+ >>> logger.warning('Warning message!') # not flushed yet
+ >>> some_long_process()
+ >>>
+ Warning message! # the warning is flushed here
+ """
+ logger = logging.getLogger(NAMESPACE)
+ try:
+ with suppress_logging() as memhandler:
+ yield memhandler
+ finally:
+ memhandler.flushTo(logger)
+
+
+@contextmanager
+def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]:
+ """Context manager to skip WarningIsErrorFilter temporarily."""
+ logger = logging.getLogger(NAMESPACE)
+
+ if skip is False:
+ yield
+ else:
+ try:
+ disabler = DisableWarningIsErrorFilter()
+ for handler in logger.handlers:
+                # insert into the internal ``filters`` list directly, so that
+                # the disabler runs before WarningIsErrorFilter
+ handler.filters.insert(0, disabler)
+ yield
+ finally:
+ for handler in logger.handlers:
+ handler.removeFilter(disabler)
+
+
+@contextmanager
+def prefixed_warnings(prefix: str) -> Generator[None, None, None]:
+ """Context manager to prepend prefix to all warning log records temporarily.
+
+ For example::
+
+ >>> with prefixed_warnings("prefix:"):
+ >>> logger.warning('Warning message!') # => prefix: Warning message!
+
+ .. versionadded:: 2.0
+ """
+ logger = logging.getLogger(NAMESPACE)
+ warning_handler = None
+ for handler in logger.handlers:
+ if isinstance(handler, WarningStreamHandler):
+ warning_handler = handler
+ break
+ else:
+ # warning stream not found
+ yield
+ return
+
+ prefix_filter = None
+ for _filter in warning_handler.filters:
+ if isinstance(_filter, MessagePrefixFilter):
+ prefix_filter = _filter
+ break
+
+ if prefix_filter:
+ # already prefixed
+ try:
+ previous = prefix_filter.prefix
+ prefix_filter.prefix = prefix
+ yield
+ finally:
+ prefix_filter.prefix = previous
+ else:
+ # not prefixed yet
+ prefix_filter = MessagePrefixFilter(prefix)
+ try:
+ warning_handler.addFilter(prefix_filter)
+ yield
+ finally:
+ warning_handler.removeFilter(prefix_filter)
+
+
+class LogCollector:
+ def __init__(self) -> None:
+ self.logs: list[logging.LogRecord] = []
+
+ @contextmanager
+ def collect(self) -> Generator[None, None, None]:
+ with pending_logging() as memhandler:
+ yield
+
+ self.logs = memhandler.clear()
+
+
+class InfoFilter(logging.Filter):
+ """Filter error and warning messages."""
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ return record.levelno < logging.WARNING
+
+
+def is_suppressed_warning(type: str, subtype: str, suppress_warnings: list[str]) -> bool:
+ """Check whether the warning is suppressed or not."""
+ if type is None:
+ return False
+
+ subtarget: str | None
+
+ for warning_type in suppress_warnings:
+ if '.' in warning_type:
+ target, subtarget = warning_type.split('.', 1)
+ else:
+ target, subtarget = warning_type, None
+
+ if target == type and subtarget in (None, subtype, "*"):
+ return True
+
+ return False
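For example, with ``suppress_warnings = ['ref', 'toc.secnum']`` a bare target
suppresses every subtype, while a dotted one matches exactly::

    is_suppressed_warning('ref', 'python', ['ref', 'toc.secnum'])    # True
    is_suppressed_warning('toc', 'secnum', ['ref', 'toc.secnum'])    # True
    is_suppressed_warning('toc', 'circular', ['ref', 'toc.secnum'])  # False
    is_suppressed_warning('toc', 'circular', ['toc.*'])              # True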
+
+
+class WarningSuppressor(logging.Filter):
+ """Filter logs by `suppress_warnings`."""
+
+ def __init__(self, app: Sphinx) -> None:
+ self.app = app
+ super().__init__()
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ type = getattr(record, 'type', '')
+ subtype = getattr(record, 'subtype', '')
+
+ try:
+ suppress_warnings = self.app.config.suppress_warnings
+ except AttributeError:
+ # config is not initialized yet (ex. in conf.py)
+ suppress_warnings = []
+
+ if is_suppressed_warning(type, subtype, suppress_warnings):
+ return False
+ else:
+ self.app._warncount += 1
+ return True
+
+
+class WarningIsErrorFilter(logging.Filter):
+ """Raise exception if warning emitted."""
+
+ def __init__(self, app: Sphinx) -> None:
+ self.app = app
+ super().__init__()
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ if getattr(record, 'skip_warningsiserror', False):
+ # disabled by DisableWarningIsErrorFilter
+ return True
+ elif self.app.warningiserror:
+ location = getattr(record, 'location', '')
+ try:
+ message = record.msg % record.args
+ except (TypeError, ValueError):
+ message = record.msg # use record.msg itself
+
+ if location:
+ exc = SphinxWarning(location + ":" + str(message))
+ else:
+ exc = SphinxWarning(message)
+ if record.exc_info is not None:
+ raise exc from record.exc_info[1]
+ else:
+ raise exc
+ else:
+ return True
+
+
+class DisableWarningIsErrorFilter(logging.Filter):
+ """Disable WarningIsErrorFilter if this filter installed."""
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ record.skip_warningsiserror = True
+ return True
+
+
+class MessagePrefixFilter(logging.Filter):
+ """Prepend prefix to all log records."""
+
+ def __init__(self, prefix: str) -> None:
+ self.prefix = prefix
+ super().__init__()
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ if self.prefix:
+ record.msg = self.prefix + ' ' + record.msg
+ return True
+
+
+class OnceFilter(logging.Filter):
+ """Show the message only once."""
+
+ def __init__(self, name: str = '') -> None:
+ super().__init__(name)
+ self.messages: dict[str, list] = {}
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ once = getattr(record, 'once', '')
+ if not once:
+ return True
+ else:
+ params = self.messages.setdefault(record.msg, [])
+ if record.args in params:
+ return False
+
+ params.append(record.args)
+ return True
+
+
+class SphinxLogRecordTranslator(logging.Filter):
+    """Convert a log record into one that Sphinx expects.
+
+    * Make an instance of SphinxLogRecord
+    * Convert the docname in *location* into a file path, if given
+    """
+ LogRecordClass: type[logging.LogRecord]
+
+ def __init__(self, app: Sphinx) -> None:
+ self.app = app
+ super().__init__()
+
+ def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore[override]
+ if isinstance(record, logging.LogRecord):
+ # force subclassing to handle location
+ record.__class__ = self.LogRecordClass # type: ignore[assignment]
+
+ location = getattr(record, 'location', None)
+ if isinstance(location, tuple):
+ docname, lineno = location
+ if docname:
+ if lineno:
+ record.location = f'{self.app.env.doc2path(docname)}:{lineno}'
+ else:
+ record.location = f'{self.app.env.doc2path(docname)}'
+ else:
+ record.location = None
+ elif isinstance(location, nodes.Node):
+ record.location = get_node_location(location)
+ elif location and ':' not in location:
+ record.location = f'{self.app.env.doc2path(location)}'
+
+ return True
+
+
+class InfoLogRecordTranslator(SphinxLogRecordTranslator):
+ """LogRecordTranslator for INFO level log records."""
+ LogRecordClass = SphinxInfoLogRecord
+
+
+class WarningLogRecordTranslator(SphinxLogRecordTranslator):
+ """LogRecordTranslator for WARNING level log records."""
+ LogRecordClass = SphinxWarningLogRecord
+
+
+def get_node_location(node: Node) -> str | None:
+ source, line = get_source_line(node)
+ if source:
+ source = abspath(source)
+ if source and line:
+ return f"{source}:{line}"
+ if source:
+ return f"{source}:"
+ if line:
+ return f"<unknown>:{line}"
+ return None
+
+
+class ColorizeFormatter(logging.Formatter):
+ def format(self, record: logging.LogRecord) -> str:
+ message = super().format(record)
+ color = getattr(record, 'color', None)
+ if color is None:
+ color = COLOR_MAP.get(record.levelno)
+
+ if color:
+ return colorize(color, message)
+ else:
+ return message
+
+
+class SafeEncodingWriter:
+ """Stream writer which ignores UnicodeEncodeError silently"""
+ def __init__(self, stream: IO) -> None:
+ self.stream = stream
+ self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii'
+
+ def write(self, data: str) -> None:
+ try:
+ self.stream.write(data)
+ except UnicodeEncodeError:
+            # the stream accepts only str, not bytes, so we encode and
+            # replace non-encodable characters, then decode them again.
+ self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding))
+
+ def flush(self) -> None:
+ if hasattr(self.stream, 'flush'):
+ self.stream.flush()
+
+
+class LastMessagesWriter:
+    """Stream writer storing the last 10 messages in memory for traceback display"""
+ def __init__(self, app: Sphinx, stream: IO) -> None:
+ self.app = app
+
+ def write(self, data: str) -> None:
+ self.app.messagelog.append(data)
+
+
+def setup(app: Sphinx, status: IO, warning: IO) -> None:
+ """Setup root logger for Sphinx"""
+ logger = logging.getLogger(NAMESPACE)
+ logger.setLevel(logging.DEBUG)
+ logger.propagate = False
+
+ # clear all handlers
+ for handler in logger.handlers[:]:
+ logger.removeHandler(handler)
+
+ info_handler = NewLineStreamHandler(SafeEncodingWriter(status))
+ info_handler.addFilter(InfoFilter())
+ info_handler.addFilter(InfoLogRecordTranslator(app))
+ info_handler.setLevel(VERBOSITY_MAP[app.verbosity])
+ info_handler.setFormatter(ColorizeFormatter())
+
+ warning_handler = WarningStreamHandler(SafeEncodingWriter(warning))
+ warning_handler.addFilter(WarningSuppressor(app))
+ warning_handler.addFilter(WarningLogRecordTranslator(app))
+ warning_handler.addFilter(WarningIsErrorFilter(app))
+ warning_handler.addFilter(OnceFilter())
+ warning_handler.setLevel(logging.WARNING)
+ warning_handler.setFormatter(ColorizeFormatter())
+
+ messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status))
+ messagelog_handler.addFilter(InfoFilter())
+ messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity])
+
+ logger.addHandler(info_handler)
+ logger.addHandler(warning_handler)
+ logger.addHandler(messagelog_handler)
diff --git a/sphinx/util/matching.py b/sphinx/util/matching.py
new file mode 100644
index 0000000..dd91905
--- /dev/null
+++ b/sphinx/util/matching.py
@@ -0,0 +1,169 @@
+"""Pattern-matching utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import os.path
+import re
+from typing import TYPE_CHECKING, Callable
+
+from sphinx.util.osutil import canon_path, path_stabilize
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable, Iterator
+
+
+def _translate_pattern(pat: str) -> str:
+ """Translate a shell-style glob pattern to a regular expression.
+
+ Adapted from the fnmatch module, but enhanced so that single stars don't
+ match slashes.
+ """
+ i, n = 0, len(pat)
+ res = ''
+ while i < n:
+ c = pat[i]
+ i += 1
+ if c == '*':
+ if i < n and pat[i] == '*':
+ # double star matches slashes too
+ i += 1
+ res = res + '.*'
+ else:
+ # single star doesn't match slashes
+ res = res + '[^/]*'
+ elif c == '?':
+            # a question mark doesn't match slashes either
+ res = res + '[^/]'
+ elif c == '[':
+ j = i
+ if j < n and pat[j] == '!':
+ j += 1
+ if j < n and pat[j] == ']':
+ j += 1
+ while j < n and pat[j] != ']':
+ j += 1
+ if j >= n:
+ res = res + '\\['
+ else:
+ stuff = pat[i:j].replace('\\', '\\\\')
+ i = j + 1
+ if stuff[0] == '!':
+                    # a negative pattern mustn't match slashes either
+ stuff = '^/' + stuff[1:]
+ elif stuff[0] == '^':
+ stuff = '\\' + stuff
+ res = f'{res}[{stuff}]'
+ else:
+ res += re.escape(c)
+ return res + '$'
+
+
+def compile_matchers(
+ patterns: Iterable[str],
+) -> list[Callable[[str], re.Match[str] | None]]:
+ return [re.compile(_translate_pattern(pat)).match for pat in patterns]
+
+
+class Matcher:
+    """A pattern matcher for multiple shell-style glob patterns.
+
+    Note: this expands the patterns so they work with copy_asset().
+    For example, "**/index.rst" also matches a bare "index.rst".
+ """
+
+ def __init__(self, exclude_patterns: Iterable[str]) -> None:
+ expanded = [pat[3:] for pat in exclude_patterns if pat.startswith('**/')]
+ self.patterns = compile_matchers(list(exclude_patterns) + expanded)
+
+ def __call__(self, string: str) -> bool:
+ return self.match(string)
+
+ def match(self, string: str) -> bool:
+ string = canon_path(string)
+ return any(pat(string) for pat in self.patterns)
+
+
+DOTFILES = Matcher(['**/.*'])
+
+
+_pat_cache: dict[str, re.Pattern[str]] = {}
+
+
+def patmatch(name: str, pat: str) -> re.Match[str] | None:
+    """Return the match if *name* matches the shell-style glob
+    pattern ``pat``. Adapted from the fnmatch module."""
+ if pat not in _pat_cache:
+ _pat_cache[pat] = re.compile(_translate_pattern(pat))
+ return _pat_cache[pat].match(name)
+
+
+def patfilter(names: Iterable[str], pat: str) -> list[str]:
+    """Return the subset of the list ``names`` that match
+    the shell-style glob pattern ``pat``.
+
+ Adapted from fnmatch module.
+ """
+ if pat not in _pat_cache:
+ _pat_cache[pat] = re.compile(_translate_pattern(pat))
+ match = _pat_cache[pat].match
+ return list(filter(match, names))
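The single-star/double-star distinction in practice (paths invented for the
example)::

    patmatch('index.rst', '*.rst')           # matches
    patmatch('guide/index.rst', '*.rst')     # no match: '*' stops at '/'
    patmatch('guide/index.rst', '**/*.rst')  # matches: '**' crosses '/'
    patfilter(['a.py', 'b/c.py'], '**.py')   # => ['a.py', 'b/c.py']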
+
+
+def get_matching_files(
+ dirname: str | os.PathLike[str],
+ include_patterns: Iterable[str] = ("**",),
+ exclude_patterns: Iterable[str] = (),
+) -> Iterator[str]:
+ """Get all file names in a directory, recursively.
+
+ Filter file names by the glob-style include_patterns and exclude_patterns.
+    The default values include all files ("**") and exclude nothing.
+
+ Only files matching some pattern in *include_patterns* are included, and
+ exclusions from *exclude_patterns* take priority over inclusions.
+
+ """
+ # dirname is a normalized absolute path.
+ dirname = os.path.normpath(os.path.abspath(dirname))
+
+ exclude_matchers = compile_matchers(exclude_patterns)
+ include_matchers = compile_matchers(include_patterns)
+
+ for root, dirs, files in os.walk(dirname, followlinks=True):
+ relative_root = os.path.relpath(root, dirname)
+ if relative_root == ".":
+ relative_root = "" # suppress dirname for files on the target dir
+
+ # Filter files
+ included_files = []
+ for entry in sorted(files):
+ entry = path_stabilize(os.path.join(relative_root, entry))
+ keep = False
+ for matcher in include_matchers:
+ if matcher(entry):
+ keep = True
+ break # break the inner loop
+
+ for matcher in exclude_matchers:
+ if matcher(entry):
+ keep = False
+ break # break the inner loop
+
+ if keep:
+ included_files.append(entry)
+
+ # Filter directories
+ filtered_dirs = []
+ for dir_name in sorted(dirs):
+ normalised = path_stabilize(os.path.join(relative_root, dir_name))
+ for matcher in exclude_matchers:
+ if matcher(normalised):
+ break # break the inner loop
+ else:
+ # if the loop didn't break
+ filtered_dirs.append(dir_name)
+
+ dirs[:] = filtered_dirs
+
+ # Yield filtered files
+ yield from included_files
diff --git a/sphinx/util/math.py b/sphinx/util/math.py
new file mode 100644
index 0000000..ef0eb39
--- /dev/null
+++ b/sphinx/util/math.py
@@ -0,0 +1,61 @@
+"""Utility functions for math."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from docutils import nodes
+
+ from sphinx.builders.html import HTML5Translator
+
+
+def get_node_equation_number(writer: HTML5Translator, node: nodes.math_block) -> str:
+ if writer.builder.config.math_numfig and writer.builder.config.numfig:
+ figtype = 'displaymath'
+ if writer.builder.name == 'singlehtml':
+ key = f"{writer.docnames[-1]}/{figtype}" # type: ignore[has-type]
+ else:
+ key = figtype
+
+ id = node['ids'][0]
+ number = writer.builder.fignumbers.get(key, {}).get(id, ())
+ return '.'.join(map(str, number))
+ else:
+ return node['number']
+
+
+def wrap_displaymath(text: str, label: str | None, numbering: bool) -> str:
+ def is_equation(part: str) -> str:
+ return part.strip()
+
+ if label is None:
+ labeldef = ''
+ else:
+ labeldef = r'\label{%s}' % label
+ numbering = True
+
+ parts = list(filter(is_equation, text.split('\n\n')))
+ equations = []
+ if len(parts) == 0:
+ return ''
+ elif len(parts) == 1:
+ if numbering:
+ begin = r'\begin{equation}' + labeldef
+ end = r'\end{equation}'
+ else:
+ begin = r'\begin{equation*}' + labeldef
+ end = r'\end{equation*}'
+ equations.append('\\begin{split}%s\\end{split}\n' % parts[0])
+ else:
+ if numbering:
+ begin = r'\begin{align}%s\!\begin{aligned}' % labeldef
+ end = r'\end{aligned}\end{align}'
+ else:
+ begin = r'\begin{align*}%s\!\begin{aligned}' % labeldef
+ end = r'\end{aligned}\end{align*}'
+ for part in parts:
+ equations.append('%s\\\\\n' % part.strip())
+
+ concatenated_equations = ''.join(equations)
+ return f'{begin}\n{concatenated_equations}{end}'
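For instance, two blank-line-separated equations are merged into a single
aligned (and here unnumbered) block::

    wrap_displaymath('a = b\n\nc = d', label=None, numbering=False)
    # => \begin{align*}\!\begin{aligned}
    #    a = b\\
    #    c = d\\
    #    \end{aligned}\end{align*}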
diff --git a/sphinx/util/nodes.py b/sphinx/util/nodes.py
new file mode 100644
index 0000000..b68b7fd
--- /dev/null
+++ b/sphinx/util/nodes.py
@@ -0,0 +1,672 @@
+"""Docutils node-related utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import contextlib
+import re
+import unicodedata
+from typing import TYPE_CHECKING, Any, Callable
+
+from docutils import nodes
+
+from sphinx import addnodes
+from sphinx.locale import __
+from sphinx.util import logging
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
+ from docutils.nodes import Element, Node
+ from docutils.parsers.rst import Directive
+ from docutils.parsers.rst.states import Inliner
+ from docutils.statemachine import StringList
+
+ from sphinx.builders import Builder
+ from sphinx.environment import BuildEnvironment
+ from sphinx.util.tags import Tags
+
+logger = logging.getLogger(__name__)
+
+
+# \x00 means the "<" was backslash-escaped
+explicit_title_re = re.compile(r'^(.+?)\s*(?<!\x00)<([^<]*?)>$', re.DOTALL)
+caption_ref_re = explicit_title_re # b/w compat alias
+
+
+class NodeMatcher:
+ """A helper class for Node.findall().
+
+ It checks that the given node is an instance of the specified node-classes and
+ has the specified node-attributes.
+
+    For example, the following searches for ``reference`` nodes having
+    ``refdomain`` and ``reftype`` attributes::
+
+        matcher = NodeMatcher(nodes.reference, refdomain='std', reftype='citation')
+        doctree.findall(matcher)
+        # => [<reference ...>, <reference ...>, ...]
+
+    The special value ``typing.Any`` matches any value of a node-attribute.
+    For example, the following searches for ``reference`` nodes having a
+    ``refdomain`` attribute, whatever its value::
+
+        from typing import Any
+        matcher = NodeMatcher(nodes.reference, refdomain=Any)
+        doctree.findall(matcher)
+        # => [<reference ...>, <reference ...>, ...]
+ """
+
+ def __init__(self, *node_classes: type[Node], **attrs: Any) -> None:
+ self.classes = node_classes
+ self.attrs = attrs
+
+ def match(self, node: Node) -> bool:
+ try:
+ if self.classes and not isinstance(node, self.classes):
+ return False
+
+ if self.attrs:
+ if not isinstance(node, nodes.Element):
+ return False
+
+ for key, value in self.attrs.items():
+ if key not in node:
+ return False
+ elif value is Any:
+ continue
+ elif node.get(key) != value:
+ return False
+
+ return True
+ except Exception:
+ # for non-Element nodes
+ return False
+
+ def __call__(self, node: Node) -> bool:
+ return self.match(node)
+
+
+def get_full_module_name(node: Node) -> str:
+ """
+ Return full module dotted path like: 'docutils.nodes.paragraph'
+
+ :param nodes.Node node: target node
+ :return: full module dotted path
+ """
+ return f'{node.__module__}.{node.__class__.__name__}'
+
+
+def repr_domxml(node: Node, length: int = 80) -> str:
+ """
+    Return the DOM XML representation of the specified node, like:
+    '<paragraph translatable="False"><inline classes="versionmodified">New in version...'
+
+    :param nodes.Node node: target node
+    :param int length:
+       length to which the return value is stripped; if a false value is
+       given, repr_domxml returns the full DOM XML representation.
+    :return: DOM XML representation
+ """
+ try:
+ text = node.asdom().toxml()
+ except Exception:
+ text = str(node)
+ if length and len(text) > length:
+ text = text[:length] + '...'
+ return text
+
+
+def apply_source_workaround(node: Element) -> None:
+ # workaround: nodes.term have wrong rawsource if classifier is specified.
+ # The behavior of docutils-0.11, 0.12 is:
+ # * when ``term text : classifier1 : classifier2`` is specified,
+ # * rawsource of term node will have: ``term text : classifier1 : classifier2``
+ # * rawsource of classifier node will be None
+ if isinstance(node, nodes.classifier) and not node.rawsource:
+ logger.debug('[i18n] PATCH: %r to have source, line and rawsource: %s',
+ get_full_module_name(node), repr_domxml(node))
+ definition_list_item = node.parent
+ node.source = definition_list_item.source
+ node.line = definition_list_item.line - 1
+ node.rawsource = node.astext() # set 'classifier1' (or 'classifier2')
+ elif isinstance(node, nodes.classifier) and not node.source:
+ # docutils-0.15 fills in rawsource attribute, but not in source.
+ node.source = node.parent.source
+ if isinstance(node, nodes.image) and node.source is None:
+ logger.debug('[i18n] PATCH: %r to have source, line: %s',
+ get_full_module_name(node), repr_domxml(node))
+ node.source, node.line = node.parent.source, node.parent.line
+ if isinstance(node, nodes.title) and node.source is None:
+ logger.debug('[i18n] PATCH: %r to have source: %s',
+ get_full_module_name(node), repr_domxml(node))
+ node.source, node.line = node.parent.source, node.parent.line
+ if isinstance(node, nodes.term):
+ logger.debug('[i18n] PATCH: %r to have rawsource: %s',
+ get_full_module_name(node), repr_domxml(node))
+ # strip classifier from rawsource of term
+ for classifier in reversed(list(node.parent.findall(nodes.classifier))):
+ node.rawsource = re.sub(r'\s*:\s*%s' % re.escape(classifier.astext()),
+ '', node.rawsource)
+ if isinstance(node, nodes.topic) and node.source is None:
+ # docutils-0.18 does not fill the source attribute of topic
+ logger.debug('[i18n] PATCH: %r to have source, line: %s',
+ get_full_module_name(node), repr_domxml(node))
+ node.source, node.line = node.parent.source, node.parent.line
+
+ # workaround: literal_block under bullet list (#4913)
+ if isinstance(node, nodes.literal_block) and node.source is None:
+ with contextlib.suppress(ValueError):
+ node.source = get_node_source(node)
+
+ # workaround: recommonmark-0.2.0 doesn't set rawsource attribute
+ if not node.rawsource:
+ node.rawsource = node.astext()
+
+ if node.source and node.rawsource:
+ return
+
+    # workaround: some docutils nodes don't have source, line.
+ if (isinstance(node, (
+ nodes.rubric, # #1305 rubric directive
+ nodes.line, # #1477 line node
+ nodes.image, # #3093 image directive in substitution
+ nodes.field_name, # #3335 field list syntax
+ ))):
+ logger.debug('[i18n] PATCH: %r to have source and line: %s',
+ get_full_module_name(node), repr_domxml(node))
+ try:
+ node.source = get_node_source(node)
+ except ValueError:
+ node.source = ''
+ node.line = 0 # need fix docutils to get `node.line`
+ return
+
+
+IGNORED_NODES = (
+ nodes.Invisible,
+ nodes.literal_block,
+ nodes.doctest_block,
+ addnodes.versionmodified,
+ # XXX there are probably more
+)
+
+
+def is_translatable(node: Node) -> bool:
+ if isinstance(node, addnodes.translatable):
+ return True
+
+ # image node marked as translatable or having alt text
+ if isinstance(node, nodes.image) and (node.get('translatable') or node.get('alt')):
+ return True
+
+ if isinstance(node, nodes.Inline) and 'translatable' not in node: # type: ignore[operator]
+ # inline node must not be translated if 'translatable' is not set
+ return False
+
+ if isinstance(node, nodes.TextElement):
+ if not node.source:
+ logger.debug('[i18n] SKIP %r because no node.source: %s',
+ get_full_module_name(node), repr_domxml(node))
+ return False # built-in message
+ if isinstance(node, IGNORED_NODES) and 'translatable' not in node:
+ logger.debug("[i18n] SKIP %r because node is in IGNORED_NODES "
+ "and no node['translatable']: %s",
+ get_full_module_name(node), repr_domxml(node))
+ return False
+ if not node.get('translatable', True):
+ # not(node['translatable'] == True or node['translatable'] is None)
+ logger.debug("[i18n] SKIP %r because not node['translatable']: %s",
+ get_full_module_name(node), repr_domxml(node))
+ return False
+ # <field_name>orphan</field_name>
+ # XXX ignore all metadata (== docinfo)
+ if isinstance(node, nodes.field_name) and node.children[0] == 'orphan':
+ logger.debug('[i18n] SKIP %r because orphan node: %s',
+ get_full_module_name(node), repr_domxml(node))
+ return False
+ return True
+
+ if isinstance(node, nodes.meta): # type: ignore[attr-defined]
+ return True
+
+ return False
+
+
+LITERAL_TYPE_NODES = (
+ nodes.literal_block,
+ nodes.doctest_block,
+ nodes.math_block,
+ nodes.raw,
+)
+IMAGE_TYPE_NODES = (
+ nodes.image,
+)
+
+
+def extract_messages(doctree: Element) -> Iterable[tuple[Element, str]]:
+ """Extract translatable messages from a document tree."""
+ for node in doctree.findall(is_translatable): # type: Element
+ if isinstance(node, addnodes.translatable):
+ for msg in node.extract_original_messages():
+ yield node, msg
+ continue
+ if isinstance(node, LITERAL_TYPE_NODES):
+ msg = node.rawsource
+ if not msg:
+ msg = node.astext()
+ elif isinstance(node, nodes.image):
+ if node.get('alt'):
+ yield node, node['alt']
+ if node.get('translatable'):
+ image_uri = node.get('original_uri', node['uri'])
+ msg = f'.. image:: {image_uri}'
+ else:
+ msg = ''
+ elif isinstance(node, nodes.meta): # type: ignore[attr-defined]
+ msg = node["content"]
+ else:
+ msg = node.rawsource.replace('\n', ' ').strip()
+
+ # XXX nodes rendering empty are likely a bug in sphinx.addnodes
+ if msg:
+ yield node, msg
+
+
+def get_node_source(node: Element) -> str:
+ for pnode in traverse_parent(node):
+ if pnode.source:
+ return pnode.source
+ msg = 'node source not found'
+ raise ValueError(msg)
+
+
+def get_node_line(node: Element) -> int:
+ for pnode in traverse_parent(node):
+ if pnode.line:
+ return pnode.line
+ msg = 'node line not found'
+ raise ValueError(msg)
+
+
+def traverse_parent(node: Element, cls: Any = None) -> Iterable[Element]:
+ while node:
+ if cls is None or isinstance(node, cls):
+ yield node
+ node = node.parent
+
+
+def get_prev_node(node: Node) -> Node | None:
+ pos = node.parent.index(node)
+ if pos > 0:
+ return node.parent[pos - 1]
+ else:
+ return None
+
+
+def traverse_translatable_index(
+ doctree: Element,
+) -> Iterable[tuple[Element, list[tuple[str, str, str, str, str | None]]]]:
+ """Traverse translatable index node from a document tree."""
+ matcher = NodeMatcher(addnodes.index, inline=False)
+ for node in doctree.findall(matcher): # type: addnodes.index
+ if 'raw_entries' in node:
+ entries = node['raw_entries']
+ else:
+ entries = node['entries']
+ yield node, entries
+
+
+def nested_parse_with_titles(state: Any, content: StringList, node: Node,
+ content_offset: int = 0) -> str:
+ """Version of state.nested_parse() that allows titles and does not require
+ titles to have the same decoration as the calling document.
+
+ This is useful when the parsed content comes from a completely different
+ context, such as docstrings.
+ """
+ # hack around title style bookkeeping
+ surrounding_title_styles = state.memo.title_styles
+ surrounding_section_level = state.memo.section_level
+ state.memo.title_styles = []
+ state.memo.section_level = 0
+ try:
+ return state.nested_parse(content, content_offset, node, match_titles=1)
+ finally:
+ state.memo.title_styles = surrounding_title_styles
+ state.memo.section_level = surrounding_section_level
+
+
+def clean_astext(node: Element) -> str:
+ """Like node.astext(), but ignore images."""
+ node = node.deepcopy()
+ for img in node.findall(nodes.image):
+ img['alt'] = ''
+ for raw in list(node.findall(nodes.raw)):
+ raw.parent.remove(raw)
+ return node.astext()
+
+
+def split_explicit_title(text: str) -> tuple[bool, str, str]:
+ """Split role content into title and target, if given."""
+ match = explicit_title_re.match(text)
+ if match:
+ return True, match.group(1), match.group(2)
+ return False, text, text
+
+
+indextypes = [
+ 'single', 'pair', 'double', 'triple', 'see', 'seealso',
+]
+
+
+def process_index_entry(entry: str, targetid: str,
+ ) -> list[tuple[str, str, str, str, str | None]]:
+ from sphinx.domains.python import pairindextypes
+
+ indexentries: list[tuple[str, str, str, str, str | None]] = []
+ entry = entry.strip()
+ oentry = entry
+ main = ''
+ if entry.startswith('!'):
+ main = 'main'
+ entry = entry[1:].lstrip()
+ for index_type in pairindextypes:
+ if entry.startswith(f'{index_type}:'):
+ value = entry[len(index_type) + 1:].strip()
+ value = f'{pairindextypes[index_type]}; {value}'
+ # xref RemovedInSphinx90Warning
+ logger.warning(__('%r is deprecated for index entries (from entry %r). '
+ "Use 'pair: %s' instead."),
+ index_type, entry, value, type='index')
+ indexentries.append(('pair', value, targetid, main, None))
+ break
+ else:
+ for index_type in indextypes:
+ if entry.startswith(f'{index_type}:'):
+ value = entry[len(index_type) + 1:].strip()
+ if index_type == 'double':
+ index_type = 'pair'
+ indexentries.append((index_type, value, targetid, main, None))
+ break
+ # shorthand notation for single entries
+ else:
+ for value in oentry.split(','):
+ value = value.strip()
+ main = ''
+ if value.startswith('!'):
+ main = 'main'
+ value = value[1:].lstrip()
+ if not value:
+ continue
+ indexentries.append(('single', value, targetid, main, None))
+ return indexentries
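Examples of the accepted shorthands (target ids are arbitrary)::

    process_index_entry('single: installation', 'id-1')
    # => [('single', 'installation', 'id-1', '', None)]
    process_index_entry('!api, reference', 'id-2')
    # => [('single', 'api', 'id-2', 'main', None),
    #     ('single', 'reference', 'id-2', '', None)]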
+
+
+def inline_all_toctrees(builder: Builder, docnameset: set[str], docname: str,
+ tree: nodes.document, colorfunc: Callable, traversed: list[str],
+ ) -> nodes.document:
+ """Inline all toctrees in the *tree*.
+
+ Record all docnames in *docnameset*, and output docnames with *colorfunc*.
+ """
+ tree = tree.deepcopy()
+ for toctreenode in list(tree.findall(addnodes.toctree)):
+ newnodes = []
+ includefiles = map(str, toctreenode['includefiles'])
+ for includefile in includefiles:
+ if includefile not in traversed:
+ try:
+ traversed.append(includefile)
+ logger.info(colorfunc(includefile) + " ", nonl=True)
+ subtree = inline_all_toctrees(builder, docnameset, includefile,
+ builder.env.get_doctree(includefile),
+ colorfunc, traversed)
+ docnameset.add(includefile)
+ except Exception:
+ logger.warning(__('toctree contains ref to nonexisting file %r'),
+ includefile, location=docname)
+ else:
+ sof = addnodes.start_of_file(docname=includefile)
+ sof.children = subtree.children
+ for sectionnode in sof.findall(nodes.section):
+ if 'docname' not in sectionnode:
+ sectionnode['docname'] = includefile
+ newnodes.append(sof)
+ toctreenode.parent.replace(toctreenode, newnodes)
+ return tree
+
+
+def _make_id(string: str) -> str:
+ """Convert `string` into an identifier and return it.
+
+ This function is a modified version of ``docutils.nodes.make_id()`` of
+ docutils-0.16.
+
+ Changes:
+
+ * Allow to use capital alphabet characters
+ * Allow to use dots (".") and underscores ("_") for an identifier
+ without a leading character.
+
+ # Author: David Goodger <goodger@python.org>
+ # Maintainer: docutils-develop@lists.sourceforge.net
+ # Copyright: This module has been placed in the public domain.
+ """
+ id = string.translate(_non_id_translate_digraphs)
+ id = id.translate(_non_id_translate)
+ # get rid of non-ascii characters.
+ # 'ascii' lowercase to prevent problems with turkish locale.
+ id = unicodedata.normalize('NFKD', id).encode('ascii', 'ignore').decode('ascii')
+ # shrink runs of whitespace and replace by hyphen
+ id = _non_id_chars.sub('-', ' '.join(id.split()))
+ id = _non_id_at_ends.sub('', id)
+ return str(id)
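Sample conversions, using the translation tables defined just below::

    _make_id('Getting Started: 10 Steps!')  # => 'Getting-Started-10-Steps'
    _make_id('Straße and Cœur')             # => 'Strasze-and-Coeur'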
+
+
+_non_id_chars = re.compile('[^a-zA-Z0-9._]+')
+_non_id_at_ends = re.compile('^[-0-9._]+|-+$')
+_non_id_translate = {
+ 0x00f8: 'o', # o with stroke
+ 0x0111: 'd', # d with stroke
+ 0x0127: 'h', # h with stroke
+ 0x0131: 'i', # dotless i
+ 0x0142: 'l', # l with stroke
+ 0x0167: 't', # t with stroke
+ 0x0180: 'b', # b with stroke
+ 0x0183: 'b', # b with topbar
+ 0x0188: 'c', # c with hook
+ 0x018c: 'd', # d with topbar
+ 0x0192: 'f', # f with hook
+ 0x0199: 'k', # k with hook
+ 0x019a: 'l', # l with bar
+ 0x019e: 'n', # n with long right leg
+ 0x01a5: 'p', # p with hook
+ 0x01ab: 't', # t with palatal hook
+ 0x01ad: 't', # t with hook
+ 0x01b4: 'y', # y with hook
+ 0x01b6: 'z', # z with stroke
+ 0x01e5: 'g', # g with stroke
+ 0x0225: 'z', # z with hook
+ 0x0234: 'l', # l with curl
+ 0x0235: 'n', # n with curl
+ 0x0236: 't', # t with curl
+ 0x0237: 'j', # dotless j
+ 0x023c: 'c', # c with stroke
+ 0x023f: 's', # s with swash tail
+ 0x0240: 'z', # z with swash tail
+ 0x0247: 'e', # e with stroke
+ 0x0249: 'j', # j with stroke
+ 0x024b: 'q', # q with hook tail
+ 0x024d: 'r', # r with stroke
+ 0x024f: 'y', # y with stroke
+}
+_non_id_translate_digraphs = {
+ 0x00df: 'sz', # ligature sz
+ 0x00e6: 'ae', # ae
+ 0x0153: 'oe', # ligature oe
+ 0x0238: 'db', # db digraph
+ 0x0239: 'qp', # qp digraph
+}
+
+
+def make_id(env: BuildEnvironment, document: nodes.document,
+ prefix: str = '', term: str | None = None) -> str:
+ """Generate an appropriate node_id for given *prefix* and *term*."""
+ node_id = None
+ if prefix:
+ idformat = prefix + "-%s"
+ else:
+ idformat = (document.settings.id_prefix or "id") + "%s"
+
+ # try to generate node_id by *term*
+ if prefix and term:
+ node_id = _make_id(idformat % term)
+ if node_id == prefix:
+ # *term* is not good to generate a node_id.
+ node_id = None
+ elif term:
+ node_id = _make_id(term)
+ if node_id == '':
+ node_id = None # fallback to None
+
+ while node_id is None or node_id in document.ids:
+ node_id = idformat % env.new_serialno(prefix)
+
+ return node_id
+
+
+def find_pending_xref_condition(node: addnodes.pending_xref, condition: str,
+ ) -> Element | None:
+ """Pick matched pending_xref_condition node up from the pending_xref."""
+ for subnode in node:
+ if (isinstance(subnode, addnodes.pending_xref_condition) and
+ subnode.get('condition') == condition):
+ return subnode
+ return None
+
+
+def make_refnode(builder: Builder, fromdocname: str, todocname: str, targetid: str | None,
+ child: Node | list[Node], title: str | None = None,
+ ) -> nodes.reference:
+ """Shortcut to create a reference node."""
+ node = nodes.reference('', '', internal=True)
+ if fromdocname == todocname and targetid:
+ node['refid'] = targetid
+ else:
+ if targetid:
+ node['refuri'] = (builder.get_relative_uri(fromdocname, todocname) +
+ '#' + targetid)
+ else:
+ node['refuri'] = builder.get_relative_uri(fromdocname, todocname)
+ if title:
+ node['reftitle'] = title
+ node += child
+ return node
+
+
+def set_source_info(directive: Directive, node: Node) -> None:
+ node.source, node.line = \
+ directive.state_machine.get_source_and_line(directive.lineno)
+
+
+def set_role_source_info(inliner: Inliner, lineno: int, node: Node) -> None:
+ gsal = inliner.reporter.get_source_and_line # type: ignore[attr-defined]
+ node.source, node.line = gsal(lineno)
+
+
+def copy_source_info(src: Element, dst: Element) -> None:
+ with contextlib.suppress(ValueError):
+ dst.source = get_node_source(src)
+ dst.line = get_node_line(src)
+
+
+NON_SMARTQUOTABLE_PARENT_NODES = (
+ nodes.FixedTextElement,
+ nodes.literal,
+ nodes.math,
+ nodes.image,
+ nodes.raw,
+ nodes.problematic,
+ addnodes.not_smartquotable,
+)
+
+
+def is_smartquotable(node: Node) -> bool:
+ """Check whether the node is smart-quotable or not."""
+ for pnode in traverse_parent(node.parent):
+ if isinstance(pnode, NON_SMARTQUOTABLE_PARENT_NODES):
+ return False
+ if pnode.get('support_smartquotes', None) is False:
+ return False
+
+ if getattr(node, 'support_smartquotes', None) is False:
+ return False
+
+ return True
+
+
+def process_only_nodes(document: Node, tags: Tags) -> None:
+ """Filter ``only`` nodes which do not match *tags*."""
+ for node in document.findall(addnodes.only):
+ if _only_node_keep_children(node, tags):
+ node.replace_self(node.children or nodes.comment())
+ else:
+ # A comment on the comment() nodes being inserted: replacing by [] would
+ # result in a "Losing ids" exception if there is a target node before
+ # the only node, so we make sure docutils can transfer the id to
+ # something, even if it's just a comment and will lose the id anyway...
+ node.replace_self(nodes.comment())
+
+
+def _only_node_keep_children(node: addnodes.only, tags: Tags) -> bool:
+ """Keep children if tags match or error."""
+ try:
+ return tags.eval_condition(node['expr'])
+ except Exception as err:
+ logger.warning(
+ __('exception while evaluating only directive expression: %s'),
+ err,
+ location=node)
+ return True
+
+
+def _copy_except__document(el: Element) -> Element:
+    """Monkey-patch ``nodes.Element.copy`` to not copy the ``_document``
+    attribute.
+
+ xref: https://github.com/sphinx-doc/sphinx/issues/11116#issuecomment-1376767086
+ """
+ newnode = object.__new__(el.__class__)
+ # set in Element.__init__()
+ newnode.children = []
+ newnode.rawsource = el.rawsource
+ newnode.tagname = el.tagname
+ # copied in Element.copy()
+ newnode.attributes = {k: (v
+ if k not in {'ids', 'classes', 'names', 'dupnames', 'backrefs'}
+ else v[:])
+ for k, v in el.attributes.items()}
+ newnode.line = el.line
+ newnode.source = el.source
+ return newnode
+
+
+nodes.Element.copy = _copy_except__document # type: ignore[assignment]
+
+
+def _deepcopy(el: Element) -> Element:
+    """Monkey-patch ``nodes.Element.deepcopy`` for speed."""
+ newnode = el.copy()
+ newnode.children = [child.deepcopy() for child in el.children]
+ for child in newnode.children:
+ child.parent = newnode
+ if el.document:
+ child.document = el.document
+ if child.source is None:
+ child.source = el.document.current_source
+ if child.line is None:
+ child.line = el.document.current_line
+ return newnode
+
+
+nodes.Element.deepcopy = _deepcopy # type: ignore[assignment]
diff --git a/sphinx/util/osutil.py b/sphinx/util/osutil.py
new file mode 100644
index 0000000..c6adbe4
--- /dev/null
+++ b/sphinx/util/osutil.py
@@ -0,0 +1,217 @@
+"""Operating system-related utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import contextlib
+import filecmp
+import os
+import re
+import shutil
+import sys
+import unicodedata
+from io import StringIO
+from os import path
+from typing import TYPE_CHECKING, Any
+
+from sphinx.deprecation import _deprecation_warning
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+# SEP separates path elements in the canonical file names
+#
+# Define SEP as a manifest constant, not so much because we expect it to change
+# in the future as to avoid the suspicion that a stray "/" in the code is a
+# hangover from more *nix-oriented origins.
+SEP = "/"
+
+
+def os_path(canonical_path: str, /) -> str:
+ return canonical_path.replace(SEP, path.sep)
+
+
+def canon_path(native_path: str | os.PathLike[str], /) -> str:
+ """Return path in OS-independent form"""
+ return os.fspath(native_path).replace(path.sep, SEP)
+
+
+def path_stabilize(filepath: str | os.PathLike[str], /) -> str:
+    """Normalize path separators and Unicode normal form (NFC)."""
+ new_path = canon_path(filepath)
+ return unicodedata.normalize('NFC', new_path)
+
+
+def relative_uri(base: str, to: str) -> str:
+ """Return a relative URL from ``base`` to ``to``."""
+ if to.startswith(SEP):
+ return to
+ b2 = base.split('#')[0].split(SEP)
+ t2 = to.split('#')[0].split(SEP)
+ # remove common segments (except the last segment)
+ for x, y in zip(b2[:-1], t2[:-1]):
+ if x != y:
+ break
+ b2.pop(0)
+ t2.pop(0)
+ if b2 == t2:
+ # Special case: relative_uri('f/index.html','f/index.html')
+ # returns '', not 'index.html'
+ return ''
+ if len(b2) == 1 and t2 == ['']:
+ # Special case: relative_uri('f/index.html','f/') should
+ # return './', not ''
+ return '.' + SEP
+ return ('..' + SEP) * (len(b2) - 1) + SEP.join(t2)
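Typical results::

    relative_uri('a/b/index.html', 'a/c/page.html')  # => '../c/page.html'
    relative_uri('a/index.html', 'a/index.html')     # => ''
    relative_uri('a/index.html', 'a/')               # => './'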
+
+
+def ensuredir(file: str | os.PathLike[str]) -> None:
+ """Ensure that a path exists."""
+ os.makedirs(file, exist_ok=True)
+
+
+def mtimes_of_files(dirnames: list[str], suffix: str) -> Iterator[float]:
+ for dirname in dirnames:
+ for root, _dirs, files in os.walk(dirname):
+ for sfile in files:
+ if sfile.endswith(suffix):
+ with contextlib.suppress(OSError):
+ yield path.getmtime(path.join(root, sfile))
+
+
+def copytimes(source: str | os.PathLike[str], dest: str | os.PathLike[str]) -> None:
+ """Copy a file's modification times."""
+ st = os.stat(source)
+ if hasattr(os, 'utime'):
+ os.utime(dest, (st.st_atime, st.st_mtime))
+
+
+def copyfile(source: str | os.PathLike[str], dest: str | os.PathLike[str]) -> None:
+ """Copy a file and its modification times, if possible.
+
+ Note: ``copyfile`` skips copying if the file has not been changed"""
+ if not path.exists(dest) or not filecmp.cmp(source, dest):
+ shutil.copyfile(source, dest)
+ with contextlib.suppress(OSError):
+ # don't do full copystat because the source may be read-only
+ copytimes(source, dest)
+
+
+no_fn_re = re.compile(r'[^a-zA-Z0-9_-]')
+project_suffix_re = re.compile(' Documentation$')
+
+
+def make_filename(string: str) -> str:
+ return no_fn_re.sub('', string) or 'sphinx'
+
+
+def make_filename_from_project(project: str) -> str:
+ return make_filename(project_suffix_re.sub('', project)).lower()
+
+
+def relpath(path: str | os.PathLike[str],
+ start: str | os.PathLike[str] | None = os.curdir) -> str:
+ """Return a relative filepath to *path* either from the current directory or
+ from an optional *start* directory.
+
+ This is an alternative of ``os.path.relpath()``. This returns original path
+ if *path* and *start* are on different drives (for Windows platform).
+ """
+ try:
+ return os.path.relpath(path, start)
+ except ValueError:
+ return str(path)
+
+
+safe_relpath = relpath # for compatibility
+fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
+
+
+abspath = path.abspath
+
+
+class _chdir:
+ """Remove this fall-back once support for Python 3.10 is removed."""
+ def __init__(self, target_dir: str, /):
+ self.path = target_dir
+ self._dirs: list[str] = []
+
+ def __enter__(self):
+ self._dirs.append(os.getcwd())
+ os.chdir(self.path)
+
+ def __exit__(self, _exc_type, _exc_value, _traceback, /):
+ os.chdir(self._dirs.pop())
+
+
+@contextlib.contextmanager
+def cd(target_dir: str) -> Iterator[None]:
+ if sys.version_info[:2] >= (3, 11):
+ _deprecation_warning(__name__, 'cd', 'contextlib.chdir', remove=(8, 0))
+ with _chdir(target_dir):
+ yield
+
+
+class FileAvoidWrite:
+ """File-like object that buffers output and only writes if content changed.
+
+ Use this class as a drop-in replacement for a file handle to avoid touching
+ the original file if the content hasn't changed. This is useful in
+ scenarios where file mtime is used to invalidate caches or trigger new
+ behavior.
+
+ When writing to this file handle, all writes are buffered until the object
+ is closed.
+
+ Objects can be used as context managers.
+ """
+ def __init__(self, path: str) -> None:
+ self._path = path
+ self._io: StringIO | None = None
+
+ def write(self, data: str) -> None:
+ if not self._io:
+ self._io = StringIO()
+ self._io.write(data)
+
+ def close(self) -> None:
+ """Stop accepting writes and write file, if needed."""
+ if not self._io:
+ msg = 'FileAvoidWrite does not support empty files.'
+ raise Exception(msg)
+
+ buf = self.getvalue()
+ self._io.close()
+
+ try:
+ with open(self._path, encoding='utf-8') as old_f:
+ old_content = old_f.read()
+ if old_content == buf:
+ return
+ except OSError:
+ pass
+
+ with open(self._path, 'w', encoding='utf-8') as f:
+ f.write(buf)
+
+ def __enter__(self) -> FileAvoidWrite:
+ return self
+
+ def __exit__(
+ self, exc_type: type[Exception], exc_value: Exception, traceback: Any,
+ ) -> bool:
+ self.close()
+ return True
+
+ def __getattr__(self, name: str) -> Any:
+ # Proxy to _io instance.
+ if not self._io:
+ msg = 'Must write to FileAvoidWrite before other methods can be used'
+ raise Exception(msg)
+
+ return getattr(self._io, name)
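+
+# A typical use of FileAvoidWrite, sketched for illustration (the path is
+# hypothetical):
+#
+#     with FileAvoidWrite('/tmp/out.txt') as f:
+#         f.write('generated content')
+#
+# The file on disk is rewritten only if 'generated content' differs from its
+# current contents, so its mtime is preserved when nothing has changed.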
+
+
+def rmtree(path: str) -> None:
+ if os.path.isdir(path):
+ shutil.rmtree(path)
+ else:
+ os.remove(path)
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
new file mode 100644
index 0000000..0afdff9
--- /dev/null
+++ b/sphinx/util/parallel.py
@@ -0,0 +1,154 @@
+"""Parallel building utilities."""
+
+from __future__ import annotations
+
+import os
+import time
+import traceback
+from math import sqrt
+from typing import TYPE_CHECKING, Any, Callable
+
+try:
+ import multiprocessing
+ HAS_MULTIPROCESSING = True
+except ImportError:
+ HAS_MULTIPROCESSING = False
+
+from sphinx.errors import SphinxParallelError
+from sphinx.util import logging
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+logger = logging.getLogger(__name__)
+
+# our parallel functionality only works for the forking Process
+parallel_available = HAS_MULTIPROCESSING and os.name == 'posix'
+
+
+class SerialTasks:
+ """Has the same interface as ParallelTasks, but executes tasks directly."""
+
+ def __init__(self, nproc: int = 1) -> None:
+ pass
+
+ def add_task(
+ self, task_func: Callable, arg: Any = None, result_func: Callable | None = None,
+ ) -> None:
+ if arg is not None:
+ res = task_func(arg)
+ else:
+ res = task_func()
+ if result_func:
+ result_func(res)
+
+ def join(self) -> None:
+ pass
+
+
+class ParallelTasks:
+ """Executes *nproc* tasks in parallel after forking."""
+
+ def __init__(self, nproc: int) -> None:
+ self.nproc = nproc
+ # (optional) callback run in the main process on each task's result
+ self._result_funcs: dict[int, Callable] = {}
+ # task arguments
+ self._args: dict[int, list[Any] | None] = {}
+ # mapping of task id -> subprocess (both started and waiting)
+ self._procs: dict[int, Any] = {}
+ # mapping of task id -> receiving pipe of a running subprocess
+ self._precvs: dict[int, Any] = {}
+ # mapping of task id -> receiving pipe of a waiting subprocess
+ self._precvsWaiting: dict[int, Any] = {}
+ # number of working subprocesses
+ self._pworking = 0
+ # task number of each subprocess
+ self._taskid = 0
+
+ def _process(self, pipe: Any, func: Callable, arg: Any) -> None:
+ try:
+ collector = logging.LogCollector()
+ with collector.collect():
+ if arg is None:
+ ret = func()
+ else:
+ ret = func(arg)
+ failed = False
+ except BaseException as err:
+ failed = True
+ errmsg = traceback.format_exception_only(err.__class__, err)[0].strip()
+ ret = (errmsg, traceback.format_exc())
+ logging.convert_serializable(collector.logs)
+ pipe.send((failed, collector.logs, ret))
+
+ def add_task(
+ self, task_func: Callable, arg: Any = None, result_func: Callable | None = None,
+ ) -> None:
+ tid = self._taskid
+ self._taskid += 1
+ self._result_funcs[tid] = result_func or (lambda arg, result: None)
+ self._args[tid] = arg
+ precv, psend = multiprocessing.Pipe(False)
+ context: Any = multiprocessing.get_context('fork')
+ proc = context.Process(target=self._process, args=(psend, task_func, arg))
+ self._procs[tid] = proc
+ self._precvsWaiting[tid] = precv
+ self._join_one()
+
+ def join(self) -> None:
+ try:
+ while self._pworking:
+ if not self._join_one():
+ time.sleep(0.02)
+ finally:
+ # shutdown other child processes on failure
+ self.terminate()
+
+ def terminate(self) -> None:
+ for tid in list(self._precvs):
+ self._procs[tid].terminate()
+ self._result_funcs.pop(tid)
+ self._procs.pop(tid)
+ self._precvs.pop(tid)
+ self._pworking -= 1
+
+ def _join_one(self) -> bool:
+ joined_any = False
+ for tid, pipe in self._precvs.items():
+ if pipe.poll():
+ exc, logs, result = pipe.recv()
+ if exc:
+ raise SphinxParallelError(*result)
+ for log in logs:
+ logger.handle(log)
+ self._result_funcs.pop(tid)(self._args.pop(tid), result)
+ self._procs[tid].join()
+ self._precvs.pop(tid)
+ self._pworking -= 1
+ joined_any = True
+ break
+
+ while self._precvsWaiting and self._pworking < self.nproc:
+ newtid, newprecv = self._precvsWaiting.popitem()
+ self._precvs[newtid] = newprecv
+ self._procs[newtid].start()
+ self._pworking += 1
+
+ return joined_any
+
+
+def make_chunks(arguments: Sequence[str], nproc: int, maxbatch: int = 10) -> list[Any]:
+ # determine how many documents to read in one go
+ nargs = len(arguments)
+ chunksize = nargs // nproc
+ if chunksize >= maxbatch:
+ # try to improve batch size vs. number of batches
+ chunksize = int(sqrt(nargs / nproc * maxbatch))
+ if chunksize == 0:
+ chunksize = 1
+ nchunks, rest = divmod(nargs, chunksize)
+ if rest:
+ nchunks += 1
+ # partition documents in "chunks" that will be written by one Process
+ return [arguments[i * chunksize:(i + 1) * chunksize] for i in range(nchunks)]
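+
+# Worked example of the sizing above: for 100 arguments and nproc=4, the
+# initial chunksize is 100 // 4 == 25, which exceeds maxbatch (10), so it is
+# rebalanced to int(sqrt(100 / 4 * 10)) == 15; divmod(100, 15) == (6, 10)
+# then yields 7 chunks: six of 15 arguments and one of 10.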
diff --git a/sphinx/util/png.py b/sphinx/util/png.py
new file mode 100644
index 0000000..6c94219
--- /dev/null
+++ b/sphinx/util/png.py
@@ -0,0 +1,43 @@
+"""PNG image manipulation helpers."""
+
+from __future__ import annotations
+
+import binascii
+import struct
+
+LEN_IEND = 12
+LEN_DEPTH = 22
+
+DEPTH_CHUNK_LEN = struct.pack('!i', 10)
+DEPTH_CHUNK_START = b'tEXtDepth\x00'
+IEND_CHUNK = b'\x00\x00\x00\x00IEND\xAE\x42\x60\x82'
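+
+# For reference, a PNG chunk is laid out as: 4-byte data length, 4-byte chunk
+# type, data, 4-byte CRC.  The depth chunk's length field counts only the
+# data: the keyword b'Depth\x00' (6 bytes) plus the packed int32 depth
+# (4 bytes), i.e. 10.  With the length, type and CRC fields it occupies
+# LEN_DEPTH (22) bytes; the empty IEND chunk occupies LEN_IEND (12) bytes.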
+
+
+def read_png_depth(filename: str) -> int | None:
+ """Read the special tEXt chunk indicating the depth from a PNG file."""
+ with open(filename, 'rb') as f:
+ f.seek(- (LEN_IEND + LEN_DEPTH), 2)
+ depthchunk = f.read(LEN_DEPTH)
+ if not depthchunk.startswith(DEPTH_CHUNK_LEN + DEPTH_CHUNK_START):
+ # either not a PNG file or not containing the depth chunk
+ return None
+ else:
+ return struct.unpack('!i', depthchunk[14:18])[0]
+
+
+def write_png_depth(filename: str, depth: int) -> None:
+ """Write the special tEXt chunk indicating the depth to a PNG file.
+
+ The chunk is placed immediately before the special IEND chunk.
+ """
+ data = struct.pack('!i', depth)
+ with open(filename, 'r+b') as f:
+ # seek to the beginning of the IEND chunk
+ f.seek(-LEN_IEND, 2)
+ # overwrite it with the depth chunk
+ f.write(DEPTH_CHUNK_LEN + DEPTH_CHUNK_START + data)
+ # calculate the checksum over chunk name and data
+ crc = binascii.crc32(DEPTH_CHUNK_START + data) & 0xffffffff
+ f.write(struct.pack('!I', crc))
+ # replace the IEND chunk
+ f.write(IEND_CHUNK)
diff --git a/sphinx/util/requests.py b/sphinx/util/requests.py
new file mode 100644
index 0000000..ec3d8d2
--- /dev/null
+++ b/sphinx/util/requests.py
@@ -0,0 +1,73 @@
+"""Simple wrappers around the requests package."""
+
+from __future__ import annotations
+
+import warnings
+from typing import Any
+from urllib.parse import urlsplit
+
+import requests
+from urllib3.exceptions import InsecureRequestWarning
+
+import sphinx
+
+_USER_AGENT = (f'Mozilla/5.0 (X11; Linux x86_64; rv:100.0) Gecko/20100101 Firefox/100.0 '
+ f'Sphinx/{sphinx.__version__}')
+
+
+def _get_tls_cacert(url: str, certs: str | dict[str, str] | None) -> str | bool:
+ """Get additional CA cert for a specific URL."""
+ if not certs:
+ return True
+ elif isinstance(certs, (str, tuple)):
+ return certs
+ else:
+ hostname = urlsplit(url).netloc
+ if '@' in hostname:
+ _, hostname = hostname.split('@', 1)
+
+ return certs.get(hostname, True)
+
+
+def get(url: str, **kwargs: Any) -> requests.Response:
+ """Sends a GET request like requests.get().
+
+ This sets up the User-Agent header and TLS verification automatically."""
+ with _Session() as session:
+ return session.get(url, **kwargs)
+
+
+def head(url: str, **kwargs: Any) -> requests.Response:
+ """Sends a HEAD request like requests.head().
+
+ This sets up the User-Agent header and TLS verification automatically."""
+ with _Session() as session:
+ return session.head(url, **kwargs)
+
+
+class _Session(requests.Session):
+ def request( # type: ignore[override]
+ self, method: str, url: str,
+ _user_agent: str = '',
+ _tls_info: tuple[bool, str | dict[str, str] | None] = (), # type: ignore[assignment]
+ **kwargs: Any,
+ ) -> requests.Response:
+ """Sends a request with an HTTP verb and URL.
+
+ This sets up the User-Agent header and TLS verification automatically."""
+ headers = kwargs.setdefault('headers', {})
+ headers.setdefault('User-Agent', _user_agent or _USER_AGENT)
+ if _tls_info:
+ tls_verify, tls_cacerts = _tls_info
+ verify = bool(kwargs.get('verify', tls_verify))
+ kwargs.setdefault('verify', verify and _get_tls_cacert(url, tls_cacerts))
+ else:
+ verify = kwargs.get('verify', True)
+
+ if verify:
+ return super().request(method, url, **kwargs)
+
+ with warnings.catch_warnings():
+ # ignore InsecureRequestWarning if verify=False
+ warnings.filterwarnings("ignore", category=InsecureRequestWarning)
+ return super().request(method, url, **kwargs)
diff --git a/sphinx/util/rst.py b/sphinx/util/rst.py
new file mode 100644
index 0000000..1e8fd66
--- /dev/null
+++ b/sphinx/util/rst.py
@@ -0,0 +1,110 @@
+"""reST helper functions."""
+
+from __future__ import annotations
+
+import re
+from collections import defaultdict
+from contextlib import contextmanager
+from typing import TYPE_CHECKING
+from unicodedata import east_asian_width
+
+from docutils.parsers.rst import roles
+from docutils.parsers.rst.languages import en as english
+from docutils.parsers.rst.states import Body
+from docutils.utils import Reporter
+from jinja2 import Environment, pass_environment
+
+from sphinx.locale import __
+from sphinx.util import docutils, logging
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+
+ from docutils.statemachine import StringList
+
+logger = logging.getLogger(__name__)
+
+FIELD_NAME_RE = re.compile(Body.patterns['field_marker'])
+symbols_re = re.compile(r'([!-\-/:-@\[-`{-~])') # symbols, except the dot (0x2E)
+SECTIONING_CHARS = ['=', '-', '~']
+
+# width of characters
+WIDECHARS: dict[str, str] = defaultdict(lambda: "WF") # WF: Wide + Full-width
+WIDECHARS["ja"] = "WFA" # In Japanese, Ambiguous characters also have double width
+
+
+def escape(text: str) -> str:
+ text = symbols_re.sub(r'\\\1', text)
+ text = re.sub(r'^\.', r'\.', text) # escape a leading dot
+ return text
+
+
+def textwidth(text: str, widechars: str = 'WF') -> int:
+ """Get width of text."""
+ def charwidth(char: str, widechars: str) -> int:
+ if east_asian_width(char) in widechars:
+ return 2
+ else:
+ return 1
+
+ return sum(charwidth(c, widechars) for c in text)
+
+
+@pass_environment
+def heading(env: Environment, text: str, level: int = 1) -> str:
+ """Create a heading for *level*."""
+ assert level <= 3
+ width = textwidth(text, WIDECHARS[env.language])
+ sectioning_char = SECTIONING_CHARS[level - 1]
+ return f'{text}\n{sectioning_char * width}'
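+
+# As a Jinja filter this renders, for example (illustrative template snippet,
+# assuming the environment carries the ``language`` attribute that
+# ReSTRenderer installs):
+#
+#     {{ "Contents" | heading(2) }}
+#
+# as "Contents" underlined by eight "-" characters.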
+
+
+@contextmanager
+def default_role(docname: str, name: str) -> Generator[None, None, None]:
+ if name:
+ dummy_reporter = Reporter('', 4, 4)
+ role_fn, _ = roles.role(name, english, 0, dummy_reporter)
+ if role_fn: # type: ignore[truthy-function]
+ docutils.register_role('', role_fn) # type: ignore[arg-type]
+ else:
+ logger.warning(__('default role %s not found'), name, location=docname)
+
+ yield
+
+ docutils.unregister_role('')
+
+
+def prepend_prolog(content: StringList, prolog: str) -> None:
+ """Prepend a string to content body as prolog."""
+ if prolog:
+ pos = 0
+ for line in content:
+ if FIELD_NAME_RE.match(line):
+ pos += 1
+ else:
+ break
+
+ if pos > 0:
+ # insert a blank line after docinfo
+ content.insert(pos, '', '<generated>', 0)
+ pos += 1
+
+ # insert prolog (after docinfo, if present)
+ lineno = 0
+ for lineno, line in enumerate(prolog.splitlines()):
+ content.insert(pos + lineno, line, '<rst_prolog>', lineno)
+
+ content.insert(pos + lineno + 1, '', '<generated>', 0)
+
+
+def append_epilog(content: StringList, epilog: str) -> None:
+ """Append a string to content body as epilog."""
+ if epilog:
+ if len(content) > 0:
+ source, lineno = content.info(-1)
+ else:
+ source = '<generated>'
+ lineno = 0
+ content.append('', source, lineno + 1)
+ for lineno, line in enumerate(epilog.splitlines()):
+ content.append(line, '<rst_epilog>', lineno)
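+
+# For illustration (the prolog text is hypothetical):
+#
+#     prepend_prolog(content, '.. |ver| replace:: 7.2')
+#
+# inserts the prolog lines (tagged '<rst_prolog>') after any leading docinfo
+# fields, separated by generated blank lines, while append_epilog() appends a
+# blank line followed by lines tagged '<rst_epilog>'.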
diff --git a/sphinx/util/tags.py b/sphinx/util/tags.py
new file mode 100644
index 0000000..73e1a83
--- /dev/null
+++ b/sphinx/util/tags.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from jinja2 import nodes
+from jinja2.environment import Environment
+from jinja2.parser import Parser
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from jinja2.nodes import Node
+
+
+env = Environment()
+
+
+class BooleanParser(Parser):
+ """
+ Only allow conditional expressions and ``and``/``or``/``not`` operations.
+ """
+
+ def parse_compare(self) -> Node:
+ node: Node
+ token = self.stream.current
+ if token.type == 'name':
+ if token.value in ('true', 'false', 'True', 'False'):
+ node = nodes.Const(token.value in ('true', 'True'),
+ lineno=token.lineno)
+ elif token.value in ('none', 'None'):
+ node = nodes.Const(None, lineno=token.lineno)
+ else:
+ node = nodes.Name(token.value, 'load', lineno=token.lineno)
+ next(self.stream)
+ elif token.type == 'lparen':
+ next(self.stream)
+ node = self.parse_expression()
+ self.stream.expect('rparen')
+ else:
+ self.fail(f"unexpected token '{token}'", token.lineno)
+ return node
+
+
+class Tags:
+ def __init__(self, tags: list[str] | None = None) -> None:
+ self.tags = dict.fromkeys(tags or [], True)
+
+ def has(self, tag: str) -> bool:
+ return tag in self.tags
+
+ __contains__ = has
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self.tags)
+
+ def add(self, tag: str) -> None:
+ self.tags[tag] = True
+
+ def remove(self, tag: str) -> None:
+ self.tags.pop(tag, None)
+
+ def eval_condition(self, condition: str) -> bool:
+ # exceptions are handled by the caller
+ parser = BooleanParser(env, condition, state='variable')
+ expr = parser.parse_expression()
+ if not parser.stream.eos:
+ msg = 'chunk after expression'
+ raise ValueError(msg)
+
+ def eval_node(node: Node) -> bool:
+ if isinstance(node, nodes.CondExpr):
+ if eval_node(node.test):
+ return eval_node(node.expr1)
+ else:
+ return eval_node(node.expr2)
+ elif isinstance(node, nodes.And):
+ return eval_node(node.left) and eval_node(node.right)
+ elif isinstance(node, nodes.Or):
+ return eval_node(node.left) or eval_node(node.right)
+ elif isinstance(node, nodes.Not):
+ return not eval_node(node.node)
+ elif isinstance(node, nodes.Name):
+ return self.tags.get(node.name, False)
+ else:
+ msg = 'invalid node, check parsing'
+ raise ValueError(msg)
+
+ return eval_node(expr)
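+
+# For illustration:
+#
+#     tags = Tags(['html', 'draft'])
+#     tags.eval_condition('html and not latex')  # True
+#     tags.eval_condition('(latex or draft)')    # True
+#     tags.eval_condition('other')               # False; unknown tags are False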
diff --git a/sphinx/util/template.py b/sphinx/util/template.py
new file mode 100644
index 0000000..a16a7a1
--- /dev/null
+++ b/sphinx/util/template.py
@@ -0,0 +1,135 @@
+"""Template utility functions for Sphinx."""
+
+from __future__ import annotations
+
+import os
+from functools import partial
+from os import path
+from typing import TYPE_CHECKING, Any, Callable
+
+from jinja2 import TemplateNotFound
+from jinja2.loaders import BaseLoader
+from jinja2.sandbox import SandboxedEnvironment
+
+from sphinx import package_dir
+from sphinx.jinja2glue import SphinxFileSystemLoader
+from sphinx.locale import get_translator
+from sphinx.util import rst, texescape
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+ from jinja2.environment import Environment
+
+
+class BaseRenderer:
+ def __init__(self, loader: BaseLoader | None = None) -> None:
+ self.env = SandboxedEnvironment(loader=loader, extensions=['jinja2.ext.i18n'])
+ self.env.filters['repr'] = repr
+ self.env.install_gettext_translations(get_translator())
+
+ def render(self, template_name: str, context: dict[str, Any]) -> str:
+ return self.env.get_template(template_name).render(context)
+
+ def render_string(self, source: str, context: dict[str, Any]) -> str:
+ return self.env.from_string(source).render(context)
+
+
+class FileRenderer(BaseRenderer):
+ def __init__(self, search_path: Sequence[str | os.PathLike[str]]) -> None:
+ if isinstance(search_path, (str, os.PathLike)):
+ search_path = [search_path]
+ else:
+ # filter out empty or None paths
+ search_path = list(filter(None, search_path))
+
+ loader = SphinxFileSystemLoader(search_path)
+ super().__init__(loader)
+
+ @classmethod
+ def render_from_file(cls, filename: str, context: dict[str, Any]) -> str:
+ dirname = os.path.dirname(filename)
+ basename = os.path.basename(filename)
+ return cls(dirname).render(basename, context)
+
+
+class SphinxRenderer(FileRenderer):
+ def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None) -> None:
+ if template_path is None:
+ template_path = os.path.join(package_dir, 'templates')
+ super().__init__(template_path)
+
+ @classmethod
+ def render_from_file(cls, filename: str, context: dict[str, Any]) -> str:
+ return FileRenderer.render_from_file(filename, context)
+
+
+class LaTeXRenderer(SphinxRenderer):
+ def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None,
+ latex_engine: str | None = None) -> None:
+ if template_path is None:
+ template_path = [os.path.join(package_dir, 'templates', 'latex')]
+ super().__init__(template_path)
+
+ # use texescape as escape filter
+ escape = partial(texescape.escape, latex_engine=latex_engine)
+ self.env.filters['e'] = escape
+ self.env.filters['escape'] = escape
+ self.env.filters['eabbr'] = texescape.escape_abbr
+
+ # use JSP/eRuby-like tagging instead of curly brackets; the default
+ # Jinja2 tagging clashes with LaTeX sources.
+ self.env.variable_start_string = '<%='
+ self.env.variable_end_string = '%>'
+ self.env.block_start_string = '<%'
+ self.env.block_end_string = '%>'
+ self.env.comment_start_string = '<#'
+ self.env.comment_end_string = '#>'
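+
+ # With this tagging, a template fragment reads e.g. (illustrative
+ # snippet, not taken from an actual Sphinx template):
+ #     <% if title %>\title{<%= title | e %>}<% endif %>
+ # which keeps curly braces free for LaTeX markup.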
+
+
+class ReSTRenderer(SphinxRenderer):
+ def __init__(self, template_path: Sequence[str | os.PathLike[str]] | None = None,
+ language: str | None = None) -> None:
+ super().__init__(template_path)
+
+ # add language to environment
+ self.env.extend(language=language)
+
+ # use rst.escape as the escape filter
+ self.env.filters['e'] = rst.escape
+ self.env.filters['escape'] = rst.escape
+ self.env.filters['heading'] = rst.heading
+
+
+class SphinxTemplateLoader(BaseLoader):
+ """A loader supporting template inheritance."""
+
+ def __init__(self, confdir: str | os.PathLike[str],
+ templates_paths: Sequence[str | os.PathLike[str]],
+ system_templates_paths: Sequence[str | os.PathLike[str]]) -> None:
+ self.loaders = []
+ self.sysloaders = []
+
+ for templates_path in templates_paths:
+ loader = SphinxFileSystemLoader(path.join(confdir, templates_path))
+ self.loaders.append(loader)
+
+ for templates_path in system_templates_paths:
+ loader = SphinxFileSystemLoader(templates_path)
+ self.loaders.append(loader)
+ self.sysloaders.append(loader)
+
+ def get_source(self, environment: Environment, template: str) -> tuple[str, str, Callable]:
+ if template.startswith('!'):
+ # look up the template in ``system_templates_paths`` only
+ loaders = self.sysloaders
+ template = template[1:]
+ else:
+ loaders = self.loaders
+
+ for loader in loaders:
+ try:
+ return loader.get_source(environment, template)
+ except TemplateNotFound:
+ pass
+ raise TemplateNotFound(template)
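+
+# For illustration: get_source(env, '!page.html') consults only the system
+# template paths, so a user template that overrides page.html can still
+# extend the built-in one, while get_source(env, 'page.html') searches the
+# user template paths first and falls back to the system paths.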
diff --git a/sphinx/util/texescape.py b/sphinx/util/texescape.py
new file mode 100644
index 0000000..8527441
--- /dev/null
+++ b/sphinx/util/texescape.py
@@ -0,0 +1,153 @@
+"""TeX escaping helper."""
+
+from __future__ import annotations
+
+import re
+
+tex_replacements = [
+ # map TeX special chars
+ ('$', r'\$'),
+ ('%', r'\%'),
+ ('&', r'\&'),
+ ('#', r'\#'),
+ ('_', r'\_'),
+ ('{', r'\{'),
+ ('}', r'\}'),
+ ('\\', r'\textbackslash{}'),
+ ('~', r'\textasciitilde{}'),
+ ('^', r'\textasciicircum{}'),
+ # map chars to avoid mis-interpretation in LaTeX
+ ('[', r'{[}'),
+ (']', r'{]}'),
+ # map special Unicode characters to TeX commands
+ ('✓', r'\(\checkmark\)'),
+ ('✔', r'\(\pmb{\checkmark}\)'),
+ ('✕', r'\(\times\)'),
+ ('✖', r'\(\pmb{\times}\)'),
+ # used to separate -- in options
+ ('', r'{}'),
+ # map some special Unicode characters to similar ASCII ones
+ # (even for Unicode LaTeX engines, as OpenType fonts may not support them)
+ ('⎽', r'\_'),
+ ('ℯ', r'e'),
+ ('ⅈ', r'i'),
+ # Greek alphabet not escaped: pdflatex handles it via textalpha and inputenc
+ # OHM SIGN U+2126 is handled by LaTeX textcomp package
+]
+
+# A map to avoid TeX ligatures or character replacements in PDF output
+# xelatex/lualatex/uplatex are handled differently (#5790, #6888)
+ascii_tex_replacements = [
+ # Note: the " renders curly in OT1 encoding but straight in T1, T2A, LY1...
+ # escaping it to \textquotedbl would break documents using OT1
+ # Sphinx does \shorthandoff{"} to avoid problems with some languages
+ # There is no \text... LaTeX escape for the hyphen character -
+ ('-', r'\sphinxhyphen{}'), # -- and --- are TeX ligatures
+ # ,, is a TeX ligature in T1 encoding, but escaping the comma adds
+ # complications (whether by {}, or a macro) and is not done
+ # the next two require textcomp package
+ ("'", r'\textquotesingle{}'), # else ' renders curly, and '' is a ligature
+ ('`', r'\textasciigrave{}'), # else \` and \`\` render curly
+ ('<', r'\textless{}'), # < is inv. exclam in OT1, << is a T1-ligature
+ ('>', r'\textgreater{}'), # > is inv. quest. mark in OT1, >> a T1-ligature
+]
+
+# A map of Unicode characters to their LaTeX representation
+# (for LaTeX engines which don't support Unicode)
+unicode_tex_replacements = [
+ # map some more common Unicode characters to TeX commands
+ ('¶', r'\P{}'),
+ ('§', r'\S{}'),
+ ('€', r'\texteuro{}'),
+ ('∞', r'\(\infty\)'),
+ ('±', r'\(\pm\)'),
+ ('→', r'\(\rightarrow\)'),
+ ('‣', r'\(\rightarrow\)'),
+ ('–', r'\textendash{}'),
+ # superscript
+ ('⁰', r'\(\sp{\text{0}}\)'),
+ ('¹', r'\(\sp{\text{1}}\)'),
+ ('²', r'\(\sp{\text{2}}\)'),
+ ('³', r'\(\sp{\text{3}}\)'),
+ ('⁴', r'\(\sp{\text{4}}\)'),
+ ('⁵', r'\(\sp{\text{5}}\)'),
+ ('⁶', r'\(\sp{\text{6}}\)'),
+ ('⁷', r'\(\sp{\text{7}}\)'),
+ ('⁸', r'\(\sp{\text{8}}\)'),
+ ('⁹', r'\(\sp{\text{9}}\)'),
+ # subscript
+ ('₀', r'\(\sb{\text{0}}\)'),
+ ('₁', r'\(\sb{\text{1}}\)'),
+ ('₂', r'\(\sb{\text{2}}\)'),
+ ('₃', r'\(\sb{\text{3}}\)'),
+ ('₄', r'\(\sb{\text{4}}\)'),
+ ('₅', r'\(\sb{\text{5}}\)'),
+ ('₆', r'\(\sb{\text{6}}\)'),
+ ('₇', r'\(\sb{\text{7}}\)'),
+ ('₈', r'\(\sb{\text{8}}\)'),
+ ('₉', r'\(\sb{\text{9}}\)'),
+]
+
+# TODO: this should be called tex_idescape_map because its only use is in
+# sphinx.writers.latex.LaTeXTranslator.idescape()
+# %, {, }, \, #, and ~ are the only ones which must be replaced by _ character
+# It would be simpler to define it entirely here rather than in init().
+# Unicode replacements are superfluous, as idescape() uses backslashreplace
+tex_replace_map: dict[int, str] = {}
+
+_tex_escape_map: dict[int, str] = {}
+_tex_escape_map_without_unicode: dict[int, str] = {}
+_tex_hlescape_map: dict[int, str] = {}
+_tex_hlescape_map_without_unicode: dict[int, str] = {}
+
+
+def escape(s: str, latex_engine: str | None = None) -> str:
+ """Escape text for LaTeX output."""
+ if latex_engine in ('lualatex', 'xelatex'):
+ # unicode based LaTeX engine
+ return s.translate(_tex_escape_map_without_unicode)
+ else:
+ return s.translate(_tex_escape_map)
+
+
+def hlescape(s: str, latex_engine: str | None = None) -> str:
+ """Escape text for LaTeX highlighter."""
+ if latex_engine in ('lualatex', 'xelatex'):
+ # unicode based LaTeX engine
+ return s.translate(_tex_hlescape_map_without_unicode)
+ else:
+ return s.translate(_tex_hlescape_map)
+
+
+def escape_abbr(text: str) -> str:
+ """Adjust spacing after abbreviations. Works whether or not ``@`` is a letter."""
+ return re.sub(r'\.(?=\s|$)', r'.\@{}', text)
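+
+# For illustration, once init() below has populated the translation maps:
+#
+#     escape('100% of $x_i$')  ->  '100\% of \$x\_i\$'
+#     escape_abbr('e.g. foo')  ->  'e.g.\@{} foo'
+#
+# Until init() runs, the maps are empty and escape() returns its input
+# unchanged.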
+
+
+def init() -> None:
+ for a, b in tex_replacements:
+ _tex_escape_map[ord(a)] = b
+ _tex_escape_map_without_unicode[ord(a)] = b
+ tex_replace_map[ord(a)] = '_'
+
+ # no reason to do this for _tex_escape_map_without_unicode
+ for a, b in ascii_tex_replacements:
+ _tex_escape_map[ord(a)] = b
+
+ # but the hyphen has a specific PDF bookmark problem
+ # https://github.com/latex3/hyperref/issues/112
+ _tex_escape_map_without_unicode[ord('-')] = r'\sphinxhyphen{}'
+
+ for a, b in unicode_tex_replacements:
+ _tex_escape_map[ord(a)] = b
+ # This is actually unneeded:
+ tex_replace_map[ord(a)] = '_'
+
+ for a, b in tex_replacements:
+ if a in '[]{}\\':
+ continue
+ _tex_hlescape_map[ord(a)] = b
+ _tex_hlescape_map_without_unicode[ord(a)] = b
+
+ for a, b in unicode_tex_replacements:
+ _tex_hlescape_map[ord(a)] = b
diff --git a/sphinx/util/typing.py b/sphinx/util/typing.py
new file mode 100644
index 0000000..171420d
--- /dev/null
+++ b/sphinx/util/typing.py
@@ -0,0 +1,402 @@
+"""The composite types for Sphinx."""
+
+from __future__ import annotations
+
+import sys
+import typing
+from collections.abc import Sequence
+from struct import Struct
+from types import TracebackType
+from typing import TYPE_CHECKING, Any, Callable, ForwardRef, TypeVar, Union
+
+from docutils import nodes
+from docutils.parsers.rst.states import Inliner
+
+if TYPE_CHECKING:
+ import enum
+
+try:
+ from types import UnionType # type: ignore[attr-defined] # python 3.10 or above
+except ImportError:
+ UnionType = None
+
+# classes that have incorrect __module__
+INVALID_BUILTIN_CLASSES = {
+ Struct: 'struct.Struct', # Struct.__module__ == '_struct'
+ TracebackType: 'types.TracebackType', # TracebackType.__module__ == 'builtins'
+}
+
+
+def is_invalid_builtin_class(obj: Any) -> bool:
+ """Check whether *obj* is an invalid built-in class."""
+ try:
+ return obj in INVALID_BUILTIN_CLASSES
+ except TypeError: # unhashable type
+ return False
+
+
+# Text-like nodes which are initialized with text and rawsource
+TextlikeNode = Union[nodes.Text, nodes.TextElement]
+
+# type of None
+NoneType = type(None)
+
+# path matcher
+PathMatcher = Callable[[str], bool]
+
+# common role functions
+RoleFunction = Callable[[str, str, str, int, Inliner, dict[str, Any], Sequence[str]],
+ tuple[list[nodes.Node], list[nodes.system_message]]]
+
+# an option spec for a directive
+OptionSpec = dict[str, Callable[[str], Any]]
+
+# title getter functions for enumerable nodes (see sphinx.domains.std)
+TitleGetter = Callable[[nodes.Node], str]
+
+# in-memory inventory data
+InventoryItem = tuple[
+ str, # project name
+ str, # project version
+ str, # URL
+ str, # display name
+]
+Inventory = dict[str, dict[str, InventoryItem]]
+
+
+def get_type_hints(
+ obj: Any, globalns: dict[str, Any] | None = None, localns: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+ """Return a dictionary containing type hints for a function, method, module or class
+ object.
+
+ This is a simple wrapper around `typing.get_type_hints()` that does not
+ raise an error at runtime.
+ """
+ from sphinx.util.inspect import safe_getattr # lazy loading
+
+ try:
+ return typing.get_type_hints(obj, globalns, localns)
+ except NameError:
+ # Failed to evaluate ForwardRef (maybe TYPE_CHECKING)
+ return safe_getattr(obj, '__annotations__', {})
+ except AttributeError:
+ # Failed to evaluate ForwardRef (maybe not runtime checkable)
+ return safe_getattr(obj, '__annotations__', {})
+ except TypeError:
+ # Invalid object is given. But try to get __annotations__ as a fallback.
+ return safe_getattr(obj, '__annotations__', {})
+ except KeyError:
+ # a broken class found (refs: https://github.com/sphinx-doc/sphinx/issues/8084)
+ return {}
+
+
+def is_system_TypeVar(typ: Any) -> bool:
+ """Check whether *typ* is a system-defined TypeVar."""
+ modname = getattr(typ, '__module__', '')
+ return modname == 'typing' and isinstance(typ, TypeVar)
+
+
+def restify(cls: type | None, mode: str = 'fully-qualified-except-typing') -> str:
+ """Convert python class to a reST reference.
+
+ :param mode: Specify how the annotation will be stringified.
+
+ 'fully-qualified-except-typing'
+ Show the module name and qualified name of the annotation except
+ the "typing" module.
+ 'smart'
+ Show the name of the annotation.
+ """
+ from sphinx.ext.autodoc.mock import ismock, ismockmodule # lazy loading
+ from sphinx.util import inspect # lazy loading
+
+ if mode == 'smart':
+ modprefix = '~'
+ else:
+ modprefix = ''
+
+ try:
+ if cls is None or cls is NoneType:
+ return ':py:obj:`None`'
+ elif cls is Ellipsis:
+ return '...'
+ elif isinstance(cls, str):
+ return cls
+ elif ismockmodule(cls):
+ return f':py:class:`{modprefix}{cls.__name__}`'
+ elif ismock(cls):
+ return f':py:class:`{modprefix}{cls.__module__}.{cls.__name__}`'
+ elif is_invalid_builtin_class(cls):
+ return f':py:class:`{modprefix}{INVALID_BUILTIN_CLASSES[cls]}`'
+ elif inspect.isNewType(cls):
+ if sys.version_info[:2] >= (3, 10):
+ # newtypes have correct module info since Python 3.10+
+ return f':py:class:`{modprefix}{cls.__module__}.{cls.__name__}`'
+ else:
+ return ':py:class:`%s`' % cls.__name__
+ elif UnionType and isinstance(cls, UnionType):
+ if len(cls.__args__) > 1 and NoneType in cls.__args__:
+ args = ' | '.join(restify(a, mode) for a in cls.__args__ if a is not NoneType)
+ return 'Optional[%s]' % args
+ else:
+ return ' | '.join(restify(a, mode) for a in cls.__args__)
+ elif cls.__module__ in ('__builtin__', 'builtins'):
+ if hasattr(cls, '__args__'):
+ if not cls.__args__: # Empty tuple, list, ...
+ return fr':py:class:`{cls.__name__}`\ [{cls.__args__!r}]'
+
+ concatenated_args = ', '.join(restify(arg, mode) for arg in cls.__args__)
+ return fr':py:class:`{cls.__name__}`\ [{concatenated_args}]'
+ else:
+ return ':py:class:`%s`' % cls.__name__
+ elif (inspect.isgenericalias(cls)
+ and cls.__module__ == 'typing'
+ and cls.__origin__ is Union): # type: ignore[attr-defined]
+ if (len(cls.__args__) > 1 # type: ignore[attr-defined]
+ and cls.__args__[-1] is NoneType): # type: ignore[attr-defined]
+ if len(cls.__args__) > 2: # type: ignore[attr-defined]
+ args = ', '.join(restify(a, mode)
+ for a in cls.__args__[:-1]) # type: ignore[attr-defined]
+ return ':py:obj:`~typing.Optional`\\ [:py:obj:`~typing.Union`\\ [%s]]' % args
+ else:
+ return ':py:obj:`~typing.Optional`\\ [%s]' % restify(
+ cls.__args__[0], mode) # type: ignore[attr-defined]
+ else:
+ args = ', '.join(restify(a, mode)
+ for a in cls.__args__) # type: ignore[attr-defined]
+ return ':py:obj:`~typing.Union`\\ [%s]' % args
+ elif inspect.isgenericalias(cls):
+ if isinstance(cls.__origin__, typing._SpecialForm): # type: ignore[attr-defined]
+ text = restify(cls.__origin__, mode) # type: ignore[attr-defined,arg-type]
+ elif getattr(cls, '_name', None):
+ cls_name = cls._name # type: ignore[attr-defined]
+ if cls.__module__ == 'typing':
+ text = f':py:class:`~{cls.__module__}.{cls_name}`'
+ else:
+ text = f':py:class:`{modprefix}{cls.__module__}.{cls_name}`'
+ else:
+ text = restify(cls.__origin__, mode) # type: ignore[attr-defined]
+
+ origin = getattr(cls, '__origin__', None)
+ if not hasattr(cls, '__args__'): # NoQA: SIM114
+ pass
+ elif all(is_system_TypeVar(a) for a in cls.__args__):
+ # Suppress arguments if all system defined TypeVars (ex. Dict[KT, VT])
+ pass
+ elif (cls.__module__ == 'typing'
+ and cls._name == 'Callable'): # type: ignore[attr-defined]
+ args = ', '.join(restify(a, mode) for a in cls.__args__[:-1])
+ text += fr"\ [[{args}], {restify(cls.__args__[-1], mode)}]"
+ elif cls.__module__ == 'typing' and getattr(origin, '_name', None) == 'Literal':
+ literal_args = []
+ for a in cls.__args__:
+ if inspect.isenumattribute(a):
+ literal_args.append(_format_literal_enum_arg(a, mode=mode))
+ else:
+ literal_args.append(repr(a))
+ text += r"\ [%s]" % ', '.join(literal_args)
+ del literal_args
+ elif cls.__args__:
+ text += r"\ [%s]" % ", ".join(restify(a, mode) for a in cls.__args__)
+
+ return text
+ elif isinstance(cls, typing._SpecialForm):
+ return f':py:obj:`~{cls.__module__}.{cls._name}`' # type: ignore[attr-defined]
+ elif sys.version_info[:2] >= (3, 11) and cls is typing.Any:
+ # handle bpo-46998
+ return f':py:obj:`~{cls.__module__}.{cls.__name__}`'
+ elif hasattr(cls, '__qualname__'):
+ if cls.__module__ == 'typing':
+ return f':py:class:`~{cls.__module__}.{cls.__qualname__}`'
+ else:
+ return f':py:class:`{modprefix}{cls.__module__}.{cls.__qualname__}`'
+ elif isinstance(cls, ForwardRef):
+ return ':py:class:`%s`' % cls.__forward_arg__
+ else:
+ # not a class (ex. TypeVar)
+ if cls.__module__ == 'typing':
+ return f':py:obj:`~{cls.__module__}.{cls.__name__}`'
+ else:
+ return f':py:obj:`{modprefix}{cls.__module__}.{cls.__name__}`'
+ except (AttributeError, TypeError):
+ return inspect.object_description(cls)
+
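+# For illustration:
+#
+#     restify(int)     ->  ':py:class:`int`'
+#     restify(Struct)  ->  ':py:class:`struct.Struct`'
+#     restify(None)    ->  ':py:obj:`None`'
+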
+
+def stringify_annotation(
+ annotation: Any,
+ /,
+ mode: str = 'fully-qualified-except-typing',
+) -> str:
+ """Stringify type annotation object.
+
+ :param annotation: The annotation to stringify.
+ :param mode: Specify how the annotation will be stringified.
+
+ 'fully-qualified-except-typing'
+ Show the module name and qualified name of the annotation except
+ the "typing" module.
+ 'smart'
+ Show the name of the annotation.
+ 'fully-qualified'
+ Show the module name and qualified name of the annotation.
+ """
+ from sphinx.ext.autodoc.mock import ismock, ismockmodule # lazy loading
+ from sphinx.util.inspect import isNewType # lazy loading
+
+ if mode not in {'fully-qualified-except-typing', 'fully-qualified', 'smart'}:
+ msg = ("'mode' must be one of 'fully-qualified-except-typing', "
+ f"'fully-qualified', or 'smart'; got {mode!r}.")
+ raise ValueError(msg)
+
+ if mode == 'smart':
+ module_prefix = '~'
+ else:
+ module_prefix = ''
+
+ annotation_qualname = getattr(annotation, '__qualname__', '')
+ annotation_module = getattr(annotation, '__module__', '')
+ annotation_name = getattr(annotation, '__name__', '')
+ annotation_module_is_typing = annotation_module == 'typing'
+
+ if isinstance(annotation, str):
+ if annotation.startswith("'") and annotation.endswith("'"):
+ # might be a doubly forward-referenced type; unquote it
+ return annotation[1:-1]
+ else:
+ return annotation
+ elif isinstance(annotation, TypeVar):
+ if annotation_module_is_typing and mode in {'fully-qualified-except-typing', 'smart'}:
+ return annotation_name
+ else:
+ return module_prefix + f'{annotation_module}.{annotation_name}'
+ elif isNewType(annotation):
+ if sys.version_info[:2] >= (3, 10):
+ # newtypes have correct module info since Python 3.10+
+ return module_prefix + f'{annotation_module}.{annotation_name}'
+ else:
+ return annotation_name
+ elif not annotation:
+ return repr(annotation)
+ elif annotation is NoneType:
+ return 'None'
+ elif ismockmodule(annotation):
+ return module_prefix + annotation_name
+ elif ismock(annotation):
+ return module_prefix + f'{annotation_module}.{annotation_name}'
+ elif is_invalid_builtin_class(annotation):
+ return module_prefix + INVALID_BUILTIN_CLASSES[annotation]
+ elif str(annotation).startswith('typing.Annotated'): # for py310+
+ pass
+ elif annotation_module == 'builtins' and annotation_qualname:
+ if (args := getattr(annotation, '__args__', None)) is not None: # PEP 585 generic
+ if not args: # Empty tuple, list, ...
+ return repr(annotation)
+
+ concatenated_args = ', '.join(stringify_annotation(arg, mode) for arg in args)
+ return f'{annotation_qualname}[{concatenated_args}]'
+ else:
+ return annotation_qualname
+ elif annotation is Ellipsis:
+ return '...'
+
+ module_prefix = f'{annotation_module}.'
+ annotation_forward_arg = getattr(annotation, '__forward_arg__', None)
+ if annotation_qualname or (annotation_module_is_typing and not annotation_forward_arg):
+ if mode == 'smart':
+ module_prefix = '~' + module_prefix
+ if annotation_module_is_typing and mode == 'fully-qualified-except-typing':
+ module_prefix = ''
+ else:
+ module_prefix = ''
+
+ if annotation_module_is_typing:
+ if annotation_forward_arg:
+ # handle ForwardRefs
+ qualname = annotation_forward_arg
+ else:
+ _name = getattr(annotation, '_name', '')
+ if _name:
+ qualname = _name
+ elif annotation_qualname:
+ qualname = annotation_qualname
+ else:
+ qualname = stringify_annotation(
+ annotation.__origin__, 'fully-qualified-except-typing',
+ ).replace('typing.', '') # ex. Union
+ elif annotation_qualname:
+ qualname = annotation_qualname
+ elif hasattr(annotation, '__origin__'):
+ # instantiated generic provided by a user
+ qualname = stringify_annotation(annotation.__origin__, mode)
+ elif UnionType and isinstance(annotation, UnionType): # types.UnionType (for py3.10+)
+ qualname = 'types.UnionType'
+ else:
+ # we weren't able to extract the base type, appending arguments would
+ # only make them appear twice
+ return repr(annotation)
+
+ annotation_args = getattr(annotation, '__args__', None)
+ if annotation_args:
+ if not isinstance(annotation_args, (list, tuple)):
+ # broken __args__ found
+ pass
+ elif qualname in {'Optional', 'Union', 'types.UnionType'}:
+ return ' | '.join(stringify_annotation(a, mode) for a in annotation_args)
+ elif qualname == 'Callable':
+ args = ', '.join(stringify_annotation(a, mode) for a in annotation_args[:-1])
+ returns = stringify_annotation(annotation_args[-1], mode)
+ return f'{module_prefix}Callable[[{args}], {returns}]'
+ elif qualname == 'Literal':
+ from sphinx.util.inspect import isenumattribute # lazy loading
+
+ def format_literal_arg(arg):
+ if isenumattribute(arg):
+ enumcls = arg.__class__
+
+ if mode == 'smart':
+ # MyEnum.member
+ return f'{enumcls.__qualname__}.{arg.name}'
+
+ # module.MyEnum.member
+ return f'{enumcls.__module__}.{enumcls.__qualname__}.{arg.name}'
+ return repr(arg)
+
+ args = ', '.join(map(format_literal_arg, annotation_args))
+ return f'{module_prefix}Literal[{args}]'
+ elif str(annotation).startswith('typing.Annotated'): # for py39+
+ return stringify_annotation(annotation_args[0], mode)
+ elif all(is_system_TypeVar(a) for a in annotation_args):
+ # Suppress arguments if all system defined TypeVars (ex. Dict[KT, VT])
+ return module_prefix + qualname
+ else:
+ args = ', '.join(stringify_annotation(a, mode) for a in annotation_args)
+ return f'{module_prefix}{qualname}[{args}]'
+
+ return module_prefix + qualname
+
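+# For illustration:
+#
+#     stringify_annotation(Optional[int])   ->  'int | None'
+#     stringify_annotation(dict[str, int])  ->  'dict[str, int]'
+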
+
+def _format_literal_enum_arg(arg: enum.Enum, /, *, mode: str) -> str:
+ enum_cls = arg.__class__
+ if mode == 'smart' or enum_cls.__module__ == 'typing':
+ return f':py:attr:`~{enum_cls.__module__}.{enum_cls.__qualname__}.{arg.name}`'
+ else:
+ return f':py:attr:`{enum_cls.__module__}.{enum_cls.__qualname__}.{arg.name}`'
+
+
+# deprecated name -> (object to return, canonical path or empty string)
+_DEPRECATED_OBJECTS = {
+ 'stringify': (stringify_annotation, 'sphinx.util.typing.stringify_annotation'),
+}
+
+
+def __getattr__(name):
+ if name not in _DEPRECATED_OBJECTS:
+ msg = f'module {__name__!r} has no attribute {name!r}'
+ raise AttributeError(msg)
+
+ from sphinx.deprecation import _deprecation_warning
+
+ deprecated_object, canonical_name = _DEPRECATED_OBJECTS[name]
+ _deprecation_warning(__name__, name, canonical_name, remove=(8, 0))
+ return deprecated_object