summaryrefslogtreecommitdiffstats
path: root/src/installer
diff options
context:
space:
mode:
Diffstat (limited to 'src/installer')
-rw-r--r-- src/installer/__init__.py 6
-rw-r--r-- src/installer/__main__.py 98
-rw-r--r-- src/installer/_core.py 135
-rw-r--r-- src/installer/_scripts/__init__.py 1
-rw-r--r-- src/installer/destinations.py 284
-rw-r--r-- src/installer/exceptions.py 9
-rw-r--r-- src/installer/py.typed 0
-rw-r--r-- src/installer/records.py 217
-rw-r--r-- src/installer/scripts.py 151
-rw-r--r-- src/installer/sources.py 170
-rw-r--r-- src/installer/utils.py 252
11 files changed, 1323 insertions, 0 deletions
diff --git a/src/installer/__init__.py b/src/installer/__init__.py
new file mode 100644
index 0000000..aa8e244
--- /dev/null
+++ b/src/installer/__init__.py
@@ -0,0 +1,6 @@
+"""A library for installing Python wheels."""
+
+__version__ = "0.6.0"
+__all__ = ["install"]
+
+from installer._core import install # noqa
diff --git a/src/installer/__main__.py b/src/installer/__main__.py
new file mode 100644
index 0000000..51014b9
--- /dev/null
+++ b/src/installer/__main__.py
@@ -0,0 +1,98 @@
+"""Installer CLI."""
+
+import argparse
+import os.path
+import sys
+import sysconfig
+from typing import Dict, Optional, Sequence
+
+import installer
+from installer.destinations import SchemeDictionaryDestination
+from installer.sources import WheelFile
+from installer.utils import get_launcher_kind
+
+
def _get_main_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the ``installer`` command line.

    The parser accepts one positional ``wheel`` argument plus the optional
    ``--destdir``/``-d``, ``--prefix``/``-p``, ``--compile-bytecode`` (may be
    repeated, each value one of 0, 1 or 2) and ``--no-compile-bytecode`` flags.

    :return: a parser with every supported argument registered.
    """
    # (flags, keyword arguments) pairs, registered in this exact order so the
    # generated ``--help`` output keeps its layout.
    argument_table = (
        (("wheel",), {"type": str, "help": "wheel file to install"}),
        (
            ("--destdir", "-d"),
            {
                "metavar": "path",
                "type": str,
                "help": "destination directory (prefix to prepend to each file)",
            },
        ),
        (
            ("--prefix", "-p"),
            {
                "metavar": "path",
                "type": str,
                "help": "override prefix to install packages to",
            },
        ),
        (
            ("--compile-bytecode",),
            {
                "action": "append",
                "metavar": "level",
                "type": int,
                "choices": [0, 1, 2],
                "help": "generate bytecode for the specified optimization level(s) (default=0, 1)",
            },
        ),
        (
            ("--no-compile-bytecode",),
            {
                "action": "store_true",
                "help": "don't generate bytecode for installed modules",
            },
        ),
    )

    cli = argparse.ArgumentParser()
    for flags, options in argument_table:
        cli.add_argument(*flags, **options)
    return cli
+
+
def _get_scheme_dict(
    distribution_name: str, prefix: Optional[str] = None
) -> Dict[str, str]:
    """Compute the install scheme paths for the running interpreter.

    :param distribution_name: name of the distribution; used as the last
        component of the ``headers`` path.
    :param prefix: when given, replaces both ``base`` and ``platbase`` while
        expanding the sysconfig paths.
    :return: the mapping from :func:`sysconfig.get_paths`, extended with a
        ``headers`` entry.
    """
    if prefix is None:
        overrides = {}
        installed_base = sysconfig.get_config_var("base")
        assert installed_base
    else:
        installed_base = prefix
        overrides = {"base": prefix, "platbase": prefix}

    paths = sysconfig.get_paths(vars=overrides)

    # calculate 'headers' path, not currently in sysconfig - see
    # https://bugs.python.org/issue44445. This is based on what distutils does.
    # TODO: figure out original vs normalised distribution names
    include_dir = sysconfig.get_path(
        "include", vars={"installed_base": installed_base}
    )
    paths["headers"] = os.path.join(include_dir, distribution_name)

    return paths
+
+
def _main(cli_args: Sequence[str], program: Optional[str] = None) -> None:
    """Parse ``cli_args`` and install the requested wheel.

    :param cli_args: command line arguments, without the program name.
    :param program: when truthy, the program name shown in usage messages.
    """
    parser = _get_main_parser()
    if program:
        parser.prog = program
    options = parser.parse_args(cli_args)

    # --no-compile-bytecode wins; otherwise fall back to levels 0 and 1 when
    # no explicit level was requested.
    if options.no_compile_bytecode:
        levels = []
    else:
        levels = options.compile_bytecode or [0, 1]

    with WheelFile.open(options.wheel) as wheel:
        scheme_paths = _get_scheme_dict(wheel.distribution, prefix=options.prefix)
        target = SchemeDictionaryDestination(
            scheme_dict=scheme_paths,
            interpreter=sys.executable,
            script_kind=get_launcher_kind(),
            bytecode_optimization_levels=levels,
            destdir=options.destdir,
        )
        installer.install(wheel, target, {})
+
+
+if __name__ == "__main__": # pragma: no cover
+ _main(sys.argv[1:], "python -m installer")
diff --git a/src/installer/_core.py b/src/installer/_core.py
new file mode 100644
index 0000000..9a02728
--- /dev/null
+++ b/src/installer/_core.py
@@ -0,0 +1,135 @@
+"""Core wheel installation logic."""
+
+import posixpath
+from io import BytesIO
+from typing import Dict, Tuple, cast
+
+from installer.destinations import WheelDestination
+from installer.exceptions import InvalidWheelSource
+from installer.records import RecordEntry
+from installer.sources import WheelSource
+from installer.utils import SCHEME_NAMES, Scheme, parse_entrypoints, parse_metadata_file
+
+__all__ = ["install"]
+
+
def _process_WHEEL_file(source: WheelSource) -> Scheme:
    """Read the ``WHEEL`` metadata of ``source`` and pick the root scheme.

    :param source: wheel whose dist-info ``WHEEL`` file is inspected.
    :raises InvalidWheelSource: if ``Wheel-Version`` is missing or is not a
        ``1.x`` version.
    :return: ``"purelib"`` when ``Root-Is-Purelib`` is exactly ``"true"``,
        ``"platlib"`` otherwise.
    """
    wheel_metadata = parse_metadata_file(source.read_dist_info("WHEEL"))

    # Ensure compatibility with this wheel version.
    wheel_version = wheel_metadata["Wheel-Version"]
    if not wheel_version or not wheel_version.startswith("1."):
        raise InvalidWheelSource(
            source,
            f"Incompatible Wheel-Version {wheel_version}, only support version 1.x wheels.",
        )

    # Determine where archive root should go.
    root_is_pure = wheel_metadata["Root-Is-Purelib"] == "true"
    return cast(Scheme, "purelib" if root_is_pure else "platlib")
+
+
def _determine_scheme(
    path: str, source: WheelSource, root_scheme: Scheme
) -> Tuple[Scheme, str]:
    """Determine which scheme to place given path in, from source.

    :param path: path of the file, relative to the root of the wheel.
    :param source: wheel the path belongs to; supplies ``data_dir``.
    :param root_scheme: scheme used for files outside the data directory.
    :raises InvalidWheelSource: if the path is inside the data directory but
        is not a file below one of the ``SCHEME_NAMES`` subdirectories.
    :return: ``(scheme, path within that scheme)``.
    """
    data_dir = source.data_dir

    # Compare whole path components. A character-wise prefix test would also
    # match siblings such as ``{data_dir}set/...`` (or ``data_dir`` itself),
    # which can never be reduced to ``data_dir`` by splitting off trailing
    # components.
    if not path.startswith(data_dir + "/"):
        return root_scheme, path

    # ``<data_dir>/<scheme>/<rest>``: the first component below the data
    # directory names the scheme, everything after it is the in-scheme path.
    below_data_dir = path[len(data_dir):].lstrip("/")
    scheme_name, _, remainder = below_data_dir.partition("/")
    # Never hand back a path starting with "/": joining it onto the scheme
    # directory would discard the scheme directory.
    remainder = remainder.lstrip("/")

    if scheme_name not in SCHEME_NAMES or not remainder:
        msg_fmt = "{path} is not contained in a valid .data subdirectory."
        raise InvalidWheelSource(source, msg_fmt.format(path=path))

    return cast(Scheme, scheme_name), remainder
+
+
def install(
    source: WheelSource,
    destination: WheelDestination,
    additional_metadata: Dict[str, bytes],
) -> None:
    """Install the wheel ``source`` into ``destination``.

    Scripts declared in ``entry_points.txt`` are written first, then every
    file of the wheel except ``RECORD``, then the ``additional_metadata``
    files; finally the destination is asked to finalize the installation with
    the collected records.

    :param source: wheel to install.
    :param destination: where to write the wheel.
    :param additional_metadata: mapping of file name to contents, written
        into the dist-info directory; usually generated by the caller.
    """
    root_scheme = _process_WHEEL_file(source)

    # RECORD is rewritten by the destination, so remember where it lives and
    # collect an entry for everything that gets written.
    record_path = posixpath.join(source.dist_info_dir, "RECORD")
    installed = []

    # Entry-point based launcher scripts.
    if "entry_points.txt" in source.dist_info_filenames:
        declared = parse_entrypoints(source.read_dist_info("entry_points.txt"))
        for name, module, attr, section in declared:
            entry = destination.write_script(
                name=name,
                module=module,
                attr=attr,
                section=section,
            )
            installed.append((Scheme("scripts"), entry))

    # Files shipped inside the wheel.
    for elements, stream, is_executable in source.get_contents():
        wheel_path = RecordEntry.from_elements(*elements).path
        if wheel_path == record_path:
            # Written at the very end, from the records gathered here.
            continue

        scheme, relative_path = _determine_scheme(
            path=wheel_path,
            source=source,
            root_scheme=root_scheme,
        )
        entry = destination.write_file(
            scheme=scheme,
            path=relative_path,
            stream=stream,
            is_executable=is_executable,
        )
        installed.append((scheme, entry))

    # Installation-specific metadata supplied by the caller.
    for filename, contents in additional_metadata.items():
        metadata_path = posixpath.join(source.dist_info_dir, filename)
        with BytesIO(contents) as buffer:
            entry = destination.write_file(
                scheme=root_scheme,
                path=metadata_path,
                stream=buffer,
                is_executable=False,
            )
        installed.append((root_scheme, entry))

    # RECORD lists itself, without hash or size.
    installed.append((root_scheme, RecordEntry(record_path, None, None)))
    destination.finalize_installation(
        scheme=root_scheme,
        record_file_path=record_path,
        records=installed,
    )
diff --git a/src/installer/_scripts/__init__.py b/src/installer/_scripts/__init__.py
new file mode 100644
index 0000000..0361a58
--- /dev/null
+++ b/src/installer/_scripts/__init__.py
@@ -0,0 +1 @@
+"""Internal package, containing launcher templates for ``installer.scripts``."""
diff --git a/src/installer/destinations.py b/src/installer/destinations.py
new file mode 100644
index 0000000..a3c1967
--- /dev/null
+++ b/src/installer/destinations.py
@@ -0,0 +1,284 @@
+"""Handles all file writing and post-installation processing."""
+
+import compileall
+import io
+import os
+from pathlib import Path
+from typing import (
+ TYPE_CHECKING,
+ BinaryIO,
+ Collection,
+ Dict,
+ Iterable,
+ Optional,
+ Tuple,
+ Union,
+)
+
+from installer.records import Hash, RecordEntry
+from installer.scripts import Script
+from installer.utils import (
+ Scheme,
+ construct_record_file,
+ copyfileobj_with_hashing,
+ fix_shebang,
+ make_file_executable,
+)
+
+if TYPE_CHECKING:
+ from installer.scripts import LauncherKind, ScriptSection
+
+
class WheelDestination:
    """Interface for writing unpacked files, launcher scripts and ``RECORD``.

    Every method here raises :class:`NotImplementedError`; subclasses provide
    the concrete file writing, script generation and ``RECORD`` (re)writing.
    """

    def write_script(
        self, name: str, module: str, attr: str, section: "ScriptSection"
    ) -> RecordEntry:
        """Write a launcher script that invokes the given entry point.

        :param name: name of the script
        :param module: module path, to load the entry point from
        :param attr: final attribute access, for the entry point
        :param section: the "entry point section" the script was declared in.
            Valid values are ``"gui"`` and ``"console"``.
        :type section: str

        Example usage/behaviour::

            >>> dest.write_script("pip", "pip._internal.cli", "main", "console")

        """
        raise NotImplementedError

    def write_file(
        self,
        scheme: Scheme,
        path: Union[str, "os.PathLike[str]"],
        stream: BinaryIO,
        is_executable: bool,
    ) -> RecordEntry:
        """Write the contents of ``stream`` to ``path`` within ``scheme``.

        :param scheme: scheme to write the file in (like "purelib", "platlib" etc).
        :param path: path within that scheme
        :param stream: contents of the file
        :param is_executable: whether the file should be made executable

        The stream would be closed by the caller, after this call.

        Example usage/behaviour::

            >>> with open("__init__.py", "rb") as stream:
            ...     dest.write_file("purelib", "pkg/__init__.py", stream, False)

        """
        raise NotImplementedError

    def finalize_installation(
        self,
        scheme: Scheme,
        record_file_path: str,
        records: Iterable[Tuple[Scheme, RecordEntry]],
    ) -> None:
        """Finish the installation once every file has been written.

        Handles (re)writing of the ``RECORD`` file.

        :param scheme: scheme to write the ``RECORD`` file in
        :param record_file_path: path of the ``RECORD`` file with that scheme
        :param records: ``(scheme, entry)`` pairs to write to the ``RECORD`` file

        Example usage/behaviour::

            >>> dest.finalize_installation(
            ...     "purelib", "pkg-1.0.dist-info/RECORD", records
            ... )

        """
        raise NotImplementedError
+
+
class SchemeDictionaryDestination(WheelDestination):
    """Destination, based on a mapping of {scheme: file-system-path}."""

    def __init__(
        self,
        scheme_dict: Dict[str, str],
        interpreter: str,
        script_kind: "LauncherKind",
        hash_algorithm: str = "sha256",
        bytecode_optimization_levels: Collection[int] = (),
        destdir: Optional[str] = None,
    ) -> None:
        """Construct a ``SchemeDictionaryDestination`` object.

        :param scheme_dict: a mapping of {scheme: file-system-path}
        :param interpreter: the interpreter to use for generating scripts
        :param script_kind: the "kind" of launcher script to use
        :param hash_algorithm: the hashing algorithm to use, which is a member
            of :any:`hashlib.algorithms_available` (ideally from
            :any:`hashlib.algorithms_guaranteed`).
        :param bytecode_optimization_levels: Compile cached bytecode for
            installed .py files with these optimization levels. The bytecode
            is specific to the minor version of Python (e.g. 3.10) used to
            generate it.
        :param destdir: A staging directory in which to write all files. This
            is expected to be the filesystem root at runtime, so embedded paths
            will be written as though this was the root.
        """
        self.scheme_dict = scheme_dict
        self.interpreter = interpreter
        self.script_kind = script_kind
        self.hash_algorithm = hash_algorithm
        self.bytecode_optimization_levels = bytecode_optimization_levels
        self.destdir = destdir

    def _path_with_destdir(self, scheme: Scheme, path: str) -> str:
        """Return the on-disk location of ``path`` within ``scheme``.

        Without ``destdir`` this is the scheme directory joined with ``path``.
        With ``destdir`` the anchor (root/drive) of that path is stripped and
        the remainder is re-rooted below ``destdir``.
        """
        file = os.path.join(self.scheme_dict[scheme], path)
        if self.destdir is not None:
            file_path = Path(file)
            rel_path = file_path.relative_to(file_path.anchor)
            return os.path.join(self.destdir, rel_path)
        return file

    def write_to_fs(
        self,
        scheme: Scheme,
        path: str,
        stream: BinaryIO,
        is_executable: bool,
    ) -> RecordEntry:
        """Write contents of ``stream`` to the correct location on the filesystem.

        :param scheme: scheme to write the file in (like "purelib", "platlib" etc).
        :param path: path within that scheme
        :param stream: contents of the file
        :param is_executable: whether the file should be made executable
        :raises FileExistsError: if the target file already exists.
        :return: the ``RECORD`` entry (path, hash, size) for the written file.

        - Ensures that an existing file is not being overwritten.
        - Hashes the written content, to determine the entry in the ``RECORD`` file.
        """
        target_path = self._path_with_destdir(scheme, path)
        if os.path.exists(target_path):
            message = f"File already exists: {target_path}"
            raise FileExistsError(message)

        # ``exist_ok`` avoids failing when the directory appears between a
        # separate existence check and the creation call.
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        with open(target_path, "wb") as f:
            hash_, size = copyfileobj_with_hashing(stream, f, self.hash_algorithm)

        if is_executable:
            make_file_executable(target_path)

        return RecordEntry(path, Hash(self.hash_algorithm, hash_), size)

    def write_file(
        self,
        scheme: Scheme,
        path: Union[str, "os.PathLike[str]"],
        stream: BinaryIO,
        is_executable: bool,
    ) -> RecordEntry:
        """Write a file to correct ``path`` within the ``scheme``.

        :param scheme: scheme to write the file in (like "purelib", "platlib" etc).
        :param path: path within that scheme
        :param stream: contents of the file
        :param is_executable: whether the file should be made executable
        :return: the ``RECORD`` entry for the written file.

        - Changes the shebang for files in the "scripts" scheme.
        - Uses :py:meth:`SchemeDictionaryDestination.write_to_fs` for the
          filesystem interaction.
        """
        path_ = os.fspath(path)

        if scheme == "scripts":
            with fix_shebang(stream, self.interpreter) as stream_with_different_shebang:
                return self.write_to_fs(
                    scheme, path_, stream_with_different_shebang, is_executable
                )

        return self.write_to_fs(scheme, path_, stream, is_executable)

    def write_script(
        self, name: str, module: str, attr: str, section: "ScriptSection"
    ) -> RecordEntry:
        """Write a script to invoke an entrypoint.

        :param name: name of the script
        :param module: module path, to load the entry point from
        :param attr: final attribute access, for the entry point
        :param section: Denotes the "entry point section" where this was specified.
            Valid values are ``"gui"`` and ``"console"``.
        :type section: str
        :return: the ``RECORD`` entry for the generated launcher.

        - Generates a launcher using :any:`Script.generate`.
        - Writes to the "scripts" scheme.
        - Uses :py:meth:`SchemeDictionaryDestination.write_to_fs` for the
          filesystem interaction.
        """
        script = Script(name, module, attr, section)
        script_name, data = script.generate(self.interpreter, self.script_kind)

        with io.BytesIO(data) as stream:
            entry = self.write_to_fs(
                Scheme("scripts"), script_name, stream, is_executable=True
            )

            # Copy each read permission bit onto the matching execute bit, so
            # whoever can read the launcher can also run it.
            path = self._path_with_destdir(Scheme("scripts"), script_name)
            mode = os.stat(path).st_mode
            mode |= (mode & 0o444) >> 2
            os.chmod(path, mode)

            return entry

    def _compile_bytecode(self, scheme: Scheme, record: RecordEntry) -> None:
        """Compile bytecode for a single file recorded in purelib/platlib.

        Files in any other scheme are ignored. The file is compiled once per
        configured optimization level; the directory passed as ``ddir`` is
        computed without ``destdir``.
        """
        if scheme not in ("purelib", "platlib"):
            return

        target_path = self._path_with_destdir(scheme, record.path)
        dir_path_to_embed = os.path.dirname(  # Without destdir
            os.path.join(self.scheme_dict[scheme], record.path)
        )
        for level in self.bytecode_optimization_levels:
            compileall.compile_file(
                target_path, optimize=level, quiet=1, ddir=dir_path_to_embed
            )

    def finalize_installation(
        self,
        scheme: Scheme,
        record_file_path: str,
        records: Iterable[Tuple[Scheme, RecordEntry]],
    ) -> None:
        """Finalize installation, by writing the ``RECORD`` file & compiling bytecode.

        :param scheme: scheme to write the ``RECORD`` file in
        :param record_file_path: path of the ``RECORD`` file with that scheme
        :param records: entries to write to the ``RECORD`` file
        """

        def prefix_for_scheme(file_scheme: str) -> Optional[str]:
            # Entries in the RECORD's own scheme need no prefix; others are
            # addressed relative to the directory of that scheme.
            if file_scheme == scheme:
                return None
            path = os.path.relpath(
                self.scheme_dict[file_scheme],
                start=self.scheme_dict[scheme],
            )
            return path + "/"

        record_list = list(records)
        with construct_record_file(record_list, prefix_for_scheme) as record_stream:
            self.write_to_fs(
                scheme, record_file_path, record_stream, is_executable=False
            )

        # A distinct loop name keeps the ``scheme`` parameter (also read by
        # ``prefix_for_scheme``) from being rebound.
        for record_scheme, record in record_list:
            self._compile_bytecode(record_scheme, record)
diff --git a/src/installer/exceptions.py b/src/installer/exceptions.py
new file mode 100644
index 0000000..01f044a
--- /dev/null
+++ b/src/installer/exceptions.py
@@ -0,0 +1,9 @@
+"""Errors raised from this package."""
+
+
class InstallerError(Exception):
    """Base class for the exceptions defined by this package."""
+
+
class InvalidWheelSource(InstallerError):
    """Raised for a wheel source that breaks a contract or is unsupported."""
diff --git a/src/installer/py.typed b/src/installer/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/installer/py.typed
diff --git a/src/installer/records.py b/src/installer/records.py
new file mode 100644
index 0000000..36c37d0
--- /dev/null
+++ b/src/installer/records.py
@@ -0,0 +1,217 @@
+"""Provides an object-oriented model for handling :pep:`376` RECORD files."""
+
+import base64
+import csv
+import hashlib
+import os
+from typing import Iterable, Iterator, Optional, Tuple, cast
+
+__all__ = [
+ "Hash",
+ "RecordEntry",
+ "InvalidRecordEntry",
+ "parse_record_file",
+]
+
+
class InvalidRecordEntry(Exception):
    """Raised for a RECORD row with invalid element values or element count.

    The exception message is the ``issues`` joined with ``", "``; the raw
    ``elements`` and ``issues`` stay available as attributes.
    """

    def __init__(self, elements, issues):  # noqa: D107
        self.elements = elements
        self.issues = issues
        super().__init__(", ".join(issues))

    def __repr__(self):
        return f"InvalidRecordEntry(elements={self.elements!r}, issues={self.issues!r})"
+
+
class Hash:
    """The "hash" element of a RECORD row: an algorithm name and a digest."""

    def __init__(self, name: str, value: str) -> None:
        """Store ``name`` and ``value`` as given.

        No validation or parsing happens here; most consumers should use
        :py:meth:`Hash.parse` instead.

        :param name: name of the hash function
        :param value: hashed value
        """
        self.name = name
        self.value = value

    def __str__(self) -> str:
        return "=".join((f"{self.name}", f"{self.value}"))

    def __repr__(self) -> str:
        return f"Hash(name={self.name!r}, value={self.value!r})"

    def __eq__(self, other):
        if isinstance(other, Hash):
            return self.value == other.value and self.name == other.name
        return NotImplemented

    def validate(self, data: bytes) -> bool:
        """Check ``data`` against the stored digest.

        The digest of ``data`` is computed with the algorithm ``name``,
        encoded as URL-safe base64 with the ``=`` padding removed, and
        compared to ``value``.

        :param data: Contents of the file.
        :return: Whether ``data`` matches the hashed value.
        """
        hasher = hashlib.new(self.name)
        hasher.update(data)
        encoded = base64.urlsafe_b64encode(hasher.digest()).decode("ascii")
        return self.value == encoded.rstrip("=")

    @classmethod
    def parse(cls, h: str) -> "Hash":
        """Build a Hash object, from a "name=value" string.

        The string is split on the first ``=`` only; a string without ``=``
        raises :class:`ValueError`. This is the format of the second element
        of a record, as described in :pep:`376`.

        Typical usage::

            Hash.parse("sha256=Y0sCextp4SQtQNU-MSs7SsdxD1W-gfKJtUlEbvZ3i-4")

        :param h: a name=value string
        """
        algorithm, digest = h.split("=", 1)
        return cls(algorithm, digest)
+
+
class RecordEntry:
    """One row of a RECORD file: a path with an optional hash and size.

    A list of :py:class:`RecordEntry` objects fully represents a RECORD file.
    """

    def __init__(self, path: str, hash_: Optional[Hash], size: Optional[int]) -> None:
        r"""Store the three elements as given.

        No validation or parsing happens here; most consumers should use
        :py:meth:`RecordEntry.from_elements`.

        :param path: file's path
        :param hash\_: hash of the file's contents
        :param size: file's size in bytes
        """
        super().__init__()

        self.path = path
        self.hash_ = hash_
        self.size = size

    def to_row(self, path_prefix: Optional[str] = None) -> Tuple[str, str, str]:
        """Render this entry as a 3-element row for a RECORD file.

        A missing hash or size is rendered as an empty string.

        :param path_prefix: A prefix to attach to the path -- must end in `/`
        :return: a (path, hash, size) row
        """
        if path_prefix is None:
            row_path = self.path
        else:
            assert path_prefix.endswith("/")
            row_path = path_prefix + self.path

        # Convert Windows paths to use / for consistency
        if os.sep == "\\":
            row_path = row_path.replace("\\", "/")  # pragma: no cover

        hash_text = str(self.hash_ or "")
        size_text = "" if self.size is None else str(self.size)
        return (row_path, hash_text, size_text)

    def __repr__(self) -> str:
        return f"RecordEntry(path={self.path!r}, hash_={self.hash_!r}, size={self.size!r})"

    def __eq__(self, other):
        if isinstance(other, RecordEntry):
            return (
                self.path == other.path
                and self.hash_ == other.hash_
                and self.size == other.size
            )
        return NotImplemented

    def validate(self, data: bytes) -> bool:
        """Check ``data`` against the recorded size and hash.

        A size or hash that is not recorded is not checked.

        :param data: Contents of the file corresponding to this instance.
        :return: whether ``data`` matches hash and size.
        """
        size_matches = self.size is None or len(data) == self.size
        if not size_matches:
            return False

        if not self.hash_:
            return True
        return self.hash_.validate(data)

    @classmethod
    def from_elements(cls, path: str, hash_: str, size: str) -> "RecordEntry":
        r"""Build a RecordEntry object, from values of the elements.

        Empty ``hash_`` and ``size`` strings become ``None``. All problems
        found are reported together in a single exception.

        Typical usage::

            for row in parse_record_file(f):
                record = RecordEntry.from_elements(row[0], row[1], row[2])

        Meaning of each element is specified in :pep:`376`.

        :param path: first element (file's path)
        :param hash\_: second element (hash of the file's contents)
        :param size: third element (file's size in bytes)
        :raises InvalidRecordEntry: if any element is invalid
        """
        problems = []
        parsed_hash: Optional[Hash] = None
        parsed_size: Optional[int] = None

        if not path:
            problems.append("`path` cannot be empty")

        if hash_:
            try:
                parsed_hash = Hash.parse(hash_)
            except ValueError:
                problems.append("`hash` does not follow the required format")

        if size:
            try:
                parsed_size = int(size)
            except ValueError:
                problems.append("`size` cannot be non-integer")

        if problems:
            raise InvalidRecordEntry(elements=(path, hash_, size), issues=problems)

        return cls(path=path, hash_=parsed_hash, size=parsed_size)
+
+
def parse_record_file(rows: Iterable[str]) -> Iterator[Tuple[str, str, str]]:
    """Parse the lines of a :pep:`376` RECORD as comma-separated rows.

    Yields one 3-value tuple per row, suitable for
    :any:`RecordEntry.from_elements`.

    :param rows: iterator providing lines of a RECORD (no trailing newlines).
    :raises InvalidRecordEntry: when a row does not have exactly 3 elements.
    """
    parsed_rows = csv.reader(rows, delimiter=",", quotechar='"', lineterminator="\n")
    for row_index, elements in enumerate(parsed_rows):
        count = len(elements)
        if count != 3:
            message = f"Row Index {row_index}: expected 3 elements, got {count}"
            raise InvalidRecordEntry(elements=elements, issues=[message])

        yield cast(Tuple[str, str, str], tuple(elements))
diff --git a/src/installer/scripts.py b/src/installer/scripts.py
new file mode 100644
index 0000000..7e3c8fc
--- /dev/null
+++ b/src/installer/scripts.py
@@ -0,0 +1,151 @@
+"""Generate executable scripts, on various platforms."""
+
+import io
+import shlex
+import zipfile
+from importlib.resources import read_binary
+from typing import TYPE_CHECKING, Mapping, Optional, Tuple
+
+from installer import _scripts
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ LauncherKind = Literal["posix", "win-ia32", "win-amd64", "win-arm", "win-arm64"]
+ ScriptSection = Literal["console", "gui"]
+
+
+__all__ = ["InvalidScript", "Script"]
+
+
+_ALLOWED_LAUNCHERS: Mapping[Tuple["ScriptSection", "LauncherKind"], str] = {
+ ("console", "win-ia32"): "t32.exe",
+ ("console", "win-amd64"): "t64.exe",
+ ("console", "win-arm"): "t_arm.exe",
+ ("console", "win-arm64"): "t64-arm.exe",
+ ("gui", "win-ia32"): "w32.exe",
+ ("gui", "win-amd64"): "w64.exe",
+ ("gui", "win-arm"): "w_arm.exe",
+ ("gui", "win-arm64"): "w64-arm.exe",
+}
+
+_SCRIPT_TEMPLATE = """\
+# -*- coding: utf-8 -*-
+import re
+import sys
+from {module} import {import_name}
+if __name__ == "__main__":
+ sys.argv[0] = re.sub(r"(-script\\.pyw|\\.exe)?$", "", sys.argv[0])
+ sys.exit({func_path}())
+"""
+
+
def _is_executable_simple(executable: bytes) -> bool:
    """Tell whether ``executable`` can be used directly in a ``#!`` line.

    That is the case when it contains no space and the whole shebang line
    (``#!`` + executable + newline) is at most 127 bytes long.
    """
    # According to distlib, Darwin can handle up to 512 characters. But I want
    # to avoid platform sniffing to make this as platform agnostic as possible.
    # The "complex" script isn't that bad anyway.
    max_shebang_length = 127
    overhead = 3  # Prefix #! and newline after.
    return b" " not in executable and len(executable) + overhead <= max_shebang_length
+
+
def _build_shebang(executable: str, forlauncher: bool) -> bytes:
    """Build the shebang (without a trailing newline) for ``executable``.

    The non-launcher cases are taken directly from distlib's implementation,
    which tries its best to account for command length, spaces in path, etc.

    https://bitbucket.org/pypa/distlib/src/58cd5c6/distlib/scripts.py#lines-124

    :param executable: path of the interpreter the script should run with.
    :param forlauncher: whether the shebang is consumed by a launcher
        executable, which can use the command as-is.
    :return: ``#!`` followed by the UTF-8 encoded executable, or a
        ``/bin/sh`` based preamble when the executable is not "simple".
    """
    executable_bytes = executable.encode("utf-8")
    # The launcher can just use the command as-is; so can the kernel when the
    # path is short and free of spaces.
    if forlauncher or _is_executable_simple(executable_bytes):
        return b"#!" + executable_bytes

    # Shebang support for an executable with a space in it is under-specified
    # and platform-dependent, so we use a clever hack to generate a script to
    # run in ``/bin/sh`` that should work on all reasonably modern platforms.
    # Read the following message to understand how the hack works:
    # https://github.com/pradyunsg/installer/pull/4#issuecomment-623668717
    #
    # The preamble is valid in both languages. ``/bin/sh`` reads ``'''exec'``
    # as an empty string followed by ``'exec'`` and so execs the quoted
    # interpreter with the script and its arguments. Python reads everything
    # from the opening ``'''`` to the closing ``'''`` as one string literal.
    quoted = shlex.quote(executable).encode("utf-8")
    return b"#!/bin/sh\n'''exec' " + quoted + b' "$0" "$@"\n' + b"' '''"
+
+
class InvalidScript(ValueError):
    """Raised when an unsupported script section or launcher kind is given."""
+
+
class Script:
    """Describes a script based on an entry point declaration."""

    __slots__ = ("name", "module", "attr", "section")

    def __init__(
        self, name: str, module: str, attr: str, section: "ScriptSection"
    ) -> None:
        """Construct a Script object.

        :param name: name of the script
        :param module: module path, to load the entry point from
        :param attr: final attribute access, for the entry point
        :param section: Denotes the "entry point section" where this was specified.
            Valid values are ``"gui"`` and ``"console"``.
        :type section: str

        """
        self.name = name
        self.module = module
        self.attr = attr
        self.section = section

    def __repr__(self) -> str:
        # Balanced parentheses, and every slot is shown.
        return "Script(name={!r}, module={!r}, attr={!r}, section={!r})".format(
            self.name,
            self.module,
            self.attr,
            self.section,
        )

    def _get_launcher_data(self, kind: "LauncherKind") -> Optional[bytes]:
        """Return the launcher executable for ``kind``, or ``None`` for posix.

        :raises InvalidScript: if there is no launcher for this script's
            section combined with ``kind``.
        """
        if kind == "posix":
            return None
        key = (self.section, kind)
        try:
            name = _ALLOWED_LAUNCHERS[key]
        except KeyError:
            error = f"{key!r} not in {sorted(_ALLOWED_LAUNCHERS)!r}"
            # The KeyError adds nothing beyond the message built above.
            raise InvalidScript(error) from None
        return read_binary(_scripts, name)

    def generate(self, executable: str, kind: "LauncherKind") -> Tuple[str, bytes]:
        """Generate a launcher for this script.

        For ``"posix"`` the result is the shebang followed by the script
        code, under the script's own name. For the Windows kinds it is the
        launcher executable, the shebang and a zip archive holding the code
        as ``__main__.py``, under the name ``<name>.exe``.

        :param executable: Path to the executable to invoke.
        :param kind: Which launcher template should be used.
            Valid values are ``"posix"``, ``"win-ia32"``, ``"win-amd64"``,
            ``"win-arm"`` and ``"win-arm64"``.
        :type kind: str

        :raises InvalidScript: if no appropriate template is available.
        :return: The name and contents of the launcher file.
        """
        launcher = self._get_launcher_data(kind)
        shebang = _build_shebang(executable, forlauncher=bool(launcher))
        code = _SCRIPT_TEMPLATE.format(
            module=self.module,
            # Only the first attribute is importable from the module; the
            # full dotted path is used for the call itself.
            import_name=self.attr.split(".")[0],
            func_path=self.attr,
        ).encode("utf-8")

        if launcher is None:
            return (self.name, shebang + b"\n" + code)

        stream = io.BytesIO()
        with zipfile.ZipFile(stream, "w") as zf:
            zf.writestr("__main__.py", code)
        name = f"{self.name}.exe"
        data = launcher + shebang + b"\n" + stream.getvalue()
        return (name, data)
diff --git a/src/installer/sources.py b/src/installer/sources.py
new file mode 100644
index 0000000..fa0bc34
--- /dev/null
+++ b/src/installer/sources.py
@@ -0,0 +1,170 @@
+"""Source of information about a wheel file."""
+
+import os
+import posixpath
+import stat
+import zipfile
+from contextlib import contextmanager
+from typing import BinaryIO, Iterator, List, Tuple, cast
+
+from installer.records import parse_record_file
+from installer.utils import parse_wheel_filename
+
+WheelContentElement = Tuple[Tuple[str, str, str], BinaryIO, bool]
+
+
+__all__ = ["WheelSource", "WheelFile"]
+
+
class WheelSource:
    """Abstract description of an installable wheel.

    Subclasses have to implement ``dist_info_filenames``, ``read_dist_info``
    and ``get_contents``; here they raise :class:`NotImplementedError`.
    """

    def __init__(self, distribution: str, version: str) -> None:
        """Initialize a WheelSource object.

        :param distribution: distribution name (like ``urllib3``)
        :param version: version associated with the wheel
        """
        super().__init__()
        self.distribution = distribution
        self.version = version

    @property
    def dist_info_dir(self):
        """Name of the dist-info directory (``{distribution}-{version}.dist-info``)."""
        return "{}-{}.dist-info".format(self.distribution, self.version)

    @property
    def data_dir(self):
        """Name of the data directory (``{distribution}-{version}.data``)."""
        return "{}-{}.data".format(self.distribution, self.version)

    @property
    def dist_info_filenames(self) -> List[str]:
        """Names of all files in the dist-info directory.

        Sample usage/behaviour::

            >>> wheel_source.dist_info_filenames
            ['METADATA', 'WHEEL']
        """
        raise NotImplementedError

    def read_dist_info(self, filename: str) -> str:
        """Return the text of ``filename`` from the dist-info directory.

        Sample usage/behaviour::

            >>> wheel_source.read_dist_info("METADATA")
            ...

        :param filename: name of the file
        """
        raise NotImplementedError

    def get_contents(self) -> Iterator[WheelContentElement]:
        """Sequential access to all contents of the wheel (including dist-info files).

        Implementations return an iterable of 3-element tuples:

        - record: 3-value tuple, to pass to
          :py:meth:`RecordEntry.from_elements <installer.records.RecordEntry.from_elements>`.
        - stream: An :py:class:`io.BufferedReader` object, providing the contents of the
          file at the location provided by the first element (path).
        - is_executable: A boolean, representing whether the item has an executable bit.

        All paths must be relative to the root of the wheel.

        Sample usage/behaviour::

            >>> iterable = wheel_source.get_contents()
            >>> next(iterable)
            (('pkg/__init__.py', '', '0'), <...>, False)

        This method may be called multiple times. Each iterable returned must
        provide the same content upon reading from a specific file's stream.
        """
        raise NotImplementedError
+
+
class WheelFile(WheelSource):
    """Implements `WheelSource`, for an existing file from the filesystem.

    Example usage::

        >>> with WheelFile.open("sampleproject-2.0.0-py3-none-any.whl") as source:
        ...     installer.install(source, destination)
    """

    def __init__(self, f: zipfile.ZipFile) -> None:
        """Initialize a WheelFile object.

        The distribution name and version are parsed from the base name of
        the archive's file name.

        :param f: An open zipfile, which will stay open as long as this object is used.
        """
        self._zipfile = f
        assert f.filename

        basename = os.path.basename(f.filename)
        parsed_name = parse_wheel_filename(basename)
        super().__init__(
            version=parsed_name.version,
            distribution=parsed_name.distribution,
        )

    @classmethod
    @contextmanager
    def open(cls, path: "os.PathLike[str]") -> Iterator["WheelFile"]:
        """Create a wheelfile from a given path.

        The underlying zip archive is closed when the context exits.
        """
        with zipfile.ZipFile(path) as f:
            yield cls(f)

    @property
    def dist_info_filenames(self) -> List[str]:
        """Get names of all files in the dist-info directory.

        Names are relative to the dist-info directory; directory entries
        (names ending in ``/``) are left out.
        """
        # Match on the directory name plus separator. A character-wise prefix
        # comparison would also pick up siblings whose name merely starts
        # with the dist-info directory name.
        prefix = self.dist_info_dir + "/"
        return [
            name[len(prefix):]
            for name in self._zipfile.namelist()
            if name.startswith(prefix) and not name.endswith("/")
        ]

    def read_dist_info(self, filename: str) -> str:
        """Get contents, from ``filename`` in the dist-info directory.

        The file is decoded as UTF-8.
        """
        path = posixpath.join(self.dist_info_dir, filename)
        return self._zipfile.read(path).decode("utf-8")

    def get_contents(self) -> Iterator[WheelContentElement]:
        """Sequential access to all contents of the wheel (including dist-info files).

        This implementation requires that every file that is a part of the wheel
        archive has a corresponding entry in RECORD. If they are not, an
        :any:`AssertionError` will be raised.
        """
        # Convert the record file into a useful mapping
        record_lines = self.read_dist_info("RECORD").splitlines()
        records = parse_record_file(record_lines)
        record_mapping = {record[0]: record for record in records}

        for item in self._zipfile.infolist():
            if item.filename[-1:] == "/":  # looks like a directory
                continue

            record = record_mapping.pop(item.filename, None)
            if record is None:
                # Raised explicitly so the documented AssertionError is still
                # produced when assert statements are disabled (``-O``).
                # should not happen for valid wheels
                raise AssertionError(
                    "In {}, {} is not mentioned in RECORD".format(
                        self._zipfile.filename,
                        item.filename,
                    )
                )

            # Borrowed from:
            # https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L96-L100
            mode = item.external_attr >> 16
            is_executable = bool(mode and stat.S_ISREG(mode) and mode & 0o111)

            with self._zipfile.open(item) as stream:
                stream_casted = cast("BinaryIO", stream)
                yield record, stream_casted, is_executable
diff --git a/src/installer/utils.py b/src/installer/utils.py
new file mode 100644
index 0000000..7b1404d
--- /dev/null
+++ b/src/installer/utils.py
@@ -0,0 +1,252 @@
+"""Utilities related to handling / interacting with wheel files."""
+
+import base64
+import contextlib
+import csv
+import hashlib
+import io
+import os
+import re
+import sys
+from collections import namedtuple
+from configparser import ConfigParser
+from email.message import Message
+from email.parser import FeedParser
+from typing import (
+ TYPE_CHECKING,
+ BinaryIO,
+ Callable,
+ Iterable,
+ Iterator,
+ NewType,
+ Optional,
+ Tuple,
+ Union,
+ cast,
+)
+
+from installer.records import RecordEntry
+
+if TYPE_CHECKING:
+ from installer.scripts import LauncherKind, ScriptSection
+
# Name of an installation scheme. ``NewType`` gives type checkers a distinct
# type, while the values remain plain ``str`` objects at runtime.
Scheme = NewType("Scheme", str)
# A tuple holding any number of scheme names.
AllSchemes = Tuple[Scheme, ...]

# Public names exported by this module.
__all__ = [
    "parse_metadata_file",
    "parse_wheel_filename",
    "copyfileobj_with_hashing",
    "get_launcher_kind",
    "fix_shebang",
    "construct_record_file",
    "parse_entrypoints",
    "make_file_executable",
    "WheelFilename",
    "SCHEME_NAMES",
]

# Borrowed from https://github.com/python/cpython/blob/v3.9.1/Lib/shutil.py#L52
# Chunk size, in bytes, for buffered copies: 1 MiB on Windows, 64 KiB elsewhere.
_WINDOWS = os.name == "nt"
_COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024

# According to https://www.python.org/dev/peps/pep-0427/#file-name-convention
# Captures, in order: the distribution, the version, an optional build tag
# (which must start with a digit) and the three-part tag, followed by ``.whl``.
_WHEEL_FILENAME_REGEX = re.compile(
    r"""
        ^
        (?P<distribution>.+?)
        -(?P<version>.*?)
        (?:-(?P<build_tag>\d[^-]*?))?
        -(?P<tag>.+?-.+?-.+?)
        \.whl
        $
    """,
    re.VERBOSE | re.UNICODE,
)
# Parsed wheel filename; the fields are listed in the same order as the
# capture groups of ``_WHEEL_FILENAME_REGEX``.
WheelFilename = namedtuple(
    "WheelFilename", ["distribution", "version", "build_tag", "tag"]
)

# Adapted from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L90 # noqa
# Matches ``module:attrs`` with an optional trailing ``[extras]``. The
# ``:attrs`` part is mandatory in this pattern.
_ENTRYPOINT_REGEX = re.compile(
    r"""
        (?P<module>[\w.]+)\s*
        (:\s*(?P<attrs>[\w.]+))\s*
        (?P<extras>\[.*\])?\s*$
    """,
    re.VERBOSE | re.UNICODE,
)

# According to https://www.python.org/dev/peps/pep-0427/#id7
# The scheme names known to this module, typed as ``AllSchemes``.
SCHEME_NAMES = cast(AllSchemes, ("purelib", "platlib", "headers", "scripts", "data"))
+
+
def parse_metadata_file(contents: str) -> Message:
    """Parse a :pep:`376` ``PKG-INFO``-style metadata file into a message.

    The ``METADATA`` and ``WHEEL`` files described by :pep:`427` share this
    syntax, so they can be parsed with this function as well.

    :param contents: The entire contents of the file
    :return: the parsed message
    """
    parser = FeedParser()
    parser.feed(contents)
    message = parser.close()
    return message
+
+
def parse_wheel_filename(filename: str) -> WheelFilename:
    """Split a wheel filename into its components.

    :param filename: The filename to parse
    :raises ValueError: if ``filename`` does not follow the wheel naming pattern
    """
    match = _WHEEL_FILENAME_REGEX.match(filename)
    if match is None:
        raise ValueError(f"Not a valid wheel filename: {filename}")
    distribution, version, build_tag, tag = match.groups()
    return WheelFilename(distribution, version, build_tag, tag)
+
+
def copyfileobj_with_hashing(
    source: BinaryIO,
    dest: BinaryIO,
    hash_algorithm: str,
) -> Tuple[str, int]:
    """Copy a buffer while computing the content's hash and size.

    The data is read from ``source`` in chunks; each chunk is fed to the
    hasher and written to ``dest``. Adapted from :any:`shutil.copyfileobj`.

    :param source: buffer holding the source data
    :param dest: destination buffer
    :param hash_algorithm: hashing algorithm, as accepted by ``hashlib.new``

    :return:
        hash digest of the contents (URL-safe base64 with the ``=`` padding
        removed), number of bytes copied
    """
    hasher = hashlib.new(hash_algorithm)
    copied = 0
    chunk = source.read(_COPY_BUFSIZE)
    while chunk:
        hasher.update(chunk)
        dest.write(chunk)
        copied += len(chunk)
        chunk = source.read(_COPY_BUFSIZE)

    encoded = base64.urlsafe_b64encode(hasher.digest())
    return encoded.decode("ascii").rstrip("="), copied
+
+
def get_launcher_kind() -> "LauncherKind":  # pragma: no cover
    """Get the launcher kind for the current machine.

    Anything that is not Windows (``os.name != "nt"``) uses the ``posix``
    launcher. On Windows the architecture is read from ``sys.version``.

    :raises NotImplementedError: if no launcher kind matches this machine
    """
    if os.name != "nt":
        return "posix"

    version = sys.version.lower()
    if "amd64" in version:
        return "win-amd64"
    if "(arm64)" in version:
        return "win-arm64"
    if "(arm)" in version:
        return "win-arm"
    if sys.platform == "win32":
        return "win-ia32"

    raise NotImplementedError("Unknown launcher kind for this machine")
+
+
@contextlib.contextmanager
def fix_shebang(stream: BinaryIO, interpreter: str) -> Iterator[BinaryIO]:
    """Replace ``#!python`` shebang in a stream with the correct interpreter.

    A stream is considered to carry such a shebang when its first 8 bytes are
    ``#!python`` (so ``#!pythonw`` is covered too). In that case the whole
    first line is replaced and an in-memory copy is provided; that copy is
    closed when the context exits, including when the body raises. Otherwise
    the original stream is provided, rewound to its start, and is left open.

    :param stream: stream to modify
    :param interpreter: "correct interpreter" to substitute the shebang with

    :returns: A context manager, that provides an appropriately modified stream.
    """
    import shutil  # local import: keeps this block free of new module-level deps

    stream.seek(0)
    if stream.read(8) != b"#!python":
        stream.seek(0)
        yield stream
        return

    # ``with`` guarantees the temporary buffer is released even if the
    # caller's block raises.
    with io.BytesIO() as new_stream:
        # write our new shebang
        new_stream.write(f"#!{interpreter}\n".encode())
        # copy the rest of the stream
        stream.seek(0)
        stream.readline()  # skip first line
        shutil.copyfileobj(stream, new_stream)
        new_stream.seek(0)
        yield new_stream
+
+
def construct_record_file(
    records: Iterable[Tuple[Scheme, RecordEntry]],
    prefix_for_scheme: Callable[[Scheme], Optional[str]] = lambda _: None,
) -> BinaryIO:
    """Construct a RECORD file.

    Each record is turned into one CSV row, using the prefix that
    ``prefix_for_scheme`` returns for the record's scheme.

    :param records:
        ``records`` as passed into :any:`WheelDestination.finalize_installation`
    :param prefix_for_scheme:
        function to get a prefix to add for RECORD entries, within a scheme

    :return: A stream that can be written to file. Must be closed by the caller.
    """
    text_stream = io.TextIOWrapper(
        io.BytesIO(), encoding="utf-8", write_through=True, newline=""
    )
    csv_writer = csv.writer(
        text_stream, delimiter=",", quotechar='"', lineterminator="\n"
    )
    csv_writer.writerows(
        record.to_row(prefix_for_scheme(scheme)) for scheme, record in records
    )
    # Rewind, then hand back the underlying binary buffer.
    text_stream.seek(0)
    return text_stream.detach()
+
+
def parse_entrypoints(text: str) -> Iterable[Tuple[str, str, str, "ScriptSection"]]:
    """Parse ``entry_points.txt``-style files.

    Only the ``console_scripts`` and ``gui_scripts`` sections are considered;
    every other section is skipped.

    :param text: entire contents of the file
    :return:
        name of the script, module to use, attribute to call, kind of script (cli / gui)
    :raises AssertionError:
        if a script entry's value is not of the ``module:attr`` form
    """
    # Borrowed from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L115 # noqa
    config = ConfigParser(delimiters="=")
    config.optionxform = str  # type: ignore  # keep script names case-sensitive
    config.read_string(text)

    for section in config.sections():
        if section not in ["console_scripts", "gui_scripts"]:
            continue

        # "console_scripts" -> "console", "gui_scripts" -> "gui"
        script_section = cast("ScriptSection", section[: -len("_scripts")])

        for name, value in config.items(section):
            match = _ENTRYPOINT_REGEX.match(value)
            # Raised explicitly rather than through ``assert`` so that the
            # validation still happens when Python runs with ``-O``.
            # TODO: make this a proper error, which can be caught. The type
            # is kept as AssertionError for now so existing callers that
            # catch it keep working.
            if match is None or match.group("attrs") is None:
                raise AssertionError(
                    f"Invalid entry point in [{section}]: {name} = {value}"
                )

            yield name, match.group("module"), match.group("attrs"), script_section
+
+
def _current_umask() -> int:
    """Return the process umask.

    The umask can only be read by setting it, so it is set to ``0`` for a
    moment and then put back to the value that was returned.
    """
    current = os.umask(0)
    os.umask(current)
    return current


# Borrowed from:
# https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L93
def make_file_executable(path: Union[str, "os.PathLike[str]"]) -> None:
    """Make the file at the provided path executable.

    The mode is set to the permissions the current umask allows, plus the
    execute bit for user, group and others. The file's existing mode is not
    consulted.
    """
    allowed_by_umask = 0o777 & ~_current_umask()
    os.chmod(path, allowed_by_umask | 0o111)