diff options
Diffstat (limited to 'src/ansiblelint/utils.py')
-rw-r--r-- | src/ansiblelint/utils.py | 511 |
1 files changed, 304 insertions, 207 deletions
diff --git a/src/ansiblelint/utils.py b/src/ansiblelint/utils.py index 9cb97aa..3d0e535 100644 --- a/src/ansiblelint/utils.py +++ b/src/ansiblelint/utils.py @@ -22,26 +22,35 @@ """Generic utility helpers.""" from __future__ import annotations +import ast import contextlib import inspect import logging import os import re -from collections.abc import Generator, ItemsView, Iterator, Mapping, Sequence +from collections.abc import ItemsView, Iterable, Iterator, Mapping, Sequence from dataclasses import _MISSING_TYPE, dataclass, field -from functools import cache +from functools import cache, lru_cache from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any +import ruamel.yaml.parser import yaml from ansible.errors import AnsibleError, AnsibleParserError from ansible.module_utils.parsing.convert_bool import boolean from ansible.parsing.dataloader import DataLoader from ansible.parsing.mod_args import ModuleArgsParser +from ansible.parsing.plugin_docs import read_docstring +from ansible.parsing.splitter import split_args from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping from ansible.parsing.yaml.loader import AnsibleLoader from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence -from ansible.plugins.loader import add_all_plugin_dirs +from ansible.plugins.loader import ( + PluginLoadContext, + action_loader, + add_all_plugin_dirs, + module_loader, +) from ansible.template import Templar from ansible.utils.collection_loader import AnsibleCollectionConfig from yaml.composer import Composer @@ -51,7 +60,7 @@ from ansiblelint._internal.rules import ( AnsibleParserErrorRule, RuntimeErrorRule, ) -from ansiblelint.app import get_app +from ansiblelint.app import App, get_app from ansiblelint.config import Options, options from ansiblelint.constants import ( ANNOTATION_KEYS, @@ -67,8 +76,10 @@ from ansiblelint.constants import ( from ansiblelint.errors import MatchError from ansiblelint.file_utils import Lintable, discover_lintables from ansiblelint.skip_utils import is_nested_task -from ansiblelint.text import removeprefix +from ansiblelint.text import has_jinja, removeprefix +if TYPE_CHECKING: + from ansiblelint.rules import RulesCollection # ansible-lint doesn't need/want to know about encrypted secrets, so we pass a # string as the password to enable such yaml files to be opened and parsed # successfully. @@ -164,7 +175,6 @@ def ansible_template( for _i in range(10): try: templated = templar.template(varname, **kwargs) - return templated except AnsibleError as exc: if lookup_error in exc.message: return varname @@ -186,16 +196,14 @@ def ansible_template( _logger.warning(err) raise - # pylint: disable=protected-access - templar.environment.filters._delegatee[ # noqa: SLF001 - missing_filter - ] = mock_filter + templar.environment.filters._delegatee[missing_filter] = mock_filter # fmt: skip # noqa: SLF001 # Record the mocked filter so we can warn the user if missing_filter not in options.mock_filters: _logger.debug("Mocking missing filter %s", missing_filter) options.mock_filters.append(missing_filter) continue raise + return templated return None @@ -210,26 +218,23 @@ BLOCK_NAME_TO_ACTION_TYPE_MAP = { } -def tokenize(line: str) -> tuple[str, list[str], dict[str, str]]: +def tokenize(value: str) -> tuple[list[str], dict[str, str]]: """Parse a string task invocation.""" - tokens = line.lstrip().split(" ") - if tokens[0] == "-": - tokens = tokens[1:] - if tokens[0] == "action:" or tokens[0] == "local_action:": - tokens = tokens[1:] - command = tokens[0].replace(":", "") - - args = [] - kwargs = {} - non_kv_found = False - for arg in tokens[1:]: - if "=" in arg and not non_kv_found: - key_value = arg.split("=", 1) - kwargs[key_value[0]] = key_value[1] + # We do not try to tokenize something very simple because it would fail to + # work for a case like: task_include: path with space.yml + if value and "=" not in value: + return ([value], {}) + + parts = split_args(value) + args: list[str] = [] + kwargs: dict[str, str] = {} + for part in parts: + if "=" not in part: + args.append(part) else: - non_kv_found = True - args.append(arg) - return (command, args, kwargs) + k, v = part.split("=", 1) + kwargs[k] = v + return (args, kwargs) def playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignore[type-arg] @@ -278,106 +283,179 @@ def template( return value -def _include_children( - basedir: str, - k: str, - v: Any, - parent_type: FileType, -) -> list[Lintable]: - # handle special case include_tasks: name=filename.yml - if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v: - v = v["file"] - - # we cannot really parse any jinja2 in includes, so we ignore them - if not v or "{{" in v: - return [] - - if "import_playbook" in k and COLLECTION_PLAY_RE.match(v): - # Any import_playbooks from collections should be ignored as ansible - # own syntax check will handle them. - return [] - - # handle include: filename.yml tags=blah - # pylint: disable=unused-variable - (command, args, kwargs) = tokenize(f"{k}: {v}") - - result = path_dwim(basedir, args[0]) - while basedir not in ["", "/"]: - if os.path.exists(result): - break - basedir = os.path.dirname(basedir) - result = path_dwim(basedir, args[0]) - - return [Lintable(result, kind=parent_type)] - - -def _taskshandlers_children( - basedir: str, - k: str, - v: None | Any, - parent_type: FileType, -) -> list[Lintable]: - results: list[Lintable] = [] - if v is None: - raise MatchError( - message="A malformed block was encountered while loading a block.", - rule=RuntimeErrorRule(), - ) - for task_handler in v: - # ignore empty tasks, `-` - if not task_handler: - continue +@dataclass +class HandleChildren: + """Parse task, roles and children.""" + + rules: RulesCollection = field(init=True, repr=False) + app: App + + def include_children( # pylint: disable=too-many-return-statements + self, + lintable: Lintable, + k: str, + v: Any, + parent_type: FileType, + ) -> list[Lintable]: + """Include children.""" + basedir = str(lintable.path.parent) + # import_playbook only accepts a string as argument (no dict syntax) + if k in ( + "import_playbook", + "ansible.builtin.import_playbook", + ) and not isinstance(v, str): + return [] + + # handle special case include_tasks: name=filename.yml + if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v: + v = v["file"] + + # we cannot really parse any jinja2 in includes, so we ignore them + if not v or "{{" in v: + return [] + + if k in ("import_playbook", "ansible.builtin.import_playbook"): + included = Path(basedir) / v + if self.app.runtime.has_playbook(v, basedir=Path(basedir)): + if included.exists(): + return [Lintable(included, kind=parent_type)] + return [] + msg = f"Failed to find {v} playbook." + logging.error(msg) + return [] + + # handle include: filename.yml tags=blah + (args, kwargs) = tokenize(v) + + if args: + file = args[0] + elif "file" in kwargs: + file = kwargs["file"] + else: + return [] - with contextlib.suppress(LookupError): - children = _get_task_handler_children_for_tasks_or_playbooks( - task_handler, - basedir, - k, - parent_type, + result = path_dwim(basedir, file) + while basedir not in ["", "/"]: + if os.path.exists(result): + break + basedir = os.path.dirname(basedir) + result = path_dwim(basedir, file) + + return [Lintable(result, kind=parent_type)] + + def taskshandlers_children( + self, + lintable: Lintable, + k: str, + v: None | Any, + parent_type: FileType, + ) -> list[Lintable]: + """TasksHandlers Children.""" + basedir = str(lintable.path.parent) + results: list[Lintable] = [] + if v is None or isinstance(v, int | str): + raise MatchError( + message="A malformed block was encountered while loading a block.", + rule=RuntimeErrorRule(), ) - results.append(children) - continue + for task_handler in v: + # ignore empty tasks, `-` + if not task_handler: + continue - if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES): - task_handler = normalize_task_v2(task_handler) - _validate_task_handler_action_for_role(task_handler["action"]) - results.extend( - _roles_children( + with contextlib.suppress(LookupError): + children = _get_task_handler_children_for_tasks_or_playbooks( + task_handler, basedir, k, - [task_handler["action"].get("name")], parent_type, - main=task_handler["action"].get("tasks_from", "main"), - ), - ) - continue + ) + results.append(children) + continue - if "block" not in task_handler: - continue + if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES): + task_handler = normalize_task_v2( + Task(task_handler, filename=str(lintable.path)), + ) + self._validate_task_handler_action_for_role(task_handler["action"]) + name = task_handler["action"].get("name") + if has_jinja(name): + # we cannot deal with dynamic imports + continue + results.extend( + self.roles_children(lintable, k, [name], parent_type), + ) + continue - results.extend( - _taskshandlers_children(basedir, k, task_handler["block"], parent_type), - ) - if "rescue" in task_handler: - results.extend( - _taskshandlers_children( - basedir, - k, - task_handler["rescue"], - parent_type, + if "block" not in task_handler: + continue + + for elem in ("block", "rescue", "always"): + if elem in task_handler: + results.extend( + self.taskshandlers_children( + lintable, + k, + task_handler[elem], + parent_type, + ), + ) + + return results + + def _validate_task_handler_action_for_role(self, th_action: dict[str, Any]) -> None: + """Verify that the task handler action is valid for role include.""" + module = th_action["__ansible_module__"] + + if "name" not in th_action: + raise MatchError( + message=f"Failed to find required 'name' key in {module!s}", + rule=self.rules.rules[0], + lintable=Lintable( + ( + self.rules.options.lintables[0] + if self.rules.options.lintables + else "." + ), ), ) - if "always" in task_handler: - results.extend( - _taskshandlers_children( - basedir, - k, - task_handler["always"], - parent_type, - ), + + if not isinstance(th_action["name"], str): + raise MatchError( + message=f"Value assigned to 'name' key on '{module!s}' is not a string.", + rule=self.rules.rules[1], ) - return results + def roles_children( + self, + lintable: Lintable, + k: str, + v: Sequence[Any], + parent_type: FileType, + ) -> list[Lintable]: + """Roles children.""" + # pylint: disable=unused-argument # parent_type) + basedir = str(lintable.path.parent) + results: list[Lintable] = [] + if not v or not isinstance(v, Iterable): + # typing does not prevent junk from being passed in + return results + for role in v: + if isinstance(role, dict): + if "role" in role or "name" in role: + if "tags" not in role or "skip_ansible_lint" not in role["tags"]: + results.extend( + _look_for_role_files( + basedir, + role.get("role", role.get("name")), + ), + ) + elif k != "dependencies": + msg = f'role dict {role} does not contain a "role" or "name" key' + raise SystemExit(msg) + else: + results.extend(_look_for_role_files(basedir, role)) + return results def _get_task_handler_children_for_tasks_or_playbooks( @@ -393,13 +471,27 @@ def _get_task_handler_children_for_tasks_or_playbooks( for task_handler_key in INCLUSION_ACTION_NAMES: with contextlib.suppress(KeyError): # ignore empty tasks - if not task_handler: # pragma: no branch + if not task_handler or isinstance(task_handler, str): # pragma: no branch continue - file_name = task_handler[task_handler_key] - if isinstance(file_name, Mapping) and file_name.get("file", None): - file_name = file_name["file"] + file_name = "" + action_args = task_handler[task_handler_key] + if isinstance(action_args, str): + (args, kwargs) = tokenize(action_args) + if len(args) == 1: + file_name = args[0] + elif kwargs.get("file", None): + file_name = kwargs["file"] + else: + # ignore invalid data (syntax check will outside the scope) + continue + + if isinstance(action_args, Mapping) and action_args.get("file", None): + file_name = action_args["file"] + if not file_name: + # ignore invalid data (syntax check will outside the scope) + continue f = path_dwim(basedir, file_name) while basedir not in ["", "/"]: if os.path.exists(f): @@ -411,50 +503,6 @@ def _get_task_handler_children_for_tasks_or_playbooks( raise LookupError(msg) -def _validate_task_handler_action_for_role(th_action: dict[str, Any]) -> None: - """Verify that the task handler action is valid for role include.""" - module = th_action["__ansible_module__"] - - if "name" not in th_action: - raise MatchError(message=f"Failed to find required 'name' key in {module!s}") - - if not isinstance(th_action["name"], str): - raise MatchError( - message=f"Value assigned to 'name' key on '{module!s}' is not a string.", - ) - - -def _roles_children( - basedir: str, - k: str, - v: Sequence[Any], - parent_type: FileType, # noqa: ARG001 - main: str = "main", -) -> list[Lintable]: - # pylint: disable=unused-argument # parent_type) - results: list[Lintable] = [] - if not v: - # typing does not prevent junk from being passed in - return results - for role in v: - if isinstance(role, dict): - if "role" in role or "name" in role: - if "tags" not in role or "skip_ansible_lint" not in role["tags"]: - results.extend( - _look_for_role_files( - basedir, - role.get("role", role.get("name")), - main=main, - ), - ) - elif k != "dependencies": - msg = f'role dict {role} does not contain a "role" or "name" key' - raise SystemExit(msg) - else: - results.extend(_look_for_role_files(basedir, role, main=main)) - return results - - def _rolepath(basedir: str, role: str) -> str | None: role_path = None @@ -469,7 +517,7 @@ def _rolepath(basedir: str, role: str) -> str | None: path_dwim(basedir, os.path.join("..", role)), ] - for loc in get_app(offline=True).runtime.config.default_roles_path: + for loc in get_app(cached=True).runtime.config.default_roles_path: loc = os.path.expanduser(loc) possible_paths.append(path_dwim(loc, role)) @@ -486,12 +534,7 @@ def _rolepath(basedir: str, role: str) -> str | None: return role_path -def _look_for_role_files( - basedir: str, - role: str, - main: str | None = "main", # noqa: ARG001 -) -> list[Lintable]: - # pylint: disable=unused-argument # main +def _look_for_role_files(basedir: str, role: str) -> list[Lintable]: role_path = _rolepath(basedir, role) if not role_path: # pragma: no branch return [] @@ -539,13 +582,14 @@ def _extract_ansible_parsed_keys_from_task( return result -def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]: +def normalize_task_v2(task: Task) -> dict[str, Any]: """Ensure tasks have a normalized action key and strings are converted to python objects.""" + raw_task = task.raw_task result: dict[str, Any] = {} ansible_parsed_keys = ("action", "local_action", "args", "delegate_to") - if is_nested_task(task): - _extract_ansible_parsed_keys_from_task(result, task, ansible_parsed_keys) + if is_nested_task(raw_task): + _extract_ansible_parsed_keys_from_task(result, raw_task, ansible_parsed_keys) # Add dummy action for block/always/rescue statements result["action"] = { "__ansible_module__": "block/always/rescue", @@ -554,7 +598,7 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]: return result - sanitized_task = _sanitize_task(task) + sanitized_task = _sanitize_task(raw_task) mod_arg_parser = ModuleArgsParser(sanitized_task) try: @@ -562,12 +606,11 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]: skip_action_validation=options.skip_action_validation, ) except AnsibleParserError as exc: - # pylint: disable=raise-missing-from raise MatchError( rule=AnsibleParserErrorRule(), message=exc.message, - filename=task.get(FILENAME_KEY, "Unknown"), - lineno=task.get(LINE_NUMBER_KEY, 0), + lintable=Lintable(task.filename or ""), + lineno=raw_task.get(LINE_NUMBER_KEY, 1), ) from exc # denormalize shell -> command conversion @@ -577,13 +620,13 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]: _extract_ansible_parsed_keys_from_task( result, - task, + raw_task, (*ansible_parsed_keys, action), ) if not isinstance(action, str): msg = f"Task actions can only be strings, got {action}" - raise RuntimeError(msg) + raise TypeError(msg) action_unnormalized = action # convert builtin fqn calls to short forms because most rules know only # about short calls but in the future we may switch the normalization to do @@ -599,17 +642,6 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]: return result -def normalize_task(task: dict[str, Any], filename: str) -> dict[str, Any]: - """Unify task-like object structures.""" - ansible_action_type = task.get("__ansible_action_type__", "task") - if "__ansible_action_type__" in task: - del task["__ansible_action_type__"] - task = normalize_task_v2(task) - task[FILENAME_KEY] = filename - task["__ansible_action_type__"] = ansible_action_type - return task - - def task_to_str(task: dict[str, Any]) -> str: """Make a string identifier for the given task.""" name = task.get("name") @@ -634,7 +666,7 @@ def task_to_str(task: dict[str, Any]) -> str: _raw_params = action.get("_raw_params", []) if isinstance(_raw_params, list): for item in _raw_params: - args.append(str(item)) + args.extend(str(item)) else: args.append(_raw_params) @@ -698,7 +730,11 @@ class Task(dict[str, Any]): @property def name(self) -> str | None: """Return the name of the task.""" - return self.raw_task.get("name", None) + name = self.raw_task.get("name", None) + if name is not None and not isinstance(name, str): + msg = "Task name can only be a string." + raise RuntimeError(msg) + return name @property def action(self) -> str: @@ -706,7 +742,7 @@ class Task(dict[str, Any]): action_name = self.normalized_task["action"]["__ansible_module_original__"] if not isinstance(action_name, str): msg = "Task actions can only be strings." - raise RuntimeError(msg) + raise TypeError(msg) return action_name @property @@ -729,10 +765,7 @@ class Task(dict[str, Any]): """Return the name of the task.""" if not hasattr(self, "_normalized_task"): try: - self._normalized_task = normalize_task( - self.raw_task, - filename=self.filename, - ) + self._normalized_task = self._normalize_task() except MatchError as err: self.error = err # When we cannot normalize it, we just use the raw task instead @@ -740,15 +773,35 @@ class Task(dict[str, Any]): self._normalized_task = self.raw_task if isinstance(self._normalized_task, _MISSING_TYPE): msg = "Task was not normalized" - raise RuntimeError(msg) + raise TypeError(msg) return self._normalized_task + def _normalize_task(self) -> dict[str, Any]: + """Unify task-like object structures.""" + ansible_action_type = self.raw_task.get("__ansible_action_type__", "task") + if "__ansible_action_type__" in self.raw_task: + del self.raw_task["__ansible_action_type__"] + task = normalize_task_v2(self) + task[FILENAME_KEY] = self.filename + task["__ansible_action_type__"] = ansible_action_type + return task + @property def skip_tags(self) -> list[str]: """Return the list of tags to skip.""" skip_tags: list[str] = self.raw_task.get(SKIPPED_RULES_KEY, []) return skip_tags + def is_handler(self) -> bool: + """Return true for tasks that are handlers.""" + is_handler_file = False + if isinstance(self._normalized_task, dict): + file_name = str(self._normalized_task["action"].get(FILENAME_KEY, None)) + if file_name: + paths = file_name.split("/") + is_handler_file = "handlers" in paths + return is_handler_file if is_handler_file else ".handlers[" in self.position + def __repr__(self) -> str: """Return a string representation of the task.""" return f"Task('{self.name}' [{self.position}])" @@ -761,7 +814,7 @@ class Task(dict[str, Any]): """Allow access as task[...].""" return self.normalized_task[index] - def __iter__(self) -> Generator[str, None, None]: + def __iter__(self) -> Iterator[str]: """Provide support for 'key in task'.""" yield from (f for f in self.normalized_task) @@ -857,7 +910,7 @@ def parse_yaml_linenumbers( node = Composer.compose_node(loader, parent, index) if not isinstance(node, yaml.nodes.Node): msg = "Unexpected yaml data." - raise RuntimeError(msg) + raise TypeError(msg) node.__line__ = line + 1 # type: ignore[attr-defined] return node @@ -870,9 +923,7 @@ def parse_yaml_linenumbers( if hasattr(node, "__line__"): mapping[LINE_NUMBER_KEY] = node.__line__ else: - mapping[ - LINE_NUMBER_KEY - ] = mapping._line_number # pylint: disable=protected-access # noqa: SLF001 + mapping[LINE_NUMBER_KEY] = mapping._line_number # noqa: SLF001 mapping[FILENAME_KEY] = lintable.path return mapping @@ -895,8 +946,9 @@ def parse_yaml_linenumbers( yaml.parser.ParserError, yaml.scanner.ScannerError, yaml.constructor.ConstructorError, + ruamel.yaml.parser.ParserError, ) as exc: - msg = "Failed to load YAML file" + msg = f"Failed to load YAML file: {lintable.path}" raise RuntimeError(msg) from exc if len(result) == 0: @@ -975,7 +1027,6 @@ def is_playbook(filename: str) -> bool: return False -# pylint: disable=too-many-statements def get_lintables( opts: Options = options, args: list[str] | None = None, @@ -1018,3 +1069,49 @@ def _extend_with_roles(lintables: list[Lintable]) -> None: def convert_to_boolean(value: Any) -> bool: """Use Ansible to convert something to a boolean.""" return bool(boolean(value)) + + +def parse_examples_from_plugin(lintable: Lintable) -> tuple[int, str]: + """Parse yaml inside plugin EXAMPLES string. + + Store a line number offset to realign returned line numbers later + """ + offset = 1 + parsed = ast.parse(lintable.content) + for child in parsed.body: + if isinstance(child, ast.Assign): + label = child.targets[0] + if isinstance(label, ast.Name) and label.id == "EXAMPLES": + offset = child.lineno - 1 + break + + docs = read_docstring(str(lintable.path)) + examples = docs["plainexamples"] + + # Ignore the leading newline and lack of document start + # as including those in EXAMPLES would be weird. + return offset, (f"---{examples}" if examples else "") + + +@lru_cache +def load_plugin(name: str) -> PluginLoadContext: + """Return loaded ansible plugin/module.""" + loaded_module = action_loader.find_plugin_with_context( + name, + ignore_deprecated=True, + check_aliases=True, + ) + if not loaded_module.resolved: + loaded_module = module_loader.find_plugin_with_context( + name, + ignore_deprecated=True, + check_aliases=True, + ) + if not loaded_module.resolved and name.startswith("ansible.builtin."): + # fallback to core behavior of using legacy + loaded_module = module_loader.find_plugin_with_context( + name.replace("ansible.builtin.", "ansible.legacy."), + ignore_deprecated=True, + check_aliases=True, + ) + return loaded_module |