diff options
Diffstat (limited to 'src/ansiblelint/rules/args.py')
-rw-r--r-- | src/ansiblelint/rules/args.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/src/ansiblelint/rules/args.py b/src/ansiblelint/rules/args.py new file mode 100644 index 0000000..2acf32e --- /dev/null +++ b/src/ansiblelint/rules/args.py @@ -0,0 +1,310 @@ +"""Rule definition to validate task options.""" +from __future__ import annotations + +import contextlib +import importlib.util +import io +import json +import logging +import re +import sys +from functools import lru_cache +from typing import TYPE_CHECKING, Any + +# pylint: disable=preferred-module +from unittest import mock +from unittest.mock import patch + +# pylint: disable=reimported +import ansible.module_utils.basic as mock_ansible_module +from ansible.module_utils import basic +from ansible.plugins.loader import PluginLoadContext, module_loader + +from ansiblelint.constants import LINE_NUMBER_KEY +from ansiblelint.rules import AnsibleLintRule, RulesCollection +from ansiblelint.text import has_jinja +from ansiblelint.yaml_utils import clean_json + +if TYPE_CHECKING: + from ansiblelint.errors import MatchError + from ansiblelint.file_utils import Lintable + from ansiblelint.utils import Task + + +_logger = logging.getLogger(__name__) + +ignored_re = re.compile( + "|".join( # noqa: FLY002 + [ + r"^parameters are mutually exclusive:", + # https://github.com/ansible/ansible-lint/issues/3128 as strings can be jinja + # Do not remove unless you manually test if the original example + # from the bug does not trigger the rule anymore. We were not able + # to add a regression test because it would involve installing this + # collection. Attempts to reproduce same bug with other collections + # failed, even if the message originates from Ansible core. + r"^unable to evaluate string as dictionary$", + ], + ), + flags=re.MULTILINE | re.DOTALL, +) + +workarounds_drop_map = { + # https://github.com/ansible/ansible-lint/issues/3110 + "ansible.builtin.copy": ["decrypt"], + # https://github.com/ansible/ansible-lint/issues/2824#issuecomment-1354337466 + # https://github.com/ansible/ansible-lint/issues/3138 + "ansible.builtin.service": ["daemon_reload", "use"], + # Avoid: Unsupported parameters for (basic.py) module: cmd. Supported parameters include: _raw_params, _uses_shell, argv, chdir, creates, executable, removes, stdin, stdin_add_newline, strip_empty_ends. + "ansible.builtin.command": ["cmd"], + # https://github.com/ansible/ansible-lint/issues/3152 + "ansible.posix.synchronize": ["use_ssh_args"], +} +workarounds_inject_map = { + # https://github.com/ansible/ansible-lint/issues/2824 + "ansible.builtin.async_status": {"_async_dir": "/tmp/ansible-async"}, +} + + +@lru_cache +def load_module(module_name: str) -> PluginLoadContext: + """Load plugin from module name and cache it.""" + return module_loader.find_plugin_with_context(module_name) + + +class ValidationPassedError(Exception): + """Exception to be raised when validation passes.""" + + +class CustomAnsibleModule(basic.AnsibleModule): # type: ignore[misc] + """Mock AnsibleModule class.""" + + def __init__(self, *args: str, **kwargs: str) -> None: + """Initialize AnsibleModule mock.""" + super().__init__(*args, **kwargs) + raise ValidationPassedError + + +class ArgsRule(AnsibleLintRule): + """Validating module arguments.""" + + id = "args" + severity = "HIGH" + description = "Check whether tasks are using correct module options." + tags = ["syntax", "experimental"] + version_added = "v6.10.0" + module_aliases: dict[str, str] = {"block/always/rescue": "block/always/rescue"} + _ids = { + "args[module]": description, + } + + def matchtask( + self, + task: Task, + file: Lintable | None = None, + ) -> list[MatchError]: + # pylint: disable=too-many-locals,too-many-return-statements + results: list[MatchError] = [] + module_name = task["action"]["__ansible_module_original__"] + failed_msg = None + + if module_name in self.module_aliases: + return [] + + loaded_module = load_module(module_name) + + # https://github.com/ansible/ansible-lint/issues/3200 + # since "ps1" modules cannot be executed on POSIX platforms, we will + # avoid running this rule for such modules + if isinstance( + loaded_module.plugin_resolved_path, + str, + ) and loaded_module.plugin_resolved_path.endswith(".ps1"): + return [] + + module_args = { + key: value + for key, value in task["action"].items() + if not key.startswith("__") + } + + # Return if 'args' is jinja string + # https://github.com/ansible/ansible-lint/issues/3199 + if ( + "args" in task.raw_task + and isinstance(task.raw_task["args"], str) + and has_jinja(task.raw_task["args"]) + ): + return [] + + if loaded_module.resolved_fqcn in workarounds_inject_map: + module_args.update(workarounds_inject_map[loaded_module.resolved_fqcn]) + if loaded_module.resolved_fqcn in workarounds_drop_map: + for key in workarounds_drop_map[loaded_module.resolved_fqcn]: + if key in module_args: + del module_args[key] + + with mock.patch.object( + mock_ansible_module, + "AnsibleModule", + CustomAnsibleModule, + ): + spec = importlib.util.spec_from_file_location( + name=loaded_module.resolved_fqcn, + location=loaded_module.plugin_resolved_path, + ) + if spec: + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + else: + assert file is not None + _logger.warning( + "Unable to load module %s at %s:%s for options validation", + module_name, + file.filename, + task[LINE_NUMBER_KEY], + ) + return [] + + try: + if not hasattr(module, "main"): + # skip validation for module options that are implemented as action plugin + # as the option values can be changed in action plugin and are not passed + # through `ArgumentSpecValidator` class as in case of modules. + return [] + + with patch.object( + sys, + "argv", + ["", json.dumps({"ANSIBLE_MODULE_ARGS": clean_json(module_args)})], + ): + fio = io.StringIO() + failed_msg = "" + # Warning: avoid running anything while stdout is redirected + # as what happens may be very hard to debug. + with contextlib.redirect_stdout(fio): + # pylint: disable=protected-access + basic._ANSIBLE_ARGS = None # noqa: SLF001 + try: + module.main() + except SystemExit: + failed_msg = fio.getvalue() + if failed_msg: + results.extend( + self._parse_failed_msg(failed_msg, task, module_name, file), + ) + + sanitized_results = self._sanitize_results(results, module_name) + return sanitized_results + except ValidationPassedError: + return [] + + # pylint: disable=unused-argument + def _sanitize_results( + self, + results: list[MatchError], + module_name: str, + ) -> list[MatchError]: + """Remove results that are false positive.""" + sanitized_results = [] + for result in results: + result_msg = result.message + if ignored_re.match(result_msg): + continue + sanitized_results.append(result) + + return sanitized_results + + def _parse_failed_msg( + self, + failed_msg: str, + task: dict[str, Any], + module_name: str, + file: Lintable | None = None, + ) -> list[MatchError]: + """Parse failed message and return list of MatchError.""" + results: list[MatchError] = [] + try: + failed_obj = json.loads(failed_msg) + error_message = failed_obj["msg"] + except json.decoder.JSONDecodeError: + error_message = failed_msg + + option_type_check_error = re.search( + r"argument '(?P<name>.*)' is of type", + error_message, + ) + if option_type_check_error: + # ignore options with templated variable value with type check errors + option_key = option_type_check_error.group("name") + option_value = task["action"][option_key] + if has_jinja(option_value): + _logger.debug( + "Type checking ignored for '%s' option in task '%s' at line %s.", + option_key, + module_name, + task[LINE_NUMBER_KEY], + ) + return results + + value_not_in_choices_error = re.search( + r"value of (?P<name>.*) must be one of:", + error_message, + ) + if value_not_in_choices_error: + # ignore templated value not in allowed choices + choice_key = value_not_in_choices_error.group("name") + choice_value = task["action"][choice_key] + if has_jinja(choice_value): + _logger.debug( + "Value checking ignored for '%s' option in task '%s' at line %s.", + choice_key, + module_name, + task[LINE_NUMBER_KEY], + ) + return results + + results.append( + self.create_matcherror( + message=error_message, + lineno=task[LINE_NUMBER_KEY], + tag="args[module]", + filename=file, + ), + ) + return results + + +# testing code to be loaded only with pytest or when executed the rule file +if "pytest" in sys.modules: + import pytest # noqa: TCH002 + + from ansiblelint.runner import Runner # pylint: disable=ungrouped-imports + + def test_args_module_fail(default_rules_collection: RulesCollection) -> None: + """Test rule invalid module options.""" + success = "examples/playbooks/rule-args-module-fail.yml" + results = Runner(success, rules=default_rules_collection).run() + assert len(results) == 5 + assert results[0].tag == "args[module]" + assert "missing required arguments" in results[0].message + assert results[1].tag == "args[module]" + assert "missing parameter(s) required by " in results[1].message + assert results[2].tag == "args[module]" + assert "Unsupported parameters for" in results[2].message + assert results[3].tag == "args[module]" + assert "Unsupported parameters for" in results[3].message + assert results[4].tag == "args[module]" + assert "value of state must be one of" in results[4].message + + def test_args_module_pass( + default_rules_collection: RulesCollection, + caplog: pytest.LogCaptureFixture, + ) -> None: + """Test rule valid module options.""" + success = "examples/playbooks/rule-args-module-pass.yml" + with caplog.at_level(logging.WARNING): + results = Runner(success, rules=default_rules_collection).run() + assert len(results) == 0, results + assert len(caplog.records) == 0, caplog.records |