diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands/sanity')
13 files changed, 2935 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/sanity/__init__.py b/test/lib/ansible_test/_internal/commands/sanity/__init__.py new file mode 100644 index 0000000..00b3031 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/__init__.py @@ -0,0 +1,1173 @@ +"""Execute Ansible sanity tests.""" +from __future__ import annotations + +import abc +import glob +import hashlib +import json +import os +import pathlib +import re +import collections +import collections.abc as c +import typing as t + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_ONLY_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...encoding import ( + to_bytes, +) + +from ...io import ( + read_json_file, + write_json_file, + write_text_file, +) + +from ...util import ( + ApplicationError, + SubprocessError, + display, + import_plugins, + load_plugins, + parse_to_list_of_dict, + ANSIBLE_TEST_CONTROLLER_ROOT, + ANSIBLE_TEST_TARGET_ROOT, + is_binary_file, + read_lines_without_comments, + is_subdir, + paths_to_dirs, + get_ansible_version, + str_to_version, + cache, + remove_tree, +) + +from ...util_common import ( + intercept_python, + handle_layout_messages, + yamlcheck, + create_result_directories, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...target import ( + walk_internal_targets, + walk_sanity_targets, + TestTarget, +) + +from ...executor import ( + get_changes_filter, + AllTargetsSkipped, + Delegate, +) + +from ...python_requirements import ( + PipInstall, + collect_requirements, + run_pip, +) + +from ...config import ( + SanityConfig, +) + +from ...test import ( + TestSuccess, + TestFailure, + TestSkipped, + TestMessage, + TestResult, + calculate_best_confidence, +) + +from ...data import ( + data_context, +) + +from ...content_config import ( + get_content_config, +) + +from ...host_configs import ( + DockerConfig, + PosixConfig, + PythonConfig, + VirtualPythonConfig, +) + +from ...host_profiles import ( + PosixProfile, +) + +from ...provisioning import ( + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...venv import ( + create_virtual_environment, +) + +COMMAND = 'sanity' +SANITY_ROOT = os.path.join(ANSIBLE_TEST_CONTROLLER_ROOT, 'sanity') +TARGET_SANITY_ROOT = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'sanity') + +# NOTE: must match ansible.constants.DOCUMENTABLE_PLUGINS, but with 'module' replaced by 'modules'! +DOCUMENTABLE_PLUGINS = ( + 'become', 'cache', 'callback', 'cliconf', 'connection', 'httpapi', 'inventory', 'lookup', 'netconf', 'modules', 'shell', 'strategy', 'vars' +) + +created_venvs: list[str] = [] + + +def command_sanity(args: SanityConfig) -> None: + """Run sanity tests.""" + create_result_directories(args) + + target_configs = t.cast(list[PosixConfig], args.targets) + target_versions: dict[str, PosixConfig] = {target.python.version: target for target in target_configs} + + handle_layout_messages(data_context().content.sanity_messages) + + changes = get_changes_filter(args) + require = args.require + changes + targets = SanityTargets.create(args.include, args.exclude, require) + + if not targets.include: + raise AllTargetsSkipped() + + tests = list(sanity_get_tests()) + + if args.test: + disabled = [] + tests = [target for target in tests if target.name in args.test] + else: + disabled = [target.name for target in tests if not target.enabled and not args.allow_disabled] + tests = [target for target in tests if target.enabled or args.allow_disabled] + + if args.skip_test: + tests = [target for target in tests if target.name not in args.skip_test] + + targets_use_pypi = any(isinstance(test, SanityMultipleVersion) and test.needs_pypi for test in tests) and not args.list_tests + host_state = prepare_profiles(args, targets_use_pypi=targets_use_pypi) # sanity + + get_content_config(args) # make sure content config has been parsed prior to delegation + + if args.delegate: + raise Delegate(host_state=host_state, require=changes, exclude=args.exclude) + + configure_pypi_proxy(args, host_state.controller_profile) # sanity + + if disabled: + display.warning('Skipping tests disabled by default without --allow-disabled: %s' % ', '.join(sorted(disabled))) + + target_profiles: dict[str, PosixProfile] = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)} + + total = 0 + failed = [] + + result: t.Optional[TestResult] + + for test in tests: + if args.list_tests: + print(test.name) # display goes to stderr, this should be on stdout + continue + + for version in SUPPORTED_PYTHON_VERSIONS: + options = '' + + if isinstance(test, SanityMultipleVersion): + if version not in target_versions and version not in args.host_settings.skipped_python_versions: + continue # version was not requested, skip it silently + else: + if version != args.controller_python.version: + continue # only multi-version sanity tests use target versions, the rest use the controller version + + if test.supported_python_versions and version not in test.supported_python_versions: + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} because it is unsupported.' \ + f' Supported Python versions: {", ".join(test.supported_python_versions)}' + else: + if isinstance(test, SanityCodeSmellTest): + settings = test.load_processor(args) + elif isinstance(test, SanityMultipleVersion): + settings = test.load_processor(args, version) + elif isinstance(test, SanitySingleVersion): + settings = test.load_processor(args) + elif isinstance(test, SanityVersionNeutral): + settings = test.load_processor(args) + else: + raise Exception('Unsupported test type: %s' % type(test)) + + all_targets = list(targets.targets) + + if test.all_targets: + usable_targets = list(targets.targets) + elif test.no_targets: + usable_targets = [] + else: + usable_targets = list(targets.include) + + all_targets = SanityTargets.filter_and_inject_targets(test, all_targets) + usable_targets = SanityTargets.filter_and_inject_targets(test, usable_targets) + + usable_targets = sorted(test.filter_targets_by_version(args, list(usable_targets), version)) + usable_targets = settings.filter_skipped_targets(usable_targets) + sanity_targets = SanityTargets(tuple(all_targets), tuple(usable_targets)) + + test_needed = bool(usable_targets or test.no_targets or args.prime_venvs) + result = None + + if test_needed and version in args.host_settings.skipped_python_versions: + # Deferred checking of Python availability. Done here since it is now known to be required for running the test. + # Earlier checking could cause a spurious warning to be generated for a collection which does not support the Python version. + # If the user specified a Python version, an error will be generated before reaching this point when the Python interpreter is not found. + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} because it could not be found.' + + if not result: + if isinstance(test, SanityMultipleVersion): + display.info(f'Running sanity test "{test.name}" on Python {version}') + else: + display.info(f'Running sanity test "{test.name}"') + + if test_needed and not result: + if isinstance(test, SanityMultipleVersion): + # multi-version sanity tests handle their own requirements (if any) and use the target python + test_profile = target_profiles[version] + result = test.test(args, sanity_targets, test_profile.python) + options = ' --python %s' % version + elif isinstance(test, SanitySingleVersion): + # single version sanity tests use the controller python + test_profile = host_state.controller_profile + virtualenv_python = create_sanity_virtualenv(args, test_profile.python, test.name) + + if virtualenv_python: + virtualenv_yaml = check_sanity_virtualenv_yaml(virtualenv_python) + + if test.require_libyaml and not virtualenv_yaml: + result = SanitySkipped(test.name) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} due to missing libyaml support in PyYAML.' + else: + if virtualenv_yaml is False: + display.warning(f'Sanity test "{test.name}" on Python {version} may be slow due to missing libyaml support in PyYAML.') + + if args.prime_venvs: + result = SanitySkipped(test.name) + else: + result = test.test(args, sanity_targets, virtualenv_python) + else: + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} due to missing virtual environment support.' + elif isinstance(test, SanityVersionNeutral): + if args.prime_venvs: + result = SanitySkipped(test.name) + else: + # version neutral sanity tests handle their own requirements (if any) + result = test.test(args, sanity_targets) + else: + raise Exception('Unsupported test type: %s' % type(test)) + elif result: + pass + else: + result = SanitySkipped(test.name, version) + + result.write(args) + + total += 1 + + if isinstance(result, SanityFailure): + failed.append(result.test + options) + + controller = args.controller + + if created_venvs and isinstance(controller, DockerConfig) and controller.name == 'default' and not args.prime_venvs: + names = ', '.join(created_venvs) + display.warning(f'There following sanity test virtual environments are out-of-date in the "default" container: {names}') + + if failed: + message = 'The %d sanity test(s) listed below (out of %d) failed. See error output above for details.\n%s' % ( + len(failed), total, '\n'.join(failed)) + + if args.failure_ok: + display.error(message) + else: + raise ApplicationError(message) + + +@cache +def collect_code_smell_tests() -> tuple[SanityTest, ...]: + """Return a tuple of available code smell sanity tests.""" + paths = glob.glob(os.path.join(SANITY_ROOT, 'code-smell', '*.py')) + + if data_context().content.is_ansible: + # include Ansible specific code-smell tests which are not configured to be skipped + ansible_code_smell_root = os.path.join(data_context().content.root, 'test', 'sanity', 'code-smell') + skip_tests = read_lines_without_comments(os.path.join(ansible_code_smell_root, 'skip.txt'), remove_blank_lines=True, optional=True) + paths.extend(path for path in glob.glob(os.path.join(ansible_code_smell_root, '*.py')) if os.path.basename(path) not in skip_tests) + + tests = tuple(SanityCodeSmellTest(p) for p in paths) + + return tests + + +class SanityIgnoreParser: + """Parser for the consolidated sanity test ignore file.""" + NO_CODE = '_' + + def __init__(self, args: SanityConfig) -> None: + if data_context().content.collection: + ansible_version = '%s.%s' % tuple(get_ansible_version().split('.')[:2]) + + ansible_label = 'Ansible %s' % ansible_version + file_name = 'ignore-%s.txt' % ansible_version + else: + ansible_label = 'Ansible' + file_name = 'ignore.txt' + + self.args = args + self.relative_path = os.path.join(data_context().content.sanity_path, file_name) + self.path = os.path.join(data_context().content.root, self.relative_path) + self.ignores: dict[str, dict[str, dict[str, int]]] = collections.defaultdict(lambda: collections.defaultdict(dict)) + self.skips: dict[str, dict[str, int]] = collections.defaultdict(lambda: collections.defaultdict(int)) + self.parse_errors: list[tuple[int, int, str]] = [] + self.file_not_found_errors: list[tuple[int, str]] = [] + + lines = read_lines_without_comments(self.path, optional=True) + targets = SanityTargets.get_targets() + paths = set(target.path for target in targets) + tests_by_name: dict[str, SanityTest] = {} + versioned_test_names: set[str] = set() + unversioned_test_names: dict[str, str] = {} + directories = paths_to_dirs(list(paths)) + paths_by_test: dict[str, set[str]] = {} + + display.info('Read %d sanity test ignore line(s) for %s from: %s' % (len(lines), ansible_label, self.relative_path), verbosity=1) + + for test in sanity_get_tests(): + test_targets = SanityTargets.filter_and_inject_targets(test, targets) + + if isinstance(test, SanityMultipleVersion): + versioned_test_names.add(test.name) + + for python_version in test.supported_python_versions: + test_name = '%s-%s' % (test.name, python_version) + + paths_by_test[test_name] = set(target.path for target in test.filter_targets_by_version(args, test_targets, python_version)) + tests_by_name[test_name] = test + else: + unversioned_test_names.update(dict(('%s-%s' % (test.name, python_version), test.name) for python_version in SUPPORTED_PYTHON_VERSIONS)) + + paths_by_test[test.name] = set(target.path for target in test.filter_targets_by_version(args, test_targets, '')) + tests_by_name[test.name] = test + + for line_no, line in enumerate(lines, start=1): + if not line: + self.parse_errors.append((line_no, 1, "Line cannot be empty or contain only a comment")) + continue + + parts = line.split(' ') + path = parts[0] + codes = parts[1:] + + if not path: + self.parse_errors.append((line_no, 1, "Line cannot start with a space")) + continue + + if path.endswith(os.path.sep): + if path not in directories: + self.file_not_found_errors.append((line_no, path)) + continue + else: + if path not in paths: + self.file_not_found_errors.append((line_no, path)) + continue + + if not codes: + self.parse_errors.append((line_no, len(path), "Error code required after path")) + continue + + code = codes[0] + + if not code: + self.parse_errors.append((line_no, len(path) + 1, "Error code after path cannot be empty")) + continue + + if len(codes) > 1: + self.parse_errors.append((line_no, len(path) + len(code) + 2, "Error code cannot contain spaces")) + continue + + parts = code.split('!') + code = parts[0] + commands = parts[1:] + + parts = code.split(':') + test_name = parts[0] + error_codes = parts[1:] + + test = tests_by_name.get(test_name) + + if not test: + unversioned_name = unversioned_test_names.get(test_name) + + if unversioned_name: + self.parse_errors.append((line_no, len(path) + len(unversioned_name) + 2, "Sanity test '%s' cannot use a Python version like '%s'" % ( + unversioned_name, test_name))) + elif test_name in versioned_test_names: + self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires a Python version like '%s-%s'" % ( + test_name, test_name, args.controller_python.version))) + else: + self.parse_errors.append((line_no, len(path) + 2, "Sanity test '%s' does not exist" % test_name)) + + continue + + if path.endswith(os.path.sep) and not test.include_directories: + self.parse_errors.append((line_no, 1, "Sanity test '%s' does not support directory paths" % test_name)) + continue + + if path not in paths_by_test[test_name] and not test.no_targets: + self.parse_errors.append((line_no, 1, "Sanity test '%s' does not test path '%s'" % (test_name, path))) + continue + + if commands and error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Error code cannot contain both '!' and ':' characters")) + continue + + if commands: + command = commands[0] + + if len(commands) > 1: + self.parse_errors.append((line_no, len(path) + len(test_name) + len(command) + 3, "Error code cannot contain multiple '!' characters")) + continue + + if command == 'skip': + if not test.can_skip: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' cannot be skipped" % test_name)) + continue + + existing_line_no = self.skips.get(test_name, {}).get(path) + + if existing_line_no: + self.parse_errors.append((line_no, 1, "Duplicate '%s' skip for path '%s' first found on line %d" % (test_name, path, existing_line_no))) + continue + + self.skips[test_name][path] = line_no + continue + + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Command '!%s' not recognized" % command)) + continue + + if not test.can_ignore: + self.parse_errors.append((line_no, len(path) + 1, "Sanity test '%s' cannot be ignored" % test_name)) + continue + + if test.error_code: + if not error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires an error code" % test_name)) + continue + + error_code = error_codes[0] + + if len(error_codes) > 1: + self.parse_errors.append((line_no, len(path) + len(test_name) + len(error_code) + 3, "Error code cannot contain multiple ':' characters")) + continue + + if error_code in test.optional_error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 3, "Optional error code '%s' cannot be ignored" % ( + error_code))) + continue + else: + if error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' does not support error codes" % test_name)) + continue + + error_code = self.NO_CODE + + existing = self.ignores.get(test_name, {}).get(path, {}).get(error_code) + + if existing: + if test.error_code: + self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for error code '%s' for path '%s' first found on line %d" % ( + test_name, error_code, path, existing))) + else: + self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for path '%s' first found on line %d" % ( + test_name, path, existing))) + + continue + + self.ignores[test_name][path][error_code] = line_no + + @staticmethod + def load(args: SanityConfig) -> SanityIgnoreParser: + """Return the current SanityIgnore instance, initializing it if needed.""" + try: + return SanityIgnoreParser.instance # type: ignore[attr-defined] + except AttributeError: + pass + + instance = SanityIgnoreParser(args) + + SanityIgnoreParser.instance = instance # type: ignore[attr-defined] + + return instance + + +class SanityIgnoreProcessor: + """Processor for sanity test ignores for a single run of one sanity test.""" + def __init__(self, + args: SanityConfig, + test: SanityTest, + python_version: t.Optional[str], + ) -> None: + name = test.name + code = test.error_code + + if python_version: + full_name = '%s-%s' % (name, python_version) + else: + full_name = name + + self.args = args + self.test = test + self.code = code + self.parser = SanityIgnoreParser.load(args) + self.ignore_entries = self.parser.ignores.get(full_name, {}) + self.skip_entries = self.parser.skips.get(full_name, {}) + self.used_line_numbers: set[int] = set() + + def filter_skipped_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given targets, with any skipped paths filtered out.""" + return sorted(target for target in targets if target.path not in self.skip_entries) + + def process_errors(self, errors: list[SanityMessage], paths: list[str]) -> list[SanityMessage]: + """Return the given errors filtered for ignores and with any settings related errors included.""" + errors = self.filter_messages(errors) + errors.extend(self.get_errors(paths)) + + errors = sorted(set(errors)) + + return errors + + def filter_messages(self, messages: list[SanityMessage]) -> list[SanityMessage]: + """Return a filtered list of the given messages using the entries that have been loaded.""" + filtered = [] + + for message in messages: + if message.code in self.test.optional_error_codes and not self.args.enable_optional_errors: + continue + + path_entry = self.ignore_entries.get(message.path) + + if path_entry: + code = message.code if self.code else SanityIgnoreParser.NO_CODE + line_no = path_entry.get(code) + + if line_no: + self.used_line_numbers.add(line_no) + continue + + filtered.append(message) + + return filtered + + def get_errors(self, paths: list[str]) -> list[SanityMessage]: + """Return error messages related to issues with the file.""" + messages: list[SanityMessage] = [] + + # unused errors + + unused: list[tuple[int, str, str]] = [] + + if self.test.no_targets or self.test.all_targets: + # tests which do not accept a target list, or which use all targets, always return all possible errors, so all ignores can be checked + targets = SanityTargets.get_targets() + test_targets = SanityTargets.filter_and_inject_targets(self.test, targets) + paths = [target.path for target in test_targets] + + for path in paths: + path_entry = self.ignore_entries.get(path) + + if not path_entry: + continue + + unused.extend((line_no, path, code) for code, line_no in path_entry.items() if line_no not in self.used_line_numbers) + + messages.extend(SanityMessage( + code=self.code, + message="Ignoring '%s' on '%s' is unnecessary" % (code, path) if self.code else "Ignoring '%s' is unnecessary" % path, + path=self.parser.relative_path, + line=line, + column=1, + confidence=calculate_best_confidence(((self.parser.path, line), (path, 0)), self.args.metadata) if self.args.metadata.changes else None, + ) for line, path, code in unused) + + return messages + + +class SanitySuccess(TestSuccess): + """Sanity test success.""" + def __init__(self, test: str, python_version: t.Optional[str] = None) -> None: + super().__init__(COMMAND, test, python_version) + + +class SanitySkipped(TestSkipped): + """Sanity test skipped.""" + def __init__(self, test: str, python_version: t.Optional[str] = None) -> None: + super().__init__(COMMAND, test, python_version) + + +class SanityFailure(TestFailure): + """Sanity test failure.""" + def __init__( + self, + test: str, + python_version: t.Optional[str] = None, + messages: t.Optional[c.Sequence[SanityMessage]] = None, + summary: t.Optional[str] = None, + ) -> None: + super().__init__(COMMAND, test, python_version, messages, summary) + + +class SanityMessage(TestMessage): + """Single sanity test message for one file.""" + + +class SanityTargets: + """Sanity test target information.""" + def __init__(self, targets: tuple[TestTarget, ...], include: tuple[TestTarget, ...]) -> None: + self.targets = targets + self.include = include + + @staticmethod + def create(include: list[str], exclude: list[str], require: list[str]) -> SanityTargets: + """Create a SanityTargets instance from the given include, exclude and require lists.""" + _targets = SanityTargets.get_targets() + _include = walk_internal_targets(_targets, include, exclude, require) + return SanityTargets(_targets, _include) + + @staticmethod + def filter_and_inject_targets(test: SanityTest, targets: c.Iterable[TestTarget]) -> list[TestTarget]: + """Filter and inject targets based on test requirements and the given target list.""" + test_targets = list(targets) + + if not test.include_symlinks: + # remove all symlinks unless supported by the test + test_targets = [target for target in test_targets if not target.symlink] + + if not test.include_directories or not test.include_symlinks: + # exclude symlinked directories unless supported by the test + test_targets = [target for target in test_targets if not target.path.endswith(os.path.sep)] + + if test.include_directories: + # include directories containing any of the included files + test_targets += tuple(TestTarget(path, None, None, '') for path in paths_to_dirs([target.path for target in test_targets])) + + if not test.include_symlinks: + # remove all directory symlinks unless supported by the test + test_targets = [target for target in test_targets if not target.symlink] + + return test_targets + + @staticmethod + def get_targets() -> tuple[TestTarget, ...]: + """Return a tuple of sanity test targets. Uses a cached version when available.""" + try: + return SanityTargets.get_targets.targets # type: ignore[attr-defined] + except AttributeError: + targets = tuple(sorted(walk_sanity_targets())) + + SanityTargets.get_targets.targets = targets # type: ignore[attr-defined] + + return targets + + +class SanityTest(metaclass=abc.ABCMeta): + """Sanity test base class.""" + ansible_only = False + + def __init__(self, name: t.Optional[str] = None) -> None: + if not name: + name = self.__class__.__name__ + name = re.sub(r'Test$', '', name) # drop Test suffix + name = re.sub(r'(.)([A-Z][a-z]+)', r'\1-\2', name).lower() # use dashes instead of capitalization + + self.name = name + self.enabled = True + + # Optional error codes represent errors which spontaneously occur without changes to the content under test, such as those based on the current date. + # Because these errors can be unpredictable they behave differently than normal error codes: + # * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors. + # * They cannot be ignored. This is done to maintain the integrity of the ignore system. + self.optional_error_codes: set[str] = set() + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return None + + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return True + + @property + def can_skip(self) -> bool: + """True if the test supports skip entries.""" + return not self.all_targets and not self.no_targets + + @property + def all_targets(self) -> bool: + """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return False + + @property + def include_directories(self) -> bool: + """True if the test targets should include directories.""" + return False + + @property + def include_symlinks(self) -> bool: + """True if the test targets should include symlinks.""" + return False + + @property + def py2_compat(self) -> bool: + """True if the test only applies to code that runs on Python 2.x.""" + return False + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return CONTROLLER_PYTHON_VERSIONS + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: # pylint: disable=unused-argument + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if self.no_targets: + return [] + + raise NotImplementedError('Sanity test "%s" must implement "filter_targets" or set "no_targets" to True.' % self.name) + + def filter_targets_by_version(self, args: SanityConfig, targets: list[TestTarget], python_version: str) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test, taking into account the Python version.""" + del python_version # python_version is not used here, but derived classes may make use of it + + targets = self.filter_targets(targets) + + if self.py2_compat: + # This sanity test is a Python 2.x compatibility test. + content_config = get_content_config(args) + + if content_config.py2_support: + # This collection supports Python 2.x. + # Filter targets to include only those that require support for remote-only Python versions. + targets = self.filter_remote_targets(targets) + else: + # This collection does not support Python 2.x. + # There are no targets to test. + targets = [] + + return targets + + @staticmethod + def filter_remote_targets(targets: list[TestTarget]) -> list[TestTarget]: + """Return a filtered list of the given targets, including only those that require support for remote-only Python versions.""" + targets = [target for target in targets if ( + is_subdir(target.path, data_context().content.module_path) or + is_subdir(target.path, data_context().content.module_utils_path) or + is_subdir(target.path, data_context().content.unit_module_path) or + is_subdir(target.path, data_context().content.unit_module_utils_path) or + # include modules/module_utils within integration test library directories + re.search('^%s/.*/library/' % re.escape(data_context().content.integration_targets_path), target.path) or + # special handling for content in ansible-core + (data_context().content.is_ansible and ( + # utility code that runs in target environments and requires support for remote-only Python versions + is_subdir(target.path, 'test/lib/ansible_test/_util/target/') or + # integration test support modules/module_utils continue to require support for remote-only Python versions + re.search('^test/support/integration/.*/(modules|module_utils)/', target.path) or + # collection loader requires support for remote-only Python versions + re.search('^lib/ansible/utils/collection_loader/', target.path) + )) + )] + + return targets + + +class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which should run on a single python version.""" + @property + def require_libyaml(self) -> bool: + """True if the test requires PyYAML to have libyaml support.""" + return False + + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + +class SanityCodeSmellTest(SanitySingleVersion): + """Sanity test script.""" + def __init__(self, path) -> None: + name = os.path.splitext(os.path.basename(path))[0] + config_path = os.path.splitext(path)[0] + '.json' + + super().__init__(name=name) + + self.path = path + self.config_path = config_path if os.path.exists(config_path) else None + self.config = None + + if self.config_path: + self.config = read_json_file(self.config_path) + + if self.config: + self.enabled = not self.config.get('disabled') + + self.output: t.Optional[str] = self.config.get('output') + self.extensions: list[str] = self.config.get('extensions') + self.prefixes: list[str] = self.config.get('prefixes') + self.files: list[str] = self.config.get('files') + self.text: t.Optional[bool] = self.config.get('text') + self.ignore_self: bool = self.config.get('ignore_self') + self.minimum_python_version: t.Optional[str] = self.config.get('minimum_python_version') + self.maximum_python_version: t.Optional[str] = self.config.get('maximum_python_version') + + self.__all_targets: bool = self.config.get('all_targets') + self.__no_targets: bool = self.config.get('no_targets') + self.__include_directories: bool = self.config.get('include_directories') + self.__include_symlinks: bool = self.config.get('include_symlinks') + self.__py2_compat: bool = self.config.get('py2_compat', False) + else: + self.output = None + self.extensions = [] + self.prefixes = [] + self.files = [] + self.text = None + self.ignore_self = False + self.minimum_python_version = None + self.maximum_python_version = None + + self.__all_targets = False + self.__no_targets = True + self.__include_directories = False + self.__include_symlinks = False + self.__py2_compat = False + + if self.no_targets: + mutually_exclusive = ( + 'extensions', + 'prefixes', + 'files', + 'text', + 'ignore_self', + 'all_targets', + 'include_directories', + 'include_symlinks', + ) + + problems = sorted(name for name in mutually_exclusive if getattr(self, name)) + + if problems: + raise ApplicationError('Sanity test "%s" option "no_targets" is mutually exclusive with options: %s' % (self.name, ', '.join(problems))) + + @property + def all_targets(self) -> bool: + """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" + return self.__all_targets + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return self.__no_targets + + @property + def include_directories(self) -> bool: + """True if the test targets should include directories.""" + return self.__include_directories + + @property + def include_symlinks(self) -> bool: + """True if the test targets should include symlinks.""" + return self.__include_symlinks + + @property + def py2_compat(self) -> bool: + """True if the test only applies to code that runs on Python 2.x.""" + return self.__py2_compat + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + versions = super().supported_python_versions + + if self.minimum_python_version: + versions = tuple(version for version in versions if str_to_version(version) >= str_to_version(self.minimum_python_version)) + + if self.maximum_python_version: + versions = tuple(version for version in versions if str_to_version(version) <= str_to_version(self.maximum_python_version)) + + return versions + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if self.no_targets: + return [] + + if self.text is not None: + if self.text: + targets = [target for target in targets if not is_binary_file(target.path)] + else: + targets = [target for target in targets if is_binary_file(target.path)] + + if self.extensions: + targets = [target for target in targets if os.path.splitext(target.path)[1] in self.extensions + or (is_subdir(target.path, 'bin') and '.py' in self.extensions)] + + if self.prefixes: + targets = [target for target in targets if any(target.path.startswith(pre) for pre in self.prefixes)] + + if self.files: + targets = [target for target in targets if os.path.basename(target.path) in self.files] + + if self.ignore_self and data_context().content.is_ansible: + relative_self_path = os.path.relpath(self.path, data_context().content.root) + targets = [target for target in targets if target.path != relative_self_path] + + return targets + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + cmd = [python.path, self.path] + + env = ansible_environment(args, color=False) + env.update(PYTHONUTF8='1') # force all code-smell sanity tests to run with Python UTF-8 Mode enabled + + pattern = None + data = None + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if self.config: + if self.output == 'path-line-column-message': + pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + elif self.output == 'path-message': + pattern = '^(?P<path>[^:]*): (?P<message>.*)$' + else: + raise ApplicationError('Unsupported output type: %s' % self.output) + + if not self.no_targets: + data = '\n'.join(paths) + + if data: + display.info(data, verbosity=4) + + try: + stdout, stderr = intercept_python(args, python, cmd, data=data, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if args.explain: + return SanitySuccess(self.name) + + if stdout and not stderr: + if pattern: + matches = parse_to_list_of_dict(pattern, stdout) + + messages = [SanityMessage( + message=m['message'], + path=m['path'], + line=int(m.get('line', 0)), + column=int(m.get('column', 0)), + ) for m in matches] + + messages = settings.process_errors(messages, paths) + + if not messages: + return SanitySuccess(self.name) + + return SanityFailure(self.name, messages=messages) + + if stderr or status: + summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + return SanityFailure(self.name, summary=summary) + + messages = settings.process_errors([], paths) + + if messages: + return SanityFailure(self.name, messages=messages) + + return SanitySuccess(self.name) + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + +class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which are idependent of the python version being used.""" + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return None + + +class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which should run on multiple python versions.""" + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig, python_version: str) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, python_version) + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return False + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return SUPPORTED_PYTHON_VERSIONS + + def filter_targets_by_version(self, args: SanityConfig, targets: list[TestTarget], python_version: str) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test, taking into account the Python version.""" + if not python_version: + raise Exception('python_version is required to filter multi-version tests') + + targets = super().filter_targets_by_version(args, targets, python_version) + + if python_version in REMOTE_ONLY_PYTHON_VERSIONS: + content_config = get_content_config(args) + + if python_version not in content_config.modules.python_versions: + # when a remote-only python version is not supported there are no paths to test + return [] + + # when a remote-only python version is supported, tests must be applied only to targets that support remote-only Python versions + targets = self.filter_remote_targets(targets) + + return targets + + +@cache +def sanity_get_tests() -> tuple[SanityTest, ...]: + """Return a tuple of the available sanity tests.""" + import_plugins('commands/sanity') + sanity_plugins: dict[str, t.Type[SanityTest]] = {} + load_plugins(SanityTest, sanity_plugins) + sanity_plugins.pop('sanity') # SanityCodeSmellTest + sanity_tests = tuple(plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only) + sanity_tests = tuple(sorted(sanity_tests + collect_code_smell_tests(), key=lambda k: k.name)) + return sanity_tests + + +def create_sanity_virtualenv( + args: SanityConfig, + python: PythonConfig, + name: str, + coverage: bool = False, + minimize: bool = False, +) -> t.Optional[VirtualPythonConfig]: + """Return an existing sanity virtual environment matching the requested parameters or create a new one.""" + commands = collect_requirements( # create_sanity_virtualenv() + python=python, + controller=True, + virtualenv=False, + command=None, + ansible=False, + cryptography=False, + coverage=coverage, + minimize=minimize, + sanity=name, + ) + + if commands: + label = f'sanity.{name}' + else: + label = 'sanity' # use a single virtualenv name for tests which have no requirements + + # The path to the virtual environment must be kept short to avoid the 127 character shebang length limit on Linux. + # If the limit is exceeded, generated entry point scripts from pip installed packages will fail with syntax errors. + virtualenv_install = json.dumps([command.serialize() for command in commands], indent=4) + virtualenv_hash = hashlib.sha256(to_bytes(virtualenv_install)).hexdigest()[:8] + virtualenv_cache = os.path.join(os.path.expanduser('~/.ansible/test/venv')) + virtualenv_path = os.path.join(virtualenv_cache, label, f'{python.version}', virtualenv_hash) + virtualenv_marker = os.path.join(virtualenv_path, 'marker.txt') + + meta_install = os.path.join(virtualenv_path, 'meta.install.json') + meta_yaml = os.path.join(virtualenv_path, 'meta.yaml.json') + + virtualenv_python = VirtualPythonConfig( + version=python.version, + path=os.path.join(virtualenv_path, 'bin', 'python'), + ) + + if not os.path.exists(virtualenv_marker): + # a virtualenv without a marker is assumed to have been partially created + remove_tree(virtualenv_path) + + if not create_virtual_environment(args, python, virtualenv_path): + return None + + run_pip(args, virtualenv_python, commands, None) # create_sanity_virtualenv() + + write_text_file(meta_install, virtualenv_install) + + # false positive: pylint: disable=no-member + if any(isinstance(command, PipInstall) and command.has_package('pyyaml') for command in commands): + virtualenv_yaml = yamlcheck(virtualenv_python) + else: + virtualenv_yaml = None + + write_json_file(meta_yaml, virtualenv_yaml) + + created_venvs.append(f'{label}-{python.version}') + + # touch the marker to keep track of when the virtualenv was last used + pathlib.Path(virtualenv_marker).touch() + + return virtualenv_python + + +def check_sanity_virtualenv_yaml(python: VirtualPythonConfig) -> t.Optional[bool]: + """Return True if PyYAML has libyaml support for the given sanity virtual environment, False if it does not and None if it was not found.""" + virtualenv_path = os.path.dirname(os.path.dirname(python.path)) + meta_yaml = os.path.join(virtualenv_path, 'meta.yaml.json') + virtualenv_yaml = read_json_file(meta_yaml) + + return virtualenv_yaml diff --git a/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py b/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py new file mode 100644 index 0000000..6815f88 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py @@ -0,0 +1,127 @@ +"""Sanity test for ansible-doc.""" +from __future__ import annotations + +import collections +import os +import re + +from . import ( + DOCUMENTABLE_PLUGINS, + SanitySingleVersion, + SanityFailure, + SanitySuccess, + SanityTargets, + SanityMessage, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...ansible_util import ( + ansible_environment, + intercept_python, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class AnsibleDocTest(SanitySingleVersion): + """Sanity test for ansible-doc.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + plugin_paths = [plugin_path for plugin_type, plugin_path in data_context().content.plugin_paths.items() if plugin_type in DOCUMENTABLE_PLUGINS] + + return [target for target in targets + if os.path.splitext(target.path)[1] == '.py' + and os.path.basename(target.path) != '__init__.py' + and any(is_subdir(target.path, path) for path in plugin_paths) + ] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + doc_targets: dict[str, list[str]] = collections.defaultdict(list) + + remap_types = dict( + modules='module', + ) + + for plugin_type, plugin_path in data_context().content.plugin_paths.items(): + plugin_type = remap_types.get(plugin_type, plugin_type) + + for plugin_file_path in [target.name for target in targets.include if is_subdir(target.path, plugin_path)]: + plugin_parts = os.path.relpath(plugin_file_path, plugin_path).split(os.path.sep) + plugin_name = os.path.splitext(plugin_parts[-1])[0] + + if plugin_name.startswith('_'): + plugin_name = plugin_name[1:] + + plugin_fqcn = data_context().content.prefix + '.'.join(plugin_parts[:-1] + [plugin_name]) + + doc_targets[plugin_type].append(plugin_fqcn) + + env = ansible_environment(args, color=False) + error_messages: list[SanityMessage] = [] + + for doc_type in sorted(doc_targets): + for format_option in [None, '--json']: + cmd = ['ansible-doc', '-t', doc_type] + if format_option is not None: + cmd.append(format_option) + cmd.extend(sorted(doc_targets[doc_type])) + + try: + stdout, stderr = intercept_python(args, python, cmd, env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if status: + summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr) + return SanityFailure(self.name, summary=summary) + + if stdout: + display.info(stdout.strip(), verbosity=3) + + if stderr: + # ignore removed module/plugin warnings + stderr = re.sub(r'\[WARNING]: [^ ]+ [^ ]+ has been removed\n', '', stderr).strip() + + if stderr: + summary = 'Output on stderr from ansible-doc is considered an error.\n\n%s' % SubprocessError(cmd, stderr=stderr) + return SanityFailure(self.name, summary=summary) + + if args.explain: + return SanitySuccess(self.name) + + error_messages = settings.process_errors(error_messages, paths) + + if error_messages: + return SanityFailure(self.name, messages=error_messages) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/compile.py b/test/lib/ansible_test/_internal/commands/sanity/compile.py new file mode 100644 index 0000000..4505338 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/compile.py @@ -0,0 +1,94 @@ +"""Sanity test for proper python syntax.""" +from __future__ import annotations + +import os + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SanitySkipped, + TARGET_SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + parse_to_list_of_dict, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, +) + + +class CompileTest(SanityMultipleVersion): + """Sanity test for proper python syntax.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + cmd = [python.path, os.path.join(TARGET_SANITY_ROOT, 'compile', 'compile.py')] + + data = '\n'.join(paths) + + display.info(data, verbosity=4) + + try: + stdout, stderr = run_command(args, cmd, data=data, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name, python_version=python.version) + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + + results = parse_to_list_of_dict(pattern, stdout) + + results = [SanityMessage( + message=r['message'], + path=r['path'].replace('./', ''), + line=int(r['line']), + column=int(r['column']), + ) for r in results] + + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) diff --git a/test/lib/ansible_test/_internal/commands/sanity/ignores.py b/test/lib/ansible_test/_internal/commands/sanity/ignores.py new file mode 100644 index 0000000..6d9837d --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/ignores.py @@ -0,0 +1,84 @@ +"""Sanity test for the sanity ignore file.""" +from __future__ import annotations + +import os + +from . import ( + SanityFailure, + SanityIgnoreParser, + SanityVersionNeutral, + SanitySuccess, + SanityMessage, + SanityTargets, +) + +from ...test import ( + calculate_confidence, + calculate_best_confidence, + TestResult, +) + +from ...config import ( + SanityConfig, +) + + +class IgnoresTest(SanityVersionNeutral): + """Sanity test for sanity test ignore entries.""" + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + sanity_ignore = SanityIgnoreParser.load(args) + + messages: list[SanityMessage] = [] + + # parse errors + + messages.extend(SanityMessage( + message=message, + path=sanity_ignore.relative_path, + line=line, + column=column, + confidence=calculate_confidence(sanity_ignore.path, line, args.metadata) if args.metadata.changes else None, + ) for line, column, message in sanity_ignore.parse_errors) + + # file not found errors + + messages.extend(SanityMessage( + message="%s '%s' does not exist" % ("Directory" if path.endswith(os.path.sep) else "File", path), + path=sanity_ignore.relative_path, + line=line, + column=1, + confidence=calculate_best_confidence(((sanity_ignore.path, line), (path, 0)), args.metadata) if args.metadata.changes else None, + ) for line, path in sanity_ignore.file_not_found_errors) + + # conflicting ignores and skips + + for test_name, ignores in sanity_ignore.ignores.items(): + for ignore_path, ignore_entry in ignores.items(): + skip_line_no = sanity_ignore.skips.get(test_name, {}).get(ignore_path) + + if not skip_line_no: + continue + + for ignore_line_no in ignore_entry.values(): + messages.append(SanityMessage( + message="Ignoring '%s' is unnecessary due to skip entry on line %d" % (ignore_path, skip_line_no), + path=sanity_ignore.relative_path, + line=ignore_line_no, + column=1, + confidence=calculate_confidence(sanity_ignore.path, ignore_line_no, args.metadata) if args.metadata.changes else None, + )) + + if messages: + return SanityFailure(self.name, messages=messages) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/import.py b/test/lib/ansible_test/_internal/commands/sanity/import.py new file mode 100644 index 0000000..8511d7a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/import.py @@ -0,0 +1,217 @@ +"""Sanity test for proper import exception handling.""" +from __future__ import annotations + +import collections.abc as c +import os + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + TARGET_SANITY_ROOT, + SanityTargets, + create_sanity_virtualenv, + check_sanity_virtualenv_yaml, +) + +from ...constants import ( + CONTROLLER_MIN_PYTHON_VERSION, + REMOTE_ONLY_PYTHON_VERSIONS, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + cache, + SubprocessError, + display, + parse_to_list_of_dict, + is_subdir, + ANSIBLE_TEST_TOOLS_ROOT, +) + +from ...util_common import ( + ResultType, + create_temp_dir, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...python_requirements import ( + PipUnavailableError, + install_requirements, +) + +from ...config import ( + SanityConfig, +) + +from ...coverage_util import ( + cover_python, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + +from ...venv import ( + get_virtualenv_version, +) + + +def _get_module_test(module_restrictions: bool) -> c.Callable[[str], bool]: + """Create a predicate which tests whether a path can be used by modules or not.""" + module_path = data_context().content.module_path + module_utils_path = data_context().content.module_utils_path + if module_restrictions: + return lambda path: is_subdir(path, module_path) or is_subdir(path, module_utils_path) + return lambda path: not (is_subdir(path, module_path) or is_subdir(path, module_utils_path)) + + +class ImportTest(SanityMultipleVersion): + """Sanity test for proper import exception handling.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if data_context().content.is_ansible: + # all of ansible-core must pass the import test, not just plugins/modules + # modules/module_utils will be tested using the module context + # everything else will be tested using the plugin context + paths = ['lib/ansible'] + else: + # only plugins/modules must pass the import test for collections + paths = list(data_context().content.plugin_paths.values()) + + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' and + any(is_subdir(target.path, path) for path in paths)] + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + if python.version.startswith('2.') and (get_virtualenv_version(args, python.path) or (0,)) < (13,): + # hack to make sure that virtualenv is available under Python 2.x + # on Python 3.x we can use the built-in venv + # version 13+ is required to use the `--no-wheel` option + try: + install_requirements(args, python, virtualenv=True, controller=False) # sanity (import) + except PipUnavailableError as ex: + display.warning(str(ex)) + + temp_root = os.path.join(ResultType.TMP.path, 'sanity', 'import') + + messages = [] + + for import_type, test in ( + ('module', _get_module_test(True)), + ('plugin', _get_module_test(False)), + ): + if import_type == 'plugin' and python.version in REMOTE_ONLY_PYTHON_VERSIONS: + continue + + data = '\n'.join([path for path in paths if test(path)]) + + if not data and not args.prime_venvs: + continue + + virtualenv_python = create_sanity_virtualenv(args, python, f'{self.name}.{import_type}', coverage=args.coverage, minimize=True) + + if not virtualenv_python: + display.warning(f'Skipping sanity test "{self.name}" on Python {python.version} due to missing virtual environment support.') + return SanitySkipped(self.name, python.version) + + virtualenv_yaml = check_sanity_virtualenv_yaml(virtualenv_python) + + if virtualenv_yaml is False: + display.warning(f'Sanity test "{self.name}" ({import_type}) on Python {python.version} may be slow due to missing libyaml support in PyYAML.') + + env = ansible_environment(args, color=False) + + env.update( + SANITY_TEMP_PATH=ResultType.TMP.path, + SANITY_IMPORTER_TYPE=import_type, + ) + + if data_context().content.collection: + external_python = create_sanity_virtualenv(args, args.controller_python, self.name) + + env.update( + SANITY_COLLECTION_FULL_NAME=data_context().content.collection.full_name, + SANITY_EXTERNAL_PYTHON=external_python.path, + SANITY_YAML_TO_JSON=os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'yaml_to_json.py'), + ANSIBLE_CONTROLLER_MIN_PYTHON_VERSION=CONTROLLER_MIN_PYTHON_VERSION, + PYTHONPATH=':'.join((get_ansible_test_python_path(), env["PYTHONPATH"])), + ) + + if args.prime_venvs: + continue + + display.info(import_type + ': ' + data, verbosity=4) + + cmd = ['importer.py'] + + # add the importer to the path so it can be accessed through the coverage injector + env.update( + PATH=os.pathsep.join([os.path.join(TARGET_SANITY_ROOT, 'import'), env['PATH']]), + ) + + try: + stdout, stderr = cover_python(args, virtualenv_python, cmd, self.name, env, capture=True, data=data) + + if stdout or stderr: + raise SubprocessError(cmd, stdout=stdout, stderr=stderr) + except SubprocessError as ex: + if ex.status != 10 or ex.stderr or not ex.stdout: + raise + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + + parsed = parse_to_list_of_dict(pattern, ex.stdout) + + relative_temp_root = os.path.relpath(temp_root, data_context().content.root) + os.path.sep + + messages += [SanityMessage( + message=r['message'], + path=os.path.relpath(r['path'], relative_temp_root) if r['path'].startswith(relative_temp_root) else r['path'], + line=int(r['line']), + column=int(r['column']), + ) for r in parsed] + + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + results = settings.process_errors(messages, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) + + +@cache +def get_ansible_test_python_path() -> str: + """ + Return a directory usable for PYTHONPATH, containing only the ansible-test collection loader. + The temporary directory created will be cached for the lifetime of the process and cleaned up at exit. + """ + python_path = create_temp_dir(prefix='ansible-test-') + return python_path diff --git a/test/lib/ansible_test/_internal/commands/sanity/mypy.py b/test/lib/ansible_test/_internal/commands/sanity/mypy.py new file mode 100644 index 0000000..cb8ed12 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/mypy.py @@ -0,0 +1,259 @@ +"""Sanity test which executes mypy.""" +from __future__ import annotations + +import dataclasses +import os +import re +import typing as t + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + create_sanity_virtualenv, +) + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_ONLY_PYTHON_VERSIONS, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + parse_to_list_of_dict, + ANSIBLE_TEST_CONTROLLER_ROOT, + ApplicationError, + is_subdir, +) + +from ...util_common import ( + intercept_python, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, + VirtualPythonConfig, +) + + +class MypyTest(SanityMultipleVersion): + """Sanity test which executes mypy.""" + ansible_only = True + + vendored_paths = ( + 'lib/ansible/module_utils/six/__init__.py', + 'lib/ansible/module_utils/distro/_distro.py', + 'lib/ansible/module_utils/compat/_selectors2.py', + ) + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' and target.path not in self.vendored_paths and ( + target.path.startswith('lib/ansible/') or target.path.startswith('test/lib/ansible_test/_internal/') + or target.path.startswith('test/lib/ansible_test/_util/target/sanity/import/'))] + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + virtualenv_python = create_sanity_virtualenv(args, args.controller_python, self.name) + + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + if not virtualenv_python: + display.warning(f'Skipping sanity test "{self.name}" due to missing virtual environment support on Python {args.controller_python.version}.') + return SanitySkipped(self.name, python.version) + + controller_python_versions = CONTROLLER_PYTHON_VERSIONS + remote_only_python_versions = REMOTE_ONLY_PYTHON_VERSIONS + + contexts = ( + MyPyContext('ansible-test', ['test/lib/ansible_test/_util/target/sanity/import/'], controller_python_versions), + MyPyContext('ansible-test', ['test/lib/ansible_test/_internal/'], controller_python_versions), + MyPyContext('ansible-core', ['lib/ansible/'], controller_python_versions), + MyPyContext('modules', ['lib/ansible/modules/', 'lib/ansible/module_utils/'], remote_only_python_versions), + ) + + unfiltered_messages: list[SanityMessage] = [] + + for context in contexts: + if python.version not in context.python_versions: + continue + + unfiltered_messages.extend(self.test_context(args, virtualenv_python, python, context, paths)) + + notices = [] + messages = [] + + for message in unfiltered_messages: + if message.level != 'error': + notices.append(message) + continue + + match = re.search(r'^(?P<message>.*) {2}\[(?P<code>.*)]$', message.message) + + messages.append(SanityMessage( + message=match.group('message'), + path=message.path, + line=message.line, + column=message.column, + level=message.level, + code=match.group('code'), + )) + + for notice in notices: + display.info(notice.format(), verbosity=3) + + # The following error codes from mypy indicate that results are incomplete. + # That prevents the test from completing successfully, just as if mypy were to traceback or generate unexpected output. + fatal_error_codes = { + 'import', + 'syntax', + } + + fatal_errors = [message for message in messages if message.code in fatal_error_codes] + + if fatal_errors: + error_message = '\n'.join(error.format() for error in fatal_errors) + raise ApplicationError(f'Encountered {len(fatal_errors)} fatal errors reported by mypy:\n{error_message}') + + paths_set = set(paths) + + # Only report messages for paths that were specified as targets. + # Imports in our code are followed by mypy in order to perform its analysis, which is important for accurate results. + # However, it will also report issues on those files, which is not the desired behavior. + messages = [message for message in messages if message.path in paths_set] + + results = settings.process_errors(messages, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) + + @staticmethod + def test_context( + args: SanityConfig, + virtualenv_python: VirtualPythonConfig, + python: PythonConfig, + context: MyPyContext, + paths: list[str], + ) -> list[SanityMessage]: + """Run mypy tests for the specified context.""" + context_paths = [path for path in paths if any(is_subdir(path, match_path) for match_path in context.paths)] + + if not context_paths: + return [] + + config_path = os.path.join(ANSIBLE_TEST_CONTROLLER_ROOT, 'sanity', 'mypy', f'{context.name}.ini') + + display.info(f'Checking context "{context.name}"', verbosity=1) + + env = ansible_environment(args, color=False) + env['MYPYPATH'] = env['PYTHONPATH'] + + # The --no-site-packages option should not be used, as it will prevent loading of type stubs from the sanity test virtual environment. + + # Enabling the --warn-unused-configs option would help keep the config files clean. + # However, the option can only be used when all files in tested contexts are evaluated. + # Unfortunately sanity tests have no way of making that determination currently. + # The option is also incompatible with incremental mode and caching. + + cmd = [ + # Below are arguments common to all contexts. + # They are kept here to avoid repetition in each config file. + virtualenv_python.path, + '-m', 'mypy', + '--show-column-numbers', + '--show-error-codes', + '--no-error-summary', + # This is a fairly common pattern in our code, so we'll allow it. + '--allow-redefinition', + # Since we specify the path(s) to test, it's important that mypy is configured to use the default behavior of following imports. + '--follow-imports', 'normal', + # Incremental results and caching do not provide significant performance benefits. + # It also prevents the use of the --warn-unused-configs option. + '--no-incremental', + '--cache-dir', '/dev/null', + # The platform is specified here so that results are consistent regardless of what platform the tests are run from. + # In the future, if testing of other platforms is desired, the platform should become part of the test specification, just like the Python version. + '--platform', 'linux', + # Despite what the documentation [1] states, the --python-version option does not cause mypy to search for a corresponding Python executable. + # It will instead use the Python executable that is used to run mypy itself. + # The --python-executable option can be used to specify the Python executable, with the default being the executable used to run mypy. + # As a precaution, that option is used in case the behavior of mypy is updated in the future to match the documentation. + # That should help guarantee that the Python executable providing type hints is the one used to run mypy. + # [1] https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-python-version + '--python-executable', virtualenv_python.path, + '--python-version', python.version, + # Below are context specific arguments. + # They are primarily useful for listing individual 'ignore_missing_imports' entries instead of using a global ignore. + '--config-file', config_path, + ] + + cmd.extend(context_paths) + + try: + stdout, stderr = intercept_python(args, virtualenv_python, cmd, env, capture=True) + + if stdout or stderr: + raise SubprocessError(cmd, stdout=stdout, stderr=stderr) + except SubprocessError as ex: + if ex.status != 1 or ex.stderr or not ex.stdout: + raise + + stdout = ex.stdout + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):((?P<column>[0-9]+):)? (?P<level>[^:]+): (?P<message>.*)$' + + parsed = parse_to_list_of_dict(pattern, stdout) + + messages = [SanityMessage( + level=r['level'], + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r.get('column') or '0'), + ) for r in parsed] + + return messages + + +@dataclasses.dataclass(frozen=True) +class MyPyContext: + """Context details for a single run of mypy.""" + name: str + paths: list[str] + python_versions: tuple[str, ...] diff --git a/test/lib/ansible_test/_internal/commands/sanity/pep8.py b/test/lib/ansible_test/_internal/commands/sanity/pep8.py new file mode 100644 index 0000000..5df9ace --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pep8.py @@ -0,0 +1,109 @@ +"""Sanity test for PEP 8 style guidelines using pycodestyle.""" +from __future__ import annotations + +import os +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + read_lines_without_comments, + parse_to_list_of_dict, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, +) + + +class Pep8Test(SanitySingleVersion): + """Sanity test for PEP 8 style guidelines using pycodestyle.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'A100' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + current_ignore_file = os.path.join(SANITY_ROOT, 'pep8', 'current-ignore.txt') + current_ignore = sorted(read_lines_without_comments(current_ignore_file, remove_blank_lines=True)) + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + cmd = [ + python.path, + '-m', 'pycodestyle', + '--max-line-length', '160', + '--config', '/dev/null', + '--ignore', ','.join(sorted(current_ignore)), + ] + paths + + if paths: + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + else: + stdout = None + + if args.explain: + return SanitySuccess(self.name) + + if stdout: + pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<code>[WE][0-9]{3}) (?P<message>.*)$' + + results = parse_to_list_of_dict(pattern, stdout) + else: + results = [] + + messages = [SanityMessage( + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r['column']), + level='warning' if r['code'].startswith('W') else 'error', + code=r['code'], + ) for r in results] + + errors = settings.process_errors(messages, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/pslint.py b/test/lib/ansible_test/_internal/commands/sanity/pslint.py new file mode 100644 index 0000000..9136d51 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pslint.py @@ -0,0 +1,119 @@ +"""Sanity test using PSScriptAnalyzer.""" +from __future__ import annotations + +import json +import os +import re +import typing as t + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + find_executable, + ANSIBLE_TEST_DATA_ROOT, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + + +class PslintTest(SanityVersionNeutral): + """Sanity test using PSScriptAnalyzer.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'AnsibleTest' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] in ('.ps1', '.psm1', '.psd1')] + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if not find_executable('pwsh', required='warning'): + return SanitySkipped(self.name) + + cmds = [] + + if args.controller.is_managed or args.requirements: + cmds.append(['pwsh', os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', 'sanity.pslint.ps1')]) + + cmds.append(['pwsh', os.path.join(SANITY_ROOT, 'pslint', 'pslint.ps1')] + paths) + + stdout = '' + + for cmd in cmds: + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name) + + severity = [ + 'Information', + 'Warning', + 'Error', + 'ParseError', + ] + + cwd = data_context().content.root + '/' + + # replace unicode smart quotes and ellipsis with ascii versions + stdout = re.sub('[\u2018\u2019]', "'", stdout) + stdout = re.sub('[\u201c\u201d]', '"', stdout) + stdout = re.sub('[\u2026]', '...', stdout) + + messages = json.loads(stdout) + + errors = [SanityMessage( + code=m['RuleName'], + message=m['Message'], + path=m['ScriptPath'].replace(cwd, ''), + line=m['Line'] or 0, + column=m['Column'] or 0, + level=severity[m['Severity']], + ) for m in messages] + + errors = settings.process_errors(errors, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/pylint.py b/test/lib/ansible_test/_internal/commands/sanity/pylint.py new file mode 100644 index 0000000..86f287a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pylint.py @@ -0,0 +1,270 @@ +"""Sanity test using pylint.""" +from __future__ import annotations + +import collections.abc as c +import itertools +import json +import os +import datetime +import configparser +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...ansible_util import ( + ansible_environment, + get_collection_detail, + CollectionDetail, + CollectionDetailError, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class PylintTest(SanitySingleVersion): + """Sanity test using pylint.""" + def __init__(self) -> None: + super().__init__() + self.optional_error_codes.update([ + 'ansible-deprecated-date', + 'too-complex', + ]) + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + plugin_dir = os.path.join(SANITY_ROOT, 'pylint', 'plugins') + plugin_names = sorted(p[0] for p in [ + os.path.splitext(p) for p in os.listdir(plugin_dir)] if p[1] == '.py' and p[0] != '__init__') + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + module_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in + paths if is_subdir(p, data_context().content.module_path)] + module_dirs = sorted({p[0] for p in module_paths if len(p) > 1}) + + large_module_group_threshold = 500 + large_module_groups = [key for key, value in + itertools.groupby(module_paths, lambda p: p[0] if len(p) > 1 else '') if len(list(value)) > large_module_group_threshold] + + large_module_group_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in paths + if any(is_subdir(p, os.path.join(data_context().content.module_path, g)) for g in large_module_groups)] + large_module_group_dirs = sorted({os.path.sep.join(p[:2]) for p in large_module_group_paths if len(p) > 2}) + + contexts = [] + remaining_paths = set(paths) + + def add_context(available_paths: set[str], context_name: str, context_filter: c.Callable[[str], bool]) -> None: + """Add the specified context to the context list, consuming available paths that match the given context filter.""" + filtered_paths = set(p for p in available_paths if context_filter(p)) + contexts.append((context_name, sorted(filtered_paths))) + available_paths -= filtered_paths + + def filter_path(path_filter: str = None) -> c.Callable[[str], bool]: + """Return a function that filters out paths which are not a subdirectory of the given path.""" + def context_filter(path_to_filter: str) -> bool: + """Return true if the given path matches, otherwise return False.""" + return is_subdir(path_to_filter, path_filter) + + return context_filter + + for large_module_dir in large_module_group_dirs: + add_context(remaining_paths, 'modules/%s' % large_module_dir, filter_path(os.path.join(data_context().content.module_path, large_module_dir))) + + for module_dir in module_dirs: + add_context(remaining_paths, 'modules/%s' % module_dir, filter_path(os.path.join(data_context().content.module_path, module_dir))) + + add_context(remaining_paths, 'modules', filter_path(data_context().content.module_path)) + add_context(remaining_paths, 'module_utils', filter_path(data_context().content.module_utils_path)) + + add_context(remaining_paths, 'units', filter_path(data_context().content.unit_path)) + + if data_context().content.collection: + add_context(remaining_paths, 'collection', lambda p: True) + else: + add_context(remaining_paths, 'validate-modules', filter_path('test/lib/ansible_test/_util/controller/sanity/validate-modules/')) + add_context(remaining_paths, 'validate-modules-unit', filter_path('test/lib/ansible_test/tests/validate-modules-unit/')) + add_context(remaining_paths, 'code-smell', filter_path('test/lib/ansible_test/_util/controller/sanity/code-smell/')) + add_context(remaining_paths, 'ansible-test-target', filter_path('test/lib/ansible_test/_util/target/')) + add_context(remaining_paths, 'ansible-test', filter_path('test/lib/')) + add_context(remaining_paths, 'test', filter_path('test/')) + add_context(remaining_paths, 'hacking', filter_path('hacking/')) + add_context(remaining_paths, 'ansible', lambda p: True) + + messages = [] + context_times = [] + + collection_detail = None + + if data_context().content.collection: + try: + collection_detail = get_collection_detail(python) + + if not collection_detail.version: + display.warning('Skipping pylint collection version checks since no collection version was found.') + except CollectionDetailError as ex: + display.warning('Skipping pylint collection version checks since collection detail loading failed: %s' % ex.reason) + + test_start = datetime.datetime.utcnow() + + for context, context_paths in sorted(contexts): + if not context_paths: + continue + + context_start = datetime.datetime.utcnow() + messages += self.pylint(args, context, context_paths, plugin_dir, plugin_names, python, collection_detail) + context_end = datetime.datetime.utcnow() + + context_times.append('%s: %d (%s)' % (context, len(context_paths), context_end - context_start)) + + test_end = datetime.datetime.utcnow() + + for context_time in context_times: + display.info(context_time, verbosity=4) + + display.info('total: %d (%s)' % (len(paths), test_end - test_start), verbosity=4) + + errors = [SanityMessage( + message=m['message'].replace('\n', ' '), + path=m['path'], + line=int(m['line']), + column=int(m['column']), + level=m['type'], + code=m['symbol'], + ) for m in messages] + + if args.explain: + return SanitySuccess(self.name) + + errors = settings.process_errors(errors, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) + + @staticmethod + def pylint( + args: SanityConfig, + context: str, + paths: list[str], + plugin_dir: str, + plugin_names: list[str], + python: PythonConfig, + collection_detail: CollectionDetail, + ) -> list[dict[str, str]]: + """Run pylint using the config specified by the context on the specified paths.""" + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', context.split('/')[0] + '.cfg') + + if not os.path.exists(rcfile): + if data_context().content.collection: + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', 'collection.cfg') + else: + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', 'default.cfg') + + parser = configparser.ConfigParser() + parser.read(rcfile) + + if parser.has_section('ansible-test'): + config = dict(parser.items('ansible-test')) + else: + config = {} + + disable_plugins = set(i.strip() for i in config.get('disable-plugins', '').split(',') if i) + load_plugins = set(plugin_names + ['pylint.extensions.mccabe']) - disable_plugins + + cmd = [ + python.path, + '-m', 'pylint', + '--jobs', '0', + '--reports', 'n', + '--max-line-length', '160', + '--max-complexity', '20', + '--rcfile', rcfile, + '--output-format', 'json', + '--load-plugins', ','.join(sorted(load_plugins)), + ] + paths + + if data_context().content.collection: + cmd.extend(['--collection-name', data_context().content.collection.full_name]) + + if collection_detail and collection_detail.version: + cmd.extend(['--collection-version', collection_detail.version]) + + append_python_path = [plugin_dir] + + if data_context().content.collection: + append_python_path.append(data_context().content.collection.root) + + env = ansible_environment(args) + env['PYTHONPATH'] += os.path.pathsep + os.path.pathsep.join(append_python_path) + + # expose plugin paths for use in custom plugins + env.update(dict(('ANSIBLE_TEST_%s_PATH' % k.upper(), os.path.abspath(v) + os.path.sep) for k, v in data_context().content.plugin_paths.items())) + + if paths: + display.info('Checking %d file(s) in context "%s" with config: %s' % (len(paths), context, rcfile), verbosity=1) + + try: + stdout, stderr = run_command(args, cmd, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status >= 32: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + else: + stdout = None + + if not args.explain and stdout: + messages = json.loads(stdout) + else: + messages = [] + + return messages diff --git a/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py b/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py new file mode 100644 index 0000000..4f14a3a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py @@ -0,0 +1,60 @@ +"""Sanity test for documentation of sanity tests.""" +from __future__ import annotations + +import os + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + sanity_get_tests, +) + +from ...test import ( + TestResult, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + + +class SanityDocsTest(SanityVersionNeutral): + """Sanity test for documentation of sanity tests.""" + ansible_only = True + + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + sanity_dir = 'docs/docsite/rst/dev_guide/testing/sanity' + sanity_docs = set(part[0] for part in (os.path.splitext(os.path.basename(path)) for path in data_context().content.get_files(sanity_dir)) + if part[1] == '.rst') + sanity_tests = set(sanity_test.name for sanity_test in sanity_get_tests()) + + missing = sanity_tests - sanity_docs + + results = [] + + results += [SanityMessage( + message='missing docs for ansible-test sanity --test %s' % r, + path=os.path.join(sanity_dir, '%s.rst' % r), + ) for r in sorted(missing)] + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py b/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py new file mode 100644 index 0000000..7de0bda --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py @@ -0,0 +1,108 @@ +"""Sanity test using shellcheck.""" +from __future__ import annotations + +import os +import typing as t + +from xml.etree.ElementTree import ( + fromstring, + Element, +) + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + read_lines_without_comments, + find_executable, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + + +class ShellcheckTest(SanityVersionNeutral): + """Sanity test using shellcheck.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'AT1000' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.sh'] + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + exclude_file = os.path.join(SANITY_ROOT, 'shellcheck', 'exclude.txt') + exclude = set(read_lines_without_comments(exclude_file, remove_blank_lines=True, optional=True)) + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if not find_executable('shellcheck', required='warning'): + return SanitySkipped(self.name) + + cmd = [ + 'shellcheck', + '-e', ','.join(sorted(exclude)), + '--format', 'checkstyle', + ] + paths + + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status > 1: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name) + + # json output is missing file paths in older versions of shellcheck, so we'll use xml instead + root: Element = fromstring(stdout) + + results = [] + + for item in root: + for entry in item: + results.append(SanityMessage( + message=entry.attrib['message'], + path=item.attrib['name'], + line=int(entry.attrib['line']), + column=int(entry.attrib['column']), + level=entry.attrib['severity'], + code=entry.attrib['source'].replace('ShellCheck.', ''), + )) + + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py b/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py new file mode 100644 index 0000000..e1dacb7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py @@ -0,0 +1,190 @@ +"""Sanity test using validate-modules.""" +from __future__ import annotations + +import collections +import json +import os +import typing as t + +from . import ( + DOCUMENTABLE_PLUGINS, + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, +) + +from ...util_common import ( + run_command, +) + +from ...ansible_util import ( + ansible_environment, + get_collection_detail, + CollectionDetailError, +) + +from ...config import ( + SanityConfig, +) + +from ...ci import ( + get_ci_provider, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class ValidateModulesTest(SanitySingleVersion): + """Sanity test using validate-modules.""" + + def __init__(self) -> None: + super().__init__() + + self.optional_error_codes.update([ + 'deprecated-date', + ]) + + self._prefixes = { + plugin_type: plugin_path + '/' + for plugin_type, plugin_path in data_context().content.plugin_paths.items() + if plugin_type in DOCUMENTABLE_PLUGINS + } + + self._exclusions = set() + + if not data_context().content.collection: + self._exclusions.add('lib/ansible/plugins/cache/base.py') + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'A100' + + def get_plugin_type(self, target: TestTarget) -> t.Optional[str]: + """Return the plugin type of the given target, or None if it is not a plugin or module.""" + if target.path.endswith('/__init__.py'): + return None + + if target.path in self._exclusions: + return None + + for plugin_type, prefix in self._prefixes.items(): + if target.path.startswith(prefix): + return plugin_type + + return None + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if self.get_plugin_type(target) is not None] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + env = ansible_environment(args, color=False) + + settings = self.load_processor(args) + + target_per_type = collections.defaultdict(list) + + for target in targets.include: + target_per_type[self.get_plugin_type(target)].append(target) + + cmd = [ + python.path, + os.path.join(SANITY_ROOT, 'validate-modules', 'validate.py'), + '--format', 'json', + '--arg-spec', + ] + + if data_context().content.collection: + cmd.extend(['--collection', data_context().content.collection.directory]) + + try: + collection_detail = get_collection_detail(python) + + if collection_detail.version: + cmd.extend(['--collection-version', collection_detail.version]) + else: + display.warning('Skipping validate-modules collection version checks since no collection version was found.') + except CollectionDetailError as ex: + display.warning('Skipping validate-modules collection version checks since collection detail loading failed: %s' % ex.reason) + else: + base_branch = args.base_branch or get_ci_provider().get_base_branch() + + if base_branch: + cmd.extend([ + '--base-branch', base_branch, + ]) + else: + display.warning('Cannot perform module comparison against the base branch because the base branch was not detected.') + + errors = [] + + for plugin_type, plugin_targets in sorted(target_per_type.items()): + paths = [target.path for target in plugin_targets] + plugin_cmd = list(cmd) + + if plugin_type != 'modules': + plugin_cmd += ['--plugin-type', plugin_type] + + plugin_cmd += paths + + try: + stdout, stderr = run_command(args, plugin_cmd, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status not in (0, 3): + raise SubprocessError(cmd=plugin_cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + continue + + messages = json.loads(stdout) + + for filename in messages: + output = messages[filename] + + for item in output['errors']: + errors.append(SanityMessage( + path=filename, + line=int(item['line']) if 'line' in item else 0, + column=int(item['column']) if 'column' in item else 0, + code='%s' % item['code'], + message=item['msg'], + )) + + all_paths = [target.path for target in targets.include] + all_errors = settings.process_errors(errors, all_paths) + + if args.explain: + return SanitySuccess(self.name) + + if all_errors: + return SanityFailure(self.name, messages=all_errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/yamllint.py b/test/lib/ansible_test/_internal/commands/sanity/yamllint.py new file mode 100644 index 0000000..a0d859f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/yamllint.py @@ -0,0 +1,125 @@ +"""Sanity test using yamllint.""" +from __future__ import annotations + +import json +import os +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class YamllintTest(SanitySingleVersion): + """Sanity test using yamllint.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + @property + def require_libyaml(self) -> bool: + """True if the test requires PyYAML to have libyaml support.""" + return True + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + yaml_targets = [target for target in targets if os.path.splitext(target.path)[1] in ('.yml', '.yaml')] + + for plugin_type, plugin_path in sorted(data_context().content.plugin_paths.items()): + if plugin_type == 'module_utils': + continue + + yaml_targets.extend([target for target in targets if + os.path.splitext(target.path)[1] == '.py' and + os.path.basename(target.path) != '__init__.py' and + is_subdir(target.path, plugin_path)]) + + return yaml_targets + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + results = self.test_paths(args, paths, python) + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) + + @staticmethod + def test_paths(args: SanityConfig, paths: list[str], python: PythonConfig) -> list[SanityMessage]: + """Test the specified paths using the given Python and return the results.""" + cmd = [ + python.path, + os.path.join(SANITY_ROOT, 'yamllint', 'yamllinter.py'), + ] + + data = '\n'.join(paths) + + display.info(data, verbosity=4) + + try: + stdout, stderr = run_command(args, cmd, data=data, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return [] + + results = json.loads(stdout)['messages'] + + results = [SanityMessage( + code=r['code'], + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r['column']), + level=r['level'], + ) for r in results] + + return results |