summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/utils.py')
-rw-r--r--src/ansiblelint/utils.py1020
1 files changed, 1020 insertions, 0 deletions
diff --git a/src/ansiblelint/utils.py b/src/ansiblelint/utils.py
new file mode 100644
index 0000000..9cb97aa
--- /dev/null
+++ b/src/ansiblelint/utils.py
@@ -0,0 +1,1020 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+# spell-checker:ignore dwim
+# pylint: disable=too-many-lines
+"""Generic utility helpers."""
+from __future__ import annotations
+
+import contextlib
+import inspect
+import logging
+import os
+import re
+from collections.abc import Generator, ItemsView, Iterator, Mapping, Sequence
+from dataclasses import _MISSING_TYPE, dataclass, field
+from functools import cache
+from pathlib import Path
+from typing import Any
+
+import yaml
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.parsing.dataloader import DataLoader
+from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
+from ansible.parsing.yaml.loader import AnsibleLoader
+from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
+from ansible.plugins.loader import add_all_plugin_dirs
+from ansible.template import Templar
+from ansible.utils.collection_loader import AnsibleCollectionConfig
+from yaml.composer import Composer
+from yaml.representer import RepresenterError
+
+from ansiblelint._internal.rules import (
+ AnsibleParserErrorRule,
+ RuntimeErrorRule,
+)
+from ansiblelint.app import get_app
+from ansiblelint.config import Options, options
+from ansiblelint.constants import (
+ ANNOTATION_KEYS,
+ FILENAME_KEY,
+ INCLUSION_ACTION_NAMES,
+ LINE_NUMBER_KEY,
+ NESTED_TASK_KEYS,
+ PLAYBOOK_TASK_KEYWORDS,
+ ROLE_IMPORT_ACTION_NAMES,
+ SKIPPED_RULES_KEY,
+ FileType,
+)
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import Lintable, discover_lintables
+from ansiblelint.skip_utils import is_nested_task
+from ansiblelint.text import removeprefix
+
+# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
+# string as the password to enable such yaml files to be opened and parsed
+# successfully.
+DEFAULT_VAULT_PASSWORD = "x" # noqa: S105
+COLLECTION_PLAY_RE = re.compile(r"^[\w\d_]+\.[\w\d_]+\.[\w\d_]+$")
+
+PLAYBOOK_DIR = os.environ.get("ANSIBLE_PLAYBOOK_DIR", None)
+
+
+_logger = logging.getLogger(__name__)
+
+
+def parse_yaml_from_file(filepath: str) -> AnsibleBaseYAMLObject:
+ """Extract a decrypted YAML object from file."""
+ dataloader = DataLoader()
+ if hasattr(dataloader, "set_vault_password"):
+ dataloader.set_vault_password(DEFAULT_VAULT_PASSWORD)
+ return dataloader.load_from_file(filepath)
+
+
+def path_dwim(basedir: str, given: str) -> str:
+ """Convert a given path do-what-I-mean style."""
+ dataloader = DataLoader()
+ dataloader.set_basedir(basedir)
+ return str(dataloader.path_dwim(given))
+
+
+def ansible_templar(basedir: Path, templatevars: Any) -> Templar:
+ """Create an Ansible Templar using templatevars."""
+ # `basedir` is the directory containing the lintable file.
+ # Therefore, for tasks in a role, `basedir` has the form
+ # `roles/some_role/tasks`. On the other hand, the search path
+ # is `roles/some_role/{files,templates}`. As a result, the
+ # `tasks` part in the basedir should be stripped stripped.
+ if basedir.name == "tasks":
+ basedir = basedir.parent
+
+ dataloader = DataLoader()
+ dataloader.set_basedir(basedir)
+ templar = Templar(dataloader, variables=templatevars)
+ return templar
+
+
+def mock_filter(left: Any, *args: Any, **kwargs: Any) -> Any: # noqa: ARG001
+ """Mock a filter that can take any combination of args and kwargs.
+
+ This will return x when x | filter(y,z) is called
+ e.g. {{ foo | ansible.utils.ipaddr('address') }}
+
+ :param left: The left hand side of the filter
+ :param args: The args passed to the filter
+ :param kwargs: The kwargs passed to the filter
+ :return: The left hand side of the filter
+ """
+ # pylint: disable=unused-argument
+ return left
+
+
+def ansible_template(
+ basedir: Path,
+ varname: Any,
+ templatevars: Any,
+ **kwargs: Any,
+) -> Any:
+ """Render a templated string by mocking missing filters.
+
+ In the case of a missing lookup, ansible core does an early exit
+ when disable_lookup=True but this happens after the jinja2 syntax already passed
+ return the original string as if it had been templated.
+
+ In the case of a missing filter, extract the missing filter plugin name
+ from the ansible error, 'Could not load "filter"'. Then mock the filter
+ and template the string again. The range allows for up to 10 unknown filters
+ in succession
+
+ :param basedir: The directory containing the lintable file
+ :param varname: The string to be templated
+ :param templatevars: The variables to be used in the template
+ :param kwargs: Additional arguments to be passed to the templating engine
+ :return: The templated string or None
+ :raises: AnsibleError if the filter plugin cannot be extracted or the
+ string could not be templated in 10 attempts
+ """
+ # pylint: disable=too-many-locals
+ filter_error = "template error while templating string:"
+ lookup_error = "was found, however lookups were disabled from templating"
+ re_filter_fqcn = re.compile(r"\w+\.\w+\.\w+")
+ re_filter_in_err = re.compile(r"Could not load \"(\w+)\"")
+ re_valid_filter = re.compile(r"^\w+(\.\w+\.\w+)?$")
+ templar = ansible_templar(basedir=basedir, templatevars=templatevars)
+
+ kwargs["disable_lookups"] = True
+ for _i in range(10):
+ try:
+ templated = templar.template(varname, **kwargs)
+ return templated
+ except AnsibleError as exc:
+ if lookup_error in exc.message:
+ return varname
+ if exc.message.startswith(filter_error):
+ while True:
+ match = re_filter_in_err.search(exc.message)
+ if match:
+ missing_filter = match.group(1)
+ break
+ match = re_filter_fqcn.search(exc.message)
+ if match:
+ missing_filter = match.group(0)
+ break
+ missing_filter = exc.message.split("'")[1]
+ break
+
+ if not re_valid_filter.match(missing_filter):
+ err = f"Could not parse missing filter name from error message: {exc.message}"
+ _logger.warning(err)
+ raise
+
+ # pylint: disable=protected-access
+ templar.environment.filters._delegatee[ # noqa: SLF001
+ missing_filter
+ ] = mock_filter
+ # Record the mocked filter so we can warn the user
+ if missing_filter not in options.mock_filters:
+ _logger.debug("Mocking missing filter %s", missing_filter)
+ options.mock_filters.append(missing_filter)
+ continue
+ raise
+ return None
+
+
+BLOCK_NAME_TO_ACTION_TYPE_MAP = {
+ "tasks": "task",
+ "handlers": "handler",
+ "pre_tasks": "task",
+ "post_tasks": "task",
+ "block": "meta",
+ "rescue": "meta",
+ "always": "meta",
+}
+
+
+def tokenize(line: str) -> tuple[str, list[str], dict[str, str]]:
+ """Parse a string task invocation."""
+ tokens = line.lstrip().split(" ")
+ if tokens[0] == "-":
+ tokens = tokens[1:]
+ if tokens[0] == "action:" or tokens[0] == "local_action:":
+ tokens = tokens[1:]
+ command = tokens[0].replace(":", "")
+
+ args = []
+ kwargs = {}
+ non_kv_found = False
+ for arg in tokens[1:]:
+ if "=" in arg and not non_kv_found:
+ key_value = arg.split("=", 1)
+ kwargs[key_value[0]] = key_value[1]
+ else:
+ non_kv_found = True
+ args.append(arg)
+ return (command, args, kwargs)
+
+
+def playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignore[type-arg]
+ """Return a list of items from within the playbook."""
+ if isinstance(pb_data, dict):
+ return pb_data.items()
+ if not pb_data:
+ return [] # type: ignore[return-value]
+
+ # "if play" prevents failure if the play sequence contains None,
+ # which is weird but currently allowed by Ansible
+ # https://github.com/ansible/ansible-lint/issues/849
+ return [item for play in pb_data if play for item in play.items()] # type: ignore[return-value]
+
+
+def set_collections_basedir(basedir: Path) -> None:
+ """Set the playbook directory as playbook_paths for the collection loader."""
+ # Ansible expects only absolute paths inside `playbook_paths` and will
+ # produce weird errors if we use a relative one.
+ AnsibleCollectionConfig.playbook_paths = str(basedir.resolve())
+
+
+def template(
+ basedir: Path,
+ value: Any,
+ variables: Any,
+ *,
+ fail_on_error: bool = False,
+ fail_on_undefined: bool = False,
+ **kwargs: str,
+) -> Any:
+ """Attempt rendering a value with known vars."""
+ try:
+ value = ansible_template(
+ basedir.resolve(),
+ value,
+ variables,
+ **dict(kwargs, fail_on_undefined=fail_on_undefined),
+ )
+ # Hack to skip the following exception when using to_json filter on a variable. # noqa: FIX004
+ # I guess the filter doesn't like empty vars...
+ except (AnsibleError, ValueError, RepresenterError):
+ # templating failed, so just keep value as is.
+ if fail_on_error:
+ raise
+ return value
+
+
+def _include_children(
+ basedir: str,
+ k: str,
+ v: Any,
+ parent_type: FileType,
+) -> list[Lintable]:
+ # handle special case include_tasks: name=filename.yml
+ if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v:
+ v = v["file"]
+
+ # we cannot really parse any jinja2 in includes, so we ignore them
+ if not v or "{{" in v:
+ return []
+
+ if "import_playbook" in k and COLLECTION_PLAY_RE.match(v):
+ # Any import_playbooks from collections should be ignored as ansible
+ # own syntax check will handle them.
+ return []
+
+ # handle include: filename.yml tags=blah
+ # pylint: disable=unused-variable
+ (command, args, kwargs) = tokenize(f"{k}: {v}")
+
+ result = path_dwim(basedir, args[0])
+ while basedir not in ["", "/"]:
+ if os.path.exists(result):
+ break
+ basedir = os.path.dirname(basedir)
+ result = path_dwim(basedir, args[0])
+
+ return [Lintable(result, kind=parent_type)]
+
+
+def _taskshandlers_children(
+ basedir: str,
+ k: str,
+ v: None | Any,
+ parent_type: FileType,
+) -> list[Lintable]:
+ results: list[Lintable] = []
+ if v is None:
+ raise MatchError(
+ message="A malformed block was encountered while loading a block.",
+ rule=RuntimeErrorRule(),
+ )
+ for task_handler in v:
+ # ignore empty tasks, `-`
+ if not task_handler:
+ continue
+
+ with contextlib.suppress(LookupError):
+ children = _get_task_handler_children_for_tasks_or_playbooks(
+ task_handler,
+ basedir,
+ k,
+ parent_type,
+ )
+ results.append(children)
+ continue
+
+ if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES):
+ task_handler = normalize_task_v2(task_handler)
+ _validate_task_handler_action_for_role(task_handler["action"])
+ results.extend(
+ _roles_children(
+ basedir,
+ k,
+ [task_handler["action"].get("name")],
+ parent_type,
+ main=task_handler["action"].get("tasks_from", "main"),
+ ),
+ )
+ continue
+
+ if "block" not in task_handler:
+ continue
+
+ results.extend(
+ _taskshandlers_children(basedir, k, task_handler["block"], parent_type),
+ )
+ if "rescue" in task_handler:
+ results.extend(
+ _taskshandlers_children(
+ basedir,
+ k,
+ task_handler["rescue"],
+ parent_type,
+ ),
+ )
+ if "always" in task_handler:
+ results.extend(
+ _taskshandlers_children(
+ basedir,
+ k,
+ task_handler["always"],
+ parent_type,
+ ),
+ )
+
+ return results
+
+
+def _get_task_handler_children_for_tasks_or_playbooks(
+ task_handler: dict[str, Any],
+ basedir: str,
+ k: Any,
+ parent_type: FileType,
+) -> Lintable:
+ """Try to get children of taskhandler for include/import tasks/playbooks."""
+ child_type = k if parent_type == "playbook" else parent_type
+
+ # Include the FQCN task names as this happens before normalize
+ for task_handler_key in INCLUSION_ACTION_NAMES:
+ with contextlib.suppress(KeyError):
+ # ignore empty tasks
+ if not task_handler: # pragma: no branch
+ continue
+
+ file_name = task_handler[task_handler_key]
+ if isinstance(file_name, Mapping) and file_name.get("file", None):
+ file_name = file_name["file"]
+
+ f = path_dwim(basedir, file_name)
+ while basedir not in ["", "/"]:
+ if os.path.exists(f):
+ break
+ basedir = os.path.dirname(basedir)
+ f = path_dwim(basedir, file_name)
+ return Lintable(f, kind=child_type)
+ msg = f'The node contains none of: {", ".join(sorted(INCLUSION_ACTION_NAMES))}'
+ raise LookupError(msg)
+
+
+def _validate_task_handler_action_for_role(th_action: dict[str, Any]) -> None:
+ """Verify that the task handler action is valid for role include."""
+ module = th_action["__ansible_module__"]
+
+ if "name" not in th_action:
+ raise MatchError(message=f"Failed to find required 'name' key in {module!s}")
+
+ if not isinstance(th_action["name"], str):
+ raise MatchError(
+ message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
+ )
+
+
+def _roles_children(
+ basedir: str,
+ k: str,
+ v: Sequence[Any],
+ parent_type: FileType, # noqa: ARG001
+ main: str = "main",
+) -> list[Lintable]:
+ # pylint: disable=unused-argument # parent_type)
+ results: list[Lintable] = []
+ if not v:
+ # typing does not prevent junk from being passed in
+ return results
+ for role in v:
+ if isinstance(role, dict):
+ if "role" in role or "name" in role:
+ if "tags" not in role or "skip_ansible_lint" not in role["tags"]:
+ results.extend(
+ _look_for_role_files(
+ basedir,
+ role.get("role", role.get("name")),
+ main=main,
+ ),
+ )
+ elif k != "dependencies":
+ msg = f'role dict {role} does not contain a "role" or "name" key'
+ raise SystemExit(msg)
+ else:
+ results.extend(_look_for_role_files(basedir, role, main=main))
+ return results
+
+
+def _rolepath(basedir: str, role: str) -> str | None:
+ role_path = None
+
+ possible_paths = [
+ # if included from a playbook
+ path_dwim(basedir, os.path.join("roles", role)),
+ path_dwim(basedir, role),
+ # if included from roles/[role]/meta/main.yml
+ path_dwim(basedir, os.path.join("..", "..", "..", "roles", role)),
+ path_dwim(basedir, os.path.join("..", "..", role)),
+ # if checking a role in the current directory
+ path_dwim(basedir, os.path.join("..", role)),
+ ]
+
+ for loc in get_app(offline=True).runtime.config.default_roles_path:
+ loc = os.path.expanduser(loc)
+ possible_paths.append(path_dwim(loc, role))
+
+ possible_paths.append(path_dwim(basedir, ""))
+
+ for path_option in possible_paths: # pragma: no branch
+ if os.path.isdir(path_option):
+ role_path = path_option
+ break
+
+ if role_path: # pragma: no branch
+ add_all_plugin_dirs(role_path)
+
+ return role_path
+
+
+def _look_for_role_files(
+ basedir: str,
+ role: str,
+ main: str | None = "main", # noqa: ARG001
+) -> list[Lintable]:
+ # pylint: disable=unused-argument # main
+ role_path = _rolepath(basedir, role)
+ if not role_path: # pragma: no branch
+ return []
+
+ results = []
+
+ for kind in ["tasks", "meta", "handlers", "vars", "defaults"]:
+ current_path = os.path.join(role_path, kind)
+ for folder, _, files in os.walk(current_path):
+ for file in files:
+ file_ignorecase = file.lower()
+ if file_ignorecase.endswith((".yml", ".yaml")):
+ results.append(Lintable(os.path.join(folder, file)))
+
+ return results
+
+
+def _sanitize_task(task: dict[str, Any]) -> dict[str, Any]:
+ """Return a stripped-off task structure compatible with new Ansible.
+
+ This helper takes a copy of the incoming task and drops
+ any internally used keys from it.
+ """
+ result = task.copy()
+ # task is an AnsibleMapping which inherits from OrderedDict, so we need
+ # to use `del` to remove unwanted keys.
+ for k in [SKIPPED_RULES_KEY, FILENAME_KEY, LINE_NUMBER_KEY]:
+ if k in result:
+ del result[k]
+ return result
+
+
+def _extract_ansible_parsed_keys_from_task(
+ result: dict[str, Any],
+ task: dict[str, Any],
+ keys: tuple[str, ...],
+) -> dict[str, Any]:
+ """Return a dict with existing key in task."""
+ for k, v in list(task.items()):
+ if k in keys:
+ # we don't want to re-assign these values, which were
+ # determined by the ModuleArgsParser() above
+ continue
+ result[k] = v
+ return result
+
+
+def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
+ """Ensure tasks have a normalized action key and strings are converted to python objects."""
+ result: dict[str, Any] = {}
+ ansible_parsed_keys = ("action", "local_action", "args", "delegate_to")
+
+ if is_nested_task(task):
+ _extract_ansible_parsed_keys_from_task(result, task, ansible_parsed_keys)
+ # Add dummy action for block/always/rescue statements
+ result["action"] = {
+ "__ansible_module__": "block/always/rescue",
+ "__ansible_module_original__": "block/always/rescue",
+ }
+
+ return result
+
+ sanitized_task = _sanitize_task(task)
+ mod_arg_parser = ModuleArgsParser(sanitized_task)
+
+ try:
+ action, arguments, result["delegate_to"] = mod_arg_parser.parse(
+ skip_action_validation=options.skip_action_validation,
+ )
+ except AnsibleParserError as exc:
+ # pylint: disable=raise-missing-from
+ raise MatchError(
+ rule=AnsibleParserErrorRule(),
+ message=exc.message,
+ filename=task.get(FILENAME_KEY, "Unknown"),
+ lineno=task.get(LINE_NUMBER_KEY, 0),
+ ) from exc
+
+ # denormalize shell -> command conversion
+ if "_uses_shell" in arguments:
+ action = "shell"
+ del arguments["_uses_shell"]
+
+ _extract_ansible_parsed_keys_from_task(
+ result,
+ task,
+ (*ansible_parsed_keys, action),
+ )
+
+ if not isinstance(action, str):
+ msg = f"Task actions can only be strings, got {action}"
+ raise RuntimeError(msg)
+ action_unnormalized = action
+ # convert builtin fqn calls to short forms because most rules know only
+ # about short calls but in the future we may switch the normalization to do
+ # the opposite. Mainly we currently consider normalized the module listing
+ # used by `ansible-doc -t module -l 2>/dev/null`
+ action = removeprefix(action, "ansible.builtin.")
+ result["action"] = {
+ "__ansible_module__": action,
+ "__ansible_module_original__": action_unnormalized,
+ }
+
+ result["action"].update(arguments)
+ return result
+
+
+def normalize_task(task: dict[str, Any], filename: str) -> dict[str, Any]:
+ """Unify task-like object structures."""
+ ansible_action_type = task.get("__ansible_action_type__", "task")
+ if "__ansible_action_type__" in task:
+ del task["__ansible_action_type__"]
+ task = normalize_task_v2(task)
+ task[FILENAME_KEY] = filename
+ task["__ansible_action_type__"] = ansible_action_type
+ return task
+
+
+def task_to_str(task: dict[str, Any]) -> str:
+ """Make a string identifier for the given task."""
+ name = task.get("name")
+ if name:
+ return str(name)
+ action = task.get("action")
+ if isinstance(action, str) or not isinstance(action, dict):
+ return str(action)
+ args = [
+ f"{k}={v}"
+ for (k, v) in action.items()
+ if k
+ not in [
+ "__ansible_module__",
+ "__ansible_module_original__",
+ "_raw_params",
+ LINE_NUMBER_KEY,
+ FILENAME_KEY,
+ ]
+ ]
+
+ _raw_params = action.get("_raw_params", [])
+ if isinstance(_raw_params, list):
+ for item in _raw_params:
+ args.append(str(item))
+ else:
+ args.append(_raw_params)
+
+ return f"{action['__ansible_module__']} {' '.join(args)}"
+
+
+def extract_from_list(
+ blocks: AnsibleBaseYAMLObject,
+ candidates: list[str],
+ *,
+ recursive: bool = False,
+) -> list[Any]:
+ """Get action tasks from block structures."""
+ results = []
+ for block in blocks:
+ for candidate in candidates:
+ if isinstance(block, dict) and candidate in block:
+ if isinstance(block[candidate], list):
+ subresults = add_action_type(block[candidate], candidate)
+ if recursive:
+ subresults.extend(
+ extract_from_list(
+ subresults,
+ candidates,
+ recursive=recursive,
+ ),
+ )
+ results.extend(subresults)
+ elif block[candidate] is not None:
+ msg = f"Key '{candidate}' defined, but bad value: '{block[candidate]!s}'"
+ raise RuntimeError(msg)
+ return results
+
+
+@dataclass
+class Task(dict[str, Any]):
+ """Class that represents a task from linter point of view.
+
+ raw_task:
+ When looping through the tasks in the file, each "raw_task" is minimally
+ processed to include these special keys: __line__, __file__, skipped_rules.
+ normalized_task:
+ When each raw_task is "normalized", action shorthand (strings) get parsed
+ by ansible into python objects and the action key gets normalized. If the task
+ should be skipped (skipped is True) or normalizing it fails (error is not None)
+ then this is just the raw_task instead of a normalized copy.
+ skip_tags:
+ List of tags found to be skipped, from tags block or noqa comments
+ error:
+ This is normally None. It will be a MatchError when the raw_task cannot be
+ normalized due to an AnsibleParserError.
+ position: Any
+ """
+
+ raw_task: dict[str, Any]
+ filename: str = ""
+ _normalized_task: dict[str, Any] | _MISSING_TYPE = field(init=False, repr=False)
+ error: MatchError | None = None
+ position: Any = None
+
+ @property
+ def name(self) -> str | None:
+ """Return the name of the task."""
+ return self.raw_task.get("name", None)
+
+ @property
+ def action(self) -> str:
+ """Return the resolved action name."""
+ action_name = self.normalized_task["action"]["__ansible_module_original__"]
+ if not isinstance(action_name, str):
+ msg = "Task actions can only be strings."
+ raise RuntimeError(msg)
+ return action_name
+
+ @property
+ def args(self) -> Any:
+ """Return the arguments passed to the task action.
+
+ While we usually expect to return a dictionary, it can also
+ return a templated string when jinja is used.
+ """
+ if "args" in self.raw_task:
+ return self.raw_task["args"]
+ result = {}
+ for k, v in self.normalized_task["action"].items():
+ if k not in ANNOTATION_KEYS:
+ result[k] = v
+ return result
+
+ @property
+ def normalized_task(self) -> dict[str, Any]:
+ """Return the name of the task."""
+ if not hasattr(self, "_normalized_task"):
+ try:
+ self._normalized_task = normalize_task(
+ self.raw_task,
+ filename=self.filename,
+ )
+ except MatchError as err:
+ self.error = err
+ # When we cannot normalize it, we just use the raw task instead
+ # to avoid adding extra complexity to the rules.
+ self._normalized_task = self.raw_task
+ if isinstance(self._normalized_task, _MISSING_TYPE):
+ msg = "Task was not normalized"
+ raise RuntimeError(msg)
+ return self._normalized_task
+
+ @property
+ def skip_tags(self) -> list[str]:
+ """Return the list of tags to skip."""
+ skip_tags: list[str] = self.raw_task.get(SKIPPED_RULES_KEY, [])
+ return skip_tags
+
+ def __repr__(self) -> str:
+ """Return a string representation of the task."""
+ return f"Task('{self.name}' [{self.position}])"
+
+ def get(self, key: str, default: Any = None) -> Any:
+ """Get a value from the task."""
+ return self.normalized_task.get(key, default)
+
+ def __getitem__(self, index: str) -> Any:
+ """Allow access as task[...]."""
+ return self.normalized_task[index]
+
+ def __iter__(self) -> Generator[str, None, None]:
+ """Provide support for 'key in task'."""
+ yield from (f for f in self.normalized_task)
+
+
+def task_in_list(
+ data: AnsibleBaseYAMLObject,
+ file: Lintable,
+ kind: str,
+ position: str = ".",
+) -> Iterator[Task]:
+ """Get action tasks from block structures."""
+
+ def each_entry(data: AnsibleBaseYAMLObject, position: str) -> Iterator[Task]:
+ if not data:
+ return
+ for entry_index, entry in enumerate(data):
+ if not entry:
+ continue
+ _pos = f"{position}[{entry_index}]"
+ if isinstance(entry, dict):
+ yield Task(
+ entry,
+ position=_pos,
+ )
+ for block in [k for k in entry if k in NESTED_TASK_KEYS]:
+ yield from task_in_list(
+ data=entry[block],
+ file=file,
+ kind="tasks",
+ position=f"{_pos}.{block}",
+ )
+
+ if not isinstance(data, list):
+ return
+ if kind == "playbook":
+ attributes = ["tasks", "pre_tasks", "post_tasks", "handlers"]
+ for item_index, item in enumerate(data):
+ for attribute in attributes:
+ if not isinstance(item, dict):
+ continue
+ if attribute in item:
+ if isinstance(item[attribute], list):
+ yield from each_entry(
+ item[attribute],
+ f"{position }[{item_index}].{attribute}",
+ )
+ elif item[attribute] is not None:
+ msg = f"Key '{attribute}' defined, but bad value: '{item[attribute]!s}'"
+ raise RuntimeError(msg)
+ else:
+ yield from each_entry(data, position)
+
+
+def add_action_type(actions: AnsibleBaseYAMLObject, action_type: str) -> list[Any]:
+ """Add action markers to task objects."""
+ results = []
+ for action in actions:
+ # ignore empty task
+ if not action:
+ continue
+ action["__ansible_action_type__"] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
+ results.append(action)
+ return results
+
+
+def get_action_tasks(data: AnsibleBaseYAMLObject, file: Lintable) -> list[Any]:
+ """Get a flattened list of action tasks from the file."""
+ tasks = []
+ if file.kind in ["tasks", "handlers"]:
+ tasks = add_action_type(data, file.kind)
+ else:
+ tasks.extend(extract_from_list(data, PLAYBOOK_TASK_KEYWORDS))
+
+ # Add sub-elements of block/rescue/always to tasks list
+ tasks.extend(extract_from_list(tasks, NESTED_TASK_KEYS, recursive=True))
+
+ return tasks
+
+
+@cache
+def parse_yaml_linenumbers(
+ lintable: Lintable,
+) -> AnsibleBaseYAMLObject:
+ """Parse yaml as ansible.utils.parse_yaml but with linenumbers.
+
+ The line numbers are stored in each node's LINE_NUMBER_KEY key.
+ """
+ result = []
+
+ def compose_node(parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
+ # the line number where the previous token has ended (plus empty lines)
+ line = loader.line
+ node = Composer.compose_node(loader, parent, index)
+ if not isinstance(node, yaml.nodes.Node):
+ msg = "Unexpected yaml data."
+ raise RuntimeError(msg)
+ node.__line__ = line + 1 # type: ignore[attr-defined]
+ return node
+
+ def construct_mapping(
+ node: AnsibleBaseYAMLObject,
+ *,
+ deep: bool = False,
+ ) -> AnsibleMapping:
+ mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
+ if hasattr(node, "__line__"):
+ mapping[LINE_NUMBER_KEY] = node.__line__
+ else:
+ mapping[
+ LINE_NUMBER_KEY
+ ] = mapping._line_number # pylint: disable=protected-access # noqa: SLF001
+ mapping[FILENAME_KEY] = lintable.path
+ return mapping
+
+ try:
+ kwargs = {}
+ if "vault_password" in inspect.getfullargspec(AnsibleLoader.__init__).args:
+ kwargs["vault_password"] = DEFAULT_VAULT_PASSWORD
+ loader = AnsibleLoader(lintable.content, **kwargs)
+ loader.compose_node = compose_node
+ loader.construct_mapping = construct_mapping
+ # while Ansible only accepts single documents, we also need to load
+ # multi-documents, as we attempt to load any YAML file, not only
+ # Ansible managed ones.
+ while True:
+ data = loader.get_data()
+ if data is None:
+ break
+ result.append(data)
+ except (
+ yaml.parser.ParserError,
+ yaml.scanner.ScannerError,
+ yaml.constructor.ConstructorError,
+ ) as exc:
+ msg = "Failed to load YAML file"
+ raise RuntimeError(msg) from exc
+
+ if len(result) == 0:
+ return None # empty documents
+ if len(result) == 1:
+ return result[0]
+ return result
+
+
+def get_cmd_args(task: dict[str, Any]) -> str:
+ """Extract the args from a cmd task as a string."""
+ if "cmd" in task["action"]:
+ args = task["action"]["cmd"]
+ else:
+ args = task["action"].get("_raw_params", [])
+ if not isinstance(args, str):
+ return " ".join(args)
+ return args
+
+
+def get_first_cmd_arg(task: dict[str, Any]) -> Any:
+ """Extract the first arg from a cmd task."""
+ try:
+ first_cmd_arg = get_cmd_args(task).split()[0]
+ except IndexError:
+ return None
+ return first_cmd_arg
+
+
+def get_second_cmd_arg(task: dict[str, Any]) -> Any:
+ """Extract the second arg from a cmd task."""
+ try:
+ second_cmd_arg = get_cmd_args(task).split()[1]
+ except IndexError:
+ return None
+ return second_cmd_arg
+
+
+def is_playbook(filename: str) -> bool:
+ """Check if the file is a playbook.
+
+ Given a filename, it should return true if it looks like a playbook. The
+ function is not supposed to raise exceptions.
+ """
+ # we assume is a playbook if we loaded a sequence of dictionaries where
+ # at least one of these keys is present:
+ playbooks_keys = {
+ "gather_facts",
+ "hosts",
+ "import_playbook",
+ "post_tasks",
+ "pre_tasks",
+ "roles",
+ "tasks",
+ }
+
+ # makes it work with Path objects by converting them to strings
+ if not isinstance(filename, str):
+ filename = str(filename)
+
+ try:
+ f = parse_yaml_from_file(filename)
+ except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
+ _logger.warning(
+ "Failed to load %s with %s, assuming is not a playbook.",
+ filename,
+ exc,
+ )
+ else:
+ if (
+ isinstance(f, AnsibleSequence)
+ and hasattr(next(iter(f), {}), "keys")
+ and playbooks_keys.intersection(next(iter(f), {}).keys())
+ ):
+ return True
+ return False
+
+
+# pylint: disable=too-many-statements
+def get_lintables(
+ opts: Options = options,
+ args: list[str] | None = None,
+) -> list[Lintable]:
+ """Detect files and directories that are lintable."""
+ lintables: list[Lintable] = []
+
+ # passing args bypass auto-detection mode
+ if args:
+ for arg in args:
+ lintable = Lintable(arg)
+ lintables.append(lintable)
+ else:
+ for filename in discover_lintables(opts):
+ path = Path(filename)
+ lintables.append(Lintable(path))
+
+ # stage 2: guess roles from current lintables, as there is no unique
+ # file that must be present in any kind of role.
+ _extend_with_roles(lintables)
+
+ return lintables
+
+
+def _extend_with_roles(lintables: list[Lintable]) -> None:
+ """Detect roles among lintables and adds them to the list."""
+ for lintable in lintables:
+ parts = lintable.path.parent.parts
+ if "roles" in parts:
+ role = lintable.path
+ while role.parent.name != "roles" and role.name:
+ role = role.parent
+ if role.exists() and not role.is_file():
+ lintable = Lintable(role)
+ if lintable.kind == "role" and lintable not in lintables:
+ _logger.debug("Added role: %s", lintable)
+ lintables.append(lintable)
+
+
+def convert_to_boolean(value: Any) -> bool:
+ """Use Ansible to convert something to a boolean."""
+ return bool(boolean(value))