summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/community/crypto/plugins/module_utils/acme/acme.py')
-rw-r--r--ansible_collections/community/crypto/plugins/module_utils/acme/acme.py91
1 files changed, 84 insertions, 7 deletions
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py
index 74d0bc1ea..7f9b954c0 100644
--- a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py
+++ b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py
@@ -21,6 +21,8 @@ from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.six import PY3
+from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
+
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
OpenSSLCLIBackend,
)
@@ -42,7 +44,9 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
+ compute_cert_id,
nopad_b64,
+ parse_retry_after,
)
try:
@@ -153,6 +157,9 @@ class ACMEDirectory(object):
self.module, msg='Was not able to obtain nonce, giving up after 5 retries', info=info, response=response)
retry_count += 1
+ def has_renewal_info_endpoint(self):
+ return 'renewalInfo' in self.directory
+
class ACMEClient(object):
'''
@@ -168,9 +175,9 @@ class ACMEClient(object):
self.backend = backend
self.version = module.params['acme_version']
# account_key path and content are mutually exclusive
- self.account_key_file = module.params['account_key_src']
- self.account_key_content = module.params['account_key_content']
- self.account_key_passphrase = module.params['account_key_passphrase']
+ self.account_key_file = module.params.get('account_key_src')
+ self.account_key_content = module.params.get('account_key_content')
+ self.account_key_passphrase = module.params.get('account_key_passphrase')
# Grab account URI from module parameters.
# Make sure empty string is treated as None.
@@ -383,24 +390,94 @@ class ACMEClient(object):
self.module, msg=error_msg, info=info, content=content, content_json=result if parsed_json_result else None)
return result, info
+ def get_renewal_info(
+ self,
+ cert_id=None,
+ cert_info=None,
+ cert_filename=None,
+ cert_content=None,
+ include_retry_after=False,
+ retry_after_relative_with_timezone=True,
+ ):
+ if not self.directory.has_renewal_info_endpoint():
+ raise ModuleFailException('The ACME endpoint does not support ACME Renewal Information retrieval')
+
+ if cert_id is None:
+ cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content)
+ url = '{base}{cert_id}'.format(base=self.directory.directory['renewalInfo'], cert_id=cert_id)
+
+ data, info = self.get_request(url, parse_json_result=True, fail_on_error=True, get_only=True)
+
+ # Include Retry-After header if asked for
+ if include_retry_after and 'retry-after' in info:
+ try:
+ data['retryAfter'] = parse_retry_after(
+ info['retry-after'],
+ relative_with_timezone=retry_after_relative_with_timezone,
+ )
+ except ValueError:
+ pass
+ return data
+
def get_default_argspec():
'''
Provides default argument spec for the options documented in the acme doc fragment.
+
+ DEPRECATED: will be removed in community.crypto 3.0.0
'''
return dict(
- account_key_src=dict(type='path', aliases=['account_key']),
- account_key_content=dict(type='str', no_log=True),
- account_key_passphrase=dict(type='str', no_log=True),
- account_uri=dict(type='str'),
acme_directory=dict(type='str', required=True),
acme_version=dict(type='int', required=True, choices=[1, 2]),
validate_certs=dict(type='bool', default=True),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
request_timeout=dict(type='int', default=10),
+ account_key_src=dict(type='path', aliases=['account_key']),
+ account_key_content=dict(type='str', no_log=True),
+ account_key_passphrase=dict(type='str', no_log=True),
+ account_uri=dict(type='str'),
)
+def create_default_argspec(
+ with_account=True,
+ require_account_key=True,
+ with_certificate=False,
+):
+ '''
+ Provides default argument spec for the options documented in the acme doc fragment.
+ '''
+ result = ArgumentSpec(
+ argument_spec=dict(
+ acme_directory=dict(type='str', required=True),
+ acme_version=dict(type='int', required=True, choices=[1, 2]),
+ validate_certs=dict(type='bool', default=True),
+ select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
+ request_timeout=dict(type='int', default=10),
+ ),
+ )
+ if with_account:
+ result.update_argspec(
+ account_key_src=dict(type='path', aliases=['account_key']),
+ account_key_content=dict(type='str', no_log=True),
+ account_key_passphrase=dict(type='str', no_log=True),
+ account_uri=dict(type='str'),
+ )
+ if require_account_key:
+ result.update(required_one_of=[['account_key_src', 'account_key_content']])
+ result.update(mutually_exclusive=[['account_key_src', 'account_key_content']])
+ if with_certificate:
+ result.update_argspec(
+ csr=dict(type='path'),
+ csr_content=dict(type='str'),
+ )
+ result.update(
+ required_one_of=[['csr', 'csr_content']],
+ mutually_exclusive=[['csr', 'csr_content']],
+ )
+ return result
+
+
def create_backend(module, needs_acme_v2):
if not HAS_IPADDRESS:
module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMPORT_ERROR)