diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/modules/rpm_key.py | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream.tar.xz ansible-core-upstream.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/modules/rpm_key.py')
-rw-r--r-- | lib/ansible/modules/rpm_key.py | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/lib/ansible/modules/rpm_key.py b/lib/ansible/modules/rpm_key.py new file mode 100644 index 0000000..f420eec --- /dev/null +++ b/lib/ansible/modules/rpm_key.py @@ -0,0 +1,253 @@ +# -*- coding: utf-8 -*- + +# Ansible module to import third party repo keys to your rpm db +# Copyright: (c) 2013, Héctor Acosta <hector.acosta@gazzang.com> + +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = ''' +--- +module: rpm_key +author: + - Hector Acosta (@hacosta) <hector.acosta@gazzang.com> +short_description: Adds or removes a gpg key from the rpm db +description: + - Adds or removes (rpm --import) a gpg key to your rpm database. +version_added: "1.3" +options: + key: + description: + - Key that will be modified. Can be a url, a file on the managed node, or a keyid if the key + already exists in the database. + type: str + required: true + state: + description: + - If the key will be imported or removed from the rpm db. + type: str + default: present + choices: [ absent, present ] + validate_certs: + description: + - If C(false) and the C(key) is a url starting with https, SSL certificates will not be validated. + - This should only be used on personally controlled sites using self-signed certificates. + type: bool + default: 'yes' + fingerprint: + description: + - The long-form fingerprint of the key being imported. + - This will be used to verify the specified key. + type: str + version_added: 2.9 +extends_documentation_fragment: + - action_common_attributes +attributes: + check_mode: + support: full + diff_mode: + support: none + platform: + platforms: rhel +''' + +EXAMPLES = ''' +- name: Import a key from a url + ansible.builtin.rpm_key: + state: present + key: http://apt.sw.be/RPM-GPG-KEY.dag.txt + +- name: Import a key from a file + ansible.builtin.rpm_key: + state: present + key: /path/to/key.gpg + +- name: Ensure a key is not present in the db + ansible.builtin.rpm_key: + state: absent + key: DEADB33F + +- name: Verify the key, using a fingerprint, before import + ansible.builtin.rpm_key: + key: /path/to/RPM-GPG-KEY.dag.txt + fingerprint: EBC6 E12C 62B1 C734 026B 2122 A20E 5214 6B8D 79E6 +''' + +RETURN = r'''#''' + +import re +import os.path +import tempfile + +# import module snippets +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.urls import fetch_url +from ansible.module_utils._text import to_native + + +def is_pubkey(string): + """Verifies if string is a pubkey""" + pgp_regex = ".*?(-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----).*" + return bool(re.match(pgp_regex, to_native(string, errors='surrogate_or_strict'), re.DOTALL)) + + +class RpmKey(object): + + def __init__(self, module): + # If the key is a url, we need to check if it's present to be idempotent, + # to do that, we need to check the keyid, which we can get from the armor. + keyfile = None + should_cleanup_keyfile = False + self.module = module + self.rpm = self.module.get_bin_path('rpm', True) + state = module.params['state'] + key = module.params['key'] + fingerprint = module.params['fingerprint'] + if fingerprint: + fingerprint = fingerprint.replace(' ', '').upper() + + self.gpg = self.module.get_bin_path('gpg') + if not self.gpg: + self.gpg = self.module.get_bin_path('gpg2', required=True) + + if '://' in key: + keyfile = self.fetch_key(key) + keyid = self.getkeyid(keyfile) + should_cleanup_keyfile = True + elif self.is_keyid(key): + keyid = key + elif os.path.isfile(key): + keyfile = key + keyid = self.getkeyid(keyfile) + else: + self.module.fail_json(msg="Not a valid key %s" % key) + keyid = self.normalize_keyid(keyid) + + if state == 'present': + if self.is_key_imported(keyid): + module.exit_json(changed=False) + else: + if not keyfile: + self.module.fail_json(msg="When importing a key, a valid file must be given") + if fingerprint: + has_fingerprint = self.getfingerprint(keyfile) + if fingerprint != has_fingerprint: + self.module.fail_json( + msg="The specified fingerprint, '%s', does not match the key fingerprint '%s'" % (fingerprint, has_fingerprint) + ) + self.import_key(keyfile) + if should_cleanup_keyfile: + self.module.cleanup(keyfile) + module.exit_json(changed=True) + else: + if self.is_key_imported(keyid): + self.drop_key(keyid) + module.exit_json(changed=True) + else: + module.exit_json(changed=False) + + def fetch_key(self, url): + """Downloads a key from url, returns a valid path to a gpg key""" + rsp, info = fetch_url(self.module, url) + if info['status'] != 200: + self.module.fail_json(msg="failed to fetch key at %s , error was: %s" % (url, info['msg'])) + + key = rsp.read() + if not is_pubkey(key): + self.module.fail_json(msg="Not a public key: %s" % url) + tmpfd, tmpname = tempfile.mkstemp() + self.module.add_cleanup_file(tmpname) + tmpfile = os.fdopen(tmpfd, "w+b") + tmpfile.write(key) + tmpfile.close() + return tmpname + + def normalize_keyid(self, keyid): + """Ensure a keyid doesn't have a leading 0x, has leading or trailing whitespace, and make sure is uppercase""" + ret = keyid.strip().upper() + if ret.startswith('0x'): + return ret[2:] + elif ret.startswith('0X'): + return ret[2:] + else: + return ret + + def getkeyid(self, keyfile): + stdout, stderr = self.execute_command([self.gpg, '--no-tty', '--batch', '--with-colons', '--fixed-list-mode', keyfile]) + for line in stdout.splitlines(): + line = line.strip() + if line.startswith('pub:'): + return line.split(':')[4] + + self.module.fail_json(msg="Unexpected gpg output") + + def getfingerprint(self, keyfile): + stdout, stderr = self.execute_command([ + self.gpg, '--no-tty', '--batch', '--with-colons', + '--fixed-list-mode', '--with-fingerprint', keyfile + ]) + for line in stdout.splitlines(): + line = line.strip() + if line.startswith('fpr:'): + # As mentioned here, + # + # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob_plain;f=doc/DETAILS + # + # The description of the `fpr` field says + # + # "fpr :: Fingerprint (fingerprint is in field 10)" + # + return line.split(':')[9] + + self.module.fail_json(msg="Unexpected gpg output") + + def is_keyid(self, keystr): + """Verifies if a key, as provided by the user is a keyid""" + return re.match('(0x)?[0-9a-f]{8}', keystr, flags=re.IGNORECASE) + + def execute_command(self, cmd): + rc, stdout, stderr = self.module.run_command(cmd, use_unsafe_shell=True) + if rc != 0: + self.module.fail_json(msg=stderr) + return stdout, stderr + + def is_key_imported(self, keyid): + cmd = self.rpm + ' -q gpg-pubkey' + rc, stdout, stderr = self.module.run_command(cmd) + if rc != 0: # No key is installed on system + return False + cmd += ' --qf "%{description}" | ' + self.gpg + ' --no-tty --batch --with-colons --fixed-list-mode -' + stdout, stderr = self.execute_command(cmd) + for line in stdout.splitlines(): + if keyid in line.split(':')[4]: + return True + return False + + def import_key(self, keyfile): + if not self.module.check_mode: + self.execute_command([self.rpm, '--import', keyfile]) + + def drop_key(self, keyid): + if not self.module.check_mode: + self.execute_command([self.rpm, '--erase', '--allmatches', "gpg-pubkey-%s" % keyid[-8:].lower()]) + + +def main(): + module = AnsibleModule( + argument_spec=dict( + state=dict(type='str', default='present', choices=['absent', 'present']), + key=dict(type='str', required=True, no_log=False), + fingerprint=dict(type='str'), + validate_certs=dict(type='bool', default=True), + ), + supports_check_mode=True, + ) + + RpmKey(module) + + +if __name__ == '__main__': + main() |