summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/collection/concrete_artifact_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/galaxy/collection/concrete_artifact_manager.py')
-rw-r--r--lib/ansible/galaxy/collection/concrete_artifact_manager.py755
1 files changed, 755 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/collection/concrete_artifact_manager.py b/lib/ansible/galaxy/collection/concrete_artifact_manager.py
new file mode 100644
index 0000000..7c920b8
--- /dev/null
+++ b/lib/ansible/galaxy/collection/concrete_artifact_manager.py
@@ -0,0 +1,755 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2020-2021, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Concrete collection candidate management helper module."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import os
+import tarfile
+import subprocess
+import typing as t
+
+from contextlib import contextmanager
+from hashlib import sha256
+from urllib.error import URLError
+from urllib.parse import urldefrag
+from shutil import rmtree
+from tempfile import mkdtemp
+
+if t.TYPE_CHECKING:
+ from ansible.galaxy.dependency_resolution.dataclasses import (
+ Candidate, Requirement,
+ )
+ from ansible.galaxy.token import GalaxyToken
+
+from ansible.errors import AnsibleError
+from ansible.galaxy import get_collections_galaxy_meta_info
+from ansible.galaxy.dependency_resolution.dataclasses import _GALAXY_YAML
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.process import get_bin_path
+from ansible.module_utils.common._collections_compat import MutableMapping
+from ansible.module_utils.common.yaml import yaml_load
+from ansible.module_utils.six import raise_from
+from ansible.module_utils.urls import open_url
+from ansible.utils.display import Display
+from ansible.utils.sentinel import Sentinel
+
+import yaml
+
+
+display = Display()
+
+MANIFEST_FILENAME = 'MANIFEST.json'
+
+
+class ConcreteArtifactsManager:
+ """Manager for on-disk collection artifacts.
+
+ It is responsible for:
+ * downloading remote collections from Galaxy-compatible servers and
+ direct links to tarballs or SCM repositories
+ * keeping track of local ones
+ * keeping track of Galaxy API tokens for downloads from Galaxy'ish
+ as well as the artifact hashes
+ * keeping track of Galaxy API signatures for downloads from Galaxy'ish
+ * caching all of above
+ * retrieving the metadata out of the downloaded artifacts
+ """
+ def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60, required_signature_count=None, ignore_signature_errors=None):
+ # type: (bytes, bool, str, int, str, list[str]) -> None
+ """Initialize ConcreteArtifactsManager caches and costraints."""
+ self._validate_certs = validate_certs # type: bool
+ self._artifact_cache = {} # type: dict[bytes, bytes]
+ self._galaxy_artifact_cache = {} # type: dict[Candidate | Requirement, bytes]
+ self._artifact_meta_cache = {} # type: dict[bytes, dict[str, str | list[str] | dict[str, str] | None | t.Type[Sentinel]]]
+ self._galaxy_collection_cache = {} # type: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]]
+ self._galaxy_collection_origin_cache = {} # type: dict[Candidate, tuple[str, list[dict[str, str]]]]
+ self._b_working_directory = b_working_directory # type: bytes
+ self._supplemental_signature_cache = {} # type: dict[str, str]
+ self._keyring = keyring # type: str
+ self.timeout = timeout # type: int
+ self._required_signature_count = required_signature_count # type: str
+ self._ignore_signature_errors = ignore_signature_errors # type: list[str]
+ self._require_build_metadata = True # type: bool
+
+ @property
+ def keyring(self):
+ return self._keyring
+
+ @property
+ def required_successful_signature_count(self):
+ return self._required_signature_count
+
+ @property
+ def ignore_signature_errors(self):
+ if self._ignore_signature_errors is None:
+ return []
+ return self._ignore_signature_errors
+
+ @property
+ def require_build_metadata(self):
+ # type: () -> bool
+ return self._require_build_metadata
+
+ @require_build_metadata.setter
+ def require_build_metadata(self, value):
+ # type: (bool) -> None
+ self._require_build_metadata = value
+
+ def get_galaxy_artifact_source_info(self, collection):
+ # type: (Candidate) -> dict[str, t.Union[str, list[dict[str, str]]]]
+ server = collection.src.api_server
+
+ try:
+ download_url = self._galaxy_collection_cache[collection][0]
+ signatures_url, signatures = self._galaxy_collection_origin_cache[collection]
+ except KeyError as key_err:
+ raise RuntimeError(
+ 'The is no known source for {coll!s}'.
+ format(coll=collection),
+ ) from key_err
+
+ return {
+ "format_version": "1.0.0",
+ "namespace": collection.namespace,
+ "name": collection.name,
+ "version": collection.ver,
+ "server": server,
+ "version_url": signatures_url,
+ "download_url": download_url,
+ "signatures": signatures,
+ }
+
+ def get_galaxy_artifact_path(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> bytes
+ """Given a Galaxy-stored collection, return a cached path.
+
+ If it's not yet on disk, this method downloads the artifact first.
+ """
+ try:
+ return self._galaxy_artifact_cache[collection]
+ except KeyError:
+ pass
+
+ try:
+ url, sha256_hash, token = self._galaxy_collection_cache[collection]
+ except KeyError as key_err:
+ raise_from(
+ RuntimeError(
+ 'The is no known source for {coll!s}'.
+ format(coll=collection),
+ ),
+ key_err,
+ )
+
+ display.vvvv(
+ "Fetching a collection tarball for '{collection!s}' from "
+ 'Ansible Galaxy'.format(collection=collection),
+ )
+
+ try:
+ b_artifact_path = _download_file(
+ url,
+ self._b_working_directory,
+ expected_hash=sha256_hash,
+ validate_certs=self._validate_certs,
+ token=token,
+ ) # type: bytes
+ except URLError as err:
+ raise_from(
+ AnsibleError(
+ 'Failed to download collection tar '
+ "from '{coll_src!s}': {download_err!s}".
+ format(
+ coll_src=to_native(collection.src),
+ download_err=to_native(err),
+ ),
+ ),
+ err,
+ )
+ else:
+ display.vvv(
+ "Collection '{coll!s}' obtained from "
+ 'server {server!s} {url!s}'.format(
+ coll=collection, server=collection.src or 'Galaxy',
+ url=collection.src.api_server if collection.src is not None
+ else '',
+ )
+ )
+
+ self._galaxy_artifact_cache[collection] = b_artifact_path
+ return b_artifact_path
+
+ def get_artifact_path(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> bytes
+ """Given a concrete collection pointer, return a cached path.
+
+ If it's not yet on disk, this method downloads the artifact first.
+ """
+ try:
+ return self._artifact_cache[collection.src]
+ except KeyError:
+ pass
+
+ # NOTE: SCM needs to be special-cased as it may contain either
+ # NOTE: one collection in its root, or a number of top-level
+ # NOTE: collection directories instead.
+ # NOTE: The idea is to store the SCM collection as unpacked
+ # NOTE: directory structure under the temporary location and use
+ # NOTE: a "virtual" collection that has pinned requirements on
+ # NOTE: the directories under that SCM checkout that correspond
+ # NOTE: to collections.
+ # NOTE: This brings us to the idea that we need two separate
+ # NOTE: virtual Requirement/Candidate types --
+ # NOTE: (single) dir + (multidir) subdirs
+ if collection.is_url:
+ display.vvvv(
+ "Collection requirement '{collection!s}' is a URL "
+ 'to a tar artifact'.format(collection=collection.fqcn),
+ )
+ try:
+ b_artifact_path = _download_file(
+ collection.src,
+ self._b_working_directory,
+ expected_hash=None, # NOTE: URLs don't support checksums
+ validate_certs=self._validate_certs,
+ timeout=self.timeout
+ )
+ except Exception as err:
+ raise_from(
+ AnsibleError(
+ 'Failed to download collection tar '
+ "from '{coll_src!s}': {download_err!s}".
+ format(
+ coll_src=to_native(collection.src),
+ download_err=to_native(err),
+ ),
+ ),
+ err,
+ )
+ elif collection.is_scm:
+ b_artifact_path = _extract_collection_from_git(
+ collection.src,
+ collection.ver,
+ self._b_working_directory,
+ )
+ elif collection.is_file or collection.is_dir or collection.is_subdirs:
+ b_artifact_path = to_bytes(collection.src)
+ else:
+ # NOTE: This may happen `if collection.is_online_index_pointer`
+ raise RuntimeError(
+ 'The artifact is of an unexpected type {art_type!s}'.
+ format(art_type=collection.type)
+ )
+
+ self._artifact_cache[collection.src] = b_artifact_path
+ return b_artifact_path
+
+ def _get_direct_collection_namespace(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ return self.get_direct_collection_meta(collection)['namespace'] # type: ignore[return-value]
+
+ def _get_direct_collection_name(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ return self.get_direct_collection_meta(collection)['name'] # type: ignore[return-value]
+
+ def get_direct_collection_fqcn(self, collection):
+ # type: (Candidate) -> t.Optional[str]
+ """Extract FQCN from the given on-disk collection artifact.
+
+ If the collection is virtual, ``None`` is returned instead
+ of a string.
+ """
+ if collection.is_virtual:
+ # NOTE: should it be something like "<virtual>"?
+ return None
+
+ return '.'.join(( # type: ignore[type-var]
+ self._get_direct_collection_namespace(collection), # type: ignore[arg-type]
+ self._get_direct_collection_name(collection),
+ ))
+
+ def get_direct_collection_version(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> str
+ """Extract version from the given on-disk collection artifact."""
+ return self.get_direct_collection_meta(collection)['version'] # type: ignore[return-value]
+
+ def get_direct_collection_dependencies(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> dict[str, str]
+ """Extract deps from the given on-disk collection artifact."""
+ collection_dependencies = self.get_direct_collection_meta(collection)['dependencies']
+ if collection_dependencies is None:
+ collection_dependencies = {}
+ return collection_dependencies # type: ignore[return-value]
+
+ def get_direct_collection_meta(self, collection):
+ # type: (t.Union[Candidate, Requirement]) -> dict[str, t.Union[str, dict[str, str], list[str], None, t.Type[Sentinel]]]
+ """Extract meta from the given on-disk collection artifact."""
+ try: # FIXME: use unique collection identifier as a cache key?
+ return self._artifact_meta_cache[collection.src]
+ except KeyError:
+ b_artifact_path = self.get_artifact_path(collection)
+
+ if collection.is_url or collection.is_file:
+ collection_meta = _get_meta_from_tar(b_artifact_path)
+ elif collection.is_dir: # should we just build a coll instead?
+ # FIXME: what if there's subdirs?
+ try:
+ collection_meta = _get_meta_from_dir(b_artifact_path, self.require_build_metadata)
+ except LookupError as lookup_err:
+ raise_from(
+ AnsibleError(
+ 'Failed to find the collection dir deps: {err!s}'.
+ format(err=to_native(lookup_err)),
+ ),
+ lookup_err,
+ )
+ elif collection.is_scm:
+ collection_meta = {
+ 'name': None,
+ 'namespace': None,
+ 'dependencies': {to_native(b_artifact_path): '*'},
+ 'version': '*',
+ }
+ elif collection.is_subdirs:
+ collection_meta = {
+ 'name': None,
+ 'namespace': None,
+ # NOTE: Dropping b_artifact_path since it's based on src anyway
+ 'dependencies': dict.fromkeys(
+ map(to_native, collection.namespace_collection_paths),
+ '*',
+ ),
+ 'version': '*',
+ }
+ else:
+ raise RuntimeError
+
+ self._artifact_meta_cache[collection.src] = collection_meta
+ return collection_meta
+
+ def save_collection_source(self, collection, url, sha256_hash, token, signatures_url, signatures):
+ # type: (Candidate, str, str, GalaxyToken, str, list[dict[str, str]]) -> None
+ """Store collection URL, SHA256 hash and Galaxy API token.
+
+ This is a hook that is supposed to be called before attempting to
+ download Galaxy-based collections with ``get_galaxy_artifact_path()``.
+ """
+ self._galaxy_collection_cache[collection] = url, sha256_hash, token
+ self._galaxy_collection_origin_cache[collection] = signatures_url, signatures
+
+ @classmethod
+ @contextmanager
+ def under_tmpdir(
+ cls,
+ temp_dir_base, # type: str
+ validate_certs=True, # type: bool
+ keyring=None, # type: str
+ required_signature_count=None, # type: str
+ ignore_signature_errors=None, # type: list[str]
+ require_build_metadata=True, # type: bool
+ ): # type: (...) -> t.Iterator[ConcreteArtifactsManager]
+ """Custom ConcreteArtifactsManager constructor with temp dir.
+
+ This method returns a context manager that allocates and cleans
+ up a temporary directory for caching the collection artifacts
+ during the dependency resolution process.
+ """
+ # NOTE: Can't use `with tempfile.TemporaryDirectory:`
+ # NOTE: because it's not in Python 2 stdlib.
+ temp_path = mkdtemp(
+ dir=to_bytes(temp_dir_base, errors='surrogate_or_strict'),
+ )
+ b_temp_path = to_bytes(temp_path, errors='surrogate_or_strict')
+ try:
+ yield cls(
+ b_temp_path,
+ validate_certs,
+ keyring=keyring,
+ required_signature_count=required_signature_count,
+ ignore_signature_errors=ignore_signature_errors
+ )
+ finally:
+ rmtree(b_temp_path)
+
+
+def parse_scm(collection, version):
+ """Extract name, version, path and subdir out of the SCM pointer."""
+ if ',' in collection:
+ collection, version = collection.split(',', 1)
+ elif version == '*' or not version:
+ version = 'HEAD'
+
+ if collection.startswith('git+'):
+ path = collection[4:]
+ else:
+ path = collection
+
+ path, fragment = urldefrag(path)
+ fragment = fragment.strip(os.path.sep)
+
+ if path.endswith(os.path.sep + '.git'):
+ name = path.split(os.path.sep)[-2]
+ elif '://' not in path and '@' not in path:
+ name = path
+ else:
+ name = path.split('/')[-1]
+ if name.endswith('.git'):
+ name = name[:-4]
+
+ return name, version, path, fragment
+
+
+def _extract_collection_from_git(repo_url, coll_ver, b_path):
+ name, version, git_url, fragment = parse_scm(repo_url, coll_ver)
+ b_checkout_path = mkdtemp(
+ dir=b_path,
+ prefix=to_bytes(name, errors='surrogate_or_strict'),
+ ) # type: bytes
+
+ try:
+ git_executable = get_bin_path('git')
+ except ValueError as err:
+ raise AnsibleError(
+ "Could not find git executable to extract the collection from the Git repository `{repo_url!s}`.".
+ format(repo_url=to_native(git_url))
+ ) from err
+
+ # Perform a shallow clone if simply cloning HEAD
+ if version == 'HEAD':
+ git_clone_cmd = git_executable, 'clone', '--depth=1', git_url, to_text(b_checkout_path)
+ else:
+ git_clone_cmd = git_executable, 'clone', git_url, to_text(b_checkout_path)
+ # FIXME: '--branch', version
+
+ try:
+ subprocess.check_call(git_clone_cmd)
+ except subprocess.CalledProcessError as proc_err:
+ raise_from(
+ AnsibleError( # should probably be LookupError
+ 'Failed to clone a Git repository from `{repo_url!s}`.'.
+ format(repo_url=to_native(git_url)),
+ ),
+ proc_err,
+ )
+
+ git_switch_cmd = git_executable, 'checkout', to_text(version)
+ try:
+ subprocess.check_call(git_switch_cmd, cwd=b_checkout_path)
+ except subprocess.CalledProcessError as proc_err:
+ raise_from(
+ AnsibleError( # should probably be LookupError
+ 'Failed to switch a cloned Git repo `{repo_url!s}` '
+ 'to the requested revision `{commitish!s}`.'.
+ format(
+ commitish=to_native(version),
+ repo_url=to_native(git_url),
+ ),
+ ),
+ proc_err,
+ )
+
+ return (
+ os.path.join(b_checkout_path, to_bytes(fragment))
+ if fragment else b_checkout_path
+ )
+
+
+# FIXME: use random subdirs while preserving the file names
+def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeout=60):
+ # type: (str, bytes, t.Optional[str], bool, GalaxyToken, int) -> bytes
+ # ^ NOTE: used in download and verify_collections ^
+ b_tarball_name = to_bytes(
+ url.rsplit('/', 1)[1], errors='surrogate_or_strict',
+ )
+ b_file_name = b_tarball_name[:-len('.tar.gz')]
+
+ b_tarball_dir = mkdtemp(
+ dir=b_path,
+ prefix=b'-'.join((b_file_name, b'')),
+ ) # type: bytes
+
+ b_file_path = os.path.join(b_tarball_dir, b_tarball_name)
+
+ display.display("Downloading %s to %s" % (url, to_text(b_tarball_dir)))
+ # NOTE: Galaxy redirects downloads to S3 which rejects the request
+ # NOTE: if an Authorization header is attached so don't redirect it
+ resp = open_url(
+ to_native(url, errors='surrogate_or_strict'),
+ validate_certs=validate_certs,
+ headers=None if token is None else token.headers(),
+ unredirected_headers=['Authorization'], http_agent=user_agent(),
+ timeout=timeout
+ )
+
+ with open(b_file_path, 'wb') as download_file: # type: t.BinaryIO
+ actual_hash = _consume_file(resp, write_to=download_file)
+
+ if expected_hash:
+ display.vvvv(
+ 'Validating downloaded file hash {actual_hash!s} with '
+ 'expected hash {expected_hash!s}'.
+ format(actual_hash=actual_hash, expected_hash=expected_hash)
+ )
+ if expected_hash != actual_hash:
+ raise AnsibleError('Mismatch artifact hash with downloaded file')
+
+ return b_file_path
+
+
+def _consume_file(read_from, write_to=None):
+ # type: (t.BinaryIO, t.BinaryIO) -> str
+ bufsize = 65536
+ sha256_digest = sha256()
+ data = read_from.read(bufsize)
+ while data:
+ if write_to is not None:
+ write_to.write(data)
+ write_to.flush()
+ sha256_digest.update(data)
+ data = read_from.read(bufsize)
+
+ return sha256_digest.hexdigest()
+
+
+def _normalize_galaxy_yml_manifest(
+ galaxy_yml, # type: dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ b_galaxy_yml_path, # type: bytes
+ require_build_metadata=True, # type: bool
+):
+ # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ galaxy_yml_schema = (
+ get_collections_galaxy_meta_info()
+ ) # type: list[dict[str, t.Any]] # FIXME: <--
+ # FIXME: 👆maybe precise type: list[dict[str, t.Union[bool, str, list[str]]]]
+
+ mandatory_keys = set()
+ string_keys = set() # type: set[str]
+ list_keys = set() # type: set[str]
+ dict_keys = set() # type: set[str]
+ sentinel_keys = set() # type: set[str]
+
+ for info in galaxy_yml_schema:
+ if info.get('required', False):
+ mandatory_keys.add(info['key'])
+
+ key_list_type = {
+ 'str': string_keys,
+ 'list': list_keys,
+ 'dict': dict_keys,
+ 'sentinel': sentinel_keys,
+ }[info.get('type', 'str')]
+ key_list_type.add(info['key'])
+
+ all_keys = frozenset(mandatory_keys | string_keys | list_keys | dict_keys | sentinel_keys)
+
+ set_keys = set(galaxy_yml.keys())
+ missing_keys = mandatory_keys.difference(set_keys)
+ if missing_keys:
+ msg = (
+ "The collection galaxy.yml at '%s' is missing the following mandatory keys: %s"
+ % (to_native(b_galaxy_yml_path), ", ".join(sorted(missing_keys)))
+ )
+ if require_build_metadata:
+ raise AnsibleError(msg)
+ display.warning(msg)
+ raise ValueError(msg)
+
+ extra_keys = set_keys.difference(all_keys)
+ if len(extra_keys) > 0:
+ display.warning("Found unknown keys in collection galaxy.yml at '%s': %s"
+ % (to_text(b_galaxy_yml_path), ", ".join(extra_keys)))
+
+ # Add the defaults if they have not been set
+ for optional_string in string_keys:
+ if optional_string not in galaxy_yml:
+ galaxy_yml[optional_string] = None
+
+ for optional_list in list_keys:
+ list_val = galaxy_yml.get(optional_list, None)
+
+ if list_val is None:
+ galaxy_yml[optional_list] = []
+ elif not isinstance(list_val, list):
+ galaxy_yml[optional_list] = [list_val] # type: ignore[list-item]
+
+ for optional_dict in dict_keys:
+ if optional_dict not in galaxy_yml:
+ galaxy_yml[optional_dict] = {}
+
+ for optional_sentinel in sentinel_keys:
+ if optional_sentinel not in galaxy_yml:
+ galaxy_yml[optional_sentinel] = Sentinel
+
+ # NOTE: `version: null` is only allowed for `galaxy.yml`
+ # NOTE: and not `MANIFEST.json`. The use-case for it is collections
+ # NOTE: that generate the version from Git before building a
+ # NOTE: distributable tarball artifact.
+ if not galaxy_yml.get('version'):
+ galaxy_yml['version'] = '*'
+
+ return galaxy_yml
+
+
+def _get_meta_from_dir(
+ b_path, # type: bytes
+ require_build_metadata=True, # type: bool
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ try:
+ return _get_meta_from_installed_dir(b_path)
+ except LookupError:
+ return _get_meta_from_src_dir(b_path, require_build_metadata)
+
+
+def _get_meta_from_src_dir(
+ b_path, # type: bytes
+ require_build_metadata=True, # type: bool
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ galaxy_yml = os.path.join(b_path, _GALAXY_YAML)
+ if not os.path.isfile(galaxy_yml):
+ raise LookupError(
+ "The collection galaxy.yml path '{path!s}' does not exist.".
+ format(path=to_native(galaxy_yml))
+ )
+
+ with open(galaxy_yml, 'rb') as manifest_file_obj:
+ try:
+ manifest = yaml_load(manifest_file_obj)
+ except yaml.error.YAMLError as yaml_err:
+ raise_from(
+ AnsibleError(
+ "Failed to parse the galaxy.yml at '{path!s}' with "
+ 'the following error:\n{err_txt!s}'.
+ format(
+ path=to_native(galaxy_yml),
+ err_txt=to_native(yaml_err),
+ ),
+ ),
+ yaml_err,
+ )
+
+ if not isinstance(manifest, dict):
+ if require_build_metadata:
+ raise AnsibleError(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+ # Valid build metadata is not required by ansible-galaxy list. Raise ValueError to fall back to implicit metadata.
+ display.warning(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+ raise ValueError(f"The collection galaxy.yml at '{to_native(galaxy_yml)}' is incorrectly formatted.")
+
+ return _normalize_galaxy_yml_manifest(manifest, galaxy_yml, require_build_metadata)
+
+
+def _get_json_from_installed_dir(
+ b_path, # type: bytes
+ filename, # type: str
+): # type: (...) -> dict
+
+ b_json_filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict'))
+
+ try:
+ with open(b_json_filepath, 'rb') as manifest_fd:
+ b_json_text = manifest_fd.read()
+ except (IOError, OSError):
+ raise LookupError(
+ "The collection {manifest!s} path '{path!s}' does not exist.".
+ format(
+ manifest=filename,
+ path=to_native(b_json_filepath),
+ )
+ )
+
+ manifest_txt = to_text(b_json_text, errors='surrogate_or_strict')
+
+ try:
+ manifest = json.loads(manifest_txt)
+ except ValueError:
+ raise AnsibleError(
+ 'Collection tar file member {member!s} does not '
+ 'contain a valid json string.'.
+ format(member=filename),
+ )
+
+ return manifest
+
+
+def _get_meta_from_installed_dir(
+ b_path, # type: bytes
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ manifest = _get_json_from_installed_dir(b_path, MANIFEST_FILENAME)
+ collection_info = manifest['collection_info']
+
+ version = collection_info.get('version')
+ if not version:
+ raise AnsibleError(
+ u'Collection metadata file `{manifest_filename!s}` at `{meta_file!s}` is expected '
+ u'to have a valid SemVer version value but got {version!s}'.
+ format(
+ manifest_filename=MANIFEST_FILENAME,
+ meta_file=to_text(b_path),
+ version=to_text(repr(version)),
+ ),
+ )
+
+ return collection_info
+
+
+def _get_meta_from_tar(
+ b_path, # type: bytes
+): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
+ if not tarfile.is_tarfile(b_path):
+ raise AnsibleError(
+ "Collection artifact at '{path!s}' is not a valid tar file.".
+ format(path=to_native(b_path)),
+ )
+
+ with tarfile.open(b_path, mode='r') as collection_tar: # type: tarfile.TarFile
+ try:
+ member = collection_tar.getmember(MANIFEST_FILENAME)
+ except KeyError:
+ raise AnsibleError(
+ "Collection at '{path!s}' does not contain the "
+ 'required file {manifest_file!s}.'.
+ format(
+ path=to_native(b_path),
+ manifest_file=MANIFEST_FILENAME,
+ ),
+ )
+
+ with _tarfile_extract(collection_tar, member) as (_member, member_obj):
+ if member_obj is None:
+ raise AnsibleError(
+ 'Collection tar file does not contain '
+ 'member {member!s}'.format(member=MANIFEST_FILENAME),
+ )
+
+ text_content = to_text(
+ member_obj.read(),
+ errors='surrogate_or_strict',
+ )
+
+ try:
+ manifest = json.loads(text_content)
+ except ValueError:
+ raise AnsibleError(
+ 'Collection tar file member {member!s} does not '
+ 'contain a valid json string.'.
+ format(member=MANIFEST_FILENAME),
+ )
+ return manifest['collection_info']
+
+
+@contextmanager
+def _tarfile_extract(
+ tar, # type: tarfile.TarFile
+ member, # type: tarfile.TarInfo
+):
+ # type: (...) -> t.Iterator[tuple[tarfile.TarInfo, t.Optional[t.IO[bytes]]]]
+ tar_obj = tar.extractfile(member)
+ try:
+ yield member, tar_obj
+ finally:
+ if tar_obj is not None:
+ tar_obj.close()