summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/dev/container_probe.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/dev/container_probe.py')
-rw-r--r--test/lib/ansible_test/_internal/dev/container_probe.py210
1 files changed, 210 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/dev/container_probe.py b/test/lib/ansible_test/_internal/dev/container_probe.py
new file mode 100644
index 0000000..be22e01
--- /dev/null
+++ b/test/lib/ansible_test/_internal/dev/container_probe.py
@@ -0,0 +1,210 @@
+"""Diagnostic utilities to probe container cgroup behavior during development and testing (both manual and integration)."""
+from __future__ import annotations
+
+import dataclasses
+import enum
+import json
+import os
+import pathlib
+import pwd
+import typing as t
+
+from ..io import (
+ read_text_file,
+ write_text_file,
+)
+
+from ..util import (
+ display,
+ ANSIBLE_TEST_TARGET_ROOT,
+)
+
+from ..config import (
+ EnvironmentConfig,
+)
+
+from ..docker_util import (
+ LOGINUID_NOT_SET,
+ docker_exec,
+ get_docker_info,
+ get_podman_remote,
+ require_docker,
+)
+
+from ..host_configs import (
+ DockerConfig,
+)
+
+from ..cgroup import (
+ CGroupEntry,
+ CGroupPath,
+ MountEntry,
+ MountType,
+)
+
+
+class CGroupState(enum.Enum):
+ """The expected state of a cgroup related mount point."""
+ HOST = enum.auto()
+ PRIVATE = enum.auto()
+ SHADOWED = enum.auto()
+
+
+@dataclasses.dataclass(frozen=True)
+class CGroupMount:
+ """Details on a cgroup mount point that is expected to be present in the container."""
+ path: str
+ type: t.Optional[str]
+ writable: t.Optional[bool]
+ state: t.Optional[CGroupState]
+
+ def __post_init__(self):
+ assert pathlib.PurePosixPath(self.path).is_relative_to(CGroupPath.ROOT)
+
+ if self.type is None:
+ assert self.state is None
+ elif self.type == MountType.TMPFS:
+ assert self.writable is True
+ assert self.state is None
+ else:
+ assert self.type in (MountType.CGROUP_V1, MountType.CGROUP_V2)
+ assert self.state is not None
+
+
+def check_container_cgroup_status(args: EnvironmentConfig, config: DockerConfig, container_name: str, expected_mounts: tuple[CGroupMount, ...]) -> None:
+ """Check the running container to examine the state of the cgroup hierarchies."""
+ cmd = ['sh', '-c', 'cat /proc/1/cgroup && echo && cat /proc/1/mountinfo']
+
+ stdout = docker_exec(args, container_name, cmd, capture=True)[0]
+ cgroups_stdout, mounts_stdout = stdout.split('\n\n')
+
+ cgroups = CGroupEntry.loads(cgroups_stdout)
+ mounts = MountEntry.loads(mounts_stdout)
+
+ mounts = tuple(mount for mount in mounts if mount.path.is_relative_to(CGroupPath.ROOT))
+
+ mount_cgroups: dict[MountEntry, CGroupEntry] = {}
+ probe_paths: dict[pathlib.PurePosixPath, t.Optional[str]] = {}
+
+ for cgroup in cgroups:
+ if cgroup.subsystem:
+ mount = ([mount for mount in mounts if
+ mount.type == MountType.CGROUP_V1 and
+ mount.path.is_relative_to(cgroup.root_path) and
+ cgroup.full_path.is_relative_to(mount.path)
+ ] or [None])[-1]
+ else:
+ mount = ([mount for mount in mounts if
+ mount.type == MountType.CGROUP_V2 and
+ mount.path == cgroup.root_path
+ ] or [None])[-1]
+
+ if mount:
+ mount_cgroups[mount] = cgroup
+
+ for mount in mounts:
+ probe_paths[mount.path] = None
+
+ if (cgroup := mount_cgroups.get(mount)) and cgroup.full_path != mount.path: # child of mount.path
+ probe_paths[cgroup.full_path] = None
+
+ probe_script = read_text_file(os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'probe_cgroups.py'))
+ probe_command = [config.python.path, '-', f'{container_name}-probe'] + [str(path) for path in probe_paths]
+ probe_results = json.loads(docker_exec(args, container_name, probe_command, capture=True, data=probe_script)[0])
+
+ for path in probe_paths:
+ probe_paths[path] = probe_results[str(path)]
+
+ remaining_mounts: dict[pathlib.PurePosixPath, MountEntry] = {mount.path: mount for mount in mounts}
+ results: dict[pathlib.PurePosixPath, tuple[bool, str]] = {}
+
+ for expected_mount in expected_mounts:
+ expected_path = pathlib.PurePosixPath(expected_mount.path)
+
+ if not (actual_mount := remaining_mounts.pop(expected_path, None)):
+ results[expected_path] = (False, 'not mounted')
+ continue
+
+ actual_mount_write_error = probe_paths[actual_mount.path]
+ actual_mount_errors = []
+
+ if cgroup := mount_cgroups.get(actual_mount):
+ if expected_mount.state == CGroupState.SHADOWED:
+ actual_mount_errors.append('unexpected cgroup association')
+
+ if cgroup.root_path == cgroup.full_path and expected_mount.state == CGroupState.HOST:
+ results[cgroup.root_path.joinpath('???')] = (False, 'missing cgroup')
+
+ if cgroup.full_path == actual_mount.path:
+ if cgroup.root_path != cgroup.full_path and expected_mount.state == CGroupState.PRIVATE:
+ actual_mount_errors.append('unexpected mount')
+ else:
+ cgroup_write_error = probe_paths[cgroup.full_path]
+ cgroup_errors = []
+
+ if expected_mount.state == CGroupState.SHADOWED:
+ cgroup_errors.append('unexpected cgroup association')
+
+ if cgroup.root_path != cgroup.full_path and expected_mount.state == CGroupState.PRIVATE:
+ cgroup_errors.append('unexpected cgroup')
+
+ if cgroup_write_error:
+ cgroup_errors.append(cgroup_write_error)
+
+ if cgroup_errors:
+ results[cgroup.full_path] = (False, f'directory errors: {", ".join(cgroup_errors)}')
+ else:
+ results[cgroup.full_path] = (True, 'directory (writable)')
+ elif expected_mount.state not in (None, CGroupState.SHADOWED):
+ actual_mount_errors.append('missing cgroup association')
+
+ if actual_mount.type != expected_mount.type and expected_mount.type is not None:
+ actual_mount_errors.append(f'type not {expected_mount.type}')
+
+ if bool(actual_mount_write_error) == expected_mount.writable:
+ actual_mount_errors.append(f'{actual_mount_write_error or "writable"}')
+
+ if actual_mount_errors:
+ results[actual_mount.path] = (False, f'{actual_mount.type} errors: {", ".join(actual_mount_errors)}')
+ else:
+ results[actual_mount.path] = (True, f'{actual_mount.type} ({actual_mount_write_error or "writable"})')
+
+ for remaining_mount in remaining_mounts.values():
+ remaining_mount_write_error = probe_paths[remaining_mount.path]
+
+ results[remaining_mount.path] = (False, f'unexpected {remaining_mount.type} mount ({remaining_mount_write_error or "writable"})')
+
+ identity = get_identity(args, config, container_name)
+ messages: list[tuple[pathlib.PurePosixPath, bool, str]] = [(path, result[0], result[1]) for path, result in sorted(results.items())]
+ message = '\n'.join(f'{"PASS" if result else "FAIL"}: {path} -> {message}' for path, result, message in messages)
+
+ display.info(f'>>> Container: {identity}\n{message.rstrip()}')
+
+ if args.dev_probe_cgroups:
+ write_text_file(os.path.join(args.dev_probe_cgroups, f'{identity}.log'), message)
+
+
+def get_identity(args: EnvironmentConfig, config: DockerConfig, container_name: str) -> str:
+ """Generate and return an identity string to use when logging test results."""
+ engine = require_docker().command
+
+ try:
+ loginuid = int(read_text_file('/proc/self/loginuid'))
+ except FileNotFoundError:
+ loginuid = LOGINUID_NOT_SET
+
+ user = pwd.getpwuid(os.getuid()).pw_name
+ login_user = user if loginuid == LOGINUID_NOT_SET else pwd.getpwuid(loginuid).pw_name
+ remote = engine == 'podman' and get_podman_remote()
+
+ tags = (
+ config.name,
+ engine,
+ f'cgroup={config.cgroup.value}@{get_docker_info(args).cgroup_version}',
+ f'remote={remote}',
+ f'user={user}',
+ f'loginuid={login_user}',
+ container_name,
+ )
+
+ return '|'.join(tags)