authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 00:24:37 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 00:24:37 +0000
commit1022b2cebe73db426241c2f420d4ee9f6f3c1bed (patch)
treea5c38ccfaa66e8a52767dec01d3598b67a7422a8 /src
parentInitial commit. (diff)
Adding upstream version 4.1.11. (upstream/4.1.11)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src')
-rw-r--r-- src/ansible_compat/__init__.py  |   9
-rw-r--r-- src/ansible_compat/config.py    | 465
-rw-r--r-- src/ansible_compat/constants.py |  42
-rw-r--r-- src/ansible_compat/errors.py    |  57
-rw-r--r-- src/ansible_compat/loaders.py   |  30
-rw-r--r-- src/ansible_compat/ports.py     |   4
-rw-r--r-- src/ansible_compat/prerun.py    |  21
-rw-r--r-- src/ansible_compat/py.typed     |   0
-rw-r--r-- src/ansible_compat/runtime.py   | 961
-rw-r--r-- src/ansible_compat/schema.py    | 110
-rw-r--r-- src/ansible_compat/types.py     |  23
11 files changed, 1722 insertions, 0 deletions
diff --git a/src/ansible_compat/__init__.py b/src/ansible_compat/__init__.py
new file mode 100644
index 0000000..b23c8ca
--- /dev/null
+++ b/src/ansible_compat/__init__.py
@@ -0,0 +1,9 @@
+"""ansible_compat package."""
+from importlib.metadata import PackageNotFoundError, version
+
+try:
+ __version__ = version("ansible-compat")
+except PackageNotFoundError: # pragma: no cover
+ __version__ = "0.1.dev1"
+
+__all__ = ["__version__"]
diff --git a/src/ansible_compat/config.py b/src/ansible_compat/config.py
new file mode 100644
index 0000000..a0b41b7
--- /dev/null
+++ b/src/ansible_compat/config.py
@@ -0,0 +1,465 @@
+"""Store configuration options as a singleton."""
+from __future__ import annotations
+
+import ast
+import copy
+import os
+import re
+import subprocess
+from collections import UserDict
+from typing import Literal
+
+from packaging.version import Version
+
+from ansible_compat.constants import ANSIBLE_MIN_VERSION
+from ansible_compat.errors import InvalidPrerequisiteError, MissingAnsibleError
+from ansible_compat.ports import cache
+
+
+# do not use lru_cache here, as environment can change between calls
+def ansible_collections_path() -> str:
+ """Return collection path variable for current version of Ansible."""
+ for env_var in [
+ "ANSIBLE_COLLECTIONS_PATH",
+ "ANSIBLE_COLLECTIONS_PATHS",
+ ]:
+ if env_var in os.environ:
+ return env_var
+ return "ANSIBLE_COLLECTIONS_PATH"
+
+
+def parse_ansible_version(stdout: str) -> Version:
+ """Parse output of 'ansible --version'."""
+ # Ansible can produce extra output before displaying version in debug mode.
+
+ # ansible-core 2.11+: 'ansible [core 2.11.3]'
+ match = re.search(
+ r"^ansible \[(?:core|base) (?P<version>[^\]]+)\]",
+ stdout,
+ re.MULTILINE,
+ )
+ if match:
+ return Version(match.group("version"))
+ msg = f"Unable to parse ansible cli version: {stdout}\nKeep in mind that only {ANSIBLE_MIN_VERSION } or newer are supported."
+ raise InvalidPrerequisiteError(msg)
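+
+
+# Illustrative example (not part of the upstream file): the parser only needs
+# the bracketed core version from `ansible --version` output, e.g.:
+#
+#   parse_ansible_version("ansible [core 2.14.1]\n  config file = None")
+#   # -> Version("2.14.1")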
+
+
+@cache
+def ansible_version(version: str = "") -> Version:
+ """Return current Version object for Ansible.
+
+ If no version string is given, it returns the currently detected version.
+ When a version string is provided, it is converted to a Version object
+ so that it can be used in comparisons.
+ """
+ if version:
+ return Version(version)
+
+ proc = subprocess.run(
+ ["ansible", "--version"], # noqa: S603
+ text=True,
+ check=False,
+ capture_output=True,
+ )
+ if proc.returncode != 0:
+ raise MissingAnsibleError(proc=proc)
+
+ return parse_ansible_version(proc.stdout)
+
+
+class AnsibleConfig(UserDict[str, object]): # pylint: disable=too-many-ancestors
+ """Interface to query Ansible configuration.
+
+ This should allow users to access everything provided by `ansible-config dump` without having to parse the data themselves.
+ """
+
+ _aliases = {
+ "COLLECTIONS_PATH": "COLLECTIONS_PATHS", # 2.9 -> 2.10
+ }
+ # Expose some attributes to enable auto-complete in editors, based on
+ # https://docs.ansible.com/ansible/latest/reference_appendices/config.html
+ action_warnings: bool = True
+ agnostic_become_prompt: bool = True
+ allow_world_readable_tmpfiles: bool = False
+ ansible_connection_path: str | None = None
+ ansible_cow_acceptlist: list[str]
+ ansible_cow_path: str | None = None
+ ansible_cow_selection: str = "default"
+ ansible_force_color: bool = False
+ ansible_nocolor: bool = False
+ ansible_nocows: bool = False
+ ansible_pipelining: bool = False
+ any_errors_fatal: bool = False
+ become_allow_same_user: bool = False
+ become_plugin_path: list[str] = [
+ "~/.ansible/plugins/become",
+ "/usr/share/ansible/plugins/become",
+ ]
+ cache_plugin: str = "memory"
+ cache_plugin_connection: str | None = None
+ cache_plugin_prefix: str = "ansible_facts"
+ cache_plugin_timeout: int = 86400
+ callable_accept_list: list[str] = []
+ callbacks_enabled: list[str] = []
+ collections_on_ansible_version_mismatch: Literal["warning", "ignore"] = "warning"
+ collections_paths: list[str] = [
+ "~/.ansible/collections",
+ "/usr/share/ansible/collections",
+ ]
+ collections_scan_sys_path: bool = True
+ color_changed: str = "yellow"
+ color_console_prompt: str = "white"
+ color_debug: str = "dark gray"
+ color_deprecate: str = "purple"
+ color_diff_add: str = "green"
+ color_diff_lines: str = "cyan"
+ color_diff_remove: str = "red"
+ color_error: str = "red"
+ color_highlight: str = "white"
+ color_ok: str = "green"
+ color_skip: str = "cyan"
+ color_unreachable: str = "bright red"
+ color_verbose: str = "blue"
+ color_warn: str = "bright purple"
+ command_warnings: bool = False
+ conditional_bare_vars: bool = False
+ connection_facts_modules: dict[str, str]
+ controller_python_warning: bool = True
+ coverage_remote_output: str | None
+ coverage_remote_paths: list[str]
+ default_action_plugin_path: list[str] = [
+ "~/.ansible/plugins/action",
+ "/usr/share/ansible/plugins/action",
+ ]
+ default_allow_unsafe_lookups: bool = False
+ default_ask_pass: bool = False
+ default_ask_vault_pass: bool = False
+ default_become: bool = False
+ default_become_ask_pass: bool = False
+ default_become_exe: str | None = None
+ default_become_flags: str
+ default_become_method: str = "sudo"
+ default_become_user: str = "root"
+ default_cache_plugin_path: list[str] = [
+ "~/.ansible/plugins/cache",
+ "/usr/share/ansible/plugins/cache",
+ ]
+ default_callback_plugin_path: list[str] = [
+ "~/.ansible/plugins/callback",
+ "/usr/share/ansible/plugins/callback",
+ ]
+ default_cliconf_plugin_path: list[str] = [
+ "~/.ansible/plugins/cliconf",
+ "/usr/share/ansible/plugins/cliconf",
+ ]
+ default_connection_plugin_path: list[str] = [
+ "~/.ansible/plugins/connection",
+ "/usr/share/ansible/plugins/connection",
+ ]
+ default_debug: bool = False
+ default_executable: str = "/bin/sh"
+ default_fact_path: str | None = None
+ default_filter_plugin_path: list[str] = [
+ "~/.ansible/plugins/filter",
+ "/usr/share/ansible/plugins/filter",
+ ]
+ default_force_handlers: bool = False
+ default_forks: int = 5
+ default_gathering: Literal["smart", "explicit", "implicit"] = "smart"
+ default_gather_subset: list[str] = ["all"]
+ default_gather_timeout: int = 10
+ default_handler_includes_static: bool = False
+ default_hash_behaviour: str = "replace"
+ default_host_list: list[str] = ["/etc/ansible/hosts"]
+ default_httpapi_plugin_path: list[str] = [
+ "~/.ansible/plugins/httpapi",
+ "/usr/share/ansible/plugins/httpapi",
+ ]
+ default_internal_poll_interval: float = 0.001
+ default_inventory_plugin_path: list[str] = [
+ "~/.ansible/plugins/inventory",
+ "/usr/share/ansible/plugins/inventory",
+ ]
+ default_jinja2_extensions: list[str] = []
+ default_jinja2_native: bool = False
+ default_keep_remote_files: bool = False
+ default_libvirt_lxc_noseclabel: bool = False
+ default_load_callback_plugins: bool = False
+ default_local_tmp: str = "~/.ansible/tmp"
+ default_log_filter: list[str] = []
+ default_log_path: str | None = None
+ default_lookup_plugin_path: list[str] = [
+ "~/.ansible/plugins/lookup",
+ "/usr/share/ansible/plugins/lookup",
+ ]
+ default_managed_str: str = "Ansible managed"
+ default_module_args: str
+ default_module_compression: str = "ZIP_DEFLATED"
+ default_module_name: str = "command"
+ default_module_path: list[str] = [
+ "~/.ansible/plugins/modules",
+ "/usr/share/ansible/plugins/modules",
+ ]
+ default_module_utils_path: list[str] = [
+ "~/.ansible/plugins/module_utils",
+ "/usr/share/ansible/plugins/module_utils",
+ ]
+ default_netconf_plugin_path: list[str] = [
+ "~/.ansible/plugins/netconf",
+ "/usr/share/ansible/plugins/netconf",
+ ]
+ default_no_log: bool = False
+ default_no_target_syslog: bool = False
+ default_null_representation: str | None = None
+ default_poll_interval: int = 15
+ default_private_key_file: str | None = None
+ default_private_role_vars: bool = False
+ default_remote_port: str | None = None
+ default_remote_user: str | None = None
+ # https://docs.ansible.com/ansible/latest/reference_appendices/config.html#collections-paths
+ default_collections_path: list[str] = [
+ "~/.ansible/collections",
+ "/usr/share/ansible/collections",
+ ]
+ default_roles_path: list[str] = [
+ "~/.ansible/roles",
+ "/usr/share/ansible/roles",
+ "/etc/ansible/roles",
+ ]
+ default_selinux_special_fs: list[str] = [
+ "fuse",
+ "nfs",
+ "vboxsf",
+ "ramfs",
+ "9p",
+ "vfat",
+ ]
+ default_stdout_callback: str = "default"
+ default_strategy: str = "linear"
+ default_strategy_plugin_path: list[str] = [
+ "~/.ansible/plugins/strategy",
+ "/usr/share/ansible/plugins/strategy",
+ ]
+ default_su: bool = False
+ default_syslog_facility: str = "LOG_USER"
+ default_task_includes_static: bool = False
+ default_terminal_plugin_path: list[str] = [
+ "~/.ansible/plugins/terminal",
+ "/usr/share/ansible/plugins/terminal",
+ ]
+ default_test_plugin_path: list[str] = [
+ "~/.ansible/plugins/test",
+ "/usr/share/ansible/plugins/test",
+ ]
+ default_timeout: int = 10
+ default_transport: str = "smart"
+ default_undefined_var_behavior: bool = True
+ default_vars_plugin_path: list[str] = [
+ "~/.ansible/plugins/vars",
+ "/usr/share/ansible/plugins/vars",
+ ]
+ default_vault_encrypt_identity: str | None = None
+ default_vault_identity: str = "default"
+ default_vault_identity_list: list[str] = []
+ default_vault_id_match: bool = False
+ default_vault_password_file: str | None = None
+ default_verbosity: int = 0
+ deprecation_warnings: bool = False
+ devel_warning: bool = True
+ diff_always: bool = False
+ diff_context: int = 3
+ display_args_to_stdout: bool = False
+ display_skipped_hosts: bool = True
+ docsite_root_url: str = "https://docs.ansible.com/ansible/"
+ doc_fragment_plugin_path: list[str] = [
+ "~/.ansible/plugins/doc_fragments",
+ "/usr/share/ansible/plugins/doc_fragments",
+ ]
+ duplicate_yaml_dict_key: Literal["warn", "error", "ignore"] = "warn"
+ enable_task_debugger: bool = False
+ error_on_missing_handler: bool = True
+ facts_modules: list[str] = ["smart"]
+ galaxy_cache_dir: str = "~/.ansible/galaxy_cache"
+ galaxy_display_progress: str | None = None
+ galaxy_ignore_certs: bool = False
+ galaxy_role_skeleton: str | None = None
+ galaxy_role_skeleton_ignore: list[str] = ["^.git$", "^.*/.git_keep$"]
+ galaxy_server: str = "https://galaxy.ansible.com"
+ galaxy_server_list: str | None = None
+ galaxy_token_path: str = "~/.ansible/galaxy_token"
+ host_key_checking: bool = True
+ host_pattern_mismatch: Literal["warning", "error", "ignore"] = "warning"
+ inject_facts_as_vars: bool = True
+ interpreter_python: str = "auto_legacy"
+ interpreter_python_distro_map: dict[str, str]
+ interpreter_python_fallback: list[str]
+ invalid_task_attribute_failed: bool = True
+ inventory_any_unparsed_is_failed: bool = False
+ inventory_cache_enabled: bool = False
+ inventory_cache_plugin: str | None = None
+ inventory_cache_plugin_connection: str | None = None
+ inventory_cache_plugin_prefix: str = "ansible_facts"
+ inventory_cache_timeout: int = 3600
+ inventory_enabled: list[str] = [
+ "host_list",
+ "script",
+ "auto",
+ "yaml",
+ "ini",
+ "toml",
+ ]
+ inventory_export: bool = False
+ inventory_ignore_exts: str
+ inventory_ignore_patterns: list[str] = []
+ inventory_unparsed_is_failed: bool = False
+ localhost_warning: bool = True
+ max_file_size_for_diff: int = 104448
+ module_ignore_exts: str
+ netconf_ssh_config: str | None = None
+ network_group_modules: list[str] = [
+ "eos",
+ "nxos",
+ "ios",
+ "iosxr",
+ "junos",
+ "enos",
+ "ce",
+ "vyos",
+ "sros",
+ "dellos9",
+ "dellos10",
+ "dellos6",
+ "asa",
+ "aruba",
+ "aireos",
+ "bigip",
+ "ironware",
+ "onyx",
+ "netconf",
+ "exos",
+ "voss",
+ "slxos",
+ ]
+ old_plugin_cache_clearing: bool = False
+ paramiko_host_key_auto_add: bool = False
+ paramiko_look_for_keys: bool = True
+ persistent_command_timeout: int = 30
+ persistent_connect_retry_timeout: int = 15
+ persistent_connect_timeout: int = 30
+ persistent_control_path_dir: str = "~/.ansible/pc"
+ playbook_dir: str | None
+ playbook_vars_root: Literal["top", "bottom", "all"] = "top"
+ plugin_filters_cfg: str | None = None
+ python_module_rlimit_nofile: int = 0
+ retry_files_enabled: bool = False
+ retry_files_save_path: str | None = None
+ run_vars_plugins: str = "demand"
+ show_custom_stats: bool = False
+ string_conversion_action: Literal["warn", "error", "ignore"] = "warn"
+ string_type_filters: list[str] = [
+ "string",
+ "to_json",
+ "to_nice_json",
+ "to_yaml",
+ "to_nice_yaml",
+ "ppretty",
+ "json",
+ ]
+ system_warnings: bool = True
+ tags_run: list[str] = []
+ tags_skip: list[str] = []
+ task_debugger_ignore_errors: bool = True
+ task_timeout: int = 0
+ transform_invalid_group_chars: Literal[
+ "always",
+ "never",
+ "ignore",
+ "silently",
+ ] = "never"
+ use_persistent_connections: bool = False
+ variable_plugins_enabled: list[str] = ["host_group_vars"]
+ variable_precedence: list[str] = [
+ "all_inventory",
+ "groups_inventory",
+ "all_plugins_inventory",
+ "all_plugins_play",
+ "groups_plugins_inventory",
+ "groups_plugins_play",
+ ]
+ verbose_to_stderr: bool = False
+ win_async_startup_timeout: int = 5
+ worker_shutdown_poll_count: int = 0
+ worker_shutdown_poll_delay: float = 0.1
+ yaml_filename_extensions: list[str] = [".yml", ".yaml", ".json"]
+
+ def __init__(
+ self,
+ config_dump: str | None = None,
+ data: dict[str, object] | None = None,
+ ) -> None:
+ """Load config dictionary."""
+ super().__init__()
+
+ if data:
+ self.data = copy.deepcopy(data)
+ return
+
+ if not config_dump:
+ env = os.environ.copy()
+ # Avoid possible ANSI garbage
+ env["ANSIBLE_FORCE_COLOR"] = "0"
+ config_dump = subprocess.check_output(
+ ["ansible-config", "dump"], # noqa: S603
+ universal_newlines=True,
+ env=env,
+ )
+
+ for match in re.finditer(
+ r"^(?P<key>[A-Za-z0-9_]+).* = (?P<value>.*)$",
+ config_dump,
+ re.MULTILINE,
+ ):
+ key = match.groupdict()["key"]
+ value = match.groupdict()["value"]
+ try:
+ self[key] = ast.literal_eval(value)
+ except (NameError, SyntaxError, ValueError):
+ self[key] = value
+
+ def __getattribute__(self, attr_name: str) -> object:
+ """Allow access of config options as attributes."""
+ _dict = super().__dict__ # pylint: disable=no-member
+ if attr_name in _dict:
+ return _dict[attr_name]
+
+ data = super().__getattribute__("data")
+ if attr_name == "data": # pragma: no cover
+ return data
+
+ name = attr_name.upper()
+ if name in data:
+ return data[name]
+ if name in AnsibleConfig._aliases:
+ return data[AnsibleConfig._aliases[name]]
+
+ return super().__getattribute__(attr_name)
+
+ def __getitem__(self, name: str) -> object:
+ """Allow access to config options using indexing."""
+ return super().__getitem__(name.upper())
+
+ def __copy__(self) -> AnsibleConfig:
+ """Allow users to run copy on Config."""
+ return AnsibleConfig(data=self.data)
+
+ def __deepcopy__(self, memo: object) -> AnsibleConfig:
+ """Allow users to run deeepcopy on Config."""
+ return AnsibleConfig(data=self.data)
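+
+
+# Usage sketch (illustrative, not part of upstream): attribute access and key
+# access resolve to the same `ansible-config dump` entries, e.g.:
+#
+#   config = AnsibleConfig()
+#   config.collections_paths == config["COLLECTIONS_PATHS"]  # True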
+
+
+__all__ = [
+ "ansible_collections_path",
+ "parse_ansible_version",
+ "ansible_version",
+ "AnsibleConfig",
+]
diff --git a/src/ansible_compat/constants.py b/src/ansible_compat/constants.py
new file mode 100644
index 0000000..f3d7866
--- /dev/null
+++ b/src/ansible_compat/constants.py
@@ -0,0 +1,42 @@
+"""Constants used by ansible_compat."""
+
+from pathlib import Path
+
+META_MAIN = (Path("meta") / Path("main.yml"), Path("meta") / Path("main.yaml"))
+REQUIREMENT_LOCATIONS = [
+ "requirements.yml",
+ "roles/requirements.yml",
+ "collections/requirements.yml",
+ # These are more or less the official ways to store test requirements in collections so far; the comments show the number of repos using each location, as reported by https://sourcegraph.com/ at the time of writing
+ "tests/requirements.yml", # 170
+ "tests/integration/requirements.yml", # 3
+ "tests/unit/requirements.yml", # 1
+]
+
+# Minimal version of Ansible we support for runtime
+ANSIBLE_MIN_VERSION = "2.12"
+
+# Based on https://docs.ansible.com/ansible/latest/reference_appendices/config.html
+ANSIBLE_DEFAULT_ROLES_PATH = (
+ "~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles"
+)
+
+INVALID_CONFIG_RC = 2
+ANSIBLE_MISSING_RC = 4
+INVALID_PREREQUISITES_RC = 10
+
+MSG_INVALID_FQRL = """\
+Computed fully qualified role name of {0} does not follow current galaxy requirements.
+Please edit meta/main.yml and ensure we can correctly determine the full role name:
+
+galaxy_info:
+  role_name: my_name # if absent, the name of the directory hosting the role is used instead
+  namespace: my_galaxy_namespace # if absent, author is used instead
+
+Namespace: https://galaxy.ansible.com/docs/contributing/namespaces.html#galaxy-namespace-limitations
+Role: https://galaxy.ansible.com/docs/contributing/creating_role.html#role-names
+
+As an alternative, you can add 'role-name' to either skip_list or warn_list.
+"""
+
+RC_ANSIBLE_OPTIONS_ERROR = 5
diff --git a/src/ansible_compat/errors.py b/src/ansible_compat/errors.py
new file mode 100644
index 0000000..6369412
--- /dev/null
+++ b/src/ansible_compat/errors.py
@@ -0,0 +1,57 @@
+"""Module to deal with errors."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from ansible_compat.constants import ANSIBLE_MISSING_RC, INVALID_PREREQUISITES_RC
+
+if TYPE_CHECKING:
+ from subprocess import CompletedProcess
+
+
+class AnsibleCompatError(RuntimeError):
+ """Generic error originating from ansible_compat library."""
+
+ code = 1 # generic error
+
+ def __init__(
+ self,
+ message: str | None = None,
+ proc: CompletedProcess[Any] | None = None,
+ ) -> None:
+ """Construct generic library exception."""
+ super().__init__(message)
+ self.proc = proc
+
+
+class AnsibleCommandError(RuntimeError):
+ """Exception running an Ansible command."""
+
+ def __init__(self, proc: CompletedProcess[Any]) -> None:
+ """Construct an exception given a completed process."""
+ message = (
+ f"Got {proc.returncode} exit code while running: {' '.join(proc.args)}"
+ )
+ super().__init__(message)
+ self.proc = proc
+
+
+class MissingAnsibleError(AnsibleCompatError):
+ """Reports a missing or broken Ansible installation."""
+
+ code = ANSIBLE_MISSING_RC
+
+ def __init__(
+ self,
+ message: str | None = "Unable to find a working copy of ansible executable.",
+ proc: CompletedProcess[Any] | None = None,
+ ) -> None:
+ """."""
+ super().__init__(message)
+ self.proc = proc
+
+
+class InvalidPrerequisiteError(AnsibleCompatError):
+ """Reports a missing requirement."""
+
+ code = INVALID_PREREQUISITES_RC
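+
+
+# Usage sketch (illustrative, not part of upstream): consumers typically catch
+# the AnsibleCompatError base class and map its `code` attribute to an exit
+# status, e.g.:
+#
+#   try:
+#       runtime.prepare_environment()
+#   except AnsibleCompatError as exc:
+#       sys.exit(exc.code)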
diff --git a/src/ansible_compat/loaders.py b/src/ansible_compat/loaders.py
new file mode 100644
index 0000000..d2ae080
--- /dev/null
+++ b/src/ansible_compat/loaders.py
@@ -0,0 +1,30 @@
+"""Utilities for loading various files."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import yaml
+
+from ansible_compat.errors import InvalidPrerequisiteError
+
+if TYPE_CHECKING:
+ from pathlib import Path
+
+
+def yaml_from_file(path: Path) -> Any: # noqa: ANN401
+ """Return a loaded YAML file."""
+ with path.open(encoding="utf-8") as content:
+ return yaml.load(content, Loader=yaml.SafeLoader)
+
+
+def colpath_from_path(path: Path) -> str | None:
+ """Return a FQCN from a path."""
+ galaxy_file = path / "galaxy.yml"
+ if galaxy_file.exists():
+ galaxy = yaml_from_file(galaxy_file)
+ for k in ("namespace", "name"):
+ if k not in galaxy:
+ msg = f"{galaxy_file} is missing the following mandatory field {k}"
+ raise InvalidPrerequisiteError(msg)
+ return f"{galaxy['namespace']}/{galaxy['name']}"
+ return None
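+
+
+# Illustrative example (not part of upstream): for a collection checkout whose
+# galaxy.yml declares namespace "acme" and name "utils",
+# colpath_from_path(path) returns "acme/utils".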
diff --git a/src/ansible_compat/ports.py b/src/ansible_compat/ports.py
new file mode 100644
index 0000000..9c46ae6
--- /dev/null
+++ b/src/ansible_compat/ports.py
@@ -0,0 +1,4 @@
+"""Portability helpers."""
+from functools import cache, cached_property
+
+__all__ = ["cache", "cached_property"]
diff --git a/src/ansible_compat/prerun.py b/src/ansible_compat/prerun.py
new file mode 100644
index 0000000..6dfa44f
--- /dev/null
+++ b/src/ansible_compat/prerun.py
@@ -0,0 +1,21 @@
+"""Utilities for configuring ansible runtime environment."""
+import hashlib
+import os
+from pathlib import Path
+
+
+def get_cache_dir(project_dir: Path) -> Path:
+ """Compute cache directory to be used based on project path."""
+ # We only use the basename instead of the full path in order to ensure
+ # that we use the same key regardless of the location of the user home
+ # directory or where the project is cloned (as long as the project folder
+ # uses the same name).
+ basename = project_dir.resolve().name.encode(encoding="utf-8")
+ # 6 chars of entropy should be enough
+ cache_key = hashlib.sha256(basename).hexdigest()[:6]
+ cache_dir = (
+ Path(os.getenv("XDG_CACHE_HOME", "~/.cache")).expanduser()
+ / "ansible-compat"
+ / cache_key
+ )
+ return cache_dir
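+
+
+# Illustrative example (hypothetical paths): because only the basename is
+# hashed, two clones of a project named "my_project" share one cache key:
+#
+#   get_cache_dir(Path("/home/alice/src/my_project"))
+#   get_cache_dir(Path("/tmp/my_project"))
+#   # -> both end in ansible-compat/<the same 6 hex chars>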
diff --git a/src/ansible_compat/py.typed b/src/ansible_compat/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/ansible_compat/py.typed
diff --git a/src/ansible_compat/runtime.py b/src/ansible_compat/runtime.py
new file mode 100644
index 0000000..ad81132
--- /dev/null
+++ b/src/ansible_compat/runtime.py
@@ -0,0 +1,961 @@
+"""Ansible runtime environment manager."""
+from __future__ import annotations
+
+import contextlib
+import importlib
+import json
+import logging
+import os
+import re
+import shutil
+import subprocess
+import sys
+import warnings
+from collections import OrderedDict
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, no_type_check
+
+import subprocess_tee
+from packaging.version import Version
+
+from ansible_compat.config import (
+ AnsibleConfig,
+ ansible_collections_path,
+ ansible_version,
+ parse_ansible_version,
+)
+from ansible_compat.constants import (
+ META_MAIN,
+ MSG_INVALID_FQRL,
+ RC_ANSIBLE_OPTIONS_ERROR,
+ REQUIREMENT_LOCATIONS,
+)
+from ansible_compat.errors import (
+ AnsibleCommandError,
+ AnsibleCompatError,
+ InvalidPrerequisiteError,
+ MissingAnsibleError,
+)
+from ansible_compat.loaders import colpath_from_path, yaml_from_file
+from ansible_compat.prerun import get_cache_dir
+
+if TYPE_CHECKING:
+ # https://github.com/PyCQA/pylint/issues/3240
+ # pylint: disable=unsubscriptable-object
+ CompletedProcess = subprocess.CompletedProcess[Any]
+else:
+ CompletedProcess = subprocess.CompletedProcess
+
+
+_logger = logging.getLogger(__name__)
+# regex to extract the first version from a collection range specifier
+version_re = re.compile(":[>=<]*([^,]*)")
+namespace_re = re.compile("^[a-z][a-z0-9_]+$")
+
+
+class AnsibleWarning(Warning):
+ """Warnings related to Ansible runtime."""
+
+
+@dataclass
+class Collection:
+ """Container for Ansible collection information."""
+
+ name: str
+ version: str
+ path: Path
+
+
+class CollectionVersion(Version):
+ """Collection version."""
+
+ def __init__(self, version: str) -> None:
+ """Initialize collection version."""
+ # As the packaging Version class does not support wildcards, we convert
+ # them to "0", this being the smallest version possible.
+ if version == "*":
+ version = "0"
+ super().__init__(version)
+
+
+@dataclass
+class Plugins: # pylint: disable=too-many-instance-attributes
+ """Dataclass to access installed Ansible plugins, uses ansible-doc to retrieve them."""
+
+ runtime: Runtime
+ become: dict[str, str] = field(init=False)
+ cache: dict[str, str] = field(init=False)
+ callback: dict[str, str] = field(init=False)
+ cliconf: dict[str, str] = field(init=False)
+ connection: dict[str, str] = field(init=False)
+ httpapi: dict[str, str] = field(init=False)
+ inventory: dict[str, str] = field(init=False)
+ lookup: dict[str, str] = field(init=False)
+ netconf: dict[str, str] = field(init=False)
+ shell: dict[str, str] = field(init=False)
+ vars: dict[str, str] = field(init=False) # noqa: A003
+ module: dict[str, str] = field(init=False)
+ strategy: dict[str, str] = field(init=False)
+ test: dict[str, str] = field(init=False)
+ filter: dict[str, str] = field(init=False) # noqa: A003
+ role: dict[str, str] = field(init=False)
+ keyword: dict[str, str] = field(init=False)
+
+ @no_type_check
+ def __getattribute__(self, attr: str): # noqa: ANN204
+ """Get attribute."""
+ if attr in {
+ "become",
+ "cache",
+ "callback",
+ "cliconf",
+ "connection",
+ "httpapi",
+ "inventory",
+ "lookup",
+ "netconf",
+ "shell",
+ "vars",
+ "module",
+ "strategy",
+ "test",
+ "filter",
+ "role",
+ "keyword",
+ }:
+ try:
+ result = super().__getattribute__(attr)
+ except AttributeError as exc:
+ if ansible_version() < Version("2.14") and attr in {"filter", "test"}:
+ msg = "Ansible version below 2.14 does not support retrieving filter and test plugins."
+ raise RuntimeError(msg) from exc
+ proc = self.runtime.run(
+ ["ansible-doc", "--json", "-l", "-t", attr],
+ )
+ data = json.loads(proc.stdout)
+ if not isinstance(data, dict): # pragma: no cover
+ msg = "Unexpected output from ansible-doc"
+ raise AnsibleCompatError(msg) from exc
+ result = data
+ else:
+ result = super().__getattribute__(attr)
+
+ return result
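+
+
+# Usage sketch (illustrative, not part of upstream): plugin tables are loaded
+# lazily on first attribute access via `ansible-doc --json -l -t <type>`:
+#
+#   runtime = Runtime()
+#   lookup_plugins = runtime.plugins.lookup  # dict of plugin name -> description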
+
+
+# pylint: disable=too-many-instance-attributes
+class Runtime:
+ """Ansible Runtime manager."""
+
+ _version: Version | None = None
+ collections: OrderedDict[str, Collection] = OrderedDict()
+ cache_dir: Path | None = None
+ # Used to track if we have already initialized the Ansible runtime as attempts
+ # to do it multiple times will cause runtime warnings from within ansible-core
+ initialized: bool = False
+ plugins: Plugins
+
+ def __init__(
+ self,
+ project_dir: Path | None = None,
+ *,
+ isolated: bool = False,
+ min_required_version: str | None = None,
+ require_module: bool = False,
+ max_retries: int = 0,
+ environ: dict[str, str] | None = None,
+ verbosity: int = 0,
+ ) -> None:
+ """Initialize Ansible runtime environment.
+
+ :param project_dir: The directory containing the Ansible project. If
+ not provided, the current working
+ directory is used.
+ :param isolated: Ensure that installation of collections or roles
+ does not affect the Ansible installation, a unique cache
+ directory being used instead.
+ :param min_required_version: Minimal version of Ansible required. If
+ not found, a :class:`RuntimeError`
+ exception is raised.
+ :param require_module: If set, instantiation will fail if Ansible
+ Python module is missing or is not matching
+ the same version as the Ansible command line.
+ That is useful for consumers that expect to
+ also perform Python imports from Ansible.
+ :param max_retries: Number of times it should retry network operations.
+ Default is 0, no retries.
+ :param environ: Environment dictionary to use, if undefined
+ ``os.environ`` will be copied and used.
+ :param verbosity: Verbosity level to use.
+ """
+ self.project_dir = project_dir or Path.cwd()
+ self.isolated = isolated
+ self.max_retries = max_retries
+ self.environ = environ or os.environ.copy()
+ self.plugins = Plugins(runtime=self)
+ self.verbosity = verbosity
+
+ self.initialize_logger(level=self.verbosity)
+
+ # Reduce noise from paramiko, unless user already defined PYTHONWARNINGS
+ # paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
+ # https://github.com/paramiko/paramiko/issues/2038
+ # As CryptographyDeprecationWarning is not a builtin, we cannot use
+ # PYTHONWARNINGS to ignore it using category but we can use message.
+ # https://stackoverflow.com/q/68251969/99834
+ if "PYTHONWARNINGS" not in self.environ: # pragma: no cover
+ self.environ["PYTHONWARNINGS"] = "ignore:Blowfish has been deprecated"
+
+ if isolated:
+ self.cache_dir = get_cache_dir(self.project_dir)
+ self.config = AnsibleConfig()
+
+ # Add sys.path entries to the collection paths (when scanning is enabled)
+ self._add_sys_path_to_collection_paths()
+
+ if not self.version_in_range(lower=min_required_version):
+ msg = f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer."
+ raise RuntimeError(msg)
+ if require_module:
+ self._ensure_module_available()
+
+ # pylint: disable=import-outside-toplevel
+ from ansible.utils.display import Display
+
+ # pylint: disable=unused-argument
+ def warning(
+ self: Display, # noqa: ARG001
+ msg: str,
+ *,
+ formatted: bool = False, # noqa: ARG001
+ ) -> None:
+ """Override ansible.utils.display.Display.warning to avoid printing warnings."""
+ warnings.warn(
+ message=msg,
+ category=AnsibleWarning,
+ stacklevel=2,
+ source={"msg": msg},
+ )
+
+ # Monkey patch ansible warning in order to use warnings module.
+ Display.warning = warning
+
+ def initialize_logger(self, level: int = 0) -> None:
+ """Set up the global logging level based on the verbosity number."""
+ verbosity_map = {
+ -2: logging.CRITICAL,
+ -1: logging.ERROR,
+ 0: logging.WARNING,
+ 1: logging.INFO,
+ 2: logging.DEBUG,
+ }
+ # Unknown logging level is treated as DEBUG
+ logging_level = verbosity_map.get(level, logging.DEBUG)
+ _logger.setLevel(logging_level)
+ # Use module-level _logger instance to validate it
+ _logger.debug("Logging initialized to level %s", logging_level)
+
+ def _add_sys_path_to_collection_paths(self) -> None:
+ """Add the sys.path to the collection paths."""
+ if self.config.collections_scan_sys_path:
+ for path in sys.path:
+ if (
+ path not in self.config.collections_paths
+ and (Path(path) / "ansible_collections").is_dir()
+ ):
+ self.config.collections_paths.append( # pylint: disable=E1101
+ path,
+ )
+
+ def load_collections(self) -> None:
+ """Load collection data."""
+ self.collections = OrderedDict()
+ no_collections_msg = "None of the provided paths were usable"
+
+ proc = self.run(["ansible-galaxy", "collection", "list", "--format=json"])
+ if proc.returncode == RC_ANSIBLE_OPTIONS_ERROR and (
+ no_collections_msg in proc.stdout or no_collections_msg in proc.stderr
+ ):
+ _logger.debug("Ansible reported no installed collections at all.")
+ return
+ if proc.returncode != 0:
+ _logger.error(proc)
+ msg = f"Unable to list collections: {proc}"
+ raise RuntimeError(msg)
+ data = json.loads(proc.stdout)
+ if not isinstance(data, dict):
+ msg = f"Unexpected collection data, {data}"
+ raise TypeError(msg)
+ for path in data:
+ for collection, collection_info in data[path].items():
+ if not isinstance(collection, str):
+ msg = f"Unexpected collection data, {collection}"
+ raise TypeError(msg)
+ if not isinstance(collection_info, dict):
+ msg = f"Unexpected collection data, {collection_info}"
+ raise TypeError(msg)
+
+ self.collections[collection] = Collection(
+ name=collection,
+ version=collection_info["version"],
+ path=path,
+ )
+
+ def _ensure_module_available(self) -> None:
+ """Assure that Ansible Python module is installed and matching CLI version."""
+ ansible_release_module = None
+ with contextlib.suppress(ModuleNotFoundError, ImportError):
+ ansible_release_module = importlib.import_module("ansible.release")
+
+ if ansible_release_module is None:
+ msg = "Unable to find Ansible python module."
+ raise RuntimeError(msg)
+
+ ansible_module_version = Version(
+ ansible_release_module.__version__,
+ )
+ if ansible_module_version != self.version:
+ msg = f"Ansible CLI ({self.version}) and python module ({ansible_module_version}) versions do not match. This indicates a broken execution environment."
+ raise RuntimeError(msg)
+
+ # For ansible 2.15+ we need to initialize the plugin loader
+ # https://github.com/ansible/ansible-lint/issues/2945
+ if not Runtime.initialized:
+ col_path = [f"{self.cache_dir}/collections"]
+ if self.version >= Version("2.15.0.dev0"):
+ # pylint: disable=import-outside-toplevel,no-name-in-module
+ from ansible.plugins.loader import init_plugin_loader
+
+ init_plugin_loader(col_path)
+ else:
+ # noinspection PyProtectedMember
+ from ansible.utils.collection_loader._collection_finder import ( # pylint: disable=import-outside-toplevel
+ _AnsibleCollectionFinder,
+ )
+
+ # noinspection PyProtectedMember
+ # pylint: disable=protected-access
+ col_path += self.config.collections_paths
+ col_path += os.path.dirname( # noqa: PTH120
+ os.environ.get(ansible_collections_path(), "."),
+ ).split(":")
+ _AnsibleCollectionFinder( # noqa: SLF001
+ paths=col_path,
+ )._install() # pylint: disable=protected-access
+ Runtime.initialized = True
+
+ def clean(self) -> None:
+ """Remove content of cache_dir."""
+ if self.cache_dir:
+ shutil.rmtree(self.cache_dir, ignore_errors=True)
+
+ def run( # noqa: PLR0913
+ self,
+ args: str | list[str],
+ *,
+ retry: bool = False,
+ tee: bool = False,
+ env: dict[str, str] | None = None,
+ cwd: Path | None = None,
+ ) -> CompletedProcess:
+ """Execute a command inside an Ansible environment.
+
+ :param retry: Retry network operations on failures.
+ :param tee: Also pass captured stdout/stderr to system while running.
+ """
+ if tee:
+ run_func: Callable[..., CompletedProcess] = subprocess_tee.run
+ else:
+ run_func = subprocess.run
+ env = self.environ if env is None else env.copy()
+ # Presence of ansible debug variable or config option will prevent us
+ # from parsing its JSON output due to extra debug messages on stdout.
+ env["ANSIBLE_DEBUG"] = "0"
+
+ # https://github.com/ansible/ansible-lint/issues/3522
+ env["ANSIBLE_VERBOSE_TO_STDERR"] = "True"
+
+ for _ in range(self.max_retries + 1 if retry else 1):
+ result = run_func(
+ args,
+ universal_newlines=True,
+ check=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env,
+ cwd=str(cwd) if cwd else None,
+ )
+ if result.returncode == 0:
+ break
+ _logger.debug("Environment: %s", env)
+ if retry:
+ _logger.warning(
+ "Retrying execution failure %s of: %s",
+ result.returncode,
+ " ".join(args),
+ )
+ return result
+
+ @property
+ def version(self) -> Version:
+ """Return current Version object for Ansible.
+
+ The version is detected by running `ansible --version` the first time
+ and cached for subsequent calls.
+ """
+ if self._version:
+ return self._version
+
+ proc = self.run(["ansible", "--version"])
+ if proc.returncode == 0:
+ self._version = parse_ansible_version(proc.stdout)
+ return self._version
+
+ msg = "Unable to find a working copy of ansible executable."
+ raise MissingAnsibleError(msg, proc=proc)
+
+ def version_in_range(
+ self,
+ lower: str | None = None,
+ upper: str | None = None,
+ ) -> bool:
+ """Check if Ansible version is inside a required range.
+
+ The lower limit is inclusive and the upper one exclusive.
+ """
+ if lower and self.version < Version(lower):
+ return False
+ if upper and self.version >= Version(upper):
+ return False
+ return True
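+
+ # (illustrative) version_in_range(lower="2.12", upper="2.17") is true
+ # for any detected version v with 2.12 <= v < 2.17.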
+
+ def install_collection(
+ self,
+ collection: str | Path,
+ *,
+ destination: Path | None = None,
+ force: bool = False,
+ ) -> None:
+ """Install an Ansible collection.
+
+ Can accept arguments like:
+ 'foo.bar:>=1.2.3'
+ 'git+https://github.com/ansible-collections/ansible.posix.git,main'
+ """
+ cmd = [
+ "ansible-galaxy",
+ "collection",
+ "install",
+ "-vvv", # this is needed to make ansible display important info in case of failures
+ ]
+ if force:
+ cmd.append("--force")
+
+ if isinstance(collection, Path):
+ collection = str(collection)
+ # As ansible-galaxy install is not able to automatically determine
+ # if the range requires a pre-release, we need to manually add the --pre
+ # flag when needed.
+ matches = version_re.search(collection)
+
+ if (
+ not is_url(collection)
+ and matches
+ and CollectionVersion(matches[1]).is_prerelease
+ ):
+ cmd.append("--pre")
+
+ cpaths: list[str] = self.config.collections_paths
+ if destination and str(destination) not in cpaths:
+ # We cannot use '-p' because it breaks galaxy's ability to ignore already
+ # installed collections, so we hack ansible_collections_path instead and
+ # inject our own path there.
+ # pylint: disable=no-member
+ cpaths.insert(0, str(destination))
+ cmd.append(f"{collection}")
+
+ _logger.info("Running from %s : %s", Path.cwd(), " ".join(cmd))
+ process = self.run(
+ cmd,
+ retry=True,
+ env={**self.environ, ansible_collections_path(): ":".join(cpaths)},
+ )
+ if process.returncode != 0:
+ msg = f"Command returned {process.returncode} code:\n{process.stdout}\n{process.stderr}"
+ _logger.error(msg)
+ raise InvalidPrerequisiteError(msg)
+
+ def install_collection_from_disk(
+ self,
+ path: Path,
+ destination: Path | None = None,
+ ) -> None:
+ """Build and install collection from a given disk path."""
+ self.install_collection(path, destination=destination, force=True)
+
+ # pylint: disable=too-many-branches
+ def install_requirements( # noqa: C901
+ self,
+ requirement: Path,
+ *,
+ retry: bool = False,
+ offline: bool = False,
+ ) -> None:
+ """Install dependencies from a requirements.yml.
+
+ :param requirement: path to requirements.yml file
+ :param retry: retry network operations on failures
+ :param offline: bypass installation, may fail if requirements are not met.
+ """
+ if not Path(requirement).exists():
+ return
+ reqs_yaml = yaml_from_file(Path(requirement))
+ if not isinstance(reqs_yaml, (dict, list)):
+ msg = f"{requirement} file is not a valid Ansible requirements file."
+ raise InvalidPrerequisiteError(msg)
+
+ if isinstance(reqs_yaml, dict):
+ for key in reqs_yaml:
+ if key not in ("roles", "collections"):
+ msg = f"{requirement} file is not a valid Ansible requirements file. Only 'roles' and 'collections' keys are allowed at root level. Recognized valid locations are: {', '.join(REQUIREMENT_LOCATIONS)}"
+ raise InvalidPrerequisiteError(msg)
+
+ if isinstance(reqs_yaml, list) or "roles" in reqs_yaml:
+ cmd = [
+ "ansible-galaxy",
+ "role",
+ "install",
+ "-r",
+ f"{requirement}",
+ ]
+ if self.verbosity > 0:
+ cmd.extend(["-" + ("v" * self.verbosity)])
+ if self.cache_dir:
+ cmd.extend(["--roles-path", f"{self.cache_dir}/roles"])
+
+ if offline:
+ _logger.warning(
+ "Skipped installing old role dependencies due to running in offline mode.",
+ )
+ else:
+ _logger.info("Running %s", " ".join(cmd))
+
+ result = self.run(cmd, retry=retry)
+ _logger.debug(result.stdout)
+ if result.returncode != 0:
+ _logger.error(result.stderr)
+ raise AnsibleCommandError(result)
+
+ # ansible-galaxy collection install works on v2 requirements.yml files
+ if "collections" in reqs_yaml and reqs_yaml["collections"] is not None:
+ cmd = [
+ "ansible-galaxy",
+ "collection",
+ "install",
+ ]
+ if self.verbosity > 0:
+ cmd.extend(["-" + ("v" * self.verbosity)])
+
+ for collection in reqs_yaml["collections"]:
+ if isinstance(collection, dict) and collection.get("type", "") == "git":
+ _logger.info(
+ "Adding '--pre' to ansible-galaxy collection install because we detected one collection being sourced from git.",
+ )
+ cmd.append("--pre")
+ break
+ if offline:
+ _logger.warning(
+ "Skipped installing collection dependencies due to running in offline mode.",
+ )
+ else:
+ cmd.extend(["-r", str(requirement)])
+ cpaths = self.config.collections_paths
+ if self.cache_dir:
+ # We cannot use '-p' because it breaks galaxy's ability to ignore already
+ # installed collections, so we hack ansible_collections_path instead and
+ # inject our own path there.
+ dest_path = f"{self.cache_dir}/collections"
+ if dest_path not in cpaths:
+ # pylint: disable=no-member
+ cpaths.insert(0, dest_path)
+ _logger.info("Running %s", " ".join(cmd))
+ result = self.run(
+ cmd,
+ retry=retry,
+ env={**os.environ, "ANSIBLE_COLLECTIONS_PATH": ":".join(cpaths)},
+ )
+ _logger.debug(result.stdout)
+ if result.returncode != 0:
+ _logger.error(result.stderr)
+ raise AnsibleCommandError(result)
+
+ def prepare_environment( # noqa: C901
+ self,
+ required_collections: dict[str, str] | None = None,
+ *,
+ retry: bool = False,
+ install_local: bool = False,
+ offline: bool = False,
+ role_name_check: int = 0,
+ ) -> None:
+ """Make dependencies available if needed."""
+ destination: Path | None = None
+ if required_collections is None:
+ required_collections = {}
+
+ # first one is standard for collection layout repos and the last two
+ # are part of Tower specification
+ # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#ansible-galaxy-support
+ # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#collections-support
+ for req_file in REQUIREMENT_LOCATIONS:
+ self.install_requirements(Path(req_file), retry=retry, offline=offline)
+
+ self._prepare_ansible_paths()
+
+ if not install_local:
+ return
+
+ for gpath in search_galaxy_paths(self.project_dir):
+ # processing all found galaxy.yml files
+ galaxy_path = Path(gpath)
+ if galaxy_path.exists():
+ data = yaml_from_file(galaxy_path)
+ if isinstance(data, dict) and "dependencies" in data:
+ for name, required_version in data["dependencies"].items():
+ _logger.info(
+ "Provisioning collection %s:%s from galaxy.yml",
+ name,
+ required_version,
+ )
+ self.install_collection(
+ f"{name}{',' if is_url(name) else ':'}{required_version}",
+ destination=destination,
+ )
+
+ if self.cache_dir:
+ destination = self.cache_dir / "collections"
+ for name, min_version in required_collections.items():
+ self.install_collection(
+ f"{name}:>={min_version}",
+ destination=destination,
+ )
+
+ if (self.project_dir / "galaxy.yml").exists():
+ if destination:
+ # while colpath_from_path() can return None, that would not break the logic
+ colpath = Path(
+ f"{destination}/ansible_collections/{colpath_from_path(self.project_dir)}",
+ )
+ if colpath.is_symlink():
+ if os.path.realpath(colpath) == str(Path.cwd()):
+ _logger.warning(
+ "Found symlinked collection, skipping its installation.",
+ )
+ return
+ _logger.warning(
+ "Collection is symlinked, but not pointing to %s directory, so we will remove it.",
+ Path.cwd(),
+ )
+ colpath.unlink()
+
+ # molecule scenario within a collection
+ self.install_collection_from_disk(
+ galaxy_path.parent,
+ destination=destination,
+ )
+ elif (
+ Path().resolve().parent.name == "roles"
+ and Path("../../galaxy.yml").exists()
+ ):
+ # molecule scenario located within roles/<role-name>/molecule inside
+ # a collection
+ self.install_collection_from_disk(
+ Path("../.."),
+ destination=destination,
+ )
+ else:
+ # no collection, try to recognize and install a standalone role
+ self._install_galaxy_role(
+ self.project_dir,
+ role_name_check=role_name_check,
+ ignore_errors=True,
+ )
+ # reload collections
+ self.load_collections()
+
+ def require_collection(
+ self,
+ name: str,
+ version: str | None = None,
+ *,
+ install: bool = True,
+ ) -> tuple[CollectionVersion, Path]:
+ """Check if a minimal collection version is present or exits.
+
+ In the future this method may attempt to install a missing or outdated
+ collection before failing.
+
+ :param name: collection name
+ :param version: minimal version required
+ :param install: if True, attempt to install a missing collection
+ :returns: tuple of (found_version, collection_path)
+ """
+ try:
+ ns, coll = name.split(".", 1)
+ except ValueError as exc:
+ msg = f"Invalid collection name supplied: {name}%s"
+ raise InvalidPrerequisiteError(
+ msg,
+ ) from exc
+
+ paths: list[str] = self.config.collections_paths
+ if not paths or not isinstance(paths, list):
+ msg = f"Unable to determine ansible collection paths. ({paths})"
+ raise InvalidPrerequisiteError(
+ msg,
+ )
+
+ if self.cache_dir:
+ # if we have a cache dir, we want it to be the preferred destination
+ # when installing a missing collection
+ # https://github.com/PyCQA/pylint/issues/4667
+ paths.insert(0, f"{self.cache_dir}/collections") # pylint: disable=E1101
+
+ for path in paths:
+ collpath = Path(path) / "ansible_collections" / ns / coll
+ if collpath.exists():
+ mpath = collpath / "MANIFEST.json"
+ if not mpath.exists():
+ msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info."
+ _logger.fatal(msg)
+ raise InvalidPrerequisiteError(msg)
+
+ with mpath.open(encoding="utf-8") as f:
+ manifest = json.loads(f.read())
+ found_version = CollectionVersion(
+ manifest["collection_info"]["version"],
+ )
+ if version and found_version < CollectionVersion(version):
+ if install:
+ self.install_collection(f"{name}:>={version}")
+ self.require_collection(name, version, install=False)
+ else:
+ msg = f"Found {name} collection {found_version} but {version} or newer is required."
+ _logger.fatal(msg)
+ raise InvalidPrerequisiteError(msg)
+ return found_version, collpath.resolve()
+ else:
+ if install:
+ self.install_collection(f"{name}:>={version}" if version else name)
+ return self.require_collection(
+ name=name,
+ version=version,
+ install=False,
+ )
+ msg = f"Collection '{name}' not found in '{paths}'"
+ _logger.fatal(msg)
+ raise InvalidPrerequisiteError(msg)
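+
+ # Usage sketch (illustrative):
+ # found_version, path = runtime.require_collection("community.general", "3.0.0")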
+
+ def _prepare_ansible_paths(self) -> None:
+ """Configure Ansible environment variables."""
+ try:
+ library_paths: list[str] = self.config.default_module_path.copy()
+ roles_path: list[str] = self.config.default_roles_path.copy()
+ collections_path: list[str] = self.config.collections_paths.copy()
+ except AttributeError as exc:
+ msg = "Unexpected ansible configuration"
+ raise RuntimeError(msg) from exc
+
+ alterations_list: list[tuple[list[str], str, bool]] = [
+ (library_paths, "plugins/modules", True),
+ (roles_path, "roles", True),
+ ]
+
+ alterations_list.extend(
+ [
+ (roles_path, f"{self.cache_dir}/roles", False),
+ (library_paths, f"{self.cache_dir}/modules", False),
+ (collections_path, f"{self.cache_dir}/collections", False),
+ ]
+ if self.isolated
+ else [],
+ )
+
+ for path_list, path_, must_be_present in alterations_list:
+ path = Path(path_)
+ if not path.exists():
+ if must_be_present:
+ continue
+ path.mkdir(parents=True, exist_ok=True)
+ if str(path) not in path_list:
+ path_list.insert(0, str(path))
+
+ if library_paths != self.config.DEFAULT_MODULE_PATH:
+ self._update_env("ANSIBLE_LIBRARY", library_paths)
+ if collections_path != self.config.default_collections_path:
+ self._update_env(ansible_collections_path(), collections_path)
+ if roles_path != self.config.default_roles_path:
+ self._update_env("ANSIBLE_ROLES_PATH", roles_path)
+
+ def _get_roles_path(self) -> Path:
+ """Return roles installation path.
+
+ If `self.cache_dir` is set (as it is in isolated mode), it returns
+ `self.cache_dir/roles`. Otherwise it returns the first path in
+ `default_roles_path`.
+ """
+ if self.cache_dir:
+ path = Path(f"{self.cache_dir}/roles")
+ else:
+ path = Path(self.config.default_roles_path[0]).expanduser()
+ return path
+
+ def _install_galaxy_role(
+ self,
+ project_dir: Path,
+ role_name_check: int = 0,
+ *,
+ ignore_errors: bool = False,
+ ) -> None:
+ """Detect standalone galaxy role and installs it.
+
+ :param role_name_check: logic used to check the role name
+ 0: exit with error if name is not compliant (default)
+ 1: warn if name is not compliant
+ 2: bypass any name checking
+
+ :param ignore_errors: if True, bypass installing invalid roles.
+
+ Our implementation aims to match ansible-galaxy's behaviour for installing
+ roles from a tarball or scm. For example, ansible-galaxy will install a role
+ that has both galaxy.yml and meta/main.yml present but empty. A missing
+ galaxy.yml is also accepted, but a missing meta/main.yml is not.
+ """
+ yaml = None
+ galaxy_info = {}
+
+ for meta_main in META_MAIN:
+ meta_filename = Path(project_dir) / meta_main
+
+ if meta_filename.exists():
+ break
+ else:
+ if ignore_errors:
+ return
+
+ yaml = yaml_from_file(meta_filename)
+
+ if yaml and "galaxy_info" in yaml:
+ galaxy_info = yaml["galaxy_info"]
+
+ fqrn = _get_role_fqrn(galaxy_info, project_dir)
+
+ if role_name_check in [0, 1]:
+ if not re.match(r"[a-z0-9][a-z0-9_]+\.[a-z][a-z0-9_]+$", fqrn):
+ msg = MSG_INVALID_FQRL.format(fqrn)
+ if role_name_check == 1:
+ _logger.warning(msg)
+ else:
+ _logger.error(msg)
+ raise InvalidPrerequisiteError(msg)
+ elif "role_name" in galaxy_info:
+ # when 'role-name' is in skip_list, we stick to plain role names
+ role_namespace = _get_galaxy_role_ns(galaxy_info)
+ role_name = _get_galaxy_role_name(galaxy_info)
+ fqrn = f"{role_namespace}{role_name}"
+ else:
+ fqrn = Path(project_dir).absolute().name
+ path = self._get_roles_path()
+ path.mkdir(parents=True, exist_ok=True)
+ link_path = path / fqrn
+ # despite documentation stating that is_file() reports true for symlinks,
+ # it appears that is_dir() reports true instead, so we rely on exists().
+ target = Path(project_dir).absolute()
+ if not link_path.exists() or (
+ link_path.is_symlink() and link_path.readlink() != target
+ ):
+ # must call unlink before checking exists because a broken
+ # link reports as not existing and we want to repair it
+ link_path.unlink(missing_ok=True)
+ # https://github.com/python/cpython/issues/73843
+ link_path.symlink_to(str(target), target_is_directory=True)
+ _logger.info(
+ "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.",
+ link_path,
+ )
+
+ def _update_env(self, varname: str, value: list[str], default: str = "") -> None:
+ """Update colon based environment variable if needed.
+
+ New values are prepended to make sure they take precedence.
+ """
+ if not value:
+ return
+ orig_value = self.environ.get(varname, default)
+ if orig_value:
+ value = [*value, *orig_value.split(":")]
+ value_str = ":".join(value)
+ if value_str != self.environ.get(varname, ""):
+ self.environ[varname] = value_str
+ _logger.info("Set %s=%s", varname, value_str)
+
+
+def _get_role_fqrn(galaxy_infos: dict[str, Any], project_dir: Path) -> str:
+ """Compute role fqrn."""
+ role_namespace = _get_galaxy_role_ns(galaxy_infos)
+ role_name = _get_galaxy_role_name(galaxy_infos)
+
+ if len(role_name) == 0:
+ role_name = Path(project_dir).absolute().name
+ role_name = re.sub(r"(ansible-|ansible-role-)", "", role_name).split(
+ ".",
+ maxsplit=2,
+ )[-1]
+
+ return f"{role_namespace}{role_name}"
+
+
+def _get_galaxy_role_ns(galaxy_infos: dict[str, Any]) -> str:
+ """Compute role namespace from meta/main.yml, including trailing dot."""
+ role_namespace = galaxy_infos.get("namespace", "")
+ if len(role_namespace) == 0:
+ role_namespace = galaxy_infos.get("author", "")
+ if not isinstance(role_namespace, str):
+ msg = f"Role namespace must be string, not {role_namespace}"
+ raise AnsibleCompatError(msg)
+ # if there's a space in the namespace, it's likely an author name
+ # and not the galaxy login, so act as if there was no namespace
+ if not role_namespace or re.match(r"^\w+ \w+", role_namespace):
+ role_namespace = ""
+ else:
+ role_namespace = f"{role_namespace}."
+ return role_namespace
+
+
+def _get_galaxy_role_name(galaxy_infos: dict[str, Any]) -> str:
+ """Compute role name from meta/main.yml."""
+ result = galaxy_infos.get("role_name", "")
+ if not isinstance(result, str):
+ return ""
+ return result
+
+
+def search_galaxy_paths(search_dir: Path) -> list[str]:
+ """Search for galaxy paths (only one level deep)."""
+ galaxy_paths: list[str] = []
+ for file in [".", *os.listdir(search_dir)]:
+ # We ignore any folders that are not valid namespaces, just like
+ # ansible galaxy does at this moment.
+ if file != "." and not namespace_re.match(file):
+ continue
+ file_path = search_dir / file / "galaxy.yml"
+ if file_path.is_file():
+ galaxy_paths.append(str(file_path))
+ return galaxy_paths
+
+
+def is_url(name: str) -> bool:
+ """Return True if a dependency name looks like an URL."""
+ return bool(re.match("^git[+@]", name))
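+
+
+# Illustrative examples (not part of upstream):
+#   is_url("git+https://github.com/ansible-collections/ansible.posix.git")  # True
+#   is_url("git@github.com:ansible-collections/ansible.posix.git")  # True
+#   is_url("ansible.posix")  # False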
diff --git a/src/ansible_compat/schema.py b/src/ansible_compat/schema.py
new file mode 100644
index 0000000..2950e08
--- /dev/null
+++ b/src/ansible_compat/schema.py
@@ -0,0 +1,110 @@
+"""Utils for JSON Schema validation."""
+from __future__ import annotations
+
+import json
+from collections.abc import Mapping, Sequence
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+import jsonschema
+from jsonschema.validators import validator_for
+
+if TYPE_CHECKING:
+ from ansible_compat.types import JSON
+
+
+def to_path(schema_path: Sequence[str | int]) -> str:
+ """Flatten a path to a dot delimited string.
+
+ :param schema_path: The schema path
+ :returns: The dot delimited path
+ """
+ return ".".join(str(index) for index in schema_path)
+
+
+def json_path(absolute_path: Sequence[str | int]) -> str:
+ """Flatten a data path to a dot delimited string.
+
+ :param absolute_path: The path
+ :returns: The dot delimited string
+ """
+ path = "$"
+ for elem in absolute_path:
+ if isinstance(elem, int):
+ path += "[" + str(elem) + "]"
+ else:
+ path += "." + elem
+ return path
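+
+
+# Illustrative examples (not part of upstream):
+#   to_path(["properties", "name", "type"]) -> "properties.name.type"
+#   json_path(["users", 0, "name"]) -> "$.users[0].name"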
+
+
+@dataclass(order=True)
+class JsonSchemaError:
+ # pylint: disable=too-many-instance-attributes
+ """Data structure to hold a json schema validation error."""
+
+ # order of attributes below is important for sorting
+ schema_path: str
+ data_path: str
+ json_path: str
+ message: str
+ expected: bool | int | str
+ relative_schema: str
+ validator: str
+ found: str
+
+ def to_friendly(self) -> str:
+ """Provide a friendly explanation of the error.
+
+ :returns: The error message
+ """
+ return f"In '{self.data_path}': {self.message}."
+
+
+def validate(
+ schema: JSON,
+ data: JSON,
+) -> list[JsonSchemaError]:
+ """Validate some data against a JSON schema.
+
+ :param schema: the JSON schema to use for validation
+ :param data: The data to validate
+ :returns: Any errors encountered
+ """
+ errors: list[JsonSchemaError] = []
+
+ if isinstance(schema, str):
+ schema = json.loads(schema)
+ try:
+ if not isinstance(schema, Mapping):
+ msg = "Invalid schema, must be a mapping"
+ raise jsonschema.SchemaError(msg) # noqa: TRY301
+ validator = validator_for(schema)
+ validator.check_schema(schema)
+ except jsonschema.SchemaError as exc:
+ error = JsonSchemaError(
+ message=str(exc),
+ data_path="schema sanity check",
+ json_path="",
+ schema_path="",
+ relative_schema="",
+ expected="",
+ validator="",
+ found="",
+ )
+ errors.append(error)
+ return errors
+
+ for validation_error in validator(schema).iter_errors(data):
+ if isinstance(validation_error, jsonschema.ValidationError):
+ error = JsonSchemaError(
+ message=validation_error.message,
+ data_path=to_path(validation_error.absolute_path),
+ json_path=json_path(validation_error.absolute_path),
+ schema_path=to_path(validation_error.schema_path),
+ relative_schema=validation_error.schema,
+ expected=validation_error.validator_value,
+ validator=str(validation_error.validator),
+ found=str(validation_error.instance),
+ )
+ errors.append(error)
+ return sorted(errors)
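+
+
+# Usage sketch (illustrative, not part of upstream): validate a document
+# against an inline schema and print friendly messages:
+#
+#   schema = {"type": "object", "required": ["name"]}
+#   for error in validate(schema=schema, data={}):
+#       print(error.to_friendly())  # In '': 'name' is a required property.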
diff --git a/src/ansible_compat/types.py b/src/ansible_compat/types.py
new file mode 100644
index 0000000..4514606
--- /dev/null
+++ b/src/ansible_compat/types.py
@@ -0,0 +1,23 @@
+"""Custom types."""
+from __future__ import annotations
+
+from collections.abc import Mapping, Sequence
+from typing import Union
+
+try: # py39 does not have TypeAlias
+ from typing_extensions import TypeAlias
+except ImportError:
+ from typing import TypeAlias # type: ignore[no-redef,attr-defined]
+
+JSON: TypeAlias = Union[dict[str, "JSON"], list["JSON"], str, int, float, bool, None]
+JSON_ro: TypeAlias = Union[
+ Mapping[str, "JSON_ro"],
+ Sequence["JSON_ro"],
+ str,
+ int,
+ float,
+ bool,
+ None,
+]
+
+__all__ = ["JSON", "JSON_ro"]