diff options
Diffstat (limited to 'src/installer')
-rw-r--r-- | src/installer/__init__.py | 6 | ||||
-rw-r--r-- | src/installer/__main__.py | 98 | ||||
-rw-r--r-- | src/installer/_core.py | 135 | ||||
-rw-r--r-- | src/installer/_scripts/__init__.py | 1 | ||||
-rw-r--r-- | src/installer/destinations.py | 284 | ||||
-rw-r--r-- | src/installer/exceptions.py | 9 | ||||
-rw-r--r-- | src/installer/py.typed | 0 | ||||
-rw-r--r-- | src/installer/records.py | 217 | ||||
-rw-r--r-- | src/installer/scripts.py | 151 | ||||
-rw-r--r-- | src/installer/sources.py | 170 | ||||
-rw-r--r-- | src/installer/utils.py | 252 |
11 files changed, 1323 insertions, 0 deletions
diff --git a/src/installer/__init__.py b/src/installer/__init__.py new file mode 100644 index 0000000..aa8e244 --- /dev/null +++ b/src/installer/__init__.py @@ -0,0 +1,6 @@ +"""A library for installing Python wheels.""" + +__version__ = "0.6.0" +__all__ = ["install"] + +from installer._core import install # noqa diff --git a/src/installer/__main__.py b/src/installer/__main__.py new file mode 100644 index 0000000..51014b9 --- /dev/null +++ b/src/installer/__main__.py @@ -0,0 +1,98 @@ +"""Installer CLI.""" + +import argparse +import os.path +import sys +import sysconfig +from typing import Dict, Optional, Sequence + +import installer +from installer.destinations import SchemeDictionaryDestination +from installer.sources import WheelFile +from installer.utils import get_launcher_kind + + +def _get_main_parser() -> argparse.ArgumentParser: + """Construct the main parser.""" + parser = argparse.ArgumentParser() + parser.add_argument("wheel", type=str, help="wheel file to install") + parser.add_argument( + "--destdir", + "-d", + metavar="path", + type=str, + help="destination directory (prefix to prepend to each file)", + ) + parser.add_argument( + "--prefix", + "-p", + metavar="path", + type=str, + help="override prefix to install packages to", + ) + parser.add_argument( + "--compile-bytecode", + action="append", + metavar="level", + type=int, + choices=[0, 1, 2], + help="generate bytecode for the specified optimization level(s) (default=0, 1)", + ) + parser.add_argument( + "--no-compile-bytecode", + action="store_true", + help="don't generate bytecode for installed modules", + ) + return parser + + +def _get_scheme_dict( + distribution_name: str, prefix: Optional[str] = None +) -> Dict[str, str]: + """Calculate the scheme dictionary for the current Python environment.""" + vars = {} + if prefix is None: + installed_base = sysconfig.get_config_var("base") + assert installed_base + else: + vars["base"] = vars["platbase"] = installed_base = prefix + + scheme_dict = 
def _main(cli_args: Sequence[str], program: Optional[str] = None) -> None:
    """Parse ``cli_args`` and install the requested wheel.

    :param cli_args: command-line arguments, without the program name.
    :param program: program name to display in usage output, if given.
    """
    cli_parser = _get_main_parser()
    if program:
        cli_parser.prog = program
    options = cli_parser.parse_args(cli_args)

    # --no-compile-bytecode overrides any --compile-bytecode flags; when
    # neither is given, optimization levels 0 and 1 are compiled.
    if options.no_compile_bytecode:
        levels = []
    else:
        levels = options.compile_bytecode or [0, 1]

    with WheelFile.open(options.wheel) as wheel:
        scheme_paths = _get_scheme_dict(wheel.distribution, prefix=options.prefix)
        target = SchemeDictionaryDestination(
            scheme_dict=scheme_paths,
            interpreter=sys.executable,
            script_kind=get_launcher_kind(),
            bytecode_optimization_levels=levels,
            destdir=options.destdir,
        )
        installer.install(wheel, target, {})
+ """ + stream = source.read_dist_info("WHEEL") + metadata = parse_metadata_file(stream) + + # Ensure compatibility with this wheel version. + if not (metadata["Wheel-Version"] and metadata["Wheel-Version"].startswith("1.")): + message = "Incompatible Wheel-Version {}, only support version 1.x wheels." + raise InvalidWheelSource(source, message.format(metadata["Wheel-Version"])) + + # Determine where archive root should go. + if metadata["Root-Is-Purelib"] == "true": + return cast(Scheme, "purelib") + else: + return cast(Scheme, "platlib") + + +def _determine_scheme( + path: str, source: WheelSource, root_scheme: Scheme +) -> Tuple[Scheme, str]: + """Determine which scheme to place given path in, from source.""" + data_dir = source.data_dir + + # If it's in not `{distribution}-{version}.data`, then it's in root_scheme. + if posixpath.commonprefix([data_dir, path]) != data_dir: + return root_scheme, path + + # Figure out which scheme this goes to. + parts = [] + scheme_name = None + left = path + while True: + left, right = posixpath.split(left) + parts.append(right) + if left == source.data_dir: + scheme_name = right + break + + if scheme_name not in SCHEME_NAMES: + msg_fmt = "{path} is not contained in a valid .data subdirectory." + raise InvalidWheelSource(source, msg_fmt.format(path=path)) + + return cast(Scheme, scheme_name), posixpath.join(*reversed(parts[:-1])) + + +def install( + source: WheelSource, + destination: WheelDestination, + additional_metadata: Dict[str, bytes], +) -> None: + """Install wheel described by ``source`` into ``destination``. + + :param source: wheel to install. + :param destination: where to write the wheel. + :param additional_metadata: additional metadata files to generate, usually + generated by the caller. + + """ + root_scheme = _process_WHEEL_file(source) + + # RECORD handling + record_file_path = posixpath.join(source.dist_info_dir, "RECORD") + written_records = [] + + # Write the entry_points based scripts. 
+ if "entry_points.txt" in source.dist_info_filenames: + entrypoints_text = source.read_dist_info("entry_points.txt") + for name, module, attr, section in parse_entrypoints(entrypoints_text): + record = destination.write_script( + name=name, + module=module, + attr=attr, + section=section, + ) + written_records.append((Scheme("scripts"), record)) + + # Write all the files from the wheel. + for record_elements, stream, is_executable in source.get_contents(): + source_record = RecordEntry.from_elements(*record_elements) + path = source_record.path + # Skip the RECORD, which is written at the end, based on this info. + if path == record_file_path: + continue + + # Figure out where to write this file. + scheme, destination_path = _determine_scheme( + path=path, + source=source, + root_scheme=root_scheme, + ) + record = destination.write_file( + scheme=scheme, + path=destination_path, + stream=stream, + is_executable=is_executable, + ) + written_records.append((scheme, record)) + + # Write all the installation-specific metadata + for filename, contents in additional_metadata.items(): + path = posixpath.join(source.dist_info_dir, filename) + + with BytesIO(contents) as other_stream: + record = destination.write_file( + scheme=root_scheme, + path=path, + stream=other_stream, + is_executable=False, + ) + written_records.append((root_scheme, record)) + + written_records.append((root_scheme, RecordEntry(record_file_path, None, None))) + destination.finalize_installation( + scheme=root_scheme, + record_file_path=record_file_path, + records=written_records, + ) diff --git a/src/installer/_scripts/__init__.py b/src/installer/_scripts/__init__.py new file mode 100644 index 0000000..0361a58 --- /dev/null +++ b/src/installer/_scripts/__init__.py @@ -0,0 +1 @@ +"""Internal package, containing launcher templates for ``installer.scripts``.""" diff --git a/src/installer/destinations.py b/src/installer/destinations.py new file mode 100644 index 0000000..a3c1967 --- /dev/null +++ 
class WheelDestination:
    """Handles writing the unpacked files, script generation and ``RECORD`` generation.

    Subclasses provide the concrete script generation logic, as well as the RECORD file
    (re)writing. Every method on this base class raises ``NotImplementedError``.
    """

    def write_script(
        self, name: str, module: str, attr: str, section: "ScriptSection"
    ) -> RecordEntry:
        """Write a script in the correct location to invoke given entry point.

        :param name: name of the script
        :param module: module path, to load the entry point from
        :param attr: final attribute access, for the entry point
        :param section: Denotes the "entry point section" where this was specified.
            Valid values are ``"gui"`` and ``"console"``.
        :type section: str
        :return: the record entry describing the written script

        Example usage/behaviour::

            >>> dest.write_script("pip", "pip._internal.cli", "main", "console")

        """
        raise NotImplementedError

    def write_file(
        self,
        scheme: Scheme,
        path: Union[str, "os.PathLike[str]"],
        stream: BinaryIO,
        is_executable: bool,
    ) -> RecordEntry:
        """Write a file to correct ``path`` within the ``scheme``.

        :param scheme: scheme to write the file in (like "purelib", "platlib" etc).
        :param path: path within that scheme
        :param stream: contents of the file, as a binary stream
        :param is_executable: whether the file should be made executable
        :return: the record entry describing the written file

        The stream would be closed by the caller, after this call.

        Example usage/behaviour::

            >>> with open("__init__.py", "rb") as stream:
            ...     dest.write_file(
            ...         "purelib", "pkg/__init__.py", stream, is_executable=False
            ...     )

        """
        raise NotImplementedError

    def finalize_installation(
        self,
        scheme: Scheme,
        record_file_path: str,
        records: Iterable[Tuple[Scheme, RecordEntry]],
    ) -> None:
        """Finalize installation, after all the files are written.

        Handles (re)writing of the ``RECORD`` file.

        :param scheme: scheme to write the ``RECORD`` file in
        :param record_file_path: path of the ``RECORD`` file with that scheme
        :param records: ``(scheme, entry)`` pairs to write to the ``RECORD`` file

        Example usage/behaviour::

            >>> dest.finalize_installation("purelib", record_file_path, records)

        """
        raise NotImplementedError
def write_to_fs(
    self,
    scheme: Scheme,
    path: str,
    stream: BinaryIO,
    is_executable: bool,
) -> RecordEntry:
    """Write contents of ``stream`` to the correct location on the filesystem.

    :param scheme: scheme to write the file in (like "purelib", "platlib" etc).
    :param path: path within that scheme
    :param stream: contents of the file
    :param is_executable: whether the file should be made executable
    :return: a record entry carrying ``path``, the content hash and the size
    :raises FileExistsError: if the target path already exists

    - Ensures that an existing file is not being overwritten.
    - Hashes the written content, to determine the entry in the ``RECORD`` file.
    """
    target_path = self._path_with_destdir(scheme, path)
    if os.path.exists(target_path):
        message = f"File already exists: {target_path}"
        raise FileExistsError(message)

    parent_folder = os.path.dirname(target_path)
    # ``exist_ok=True`` removes the check-then-create race of a separate
    # ``os.path.exists`` test. An empty dirname (bare relative file name)
    # needs no directory and would make ``os.makedirs`` fail.
    if parent_folder:
        os.makedirs(parent_folder, exist_ok=True)

    with open(target_path, "wb") as f:
        hash_, size = copyfileobj_with_hashing(stream, f, self.hash_algorithm)

    if is_executable:
        make_file_executable(target_path)

    # The record keeps the scheme-relative path, not the on-disk target path.
    return RecordEntry(path, Hash(self.hash_algorithm, hash_), size)
+ :param path: path within that scheme + :param stream: contents of the file + :param is_executable: whether the file should be made executable + + - Changes the shebang for files in the "scripts" scheme. + - Uses :py:meth:`SchemeDictionaryDestination.write_to_fs` for the + filesystem interaction. + """ + path_ = os.fspath(path) + + if scheme == "scripts": + with fix_shebang(stream, self.interpreter) as stream_with_different_shebang: + return self.write_to_fs( + scheme, path_, stream_with_different_shebang, is_executable + ) + + return self.write_to_fs(scheme, path_, stream, is_executable) + + def write_script( + self, name: str, module: str, attr: str, section: "ScriptSection" + ) -> RecordEntry: + """Write a script to invoke an entrypoint. + + :param name: name of the script + :param module: module path, to load the entry point from + :param attr: final attribute access, for the entry point + :param section: Denotes the "entry point section" where this was specified. + Valid values are ``"gui"`` and ``"console"``. + :type section: str + + - Generates a launcher using :any:`Script.generate`. + - Writes to the "scripts" scheme. + - Uses :py:meth:`SchemeDictionaryDestination.write_to_fs` for the + filesystem interaction. 
def finalize_installation(
    self,
    scheme: Scheme,
    record_file_path: str,
    records: Iterable[Tuple[Scheme, RecordEntry]],
) -> None:
    """Finalize installation, by writing the ``RECORD`` file & compiling bytecode.

    :param scheme: scheme to write the ``RECORD`` file in
    :param record_file_path: path of the ``RECORD`` file with that scheme
    :param records: entries to write to the ``RECORD`` file
    """

    def prefix_for_scheme(file_scheme: str) -> Optional[str]:
        # Entries in the RECORD's own scheme need no prefix; others are
        # expressed relative to the directory of the RECORD's scheme.
        if file_scheme == scheme:
            return None
        path = os.path.relpath(
            self.scheme_dict[file_scheme],
            start=self.scheme_dict[scheme],
        )
        return path + "/"

    # Materialize once: ``records`` may be a one-shot iterable, and it is
    # consumed twice below.
    record_list = list(records)
    with construct_record_file(record_list, prefix_for_scheme) as record_stream:
        self.write_to_fs(
            scheme, record_file_path, record_stream, is_executable=False
        )

    # Use a distinct loop name so the ``scheme`` parameter, which
    # ``prefix_for_scheme`` closes over, is never rebound.
    for file_scheme, record in record_list:
        self._compile_bytecode(file_scheme, record)
class Hash:
    """Represents the "hash" element of a RecordEntry."""

    def __init__(self, name: str, value: str) -> None:
        """Construct a ``Hash`` object.

        Most consumers should use :py:meth:`Hash.parse` instead, since no
        validation or parsing is performed by this constructor.

        :param name: name of the hash function
        :param value: hashed value
        """
        self.name = name
        self.value = value

    def __str__(self) -> str:
        return f"{self.name}={self.value}"

    def __repr__(self) -> str:
        return f"Hash(name={self.name!r}, value={self.value!r})"

    def __eq__(self, other):
        if not isinstance(other, Hash):
            return NotImplemented
        return self.value == other.value and self.name == other.name

    def __hash__(self) -> int:
        # Defining ``__eq__`` alone makes instances unhashable; hash on the
        # same fields that ``__eq__`` compares so equal objects hash equally.
        return hash((self.name, self.value))

    def validate(self, data: bytes) -> bool:
        """Validate that ``data`` matches this instance.

        :param data: Contents of the file.
        :return: Whether ``data`` matches the hashed value.
        """
        digest = hashlib.new(self.name, data).digest()
        # Compared in urlsafe base64 form with the ``=`` padding stripped.
        value = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
        return self.value == value

    @classmethod
    def parse(cls, h: str) -> "Hash":
        """Build a Hash object, from a "name=value" string.

        This accepts a string of the format for the second element in a record,
        as described in :pep:`376`.

        Typical usage::

            Hash.parse("sha256=Y0sCextp4SQtQNU-MSs7SsdxD1W-gfKJtUlEbvZ3i-4")

        :param h: a name=value string
        :raises ValueError: if ``h`` contains no ``=`` separator
        """
        # Split on the first ``=`` only, so the value itself may contain ``=``.
        name, value = h.split("=", 1)
        return cls(name, value)
def to_row(self, path_prefix: Optional[str] = None) -> Tuple[str, str, str]:
    """Render this entry as a ``(path, hash, size)`` row for a RECORD file.

    :param path_prefix: A prefix to attach to the path -- must end in `/`
    :return: a (path, hash, size) row; a missing hash or size becomes ``""``
    """
    row_path = self.path
    if path_prefix is not None:
        assert path_prefix.endswith("/")
        row_path = path_prefix + row_path

    # Convert Windows paths to use / for consistency
    if os.sep == "\\":
        row_path = row_path.replace("\\", "/")  # pragma: no cover

    hash_text = str(self.hash_ or "")
    size_text = "" if self.size is None else str(self.size)
    return (row_path, hash_text, size_text)
def parse_record_file(rows: Iterable[str]) -> Iterator[Tuple[str, str, str]]:
    """Parse a :pep:`376` RECORD.

    Yields 3-value tuples, each of which can be passed to
    :any:`RecordEntry.from_elements`.

    :param rows: iterator providing lines of a RECORD (no trailing newlines).
    :raises InvalidRecordEntry: when a row does not have exactly 3 elements.
    """
    csv_rows = csv.reader(rows, delimiter=",", quotechar='"', lineterminator="\n")
    for row_index, elements in enumerate(csv_rows):
        element_count = len(elements)
        if element_count == 3:
            yield cast(Tuple[str, str, str], tuple(elements))
            continue

        message = f"Row Index {row_index}: expected 3 elements, got {element_count}"
        raise InvalidRecordEntry(elements=elements, issues=[message])
class Script:
    """Describes a script based on an entry point declaration."""

    __slots__ = ("name", "module", "attr", "section")

    def __init__(
        self, name: str, module: str, attr: str, section: "ScriptSection"
    ) -> None:
        """Construct a Script object.

        :param name: name of the script
        :param module: module path, to load the entry point from
        :param attr: final attribute access, for the entry point
        :param section: Denotes the "entry point section" where this was specified.
            Valid values are ``"gui"`` and ``"console"``.
        :type section: str

        """
        self.name = name
        self.module = module
        self.attr = attr
        self.section = section

    def __repr__(self) -> str:
        # Show every slot and close the parenthesis, so the repr is balanced
        # and distinguishes console scripts from gui scripts.
        return "Script(name={!r}, module={!r}, attr={!r}, section={!r})".format(
            self.name,
            self.module,
            self.attr,
            self.section,
        )

    def _get_launcher_data(self, kind: "LauncherKind") -> Optional[bytes]:
        """Return the launcher executable's bytes, or ``None`` for ``"posix"``.

        :raises InvalidScript: if no launcher exists for this section and kind.
        """
        if kind == "posix":
            return None
        key = (self.section, kind)
        try:
            name = _ALLOWED_LAUNCHERS[key]
        except KeyError:
            error = f"{key!r} not in {sorted(_ALLOWED_LAUNCHERS)!r}"
            # The KeyError adds nothing beyond the message; hide it from the
            # traceback.
            raise InvalidScript(error) from None
        return read_binary(_scripts, name)

    def generate(self, executable: str, kind: "LauncherKind") -> Tuple[str, bytes]:
        """Generate a launcher for this script.

        :param executable: Path to the executable to invoke.
        :param kind: Which launcher template should be used.
            Valid values are ``"posix"``, ``"win-ia32"``, ``"win-amd64"``,
            ``"win-arm"`` and ``"win-arm64"``.
        :type kind: str

        :raises InvalidScript: if no appropriate template is available.
        :return: The name and contents of the launcher file.
        """
        launcher = self._get_launcher_data(kind)
        shebang = _build_shebang(executable, forlauncher=bool(launcher))
        code = _SCRIPT_TEMPLATE.format(
            module=self.module,
            # Only the first dotted component can be imported by name; the
            # full attribute path is what gets called.
            import_name=self.attr.split(".")[0],
            func_path=self.attr,
        ).encode("utf-8")

        if launcher is None:
            # No launcher: a plain script made of the shebang and the code.
            return (self.name, shebang + b"\n" + code)

        # With a launcher: launcher bytes, then the shebang line, then a zip
        # archive holding the code as ``__main__.py``.
        stream = io.BytesIO()
        with zipfile.ZipFile(stream, "w") as zf:
            zf.writestr("__main__.py", code)
        name = f"{self.name}.exe"
        data = launcher + shebang + b"\n" + stream.getvalue()
        return (name, data)
class WheelFile(WheelSource):
    """Implements `WheelSource`, for an existing file from the filesystem.

    Example usage::

        >>> with WheelFile.open("sampleproject-2.0.0-py3-none-any.whl") as source:
        ...     installer.install(source, destination)
    """

    def __init__(self, f: zipfile.ZipFile) -> None:
        """Initialize a WheelFile object.

        :param f: An open zipfile, which will stay open as long as this object is used.
        """
        self._zipfile = f
        assert f.filename

        # Distribution name and version are taken from the archive's file name.
        basename = os.path.basename(f.filename)
        parsed_name = parse_wheel_filename(basename)
        super().__init__(
            version=parsed_name.version,
            distribution=parsed_name.distribution,
        )

    @classmethod
    @contextmanager
    def open(cls, path: "os.PathLike[str]") -> Iterator["WheelFile"]:
        """Create a wheelfile from a given path."""
        with zipfile.ZipFile(path) as f:
            yield cls(f)

    @property
    def dist_info_filenames(self) -> List[str]:
        """Get names of all files in the dist-info directory."""
        # Match on the directory name plus separator: a character-wise prefix
        # test would also pick up siblings such as ``<dist-info>-extra/``.
        prefix = self.dist_info_dir + "/"
        return [
            name[len(prefix) :]
            for name in self._zipfile.namelist()
            if name.startswith(prefix) and not name.endswith("/")
        ]

    def read_dist_info(self, filename: str) -> str:
        """Get contents, from ``filename`` in the dist-info directory."""
        path = posixpath.join(self.dist_info_dir, filename)
        return self._zipfile.read(path).decode("utf-8")

    def get_contents(self) -> Iterator[WheelContentElement]:
        """Sequential access to all contents of the wheel (including dist-info files).

        This implementation requires that every file that is a part of the wheel
        archive has a corresponding entry in RECORD. If they are not, an
        :any:`AssertionError` will be raised.
        """
        # Convert the record file into a useful mapping
        record_lines = self.read_dist_info("RECORD").splitlines()
        records = parse_record_file(record_lines)
        record_mapping = {record[0]: record for record in records}

        for item in self._zipfile.infolist():
            if item.filename[-1:] == "/":  # looks like a directory
                continue

            record = record_mapping.pop(item.filename, None)
            if record is None:
                # Should not happen for valid wheels. Raised explicitly rather
                # than via an ``assert`` statement, so the documented check
                # still runs under ``python -O``.
                raise AssertionError(
                    "In {}, {} is not mentioned in RECORD".format(
                        self._zipfile.filename,
                        item.filename,
                    )
                )

            # Borrowed from:
            # https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L96-L100
            mode = item.external_attr >> 16
            is_executable = bool(mode and stat.S_ISREG(mode) and mode & 0o111)

            with self._zipfile.open(item) as stream:
                stream_casted = cast("BinaryIO", stream)
                yield record, stream_casted, is_executable
__all__ = [
    "parse_metadata_file",
    "parse_wheel_filename",
    "copyfileobj_with_hashing",
    "get_launcher_kind",
    "fix_shebang",
    "construct_record_file",
    "parse_entrypoints",
    "make_file_executable",
    "WheelFilename",
    "SCHEME_NAMES",
]

# Borrowed from https://github.com/python/cpython/blob/v3.9.1/Lib/shutil.py#L52
_WINDOWS = os.name == "nt"
_COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024

# According to https://www.python.org/dev/peps/pep-0427/#file-name-convention
_WHEEL_FILENAME_REGEX = re.compile(
    r"""
    ^
    (?P<distribution>.+?)
    -(?P<version>.*?)
    (?:-(?P<build_tag>\d[^-]*?))?
    -(?P<tag>.+?-.+?-.+?)
    \.whl
    $
    """,
    re.VERBOSE | re.UNICODE,
)
WheelFilename = namedtuple(
    "WheelFilename", ["distribution", "version", "build_tag", "tag"]
)

# Adapted from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L90 # noqa
_ENTRYPOINT_REGEX = re.compile(
    r"""
    (?P<module>[\w.]+)\s*
    (:\s*(?P<attrs>[\w.]+))\s*
    (?P<extras>\[.*\])?\s*$
    """,
    re.VERBOSE | re.UNICODE,
)

# According to https://www.python.org/dev/peps/pep-0427/#id7
SCHEME_NAMES = cast("AllSchemes", ("purelib", "platlib", "headers", "scripts", "data"))


def parse_metadata_file(contents: str) -> Message:
    """Parse :pep:`376` ``PKG-INFO``-style metadata files.

    ``METADATA`` and ``WHEEL`` files (as per :pep:`427`) use the same syntax
    and can also be parsed using this function.

    :param contents: The entire contents of the file
    :return: the parsed message, as produced by :any:`email.parser.FeedParser`
    """
    feed_parser = FeedParser()
    feed_parser.feed(contents)
    return feed_parser.close()


def parse_wheel_filename(filename: str) -> WheelFilename:
    """Parse a wheel filename into its various components.

    :param filename: The filename to parse (base name only, no directories)
    :return: the ``distribution``, ``version``, ``build_tag`` and ``tag``
        components; ``build_tag`` is ``None`` when the name has no build tag
    :raises ValueError: if ``filename`` does not look like a wheel filename
    """
    wheel_info = _WHEEL_FILENAME_REGEX.match(filename)
    if not wheel_info:
        raise ValueError(f"Not a valid wheel filename: {filename}")
    return WheelFilename(*wheel_info.groups())


def copyfileobj_with_hashing(
    source: BinaryIO,
    dest: BinaryIO,
    hash_algorithm: str,
) -> Tuple[str, int]:
    """Copy a buffer while computing the content's hash and size.

    Copies the source buffer into the destination buffer while computing the
    hash of the contents. Adapted from :any:`shutil.copyfileobj`.

    :param source: buffer holding the source data
    :param dest: destination buffer
    :param hash_algorithm: hashing algorithm, as accepted by :any:`hashlib.new`

    :return: hash digest of the contents (urlsafe base64 with the ``=`` padding
        stripped), number of bytes copied
    """
    hasher = hashlib.new(hash_algorithm)
    size = 0
    while True:
        buf = source.read(_COPY_BUFSIZE)
        if not buf:
            break
        hasher.update(buf)
        dest.write(buf)
        size += len(buf)

    return base64.urlsafe_b64encode(hasher.digest()).decode("ascii").rstrip("="), size


def get_launcher_kind() -> "LauncherKind":  # pragma: no cover
    """Get the launcher kind for the current machine.

    :return: ``"posix"`` when not running on Windows, otherwise one of the
        ``"win-*"`` kinds chosen from the architecture named in ``sys.version``
    :raises NotImplementedError: on a Windows build that matches no known kind
    """
    if os.name != "nt":
        return "posix"

    # Lower-case the interpreter banner once; every check below searches it.
    version = sys.version.lower()
    if "amd64" in version:
        return "win-amd64"
    if "(arm64)" in version:
        return "win-arm64"
    if "(arm)" in version:
        return "win-arm"
    if sys.platform == "win32":
        return "win-ia32"

    raise NotImplementedError("Unknown launcher kind for this machine")


@contextlib.contextmanager
def fix_shebang(stream: BinaryIO, interpreter: str) -> Iterator[BinaryIO]:
    """Replace ``#!python`` shebang in a stream with the correct interpreter.

    :param stream: stream to modify; it must be seekable, and is rewound before
        being inspected
    :param interpreter: "correct interpreter" to substitute the shebang with

    :returns: A context manager, that provides an appropriately modified stream.
        When the stream does not start with ``#!python``, the original stream
        is provided (rewound to its start) and is left open on exit. Otherwise
        a temporary in-memory copy is provided, which is closed on exit.
    """
    stream.seek(0)
    if stream.read(8) != b"#!python":
        stream.seek(0)
        yield stream
        return

    # The ``with`` block releases the temporary buffer even when the caller's
    # ``with`` body raises; a plain ``close()`` after the ``yield`` would be
    # skipped in that case.
    with io.BytesIO() as new_stream:
        # write our new shebang
        new_stream.write(f"#!{interpreter}\n".encode())
        # copy the rest of the stream
        stream.seek(0)
        stream.readline()  # skip first line
        while True:
            buf = stream.read(_COPY_BUFSIZE)
            if not buf:
                break
            new_stream.write(buf)
        new_stream.seek(0)
        yield new_stream


def construct_record_file(
    records: Iterable[Tuple["Scheme", "RecordEntry"]],
    prefix_for_scheme: Callable[["Scheme"], Optional[str]] = lambda _: None,
) -> BinaryIO:
    """Construct a RECORD file.

    :param records:
        ``records`` as passed into :any:`WheelDestination.finalize_installation`
    :param prefix_for_scheme:
        function to get a prefix to add for RECORD entries, within a scheme

    :return: A stream that can be written to file. Must be closed by the caller.
    """
    # ``newline=""`` stops the text layer from translating the "\n" that the
    # csv writer emits; ``write_through`` pushes every row straight into the
    # underlying byte buffer.
    stream = io.TextIOWrapper(
        io.BytesIO(), encoding="utf-8", write_through=True, newline=""
    )
    writer = csv.writer(stream, delimiter=",", quotechar='"', lineterminator="\n")
    for scheme, record in records:
        writer.writerow(record.to_row(prefix_for_scheme(scheme)))
    stream.seek(0)
    # Hand back the raw byte buffer, detached from the text wrapper.
    return stream.detach()


class _InvalidEntryPointError(ValueError, AssertionError):
    """Raised for an entry point value that cannot be parsed.

    Also derives from :any:`AssertionError`, which is what this check raised
    when it was a bare ``assert``, so existing handlers keep working.
    """


def parse_entrypoints(text: str) -> Iterable[Tuple[str, str, str, "ScriptSection"]]:
    """Parse ``entry_points.txt``-style files.

    Only the ``console_scripts`` and ``gui_scripts`` sections are considered;
    all other sections are skipped. This is a generator, so invalid entries are
    reported when iteration reaches them.

    :param text: entire contents of the file
    :return:
        name of the script, module to use, attribute to call, kind of script (cli / gui)
    :raises ValueError: if an entry's value is not of the form
        ``module:attr [extras]``
    """
    # Borrowed from https://github.com/python/importlib_metadata/blob/v3.4.0/importlib_metadata/__init__.py#L115 # noqa
    config = ConfigParser(delimiters="=")
    config.optionxform = str  # type: ignore
    config.read_string(text)

    for section in config.sections():
        if section not in ["console_scripts", "gui_scripts"]:
            continue

        # "console_scripts" -> "console", "gui_scripts" -> "gui"
        script_section = cast("ScriptSection", section[: -len("_scripts")])

        for name, value in config.items(section):
            match = _ENTRYPOINT_REGEX.match(value)
            if match is None:
                # Explicit raise rather than ``assert``: the text comes from the
                # wheel being installed, and asserts vanish under ``python -O``.
                raise _InvalidEntryPointError(
                    f"Invalid entry point {name!r} in [{section}]: {value!r}"
                )

            # Both groups are mandatory in the pattern, so they are never None.
            yield name, match.group("module"), match.group("attrs"), script_section


def _current_umask() -> int:
    """Get the current umask which involves having to set it temporarily."""
    # os.umask() only returns the previous mask when a new one is set, so set a
    # throwaway value and restore the real one immediately.
    mask = os.umask(0)
    os.umask(mask)
    return mask


# Borrowed from:
# https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L93
def make_file_executable(path: Union[str, "os.PathLike[str]"]) -> None:
    """Make the file at the provided path executable.

    The mode is set to ``0o777`` filtered through the current umask, with the
    execute bits for user, group and other then forced on. The file's existing
    mode is not consulted.

    :param path: location of the file to modify
    """
    os.chmod(path, (0o777 & ~_current_umask() | 0o111))