summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/crypto/plugins/module_utils/acme/acme.py
blob: 74d0bc1eaba36eaf48cf667f4b58efc30e74d8c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# -*- coding: utf-8 -*-

# Copyright (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import copy
import datetime
import json
import locale
import time
import traceback

from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.six import PY3

from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
    OpenSSLCLIBackend,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
    CryptographyBackend,
    CRYPTOGRAPHY_ERROR,
    CRYPTOGRAPHY_MINIMAL_VERSION,
    CRYPTOGRAPHY_VERSION,
    HAS_CURRENT_CRYPTOGRAPHY,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
    ACMEProtocolException,
    NetworkException,
    ModuleFailException,
    KeyParsingError,
    format_http_status,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
    nopad_b64,
)

try:
    import ipaddress  # noqa: F401, pylint: disable=unused-import
except ImportError:
    HAS_IPADDRESS = False
    IPADDRESS_IMPORT_ERROR = traceback.format_exc()
else:
    HAS_IPADDRESS = True
    IPADDRESS_IMPORT_ERROR = None


# -1 usually means connection problems
RETRY_STATUS_CODES = (-1, 408, 429, 503)

RETRY_COUNT = 10


def _decode_retry(module, response, info, retry_count):
    if info['status'] not in RETRY_STATUS_CODES:
        return False

    if retry_count >= RETRY_COUNT:
        raise ACMEProtocolException(
            module, msg='Giving up after {retry} retries'.format(retry=RETRY_COUNT), info=info, response=response)

    # 429 and 503 should have a Retry-After header (https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After)
    try:
        retry_after = min(max(1, int(info.get('retry-after'))), 60)
    except (TypeError, ValueError) as dummy:
        retry_after = 10
    module.log('Retrieved a %s HTTP status on %s, retrying in %s seconds' % (format_http_status(info['status']), info['url'], retry_after))

    time.sleep(retry_after)
    return True


def _assert_fetch_url_success(module, response, info, allow_redirect=False, allow_client_error=True, allow_server_error=True):
    if info['status'] < 0:
        raise NetworkException(msg="Failure downloading %s, %s" % (info['url'], info['msg']))

    if (300 <= info['status'] < 400 and not allow_redirect) or \
       (400 <= info['status'] < 500 and not allow_client_error) or \
       (info['status'] >= 500 and not allow_server_error):
        raise ACMEProtocolException(module, info=info, response=response)


def _is_failed(info, expected_status_codes=None):
    if info['status'] < 200 or info['status'] >= 400:
        return True
    if expected_status_codes is not None and info['status'] not in expected_status_codes:
        return True
    return False


class ACMEDirectory(object):
    '''
    The ACME server directory. Gives access to the available resources,
    and allows to obtain a Replay-Nonce. The acme_directory URL
    needs to support unauthenticated GET requests; ACME endpoints
    requiring authentication are not supported.
    https://tools.ietf.org/html/rfc8555#section-7.1.1
    '''

    def __init__(self, module, account):
        self.module = module
        self.directory_root = module.params['acme_directory']
        self.version = module.params['acme_version']

        self.directory, dummy = account.get_request(self.directory_root, get_only=True)

        self.request_timeout = module.params['request_timeout']

        # Check whether self.version matches what we expect
        if self.version == 1:
            for key in ('new-reg', 'new-authz', 'new-cert'):
                if key not in self.directory:
                    raise ModuleFailException("ACME directory does not seem to follow protocol ACME v1")
        if self.version == 2:
            for key in ('newNonce', 'newAccount', 'newOrder'):
                if key not in self.directory:
                    raise ModuleFailException("ACME directory does not seem to follow protocol ACME v2")
            # Make sure that 'meta' is always available
            if 'meta' not in self.directory:
                self.directory['meta'] = {}

    def __getitem__(self, key):
        return self.directory[key]

    def get_nonce(self, resource=None):
        url = self.directory_root if self.version == 1 else self.directory['newNonce']
        if resource is not None:
            url = resource
        retry_count = 0
        while True:
            response, info = fetch_url(self.module, url, method='HEAD', timeout=self.request_timeout)
            if _decode_retry(self.module, response, info, retry_count):
                retry_count += 1
                continue
            if info['status'] not in (200, 204):
                raise NetworkException("Failed to get replay-nonce, got status {0}".format(format_http_status(info['status'])))
            if 'replay-nonce' in info:
                return info['replay-nonce']
            self.module.log(
                'HEAD to {0} did return status {1}, but no replay-nonce header!'.format(url, format_http_status(info['status'])))
            if retry_count >= 5:
                raise ACMEProtocolException(
                    self.module, msg='Was not able to obtain nonce, giving up after 5 retries', info=info, response=response)
            retry_count += 1


class ACMEClient(object):
    '''
    ACME client object. Handles the authorized communication with the
    ACME server.
    '''

    def __init__(self, module, backend):
        # Set to true to enable logging of all signed requests
        self._debug = False

        self.module = module
        self.backend = backend
        self.version = module.params['acme_version']
        # account_key path and content are mutually exclusive
        self.account_key_file = module.params['account_key_src']
        self.account_key_content = module.params['account_key_content']
        self.account_key_passphrase = module.params['account_key_passphrase']

        # Grab account URI from module parameters.
        # Make sure empty string is treated as None.
        self.account_uri = module.params.get('account_uri') or None

        self.request_timeout = module.params['request_timeout']

        self.account_key_data = None
        self.account_jwk = None
        self.account_jws_header = None
        if self.account_key_file is not None or self.account_key_content is not None:
            try:
                self.account_key_data = self.parse_key(
                    key_file=self.account_key_file,
                    key_content=self.account_key_content,
                    passphrase=self.account_key_passphrase)
            except KeyParsingError as e:
                raise ModuleFailException("Error while parsing account key: {msg}".format(msg=e.msg))
            self.account_jwk = self.account_key_data['jwk']
            self.account_jws_header = {
                "alg": self.account_key_data['alg'],
                "jwk": self.account_jwk,
            }
            if self.account_uri:
                # Make sure self.account_jws_header is updated
                self.set_account_uri(self.account_uri)

        self.directory = ACMEDirectory(module, self)

    def set_account_uri(self, uri):
        '''
        Set account URI. For ACME v2, it needs to be used to sending signed
        requests.
        '''
        self.account_uri = uri
        if self.version != 1:
            self.account_jws_header.pop('jwk')
            self.account_jws_header['kid'] = self.account_uri

    def parse_key(self, key_file=None, key_content=None, passphrase=None):
        '''
        Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
        In case of an error, raises KeyParsingError.
        '''
        if key_file is None and key_content is None:
            raise AssertionError('One of key_file and key_content must be specified!')
        return self.backend.parse_key(key_file, key_content, passphrase=passphrase)

    def sign_request(self, protected, payload, key_data, encode_payload=True):
        '''
        Signs an ACME request.
        '''
        try:
            if payload is None:
                # POST-as-GET
                payload64 = ''
            else:
                # POST
                if encode_payload:
                    payload = self.module.jsonify(payload).encode('utf8')
                payload64 = nopad_b64(to_bytes(payload))
            protected64 = nopad_b64(self.module.jsonify(protected).encode('utf8'))
        except Exception as e:
            raise ModuleFailException("Failed to encode payload / headers as JSON: {0}".format(e))

        return self.backend.sign(payload64, protected64, key_data)

    def _log(self, msg, data=None):
        '''
        Write arguments to acme.log when logging is enabled.
        '''
        if self._debug:
            with open('acme.log', 'ab') as f:
                f.write('[{0}] {1}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%s'), msg).encode('utf-8'))
                if data is not None:
                    f.write('{0}\n\n'.format(json.dumps(data, indent=2, sort_keys=True)).encode('utf-8'))

    def send_signed_request(self, url, payload, key_data=None, jws_header=None, parse_json_result=True,
                            encode_payload=True, fail_on_error=True, error_msg=None, expected_status_codes=None):
        '''
        Sends a JWS signed HTTP POST request to the ACME server and returns
        the response as dictionary (if parse_json_result is True) or in raw form
        (if parse_json_result is False).
        https://tools.ietf.org/html/rfc8555#section-6.2

        If payload is None, a POST-as-GET is performed.
        (https://tools.ietf.org/html/rfc8555#section-6.3)
        '''
        key_data = key_data or self.account_key_data
        jws_header = jws_header or self.account_jws_header
        failed_tries = 0
        while True:
            protected = copy.deepcopy(jws_header)
            protected["nonce"] = self.directory.get_nonce()
            if self.version != 1:
                protected["url"] = url

            self._log('URL', url)
            self._log('protected', protected)
            self._log('payload', payload)
            data = self.sign_request(protected, payload, key_data, encode_payload=encode_payload)
            if self.version == 1:
                data["header"] = jws_header.copy()
                for k, v in protected.items():
                    dummy = data["header"].pop(k, None)
            self._log('signed request', data)
            data = self.module.jsonify(data)

            headers = {
                'Content-Type': 'application/jose+json',
            }
            resp, info = fetch_url(self.module, url, data=data, headers=headers, method='POST', timeout=self.request_timeout)
            if _decode_retry(self.module, resp, info, failed_tries):
                failed_tries += 1
                continue
            _assert_fetch_url_success(self.module, resp, info)
            result = {}

            try:
                # In Python 2, reading from a closed response yields a TypeError.
                # In Python 3, read() simply returns ''
                if PY3 and resp.closed:
                    raise TypeError
                content = resp.read()
            except (AttributeError, TypeError):
                content = info.pop('body', None)

            if content or not parse_json_result:
                if (parse_json_result and info['content-type'].startswith('application/json')) or 400 <= info['status'] < 600:
                    try:
                        decoded_result = self.module.from_json(content.decode('utf8'))
                        self._log('parsed result', decoded_result)
                        # In case of badNonce error, try again (up to 5 times)
                        # (https://tools.ietf.org/html/rfc8555#section-6.7)
                        if all((
                            400 <= info['status'] < 600,
                            decoded_result.get('type') == 'urn:ietf:params:acme:error:badNonce',
                            failed_tries <= 5,
                        )):
                            failed_tries += 1
                            continue
                        if parse_json_result:
                            result = decoded_result
                        else:
                            result = content
                    except ValueError:
                        raise NetworkException("Failed to parse the ACME response: {0} {1}".format(url, content))
                else:
                    result = content

            if fail_on_error and _is_failed(info, expected_status_codes=expected_status_codes):
                raise ACMEProtocolException(
                    self.module, msg=error_msg, info=info, content=content, content_json=result if parse_json_result else None)
            return result, info

    def get_request(self, uri, parse_json_result=True, headers=None, get_only=False,
                    fail_on_error=True, error_msg=None, expected_status_codes=None):
        '''
        Perform a GET-like request. Will try POST-as-GET for ACMEv2, with fallback
        to GET if server replies with a status code of 405.
        '''
        if not get_only and self.version != 1:
            # Try POST-as-GET
            content, info = self.send_signed_request(uri, None, parse_json_result=False, fail_on_error=False)
            if info['status'] == 405:
                # Instead, do unauthenticated GET
                get_only = True
        else:
            # Do unauthenticated GET
            get_only = True

        if get_only:
            # Perform unauthenticated GET
            retry_count = 0
            while True:
                resp, info = fetch_url(self.module, uri, method='GET', headers=headers, timeout=self.request_timeout)
                if not _decode_retry(self.module, resp, info, retry_count):
                    break
                retry_count += 1

            _assert_fetch_url_success(self.module, resp, info)

            try:
                # In Python 2, reading from a closed response yields a TypeError.
                # In Python 3, read() simply returns ''
                if PY3 and resp.closed:
                    raise TypeError
                content = resp.read()
            except (AttributeError, TypeError):
                content = info.pop('body', None)

        # Process result
        parsed_json_result = False
        if parse_json_result:
            result = {}
            if content:
                if info['content-type'].startswith('application/json'):
                    try:
                        result = self.module.from_json(content.decode('utf8'))
                        parsed_json_result = True
                    except ValueError:
                        raise NetworkException("Failed to parse the ACME response: {0} {1}".format(uri, content))
                else:
                    result = content
        else:
            result = content

        if fail_on_error and _is_failed(info, expected_status_codes=expected_status_codes):
            raise ACMEProtocolException(
                self.module, msg=error_msg, info=info, content=content, content_json=result if parsed_json_result else None)
        return result, info


def get_default_argspec():
    '''
    Provides default argument spec for the options documented in the acme doc fragment.
    '''
    return dict(
        account_key_src=dict(type='path', aliases=['account_key']),
        account_key_content=dict(type='str', no_log=True),
        account_key_passphrase=dict(type='str', no_log=True),
        account_uri=dict(type='str'),
        acme_directory=dict(type='str', required=True),
        acme_version=dict(type='int', required=True, choices=[1, 2]),
        validate_certs=dict(type='bool', default=True),
        select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
        request_timeout=dict(type='int', default=10),
    )


def create_backend(module, needs_acme_v2):
    if not HAS_IPADDRESS:
        module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMPORT_ERROR)

    backend = module.params['select_crypto_backend']

    # Backend autodetect
    if backend == 'auto':
        backend = 'cryptography' if HAS_CURRENT_CRYPTOGRAPHY else 'openssl'

    # Create backend object
    if backend == 'cryptography':
        if CRYPTOGRAPHY_ERROR is not None:
            # Either we couldn't import cryptography at all, or there was an unexpected error
            if CRYPTOGRAPHY_VERSION is None:
                msg = missing_required_lib('cryptography')
            else:
                msg = 'Unexpected error while preparing cryptography: {0}'.format(CRYPTOGRAPHY_ERROR.splitlines()[-1])
            module.fail_json(msg=msg, exception=CRYPTOGRAPHY_ERROR)
        if not HAS_CURRENT_CRYPTOGRAPHY:
            # We succeeded importing cryptography, but its version is too old.
            module.fail_json(
                msg='Found cryptography, but only version {0}. {1}'.format(
                    CRYPTOGRAPHY_VERSION,
                    missing_required_lib('cryptography >= {0}'.format(CRYPTOGRAPHY_MINIMAL_VERSION))))
        module.debug('Using cryptography backend (library version {0})'.format(CRYPTOGRAPHY_VERSION))
        module_backend = CryptographyBackend(module)
    elif backend == 'openssl':
        module.debug('Using OpenSSL binary backend')
        module_backend = OpenSSLCLIBackend(module)
    else:
        module.fail_json(msg='Unknown crypto backend "{0}"!'.format(backend))

    # Check common module parameters
    if not module.params['validate_certs']:
        module.warn(
            'Disabling certificate validation for communications with ACME endpoint. '
            'This should only be done for testing against a local ACME server for '
            'development purposes, but *never* for production purposes.'
        )

    if needs_acme_v2 and module.params['acme_version'] < 2:
        module.fail_json(msg='The {0} module requires the ACME v2 protocol!'.format(module._name))

    if module.params['acme_version'] == 1:
        module.deprecate("The value 1 for 'acme_version' is deprecated. Please switch to ACME v2",
                         version='3.0.0', collection_name='community.crypto')

    # AnsibleModule() changes the locale, so change it back to C because we rely
    # on datetime.datetime.strptime() when parsing certificate dates.
    locale.setlocale(locale.LC_ALL, 'C')

    return module_backend