summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/ci
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/ci')
-rw-r--r--test/lib/ansible_test/_internal/ci/__init__.py214
-rw-r--r--test/lib/ansible_test/_internal/ci/azp.py262
-rw-r--r--test/lib/ansible_test/_internal/ci/local.py212
3 files changed, 688 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/ci/__init__.py b/test/lib/ansible_test/_internal/ci/__init__.py
new file mode 100644
index 0000000..97e41da
--- /dev/null
+++ b/test/lib/ansible_test/_internal/ci/__init__.py
@@ -0,0 +1,214 @@
+"""Support code for CI environments."""
+from __future__ import annotations
+
+import abc
+import base64
+import json
+import os
+import tempfile
+import typing as t
+
+from ..encoding import (
+ to_bytes,
+ to_text,
+)
+
+from ..io import (
+ read_text_file,
+ write_text_file,
+)
+
+from ..config import (
+ CommonConfig,
+ TestConfig,
+)
+
+from ..util import (
+ ApplicationError,
+ display,
+ get_subclasses,
+ import_plugins,
+ raw_command,
+ cache,
+)
+
+
+class ChangeDetectionNotSupported(ApplicationError):
+ """Exception for cases where change detection is not supported."""
+
+
+class CIProvider(metaclass=abc.ABCMeta):
+ """Base class for CI provider plugins."""
+ priority = 500
+
+ @staticmethod
+ @abc.abstractmethod
+ def is_supported() -> bool:
+ """Return True if this provider is supported in the current running environment."""
+
+ @property
+ @abc.abstractmethod
+ def code(self) -> str:
+ """Return a unique code representing this provider."""
+
+ @property
+ @abc.abstractmethod
+ def name(self) -> str:
+ """Return descriptive name for this provider."""
+
+ @abc.abstractmethod
+ def generate_resource_prefix(self) -> str:
+ """Return a resource prefix specific to this CI provider."""
+
+ @abc.abstractmethod
+ def get_base_branch(self) -> str:
+ """Return the base branch or an empty string."""
+
+ @abc.abstractmethod
+ def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]:
+ """Initialize change detection."""
+
+ @abc.abstractmethod
+ def supports_core_ci_auth(self) -> bool:
+ """Return True if Ansible Core CI is supported."""
+
+ @abc.abstractmethod
+ def prepare_core_ci_auth(self) -> dict[str, t.Any]:
+ """Return authentication details for Ansible Core CI."""
+
+ @abc.abstractmethod
+ def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
+ """Return details about git in the current environment."""
+
+
+@cache
+def get_ci_provider() -> CIProvider:
+ """Return a CI provider instance for the current environment."""
+ provider = None
+
+ import_plugins('ci')
+
+ candidates = sorted(get_subclasses(CIProvider), key=lambda subclass: (subclass.priority, subclass.__name__))
+
+ for candidate in candidates:
+ if candidate.is_supported():
+ provider = candidate()
+ break
+
+ if provider.code:
+ display.info('Detected CI provider: %s' % provider.name)
+
+ return provider
+
+
+class AuthHelper(metaclass=abc.ABCMeta):
+ """Public key based authentication helper for Ansible Core CI."""
+ def sign_request(self, request: dict[str, t.Any]) -> None:
+ """Sign the given auth request and make the public key available."""
+ payload_bytes = to_bytes(json.dumps(request, sort_keys=True))
+ signature_raw_bytes = self.sign_bytes(payload_bytes)
+ signature = to_text(base64.b64encode(signature_raw_bytes))
+
+ request.update(signature=signature)
+
+ def initialize_private_key(self) -> str:
+ """
+ Initialize and publish a new key pair (if needed) and return the private key.
+ The private key is cached across ansible-test invocations, so it is only generated and published once per CI job.
+ """
+ path = os.path.expanduser('~/.ansible-core-ci-private.key')
+
+ if os.path.exists(to_bytes(path)):
+ private_key_pem = read_text_file(path)
+ else:
+ private_key_pem = self.generate_private_key()
+ write_text_file(path, private_key_pem)
+
+ return private_key_pem
+
+ @abc.abstractmethod
+ def sign_bytes(self, payload_bytes: bytes) -> bytes:
+ """Sign the given payload and return the signature, initializing a new key pair if required."""
+
+ @abc.abstractmethod
+ def publish_public_key(self, public_key_pem: str) -> None:
+ """Publish the given public key."""
+
+ @abc.abstractmethod
+ def generate_private_key(self) -> str:
+ """Generate a new key pair, publishing the public key and returning the private key."""
+
+
+class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
+ """Cryptography based public key based authentication helper for Ansible Core CI."""
+ def sign_bytes(self, payload_bytes: bytes) -> bytes:
+ """Sign the given payload and return the signature, initializing a new key pair if required."""
+ # import cryptography here to avoid overhead and failures in environments which do not use/provide it
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.hazmat.primitives.serialization import load_pem_private_key
+
+ private_key_pem = self.initialize_private_key()
+ private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend())
+
+ assert isinstance(private_key, ec.EllipticCurvePrivateKey)
+
+ signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256()))
+
+ return signature_raw_bytes
+
+ def generate_private_key(self) -> str:
+ """Generate a new key pair, publishing the public key and returning the private key."""
+ # import cryptography here to avoid overhead and failures in environments which do not use/provide it
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import ec
+
+ private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
+ public_key = private_key.public_key()
+
+ private_key_pem = to_text(private_key.private_bytes( # type: ignore[attr-defined] # documented method, but missing from type stubs
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ ))
+
+ public_key_pem = to_text(public_key.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ ))
+
+ self.publish_public_key(public_key_pem)
+
+ return private_key_pem
+
+
+class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
+ """OpenSSL based public key based authentication helper for Ansible Core CI."""
+ def sign_bytes(self, payload_bytes: bytes) -> bytes:
+ """Sign the given payload and return the signature, initializing a new key pair if required."""
+ private_key_pem = self.initialize_private_key()
+
+ with tempfile.NamedTemporaryFile() as private_key_file:
+ private_key_file.write(to_bytes(private_key_pem))
+ private_key_file.flush()
+
+ with tempfile.NamedTemporaryFile() as payload_file:
+ payload_file.write(payload_bytes)
+ payload_file.flush()
+
+ with tempfile.NamedTemporaryFile() as signature_file:
+ raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True)
+ signature_raw_bytes = signature_file.read()
+
+ return signature_raw_bytes
+
+ def generate_private_key(self) -> str:
+ """Generate a new key pair, publishing the public key and returning the private key."""
+ private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0]
+ public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0]
+
+ self.publish_public_key(public_key_pem)
+
+ return private_key_pem
diff --git a/test/lib/ansible_test/_internal/ci/azp.py b/test/lib/ansible_test/_internal/ci/azp.py
new file mode 100644
index 0000000..9170dfe
--- /dev/null
+++ b/test/lib/ansible_test/_internal/ci/azp.py
@@ -0,0 +1,262 @@
+"""Support code for working with Azure Pipelines."""
+from __future__ import annotations
+
+import os
+import tempfile
+import uuid
+import typing as t
+import urllib.parse
+
+from ..encoding import (
+ to_bytes,
+)
+
+from ..config import (
+ CommonConfig,
+ TestConfig,
+)
+
+from ..git import (
+ Git,
+)
+
+from ..http import (
+ HttpClient,
+)
+
+from ..util import (
+ display,
+ MissingEnvironmentVariable,
+)
+
+from . import (
+ ChangeDetectionNotSupported,
+ CIProvider,
+ CryptographyAuthHelper,
+)
+
+CODE = 'azp'
+
+
+class AzurePipelines(CIProvider):
+ """CI provider implementation for Azure Pipelines."""
+ def __init__(self) -> None:
+ self.auth = AzurePipelinesAuthHelper()
+
+ @staticmethod
+ def is_supported() -> bool:
+ """Return True if this provider is supported in the current running environment."""
+ return os.environ.get('SYSTEM_COLLECTIONURI', '').startswith('https://dev.azure.com/')
+
+ @property
+ def code(self) -> str:
+ """Return a unique code representing this provider."""
+ return CODE
+
+ @property
+ def name(self) -> str:
+ """Return descriptive name for this provider."""
+ return 'Azure Pipelines'
+
+ def generate_resource_prefix(self) -> str:
+ """Return a resource prefix specific to this CI provider."""
+ try:
+ prefix = 'azp-%s-%s-%s' % (
+ os.environ['BUILD_BUILDID'],
+ os.environ['SYSTEM_JOBATTEMPT'],
+ os.environ['SYSTEM_JOBIDENTIFIER'],
+ )
+ except KeyError as ex:
+ raise MissingEnvironmentVariable(name=ex.args[0])
+
+ return prefix
+
+ def get_base_branch(self) -> str:
+ """Return the base branch or an empty string."""
+ base_branch = os.environ.get('SYSTEM_PULLREQUEST_TARGETBRANCH') or os.environ.get('BUILD_SOURCEBRANCHNAME')
+
+ if base_branch:
+ base_branch = 'origin/%s' % base_branch
+
+ return base_branch or ''
+
+ def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]:
+ """Initialize change detection."""
+ result = AzurePipelinesChanges(args)
+
+ if result.is_pr:
+ job_type = 'pull request'
+ else:
+ job_type = 'merge commit'
+
+ display.info('Processing %s for branch %s commit %s' % (job_type, result.branch, result.commit))
+
+ if not args.metadata.changes:
+ args.metadata.populate_changes(result.diff)
+
+ if result.paths is None:
+ # There are several likely causes of this:
+ # - First run on a new branch.
+ # - Too many pull requests passed since the last merge run passed.
+ display.warning('No successful commit found. All tests will be executed.')
+
+ return result.paths
+
+ def supports_core_ci_auth(self) -> bool:
+ """Return True if Ansible Core CI is supported."""
+ return True
+
+ def prepare_core_ci_auth(self) -> dict[str, t.Any]:
+ """Return authentication details for Ansible Core CI."""
+ try:
+ request = dict(
+ org_name=os.environ['SYSTEM_COLLECTIONURI'].strip('/').split('/')[-1],
+ project_name=os.environ['SYSTEM_TEAMPROJECT'],
+ build_id=int(os.environ['BUILD_BUILDID']),
+ task_id=str(uuid.UUID(os.environ['SYSTEM_TASKINSTANCEID'])),
+ )
+ except KeyError as ex:
+ raise MissingEnvironmentVariable(name=ex.args[0])
+
+ self.auth.sign_request(request)
+
+ auth = dict(
+ azp=request,
+ )
+
+ return auth
+
+ def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
+ """Return details about git in the current environment."""
+ changes = AzurePipelinesChanges(args)
+
+ details = dict(
+ base_commit=changes.base_commit,
+ commit=changes.commit,
+ )
+
+ return details
+
+
+class AzurePipelinesAuthHelper(CryptographyAuthHelper):
+ """
+ Authentication helper for Azure Pipelines.
+ Based on cryptography since it is provided by the default Azure Pipelines environment.
+ """
+ def publish_public_key(self, public_key_pem: str) -> None:
+ """Publish the given public key."""
+ try:
+ agent_temp_directory = os.environ['AGENT_TEMPDIRECTORY']
+ except KeyError as ex:
+ raise MissingEnvironmentVariable(name=ex.args[0])
+
+ # the temporary file cannot be deleted because we do not know when the agent has processed it
+ # placing the file in the agent's temp directory allows it to be picked up when the job is running in a container
+ with tempfile.NamedTemporaryFile(prefix='public-key-', suffix='.pem', delete=False, dir=agent_temp_directory) as public_key_file:
+ public_key_file.write(to_bytes(public_key_pem))
+ public_key_file.flush()
+
+ # make the agent aware of the public key by declaring it as an attachment
+ vso_add_attachment('ansible-core-ci', 'public-key.pem', public_key_file.name)
+
+
+class AzurePipelinesChanges:
+ """Change information for an Azure Pipelines build."""
+ def __init__(self, args: CommonConfig) -> None:
+ self.args = args
+ self.git = Git()
+
+ try:
+ self.org_uri = os.environ['SYSTEM_COLLECTIONURI'] # ex: https://dev.azure.com/{org}/
+ self.project = os.environ['SYSTEM_TEAMPROJECT']
+ self.repo_type = os.environ['BUILD_REPOSITORY_PROVIDER'] # ex: GitHub
+ self.source_branch = os.environ['BUILD_SOURCEBRANCH']
+ self.source_branch_name = os.environ['BUILD_SOURCEBRANCHNAME']
+ self.pr_branch_name = os.environ.get('SYSTEM_PULLREQUEST_TARGETBRANCH')
+ except KeyError as ex:
+ raise MissingEnvironmentVariable(name=ex.args[0])
+
+ if self.source_branch.startswith('refs/tags/'):
+ raise ChangeDetectionNotSupported('Change detection is not supported for tags.')
+
+ self.org = self.org_uri.strip('/').split('/')[-1]
+ self.is_pr = self.pr_branch_name is not None
+
+ if self.is_pr:
+ # HEAD is a merge commit of the PR branch into the target branch
+ # HEAD^1 is HEAD of the target branch (first parent of merge commit)
+ # HEAD^2 is HEAD of the PR branch (second parent of merge commit)
+ # see: https://git-scm.com/docs/gitrevisions
+ self.branch = self.pr_branch_name
+ self.base_commit = 'HEAD^1'
+ self.commit = 'HEAD^2'
+ else:
+ commits = self.get_successful_merge_run_commits()
+
+ self.branch = self.source_branch_name
+ self.base_commit = self.get_last_successful_commit(commits)
+ self.commit = 'HEAD'
+
+ self.commit = self.git.run_git(['rev-parse', self.commit]).strip()
+
+ if self.base_commit:
+ self.base_commit = self.git.run_git(['rev-parse', self.base_commit]).strip()
+
+ # <commit>...<commit>
+ # This form is to view the changes on the branch containing and up to the second <commit>, starting at a common ancestor of both <commit>.
+ # see: https://git-scm.com/docs/git-diff
+ dot_range = '%s...%s' % (self.base_commit, self.commit)
+
+ self.paths = sorted(self.git.get_diff_names([dot_range]))
+ self.diff = self.git.get_diff([dot_range])
+ else:
+ self.paths = None # act as though change detection not enabled, do not filter targets
+ self.diff = []
+
+ def get_successful_merge_run_commits(self) -> set[str]:
+ """Return a set of recent successsful merge commits from Azure Pipelines."""
+ parameters = dict(
+ maxBuildsPerDefinition=100, # max 5000
+ queryOrder='queueTimeDescending', # assumes under normal circumstances that later queued jobs are for later commits
+ resultFilter='succeeded',
+ reasonFilter='batchedCI', # may miss some non-PR reasons, the alternative is to filter the list after receiving it
+ repositoryType=self.repo_type,
+ repositoryId='%s/%s' % (self.org, self.project),
+ )
+
+ url = '%s%s/_apis/build/builds?api-version=6.0&%s' % (self.org_uri, self.project, urllib.parse.urlencode(parameters))
+
+ http = HttpClient(self.args, always=True)
+ response = http.get(url)
+
+ # noinspection PyBroadException
+ try:
+ result = response.json()
+ except Exception: # pylint: disable=broad-except
+ # most likely due to a private project, which returns an HTTP 203 response with HTML
+ display.warning('Unable to find project. Cannot determine changes. All tests will be executed.')
+ return set()
+
+ commits = set(build['sourceVersion'] for build in result['value'])
+
+ return commits
+
+ def get_last_successful_commit(self, commits: set[str]) -> t.Optional[str]:
+ """Return the last successful commit from git history that is found in the given commit list, or None."""
+ commit_history = self.git.get_rev_list(max_count=100)
+ ordered_successful_commits = [commit for commit in commit_history if commit in commits]
+ last_successful_commit = ordered_successful_commits[0] if ordered_successful_commits else None
+ return last_successful_commit
+
+
+def vso_add_attachment(file_type: str, file_name: str, path: str) -> None:
+ """Upload and attach a file to the current timeline record."""
+ vso('task.addattachment', dict(type=file_type, name=file_name), path)
+
+
+def vso(name: str, data: dict[str, str], message: str) -> None:
+ """
+ Write a logging command for the Azure Pipelines agent to process.
+ See: https://docs.microsoft.com/en-us/azure/devops/pipelines/scripts/logging-commands?view=azure-devops&tabs=bash
+ """
+ display.info('##vso[%s %s]%s' % (name, ';'.join('='.join((key, value)) for key, value in data.items()), message))
diff --git a/test/lib/ansible_test/_internal/ci/local.py b/test/lib/ansible_test/_internal/ci/local.py
new file mode 100644
index 0000000..ec03194
--- /dev/null
+++ b/test/lib/ansible_test/_internal/ci/local.py
@@ -0,0 +1,212 @@
+"""Support code for working without a supported CI provider."""
+from __future__ import annotations
+
+import os
+import platform
+import random
+import re
+import typing as t
+
+from ..config import (
+ CommonConfig,
+ TestConfig,
+)
+
+from ..io import (
+ read_text_file,
+)
+
+from ..git import (
+ Git,
+)
+
+from ..util import (
+ ApplicationError,
+ display,
+ is_binary_file,
+ SubprocessError,
+)
+
+from . import (
+ CIProvider,
+)
+
+CODE = '' # not really a CI provider, so use an empty string for the code
+
+
+class Local(CIProvider):
+ """CI provider implementation when not using CI."""
+ priority = 1000
+
+ @staticmethod
+ def is_supported() -> bool:
+ """Return True if this provider is supported in the current running environment."""
+ return True
+
+ @property
+ def code(self) -> str:
+ """Return a unique code representing this provider."""
+ return CODE
+
+ @property
+ def name(self) -> str:
+ """Return descriptive name for this provider."""
+ return 'Local'
+
+ def generate_resource_prefix(self) -> str:
+ """Return a resource prefix specific to this CI provider."""
+ prefix = 'ansible-test-%d-%s' % (
+ random.randint(10000000, 99999999),
+ platform.node().split('.')[0],
+ )
+
+ return prefix
+
+ def get_base_branch(self) -> str:
+ """Return the base branch or an empty string."""
+ return ''
+
+ def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]:
+ """Initialize change detection."""
+ result = LocalChanges(args)
+
+ display.info('Detected branch %s forked from %s at commit %s' % (
+ result.current_branch, result.fork_branch, result.fork_point))
+
+ if result.untracked and not args.untracked:
+ display.warning('Ignored %s untracked file(s). Use --untracked to include them.' %
+ len(result.untracked))
+
+ if result.committed and not args.committed:
+ display.warning('Ignored %s committed change(s). Omit --ignore-committed to include them.' %
+ len(result.committed))
+
+ if result.staged and not args.staged:
+ display.warning('Ignored %s staged change(s). Omit --ignore-staged to include them.' %
+ len(result.staged))
+
+ if result.unstaged and not args.unstaged:
+ display.warning('Ignored %s unstaged change(s). Omit --ignore-unstaged to include them.' %
+ len(result.unstaged))
+
+ names = set()
+
+ if args.tracked:
+ names |= set(result.tracked)
+ if args.untracked:
+ names |= set(result.untracked)
+ if args.committed:
+ names |= set(result.committed)
+ if args.staged:
+ names |= set(result.staged)
+ if args.unstaged:
+ names |= set(result.unstaged)
+
+ if not args.metadata.changes:
+ args.metadata.populate_changes(result.diff)
+
+ for path in result.untracked:
+ if is_binary_file(path):
+ args.metadata.changes[path] = ((0, 0),)
+ continue
+
+ line_count = len(read_text_file(path).splitlines())
+
+ args.metadata.changes[path] = ((1, line_count),)
+
+ return sorted(names)
+
+ def supports_core_ci_auth(self) -> bool:
+ """Return True if Ansible Core CI is supported."""
+ path = self._get_aci_key_path()
+ return os.path.exists(path)
+
+ def prepare_core_ci_auth(self) -> dict[str, t.Any]:
+ """Return authentication details for Ansible Core CI."""
+ path = self._get_aci_key_path()
+ auth_key = read_text_file(path).strip()
+
+ request = dict(
+ key=auth_key,
+ nonce=None,
+ )
+
+ auth = dict(
+ remote=request,
+ )
+
+ return auth
+
+ def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
+ """Return details about git in the current environment."""
+ return None # not yet implemented for local
+
+ @staticmethod
+ def _get_aci_key_path() -> str:
+ path = os.path.expanduser('~/.ansible-core-ci.key')
+ return path
+
+
+class InvalidBranch(ApplicationError):
+ """Exception for invalid branch specification."""
+ def __init__(self, branch: str, reason: str) -> None:
+ message = 'Invalid branch: %s\n%s' % (branch, reason)
+
+ super().__init__(message)
+
+ self.branch = branch
+
+
+class LocalChanges:
+ """Change information for local work."""
+ def __init__(self, args: TestConfig) -> None:
+ self.args = args
+ self.git = Git()
+
+ self.current_branch = self.git.get_branch()
+
+ if self.is_official_branch(self.current_branch):
+ raise InvalidBranch(branch=self.current_branch,
+ reason='Current branch is not a feature branch.')
+
+ self.fork_branch = None
+ self.fork_point = None
+
+ self.local_branches = sorted(self.git.get_branches())
+ self.official_branches = sorted([b for b in self.local_branches if self.is_official_branch(b)])
+
+ for self.fork_branch in self.official_branches:
+ try:
+ self.fork_point = self.git.get_branch_fork_point(self.fork_branch)
+ break
+ except SubprocessError:
+ pass
+
+ if self.fork_point is None:
+ raise ApplicationError('Unable to auto-detect fork branch and fork point.')
+
+ # tracked files (including unchanged)
+ self.tracked = sorted(self.git.get_file_names(['--cached']))
+ # untracked files (except ignored)
+ self.untracked = sorted(self.git.get_file_names(['--others', '--exclude-standard']))
+ # tracked changes (including deletions) committed since the branch was forked
+ self.committed = sorted(self.git.get_diff_names([self.fork_point, 'HEAD']))
+ # tracked changes (including deletions) which are staged
+ self.staged = sorted(self.git.get_diff_names(['--cached']))
+ # tracked changes (including deletions) which are not staged
+ self.unstaged = sorted(self.git.get_diff_names([]))
+ # diff of all tracked files from fork point to working copy
+ self.diff = self.git.get_diff([self.fork_point])
+
+ def is_official_branch(self, name: str) -> bool:
+ """Return True if the given branch name an official branch for development or releases."""
+ if self.args.base_branch:
+ return name == self.args.base_branch
+
+ if name == 'devel':
+ return True
+
+ if re.match(r'^stable-[0-9]+\.[0-9]+$', name):
+ return True
+
+ return False