diff options
Diffstat (limited to 'ansible_collections/community/sops/plugins')
12 files changed, 2525 insertions, 0 deletions
diff --git a/ansible_collections/community/sops/plugins/action/load_vars.py b/ansible_collections/community/sops/plugins/action/load_vars.py new file mode 100644 index 00000000..0d67732f --- /dev/null +++ b/ansible_collections/community/sops/plugins/action/load_vars.py @@ -0,0 +1,83 @@ +# Copyright (c) 2020, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.module_utils.common._collections_compat import Sequence, Mapping +from ansible.module_utils.six import iteritems, string_types +from ansible.module_utils.common.text.converters import to_native +from ansible.utils.display import Display + +from ansible_collections.community.sops.plugins.module_utils.sops import Sops, get_sops_argument_spec + +from ansible_collections.community.sops.plugins.plugin_utils.action_module import ActionModuleBase, ArgumentSpec + +display = Display() + + +class ActionModule(ActionModuleBase): + + def _load(self, filename, module): + def get_option_value(argument_name): + return module.params.get(argument_name) + + output = Sops.decrypt(filename, display=display, get_option_value=get_option_value) + + data = self._loader.load(output, file_name=filename, show_content=False) + if not data: + data = dict() + if not isinstance(data, dict): + # Should not happen with sops-encrypted files + raise Exception('{0} must be stored as a dictionary/hash'.format(to_native(filename))) + return data + + def _evaluate(self, value): + if isinstance(value, string_types): + # must come *before* Sequence, as strings are also instances of Sequence + return self._templar.template(value) + if isinstance(value, Sequence): + return [self._evaluate(v) for v in value] + if isinstance(value, Mapping): + return dict((k, self._evaluate(v)) for k, v in iteritems(value)) + return value + + @staticmethod + def setup_module(): + argument_spec = ArgumentSpec( + argument_spec=dict( + file=dict(type='path', required=True), + name=dict(type='str'), + expressions=dict(type='str', default='ignore', choices=['ignore', 'evaluate-on-load']), + ), + ) + argument_spec.argument_spec.update(get_sops_argument_spec()) + return argument_spec, {} + + def run_module(self, module): + data = dict() + files = [] + try: + filename = self._find_needle('vars', module.params['file']) + data.update(self._load(filename, module)) + files.append(filename) + except Exception as e: + module.fail_json(msg=to_native(e)) + + name = module.params['name'] + if name is None: + value = data + else: + value = dict() + value[name] = data + + expressions = module.params['expressions'] + if expressions == 'evaluate-on-load': + value = self._evaluate(value) + + module.exit_json( + ansible_included_var_files=files, + ansible_facts=value, + _ansible_no_log=True, + ) diff --git a/ansible_collections/community/sops/plugins/doc_fragments/attributes.py b/ansible_collections/community/sops/plugins/doc_fragments/attributes.py new file mode 100644 index 00000000..72298504 --- /dev/null +++ b/ansible_collections/community/sops/plugins/doc_fragments/attributes.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +class ModuleDocFragment(object): + + # Standard documentation fragment + DOCUMENTATION = r''' +options: {} +attributes: + check_mode: + description: Can run in C(check_mode) and return changed status prediction without modifying target. + diff_mode: + description: Will return details on what has changed (or possibly needs changing in C(check_mode)), when in diff mode. +''' + + # Should be used together with the standard fragment + INFO_MODULE = r''' +options: {} +attributes: + check_mode: + support: full + details: + - This action does not modify state. + diff_mode: + support: N/A + details: + - This action does not modify state. +''' + + FACTS = r''' +options: {} +attributes: + facts: + description: Action returns an C(ansible_facts) dictionary that will update existing host facts. +''' + + # Should be used together with the standard fragment and the FACTS fragment + FACTS_MODULE = r''' +options: {} +attributes: + check_mode: + support: full + details: + - This action does not modify state. + diff_mode: + support: N/A + details: + - This action does not modify state. + facts: + support: full +''' + + FILES = r''' +options: {} +attributes: + safe_file_operations: + description: Uses Ansible's strict file operation functions to ensure proper permissions and avoid data corruption. +''' + + FLOW = r''' +options: {} +attributes: + action: + description: Indicates this has a corresponding action plugin so some parts of the options can be executed on the controller. + async: + description: Supports being used with the C(async) keyword. +''' diff --git a/ansible_collections/community/sops/plugins/doc_fragments/sops.py b/ansible_collections/community/sops/plugins/doc_fragments/sops.py new file mode 100644 index 00000000..ffbfe2d5 --- /dev/null +++ b/ansible_collections/community/sops/plugins/doc_fragments/sops.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2020 Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +class ModuleDocFragment(object): + DOCUMENTATION = r''' +options: + sops_binary: + description: + - Path to the sops binary. + - By default uses C(sops). + type: path + version_added: 1.0.0 + age_key: + description: + - One or more age private keys that can be used to decrypt encrypted files. + - Will be set as the C(SOPS_AGE_KEY) environment variable when calling sops. + type: str + version_added: 1.4.0 + age_keyfile: + description: + - The file containing the age private keys that sops can use to decrypt + encrypted files. + - Will be set as the C(SOPS_AGE_KEY_FILE) environment variable when calling sops. + - By default, sops looks for C(sops/age/keys.txt) inside your user configuration + directory. + type: path + version_added: 1.4.0 + aws_profile: + description: + - The AWS profile to use for requests to AWS. + - This corresponds to the sops C(--aws-profile) option. + type: str + version_added: 1.0.0 + aws_access_key_id: + description: + - The AWS access key ID to use for requests to AWS. + - Sets the environment variable C(AWS_ACCESS_KEY_ID) for the sops call. + type: str + version_added: 1.0.0 + aws_secret_access_key: + description: + - The AWS secret access key to use for requests to AWS. + - Sets the environment variable C(AWS_SECRET_ACCESS_KEY) for the sops call. + type: str + version_added: 1.0.0 + aws_session_token: + description: + - The AWS session token to use for requests to AWS. + - Sets the environment variable C(AWS_SESSION_TOKEN) for the sops call. + type: str + version_added: 1.0.0 + config_path: + description: + - Path to the sops configuration file. + - If not set, sops will recursively search for the config file starting at + the file that is encrypted or decrypted. + - This corresponds to the sops C(--config) option. + type: path + version_added: 1.0.0 + enable_local_keyservice: + description: + - Tell sops to use local key service. + - This corresponds to the sops C(--enable-local-keyservice) option. + type: bool + default: false + version_added: 1.0.0 + keyservice: + description: + - Specify key services to use next to the local one. + - A key service must be specified in the form C(protocol://address), for + example C(tcp://myserver.com:5000). + - This corresponds to the sops C(--keyservice) option. + type: list + elements: str + version_added: 1.0.0 +''' + + ANSIBLE_VARIABLES = r''' +options: + sops_binary: + vars: + - name: sops_binary + age_key: + vars: + - name: sops_age_key + age_keyfile: + vars: + - name: sops_age_keyfile + aws_profile: + vars: + - name: sops_aws_profile + aws_access_key_id: + vars: + - name: sops_aws_access_key_id + aws_secret_access_key: + vars: + - name: sops_aws_secret_access_key + aws_session_token: + vars: + - name: sops_session_token + - name: sops_aws_session_token + version_added: 1.2.0 + config_path: + vars: + - name: sops_config_path + enable_local_keyservice: + vars: + - name: sops_enable_local_keyservice + keyservice: + vars: + - name: sops_keyservice +''' + + ANSIBLE_ENV = r''' +options: + sops_binary: + env: + - name: ANSIBLE_SOPS_BINARY + version_added: 1.2.0 + age_key: + env: + - name: ANSIBLE_SOPS_AGE_KEY + age_keyfile: + env: + - name: ANSIBLE_SOPS_AGE_KEYFILE + aws_profile: + env: + - name: ANSIBLE_SOPS_AWS_PROFILE + version_added: 1.2.0 + aws_access_key_id: + env: + - name: ANSIBLE_SOPS_AWS_ACCESS_KEY_ID + version_added: 1.2.0 + aws_secret_access_key: + env: + - name: ANSIBLE_SOPS_AWS_SECRET_ACCESS_KEY + version_added: 1.2.0 + aws_session_token: + env: + - name: ANSIBLE_SOPS_AWS_SESSION_TOKEN + version_added: 1.2.0 + config_path: + env: + - name: ANSIBLE_SOPS_CONFIG_PATH + version_added: 1.2.0 + enable_local_keyservice: + env: + - name: ANSIBLE_SOPS_ENABLE_LOCAL_KEYSERVICE + version_added: 1.2.0 + keyservice: + env: + - name: ANSIBLE_SOPS_KEYSERVICE + version_added: 1.2.0 +''' + + ANSIBLE_INI = r''' +options: + sops_binary: + ini: + - section: community.sops + key: binary + version_added: 1.2.0 + # We do not provide an INI key for + # age_key + # to make sure that secrets cannot be provided in ansible.ini. Use environment variables or another mechanism for that. + age_keyfile: + ini: + - section: community.sops + key: age_keyfile + aws_profile: + ini: + - section: community.sops + key: aws_profile + version_added: 1.2.0 + aws_access_key_id: + ini: + - section: community.sops + key: aws_access_key_id + version_added: 1.2.0 + # We do not provide an INI key for + # aws_secret_access_key + # to make sure that secrets cannot be provided in ansible.ini. Use environment variables or another mechanism for that. + aws_session_token: + ini: + - section: community.sops + key: aws_session_token + version_added: 1.2.0 + config_path: + ini: + - section: community.sops + key: config_path + version_added: 1.2.0 + enable_local_keyservice: + ini: + - section: community.sops + key: enable_local_keyservice + version_added: 1.2.0 + keyservice: + ini: + - section: community.sops + key: keyservice + version_added: 1.2.0 +''' + + ENCRYPT_SPECIFIC = r''' +options: + age: + description: + - Age fingerprints to use. + - This corresponds to the sops C(--age) option. + type: list + elements: str + version_added: 1.4.0 + kms: + description: + - List of KMS ARNs to use. + - This corresponds to the sops C(--kms) option. + type: list + elements: str + version_added: 1.0.0 + gcp_kms: + description: + - GCP KMS resource IDs to use. + - This corresponds to the sops C(--gcp-kms) option. + type: list + elements: str + version_added: 1.0.0 + azure_kv: + description: + - Azure Key Vault URLs to use. + - This corresponds to the sops C(--azure-kv) option. + type: list + elements: str + version_added: 1.0.0 + hc_vault_transit: + description: + - HashiCorp Vault key URIs to use. + - For example, C(https://vault.example.org:8200/v1/transit/keys/dev). + - This corresponds to the sops C(--hc-vault-transit) option. + type: list + elements: str + version_added: 1.0.0 + pgp: + description: + - PGP fingerprints to use. + - This corresponds to the sops C(--pgp) option. + type: list + elements: str + version_added: 1.0.0 + unencrypted_suffix: + description: + - Override the unencrypted key suffix. + - This corresponds to the sops C(--unencrypted-suffix) option. + type: str + version_added: 1.0.0 + encrypted_suffix: + description: + - Override the encrypted key suffix. + - When set to an empty string, all keys will be encrypted that are not explicitly + marked by I(unencrypted_suffix). + - This corresponds to the sops C(--encrypted-suffix) option. + type: str + version_added: 1.0.0 + unencrypted_regex: + description: + - Set the unencrypted key suffix. + - When specified, only keys matching the regular expression will be left unencrypted. + - This corresponds to the sops C(--unencrypted-regex) option. + type: str + version_added: 1.0.0 + encrypted_regex: + description: + - Set the encrypted key suffix. + - When specified, only keys matching the regular expression will be encrypted. + - This corresponds to the sops C(--encrypted-regex) option. + type: str + version_added: 1.0.0 + encryption_context: + description: + - List of KMS encryption context pairs of format C(key:value). + - This corresponds to the sops C(--encryption-context) option. + type: list + elements: str + version_added: 1.0.0 + shamir_secret_sharing_threshold: + description: + - The number of distinct keys required to retrieve the data key with + L(Shamir's Secret Sharing, https://en.wikipedia.org/wiki/Shamir%27s_Secret_Sharing). + - If not set here and in the sops config file, will default to C(0). + - This corresponds to the sops C(--shamir-secret-sharing-threshold) option. + type: int + version_added: 1.0.0 +''' diff --git a/ansible_collections/community/sops/plugins/filter/_latest_version.py b/ansible_collections/community/sops/plugins/filter/_latest_version.py new file mode 100644 index 00000000..a4de0f17 --- /dev/null +++ b/ansible_collections/community/sops/plugins/filter/_latest_version.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2022, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = ''' +name: _latest_version +short_description: "[INTERNAL] Get latest version from a list of versions" +version_added: 1.4.0 +author: + - Felix Fontein (@felixfontein) +description: + - B(This is an internal tool and must only be used from roles in this collection!) + If you use it from outside this collection, be warned that its behavior can change + and it can be removed at any time, even in bugfix releases! + - Given a list of version numbers, returns the largest of them. +options: + _input: + description: + - A list of strings. Every string must be a version number. + type: list + elements: string + required: true +''' + +EXAMPLES = ''' +- name: Print latest version + ansible.builtin.debug: + msg: "{{ versions | community.sops._latest_version }}" + vars: + versions: + - 1.0.0 + - 1.0.0rc1 + - 1.1.0 +''' + +RETURN = ''' +_value: + description: + - The latest version from the input. + type: string +''' + +from ansible.module_utils.six import raise_from + +try: + from ansible.module_utils.compat.version import LooseVersion +except ImportError: + try: + from distutils.version import LooseVersion + except ImportError as exc: + msg = ( + 'To use this plugin or module with ansible-core 2.11, ansible-base 2.10,' + ' or Ansible 2.9, you need to use Python < 3.12 with distutils.version present' + ) + raise_from(ImportError(msg), exc) + + +def pick_latest_version(version_list): + '''Pick latest version from a list of versions.''' + if not version_list: + return '' + return sorted(version_list, key=LooseVersion, reverse=True)[0] + + +class FilterModule(object): + '''Helper filters.''' + def filters(self): + return { + '_latest_version': pick_latest_version, + } diff --git a/ansible_collections/community/sops/plugins/filter/decrypt.py b/ansible_collections/community/sops/plugins/filter/decrypt.py new file mode 100644 index 00000000..a27d1c70 --- /dev/null +++ b/ansible_collections/community/sops/plugins/filter/decrypt.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2021, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = ''' +name: decrpyt +short_description: Decrypt sops encrypted data +version_added: 1.1.0 +author: + - Felix Fontein (@felixfontein) +description: + - Decrypt sops encrypted data. + - Allows to decrypt data that has been provided by an arbitrary source. + - Note that due to Ansible lazy-evaluating expressions, it is better to use M(ansible.builtin.set_fact) + to store the result of an evaluation in a fact to avoid recomputing the value every time the expression + is used. +options: + _input: + description: + - The data to decrypt. + type: string + required: true + rstrip: + description: + - Whether to remove trailing newlines and spaces. + type: bool + default: true + input_type: + description: + - Tell sops how to interpret the encrypted data. + - There is no auto-detection since we do not have a filename. By default + sops is told to treat the input as YAML. If that is wrong, please set this + option to the correct value. + type: str + choices: + - binary + - json + - yaml + - dotenv + default: yaml + output_type: + description: + - Tell sops how to interpret the decrypted file. + - Please note that the output is always text or bytes, depending on the value of I(decode_output). + To parse the resulting JSON or YAML, use corresponding filters such as C(ansible.builtin.from_json) + and C(ansible.builtin.from_yaml). + type: str + choices: + - binary + - json + - yaml + - dotenv + default: yaml + decode_output: + description: + - Whether to decode the output to bytes. + - When I(output_type=binary), and the file isn't known to contain UTF-8 encoded text, + this should better be set to C(false) to prevent mangling the data with UTF-8 decoding. + type: bool + default: true +extends_documentation_fragment: + - community.sops.sops +seealso: + - plugin: community.sops.sops + plugin_type: lookup + - plugin: community.sops.sops + plugin_type: vars + - module: community.sops.load_vars +''' + +EXAMPLES = ''' +- name: Decrypt file fetched from URL + hosts: localhost + gather_facts: false + tasks: + - name: Fetch file from URL + ansible.builtin.uri: + url: https://raw.githubusercontent.com/mozilla/sops/master/functional-tests/res/comments.enc.yaml + return_content: true + register: encrypted_content + + - name: Show encrypted data + debug: + msg: "{{ encrypted_content.content | ansible.builtin.from_yaml }}" + + - name: Decrypt data and decode decrypted YAML + set_fact: + decrypted_data: "{{ encrypted_content.content | community.sops.decrypt | ansible.builtin.from_yaml }}" + + - name: Show decrypted data + debug: + msg: "{{ decrypted_data }}" +''' + +RETURN = ''' +_value: + description: + - Decrypted data as text (I(decode_output=true), default) or binary string (I(decode_output=false)). + type: string +''' + +from ansible.errors import AnsibleFilterError +from ansible.module_utils.common.text.converters import to_bytes, to_native +from ansible.utils.display import Display + +from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError + + +_VALID_TYPES = set(['binary', 'json', 'yaml', 'dotenv']) + + +def decrypt_filter(data, input_type='yaml', output_type='yaml', sops_binary='sops', rstrip=True, decode_output=True, + aws_profile=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, + config_path=None, enable_local_keyservice=False, keyservice=None, age_key=None, age_keyfile=None): + '''Decrypt sops-encrypted data.''' + + # Check parameters + if input_type not in _VALID_TYPES: + raise AnsibleFilterError('input_type must be one of {expected}; got "{value}"'.format( + expected=', '.join(sorted(_VALID_TYPES)), value=input_type)) + if output_type not in _VALID_TYPES: + raise AnsibleFilterError('output_type must be one of {expected}; got "{value}"'.format( + expected=', '.join(sorted(_VALID_TYPES)), value=output_type)) + + # Create option value querier + def get_option_value(argument_name): + if argument_name == 'sops_binary': + return sops_binary + if argument_name == 'age_key': + return age_key + if argument_name == 'age_keyfile': + return age_keyfile + if argument_name == 'aws_profile': + return aws_profile + if argument_name == 'aws_access_key_id': + return aws_access_key_id + if argument_name == 'aws_secret_access_key': + return aws_secret_access_key + if argument_name == 'aws_session_token': + return aws_session_token + if argument_name == 'config_path': + return config_path + if argument_name == 'enable_local_keyservice': + return enable_local_keyservice + if argument_name == 'keyservice': + return keyservice + raise AssertionError('internal error: should not be reached') + + # Decode + data = to_bytes(data) + try: + output = Sops.decrypt( + None, content=data, display=Display(), rstrip=rstrip, decode_output=decode_output, + input_type=input_type, output_type=output_type, get_option_value=get_option_value) + except SopsError as e: + raise AnsibleFilterError(to_native(e)) + + return output + + +class FilterModule(object): + '''Ansible jinja2 filters''' + + def filters(self): + return { + 'decrypt': decrypt_filter, + } diff --git a/ansible_collections/community/sops/plugins/lookup/sops.py b/ansible_collections/community/sops/plugins/lookup/sops.py new file mode 100644 index 00000000..64990ae5 --- /dev/null +++ b/ansible_collections/community/sops/plugins/lookup/sops.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018 Edoardo Tenani <e.tenani@arduino.cc> (@endorama) +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = """ + name: sops + author: Edoardo Tenani (@endorama) <e.tenani@arduino.cc> + short_description: Read sops encrypted file contents + version_added: '0.1.0' + description: + - This lookup returns the contents from a file on the Ansible controller's file system. + - This lookup requires the C(sops) executable to be available in the controller PATH. + options: + _terms: + description: Path(s) of files to read. + required: true + rstrip: + description: Whether to remove trailing newlines and spaces. + type: bool + default: true + base64: + description: + - Base64-encodes the parsed result. + - Use this if you want to store binary data in Ansible variables. + type: bool + default: false + input_type: + description: + - Tell sops how to interpret the encrypted file. + - By default, sops will chose the input type from the file extension. + If it detects the wrong type for a file, this could result in decryption + failing. + type: str + choices: + - binary + - json + - yaml + - dotenv + output_type: + description: + - Tell sops how to interpret the decrypted file. + - By default, sops will chose the output type from the file extension. + If it detects the wrong type for a file, this could result in decryption + failing. + type: str + choices: + - binary + - json + - yaml + - dotenv + empty_on_not_exist: + description: + - When set to C(true), will not raise an error when a file cannot be found, + but return an empty string instead. + type: bool + default: false + extends_documentation_fragment: + - community.sops.sops + - community.sops.sops.ansible_variables + - community.sops.sops.ansible_env + - community.sops.sops.ansible_ini + notes: + - This lookup does not understand 'globbing' - use the fileglob lookup instead. + seealso: + - ref: community.sops.decrypt filter <ansible_collections.community.sops.decrypt_filter> + description: The decrypt filter can be used to descrypt sops-encrypted in-memory data. + # - plugin: community.sops.decrypt + # plugin_type: filter + - ref: community.sops.sops vars plugin <ansible_collections.community.sops.sops_vars> + description: The sops vars plugin can be used to load sops-encrypted host or group variables. + # - plugin: community.sops.sops + # plugin_type: vars + - module: community.sops.load_vars +""" + +EXAMPLES = """ +- name: Output secrets to screen (BAD IDEA!) + ansible.builtin.debug: + msg: "Content: {{ lookup('community.sops.sops', item) }}" + loop: + - sops-encrypted-file.enc.yaml + +- name: Add SSH private key + ansible.builtin.copy: + # Note that rstrip=false is necessary for some SSH versions to be able to use the key + content: "{{ lookup('community.sops.sops', user + '-id_rsa', rstrip=false) }}" + dest: /home/{{ user }}/.ssh/id_rsa + owner: "{{ user }}" + group: "{{ user }}" + mode: 0600 + no_log: true # avoid content to be written to log + +- name: The file file.json is a YAML file, which contains the encryption of binary data + ansible.builtin.debug: + msg: "Content: {{ lookup('community.sops.sops', 'file.json', input_type='yaml', output_type='binary') }}" + +""" + +RETURN = """ + _raw: + description: Decrypted file content. + type: list + elements: str +""" + +import base64 + +from ansible.errors import AnsibleLookupError +from ansible.plugins.lookup import LookupBase +from ansible.module_utils.common.text.converters import to_native +from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError + +from ansible.utils.display import Display +display = Display() + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, **kwargs): + self.set_options(var_options=variables, direct=kwargs) + rstrip = self.get_option('rstrip') + use_base64 = self.get_option('base64') + input_type = self.get_option('input_type') + output_type = self.get_option('output_type') + empty_on_not_exist = self.get_option('empty_on_not_exist') + + ret = [] + + def get_option_value(argument_name): + return self.get_option(argument_name) + + for term in terms: + display.debug("Sops lookup term: %s" % term) + lookupfile = self.find_file_in_search_path(variables, 'files', term, ignore_missing=empty_on_not_exist) + display.vvvv(u"Sops lookup using %s as file" % lookupfile) + + if not lookupfile: + if empty_on_not_exist: + ret.append('') + continue + raise AnsibleLookupError("could not locate file in lookup: %s" % to_native(term)) + + try: + output = Sops.decrypt( + lookupfile, display=display, rstrip=rstrip, decode_output=not use_base64, + input_type=input_type, output_type=output_type, get_option_value=get_option_value) + except SopsError as e: + raise AnsibleLookupError(to_native(e)) + + if use_base64: + output = to_native(base64.b64encode(output)) + + ret.append(output) + + return ret diff --git a/ansible_collections/community/sops/plugins/module_utils/io.py b/ansible_collections/community/sops/plugins/module_utils/io.py new file mode 100644 index 00000000..5432237a --- /dev/null +++ b/ansible_collections/community/sops/plugins/module_utils/io.py @@ -0,0 +1,53 @@ +# Copyright (c), Yanis Guenane <yanis+ansible@guenane.org>, 2016 +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import os +import tempfile + + +# This is taken from community.crypto + +def write_file(module, content): + ''' + Writes content into destination file as securely as possible. + Uses file arguments from module. + ''' + # Find out parameters for file + file_args = module.load_file_common_arguments(module.params) + # Create tempfile name + tmp_fd, tmp_name = tempfile.mkstemp(prefix=b'.ansible_tmp') + try: + os.close(tmp_fd) + except Exception: + pass + module.add_cleanup_file(tmp_name) # if we fail, let Ansible try to remove the file + try: + try: + # Create tempfile + file = os.open(tmp_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.write(file, content) + os.close(file) + except Exception as e: + try: + os.remove(tmp_name) + except Exception: + pass + module.fail_json(msg='Error while writing result into temporary file: {0}'.format(e)) + # Update destination to wanted permissions + if os.path.exists(file_args['path']): + module.set_fs_attributes_if_different(file_args, False) + # Move tempfile to final destination + module.atomic_move(tmp_name, file_args['path']) + # Try to update permissions again + module.set_fs_attributes_if_different(file_args, False) + except Exception as e: + try: + os.remove(tmp_name) + except Exception: + pass + module.fail_json(msg='Error while writing result: {0}'.format(e)) diff --git a/ansible_collections/community/sops/plugins/module_utils/sops.py b/ansible_collections/community/sops/plugins/module_utils/sops.py new file mode 100644 index 00000000..d3c98d1d --- /dev/null +++ b/ansible_collections/community/sops/plugins/module_utils/sops.py @@ -0,0 +1,303 @@ +# Copyright (c), Edoardo Tenani <e.tenani@arduino.cc>, 2018-2020 +# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause) +# SPDX-License-Identifier: BSD-2-Clause + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import os + +from ansible.module_utils.common.text.converters import to_text, to_native + +# Since this is used both by plugins and modules, we need subprocess in case the `module` parameter is not used +from subprocess import Popen, PIPE + + +# From https://github.com/mozilla/sops/blob/master/cmd/sops/codes/codes.go +# Should be manually updated +SOPS_ERROR_CODES = { + 1: "ErrorGeneric", + 2: "CouldNotReadInputFile", + 3: "CouldNotWriteOutputFile", + 4: "ErrorDumpingTree", + 5: "ErrorReadingConfig", + 6: "ErrorInvalidKMSEncryptionContextFormat", + 7: "ErrorInvalidSetFormat", + 8: "ErrorConflictingParameters", + 21: "ErrorEncryptingMac", + 23: "ErrorEncryptingTree", + 24: "ErrorDecryptingMac", + 25: "ErrorDecryptingTree", + 49: "CannotChangeKeysFromNonExistentFile", + 51: "MacMismatch", + 52: "MacNotFound", + 61: "ConfigFileNotFound", + 85: "KeyboardInterrupt", + 91: "InvalidTreePathFormat", + 100: "NoFileSpecified", + 128: "CouldNotRetrieveKey", + 111: "NoEncryptionKeyFound", + 200: "FileHasNotBeenModified", + 201: "NoEditorFound", + 202: "FailedToCompareVersions", + 203: "FileAlreadyEncrypted" +} + + +def _create_single_arg(argument_name): + def f(value, arguments, env): + arguments.extend([argument_name, to_native(value)]) + + return f + + +def _create_comma_separated(argument_name): + def f(value, arguments, env): + arguments.extend([argument_name, ','.join([to_native(v) for v in value])]) + + return f + + +def _create_repeated(argument_name): + def f(value, arguments, env): + for v in value: + arguments.extend([argument_name, to_native(v)]) + + return f + + +def _create_boolean(argument_name): + def f(value, arguments, env): + if value: + arguments.append(argument_name) + + return f + + +def _create_env_variable(argument_name): + def f(value, arguments, env): + env[argument_name] = value + + return f + + +GENERAL_OPTIONS = { + 'age_key': _create_env_variable('SOPS_AGE_KEY'), + 'age_keyfile': _create_env_variable('SOPS_AGE_KEY_FILE'), + 'aws_profile': _create_single_arg('--aws-profile'), + 'aws_access_key_id': _create_env_variable('AWS_ACCESS_KEY_ID'), + 'aws_secret_access_key': _create_env_variable('AWS_SECRET_ACCESS_KEY'), + 'aws_session_token': _create_env_variable('AWS_SESSION_TOKEN'), + 'config_path': _create_single_arg('--config'), + 'enable_local_keyservice': _create_boolean('--enable-local-keyservice'), + 'keyservice': _create_repeated('--keyservice'), +} + + +ENCRYPT_OPTIONS = { + 'age': _create_comma_separated('--age'), + 'kms': _create_comma_separated('--kms'), + 'gcp_kms': _create_comma_separated('--gcp-kms'), + 'azure_kv': _create_comma_separated('--azure-kv'), + 'hc_vault_transit': _create_comma_separated('--hc-vault-transit'), + 'pgp': _create_comma_separated('--pgp'), + 'unencrypted_suffix': _create_single_arg('--unencrypted-suffix'), + 'encrypted_suffix': _create_single_arg('--encrypted-suffix'), + 'unencrypted_regex': _create_single_arg('--unencrypted-regex'), + 'encrypted_regex': _create_single_arg('--encrypted-regex'), + 'encryption_context': _create_comma_separated('--encryption-context'), + 'shamir_secret_sharing_threshold': _create_single_arg('--shamir-secret-sharing-threshold'), +} + + +class SopsError(Exception): + ''' Extend Exception class with sops specific informations ''' + + def __init__(self, filename, exit_code, message, decryption=True): + if exit_code in SOPS_ERROR_CODES: + exception_name = SOPS_ERROR_CODES[exit_code] + message = "error with file %s: %s exited with code %d: %s" % ( + filename, exception_name, exit_code, to_native(message)) + else: + message = "could not %s file %s; Unknown sops error code: %s; message: %s" % ( + 'decrypt' if decryption else 'encrypt', filename, exit_code, to_native(message)) + super(SopsError, self).__init__(message) + + +class Sops(): + ''' Utility class to perform sops CLI actions ''' + + @staticmethod + def _add_options(command, env, get_option_value, options): + if get_option_value is None: + return + for option, f in options.items(): + v = get_option_value(option) + if v is not None: + f(v, command, env) + + @staticmethod + def get_sops_binary(get_option_value): + cmd = get_option_value('sops_binary') if get_option_value else None + if cmd is None: + cmd = 'sops' + return cmd + + @staticmethod + def decrypt(encrypted_file, content=None, + display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None): + # Run sops directly, python module is deprecated + command = [Sops.get_sops_binary(get_option_value)] + env = os.environ.copy() + Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS) + if input_type is not None: + command.extend(["--input-type", input_type]) + if output_type is not None: + command.extend(["--output-type", output_type]) + if content is not None: + encrypted_file = '/dev/stdin' + command.extend(["--decrypt", encrypted_file]) + + if module: + exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True) + else: + process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env) + (output, err) = process.communicate(input=content) + exit_code = process.returncode + + if decode_output: + # output is binary, we want UTF-8 string + output = to_text(output, errors='surrogate_or_strict') + # the process output is the decrypted secret; be cautious + + # sops logs always to stderr, as stdout is used for + # file content + if err and display: + display.vvvv(to_text(err, errors='surrogate_or_strict')) + + if exit_code != 0: + raise SopsError(encrypted_file, exit_code, err, decryption=True) + + if rstrip: + output = output.rstrip() + + return output + + @staticmethod + def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None): + # Run sops directly, python module is deprecated + command = [Sops.get_sops_binary(get_option_value)] + env = os.environ.copy() + Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS) + Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS) + if input_type is not None: + command.extend(["--input-type", input_type]) + if output_type is not None: + command.extend(["--output-type", output_type]) + command.extend(["--encrypt", "/dev/stdin"]) + + if module: + exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None) + else: + process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env) + (output, err) = process.communicate(input=data) + exit_code = process.returncode + + # sops logs always to stderr, as stdout is used for + # file content + if err and display: + display.vvvv(to_text(err, errors='surrogate_or_strict')) + + if exit_code != 0: + raise SopsError('to stdout', exit_code, err, decryption=False) + + return output + + +def get_sops_argument_spec(add_encrypt_specific=False): + argument_spec = { + 'sops_binary': { + 'type': 'path', + }, + 'age_key': { + 'type': 'str', + 'no_log': True, + }, + 'age_keyfile': { + 'type': 'path', + }, + 'aws_profile': { + 'type': 'str', + }, + 'aws_access_key_id': { + 'type': 'str', + }, + 'aws_secret_access_key': { + 'type': 'str', + 'no_log': True, + }, + 'aws_session_token': { + 'type': 'str', + 'no_log': True, + }, + 'config_path': { + 'type': 'path', + }, + 'enable_local_keyservice': { + 'type': 'bool', + 'default': False, + }, + 'keyservice': { + 'type': 'list', + 'elements': 'str', + }, + } + if add_encrypt_specific: + argument_spec.update({ + 'age': { + 'type': 'list', + 'elements': 'str', + }, + 'kms': { + 'type': 'list', + 'elements': 'str', + }, + 'gcp_kms': { + 'type': 'list', + 'elements': 'str', + }, + 'azure_kv': { + 'type': 'list', + 'elements': 'str', + }, + 'hc_vault_transit': { + 'type': 'list', + 'elements': 'str', + }, + 'pgp': { + 'type': 'list', + 'elements': 'str', + }, + 'unencrypted_suffix': { + 'type': 'str', + }, + 'encrypted_suffix': { + 'type': 'str', + }, + 'unencrypted_regex': { + 'type': 'str', + }, + 'encrypted_regex': { + 'type': 'str', + }, + 'encryption_context': { + 'type': 'list', + 'elements': 'str', + }, + 'shamir_secret_sharing_threshold': { + 'type': 'int', + 'no_log': False, + }, + }) + return argument_spec diff --git a/ansible_collections/community/sops/plugins/modules/load_vars.py b/ansible_collections/community/sops/plugins/modules/load_vars.py new file mode 100644 index 00000000..27e9ae8c --- /dev/null +++ b/ansible_collections/community/sops/plugins/modules/load_vars.py @@ -0,0 +1,117 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +author: Felix Fontein (@felixfontein) +module: load_vars +short_description: Load sops-encrypted variables from files, dynamically within a task +version_added: '0.1.0' +description: + - Loads sops-encrypted YAML/JSON variables dynamically from a file during task runtime. + - To assign included variables to a different host than C(inventory_hostname), + use C(delegate_to) and set C(delegate_facts=true). +options: + file: + description: + - The file name from which variables should be loaded. + - If the path is relative, it will look for the file in C(vars/) subdirectory of a role or relative to playbook. + type: path + name: + description: + - The name of a variable into which assign the included vars. + - If omitted (C(null)) they will be made top level vars. + type: str + expressions: + description: + - This option controls how Jinja2 expressions in values in the loaded file are handled. + - If set to C(ignore), expressions will not be evaluated, but treated as regular strings. + - If set to C(evaluate-on-load), expressions will be evaluated on execution of this module, + in other words, when the file is loaded. + - Unfortunately, there is no way for non-core modules to handle expressions "unsafe", + in other words, evaluate them only on use. This can only achieved by M(ansible.builtin.include_vars), + which unfortunately cannot handle sops-encrypted files. + type: str + default: ignore + choices: + - ignore + - evaluate-on-load +extends_documentation_fragment: + - community.sops.sops + - community.sops.attributes + - community.sops.attributes.facts + - community.sops.attributes.flow +attributes: + action: + support: full + async: + support: none + details: + - This action runs completely on the controller. + check_mode: + support: full + diff_mode: + support: N/A + details: + - This action does not modify state. + facts: + support: full +seealso: + - module: ansible.builtin.set_fact + - module: ansible.builtin.include_vars + - ref: playbooks_delegation + description: More information related to task delegation. + - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup> + description: The sops lookup can be used decrypt sops-encrypted files. + # - plugin: community.sops.sops + # plugin_type: lookup + - ref: community.sops.decrypt filter <ansible_collections.community.sops.decrypt_filter> + description: The decrypt filter can be used to descrypt sops-encrypted in-memory data. + # - plugin: community.sops.decrypt + # plugin_type: filter + - ref: community.sops.sops vars plugin <ansible_collections.community.sops.sops_vars> + description: The sops vars plugin can be used to load sops-encrypted host or group variables. + # - plugin: community.sops.sops + # plugin_type: vars +''' + +EXAMPLES = r''' +- name: Include variables of stuff.sops.yaml into the 'stuff' variable + community.sops.load_vars: + file: stuff.sops.yaml + name: stuff + expressions: evaluate-on-load # interpret Jinja2 expressions in stuf.sops.yaml on load-time! + +- name: Conditionally decide to load in variables into 'plans' when x is 0, otherwise do not + community.sops.load_vars: + file: contingency_plan.sops.yaml + name: plans + expressions: ignore # do not interpret possible Jinja2 expressions + when: x == 0 + +- name: Load variables into the global namespace + community.sops.load_vars: + file: contingency_plan.sops.yaml +''' + +RETURN = r''' +ansible_facts: + description: Variables that were included and their values. + returned: success + type: dict + sample: {'variable': 'value'} +ansible_included_var_files: + description: A list of files that were successfully included + returned: success + type: list + elements: str + sample: [ /path/to/file.sops.yaml ] +''' diff --git a/ansible_collections/community/sops/plugins/modules/sops_encrypt.py b/ansible_collections/community/sops/plugins/modules/sops_encrypt.py new file mode 100644 index 00000000..d4ba3435 --- /dev/null +++ b/ansible_collections/community/sops/plugins/modules/sops_encrypt.py @@ -0,0 +1,237 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2020, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +author: Felix Fontein (@felixfontein) +module: sops_encrypt +short_description: Encrypt data with sops +version_added: '0.1.0' +description: + - Allows to encrypt binary data (Base64 encoded), text data, JSON or YAML data with sops. +options: + path: + description: + - The sops encrypt file. + type: path + required: true + force: + description: + - Force rewriting the encrypted file. + type: bool + default: false + content_text: + description: + - The data to encrypt. Must be a Unicode text. + - Please note that the module might not be idempotent if the text can be parsed as JSON or YAML. + - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. + type: str + content_binary: + description: + - The data to encrypt. Must be L(Base64 encoded,https://en.wikipedia.org/wiki/Base64) binary data. + - Please note that the module might not be idempotent if the data can be parsed as JSON or YAML. + - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. + type: str + content_json: + description: + - The data to encrypt. Must be a JSON dictionary. + - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. + type: dict + content_yaml: + description: + - The data to encrypt. Must be a YAML dictionary. + - Please note that Ansible only allows to pass data that can be represented as a JSON dictionary. + - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. + type: dict +extends_documentation_fragment: + - ansible.builtin.files + - community.sops.sops + - community.sops.sops.encrypt_specific + - community.sops.attributes + - community.sops.attributes.files +attributes: + check_mode: + support: full + diff_mode: + support: none + safe_file_operations: + support: full +seealso: + - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup> + description: The sops lookup can be used decrypt sops-encrypted files. + # - plugin: community.sops.sops + # plugin_type: lookup +''' + +EXAMPLES = r''' +- name: Encrypt a secret text + community.sops.sops_encrypt: + path: text-data.sops + content_text: This is a secret text. + +- name: Encrypt the contents of a file + community.sops.sops_encrypt: + path: binary-data.sops + content_binary: "{{ lookup('ansible.builtin.file', '/path/to/file', rstrip=false) | b64encode }}" + +- name: Encrypt some datastructure as YAML + community.sops.sops_encrypt: + path: stuff.sops.yaml + content_yaml: "{{ result }}" +''' + +RETURN = r''' # ''' + + +import base64 +import json +import os +import traceback + +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.common.text.converters import to_text + +from ansible_collections.community.sops.plugins.module_utils.io import write_file +from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError, get_sops_argument_spec + +try: + import yaml + YAML_IMP_ERR = None + HAS_YAML = True +except ImportError: + YAML_IMP_ERR = traceback.format_exc() + HAS_YAML = False + yaml = None + + +def get_data_type(module): + if module.params['content_text'] is not None: + return 'binary' + if module.params['content_binary'] is not None: + return 'binary' + if module.params['content_json'] is not None: + return 'json' + if module.params['content_yaml'] is not None: + return 'yaml' + module.fail_json(msg='Internal error: unknown content type') + + +def compare_encoded_content(module, binary_data, content): + if module.params['content_text'] is not None: + return content == module.params['content_text'].encode('utf-8') + if module.params['content_binary'] is not None: + return content == binary_data + if module.params['content_json'] is not None: + # Compare JSON + try: + return json.loads(content) == module.params['content_json'] + except Exception: + # Treat parsing errors as content not equal + return False + if module.params['content_yaml'] is not None: + # Compare YAML + try: + return yaml.safe_load(content) == module.params['content_yaml'] + except Exception: + # Treat parsing errors as content not equal + return False + module.fail_json(msg='Internal error: unknown content type') + + +def get_encoded_type_content(module, binary_data): + if module.params['content_text'] is not None: + return 'binary', module.params['content_text'].encode('utf-8') + if module.params['content_binary'] is not None: + return 'binary', binary_data + if module.params['content_json'] is not None: + return 'json', json.dumps(module.params['content_json']).encode('utf-8') + if module.params['content_yaml'] is not None: + return 'yaml', yaml.safe_dump(module.params['content_yaml']).encode('utf-8') + module.fail_json(msg='Internal error: unknown content type') + + +def main(): + argument_spec = dict( + path=dict(type='path', required=True), + force=dict(type='bool', default=False), + content_text=dict(type='str', no_log=True), + content_binary=dict(type='str', no_log=True), + content_json=dict(type='dict', no_log=True), + content_yaml=dict(type='dict', no_log=True), + ) + argument_spec.update(get_sops_argument_spec(add_encrypt_specific=True)) + module = AnsibleModule( + argument_spec=argument_spec, + mutually_exclusive=[ + ('content_text', 'content_binary', 'content_json', 'content_yaml'), + ], + required_one_of=[ + ('content_text', 'content_binary', 'content_json', 'content_yaml'), + ], + supports_check_mode=True, + add_file_common_args=True, + ) + + # Check YAML + if module.params['content_yaml'] is not None and not HAS_YAML: + module.fail_json(msg=missing_required_lib('pyyaml'), exception=YAML_IMP_ERR) + + # Decode binary data + binary_data = None + if module.params['content_binary'] is not None: + try: + binary_data = base64.b64decode(module.params['content_binary']) + except Exception as e: + module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e)) + + path = module.params['path'] + directory = os.path.dirname(path) or None + changed = False + + def get_option_value(argument_name): + return module.params.get(argument_name) + + try: + if module.params['force'] or not os.path.exists(path): + # Simply encrypt + changed = True + else: + # Change detection: check if encrypted data equals new data + decrypted_content = Sops.decrypt( + path, decode_output=False, output_type=get_data_type(module), rstrip=False, + get_option_value=get_option_value, module=module, + ) + if not compare_encoded_content(module, binary_data, decrypted_content): + changed = True + + if changed and not module.check_mode: + input_type, input_data = get_encoded_type_content(module, binary_data) + output_type = None + if path.endswith('.json'): + output_type = 'json' + elif path.endswith('.yaml'): + output_type = 'yaml' + data = Sops.encrypt( + data=input_data, cwd=directory, input_type=input_type, output_type=output_type, + get_option_value=get_option_value, module=module, + ) + write_file(module, data) + except SopsError as e: + module.fail_json(msg=to_text(e)) + + file_args = module.load_file_common_arguments(module.params) + changed = module.set_fs_attributes_if_different(file_args, changed) + + module.exit_json(changed=changed) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/sops/plugins/plugin_utils/action_module.py b/ansible_collections/community/sops/plugins/plugin_utils/action_module.py new file mode 100644 index 00000000..f926c1d3 --- /dev/null +++ b/ansible_collections/community/sops/plugins/plugin_utils/action_module.py @@ -0,0 +1,786 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2012-2013 Michael DeHaan <michael.dehaan@gmail.com> +# Copyright (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# Copyright (c) 2019 Ansible Project +# Copyright (c) 2020 Felix Fontein <felix@fontein.de> +# Copyright (c) 2021 Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +# Parts taken from ansible.module_utils.basic and ansible.module_utils.common.warnings. + +# NOTE: THIS IS ONLY FOR ACTION PLUGINS! + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import copy +import traceback + +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.module_utils import six +from ansible.module_utils.basic import AnsibleFallbackNotFound, SEQUENCETYPE, remove_values +from ansible.module_utils.common._collections_compat import ( + Mapping +) +from ansible.module_utils.common.parameters import ( + PASS_VARS, + PASS_BOOLS, +) +from ansible.module_utils.common.validation import ( + check_mutually_exclusive, + check_required_arguments, + check_required_by, + check_required_if, + check_required_one_of, + check_required_together, + count_terms, + check_type_bool, + check_type_bits, + check_type_bytes, + check_type_float, + check_type_int, + check_type_jsonarg, + check_type_list, + check_type_dict, + check_type_path, + check_type_raw, + check_type_str, + safe_eval, +) +from ansible.module_utils.common.text.formatters import ( + lenient_lowercase, +) +from ansible.module_utils.parsing.convert_bool import BOOLEANS_FALSE, BOOLEANS_TRUE +from ansible.module_utils.six import ( + binary_type, + string_types, + text_type, +) +from ansible.module_utils.common.text.converters import to_native, to_text +from ansible.plugins.action import ActionBase + + +try: + # For ansible-core 2.11, we can use the ArgumentSpecValidator. We also import + # ModuleArgumentSpecValidator since that indicates that the 'classical' approach + # will no longer work. + from ansible.module_utils.common.arg_spec import ( # noqa: F401, pylint: disable=unused-import + ArgumentSpecValidator, + ModuleArgumentSpecValidator, # ModuleArgumentSpecValidator is not used + ) + from ansible.module_utils.errors import UnsupportedError + HAS_ARGSPEC_VALIDATOR = True +except ImportError: + # For ansible-base 2.10 and Ansible 2.9, we need to use the 'classical' approach + from ansible.module_utils.common.parameters import ( + handle_aliases, + list_deprecations, + list_no_log_values, + ) + HAS_ARGSPEC_VALIDATOR = False + + +class _ModuleExitException(Exception): + def __init__(self, result): + super(_ModuleExitException, self).__init__() + self.result = result + + +class AnsibleActionModule(object): + def __init__(self, action_plugin, argument_spec, bypass_checks=False, + mutually_exclusive=None, required_together=None, + required_one_of=None, supports_check_mode=False, + required_if=None, required_by=None): + # Internal data + self.__action_plugin = action_plugin + self.__warnings = [] + self.__deprecations = [] + + # AnsibleModule data + self._name = self.__action_plugin._task.action + self.argument_spec = argument_spec + self.supports_check_mode = supports_check_mode + self.check_mode = self.__action_plugin._play_context.check_mode + self.bypass_checks = bypass_checks + self.no_log = self.__action_plugin._play_context.no_log + + self.mutually_exclusive = mutually_exclusive + self.required_together = required_together + self.required_one_of = required_one_of + self.required_if = required_if + self.required_by = required_by + self._diff = self.__action_plugin._play_context.diff + self._verbosity = self.__action_plugin._display.verbosity + + self.aliases = {} + self._legal_inputs = [] + self._options_context = list() + + self.params = copy.deepcopy(self.__action_plugin._task.args) + self.no_log_values = set() + if HAS_ARGSPEC_VALIDATOR: + self._validator = ArgumentSpecValidator( + self.argument_spec, + self.mutually_exclusive, + self.required_together, + self.required_one_of, + self.required_if, + self.required_by, + ) + self._validation_result = self._validator.validate(self.params) + self.params.update(self._validation_result.validated_parameters) + self.no_log_values.update(self._validation_result._no_log_values) + + try: + error = self._validation_result.errors[0] + except IndexError: + error = None + + # We cannot use ModuleArgumentSpecValidator directly since it uses mechanisms for reporting + # warnings and deprecations that do not work in plugins. This is a copy of that code adjusted + # for our use-case: + for d in self._validation_result._deprecations: + # Before ansible-core 2.14.2, deprecations were always for aliases: + if 'name' in d: + self.deprecate( + "Alias '{name}' is deprecated. See the module docs for more information".format(name=d['name']), + version=d.get('version'), date=d.get('date'), collection_name=d.get('collection_name')) + # Since ansible-core 2.14.2, a message is present that can be directly printed: + if 'msg' in d: + self.deprecate(d['msg'], version=d.get('version'), date=d.get('date'), collection_name=d.get('collection_name')) + + for w in self._validation_result._warnings: + self.warn('Both option {option} and its alias {alias} are set.'.format(option=w['option'], alias=w['alias'])) + + # Fail for validation errors, even in check mode + if error: + msg = self._validation_result.errors.msg + if isinstance(error, UnsupportedError): + msg = "Unsupported parameters for ({name}) {kind}: {msg}".format(name=self._name, kind='module', msg=msg) + + self.fail_json(msg=msg) + else: + self._set_fallbacks() + + # append to legal_inputs and then possibly check against them + try: + self.aliases = self._handle_aliases() + except (ValueError, TypeError) as e: + # Use exceptions here because it isn't safe to call fail_json until no_log is processed + raise _ModuleExitException(dict(failed=True, msg="Module alias error: %s" % to_native(e))) + + # Save parameter values that should never be logged + self._handle_no_log_values() + + self._check_arguments() + + # check exclusive early + if not bypass_checks: + self._check_mutually_exclusive(mutually_exclusive) + + self._set_defaults(pre=True) + + self._CHECK_ARGUMENT_TYPES_DISPATCHER = { + 'str': self._check_type_str, + 'list': check_type_list, + 'dict': check_type_dict, + 'bool': check_type_bool, + 'int': check_type_int, + 'float': check_type_float, + 'path': check_type_path, + 'raw': check_type_raw, + 'jsonarg': check_type_jsonarg, + 'json': check_type_jsonarg, + 'bytes': check_type_bytes, + 'bits': check_type_bits, + } + if not bypass_checks: + self._check_required_arguments() + self._check_argument_types() + self._check_argument_values() + self._check_required_together(required_together) + self._check_required_one_of(required_one_of) + self._check_required_if(required_if) + self._check_required_by(required_by) + + self._set_defaults(pre=False) + + # deal with options sub-spec + self._handle_options() + + def _handle_aliases(self, spec=None, param=None, option_prefix=''): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + # this uses exceptions as it happens before we can safely call fail_json + alias_warnings = [] + alias_results, self._legal_inputs = handle_aliases(spec, param, alias_warnings=alias_warnings) # pylint: disable=used-before-assignment + for option, alias in alias_warnings: + self.warn('Both option %s and its alias %s are set.' % (option_prefix + option, option_prefix + alias)) + + deprecated_aliases = [] + for i in spec.keys(): + if 'deprecated_aliases' in spec[i].keys(): + for alias in spec[i]['deprecated_aliases']: + deprecated_aliases.append(alias) + + for deprecation in deprecated_aliases: + if deprecation['name'] in param.keys(): + self.deprecate("Alias '%s' is deprecated. See the module docs for more information" % deprecation['name'], + version=deprecation.get('version'), date=deprecation.get('date'), + collection_name=deprecation.get('collection_name')) + return alias_results + + def _handle_no_log_values(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + try: + self.no_log_values.update(list_no_log_values(spec, param)) # pylint: disable=used-before-assignment + except TypeError as te: + self.fail_json(msg="Failure when processing no_log parameters. Module invocation will be hidden. " + "%s" % to_native(te), invocation={'module_args': 'HIDDEN DUE TO FAILURE'}) + + for message in list_deprecations(spec, param): # pylint: disable=used-before-assignment + self.deprecate(message['msg'], version=message.get('version'), date=message.get('date'), + collection_name=message.get('collection_name')) + + def _check_arguments(self, spec=None, param=None, legal_inputs=None): + self._syslog_facility = 'LOG_USER' + unsupported_parameters = set() + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + if legal_inputs is None: + legal_inputs = self._legal_inputs + + for k in list(param.keys()): + + if k not in legal_inputs: + unsupported_parameters.add(k) + + for k in PASS_VARS: + # handle setting internal properties from internal ansible vars + param_key = '_ansible_%s' % k + if param_key in param: + if k in PASS_BOOLS: + setattr(self, PASS_VARS[k][0], self.boolean(param[param_key])) + else: + setattr(self, PASS_VARS[k][0], param[param_key]) + + # clean up internal top level params: + if param_key in self.params: + del self.params[param_key] + else: + # use defaults if not already set + if not hasattr(self, PASS_VARS[k][0]): + setattr(self, PASS_VARS[k][0], PASS_VARS[k][1]) + + if unsupported_parameters: + msg = "Unsupported parameters for (%s) module: %s" % (self._name, ', '.join(sorted(list(unsupported_parameters)))) + if self._options_context: + msg += " found in %s." % " -> ".join(self._options_context) + supported_parameters = list() + for key in sorted(spec.keys()): + if 'aliases' in spec[key] and spec[key]['aliases']: + supported_parameters.append("%s (%s)" % (key, ', '.join(sorted(spec[key]['aliases'])))) + else: + supported_parameters.append(key) + msg += " Supported parameters include: %s" % (', '.join(supported_parameters)) + self.fail_json(msg=msg) + if self.check_mode and not self.supports_check_mode: + self.exit_json(skipped=True, msg="action module (%s) does not support check mode" % self._name) + + def _count_terms(self, check, param=None): + if param is None: + param = self.params + return count_terms(check, param) + + def _check_mutually_exclusive(self, spec, param=None): + if param is None: + param = self.params + + try: + check_mutually_exclusive(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_one_of(self, spec, param=None): + if spec is None: + return + + if param is None: + param = self.params + + try: + check_required_one_of(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_together(self, spec, param=None): + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_together(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_by(self, spec, param=None): + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_by(spec, param) + except TypeError as e: + self.fail_json(msg=to_native(e)) + + def _check_required_arguments(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + try: + check_required_arguments(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_if(self, spec, param=None): + ''' ensure that parameters which conditionally required are present ''' + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_if(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_argument_values(self, spec=None, param=None): + ''' ensure all arguments have the requested values, and there are no stray arguments ''' + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + for (k, v) in spec.items(): + choices = v.get('choices', None) + if choices is None: + continue + if isinstance(choices, SEQUENCETYPE) and not isinstance(choices, (binary_type, text_type)): + if k in param: + # Allow one or more when type='list' param with choices + if isinstance(param[k], list): + diff_list = ", ".join([item for item in param[k] if item not in choices]) + if diff_list: + choices_str = ", ".join([to_native(c) for c in choices]) + msg = "value of %s must be one or more of: %s. Got no match for: %s" % (k, choices_str, diff_list) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + elif param[k] not in choices: + # PyYaml converts certain strings to bools. If we can unambiguously convert back, do so before checking + # the value. If we can't figure this out, module author is responsible. + lowered_choices = None + if param[k] == 'False': + lowered_choices = lenient_lowercase(choices) + overlap = BOOLEANS_FALSE.intersection(choices) + if len(overlap) == 1: + # Extract from a set + (param[k],) = overlap + + if param[k] == 'True': + if lowered_choices is None: + lowered_choices = lenient_lowercase(choices) + overlap = BOOLEANS_TRUE.intersection(choices) + if len(overlap) == 1: + (param[k],) = overlap + + if param[k] not in choices: + choices_str = ", ".join([to_native(c) for c in choices]) + msg = "value of %s must be one of: %s, got: %s" % (k, choices_str, param[k]) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + else: + msg = "internal error: choices for argument %s are not iterable: %s" % (k, choices) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def safe_eval(self, value, locals=None, include_exceptions=False): + return safe_eval(value, locals, include_exceptions) + + def _check_type_str(self, value, param=None, prefix=''): + opts = { + 'error': False, + 'warn': False, + 'ignore': True + } + + # Ignore, warn, or error when converting to a string. + allow_conversion = opts.get(C.STRING_CONVERSION_ACTION, True) + try: + return check_type_str(value, allow_conversion) + except TypeError: + common_msg = 'quote the entire value to ensure it does not change.' + from_msg = '{0!r}'.format(value) + to_msg = '{0!r}'.format(to_text(value)) + + if param is not None: + if prefix: + param = '{0}{1}'.format(prefix, param) + + from_msg = '{0}: {1!r}'.format(param, value) + to_msg = '{0}: {1!r}'.format(param, to_text(value)) + + if C.STRING_CONVERSION_ACTION == 'error': + msg = common_msg.capitalize() + raise TypeError(to_native(msg)) + elif C.STRING_CONVERSION_ACTION == 'warn': + msg = ('The value "{0}" (type {1.__class__.__name__}) was converted to "{2}" (type string). ' + 'If this does not look like what you expect, {3}').format(from_msg, value, to_msg, common_msg) + self.warn(to_native(msg)) + return to_native(value, errors='surrogate_or_strict') + + def _handle_options(self, argument_spec=None, params=None, prefix=''): + ''' deal with options to create sub spec ''' + if argument_spec is None: + argument_spec = self.argument_spec + if params is None: + params = self.params + + for (k, v) in argument_spec.items(): + wanted = v.get('type', None) + if wanted == 'dict' or (wanted == 'list' and v.get('elements', '') == 'dict'): + spec = v.get('options', None) + if v.get('apply_defaults', False): + if spec is not None: + if params.get(k) is None: + params[k] = {} + else: + continue + elif spec is None or k not in params or params[k] is None: + continue + + self._options_context.append(k) + + if isinstance(params[k], dict): + elements = [params[k]] + else: + elements = params[k] + + for idx, param in enumerate(elements): + if not isinstance(param, dict): + self.fail_json(msg="value of %s must be of type dict or list of dict" % k) + + new_prefix = prefix + k + if wanted == 'list': + new_prefix += '[%d]' % idx + new_prefix += '.' + + self._set_fallbacks(spec, param) + options_aliases = self._handle_aliases(spec, param, option_prefix=new_prefix) + + options_legal_inputs = list(spec.keys()) + list(options_aliases.keys()) + + self._check_arguments(spec, param, options_legal_inputs) + + # check exclusive early + if not self.bypass_checks: + self._check_mutually_exclusive(v.get('mutually_exclusive', None), param) + + self._set_defaults(pre=True, spec=spec, param=param) + + if not self.bypass_checks: + self._check_required_arguments(spec, param) + self._check_argument_types(spec, param, new_prefix) + self._check_argument_values(spec, param) + + self._check_required_together(v.get('required_together', None), param) + self._check_required_one_of(v.get('required_one_of', None), param) + self._check_required_if(v.get('required_if', None), param) + self._check_required_by(v.get('required_by', None), param) + + self._set_defaults(pre=False, spec=spec, param=param) + + # handle multi level options (sub argspec) + self._handle_options(spec, param, new_prefix) + self._options_context.pop() + + def _get_wanted_type(self, wanted, k): + if not callable(wanted): + if wanted is None: + # Mostly we want to default to str. + # For values set to None explicitly, return None instead as + # that allows a user to unset a parameter + wanted = 'str' + try: + type_checker = self._CHECK_ARGUMENT_TYPES_DISPATCHER[wanted] + except KeyError: + self.fail_json(msg="implementation error: unknown type %s requested for %s" % (wanted, k)) + else: + # set the type_checker to the callable, and reset wanted to the callable's name (or type if it doesn't have one, ala MagicMock) + type_checker = wanted + wanted = getattr(wanted, '__name__', to_native(type(wanted))) + + return type_checker, wanted + + def _handle_elements(self, wanted, param, values): + type_checker, wanted_name = self._get_wanted_type(wanted, param) + validated_params = [] + # Get param name for strings so we can later display this value in a useful error message if needed + # Only pass 'kwargs' to our checkers and ignore custom callable checkers + kwargs = {} + if wanted_name == 'str' and isinstance(wanted, string_types): + if isinstance(param, string_types): + kwargs['param'] = param + elif isinstance(param, dict): + kwargs['param'] = list(param.keys())[0] + for value in values: + try: + validated_params.append(type_checker(value, **kwargs)) + except (TypeError, ValueError) as e: + msg = "Elements value for option %s" % param + if self._options_context: + msg += " found in '%s'" % " -> ".join(self._options_context) + msg += " is of type %s and we were unable to convert to %s: %s" % (type(value), wanted_name, to_native(e)) + self.fail_json(msg=msg) + return validated_params + + def _check_argument_types(self, spec=None, param=None, prefix=''): + ''' ensure all arguments have the requested type ''' + + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + for (k, v) in spec.items(): + wanted = v.get('type', None) + if k not in param: + continue + + value = param[k] + if value is None: + continue + + type_checker, wanted_name = self._get_wanted_type(wanted, k) + # Get param name for strings so we can later display this value in a useful error message if needed + # Only pass 'kwargs' to our checkers and ignore custom callable checkers + kwargs = {} + if wanted_name == 'str' and isinstance(type_checker, string_types): + kwargs['param'] = list(param.keys())[0] + + # Get the name of the parent key if this is a nested option + if prefix: + kwargs['prefix'] = prefix + + try: + param[k] = type_checker(value, **kwargs) + wanted_elements = v.get('elements', None) + if wanted_elements: + if wanted != 'list' or not isinstance(param[k], list): + msg = "Invalid type %s for option '%s'" % (wanted_name, param) + if self._options_context: + msg += " found in '%s'." % " -> ".join(self._options_context) + msg += ", elements value check is supported only with 'list' type" + self.fail_json(msg=msg) + param[k] = self._handle_elements(wanted_elements, k, param[k]) + + except (TypeError, ValueError) as e: + msg = "argument %s is of type %s" % (k, type(value)) + if self._options_context: + msg += " found in '%s'." % " -> ".join(self._options_context) + msg += " and we were unable to convert to %s: %s" % (wanted_name, to_native(e)) + self.fail_json(msg=msg) + + def _set_defaults(self, pre=True, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + for (k, v) in spec.items(): + default = v.get('default', None) + if pre is True: + # this prevents setting defaults on required items + if default is not None and k not in param: + param[k] = default + else: + # make sure things without a default still get set None + if k not in param: + param[k] = default + + def _set_fallbacks(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + for (k, v) in spec.items(): + fallback = v.get('fallback', (None,)) + fallback_strategy = fallback[0] + fallback_args = [] + fallback_kwargs = {} + if k not in param and fallback_strategy is not None: + for item in fallback[1:]: + if isinstance(item, dict): + fallback_kwargs = item + else: + fallback_args = item + try: + param[k] = fallback_strategy(*fallback_args, **fallback_kwargs) + except AnsibleFallbackNotFound: + continue + + def warn(self, warning): + # Copied from ansible.module_utils.common.warnings: + if isinstance(warning, string_types): + self.__warnings.append(warning) + else: + raise TypeError("warn requires a string not a %s" % type(warning)) + + def deprecate(self, msg, version=None, date=None, collection_name=None): + if version is not None and date is not None: + raise AssertionError("implementation error -- version and date must not both be set") + + # Copied from ansible.module_utils.common.warnings: + if isinstance(msg, string_types): + # For compatibility, we accept that neither version nor date is set, + # and treat that the same as if version would haven been set + if date is not None: + self.__deprecations.append({'msg': msg, 'date': date, 'collection_name': collection_name}) + else: + self.__deprecations.append({'msg': msg, 'version': version, 'collection_name': collection_name}) + else: + raise TypeError("deprecate requires a string not a %s" % type(msg)) + + def _return_formatted(self, kwargs): + if 'invocation' not in kwargs: + kwargs['invocation'] = {'module_args': self.params} + + if 'warnings' in kwargs: + if isinstance(kwargs['warnings'], list): + for w in kwargs['warnings']: + self.warn(w) + else: + self.warn(kwargs['warnings']) + + if self.__warnings: + kwargs['warnings'] = self.__warnings + + if 'deprecations' in kwargs: + if isinstance(kwargs['deprecations'], list): + for d in kwargs['deprecations']: + if isinstance(d, SEQUENCETYPE) and len(d) == 2: + self.deprecate(d[0], version=d[1]) + elif isinstance(d, Mapping): + self.deprecate(d['msg'], version=d.get('version'), date=d.get('date'), + collection_name=d.get('collection_name')) + else: + self.deprecate(d) # pylint: disable=ansible-deprecated-no-version + else: + self.deprecate(kwargs['deprecations']) # pylint: disable=ansible-deprecated-no-version + + if self.__deprecations: + kwargs['deprecations'] = self.__deprecations + + kwargs = remove_values(kwargs, self.no_log_values) + raise _ModuleExitException(kwargs) + + def exit_json(self, **kwargs): + result = dict(kwargs) + if 'failed' not in result: + result['failed'] = False + self._return_formatted(result) + + def fail_json(self, msg, **kwargs): + result = dict(kwargs) + result['failed'] = True + result['msg'] = msg + self._return_formatted(result) + + +@six.add_metaclass(abc.ABCMeta) +class ActionModuleBase(ActionBase): + @abc.abstractmethod + def setup_module(self): + """Return pair (ArgumentSpec, kwargs).""" + pass + + @abc.abstractmethod + def run_module(self, module): + """Run module code""" + module.fail_json(msg='Not implemented.') + + def run(self, tmp=None, task_vars=None): + if task_vars is None: + task_vars = dict() + + result = super(ActionModuleBase, self).run(tmp, task_vars) + del tmp # tmp no longer has any effect + + try: + argument_spec, kwargs = self.setup_module() + module = argument_spec.create_ansible_module_helper(AnsibleActionModule, (self, ), **kwargs) + self.run_module(module) + raise AnsibleError('Internal error: action module did not call module.exit_json()') + except _ModuleExitException as mee: + result.update(mee.result) + return result + except Exception as dummy: + result['failed'] = True + result['msg'] = 'MODULE FAILURE' + result['exception'] = traceback.format_exc() + return result + + +class ArgumentSpec: + def __init__(self, argument_spec, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None): + self.argument_spec = argument_spec + self.mutually_exclusive = mutually_exclusive or [] + self.required_together = required_together or [] + self.required_one_of = required_one_of or [] + self.required_if = required_if or [] + self.required_by = required_by or {} + + def create_ansible_module_helper(self, clazz, args, **kwargs): + return clazz( + *args, + argument_spec=self.argument_spec, + mutually_exclusive=self.mutually_exclusive, + required_together=self.required_together, + required_one_of=self.required_one_of, + required_if=self.required_if, + required_by=self.required_by, + **kwargs) diff --git a/ansible_collections/community/sops/plugins/vars/sops.py b/ansible_collections/community/sops/plugins/vars/sops.py new file mode 100644 index 00000000..54748035 --- /dev/null +++ b/ansible_collections/community/sops/plugins/vars/sops.py @@ -0,0 +1,165 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2018 Edoardo Tenani <e.tenani@arduino.cc> (@endorama) +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: sops + author: Edoardo Tenani (@endorama) <e.tenani@arduino.cc> + short_description: Loading sops-encrypted vars files + version_added: '0.1.0' + description: + - Load encrypted YAML files into corresponding groups/hosts in group_vars/ and host_vars/ directories. + - Files are encrypted prior to reading, making this plugin an effective companion to host_group_vars plugin. + - Files are restricted to .sops.yaml, .sops.yml, .sops.json extensions. + - Hidden files are ignored. + options: + _valid_extensions: + default: [".sops.yml", ".sops.yaml", ".sops.json"] + description: + - "Check all of these extensions when looking for 'variable' files which should be YAML or JSON or vaulted versions of these." + - 'This affects vars_files, include_vars, inventory and vars plugins among others.' + type: list + elements: string + stage: + version_added: 0.2.0 + ini: + - key: vars_stage + section: community.sops + env: + - name: ANSIBLE_VARS_SOPS_PLUGIN_STAGE + cache: + description: + - Whether to cache decrypted files or not. + - If the cache is disabled, the files will be decrypted for almost every task. This is very slow! + - Only disable caching if you modify the variable files during a playbook run and want the updated + result to be available from the next task on. + - "Note that setting I(stage) to C(inventory) has the same effect as setting I(cache) to C(true): + the variables will be loaded only once (during inventory loading) and the vars plugin will not + be called for every task." + type: bool + default: true + version_added: 0.2.0 + ini: + - key: vars_cache + section: community.sops + env: + - name: ANSIBLE_VARS_SOPS_PLUGIN_CACHE + _disable_vars_plugin_temporarily: + description: + - Temporarily disable this plugin. + - Useful if ansible-inventory is supposed to be run without decrypting secrets (in AWX for instance). + type: bool + default: false + version_added: 1.3.0 + env: + - name: SOPS_ANSIBLE_AWX_DISABLE_VARS_PLUGIN_TEMPORARILY + extends_documentation_fragment: + - ansible.builtin.vars_plugin_staging + - community.sops.sops + - community.sops.sops.ansible_env + - community.sops.sops.ansible_ini + seealso: + - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup> + description: The sops lookup can be used decrypt sops-encrypted files. + # - plugin: community.sops.sops + # plugin_type: lookup + - ref: community.sops.decrypt filter <ansible_collections.community.sops.decrypt_filter> + description: The decrypt filter can be used to descrypt sops-encrypted in-memory data. + # - plugin: community.sops.decrypt + # plugin_type: filter + - module: community.sops.load_vars +''' + +import os +from ansible.errors import AnsibleParserError +from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text +from ansible.plugins.vars import BaseVarsPlugin +from ansible.inventory.host import Host +from ansible.inventory.group import Group +from ansible.utils.vars import combine_vars +from ansible_collections.community.sops.plugins.module_utils.sops import Sops + +from ansible.utils.display import Display +display = Display() + + +FOUND = {} +DECRYPTED = {} +DEFAULT_VALID_EXTENSIONS = [".sops.yaml", ".sops.yml", ".sops.json"] + + +class VarsModule(BaseVarsPlugin): + + def get_vars(self, loader, path, entities, cache=None): + ''' parses the inventory file ''' + + if not isinstance(entities, list): + entities = [entities] + + super(VarsModule, self).get_vars(loader, path, entities) + + def get_option_value(argument_name): + return self.get_option(argument_name) + + if cache is None: + cache = self.get_option('cache') + + if self.get_option('_disable_vars_plugin_temporarily'): + return {} + + data = {} + for entity in entities: + if isinstance(entity, Host): + subdir = 'host_vars' + elif isinstance(entity, Group): + subdir = 'group_vars' + else: + raise AnsibleParserError("Supplied entity must be Host or Group, got %s instead" % (type(entity))) + + # avoid 'chroot' type inventory hostnames /path/to/chroot + if not entity.name.startswith(os.path.sep): + try: + found_files = [] + # load vars + b_opath = os.path.realpath(to_bytes(os.path.join(self._basedir, subdir))) + opath = to_text(b_opath) + key = '%s.%s' % (entity.name, opath) + self._display.vvvv("key: %s" % (key)) + if cache and key in FOUND: + found_files = FOUND[key] + else: + # no need to do much if path does not exist for basedir + if os.path.exists(b_opath): + if os.path.isdir(b_opath): + self._display.debug("\tprocessing dir %s" % opath) + # NOTE: iterating without extension allow retriving files recursively + # A filter is then applied by iterating on all results and filtering by + # extension. + # See: + # - https://github.com/ansible-collections/community.sops/pull/6 + found_files = loader.find_vars_files(opath, entity.name, extensions=DEFAULT_VALID_EXTENSIONS, allow_dir=False) + found_files.extend([file_path for file_path in loader.find_vars_files(opath, entity.name) + if any(to_text(file_path).endswith(extension) for extension in DEFAULT_VALID_EXTENSIONS)]) + FOUND[key] = found_files + else: + self._display.warning("Found %s that is not a directory, skipping: %s" % (subdir, opath)) + + for found in found_files: + if cache and found in DECRYPTED: + file_content = DECRYPTED[found] + else: + file_content = Sops.decrypt(found, display=display, get_option_value=get_option_value) + DECRYPTED[found] = file_content + new_data = loader.load(file_content) + if new_data: # ignore empty files + data = combine_vars(data, new_data) + + except Exception as e: + raise AnsibleParserError(to_native(e)) + + return data |