summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/role.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:04:21 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:04:21 +0000
commit8a754e0858d922e955e71b253c139e071ecec432 (patch)
tree527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/galaxy/role.py
parentInitial commit. (diff)
downloadansible-core-8a754e0858d922e955e71b253c139e071ecec432.tar.xz
ansible-core-8a754e0858d922e955e71b253c139e071ecec432.zip
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/galaxy/role.py')
-rw-r--r--lib/ansible/galaxy/role.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/role.py b/lib/ansible/galaxy/role.py
new file mode 100644
index 0000000..99bb525
--- /dev/null
+++ b/lib/ansible/galaxy/role.py
@@ -0,0 +1,439 @@
+########################################################################
+#
+# (C) 2015, Brian Coca <bcoca@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+#
+########################################################################
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import errno
+import datetime
+import os
+import tarfile
+import tempfile
+
+from collections.abc import MutableSequence
+from shutil import rmtree
+
+from ansible import context
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.galaxy.api import GalaxyAPI
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils._text import to_native, to_text
+from ansible.module_utils.common.yaml import yaml_dump, yaml_load
+from ansible.module_utils.compat.version import LooseVersion
+from ansible.module_utils.urls import open_url
+from ansible.playbook.role.requirement import RoleRequirement
+from ansible.utils.display import Display
+
+display = Display()
+
+
+class GalaxyRole(object):
+
+ SUPPORTED_SCMS = set(['git', 'hg'])
+ META_MAIN = (os.path.join('meta', 'main.yml'), os.path.join('meta', 'main.yaml'))
+ META_INSTALL = os.path.join('meta', '.galaxy_install_info')
+ META_REQUIREMENTS = (os.path.join('meta', 'requirements.yml'), os.path.join('meta', 'requirements.yaml'))
+ ROLE_DIRS = ('defaults', 'files', 'handlers', 'meta', 'tasks', 'templates', 'vars', 'tests')
+
+ def __init__(self, galaxy, api, name, src=None, version=None, scm=None, path=None):
+
+ self._metadata = None
+ self._metadata_dependencies = None
+ self._requirements = None
+ self._install_info = None
+ self._validate_certs = not context.CLIARGS['ignore_certs']
+
+ display.debug('Validate TLS certificates: %s' % self._validate_certs)
+
+ self.galaxy = galaxy
+ self._api = api
+
+ self.name = name
+ self.version = version
+ self.src = src or name
+ self.download_url = None
+ self.scm = scm
+ self.paths = [os.path.join(x, self.name) for x in galaxy.roles_paths]
+
+ if path is not None:
+ if not path.endswith(os.path.join(os.path.sep, self.name)):
+ path = os.path.join(path, self.name)
+ else:
+ # Look for a meta/main.ya?ml inside the potential role dir in case
+ # the role name is the same as parent directory of the role.
+ #
+ # Example:
+ # ./roles/testing/testing/meta/main.yml
+ for meta_main in self.META_MAIN:
+ if os.path.exists(os.path.join(path, name, meta_main)):
+ path = os.path.join(path, self.name)
+ break
+ self.path = path
+ else:
+ # use the first path by default
+ self.path = self.paths[0]
+
+ def __repr__(self):
+ """
+ Returns "rolename (version)" if version is not null
+ Returns "rolename" otherwise
+ """
+ if self.version:
+ return "%s (%s)" % (self.name, self.version)
+ else:
+ return self.name
+
+ def __eq__(self, other):
+ return self.name == other.name
+
+ @property
+ def api(self):
+ if not isinstance(self._api, GalaxyAPI):
+ return self._api.api
+ return self._api
+
+ @property
+ def metadata(self):
+ """
+ Returns role metadata
+ """
+ if self._metadata is None:
+ for path in self.paths:
+ for meta_main in self.META_MAIN:
+ meta_path = os.path.join(path, meta_main)
+ if os.path.isfile(meta_path):
+ try:
+ with open(meta_path, 'r') as f:
+ self._metadata = yaml_load(f)
+ except Exception:
+ display.vvvvv("Unable to load metadata for %s" % self.name)
+ return False
+ break
+
+ return self._metadata
+
+ @property
+ def metadata_dependencies(self):
+ """
+ Returns a list of dependencies from role metadata
+ """
+ if self._metadata_dependencies is None:
+ self._metadata_dependencies = []
+
+ if self.metadata is not None:
+ self._metadata_dependencies = self.metadata.get('dependencies') or []
+
+ if not isinstance(self._metadata_dependencies, MutableSequence):
+ raise AnsibleParserError(
+ f"Expected role dependencies to be a list. Role {self} has meta/main.yml with dependencies {self._metadata_dependencies}"
+ )
+
+ return self._metadata_dependencies
+
+ @property
+ def install_info(self):
+ """
+ Returns role install info
+ """
+ if self._install_info is None:
+
+ info_path = os.path.join(self.path, self.META_INSTALL)
+ if os.path.isfile(info_path):
+ try:
+ f = open(info_path, 'r')
+ self._install_info = yaml_load(f)
+ except Exception:
+ display.vvvvv("Unable to load Galaxy install info for %s" % self.name)
+ return False
+ finally:
+ f.close()
+ return self._install_info
+
+ @property
+ def _exists(self):
+ for path in self.paths:
+ if os.path.isdir(path):
+ return True
+
+ return False
+
+ def _write_galaxy_install_info(self):
+ """
+ Writes a YAML-formatted file to the role's meta/ directory
+ (named .galaxy_install_info) which contains some information
+ we can use later for commands like 'list' and 'info'.
+ """
+
+ info = dict(
+ version=self.version,
+ install_date=datetime.datetime.utcnow().strftime("%c"),
+ )
+ if not os.path.exists(os.path.join(self.path, 'meta')):
+ os.makedirs(os.path.join(self.path, 'meta'))
+ info_path = os.path.join(self.path, self.META_INSTALL)
+ with open(info_path, 'w+') as f:
+ try:
+ self._install_info = yaml_dump(info, f)
+ except Exception:
+ return False
+
+ return True
+
+ def remove(self):
+ """
+ Removes the specified role from the roles path.
+ There is a sanity check to make sure there's a meta/main.yml file at this
+ path so the user doesn't blow away random directories.
+ """
+ if self.metadata:
+ try:
+ rmtree(self.path)
+ return True
+ except Exception:
+ pass
+
+ return False
+
+ def fetch(self, role_data):
+ """
+ Downloads the archived role to a temp location based on role data
+ """
+ if role_data:
+
+ # first grab the file and save it to a temp location
+ if self.download_url is not None:
+ archive_url = self.download_url
+ elif "github_user" in role_data and "github_repo" in role_data:
+ archive_url = 'https://github.com/%s/%s/archive/%s.tar.gz' % (role_data["github_user"], role_data["github_repo"], self.version)
+ else:
+ archive_url = self.src
+
+ display.display("- downloading role from %s" % archive_url)
+
+ try:
+ url_file = open_url(archive_url, validate_certs=self._validate_certs, http_agent=user_agent())
+ temp_file = tempfile.NamedTemporaryFile(delete=False)
+ data = url_file.read()
+ while data:
+ temp_file.write(data)
+ data = url_file.read()
+ temp_file.close()
+ return temp_file.name
+ except Exception as e:
+ display.error(u"failed to download the file: %s" % to_text(e))
+
+ return False
+
+ def install(self):
+
+ if self.scm:
+ # create tar file from scm url
+ tmp_file = RoleRequirement.scm_archive_role(keep_scm_meta=context.CLIARGS['keep_scm_meta'], **self.spec)
+ elif self.src:
+ if os.path.isfile(self.src):
+ tmp_file = self.src
+ elif '://' in self.src:
+ role_data = self.src
+ tmp_file = self.fetch(role_data)
+ else:
+ role_data = self.api.lookup_role_by_name(self.src)
+ if not role_data:
+ raise AnsibleError("- sorry, %s was not found on %s." % (self.src, self.api.api_server))
+
+ if role_data.get('role_type') == 'APP':
+ # Container Role
+ display.warning("%s is a Container App role, and should only be installed using Ansible "
+ "Container" % self.name)
+
+ role_versions = self.api.fetch_role_related('versions', role_data['id'])
+ if not self.version:
+ # convert the version names to LooseVersion objects
+ # and sort them to get the latest version. If there
+ # are no versions in the list, we'll grab the head
+ # of the master branch
+ if len(role_versions) > 0:
+ loose_versions = [LooseVersion(a.get('name', None)) for a in role_versions]
+ try:
+ loose_versions.sort()
+ except TypeError:
+ raise AnsibleError(
+ 'Unable to compare role versions (%s) to determine the most recent version due to incompatible version formats. '
+ 'Please contact the role author to resolve versioning conflicts, or specify an explicit role version to '
+ 'install.' % ', '.join([v.vstring for v in loose_versions])
+ )
+ self.version = to_text(loose_versions[-1])
+ elif role_data.get('github_branch', None):
+ self.version = role_data['github_branch']
+ else:
+ self.version = 'master'
+ elif self.version != 'master':
+ if role_versions and to_text(self.version) not in [a.get('name', None) for a in role_versions]:
+ raise AnsibleError("- the specified version (%s) of %s was not found in the list of available versions (%s)." % (self.version,
+ self.name,
+ role_versions))
+
+ # check if there's a source link/url for our role_version
+ for role_version in role_versions:
+ if role_version['name'] == self.version and 'source' in role_version:
+ self.src = role_version['source']
+ if role_version['name'] == self.version and 'download_url' in role_version:
+ self.download_url = role_version['download_url']
+
+ tmp_file = self.fetch(role_data)
+
+ else:
+ raise AnsibleError("No valid role data found")
+
+ if tmp_file:
+
+ display.debug("installing from %s" % tmp_file)
+
+ if not tarfile.is_tarfile(tmp_file):
+ raise AnsibleError("the downloaded file does not appear to be a valid tar archive.")
+ else:
+ role_tar_file = tarfile.open(tmp_file, "r")
+ # verify the role's meta file
+ meta_file = None
+ members = role_tar_file.getmembers()
+ # next find the metadata file
+ for member in members:
+ for meta_main in self.META_MAIN:
+ if meta_main in member.name:
+ # Look for parent of meta/main.yml
+ # Due to possibility of sub roles each containing meta/main.yml
+ # look for shortest length parent
+ meta_parent_dir = os.path.dirname(os.path.dirname(member.name))
+ if not meta_file:
+ archive_parent_dir = meta_parent_dir
+ meta_file = member
+ else:
+ if len(meta_parent_dir) < len(archive_parent_dir):
+ archive_parent_dir = meta_parent_dir
+ meta_file = member
+ if not meta_file:
+ raise AnsibleError("this role does not appear to have a meta/main.yml file.")
+ else:
+ try:
+ self._metadata = yaml_load(role_tar_file.extractfile(meta_file))
+ except Exception:
+ raise AnsibleError("this role does not appear to have a valid meta/main.yml file.")
+
+ paths = self.paths
+ if self.path != paths[0]:
+ # path can be passed though __init__
+ # FIXME should this be done in __init__?
+ paths[:0] = self.path
+ paths_len = len(paths)
+ for idx, path in enumerate(paths):
+ self.path = path
+ display.display("- extracting %s to %s" % (self.name, self.path))
+ try:
+ if os.path.exists(self.path):
+ if not os.path.isdir(self.path):
+ raise AnsibleError("the specified roles path exists and is not a directory.")
+ elif not context.CLIARGS.get("force", False):
+ raise AnsibleError("the specified role %s appears to already exist. Use --force to replace it." % self.name)
+ else:
+ # using --force, remove the old path
+ if not self.remove():
+ raise AnsibleError("%s doesn't appear to contain a role.\n please remove this directory manually if you really "
+ "want to put the role here." % self.path)
+ else:
+ os.makedirs(self.path)
+
+ # We strip off any higher-level directories for all of the files
+ # contained within the tar file here. The default is 'github_repo-target'.
+ # Gerrit instances, on the other hand, does not have a parent directory at all.
+ for member in members:
+ # we only extract files, and remove any relative path
+ # bits that might be in the file for security purposes
+ # and drop any containing directory, as mentioned above
+ if member.isreg() or member.issym():
+ n_member_name = to_native(member.name)
+ n_archive_parent_dir = to_native(archive_parent_dir)
+ n_parts = n_member_name.replace(n_archive_parent_dir, "", 1).split(os.sep)
+ n_final_parts = []
+ for n_part in n_parts:
+ # TODO if the condition triggers it produces a broken installation.
+ # It will create the parent directory as an empty file and will
+ # explode if the directory contains valid files.
+ # Leaving this as is since the whole module needs a rewrite.
+ if n_part != '..' and not n_part.startswith('~') and '$' not in n_part:
+ n_final_parts.append(n_part)
+ member.name = os.path.join(*n_final_parts)
+ role_tar_file.extract(member, to_native(self.path))
+
+ # write out the install info file for later use
+ self._write_galaxy_install_info()
+ break
+ except OSError as e:
+ if e.errno == errno.EACCES and idx < paths_len - 1:
+ continue
+ raise AnsibleError("Could not update files in %s: %s" % (self.path, to_native(e)))
+
+ # return the parsed yaml metadata
+ display.display("- %s was installed successfully" % str(self))
+ if not (self.src and os.path.isfile(self.src)):
+ try:
+ os.unlink(tmp_file)
+ except (OSError, IOError) as e:
+ display.warning(u"Unable to remove tmp file (%s): %s" % (tmp_file, to_text(e)))
+ return True
+
+ return False
+
+ @property
+ def spec(self):
+ """
+ Returns role spec info
+ {
+ 'scm': 'git',
+ 'src': 'http://git.example.com/repos/repo.git',
+ 'version': 'v1.0',
+ 'name': 'repo'
+ }
+ """
+ return dict(scm=self.scm, src=self.src, version=self.version, name=self.name)
+
+ @property
+ def requirements(self):
+ """
+ Returns role requirements
+ """
+ if self._requirements is None:
+ self._requirements = []
+ for meta_requirements in self.META_REQUIREMENTS:
+ meta_path = os.path.join(self.path, meta_requirements)
+ if os.path.isfile(meta_path):
+ try:
+ f = open(meta_path, 'r')
+ self._requirements = yaml_load(f)
+ except Exception:
+ display.vvvvv("Unable to load requirements for %s" % self.name)
+ finally:
+ f.close()
+
+ break
+
+ if not isinstance(self._requirements, MutableSequence):
+ raise AnsibleParserError(f"Expected role dependencies to be a list. Role {self} has meta/requirements.yml {self._requirements}")
+
+ return self._requirements