diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/classification')
5 files changed, 1462 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/classification/__init__.py b/test/lib/ansible_test/_internal/classification/__init__.py new file mode 100644 index 0000000..aacc2ca --- /dev/null +++ b/test/lib/ansible_test/_internal/classification/__init__.py @@ -0,0 +1,900 @@ +"""Classify changes in Ansible code.""" +from __future__ import annotations + +import collections +import os +import re +import time +import typing as t + +from ..target import ( + walk_module_targets, + walk_integration_targets, + walk_units_targets, + walk_compile_targets, + walk_sanity_targets, + load_integration_prefixes, + analyze_integration_target_dependencies, + IntegrationTarget, +) + +from ..util import ( + display, + is_subdir, +) + +from .python import ( + get_python_module_utils_imports, + get_python_module_utils_name, +) + +from .csharp import ( + get_csharp_module_utils_imports, + get_csharp_module_utils_name, +) + +from .powershell import ( + get_powershell_module_utils_imports, + get_powershell_module_utils_name, +) + +from ..config import ( + TestConfig, + IntegrationConfig, +) + +from ..metadata import ( + ChangeDescription, +) + +from ..data import ( + data_context, +) + +FOCUSED_TARGET = '__focused__' + + +def categorize_changes(args: TestConfig, paths: list[str], verbose_command: t.Optional[str] = None) -> ChangeDescription: + """Categorize the given list of changed paths and return a description of the changes.""" + mapper = PathMapper(args) + + commands: dict[str, set[str]] = { + 'sanity': set(), + 'units': set(), + 'integration': set(), + 'windows-integration': set(), + 'network-integration': set(), + } + + focused_commands = collections.defaultdict(set) + + deleted_paths: set[str] = set() + original_paths: set[str] = set() + additional_paths: set[str] = set() + no_integration_paths: set[str] = set() + + for path in paths: + if not os.path.exists(path): + deleted_paths.add(path) + continue + + original_paths.add(path) + + dependent_paths = 
mapper.get_dependent_paths(path) + + if not dependent_paths: + continue + + display.info('Expanded "%s" to %d dependent file(s):' % (path, len(dependent_paths)), verbosity=2) + + for dependent_path in dependent_paths: + display.info(dependent_path, verbosity=2) + additional_paths.add(dependent_path) + + additional_paths -= set(paths) # don't count changed paths as additional paths + + if additional_paths: + display.info('Expanded %d changed file(s) into %d additional dependent file(s).' % (len(paths), len(additional_paths))) + paths = sorted(set(paths) | additional_paths) + + display.info('Mapping %d changed file(s) to tests.' % len(paths)) + + none_count = 0 + + for path in paths: + tests = mapper.classify(path) + + if tests is None: + focused_target = False + + display.info('%s -> all' % path, verbosity=1) + tests = all_tests(args) # not categorized, run all tests + display.warning('Path not categorized: %s' % path) + else: + focused_target = bool(tests.pop(FOCUSED_TARGET, None)) and path in original_paths + + tests = dict((key, value) for key, value in tests.items() if value) + + if focused_target and not any('integration' in command for command in tests): + no_integration_paths.add(path) # path triggers no integration tests + + if verbose_command: + result = '%s: %s' % (verbose_command, tests.get(verbose_command) or 'none') + + # identify targeted integration tests (those which only target a single integration command) + if 'integration' in verbose_command and tests.get(verbose_command): + if not any('integration' in command for command in tests if command != verbose_command): + if focused_target: + result += ' (focused)' + + result += ' (targeted)' + else: + result = '%s' % tests + + if not tests.get(verbose_command): + # minimize excessive output from potentially thousands of files which do not trigger tests + none_count += 1 + verbosity = 2 + else: + verbosity = 1 + + if args.verbosity >= verbosity: + display.info('%s -> %s' % (path, result), verbosity=1) + 
+ for command, target in tests.items(): + commands[command].add(target) + + if focused_target: + focused_commands[command].add(target) + + if none_count > 0 and args.verbosity < 2: + display.notice('Omitted %d file(s) that triggered no tests.' % none_count) + + for command, targets in commands.items(): + targets.discard('none') + + if any(target == 'all' for target in targets): + commands[command] = {'all'} + + sorted_commands = dict((cmd, sorted(targets)) for cmd, targets in commands.items() if targets) + focused_commands = dict((cmd, sorted(targets)) for cmd, targets in focused_commands.items()) + + for command, targets in sorted_commands.items(): + if targets == ['all']: + sorted_commands[command] = [] # changes require testing all targets, do not filter targets + + changes = ChangeDescription() + changes.command = verbose_command + changes.changed_paths = sorted(original_paths) + changes.deleted_paths = sorted(deleted_paths) + changes.regular_command_targets = sorted_commands + changes.focused_command_targets = focused_commands + changes.no_integration_paths = sorted(no_integration_paths) + + return changes + + +class PathMapper: + """Map file paths to test commands and targets.""" + def __init__(self, args: TestConfig) -> None: + self.args = args + self.integration_all_target = get_integration_all_target(self.args) + + self.integration_targets = list(walk_integration_targets()) + self.module_targets = list(walk_module_targets()) + self.compile_targets = list(walk_compile_targets()) + self.units_targets = list(walk_units_targets()) + self.sanity_targets = list(walk_sanity_targets()) + self.powershell_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] in ('.ps1', '.psm1')] + self.csharp_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] == '.cs'] + + self.units_modules = set(target.module for target in self.units_targets if target.module) + self.units_paths = set(a for target in 
self.units_targets for a in target.aliases) + self.sanity_paths = set(target.path for target in self.sanity_targets) + + self.module_names_by_path = dict((target.path, target.module) for target in self.module_targets) + self.integration_targets_by_name = dict((target.name, target) for target in self.integration_targets) + self.integration_targets_by_alias = dict((a, target) for target in self.integration_targets for a in target.aliases) + + self.posix_integration_by_module = dict((m, target.name) for target in self.integration_targets + if 'posix/' in target.aliases for m in target.modules) + self.windows_integration_by_module = dict((m, target.name) for target in self.integration_targets + if 'windows/' in target.aliases for m in target.modules) + self.network_integration_by_module = dict((m, target.name) for target in self.integration_targets + if 'network/' in target.aliases for m in target.modules) + + self.prefixes = load_integration_prefixes() + self.integration_dependencies = analyze_integration_target_dependencies(self.integration_targets) + + self.python_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed + self.powershell_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed + self.csharp_module_utils_imports: dict[str, set[str]] = {} # populated on first use to reduce overhead when not needed + + self.paths_to_dependent_targets: dict[str, set[IntegrationTarget]] = {} + + for target in self.integration_targets: + for path in target.needs_file: + if path not in self.paths_to_dependent_targets: + self.paths_to_dependent_targets[path] = set() + + self.paths_to_dependent_targets[path].add(target) + + def get_dependent_paths(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path, recursively expanding dependent paths as well.""" + unprocessed_paths = set(self.get_dependent_paths_non_recursive(path)) + paths = set() + + 
while unprocessed_paths: + queued_paths = list(unprocessed_paths) + paths |= unprocessed_paths + unprocessed_paths = set() + + for queued_path in queued_paths: + new_paths = self.get_dependent_paths_non_recursive(queued_path) + + for new_path in new_paths: + if new_path not in paths: + unprocessed_paths.add(new_path) + + return sorted(paths) + + def get_dependent_paths_non_recursive(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path, including dependent integration test target paths.""" + paths = self.get_dependent_paths_internal(path) + paths += [target.path + '/' for target in self.paths_to_dependent_targets.get(path, set())] + paths = sorted(set(paths)) + + return paths + + def get_dependent_paths_internal(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path.""" + ext = os.path.splitext(os.path.split(path)[1])[1] + + if is_subdir(path, data_context().content.module_utils_path): + if ext == '.py': + return self.get_python_module_utils_usage(path) + + if ext == '.psm1': + return self.get_powershell_module_utils_usage(path) + + if ext == '.cs': + return self.get_csharp_module_utils_usage(path) + + if is_subdir(path, data_context().content.integration_targets_path): + return self.get_integration_target_usage(path) + + return [] + + def get_python_module_utils_usage(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path which is a Python module_utils file.""" + if not self.python_module_utils_imports: + display.info('Analyzing python module_utils imports...') + before = time.time() + self.python_module_utils_imports = get_python_module_utils_imports(self.compile_targets) + after = time.time() + display.info('Processed %d python module_utils in %d second(s).' 
% (len(self.python_module_utils_imports), after - before)) + + name = get_python_module_utils_name(path) + + return sorted(self.python_module_utils_imports[name]) + + def get_powershell_module_utils_usage(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path which is a PowerShell module_utils file.""" + if not self.powershell_module_utils_imports: + display.info('Analyzing powershell module_utils imports...') + before = time.time() + self.powershell_module_utils_imports = get_powershell_module_utils_imports(self.powershell_targets) + after = time.time() + display.info('Processed %d powershell module_utils in %d second(s).' % (len(self.powershell_module_utils_imports), after - before)) + + name = get_powershell_module_utils_name(path) + + return sorted(self.powershell_module_utils_imports[name]) + + def get_csharp_module_utils_usage(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path which is a C# module_utils file.""" + if not self.csharp_module_utils_imports: + display.info('Analyzing C# module_utils imports...') + before = time.time() + self.csharp_module_utils_imports = get_csharp_module_utils_imports(self.powershell_targets, self.csharp_targets) + after = time.time() + display.info('Processed %d C# module_utils in %d second(s).' 
% (len(self.csharp_module_utils_imports), after - before)) + + name = get_csharp_module_utils_name(path) + + return sorted(self.csharp_module_utils_imports[name]) + + def get_integration_target_usage(self, path: str) -> list[str]: + """Return a list of paths which depend on the given path which is an integration target file.""" + target_name = path.split('/')[3] + dependents = [os.path.join(data_context().content.integration_targets_path, target) + os.path.sep + for target in sorted(self.integration_dependencies.get(target_name, set()))] + + return dependents + + def classify(self, path: str) -> t.Optional[dict[str, str]]: + """Classify the given path and return an optional dictionary of the results.""" + result = self._classify(path) + + # run all tests when no result given + if result is None: + return None + + # run sanity on path unless result specified otherwise + if path in self.sanity_paths and 'sanity' not in result: + result['sanity'] = path + + return result + + def _classify(self, path: str) -> t.Optional[dict[str, str]]: + """Return the classification for the given path.""" + if data_context().content.is_ansible: + return self._classify_ansible(path) + + if data_context().content.collection: + return self._classify_collection(path) + + return None + + def _classify_common(self, path: str) -> t.Optional[dict[str, str]]: + """Return the classification for the given path using rules common to all layouts.""" + dirname = os.path.dirname(path) + filename = os.path.basename(path) + name, ext = os.path.splitext(filename) + + minimal: dict[str, str] = {} + + if os.path.sep not in path: + if filename in ( + 'azure-pipelines.yml', + ): + return all_tests(self.args) # test infrastructure, run all tests + + if is_subdir(path, '.azure-pipelines'): + return all_tests(self.args) # test infrastructure, run all tests + + if is_subdir(path, '.github'): + return minimal + + if is_subdir(path, data_context().content.integration_targets_path): + if not os.path.exists(path): 
+ return minimal + + target = self.integration_targets_by_name.get(path.split('/')[3]) + + if not target: + display.warning('Unexpected non-target found: %s' % path) + return minimal + + if 'hidden/' in target.aliases: + return minimal # already expanded using get_dependent_paths + + return { + 'integration': target.name if 'posix/' in target.aliases else None, + 'windows-integration': target.name if 'windows/' in target.aliases else None, + 'network-integration': target.name if 'network/' in target.aliases else None, + FOCUSED_TARGET: target.name, + } + + if is_subdir(path, data_context().content.integration_path): + if dirname == data_context().content.integration_path: + for command in ( + 'integration', + 'windows-integration', + 'network-integration', + ): + if name == command and ext == '.cfg': + return { + command: self.integration_all_target, + } + + if name == command + '.requirements' and ext == '.txt': + return { + command: self.integration_all_target, + } + + return { + 'integration': self.integration_all_target, + 'windows-integration': self.integration_all_target, + 'network-integration': self.integration_all_target, + } + + if is_subdir(path, data_context().content.sanity_path): + return { + 'sanity': 'all', # test infrastructure, run all sanity checks + } + + if is_subdir(path, data_context().content.unit_path): + if path in self.units_paths: + return { + 'units': path, + } + + # changes to files which are not unit tests should trigger tests from the nearest parent directory + + test_path = os.path.dirname(path) + + while test_path: + if test_path + '/' in self.units_paths: + return { + 'units': test_path + '/', + } + + test_path = os.path.dirname(test_path) + + if is_subdir(path, data_context().content.module_path): + module_name = self.module_names_by_path.get(path) + + if module_name: + return { + 'units': module_name if module_name in self.units_modules else None, + 'integration': self.posix_integration_by_module.get(module_name) if ext == '.py' 
else None, + 'windows-integration': self.windows_integration_by_module.get(module_name) if ext in ['.cs', '.ps1'] else None, + 'network-integration': self.network_integration_by_module.get(module_name), + FOCUSED_TARGET: module_name, + } + + return minimal + + if is_subdir(path, data_context().content.module_utils_path): + if ext == '.cs': + return minimal # already expanded using get_dependent_paths + + if ext == '.psm1': + return minimal # already expanded using get_dependent_paths + + if ext == '.py': + return minimal # already expanded using get_dependent_paths + + if is_subdir(path, data_context().content.plugin_paths['action']): + if ext == '.py': + if name.startswith('net_'): + network_target = 'network/.*_%s' % name[4:] + + if any(re.search(r'^%s$' % network_target, alias) for alias in self.integration_targets_by_alias): + return { + 'network-integration': network_target, + 'units': 'all', + } + + return { + 'network-integration': self.integration_all_target, + 'units': 'all', + } + + if self.prefixes.get(name) == 'network': + network_platform = name + elif name.endswith('_config') and self.prefixes.get(name[:-7]) == 'network': + network_platform = name[:-7] + elif name.endswith('_template') and self.prefixes.get(name[:-9]) == 'network': + network_platform = name[:-9] + else: + network_platform = None + + if network_platform: + network_target = 'network/%s/' % network_platform + + if network_target in self.integration_targets_by_alias: + return { + 'network-integration': network_target, + 'units': 'all', + } + + display.warning('Integration tests for "%s" not found.' 
% network_target, unique=True) + + return { + 'units': 'all', + } + + if is_subdir(path, data_context().content.plugin_paths['connection']): + units_dir = os.path.join(data_context().content.unit_path, 'plugins', 'connection') + if name == '__init__': + return { + 'integration': self.integration_all_target, + 'windows-integration': self.integration_all_target, + 'network-integration': self.integration_all_target, + 'units': os.path.join(units_dir, ''), + } + + units_path = os.path.join(units_dir, 'test_%s.py' % name) + + if units_path not in self.units_paths: + units_path = None + + integration_name = 'connection_%s' % name + + if integration_name not in self.integration_targets_by_name: + integration_name = None + + windows_integration_name = 'connection_windows_%s' % name + + if windows_integration_name not in self.integration_targets_by_name: + windows_integration_name = None + + # entire integration test commands depend on these connection plugins + + if name in ['winrm', 'psrp']: + return { + 'windows-integration': self.integration_all_target, + 'units': units_path, + } + + if name == 'local': + return { + 'integration': self.integration_all_target, + 'network-integration': self.integration_all_target, + 'units': units_path, + } + + if name == 'network_cli': + return { + 'network-integration': self.integration_all_target, + 'units': units_path, + } + + if name == 'paramiko_ssh': + return { + 'integration': integration_name, + 'network-integration': self.integration_all_target, + 'units': units_path, + } + + # other connection plugins have isolated integration and unit tests + + return { + 'integration': integration_name, + 'windows-integration': windows_integration_name, + 'units': units_path, + } + + if is_subdir(path, data_context().content.plugin_paths['doc_fragments']): + return { + 'sanity': 'all', + } + + if is_subdir(path, data_context().content.plugin_paths['inventory']): + if name == '__init__': + return all_tests(self.args) # broad impact, run all 
tests + + # These inventory plugins are enabled by default (see INVENTORY_ENABLED). + # Without dedicated integration tests for these we must rely on the incidental coverage from other tests. + test_all = [ + 'host_list', + 'script', + 'yaml', + 'ini', + 'auto', + ] + + if name in test_all: + posix_integration_fallback = get_integration_all_target(self.args) + else: + posix_integration_fallback = None + + target = self.integration_targets_by_name.get('inventory_%s' % name) + units_dir = os.path.join(data_context().content.unit_path, 'plugins', 'inventory') + units_path = os.path.join(units_dir, 'test_%s.py' % name) + + if units_path not in self.units_paths: + units_path = None + + return { + 'integration': target.name if target and 'posix/' in target.aliases else posix_integration_fallback, + 'windows-integration': target.name if target and 'windows/' in target.aliases else None, + 'network-integration': target.name if target and 'network/' in target.aliases else None, + 'units': units_path, + FOCUSED_TARGET: target.name if target else None, + } + + if is_subdir(path, data_context().content.plugin_paths['filter']): + return self._simple_plugin_tests('filter', name) + + if is_subdir(path, data_context().content.plugin_paths['lookup']): + return self._simple_plugin_tests('lookup', name) + + if (is_subdir(path, data_context().content.plugin_paths['terminal']) or + is_subdir(path, data_context().content.plugin_paths['cliconf']) or + is_subdir(path, data_context().content.plugin_paths['netconf'])): + if ext == '.py': + if name in self.prefixes and self.prefixes[name] == 'network': + network_target = 'network/%s/' % name + + if network_target in self.integration_targets_by_alias: + return { + 'network-integration': network_target, + 'units': 'all', + } + + display.warning('Integration tests for "%s" not found.' 
% network_target, unique=True) + + return { + 'units': 'all', + } + + return { + 'network-integration': self.integration_all_target, + 'units': 'all', + } + + if is_subdir(path, data_context().content.plugin_paths['test']): + return self._simple_plugin_tests('test', name) + + return None + + def _classify_collection(self, path: str) -> t.Optional[dict[str, str]]: + """Return the classification for the given path using rules specific to collections.""" + result = self._classify_common(path) + + if result is not None: + return result + + filename = os.path.basename(path) + dummy, ext = os.path.splitext(filename) + + minimal: dict[str, str] = {} + + if path.startswith('changelogs/'): + return minimal + + if path.startswith('docs/'): + return minimal + + if '/' not in path: + if path in ( + '.gitignore', + 'COPYING', + 'LICENSE', + 'Makefile', + ): + return minimal + + if ext in ( + '.in', + '.md', + '.rst', + '.toml', + '.txt', + ): + return minimal + + return None + + def _classify_ansible(self, path: str) -> t.Optional[dict[str, str]]: + """Return the classification for the given path using rules specific to Ansible.""" + if path.startswith('test/units/compat/'): + return { + 'units': 'test/units/', + } + + result = self._classify_common(path) + + if result is not None: + return result + + dirname = os.path.dirname(path) + filename = os.path.basename(path) + name, ext = os.path.splitext(filename) + + minimal: dict[str, str] = {} + + if path.startswith('bin/'): + return all_tests(self.args) # broad impact, run all tests + + if path.startswith('changelogs/'): + return minimal + + if path.startswith('docs/'): + return minimal + + if path.startswith('examples/'): + if path == 'examples/scripts/ConfigureRemotingForAnsible.ps1': + return { + 'windows-integration': 'connection_winrm', + } + + return minimal + + if path.startswith('hacking/'): + return minimal + + if path.startswith('lib/ansible/executor/powershell/'): + units_path = 'test/units/executor/powershell/' + + if 
units_path not in self.units_paths: + units_path = None + + return { + 'windows-integration': self.integration_all_target, + 'units': units_path, + } + + if path.startswith('lib/ansible/'): + return all_tests(self.args) # broad impact, run all tests + + if path.startswith('licenses/'): + return minimal + + if path.startswith('packaging/'): + return minimal + + if path.startswith('test/ansible_test/'): + return minimal # these tests are not invoked from ansible-test + + if path.startswith('test/lib/ansible_test/config/'): + if name.startswith('cloud-config-'): + cloud_target = 'cloud/%s/' % name.split('-')[2].split('.')[0] + + if cloud_target in self.integration_targets_by_alias: + return { + 'integration': cloud_target, + } + + if path.startswith('test/lib/ansible_test/_data/completion/'): + if path == 'test/lib/ansible_test/_data/completion/docker.txt': + return all_tests(self.args, force=True) # force all tests due to risk of breaking changes in new test environment + + if path.startswith('test/lib/ansible_test/_internal/commands/integration/cloud/'): + cloud_target = 'cloud/%s/' % name + + if cloud_target in self.integration_targets_by_alias: + return { + 'integration': cloud_target, + } + + return all_tests(self.args) # test infrastructure, run all tests + + if path.startswith('test/lib/ansible_test/_internal/commands/sanity/'): + return { + 'sanity': 'all', # test infrastructure, run all sanity checks + 'integration': 'ansible-test/', # run ansible-test self tests + } + + if path.startswith('test/lib/ansible_test/_internal/commands/units/'): + return { + 'units': 'all', # test infrastructure, run all unit tests + 'integration': 'ansible-test/', # run ansible-test self tests + } + + if path.startswith('test/lib/ansible_test/_data/requirements/'): + if name in ( + 'integration', + 'network-integration', + 'windows-integration', + ): + return { + name: self.integration_all_target, + } + + if name in ( + 'sanity', + 'units', + ): + return { + name: 'all', + } + + 
if path.startswith('test/lib/ansible_test/_util/controller/sanity/') or path.startswith('test/lib/ansible_test/_util/target/sanity/'): + return { + 'sanity': 'all', # test infrastructure, run all sanity checks + 'integration': 'ansible-test/', # run ansible-test self tests + } + + if path.startswith('test/lib/ansible_test/_util/target/pytest/'): + return { + 'units': 'all', # test infrastructure, run all unit tests + 'integration': 'ansible-test/', # run ansible-test self tests + } + + if path.startswith('test/lib/'): + return all_tests(self.args) # test infrastructure, run all tests + + if path.startswith('test/support/'): + return all_tests(self.args) # test infrastructure, run all tests + + if path.startswith('test/utils/shippable/'): + if dirname == 'test/utils/shippable': + test_map = { + 'cloud.sh': 'integration:cloud/', + 'linux.sh': 'integration:all', + 'network.sh': 'network-integration:all', + 'remote.sh': 'integration:all', + 'sanity.sh': 'sanity:all', + 'units.sh': 'units:all', + 'windows.sh': 'windows-integration:all', + } + + test_match = test_map.get(filename) + + if test_match: + test_command, test_target = test_match.split(':') + + return { + test_command: test_target, + } + + cloud_target = 'cloud/%s/' % name + + if cloud_target in self.integration_targets_by_alias: + return { + 'integration': cloud_target, + } + + return all_tests(self.args) # test infrastructure, run all tests + + if path.startswith('test/utils/'): + return minimal + + if '/' not in path: + if path in ( + '.gitattributes', + '.gitignore', + '.mailmap', + 'COPYING', + 'Makefile', + ): + return minimal + + if path in ( + 'setup.py', + ): + return all_tests(self.args) # broad impact, run all tests + + if ext in ( + '.in', + '.md', + '.rst', + '.toml', + '.txt', + ): + return minimal + + return None # unknown, will result in fall-back to run all tests + + def _simple_plugin_tests(self, plugin_type: str, plugin_name: str) -> dict[str, t.Optional[str]]: + """ + Return tests for the 
given plugin type and plugin name. + This function is useful for plugin types which do not require special processing. + """ + if plugin_name == '__init__': + return all_tests(self.args, True) + + integration_target = self.integration_targets_by_name.get('%s_%s' % (plugin_type, plugin_name)) + + if integration_target: + integration_name = integration_target.name + else: + integration_name = None + + units_path = os.path.join(data_context().content.unit_path, 'plugins', plugin_type, 'test_%s.py' % plugin_name) + + if units_path not in self.units_paths: + units_path = None + + return dict( + integration=integration_name, + units=units_path, + ) + + +def all_tests(args: TestConfig, force: bool = False) -> dict[str, str]: + """Return the targets for each test command when all tests should be run.""" + if force: + integration_all_target = 'all' + else: + integration_all_target = get_integration_all_target(args) + + return { + 'sanity': 'all', + 'units': 'all', + 'integration': integration_all_target, + 'windows-integration': integration_all_target, + 'network-integration': integration_all_target, + } + + +def get_integration_all_target(args: TestConfig) -> str: + """Return the target to use when all tests should be run.""" + if isinstance(args, IntegrationConfig): + return args.changed_all_target + + return 'all' diff --git a/test/lib/ansible_test/_internal/classification/common.py b/test/lib/ansible_test/_internal/classification/common.py new file mode 100644 index 0000000..a999b6e --- /dev/null +++ b/test/lib/ansible_test/_internal/classification/common.py @@ -0,0 +1,26 @@ +"""Common classification code used by multiple languages.""" +from __future__ import annotations + +import os + +from ..data import ( + data_context, +) + + +def resolve_csharp_ps_util(import_name: str, path: str) -> str: + """Return the fully qualified name of the given import if possible, otherwise return the original import name.""" + if data_context().content.is_ansible or not 
import_name.startswith('.'): + # We don't support relative paths for builtin utils, there's no point. + return import_name + + packages = import_name.split('.') + module_packages = path.split(os.path.sep) + + for package in packages: + if not module_packages or package: + break + del module_packages[-1] + + return 'ansible_collections.%s%s' % (data_context().content.prefix, + '.'.join(module_packages + [p for p in packages if p])) diff --git a/test/lib/ansible_test/_internal/classification/csharp.py b/test/lib/ansible_test/_internal/classification/csharp.py new file mode 100644 index 0000000..edd4101 --- /dev/null +++ b/test/lib/ansible_test/_internal/classification/csharp.py @@ -0,0 +1,97 @@ +"""Analyze C# import statements.""" +from __future__ import annotations + +import os +import re + +from ..io import ( + open_text_file, +) + +from ..util import ( + display, +) + +from .common import ( + resolve_csharp_ps_util, +) + +from ..data import ( + data_context, +) + +from ..target import ( + TestTarget, +) + + +def get_csharp_module_utils_imports(powershell_targets: list[TestTarget], csharp_targets: list[TestTarget]) -> dict[str, set[str]]: + """Return a dictionary of module_utils names mapped to sets of powershell file paths.""" + module_utils = enumerate_module_utils() + + imports_by_target_path = {} + + for target in powershell_targets: + imports_by_target_path[target.path] = extract_csharp_module_utils_imports(target.path, module_utils, False) + + for target in csharp_targets: + imports_by_target_path[target.path] = extract_csharp_module_utils_imports(target.path, module_utils, True) + + imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils} + + for target_path, modules in imports_by_target_path.items(): + for module_util in modules: + imports[module_util].add(target_path) + + for module_util in sorted(imports): + if not imports[module_util]: + display.warning('No imports found which use the "%s" module_util.' 
% module_util) + + return imports + + +def get_csharp_module_utils_name(path: str) -> str: + """Return a namespace and name from the given module_utils path.""" + base_path = data_context().content.module_utils_csharp_path + + if data_context().content.collection: + prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils.' + else: + prefix = '' + + name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.') + + return name + + +def enumerate_module_utils() -> set[str]: + """Return a set of available module_utils imports.""" + return set(get_csharp_module_utils_name(p) + for p in data_context().content.walk_files(data_context().content.module_utils_csharp_path) + if os.path.splitext(p)[1] == '.cs') + + +def extract_csharp_module_utils_imports(path: str, module_utils: set[str], is_pure_csharp: bool) -> set[str]: + """Return a set of module_utils imports found in the specified source file.""" + imports = set() + if is_pure_csharp: + pattern = re.compile(r'(?i)^using\s((?:Ansible|AnsibleCollections)\..+);$') + else: + pattern = re.compile(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+((?:Ansible|ansible.collections|\.)\..+)') + + with open_text_file(path) as module_file: + for line_number, line in enumerate(module_file, 1): + match = re.search(pattern, line) + + if not match: + continue + + import_name = resolve_csharp_ps_util(match.group(1), path) + + if import_name in module_utils: + imports.add(import_name) + elif data_context().content.is_ansible or \ + import_name.startswith('ansible_collections.%s' % data_context().content.prefix): + display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name)) + + return imports diff --git a/test/lib/ansible_test/_internal/classification/powershell.py b/test/lib/ansible_test/_internal/classification/powershell.py new file mode 100644 index 0000000..29be6d4 --- /dev/null +++ 
def get_powershell_module_utils_imports(powershell_targets: list[TestTarget]) -> dict[str, set[str]]:
    """Return a dictionary of module_utils names mapped to sets of powershell file paths."""
    module_utils = enumerate_module_utils()

    imports_by_target_path = {}

    for target in powershell_targets:
        imports_by_target_path[target.path] = extract_powershell_module_utils_imports(target.path, module_utils)

    # invert the per-file import sets into a module_util -> files mapping
    imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils}

    for target_path, modules in imports_by_target_path.items():
        for module_util in modules:
            imports[module_util].add(target_path)

    # warn about module_utils which are never imported, since they may be unused
    for module_util in sorted(imports):
        if not imports[module_util]:
            display.warning('No imports found which use the "%s" module_util.' % module_util)

    return imports


def get_powershell_module_utils_name(path: str) -> str:
    """Return a namespace and name from the given module_utils path."""
    base_path = data_context().content.module_utils_powershell_path

    if data_context().content.collection:
        prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils.'
    else:
        prefix = ''

    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')

    return name


def enumerate_module_utils() -> set[str]:
    """Return a set of available module_utils imports."""
    return set(get_powershell_module_utils_name(p)
               for p in data_context().content.walk_files(data_context().content.module_utils_powershell_path)
               if os.path.splitext(p)[1] == '.psm1')


def extract_powershell_module_utils_imports(path: str, module_utils: set[str]) -> set[str]:
    """Return a set of module_utils imports found in the specified source file."""
    imports = set()

    code = read_text_file(path)

    # Ansible's own modules may use the legacy marker comment instead of an explicit import.
    if data_context().content.is_ansible and '# POWERSHELL_COMMON' in code:
        imports.add('Ansible.ModuleUtils.Legacy')

    # Idiom fix: use enumerate() with a start of 1 instead of a manually incremented line counter.
    for line_number, line in enumerate(code.splitlines(), 1):
        match = re.search(r'(?i)^#\s*(?:requires\s+-modules?|ansiblerequires\s+-powershell)\s*((?:Ansible|ansible_collections|\.)\..+)', line)

        if not match:
            continue

        import_name = resolve_csharp_ps_util(match.group(1), path)

        if import_name in module_utils:
            imports.add(import_name)
        elif data_context().content.is_ansible or \
                import_name.startswith('ansible_collections.%s' % data_context().content.prefix):
            # only warn for imports that should resolve within the content under test
            display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name))

    return imports
# Packages which have no concrete source of their own; their contents are mapped to the parent package.
VIRTUAL_PACKAGES = {
    'ansible.module_utils.six',
}


def get_python_module_utils_imports(compile_targets: list[TestTarget]) -> dict[str, set[str]]:
    """Return a dictionary of module_utils names mapped to sets of python file paths."""
    module_utils = enumerate_module_utils()

    # separate out utils living under a virtual package; they are tracked via their parent package
    virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES))
    module_utils -= virtual_utils

    imports_by_target_path = {}

    for target in compile_targets:
        imports_by_target_path[target.path] = extract_python_module_utils_imports(target.path, module_utils)

    def recurse_import(import_name: str, depth: int = 0, seen: t.Optional[set[str]] = None) -> set[str]:
        """Recursively expand module_utils imports from module_utils files."""
        display.info('module_utils import: %s%s' % (' ' * depth, import_name), verbosity=4)

        # `seen` is shared across the whole recursion to prevent infinite loops on import cycles
        if seen is None:
            seen = {import_name}

        results = {import_name}

        # virtual packages depend on the modules they contain instead of the reverse
        if import_name in VIRTUAL_PACKAGES:
            for sub_import in sorted(virtual_utils):
                if sub_import.startswith('%s.' % import_name):
                    if sub_import in seen:
                        continue

                    seen.add(sub_import)

                    matches = sorted(recurse_import(sub_import, depth + 1, seen))

                    for result in matches:
                        results.add(result)

        # try resolving the import as a module first, then fall back to a package (__init__.py)
        import_path = get_import_path(import_name)

        if import_path not in imports_by_target_path:
            import_path = get_import_path(import_name, package=True)

        if import_path not in imports_by_target_path:
            raise ApplicationError('Cannot determine path for module_utils import: %s' % import_name)

        # process imports in reverse so the deepest imports come first
        for name in sorted(imports_by_target_path[import_path], reverse=True):
            if name in virtual_utils:
                continue

            if name in seen:
                continue

            seen.add(name)

            matches = sorted(recurse_import(name, depth + 1, seen))

            for result in matches:
                results.add(result)

        return results

    for module_util in module_utils:
        # recurse over module_utils imports while excluding self
        module_util_imports = recurse_import(module_util)
        module_util_imports.remove(module_util)

        # add recursive imports to all path entries which import this module_util
        for target_path, modules in imports_by_target_path.items():
            if module_util in modules:
                for module_util_import in sorted(module_util_imports):
                    if module_util_import not in modules:
                        display.info('%s inherits import %s via %s' % (target_path, module_util_import, module_util), verbosity=6)
                        modules.add(module_util_import)

    # invert the per-file import sets into a module_util -> files mapping
    imports: dict[str, set[str]] = {module_util: set() for module_util in module_utils | virtual_utils}

    for target_path, modules in imports_by_target_path.items():
        for module_util in modules:
            imports[module_util].add(target_path)

    # for purposes of mapping module_utils to paths, treat imports of virtual utils the same as the parent package
    for virtual_util in virtual_utils:
        parent_package = '.'.join(virtual_util.split('.')[:-1])
        imports[virtual_util] = imports[parent_package]
        display.info('%s reports imports from parent package %s' % (virtual_util, parent_package), verbosity=6)

    for module_util in sorted(imports):
        if not imports[module_util]:
            package_path = get_import_path(module_util, package=True)

            if os.path.exists(package_path) and not os.path.getsize(package_path):
                continue  # ignore empty __init__.py files

            display.warning('No imports found which use the "%s" module_util.' % module_util)

    return imports


def get_python_module_utils_name(path: str) -> str:
    """Return a namespace and name from the given module_utils path."""
    base_path = data_context().content.module_utils_path

    if data_context().content.collection:
        prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils'
    else:
        prefix = 'ansible.module_utils'

    # a package's __init__.py is named for the package directory itself
    if path.endswith('/__init__.py'):
        path = os.path.dirname(path)

    if path == base_path:
        name = prefix
    else:
        name = prefix + '.' + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')

    return name


def enumerate_module_utils() -> set[str]:
    """Return a set of available module_utils imports."""
    module_utils = []

    for path in data_context().content.walk_files(data_context().content.module_utils_path):
        ext = os.path.splitext(path)[1]

        if ext != '.py':
            continue

        module_utils.append(get_python_module_utils_name(path))

    return set(module_utils)
def extract_python_module_utils_imports(path: str, module_utils: set[str]) -> set[str]:
    """Return a set of module_utils imports found in the specified source file."""
    # Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding.
    # See: https://www.python.org/dev/peps/pep-0263
    # Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised.
    code = read_binary_file(path)

    try:
        tree = ast.parse(code)
    except SyntaxError as ex:
        # Treat this error as a warning so tests can be executed as best as possible.
        # The compile test will detect and report this syntax error.
        display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, ex.lineno, ex.msg))
        return set()

    finder = ModuleUtilFinder(path, module_utils)
    finder.visit(tree)
    return finder.imports


def get_import_path(name: str, package: bool = False) -> str:
    """Return the relative source path for the given import name.

    When `package` is True the name is treated as a package and resolved to its __init__.py.
    Raises ApplicationError for names outside the supported module_utils namespaces.
    """
    if package:
        filename = os.path.join(name.replace('.', '/'), '__init__.py')
    else:
        filename = '%s.py' % name.replace('.', '/')

    if name.startswith('ansible.module_utils.') or name == 'ansible.module_utils':
        path = os.path.join('lib', filename)
    elif data_context().content.collection and (
            name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name) or
            name == 'ansible_collections.%s.plugins.module_utils' % data_context().content.collection.full_name):
        # drop the leading "ansible_collections/{ns}/{col}/" segments to get a collection-relative path
        path = '/'.join(filename.split('/')[3:])
    else:
        # Consistency fix: use the application error type raised elsewhere in this module instead of a bare Exception.
        raise ApplicationError('Unexpected import name: %s' % name)

    return path


def path_to_module(path: str) -> str:
    """Convert the given path to a module name."""
    module = os.path.splitext(path)[0].replace(os.path.sep, '.')

    # a package's __init__ module is referred to by the package name itself
    if module.endswith('.__init__'):
        module = module[:-len('.__init__')]

    return module


def relative_to_absolute(name: str, level: int, module: str, path: str, lineno: int) -> str:
    """Convert a relative import to an absolute import.

    `level` is the number of leading dots in the relative import (0 for absolute imports).
    Unresolvable relative imports yield placeholder names so later matching simply fails.
    """
    if level <= 0:
        absolute_name = name
    elif not module:
        display.warning('Cannot resolve relative import "%s%s" in unknown module at %s:%d' % ('.' * level, name, path, lineno))
        absolute_name = 'relative.nomodule'
    else:
        parts = module.split('.')

        if level >= len(parts):
            display.warning('Cannot resolve relative import "%s%s" above module "%s" at %s:%d' % ('.' * level, name, module, path, lineno))
            absolute_name = 'relative.abovelevel'
        else:
            # strip `level` trailing components from the containing module, then append the imported name
            absolute_name = '.'.join(parts[:-level] + [name])

    return absolute_name
class ModuleUtilFinder(ast.NodeVisitor):
    """AST visitor to find valid module_utils imports."""
    def __init__(self, path: str, module_utils: set[str]) -> None:
        self.path = path
        self.module_utils = module_utils
        self.imports: set[str] = set()

        # implicitly import parent package

        if path.endswith('/__init__.py'):
            path = os.path.split(path)[0]

        if path.startswith('lib/ansible/module_utils/'):
            # strip the trailing filename, convert to dotted form, then drop the leading "lib." prefix
            package = os.path.split(path)[0].replace('/', '.')[4:]

            if package != 'ansible.module_utils' and package not in VIRTUAL_PACKAGES:
                self.add_import(package, 0)

        # the dotted module name this file executes as; None when it cannot be determined
        self.module = None

        if data_context().content.is_ansible:
            # Various parts of the Ansible source tree execute within different modules.
            # To support import analysis, each file which uses relative imports must reside under a path defined here.
            # The mapping is a tuple consisting of a path pattern to match and a replacement path.
            # During analysis, any relative imports not covered here will result in warnings, which can be fixed by adding the appropriate entry.
            path_map = (
                ('^hacking/build_library/build_ansible/', 'build_ansible/'),
                ('^lib/ansible/', 'ansible/'),
                ('^test/lib/ansible_test/_util/controller/sanity/validate-modules/', 'validate_modules/'),
                ('^test/units/', 'test/units/'),
                ('^test/lib/ansible_test/_internal/', 'ansible_test/_internal/'),
                ('^test/integration/targets/.*/ansible_collections/(?P<ns>[^/]*)/(?P<col>[^/]*)/', r'ansible_collections/\g<ns>/\g<col>/'),
                ('^test/integration/targets/.*/library/', 'ansible/modules/'),
            )

            # first matching pattern wins; self.module stays None when nothing matches
            for pattern, replacement in path_map:
                if re.search(pattern, self.path):
                    revised_path = re.sub(pattern, replacement, self.path)
                    self.module = path_to_module(revised_path)
                    break
        else:
            # This assumes that all files within the collection are executed by Ansible as part of the collection.
            # While that will usually be true, there are exceptions which will result in this resolution being incorrect.
            self.module = path_to_module(os.path.join(data_context().content.collection.directory, self.path))

    # pylint: disable=locally-disabled, invalid-name
    def visit_Import(self, node: ast.Import) -> None:
        """Visit an import node."""
        self.generic_visit(node)

        # import ansible.module_utils.MODULE[.MODULE]
        # import ansible_collections.{ns}.{col}.plugins.module_utils.module_utils.MODULE[.MODULE]
        self.add_imports([alias.name for alias in node.names], node.lineno)

    # pylint: disable=locally-disabled, invalid-name
    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Visit an import from node."""
        self.generic_visit(node)

        # node.module is None for bare relative imports such as "from . import x"
        if not node.module:
            return

        module = relative_to_absolute(node.module, node.level, self.module, self.path, node.lineno)

        if not module.startswith('ansible'):
            return

        # from ansible.module_utils import MODULE[, MODULE]
        # from ansible.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
        # from ansible_collections.{ns}.{col}.plugins.module_utils import MODULE[, MODULE]
        # from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
        self.add_imports(['%s.%s' % (module, alias.name) for alias in node.names], node.lineno)

    def add_import(self, name: str, line_number: int) -> None:
        """Record the specified import."""
        import_name = name

        # walk up the dotted name one component at a time until a known module_util is found,
        # so "pkg.mod.attr" style imports resolve to the "pkg.mod" module_util
        while self.is_module_util_name(name):
            if name in self.module_utils:
                if name not in self.imports:
                    display.info('%s:%d imports module_utils: %s' % (self.path, line_number, name), verbosity=5)
                    self.imports.add(name)

                return  # duplicate imports are ignored

            name = '.'.join(name.split('.')[:-1])

        if is_subdir(self.path, data_context().content.test_path):
            return  # invalid imports in tests are ignored

        # Treat this error as a warning so tests can be executed as best as possible.
        # This error should be detected by unit or integration tests.
        display.warning('%s:%d Invalid module_utils import: %s' % (self.path, line_number, import_name))

    def add_imports(self, names: list[str], line_no: int) -> None:
        """Add the given import names if they are module_utils imports."""
        for name in names:
            if self.is_module_util_name(name):
                self.add_import(name, line_no)

    @staticmethod
    def is_module_util_name(name: str) -> bool:
        """Return True if the given name is a module_util name for the content under test. External module_utils are ignored."""
        if data_context().content.is_ansible and name.startswith('ansible.module_utils.'):
            return True

        if data_context().content.collection and name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name):
            return True

        return False