diff options
Diffstat (limited to 'ansible_collections/community/crypto/plugins/module_utils/acme')
7 files changed, 431 insertions, 30 deletions
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py index 74d0bc1ea..7f9b954c0 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py @@ -21,6 +21,8 @@ from ansible.module_utils.common.text.converters import to_bytes from ansible.module_utils.urls import fetch_url from ansible.module_utils.six import PY3 +from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec + from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import ( OpenSSLCLIBackend, ) @@ -42,7 +44,9 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor ) from ansible_collections.community.crypto.plugins.module_utils.acme.utils import ( + compute_cert_id, nopad_b64, + parse_retry_after, ) try: @@ -153,6 +157,9 @@ class ACMEDirectory(object): self.module, msg='Was not able to obtain nonce, giving up after 5 retries', info=info, response=response) retry_count += 1 + def has_renewal_info_endpoint(self): + return 'renewalInfo' in self.directory + class ACMEClient(object): ''' @@ -168,9 +175,9 @@ class ACMEClient(object): self.backend = backend self.version = module.params['acme_version'] # account_key path and content are mutually exclusive - self.account_key_file = module.params['account_key_src'] - self.account_key_content = module.params['account_key_content'] - self.account_key_passphrase = module.params['account_key_passphrase'] + self.account_key_file = module.params.get('account_key_src') + self.account_key_content = module.params.get('account_key_content') + self.account_key_passphrase = module.params.get('account_key_passphrase') # Grab account URI from module parameters. # Make sure empty string is treated as None. @@ -383,24 +390,94 @@ class ACMEClient(object): self.module, msg=error_msg, info=info, content=content, content_json=result if parsed_json_result else None) return result, info + def get_renewal_info( + self, + cert_id=None, + cert_info=None, + cert_filename=None, + cert_content=None, + include_retry_after=False, + retry_after_relative_with_timezone=True, + ): + if not self.directory.has_renewal_info_endpoint(): + raise ModuleFailException('The ACME endpoint does not support ACME Renewal Information retrieval') + + if cert_id is None: + cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content) + url = '{base}{cert_id}'.format(base=self.directory.directory['renewalInfo'], cert_id=cert_id) + + data, info = self.get_request(url, parse_json_result=True, fail_on_error=True, get_only=True) + + # Include Retry-After header if asked for + if include_retry_after and 'retry-after' in info: + try: + data['retryAfter'] = parse_retry_after( + info['retry-after'], + relative_with_timezone=retry_after_relative_with_timezone, + ) + except ValueError: + pass + return data + def get_default_argspec(): ''' Provides default argument spec for the options documented in the acme doc fragment. + + DEPRECATED: will be removed in community.crypto 3.0.0 ''' return dict( - account_key_src=dict(type='path', aliases=['account_key']), - account_key_content=dict(type='str', no_log=True), - account_key_passphrase=dict(type='str', no_log=True), - account_uri=dict(type='str'), acme_directory=dict(type='str', required=True), acme_version=dict(type='int', required=True, choices=[1, 2]), validate_certs=dict(type='bool', default=True), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']), request_timeout=dict(type='int', default=10), + account_key_src=dict(type='path', aliases=['account_key']), + account_key_content=dict(type='str', no_log=True), + account_key_passphrase=dict(type='str', no_log=True), + account_uri=dict(type='str'), ) +def create_default_argspec( + with_account=True, + require_account_key=True, + with_certificate=False, +): + ''' + Provides default argument spec for the options documented in the acme doc fragment. + ''' + result = ArgumentSpec( + argument_spec=dict( + acme_directory=dict(type='str', required=True), + acme_version=dict(type='int', required=True, choices=[1, 2]), + validate_certs=dict(type='bool', default=True), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']), + request_timeout=dict(type='int', default=10), + ), + ) + if with_account: + result.update_argspec( + account_key_src=dict(type='path', aliases=['account_key']), + account_key_content=dict(type='str', no_log=True), + account_key_passphrase=dict(type='str', no_log=True), + account_uri=dict(type='str'), + ) + if require_account_key: + result.update(required_one_of=[['account_key_src', 'account_key_content']]) + result.update(mutually_exclusive=[['account_key_src', 'account_key_content']]) + if with_certificate: + result.update_argspec( + csr=dict(type='path'), + csr_content=dict(type='str'), + ) + result.update( + required_one_of=[['csr', 'csr_content']], + mutually_exclusive=[['csr', 'csr_content']], + ) + return result + + def create_backend(module, needs_acme_v2): if not HAS_IPADDRESS: module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMPORT_ERROR) diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py index 0722c1f99..b652240dc 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py @@ -11,6 +11,7 @@ __metaclass__ = type import base64 import binascii +import datetime import os import traceback @@ -19,7 +20,9 @@ from ansible.module_utils.common.text.converters import to_bytes, to_native, to_ from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, CryptoBackend, + _parse_acme_timestamp, ) from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import ( @@ -35,27 +38,40 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import re from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + from ansible_collections.community.crypto.plugins.module_utils.crypto.math import ( convert_int_to_bytes, convert_int_to_hex, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_now_datetime, - ensure_utc_timezone, - parse_name_field, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_name_to_oid, + cryptography_serial_number_of_cert, get_not_valid_after, + get_not_valid_before, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( extract_first_pem, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + parse_name_field, +) + +from ansible_collections.community.crypto.plugins.module_utils.time import ( + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_now_datetime, + get_relative_time_option, + UTC, +) + CRYPTOGRAPHY_MINIMAL_VERSION = '1.5' CRYPTOGRAPHY_ERROR = None @@ -170,6 +186,32 @@ class CryptographyBackend(CryptoBackend): def __init__(self, module): super(CryptographyBackend, self).__init__(module) + def get_now(self): + return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def parse_acme_timestamp(self, timestamp_str): + return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def parse_module_parameter(self, value, name): + try: + return get_relative_time_option(value, name, backend='cryptography', with_timezone=CRYPTOGRAPHY_TIMEZONE) + except OpenSSLObjectError as exc: + raise BackendException(to_native(exc)) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def get_utc_datetime(self, *args, **kwargs): + kwargs_ext = dict(kwargs) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8): + kwargs_ext['tzinfo'] = UTC + result = datetime.datetime(*args, **kwargs_ext) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8): + result = ensure_utc_timezone(result) + return result + def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' Parses an RSA or Elliptic Curve key file in PEM format and returns key_data. @@ -376,7 +418,7 @@ class CryptographyBackend(CryptoBackend): raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e)) if now is None: - now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + now = self.get_now() elif CRYPTOGRAPHY_TIMEZONE: now = ensure_utc_timezone(now) return (get_not_valid_after(cert) - now).days @@ -386,3 +428,44 @@ class CryptographyBackend(CryptoBackend): Given a Criterium object, creates a ChainMatcher object. ''' return CryptographyChainMatcher(criterium, self.module) + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + if cert_filename is not None: + cert_content = read_file(cert_filename) + else: + cert_content = to_bytes(cert_content) + + # Make sure we have at most one PEM. Otherwise cryptography 36.0.0 will barf. + cert_content = to_bytes(extract_first_pem(to_text(cert_content)) or '') + + try: + cert = cryptography.x509.load_pem_x509_certificate(cert_content, _cryptography_backend) + except Exception as e: + if cert_filename is None: + raise BackendException('Cannot parse certificate: {0}'.format(e)) + raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e)) + + ski = None + try: + ext = cert.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier) + ski = ext.value.digest + except cryptography.x509.ExtensionNotFound: + pass + + aki = None + try: + ext = cert.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier) + aki = ext.value.key_identifier + except cryptography.x509.ExtensionNotFound: + pass + + return CertificateInformation( + not_valid_after=get_not_valid_after(cert), + not_valid_before=get_not_valid_before(cert), + serial_number=cryptography_serial_number_of_cert(cert), + subject_key_identifier=ski, + authority_key_identifier=aki, + ) diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py index 9a1ed1f5a..9aab187ac 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py @@ -20,6 +20,7 @@ import traceback from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, CryptoBackend, ) @@ -30,6 +31,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 +from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int + try: import ipaddress except ImportError: @@ -39,6 +42,33 @@ except ImportError: _OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C') +def _extract_date(out_text, name, cert_filename_suffix=""): + try: + date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1) + return datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z') + except AttributeError: + raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix)) + except ValueError as exc: + raise BackendException("Failed to parse '{0}' date{1}: {2}".format(name, cert_filename_suffix, exc)) + + +def _decode_octets(octets_text): + return binascii.unhexlify(re.sub(r"(\s|:)", "", octets_text).encode("utf-8")) + + +def _extract_octets(out_text, name, required=True, potential_prefixes=None): + regexp = r"\s+%s:\s*\n\s+%s([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % ( + name, + ('(?:%s)' % '|'.join(re.escape(pp) for pp in potential_prefixes)) if potential_prefixes else '', + ) + match = re.search(regexp, out_text, re.MULTILINE | re.DOTALL) + if match is not None: + return _decode_octets(match.group(1)) + if not required: + return None + raise BackendException("No '{0}' octet string found".format(name)) + + class OpenSSLCLIBackend(CryptoBackend): def __init__(self, module, openssl_binary=None): super(OpenSSLCLIBackend, self).__init__(module) @@ -89,10 +119,12 @@ class OpenSSLCLIBackend(CryptoBackend): dummy, out, dummy = self.module.run_command( openssl_keydump_cmd, check_rc=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) + out_text = to_text(out, errors='surrogate_or_strict') + if account_key_type == 'rsa': - pub_hex, pub_exp = re.search( - r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)", - to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL).groups() + pub_hex = re.search(r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent", out_text, re.MULTILINE | re.DOTALL).group(1) + + pub_exp = re.search(r"\npublicExponent: ([0-9]+)", out_text, re.MULTILINE | re.DOTALL).group(1) pub_exp = "{0:x}".format(int(pub_exp)) if len(pub_exp) % 2: pub_exp = "0{0}".format(pub_exp) @@ -104,17 +136,19 @@ class OpenSSLCLIBackend(CryptoBackend): 'jwk': { "kty": "RSA", "e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))), - "n": nopad_b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), + "n": nopad_b64(_decode_octets(pub_hex)), }, 'hash': 'sha256', } elif account_key_type == 'ec': pub_data = re.search( r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?", - to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) + out_text, + re.MULTILINE | re.DOTALL, + ) if pub_data is None: raise KeyParsingError('cannot parse elliptic curve key') - pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8")) + pub_hex = _decode_octets(pub_data.group(1)) asn1_oid_curve = pub_data.group(2).lower() nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None if asn1_oid_curve == 'prime256v1' or nist_curve == 'p-256': @@ -303,13 +337,8 @@ class OpenSSLCLIBackend(CryptoBackend): openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"] dummy, out, dummy = self.module.run_command( openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) - try: - not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", to_text(out, errors='surrogate_or_strict')).group(1) - not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z') - except AttributeError: - raise BackendException("No 'Not after' date found{0}".format(cert_filename_suffix)) - except ValueError: - raise BackendException("Failed to parse 'Not after' date{0}".format(cert_filename_suffix)) + out_text = to_text(out, errors='surrogate_or_strict') + not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix) if now is None: now = datetime.datetime.now() return (not_after - now).days @@ -319,3 +348,43 @@ class OpenSSLCLIBackend(CryptoBackend): Given a Criterium object, creates a ChainMatcher object. ''' raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.') + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + filename = cert_filename + data = None + if cert_filename is not None: + cert_filename_suffix = ' in {0}'.format(cert_filename) + else: + filename = '/dev/stdin' + data = to_bytes(cert_content) + cert_filename_suffix = '' + + openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"] + dummy, out, dummy = self.module.run_command( + openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) + out_text = to_text(out, errors='surrogate_or_strict') + + not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix) + not_before = _extract_date(out_text, 'Not Before', cert_filename_suffix=cert_filename_suffix) + + sn = re.search( + r" Serial Number: ([0-9]+)", + to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) + if sn: + serial = int(sn.group(1)) + else: + serial = convert_bytes_to_int(_extract_octets(out_text, 'Serial Number', required=True)) + + ski = _extract_octets(out_text, 'X509v3 Subject Key Identifier', required=False) + aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False, potential_prefixes=['keyid:', '']) + + return CertificateInformation( + not_valid_after=not_after, + not_valid_before=not_before, + serial_number=serial, + subject_key_identifier=ski, + authority_key_identifier=aki, + ) diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py index 2d95a3ee3..7c08fae95 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py @@ -9,9 +9,78 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +from collections import namedtuple import abc +import datetime +import re from ansible.module_utils import six +from ansible.module_utils.common.text.converters import to_native + +from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( + BackendException, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + +from ansible_collections.community.crypto.plugins.module_utils.time import ( + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_now_datetime, + get_relative_time_option, + remove_timezone, +) + + +CertificateInformation = namedtuple( + 'CertificateInformation', + ( + 'not_valid_after', + 'not_valid_before', + 'serial_number', + 'subject_key_identifier', + 'authority_key_identifier', + ), +) + + +_FRACTIONAL_MATCHER = re.compile(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(|\.\d+)(Z|[+-]\d{2}:?\d{2}.*)$') + + +def _reduce_fractional_digits(timestamp_str): + """ + Given a RFC 3339 timestamp that includes too many digits for the fractional seconds part, reduces these to at most 6. + """ + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + m = _FRACTIONAL_MATCHER.match(timestamp_str) + if not m: + raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str)) + timestamp, fractional, timezone = m.groups() + if len(fractional) > 7: + # Python does not support anything smaller than microseconds + # (Golang supports nanoseconds, Boulder often emits more fractional digits, which Python chokes on) + fractional = fractional[:7] + return '%s%s%s' % (timestamp, fractional, timezone) + + +def _parse_acme_timestamp(timestamp_str, with_timezone): + """ + Parses a RFC 3339 timestamp. + """ + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + timestamp_str = _reduce_fractional_digits(timestamp_str) + for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'): + # Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491 + try: + result = datetime.datetime.strptime(timestamp_str, format) + except ValueError: + pass + else: + return ensure_utc_timezone(result) if with_timezone else remove_timezone(result) + raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str)) @six.add_metaclass(abc.ABCMeta) @@ -19,6 +88,30 @@ class CryptoBackend(object): def __init__(self, module): self.module = module + def get_now(self): + return get_now_datetime(with_timezone=False) + + def parse_acme_timestamp(self, timestamp_str): + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + return _parse_acme_timestamp(timestamp_str, with_timezone=False) + + def parse_module_parameter(self, value, name): + try: + return get_relative_time_option(value, name, backend='cryptography', with_timezone=False) + except OpenSSLObjectError as exc: + raise BackendException(to_native(exc)) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=False) + + def get_utc_datetime(self, *args, **kwargs): + result = datetime.datetime(*args, **kwargs) + if 'tzinfo' in kwargs or len(args) >= 8: + result = remove_timezone(result) + return result + @abc.abstractmethod def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' @@ -74,3 +167,12 @@ class CryptoBackend(object): ''' Given a Criterium object, creates a ChainMatcher object. ''' + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + # Not implementing this method in a backend is DEPRECATED and will be + # disallowed in community.crypto 3.0.0. This method will be marked as + # @abstractmethod by then. + raise BackendException('This backend does not support get_cert_information()') diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/challenges.py b/ansible_collections/community/crypto/plugins/module_utils/acme/challenges.py index 3a87ffec1..116ca4206 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/challenges.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/challenges.py @@ -103,7 +103,7 @@ class Challenge(object): # https://tools.ietf.org/html/rfc8555#section-8.4 resource = '_acme-challenge' value = nopad_b64(hashlib.sha256(to_bytes(key_authorization)).digest()) - record = (resource + identifier[1:]) if identifier.startswith('*.') else '{0}.{1}'.format(resource, identifier) + record = '{0}.{1}'.format(resource, identifier[2:] if identifier.startswith('*.') else identifier) return { 'resource': resource, 'resource_value': value, @@ -283,13 +283,21 @@ class Authorization(object): return self.status == 'valid' return self.wait_for_validation(client, challenge_type) + def can_deactivate(self): + ''' + Deactivates this authorization. + https://community.letsencrypt.org/t/authorization-deactivation/19860/2 + https://tools.ietf.org/html/rfc8555#section-7.5.2 + ''' + return self.status in ('valid', 'pending') + def deactivate(self, client): ''' Deactivates this authorization. https://community.letsencrypt.org/t/authorization-deactivation/19860/2 https://tools.ietf.org/html/rfc8555#section-7.5.2 ''' - if self.status != 'valid': + if not self.can_deactivate(): return authz_deactivate = { 'status': 'deactivated' diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/orders.py b/ansible_collections/community/crypto/plugins/module_utils/acme/orders.py index 732b430df..98c28445f 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/orders.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/orders.py @@ -32,6 +32,7 @@ class Order(object): self.identifiers = [] for identifier in data['identifiers']: self.identifiers.append((identifier['type'], identifier['value'])) + self.replaces_cert_id = data.get('replaces') self.finalize_uri = data.get('finalize') self.certificate_uri = data.get('certificate') self.authorization_uris = data['authorizations'] @@ -44,6 +45,7 @@ class Order(object): self.status = None self.identifiers = [] + self.replaces_cert_id = None self.finalize_uri = None self.certificate_uri = None self.authorization_uris = [] @@ -62,7 +64,7 @@ class Order(object): return result @classmethod - def create(cls, client, identifiers): + def create(cls, client, identifiers, replaces_cert_id=None): ''' Start a new certificate order (ACME v2 protocol). https://tools.ietf.org/html/rfc8555#section-7.4 @@ -76,6 +78,8 @@ class Order(object): new_order = { "identifiers": acme_identifiers } + if replaces_cert_id is not None: + new_order["replaces"] = replaces_cert_id result, info = client.send_signed_request( client.directory['newOrder'], new_order, error_msg='Failed to start new order', expected_status_codes=[201]) return cls.from_json(client, result, info['location']) diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/utils.py b/ansible_collections/community/crypto/plugins/module_utils/acme/utils.py index 217b6de47..ba460444b 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/utils.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/utils.py @@ -10,6 +10,7 @@ __metaclass__ = type import base64 +import datetime import re import textwrap import traceback @@ -19,6 +20,10 @@ from ansible.module_utils.six.moves.urllib.parse import unquote from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException +from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes + +from ansible_collections.community.crypto.plugins.module_utils.time import get_now_datetime + def nopad_b64(data): return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "") @@ -65,8 +70,61 @@ def pem_to_der(pem_filename=None, pem_content=None): def process_links(info, callback): ''' Process link header, calls callback for every link header with the URL and relation as options. + + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link ''' if 'link' in info: link = info['link'] for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link): callback(unquote(url), relation) + + +def parse_retry_after(value, relative_with_timezone=True, now=None): + ''' + Parse the value of a Retry-After header and return a timestamp. + + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + ''' + # First try a number of seconds + try: + delta = datetime.timedelta(seconds=int(value)) + if now is None: + now = get_now_datetime(relative_with_timezone) + return now + delta + except ValueError: + pass + + try: + return datetime.datetime.strptime(value, '%a, %d %b %Y %H:%M:%S GMT') + except ValueError: + pass + + raise ValueError('Cannot parse Retry-After header value %s' % repr(value)) + + +def compute_cert_id( + backend, + cert_info=None, + cert_filename=None, + cert_content=None, + none_if_required_information_is_missing=False, +): + # Obtain certificate info if not provided + if cert_info is None: + cert_info = backend.get_cert_information(cert_filename=cert_filename, cert_content=cert_content) + + # Convert Authority Key Identifier to string + if cert_info.authority_key_identifier is None: + if none_if_required_information_is_missing: + return None + raise ModuleFailException('Certificate has no Authority Key Identifier extension') + aki = to_native(base64.urlsafe_b64encode(cert_info.authority_key_identifier)).replace('=', '') + + # Convert serial number to string + serial_bytes = convert_int_to_bytes(cert_info.serial_number) + if ord(serial_bytes[:1]) >= 128: + serial_bytes = b'\x00' + serial_bytes + serial = to_native(base64.urlsafe_b64encode(serial_bytes)).replace('=', '') + + # Compose cert ID + return '{aki}.{serial}'.format(aki=aki, serial=serial) |