diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/lib/ansible_test/_internal/ci/__init__.py | |
parent | Initial commit. (diff) | |
download | ansible-core-8a754e0858d922e955e71b253c139e071ecec432.tar.xz ansible-core-8a754e0858d922e955e71b253c139e071ecec432.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/lib/ansible_test/_internal/ci/__init__.py')
-rw-r--r-- | test/lib/ansible_test/_internal/ci/__init__.py | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/ci/__init__.py b/test/lib/ansible_test/_internal/ci/__init__.py new file mode 100644 index 0000000..97e41da --- /dev/null +++ b/test/lib/ansible_test/_internal/ci/__init__.py @@ -0,0 +1,214 @@ +"""Support code for CI environments.""" +from __future__ import annotations + +import abc +import base64 +import json +import os +import tempfile +import typing as t + +from ..encoding import ( + to_bytes, + to_text, +) + +from ..io import ( + read_text_file, + write_text_file, +) + +from ..config import ( + CommonConfig, + TestConfig, +) + +from ..util import ( + ApplicationError, + display, + get_subclasses, + import_plugins, + raw_command, + cache, +) + + +class ChangeDetectionNotSupported(ApplicationError): + """Exception for cases where change detection is not supported.""" + + +class CIProvider(metaclass=abc.ABCMeta): + """Base class for CI provider plugins.""" + priority = 500 + + @staticmethod + @abc.abstractmethod + def is_supported() -> bool: + """Return True if this provider is supported in the current running environment.""" + + @property + @abc.abstractmethod + def code(self) -> str: + """Return a unique code representing this provider.""" + + @property + @abc.abstractmethod + def name(self) -> str: + """Return descriptive name for this provider.""" + + @abc.abstractmethod + def generate_resource_prefix(self) -> str: + """Return a resource prefix specific to this CI provider.""" + + @abc.abstractmethod + def get_base_branch(self) -> str: + """Return the base branch or an empty string.""" + + @abc.abstractmethod + def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]: + """Initialize change detection.""" + + @abc.abstractmethod + def supports_core_ci_auth(self) -> bool: + """Return True if Ansible Core CI is supported.""" + + @abc.abstractmethod + def prepare_core_ci_auth(self) -> dict[str, t.Any]: + """Return authentication details for Ansible Core CI.""" + + @abc.abstractmethod + def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]: + """Return details about git in the current environment.""" + + +@cache +def get_ci_provider() -> CIProvider: + """Return a CI provider instance for the current environment.""" + provider = None + + import_plugins('ci') + + candidates = sorted(get_subclasses(CIProvider), key=lambda subclass: (subclass.priority, subclass.__name__)) + + for candidate in candidates: + if candidate.is_supported(): + provider = candidate() + break + + if provider.code: + display.info('Detected CI provider: %s' % provider.name) + + return provider + + +class AuthHelper(metaclass=abc.ABCMeta): + """Public key based authentication helper for Ansible Core CI.""" + def sign_request(self, request: dict[str, t.Any]) -> None: + """Sign the given auth request and make the public key available.""" + payload_bytes = to_bytes(json.dumps(request, sort_keys=True)) + signature_raw_bytes = self.sign_bytes(payload_bytes) + signature = to_text(base64.b64encode(signature_raw_bytes)) + + request.update(signature=signature) + + def initialize_private_key(self) -> str: + """ + Initialize and publish a new key pair (if needed) and return the private key. + The private key is cached across ansible-test invocations, so it is only generated and published once per CI job. + """ + path = os.path.expanduser('~/.ansible-core-ci-private.key') + + if os.path.exists(to_bytes(path)): + private_key_pem = read_text_file(path) + else: + private_key_pem = self.generate_private_key() + write_text_file(path, private_key_pem) + + return private_key_pem + + @abc.abstractmethod + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + + @abc.abstractmethod + def publish_public_key(self, public_key_pem: str) -> None: + """Publish the given public key.""" + + @abc.abstractmethod + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + + +class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta): + """Cryptography based public key based authentication helper for Ansible Core CI.""" + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + # import cryptography here to avoid overhead and failures in environments which do not use/provide it + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + private_key_pem = self.initialize_private_key() + private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend()) + + assert isinstance(private_key, ec.EllipticCurvePrivateKey) + + signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256())) + + return signature_raw_bytes + + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + # import cryptography here to avoid overhead and failures in environments which do not use/provide it + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ec + + private_key = ec.generate_private_key(ec.SECP384R1(), default_backend()) + public_key = private_key.public_key() + + private_key_pem = to_text(private_key.private_bytes( # type: ignore[attr-defined] # documented method, but missing from type stubs + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + )) + + public_key_pem = to_text(public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + )) + + self.publish_public_key(public_key_pem) + + return private_key_pem + + +class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta): + """OpenSSL based public key based authentication helper for Ansible Core CI.""" + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + private_key_pem = self.initialize_private_key() + + with tempfile.NamedTemporaryFile() as private_key_file: + private_key_file.write(to_bytes(private_key_pem)) + private_key_file.flush() + + with tempfile.NamedTemporaryFile() as payload_file: + payload_file.write(payload_bytes) + payload_file.flush() + + with tempfile.NamedTemporaryFile() as signature_file: + raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True) + signature_raw_bytes = signature_file.read() + + return signature_raw_bytes + + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0] + public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0] + + self.publish_public_key(public_key_pem) + + return private_key_pem |