diff options
Diffstat (limited to 'ansible_collections/community/crypto/plugins/modules')
6 files changed, 339 insertions, 31 deletions
diff --git a/ansible_collections/community/crypto/plugins/modules/acme_certificate.py b/ansible_collections/community/crypto/plugins/modules/acme_certificate.py index 9c0b349c4..21a6d6ae9 100644 --- a/ansible_collections/community/crypto/plugins/modules/acme_certificate.py +++ b/ansible_collections/community/crypto/plugins/modules/acme_certificate.py @@ -661,7 +661,7 @@ class ACMECertificateClient(object): raise ModuleFailException("CSR %s not found" % (self.csr)) # Extract list of identifiers from CSR - self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content) + self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content) def is_first_step(self): ''' diff --git a/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py b/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py index 9740cd16d..48b65f998 100644 --- a/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py +++ b/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py @@ -165,6 +165,16 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import ( read_file, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + get_now_datetime, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + CRYPTOGRAPHY_TIMEZONE, + set_not_valid_after, + set_not_valid_before, +) + CRYPTOGRAPHY_IMP_ERR = None try: import cryptography @@ -244,8 +254,9 @@ def main(): domain = to_text(challenge_data['resource']) identifier_type, identifier = to_text(challenge_data.get('resource_original', 'dns:' + challenge_data['resource'])).split(':', 1) subject = issuer = cryptography.x509.Name([]) - not_valid_before = datetime.datetime.utcnow() - not_valid_after = datetime.datetime.utcnow() + datetime.timedelta(days=10) + now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + not_valid_before = now + not_valid_after = now + datetime.timedelta(days=10) if identifier_type == 'dns': san = cryptography.x509.DNSName(identifier) elif identifier_type == 'ip': @@ -254,7 +265,7 @@ def main(): raise ModuleFailException('Unsupported identifier type "{0}"'.format(identifier_type)) # Generate regular self-signed certificate - regular_certificate = cryptography.x509.CertificateBuilder().subject_name( + cert_builder = cryptography.x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer @@ -262,14 +273,13 @@ def main(): private_key.public_key() ).serial_number( cryptography.x509.random_serial_number() - ).not_valid_before( - not_valid_before - ).not_valid_after( - not_valid_after ).add_extension( cryptography.x509.SubjectAlternativeName([san]), critical=False, - ).sign( + ) + cert_builder = set_not_valid_before(cert_builder, not_valid_before) + cert_builder = set_not_valid_after(cert_builder, not_valid_after) + regular_certificate = cert_builder.sign( private_key, cryptography.hazmat.primitives.hashes.SHA256(), _cryptography_backend @@ -278,7 +288,7 @@ def main(): # Process challenge if challenge == 'tls-alpn-01': value = base64.b64decode(challenge_data['resource_value']) - challenge_certificate = cryptography.x509.CertificateBuilder().subject_name( + cert_builder = cryptography.x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer @@ -286,10 +296,6 @@ def main(): private_key.public_key() ).serial_number( cryptography.x509.random_serial_number() - ).not_valid_before( - not_valid_before - ).not_valid_after( - not_valid_after ).add_extension( cryptography.x509.SubjectAlternativeName([san]), critical=False, @@ -299,7 +305,10 @@ def main(): encode_octet_string(value), ), critical=True, - ).sign( + ) + cert_builder = set_not_valid_before(cert_builder, not_valid_before) + cert_builder = set_not_valid_after(cert_builder, not_valid_after) + challenge_certificate = cert_builder.sign( private_key, cryptography.hazmat.primitives.hashes.SHA256(), _cryptography_backend diff --git a/ansible_collections/community/crypto/plugins/modules/get_certificate.py b/ansible_collections/community/crypto/plugins/modules/get_certificate.py index 0f8abc90a..6ae9439d3 100644 --- a/ansible_collections/community/crypto/plugins/modules/get_certificate.py +++ b/ansible_collections/community/crypto/plugins/modules/get_certificate.py @@ -209,7 +209,6 @@ EXAMPLES = ''' import atexit import base64 -import datetime import traceback from os.path import isfile @@ -221,9 +220,16 @@ from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + get_now_datetime, +) + from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + CRYPTOGRAPHY_TIMEZONE, cryptography_oid_to_name, cryptography_get_extensions_from_cert, + get_not_valid_after, + get_not_valid_before, ) MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' @@ -392,7 +398,7 @@ def main(): for attribute in x509.subject: result['subject'][cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value - result['expired'] = x509.not_valid_after < datetime.datetime.utcnow() + result['expired'] = get_not_valid_after(x509) < get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) result['extensions'] = [] for dotted_number, entry in cryptography_get_extensions_from_cert(x509).items(): @@ -410,8 +416,8 @@ def main(): for attribute in x509.issuer: result['issuer'][cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value - result['not_after'] = x509.not_valid_after.strftime('%Y%m%d%H%M%SZ') - result['not_before'] = x509.not_valid_before.strftime('%Y%m%d%H%M%SZ') + result['not_after'] = get_not_valid_after(x509).strftime('%Y%m%d%H%M%SZ') + result['not_before'] = get_not_valid_before(x509).strftime('%Y%m%d%H%M%SZ') result['serial_number'] = x509.serial_number result['signature_algorithm'] = cryptography_oid_to_name(x509.signature_algorithm_oid) diff --git a/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py b/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py new file mode 100644 index 000000000..d3e39dc11 --- /dev/null +++ b/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py @@ -0,0 +1,280 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2024, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: x509_certificate_convert +short_description: Convert X.509 certificates +version_added: 2.19.0 +description: + - This module allows to convert X.509 certificates between different formats. +author: + - Felix Fontein (@felixfontein) +extends_documentation_fragment: + - ansible.builtin.files + - community.crypto.attributes + - community.crypto.attributes.files +attributes: + check_mode: + support: full + diff_mode: + support: none + safe_file_operations: + support: full +options: + src_path: + description: + - Name of the file containing the X.509 certificate to convert. + - Exactly one of O(src_path) or O(src_content) must be specified. + type: path + src_content: + description: + - The content of the file containing the X.509 certificate to convert. + - This must be text. If you are not sure that the input file is PEM, you must Base64 encode + the value and set O(src_content_base64=true). You can use the + P(ansible.builtin.b64encode#filter) filter plugin for this. + - Exactly one of O(src_path) or O(src_content) must be specified. + type: str + src_content_base64: + description: + - If set to V(true) when O(src_content) is provided, the module assumes that the value + of O(src_content) is Base64 encoded. + type: bool + default: false + format: + description: + - Determines which format the destination X.509 certificate should be written in. + - Please note that not every key can be exported in any format, and that not every + format supports encryption. + type: str + choices: + - pem + - der + required: true + strict: + description: + - If the input is a PEM file, ensure that it contains a single PEM object, that + the header and footer match, and are of type C(CERTIFICATE) or C(X509 CERTIFICATE). + type: bool + default: false + dest_path: + description: + - Name of the file in which the generated TLS/SSL X.509 certificate will be written. + type: path + required: true + backup: + description: + - Create a backup file including a timestamp so you can get + the original X.509 certificate back if you overwrote it with a new one by accident. + type: bool + default: false +seealso: + - plugin: ansible.builtin.b64encode + plugin_type: filter + - module: community.crypto.x509_certificate + - module: community.crypto.x509_certificate_pipe + - module: community.crypto.x509_certificate_info +''' + +EXAMPLES = r''' +- name: Convert PEM X.509 certificate to DER format + community.crypto.x509_certificate_convert: + src_path: /etc/ssl/cert/ansible.com.pem + dest_path: /etc/ssl/cert/ansible.com.der + format: der +''' + +RETURN = r''' +backup_file: + description: Name of backup file created. + returned: changed and if O(backup) is V(true) + type: str + sample: /path/to/cert.pem.2019-03-09@11:22~ +''' + +import base64 +import os + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.text.converters import to_native, to_bytes, to_text + +from ansible_collections.community.crypto.plugins.module_utils.io import ( + load_file_if_exists, + write_file, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( + PEM_START, + PEM_END_START, + PEM_END, + identify_pem_format, + split_pem_list, + extract_pem, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + OpenSSLObject, +) + + +def parse_certificate(input, strict=False): + input_format = 'pem' if identify_pem_format(input) else 'der' + if input_format == 'pem': + pems = split_pem_list(to_text(input)) + if len(pems) > 1 and strict: + raise ValueError('The input contains {count} PEM objects, expecting only one since strict=true'.format(count=len(pems))) + pem_header_type, content = extract_pem(pems[0], strict=strict) + if strict and pem_header_type not in ('CERTIFICATE', 'X509 CERTIFICATE'): + raise ValueError('type is {type!r}, expecting CERTIFICATE or X509 CERTIFICATE'.format(type=pem_header_type)) + input = base64.b64decode(content) + else: + pem_header_type = None + return input, input_format, pem_header_type + + +class X509CertificateConvertModule(OpenSSLObject): + def __init__(self, module): + super(X509CertificateConvertModule, self).__init__( + module.params['dest_path'], + 'present', + False, + module.check_mode, + ) + + self.src_path = module.params['src_path'] + self.src_content = module.params['src_content'] + self.src_content_base64 = module.params['src_content_base64'] + if self.src_content is not None: + self.input = to_bytes(self.src_content) + if self.src_content_base64: + try: + self.input = base64.b64decode(self.input) + except Exception as exc: + module.fail_json(msg='Cannot Base64 decode src_content: {exc}'.format(exc=exc)) + else: + try: + with open(self.src_path, 'rb') as f: + self.input = f.read() + except Exception as exc: + module.fail_json(msg='Failure while reading file {fn}: {exc}'.format(fn=self.src_path, exc=exc)) + + self.format = module.params['format'] + self.strict = module.params['strict'] + self.wanted_pem_type = 'CERTIFICATE' + + try: + self.input, self.input_format, dummy = parse_certificate(self.input, strict=self.strict) + except Exception as exc: + module.fail_json(msg='Error while parsing PEM: {exc}'.format(exc=exc)) + + self.backup = module.params['backup'] + self.backup_file = None + + module.params['path'] = self.path + + self.dest_content = load_file_if_exists(self.path, module) + self.dest_content_format = None + self.dest_content_pem_type = None + if self.dest_content is not None: + try: + self.dest_content, self.dest_content_format, self.dest_content_pem_type = parse_certificate( + self.dest_content, strict=True) + except Exception: + pass + + def needs_conversion(self): + if self.dest_content is None or self.dest_content_format is None: + return True + if self.dest_content_format != self.format: + return True + if self.input != self.dest_content: + return True + if self.format == 'pem' and self.dest_content_pem_type != self.wanted_pem_type: + return True + return False + + def get_dest_certificate(self): + if self.format == 'der': + return self.input + data = to_bytes(base64.b64encode(self.input)) + lines = [to_bytes('{0}{1}{2}'.format(PEM_START, self.wanted_pem_type, PEM_END))] + lines += [data[i:i + 64] for i in range(0, len(data), 64)] + lines.append(to_bytes('{0}{1}{2}\n'.format(PEM_END_START, self.wanted_pem_type, PEM_END))) + return b'\n'.join(lines) + + def generate(self, module): + """Do conversion.""" + if self.needs_conversion(): + # Convert + cert_data = self.get_dest_certificate() + if not self.check_mode: + if self.backup: + self.backup_file = module.backup_local(self.path) + write_file(module, cert_data) + self.changed = True + + file_args = module.load_file_common_arguments(module.params) + if module.check_file_absent_if_check_mode(file_args['path']): + self.changed = True + else: + self.changed = module.set_fs_attributes_if_different(file_args, self.changed) + + def dump(self): + """Serialize the object into a dictionary.""" + result = dict( + changed=self.changed, + ) + if self.backup_file: + result['backup_file'] = self.backup_file + + return result + + +def main(): + argument_spec = dict( + src_path=dict(type='path'), + src_content=dict(type='str'), + src_content_base64=dict(type='bool', default=False), + format=dict(type='str', required=True, choices=['pem', 'der']), + strict=dict(type='bool', default=False), + dest_path=dict(type='path', required=True), + backup=dict(type='bool', default=False), + ) + module = AnsibleModule( + argument_spec, + supports_check_mode=True, + add_file_common_args=True, + required_one_of=[('src_path', 'src_content')], + mutually_exclusive=[('src_path', 'src_content')], + ) + + base_dir = os.path.dirname(module.params['dest_path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) + + try: + cert = X509CertificateConvertModule(module) + cert.generate(module) + result = cert.dump() + module.exit_json(**result) + except OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py b/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py index d89f610c5..8379937f7 100644 --- a/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py +++ b/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py @@ -410,6 +410,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im get_relative_time_option, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + CRYPTOGRAPHY_TIMEZONE, +) + from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import ( select_backend, ) @@ -451,7 +455,7 @@ def main(): module.fail_json( msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) ) - valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) + valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k), with_timezone=CRYPTOGRAPHY_TIMEZONE) try: result = module_backend.get_info(der_support_enabled=module.params['content'] is None) diff --git a/ansible_collections/community/crypto/plugins/modules/x509_crl.py b/ansible_collections/community/crypto/plugins/modules/x509_crl.py index 1ac97005a..527975b88 100644 --- a/ansible_collections/community/crypto/plugins/modules/x509_crl.py +++ b/ansible_collections/community/crypto/plugins/modules/x509_crl.py @@ -475,6 +475,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + CRYPTOGRAPHY_TIMEZONE, cryptography_decode_name, cryptography_get_name, cryptography_key_needs_digest_for_signing, @@ -484,11 +485,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp ) from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import ( + CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE, REVOCATION_REASON_MAP, TIMESTAMP_FORMAT, cryptography_decode_revoked_certificate, cryptography_dump_revoked, cryptography_get_signature_algorithm_oid_from_crl, + get_next_update, + get_last_update, + set_next_update, + set_last_update, + set_revocation_date, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( @@ -560,8 +567,8 @@ class CRL(OpenSSLObject): except (TypeError, ValueError) as exc: module.fail_json(msg=to_native(exc)) - self.last_update = get_relative_time_option(module.params['last_update'], 'last_update') - self.next_update = get_relative_time_option(module.params['next_update'], 'next_update') + self.last_update = get_relative_time_option(module.params['last_update'], 'last_update', with_timezone=CRYPTOGRAPHY_TIMEZONE) + self.next_update = get_relative_time_option(module.params['next_update'], 'next_update', with_timezone=CRYPTOGRAPHY_TIMEZONE) self.digest = select_message_digest(module.params['digest']) if self.digest is None: @@ -607,7 +614,8 @@ class CRL(OpenSSLObject): result['issuer_critical'] = rc['issuer_critical'] result['revocation_date'] = get_relative_time_option( rc['revocation_date'], - path_prefix + 'revocation_date' + path_prefix + 'revocation_date', + with_timezone=CRYPTOGRAPHY_TIMEZONE, ) if rc['reason']: result['reason'] = REVOCATION_REASON_MAP[rc['reason']] @@ -615,7 +623,8 @@ class CRL(OpenSSLObject): if rc['invalidity_date']: result['invalidity_date'] = get_relative_time_option( rc['invalidity_date'], - path_prefix + 'invalidity_date' + path_prefix + 'invalidity_date', + with_timezone=CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE, ) result['invalidity_date_critical'] = rc['invalidity_date_critical'] self.revoked_certificates.append(result) @@ -731,9 +740,9 @@ class CRL(OpenSSLObject): if self.crl is None: return False - if self.last_update != self.crl.last_update and not self.ignore_timestamps: + if self.last_update != get_last_update(self.crl) and not self.ignore_timestamps: return False - if self.next_update != self.crl.next_update and not self.ignore_timestamps: + if self.next_update != get_next_update(self.crl) and not self.ignore_timestamps: return False if cryptography_key_needs_digest_for_signing(self.privatekey): if self.crl.signature_hash_algorithm is None or self.digest.name != self.crl.signature_hash_algorithm.name: @@ -780,8 +789,8 @@ class CRL(OpenSSLObject): except ValueError as e: raise CRLError(e) - crl = crl.last_update(self.last_update) - crl = crl.next_update(self.next_update) + crl = set_last_update(crl, self.last_update) + crl = set_next_update(crl, self.next_update) if self.update and self.crl: new_entries = set([self._compress_entry(entry) for entry in self.revoked_certificates]) @@ -792,7 +801,7 @@ class CRL(OpenSSLObject): for entry in self.revoked_certificates: revoked_cert = RevokedCertificateBuilder() revoked_cert = revoked_cert.serial_number(entry['serial_number']) - revoked_cert = revoked_cert.revocation_date(entry['revocation_date']) + revoked_cert = set_revocation_date(revoked_cert, entry['revocation_date']) if entry['issuer'] is not None: revoked_cert = revoked_cert.add_extension( x509.CertificateIssuer(entry['issuer']), @@ -876,8 +885,8 @@ class CRL(OpenSSLObject): for entry in self.revoked_certificates: result['revoked_certificates'].append(cryptography_dump_revoked(entry, idn_rewrite=self.name_encoding)) elif self.crl: - result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT) - result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT) + result['last_update'] = get_last_update(self.crl).strftime(TIMESTAMP_FORMAT) + result['next_update'] = get_next_update(self.crl).strftime(TIMESTAMP_FORMAT) result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl)) issuer = [] for attribute in self.crl.issuer: |