diff options
Diffstat (limited to 'src/ansible_compat/runtime.py')
-rw-r--r-- | src/ansible_compat/runtime.py | 961 |
1 files changed, 961 insertions, 0 deletions
diff --git a/src/ansible_compat/runtime.py b/src/ansible_compat/runtime.py new file mode 100644 index 0000000..ad81132 --- /dev/null +++ b/src/ansible_compat/runtime.py @@ -0,0 +1,961 @@ +"""Ansible runtime environment manager.""" +from __future__ import annotations + +import contextlib +import importlib +import json +import logging +import os +import re +import shutil +import subprocess +import sys +import warnings +from collections import OrderedDict +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, no_type_check + +import subprocess_tee +from packaging.version import Version + +from ansible_compat.config import ( + AnsibleConfig, + ansible_collections_path, + ansible_version, + parse_ansible_version, +) +from ansible_compat.constants import ( + META_MAIN, + MSG_INVALID_FQRL, + RC_ANSIBLE_OPTIONS_ERROR, + REQUIREMENT_LOCATIONS, +) +from ansible_compat.errors import ( + AnsibleCommandError, + AnsibleCompatError, + InvalidPrerequisiteError, + MissingAnsibleError, +) +from ansible_compat.loaders import colpath_from_path, yaml_from_file +from ansible_compat.prerun import get_cache_dir + +if TYPE_CHECKING: + # https://github.com/PyCQA/pylint/issues/3240 + # pylint: disable=unsubscriptable-object + CompletedProcess = subprocess.CompletedProcess[Any] +else: + CompletedProcess = subprocess.CompletedProcess + + +_logger = logging.getLogger(__name__) +# regex to extract the first version from a collection range specifier +version_re = re.compile(":[>=<]*([^,]*)") +namespace_re = re.compile("^[a-z][a-z0-9_]+$") + + +class AnsibleWarning(Warning): + """Warnings related to Ansible runtime.""" + + +@dataclass +class Collection: + """Container for Ansible collection information.""" + + name: str + version: str + path: Path + + +class CollectionVersion(Version): + """Collection version.""" + + def __init__(self, version: str) -> None: + """Initialize collection version.""" + # As packaging Version class does not support wildcard, we convert it + # to "0", as this being the smallest version possible. + if version == "*": + version = "0" + super().__init__(version) + + +@dataclass +class Plugins: # pylint: disable=too-many-instance-attributes + """Dataclass to access installed Ansible plugins, uses ansible-doc to retrieve them.""" + + runtime: Runtime + become: dict[str, str] = field(init=False) + cache: dict[str, str] = field(init=False) + callback: dict[str, str] = field(init=False) + cliconf: dict[str, str] = field(init=False) + connection: dict[str, str] = field(init=False) + httpapi: dict[str, str] = field(init=False) + inventory: dict[str, str] = field(init=False) + lookup: dict[str, str] = field(init=False) + netconf: dict[str, str] = field(init=False) + shell: dict[str, str] = field(init=False) + vars: dict[str, str] = field(init=False) # noqa: A003 + module: dict[str, str] = field(init=False) + strategy: dict[str, str] = field(init=False) + test: dict[str, str] = field(init=False) + filter: dict[str, str] = field(init=False) # noqa: A003 + role: dict[str, str] = field(init=False) + keyword: dict[str, str] = field(init=False) + + @no_type_check + def __getattribute__(self, attr: str): # noqa: ANN204 + """Get attribute.""" + if attr in { + "become", + "cache", + "callback", + "cliconf", + "connection", + "httpapi", + "inventory", + "lookup", + "netconf", + "shell", + "vars", + "module", + "strategy", + "test", + "filter", + "role", + "keyword", + }: + try: + result = super().__getattribute__(attr) + except AttributeError as exc: + if ansible_version() < Version("2.14") and attr in {"filter", "test"}: + msg = "Ansible version below 2.14 does not support retrieving filter and test plugins." + raise RuntimeError(msg) from exc + proc = self.runtime.run( + ["ansible-doc", "--json", "-l", "-t", attr], + ) + data = json.loads(proc.stdout) + if not isinstance(data, dict): # pragma: no cover + msg = "Unexpected output from ansible-doc" + raise AnsibleCompatError(msg) from exc + result = data + else: + result = super().__getattribute__(attr) + + return result + + +# pylint: disable=too-many-instance-attributes +class Runtime: + """Ansible Runtime manager.""" + + _version: Version | None = None + collections: OrderedDict[str, Collection] = OrderedDict() + cache_dir: Path | None = None + # Used to track if we have already initialized the Ansible runtime as attempts + # to do it multiple tilmes will cause runtime warnings from within ansible-core + initialized: bool = False + plugins: Plugins + + def __init__( + self, + project_dir: Path | None = None, + *, + isolated: bool = False, + min_required_version: str | None = None, + require_module: bool = False, + max_retries: int = 0, + environ: dict[str, str] | None = None, + verbosity: int = 0, + ) -> None: + """Initialize Ansible runtime environment. + + :param project_dir: The directory containing the Ansible project. If + not mentioned it will be guessed from the current + working directory. + :param isolated: Assure that installation of collections or roles + does not affect Ansible installation, an unique cache + directory being used instead. + :param min_required_version: Minimal version of Ansible required. If + not found, a :class:`RuntimeError` + exception is raised. + :param require_module: If set, instantiation will fail if Ansible + Python module is missing or is not matching + the same version as the Ansible command line. + That is useful for consumers that expect to + also perform Python imports from Ansible. + :param max_retries: Number of times it should retry network operations. + Default is 0, no retries. + :param environ: Environment dictionary to use, if undefined + ``os.environ`` will be copied and used. + :param verbosity: Verbosity level to use. + """ + self.project_dir = project_dir or Path.cwd() + self.isolated = isolated + self.max_retries = max_retries + self.environ = environ or os.environ.copy() + self.plugins = Plugins(runtime=self) + self.verbosity = verbosity + + self.initialize_logger(level=self.verbosity) + + # Reduce noise from paramiko, unless user already defined PYTHONWARNINGS + # paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated + # https://github.com/paramiko/paramiko/issues/2038 + # As CryptographyDeprecationWarning is not a builtin, we cannot use + # PYTHONWARNINGS to ignore it using category but we can use message. + # https://stackoverflow.com/q/68251969/99834 + if "PYTHONWARNINGS" not in self.environ: # pragma: no cover + self.environ["PYTHONWARNINGS"] = "ignore:Blowfish has been deprecated" + + if isolated: + self.cache_dir = get_cache_dir(self.project_dir) + self.config = AnsibleConfig() + + # Add the sys.path to the collection paths if not isolated + self._add_sys_path_to_collection_paths() + + if not self.version_in_range(lower=min_required_version): + msg = f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer." + raise RuntimeError(msg) + if require_module: + self._ensure_module_available() + + # pylint: disable=import-outside-toplevel + from ansible.utils.display import Display + + # pylint: disable=unused-argument + def warning( + self: Display, # noqa: ARG001 + msg: str, + *, + formatted: bool = False, # noqa: ARG001 + ) -> None: + """Override ansible.utils.display.Display.warning to avoid printing warnings.""" + warnings.warn( + message=msg, + category=AnsibleWarning, + stacklevel=2, + source={"msg": msg}, + ) + + # Monkey patch ansible warning in order to use warnings module. + Display.warning = warning + + def initialize_logger(self, level: int = 0) -> None: + """Set up the global logging level based on the verbosity number.""" + verbosity_map = { + -2: logging.CRITICAL, + -1: logging.ERROR, + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + # Unknown logging level is treated as DEBUG + logging_level = verbosity_map.get(level, logging.DEBUG) + _logger.setLevel(logging_level) + # Use module-level _logger instance to validate it + _logger.debug("Logging initialized to level %s", logging_level) + + def _add_sys_path_to_collection_paths(self) -> None: + """Add the sys.path to the collection paths.""" + if self.config.collections_scan_sys_path: + for path in sys.path: + if ( + path not in self.config.collections_paths + and (Path(path) / "ansible_collections").is_dir() + ): + self.config.collections_paths.append( # pylint: disable=E1101 + path, + ) + + def load_collections(self) -> None: + """Load collection data.""" + self.collections = OrderedDict() + no_collections_msg = "None of the provided paths were usable" + + proc = self.run(["ansible-galaxy", "collection", "list", "--format=json"]) + if proc.returncode == RC_ANSIBLE_OPTIONS_ERROR and ( + no_collections_msg in proc.stdout or no_collections_msg in proc.stderr + ): + _logger.debug("Ansible reported no installed collections at all.") + return + if proc.returncode != 0: + _logger.error(proc) + msg = f"Unable to list collections: {proc}" + raise RuntimeError(msg) + data = json.loads(proc.stdout) + if not isinstance(data, dict): + msg = f"Unexpected collection data, {data}" + raise TypeError(msg) + for path in data: + for collection, collection_info in data[path].items(): + if not isinstance(collection, str): + msg = f"Unexpected collection data, {collection}" + raise TypeError(msg) + if not isinstance(collection_info, dict): + msg = f"Unexpected collection data, {collection_info}" + raise TypeError(msg) + + self.collections[collection] = Collection( + name=collection, + version=collection_info["version"], + path=path, + ) + + def _ensure_module_available(self) -> None: + """Assure that Ansible Python module is installed and matching CLI version.""" + ansible_release_module = None + with contextlib.suppress(ModuleNotFoundError, ImportError): + ansible_release_module = importlib.import_module("ansible.release") + + if ansible_release_module is None: + msg = "Unable to find Ansible python module." + raise RuntimeError(msg) + + ansible_module_version = Version( + ansible_release_module.__version__, + ) + if ansible_module_version != self.version: + msg = f"Ansible CLI ({self.version}) and python module ({ansible_module_version}) versions do not match. This indicates a broken execution environment." + raise RuntimeError(msg) + + # For ansible 2.15+ we need to initialize the plugin loader + # https://github.com/ansible/ansible-lint/issues/2945 + if not Runtime.initialized: + col_path = [f"{self.cache_dir}/collections"] + if self.version >= Version("2.15.0.dev0"): + # pylint: disable=import-outside-toplevel,no-name-in-module + from ansible.plugins.loader import init_plugin_loader + + init_plugin_loader(col_path) + else: + # noinspection PyProtectedMember + from ansible.utils.collection_loader._collection_finder import ( # pylint: disable=import-outside-toplevel + _AnsibleCollectionFinder, + ) + + # noinspection PyProtectedMember + # pylint: disable=protected-access + col_path += self.config.collections_paths + col_path += os.path.dirname( # noqa: PTH120 + os.environ.get(ansible_collections_path(), "."), + ).split(":") + _AnsibleCollectionFinder( # noqa: SLF001 + paths=col_path, + )._install() # pylint: disable=protected-access + Runtime.initialized = True + + def clean(self) -> None: + """Remove content of cache_dir.""" + if self.cache_dir: + shutil.rmtree(self.cache_dir, ignore_errors=True) + + def run( # ruff: disable=PLR0913 + self, + args: str | list[str], + *, + retry: bool = False, + tee: bool = False, + env: dict[str, str] | None = None, + cwd: Path | None = None, + ) -> CompletedProcess: + """Execute a command inside an Ansible environment. + + :param retry: Retry network operations on failures. + :param tee: Also pass captured stdout/stderr to system while running. + """ + if tee: + run_func: Callable[..., CompletedProcess] = subprocess_tee.run + else: + run_func = subprocess.run + env = self.environ if env is None else env.copy() + # Presence of ansible debug variable or config option will prevent us + # from parsing its JSON output due to extra debug messages on stdout. + env["ANSIBLE_DEBUG"] = "0" + + # https://github.com/ansible/ansible-lint/issues/3522 + env["ANSIBLE_VERBOSE_TO_STDERR"] = "True" + + for _ in range(self.max_retries + 1 if retry else 1): + result = run_func( + args, + universal_newlines=True, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd=str(cwd) if cwd else None, + ) + if result.returncode == 0: + break + _logger.debug("Environment: %s", env) + if retry: + _logger.warning( + "Retrying execution failure %s of: %s", + result.returncode, + " ".join(args), + ) + return result + + @property + def version(self) -> Version: + """Return current Version object for Ansible. + + If version is not mentioned, it returns current version as detected. + When version argument is mentioned, it return converts the version string + to Version object in order to make it usable in comparisons. + """ + if self._version: + return self._version + + proc = self.run(["ansible", "--version"]) + if proc.returncode == 0: + self._version = parse_ansible_version(proc.stdout) + return self._version + + msg = "Unable to find a working copy of ansible executable." + raise MissingAnsibleError(msg, proc=proc) + + def version_in_range( + self, + lower: str | None = None, + upper: str | None = None, + ) -> bool: + """Check if Ansible version is inside a required range. + + The lower limit is inclusive and the upper one exclusive. + """ + if lower and self.version < Version(lower): + return False + if upper and self.version >= Version(upper): + return False + return True + + def install_collection( + self, + collection: str | Path, + *, + destination: Path | None = None, + force: bool = False, + ) -> None: + """Install an Ansible collection. + + Can accept arguments like: + 'foo.bar:>=1.2.3' + 'git+https://github.com/ansible-collections/ansible.posix.git,main' + """ + cmd = [ + "ansible-galaxy", + "collection", + "install", + "-vvv", # this is needed to make ansible display important info in case of failures + ] + if force: + cmd.append("--force") + + if isinstance(collection, Path): + collection = str(collection) + # As ansible-galaxy install is not able to automatically determine + # if the range requires a pre-release, we need to manually add the --pre + # flag when needed. + matches = version_re.search(collection) + + if ( + not is_url(collection) + and matches + and CollectionVersion(matches[1]).is_prerelease + ): + cmd.append("--pre") + + cpaths: list[str] = self.config.collections_paths + if destination and str(destination) not in cpaths: + # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so + # we hack ansible_collections_path instead and inject our own path there. + # pylint: disable=no-member + cpaths.insert(0, str(destination)) + cmd.append(f"{collection}") + + _logger.info("Running from %s : %s", Path.cwd(), " ".join(cmd)) + process = self.run( + cmd, + retry=True, + env={**self.environ, ansible_collections_path(): ":".join(cpaths)}, + ) + if process.returncode != 0: + msg = f"Command returned {process.returncode} code:\n{process.stdout}\n{process.stderr}" + _logger.error(msg) + raise InvalidPrerequisiteError(msg) + + def install_collection_from_disk( + self, + path: Path, + destination: Path | None = None, + ) -> None: + """Build and install collection from a given disk path.""" + self.install_collection(path, destination=destination, force=True) + + # pylint: disable=too-many-branches + def install_requirements( # noqa: C901 + self, + requirement: Path, + *, + retry: bool = False, + offline: bool = False, + ) -> None: + """Install dependencies from a requirements.yml. + + :param requirement: path to requirements.yml file + :param retry: retry network operations on failures + :param offline: bypass installation, may fail if requirements are not met. + """ + if not Path(requirement).exists(): + return + reqs_yaml = yaml_from_file(Path(requirement)) + if not isinstance(reqs_yaml, (dict, list)): + msg = f"{requirement} file is not a valid Ansible requirements file." + raise InvalidPrerequisiteError(msg) + + if isinstance(reqs_yaml, dict): + for key in reqs_yaml: + if key not in ("roles", "collections"): + msg = f"{requirement} file is not a valid Ansible requirements file. Only 'roles' and 'collections' keys are allowed at root level. Recognized valid locations are: {', '.join(REQUIREMENT_LOCATIONS)}" + raise InvalidPrerequisiteError(msg) + + if isinstance(reqs_yaml, list) or "roles" in reqs_yaml: + cmd = [ + "ansible-galaxy", + "role", + "install", + "-r", + f"{requirement}", + ] + if self.verbosity > 0: + cmd.extend(["-" + ("v" * self.verbosity)]) + if self.cache_dir: + cmd.extend(["--roles-path", f"{self.cache_dir}/roles"]) + + if offline: + _logger.warning( + "Skipped installing old role dependencies due to running in offline mode.", + ) + else: + _logger.info("Running %s", " ".join(cmd)) + + result = self.run(cmd, retry=retry) + _logger.debug(result.stdout) + if result.returncode != 0: + _logger.error(result.stderr) + raise AnsibleCommandError(result) + + # Run galaxy collection install works on v2 requirements.yml + if "collections" in reqs_yaml and reqs_yaml["collections"] is not None: + cmd = [ + "ansible-galaxy", + "collection", + "install", + ] + if self.verbosity > 0: + cmd.extend(["-" + ("v" * self.verbosity)]) + + for collection in reqs_yaml["collections"]: + if isinstance(collection, dict) and collection.get("type", "") == "git": + _logger.info( + "Adding '--pre' to ansible-galaxy collection install because we detected one collection being sourced from git.", + ) + cmd.append("--pre") + break + if offline: + _logger.warning( + "Skipped installing collection dependencies due to running in offline mode.", + ) + else: + cmd.extend(["-r", str(requirement)]) + cpaths = self.config.collections_paths + if self.cache_dir: + # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so + # we hack ansible_collections_path instead and inject our own path there. + dest_path = f"{self.cache_dir}/collections" + if dest_path not in cpaths: + # pylint: disable=no-member + cpaths.insert(0, dest_path) + _logger.info("Running %s", " ".join(cmd)) + result = self.run( + cmd, + retry=retry, + env={**os.environ, "ANSIBLE_COLLECTIONS_PATH": ":".join(cpaths)}, + ) + _logger.debug(result.stdout) + if result.returncode != 0: + _logger.error(result.stderr) + raise AnsibleCommandError(result) + + def prepare_environment( # noqa: C901 + self, + required_collections: dict[str, str] | None = None, + *, + retry: bool = False, + install_local: bool = False, + offline: bool = False, + role_name_check: int = 0, + ) -> None: + """Make dependencies available if needed.""" + destination: Path | None = None + if required_collections is None: + required_collections = {} + + # first one is standard for collection layout repos and the last two + # are part of Tower specification + # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#ansible-galaxy-support + # https://docs.ansible.com/ansible-tower/latest/html/userguide/projects.html#collections-support + for req_file in REQUIREMENT_LOCATIONS: + self.install_requirements(Path(req_file), retry=retry, offline=offline) + + self._prepare_ansible_paths() + + if not install_local: + return + + for gpath in search_galaxy_paths(self.project_dir): + # processing all found galaxy.yml files + galaxy_path = Path(gpath) + if galaxy_path.exists(): + data = yaml_from_file(galaxy_path) + if isinstance(data, dict) and "dependencies" in data: + for name, required_version in data["dependencies"].items(): + _logger.info( + "Provisioning collection %s:%s from galaxy.yml", + name, + required_version, + ) + self.install_collection( + f"{name}{',' if is_url(name) else ':'}{required_version}", + destination=destination, + ) + + if self.cache_dir: + destination = self.cache_dir / "collections" + for name, min_version in required_collections.items(): + self.install_collection( + f"{name}:>={min_version}", + destination=destination, + ) + + if (self.project_dir / "galaxy.yml").exists(): + if destination: + # while function can return None, that would not break the logic + colpath = Path( + f"{destination}/ansible_collections/{colpath_from_path(self.project_dir)}", + ) + if colpath.is_symlink(): + if os.path.realpath(colpath) == str(Path.cwd()): + _logger.warning( + "Found symlinked collection, skipping its installation.", + ) + return + _logger.warning( + "Collection is symlinked, but not pointing to %s directory, so we will remove it.", + Path.cwd(), + ) + colpath.unlink() + + # molecule scenario within a collection + self.install_collection_from_disk( + galaxy_path.parent, + destination=destination, + ) + elif ( + Path().resolve().parent.name == "roles" + and Path("../../galaxy.yml").exists() + ): + # molecule scenario located within roles/<role-name>/molecule inside + # a collection + self.install_collection_from_disk( + Path("../.."), + destination=destination, + ) + else: + # no collection, try to recognize and install a standalone role + self._install_galaxy_role( + self.project_dir, + role_name_check=role_name_check, + ignore_errors=True, + ) + # reload collections + self.load_collections() + + def require_collection( + self, + name: str, + version: str | None = None, + *, + install: bool = True, + ) -> tuple[CollectionVersion, Path]: + """Check if a minimal collection version is present or exits. + + In the future this method may attempt to install a missing or outdated + collection before failing. + + :param name: collection name + :param version: minimal version required + :param install: if True, attempt to install a missing collection + :returns: tuple of (found_version, collection_path) + """ + try: + ns, coll = name.split(".", 1) + except ValueError as exc: + msg = f"Invalid collection name supplied: {name}%s" + raise InvalidPrerequisiteError( + msg, + ) from exc + + paths: list[str] = self.config.collections_paths + if not paths or not isinstance(paths, list): + msg = f"Unable to determine ansible collection paths. ({paths})" + raise InvalidPrerequisiteError( + msg, + ) + + if self.cache_dir: + # if we have a cache dir, we want to be use that would be preferred + # destination when installing a missing collection + # https://github.com/PyCQA/pylint/issues/4667 + paths.insert(0, f"{self.cache_dir}/collections") # pylint: disable=E1101 + + for path in paths: + collpath = Path(path) / "ansible_collections" / ns / coll + if collpath.exists(): + mpath = collpath / "MANIFEST.json" + if not mpath.exists(): + msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info." + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + + with mpath.open(encoding="utf-8") as f: + manifest = json.loads(f.read()) + found_version = CollectionVersion( + manifest["collection_info"]["version"], + ) + if version and found_version < CollectionVersion(version): + if install: + self.install_collection(f"{name}:>={version}") + self.require_collection(name, version, install=False) + else: + msg = f"Found {name} collection {found_version} but {version} or newer is required." + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + return found_version, collpath.resolve() + break + else: + if install: + self.install_collection(f"{name}:>={version}" if version else name) + return self.require_collection( + name=name, + version=version, + install=False, + ) + msg = f"Collection '{name}' not found in '{paths}'" + _logger.fatal(msg) + raise InvalidPrerequisiteError(msg) + + def _prepare_ansible_paths(self) -> None: + """Configure Ansible environment variables.""" + try: + library_paths: list[str] = self.config.default_module_path.copy() + roles_path: list[str] = self.config.default_roles_path.copy() + collections_path: list[str] = self.config.collections_paths.copy() + except AttributeError as exc: + msg = "Unexpected ansible configuration" + raise RuntimeError(msg) from exc + + alterations_list: list[tuple[list[str], str, bool]] = [ + (library_paths, "plugins/modules", True), + (roles_path, "roles", True), + ] + + alterations_list.extend( + [ + (roles_path, f"{self.cache_dir}/roles", False), + (library_paths, f"{self.cache_dir}/modules", False), + (collections_path, f"{self.cache_dir}/collections", False), + ] + if self.isolated + else [], + ) + + for path_list, path_, must_be_present in alterations_list: + path = Path(path_) + if not path.exists(): + if must_be_present: + continue + path.mkdir(parents=True, exist_ok=True) + if str(path) not in path_list: + path_list.insert(0, str(path)) + + if library_paths != self.config.DEFAULT_MODULE_PATH: + self._update_env("ANSIBLE_LIBRARY", library_paths) + if collections_path != self.config.default_collections_path: + self._update_env(ansible_collections_path(), collections_path) + if roles_path != self.config.default_roles_path: + self._update_env("ANSIBLE_ROLES_PATH", roles_path) + + def _get_roles_path(self) -> Path: + """Return roles installation path. + + If `self.isolated` is set to `True`, `self.cache_dir` would be + created, then it returns the `self.cache_dir/roles`. When `self.isolated` is + not mentioned or set to `False`, it returns the first path in + `default_roles_path`. + """ + if self.cache_dir: + path = Path(f"{self.cache_dir}/roles") + else: + path = Path(self.config.default_roles_path[0]).expanduser() + return path + + def _install_galaxy_role( + self, + project_dir: Path, + role_name_check: int = 0, + *, + ignore_errors: bool = False, + ) -> None: + """Detect standalone galaxy role and installs it. + + :param: role_name_check: logic to used to check role name + 0: exit with error if name is not compliant (default) + 1: warn if name is not compliant + 2: bypass any name checking + + :param: ignore_errors: if True, bypass installing invalid roles. + + Our implementation aims to match ansible-galaxy's behaviour for installing + roles from a tarball or scm. For example ansible-galaxy will install a role + that has both galaxy.yml and meta/main.yml present but empty. Also missing + galaxy.yml is accepted but missing meta/main.yml is not. + """ + yaml = None + galaxy_info = {} + + for meta_main in META_MAIN: + meta_filename = Path(project_dir) / meta_main + + if meta_filename.exists(): + break + else: + if ignore_errors: + return + + yaml = yaml_from_file(meta_filename) + + if yaml and "galaxy_info" in yaml: + galaxy_info = yaml["galaxy_info"] + + fqrn = _get_role_fqrn(galaxy_info, project_dir) + + if role_name_check in [0, 1]: + if not re.match(r"[a-z0-9][a-z0-9_]+\.[a-z][a-z0-9_]+$", fqrn): + msg = MSG_INVALID_FQRL.format(fqrn) + if role_name_check == 1: + _logger.warning(msg) + else: + _logger.error(msg) + raise InvalidPrerequisiteError(msg) + elif "role_name" in galaxy_info: + # when 'role-name' is in skip_list, we stick to plain role names + role_namespace = _get_galaxy_role_ns(galaxy_info) + role_name = _get_galaxy_role_name(galaxy_info) + fqrn = f"{role_namespace}{role_name}" + else: + fqrn = Path(project_dir).absolute().name + path = self._get_roles_path() + path.mkdir(parents=True, exist_ok=True) + link_path = path / fqrn + # despite documentation stating that is_file() reports true for symlinks, + # it appears that is_dir() reports true instead, so we rely on exists(). + target = Path(project_dir).absolute() + if not link_path.exists() or ( + link_path.is_symlink() and link_path.readlink() != target + ): + # must call unlink before checking exists because a broken + # link reports as not existing and we want to repair it + link_path.unlink(missing_ok=True) + # https://github.com/python/cpython/issues/73843 + link_path.symlink_to(str(target), target_is_directory=True) + _logger.info( + "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.", + link_path, + ) + + def _update_env(self, varname: str, value: list[str], default: str = "") -> None: + """Update colon based environment variable if needed. + + New values are prepended to make sure they take precedence. + """ + if not value: + return + orig_value = self.environ.get(varname, default) + if orig_value: + value = [*value, *orig_value.split(":")] + value_str = ":".join(value) + if value_str != self.environ.get(varname, ""): + self.environ[varname] = value_str + _logger.info("Set %s=%s", varname, value_str) + + +def _get_role_fqrn(galaxy_infos: dict[str, Any], project_dir: Path) -> str: + """Compute role fqrn.""" + role_namespace = _get_galaxy_role_ns(galaxy_infos) + role_name = _get_galaxy_role_name(galaxy_infos) + + if len(role_name) == 0: + role_name = Path(project_dir).absolute().name + role_name = re.sub(r"(ansible-|ansible-role-)", "", role_name).split( + ".", + maxsplit=2, + )[-1] + + return f"{role_namespace}{role_name}" + + +def _get_galaxy_role_ns(galaxy_infos: dict[str, Any]) -> str: + """Compute role namespace from meta/main.yml, including trailing dot.""" + role_namespace = galaxy_infos.get("namespace", "") + if len(role_namespace) == 0: + role_namespace = galaxy_infos.get("author", "") + if not isinstance(role_namespace, str): + msg = f"Role namespace must be string, not {role_namespace}" + raise AnsibleCompatError(msg) + # if there's a space in the name space, it's likely author name + # and not the galaxy login, so act as if there was no namespace + if not role_namespace or re.match(r"^\w+ \w+", role_namespace): + role_namespace = "" + else: + role_namespace = f"{role_namespace}." + return role_namespace + + +def _get_galaxy_role_name(galaxy_infos: dict[str, Any]) -> str: + """Compute role name from meta/main.yml.""" + result = galaxy_infos.get("role_name", "") + if not isinstance(result, str): + return "" + return result + + +def search_galaxy_paths(search_dir: Path) -> list[str]: + """Search for galaxy paths (only one level deep).""" + galaxy_paths: list[str] = [] + for file in [".", *os.listdir(search_dir)]: + # We ignore any folders that are not valid namespaces, just like + # ansible galaxy does at this moment. + if file != "." and not namespace_re.match(file): + continue + file_path = search_dir / file / "galaxy.yml" + if file_path.is_file(): + galaxy_paths.append(str(file_path)) + return galaxy_paths + + +def is_url(name: str) -> bool: + """Return True if a dependency name looks like an URL.""" + return bool(re.match("^git[+@]", name)) |