summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py
blob: df1651f92f768d1d021f63036a43b30a70637c1a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""VMware vCenter plugin for integration tests."""
from __future__ import annotations

import configparser
import os

from ....util import (
    ApplicationError,
    display,
)

from ....config import (
    IntegrationConfig,
)

from ....containers import (
    CleanupMode,
    run_support_container,
)

from . import (
    CloudEnvironment,
    CloudEnvironmentConfig,
    CloudProvider,
)


class VcenterProvider(CloudProvider):
    """Cloud provider plugin for VMware vcenter/esx. Prepares a docker based simulator or a static configuration for tests."""
    DOCKER_SIMULATOR_NAME = 'vcenter-simulator'

    def __init__(self, args: IntegrationConfig) -> None:
        """Select the simulator image and the test platform from the process environment."""
        super().__init__(args)

        # Pinning the simulator to a specific version keeps CI results tied to the version in use.
        # A non-empty ANSIBLE_VCSIM_CONTAINER environment variable overrides the pinned image.
        self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER') or 'quay.io/ansible/vcenter-test-container:1.7.0'

        # Tests run either against govcsim or against a BYO environment described by a static config file.
        # When VMWARE_TEST_PLATFORM is not set, the simulator is used.
        self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim')

        # Any other platform value leaves the flags as set by the parent class; setup() rejects it later.
        if self.vmware_test_platform in ('govcsim', 'static'):
            simulated = self.vmware_test_platform == 'govcsim'
            self.uses_docker = simulated
            self.uses_config = not simulated

    def setup(self) -> None:
        """Prepare the cloud resource for the selected test platform, raising ApplicationError for an unknown platform."""
        super().setup()

        selected = self.vmware_test_platform
        self._set_cloud_config('vmware_test_platform', selected)

        if selected == 'govcsim':
            self._setup_dynamic_simulator()
            self.managed = True
            return

        if selected == 'static':
            self._use_static_config()
            self._setup_static()
            return

        raise ApplicationError('Unknown vmware_test_platform: %s' % selected)

    def _setup_dynamic_simulator(self) -> None:
        """Run the vcenter simulator as a docker support container and record its name as the vcenter hostname."""
        container_name = self.DOCKER_SIMULATOR_NAME
        simulator_ports = [443, 8080, 8989]
        simulator_ports.append(5000)  # control port for flask app in simulator

        run_support_container(
            self.args,
            self.platform,
            self.image,
            container_name,
            simulator_ports,
            allow_existing=True,
            cleanup=CleanupMode.YES,
        )

        self._set_cloud_config('vcenter_hostname', container_name)

    def _setup_static(self) -> None:
        """Raise ApplicationError unless the static configuration file exists."""
        static_path = self.config_static_path

        if os.path.exists(static_path):
            return

        raise ApplicationError('Configuration file does not exist: %s' % static_path)


class VcenterEnvironment(CloudEnvironment):
    """Environment plugin for VMware vcenter/esx. Supplies the integration test environment once delegation has happened."""
    def get_environment_config(self) -> CloudEnvironmentConfig:
        """Build the environment configuration (env vars, ansible vars and module defaults) used after delegation."""
        try:
            # This code may run inside a container where VMWARE_TEST_PLATFORM is not reachable,
            # so the static config is tried first and a KeyError selects the simulator settings instead.
            static_config = configparser.ConfigParser()
            static_config.read(self.config_path)  # static

            env_vars = {}
            ansible_vars = {'resource_prefix': self.resource_prefix}
            # raw=True keeps the values exactly as written, without interpolation.
            ansible_vars.update(static_config.items('DEFAULT', raw=True))
        except KeyError:  # govcsim
            simulator_host = str(self._get_cloud_config('vcenter_hostname'))
            simulator_user = 'user'
            simulator_secret = 'pass'

            env_vars = {
                'VCENTER_HOSTNAME': simulator_host,
                'VCENTER_USERNAME': simulator_user,
                'VCENTER_PASSWORD': simulator_secret,
            }

            ansible_vars = {
                'vcsim': simulator_host,
                'vcenter_hostname': simulator_host,
                'vcenter_username': simulator_user,
                'vcenter_password': simulator_secret,
            }

        # Register every password-like value with the display's sensitive collection.
        for var_name, var_value in ansible_vars.items():
            if not var_name.endswith('_password'):
                continue

            display.sensitive.add(var_value)

        # Connection defaults applied to the vmware module group; port and certificate validation fall back to fixed values.
        vmware_defaults = {
            'hostname': ansible_vars['vcenter_hostname'],
            'username': ansible_vars['vcenter_username'],
            'password': ansible_vars['vcenter_password'],
            'port': ansible_vars.get('vcenter_port', '443'),
            'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'),
        }

        return CloudEnvironmentConfig(
            env_vars=env_vars,
            ansible_vars=ansible_vars,
            module_defaults={'group/vmware': vmware_defaults},
        )