diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/ci')
-rw-r--r-- | test/lib/ansible_test/_internal/ci/__init__.py | 214 | ||||
-rw-r--r-- | test/lib/ansible_test/_internal/ci/azp.py | 262 | ||||
-rw-r--r-- | test/lib/ansible_test/_internal/ci/local.py | 212 |
3 files changed, 688 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/ci/__init__.py b/test/lib/ansible_test/_internal/ci/__init__.py new file mode 100644 index 0000000..97e41da --- /dev/null +++ b/test/lib/ansible_test/_internal/ci/__init__.py @@ -0,0 +1,214 @@ +"""Support code for CI environments.""" +from __future__ import annotations + +import abc +import base64 +import json +import os +import tempfile +import typing as t + +from ..encoding import ( + to_bytes, + to_text, +) + +from ..io import ( + read_text_file, + write_text_file, +) + +from ..config import ( + CommonConfig, + TestConfig, +) + +from ..util import ( + ApplicationError, + display, + get_subclasses, + import_plugins, + raw_command, + cache, +) + + +class ChangeDetectionNotSupported(ApplicationError): + """Exception for cases where change detection is not supported.""" + + +class CIProvider(metaclass=abc.ABCMeta): + """Base class for CI provider plugins.""" + priority = 500 + + @staticmethod + @abc.abstractmethod + def is_supported() -> bool: + """Return True if this provider is supported in the current running environment.""" + + @property + @abc.abstractmethod + def code(self) -> str: + """Return a unique code representing this provider.""" + + @property + @abc.abstractmethod + def name(self) -> str: + """Return descriptive name for this provider.""" + + @abc.abstractmethod + def generate_resource_prefix(self) -> str: + """Return a resource prefix specific to this CI provider.""" + + @abc.abstractmethod + def get_base_branch(self) -> str: + """Return the base branch or an empty string.""" + + @abc.abstractmethod + def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]: + """Initialize change detection.""" + + @abc.abstractmethod + def supports_core_ci_auth(self) -> bool: + """Return True if Ansible Core CI is supported.""" + + @abc.abstractmethod + def prepare_core_ci_auth(self) -> dict[str, t.Any]: + """Return authentication details for Ansible Core CI.""" + + @abc.abstractmethod + def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]: + """Return details about git in the current environment.""" + + +@cache +def get_ci_provider() -> CIProvider: + """Return a CI provider instance for the current environment.""" + provider = None + + import_plugins('ci') + + candidates = sorted(get_subclasses(CIProvider), key=lambda subclass: (subclass.priority, subclass.__name__)) + + for candidate in candidates: + if candidate.is_supported(): + provider = candidate() + break + + if provider.code: + display.info('Detected CI provider: %s' % provider.name) + + return provider + + +class AuthHelper(metaclass=abc.ABCMeta): + """Public key based authentication helper for Ansible Core CI.""" + def sign_request(self, request: dict[str, t.Any]) -> None: + """Sign the given auth request and make the public key available.""" + payload_bytes = to_bytes(json.dumps(request, sort_keys=True)) + signature_raw_bytes = self.sign_bytes(payload_bytes) + signature = to_text(base64.b64encode(signature_raw_bytes)) + + request.update(signature=signature) + + def initialize_private_key(self) -> str: + """ + Initialize and publish a new key pair (if needed) and return the private key. + The private key is cached across ansible-test invocations, so it is only generated and published once per CI job. + """ + path = os.path.expanduser('~/.ansible-core-ci-private.key') + + if os.path.exists(to_bytes(path)): + private_key_pem = read_text_file(path) + else: + private_key_pem = self.generate_private_key() + write_text_file(path, private_key_pem) + + return private_key_pem + + @abc.abstractmethod + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + + @abc.abstractmethod + def publish_public_key(self, public_key_pem: str) -> None: + """Publish the given public key.""" + + @abc.abstractmethod + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + + +class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta): + """Cryptography based public key based authentication helper for Ansible Core CI.""" + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + # import cryptography here to avoid overhead and failures in environments which do not use/provide it + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import load_pem_private_key + + private_key_pem = self.initialize_private_key() + private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend()) + + assert isinstance(private_key, ec.EllipticCurvePrivateKey) + + signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256())) + + return signature_raw_bytes + + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + # import cryptography here to avoid overhead and failures in environments which do not use/provide it + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ec + + private_key = ec.generate_private_key(ec.SECP384R1(), default_backend()) + public_key = private_key.public_key() + + private_key_pem = to_text(private_key.private_bytes( # type: ignore[attr-defined] # documented method, but missing from type stubs + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + )) + + public_key_pem = to_text(public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + )) + + self.publish_public_key(public_key_pem) + + return private_key_pem + + +class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta): + """OpenSSL based public key based authentication helper for Ansible Core CI.""" + def sign_bytes(self, payload_bytes: bytes) -> bytes: + """Sign the given payload and return the signature, initializing a new key pair if required.""" + private_key_pem = self.initialize_private_key() + + with tempfile.NamedTemporaryFile() as private_key_file: + private_key_file.write(to_bytes(private_key_pem)) + private_key_file.flush() + + with tempfile.NamedTemporaryFile() as payload_file: + payload_file.write(payload_bytes) + payload_file.flush() + + with tempfile.NamedTemporaryFile() as signature_file: + raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True) + signature_raw_bytes = signature_file.read() + + return signature_raw_bytes + + def generate_private_key(self) -> str: + """Generate a new key pair, publishing the public key and returning the private key.""" + private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0] + public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0] + + self.publish_public_key(public_key_pem) + + return private_key_pem diff --git a/test/lib/ansible_test/_internal/ci/azp.py b/test/lib/ansible_test/_internal/ci/azp.py new file mode 100644 index 0000000..9170dfe --- /dev/null +++ b/test/lib/ansible_test/_internal/ci/azp.py @@ -0,0 +1,262 @@ +"""Support code for working with Azure Pipelines.""" +from __future__ import annotations + +import os +import tempfile +import uuid +import typing as t +import urllib.parse + +from ..encoding import ( + to_bytes, +) + +from ..config import ( + CommonConfig, + TestConfig, +) + +from ..git import ( + Git, +) + +from ..http import ( + HttpClient, +) + +from ..util import ( + display, + MissingEnvironmentVariable, +) + +from . import ( + ChangeDetectionNotSupported, + CIProvider, + CryptographyAuthHelper, +) + +CODE = 'azp' + + +class AzurePipelines(CIProvider): + """CI provider implementation for Azure Pipelines.""" + def __init__(self) -> None: + self.auth = AzurePipelinesAuthHelper() + + @staticmethod + def is_supported() -> bool: + """Return True if this provider is supported in the current running environment.""" + return os.environ.get('SYSTEM_COLLECTIONURI', '').startswith('https://dev.azure.com/') + + @property + def code(self) -> str: + """Return a unique code representing this provider.""" + return CODE + + @property + def name(self) -> str: + """Return descriptive name for this provider.""" + return 'Azure Pipelines' + + def generate_resource_prefix(self) -> str: + """Return a resource prefix specific to this CI provider.""" + try: + prefix = 'azp-%s-%s-%s' % ( + os.environ['BUILD_BUILDID'], + os.environ['SYSTEM_JOBATTEMPT'], + os.environ['SYSTEM_JOBIDENTIFIER'], + ) + except KeyError as ex: + raise MissingEnvironmentVariable(name=ex.args[0]) + + return prefix + + def get_base_branch(self) -> str: + """Return the base branch or an empty string.""" + base_branch = os.environ.get('SYSTEM_PULLREQUEST_TARGETBRANCH') or os.environ.get('BUILD_SOURCEBRANCHNAME') + + if base_branch: + base_branch = 'origin/%s' % base_branch + + return base_branch or '' + + def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]: + """Initialize change detection.""" + result = AzurePipelinesChanges(args) + + if result.is_pr: + job_type = 'pull request' + else: + job_type = 'merge commit' + + display.info('Processing %s for branch %s commit %s' % (job_type, result.branch, result.commit)) + + if not args.metadata.changes: + args.metadata.populate_changes(result.diff) + + if result.paths is None: + # There are several likely causes of this: + # - First run on a new branch. + # - Too many pull requests passed since the last merge run passed. + display.warning('No successful commit found. All tests will be executed.') + + return result.paths + + def supports_core_ci_auth(self) -> bool: + """Return True if Ansible Core CI is supported.""" + return True + + def prepare_core_ci_auth(self) -> dict[str, t.Any]: + """Return authentication details for Ansible Core CI.""" + try: + request = dict( + org_name=os.environ['SYSTEM_COLLECTIONURI'].strip('/').split('/')[-1], + project_name=os.environ['SYSTEM_TEAMPROJECT'], + build_id=int(os.environ['BUILD_BUILDID']), + task_id=str(uuid.UUID(os.environ['SYSTEM_TASKINSTANCEID'])), + ) + except KeyError as ex: + raise MissingEnvironmentVariable(name=ex.args[0]) + + self.auth.sign_request(request) + + auth = dict( + azp=request, + ) + + return auth + + def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]: + """Return details about git in the current environment.""" + changes = AzurePipelinesChanges(args) + + details = dict( + base_commit=changes.base_commit, + commit=changes.commit, + ) + + return details + + +class AzurePipelinesAuthHelper(CryptographyAuthHelper): + """ + Authentication helper for Azure Pipelines. + Based on cryptography since it is provided by the default Azure Pipelines environment. + """ + def publish_public_key(self, public_key_pem: str) -> None: + """Publish the given public key.""" + try: + agent_temp_directory = os.environ['AGENT_TEMPDIRECTORY'] + except KeyError as ex: + raise MissingEnvironmentVariable(name=ex.args[0]) + + # the temporary file cannot be deleted because we do not know when the agent has processed it + # placing the file in the agent's temp directory allows it to be picked up when the job is running in a container + with tempfile.NamedTemporaryFile(prefix='public-key-', suffix='.pem', delete=False, dir=agent_temp_directory) as public_key_file: + public_key_file.write(to_bytes(public_key_pem)) + public_key_file.flush() + + # make the agent aware of the public key by declaring it as an attachment + vso_add_attachment('ansible-core-ci', 'public-key.pem', public_key_file.name) + + +class AzurePipelinesChanges: + """Change information for an Azure Pipelines build.""" + def __init__(self, args: CommonConfig) -> None: + self.args = args + self.git = Git() + + try: + self.org_uri = os.environ['SYSTEM_COLLECTIONURI'] # ex: https://dev.azure.com/{org}/ + self.project = os.environ['SYSTEM_TEAMPROJECT'] + self.repo_type = os.environ['BUILD_REPOSITORY_PROVIDER'] # ex: GitHub + self.source_branch = os.environ['BUILD_SOURCEBRANCH'] + self.source_branch_name = os.environ['BUILD_SOURCEBRANCHNAME'] + self.pr_branch_name = os.environ.get('SYSTEM_PULLREQUEST_TARGETBRANCH') + except KeyError as ex: + raise MissingEnvironmentVariable(name=ex.args[0]) + + if self.source_branch.startswith('refs/tags/'): + raise ChangeDetectionNotSupported('Change detection is not supported for tags.') + + self.org = self.org_uri.strip('/').split('/')[-1] + self.is_pr = self.pr_branch_name is not None + + if self.is_pr: + # HEAD is a merge commit of the PR branch into the target branch + # HEAD^1 is HEAD of the target branch (first parent of merge commit) + # HEAD^2 is HEAD of the PR branch (second parent of merge commit) + # see: https://git-scm.com/docs/gitrevisions + self.branch = self.pr_branch_name + self.base_commit = 'HEAD^1' + self.commit = 'HEAD^2' + else: + commits = self.get_successful_merge_run_commits() + + self.branch = self.source_branch_name + self.base_commit = self.get_last_successful_commit(commits) + self.commit = 'HEAD' + + self.commit = self.git.run_git(['rev-parse', self.commit]).strip() + + if self.base_commit: + self.base_commit = self.git.run_git(['rev-parse', self.base_commit]).strip() + + # <commit>...<commit> + # This form is to view the changes on the branch containing and up to the second <commit>, starting at a common ancestor of both <commit>. + # see: https://git-scm.com/docs/git-diff + dot_range = '%s...%s' % (self.base_commit, self.commit) + + self.paths = sorted(self.git.get_diff_names([dot_range])) + self.diff = self.git.get_diff([dot_range]) + else: + self.paths = None # act as though change detection not enabled, do not filter targets + self.diff = [] + + def get_successful_merge_run_commits(self) -> set[str]: + """Return a set of recent successsful merge commits from Azure Pipelines.""" + parameters = dict( + maxBuildsPerDefinition=100, # max 5000 + queryOrder='queueTimeDescending', # assumes under normal circumstances that later queued jobs are for later commits + resultFilter='succeeded', + reasonFilter='batchedCI', # may miss some non-PR reasons, the alternative is to filter the list after receiving it + repositoryType=self.repo_type, + repositoryId='%s/%s' % (self.org, self.project), + ) + + url = '%s%s/_apis/build/builds?api-version=6.0&%s' % (self.org_uri, self.project, urllib.parse.urlencode(parameters)) + + http = HttpClient(self.args, always=True) + response = http.get(url) + + # noinspection PyBroadException + try: + result = response.json() + except Exception: # pylint: disable=broad-except + # most likely due to a private project, which returns an HTTP 203 response with HTML + display.warning('Unable to find project. Cannot determine changes. All tests will be executed.') + return set() + + commits = set(build['sourceVersion'] for build in result['value']) + + return commits + + def get_last_successful_commit(self, commits: set[str]) -> t.Optional[str]: + """Return the last successful commit from git history that is found in the given commit list, or None.""" + commit_history = self.git.get_rev_list(max_count=100) + ordered_successful_commits = [commit for commit in commit_history if commit in commits] + last_successful_commit = ordered_successful_commits[0] if ordered_successful_commits else None + return last_successful_commit + + +def vso_add_attachment(file_type: str, file_name: str, path: str) -> None: + """Upload and attach a file to the current timeline record.""" + vso('task.addattachment', dict(type=file_type, name=file_name), path) + + +def vso(name: str, data: dict[str, str], message: str) -> None: + """ + Write a logging command for the Azure Pipelines agent to process. + See: https://docs.microsoft.com/en-us/azure/devops/pipelines/scripts/logging-commands?view=azure-devops&tabs=bash + """ + display.info('##vso[%s %s]%s' % (name, ';'.join('='.join((key, value)) for key, value in data.items()), message)) diff --git a/test/lib/ansible_test/_internal/ci/local.py b/test/lib/ansible_test/_internal/ci/local.py new file mode 100644 index 0000000..ec03194 --- /dev/null +++ b/test/lib/ansible_test/_internal/ci/local.py @@ -0,0 +1,212 @@ +"""Support code for working without a supported CI provider.""" +from __future__ import annotations + +import os +import platform +import random +import re +import typing as t + +from ..config import ( + CommonConfig, + TestConfig, +) + +from ..io import ( + read_text_file, +) + +from ..git import ( + Git, +) + +from ..util import ( + ApplicationError, + display, + is_binary_file, + SubprocessError, +) + +from . import ( + CIProvider, +) + +CODE = '' # not really a CI provider, so use an empty string for the code + + +class Local(CIProvider): + """CI provider implementation when not using CI.""" + priority = 1000 + + @staticmethod + def is_supported() -> bool: + """Return True if this provider is supported in the current running environment.""" + return True + + @property + def code(self) -> str: + """Return a unique code representing this provider.""" + return CODE + + @property + def name(self) -> str: + """Return descriptive name for this provider.""" + return 'Local' + + def generate_resource_prefix(self) -> str: + """Return a resource prefix specific to this CI provider.""" + prefix = 'ansible-test-%d-%s' % ( + random.randint(10000000, 99999999), + platform.node().split('.')[0], + ) + + return prefix + + def get_base_branch(self) -> str: + """Return the base branch or an empty string.""" + return '' + + def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]: + """Initialize change detection.""" + result = LocalChanges(args) + + display.info('Detected branch %s forked from %s at commit %s' % ( + result.current_branch, result.fork_branch, result.fork_point)) + + if result.untracked and not args.untracked: + display.warning('Ignored %s untracked file(s). Use --untracked to include them.' % + len(result.untracked)) + + if result.committed and not args.committed: + display.warning('Ignored %s committed change(s). Omit --ignore-committed to include them.' % + len(result.committed)) + + if result.staged and not args.staged: + display.warning('Ignored %s staged change(s). Omit --ignore-staged to include them.' % + len(result.staged)) + + if result.unstaged and not args.unstaged: + display.warning('Ignored %s unstaged change(s). Omit --ignore-unstaged to include them.' % + len(result.unstaged)) + + names = set() + + if args.tracked: + names |= set(result.tracked) + if args.untracked: + names |= set(result.untracked) + if args.committed: + names |= set(result.committed) + if args.staged: + names |= set(result.staged) + if args.unstaged: + names |= set(result.unstaged) + + if not args.metadata.changes: + args.metadata.populate_changes(result.diff) + + for path in result.untracked: + if is_binary_file(path): + args.metadata.changes[path] = ((0, 0),) + continue + + line_count = len(read_text_file(path).splitlines()) + + args.metadata.changes[path] = ((1, line_count),) + + return sorted(names) + + def supports_core_ci_auth(self) -> bool: + """Return True if Ansible Core CI is supported.""" + path = self._get_aci_key_path() + return os.path.exists(path) + + def prepare_core_ci_auth(self) -> dict[str, t.Any]: + """Return authentication details for Ansible Core CI.""" + path = self._get_aci_key_path() + auth_key = read_text_file(path).strip() + + request = dict( + key=auth_key, + nonce=None, + ) + + auth = dict( + remote=request, + ) + + return auth + + def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]: + """Return details about git in the current environment.""" + return None # not yet implemented for local + + @staticmethod + def _get_aci_key_path() -> str: + path = os.path.expanduser('~/.ansible-core-ci.key') + return path + + +class InvalidBranch(ApplicationError): + """Exception for invalid branch specification.""" + def __init__(self, branch: str, reason: str) -> None: + message = 'Invalid branch: %s\n%s' % (branch, reason) + + super().__init__(message) + + self.branch = branch + + +class LocalChanges: + """Change information for local work.""" + def __init__(self, args: TestConfig) -> None: + self.args = args + self.git = Git() + + self.current_branch = self.git.get_branch() + + if self.is_official_branch(self.current_branch): + raise InvalidBranch(branch=self.current_branch, + reason='Current branch is not a feature branch.') + + self.fork_branch = None + self.fork_point = None + + self.local_branches = sorted(self.git.get_branches()) + self.official_branches = sorted([b for b in self.local_branches if self.is_official_branch(b)]) + + for self.fork_branch in self.official_branches: + try: + self.fork_point = self.git.get_branch_fork_point(self.fork_branch) + break + except SubprocessError: + pass + + if self.fork_point is None: + raise ApplicationError('Unable to auto-detect fork branch and fork point.') + + # tracked files (including unchanged) + self.tracked = sorted(self.git.get_file_names(['--cached'])) + # untracked files (except ignored) + self.untracked = sorted(self.git.get_file_names(['--others', '--exclude-standard'])) + # tracked changes (including deletions) committed since the branch was forked + self.committed = sorted(self.git.get_diff_names([self.fork_point, 'HEAD'])) + # tracked changes (including deletions) which are staged + self.staged = sorted(self.git.get_diff_names(['--cached'])) + # tracked changes (including deletions) which are not staged + self.unstaged = sorted(self.git.get_diff_names([])) + # diff of all tracked files from fork point to working copy + self.diff = self.git.get_diff([self.fork_point]) + + def is_official_branch(self, name: str) -> bool: + """Return True if the given branch name an official branch for development or releases.""" + if self.args.base_branch: + return name == self.args.base_branch + + if name == 'devel': + return True + + if re.match(r'^stable-[0-9]+\.[0-9]+$', name): + return True + + return False |