Diffstat (limited to 'src/ansiblelint/utils.py')
-rw-r--r-- | src/ansiblelint/utils.py | 914
1 file changed, 914 insertions, 0 deletions
diff --git a/src/ansiblelint/utils.py b/src/ansiblelint/utils.py
new file mode 100644
index 0000000..3e71e91
--- /dev/null
+++ b/src/ansiblelint/utils.py
@@ -0,0 +1,914 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+# spell-checker:ignore dwim
+"""Generic utility helpers."""
+from __future__ import annotations
+
+import contextlib
+import inspect
+import logging
+import os
+import re
+import warnings
+from argparse import Namespace
+from collections.abc import ItemsView, Mapping
+from functools import lru_cache
+from pathlib import Path
+from typing import Any, Callable, Generator, Sequence
+
+import yaml
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.parsing.dataloader import DataLoader
+from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.parsing.splitter import split_args
+from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
+from ansible.parsing.yaml.loader import AnsibleLoader
+from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
+from ansible.plugins.loader import add_all_plugin_dirs
+from ansible.template import Templar
+from ansible.utils.collection_loader import AnsibleCollectionConfig
+from yaml.composer import Composer
+from yaml.representer import RepresenterError
+
+from ansiblelint._internal.rules import (
+    AnsibleParserErrorRule,
+    LoadingFailureRule,
+    RuntimeErrorRule,
+)
+from ansiblelint.app import get_app
+from ansiblelint.config import options
+from ansiblelint.constants import (
+    FILENAME_KEY,
+    INCLUSION_ACTION_NAMES,
+    LINE_NUMBER_KEY,
+    NESTED_TASK_KEYS,
+    PLAYBOOK_TASK_KEYWORDS,
+    ROLE_IMPORT_ACTION_NAMES,
+    SKIPPED_RULES_KEY,
+    FileType,
+)
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import Lintable, discover_lintables
+from ansiblelint.skip_utils import is_nested_task
+from ansiblelint.text import removeprefix
+
+# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
+# string as the password to enable such yaml files to be opened and parsed
+# successfully.
+DEFAULT_VAULT_PASSWORD = "x"
+COLLECTION_PLAY_RE = re.compile(r"^[\w\d_]+\.[\w\d_]+\.[\w\d_]+$")
+
+PLAYBOOK_DIR = os.environ.get("ANSIBLE_PLAYBOOK_DIR", None)
+
+
+_logger = logging.getLogger(__name__)
+
+
+def parse_yaml_from_file(filepath: str) -> AnsibleBaseYAMLObject:
+    """Extract a decrypted YAML object from file."""
+    dataloader = DataLoader()
+    if hasattr(dataloader, "set_vault_password"):
+        dataloader.set_vault_password(DEFAULT_VAULT_PASSWORD)
+    return dataloader.load_from_file(filepath)
+
+
+def path_dwim(basedir: str, given: str) -> str:
+    """Convert a given path do-what-I-mean style."""
+    dataloader = DataLoader()
+    dataloader.set_basedir(basedir)
+    return str(dataloader.path_dwim(given))
+
+
+def ansible_templar(basedir: str, templatevars: Any) -> Templar:
+    """Create an Ansible Templar using templatevars."""
+    # `basedir` is the directory containing the lintable file.
+    # Therefore, for tasks in a role, `basedir` has the form
+    # `roles/some_role/tasks`. On the other hand, the search path
+    # is `roles/some_role/{files,templates}`. As a result, the
+    # `tasks` part in the basedir should be stripped.
+    if os.path.basename(basedir) == "tasks":
+        basedir = os.path.dirname(basedir)
+
+    dataloader = DataLoader()
+    dataloader.set_basedir(basedir)
+    templar = Templar(dataloader, variables=templatevars)
+    return templar
+
+
+def ansible_template(
+    basedir: str, varname: Any, templatevars: Any, **kwargs: Any
+) -> Any:
+    """Render a templated string by mocking missing filters."""
+    templar = ansible_templar(basedir=basedir, templatevars=templatevars)
+    # pylint: disable=unused-variable
+    for i in range(3):
+        try:
+            kwargs["disable_lookups"] = True
+            return templar.template(varname, **kwargs)
+        except AnsibleError as exc:
+            if (
+                "was found, however lookups were disabled from templating"
+                in exc.message
+            ):
+                # ansible core does an early exit when disable_lookup=True but
+                # this happens after the jinja2 syntax already passed.
+                break
+            if (
+                exc.message.startswith("template error while templating string:")
+                and "'" in exc.message
+            ):
+                missing_filter = exc.message.split("'")[1]
+                if missing_filter == "end of print statement":
+                    raise
+                # Mock the filter to avoid an error from Ansible templating
+                # pylint: disable=protected-access
+                templar.environment.filters._delegatee[missing_filter] = lambda x: x
+                # Record the mocked filter so we can warn the user
+                if missing_filter not in options.mock_filters:
+                    _logger.debug("Mocking missing filter %s", missing_filter)
+                    options.mock_filters.append(missing_filter)
+                continue
+            raise
+
+
+BLOCK_NAME_TO_ACTION_TYPE_MAP = {
+    "tasks": "task",
+    "handlers": "handler",
+    "pre_tasks": "task",
+    "post_tasks": "task",
+    "block": "meta",
+    "rescue": "meta",
+    "always": "meta",
+}
+
+
+def tokenize(line: str) -> tuple[str, list[str], dict[str, str]]:
+    """Parse a string task invocation."""
+    tokens = line.lstrip().split(" ")
+    if tokens[0] == "-":
+        tokens = tokens[1:]
+    if tokens[0] == "action:" or tokens[0] == "local_action:":
+        tokens = tokens[1:]
+    command = tokens[0].replace(":", "")
+
+    args = []
+    kwargs = {}
+    non_kv_found = False
+    for arg in tokens[1:]:
+        if "=" in arg and not non_kv_found:
+            key_value = arg.split("=", 1)
+            kwargs[key_value[0]] = key_value[1]
+        else:
+            non_kv_found = True
+            args.append(arg)
+    return (command, args, kwargs)
+
+
+def _playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView:  # type: ignore
+    if isinstance(pb_data, dict):
+        return pb_data.items()
+    if not pb_data:
+        return []  # type: ignore
+
+    # "if play" prevents failure if the play sequence contains None,
+    # which is weird but currently allowed by Ansible
+    # https://github.com/ansible/ansible-lint/issues/849
+    return [item for play in pb_data if play for item in play.items()]  # type: ignore
+
+
+def _set_collections_basedir(basedir: str) -> None:
+    # Sets the playbook directory as playbook_paths for the collection loader
+    AnsibleCollectionConfig.playbook_paths = basedir
+
+
+def find_children(lintable: Lintable) -> list[Lintable]:  # noqa: C901
+    """Traverse children of a single file or folder."""
+    if not lintable.path.exists():
+        return []
+    playbook_dir = str(lintable.path.parent)
+    _set_collections_basedir(playbook_dir or os.path.abspath("."))
+    add_all_plugin_dirs(playbook_dir or ".")
+    if lintable.kind == "role":
+        playbook_ds = AnsibleMapping({"roles": [{"role": str(lintable.path)}]})
+    elif lintable.kind not in ("playbook", "tasks"):
+        return []
+    else:
+        try:
+            playbook_ds = parse_yaml_from_file(str(lintable.path))
+        except AnsibleError as exc:
+            raise SystemExit(exc) from exc
+    results = []
+    basedir = os.path.dirname(str(lintable.path))
+    # playbook_ds can be an AnsibleUnicode string, which we consider invalid
+    if isinstance(playbook_ds, str):
+        raise MatchError(filename=lintable, rule=LoadingFailureRule())
+    for item in _playbook_items(playbook_ds):
+        # if lintable.kind not in ["playbook"]:
+        #     continue
+        for child in play_children(basedir, item, lintable.kind, playbook_dir):
+            # We avoid processing parametrized children
+            path_str = str(child.path)
+            if "$" in path_str or "{{" in path_str:
+                continue
+
+            # Repair incorrect paths obtained when old syntax was used, like:
+            # - include: simpletask.yml tags=nginx
+            valid_tokens = []
+            for token in split_args(path_str):
+                if "=" in token:
+                    break
+                valid_tokens.append(token)
+            path = " ".join(valid_tokens)
+            if path != path_str:
+                child.path = Path(path)
+                child.name = child.path.name
+
+            results.append(child)
+    return results
+
+
+def template(
+    basedir: str,
+    value: Any,
+    variables: Any,
+    fail_on_error: bool = False,
+    fail_on_undefined: bool = False,
+    **kwargs: str,
+) -> Any:
+    """Attempt rendering a value with known vars."""
+    try:
+        value = ansible_template(
+            os.path.abspath(basedir),
+            value,
+            variables,
+            **dict(kwargs, fail_on_undefined=fail_on_undefined),
+        )
+    # Hack to skip the following exception when using to_json filter on a variable.
+    # I guess the filter doesn't like empty vars...
+    except (AnsibleError, ValueError, RepresenterError):
+        # templating failed, so just keep value as is.
+        if fail_on_error:
+            raise
+    return value
+
+
+def play_children(
+    basedir: str, item: tuple[str, Any], parent_type: FileType, playbook_dir: str
+) -> list[Lintable]:
+    """Flatten the traversed play tasks."""
+    # pylint: disable=unused-argument
+    delegate_map: dict[str, Callable[[str, Any, Any, FileType], list[Lintable]]] = {
+        "tasks": _taskshandlers_children,
+        "pre_tasks": _taskshandlers_children,
+        "post_tasks": _taskshandlers_children,
+        "block": _taskshandlers_children,
+        "include": _include_children,
+        "ansible.builtin.include": _include_children,
+        "import_playbook": _include_children,
+        "ansible.builtin.import_playbook": _include_children,
+        "roles": _roles_children,
+        "dependencies": _roles_children,
+        "handlers": _taskshandlers_children,
+        "include_tasks": _include_children,
+        "ansible.builtin.include_tasks": _include_children,
+        "import_tasks": _include_children,
+        "ansible.builtin.import_tasks": _include_children,
+    }
+    (k, v) = item
+    add_all_plugin_dirs(os.path.abspath(basedir))
+
+    if k in delegate_map:
+        if v:
+            v = template(
+                os.path.abspath(basedir),
+                v,
+                {"playbook_dir": PLAYBOOK_DIR or os.path.abspath(basedir)},
+                fail_on_undefined=False,
+            )
+        return delegate_map[k](basedir, k, v, parent_type)
+    return []
+
+
+def _include_children(
+    basedir: str, k: str, v: Any, parent_type: FileType
+) -> list[Lintable]:
+    # handle special case include_tasks: name=filename.yml
+    if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v:
+        v = v["file"]
+
+    # we cannot really parse any jinja2 in includes, so we ignore them
+    if not v or "{{" in v:
+        return []
+
+    if "import_playbook" in k and COLLECTION_PLAY_RE.match(v):
+        # Any import_playbooks from collections should be ignored as ansible
+        # own syntax check will handle them.
+        return []
+
+    # handle include: filename.yml tags=blah
+    # pylint: disable=unused-variable
+    (command, args, kwargs) = tokenize(f"{k}: {v}")
+
+    result = path_dwim(basedir, args[0])
+    while basedir not in ["", "/"]:
+        if os.path.exists(result):
+            break
+        basedir = os.path.dirname(basedir)
+        result = path_dwim(basedir, args[0])
+
+    return [Lintable(result, kind=parent_type)]
+
+
+def _taskshandlers_children(
+    basedir: str, k: str, v: None | Any, parent_type: FileType
+) -> list[Lintable]:
+    results: list[Lintable] = []
+    if v is None:
+        raise MatchError(
+            message="A malformed block was encountered while loading a block.",
+            rule=RuntimeErrorRule(),
+        )
+    for task_handler in v:
+        # ignore empty tasks, `-`
+        if not task_handler:
+            continue
+
+        with contextlib.suppress(LookupError):
+            children = _get_task_handler_children_for_tasks_or_playbooks(
+                task_handler,
+                basedir,
+                k,
+                parent_type,
+            )
+            results.append(children)
+            continue
+
+        if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES):
+            # lgtm [py/unreachable-statement]
+            task_handler = normalize_task_v2(task_handler)
+            _validate_task_handler_action_for_role(task_handler["action"])
+            results.extend(
+                _roles_children(
+                    basedir,
+                    k,
+                    [task_handler["action"].get("name")],
+                    parent_type,
+                    main=task_handler["action"].get("tasks_from", "main"),
+                )
+            )
+            continue
+
+        if "block" not in task_handler:
+            continue
+
+        results.extend(
+            _taskshandlers_children(basedir, k, task_handler["block"], parent_type)
+        )
+        if "rescue" in task_handler:
+            results.extend(
+                _taskshandlers_children(basedir, k, task_handler["rescue"], parent_type)
+            )
+        if "always" in task_handler:
+            results.extend(
+                _taskshandlers_children(basedir, k, task_handler["always"], parent_type)
+            )
+
+    return results
+
+
+def _get_task_handler_children_for_tasks_or_playbooks(
+    task_handler: dict[str, Any],
+    basedir: str,
+    k: Any,
+    parent_type: FileType,
+) -> Lintable:
+    """Try to get children of taskhandler for include/import tasks/playbooks."""
+    child_type = k if parent_type == "playbook" else parent_type
+
+    # Include the FQCN task names as this happens before normalize
+    for task_handler_key in INCLUSION_ACTION_NAMES:
+        with contextlib.suppress(KeyError):
+            # ignore empty tasks
+            if not task_handler:  # pragma: no branch
+                continue
+
+            file_name = task_handler[task_handler_key]
+            if isinstance(file_name, Mapping) and file_name.get("file", None):
+                file_name = file_name["file"]
+
+            f = path_dwim(basedir, file_name)
+            while basedir not in ["", "/"]:
+                if os.path.exists(f):
+                    break
+                basedir = os.path.dirname(basedir)
+                f = path_dwim(basedir, file_name)
+            return Lintable(f, kind=child_type)
+
+    raise LookupError(
+        f'The node contains none of: {", ".join(sorted(INCLUSION_ACTION_NAMES))}',
+    )
+
+
+def _validate_task_handler_action_for_role(th_action: dict[str, Any]) -> None:
+    """Verify that the task handler action is valid for role include."""
+    module = th_action["__ansible_module__"]
+
+    if "name" not in th_action:
+        raise MatchError(message=f"Failed to find required 'name' key in {module!s}")
+
+    if not isinstance(th_action["name"], str):
+        raise MatchError(
+            message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
+        )
+
+
+def _roles_children(
+    basedir: str, k: str, v: Sequence[Any], parent_type: FileType, main: str = "main"
+) -> list[Lintable]:
+    # pylint: disable=unused-argument # parent_type
+    results: list[Lintable] = []
+    if not v:
+        # typing does not prevent junk from being passed in
+        return results
+    for role in v:
+        if isinstance(role, dict):
+            if "role" in role or "name" in role:
+                if "tags" not in role or "skip_ansible_lint" not in role["tags"]:
+                    results.extend(
+                        _look_for_role_files(
+                            basedir, role.get("role", role.get("name")), main=main
+                        )
+                    )
+            elif k != "dependencies":
+                raise SystemExit(
+                    f'role dict {role} does not contain a "role" or "name" key'
+                )
+        else:
+            results.extend(_look_for_role_files(basedir, role, main=main))
+    return results
+
+
+def _rolepath(basedir: str, role: str) -> str | None:
+    role_path = None
+
+    possible_paths = [
+        # if included from a playbook
+        path_dwim(basedir, os.path.join("roles", role)),
+        path_dwim(basedir, role),
+        # if included from roles/[role]/meta/main.yml
+        path_dwim(basedir, os.path.join("..", "..", "..", "roles", role)),
+        path_dwim(basedir, os.path.join("..", "..", role)),
+        # if checking a role in the current directory
+        path_dwim(basedir, os.path.join("..", role)),
+    ]
+
+    for loc in get_app().runtime.config.default_roles_path:
+        loc = os.path.expanduser(loc)
+        possible_paths.append(path_dwim(loc, role))
+
+    possible_paths.append(path_dwim(basedir, ""))
+
+    for path_option in possible_paths:  # pragma: no branch
+        if os.path.isdir(path_option):
+            role_path = path_option
+            break
+
+    if role_path:  # pragma: no branch
+        add_all_plugin_dirs(role_path)
+
+    return role_path
+
+
+def _look_for_role_files(
+    basedir: str, role: str, main: str | None = "main"
+) -> list[Lintable]:
+    # pylint: disable=unused-argument # main
+    role_path = _rolepath(basedir, role)
+    if not role_path:  # pragma: no branch
+        return []
+
+    results = []
+
+    for kind in ["tasks", "meta", "handlers", "vars", "defaults"]:
+        current_path = os.path.join(role_path, kind)
+        for folder, _, files in os.walk(current_path):
+            for file in files:
+                file_ignorecase = file.lower()
+                if file_ignorecase.endswith((".yml", ".yaml")):
+                    results.append(Lintable(os.path.join(folder, file)))
+
+    return results
+
+
+def _kv_to_dict(v: str) -> dict[str, Any]:
+    (command, args, kwargs) = tokenize(v)
+    return {"__ansible_module__": command, "__ansible_arguments__": args, **kwargs}
+
+
+def _sanitize_task(task: dict[str, Any]) -> dict[str, Any]:
+    """Return a stripped-off task structure compatible with new Ansible.
+
+    This helper takes a copy of the incoming task and drops
+    any internally used keys from it.
+    """
+    result = task.copy()
+    # task is an AnsibleMapping which inherits from OrderedDict, so we need
+    # to use `del` to remove unwanted keys.
+    for k in [SKIPPED_RULES_KEY, FILENAME_KEY, LINE_NUMBER_KEY]:
+        if k in result:
+            del result[k]
+    return result
+
+
+def _extract_ansible_parsed_keys_from_task(
+    result: dict[str, Any],
+    task: dict[str, Any],
+    keys: tuple[str, ...],
+) -> dict[str, Any]:
+    """Return a dict with the existing keys from the task."""
+    for k, v in list(task.items()):
+        if k in keys:
+            # we don't want to re-assign these values, which were
+            # determined by the ModuleArgsParser() above
+            continue
+        result[k] = v
+    return result
+
+
+def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
+    """Ensure tasks have a normalized action key and strings are converted to python objects."""
+    result: dict[str, Any] = {}
+    ansible_parsed_keys = ("action", "local_action", "args", "delegate_to")
+
+    if is_nested_task(task):
+        _extract_ansible_parsed_keys_from_task(result, task, ansible_parsed_keys)
+        # Add dummy action for block/always/rescue statements
+        result["action"] = {
+            "__ansible_module__": "block/always/rescue",
+            "__ansible_module_original__": "block/always/rescue",
+        }
+
+        return result
+
+    sanitized_task = _sanitize_task(task)
+    mod_arg_parser = ModuleArgsParser(sanitized_task)
+
+    try:
+        action, arguments, result["delegate_to"] = mod_arg_parser.parse(
+            skip_action_validation=options.skip_action_validation
+        )
+    except AnsibleParserError as exc:
+        # pylint: disable=raise-missing-from
+        raise MatchError(
+            rule=AnsibleParserErrorRule(),
+            message=exc.message,
+            filename=task.get(FILENAME_KEY, "Unknown"),
+            linenumber=task.get(LINE_NUMBER_KEY, 0),
+        )
+
+    # denormalize shell -> command conversion
+    if "_uses_shell" in arguments:
+        action = "shell"
+        del arguments["_uses_shell"]
+
+    _extract_ansible_parsed_keys_from_task(
+        result, task, ansible_parsed_keys + (action,)
+    )
+
+    if not isinstance(action, str):
+        raise RuntimeError(f"Task actions can only be strings, got {action}")
+    action_unnormalized = action
+    # convert builtin fqn calls to short forms because most rules know only
+    # about short calls but in the future we may switch the normalization to do
+    # the opposite. Mainly we currently consider normalized the module listing
+    # used by `ansible-doc -t module -l 2>/dev/null`
+    action = removeprefix(action, "ansible.builtin.")
+    result["action"] = {
+        "__ansible_module__": action,
+        "__ansible_module_original__": action_unnormalized,
+    }
+
+    if "_raw_params" in arguments:
+        # Doing a split here is really bad as it would break jinja2 templating;
+        # parsing of the template must happen before any kind of split.
+        result["action"]["__ansible_arguments__"] = [arguments["_raw_params"]]
+        del arguments["_raw_params"]
+    else:
+        result["action"]["__ansible_arguments__"] = []
+
+    if "argv" in arguments and not result["action"]["__ansible_arguments__"]:
+        result["action"]["__ansible_arguments__"] = arguments["argv"]
+        del arguments["argv"]
+
+    result["action"].update(arguments)
+    return result
+
+
+def normalize_task(task: dict[str, Any], filename: str) -> dict[str, Any]:
+    """Unify task-like object structures."""
+    ansible_action_type = task.get("__ansible_action_type__", "task")
+    if "__ansible_action_type__" in task:
+        del task["__ansible_action_type__"]
+    task = normalize_task_v2(task)
+    task[FILENAME_KEY] = filename
+    task["__ansible_action_type__"] = ansible_action_type
+    return task
+
+
+def task_to_str(task: dict[str, Any]) -> str:
+    """Make a string identifier for the given task."""
+    name = task.get("name")
+    if name:
+        return str(name)
+    action = task.get("action")
+    if isinstance(action, str) or not isinstance(action, dict):
+        return str(action)
+    args = [
+        f"{k}={v}"
+        for (k, v) in action.items()
+        if k
+        not in [
+            "__ansible_module__",
+            "__ansible_module_original__",
+            "__ansible_arguments__",
+            LINE_NUMBER_KEY,
+            FILENAME_KEY,
+        ]
+    ]
+
+    for item in action.get("__ansible_arguments__", []):
+        args.append(str(item))
+
+    return f"{action['__ansible_module__']} {' '.join(args)}"
+
+
+def extract_from_list(
+    blocks: AnsibleBaseYAMLObject, candidates: list[str], recursive: bool = False
+) -> list[Any]:
+    """Get action tasks from block structures."""
+    results = []
+    for block in blocks:
+        for candidate in candidates:
+            if isinstance(block, dict) and candidate in block:
+                if isinstance(block[candidate], list):
+                    subresults = add_action_type(block[candidate], candidate)
+                    if recursive:
+                        subresults.extend(
+                            extract_from_list(subresults, candidates, recursive)
+                        )
+                    results.extend(subresults)
+                elif block[candidate] is not None:
+                    raise RuntimeError(
+                        f"Key '{candidate}' defined, but bad value: '{str(block[candidate])}'"
+                    )
+    return results
+
+
+def add_action_type(actions: AnsibleBaseYAMLObject, action_type: str) -> list[Any]:
+    """Add action markers to task objects."""
+    results = []
+    for action in actions:
+        # ignore empty task
+        if not action:
+            continue
+        action["__ansible_action_type__"] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
+        results.append(action)
+    return results
+
+
+def get_action_tasks(data: AnsibleBaseYAMLObject, file: Lintable) -> list[Any]:
+    """Get a flattened list of action tasks from the file."""
+    tasks = []
+    if file.kind in ["tasks", "handlers"]:
+        tasks = add_action_type(data, file.kind)
+    else:
+        tasks.extend(extract_from_list(data, PLAYBOOK_TASK_KEYWORDS))
+
+    # Add sub-elements of block/rescue/always to tasks list
+    tasks.extend(extract_from_list(tasks, NESTED_TASK_KEYS, recursive=True))
+
+    return tasks
+
+
+@lru_cache(maxsize=None)
+def parse_yaml_linenumbers(  # noqa: max-complexity: 12
+    lintable: Lintable,
+) -> AnsibleBaseYAMLObject:
+    """Parse yaml as ansible.utils.parse_yaml but with line numbers.
+
+    The line numbers are stored in each node's LINE_NUMBER_KEY key.
+    """
+    result = []
+
+    def compose_node(parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
+        # the line number where the previous token has ended (plus empty lines)
+        line = loader.line
+        node = Composer.compose_node(loader, parent, index)
+        if not isinstance(node, yaml.nodes.Node):
+            raise RuntimeError("Unexpected yaml data.")
+        setattr(node, "__line__", line + 1)
+        return node
+
+    def construct_mapping(
+        node: AnsibleBaseYAMLObject, deep: bool = False
+    ) -> AnsibleMapping:
+        mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
+        if hasattr(node, "__line__"):
+            mapping[LINE_NUMBER_KEY] = node.__line__
+        else:
+            mapping[
+                LINE_NUMBER_KEY
+            ] = mapping._line_number  # pylint: disable=protected-access
+        mapping[FILENAME_KEY] = lintable.path
+        return mapping
+
+    try:
+        kwargs = {}
+        if "vault_password" in inspect.getfullargspec(AnsibleLoader.__init__).args:
+            kwargs["vault_password"] = DEFAULT_VAULT_PASSWORD
+        loader = AnsibleLoader(lintable.content, **kwargs)
+        loader.compose_node = compose_node
+        loader.construct_mapping = construct_mapping
+        # while Ansible only accepts single documents, we also need to load
+        # multi-documents, as we attempt to load any YAML file, not only
+        # Ansible managed ones.
+        while True:
+            data = loader.get_data()
+            if data is None:
+                break
+            result.append(data)
+    except (
+        yaml.parser.ParserError,
+        yaml.scanner.ScannerError,
+        yaml.constructor.ConstructorError,
+    ) as exc:
+        raise RuntimeError("Failed to load YAML file") from exc
+
+    if len(result) == 0:
+        return None  # empty documents
+    if len(result) == 1:
+        return result[0]
+    return result
+
+
+def get_first_cmd_arg(task: dict[str, Any]) -> Any:
+    """Extract the first arg from a cmd task."""
+    try:
+        if "cmd" in task["action"]:
+            first_cmd_arg = task["action"]["cmd"].split()[0]
+        else:
+            first_cmd_arg = task["action"]["__ansible_arguments__"][0].split()[0]
+    except IndexError:
+        return None
+    return first_cmd_arg
+
+
+def get_second_cmd_arg(task: dict[str, Any]) -> Any:
+    """Extract the second arg from a cmd task."""
+    try:
+        if "cmd" in task["action"]:
+            second_cmd_arg = task["action"]["cmd"].split()[1]
+        else:
+            second_cmd_arg = task["action"]["__ansible_arguments__"][0].split()[1]
+    except IndexError:
+        return None
+    return second_cmd_arg
+
+
+def is_playbook(filename: str) -> bool:
+    """
+    Check if the file is a playbook.
+
+    Given a filename, it should return true if it looks like a playbook. The
+    function is not supposed to raise exceptions.
+    """
+    # we assume it is a playbook if we loaded a sequence of dictionaries where
+    # at least one of these keys is present:
+    playbooks_keys = {
+        "gather_facts",
+        "hosts",
+        "import_playbook",
+        "post_tasks",
+        "pre_tasks",
+        "roles",
+        "tasks",
+    }
+
+    # makes it work with Path objects by converting them to strings
+    if not isinstance(filename, str):
+        filename = str(filename)
+
+    try:
+        f = parse_yaml_from_file(filename)
+    except Exception as exc:  # pylint: disable=broad-except
+        _logger.warning(
+            "Failed to load %s with %s, assuming it is not a playbook.", filename, exc
+        )
+    else:
+        if (
+            isinstance(f, AnsibleSequence)
+            and hasattr(next(iter(f), {}), "keys")
+            and playbooks_keys.intersection(next(iter(f), {}).keys())
+        ):
+            return True
+    return False
+
+
+# pylint: disable=too-many-statements
+def get_lintables(
+    opts: Namespace = Namespace(), args: list[str] | None = None
+) -> list[Lintable]:
+    """Detect files and directories that are lintable."""
+    lintables: list[Lintable] = []
+
+    # passing args bypasses auto-detection mode
+    if args:
+        for arg in args:
+            lintable = Lintable(arg)
+            lintables.append(lintable)
+    else:
+        for filename in discover_lintables(opts):
+            path = Path(filename)
+            # skip exclusions
+            try:
+                for file_path in opts.exclude_paths:
+                    if str(path.resolve()).startswith(str(file_path)):
+                        raise FileNotFoundError(
+                            f"File {file_path} matched exclusion entry: {path}"
+                        )
+            except FileNotFoundError as exc:
+                _logger.debug("Ignored %s due to: %s", path, exc)
+                continue
+
+            if path.is_symlink() and not path.exists():
+                _logger.warning("Ignored broken symlink %s -> %s", path, path.resolve())
+                continue
+
+            lintables.append(Lintable(path))
+
+    # stage 2: guess roles from current lintables, as there is no unique
+    # file that must be present in any kind of role.
+    _extend_with_roles(lintables)
+
+    return lintables
+
+
+def _extend_with_roles(lintables: list[Lintable]) -> None:
+    """Detect roles among lintables and add them to the list."""
+    for lintable in lintables:
+        parts = lintable.path.parent.parts
+        if "roles" in parts:
+            role = lintable.path
+            while role.parent.name != "roles" and role.name:
+                role = role.parent
+            if role.exists() and not role.is_file():
+                lintable = Lintable(role, kind="role")
+                if lintable not in lintables:
+                    _logger.debug("Added role: %s", lintable)
+                    lintables.append(lintable)
+
+
+def convert_to_boolean(value: Any) -> bool:
+    """Use Ansible to convert something to a boolean."""
+    return bool(boolean(value))
+
+
+def nested_items(
+    data: dict[Any, Any] | list[Any], parent: str = ""
+) -> Generator[tuple[Any, Any, str], None, None]:
+    """Iterate a nested data structure."""
+    warnings.warn(
+        "Call to deprecated function ansiblelint.utils.nested_items. "
+        "Use ansiblelint.yaml_utils.nested_items_path instead.",
+        category=DeprecationWarning,
+        stacklevel=2,
+    )
+    if isinstance(data, dict):
+        for k, v in data.items():
+            yield k, v, parent
+            # pylint: disable=redefined-outer-name
+            for k, v, returned_parent in nested_items(v, k):
+                yield k, v, returned_parent
+    if isinstance(data, list):
+        for item in data:
+            yield "list-item", item, parent
+            for k, v, returned_parent in nested_items(item):
+                yield k, v, returned_parent