summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/integration/cloud
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/integration/cloud')
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py389
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/acme.py79
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/aws.py131
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/azure.py166
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py62
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/cs.py174
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py55
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py94
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py168
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py55
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py106
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py92
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/nios.py97
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py60
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py114
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py56
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py138
-rw-r--r--test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py55
18 files changed, 2091 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py
new file mode 100644
index 0000000..0c078b9
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py
@@ -0,0 +1,389 @@
+"""Plugin system for cloud providers and environments for use in integration tests."""
+from __future__ import annotations
+
+import abc
+import atexit
+import datetime
+import os
+import re
+import tempfile
+import time
+import typing as t
+
+from ....encoding import (
+ to_bytes,
+)
+
+from ....io import (
+ read_text_file,
+)
+
+from ....util import (
+ ANSIBLE_TEST_CONFIG_ROOT,
+ ApplicationError,
+ display,
+ import_plugins,
+ load_plugins,
+ cache,
+)
+
+from ....util_common import (
+ ResultType,
+ write_json_test_results,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....config import (
+ IntegrationConfig,
+ TestConfig,
+)
+
+from ....ci import (
+ get_ci_provider,
+)
+
+from ....data import (
+ data_context,
+)
+
+from ....docker_util import (
+ docker_available,
+)
+
+
+@cache
+def get_cloud_plugins() -> tuple[dict[str, t.Type[CloudProvider]], dict[str, t.Type[CloudEnvironment]]]:
+ """Import cloud plugins and load them into the plugin dictionaries."""
+ import_plugins('commands/integration/cloud')
+
+ providers: dict[str, t.Type[CloudProvider]] = {}
+ environments: dict[str, t.Type[CloudEnvironment]] = {}
+
+ load_plugins(CloudProvider, providers)
+ load_plugins(CloudEnvironment, environments)
+
+ return providers, environments
+
+
+@cache
+def get_provider_plugins() -> dict[str, t.Type[CloudProvider]]:
+ """Return a dictionary of the available cloud provider plugins."""
+ return get_cloud_plugins()[0]
+
+
+@cache
+def get_environment_plugins() -> dict[str, t.Type[CloudEnvironment]]:
+ """Return a dictionary of the available cloud environment plugins."""
+ return get_cloud_plugins()[1]
+
+
+def get_cloud_platforms(args: TestConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[str]:
+ """Return cloud platform names for the specified targets."""
+ if isinstance(args, IntegrationConfig):
+ if args.list_targets:
+ return []
+
+ if targets is None:
+ cloud_platforms = set(args.metadata.cloud_config or [])
+ else:
+ cloud_platforms = set(get_cloud_platform(target) for target in targets)
+
+ cloud_platforms.discard(None)
+
+ return sorted(cloud_platforms)
+
+
+def get_cloud_platform(target: IntegrationTarget) -> t.Optional[str]:
+ """Return the name of the cloud platform used for the given target, or None if no cloud platform is used."""
+ cloud_platforms = set(a.split('/')[1] for a in target.aliases if a.startswith('cloud/') and a.endswith('/') and a != 'cloud/')
+
+ if not cloud_platforms:
+ return None
+
+ if len(cloud_platforms) == 1:
+ cloud_platform = cloud_platforms.pop()
+
+ if cloud_platform not in get_provider_plugins():
+ raise ApplicationError('Target %s aliases contains unknown cloud platform: %s' % (target.name, cloud_platform))
+
+ return cloud_platform
+
+ raise ApplicationError('Target %s aliases contains multiple cloud platforms: %s' % (target.name, ', '.join(sorted(cloud_platforms))))
+
+
+def get_cloud_providers(args: IntegrationConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[CloudProvider]:
+ """Return a list of cloud providers for the given targets."""
+ return [get_provider_plugins()[p](args) for p in get_cloud_platforms(args, targets)]
+
+
+def get_cloud_environment(args: IntegrationConfig, target: IntegrationTarget) -> t.Optional[CloudEnvironment]:
+ """Return the cloud environment for the given target, or None if no cloud environment is used for the target."""
+ cloud_platform = get_cloud_platform(target)
+
+ if not cloud_platform:
+ return None
+
+ return get_environment_plugins()[cloud_platform](args)
+
+
+def cloud_filter(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> list[str]:
+ """Return a list of target names to exclude based on the given targets."""
+ if args.metadata.cloud_config is not None:
+ return [] # cloud filter already performed prior to delegation
+
+ exclude: list[str] = []
+
+ for provider in get_cloud_providers(args, targets):
+ provider.filter(targets, exclude)
+
+ return exclude
+
+
+def cloud_init(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> None:
+ """Initialize cloud plugins for the given targets."""
+ if args.metadata.cloud_config is not None:
+ return # cloud configuration already established prior to delegation
+
+ args.metadata.cloud_config = {}
+
+ results = {}
+
+ for provider in get_cloud_providers(args, targets):
+ if args.prime_containers and not provider.uses_docker:
+ continue
+
+ args.metadata.cloud_config[provider.platform] = {}
+
+ start_time = time.time()
+ provider.setup()
+ end_time = time.time()
+
+ results[provider.platform] = dict(
+ platform=provider.platform,
+ setup_seconds=int(end_time - start_time),
+ targets=[target.name for target in targets],
+ )
+
+ if not args.explain and results:
+ result_name = '%s-%s.json' % (
+ args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0))))
+
+ data = dict(
+ clouds=results,
+ )
+
+ write_json_test_results(ResultType.DATA, result_name, data)
+
+
+class CloudBase(metaclass=abc.ABCMeta):
+ """Base class for cloud plugins."""
+ _CONFIG_PATH = 'config_path'
+ _RESOURCE_PREFIX = 'resource_prefix'
+ _MANAGED = 'managed'
+ _SETUP_EXECUTED = 'setup_executed'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ self.args = args
+ self.platform = self.__module__.rsplit('.', 1)[-1]
+
+ def config_callback(files: list[tuple[str, str]]) -> None:
+ """Add the config file to the payload file list."""
+ if self.platform not in self.args.metadata.cloud_config:
+ return # platform was initialized, but not used -- such as being skipped due to all tests being disabled
+
+ if self._get_cloud_config(self._CONFIG_PATH, ''):
+ pair = (self.config_path, os.path.relpath(self.config_path, data_context().content.root))
+
+ if pair not in files:
+ display.info('Including %s config: %s -> %s' % (self.platform, pair[0], pair[1]), verbosity=3)
+ files.append(pair)
+
+ data_context().register_payload_callback(config_callback)
+
+ @property
+ def setup_executed(self) -> bool:
+ """True if setup has been executed, otherwise False."""
+ return t.cast(bool, self._get_cloud_config(self._SETUP_EXECUTED, False))
+
+ @setup_executed.setter
+ def setup_executed(self, value: bool) -> None:
+ """True if setup has been executed, otherwise False."""
+ self._set_cloud_config(self._SETUP_EXECUTED, value)
+
+ @property
+ def config_path(self) -> str:
+ """Path to the configuration file."""
+ return os.path.join(data_context().content.root, str(self._get_cloud_config(self._CONFIG_PATH)))
+
+ @config_path.setter
+ def config_path(self, value: str) -> None:
+ """Path to the configuration file."""
+ self._set_cloud_config(self._CONFIG_PATH, value)
+
+ @property
+ def resource_prefix(self) -> str:
+ """Resource prefix."""
+ return str(self._get_cloud_config(self._RESOURCE_PREFIX))
+
+ @resource_prefix.setter
+ def resource_prefix(self, value: str) -> None:
+ """Resource prefix."""
+ self._set_cloud_config(self._RESOURCE_PREFIX, value)
+
+ @property
+ def managed(self) -> bool:
+ """True if resources are managed by ansible-test, otherwise False."""
+ return t.cast(bool, self._get_cloud_config(self._MANAGED))
+
+ @managed.setter
+ def managed(self, value: bool) -> None:
+ """True if resources are managed by ansible-test, otherwise False."""
+ self._set_cloud_config(self._MANAGED, value)
+
+ def _get_cloud_config(self, key: str, default: t.Optional[t.Union[str, int, bool]] = None) -> t.Union[str, int, bool]:
+ """Return the specified value from the internal configuration."""
+ if default is not None:
+ return self.args.metadata.cloud_config[self.platform].get(key, default)
+
+ return self.args.metadata.cloud_config[self.platform][key]
+
+ def _set_cloud_config(self, key: str, value: t.Union[str, int, bool]) -> None:
+ """Set the specified key and value in the internal configuration."""
+ self.args.metadata.cloud_config[self.platform][key] = value
+
+
+class CloudProvider(CloudBase):
+ """Base class for cloud provider plugins. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig, config_extension: str = '.ini') -> None:
+ super().__init__(args)
+
+ self.ci_provider = get_ci_provider()
+ self.remove_config = False
+ self.config_static_name = 'cloud-config-%s%s' % (self.platform, config_extension)
+ self.config_static_path = os.path.join(data_context().content.integration_path, self.config_static_name)
+ self.config_template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, '%s.template' % self.config_static_name)
+ self.config_extension = config_extension
+
+ self.uses_config = False
+ self.uses_docker = False
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ if not self.uses_docker and not self.uses_config:
+ return
+
+ if self.uses_docker and docker_available():
+ return
+
+ if self.uses_config and os.path.exists(self.config_static_path):
+ return
+
+ skip = 'cloud/%s/' % self.platform
+ skipped = [target.name for target in targets if skip in target.aliases]
+
+ if skipped:
+ exclude.append(skip)
+
+ if not self.uses_docker and self.uses_config:
+ display.warning('Excluding tests marked "%s" which require a "%s" config file (see "%s"): %s'
+ % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped)))
+ elif self.uses_docker and not self.uses_config:
+ display.warning('Excluding tests marked "%s" which requires container support: %s'
+ % (skip.rstrip('/'), ', '.join(skipped)))
+ elif self.uses_docker and self.uses_config:
+ display.warning('Excluding tests marked "%s" which requires container support or a "%s" config file (see "%s"): %s'
+ % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped)))
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ self.resource_prefix = self.ci_provider.generate_resource_prefix()
+ self.resource_prefix = re.sub(r'[^a-zA-Z0-9]+', '-', self.resource_prefix)[:63].lower().rstrip('-')
+
+ atexit.register(self.cleanup)
+
+ def cleanup(self) -> None:
+ """Clean up the cloud resource and any temporary configuration files after tests complete."""
+ if self.remove_config:
+ os.remove(self.config_path)
+
+ def _use_static_config(self) -> bool:
+ """Use a static config file if available. Returns True if static config is used, otherwise returns False."""
+ if os.path.isfile(self.config_static_path):
+ display.info('Using existing %s cloud config: %s' % (self.platform, self.config_static_path), verbosity=1)
+ self.config_path = self.config_static_path
+ static = True
+ else:
+ static = False
+
+ self.managed = not static
+
+ return static
+
+ def _write_config(self, content: str) -> None:
+ """Write the given content to the config file."""
+ prefix = '%s-' % os.path.splitext(os.path.basename(self.config_static_path))[0]
+
+ with tempfile.NamedTemporaryFile(dir=data_context().content.integration_path, prefix=prefix, suffix=self.config_extension, delete=False) as config_fd:
+ filename = os.path.join(data_context().content.integration_path, os.path.basename(config_fd.name))
+
+ self.config_path = filename
+ self.remove_config = True
+
+ display.info('>>> Config: %s\n%s' % (filename, content.strip()), verbosity=3)
+
+ config_fd.write(to_bytes(content))
+ config_fd.flush()
+
+ def _read_config_template(self) -> str:
+ """Read and return the configuration template."""
+ lines = read_text_file(self.config_template_path).splitlines()
+ lines = [line for line in lines if not line.startswith('#')]
+ config = '\n'.join(lines).strip() + '\n'
+ return config
+
+ @staticmethod
+ def _populate_config_template(template: str, values: dict[str, str]) -> str:
+ """Populate and return the given template with the provided values."""
+ for key in sorted(values):
+ value = values[key]
+ template = template.replace('@%s' % key, value)
+
+ return template
+
+
+class CloudEnvironment(CloudBase):
+ """Base class for cloud environment plugins. Updates integration test environment after delegation."""
+ def setup_once(self) -> None:
+ """Run setup if it has not already been run."""
+ if self.setup_executed:
+ return
+
+ self.setup()
+ self.setup_executed = True
+
+ def setup(self) -> None:
+ """Setup which should be done once per environment instead of once per test target."""
+
+ @abc.abstractmethod
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+
+
+class CloudEnvironmentConfig:
+ """Configuration for the environment."""
+ def __init__(self,
+ env_vars: t.Optional[dict[str, str]] = None,
+ ansible_vars: t.Optional[dict[str, t.Any]] = None,
+ module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None,
+ callback_plugins: t.Optional[list[str]] = None,
+ ):
+ self.env_vars = env_vars
+ self.ansible_vars = ansible_vars
+ self.module_defaults = module_defaults
+ self.callback_plugins = callback_plugins
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py
new file mode 100644
index 0000000..007d383
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py
@@ -0,0 +1,79 @@
+"""ACME plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ACMEProvider(CloudProvider):
+ """ACME plugin. Sets up cloud resources for tests."""
+ DOCKER_SIMULATOR_NAME = 'acme-simulator'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # The simulator must be pinned to a specific version to guarantee CI passes with the version used.
+ if os.environ.get('ANSIBLE_ACME_CONTAINER'):
+ self.image = os.environ.get('ANSIBLE_ACME_CONTAINER')
+ else:
+ self.image = 'quay.io/ansible/acme-test-container:2.1.0'
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Create a ACME test container using docker."""
+ ports = [
+ 5000, # control port for flask app in container
+ 14000, # Pebble ACME CA
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('acme_host', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class ACMEEnvironment(CloudEnvironment):
+ """ACME environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ ansible_vars = dict(
+ acme_host=self._get_cloud_config('acme_host'),
+ )
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py
new file mode 100644
index 0000000..234f311
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py
@@ -0,0 +1,131 @@
+"""AWS plugin for integration tests."""
+from __future__ import annotations
+
+import os
+import uuid
+import configparser
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from ....host_configs import (
+ OriginConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class AwsCloudProvider(CloudProvider):
+ """AWS cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ aws_config_path = os.path.expanduser('~/.aws')
+
+ if os.path.exists(aws_config_path) and isinstance(self.args.controller, OriginConfig):
+ raise ApplicationError('Rename "%s" or use the --docker or --remote option to isolate tests.' % aws_config_path)
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Request AWS credentials through the Ansible Core CI service."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+
+ aci = self._create_ansible_core_ci()
+
+ response = aci.start()
+
+ if not self.args.explain:
+ credentials = response['aws']['credentials']
+
+ values = dict(
+ ACCESS_KEY=credentials['access_key'],
+ SECRET_KEY=credentials['secret_key'],
+ SECURITY_TOKEN=credentials['session_token'],
+ REGION='us-east-1',
+ )
+
+ display.sensitive.add(values['SECRET_KEY'])
+ display.sensitive.add(values['SECURITY_TOKEN'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return an AWS instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='aws'))
+
+
+class AwsCloudEnvironment(CloudEnvironment):
+ """AWS cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars: dict[str, t.Any] = dict(
+ resource_prefix=self.resource_prefix,
+ tiny_prefix=uuid.uuid4().hex[0:12]
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ display.sensitive.add(ansible_vars.get('aws_secret_key'))
+ display.sensitive.add(ansible_vars.get('security_token'))
+
+ if 'aws_cleanup' not in ansible_vars:
+ ansible_vars['aws_cleanup'] = not self.managed
+
+ env_vars = {'ANSIBLE_DEBUG_BOTOCORE_LOGS': 'True'}
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ callback_plugins=['aws_resource_actions'],
+ )
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+ if not tries and self.managed:
+ display.notice('If %s failed due to permissions, the IAM test policy may need to be updated. '
+ 'https://docs.ansible.com/ansible/devel/collections/amazon/aws/docsite/dev_guidelines.html#aws-permissions-for-integration-tests'
+ % target.name)
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py
new file mode 100644
index 0000000..dc5136a
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py
@@ -0,0 +1,166 @@
+"""Azure plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class AzureCloudProvider(CloudProvider):
+ """Azure cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.aci: t.Optional[AnsibleCoreCI] = None
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ get_config(self.config_path) # check required variables
+
+ def cleanup(self) -> None:
+ """Clean up the cloud resource and any temporary configuration files after tests complete."""
+ if self.aci:
+ self.aci.stop()
+
+ super().cleanup()
+
+ def _setup_dynamic(self) -> None:
+ """Request Azure credentials through ansible-core-ci."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+ response = {}
+
+ aci = self._create_ansible_core_ci()
+
+ aci_result = aci.start()
+
+ if not self.args.explain:
+ response = aci_result['azure']
+ self.aci = aci
+
+ if not self.args.explain:
+ values = dict(
+ AZURE_CLIENT_ID=response['clientId'],
+ AZURE_SECRET=response['clientSecret'],
+ AZURE_SUBSCRIPTION_ID=response['subscriptionId'],
+ AZURE_TENANT=response['tenantId'],
+ RESOURCE_GROUP=response['resourceGroupNames'][0],
+ RESOURCE_GROUP_SECONDARY=response['resourceGroupNames'][1],
+ )
+
+ display.sensitive.add(values['AZURE_SECRET'])
+
+ config = '\n'.join('%s: %s' % (key, values[key]) for key in sorted(values))
+
+ config = '[default]\n' + config
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return an Azure instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='azure'))
+
+
+class AzureCloudEnvironment(CloudEnvironment):
+ """Azure cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = get_config(self.config_path)
+
+ display.sensitive.add(env_vars.get('AZURE_SECRET'))
+ display.sensitive.add(env_vars.get('AZURE_PASSWORD'))
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
+
+ def on_failure(self, target: IntegrationTarget, tries: int) -> None:
+ """Callback to run when an integration target fails."""
+ if not tries and self.managed:
+ display.notice('If %s failed due to permissions, the test policy may need to be updated.' % target.name)
+
+
+def get_config(config_path: str) -> dict[str, str]:
+ """Return a configuration dictionary parsed from the given configuration path."""
+ parser = configparser.ConfigParser()
+ parser.read(config_path)
+
+ config = dict((key.upper(), value) for key, value in parser.items('default'))
+
+ rg_vars = (
+ 'RESOURCE_GROUP',
+ 'RESOURCE_GROUP_SECONDARY',
+ )
+
+ sp_vars = (
+ 'AZURE_CLIENT_ID',
+ 'AZURE_SECRET',
+ 'AZURE_SUBSCRIPTION_ID',
+ 'AZURE_TENANT',
+ )
+
+ ad_vars = (
+ 'AZURE_AD_USER',
+ 'AZURE_PASSWORD',
+ 'AZURE_SUBSCRIPTION_ID',
+ )
+
+ rg_ok = all(var in config for var in rg_vars)
+ sp_ok = all(var in config for var in sp_vars)
+ ad_ok = all(var in config for var in ad_vars)
+
+ if not rg_ok:
+ raise ApplicationError('Resource groups must be defined with: %s' % ', '.join(sorted(rg_vars)))
+
+ if not sp_ok and not ad_ok:
+ raise ApplicationError('Credentials must be defined using either:\nService Principal: %s\nActive Directory: %s' % (
+ ', '.join(sorted(sp_vars)), ', '.join(sorted(ad_vars))))
+
+ return config
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py
new file mode 100644
index 0000000..f453ef3
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# (c) 2018, Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+"""Cloudscale plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class CloudscaleCloudProvider(CloudProvider):
+ """Cloudscale cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class CloudscaleCloudEnvironment(CloudEnvironment):
+ """Cloudscale cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ CLOUDSCALE_API_TOKEN=parser.get('default', 'cloudscale_api_token'),
+ )
+
+ display.sensitive.add(env_vars['CLOUDSCALE_API_TOKEN'])
+
+ ansible_vars = dict(
+ cloudscale_resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py
new file mode 100644
index 0000000..0037b42
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py
@@ -0,0 +1,174 @@
+"""CloudStack plugin for integration tests."""
+from __future__ import annotations
+
+import json
+import configparser
+import os
+import urllib.parse
+import typing as t
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....docker_util import (
+ docker_exec,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+ wait_for_file,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class CsCloudProvider(CloudProvider):
+ """CloudStack cloud provider plugin. Sets up cloud resources before delegation."""
+ DOCKER_SIMULATOR_NAME = 'cloudstack-sim'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.image = os.environ.get('ANSIBLE_CLOUDSTACK_CONTAINER', 'quay.io/ansible/cloudstack-test-container:1.4.0')
+ self.host = ''
+ self.port = 0
+
+ self.uses_docker = True
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_static(self) -> None:
+ """Configure CloudStack tests for use with static configuration."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_static_path)
+
+ endpoint = parser.get('cloudstack', 'endpoint')
+
+ parts = urllib.parse.urlparse(endpoint)
+
+ self.host = parts.hostname
+
+ if not self.host:
+ raise ApplicationError('Could not determine host from endpoint: %s' % endpoint)
+
+ if parts.port:
+ self.port = parts.port
+ elif parts.scheme == 'http':
+ self.port = 80
+ elif parts.scheme == 'https':
+ self.port = 443
+ else:
+ raise ApplicationError('Could not determine port from endpoint: %s' % endpoint)
+
+ display.info('Read cs host "%s" and port %d from config: %s' % (self.host, self.port, self.config_static_path), verbosity=1)
+
+ def _setup_dynamic(self) -> None:
+ """Create a CloudStack simulator using docker."""
+ config = self._read_config_template()
+
+ self.port = 8888
+
+ ports = [
+ self.port,
+ ]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ if not descriptor:
+ return
+
+ # apply work-around for OverlayFS issue
+ # https://github.com/docker/for-linux/issues/72#issuecomment-319904698
+ docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'], capture=True)
+
+ if self.args.explain:
+ values = dict(
+ HOST=self.host,
+ PORT=str(self.port),
+ )
+ else:
+ credentials = self._get_credentials(self.DOCKER_SIMULATOR_NAME)
+
+ values = dict(
+ HOST=self.DOCKER_SIMULATOR_NAME,
+ PORT=str(self.port),
+ KEY=credentials['apikey'],
+ SECRET=credentials['secretkey'],
+ )
+
+ display.sensitive.add(values['SECRET'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _get_credentials(self, container_name: str) -> dict[str, t.Any]:
+ """Wait for the CloudStack simulator to return credentials."""
+ def check(value) -> bool:
+ """Return True if the given configuration is valid JSON, otherwise return False."""
+ # noinspection PyBroadException
+ try:
+ json.loads(value)
+ except Exception: # pylint: disable=broad-except
+ return False # sometimes the file exists but is not yet valid JSON
+
+ return True
+
+ stdout = wait_for_file(self.args, container_name, '/var/www/html/admin.json', sleep=10, tries=30, check=check)
+
+ return json.loads(stdout)
+
+
+class CsCloudEnvironment(CloudEnvironment):
+ """CloudStack cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ config = dict(parser.items('default'))
+
+ env_vars = dict(
+ CLOUDSTACK_ENDPOINT=config['endpoint'],
+ CLOUDSTACK_KEY=config['key'],
+ CLOUDSTACK_SECRET=config['secret'],
+ CLOUDSTACK_TIMEOUT=config['timeout'],
+ )
+
+ display.sensitive.add(env_vars['CLOUDSTACK_SECRET'])
+
+ ansible_vars = dict(
+ cs_resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py
new file mode 100644
index 0000000..a46bf70
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py
@@ -0,0 +1,55 @@
+"""DigitalOcean plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class DigitalOceanCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class DigitalOceanCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ DO_API_KEY=parser.get('default', 'key'),
+ )
+
+ display.sensitive.add(env_vars['DO_API_KEY'])
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py
new file mode 100644
index 0000000..c2413ee
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py
@@ -0,0 +1,94 @@
+"""Foreman plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ForemanProvider(CloudProvider):
+ """Foreman plugin. Sets up Foreman stub server for tests."""
+ DOCKER_SIMULATOR_NAME = 'foreman-stub'
+
+ # Default image to run Foreman stub from.
+ #
+ # The simulator must be pinned to a specific version
+ # to guarantee CI passes with the version used.
+ #
+ # It's source source itself resides at:
+ # https://github.com/ansible/foreman-test-container
+ DOCKER_IMAGE = 'quay.io/ansible/foreman-test-container:1.4.0'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.__container_from_env = os.environ.get('ANSIBLE_FRMNSIM_CONTAINER')
+ """
+ Overrides target container, might be used for development.
+
+ Use ANSIBLE_FRMNSIM_CONTAINER=whatever_you_want if you want
+ to use other image. Omit/empty otherwise.
+ """
+ self.image = self.__container_from_env or self.DOCKER_IMAGE
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Spawn a Foreman stub within docker container."""
+ foreman_port = 8080
+
+ ports = [
+ foreman_port,
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('FOREMAN_HOST', self.DOCKER_SIMULATOR_NAME)
+ self._set_cloud_config('FOREMAN_PORT', str(foreman_port))
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class ForemanEnvironment(CloudEnvironment):
+ """Foreman environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = dict(
+ FOREMAN_HOST=str(self._get_cloud_config('FOREMAN_HOST')),
+ FOREMAN_PORT=str(self._get_cloud_config('FOREMAN_PORT')),
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py
new file mode 100644
index 0000000..e180a02
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py
@@ -0,0 +1,168 @@
+"""Galaxy (ansible-galaxy) plugin for integration tests."""
+from __future__ import annotations
+
+import os
+import tempfile
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....docker_util import (
+ docker_cp_to,
+)
+
+from ....containers import (
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+# We add BasicAuthentication, to make the tasks that deal with
+# direct API access easier to deal with across galaxy_ng and pulp
+SETTINGS = b'''
+CONTENT_ORIGIN = 'http://ansible-ci-pulp:80'
+ANSIBLE_API_HOSTNAME = 'http://ansible-ci-pulp:80'
+ANSIBLE_CONTENT_HOSTNAME = 'http://ansible-ci-pulp:80/pulp/content'
+TOKEN_AUTH_DISABLED = True
+GALAXY_REQUIRE_CONTENT_APPROVAL = False
+GALAXY_AUTHENTICATION_CLASSES = [
+ "rest_framework.authentication.SessionAuthentication",
+ "rest_framework.authentication.TokenAuthentication",
+ "rest_framework.authentication.BasicAuthentication",
+]
+'''
+
+SET_ADMIN_PASSWORD = b'''#!/usr/bin/execlineb -S0
+foreground {
+ redirfd -w 1 /dev/null
+ redirfd -w 2 /dev/null
+ export DJANGO_SETTINGS_MODULE pulpcore.app.settings
+ export PULP_CONTENT_ORIGIN localhost
+ s6-setuidgid postgres
+ if { /usr/local/bin/django-admin reset-admin-password --password password }
+ if { /usr/local/bin/pulpcore-manager create-group system:partner-engineers --users admin }
+}
+'''
+
+# There are 2 overrides here:
+# 1. Change the gunicorn bind address from 127.0.0.1 to 0.0.0.0 now that Galaxy NG does not allow us to access the
+# Pulp API through it.
+# 2. Grant access allowing us to DELETE a namespace in Galaxy NG. This is as CI deletes and recreates repos and
+# distributions in Pulp which now breaks the namespace in Galaxy NG. Recreating it is the "simple" fix to get it
+# working again.
+# These may not be needed in the future, especially if 1 becomes configurable by an env var but for now they must be
+# done.
+OVERRIDES = b'''#!/usr/bin/execlineb -S0
+foreground {
+ sed -i "0,/\\"127.0.0.1:24817\\"/s//\\"0.0.0.0:24817\\"/" /etc/services.d/pulpcore-api/run
+}
+
+# This sed calls changes the first occurrence to "allow" which is conveniently the delete operation for a namespace.
+# https://github.com/ansible/galaxy_ng/blob/master/galaxy_ng/app/access_control/statements/standalone.py#L9-L11.
+backtick NG_PREFIX { python -c "import galaxy_ng; print(galaxy_ng.__path__[0], end='')" }
+importas ng_prefix NG_PREFIX
+foreground {
+ sed -i "0,/\\"effect\\": \\"deny\\"/s//\\"effect\\": \\"allow\\"/" ${ng_prefix}/app/access_control/statements/standalone.py
+}'''
+
+
+class GalaxyProvider(CloudProvider):
+ """
+ Galaxy plugin. Sets up pulp (ansible-galaxy) servers for tests.
+ The pulp source itself resides at: https://github.com/pulp/pulp-oci-images
+ """
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # Cannot use the latest container image as either galaxy_ng 4.2.0rc2 or pulp 0.5.0 has sporatic issues with
+ # dropping published collections in CI. Try running the tests multiple times when updating. Will also need to
+ # comment out the cache tests in 'test/integration/targets/ansible-galaxy-collection/tasks/install.yml' when
+ # the newer update is available.
+ self.pulp = os.environ.get(
+ 'ANSIBLE_PULP_CONTAINER',
+ 'quay.io/ansible/pulp-galaxy-ng:b79a7be64eff'
+ )
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ galaxy_port = 80
+ pulp_host = 'ansible-ci-pulp'
+ pulp_port = 24817
+
+ ports = [
+ galaxy_port,
+ pulp_port,
+ ]
+
+ # Create the container, don't run it, we need to inject configs before it starts
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.pulp,
+ pulp_host,
+ ports,
+ start=False,
+ allow_existing=True,
+ )
+
+ if not descriptor:
+ return
+
+ if not descriptor.running:
+ pulp_id = descriptor.container_id
+
+ injected_files = {
+ '/etc/pulp/settings.py': SETTINGS,
+ '/etc/cont-init.d/111-postgres': SET_ADMIN_PASSWORD,
+ '/etc/cont-init.d/000-ansible-test-overrides': OVERRIDES,
+ }
+ for path, content in injected_files.items():
+ with tempfile.NamedTemporaryFile() as temp_fd:
+ temp_fd.write(content)
+ temp_fd.flush()
+ docker_cp_to(self.args, pulp_id, temp_fd.name, path)
+
+ descriptor.start(self.args)
+
+ self._set_cloud_config('PULP_HOST', pulp_host)
+ self._set_cloud_config('PULP_PORT', str(pulp_port))
+ self._set_cloud_config('GALAXY_PORT', str(galaxy_port))
+ self._set_cloud_config('PULP_USER', 'admin')
+ self._set_cloud_config('PULP_PASSWORD', 'password')
+
+
+class GalaxyEnvironment(CloudEnvironment):
+ """Galaxy environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ pulp_user = str(self._get_cloud_config('PULP_USER'))
+ pulp_password = str(self._get_cloud_config('PULP_PASSWORD'))
+ pulp_host = self._get_cloud_config('PULP_HOST')
+ galaxy_port = self._get_cloud_config('GALAXY_PORT')
+ pulp_port = self._get_cloud_config('PULP_PORT')
+
+ return CloudEnvironmentConfig(
+ ansible_vars=dict(
+ pulp_user=pulp_user,
+ pulp_password=pulp_password,
+ pulp_api='http://%s:%s' % (pulp_host, pulp_port),
+ pulp_server='http://%s:%s/pulp_ansible/galaxy/' % (pulp_host, pulp_port),
+ galaxy_ng_server='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port),
+ ),
+ env_vars=dict(
+ PULP_USER=pulp_user,
+ PULP_PASSWORD=pulp_password,
+ PULP_SERVER='http://%s:%s/pulp_ansible/galaxy/api/' % (pulp_host, pulp_port),
+ GALAXY_NG_SERVER='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port),
+ ),
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py
new file mode 100644
index 0000000..28ffb7b
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py
@@ -0,0 +1,55 @@
+# Copyright: (c) 2018, Google Inc.
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""GCP plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class GcpCloudProvider(CloudProvider):
+ """GCP cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ display.notice(
+ 'static configuration could not be used. are you missing a template file?'
+ )
+
+
+class GcpCloudEnvironment(CloudEnvironment):
+ """GCP cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py
new file mode 100644
index 0000000..4d75f22
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py
@@ -0,0 +1,106 @@
+"""Hetzner Cloud plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....target import (
+ IntegrationTarget,
+)
+
+from ....core_ci import (
+ AnsibleCoreCI,
+ CloudResource,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class HcloudCloudProvider(CloudProvider):
+ """Hetzner Cloud provider plugin. Sets up cloud resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None:
+ """Filter out the cloud tests when the necessary config and resources are not available."""
+ aci = self._create_ansible_core_ci()
+
+ if aci.available:
+ return
+
+ super().filter(targets, exclude)
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Request Hetzner credentials through the Ansible Core CI service."""
+ display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1)
+
+ config = self._read_config_template()
+
+ aci = self._create_ansible_core_ci()
+
+ response = aci.start()
+
+ if not self.args.explain:
+ token = response['hetzner']['token']
+
+ display.sensitive.add(token)
+ display.info('Hetzner Cloud Token: %s' % token, verbosity=1)
+
+ values = dict(
+ TOKEN=token,
+ )
+
+ display.sensitive.add(values['TOKEN'])
+
+ config = self._populate_config_template(config, values)
+
+ self._write_config(config)
+
+ def _create_ansible_core_ci(self) -> AnsibleCoreCI:
+ """Return a Heztner instance of AnsibleCoreCI."""
+ return AnsibleCoreCI(self.args, CloudResource(platform='hetzner'))
+
+
+class HcloudCloudEnvironment(CloudEnvironment):
+ """Hetzner Cloud cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ HCLOUD_TOKEN=parser.get('default', 'hcloud_api_token'),
+ )
+
+ display.sensitive.add(env_vars['HCLOUD_TOKEN'])
+
+ ansible_vars = dict(
+ hcloud_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items()))
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py
new file mode 100644
index 0000000..e250eed
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py
@@ -0,0 +1,92 @@
+"""HTTP Tester plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....util import (
+ display,
+ generate_password,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+KRB5_PASSWORD_ENV = 'KRB5_PASSWORD'
+
+
+class HttptesterProvider(CloudProvider):
+ """HTTP Tester provider plugin. Sets up resources before delegation."""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.image = os.environ.get('ANSIBLE_HTTP_TEST_CONTAINER', 'quay.io/ansible/http-test-container:2.1.0')
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup resources before delegation."""
+ super().setup()
+
+ ports = [
+ 80,
+ 88,
+ 443,
+ 444,
+ 749,
+ ]
+
+ aliases = [
+ 'ansible.http.tests',
+ 'sni1.ansible.http.tests',
+ 'fail.ansible.http.tests',
+ 'self-signed.ansible.http.tests',
+ ]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ 'http-test-container',
+ ports,
+ aliases=aliases,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ env={
+ KRB5_PASSWORD_ENV: generate_password(),
+ },
+ )
+
+ if not descriptor:
+ return
+
+ # Read the password from the container environment.
+ # This allows the tests to work when re-using an existing container.
+ # The password is marked as sensitive, since it may differ from the one we generated.
+ krb5_password = descriptor.details.container.env_dict()[KRB5_PASSWORD_ENV]
+ display.sensitive.add(krb5_password)
+
+ self._set_cloud_config(KRB5_PASSWORD_ENV, krb5_password)
+
+
+class HttptesterEnvironment(CloudEnvironment):
+ """HTTP Tester environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ return CloudEnvironmentConfig(
+ env_vars=dict(
+ HTTPTESTER='1', # backwards compatibility for tests intended to work with or without HTTP Tester
+ KRB5_PASSWORD=str(self._get_cloud_config(KRB5_PASSWORD_ENV)),
+ )
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py
new file mode 100644
index 0000000..df0ebb0
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py
@@ -0,0 +1,97 @@
+"""NIOS plugin for integration tests."""
+from __future__ import annotations
+
+import os
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class NiosProvider(CloudProvider):
+ """Nios plugin. Sets up NIOS mock server for tests."""
+ DOCKER_SIMULATOR_NAME = 'nios-simulator'
+
+ # Default image to run the nios simulator.
+ #
+ # The simulator must be pinned to a specific version
+ # to guarantee CI passes with the version used.
+ #
+ # It's source source itself resides at:
+ # https://github.com/ansible/nios-test-container
+ DOCKER_IMAGE = 'quay.io/ansible/nios-test-container:1.4.0'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.__container_from_env = os.environ.get('ANSIBLE_NIOSSIM_CONTAINER')
+ """
+ Overrides target container, might be used for development.
+
+ Use ANSIBLE_NIOSSIM_CONTAINER=whatever_you_want if you want
+ to use other image. Omit/empty otherwise.
+ """
+
+ self.image = self.__container_from_env or self.DOCKER_IMAGE
+
+ self.uses_docker = True
+
+ def setup(self) -> None:
+ """Setup cloud resource before delegation and reg cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_dynamic(self) -> None:
+ """Spawn a NIOS simulator within docker container."""
+ nios_port = 443
+
+ ports = [
+ nios_port,
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('NIOS_HOST', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ raise NotImplementedError()
+
+
+class NiosEnvironment(CloudEnvironment):
+ """NIOS environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ ansible_vars = dict(
+ nios_provider=dict(
+ host=self._get_cloud_config('NIOS_HOST'),
+ username='admin',
+ password='infoblox',
+ ),
+ )
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py
new file mode 100644
index 0000000..d005a3c
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py
@@ -0,0 +1,60 @@
+"""OpenNebula plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class OpenNebulaCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if not self._use_static_config():
+ self._setup_dynamic()
+
+ self.uses_config = True
+
+ def _setup_dynamic(self) -> None:
+ display.info('No config file provided, will run test from fixtures')
+
+ config = self._read_config_template()
+ values = dict(
+ URL="http://localhost/RPC2",
+ USERNAME='oneadmin',
+ PASSWORD='onepass',
+ FIXTURES='true',
+ REPLAY='true',
+ )
+ config = self._populate_config_template(config, values)
+ self._write_config(config)
+
+
+class OpenNebulaCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+
+ ansible_vars.update(dict(parser.items('default')))
+
+ display.sensitive.add(ansible_vars.get('opennebula_password'))
+
+ return CloudEnvironmentConfig(
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py
new file mode 100644
index 0000000..da930c0
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py
@@ -0,0 +1,114 @@
+"""OpenShift plugin for integration tests."""
+from __future__ import annotations
+
+import re
+
+from ....io import (
+ read_text_file,
+)
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+ wait_for_file,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class OpenShiftCloudProvider(CloudProvider):
+ """OpenShift cloud provider plugin. Sets up cloud resources before delegation."""
+ DOCKER_CONTAINER_NAME = 'openshift-origin'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args, config_extension='.kubeconfig')
+
+ # The image must be pinned to a specific version to guarantee CI passes with the version used.
+ self.image = 'quay.io/ansible/openshift-origin:v3.9.0'
+
+ self.uses_docker = True
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ if self._use_static_config():
+ self._setup_static()
+ else:
+ self._setup_dynamic()
+
+ def _setup_static(self) -> None:
+ """Configure OpenShift tests for use with static configuration."""
+ config = read_text_file(self.config_static_path)
+
+ match = re.search(r'^ *server: (?P<server>.*)$', config, flags=re.MULTILINE)
+
+ if not match:
+ display.warning('Could not find OpenShift endpoint in kubeconfig.')
+
+ def _setup_dynamic(self) -> None:
+ """Create a OpenShift container using docker."""
+ port = 8443
+
+ ports = [
+ port,
+ ]
+
+ cmd = ['start', 'master', '--listen', 'https://0.0.0.0:%d' % port]
+
+ descriptor = run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_CONTAINER_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ cmd=cmd,
+ )
+
+ if not descriptor:
+ return
+
+ if self.args.explain:
+ config = '# Unknown'
+ else:
+ config = self._get_config(self.DOCKER_CONTAINER_NAME, 'https://%s:%s/' % (self.DOCKER_CONTAINER_NAME, port))
+
+ self._write_config(config)
+
+ def _get_config(self, container_name: str, server: str) -> str:
+ """Get OpenShift config from container."""
+ stdout = wait_for_file(self.args, container_name, '/var/lib/origin/openshift.local.config/master/admin.kubeconfig', sleep=10, tries=30)
+
+ config = stdout
+ config = re.sub(r'^( *)certificate-authority-data: .*$', r'\1insecure-skip-tls-verify: true', config, flags=re.MULTILINE)
+ config = re.sub(r'^( *)server: .*$', r'\1server: %s' % server, config, flags=re.MULTILINE)
+
+ return config
+
+
+class OpenShiftCloudEnvironment(CloudEnvironment):
+ """OpenShift cloud environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ env_vars = dict(
+ K8S_AUTH_KUBECONFIG=self.config_path,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py
new file mode 100644
index 0000000..04c2d89
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py
@@ -0,0 +1,56 @@
+"""Scaleway plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class ScalewayCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class ScalewayCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ SCW_API_KEY=parser.get('default', 'key'),
+ SCW_ORG=parser.get('default', 'org')
+ )
+
+ display.sensitive.add(env_vars['SCW_API_KEY'])
+
+ ansible_vars = dict(
+ scw_org=parser.get('default', 'org'),
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py
new file mode 100644
index 0000000..df1651f
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py
@@ -0,0 +1,138 @@
+"""VMware vCenter plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+import os
+
+from ....util import (
+ ApplicationError,
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from ....containers import (
+ CleanupMode,
+ run_support_container,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class VcenterProvider(CloudProvider):
+ """VMware vcenter/esx plugin. Sets up cloud resources for tests."""
+ DOCKER_SIMULATOR_NAME = 'vcenter-simulator'
+
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ # The simulator must be pinned to a specific version to guarantee CI passes with the version used.
+ if os.environ.get('ANSIBLE_VCSIM_CONTAINER'):
+ self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER')
+ else:
+ self.image = 'quay.io/ansible/vcenter-test-container:1.7.0'
+
+ # VMware tests can be run on govcsim or BYO with a static config file.
+ # The simulator is the default if no config is provided.
+ self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim')
+
+ if self.vmware_test_platform == 'govcsim':
+ self.uses_docker = True
+ self.uses_config = False
+ elif self.vmware_test_platform == 'static':
+ self.uses_docker = False
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._set_cloud_config('vmware_test_platform', self.vmware_test_platform)
+
+ if self.vmware_test_platform == 'govcsim':
+ self._setup_dynamic_simulator()
+ self.managed = True
+ elif self.vmware_test_platform == 'static':
+ self._use_static_config()
+ self._setup_static()
+ else:
+ raise ApplicationError('Unknown vmware_test_platform: %s' % self.vmware_test_platform)
+
+ def _setup_dynamic_simulator(self) -> None:
+ """Create a vcenter simulator using docker."""
+ ports = [
+ 443,
+ 8080,
+ 8989,
+ 5000, # control port for flask app in simulator
+ ]
+
+ run_support_container(
+ self.args,
+ self.platform,
+ self.image,
+ self.DOCKER_SIMULATOR_NAME,
+ ports,
+ allow_existing=True,
+ cleanup=CleanupMode.YES,
+ )
+
+ self._set_cloud_config('vcenter_hostname', self.DOCKER_SIMULATOR_NAME)
+
+ def _setup_static(self) -> None:
+ if not os.path.exists(self.config_static_path):
+ raise ApplicationError('Configuration file does not exist: %s' % self.config_static_path)
+
+
+class VcenterEnvironment(CloudEnvironment):
+ """VMware vcenter/esx environment plugin. Updates integration test environment after delegation."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ try:
+ # We may be in a container, so we cannot just reach VMWARE_TEST_PLATFORM,
+ # We do a try/except instead
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path) # static
+
+ env_vars = {}
+ ansible_vars = dict(
+ resource_prefix=self.resource_prefix,
+ )
+ ansible_vars.update(dict(parser.items('DEFAULT', raw=True)))
+ except KeyError: # govcsim
+ env_vars = dict(
+ VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')),
+ VCENTER_USERNAME='user',
+ VCENTER_PASSWORD='pass',
+ )
+
+ ansible_vars = dict(
+ vcsim=str(self._get_cloud_config('vcenter_hostname')),
+ vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')),
+ vcenter_username='user',
+ vcenter_password='pass',
+ )
+
+ for key, value in ansible_vars.items():
+ if key.endswith('_password'):
+ display.sensitive.add(value)
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ module_defaults={
+ 'group/vmware': {
+ 'hostname': ansible_vars['vcenter_hostname'],
+ 'username': ansible_vars['vcenter_username'],
+ 'password': ansible_vars['vcenter_password'],
+ 'port': ansible_vars.get('vcenter_port', '443'),
+ 'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'),
+ },
+ },
+ )
diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py
new file mode 100644
index 0000000..1993cda
--- /dev/null
+++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py
@@ -0,0 +1,55 @@
+"""Vultr plugin for integration tests."""
+from __future__ import annotations
+
+import configparser
+
+from ....util import (
+ display,
+)
+
+from ....config import (
+ IntegrationConfig,
+)
+
+from . import (
+ CloudEnvironment,
+ CloudEnvironmentConfig,
+ CloudProvider,
+)
+
+
+class VultrCloudProvider(CloudProvider):
+ """Checks if a configuration file has been passed or fixtures are going to be used for testing"""
+ def __init__(self, args: IntegrationConfig) -> None:
+ super().__init__(args)
+
+ self.uses_config = True
+
+ def setup(self) -> None:
+ """Setup the cloud resource before delegation and register a cleanup callback."""
+ super().setup()
+
+ self._use_static_config()
+
+
+class VultrCloudEnvironment(CloudEnvironment):
+ """Updates integration test environment after delegation. Will setup the config file as parameter."""
+ def get_environment_config(self) -> CloudEnvironmentConfig:
+ """Return environment configuration for use in the test environment after delegation."""
+ parser = configparser.ConfigParser()
+ parser.read(self.config_path)
+
+ env_vars = dict(
+ VULTR_API_KEY=parser.get('default', 'key'),
+ )
+
+ display.sensitive.add(env_vars['VULTR_API_KEY'])
+
+ ansible_vars = dict(
+ vultr_resource_prefix=self.resource_prefix,
+ )
+
+ return CloudEnvironmentConfig(
+ env_vars=env_vars,
+ ansible_vars=ansible_vars,
+ )