summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/commands/integration/filters.py
blob: be03d7f42a8918822a257f9f58e29e0f1c571761 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
"""Logic for filtering out integration test targets which are unsupported for the currently provided arguments and available hosts."""
from __future__ import annotations

import abc
import typing as t

from ...config import (
    IntegrationConfig,
)

from ...util import (
    cache,
    detect_architecture,
    display,
    get_type_map,
)

from ...target import (
    IntegrationTarget,
)

from ...host_configs import (
    ControllerConfig,
    DockerConfig,
    FallbackReason,
    HostConfig,
    NetworkInventoryConfig,
    NetworkRemoteConfig,
    OriginConfig,
    PosixConfig,
    PosixRemoteConfig,
    PosixSshConfig,
    RemoteConfig,
    WindowsInventoryConfig,
    WindowsRemoteConfig,
)

from ...host_profiles import (
    HostProfile,
)

THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
TPosixConfig = t.TypeVar('TPosixConfig', bound=PosixConfig)
TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig)
THostProfile = t.TypeVar('THostProfile', bound=HostProfile)


class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
    """Base class for target filters."""
    def __init__(self, args: IntegrationConfig, configs: list[THostConfig], controller: bool) -> None:
        self.args = args
        self.configs = configs
        self.controller = controller
        self.host_type = 'controller' if controller else 'target'

        # values which are not host specific
        self.include_targets = args.include
        self.allow_root = args.allow_root
        self.allow_destructive = args.allow_destructive

    @property
    def config(self) -> THostConfig:
        """The configuration to filter. Only valid when there is a single config."""
        if len(self.configs) != 1:
            raise Exception()

        return self.configs[0]

    def skip(
        self,
        skip: str,
        reason: str,
        targets: list[IntegrationTarget],
        exclude: set[str],
        override: t.Optional[list[str]] = None,
    ) -> None:
        """Apply the specified skip rule to the given targets by updating the provided exclude list."""
        if skip.startswith('skip/'):
            skipped = [target.name for target in targets if skip in target.skips and (not override or target.name not in override)]
        else:
            skipped = [target.name for target in targets if f'{skip}/' in target.aliases and (not override or target.name not in override)]

        self.apply_skip(f'"{skip}"', reason, skipped, exclude)

    def apply_skip(self, marked: str, reason: str, skipped: list[str], exclude: set[str]) -> None:
        """Apply the provided skips to the given exclude list."""
        if not skipped:
            return

        exclude.update(skipped)
        display.warning(f'Excluding {self.host_type} tests marked {marked} {reason}: {", ".join(skipped)}')

    def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
        """Filter the list of profiles, returning only those which are not skipped for the given target."""
        del target
        return profiles

    def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
        """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
        if self.controller and self.args.host_settings.controller_fallback and targets:
            affected_targets = [target.name for target in targets]
            reason = self.args.host_settings.controller_fallback.reason

            if reason == FallbackReason.ENVIRONMENT:
                exclude.update(affected_targets)
                display.warning(f'Excluding {self.host_type} tests since a fallback controller is in use: {", ".join(affected_targets)}')
            elif reason == FallbackReason.PYTHON:
                display.warning(f'Some {self.host_type} tests may be redundant since a fallback python is in use: {", ".join(affected_targets)}')

        if not self.allow_destructive and not self.config.is_managed:
            override_destructive = set(target for target in self.include_targets if target.startswith('destructive/'))
            override = [target.name for target in targets if override_destructive & set(target.aliases)]

            self.skip('destructive', 'which require --allow-destructive or prefixing with "destructive/" to run on unmanaged hosts', targets, exclude, override)

        if not self.args.allow_disabled:
            override_disabled = set(target for target in self.args.include if target.startswith('disabled/'))
            override = [target.name for target in targets if override_disabled & set(target.aliases)]

            self.skip('disabled', 'which require --allow-disabled or prefixing with "disabled/"', targets, exclude, override)

        if not self.args.allow_unsupported:
            override_unsupported = set(target for target in self.args.include if target.startswith('unsupported/'))
            override = [target.name for target in targets if override_unsupported & set(target.aliases)]

            self.skip('unsupported', 'which require --allow-unsupported or prefixing with "unsupported/"', targets, exclude, override)

        if not self.args.allow_unstable:
            override_unstable = set(target for target in self.args.include if target.startswith('unstable/'))

            if self.args.allow_unstable_changed:
                override_unstable |= set(self.args.metadata.change_description.focused_targets or [])

            override = [target.name for target in targets if override_unstable & set(target.aliases)]

            self.skip('unstable', 'which require --allow-unstable or prefixing with "unstable/"', targets, exclude, override)


class PosixTargetFilter(TargetFilter[TPosixConfig]):
    """Target filter for POSIX hosts."""
    def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
        """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
        super().filter_targets(targets, exclude)

        if not self.allow_root and not self.config.have_root:
            self.skip('needs/root', 'which require --allow-root or running as root', targets, exclude)

        self.skip(f'skip/python{self.config.python.version}', f'which are not supported by Python {self.config.python.version}', targets, exclude)
        self.skip(f'skip/python{self.config.python.major_version}', f'which are not supported by Python {self.config.python.major_version}', targets, exclude)


class DockerTargetFilter(PosixTargetFilter[DockerConfig]):
    """Target filter for docker hosts."""
    def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
        """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
        super().filter_targets(targets, exclude)

        self.skip('skip/docker', 'which cannot run under docker', targets, exclude)

        if not self.config.privileged:
            self.skip('needs/privileged', 'which require --docker-privileged to run under docker', targets, exclude)


class PosixSshTargetFilter(PosixTargetFilter[PosixSshConfig]):
    """Target filter for POSIX SSH hosts."""


class RemoteTargetFilter(TargetFilter[TRemoteConfig]):
    """Target filter for remote Ansible Core CI managed hosts."""
    def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
        """Filter the list of profiles, returning only those which are not skipped for the given target."""
        profiles = super().filter_profiles(profiles, target)

        skipped_profiles = [profile for profile in profiles if any(skip in target.skips for skip in get_remote_skip_aliases(profile.config))]

        if skipped_profiles:
            configs: list[TRemoteConfig] = [profile.config for profile in skipped_profiles]
            display.warning(f'Excluding skipped hosts from inventory: {", ".join(config.name for config in configs)}')

        profiles = [profile for profile in profiles if profile not in skipped_profiles]

        return profiles

    def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
        """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
        super().filter_targets(targets, exclude)

        if len(self.configs) > 1:
            host_skips = {host.name: get_remote_skip_aliases(host) for host in self.configs}

            # Skip only targets which skip all hosts.
            # Targets that skip only some hosts will be handled during inventory generation.
            skipped = [target.name for target in targets if all(any(skip in target.skips for skip in skips) for skips in host_skips.values())]

            if skipped:
                exclude.update(skipped)
                display.warning(f'Excluding tests which do not support {", ".join(host_skips.keys())}: {", ".join(skipped)}')
        else:
            skips = get_remote_skip_aliases(self.config)

            for skip, reason in skips.items():
                self.skip(skip, reason, targets, exclude)


class PosixRemoteTargetFilter(PosixTargetFilter[PosixRemoteConfig], RemoteTargetFilter[PosixRemoteConfig]):
    """Target filter for POSIX remote hosts."""


class WindowsRemoteTargetFilter(RemoteTargetFilter[WindowsRemoteConfig]):
    """Target filter for remote Windows hosts."""


class WindowsInventoryTargetFilter(TargetFilter[WindowsInventoryConfig]):
    """Target filter for Windows inventory."""


class NetworkRemoteTargetFilter(RemoteTargetFilter[NetworkRemoteConfig]):
    """Target filter for remote network hosts."""


class NetworkInventoryTargetFilter(TargetFilter[NetworkInventoryConfig]):
    """Target filter for network inventory."""


class OriginTargetFilter(PosixTargetFilter[OriginConfig]):
    """Target filter for localhost."""
    def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
        """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
        super().filter_targets(targets, exclude)

        arch = detect_architecture(self.config.python.path)

        if arch:
            self.skip(f'skip/{arch}', f'which are not supported by {arch}', targets, exclude)


@cache
def get_host_target_type_map() -> dict[t.Type[HostConfig], t.Type[TargetFilter]]:
    """Create and return a mapping of HostConfig types to TargetFilter types."""
    return get_type_map(TargetFilter, HostConfig)


def get_target_filter(args: IntegrationConfig, configs: list[HostConfig], controller: bool) -> TargetFilter:
    """Return an integration test target filter instance for the provided host configurations."""
    target_type = type(configs[0])

    if issubclass(target_type, ControllerConfig):
        target_type = type(args.controller)
        configs = [args.controller]

    filter_type = get_host_target_type_map()[target_type]
    filter_instance = filter_type(args, configs, controller)

    return filter_instance


def get_remote_skip_aliases(config: RemoteConfig) -> dict[str, str]:
    """Return a dictionary of skip aliases and the reason why they apply."""
    return get_platform_skip_aliases(config.platform, config.version, config.arch)


def get_platform_skip_aliases(platform: str, version: str, arch: t.Optional[str]) -> dict[str, str]:
    """Return a dictionary of skip aliases and the reason why they apply."""
    skips = {
        f'skip/{platform}': platform,
        f'skip/{platform}/{version}': f'{platform} {version}',
        f'skip/{platform}{version}': f'{platform} {version}',  # legacy syntax, use above format
    }

    if arch:
        skips.update({
            f'skip/{arch}': arch,
            f'skip/{arch}/{platform}': f'{platform} on {arch}',
            f'skip/{arch}/{platform}/{version}': f'{platform} {version} on {arch}',
        })

    skips = {alias: f'which are not supported by {description}' for alias, description in skips.items()}

    return skips