summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/crypto/plugins
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-26 04:05:56 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-26 04:05:56 +0000
commit67c6a4d1dccb62159b9d9b2dea4e2f487446e276 (patch)
tree9ccbb35137f480bbbdb899accbda52a8135d3416 /ansible_collections/community/crypto/plugins
parentAdding upstream version 9.4.0+dfsg. (diff)
downloadansible-67c6a4d1dccb62159b9d9b2dea4e2f487446e276.tar.xz
ansible-67c6a4d1dccb62159b9d9b2dea4e2f487446e276.zip
Adding upstream version 9.5.1+dfsg.upstream/9.5.1+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/crypto/plugins')
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py45
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py35
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/acme/backends.py17
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_crl.py45
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_support.py27
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate.py6
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_entrust.py8
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_info.py11
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_ownca.py12
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py12
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/pem.py29
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/crypto/support.py29
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/openssh/certificate.py26
-rw-r--r--ansible_collections/community/crypto/plugins/modules/acme_certificate.py2
-rw-r--r--ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py37
-rw-r--r--ansible_collections/community/crypto/plugins/modules/get_certificate.py14
-rw-r--r--ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py280
-rw-r--r--ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py6
-rw-r--r--ansible_collections/community/crypto/plugins/modules/x509_crl.py31
19 files changed, 592 insertions, 80 deletions
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py
index 2e388980a..0722c1f99 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_cryptography.py
@@ -11,7 +11,6 @@ __metaclass__ = type
import base64
import binascii
-import datetime
import os
import traceback
@@ -42,11 +41,15 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
+ get_now_datetime,
+ ensure_utc_timezone,
parse_name_field,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_name_to_oid,
+ get_not_valid_after,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
@@ -298,31 +301,51 @@ class CryptographyBackend(CryptoBackend):
},
}
- def get_csr_identifiers(self, csr_filename=None, csr_content=None):
+ def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
- Return a set of requested identifiers (CN and SANs) for the CSR.
+ Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
+
+ The list is deduplicated, and if a CNAME is present, it will be returned
+ as the first element in the result.
'''
- identifiers = set([])
if csr_content is None:
csr_content = read_file(csr_filename)
else:
csr_content = to_bytes(csr_content)
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)
+
+ identifiers = set()
+ result = []
+
+ def add_identifier(identifier):
+ if identifier in identifiers:
+ return
+ identifiers.add(identifier)
+ result.append(identifier)
+
for sub in csr.subject:
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
- identifiers.add(('dns', sub.value))
+ add_identifier(('dns', sub.value))
for extension in csr.extensions:
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
for name in extension.value:
if isinstance(name, cryptography.x509.DNSName):
- identifiers.add(('dns', name.value))
+ add_identifier(('dns', name.value))
elif isinstance(name, cryptography.x509.IPAddress):
- identifiers.add(('ip', name.value.compressed))
+ add_identifier(('ip', name.value.compressed))
else:
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
- return identifiers
+ return result
+
+ def get_csr_identifiers(self, csr_filename=None, csr_content=None):
+ '''
+ Return a set of requested identifiers (CN and SANs) for the CSR.
+ Each identifier is a pair (type, identifier), where type is either
+ 'dns' or 'ip'.
+ '''
+ return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
@@ -353,8 +376,10 @@ class CryptographyBackend(CryptoBackend):
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
if now is None:
- now = datetime.datetime.now()
- return (cert.not_valid_after - now).days
+ now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
+ elif CRYPTOGRAPHY_TIMEZONE:
+ now = ensure_utc_timezone(now)
+ return (get_not_valid_after(cert) - now).days
def create_chain_matcher(self, criterium):
'''
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py
index dabcbdb3b..9a1ed1f5a 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backend_openssl_cli.py
@@ -225,11 +225,14 @@ class OpenSSLCLIBackend(CryptoBackend):
# We do not want to error out on something IPAddress() cannot parse
return ip
- def get_csr_identifiers(self, csr_filename=None, csr_content=None):
+ def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
- Return a set of requested identifiers (CN and SANs) for the CSR.
+ Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
+
+ The list is deduplicated, and if a CNAME is present, it will be returned
+ as the first element in the result.
'''
filename = csr_filename
data = None
@@ -241,24 +244,40 @@ class OpenSSLCLIBackend(CryptoBackend):
dummy, out, dummy = self.module.run_command(
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
- identifiers = set([])
+ identifiers = set()
+ result = []
+
+ def add_identifier(identifier):
+ if identifier in identifiers:
+ return
+ identifiers.add(identifier)
+ result.append(identifier)
+
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
if common_name is not None:
- identifiers.add(('dns', common_name.group(1)))
+ add_identifier(('dns', common_name.group(1)))
subject_alt_names = re.search(
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.lower().startswith("dns:"):
- identifiers.add(('dns', san[4:]))
+ add_identifier(('dns', san[4:]))
elif san.lower().startswith("ip:"):
- identifiers.add(('ip', self._normalize_ip(san[3:])))
+ add_identifier(('ip', self._normalize_ip(san[3:])))
elif san.lower().startswith("ip address:"):
- identifiers.add(('ip', self._normalize_ip(san[11:])))
+ add_identifier(('ip', self._normalize_ip(san[11:])))
else:
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
- return identifiers
+ return result
+
+ def get_csr_identifiers(self, csr_filename=None, csr_content=None):
+ '''
+ Return a set of requested identifiers (CN and SANs) for the CSR.
+ Each identifier is a pair (type, identifier), where type is either
+ 'dns' or 'ip'.
+ '''
+ return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py b/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py
index 5c48e1a74..2d95a3ee3 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/acme/backends.py
@@ -34,6 +34,23 @@ class CryptoBackend(object):
def create_mac_key(self, alg, key):
'''Create a MAC key.'''
+ def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
+ '''
+ Return a list of requested identifiers (CN and SANs) for the CSR.
+ Each identifier is a pair (type, identifier), where type is either
+ 'dns' or 'ip'.
+
+ The list is deduplicated, and if a CNAME is present, it will be returned
+ as the first element in the result.
+ '''
+ self.module.deprecate(
+ "Every backend must override the get_ordered_csr_identifiers() method."
+ " The default implementation will be removed in 3.0.0 and this method will be marked as `abstractmethod` by then.",
+ version='3.0.0',
+ collection_name='community.crypto',
+ )
+ return sorted(self.get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
+
@abc.abstractmethod
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_crl.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_crl.py
index 62499e08b..8ef0d65da 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_crl.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_crl.py
@@ -19,6 +19,7 @@ from .basic import (
)
from .cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
)
@@ -27,6 +28,11 @@ from ._obj2txt import (
)
+# TODO: once cryptography has a _utc variant of InvalidityDate.invalidity_date, set this
+# to True and adjust get_invalidity_date() accordingly.
+# (https://github.com/pyca/cryptography/issues/10818)
+CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE = False
+
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
@@ -55,7 +61,7 @@ else:
def cryptography_decode_revoked_certificate(cert):
result = {
'serial_number': cert.serial_number,
- 'revocation_date': cert.revocation_date,
+ 'revocation_date': get_revocation_date(cert),
'issuer': None,
'issuer_critical': False,
'reason': None,
@@ -77,7 +83,7 @@ def cryptography_decode_revoked_certificate(cert):
pass
try:
ext = cert.extensions.get_extension_for_class(x509.InvalidityDate)
- result['invalidity_date'] = ext.value.invalidity_date
+ result['invalidity_date'] = get_invalidity_date(ext.value)
result['invalidity_date_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
@@ -112,3 +118,38 @@ def cryptography_get_signature_algorithm_oid_from_crl(crl):
crl._x509_crl.sig_alg.algorithm
)
return x509.oid.ObjectIdentifier(dotted)
+
+
+def get_next_update(obj):
+ if CRYPTOGRAPHY_TIMEZONE:
+ return obj.next_update_utc
+ return obj.next_update
+
+
+def get_last_update(obj):
+ if CRYPTOGRAPHY_TIMEZONE:
+ return obj.last_update_utc
+ return obj.last_update
+
+
+def get_revocation_date(obj):
+ if CRYPTOGRAPHY_TIMEZONE:
+ return obj.revocation_date_utc
+ return obj.revocation_date
+
+
+def get_invalidity_date(obj):
+ # TODO: special handling if CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE is True
+ return obj.invalidity_date
+
+
+def set_next_update(builder, value):
+ return builder.next_update(value)
+
+
+def set_last_update(builder, value):
+ return builder.last_update(value)
+
+
+def set_revocation_date(builder, value):
+ return builder.revocation_date(value)
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_support.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_support.py
index b767d3417..3d07b35b1 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_support.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/cryptography_support.py
@@ -29,7 +29,9 @@ try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
import ipaddress
+ _HAS_CRYPTOGRAPHY = True
except ImportError:
+ _HAS_CRYPTOGRAPHY = False
# Error handled in the calling module.
pass
@@ -106,6 +108,11 @@ from ._objects import (
from ._obj2txt import obj2txt
+CRYPTOGRAPHY_TIMEZONE = False
+if _HAS_CRYPTOGRAPHY:
+ CRYPTOGRAPHY_TIMEZONE = LooseVersion(cryptography.__version__) >= LooseVersion('42.0.0')
+
+
DOTTED_OID = re.compile(r'^\d+(?:\.\d+)+$')
@@ -807,3 +814,23 @@ def cryptography_verify_certificate_signature(certificate, signer_public_key):
certificate.signature_hash_algorithm,
signer_public_key
)
+
+
+def get_not_valid_after(obj):
+ if CRYPTOGRAPHY_TIMEZONE:
+ return obj.not_valid_after_utc
+ return obj.not_valid_after
+
+
+def get_not_valid_before(obj):
+ if CRYPTOGRAPHY_TIMEZONE:
+ return obj.not_valid_before_utc
+ return obj.not_valid_before
+
+
+def set_not_valid_after(builder, value):
+ return builder.not_valid_after(value)
+
+
+def set_not_valid_before(builder, value):
+ return builder.not_valid_before(value)
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate.py
index 7a56d7e9d..7bc93d934 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate.py
@@ -32,6 +32,8 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_compare_public_keys,
+ get_not_valid_after,
+ get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import (
@@ -251,12 +253,12 @@ class CertificateBackend(object):
# Check not before
if not_before is not None and not self.ignore_timestamps:
- if self.existing_certificate.not_valid_before != not_before:
+ if get_not_valid_before(self.existing_certificate) != not_before:
return True
# Check not after
if not_after is not None and not self.ignore_timestamps:
- if self.existing_certificate.not_valid_after != not_after:
+ if get_not_valid_after(self.existing_certificate) != not_after:
return True
return False
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_entrust.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_entrust.py
index baf53f5de..7dc4641e1 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_entrust.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_entrust.py
@@ -10,7 +10,6 @@ __metaclass__ = type
import datetime
-import time
import os
from ansible.module_utils.common.text.converters import to_native, to_bytes
@@ -19,11 +18,14 @@ from ansible_collections.community.crypto.plugins.module_utils.ecs.api import EC
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
+ get_now_datetime,
get_relative_time_option,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_serial_number_of_cert,
+ get_not_valid_after,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -99,7 +101,7 @@ class EntrustCertificateBackend(CertificateBackend):
# Handle expiration (30 days if not specified)
expiry = self.notAfter
if not expiry:
- gmt_now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
+ gmt_now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
expiry = gmt_now + datetime.timedelta(days=365)
expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
@@ -154,7 +156,7 @@ class EntrustCertificateBackend(CertificateBackend):
expiry = None
if self.backend == 'cryptography':
serial_number = "{0:X}".format(cryptography_serial_number_of_cert(self.existing_certificate))
- expiry = self.existing_certificate.not_valid_after
+ expiry = get_not_valid_after(self.existing_certificate)
# get some information about the expiry of this certificate
expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_info.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_info.py
index b10733ceb..5db6c3586 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_info.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_info.py
@@ -12,7 +12,6 @@ __metaclass__ = type
import abc
import binascii
-import datetime
import traceback
from ansible.module_utils import six
@@ -24,13 +23,17 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_fingerprint_of_bytes,
+ get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
cryptography_get_extensions_from_cert,
cryptography_oid_to_name,
cryptography_serial_number_of_cert,
+ get_not_valid_after,
+ get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import (
@@ -169,7 +172,7 @@ class CertificateInfoRetrieval(object):
not_after = self.get_not_after()
result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
- result['expired'] = not_after < datetime.datetime.utcnow()
+ result['expired'] = not_after < get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
result['public_key'] = to_native(self._get_public_key_pem())
@@ -322,10 +325,10 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
return None, False
def get_not_before(self):
- return self.cert.not_valid_before
+ return get_not_valid_before(self.cert)
def get_not_after(self):
- return self.cert.not_valid_after
+ return get_not_valid_after(self.cert)
def _get_public_key_pem(self):
return self.cert.public_key().public_bytes(
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_ownca.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_ownca.py
index ac1cf845a..4d312e6b7 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_ownca.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_ownca.py
@@ -31,6 +31,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
cryptography_key_needs_digest_for_signing,
cryptography_serial_number_of_cert,
cryptography_verify_certificate_signature,
+ get_not_valid_after,
+ get_not_valid_before,
+ set_not_valid_after,
+ set_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -120,8 +124,8 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
cert_builder = cert_builder.subject_name(self.csr.subject)
cert_builder = cert_builder.issuer_name(self.ca_cert.subject)
cert_builder = cert_builder.serial_number(self.serial_number)
- cert_builder = cert_builder.not_valid_before(self.notBefore)
- cert_builder = cert_builder.not_valid_after(self.notAfter)
+ cert_builder = set_not_valid_before(cert_builder, self.notBefore)
+ cert_builder = set_not_valid_after(cert_builder, self.notAfter)
cert_builder = cert_builder.public_key(self.csr.public_key())
has_ski = False
for extension in self.csr.extensions:
@@ -220,8 +224,8 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
if self.cert is None:
self.cert = self.existing_certificate
result.update({
- 'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
+ 'notBefore': get_not_valid_before(self.cert).strftime("%Y%m%d%H%M%SZ"),
+ 'notAfter': get_not_valid_after(self.cert).strftime("%Y%m%d%H%M%SZ"),
'serial_number': cryptography_serial_number_of_cert(self.cert),
})
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py
index 8695d43ee..edd8d8d77 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py
@@ -22,6 +22,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
cryptography_key_needs_digest_for_signing,
cryptography_serial_number_of_cert,
cryptography_verify_certificate_signature,
+ get_not_valid_after,
+ get_not_valid_before,
+ set_not_valid_after,
+ set_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -95,8 +99,8 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
cert_builder = cert_builder.subject_name(self.csr.subject)
cert_builder = cert_builder.issuer_name(self.csr.subject)
cert_builder = cert_builder.serial_number(self.serial_number)
- cert_builder = cert_builder.not_valid_before(self.notBefore)
- cert_builder = cert_builder.not_valid_after(self.notAfter)
+ cert_builder = set_not_valid_before(cert_builder, self.notBefore)
+ cert_builder = set_not_valid_after(cert_builder, self.notAfter)
cert_builder = cert_builder.public_key(self.privatekey.public_key())
has_ski = False
for extension in self.csr.extensions:
@@ -154,8 +158,8 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
if self.cert is None:
self.cert = self.existing_certificate
result.update({
- 'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
- 'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
+ 'notBefore': get_not_valid_before(self.cert).strftime("%Y%m%d%H%M%SZ"),
+ 'notAfter': get_not_valid_after(self.cert).strftime("%Y%m%d%H%M%SZ"),
'serial_number': cryptography_serial_number_of_cert(self.cert),
})
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/pem.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/pem.py
index da46548c7..5e6571fe0 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/pem.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/pem.py
@@ -9,6 +9,7 @@ __metaclass__ = type
PEM_START = '-----BEGIN '
+PEM_END_START = '-----END '
PEM_END = '-----'
PKCS8_PRIVATEKEY_NAMES = ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY')
PKCS1_PRIVATEKEY_SUFFIX = ' PRIVATE KEY'
@@ -77,3 +78,31 @@ def extract_first_pem(text):
if not all_pems:
return None
return all_pems[0]
+
+
+def _extract_type(line, start=PEM_START):
+ if not line.startswith(start):
+ return None
+ if not line.endswith(PEM_END):
+ return None
+ return line[len(start):-len(PEM_END)]
+
+
+def extract_pem(content, strict=False):
+ lines = content.splitlines()
+ if len(lines) < 3:
+ raise ValueError('PEM must have at least 3 lines, have only {count}'.format(count=len(lines)))
+ header_type = _extract_type(lines[0])
+ if header_type is None:
+ raise ValueError('First line is not of format {start}...{end}: {line!r}'.format(start=PEM_START, end=PEM_END, line=lines[0]))
+ footer_type = _extract_type(lines[-1], start=PEM_END_START)
+ if strict:
+ if header_type != footer_type:
+ raise ValueError('Header type ({header}) is different from footer type ({footer})'.format(header=header_type, footer=footer_type))
+ for idx, line in enumerate(lines[1:-2]):
+ if len(line) != 64:
+ raise ValueError('Line {idx} has length {len} instead of 64'.format(idx=idx, len=len(line)))
+ if not (0 < len(lines[-2]) <= 64):
+ raise ValueError('Last line has length {len}, should be in (0, 64]'.format(len=len(lines[-2])))
+ content = lines[1:-1]
+ return header_type, ''.join(content)
diff --git a/ansible_collections/community/crypto/plugins/module_utils/crypto/support.py b/ansible_collections/community/crypto/plugins/module_utils/crypto/support.py
index 473246b1b..8b59a3b70 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/crypto/support.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/crypto/support.py
@@ -279,7 +279,19 @@ def parse_ordered_name_field(input_list, name_field_name):
return result
-def convert_relative_to_datetime(relative_time_string):
+def get_now_datetime(with_timezone):
+ if with_timezone:
+ return datetime.datetime.now(tz=datetime.timezone.utc)
+ return datetime.datetime.utcnow()
+
+
+def ensure_utc_timezone(timestamp):
+ if timestamp.tzinfo is not None:
+ return timestamp
+ return timestamp.astimezone(datetime.timezone.utc)
+
+
+def convert_relative_to_datetime(relative_time_string, with_timezone=False):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
parsed_result = re.match(
@@ -304,13 +316,14 @@ def convert_relative_to_datetime(relative_time_string):
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))
+ now = get_now_datetime(with_timezone=with_timezone)
if parsed_result.group("prefix") == "+":
- return datetime.datetime.utcnow() + offset
+ return now + offset
else:
- return datetime.datetime.utcnow() - offset
+ return now - offset
-def get_relative_time_option(input_string, input_name, backend='cryptography'):
+def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
@@ -323,7 +336,7 @@ def get_relative_time_option(input_string, input_name, backend='cryptography'):
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
- result_datetime = convert_relative_to_datetime(result)
+ result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
@@ -332,9 +345,13 @@ def get_relative_time_option(input_string, input_name, backend='cryptography'):
if backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
- return datetime.datetime.strptime(result, date_fmt)
+ res = datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
+ else:
+ if with_timezone:
+ res = res.astimezone(datetime.timezone.utc)
+ return res
raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
diff --git a/ansible_collections/community/crypto/plugins/module_utils/openssh/certificate.py b/ansible_collections/community/crypto/plugins/module_utils/openssh/certificate.py
index 54d1b1ec5..f59766651 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/openssh/certificate.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/openssh/certificate.py
@@ -22,7 +22,9 @@ __metaclass__ = type
import abc
import binascii
+import datetime as _datetime
import os
+import sys
from base64 import b64encode
from datetime import datetime
from hashlib import sha256
@@ -61,8 +63,17 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
b'nistp521': 'ecdsa-nistp521',
}
-_ALWAYS = datetime(1970, 1, 1)
-_FOREVER = datetime.max
+_USE_TIMEZONE = sys.version_info >= (3, 6)
+
+
+def _ensure_utc_timezone_if_use_timezone(value):
+ if not _USE_TIMEZONE or value.tzinfo is not None:
+ return value
+ return value.astimezone(_datetime.timezone.utc)
+
+
+_ALWAYS = _ensure_utc_timezone_if_use_timezone(datetime(1970, 1, 1))
+_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _datetime.timezone.utc) if _USE_TIMEZONE else datetime.max
_CRITICAL_OPTIONS = (
'force-command',
@@ -136,7 +147,7 @@ class OpensshCertificateTimeParameters(object):
elif dt == _FOREVER:
result = 'forever'
else:
- result = dt.isoformat() if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
+ result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
elif date_format == 'timestamp':
td = dt - _ALWAYS
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
@@ -167,7 +178,10 @@ class OpensshCertificateTimeParameters(object):
result = _FOREVER
else:
try:
- result = datetime.utcfromtimestamp(timestamp)
+ if _USE_TIMEZONE:
+ result = datetime.fromtimestamp(timestamp, tz=_datetime.timezone.utc)
+ else:
+ result = datetime.utcfromtimestamp(timestamp)
except OverflowError as e:
raise ValueError
return result
@@ -180,11 +194,11 @@ class OpensshCertificateTimeParameters(object):
elif time_string == 'forever':
result = _FOREVER
elif is_relative_time_string(time_string):
- result = convert_relative_to_datetime(time_string)
+ result = convert_relative_to_datetime(time_string, with_timezone=_USE_TIMEZONE)
else:
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
- result = datetime.strptime(time_string, time_format)
+ result = _ensure_utc_timezone_if_use_timezone(datetime.strptime(time_string, time_format))
except ValueError:
pass
if result is None:
diff --git a/ansible_collections/community/crypto/plugins/modules/acme_certificate.py b/ansible_collections/community/crypto/plugins/modules/acme_certificate.py
index 9c0b349c4..21a6d6ae9 100644
--- a/ansible_collections/community/crypto/plugins/modules/acme_certificate.py
+++ b/ansible_collections/community/crypto/plugins/modules/acme_certificate.py
@@ -661,7 +661,7 @@ class ACMECertificateClient(object):
raise ModuleFailException("CSR %s not found" % (self.csr))
# Extract list of identifiers from CSR
- self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
+ self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
def is_first_step(self):
'''
diff --git a/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py b/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py
index 9740cd16d..48b65f998 100644
--- a/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py
+++ b/ansible_collections/community/crypto/plugins/modules/acme_challenge_cert_helper.py
@@ -165,6 +165,16 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import (
read_file,
)
+from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
+ get_now_datetime,
+)
+
+from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
+ set_not_valid_after,
+ set_not_valid_before,
+)
+
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
@@ -244,8 +254,9 @@ def main():
domain = to_text(challenge_data['resource'])
identifier_type, identifier = to_text(challenge_data.get('resource_original', 'dns:' + challenge_data['resource'])).split(':', 1)
subject = issuer = cryptography.x509.Name([])
- not_valid_before = datetime.datetime.utcnow()
- not_valid_after = datetime.datetime.utcnow() + datetime.timedelta(days=10)
+ now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
+ not_valid_before = now
+ not_valid_after = now + datetime.timedelta(days=10)
if identifier_type == 'dns':
san = cryptography.x509.DNSName(identifier)
elif identifier_type == 'ip':
@@ -254,7 +265,7 @@ def main():
raise ModuleFailException('Unsupported identifier type "{0}"'.format(identifier_type))
# Generate regular self-signed certificate
- regular_certificate = cryptography.x509.CertificateBuilder().subject_name(
+ cert_builder = cryptography.x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
@@ -262,14 +273,13 @@ def main():
private_key.public_key()
).serial_number(
cryptography.x509.random_serial_number()
- ).not_valid_before(
- not_valid_before
- ).not_valid_after(
- not_valid_after
).add_extension(
cryptography.x509.SubjectAlternativeName([san]),
critical=False,
- ).sign(
+ )
+ cert_builder = set_not_valid_before(cert_builder, not_valid_before)
+ cert_builder = set_not_valid_after(cert_builder, not_valid_after)
+ regular_certificate = cert_builder.sign(
private_key,
cryptography.hazmat.primitives.hashes.SHA256(),
_cryptography_backend
@@ -278,7 +288,7 @@ def main():
# Process challenge
if challenge == 'tls-alpn-01':
value = base64.b64decode(challenge_data['resource_value'])
- challenge_certificate = cryptography.x509.CertificateBuilder().subject_name(
+ cert_builder = cryptography.x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
@@ -286,10 +296,6 @@ def main():
private_key.public_key()
).serial_number(
cryptography.x509.random_serial_number()
- ).not_valid_before(
- not_valid_before
- ).not_valid_after(
- not_valid_after
).add_extension(
cryptography.x509.SubjectAlternativeName([san]),
critical=False,
@@ -299,7 +305,10 @@ def main():
encode_octet_string(value),
),
critical=True,
- ).sign(
+ )
+ cert_builder = set_not_valid_before(cert_builder, not_valid_before)
+ cert_builder = set_not_valid_after(cert_builder, not_valid_after)
+ challenge_certificate = cert_builder.sign(
private_key,
cryptography.hazmat.primitives.hashes.SHA256(),
_cryptography_backend
diff --git a/ansible_collections/community/crypto/plugins/modules/get_certificate.py b/ansible_collections/community/crypto/plugins/modules/get_certificate.py
index 0f8abc90a..6ae9439d3 100644
--- a/ansible_collections/community/crypto/plugins/modules/get_certificate.py
+++ b/ansible_collections/community/crypto/plugins/modules/get_certificate.py
@@ -209,7 +209,6 @@ EXAMPLES = '''
import atexit
import base64
-import datetime
import traceback
from os.path import isfile
@@ -221,9 +220,16 @@ from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
+from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
+ get_now_datetime,
+)
+
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_oid_to_name,
cryptography_get_extensions_from_cert,
+ get_not_valid_after,
+ get_not_valid_before,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
@@ -392,7 +398,7 @@ def main():
for attribute in x509.subject:
result['subject'][cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value
- result['expired'] = x509.not_valid_after < datetime.datetime.utcnow()
+ result['expired'] = get_not_valid_after(x509) < get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
result['extensions'] = []
for dotted_number, entry in cryptography_get_extensions_from_cert(x509).items():
@@ -410,8 +416,8 @@ def main():
for attribute in x509.issuer:
result['issuer'][cryptography_oid_to_name(attribute.oid, short=True)] = attribute.value
- result['not_after'] = x509.not_valid_after.strftime('%Y%m%d%H%M%SZ')
- result['not_before'] = x509.not_valid_before.strftime('%Y%m%d%H%M%SZ')
+ result['not_after'] = get_not_valid_after(x509).strftime('%Y%m%d%H%M%SZ')
+ result['not_before'] = get_not_valid_before(x509).strftime('%Y%m%d%H%M%SZ')
result['serial_number'] = x509.serial_number
result['signature_algorithm'] = cryptography_oid_to_name(x509.signature_algorithm_oid)
diff --git a/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py b/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py
new file mode 100644
index 000000000..d3e39dc11
--- /dev/null
+++ b/ansible_collections/community/crypto/plugins/modules/x509_certificate_convert.py
@@ -0,0 +1,280 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024, Felix Fontein <felix@fontein.de>
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+DOCUMENTATION = r'''
+---
+module: x509_certificate_convert
+short_description: Convert X.509 certificates
+version_added: 2.19.0
+description:
+ - This module allows to convert X.509 certificates between different formats.
+author:
+ - Felix Fontein (@felixfontein)
+extends_documentation_fragment:
+ - ansible.builtin.files
+ - community.crypto.attributes
+ - community.crypto.attributes.files
+attributes:
+ check_mode:
+ support: full
+ diff_mode:
+ support: none
+ safe_file_operations:
+ support: full
+options:
+ src_path:
+ description:
+ - Name of the file containing the X.509 certificate to convert.
+ - Exactly one of O(src_path) or O(src_content) must be specified.
+ type: path
+ src_content:
+ description:
+ - The content of the file containing the X.509 certificate to convert.
+ - This must be text. If you are not sure that the input file is PEM, you must Base64 encode
+ the value and set O(src_content_base64=true). You can use the
+ P(ansible.builtin.b64encode#filter) filter plugin for this.
+ - Exactly one of O(src_path) or O(src_content) must be specified.
+ type: str
+ src_content_base64:
+ description:
+ - If set to V(true) when O(src_content) is provided, the module assumes that the value
+ of O(src_content) is Base64 encoded.
+ type: bool
+ default: false
+ format:
+ description:
+ - Determines which format the destination X.509 certificate should be written in.
+ - Please note that not every key can be exported in any format, and that not every
+ format supports encryption.
+ type: str
+ choices:
+ - pem
+ - der
+ required: true
+ strict:
+ description:
+ - If the input is a PEM file, ensure that it contains a single PEM object, that
+ the header and footer match, and are of type C(CERTIFICATE) or C(X509 CERTIFICATE).
+ type: bool
+ default: false
+ dest_path:
+ description:
+ - Name of the file in which the generated TLS/SSL X.509 certificate will be written.
+ type: path
+ required: true
+ backup:
+ description:
+ - Create a backup file including a timestamp so you can get
+ the original X.509 certificate back if you overwrote it with a new one by accident.
+ type: bool
+ default: false
+seealso:
+ - plugin: ansible.builtin.b64encode
+ plugin_type: filter
+ - module: community.crypto.x509_certificate
+ - module: community.crypto.x509_certificate_pipe
+ - module: community.crypto.x509_certificate_info
+'''
+
+EXAMPLES = r'''
+- name: Convert PEM X.509 certificate to DER format
+ community.crypto.x509_certificate_convert:
+ src_path: /etc/ssl/cert/ansible.com.pem
+ dest_path: /etc/ssl/cert/ansible.com.der
+ format: der
+'''
+
+RETURN = r'''
+backup_file:
+ description: Name of backup file created.
+ returned: changed and if O(backup) is V(true)
+ type: str
+ sample: /path/to/cert.pem.2019-03-09@11:22~
+'''
+
+import base64
+import os
+
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.common.text.converters import to_native, to_bytes, to_text
+
+from ansible_collections.community.crypto.plugins.module_utils.io import (
+ load_file_if_exists,
+ write_file,
+)
+
+from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
+ OpenSSLObjectError,
+)
+
+from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
+ PEM_START,
+ PEM_END_START,
+ PEM_END,
+ identify_pem_format,
+ split_pem_list,
+ extract_pem,
+)
+
+from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
+ OpenSSLObject,
+)
+
+
+def parse_certificate(input, strict=False):
+ input_format = 'pem' if identify_pem_format(input) else 'der'
+ if input_format == 'pem':
+ pems = split_pem_list(to_text(input))
+ if len(pems) > 1 and strict:
+ raise ValueError('The input contains {count} PEM objects, expecting only one since strict=true'.format(count=len(pems)))
+ pem_header_type, content = extract_pem(pems[0], strict=strict)
+ if strict and pem_header_type not in ('CERTIFICATE', 'X509 CERTIFICATE'):
+ raise ValueError('type is {type!r}, expecting CERTIFICATE or X509 CERTIFICATE'.format(type=pem_header_type))
+ input = base64.b64decode(content)
+ else:
+ pem_header_type = None
+ return input, input_format, pem_header_type
+
+
+class X509CertificateConvertModule(OpenSSLObject):
+ def __init__(self, module):
+ super(X509CertificateConvertModule, self).__init__(
+ module.params['dest_path'],
+ 'present',
+ False,
+ module.check_mode,
+ )
+
+ self.src_path = module.params['src_path']
+ self.src_content = module.params['src_content']
+ self.src_content_base64 = module.params['src_content_base64']
+ if self.src_content is not None:
+ self.input = to_bytes(self.src_content)
+ if self.src_content_base64:
+ try:
+ self.input = base64.b64decode(self.input)
+ except Exception as exc:
+ module.fail_json(msg='Cannot Base64 decode src_content: {exc}'.format(exc=exc))
+ else:
+ try:
+ with open(self.src_path, 'rb') as f:
+ self.input = f.read()
+ except Exception as exc:
+ module.fail_json(msg='Failure while reading file {fn}: {exc}'.format(fn=self.src_path, exc=exc))
+
+ self.format = module.params['format']
+ self.strict = module.params['strict']
+ self.wanted_pem_type = 'CERTIFICATE'
+
+ try:
+ self.input, self.input_format, dummy = parse_certificate(self.input, strict=self.strict)
+ except Exception as exc:
+ module.fail_json(msg='Error while parsing PEM: {exc}'.format(exc=exc))
+
+ self.backup = module.params['backup']
+ self.backup_file = None
+
+ module.params['path'] = self.path
+
+ self.dest_content = load_file_if_exists(self.path, module)
+ self.dest_content_format = None
+ self.dest_content_pem_type = None
+ if self.dest_content is not None:
+ try:
+ self.dest_content, self.dest_content_format, self.dest_content_pem_type = parse_certificate(
+ self.dest_content, strict=True)
+ except Exception:
+ pass
+
+ def needs_conversion(self):
+ if self.dest_content is None or self.dest_content_format is None:
+ return True
+ if self.dest_content_format != self.format:
+ return True
+ if self.input != self.dest_content:
+ return True
+ if self.format == 'pem' and self.dest_content_pem_type != self.wanted_pem_type:
+ return True
+ return False
+
+ def get_dest_certificate(self):
+ if self.format == 'der':
+ return self.input
+ data = to_bytes(base64.b64encode(self.input))
+ lines = [to_bytes('{0}{1}{2}'.format(PEM_START, self.wanted_pem_type, PEM_END))]
+ lines += [data[i:i + 64] for i in range(0, len(data), 64)]
+ lines.append(to_bytes('{0}{1}{2}\n'.format(PEM_END_START, self.wanted_pem_type, PEM_END)))
+ return b'\n'.join(lines)
+
+ def generate(self, module):
+ """Do conversion."""
+ if self.needs_conversion():
+ # Convert
+ cert_data = self.get_dest_certificate()
+ if not self.check_mode:
+ if self.backup:
+ self.backup_file = module.backup_local(self.path)
+ write_file(module, cert_data)
+ self.changed = True
+
+ file_args = module.load_file_common_arguments(module.params)
+ if module.check_file_absent_if_check_mode(file_args['path']):
+ self.changed = True
+ else:
+ self.changed = module.set_fs_attributes_if_different(file_args, self.changed)
+
+ def dump(self):
+ """Serialize the object into a dictionary."""
+ result = dict(
+ changed=self.changed,
+ )
+ if self.backup_file:
+ result['backup_file'] = self.backup_file
+
+ return result
+
+
+def main():
+ argument_spec = dict(
+ src_path=dict(type='path'),
+ src_content=dict(type='str'),
+ src_content_base64=dict(type='bool', default=False),
+ format=dict(type='str', required=True, choices=['pem', 'der']),
+ strict=dict(type='bool', default=False),
+ dest_path=dict(type='path', required=True),
+ backup=dict(type='bool', default=False),
+ )
+ module = AnsibleModule(
+ argument_spec,
+ supports_check_mode=True,
+ add_file_common_args=True,
+ required_one_of=[('src_path', 'src_content')],
+ mutually_exclusive=[('src_path', 'src_content')],
+ )
+
+ base_dir = os.path.dirname(module.params['dest_path']) or '.'
+ if not os.path.isdir(base_dir):
+ module.fail_json(
+ name=base_dir,
+ msg='The directory %s does not exist or the file is not a directory' % base_dir
+ )
+
+ try:
+ cert = X509CertificateConvertModule(module)
+ cert.generate(module)
+ result = cert.dump()
+ module.exit_json(**result)
+ except OpenSSLObjectError as exc:
+ module.fail_json(msg=to_native(exc))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py b/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py
index d89f610c5..8379937f7 100644
--- a/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py
+++ b/ansible_collections/community/crypto/plugins/modules/x509_certificate_info.py
@@ -410,6 +410,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
get_relative_time_option,
)
+from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
+)
+
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import (
select_backend,
)
@@ -451,7 +455,7 @@ def main():
module.fail_json(
msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v))
)
- valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k))
+ valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k), with_timezone=CRYPTOGRAPHY_TIMEZONE)
try:
result = module_backend.get_info(der_support_enabled=module.params['content'] is None)
diff --git a/ansible_collections/community/crypto/plugins/modules/x509_crl.py b/ansible_collections/community/crypto/plugins/modules/x509_crl.py
index 1ac97005a..527975b88 100644
--- a/ansible_collections/community/crypto/plugins/modules/x509_crl.py
+++ b/ansible_collections/community/crypto/plugins/modules/x509_crl.py
@@ -475,6 +475,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
+ CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
cryptography_get_name,
cryptography_key_needs_digest_for_signing,
@@ -484,11 +485,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import (
+ CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE,
REVOCATION_REASON_MAP,
TIMESTAMP_FORMAT,
cryptography_decode_revoked_certificate,
cryptography_dump_revoked,
cryptography_get_signature_algorithm_oid_from_crl,
+ get_next_update,
+ get_last_update,
+ set_next_update,
+ set_last_update,
+ set_revocation_date,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
@@ -560,8 +567,8 @@ class CRL(OpenSSLObject):
except (TypeError, ValueError) as exc:
module.fail_json(msg=to_native(exc))
- self.last_update = get_relative_time_option(module.params['last_update'], 'last_update')
- self.next_update = get_relative_time_option(module.params['next_update'], 'next_update')
+ self.last_update = get_relative_time_option(module.params['last_update'], 'last_update', with_timezone=CRYPTOGRAPHY_TIMEZONE)
+ self.next_update = get_relative_time_option(module.params['next_update'], 'next_update', with_timezone=CRYPTOGRAPHY_TIMEZONE)
self.digest = select_message_digest(module.params['digest'])
if self.digest is None:
@@ -607,7 +614,8 @@ class CRL(OpenSSLObject):
result['issuer_critical'] = rc['issuer_critical']
result['revocation_date'] = get_relative_time_option(
rc['revocation_date'],
- path_prefix + 'revocation_date'
+ path_prefix + 'revocation_date',
+ with_timezone=CRYPTOGRAPHY_TIMEZONE,
)
if rc['reason']:
result['reason'] = REVOCATION_REASON_MAP[rc['reason']]
@@ -615,7 +623,8 @@ class CRL(OpenSSLObject):
if rc['invalidity_date']:
result['invalidity_date'] = get_relative_time_option(
rc['invalidity_date'],
- path_prefix + 'invalidity_date'
+ path_prefix + 'invalidity_date',
+ with_timezone=CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE,
)
result['invalidity_date_critical'] = rc['invalidity_date_critical']
self.revoked_certificates.append(result)
@@ -731,9 +740,9 @@ class CRL(OpenSSLObject):
if self.crl is None:
return False
- if self.last_update != self.crl.last_update and not self.ignore_timestamps:
+ if self.last_update != get_last_update(self.crl) and not self.ignore_timestamps:
return False
- if self.next_update != self.crl.next_update and not self.ignore_timestamps:
+ if self.next_update != get_next_update(self.crl) and not self.ignore_timestamps:
return False
if cryptography_key_needs_digest_for_signing(self.privatekey):
if self.crl.signature_hash_algorithm is None or self.digest.name != self.crl.signature_hash_algorithm.name:
@@ -780,8 +789,8 @@ class CRL(OpenSSLObject):
except ValueError as e:
raise CRLError(e)
- crl = crl.last_update(self.last_update)
- crl = crl.next_update(self.next_update)
+ crl = set_last_update(crl, self.last_update)
+ crl = set_next_update(crl, self.next_update)
if self.update and self.crl:
new_entries = set([self._compress_entry(entry) for entry in self.revoked_certificates])
@@ -792,7 +801,7 @@ class CRL(OpenSSLObject):
for entry in self.revoked_certificates:
revoked_cert = RevokedCertificateBuilder()
revoked_cert = revoked_cert.serial_number(entry['serial_number'])
- revoked_cert = revoked_cert.revocation_date(entry['revocation_date'])
+ revoked_cert = set_revocation_date(revoked_cert, entry['revocation_date'])
if entry['issuer'] is not None:
revoked_cert = revoked_cert.add_extension(
x509.CertificateIssuer(entry['issuer']),
@@ -876,8 +885,8 @@ class CRL(OpenSSLObject):
for entry in self.revoked_certificates:
result['revoked_certificates'].append(cryptography_dump_revoked(entry, idn_rewrite=self.name_encoding))
elif self.crl:
- result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT)
- result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT)
+ result['last_update'] = get_last_update(self.crl).strftime(TIMESTAMP_FORMAT)
+ result['next_update'] = get_next_update(self.crl).strftime(TIMESTAMP_FORMAT)
result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl))
issuer = []
for attribute in self.crl.issuer: