diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-14 20:03:01 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-14 20:03:01 +0000 |
commit | a453ac31f3428614cceb99027f8efbdb9258a40b (patch) | |
tree | f61f87408f32a8511cbd91799f9cececb53e0374 /test/units/ansible_test/ci | |
parent | Initial commit. (diff) | |
download | ansible-a453ac31f3428614cceb99027f8efbdb9258a40b.tar.xz ansible-a453ac31f3428614cceb99027f8efbdb9258a40b.zip |
Adding upstream version 2.10.7+merged+base+2.10.8+dfsg.upstream/2.10.7+merged+base+2.10.8+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/units/ansible_test/ci')
-rw-r--r-- | test/units/ansible_test/ci/__init__.py | 0 | ||||
-rw-r--r-- | test/units/ansible_test/ci/test_azp.py | 31 | ||||
-rw-r--r-- | test/units/ansible_test/ci/test_shippable.py | 31 | ||||
-rw-r--r-- | test/units/ansible_test/ci/util.py | 53 |
4 files changed, 115 insertions, 0 deletions
diff --git a/test/units/ansible_test/ci/__init__.py b/test/units/ansible_test/ci/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/ansible_test/ci/__init__.py diff --git a/test/units/ansible_test/ci/test_azp.py b/test/units/ansible_test/ci/test_azp.py new file mode 100644 index 00000000..69c4fa49 --- /dev/null +++ b/test/units/ansible_test/ci/test_azp.py @@ -0,0 +1,31 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from .util import common_auth_test + + +def test_auth(): + # noinspection PyProtectedMember + from ansible_test._internal.ci.azp import ( + AzurePipelinesAuthHelper, + ) + + class TestAzurePipelinesAuthHelper(AzurePipelinesAuthHelper): + def __init__(self): + self.public_key_pem = None + self.private_key_pem = None + + def publish_public_key(self, public_key_pem): + # avoid publishing key + self.public_key_pem = public_key_pem + + def initialize_private_key(self): + # cache in memory instead of on disk + if not self.private_key_pem: + self.private_key_pem = self.generate_private_key() + + return self.private_key_pem + + auth = TestAzurePipelinesAuthHelper() + + common_auth_test(auth) diff --git a/test/units/ansible_test/ci/test_shippable.py b/test/units/ansible_test/ci/test_shippable.py new file mode 100644 index 00000000..08b276c7 --- /dev/null +++ b/test/units/ansible_test/ci/test_shippable.py @@ -0,0 +1,31 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from .util import common_auth_test + + +def test_auth(): + # noinspection PyProtectedMember + from ansible_test._internal.ci.shippable import ( + ShippableAuthHelper, + ) + + class TestShippableAuthHelper(ShippableAuthHelper): + def __init__(self): + self.public_key_pem = None + self.private_key_pem = None + + def publish_public_key(self, public_key_pem): + # avoid publishing key + self.public_key_pem = public_key_pem + + def initialize_private_key(self): + # cache in memory instead of on disk + if not self.private_key_pem: + self.private_key_pem = self.generate_private_key() + + return self.private_key_pem + + auth = TestShippableAuthHelper() + + common_auth_test(auth) diff --git a/test/units/ansible_test/ci/util.py b/test/units/ansible_test/ci/util.py new file mode 100644 index 00000000..ba8e358b --- /dev/null +++ b/test/units/ansible_test/ci/util.py @@ -0,0 +1,53 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import base64 +import json +import re + + +def common_auth_test(auth): + private_key_pem = auth.initialize_private_key() + public_key_pem = auth.public_key_pem + + extract_pem_key(private_key_pem, private=True) + extract_pem_key(public_key_pem, private=False) + + request = dict(hello='World') + auth.sign_request(request) + + verify_signature(request, public_key_pem) + + +def extract_pem_key(value, private): + assert isinstance(value, type(u'')) + + key_type = '(EC )?PRIVATE' if private else 'PUBLIC' + pattern = r'^-----BEGIN ' + key_type + r' KEY-----\n(?P<key>.*?)\n-----END ' + key_type + r' KEY-----\n$' + match = re.search(pattern, value, flags=re.DOTALL) + + assert match, 'key "%s" does not match pattern "%s"' % (value, pattern) + + base64.b64decode(match.group('key')) # make sure the key can be decoded + + +def verify_signature(request, public_key_pem): + signature = request.pop('signature') + payload_bytes = json.dumps(request, sort_keys=True).encode() + + assert isinstance(signature, type(u'')) + + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import load_pem_public_key + + public_key = load_pem_public_key(public_key_pem.encode(), default_backend()) + + verifier = public_key.verifier( + base64.b64decode(signature.encode()), + ec.ECDSA(hashes.SHA256()), + ) + + verifier.update(payload_bytes) + verifier.verify() |