diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:25:40 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:25:40 +0000 |
commit | cf7da1843c45a4c2df7a749f7886a2d2ba0ee92a (patch) | |
tree | 18dcde1a8d1f5570a77cd0c361de3b490d02c789 /sphinx/util/inventory.py | |
parent | Initial commit. (diff) | |
download | sphinx-upstream/7.2.6.tar.xz sphinx-upstream/7.2.6.zip |
Adding upstream version 7.2.6. (tag: upstream/7.2.6)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sphinx/util/inventory.py')
-rw-r--r-- | sphinx/util/inventory.py | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py new file mode 100644 index 0000000..89f0070 --- /dev/null +++ b/sphinx/util/inventory.py @@ -0,0 +1,172 @@ +"""Inventory utility functions for Sphinx.""" +from __future__ import annotations + +import os +import re +import zlib +from typing import IO, TYPE_CHECKING, Callable + +from sphinx.util import logging + +BUFSIZE = 16 * 1024 +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + from collections.abc import Iterator + + from sphinx.builders import Builder + from sphinx.environment import BuildEnvironment + from sphinx.util.typing import Inventory, InventoryItem + + +class InventoryFileReader: + """A file reader for an inventory file. + + This reader supports mixture of texts and compressed texts. + """ + + def __init__(self, stream: IO) -> None: + self.stream = stream + self.buffer = b'' + self.eof = False + + def read_buffer(self) -> None: + chunk = self.stream.read(BUFSIZE) + if chunk == b'': + self.eof = True + self.buffer += chunk + + def readline(self) -> str: + pos = self.buffer.find(b'\n') + if pos != -1: + line = self.buffer[:pos].decode() + self.buffer = self.buffer[pos + 1:] + elif self.eof: + line = self.buffer.decode() + self.buffer = b'' + else: + self.read_buffer() + line = self.readline() + + return line + + def readlines(self) -> Iterator[str]: + while not self.eof: + line = self.readline() + if line: + yield line + + def read_compressed_chunks(self) -> Iterator[bytes]: + decompressor = zlib.decompressobj() + while not self.eof: + self.read_buffer() + yield decompressor.decompress(self.buffer) + self.buffer = b'' + yield decompressor.flush() + + def read_compressed_lines(self) -> Iterator[str]: + buf = b'' + for chunk in self.read_compressed_chunks(): + buf += chunk + pos = buf.find(b'\n') + while pos != -1: + yield buf[:pos].decode() + buf = buf[pos + 1:] + pos = buf.find(b'\n') + + +class InventoryFile: + @classmethod + def load(cls, stream: IO, uri: str, joinfunc: 
Callable) -> Inventory: + reader = InventoryFileReader(stream) + line = reader.readline().rstrip() + if line == '# Sphinx inventory version 1': + return cls.load_v1(reader, uri, joinfunc) + elif line == '# Sphinx inventory version 2': + return cls.load_v2(reader, uri, joinfunc) + else: + raise ValueError('invalid inventory header: %s' % line) + + @classmethod + def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory: + invdata: Inventory = {} + projname = stream.readline().rstrip()[11:] + version = stream.readline().rstrip()[11:] + for line in stream.readlines(): + name, type, location = line.rstrip().split(None, 2) + location = join(uri, location) + # version 1 did not add anchors to the location + if type == 'mod': + type = 'py:module' + location += '#module-' + name + else: + type = 'py:' + type + location += '#' + name + invdata.setdefault(type, {})[name] = (projname, version, location, '-') + return invdata + + @classmethod + def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory: + invdata: Inventory = {} + projname = stream.readline().rstrip()[11:] + version = stream.readline().rstrip()[11:] + line = stream.readline() + if 'zlib' not in line: + raise ValueError('invalid inventory header (not compressed): %s' % line) + + for line in stream.read_compressed_lines(): + # be careful to handle names with embedded spaces correctly + m = re.match(r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)', + line.rstrip(), flags=re.VERBOSE) + if not m: + continue + name, type, prio, location, dispname = m.groups() + if ':' not in type: + # wrong type value. 
type should be in the form of "{domain}:{objtype}" + # + # Note: To avoid the regex DoS, this is implemented in python (refs: #8175) + continue + if type == 'py:module' and type in invdata and name in invdata[type]: + # due to a bug in 1.1 and below, + # two inventory entries are created + # for Python modules, and the first + # one is correct + continue + if location.endswith('$'): + location = location[:-1] + name + location = join(uri, location) + inv_item: InventoryItem = projname, version, location, dispname + invdata.setdefault(type, {})[name] = inv_item + return invdata + + @classmethod + def dump(cls, filename: str, env: BuildEnvironment, builder: Builder) -> None: + def escape(string: str) -> str: + return re.sub("\\s+", " ", string) + + with open(os.path.join(filename), 'wb') as f: + # header + f.write(('# Sphinx inventory version 2\n' + '# Project: %s\n' + '# Version: %s\n' + '# The remainder of this file is compressed using zlib.\n' % + (escape(env.config.project), + escape(env.config.version))).encode()) + + # body + compressor = zlib.compressobj(9) + for domainname, domain in sorted(env.domains.items()): + for name, dispname, typ, docname, anchor, prio in \ + sorted(domain.get_objects()): + if anchor.endswith(name): + # this can shorten the inventory by as much as 25% + anchor = anchor[:-len(name)] + '$' + uri = builder.get_target_uri(docname) + if anchor: + uri += '#' + anchor + if dispname == name: + dispname = '-' + entry = ('%s %s:%s %s %s %s\n' % + (name, domainname, typ, prio, uri, dispname)) + f.write(compressor.compress(entry.encode())) + f.write(compressor.flush()) |