diff options
Diffstat (limited to 'src/ansible_compat')
-rw-r--r-- | src/ansible_compat/__init__.py | 9 | ||||
-rw-r--r-- | src/ansible_compat/config.py | 465 | ||||
-rw-r--r-- | src/ansible_compat/constants.py | 42 | ||||
-rw-r--r-- | src/ansible_compat/errors.py | 57 | ||||
-rw-r--r-- | src/ansible_compat/loaders.py | 30 | ||||
-rw-r--r-- | src/ansible_compat/ports.py | 4 | ||||
-rw-r--r-- | src/ansible_compat/prerun.py | 21 | ||||
-rw-r--r-- | src/ansible_compat/py.typed | 0 | ||||
-rw-r--r-- | src/ansible_compat/runtime.py | 961 | ||||
-rw-r--r-- | src/ansible_compat/schema.py | 110 | ||||
-rw-r--r-- | src/ansible_compat/types.py | 23 |
11 files changed, 1722 insertions, 0 deletions
diff --git a/src/ansible_compat/__init__.py b/src/ansible_compat/__init__.py new file mode 100644 index 0000000..b23c8ca --- /dev/null +++ b/src/ansible_compat/__init__.py @@ -0,0 +1,9 @@ +"""ansible_compat package.""" +from importlib.metadata import PackageNotFoundError, version + +try: + __version__ = version("ansible-compat") +except PackageNotFoundError: # pragma: no cover + __version__ = "0.1.dev1" + +__all__ = ["__version__"] diff --git a/src/ansible_compat/config.py b/src/ansible_compat/config.py new file mode 100644 index 0000000..a0b41b7 --- /dev/null +++ b/src/ansible_compat/config.py @@ -0,0 +1,465 @@ +"""Store configuration options as a singleton.""" +from __future__ import annotations + +import ast +import copy +import os +import re +import subprocess +from collections import UserDict +from typing import Literal + +from packaging.version import Version + +from ansible_compat.constants import ANSIBLE_MIN_VERSION +from ansible_compat.errors import InvalidPrerequisiteError, MissingAnsibleError +from ansible_compat.ports import cache + + +# do not use lru_cache here, as environment can change between calls +def ansible_collections_path() -> str: + """Return collection path variable for current version of Ansible.""" + for env_var in [ + "ANSIBLE_COLLECTIONS_PATH", + "ANSIBLE_COLLECTIONS_PATHS", + ]: + if env_var in os.environ: + return env_var + return "ANSIBLE_COLLECTIONS_PATH" + + +def parse_ansible_version(stdout: str) -> Version: + """Parse output of 'ansible --version'.""" + # Ansible can produce extra output before displaying version in debug mode. + + # ansible-core 2.11+: 'ansible [core 2.11.3]' + match = re.search( + r"^ansible \[(?:core|base) (?P<version>[^\]]+)\]", + stdout, + re.MULTILINE, + ) + if match: + return Version(match.group("version")) + msg = f"Unable to parse ansible cli version: {stdout}\nKeep in mind that only {ANSIBLE_MIN_VERSION } or newer are supported." + raise InvalidPrerequisiteError(msg) + + +@cache +def ansible_version(version: str = "") -> Version: + """Return current Version object for Ansible. + + If version is not mentioned, it returns current version as detected. + When version argument is mentioned, it return converts the version string + to Version object in order to make it usable in comparisons. + """ + if version: + return Version(version) + + proc = subprocess.run( + ["ansible", "--version"], # noqa: S603 + text=True, + check=False, + capture_output=True, + ) + if proc.returncode != 0: + raise MissingAnsibleError(proc=proc) + + return parse_ansible_version(proc.stdout) + + +class AnsibleConfig(UserDict[str, object]): # pylint: disable=too-many-ancestors + """Interface to query Ansible configuration. + + This should allow user to access everything provided by `ansible-config dump` without having to parse the data himself. + """ + + _aliases = { + "COLLECTIONS_PATH": "COLLECTIONS_PATHS", # 2.9 -> 2.10 + } + # Expose some attributes to enable auto-complete in editors, based on + # https://docs.ansible.com/ansible/latest/reference_appendices/config.html + action_warnings: bool = True + agnostic_become_prompt: bool = True + allow_world_readable_tmpfiles: bool = False + ansible_connection_path: str | None = None + ansible_cow_acceptlist: list[str] + ansible_cow_path: str | None = None + ansible_cow_selection: str = "default" + ansible_force_color: bool = False + ansible_nocolor: bool = False + ansible_nocows: bool = False + ansible_pipelining: bool = False + any_errors_fatal: bool = False + become_allow_same_user: bool = False + become_plugin_path: list[str] = [ + "~/.ansible/plugins/become", + "/usr/share/ansible/plugins/become", + ] + cache_plugin: str = "memory" + cache_plugin_connection: str | None = None + cache_plugin_prefix: str = "ansible_facts" + cache_plugin_timeout: int = 86400 + callable_accept_list: list[str] = [] + callbacks_enabled: list[str] = [] + collections_on_ansible_version_mismatch: Literal["warning", "ignore"] = "warning" + collections_paths: list[str] = [ + "~/.ansible/collections", + "/usr/share/ansible/collections", + ] + collections_scan_sys_path: bool = True + color_changed: str = "yellow" + color_console_prompt: str = "white" + color_debug: str = "dark gray" + color_deprecate: str = "purple" + color_diff_add: str = "green" + color_diff_lines: str = "cyan" + color_diff_remove: str = "red" + color_error: str = "red" + color_highlight: str = "white" + color_ok: str = "green" + color_skip: str = "cyan" + color_unreachable: str = "bright red" + color_verbose: str = "blue" + color_warn: str = "bright purple" + command_warnings: bool = False + conditional_bare_vars: bool = False + connection_facts_modules: dict[str, str] + controller_python_warning: bool = True + coverage_remote_output: str | None + coverage_remote_paths: list[str] + default_action_plugin_path: list[str] = [ + "~/.ansible/plugins/action", + "/usr/share/ansible/plugins/action", + ] + default_allow_unsafe_lookups: bool = False + default_ask_pass: bool = False + default_ask_vault_pass: bool = False + default_become: bool = False + default_become_ask_pass: bool = False + default_become_exe: str | None = None + default_become_flags: str + default_become_method: str = "sudo" + default_become_user: str = "root" + default_cache_plugin_path: list[str] = [ + "~/.ansible/plugins/cache", + "/usr/share/ansible/plugins/cache", + ] + default_callback_plugin_path: list[str] = [ + "~/.ansible/plugins/callback", + "/usr/share/ansible/plugins/callback", + ] + default_cliconf_plugin_path: list[str] = [ + "~/.ansible/plugins/cliconf", + "/usr/share/ansible/plugins/cliconf", + ] + default_connection_plugin_path: list[str] = [ + "~/.ansible/plugins/connection", + "/usr/share/ansible/plugins/connection", + ] + default_debug: bool = False + default_executable: str = "/bin/sh" + default_fact_path: str | None = None + default_filter_plugin_path: list[str] = [ + "~/.ansible/plugins/filter", + "/usr/share/ansible/plugins/filter", + ] + default_force_handlers: bool = False + default_forks: int = 5 + default_gathering: Literal["smart", "explicit", "implicit"] = "smart" + default_gather_subset: list[str] = ["all"] + default_gather_timeout: int = 10 + default_handler_includes_static: bool = False + default_hash_behaviour: str = "replace" + default_host_list: list[str] = ["/etc/ansible/hosts"] + default_httpapi_plugin_path: list[str] = [ + "~/.ansible/plugins/httpapi", + "/usr/share/ansible/plugins/httpapi", + ] + default_internal_poll_interval: float = 0.001 + default_inventory_plugin_path: list[str] = [ + "~/.ansible/plugins/inventory", + "/usr/share/ansible/plugins/inventory", + ] + default_jinja2_extensions: list[str] = [] + default_jinja2_native: bool = False + default_keep_remote_files: bool = False + default_libvirt_lxc_noseclabel: bool = False + default_load_callback_plugins: bool = False + default_local_tmp: str = "~/.ansible/tmp" + default_log_filter: list[str] = [] + default_log_path: str | None = None + default_lookup_lugin_path: list[str] = [ + "~/.ansible/plugins/lookup", + "/usr/share/ansible/plugins/lookup", + ] + default_managed_str: str = "Ansible managed" + default_module_args: str + default_module_compression: str = "ZIP_DEFLATED" + default_module_name: str = "command" + default_module_path: list[str] = [ + "~/.ansible/plugins/modules", + "/usr/share/ansible/plugins/modules", + ] + default_module_utils_path: list[str] = [ + "~/.ansible/plugins/module_utils", + "/usr/share/ansible/plugins/module_utils", + ] + default_netconf_plugin_path: list[str] = [ + "~/.ansible/plugins/netconf", + "/usr/share/ansible/plugins/netconf", + ] + default_no_log: bool = False + default_no_target_syslog: bool = False + default_null_representation: str | None = None + default_poll_interval: int = 15 + default_private_key_file: str | None = None + default_private_role_vars: bool = False + default_remote_port: str | None = None + default_remote_user: str | None = None + # https://docs.ansible.com/ansible/latest/reference_appendices/config.html#collections-paths + default_collections_path: list[str] = [ + "~/.ansible/collections", + "/usr/share/ansible/collections", + ] + default_roles_path: list[str] = [ + "~/.ansible/roles", + "/usr/share/ansible/roles", + "/etc/ansible/roles", + ] + default_selinux_special_fs: list[str] = [ + "fuse", + "nfs", + "vboxsf", + "ramfs", + "9p", + "vfat", + ] + default_stdout_callback: str = "default" + default_strategy: str = "linear" + default_strategy_plugin_path: list[str] = [ + "~/.ansible/plugins/strategy", + "/usr/share/ansible/plugins/strategy", + ] + default_su: bool = False + default_syslog_facility: str = "LOG_USER" + default_task_includes_static: bool = False + default_terminal_plugin_path: list[str] = [ + "~/.ansible/plugins/terminal", + "/usr/share/ansible/plugins/terminal", + ] + default_test_plugin_path: list[str] = [ + "~/.ansible/plugins/test", + "/usr/share/ansible/plugins/test", + ] + default_timeout: int = 10 + default_transport: str = "smart" + default_undefined_var_behavior: bool = True + default_vars_plugin_path: list[str] = [ + "~/.ansible/plugins/vars", + "/usr/share/ansible/plugins/vars", + ] + default_vault_encrypt_identity: str | None = None + default_vault_identity: str = "default" + default_vault_identity_list: list[str] = [] + default_vault_id_match: bool = False + default_vault_password_file: str | None = None + default_verbosity: int = 0 + deprecation_warnings: bool = False + devel_warning: bool = True + diff_always: bool = False + diff_context: int = 3 + display_args_to_stdout: bool = False + display_skipped_hosts: bool = True + docsite_root_url: str = "https://docs.ansible.com/ansible/" + doc_fragment_plugin_path: list[str] = [ + "~/.ansible/plugins/doc_fragments", + "/usr/share/ansible/plugins/doc_fragments", + ] + duplicate_yaml_dict_key: Literal["warn", "error", "ignore"] = "warn" + enable_task_debugger: bool = False + error_on_missing_handler: bool = True + facts_modules: list[str] = ["smart"] + galaxy_cache_dir: str = "~/.ansible/galaxy_cache" + galaxy_display_progress: str | None = None + galaxy_ignore_certs: bool = False + galaxy_role_skeleton: str | None = None + galaxy_role_skeleton_ignore: list[str] = ["^.git$", "^.*/.git_keep$"] + galaxy_server: str = "https://galaxy.ansible.com" + galaxy_server_list: str | None = None + galaxy_token_path: str = "~/.ansible/galaxy_token" + host_key_checking: bool = True + host_pattern_mismatch: Literal["warning", "error", "ignore"] = "warning" + inject_facts_as_vars: bool = True + interpreter_python: str = "auto_legacy" + interpreter_python_distro_map: dict[str, str] + interpreter_python_fallback: list[str] + invalid_task_attribute_failed: bool = True + inventory_any_unparsed_is_failed: bool = False + inventory_cache_enabled: bool = False + inventory_cache_plugin: str | None = None + inventory_cache_plugin_connection: str | None = None + inventory_cache_plugin_prefix: str = "ansible_facts" + inventory_cache_timeout: int = 3600 + inventory_enabled: list[str] = [ + "host_list", + "script", + "auto", + "yaml", + "ini", + "toml", + ] + inventory_export: bool = False + inventory_ignore_exts: str + inventory_ignore_patterns: list[str] = [] + inventory_unparsed_is_failed: bool = False + localhost_warning: bool = True + max_file_size_for_diff: int = 104448 + module_ignore_exts: str + netconf_ssh_config: str | None = None + network_group_modules: list[str] = [ + "eos", + "nxos", + "ios", + "iosxr", + "junos", + "enos", + "ce", + "vyos", + "sros", + "dellos9", + "dellos10", + "dellos6", + "asa", + "aruba", + "aireos", + "bigip", + "ironware", + "onyx", + "netconf", + "exos", + "voss", + "slxos", + ] + old_plugin_cache_clearing: bool = False + paramiko_host_key_auto_add: bool = False + paramiko_look_for_keys: bool = True + persistent_command_timeout: int = 30 + persistent_connect_retry_timeout: int = 15 + persistent_connect_timeout: int = 30 + persistent_control_path_dir: str = "~/.ansible/pc" + playbook_dir: str | None + playbook_vars_root: Literal["top", "bottom", "all"] = "top" + plugin_filters_cfg: str | None = None + python_module_rlimit_nofile: int = 0 + retry_files_enabled: bool = False + retry_files_save_path: str | None = None + run_vars_plugins: str = "demand" + show_custom_stats: bool = False + string_conversion_action: Literal["warn", "error", "ignore"] = "warn" + string_type_filters: list[str] = [ + "string", + "to_json", + "to_nice_json", + "to_yaml", + "to_nice_yaml", + "ppretty", + "json", + ] + system_warnings: bool = True + tags_run: list[str] = [] + tags_skip: list[str] = [] + task_debugger_ignore_errors: bool = True + task_timeout: int = 0 + transform_invalid_group_chars: Literal[ + "always", + "never", + "ignore", + "silently", + ] = "never" + use_persistent_connections: bool = False + variable_plugins_enabled: list[str] = ["host_group_vars"] + variable_precedence: list[str] = [ + "all_inventory", + "groups_inventory", + "all_plugins_inventory", + "all_plugins_play", + "groups_plugins_inventory", + "groups_plugins_play", + ] + verbose_to_stderr: bool = False + win_async_startup_timeout: int = 5 + worker_shutdown_poll_count: int = 0 + worker_shutdown_poll_delay: float = 0.1 + yaml_filename_extensions: list[str] = [".yml", ".yaml", ".json"] + + def __init__( + self, + config_dump: str | None = None, + data: dict[str, object] | None = None, + ) -> None: + """Load config dictionary.""" + super().__init__() + + if data: + self.data = copy.deepcopy(data) + return + + if not config_dump: + env = os.environ.copy() + # Avoid possible ANSI garbage + env["ANSIBLE_FORCE_COLOR"] = "0" + config_dump = subprocess.check_output( + ["ansible-config", "dump"], # noqa: S603 + universal_newlines=True, + env=env, + ) + + for match in re.finditer( + r"^(?P<key>[A-Za-z0-9_]+).* = (?P<value>.*)$", + config_dump, + re.MULTILINE, + ): + key = match.groupdict()["key"] + value = match.groupdict()["value"] + try: + self[key] = ast.literal_eval(value) + except (NameError, SyntaxError, ValueError): + self[key] = value + + def __getattribute__(self, attr_name: str) -> object: + """Allow access of config options as attributes.""" + _dict = super().__dict__ # pylint: disable=no-member + if attr_name in _dict: + return _dict[attr_name] + + data = super().__getattribute__("data") + if attr_name == "data": # pragma: no cover + return data + + name = attr_name.upper() + if name in data: + return data[name] + if name in AnsibleConfig._aliases: + return data[AnsibleConfig._aliases[name]] + + return super().__getattribute__(attr_name) + + def __getitem__(self, name: str) -> object: + """Allow access to config options using indexing.""" + return super().__getitem__(name.upper()) + + def __copy__(self) -> AnsibleConfig: + """Allow users to run copy on Config.""" + return AnsibleConfig(data=self.data) + + def __deepcopy__(self, memo: object) -> AnsibleConfig: + """Allow users to run deeepcopy on Config.""" + return AnsibleConfig(data=self.data) + + +__all__ = [ + "ansible_collections_path", + "parse_ansible_version", + "ansible_version", + "AnsibleConfig", +] diff --git a/src/ansible_compat/constants.py b/src/ansible_compat/constants.py new file mode 100644 index 0000000..f3d7866 --- /dev/null +++ b/src/ansible_compat/constants.py @@ -0,0 +1,42 @@ +"""Constants used by ansible_compat.""" + +from pathlib import Path + +META_MAIN = (Path("meta") / Path("main.yml"), Path("meta") / Path("main.yaml")) +REQUIREMENT_LOCATIONS = [ + "requirements.yml", + "roles/requirements.yml", + "collections/requirements.yml", + # These is more of less the official way to store test requirements in collections so far, comments shows number of repos using this reported by https://sourcegraph.com/ at the time of writing + "tests/requirements.yml", # 170 + "tests/integration/requirements.yml", # 3 + "tests/unit/requirements.yml", # 1 +] + +# Minimal version of Ansible we support for runtime +ANSIBLE_MIN_VERSION = "2.12" + +# Based on https://docs.ansible.com/ansible/latest/reference_appendices/config.html +ANSIBLE_DEFAULT_ROLES_PATH = ( + "~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles" +) + +INVALID_CONFIG_RC = 2 +ANSIBLE_MISSING_RC = 4 +INVALID_PREREQUISITES_RC = 10 + +MSG_INVALID_FQRL = """\ +Computed fully qualified role name of {0} does not follow current galaxy requirements. +Please edit meta/main.yml and assure we can correctly determine full role name: + +galaxy_info: +role_name: my_name # if absent directory name hosting role is used instead +namespace: my_galaxy_namespace # if absent, author is used instead + +Namespace: https://galaxy.ansible.com/docs/contributing/namespaces.html#galaxy-namespace-limitations +Role: https://galaxy.ansible.com/docs/contributing/creating_role.html#role-names + +As an alternative, you can add 'role-name' to either skip_list or warn_list. +""" + +RC_ANSIBLE_OPTIONS_ERROR = 5 diff --git a/src/ansible_compat/errors.py b/src/ansible_compat/errors.py new file mode 100644 index 0000000..6369412 --- /dev/null +++ b/src/ansible_compat/errors.py @@ -0,0 +1,57 @@ +"""Module to deal with errors.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from ansible_compat.constants import ANSIBLE_MISSING_RC, INVALID_PREREQUISITES_RC + +if TYPE_CHECKING: + from subprocess import CompletedProcess + + +class AnsibleCompatError(RuntimeError): + """Generic error originating from ansible_compat library.""" + + code = 1 # generic error + + def __init__( + self, + message: str | None = None, + proc: CompletedProcess[Any] | None = None, + ) -> None: + """Construct generic library exception.""" + super().__init__(message) + self.proc = proc + + +class AnsibleCommandError(RuntimeError): + """Exception running an Ansible command.""" + + def __init__(self, proc: CompletedProcess[Any]) -> None: + """Construct an exception given a completed process.""" + message = ( + f"Got {proc.returncode} exit code while running: {' '.join(proc.args)}" + ) + super().__init__(message) + self.proc = proc + + +class MissingAnsibleError(AnsibleCompatError): + """Reports a missing or broken Ansible installation.""" + + code = ANSIBLE_MISSING_RC + + def __init__( + self, + message: str | None = "Unable to find a working copy of ansible executable.", + proc: CompletedProcess[Any] | None = None, + ) -> None: + """.""" + super().__init__(message) + self.proc = proc + + +class InvalidPrerequisiteError(AnsibleCompatError): + """Reports a missing requirement.""" + + code = INVALID_PREREQUISITES_RC diff --git a/src/ansible_compat/loaders.py b/src/ansible_compat/loaders.py new file mode 100644 index 0000000..d2ae080 --- /dev/null +++ b/src/ansible_compat/loaders.py @@ -0,0 +1,30 @@ +"""Utilities for loading various files.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import yaml + +from ansible_compat.errors import InvalidPrerequisiteError + +if TYPE_CHECKING: + from pathlib import Path + + +def yaml_from_file(path: Path) -> Any: # noqa: ANN401 + """Return a loaded YAML file.""" + with path.open(encoding="utf-8") as content: + return yaml.load(content, Loader=yaml.SafeLoader) + + +def colpath_from_path(path: Path) -> str | None: + """Return a FQCN from a path.""" + galaxy_file = path / "galaxy.yml" + if galaxy_file.exists(): + galaxy = yaml_from_file(galaxy_file) + for k in ("namespace", "name"): + if k not in galaxy: + msg = f"{galaxy_file} is missing the following mandatory field {k}" + raise InvalidPrerequisiteError(msg) + return f"{galaxy['namespace']}/{galaxy['name']}" + return None diff --git a/src/ansible_compat/ports.py b/src/ansible_compat/ports.py new file mode 100644 index 0000000..9c46ae6 --- /dev/null +++ b/src/ansible_compat/ports.py @@ -0,0 +1,4 @@ +"""Portability helpers.""" +from functools import cache, cached_property + +__all__ = ["cache", "cached_property"] diff --git a/src/ansible_compat/prerun.py b/src/ansible_compat/prerun.py new file mode 100644 index 0000000..6dfa44f --- /dev/null +++ b/src/ansible_compat/prerun.py @@ -0,0 +1,21 @@ +"""Utilities for configuring ansible runtime environment.""" +import hashlib +import os +from pathlib import Path + + +def get_cache_dir(project_dir: Path) -> Path: + """Compute cache directory to be used based on project path.""" + # we only use the basename instead of the full path in order to ensure that + # we would use the same key regardless the location of the user home + # directory or where the project is clones (as long the project folder uses + # the same name). + basename = project_dir.resolve().name.encode(encoding="utf-8") + # 6 chars of entropy should be enough + cache_key = hashlib.sha256(basename).hexdigest()[:6] + cache_dir = ( + Path(os.getenv("XDG_CACHE_HOME", "~/.cache")).expanduser() + / "ansible-compat" + / cache_key + ) + return cache_dir diff --git a/src/ansible_compat/py.typed b/src/ansible_compat/py.typed new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/ansible_compat/py.typed diff --git a/src/ansible_compat/runtime.py b/src/ansible_compat/runtime.py new file mode 100644 index 0000000..ad81132 --- /dev/null +++ b/src/ansible_compat/runtime.py @@ -0,0 +1,961 @@ +"""Ansible runtime environment manager.""" +from __future__ import annotations + +import contextlib +import importlib +import json +import logging +import os +import re +import shutil +import subprocess +import sys +import warnings +from collections import OrderedDict +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, no_type_check + +import subprocess_tee +from packaging.version import Version + +from ansible_compat.config import ( + AnsibleConfig, + ansible_collections_path, + ansible_version, + parse_ansible_version, +) +from ansible_compat.constants import ( + META_MAIN, + MSG_INVALID_FQRL, + RC_ANSIBLE_OPTIONS_ERROR, + REQUIREMENT_LOCATIONS, +) +from ansible_compat.errors import ( + AnsibleCommandError, + AnsibleCompatError, + InvalidPrerequisiteError, + MissingAnsibleError, +) +from ansible_compat.loaders import colpath_from_path, yaml_from_file +from ansible_compat.prerun import get_cache_dir + +if TYPE_CHECKING: + # https://github.com/PyCQA/pylint/issues/3240 + # pylint: disable=unsubscriptable-object + CompletedProcess = subprocess.CompletedProcess[Any] +else: + CompletedProcess = subprocess.CompletedProcess + + +_logger = logging.getLogger(__name__) +# regex to extract the first version from a collection range specifier +version_re = re.compile(":[>=<]*([^,]*)") +namespace_re = re.compile("^[a-z][a-z0-9_]+$") + + +class AnsibleWarning(Warning): + """Warnings related to Ansible runtime.""" + + +@dataclass +class Collection: + """Container for Ansible collection information.""" + + name: str + version: str + path: Path + + +class CollectionVersion(Version): + """Collection version.""" + + def __init__(self, version: str) -> None: + """Initialize collection version.""" + # As packaging Version class does not support wildcard, we convert it + # to "0", as this being the smallest version possible. + if version == "*": + version = "0" + super().__init__(version) + + +@dataclass +class Plugins: # pylint: disable=too-many-instance-attributes + """Dataclass to access installed Ansible plugins, uses ansible-doc to retrieve them.""" + + runtime: Runtime + become: dict[str, str] = field(init=False) + cache: dict[str, str] = field(init=False) + callback: dict[str, str] = field(init=False) + cliconf: dict[str, str] = field(init=False) + connection: dict[str, str] = field(init=False) + httpapi: dict[str, str] = field(init=False) + inventory: dict[str, str] = field(init=False) + lookup: dict[str, str] = field(init=False) + netconf: dict[str, str] = field(init=False) + shell: dict[str, str] = field(init=False) + vars: dict[str, str] = field(init=False) # noqa: A003 + module: dict[str, str] = field(init=False) + strategy: dict[str, str] = field(init=False) + test: dict[str, str] = field(init=False) + filter: dict[str, str] = field(init=False) # noqa: A003 + role: dict[str, str] = field(init=False) + keyword: dict[str, str] = field(init=False) + + @no_type_check + def __getattribute__(self, attr: str): # noqa: ANN204 + """Get attribute.""" + if attr in { + "become", + "cache", + "callback", + "cliconf", + "connection", + "httpapi", + "inventory", + "lookup", + "netconf", + "shell", + "vars", + "module", + "strategy", + "test", + "filter", + "role", + "keyword", + }: + try: + result = super().__getattribute__(attr) + except AttributeError as exc: + if ansible_version() < Version("2.14") and attr in {"filter", "test"}: + msg = "Ansible version below 2.14 does not support retrieving filter and test plugins." + raise RuntimeError(msg) from exc + proc = self.runtime.run( + ["ansible-doc", "--json", "-l", "-t", attr], + ) + data = json.loads(proc.stdout) + if not isinstance(data, dict): # pragma: no cover + msg = "Unexpected output from ansible-doc" + raise AnsibleCompatError(msg) from exc + result = data + else: + result = super().__getattribute__(attr) + + return result + + +# pylint: disable=too-many-instance-attributes +class Runtime: + """Ansible Runtime manager.""" + + _version: Version | None = None + collections: OrderedDict[str, Collection] = OrderedDict() + cache_dir: Path | None = None + # Used to track if we have already initialized the Ansible runtime as attempts + # to do it multiple tilmes will cause runtime warnings from within ansible-core + initialized: bool = False + plugins: Plugins + + def __init__( + self, + project_dir: Path | None = None, + *, + isolated: bool = False, + min_required_version: str | None = None, + require_module: bool = False, + max_retries: int = 0, + environ: dict[str, str] | None = None, + verbosity: int = 0, + ) -> None: + """Initialize Ansible runtime environment. + + :param project_dir: The directory containing the Ansible project. If + not mentioned it will be guessed from the current + working directory. + :param isolated: Assure that installation of collections or roles + does not affect Ansible installation, an unique cache + directory being used instead. + :param min_required_version: Minimal version of Ansible required. If + not found, a :class:`RuntimeError` + exception is raised. + :param require_module: If set, instantiation will fail if Ansible + Python module is missing or is not matching + the same version as the Ansible command line. + That is useful for consumers that expect to + also perform Python imports from Ansible. + :param max_retries: Number of times it should retry network operations. + Default is 0, no retries. + :param environ: Environment dictionary to use, if undefined + ``os.environ`` will be copied and used. + :param verbosity: Verbosity level to use. + """ + self.project_dir = project_dir or Path.cwd() + self.isolated = isolated + self.max_retries = max_retries + self.environ = environ or os.environ.copy() + self.plugins = Plugins(runtime=self) + self.verbosity = verbosity + + self.initialize_logger(level=self.verbosity) + + # Reduce noise from paramiko, unless user already defined PYTHONWARNINGS + # paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated + # https://github.com/paramiko/paramiko/issues/2038 + # As CryptographyDeprecationWarning is not a builtin, we cannot use + # PYTHONWARNINGS to ignore it using category but we can use message. + # https://stackoverflow.com/q/68251969/99834 + if "PYTHONWARNINGS" not in self.environ: # pragma: no cover + self.environ["PYTHONWARNINGS"] = "ignore:Blowfish has been deprecated" + + if isolated: + self.cache_dir = get_cache_dir(self.project_dir) + self.config = AnsibleConfig() + + # Add the sys.path to the collection paths if not isolated + self._add_sys_path_to_collection_paths() + + if not self.version_in_range(lower=min_required_version): + msg = f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer." + raise RuntimeError(msg) + if require_module: + self._ensure_module_available() + + # pylint: disable=import-outside-toplevel + from ansible.utils.display import Display + + # pylint: disable=unused-argument + def warning( + self: Display, # noqa: ARG001 + msg: str, + *, + formatted: bool = False, # noqa: ARG001 + ) -> None: + """Override ansible.utils.display.Display.warning to avoid printing warnings.""" + warnings.warn( + message=msg, + category=AnsibleWarning, + stacklevel=2, + source={"msg": msg}, + ) + + # Monkey patch ansible warning in order to use warnings module. + Display.warning = warning + + def initialize_logger(self, level: int = 0) -> None: + """Set up the global logging level based on the verbosity number.""" + verbosity_map = { + -2: logging.CRITICAL, + -1: logging.ERROR, + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + # Unknown logging level is treated as DEBUG + logging_level = verbosity_map.get(level, logging.DEBUG) + _logger.setLevel(logging_level) + # Use module-level _logger instance to validate it + _logger.debug("Logging initialized to level %s", logging_level) + + def _add_sys_path_to_collection_paths(self) -> None: + """Add the sys.path to the collection paths.""" + if self.config.collections_scan_sys_path: + for path in sys.path: + if ( + path not in self.config.collections_paths + and (Path(path) / "ansible_collections").is_dir() + ): + self.config.collections_paths.append( # pylint: disable=E1101 + path, + ) + + def load_collections(self) -> None: + """Load collection data.""" + self.collections = OrderedDict() + no_collections_msg = "None of the provided paths were usable" + + proc = self.run(["ansible-galaxy", "collection", "list", "--format=json"]) + if proc.returncode == RC_ANSIBLE_OPTIONS_ERROR and ( + no_collections_msg in proc.stdout or no_collections_msg in proc.stderr + ): + _logger.debug("Ansible reported no installed collections at all.") + return + if proc.returncode != 0: + _logger.error(proc) + msg = f"Unable to list collections: {proc}" + raise RuntimeError(msg) + data = json.loads(proc.stdout) + if not isinstance(data, dict): + msg = f"Unexpected collection data, {data}" + raise TypeError(msg) + for path in data: + for collection, collection_info in data[path].items(): + if not isinstance(collection, str): + msg = f"Unexpected collection data, {collection}" + raise TypeError(msg) + if not isinstance(collection_info, dict): + msg = f"Unexpected collection data, {collection_info}" + raise TypeError(msg) + + self.collections[collection] = Collection( + name=collection, + version=collection_info["version"], + path=path, + ) + + def _ensure_module_available(self) -> None: + """Assure that Ansible Python module is installed and matching CLI version.""" + ansible_release_module = None + with contextlib.suppress(ModuleNotFoundError, ImportError): + ansible_release_module = importlib.import_module("ansible.release") + + if ansible_release_module is None: + msg = "Unable to find Ansible python module." + raise RuntimeError(msg) + + ansible_module_version = Version( + ansible_release_module.__version__, + ) + if ansible_module_version != self.version: + msg = f"Ansible CLI ({self.version}) and python module ({ansible_module_version}) versions do not match. This indicates a broken execution environment." + raise RuntimeError(msg) + + # For ansible 2.15+ we need to initialize the plugin loader + # https://github.com/ansible/ansible-lint/issues/2945 + if not Runtime.initialized: + col_path = [f"{self.cache_dir}/collections"] + if self.version >= Version("2.15.0.dev0"): + # pylint: disable=import-outside-toplevel,no-name-in-module + from ansible.plugins.loader import init_plugin_loader + + init_plugin_loader(col_path) + else: + # noinspection PyProtectedMember + from ansible.utils.collection_loader._collection_finder import ( # pylint: disable=import-outside-toplevel + _AnsibleCollectionFinder, + ) + + # noinspection PyProtectedMember + # pylint: disable=protected-access + col_path += self.config.collections_paths + col_path += os.path.dirname( # noqa: PTH120 + os.environ.get(ansible_collections_path(), "."), + ).split(":") + _AnsibleCollectionFinder( # noqa: SLF001 + paths=col_path, + )._install() # pylint: disable=protected-access + Runtime.initialized = True + + def clean(self) -> None: + """Remove content of cache_dir.""" + if self.cache_dir: + shutil.rmtree(self.cache_dir, ignore_errors=True) + + def run( # ruff: disable=PLR0913 + self, + args: str | list[str], + *, + retry: bool = False, + tee: bool = False, + env: dict[str, str] | None = None, + cwd: Path | None = None, + ) -> CompletedProcess: + """Execute a command inside an Ansible environment. + + :param retry: Retry network operations on failures. + :param tee: Also pass captured stdout/stderr to system while running. + """ + if tee: + run_func: Callable[..., CompletedProcess] = subprocess_tee.run + else: + run_func = subprocess.run + env = self.environ if env is None else env.copy() + # Presence of ansible debug variable or config option will prevent us + # from parsing its JSON output due to extra debug messages on stdout. + env["ANSIBLE_DEBUG"] = "0" + + # https://github.com/ansible/ansible-lint/issues/3522 + env["ANSIBLE_VERBOSE_TO_STDERR"] = "True" + + for _ in range(self.max_retries + 1 if retry else 1): + result = run_func( + args, + universal_newlines=True, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd=str(cwd) if cwd else None, + ) + if result.returncode == 0: + break + _logger.debug("Environment: %s", env) + if retry: + _logger.warning( + "Retrying execution failure %s of: %s", + result.returncode, + " ".join(args), + ) + return result + + @property + def version(self) -> Version: + """Return current Version object for Ansible. + + If version is not mentioned, it returns current version as detected. + When version argument is mentioned, it return converts the version string + to Version object in order to make it usable in comparisons. + """ + if self._version: + return self._version + + proc = self.run(["ansible", "--version"]) + if proc.returncode == 0: + self._version = parse_ansible_version(proc.stdout) + return self._version + + msg = "Unable to find a working copy of ansible executable." + raise MissingAnsibleError(msg, proc=proc) + + def version_in_range( + self, + lower: str | None = None, + upper: str | None = None, + ) -> bool: + """Check if Ansible version is inside a required range. + + The lower limit is inclusive and the upper one exclusive. + """ + if lower and self.version < Version(lower): + return False + if upper and self.version >= Version(upper): + return False + return True + + def install_collection( + self, + collection: str | Path, + *, + destination: Path | None = None, + force: bool = False, + ) -> None: + """Install an Ansible collection. + + Can accept arguments like: + 'foo.bar:>=1.2.3' + 'git+https://github.com/ansible-collections/ansible.posix.git,main' + """ + cmd = [ + "ansible-galaxy", + "collection", + "install", + "-vvv", # this is needed to make ansible display important info in case of failures + ] + if force: + cmd.append("--force") + + if isinstance(collection, Path): + collection = str(collection) + # As ansible-galaxy install is not able to automatically determine + # if the range requires a pre-release, we need to manually add the --pre + # flag when needed. + matches = version_re.search(collection) + + if ( + not is_url(collection) + and matches + and CollectionVersion(matches[1]).is_prerelease + ): + cmd.append("--pre") + + cpaths: list[str] = self.config.collections_paths + if destination and str(destination) not in cpaths: + # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so + # we hack ansible_collections_path instead and inject our own path there. + # pylint: disable=no-member + cpaths.insert(0, str(destination)) + cmd.append(f"{collection}") + + _logger.info("Running from %s : %s", Path.cwd(), " ".join(cmd)) + process = self.run( + cmd, + retry=True, + env={**self.environ, ansible_collections_path(): ":".join(cpaths)}, + ) + if process.returncode != 0: + msg = f"Command returned {process.returncode} code:\n{process.stdout}\n{process.stderr}" + _logger.error(msg) + raise InvalidPrerequisiteError(msg) + + def install_collection_from_disk( + self, + path: Path, + destination: Path | None = None, + ) -> None: + """Build and install collection from a given disk path.""" + self.install_collection(path, destination=destination, force=True) + + # pylint: disable=too-many-branches + def install_requirements( # noqa: C901 + self, + requirement: Path, + *, + retry: bool = False, + offline: bool = False, + ) -> None: + """Install dependencies from a requirements.yml. + + :param requirement: path to requirements.yml file + :param retry: retry network operations on failures + :param offline: bypass installation, may fail if requirements are not met. + """ + if not Path(requirement).exists(): + return + reqs_yaml = yaml_from_file(Path(requirement)) + if not isinstance(reqs_yaml, (dict, list)): + msg = f"{requirement} file is not a valid Ansible requirements file." + raise InvalidPrerequisiteError(msg) + + if isinstance(reqs_yaml, dict): + for key in reqs_yaml: + if key not in ("roles", "collections"): + msg = f"{requirement} file is not a valid Ansible requirements file. Only 'roles' and 'collections' keys are allowed at root level. Recognized valid locations are: {', '.join(REQUIREMENT_LOCATIONS)}" + raise InvalidPrerequisiteError(msg) + + if isinstance(reqs_yaml, list) or "roles" in reqs_yaml: + cmd = [ + "ansible-galaxy", + "role", + "install", + "-r", + f"{requirement}", + ] + if self.verbosity > 0: + cmd.extend(["-" + ("v" * self.verbosity)]) + if self.cache_dir: + cmd.extend(["--roles-path", f"{self.cache_dir}/roles"]) + + if offline: + _logger.warning( + "Skipped installing old role dependencies due to running in offline mode.", + ) + else: + _logger.info("Running %s", " ".join(cmd)) + + result = self.run(cmd, retry=retry) + _logger.debug(result.stdout) + if result.returncode != 0: + _logger.error(result.stderr) + raise AnsibleCommandError(result) + + # Run galaxy collection install works on v2 requirements.yml + if "collections" in reqs_yaml and reqs_yaml["collections"] is not None: + cmd = [ + "ansible-galaxy", + "collection", + "install", + ] + if self.verbosity > 0: + cmd.extend(["-" + ("v" * self.verbosity)]) + + for collection in reqs_yaml["collections"]: + if isinstance(collection, dict) and collection.get("type", "") == "git": + _logger.info( + "Adding '--pre' to ansible-galaxy collection install because we detected one collection being sourced from git.", + ) + cmd.append("--pre") + break + if offline: + _logger.warning( + "Skipped installing collection dependencies due to running in offline mode.", + ) + else: + cmd.extend(["-r", str(requirement)]) + cpaths = self.config.collections_paths + if self.cache_dir: + # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so + # we hack ansible_collections_path instead and inject our own path there. + dest_path = f"{self.cache_dir}/collections" + if dest_path not in cpaths: + # pylint: disable=no-member + cpaths.insert(0, dest_path) + _logger.info("Running %s", " ".join(cmd)) + result = self.run( + cmd, + retry=retry, + env={**os.environ, "ANSIBLE_COLLECTIONS_PATH": ":".join(cpaths)}, + ) + _logger.debug(result.stdout) + if result.returncode != 0: + _logger.error(result.stderr) + raise AnsibleCommandError(result) + + def prepare_environment( # noqa: C901 + self, + required_collections: dict[str, str] | None = None, + *, + retry: bool = False, + install_local: bool = False, + offline: bool = False, + role_name_check: int = 0, + ) -> None: + """Make dependencies available if needed.""" + destination: Path | None = None + if required_collections is None: + required_collections = {} + + # first one is standard for collection layout repos and the last two + # are part of Tower specification + # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#ansible-galaxy-support + # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#collections-support + for req_file in REQUIREMENT_LOCATIONS: + self.install_requirements(Path(req_file), retry=retry, offline=offline) + + self._prepare_ansible_paths() + + if not install_local: + return + + for gpath in search_galaxy_paths(self.project_dir): + # processing all found galaxy.yml files + galaxy_path = Path(gpath) + if galaxy_path.exists(): + data = yaml_from_file(galaxy_path) + if isinstance(data, dict) and "dependencies" in data: + for name, required_version in data["dependencies"].items(): + _logger.info( + "Provisioning collection %s:%s from galaxy.yml", + name, + required_version, + ) + self.install_collection( + f"{name}{',' if is_url(name) else ':'}{required_version}", + destination=destination, + ) + + if self.cache_dir: + destination = self.cache_dir / "collections" + for name, min_version in required_collections.items(): + self.install_collection( + f"{name}:>={min_version}", + destination=destination, + ) + + if (self.project_dir / "galaxy.yml").exists(): + if destination: + # while function can return None, that would not break the logic + colpath = Path( + f"{destination}/ansible_collections/{colpath_from_path(self.project_dir)}", + ) + if colpath.is_symlink(): + if os.path.realpath(colpath) == str(Path.cwd()): + _logger.warning( + "Found symlinked collection, skipping its installation.", + ) + return + _logger.warning( + "Collection is symlinked, but not pointing to %s directory, so we will remove it.", + Path.cwd(), + ) + colpath.unlink() + + # molecule scenario within a collection + self.install_collection_from_disk( + galaxy_path.parent, + destination=destination, + ) + elif ( + Path().resolve().parent.name == "roles" + and Path("../../galaxy.yml").exists() + ): + # molecule scenario located within roles/<role-name>/molecule inside + # a collection + self.install_collection_from_disk( + Path("../.."), + destination=destination, + ) + else: + # no collection, try to recognize and install a standalone role + self._install_galaxy_role( + self.project_dir, + role_name_check=role_name_check, + ignore_errors=True, + ) + # reload collections + self.load_collections() + + def require_collection( + self, + name: str, + version: str | None = None, + *, + install: bool = True, + ) -> tuple[CollectionVersion, Path]: + """Check if a minimal collection version is present or exits. + + In the future this method may attempt to install a missing or outdated + collection before failing. + + :param name: collection name + :param version: minimal version required + :param install: if True, attempt to install a missing collection + :returns: tuple of (found_version, collection_path) + """ + try: + ns, coll = name.split(".", 1) + except ValueError as exc: + msg = f"Invalid collection name supplied: {name}%s" + raise InvalidPrerequisiteError( + msg, + ) from exc + + paths: list[str] = self.config.collections_paths + if not paths or not isinstance(paths, list): + msg = f"Unable to determine ansible collection paths. ({paths})" + raise InvalidPrerequisiteError( + msg, + ) + + if self.cache_dir: + # if we have a cache dir, we want to be use that would be preferred + # destination when installing a missing collection + # https://github.com/PyCQA/pylint/issues/4667 + paths.insert(0, f"{self.cache_dir}/collections") # pylint: disable=E1101 + + for path in paths: + collpath = Path(path) / "ansible_collections" / ns / coll + if collpath.exists(): + mpath = collpath / "MANIFEST.json" + if not mpath.exists(): + msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info." + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + + with mpath.open(encoding="utf-8") as f: + manifest = json.loads(f.read()) + found_version = CollectionVersion( + manifest["collection_info"]["version"], + ) + if version and found_version < CollectionVersion(version): + if install: + self.install_collection(f"{name}:>={version}") + self.require_collection(name, version, install=False) + else: + msg = f"Found {name} collection {found_version} but {version} or newer is required." + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + return found_version, collpath.resolve() + break + else: + if install: + self.install_collection(f"{name}:>={version}" if version else name) + return self.require_collection( + name=name, + version=version, + install=False, + ) + msg = f"Collection '{name}' not found in '{paths}'" + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + + def _prepare_ansible_paths(self) -> None: + """Configure Ansible environment variables.""" + try: + library_paths: list[str] = self.config.default_module_path.copy() + roles_path: list[str] = self.config.default_roles_path.copy() + collections_path: list[str] = self.config.collections_paths.copy() + except AttributeError as exc: + msg = "Unexpected ansible configuration" + raise RuntimeError(msg) from exc + + alterations_list: list[tuple[list[str], str, bool]] = [ + (library_paths, "plugins/modules", True), + (roles_path, "roles", True), + ] + + alterations_list.extend( + [ + (roles_path, f"{self.cache_dir}/roles", False), + (library_paths, f"{self.cache_dir}/modules", False), + (collections_path, f"{self.cache_dir}/collections", False), + ] + if self.isolated + else [], + ) + + for path_list, path_, must_be_present in alterations_list: + path = Path(path_) + if not path.exists(): + if must_be_present: + continue + path.mkdir(parents=True, exist_ok=True) + if str(path) not in path_list: + path_list.insert(0, str(path)) + + if library_paths != self.config.DEFAULT_MODULE_PATH: + self._update_env("ANSIBLE_LIBRARY", library_paths) + if collections_path != self.config.default_collections_path: + self._update_env(ansible_collections_path(), collections_path) + if roles_path != self.config.default_roles_path: + self._update_env("ANSIBLE_ROLES_PATH", roles_path) + + def _get_roles_path(self) -> Path: + """Return roles installation path. + + If `self.isolated` is set to `True`, `self.cache_dir` would be + created, then it returns the `self.cache_dir/roles`. When `self.isolated` is + not mentioned or set to `False`, it returns the first path in + `default_roles_path`. + """ + if self.cache_dir: + path = Path(f"{self.cache_dir}/roles") + else: + path = Path(self.config.default_roles_path[0]).expanduser() + return path + + def _install_galaxy_role( + self, + project_dir: Path, + role_name_check: int = 0, + *, + ignore_errors: bool = False, + ) -> None: + """Detect standalone galaxy role and installs it. + + :param: role_name_check: logic to used to check role name + 0: exit with error if name is not compliant (default) + 1: warn if name is not compliant + 2: bypass any name checking + + :param: ignore_errors: if True, bypass installing invalid roles. + + Our implementation aims to match ansible-galaxy's behaviour for installing + roles from a tarball or scm. For example ansible-galaxy will install a role + that has both galaxy.yml and meta/main.yml present but empty. Also missing + galaxy.yml is accepted but missing meta/main.yml is not. + """ + yaml = None + galaxy_info = {} + + for meta_main in META_MAIN: + meta_filename = Path(project_dir) / meta_main + + if meta_filename.exists(): + break + else: + if ignore_errors: + return + + yaml = yaml_from_file(meta_filename) + + if yaml and "galaxy_info" in yaml: + galaxy_info = yaml["galaxy_info"] + + fqrn = _get_role_fqrn(galaxy_info, project_dir) + + if role_name_check in [0, 1]: + if not re.match(r"[a-z0-9][a-z0-9_]+\.[a-z][a-z0-9_]+$", fqrn): + msg = MSG_INVALID_FQRL.format(fqrn) + if role_name_check == 1: + _logger.warning(msg) + else: + _logger.error(msg) + raise InvalidPrerequisiteError(msg) + elif "role_name" in galaxy_info: + # when 'role-name' is in skip_list, we stick to plain role names + role_namespace = _get_galaxy_role_ns(galaxy_info) + role_name = _get_galaxy_role_name(galaxy_info) + fqrn = f"{role_namespace}{role_name}" + else: + fqrn = Path(project_dir).absolute().name + path = self._get_roles_path() + path.mkdir(parents=True, exist_ok=True) + link_path = path / fqrn + # despite documentation stating that is_file() reports true for symlinks, + # it appears that is_dir() reports true instead, so we rely on exists(). + target = Path(project_dir).absolute() + if not link_path.exists() or ( + link_path.is_symlink() and link_path.readlink() != target + ): + # must call unlink before checking exists because a broken + # link reports as not existing and we want to repair it + link_path.unlink(missing_ok=True) + # https://github.com/python/cpython/issues/73843 + link_path.symlink_to(str(target), target_is_directory=True) + _logger.info( + "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.", + link_path, + ) + + def _update_env(self, varname: str, value: list[str], default: str = "") -> None: + """Update colon based environment variable if needed. + + New values are prepended to make sure they take precedence. + """ + if not value: + return + orig_value = self.environ.get(varname, default) + if orig_value: + value = [*value, *orig_value.split(":")] + value_str = ":".join(value) + if value_str != self.environ.get(varname, ""): + self.environ[varname] = value_str + _logger.info("Set %s=%s", varname, value_str) + + +def _get_role_fqrn(galaxy_infos: dict[str, Any], project_dir: Path) -> str: + """Compute role fqrn.""" + role_namespace = _get_galaxy_role_ns(galaxy_infos) + role_name = _get_galaxy_role_name(galaxy_infos) + + if len(role_name) == 0: + role_name = Path(project_dir).absolute().name + role_name = re.sub(r"(ansible-|ansible-role-)", "", role_name).split( + ".", + maxsplit=2, + )[-1] + + return f"{role_namespace}{role_name}" + + +def _get_galaxy_role_ns(galaxy_infos: dict[str, Any]) -> str: + """Compute role namespace from meta/main.yml, including trailing dot.""" + role_namespace = galaxy_infos.get("namespace", "") + if len(role_namespace) == 0: + role_namespace = galaxy_infos.get("author", "") + if not isinstance(role_namespace, str): + msg = f"Role namespace must be string, not {role_namespace}" + raise AnsibleCompatError(msg) + # if there's a space in the name space, it's likely author name + # and not the galaxy login, so act as if there was no namespace + if not role_namespace or re.match(r"^\w+ \w+", role_namespace): + role_namespace = "" + else: + role_namespace = f"{role_namespace}." + return role_namespace + + +def _get_galaxy_role_name(galaxy_infos: dict[str, Any]) -> str: + """Compute role name from meta/main.yml.""" + result = galaxy_infos.get("role_name", "") + if not isinstance(result, str): + return "" + return result + + +def search_galaxy_paths(search_dir: Path) -> list[str]: + """Search for galaxy paths (only one level deep).""" + galaxy_paths: list[str] = [] + for file in [".", *os.listdir(search_dir)]: + # We ignore any folders that are not valid namespaces, just like + # ansible galaxy does at this moment. + if file != "." and not namespace_re.match(file): + continue + file_path = search_dir / file / "galaxy.yml" + if file_path.is_file(): + galaxy_paths.append(str(file_path)) + return galaxy_paths + + +def is_url(name: str) -> bool: + """Return True if a dependency name looks like an URL.""" + return bool(re.match("^git[+@]", name)) diff --git a/src/ansible_compat/schema.py b/src/ansible_compat/schema.py new file mode 100644 index 0000000..2950e08 --- /dev/null +++ b/src/ansible_compat/schema.py @@ -0,0 +1,110 @@ +"""Utils for JSON Schema validation.""" +from __future__ import annotations + +import json +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import jsonschema +from jsonschema.validators import validator_for + +if TYPE_CHECKING: + from ansible_compat.types import JSON + + +def to_path(schema_path: Sequence[str | int]) -> str: + """Flatten a path to a dot delimited string. + + :param schema_path: The schema path + :returns: The dot delimited path + """ + return ".".join(str(index) for index in schema_path) + + +def json_path(absolute_path: Sequence[str | int]) -> str: + """Flatten a data path to a dot delimited string. + + :param absolute_path: The path + :returns: The dot delimited string + """ + path = "$" + for elem in absolute_path: + if isinstance(elem, int): + path += "[" + str(elem) + "]" + else: + path += "." + elem + return path + + +@dataclass(order=True) +class JsonSchemaError: + # pylint: disable=too-many-instance-attributes + """Data structure to hold a json schema validation error.""" + + # order of attributes below is important for sorting + schema_path: str + data_path: str + json_path: str + message: str + expected: bool | int | str + relative_schema: str + validator: str + found: str + + def to_friendly(self) -> str: + """Provide a friendly explanation of the error. + + :returns: The error message + """ + return f"In '{self.data_path}': {self.message}." + + +def validate( + schema: JSON, + data: JSON, +) -> list[JsonSchemaError]: + """Validate some data against a JSON schema. + + :param schema: the JSON schema to use for validation + :param data: The data to validate + :returns: Any errors encountered + """ + errors: list[JsonSchemaError] = [] + + if isinstance(schema, str): + schema = json.loads(schema) + try: + if not isinstance(schema, Mapping): + msg = "Invalid schema, must be a mapping" + raise jsonschema.SchemaError(msg) # noqa: TRY301 + validator = validator_for(schema) + validator.check_schema(schema) + except jsonschema.SchemaError as exc: + error = JsonSchemaError( + message=str(exc), + data_path="schema sanity check", + json_path="", + schema_path="", + relative_schema="", + expected="", + validator="", + found="", + ) + errors.append(error) + return errors + + for validation_error in validator(schema).iter_errors(data): + if isinstance(validation_error, jsonschema.ValidationError): + error = JsonSchemaError( + message=validation_error.message, + data_path=to_path(validation_error.absolute_path), + json_path=json_path(validation_error.absolute_path), + schema_path=to_path(validation_error.schema_path), + relative_schema=validation_error.schema, + expected=validation_error.validator_value, + validator=str(validation_error.validator), + found=str(validation_error.instance), + ) + errors.append(error) + return sorted(errors) diff --git a/src/ansible_compat/types.py b/src/ansible_compat/types.py new file mode 100644 index 0000000..4514606 --- /dev/null +++ b/src/ansible_compat/types.py @@ -0,0 +1,23 @@ +"""Custom types.""" +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from typing import Union + +try: # py39 does not have TypeAlias + from typing_extensions import TypeAlias +except ImportError: + from typing import TypeAlias # type: ignore[no-redef,attr-defined] + +JSON: TypeAlias = Union[dict[str, "JSON"], list["JSON"], str, int, float, bool, None] +JSON_ro: TypeAlias = Union[ + Mapping[str, "JSON_ro"], + Sequence["JSON_ro"], + str, + int, + float, + bool, + None, +] + +__all__ = ["JSON", "JSON_ro"] |