summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/dev/container_probe.py
blob: be22e01c6057b55a043dfc5f0d26995155a06a23 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""Diagnostic utilities to probe container cgroup behavior during development and testing (both manual and integration)."""
from __future__ import annotations

import dataclasses
import enum
import json
import os
import pathlib
import pwd
import typing as t

from ..io import (
    read_text_file,
    write_text_file,
)

from ..util import (
    display,
    ANSIBLE_TEST_TARGET_ROOT,
)

from ..config import (
    EnvironmentConfig,
)

from ..docker_util import (
    LOGINUID_NOT_SET,
    docker_exec,
    get_docker_info,
    get_podman_remote,
    require_docker,
)

from ..host_configs import (
    DockerConfig,
)

from ..cgroup import (
    CGroupEntry,
    CGroupPath,
    MountEntry,
    MountType,
)


class CGroupState(enum.Enum):
    """The expected state of a cgroup related mount point."""
    HOST = enum.auto()
    PRIVATE = enum.auto()
    SHADOWED = enum.auto()


@dataclasses.dataclass(frozen=True)
class CGroupMount:
    """Details on a cgroup mount point that is expected to be present in the container."""
    path: str
    type: t.Optional[str]
    writable: t.Optional[bool]
    state: t.Optional[CGroupState]

    def __post_init__(self):
        assert pathlib.PurePosixPath(self.path).is_relative_to(CGroupPath.ROOT)

        if self.type is None:
            assert self.state is None
        elif self.type == MountType.TMPFS:
            assert self.writable is True
            assert self.state is None
        else:
            assert self.type in (MountType.CGROUP_V1, MountType.CGROUP_V2)
            assert self.state is not None


def check_container_cgroup_status(args: EnvironmentConfig, config: DockerConfig, container_name: str, expected_mounts: tuple[CGroupMount, ...]) -> None:
    """Check the running container to examine the state of the cgroup hierarchies."""
    cmd = ['sh', '-c', 'cat /proc/1/cgroup && echo && cat /proc/1/mountinfo']

    stdout = docker_exec(args, container_name, cmd, capture=True)[0]
    cgroups_stdout, mounts_stdout = stdout.split('\n\n')

    cgroups = CGroupEntry.loads(cgroups_stdout)
    mounts = MountEntry.loads(mounts_stdout)

    mounts = tuple(mount for mount in mounts if mount.path.is_relative_to(CGroupPath.ROOT))

    mount_cgroups: dict[MountEntry, CGroupEntry] = {}
    probe_paths: dict[pathlib.PurePosixPath, t.Optional[str]] = {}

    for cgroup in cgroups:
        if cgroup.subsystem:
            mount = ([mount for mount in mounts if
                      mount.type == MountType.CGROUP_V1 and
                      mount.path.is_relative_to(cgroup.root_path) and
                      cgroup.full_path.is_relative_to(mount.path)
                      ] or [None])[-1]
        else:
            mount = ([mount for mount in mounts if
                      mount.type == MountType.CGROUP_V2 and
                      mount.path == cgroup.root_path
                      ] or [None])[-1]

        if mount:
            mount_cgroups[mount] = cgroup

    for mount in mounts:
        probe_paths[mount.path] = None

        if (cgroup := mount_cgroups.get(mount)) and cgroup.full_path != mount.path:  # child of mount.path
            probe_paths[cgroup.full_path] = None

    probe_script = read_text_file(os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'probe_cgroups.py'))
    probe_command = [config.python.path, '-', f'{container_name}-probe'] + [str(path) for path in probe_paths]
    probe_results = json.loads(docker_exec(args, container_name, probe_command, capture=True, data=probe_script)[0])

    for path in probe_paths:
        probe_paths[path] = probe_results[str(path)]

    remaining_mounts: dict[pathlib.PurePosixPath, MountEntry] = {mount.path: mount for mount in mounts}
    results: dict[pathlib.PurePosixPath, tuple[bool, str]] = {}

    for expected_mount in expected_mounts:
        expected_path = pathlib.PurePosixPath(expected_mount.path)

        if not (actual_mount := remaining_mounts.pop(expected_path, None)):
            results[expected_path] = (False, 'not mounted')
            continue

        actual_mount_write_error = probe_paths[actual_mount.path]
        actual_mount_errors = []

        if cgroup := mount_cgroups.get(actual_mount):
            if expected_mount.state == CGroupState.SHADOWED:
                actual_mount_errors.append('unexpected cgroup association')

            if cgroup.root_path == cgroup.full_path and expected_mount.state == CGroupState.HOST:
                results[cgroup.root_path.joinpath('???')] = (False, 'missing cgroup')

            if cgroup.full_path == actual_mount.path:
                if cgroup.root_path != cgroup.full_path and expected_mount.state == CGroupState.PRIVATE:
                    actual_mount_errors.append('unexpected mount')
            else:
                cgroup_write_error = probe_paths[cgroup.full_path]
                cgroup_errors = []

                if expected_mount.state == CGroupState.SHADOWED:
                    cgroup_errors.append('unexpected cgroup association')

                if cgroup.root_path != cgroup.full_path and expected_mount.state == CGroupState.PRIVATE:
                    cgroup_errors.append('unexpected cgroup')

                if cgroup_write_error:
                    cgroup_errors.append(cgroup_write_error)

                if cgroup_errors:
                    results[cgroup.full_path] = (False, f'directory errors: {", ".join(cgroup_errors)}')
                else:
                    results[cgroup.full_path] = (True, 'directory (writable)')
        elif expected_mount.state not in (None, CGroupState.SHADOWED):
            actual_mount_errors.append('missing cgroup association')

        if actual_mount.type != expected_mount.type and expected_mount.type is not None:
            actual_mount_errors.append(f'type not {expected_mount.type}')

        if bool(actual_mount_write_error) == expected_mount.writable:
            actual_mount_errors.append(f'{actual_mount_write_error or "writable"}')

        if actual_mount_errors:
            results[actual_mount.path] = (False, f'{actual_mount.type} errors: {", ".join(actual_mount_errors)}')
        else:
            results[actual_mount.path] = (True, f'{actual_mount.type} ({actual_mount_write_error or "writable"})')

    for remaining_mount in remaining_mounts.values():
        remaining_mount_write_error = probe_paths[remaining_mount.path]

        results[remaining_mount.path] = (False, f'unexpected {remaining_mount.type} mount ({remaining_mount_write_error or "writable"})')

    identity = get_identity(args, config, container_name)
    messages: list[tuple[pathlib.PurePosixPath, bool, str]] = [(path, result[0], result[1]) for path, result in sorted(results.items())]
    message = '\n'.join(f'{"PASS" if result else "FAIL"}: {path} -> {message}' for path, result, message in messages)

    display.info(f'>>> Container: {identity}\n{message.rstrip()}')

    if args.dev_probe_cgroups:
        write_text_file(os.path.join(args.dev_probe_cgroups, f'{identity}.log'), message)


def get_identity(args: EnvironmentConfig, config: DockerConfig, container_name: str) -> str:
    """Generate and return an identity string to use when logging test results."""
    engine = require_docker().command

    try:
        loginuid = int(read_text_file('/proc/self/loginuid'))
    except FileNotFoundError:
        loginuid = LOGINUID_NOT_SET

    user = pwd.getpwuid(os.getuid()).pw_name
    login_user = user if loginuid == LOGINUID_NOT_SET else pwd.getpwuid(loginuid).pw_name
    remote = engine == 'podman' and get_podman_remote()

    tags = (
        config.name,
        engine,
        f'cgroup={config.cgroup.value}@{get_docker_info(args).cgroup_version}',
        f'remote={remote}',
        f'user={user}',
        f'loginuid={login_user}',
        container_name,
    )

    return '|'.join(tags)