summaryrefslogtreecommitdiffstats
path: root/sphinx/util/inventory.py
diff options
context:
space:
mode:
Diffstat (limited to 'sphinx/util/inventory.py')
-rw-r--r--sphinx/util/inventory.py172
1 files changed, 172 insertions, 0 deletions
diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py
new file mode 100644
index 0000000..89f0070
--- /dev/null
+++ b/sphinx/util/inventory.py
@@ -0,0 +1,172 @@
+"""Inventory utility functions for Sphinx."""
+from __future__ import annotations
+
+import os
+import re
+import zlib
+from typing import IO, TYPE_CHECKING, Callable
+
+from sphinx.util import logging
+
+BUFSIZE = 16 * 1024
+logger = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from sphinx.builders import Builder
+ from sphinx.environment import BuildEnvironment
+ from sphinx.util.typing import Inventory, InventoryItem
+
+
class InventoryFileReader:
    """A file reader for an inventory file.

    The reader supports a mixture of plain text and zlib-compressed text:
    the leading lines can be read with :meth:`readline` and the rest of the
    stream can then be consumed with :meth:`read_compressed_lines`.
    """

    def __init__(self, stream: IO) -> None:
        """Wrap *stream*, a binary file-like object providing ``read(size)``."""
        self.stream = stream
        # Bytes read from the stream but not yet handed out to the caller.
        self.buffer = b''
        # Set once the underlying stream returned an empty read.
        self.eof = False

    def read_buffer(self) -> None:
        """Read up to ``BUFSIZE`` bytes from the stream into the buffer."""
        chunk = self.stream.read(BUFSIZE)
        if chunk == b'':
            self.eof = True
        self.buffer += chunk

    def readline(self) -> str:
        """Return the next line, decoded and without its trailing newline.

        At the end of the stream the remaining buffer (possibly empty) is
        returned.
        """
        pos = self.buffer.find(b'\n')
        # Pull in more data until a newline shows up or the stream is
        # exhausted.  A loop (rather than recursion) keeps arbitrarily long
        # lines from hitting the recursion limit, and only the newly read
        # bytes are searched.
        while pos == -1 and not self.eof:
            searched = len(self.buffer)
            self.read_buffer()
            pos = self.buffer.find(b'\n', searched)

        if pos == -1:
            line = self.buffer.decode()
            self.buffer = b''
        else:
            line = self.buffer[:pos].decode()
            self.buffer = self.buffer[pos + 1:]
        return line

    def readlines(self) -> Iterator[str]:
        """Yield the remaining non-empty lines of the stream as text."""
        while not self.eof:
            line = self.readline()
            if line:
                yield line

    def read_compressed_chunks(self) -> Iterator[bytes]:
        """Yield the rest of the stream, zlib-decompressed, chunk by chunk."""
        decompressor = zlib.decompressobj()
        while not self.eof:
            self.read_buffer()
            # The buffer may still hold compressed bytes that were read
            # ahead by an earlier readline() call.
            yield decompressor.decompress(self.buffer)
            self.buffer = b''
        yield decompressor.flush()

    def read_compressed_lines(self) -> Iterator[str]:
        """Yield the decoded lines of the zlib-compressed rest of the stream.

        A final line that is not terminated by a newline is yielded as well
        instead of being silently dropped.
        """
        pending = b''
        for chunk in self.read_compressed_chunks():
            pending += chunk
            # Everything before the last newline is complete; the tail is
            # kept until more data (or the end of the stream) arrives.
            *complete, pending = pending.split(b'\n')
            for raw in complete:
                yield raw.decode()
        if pending:
            yield pending.decode()
+
+
class InventoryFile:
    """Reader and writer for Sphinx inventory files (format versions 1 and 2)."""

    # One entry of a version 2 inventory body:
    #     name  domain:objtype  priority  location  display-name
    # The name is matched lazily so that names with embedded spaces are
    # handled correctly.  Compiled once instead of once per entry.
    _V2_ENTRY_RE = re.compile(r'(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)')

    @classmethod
    def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory:
        """Load an inventory from *stream*, dispatching on its header line.

        :param stream: binary file-like object containing the inventory
        :param uri: base URI handed to *joinfunc* for every entry
        :param joinfunc: callable ``joinfunc(uri, location)`` returning the
            final location of an entry
        :raises ValueError: if the first line is not a known inventory header
        """
        reader = InventoryFileReader(stream)
        line = reader.readline().rstrip()
        if line == '# Sphinx inventory version 1':
            return cls.load_v1(reader, uri, joinfunc)
        if line == '# Sphinx inventory version 2':
            return cls.load_v2(reader, uri, joinfunc)
        raise ValueError('invalid inventory header: %s' % line)

    @classmethod
    def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
        """Load the body of a version 1 (plain text) inventory.

        *stream* must be positioned just after the version header line.
        Every entry is placed in the ``py`` domain.
        """
        invdata: Inventory = {}
        # Strip the 11-character "# Project: " / "# Version: " prefixes.
        projname = stream.readline().rstrip()[11:]
        version = stream.readline().rstrip()[11:]
        for line in stream.readlines():
            name, objtype, location = line.rstrip().split(None, 2)
            location = join(uri, location)
            # version 1 did not add anchors to the location
            if objtype == 'mod':
                objtype = 'py:module'
                location += '#module-' + name
            else:
                objtype = 'py:' + objtype
                location += '#' + name
            invdata.setdefault(objtype, {})[name] = (projname, version, location, '-')
        return invdata

    @classmethod
    def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
        """Load the body of a version 2 (zlib-compressed) inventory.

        *stream* must be positioned just after the version header line.
        Lines that do not look like an inventory entry are skipped.

        :raises ValueError: if the header does not announce zlib compression
        """
        invdata: Inventory = {}
        # Strip the 11-character "# Project: " / "# Version: " prefixes.
        projname = stream.readline().rstrip()[11:]
        version = stream.readline().rstrip()[11:]
        line = stream.readline()
        if 'zlib' not in line:
            raise ValueError('invalid inventory header (not compressed): %s' % line)

        for line in stream.read_compressed_lines():
            m = cls._V2_ENTRY_RE.match(line.rstrip())
            if not m:
                continue
            name, objtype, prio, location, dispname = m.groups()
            if ':' not in objtype:
                # wrong type value. type should be in the form of "{domain}:{objtype}"
                #
                # Note: To avoid the regex DoS, this is implemented in python (refs: #8175)
                continue
            if objtype == 'py:module' and name in invdata.get(objtype, ()):
                # due to a bug in 1.1 and below,
                # two inventory entries are created
                # for Python modules, and the first
                # one is correct
                continue
            if location.endswith('$'):
                # a trailing "$" is shorthand for the entry name
                location = location[:-1] + name
            location = join(uri, location)
            inv_item: InventoryItem = projname, version, location, dispname
            invdata.setdefault(objtype, {})[name] = inv_item
        return invdata

    @classmethod
    def dump(cls, filename: str, env: BuildEnvironment, builder: Builder) -> None:
        """Write a version 2 inventory for *env* to *filename*.

        :param filename: path of the file to write (opened in binary mode)
        :param env: environment supplying ``config.project``,
            ``config.version`` and the domains whose objects are listed
        :param builder: builder used to resolve each object's document name
            to a target URI
        """
        def escape(string: str) -> str:
            # Collapse whitespace runs (including newlines) so that a value
            # cannot break the one-line header fields.
            return re.sub("\\s+", " ", string)

        with open(filename, 'wb') as f:
            # header
            f.write(('# Sphinx inventory version 2\n'
                     '# Project: %s\n'
                     '# Version: %s\n'
                     '# The remainder of this file is compressed using zlib.\n' %
                     (escape(env.config.project),
                      escape(env.config.version))).encode())

            # body
            compressor = zlib.compressobj(9)
            for domainname, domain in sorted(env.domains.items()):
                for name, dispname, typ, docname, anchor, prio in \
                        sorted(domain.get_objects()):
                    if anchor.endswith(name):
                        # this can shorten the inventory by as much as 25%
                        anchor = anchor[:-len(name)] + '$'
                    uri = builder.get_target_uri(docname)
                    if anchor:
                        uri += '#' + anchor
                    if dispname == name:
                        # "-" marks a display name identical to the name
                        dispname = '-'
                    entry = ('%s %s:%s %s %s %s\n' %
                             (name, domainname, typ, prio, uri, dispname))
                    f.write(compressor.compress(entry.encode()))
            f.write(compressor.flush())