1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
"""VMware vCenter plugin for integration tests."""
from __future__ import annotations
import configparser
import os
from ....util import (
ApplicationError,
display,
)
from ....config import (
IntegrationConfig,
)
from ....containers import (
CleanupMode,
run_support_container,
)
from . import (
CloudEnvironment,
CloudEnvironmentConfig,
CloudProvider,
)
class VcenterProvider(CloudProvider):
"""VMware vcenter/esx plugin. Sets up cloud resources for tests."""
DOCKER_SIMULATOR_NAME = 'vcenter-simulator'
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
# The simulator must be pinned to a specific version to guarantee CI passes with the version used.
if os.environ.get('ANSIBLE_VCSIM_CONTAINER'):
self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER')
else:
self.image = 'quay.io/ansible/vcenter-test-container:1.7.0'
# VMware tests can be run on govcsim or BYO with a static config file.
# The simulator is the default if no config is provided.
self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim')
if self.vmware_test_platform == 'govcsim':
self.uses_docker = True
self.uses_config = False
elif self.vmware_test_platform == 'static':
self.uses_docker = False
self.uses_config = True
def setup(self) -> None:
"""Setup the cloud resource before delegation and register a cleanup callback."""
super().setup()
self._set_cloud_config('vmware_test_platform', self.vmware_test_platform)
if self.vmware_test_platform == 'govcsim':
self._setup_dynamic_simulator()
self.managed = True
elif self.vmware_test_platform == 'static':
self._use_static_config()
self._setup_static()
else:
raise ApplicationError('Unknown vmware_test_platform: %s' % self.vmware_test_platform)
def _setup_dynamic_simulator(self) -> None:
"""Create a vcenter simulator using docker."""
ports = [
443,
8080,
8989,
5000, # control port for flask app in simulator
]
run_support_container(
self.args,
self.platform,
self.image,
self.DOCKER_SIMULATOR_NAME,
ports,
allow_existing=True,
cleanup=CleanupMode.YES,
)
self._set_cloud_config('vcenter_hostname', self.DOCKER_SIMULATOR_NAME)
def _setup_static(self) -> None:
if not os.path.exists(self.config_static_path):
raise ApplicationError('Configuration file does not exist: %s' % self.config_static_path)
class VcenterEnvironment(CloudEnvironment):
"""VMware vcenter/esx environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
try:
# We may be in a container, so we cannot just reach VMWARE_TEST_PLATFORM,
# We do a try/except instead
parser = configparser.ConfigParser()
parser.read(self.config_path) # static
env_vars = {}
ansible_vars = dict(
resource_prefix=self.resource_prefix,
)
ansible_vars.update(dict(parser.items('DEFAULT', raw=True)))
except KeyError: # govcsim
env_vars = dict(
VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')),
VCENTER_USERNAME='user',
VCENTER_PASSWORD='pass',
)
ansible_vars = dict(
vcsim=str(self._get_cloud_config('vcenter_hostname')),
vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')),
vcenter_username='user',
vcenter_password='pass',
)
for key, value in ansible_vars.items():
if key.endswith('_password'):
display.sensitive.add(value)
return CloudEnvironmentConfig(
env_vars=env_vars,
ansible_vars=ansible_vars,
module_defaults={
'group/vmware': {
'hostname': ansible_vars['vcenter_hostname'],
'username': ansible_vars['vcenter_username'],
'password': ansible_vars['vcenter_password'],
'port': ansible_vars.get('vcenter_port', '443'),
'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'),
},
},
)
|