diff options
Diffstat (limited to '')
-rw-r--r-- | src/ansiblelint/rules/partial_become.py | 283 |
1 files changed, 200 insertions, 83 deletions
diff --git a/src/ansiblelint/rules/partial_become.py b/src/ansiblelint/rules/partial_become.py index d14c06f..879b186 100644 --- a/src/ansiblelint/rules/partial_become.py +++ b/src/ansiblelint/rules/partial_become.py @@ -1,4 +1,5 @@ """Implementation of partial-become rule.""" + # Copyright (c) 2016 Will Thames <will@thames.id.au> # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -21,115 +22,231 @@ from __future__ import annotations import sys -from functools import reduce from typing import TYPE_CHECKING, Any +from ruamel.yaml.comments import CommentedMap, CommentedSeq + from ansiblelint.constants import LINE_NUMBER_KEY -from ansiblelint.rules import AnsibleLintRule +from ansiblelint.rules import AnsibleLintRule, TransformMixin if TYPE_CHECKING: + from collections.abc import Iterator + from ansiblelint.errors import MatchError from ansiblelint.file_utils import Lintable + from ansiblelint.utils import Task -def _get_subtasks(data: dict[str, Any]) -> list[Any]: - result: list[Any] = [] - block_names = [ - "tasks", - "pre_tasks", - "post_tasks", - "handlers", - "block", - "always", - "rescue", - ] - for name in block_names: - if data and name in data: - result += data[name] or [] - return result - - -def _nested_search(term: str, data: dict[str, Any]) -> Any: - if data and term in data: - return True - return reduce( - (lambda x, y: x or _nested_search(term, y)), - _get_subtasks(data), - False, - ) - - -def _become_user_without_become(becomeuserabove: bool, data: dict[str, Any]) -> Any: - if "become" in data: - # If become is in lineage of tree then correct - return False - if "become_user" in data and _nested_search("become", data): - # If 'become_user' on tree and become somewhere below - # we must check for a case of a second 'become_user' without a - # 'become' in its lineage - subtasks = _get_subtasks(data) - return reduce( - (lambda x, y: x or _become_user_without_become(False, y)), - subtasks, - False, - ) - if _nested_search("become_user", data): - # Keep searching down if 'become_user' exists in the tree below current task - subtasks = _get_subtasks(data) - return len(subtasks) == 0 or reduce( - ( - lambda x, y: x - or _become_user_without_become( - becomeuserabove or "become_user" in data, - y, - ) - ), - subtasks, - False, - ) - # If at bottom of tree, flag up if 'become_user' existed in the lineage of the tree and - # 'become' was not. This is an error if any lineage has a 'become_user' but no become - return becomeuserabove - - -class BecomeUserWithoutBecomeRule(AnsibleLintRule): - """become_user requires become to work as expected.""" +class BecomeUserWithoutBecomeRule(AnsibleLintRule, TransformMixin): + """``become_user`` should have a corresponding ``become`` at the play or task level.""" id = "partial-become" - description = "``become_user`` without ``become`` will not actually change user" + description = "``become_user`` should have a corresponding ``become`` at the play or task level." severity = "VERY_HIGH" tags = ["unpredictability"] version_added = "historic" - def matchplay(self, file: Lintable, data: dict[str, Any]) -> list[MatchError]: - if file.kind == "playbook": - result = _become_user_without_become(False, data) - if result: - return [ - self.create_matcherror( - message=self.shortdesc, - filename=file, - lineno=data[LINE_NUMBER_KEY], - ), - ] - return [] + def matchplay( + self: BecomeUserWithoutBecomeRule, + file: Lintable, + data: dict[str, Any], + ) -> list[MatchError]: + """Match become_user without become in play. + + :param file: The file to lint. + :param data: The data to lint (play) + :returns: A list of errors. + """ + if file.kind != "playbook": + return [] + errors = [] + partial = "become_user" in data and "become" not in data + if partial: + error = self.create_matcherror( + message=self.shortdesc, + filename=file, + tag=f"{self.id}[play]", + lineno=data[LINE_NUMBER_KEY], + ) + errors.append(error) + return errors + + def matchtask( + self: BecomeUserWithoutBecomeRule, + task: Task, + file: Lintable | None = None, + ) -> list[MatchError]: + """Match become_user without become in task. + + :param task: The task to lint. + :param file: The file to lint. + :returns: A list of errors. + """ + data = task.normalized_task + errors = [] + partial = "become_user" in data and "become" not in data + if partial: + error = self.create_matcherror( + message=self.shortdesc, + filename=file, + tag=f"{self.id}[task]", + lineno=task[LINE_NUMBER_KEY], + ) + errors.append(error) + return errors + + def _dive(self: BecomeUserWithoutBecomeRule, data: CommentedSeq) -> Iterator[Any]: + """Dive into the data and yield each item. + + :param data: The data to dive into. + :yield: Each item in the data. + """ + for item in data: + for nested in ("block", "rescue", "always"): + if nested in item: + yield from self._dive(item[nested]) + yield item + + def transform( + self: BecomeUserWithoutBecomeRule, + match: MatchError, + lintable: Lintable, + data: CommentedMap | CommentedSeq | str, + ) -> None: + """Transform the data. + + :param match: The match to transform. + :param lintable: The file to transform. + :param data: The data to transform. + """ + if not isinstance(data, CommentedSeq): + return + + obj = self.seek(match.yaml_path, data) + if "become" in obj and "become_user" in obj: + match.fixed = True + return + if "become" not in obj and "become_user" not in obj: + match.fixed = True + return + + self._transform_plays(plays=data) + + if "become" in obj and "become_user" in obj: + match.fixed = True + return + if "become" not in obj and "become_user" not in obj: + match.fixed = True + return + + def is_ineligible_for_transform( + self: BecomeUserWithoutBecomeRule, + data: CommentedMap, + ) -> bool: + """Check if the data is eligible for transformation. + + :param data: The data to check. + :returns: True if ineligible, False otherwise. + """ + if any("include" in key for key in data): + return True + if "notify" in data: + return True + return False + + def _transform_plays(self, plays: CommentedSeq) -> None: + """Transform the plays. + + :param plays: The plays to transform. + """ + for play in plays: + self._transform_play(play=play) + + def _transform_play(self, play: CommentedMap) -> None: + """Transform the play. + + :param play: The play to transform. + """ + # Ensure we have no includes in this play + task_groups = ("tasks", "pre_tasks", "post_tasks", "handlers") + for task_group in task_groups: + tasks = self._dive(play.get(task_group, [])) + for task in tasks: + if self.is_ineligible_for_transform(task): + return + remove_play_become_user = False + for task_group in task_groups: + tasks = self._dive(play.get(task_group, [])) + for task in tasks: + b_in_t = "become" in task + bu_in_t = "become_user" in task + b_in_p = "become" in play + bu_in_p = "become_user" in play + if b_in_t and not bu_in_t and bu_in_p: + # Preserve the end comment if become is the last key + comment = None + if list(task.keys())[-1] == "become" and "become" in task.ca.items: + comment = task.ca.items.pop("become") + become_index = list(task.keys()).index("become") + task.insert(become_index + 1, "become_user", play["become_user"]) + if comment: + self._attach_comment_end(task, comment) + remove_play_become_user = True + if bu_in_t and not b_in_t and b_in_p: + become_user_index = list(task.keys()).index("become_user") + task.insert(become_user_index, "become", play["become"]) + if bu_in_t and not b_in_t and not b_in_p: + # Preserve the end comment if become_user is the last key + comment = None + if ( + list(task.keys())[-1] == "become_user" + and "become_user" in task.ca.items + ): + comment = task.ca.items.pop("become_user") + task.pop("become_user") + if comment: + self._attach_comment_end(task, comment) + if remove_play_become_user: + del play["become_user"] + + def _attach_comment_end( + self, + obj: CommentedMap | CommentedSeq, + comment: Any, + ) -> None: + """Attach a comment to the end of the object. + + :param obj: The object to attach the comment to. + :param comment: The comment to attach. + """ + if isinstance(obj, CommentedMap): + last = list(obj.keys())[-1] + if not isinstance(obj[last], CommentedSeq | CommentedMap): + obj.ca.items[last] = comment + return + self._attach_comment_end(obj[last], comment) + elif isinstance(obj, CommentedSeq): + if not isinstance(obj[-1], CommentedSeq | CommentedMap): + obj.ca.items[len(obj)] = comment + return + self._attach_comment_end(obj[-1], comment) # testing code to be loaded only with pytest or when executed the rule file if "pytest" in sys.modules: - from ansiblelint.rules import RulesCollection # pylint: disable=ungrouped-imports - from ansiblelint.runner import Runner # pylint: disable=ungrouped-imports + from ansiblelint.rules import RulesCollection + from ansiblelint.runner import Runner - def test_partial_become_positive() -> None: - """Positive test for partial-become.""" + def test_partial_become_pass() -> None: + """No errors found for partial-become.""" collection = RulesCollection() collection.register(BecomeUserWithoutBecomeRule()) success = "examples/playbooks/rule-partial-become-without-become-pass.yml" good_runner = Runner(success, rules=collection) assert [] == good_runner.run() - def test_partial_become_negative() -> None: - """Negative test for partial-become.""" + def test_partial_become_fail() -> None: + """Errors found for partial-become.""" collection = RulesCollection() collection.register(BecomeUserWithoutBecomeRule()) failure = "examples/playbooks/rule-partial-become-without-become-fail.yml" |