summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/python_requirements.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/python_requirements.py')
-rw-r--r--test/lib/ansible_test/_internal/python_requirements.py570
1 files changed, 570 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/python_requirements.py b/test/lib/ansible_test/_internal/python_requirements.py
new file mode 100644
index 0000000..e3733a5
--- /dev/null
+++ b/test/lib/ansible_test/_internal/python_requirements.py
@@ -0,0 +1,570 @@
+"""Python requirements management"""
+from __future__ import annotations
+
+import base64
+import dataclasses
+import json
+import os
+import re
+import typing as t
+
+from .encoding import (
+ to_text,
+ to_bytes,
+)
+
+from .io import (
+ read_text_file,
+)
+
+from .util import (
+ ANSIBLE_TEST_DATA_ROOT,
+ ANSIBLE_TEST_TARGET_ROOT,
+ ANSIBLE_TEST_TOOLS_ROOT,
+ ApplicationError,
+ SubprocessError,
+ display,
+ find_executable,
+ raw_command,
+ str_to_version,
+ version_to_str,
+)
+
+from .util_common import (
+ check_pyyaml,
+ create_result_directories,
+)
+
+from .config import (
+ EnvironmentConfig,
+ IntegrationConfig,
+ UnitsConfig,
+)
+
+from .data import (
+ data_context,
+)
+
+from .host_configs import (
+ PosixConfig,
+ PythonConfig,
+)
+
+from .connections import (
+ LocalConnection,
+ Connection,
+)
+
+from .coverage_util import (
+ get_coverage_version,
+)
+
+QUIET_PIP_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'quiet_pip.py')
+REQUIREMENTS_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'requirements.py')
+
+# IMPORTANT: Keep this in sync with the ansible-test.txt requirements file.
+VIRTUALENV_VERSION = '16.7.12'
+
+# Pip Abstraction
+
+
+class PipUnavailableError(ApplicationError):
+ """Exception raised when pip is not available."""
+ def __init__(self, python: PythonConfig) -> None:
+ super().__init__(f'Python {python.version} at "{python.path}" does not have pip available.')
+
+
+@dataclasses.dataclass(frozen=True)
+class PipCommand:
+ """Base class for pip commands."""""
+
+ def serialize(self) -> tuple[str, dict[str, t.Any]]:
+ """Return a serialized representation of this command."""
+ name = type(self).__name__[3:].lower()
+ return name, self.__dict__
+
+
+@dataclasses.dataclass(frozen=True)
+class PipInstall(PipCommand):
+ """Details required to perform a pip install."""
+ requirements: list[tuple[str, str]]
+ constraints: list[tuple[str, str]]
+ packages: list[str]
+
+ def has_package(self, name: str) -> bool:
+ """Return True if the specified package will be installed, otherwise False."""
+ name = name.lower()
+
+ return (any(name in package.lower() for package in self.packages) or
+ any(name in contents.lower() for path, contents in self.requirements))
+
+
+@dataclasses.dataclass(frozen=True)
+class PipUninstall(PipCommand):
+ """Details required to perform a pip uninstall."""
+ packages: list[str]
+ ignore_errors: bool
+
+
+@dataclasses.dataclass(frozen=True)
+class PipVersion(PipCommand):
+ """Details required to get the pip version."""
+
+
+@dataclasses.dataclass(frozen=True)
+class PipBootstrap(PipCommand):
+ """Details required to bootstrap pip."""
+ pip_version: str
+ packages: list[str]
+
+
+# Entry Points
+
+
+def install_requirements(
+ args: EnvironmentConfig,
+ python: PythonConfig,
+ ansible: bool = False,
+ command: bool = False,
+ coverage: bool = False,
+ virtualenv: bool = False,
+ controller: bool = True,
+ connection: t.Optional[Connection] = None,
+) -> None:
+ """Install requirements for the given Python using the specified arguments."""
+ create_result_directories(args)
+
+ if not requirements_allowed(args, controller):
+ return
+
+ if command and isinstance(args, (UnitsConfig, IntegrationConfig)) and args.coverage:
+ coverage = True
+
+ cryptography = False
+
+ if ansible:
+ try:
+ ansible_cache = install_requirements.ansible_cache # type: ignore[attr-defined]
+ except AttributeError:
+ ansible_cache = install_requirements.ansible_cache = {} # type: ignore[attr-defined]
+
+ ansible_installed = ansible_cache.get(python.path)
+
+ if ansible_installed:
+ ansible = False
+ else:
+ ansible_cache[python.path] = True
+
+ # Install the latest cryptography version that the current requirements can support if it is not already available.
+ # This avoids downgrading cryptography when OS packages provide a newer version than we are able to install using pip.
+ # If not installed here, later install commands may try to install a version of cryptography which cannot be installed.
+ cryptography = not is_cryptography_available(python.path)
+
+ commands = collect_requirements(
+ python=python,
+ controller=controller,
+ ansible=ansible,
+ cryptography=cryptography,
+ command=args.command if command else None,
+ coverage=coverage,
+ virtualenv=virtualenv,
+ minimize=False,
+ sanity=None,
+ )
+
+ if not commands:
+ return
+
+ run_pip(args, python, commands, connection)
+
+ # false positive: pylint: disable=no-member
+ if any(isinstance(command, PipInstall) and command.has_package('pyyaml') for command in commands):
+ check_pyyaml(python)
+
+
+def collect_bootstrap(python: PythonConfig) -> list[PipCommand]:
+ """Return the details necessary to bootstrap pip into an empty virtual environment."""
+ infrastructure_packages = get_venv_packages(python)
+ pip_version = infrastructure_packages['pip']
+ packages = [f'{name}=={version}' for name, version in infrastructure_packages.items()]
+
+ bootstrap = PipBootstrap(
+ pip_version=pip_version,
+ packages=packages,
+ )
+
+ return [bootstrap]
+
+
+def collect_requirements(
+ python: PythonConfig,
+ controller: bool,
+ ansible: bool,
+ cryptography: bool,
+ coverage: bool,
+ virtualenv: bool,
+ minimize: bool,
+ command: t.Optional[str],
+ sanity: t.Optional[str],
+) -> list[PipCommand]:
+ """Collect requirements for the given Python using the specified arguments."""
+ commands: list[PipCommand] = []
+
+ if virtualenv:
+ # sanity tests on Python 2.x install virtualenv when it is too old or is not already installed and the `--requirements` option is given
+ # the last version of virtualenv with no dependencies is used to minimize the changes made outside a virtual environment
+ commands.extend(collect_package_install(packages=[f'virtualenv=={VIRTUALENV_VERSION}'], constraints=False))
+
+ if coverage:
+ commands.extend(collect_package_install(packages=[f'coverage=={get_coverage_version(python.version).coverage_version}'], constraints=False))
+
+ if cryptography:
+ commands.extend(collect_package_install(packages=get_cryptography_requirements(python)))
+
+ if ansible or command:
+ commands.extend(collect_general_install(command, ansible))
+
+ if sanity:
+ commands.extend(collect_sanity_install(sanity))
+
+ if command == 'units':
+ commands.extend(collect_units_install())
+
+ if command in ('integration', 'windows-integration', 'network-integration'):
+ commands.extend(collect_integration_install(command, controller))
+
+ if (sanity or minimize) and any(isinstance(command, PipInstall) for command in commands):
+ # bootstrap the managed virtual environment, which will have been created without any installed packages
+ # sanity tests which install no packages skip this step
+ commands = collect_bootstrap(python) + commands
+
+ # most infrastructure packages can be removed from sanity test virtual environments after they've been created
+ # removing them reduces the size of environments cached in containers
+ uninstall_packages = list(get_venv_packages(python))
+
+ if not minimize:
+ # installed packages may have run-time dependencies on setuptools
+ uninstall_packages.remove('setuptools')
+
+ commands.extend(collect_uninstall(packages=uninstall_packages))
+
+ return commands
+
+
+def run_pip(
+ args: EnvironmentConfig,
+ python: PythonConfig,
+ commands: list[PipCommand],
+ connection: t.Optional[Connection],
+) -> None:
+ """Run the specified pip commands for the given Python, and optionally the specified host."""
+ connection = connection or LocalConnection(args)
+ script = prepare_pip_script(commands)
+
+ if not args.explain:
+ try:
+ connection.run([python.path], data=script, capture=False)
+ except SubprocessError:
+ script = prepare_pip_script([PipVersion()])
+
+ try:
+ connection.run([python.path], data=script, capture=True)
+ except SubprocessError as ex:
+ if 'pip is unavailable:' in ex.stdout + ex.stderr:
+ raise PipUnavailableError(python)
+
+ raise
+
+
+# Collect
+
+
+def collect_general_install(
+ command: t.Optional[str] = None,
+ ansible: bool = False,
+) -> list[PipInstall]:
+ """Return details necessary for the specified general-purpose pip install(s)."""
+ requirements_paths: list[tuple[str, str]] = []
+ constraints_paths: list[tuple[str, str]] = []
+
+ if ansible:
+ path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', 'ansible.txt')
+ requirements_paths.append((ANSIBLE_TEST_DATA_ROOT, path))
+
+ if command:
+ path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', f'{command}.txt')
+ requirements_paths.append((ANSIBLE_TEST_DATA_ROOT, path))
+
+ return collect_install(requirements_paths, constraints_paths)
+
+
+def collect_package_install(packages: list[str], constraints: bool = True) -> list[PipInstall]:
+ """Return the details necessary to install the specified packages."""
+ return collect_install([], [], packages, constraints=constraints)
+
+
+def collect_sanity_install(sanity: str) -> list[PipInstall]:
+ """Return the details necessary for the specified sanity pip install(s)."""
+ requirements_paths: list[tuple[str, str]] = []
+ constraints_paths: list[tuple[str, str]] = []
+
+ path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', f'sanity.{sanity}.txt')
+ requirements_paths.append((ANSIBLE_TEST_DATA_ROOT, path))
+
+ if data_context().content.is_ansible:
+ path = os.path.join(data_context().content.sanity_path, 'code-smell', f'{sanity}.requirements.txt')
+ requirements_paths.append((data_context().content.root, path))
+
+ return collect_install(requirements_paths, constraints_paths, constraints=False)
+
+
+def collect_units_install() -> list[PipInstall]:
+ """Return details necessary for the specified units pip install(s)."""
+ requirements_paths: list[tuple[str, str]] = []
+ constraints_paths: list[tuple[str, str]] = []
+
+ path = os.path.join(data_context().content.unit_path, 'requirements.txt')
+ requirements_paths.append((data_context().content.root, path))
+
+ path = os.path.join(data_context().content.unit_path, 'constraints.txt')
+ constraints_paths.append((data_context().content.root, path))
+
+ return collect_install(requirements_paths, constraints_paths)
+
+
+def collect_integration_install(command: str, controller: bool) -> list[PipInstall]:
+ """Return details necessary for the specified integration pip install(s)."""
+ requirements_paths: list[tuple[str, str]] = []
+ constraints_paths: list[tuple[str, str]] = []
+
+ # Support for prefixed files was added to ansible-test in ansible-core 2.12 when split controller/target testing was implemented.
+ # Previous versions of ansible-test only recognize non-prefixed files.
+ # If a prefixed file exists (even if empty), it takes precedence over the non-prefixed file.
+ prefixes = ('controller.' if controller else 'target.', '')
+
+ for prefix in prefixes:
+ path = os.path.join(data_context().content.integration_path, f'{prefix}requirements.txt')
+
+ if os.path.exists(path):
+ requirements_paths.append((data_context().content.root, path))
+ break
+
+ for prefix in prefixes:
+ path = os.path.join(data_context().content.integration_path, f'{command}.{prefix}requirements.txt')
+
+ if os.path.exists(path):
+ requirements_paths.append((data_context().content.root, path))
+ break
+
+ for prefix in prefixes:
+ path = os.path.join(data_context().content.integration_path, f'{prefix}constraints.txt')
+
+ if os.path.exists(path):
+ constraints_paths.append((data_context().content.root, path))
+ break
+
+ return collect_install(requirements_paths, constraints_paths)
+
+
+def collect_install(
+ requirements_paths: list[tuple[str, str]],
+ constraints_paths: list[tuple[str, str]],
+ packages: t.Optional[list[str]] = None,
+ constraints: bool = True,
+) -> list[PipInstall]:
+ """Build a pip install list from the given requirements, constraints and packages."""
+ # listing content constraints first gives them priority over constraints provided by ansible-test
+ constraints_paths = list(constraints_paths)
+
+ if constraints:
+ constraints_paths.append((ANSIBLE_TEST_DATA_ROOT, os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', 'constraints.txt')))
+
+ requirements = [(os.path.relpath(path, root), read_text_file(path)) for root, path in requirements_paths if usable_pip_file(path)]
+ constraints = [(os.path.relpath(path, root), read_text_file(path)) for root, path in constraints_paths if usable_pip_file(path)]
+ packages = packages or []
+
+ if requirements or packages:
+ installs = [PipInstall(
+ requirements=requirements,
+ constraints=constraints,
+ packages=packages,
+ )]
+ else:
+ installs = []
+
+ return installs
+
+
+def collect_uninstall(packages: list[str], ignore_errors: bool = False) -> list[PipUninstall]:
+ """Return the details necessary for the specified pip uninstall."""
+ uninstall = PipUninstall(
+ packages=packages,
+ ignore_errors=ignore_errors,
+ )
+
+ return [uninstall]
+
+
+# Support
+
+
+def get_venv_packages(python: PythonConfig) -> dict[str, str]:
+ """Return a dictionary of Python packages needed for a consistent virtual environment specific to the given Python version."""
+
+ # NOTE: This same information is needed for building the base-test-container image.
+ # See: https://github.com/ansible/base-test-container/blob/main/files/installer.py
+
+ default_packages = dict(
+ pip='21.3.1',
+ setuptools='60.8.2',
+ wheel='0.37.1',
+ )
+
+ override_packages = {
+ '2.7': dict(
+ pip='20.3.4', # 21.0 requires Python 3.6+
+ setuptools='44.1.1', # 45.0.0 requires Python 3.5+
+ wheel=None,
+ ),
+ '3.5': dict(
+ pip='20.3.4', # 21.0 requires Python 3.6+
+ setuptools='50.3.2', # 51.0.0 requires Python 3.6+
+ wheel=None,
+ ),
+ '3.6': dict(
+ pip='21.3.1', # 22.0 requires Python 3.7+
+ setuptools='59.6.0', # 59.7.0 requires Python 3.7+
+ wheel=None,
+ ),
+ }
+
+ packages = {name: version or default_packages[name] for name, version in override_packages.get(python.version, default_packages).items()}
+
+ return packages
+
+
+def requirements_allowed(args: EnvironmentConfig, controller: bool) -> bool:
+ """
+ Return True if requirements can be installed, otherwise return False.
+
+ Requirements are only allowed if one of the following conditions is met:
+
+ The user specified --requirements manually.
+ The install will occur on the controller and the controller or controller Python is managed by ansible-test.
+ The install will occur on the target and the target or target Python is managed by ansible-test.
+ """
+ if args.requirements:
+ return True
+
+ if controller:
+ return args.controller.is_managed or args.controller.python.is_managed
+
+ target = args.only_targets(PosixConfig)[0]
+
+ return target.is_managed or target.python.is_managed
+
+
+def prepare_pip_script(commands: list[PipCommand]) -> str:
+ """Generate a Python script to perform the requested pip commands."""
+ data = [command.serialize() for command in commands]
+
+ display.info(f'>>> Requirements Commands\n{json.dumps(data, indent=4)}', verbosity=3)
+
+ args = dict(
+ script=read_text_file(QUIET_PIP_SCRIPT_PATH),
+ verbosity=display.verbosity,
+ commands=data,
+ )
+
+ payload = to_text(base64.b64encode(to_bytes(json.dumps(args))))
+ path = REQUIREMENTS_SCRIPT_PATH
+ template = read_text_file(path)
+ script = template.format(payload=payload)
+
+ display.info(f'>>> Python Script from Template ({path})\n{script.strip()}', verbosity=4)
+
+ return script
+
+
+def usable_pip_file(path: t.Optional[str]) -> bool:
+ """Return True if the specified pip file is usable, otherwise False."""
+ return bool(path) and os.path.exists(path) and bool(os.path.getsize(path))
+
+
+# Cryptography
+
+
+def is_cryptography_available(python: str) -> bool:
+ """Return True if cryptography is available for the given python."""
+ try:
+ raw_command([python, '-c', 'import cryptography'], capture=True)
+ except SubprocessError:
+ return False
+
+ return True
+
+
+def get_cryptography_requirements(python: PythonConfig) -> list[str]:
+ """
+ Return the correct cryptography and pyopenssl requirements for the given python version.
+ The version of cryptography installed depends on the python version and openssl version.
+ """
+ openssl_version = get_openssl_version(python)
+
+ if openssl_version and openssl_version < (1, 1, 0):
+ # cryptography 3.2 requires openssl 1.1.x or later
+ # see https://cryptography.io/en/latest/changelog.html#v3-2
+ cryptography = 'cryptography < 3.2'
+ # pyopenssl 20.0.0 requires cryptography 3.2 or later
+ pyopenssl = 'pyopenssl < 20.0.0'
+ else:
+ # cryptography 3.4+ builds require a working rust toolchain
+ # systems bootstrapped using ansible-core-ci can access additional wheels through the spare-tire package index
+ cryptography = 'cryptography'
+ # any future installation of pyopenssl is free to use any compatible version of cryptography
+ pyopenssl = ''
+
+ requirements = [
+ cryptography,
+ pyopenssl,
+ ]
+
+ requirements = [requirement for requirement in requirements if requirement]
+
+ return requirements
+
+
+def get_openssl_version(python: PythonConfig) -> t.Optional[tuple[int, ...]]:
+ """Return the openssl version."""
+ if not python.version.startswith('2.'):
+ # OpenSSL version checking only works on Python 3.x.
+ # This should be the most accurate, since it is the Python we will be using.
+ version = json.loads(raw_command([python.path, os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'sslcheck.py')], capture=True)[0])['version']
+
+ if version:
+ display.info(f'Detected OpenSSL version {version_to_str(version)} under Python {python.version}.', verbosity=1)
+
+ return tuple(version)
+
+ # Fall back to detecting the OpenSSL version from the CLI.
+ # This should provide an adequate solution on Python 2.x.
+ openssl_path = find_executable('openssl', required=False)
+
+ if openssl_path:
+ try:
+ result = raw_command([openssl_path, 'version'], capture=True)[0]
+ except SubprocessError:
+ result = ''
+
+ match = re.search(r'^OpenSSL (?P<version>[0-9]+\.[0-9]+\.[0-9]+)', result)
+
+ if match:
+ version = str_to_version(match.group('version'))
+
+ display.info(f'Detected OpenSSL version {version_to_str(version)} using the openssl CLI.', verbosity=1)
+
+ return version
+
+ display.info('Unable to detect OpenSSL version.', verbosity=1)
+
+ return None