diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:06:49 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:06:49 +0000 |
commit | 2fe34b6444502079dc0b84365ce82dbc92de308e (patch) | |
tree | 8fedcab52bbbc3db6c5aa909a88a7a7b81685018 /src/ansiblelint/skip_utils.py | |
parent | Initial commit. (diff) | |
download | ansible-lint-1f847810e1dcffeab102ff853e50a09833fad025.tar.xz ansible-lint-1f847810e1dcffeab102ff853e50a09833fad025.zip |
Adding upstream version 6.17.2.upstream/6.17.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/ansiblelint/skip_utils.py')
-rw-r--r-- | src/ansiblelint/skip_utils.py | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/src/ansiblelint/skip_utils.py b/src/ansiblelint/skip_utils.py new file mode 100644 index 0000000..f2f6177 --- /dev/null +++ b/src/ansiblelint/skip_utils.py @@ -0,0 +1,316 @@ +# (c) 2019-2020, Ansible by Red Hat +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Utils related to inline skipping of rules.""" +from __future__ import annotations + +import collections.abc +import logging +import re +import warnings +from functools import cache +from itertools import product +from typing import TYPE_CHECKING, Any + +# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled +from ruamel.yaml import YAML +from ruamel.yaml.composer import ComposerError +from ruamel.yaml.scanner import ScannerError +from ruamel.yaml.tokens import CommentToken + +from ansiblelint.config import used_old_tags +from ansiblelint.constants import ( + NESTED_TASK_KEYS, + PLAYBOOK_TASK_KEYWORDS, + RENAMED_TAGS, + SKIPPED_RULES_KEY, +) +from ansiblelint.errors import LintWarning, WarnSource + +if TYPE_CHECKING: + from collections.abc import Generator, Sequence + + from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject + + from ansiblelint.file_utils import Lintable + + +_logger = logging.getLogger(__name__) +_found_deprecated_tags: set[str] = set() +_noqa_comment_re = re.compile(r"^# noqa(\s|:)") + +# playbook: Sequence currently expects only instances of one of the two +# classes below but we should consider avoiding this chimera. +# ruamel.yaml.comments.CommentedSeq +# ansible.parsing.yaml.objects.AnsibleSequence + + +def get_rule_skips_from_line( + line: str, + lintable: Lintable, + lineno: int = 1, +) -> list[str]: + """Return list of rule ids skipped via comment on the line of yaml.""" + _before_noqa, _noqa_marker, noqa_text = line.partition("# noqa") + + result = [] + for v in noqa_text.lstrip(" :").split(): + if v in RENAMED_TAGS: + tag = RENAMED_TAGS[v] + if v not in _found_deprecated_tags: + msg = f"Replaced outdated tag '{v}' with '{tag}', replace it to avoid future errors" + warnings.warn( + message=msg, + category=LintWarning, + source=WarnSource( + filename=lintable, + lineno=lineno, + tag="warning[outdated-tag]", + message=msg, + ), + stacklevel=0, + ) + _found_deprecated_tags.add(v) + v = tag + result.append(v) + return result + + +def append_skipped_rules( + pyyaml_data: AnsibleBaseYAMLObject, + lintable: Lintable, +) -> AnsibleBaseYAMLObject: + """Append 'skipped_rules' to individual tasks or single metadata block. + + For a file, uses 2nd parser (ruamel.yaml) to pull comments out of + yaml subsets, check for '# noqa' skipped rules, and append any skips to the + original parser (pyyaml) data relied on by remainder of ansible-lint. + + :param pyyaml_data: file text parsed via ansible and pyyaml. + :param file_text: raw file text. + :param file_type: type of file: tasks, handlers or meta. + :returns: original pyyaml_data altered with a 'skipped_rules' list added \ + to individual tasks, or added to the single metadata block. + """ + try: + yaml_skip = _append_skipped_rules(pyyaml_data, lintable) + except RuntimeError: + # Notify user of skip error, do not stop, do not change exit code + _logger.exception("Error trying to append skipped rules") + return pyyaml_data + + if not yaml_skip: + return pyyaml_data + + return yaml_skip + + +@cache +def load_data(file_text: str) -> Any: + """Parse ``file_text`` as yaml and return parsed structure. + + This is the main culprit for slow performance, each rule asks for loading yaml again and again + ideally the ``maxsize`` on the decorator above MUST be great or equal total number of rules + :param file_text: raw text to parse + :return: Parsed yaml + """ + yaml = YAML() + # Ruamel role is not to validate the yaml file, so we ignore duplicate keys: + yaml.allow_duplicate_keys = True + try: + return yaml.load(file_text) + except ComposerError: + # load fails on multi-documents with ComposerError exception + return yaml.load_all(file_text) + + +def _append_skipped_rules( + pyyaml_data: AnsibleBaseYAMLObject, + lintable: Lintable, +) -> AnsibleBaseYAMLObject | None: + # parse file text using 2nd parser library + try: + ruamel_data = load_data(lintable.content) + except ScannerError as exc: + _logger.debug( + "Ignored loading skipped rules from file %s due to: %s", + lintable, + exc, + ) + # For unparsable file types, we return empty skip lists + return None + skipped_rules = _get_rule_skips_from_yaml(ruamel_data, lintable) + + if lintable.kind in [ + "yaml", + "requirements", + "vars", + "meta", + "reno", + "test-meta", + "galaxy", + ]: + # AnsibleMapping, dict + if hasattr(pyyaml_data, "get"): + pyyaml_data[SKIPPED_RULES_KEY] = skipped_rules + # AnsibleSequence, list + elif ( + not isinstance(pyyaml_data, str) + and isinstance(pyyaml_data, collections.abc.Sequence) + and skipped_rules + ): + pyyaml_data[0][SKIPPED_RULES_KEY] = skipped_rules + + return pyyaml_data + + # create list of blocks of tasks or nested tasks + if lintable.kind in ("tasks", "handlers"): + ruamel_task_blocks = ruamel_data + pyyaml_task_blocks = pyyaml_data + elif lintable.kind == "playbook": + try: + pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data) + ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data) + except (AttributeError, TypeError): + return pyyaml_data + else: + # For unsupported file types, we return empty skip lists + return None + + # get tasks from blocks of tasks + pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks) + ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks) + + # append skipped_rules for each task + for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks): + # ignore empty tasks + if not pyyaml_task and not ruamel_task: + continue + + # AnsibleUnicode or str + if isinstance(pyyaml_task, str): + continue + + if pyyaml_task.get("name") != ruamel_task.get("name"): + msg = "Error in matching skip comment to a task" + raise RuntimeError(msg) + pyyaml_task[SKIPPED_RULES_KEY] = _get_rule_skips_from_yaml( + ruamel_task, + lintable, + ) + + return pyyaml_data + + +def _get_task_blocks_from_playbook(playbook: Sequence[Any]) -> list[Any]: + """Return parts of playbook that contains tasks, and nested tasks. + + :param playbook: playbook yaml from yaml parser. + :returns: list of task dictionaries. + """ + task_blocks = [] + for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS): + task_blocks.extend(play.get(key, [])) + return task_blocks + + +def _get_tasks_from_blocks(task_blocks: Sequence[Any]) -> Generator[Any, None, None]: + """Get list of tasks from list made of tasks and nested tasks.""" + if not task_blocks: + return + + def get_nested_tasks(task: Any) -> Generator[Any, None, None]: + if not task or not is_nested_task(task): + return + for k in NESTED_TASK_KEYS: + if k in task and task[k]: + if hasattr(task[k], "get"): + continue + for subtask in task[k]: + yield from get_nested_tasks(subtask) + yield subtask + + for task in task_blocks: + yield from get_nested_tasks(task) + yield task + + +def _get_rule_skips_from_yaml( + yaml_input: Sequence[Any], + lintable: Lintable, +) -> Sequence[Any]: + """Traverse yaml for comments with rule skips and return list of rules.""" + yaml_comment_obj_strings = [] + + if isinstance(yaml_input, str): + return [] + + def traverse_yaml(obj: Any) -> None: + for entry in obj.ca.items.values(): + for v in entry: + if isinstance(v, CommentToken): + comment_str = v.value + if _noqa_comment_re.match(comment_str): + line = v.start_mark.line + 1 # ruamel line numbers start at 0 + lintable.line_skips[line].update( + get_rule_skips_from_line( + comment_str.strip(), + lintable=lintable, + lineno=line, + ), + ) + yaml_comment_obj_strings.append(str(obj.ca.items)) + if isinstance(obj, dict): + for val in obj.values(): + if isinstance(val, (dict, list)): + traverse_yaml(val) + elif isinstance(obj, list): + for element in obj: + if isinstance(element, (dict, list)): + traverse_yaml(element) + else: + return + + if isinstance(yaml_input, (dict, list)): + traverse_yaml(yaml_input) + + rule_id_list = [] + for comment_obj_str in yaml_comment_obj_strings: + for line in comment_obj_str.split(r"\n"): + rule_id_list.extend(get_rule_skips_from_line(line, lintable=lintable)) + + return [normalize_tag(tag) for tag in rule_id_list] + + +def normalize_tag(tag: str) -> str: + """Return current name of tag.""" + if tag in RENAMED_TAGS: + used_old_tags[tag] = RENAMED_TAGS[tag] + return RENAMED_TAGS[tag] + return tag + + +def is_nested_task(task: dict[str, Any]) -> bool: + """Check if task includes block/always/rescue.""" + # Cannot really trust the input + if isinstance(task, str): + return False + + return any(task.get(key) for key in NESTED_TASK_KEYS) |