summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/classification
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_internal/classification')
-rw-r--r--test/lib/ansible_test/_internal/classification/__init__.py900
-rw-r--r--test/lib/ansible_test/_internal/classification/common.py26
-rw-r--r--test/lib/ansible_test/_internal/classification/csharp.py97
-rw-r--r--test/lib/ansible_test/_internal/classification/powershell.py98
-rw-r--r--test/lib/ansible_test/_internal/classification/python.py341
5 files changed, 1462 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/classification/__init__.py b/test/lib/ansible_test/_internal/classification/__init__.py
new file mode 100644
index 0000000..aacc2ca
--- /dev/null
+++ b/test/lib/ansible_test/_internal/classification/__init__.py
@@ -0,0 +1,900 @@
+"""Classify changes in Ansible code."""
+from __future__ import annotations
+
+import collections
+import os
+import re
+import time
+import typing as t
+
+from ..target import (
+ walk_module_targets,
+ walk_integration_targets,
+ walk_units_targets,
+ walk_compile_targets,
+ walk_sanity_targets,
+ load_integration_prefixes,
+ analyze_integration_target_dependencies,
+ IntegrationTarget,
+)
+
+from ..util import (
+ display,
+ is_subdir,
+)
+
+from .python import (
+ get_python_module_utils_imports,
+ get_python_module_utils_name,
+)
+
+from .csharp import (
+ get_csharp_module_utils_imports,
+ get_csharp_module_utils_name,
+)
+
+from .powershell import (
+ get_powershell_module_utils_imports,
+ get_powershell_module_utils_name,
+)
+
+from ..config import (
+ TestConfig,
+ IntegrationConfig,
+)
+
+from ..metadata import (
+ ChangeDescription,
+)
+
+from ..data import (
+ data_context,
+)
+
+FOCUSED_TARGET = '__focused__'
+
+
+def categorize_changes(args: TestConfig, paths: list[str], verbose_command: t.Optional[str] = None) -> ChangeDescription:
+ """Categorize the given list of changed paths and return a description of the changes."""
+ mapper = PathMapper(args)
+
+ commands: dict[str, set[str]] = {
+ 'sanity': set(),
+ 'units': set(),
+ 'integration': set(),
+ 'windows-integration': set(),
+ 'network-integration': set(),
+ }
+
+ focused_commands = collections.defaultdict(set)
+
+ deleted_paths: set[str] = set()
+ original_paths: set[str] = set()
+ additional_paths: set[str] = set()
+ no_integration_paths: set[str] = set()
+
+ for path in paths:
+ if not os.path.exists(path):
+ deleted_paths.add(path)
+ continue
+
+ original_paths.add(path)
+
+ dependent_paths = mapper.get_dependent_paths(path)
+
+ if not dependent_paths:
+ continue
+
+ display.info('Expanded "%s" to %d dependent file(s):' % (path, len(dependent_paths)), verbosity=2)
+
+ for dependent_path in dependent_paths:
+ display.info(dependent_path, verbosity=2)
+ additional_paths.add(dependent_path)
+
+ additional_paths -= set(paths) # don't count changed paths as additional paths
+
+ if additional_paths:
+ display.info('Expanded %d changed file(s) into %d additional dependent file(s).' % (len(paths), len(additional_paths)))
+ paths = sorted(set(paths) | additional_paths)
+
+ display.info('Mapping %d changed file(s) to tests.' % len(paths))
+
+ none_count = 0
+
+ for path in paths:
+ tests = mapper.classify(path)
+
+ if tests is None:
+ focused_target = False
+
+ display.info('%s -> all' % path, verbosity=1)
+ tests = all_tests(args) # not categorized, run all tests
+ display.warning('Path not categorized: %s' % path)
+ else:
+ focused_target = bool(tests.pop(FOCUSED_TARGET, None)) and path in original_paths
+
+ tests = dict((key, value) for key, value in tests.items() if value)
+
+ if focused_target and not any('integration' in command for command in tests):
+ no_integration_paths.add(path) # path triggers no integration tests
+
+ if verbose_command:
+ result = '%s: %s' % (verbose_command, tests.get(verbose_command) or 'none')
+
+ # identify targeted integration tests (those which only target a single integration command)
+ if 'integration' in verbose_command and tests.get(verbose_command):
+ if not any('integration' in command for command in tests if command != verbose_command):
+ if focused_target:
+ result += ' (focused)'
+
+ result += ' (targeted)'
+ else:
+ result = '%s' % tests
+
+ if not tests.get(verbose_command):
+ # minimize excessive output from potentially thousands of files which do not trigger tests
+ none_count += 1
+ verbosity = 2
+ else:
+ verbosity = 1
+
+ if args.verbosity >= verbosity:
+ display.info('%s -> %s' % (path, result), verbosity=1)
+
+ for command, target in tests.items():
+ commands[command].add(target)
+
+ if focused_target:
+ focused_commands[command].add(target)
+
+ if none_count > 0 and args.verbosity < 2:
+ display.notice('Omitted %d file(s) that triggered no tests.' % none_count)
+
+ for command, targets in commands.items():
+ targets.discard('none')
+
+ if any(target == 'all' for target in targets):
+ commands[command] = {'all'}
+
+ sorted_commands = dict((cmd, sorted(targets)) for cmd, targets in commands.items() if targets)
+ focused_commands = dict((cmd, sorted(targets)) for cmd, targets in focused_commands.items())
+
+ for command, targets in sorted_commands.items():
+ if targets == ['all']:
+ sorted_commands[command] = [] # changes require testing all targets, do not filter targets
+
+ changes = ChangeDescription()
+ changes.command = verbose_command
+ changes.changed_paths = sorted(original_paths)
+ changes.deleted_paths = sorted(deleted_paths)
+ changes.regular_command_targets = sorted_commands
+ changes.focused_command_targets = focused_commands
+ changes.no_integration_paths = sorted(no_integration_paths)
+
+ return changes
+
+
+class PathMapper:
+ """Map file paths to test commands and targets."""
+ def __init__(self, args: TestConfig) -> None:
+ self.args = args
+ self.integration_all_target = get_integration_all_target(self.args)
+
+ self.integration_targets = list(walk_integration_targets())
+ self.module_targets = list(walk_module_targets())
+ self.compile_targets = list(walk_compile_targets())
+ self.units_targets = list(walk_units_targets())
+ self.sanity_targets = list(walk_sanity_targets())
+ self.powershell_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] in ('.ps1', '.psm1')]
+ self.csharp_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] == '.cs']
+
+ self.units_modules = set(target.module for target in self.units_targets if target.module)
+ self.units_paths = set(a for target in self.units_targets for a in target.aliases)
+ self.sanity_paths = set(target.path for target in self.sanity_targets)
+
+ self.module_names_by_path = dict((target.path, target.module) for target in self.module_targets)
+ self.integration_targets_by_name = dict((target.name, target) for target in self.integration_targets)
+ self.integration_targets_by_alias = dict((a, target) for target in self.integration_targets for a in target.aliases)
+
+ self.posix_integration_by_module = dict((m, target.name) for target in self.integration_targets
+ if 'posix/' in target.aliases for m in target.modules)
+ self.windows_integration_by_module = dict((m, target.name) for target in self.integration_targets
+ if 'windows/' in target.aliases for m in target.modules)
+ self.network_integration_by_module = dict((m, target.name) for target in self.integration_targets
+ if 'network/' in target.aliases for m in target.modules)
+
+ self.prefixes = load_integration_prefixes()
+ self.integration_dependencies = analyze_integration_target_dependencies(self.integration_targets)
+
+ self.python_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed
+ self.powershell_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed
+ self.csharp_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed
+
+ self.paths_to_dependent_targets: dict[str, set[IntegrationTarget]] = {}
+
+ for target in self.integration_targets:
+ for path in target.needs_file:
+ if path not in self.paths_to_dependent_targets:
+ self.paths_to_dependent_targets[path] = set()
+
+ self.paths_to_dependent_targets[path].add(target)
+
+ def get_dependent_paths(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path, recursively expanding dependent paths as well."""
+ unprocessed_paths = set(self.get_dependent_paths_non_recursive(path))
+ paths = set()
+
+ while unprocessed_paths:
+ queued_paths = list(unprocessed_paths)
+ paths |= unprocessed_paths
+ unprocessed_paths = set()
+
+ for queued_path in queued_paths:
+ new_paths = self.get_dependent_paths_non_recursive(queued_path)
+
+ for new_path in new_paths:
+ if new_path not in paths:
+ unprocessed_paths.add(new_path)
+
+ return sorted(paths)
+
+ def get_dependent_paths_non_recursive(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path, including dependent integration test target paths."""
+ paths = self.get_dependent_paths_internal(path)
+ paths += [target.path + '/' for target in self.paths_to_dependent_targets.get(path, set())]
+ paths = sorted(set(paths))
+
+ return paths
+
+ def get_dependent_paths_internal(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path."""
+ ext = os.path.splitext(os.path.split(path)[1])[1]
+
+ if is_subdir(path, data_context().content.module_utils_path):
+ if ext == '.py':
+ return self.get_python_module_utils_usage(path)
+
+ if ext == '.psm1':
+ return self.get_powershell_module_utils_usage(path)
+
+ if ext == '.cs':
+ return self.get_csharp_module_utils_usage(path)
+
+ if is_subdir(path, data_context().content.integration_targets_path):
+ return self.get_integration_target_usage(path)
+
+ return []
+
+ def get_python_module_utils_usage(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path which is a Python module_utils file."""
+ if not self.python_module_utils_imports:
+ display.info('Analyzing python module_utils imports...')
+ before = time.time()
+ self.python_module_utils_imports = get_python_module_utils_imports(self.compile_targets)
+ after = time.time()
+ display.info('Processed %d python module_utils in %d second(s).' % (len(self.python_module_utils_imports), after - before))
+
+ name = get_python_module_utils_name(path)
+
+ return sorted(self.python_module_utils_imports[name])
+
+ def get_powershell_module_utils_usage(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path which is a PowerShell module_utils file."""
+ if not self.powershell_module_utils_imports:
+ display.info('Analyzing powershell module_utils imports...')
+ before = time.time()
+ self.powershell_module_utils_imports = get_powershell_module_utils_imports(self.powershell_targets)
+ after = time.time()
+ display.info('Processed %d powershell module_utils in %d second(s).' % (len(self.powershell_module_utils_imports), after - before))
+
+ name = get_powershell_module_utils_name(path)
+
+ return sorted(self.powershell_module_utils_imports[name])
+
+ def get_csharp_module_utils_usage(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path which is a C# module_utils file."""
+ if not self.csharp_module_utils_imports:
+ display.info('Analyzing C# module_utils imports...')
+ before = time.time()
+ self.csharp_module_utils_imports = get_csharp_module_utils_imports(self.powershell_targets, self.csharp_targets)
+ after = time.time()
+ display.info('Processed %d C# module_utils in %d second(s).' % (len(self.csharp_module_utils_imports), after - before))
+
+ name = get_csharp_module_utils_name(path)
+
+ return sorted(self.csharp_module_utils_imports[name])
+
+ def get_integration_target_usage(self, path: str) -> list[str]:
+ """Return a list of paths which depend on the given path which is an integration target file."""
+ target_name = path.split('/')[3]
+ dependents = [os.path.join(data_context().content.integration_targets_path, target) + os.path.sep
+ for target in sorted(self.integration_dependencies.get(target_name, set()))]
+
+ return dependents
+
+ def classify(self, path: str) -> t.Optional[dict[str, str]]:
+ """Classify the given path and return an optional dictionary of the results."""
+ result = self._classify(path)
+
+ # run all tests when no result given
+ if result is None:
+ return None
+
+ # run sanity on path unless result specified otherwise
+ if path in self.sanity_paths and 'sanity' not in result:
+ result['sanity'] = path
+
+ return result
+
+ def _classify(self, path: str) -> t.Optional[dict[str, str]]:
+ """Return the classification for the given path."""
+ if data_context().content.is_ansible:
+ return self._classify_ansible(path)
+
+ if data_context().content.collection:
+ return self._classify_collection(path)
+
+ return None
+
+ def _classify_common(self, path: str) -> t.Optional[dict[str, str]]:
+ """Return the classification for the given path using rules common to all layouts."""
+ dirname = os.path.dirname(path)
+ filename = os.path.basename(path)
+ name, ext = os.path.splitext(filename)
+
+ minimal: dict[str, str] = {}
+
+ if os.path.sep not in path:
+ if filename in (
+ 'azure-pipelines.yml',
+ ):
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if is_subdir(path, '.azure-pipelines'):
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if is_subdir(path, '.github'):
+ return minimal
+
+ if is_subdir(path, data_context().content.integration_targets_path):
+ if not os.path.exists(path):
+ return minimal
+
+ target = self.integration_targets_by_name.get(path.split('/')[3])
+
+ if not target:
+ display.warning('Unexpected non-target found: %s' % path)
+ return minimal
+
+ if 'hidden/' in target.aliases:
+ return minimal # already expanded using get_dependent_paths
+
+ return {
+ 'integration': target.name if 'posix/' in target.aliases else None,
+ 'windows-integration': target.name if 'windows/' in target.aliases else None,
+ 'network-integration': target.name if 'network/' in target.aliases else None,
+ FOCUSED_TARGET: target.name,
+ }
+
+ if is_subdir(path, data_context().content.integration_path):
+ if dirname == data_context().content.integration_path:
+ for command in (
+ 'integration',
+ 'windows-integration',
+ 'network-integration',
+ ):
+ if name == command and ext == '.cfg':
+ return {
+ command: self.integration_all_target,
+ }
+
+ if name == command + '.requirements' and ext == '.txt':
+ return {
+ command: self.integration_all_target,
+ }
+
+ return {
+ 'integration': self.integration_all_target,
+ 'windows-integration': self.integration_all_target,
+ 'network-integration': self.integration_all_target,
+ }
+
+ if is_subdir(path, data_context().content.sanity_path):
+ return {
+ 'sanity': 'all', # test infrastructure, run all sanity checks
+ }
+
+ if is_subdir(path, data_context().content.unit_path):
+ if path in self.units_paths:
+ return {
+ 'units': path,
+ }
+
+ # changes to files which are not unit tests should trigger tests from the nearest parent directory
+
+ test_path = os.path.dirname(path)
+
+ while test_path:
+ if test_path + '/' in self.units_paths:
+ return {
+ 'units': test_path + '/',
+ }
+
+ test_path = os.path.dirname(test_path)
+
+ if is_subdir(path, data_context().content.module_path):
+ module_name = self.module_names_by_path.get(path)
+
+ if module_name:
+ return {
+ 'units': module_name if module_name in self.units_modules else None,
+ 'integration': self.posix_integration_by_module.get(module_name) if ext == '.py' else None,
+ 'windows-integration': self.windows_integration_by_module.get(module_name) if ext in ['.cs', '.ps1'] else None,
+ 'network-integration': self.network_integration_by_module.get(module_name),
+ FOCUSED_TARGET: module_name,
+ }
+
+ return minimal
+
+ if is_subdir(path, data_context().content.module_utils_path):
+ if ext == '.cs':
+ return minimal # already expanded using get_dependent_paths
+
+ if ext == '.psm1':
+ return minimal # already expanded using get_dependent_paths
+
+ if ext == '.py':
+ return minimal # already expanded using get_dependent_paths
+
+ if is_subdir(path, data_context().content.plugin_paths['action']):
+ if ext == '.py':
+ if name.startswith('net_'):
+ network_target = 'network/.*_%s' % name[4:]
+
+ if any(re.search(r'^%s$' % network_target, alias) for alias in self.integration_targets_by_alias):
+ return {
+ 'network-integration': network_target,
+ 'units': 'all',
+ }
+
+ return {
+ 'network-integration': self.integration_all_target,
+ 'units': 'all',
+ }
+
+ if self.prefixes.get(name) == 'network':
+ network_platform = name
+ elif name.endswith('_config') and self.prefixes.get(name[:-7]) == 'network':
+ network_platform = name[:-7]
+ elif name.endswith('_template') and self.prefixes.get(name[:-9]) == 'network':
+ network_platform = name[:-9]
+ else:
+ network_platform = None
+
+ if network_platform:
+ network_target = 'network/%s/' % network_platform
+
+ if network_target in self.integration_targets_by_alias:
+ return {
+ 'network-integration': network_target,
+ 'units': 'all',
+ }
+
+ display.warning('Integration tests for "%s" not found.' % network_target, unique=True)
+
+ return {
+ 'units': 'all',
+ }
+
+ if is_subdir(path, data_context().content.plugin_paths['connection']):
+ units_dir = os.path.join(data_context().content.unit_path, 'plugins', 'connection')
+ if name == '__init__':
+ return {
+ 'integration': self.integration_all_target,
+ 'windows-integration': self.integration_all_target,
+ 'network-integration': self.integration_all_target,
+ 'units': os.path.join(units_dir, ''),
+ }
+
+ units_path = os.path.join(units_dir, 'test_%s.py' % name)
+
+ if units_path not in self.units_paths:
+ units_path = None
+
+ integration_name = 'connection_%s' % name
+
+ if integration_name not in self.integration_targets_by_name:
+ integration_name = None
+
+ windows_integration_name = 'connection_windows_%s' % name
+
+ if windows_integration_name not in self.integration_targets_by_name:
+ windows_integration_name = None
+
+ # entire integration test commands depend on these connection plugins
+
+ if name in ['winrm', 'psrp']:
+ return {
+ 'windows-integration': self.integration_all_target,
+ 'units': units_path,
+ }
+
+ if name == 'local':
+ return {
+ 'integration': self.integration_all_target,
+ 'network-integration': self.integration_all_target,
+ 'units': units_path,
+ }
+
+ if name == 'network_cli':
+ return {
+ 'network-integration': self.integration_all_target,
+ 'units': units_path,
+ }
+
+ if name == 'paramiko_ssh':
+ return {
+ 'integration': integration_name,
+ 'network-integration': self.integration_all_target,
+ 'units': units_path,
+ }
+
+ # other connection plugins have isolated integration and unit tests
+
+ return {
+ 'integration': integration_name,
+ 'windows-integration': windows_integration_name,
+ 'units': units_path,
+ }
+
+ if is_subdir(path, data_context().content.plugin_paths['doc_fragments']):
+ return {
+ 'sanity': 'all',
+ }
+
+ if is_subdir(path, data_context().content.plugin_paths['inventory']):
+ if name == '__init__':
+ return all_tests(self.args) # broad impact, run all tests
+
+ # These inventory plugins are enabled by default (see INVENTORY_ENABLED).
+ # Without dedicated integration tests for these we must rely on the incidental coverage from other tests.
+ test_all = [
+ 'host_list',
+ 'script',
+ 'yaml',
+ 'ini',
+ 'auto',
+ ]
+
+ if name in test_all:
+ posix_integration_fallback = get_integration_all_target(self.args)
+ else:
+ posix_integration_fallback = None
+
+ target = self.integration_targets_by_name.get('inventory_%s' % name)
+ units_dir = os.path.join(data_context().content.unit_path, 'plugins', 'inventory')
+ units_path = os.path.join(units_dir, 'test_%s.py' % name)
+
+ if units_path not in self.units_paths:
+ units_path = None
+
+ return {
+ 'integration': target.name if target and 'posix/' in target.aliases else posix_integration_fallback,
+ 'windows-integration': target.name if target and 'windows/' in target.aliases else None,
+ 'network-integration': target.name if target and 'network/' in target.aliases else None,
+ 'units': units_path,
+ FOCUSED_TARGET: target.name if target else None,
+ }
+
+ if is_subdir(path, data_context().content.plugin_paths['filter']):
+ return self._simple_plugin_tests('filter', name)
+
+ if is_subdir(path, data_context().content.plugin_paths['lookup']):
+ return self._simple_plugin_tests('lookup', name)
+
+ if (is_subdir(path, data_context().content.plugin_paths['terminal']) or
+ is_subdir(path, data_context().content.plugin_paths['cliconf']) or
+ is_subdir(path, data_context().content.plugin_paths['netconf'])):
+ if ext == '.py':
+ if name in self.prefixes and self.prefixes[name] == 'network':
+ network_target = 'network/%s/' % name
+
+ if network_target in self.integration_targets_by_alias:
+ return {
+ 'network-integration': network_target,
+ 'units': 'all',
+ }
+
+ display.warning('Integration tests for "%s" not found.' % network_target, unique=True)
+
+ return {
+ 'units': 'all',
+ }
+
+ return {
+ 'network-integration': self.integration_all_target,
+ 'units': 'all',
+ }
+
+ if is_subdir(path, data_context().content.plugin_paths['test']):
+ return self._simple_plugin_tests('test', name)
+
+ return None
+
+ def _classify_collection(self, path: str) -> t.Optional[dict[str, str]]:
+ """Return the classification for the given path using rules specific to collections."""
+ result = self._classify_common(path)
+
+ if result is not None:
+ return result
+
+ filename = os.path.basename(path)
+ dummy, ext = os.path.splitext(filename)
+
+ minimal: dict[str, str] = {}
+
+ if path.startswith('changelogs/'):
+ return minimal
+
+ if path.startswith('docs/'):
+ return minimal
+
+ if '/' not in path:
+ if path in (
+ '.gitignore',
+ 'COPYING',
+ 'LICENSE',
+ 'Makefile',
+ ):
+ return minimal
+
+ if ext in (
+ '.in',
+ '.md',
+ '.rst',
+ '.toml',
+ '.txt',
+ ):
+ return minimal
+
+ return None
+
+ def _classify_ansible(self, path: str) -> t.Optional[dict[str, str]]:
+ """Return the classification for the given path using rules specific to Ansible."""
+ if path.startswith('test/units/compat/'):
+ return {
+ 'units': 'test/units/',
+ }
+
+ result = self._classify_common(path)
+
+ if result is not None:
+ return result
+
+ dirname = os.path.dirname(path)
+ filename = os.path.basename(path)
+ name, ext = os.path.splitext(filename)
+
+ minimal: dict[str, str] = {}
+
+ if path.startswith('bin/'):
+ return all_tests(self.args) # broad impact, run all tests
+
+ if path.startswith('changelogs/'):
+ return minimal
+
+ if path.startswith('docs/'):
+ return minimal
+
+ if path.startswith('examples/'):
+ if path == 'examples/scripts/ConfigureRemotingForAnsible.ps1':
+ return {
+ 'windows-integration': 'connection_winrm',
+ }
+
+ return minimal
+
+ if path.startswith('hacking/'):
+ return minimal
+
+ if path.startswith('lib/ansible/executor/powershell/'):
+ units_path = 'test/units/executor/powershell/'
+
+ if units_path not in self.units_paths:
+ units_path = None
+
+ return {
+ 'windows-integration': self.integration_all_target,
+ 'units': units_path,
+ }
+
+ if path.startswith('lib/ansible/'):
+ return all_tests(self.args) # broad impact, run all tests
+
+ if path.startswith('licenses/'):
+ return minimal
+
+ if path.startswith('packaging/'):
+ return minimal
+
+ if path.startswith('test/ansible_test/'):
+ return minimal # these tests are not invoked from ansible-test
+
+ if path.startswith('test/lib/ansible_test/config/'):
+ if name.startswith('cloud-config-'):
+ cloud_target = 'cloud/%s/' % name.split('-')[2].split('.')[0]
+
+ if cloud_target in self.integration_targets_by_alias:
+ return {
+ 'integration': cloud_target,
+ }
+
+ if path.startswith('test/lib/ansible_test/_data/completion/'):
+ if path == 'test/lib/ansible_test/_data/completion/docker.txt':
+ return all_tests(self.args, force=True) # force all tests due to risk of breaking changes in new test environment
+
+ if path.startswith('test/lib/ansible_test/_internal/commands/integration/cloud/'):
+ cloud_target = 'cloud/%s/' % name
+
+ if cloud_target in self.integration_targets_by_alias:
+ return {
+ 'integration': cloud_target,
+ }
+
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if path.startswith('test/lib/ansible_test/_internal/commands/sanity/'):
+ return {
+ 'sanity': 'all', # test infrastructure, run all sanity checks
+ 'integration': 'ansible-test/', # run ansible-test self tests
+ }
+
+ if path.startswith('test/lib/ansible_test/_internal/commands/units/'):
+ return {
+ 'units': 'all', # test infrastructure, run all unit tests
+ 'integration': 'ansible-test/', # run ansible-test self tests
+ }
+
+ if path.startswith('test/lib/ansible_test/_data/requirements/'):
+ if name in (
+ 'integration',
+ 'network-integration',
+ 'windows-integration',
+ ):
+ return {
+ name: self.integration_all_target,
+ }
+
+ if name in (
+ 'sanity',
+ 'units',
+ ):
+ return {
+ name: 'all',
+ }
+
+ if path.startswith('test/lib/ansible_test/_util/controller/sanity/') or path.startswith('test/lib/ansible_test/_util/target/sanity/'):
+ return {
+ 'sanity': 'all', # test infrastructure, run all sanity checks
+ 'integration': 'ansible-test/', # run ansible-test self tests
+ }
+
+ if path.startswith('test/lib/ansible_test/_util/target/pytest/'):
+ return {
+ 'units': 'all', # test infrastructure, run all unit tests
+ 'integration': 'ansible-test/', # run ansible-test self tests
+ }
+
+ if path.startswith('test/lib/'):
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if path.startswith('test/support/'):
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if path.startswith('test/utils/shippable/'):
+ if dirname == 'test/utils/shippable':
+ test_map = {
+ 'cloud.sh': 'integration:cloud/',
+ 'linux.sh': 'integration:all',
+ 'network.sh': 'network-integration:all',
+ 'remote.sh': 'integration:all',
+ 'sanity.sh': 'sanity:all',
+ 'units.sh': 'units:all',
+ 'windows.sh': 'windows-integration:all',
+ }
+
+ test_match = test_map.get(filename)
+
+ if test_match:
+ test_command, test_target = test_match.split(':')
+
+ return {
+ test_command: test_target,
+ }
+
+ cloud_target = 'cloud/%s/' % name
+
+ if cloud_target in self.integration_targets_by_alias:
+ return {
+ 'integration': cloud_target,
+ }
+
+ return all_tests(self.args) # test infrastructure, run all tests
+
+ if path.startswith('test/utils/'):
+ return minimal
+
+ if '/' not in path:
+ if path in (
+ '.gitattributes',
+ '.gitignore',
+ '.mailmap',
+ 'COPYING',
+ 'Makefile',
+ ):
+ return minimal
+
+ if path in (
+ 'setup.py',
+ ):
+ return all_tests(self.args) # broad impact, run all tests
+
+ if ext in (
+ '.in',
+ '.md',
+ '.rst',
+ '.toml',
+ '.txt',
+ ):
+ return minimal
+
+ return None # unknown, will result in fall-back to run all tests
+
+ def _simple_plugin_tests(self, plugin_type: str, plugin_name: str) -> dict[str, t.Optional[str]]:
+ """
+ Return tests for the given plugin type and plugin name.
+ This function is useful for plugin types which do not require special processing.
+ """
+ if plugin_name == '__init__':
+ return all_tests(self.args, True)
+
+ integration_target = self.integration_targets_by_name.get('%s_%s' % (plugin_type, plugin_name))
+
+ if integration_target:
+ integration_name = integration_target.name
+ else:
+ integration_name = None
+
+ units_path = os.path.join(data_context().content.unit_path, 'plugins', plugin_type, 'test_%s.py' % plugin_name)
+
+ if units_path not in self.units_paths:
+ units_path = None
+
+ return dict(
+ integration=integration_name,
+ units=units_path,
+ )
+
+
+def all_tests(args: TestConfig, force: bool = False) -> dict[str, str]:
+ """Return the targets for each test command when all tests should be run."""
+ if force:
+ integration_all_target = 'all'
+ else:
+ integration_all_target = get_integration_all_target(args)
+
+ return {
+ 'sanity': 'all',
+ 'units': 'all',
+ 'integration': integration_all_target,
+ 'windows-integration': integration_all_target,
+ 'network-integration': integration_all_target,
+ }
+
+
+def get_integration_all_target(args: TestConfig) -> str:
+ """Return the target to use when all tests should be run."""
+ if isinstance(args, IntegrationConfig):
+ return args.changed_all_target
+
+ return 'all'
diff --git a/test/lib/ansible_test/_internal/classification/common.py b/test/lib/ansible_test/_internal/classification/common.py
new file mode 100644
index 0000000..a999b6e
--- /dev/null
+++ b/test/lib/ansible_test/_internal/classification/common.py
@@ -0,0 +1,26 @@
+"""Common classification code used by multiple languages."""
+from __future__ import annotations
+
+import os
+
+from ..data import (
+ data_context,
+)
+
+
+def resolve_csharp_ps_util(import_name: str, path: str) -> str:
+ """Return the fully qualified name of the given import if possible, otherwise return the original import name."""
+ if data_context().content.is_ansible or not import_name.startswith('.'):
+ # We don't support relative paths for builtin utils, there's no point.
+ return import_name
+
+ packages = import_name.split('.')
+ module_packages = path.split(os.path.sep)
+
+ for package in packages:
+ if not module_packages or package:
+ break
+ del module_packages[-1]
+
+ return 'ansible_collections.%s%s' % (data_context().content.prefix,
+ '.'.join(module_packages + [p for p in packages if p]))
diff --git a/test/lib/ansible_test/_internal/classification/csharp.py b/test/lib/ansible_test/_internal/classification/csharp.py
new file mode 100644
index 0000000..edd4101
--- /dev/null
+++ b/test/lib/ansible_test/_internal/classification/csharp.py
@@ -0,0 +1,97 @@
+"""Analyze C# import statements."""
+from __future__ import annotations
+
+import os
+import re
+
+from ..io import (
+ open_text_file,
+)
+
+from ..util import (
+ display,
+)
+
+from .common import (
+ resolve_csharp_ps_util,
+)
+
+from ..data import (
+ data_context,
+)
+
+from ..target import (
+ TestTarget,
+)
+
+
+def get_csharp_module_utils_imports(powershell_targets: list[TestTarget], csharp_targets: list[TestTarget]) -> dict[str, set[str]]:
+ """Return a dictionary of module_utils names mapped to sets of powershell file paths."""
+ module_utils = enumerate_module_utils()
+
+ imports_by_target_path = {}
+
+ for target in powershell_targets:
+ imports_by_target_path[target.path] = extract_csharp_module_utils_imports(target.path, module_utils, False)
+
+ for target in csharp_targets:
+ imports_by_target_path[target.path] = extract_csharp_module_utils_imports(target.path, module_utils, True)
+
+ imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils}
+
+ for target_path, modules in imports_by_target_path.items():
+ for module_util in modules:
+ imports[module_util].add(target_path)
+
+ for module_util in sorted(imports):
+ if not imports[module_util]:
+ display.warning('No imports found which use the "%s" module_util.' % module_util)
+
+ return imports
+
+
+def get_csharp_module_utils_name(path: str) -> str:
+ """Return a namespace and name from the given module_utils path."""
+ base_path = data_context().content.module_utils_csharp_path
+
+ if data_context().content.collection:
+ prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils.'
+ else:
+ prefix = ''
+
+ name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')
+
+ return name
+
+
+def enumerate_module_utils() -> set[str]:
+ """Return a set of available module_utils imports."""
+ return set(get_csharp_module_utils_name(p)
+ for p in data_context().content.walk_files(data_context().content.module_utils_csharp_path)
+ if os.path.splitext(p)[1] == '.cs')
+
+
+def extract_csharp_module_utils_imports(path: str, module_utils: set[str], is_pure_csharp: bool) -> set[str]:
+ """Return a set of module_utils imports found in the specified source file."""
+ imports = set()
+ if is_pure_csharp:
+ pattern = re.compile(r'(?i)^using\s((?:Ansible|AnsibleCollections)\..+);$')
+ else:
+ pattern = re.compile(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+((?:Ansible|ansible.collections|\.)\..+)')
+
+ with open_text_file(path) as module_file:
+ for line_number, line in enumerate(module_file, 1):
+ match = re.search(pattern, line)
+
+ if not match:
+ continue
+
+ import_name = resolve_csharp_ps_util(match.group(1), path)
+
+ if import_name in module_utils:
+ imports.add(import_name)
+ elif data_context().content.is_ansible or \
+ import_name.startswith('ansible_collections.%s' % data_context().content.prefix):
+ display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name))
+
+ return imports
diff --git a/test/lib/ansible_test/_internal/classification/powershell.py b/test/lib/ansible_test/_internal/classification/powershell.py
new file mode 100644
index 0000000..29be6d4
--- /dev/null
+++ b/test/lib/ansible_test/_internal/classification/powershell.py
@@ -0,0 +1,98 @@
+"""Analyze powershell import statements."""
+from __future__ import annotations
+
+import os
+import re
+
+from ..io import (
+ read_text_file,
+)
+
+from ..util import (
+ display,
+)
+
+from .common import (
+ resolve_csharp_ps_util,
+)
+
+from ..data import (
+ data_context,
+)
+
+from ..target import (
+ TestTarget,
+)
+
+
+def get_powershell_module_utils_imports(powershell_targets: list[TestTarget]) -> dict[str, set[str]]:
+ """Return a dictionary of module_utils names mapped to sets of powershell file paths."""
+ module_utils = enumerate_module_utils()
+
+ imports_by_target_path = {}
+
+ for target in powershell_targets:
+ imports_by_target_path[target.path] = extract_powershell_module_utils_imports(target.path, module_utils)
+
+ imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils}
+
+ for target_path, modules in imports_by_target_path.items():
+ for module_util in modules:
+ imports[module_util].add(target_path)
+
+ for module_util in sorted(imports):
+ if not imports[module_util]:
+ display.warning('No imports found which use the "%s" module_util.' % module_util)
+
+ return imports
+
+
+def get_powershell_module_utils_name(path: str) -> str:
+ """Return a namespace and name from the given module_utils path."""
+ base_path = data_context().content.module_utils_powershell_path
+
+ if data_context().content.collection:
+ prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils.'
+ else:
+ prefix = ''
+
+ name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')
+
+ return name
+
+
+def enumerate_module_utils() -> set[str]:
+ """Return a set of available module_utils imports."""
+ return set(get_powershell_module_utils_name(p)
+ for p in data_context().content.walk_files(data_context().content.module_utils_powershell_path)
+ if os.path.splitext(p)[1] == '.psm1')
+
+
+def extract_powershell_module_utils_imports(path: str, module_utils: set[str]) -> set[str]:
+ """Return a set of module_utils imports found in the specified source file."""
+ imports = set()
+
+ code = read_text_file(path)
+
+ if data_context().content.is_ansible and '# POWERSHELL_COMMON' in code:
+ imports.add('Ansible.ModuleUtils.Legacy')
+
+ lines = code.splitlines()
+ line_number = 0
+
+ for line in lines:
+ line_number += 1
+ match = re.search(r'(?i)^#\s*(?:requires\s+-modules?|ansiblerequires\s+-powershell)\s*((?:Ansible|ansible_collections|\.)\..+)', line)
+
+ if not match:
+ continue
+
+ import_name = resolve_csharp_ps_util(match.group(1), path)
+
+ if import_name in module_utils:
+ imports.add(import_name)
+ elif data_context().content.is_ansible or \
+ import_name.startswith('ansible_collections.%s' % data_context().content.prefix):
+ display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name))
+
+ return imports
diff --git a/test/lib/ansible_test/_internal/classification/python.py b/test/lib/ansible_test/_internal/classification/python.py
new file mode 100644
index 0000000..77ffeac
--- /dev/null
+++ b/test/lib/ansible_test/_internal/classification/python.py
@@ -0,0 +1,341 @@
+"""Analyze python import statements."""
+from __future__ import annotations
+
+import ast
+import os
+import re
+import typing as t
+
+from ..io import (
+ read_binary_file,
+)
+
+from ..util import (
+ display,
+ ApplicationError,
+ is_subdir,
+)
+
+from ..data import (
+ data_context,
+)
+
+from ..target import (
+ TestTarget,
+)
+
+VIRTUAL_PACKAGES = {
+ 'ansible.module_utils.six',
+}
+
+
+def get_python_module_utils_imports(compile_targets: list[TestTarget]) -> dict[str, set[str]]:
+ """Return a dictionary of module_utils names mapped to sets of python file paths."""
+ module_utils = enumerate_module_utils()
+
+ virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES))
+ module_utils -= virtual_utils
+
+ imports_by_target_path = {}
+
+ for target in compile_targets:
+ imports_by_target_path[target.path] = extract_python_module_utils_imports(target.path, module_utils)
+
+ def recurse_import(import_name: str, depth: int = 0, seen: t.Optional[set[str]] = None) -> set[str]:
+ """Recursively expand module_utils imports from module_utils files."""
+ display.info('module_utils import: %s%s' % (' ' * depth, import_name), verbosity=4)
+
+ if seen is None:
+ seen = {import_name}
+
+ results = {import_name}
+
+ # virtual packages depend on the modules they contain instead of the reverse
+ if import_name in VIRTUAL_PACKAGES:
+ for sub_import in sorted(virtual_utils):
+ if sub_import.startswith('%s.' % import_name):
+ if sub_import in seen:
+ continue
+
+ seen.add(sub_import)
+
+ matches = sorted(recurse_import(sub_import, depth + 1, seen))
+
+ for result in matches:
+ results.add(result)
+
+ import_path = get_import_path(import_name)
+
+ if import_path not in imports_by_target_path:
+ import_path = get_import_path(import_name, package=True)
+
+ if import_path not in imports_by_target_path:
+ raise ApplicationError('Cannot determine path for module_utils import: %s' % import_name)
+
+ # process imports in reverse so the deepest imports come first
+ for name in sorted(imports_by_target_path[import_path], reverse=True):
+ if name in virtual_utils:
+ continue
+
+ if name in seen:
+ continue
+
+ seen.add(name)
+
+ matches = sorted(recurse_import(name, depth + 1, seen))
+
+ for result in matches:
+ results.add(result)
+
+ return results
+
+ for module_util in module_utils:
+ # recurse over module_utils imports while excluding self
+ module_util_imports = recurse_import(module_util)
+ module_util_imports.remove(module_util)
+
+ # add recursive imports to all path entries which import this module_util
+ for target_path, modules in imports_by_target_path.items():
+ if module_util in modules:
+ for module_util_import in sorted(module_util_imports):
+ if module_util_import not in modules:
+ display.info('%s inherits import %s via %s' % (target_path, module_util_import, module_util), verbosity=6)
+ modules.add(module_util_import)
+
+ imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils | virtual_utils}
+
+ for target_path, modules in imports_by_target_path.items():
+ for module_util in modules:
+ imports[module_util].add(target_path)
+
+ # for purposes of mapping module_utils to paths, treat imports of virtual utils the same as the parent package
+ for virtual_util in virtual_utils:
+ parent_package = '.'.join(virtual_util.split('.')[:-1])
+ imports[virtual_util] = imports[parent_package]
+ display.info('%s reports imports from parent package %s' % (virtual_util, parent_package), verbosity=6)
+
+ for module_util in sorted(imports):
+ if not imports[module_util]:
+ package_path = get_import_path(module_util, package=True)
+
+ if os.path.exists(package_path) and not os.path.getsize(package_path):
+ continue # ignore empty __init__.py files
+
+ display.warning('No imports found which use the "%s" module_util.' % module_util)
+
+ return imports
+
+
+def get_python_module_utils_name(path: str) -> str:
+ """Return a namespace and name from the given module_utils path."""
+ base_path = data_context().content.module_utils_path
+
+ if data_context().content.collection:
+ prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils'
+ else:
+ prefix = 'ansible.module_utils'
+
+ if path.endswith('/__init__.py'):
+ path = os.path.dirname(path)
+
+ if path == base_path:
+ name = prefix
+ else:
+ name = prefix + '.' + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')
+
+ return name
+
+
+def enumerate_module_utils() -> set[str]:
+ """Return a list of available module_utils imports."""
+ module_utils = []
+
+ for path in data_context().content.walk_files(data_context().content.module_utils_path):
+ ext = os.path.splitext(path)[1]
+
+ if ext != '.py':
+ continue
+
+ module_utils.append(get_python_module_utils_name(path))
+
+ return set(module_utils)
+
+
+def extract_python_module_utils_imports(path: str, module_utils: set[str]) -> set[str]:
+ """Return a list of module_utils imports found in the specified source file."""
+ # Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding.
+ # See: https://www.python.org/dev/peps/pep-0263
+ # Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised.
+ code = read_binary_file(path)
+
+ try:
+ tree = ast.parse(code)
+ except SyntaxError as ex:
+ # Treat this error as a warning so tests can be executed as best as possible.
+ # The compile test will detect and report this syntax error.
+ display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, ex.lineno, ex.msg))
+ return set()
+
+ finder = ModuleUtilFinder(path, module_utils)
+ finder.visit(tree)
+ return finder.imports
+
+
+def get_import_path(name: str, package: bool = False) -> str:
+ """Return a path from an import name."""
+ if package:
+ filename = os.path.join(name.replace('.', '/'), '__init__.py')
+ else:
+ filename = '%s.py' % name.replace('.', '/')
+
+ if name.startswith('ansible.module_utils.') or name == 'ansible.module_utils':
+ path = os.path.join('lib', filename)
+ elif data_context().content.collection and (
+ name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name) or
+ name == 'ansible_collections.%s.plugins.module_utils' % data_context().content.collection.full_name):
+ path = '/'.join(filename.split('/')[3:])
+ else:
+ raise Exception('Unexpected import name: %s' % name)
+
+ return path
+
+
+def path_to_module(path: str) -> str:
+ """Convert the given path to a module name."""
+ module = os.path.splitext(path)[0].replace(os.path.sep, '.')
+
+ if module.endswith('.__init__'):
+ module = module[:-9]
+
+ return module
+
+
+def relative_to_absolute(name: str, level: int, module: str, path: str, lineno: int) -> str:
+ """Convert a relative import to an absolute import."""
+ if level <= 0:
+ absolute_name = name
+ elif not module:
+ display.warning('Cannot resolve relative import "%s%s" in unknown module at %s:%d' % ('.' * level, name, path, lineno))
+ absolute_name = 'relative.nomodule'
+ else:
+ parts = module.split('.')
+
+ if level >= len(parts):
+ display.warning('Cannot resolve relative import "%s%s" above module "%s" at %s:%d' % ('.' * level, name, module, path, lineno))
+ absolute_name = 'relative.abovelevel'
+ else:
+ absolute_name = '.'.join(parts[:-level] + [name])
+
+ return absolute_name
+
+
+class ModuleUtilFinder(ast.NodeVisitor):
+ """AST visitor to find valid module_utils imports."""
+ def __init__(self, path: str, module_utils: set[str]) -> None:
+ self.path = path
+ self.module_utils = module_utils
+ self.imports: set[str] = set()
+
+ # implicitly import parent package
+
+ if path.endswith('/__init__.py'):
+ path = os.path.split(path)[0]
+
+ if path.startswith('lib/ansible/module_utils/'):
+ package = os.path.split(path)[0].replace('/', '.')[4:]
+
+ if package != 'ansible.module_utils' and package not in VIRTUAL_PACKAGES:
+ self.add_import(package, 0)
+
+ self.module = None
+
+ if data_context().content.is_ansible:
+ # Various parts of the Ansible source tree execute within different modules.
+ # To support import analysis, each file which uses relative imports must reside under a path defined here.
+ # The mapping is a tuple consisting of a path pattern to match and a replacement path.
+ # During analysis, any relative imports not covered here will result in warnings, which can be fixed by adding the appropriate entry.
+ path_map = (
+ ('^hacking/build_library/build_ansible/', 'build_ansible/'),
+ ('^lib/ansible/', 'ansible/'),
+ ('^test/lib/ansible_test/_util/controller/sanity/validate-modules/', 'validate_modules/'),
+ ('^test/units/', 'test/units/'),
+ ('^test/lib/ansible_test/_internal/', 'ansible_test/_internal/'),
+ ('^test/integration/targets/.*/ansible_collections/(?P<ns>[^/]*)/(?P<col>[^/]*)/', r'ansible_collections/\g<ns>/\g<col>/'),
+ ('^test/integration/targets/.*/library/', 'ansible/modules/'),
+ )
+
+ for pattern, replacement in path_map:
+ if re.search(pattern, self.path):
+ revised_path = re.sub(pattern, replacement, self.path)
+ self.module = path_to_module(revised_path)
+ break
+ else:
+ # This assumes that all files within the collection are executed by Ansible as part of the collection.
+ # While that will usually be true, there are exceptions which will result in this resolution being incorrect.
+ self.module = path_to_module(os.path.join(data_context().content.collection.directory, self.path))
+
+ # pylint: disable=locally-disabled, invalid-name
+ def visit_Import(self, node: ast.Import) -> None:
+ """Visit an import node."""
+ self.generic_visit(node)
+
+ # import ansible.module_utils.MODULE[.MODULE]
+ # import ansible_collections.{ns}.{col}.plugins.module_utils.module_utils.MODULE[.MODULE]
+ self.add_imports([alias.name for alias in node.names], node.lineno)
+
+ # pylint: disable=locally-disabled, invalid-name
+ def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
+ """Visit an import from node."""
+ self.generic_visit(node)
+
+ if not node.module:
+ return
+
+ module = relative_to_absolute(node.module, node.level, self.module, self.path, node.lineno)
+
+ if not module.startswith('ansible'):
+ return
+
+ # from ansible.module_utils import MODULE[, MODULE]
+ # from ansible.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
+ # from ansible_collections.{ns}.{col}.plugins.module_utils import MODULE[, MODULE]
+ # from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
+ self.add_imports(['%s.%s' % (module, alias.name) for alias in node.names], node.lineno)
+
+ def add_import(self, name: str, line_number: int) -> None:
+ """Record the specified import."""
+ import_name = name
+
+ while self.is_module_util_name(name):
+ if name in self.module_utils:
+ if name not in self.imports:
+ display.info('%s:%d imports module_utils: %s' % (self.path, line_number, name), verbosity=5)
+ self.imports.add(name)
+
+ return # duplicate imports are ignored
+
+ name = '.'.join(name.split('.')[:-1])
+
+ if is_subdir(self.path, data_context().content.test_path):
+ return # invalid imports in tests are ignored
+
+ # Treat this error as a warning so tests can be executed as best as possible.
+ # This error should be detected by unit or integration tests.
+ display.warning('%s:%d Invalid module_utils import: %s' % (self.path, line_number, import_name))
+
+ def add_imports(self, names: list[str], line_no: int) -> None:
+ """Add the given import names if they are module_utils imports."""
+ for name in names:
+ if self.is_module_util_name(name):
+ self.add_import(name, line_no)
+
+ @staticmethod
+ def is_module_util_name(name: str) -> bool:
+ """Return True if the given name is a module_util name for the content under test. External module_utils are ignored."""
+ if data_context().content.is_ansible and name.startswith('ansible.module_utils.'):
+ return True
+
+ if data_context().content.collection and name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name):
+ return True
+
+ return False