diff options
Diffstat (limited to 'ansible_collections/community/crypto/plugins/module_utils/acme/acme.py')
-rw-r--r-- | ansible_collections/community/crypto/plugins/module_utils/acme/acme.py | 91 |
1 files changed, 84 insertions, 7 deletions
diff --git a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py index 74d0bc1ea..7f9b954c0 100644 --- a/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py +++ b/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py @@ -21,6 +21,8 @@ from ansible.module_utils.common.text.converters import to_bytes from ansible.module_utils.urls import fetch_url from ansible.module_utils.six import PY3 +from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec + from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import ( OpenSSLCLIBackend, ) @@ -42,7 +44,9 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor ) from ansible_collections.community.crypto.plugins.module_utils.acme.utils import ( + compute_cert_id, nopad_b64, + parse_retry_after, ) try: @@ -153,6 +157,9 @@ class ACMEDirectory(object): self.module, msg='Was not able to obtain nonce, giving up after 5 retries', info=info, response=response) retry_count += 1 + def has_renewal_info_endpoint(self): + return 'renewalInfo' in self.directory + class ACMEClient(object): ''' @@ -168,9 +175,9 @@ class ACMEClient(object): self.backend = backend self.version = module.params['acme_version'] # account_key path and content are mutually exclusive - self.account_key_file = module.params['account_key_src'] - self.account_key_content = module.params['account_key_content'] - self.account_key_passphrase = module.params['account_key_passphrase'] + self.account_key_file = module.params.get('account_key_src') + self.account_key_content = module.params.get('account_key_content') + self.account_key_passphrase = module.params.get('account_key_passphrase') # Grab account URI from module parameters. # Make sure empty string is treated as None. @@ -383,24 +390,94 @@ class ACMEClient(object): self.module, msg=error_msg, info=info, content=content, content_json=result if parsed_json_result else None) return result, info + def get_renewal_info( + self, + cert_id=None, + cert_info=None, + cert_filename=None, + cert_content=None, + include_retry_after=False, + retry_after_relative_with_timezone=True, + ): + if not self.directory.has_renewal_info_endpoint(): + raise ModuleFailException('The ACME endpoint does not support ACME Renewal Information retrieval') + + if cert_id is None: + cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content) + url = '{base}{cert_id}'.format(base=self.directory.directory['renewalInfo'], cert_id=cert_id) + + data, info = self.get_request(url, parse_json_result=True, fail_on_error=True, get_only=True) + + # Include Retry-After header if asked for + if include_retry_after and 'retry-after' in info: + try: + data['retryAfter'] = parse_retry_after( + info['retry-after'], + relative_with_timezone=retry_after_relative_with_timezone, + ) + except ValueError: + pass + return data + def get_default_argspec(): ''' Provides default argument spec for the options documented in the acme doc fragment. + + DEPRECATED: will be removed in community.crypto 3.0.0 ''' return dict( - account_key_src=dict(type='path', aliases=['account_key']), - account_key_content=dict(type='str', no_log=True), - account_key_passphrase=dict(type='str', no_log=True), - account_uri=dict(type='str'), acme_directory=dict(type='str', required=True), acme_version=dict(type='int', required=True, choices=[1, 2]), validate_certs=dict(type='bool', default=True), select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']), request_timeout=dict(type='int', default=10), + account_key_src=dict(type='path', aliases=['account_key']), + account_key_content=dict(type='str', no_log=True), + account_key_passphrase=dict(type='str', no_log=True), + account_uri=dict(type='str'), ) +def create_default_argspec( + with_account=True, + require_account_key=True, + with_certificate=False, +): + ''' + Provides default argument spec for the options documented in the acme doc fragment. + ''' + result = ArgumentSpec( + argument_spec=dict( + acme_directory=dict(type='str', required=True), + acme_version=dict(type='int', required=True, choices=[1, 2]), + validate_certs=dict(type='bool', default=True), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']), + request_timeout=dict(type='int', default=10), + ), + ) + if with_account: + result.update_argspec( + account_key_src=dict(type='path', aliases=['account_key']), + account_key_content=dict(type='str', no_log=True), + account_key_passphrase=dict(type='str', no_log=True), + account_uri=dict(type='str'), + ) + if require_account_key: + result.update(required_one_of=[['account_key_src', 'account_key_content']]) + result.update(mutually_exclusive=[['account_key_src', 'account_key_content']]) + if with_certificate: + result.update_argspec( + csr=dict(type='path'), + csr_content=dict(type='str'), + ) + result.update( + required_one_of=[['csr', 'csr_content']], + mutually_exclusive=[['csr', 'csr_content']], + ) + return result + + def create_backend(module, needs_acme_v2): if not HAS_IPADDRESS: module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMPORT_ERROR) |