# path: root/src/ansiblelint/transformer.py
# blob: 097c7ca4c3ed4f0cddada0a5dfdd11d258e0f8a4 (plain)
"""Transformer implementation."""
from __future__ import annotations

import logging
from argparse import Namespace
from typing import Union, cast

from ruamel.yaml.comments import CommentedMap, CommentedSeq

from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.rules import AnsibleLintRule, TransformMixin
from ansiblelint.runner import LintResult
from ansiblelint.yaml_utils import FormattedYAML, get_path_to_play, get_path_to_task

# Public API: the only name exported by a star-import of this module.
__all__ = ["Transformer"]

# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)


# pylint: disable=too-few-public-methods
class Transformer:
    """Transformer class marshals transformations.

    The Transformer is similar to the ``ansiblelint.runner.Runner`` which manages
    running each of the rules. We only expect there to be one ``Transformer`` instance
    which should be instantiated from the main entrypoint function.

    For every file in the lint result, :meth:`run` loads the content, lets the
    rules that implement ``TransformMixin`` transform it for each of their
    matches (subject to the ``write_list`` selection), re-serializes YAML files
    and writes the file back when its content was updated.
    """

    def __init__(self, result: LintResult, options: Namespace):
        """Initialize a Transformer instance.

        :param result: Lint result providing the ``matches`` and ``files`` to process.
        :param options: Parsed options; only ``options.write_list`` is read here.
        """
        self.write_set = self.effective_write_set(options.write_list)

        self.matches: list[MatchError] = result.matches
        self.files: set[Lintable] = result.files

        # Index the known files by name so each match can be grouped by its file.
        lintables: dict[str, Lintable] = {file.filename: file for file in result.files}
        self.matches_per_file: dict[Lintable, list[MatchError]] = {
            file: [] for file in result.files
        }

        for match in self.matches:
            try:
                lintable = lintables[match.filename]
            except KeyError:
                # we shouldn't get here, but this is easy to recover from so do that.
                lintable = Lintable(match.filename)
                # Remember the new lintable so that further matches for the same
                # filename reuse it instead of creating another one, and never
                # replace a list that already holds matches.
                lintables[match.filename] = lintable
                self.matches_per_file.setdefault(lintable, [])
            self.matches_per_file[lintable].append(match)

    @staticmethod
    def effective_write_set(write_list: list[str]) -> set[str]:
        """Simplify write_list based on ``"none"`` and ``"all"`` keywords.

        ``"none"`` resets the enabled rule transforms.
        This returns ``{"none"}`` or a set of everything after the last ``"none"``.

        If ``"all"`` is in the ``write_list`` (after ``"none"`` if present),
        then this will return ``{"all"}``.

        :param write_list: Requested transform selectors, in the order given.
        :returns: The effective set of selectors after applying the keywords.
        """
        none_indexes = [i for i, value in enumerate(write_list) if value == "none"]
        if none_indexes:
            index = none_indexes[-1]
            # Drop the last "none" itself only when something follows it;
            # a trailing "none" must survive so that the result is {"none"}.
            if len(write_list) > index + 1:
                index += 1
            write_list = write_list[index:]
        if "all" in write_list:
            return {"all"}
        return set(write_list)

    def run(self) -> None:
        """For each file, read it, execute transforms on it, then write it."""
        for file, matches in self.matches_per_file.items():
            # str() convinces mypy that "text/yaml" is a valid Literal.
            # Otherwise, it thinks base_kind is one of playbook, meta, tasks, ...
            file_is_yaml = str(file.base_kind) == "text/yaml"

            try:
                data: str = file.content
            except (UnicodeDecodeError, IsADirectoryError):
                # we hit a binary file (eg a jar or tar.gz) or a directory
                data = ""
                file_is_yaml = False

            ruamel_data: CommentedMap | CommentedSeq | None = None
            if file_is_yaml:
                # We need a fresh YAML() instance for each load because ruamel.yaml
                # stores intermediate state during load which could affect loading
                # any other files. (Based on suggestion from ruamel.yaml author)
                yaml = FormattedYAML()

                ruamel_data = yaml.loads(data)
                if not isinstance(ruamel_data, (CommentedMap, CommentedSeq)):
                    # This is an empty vars file or similar which loads as None.
                    # It is not safe to write this file or data-loss is likely.
                    # Only maps and sequences can preserve comments. Skip it.
                    continue

            if self.write_set != {"none"}:
                # Test against None explicitly: an empty map or sequence is falsy,
                # yet it is still the parsed YAML document that transforms must
                # receive (``file_is_yaml`` is True for it), not the raw string.
                transform_data = ruamel_data if ruamel_data is not None else data
                self._do_transforms(file, transform_data, file_is_yaml, matches)

            if file_is_yaml:
                # ``yaml`` is always bound here: it is assigned in the
                # ``file_is_yaml`` branch above and the flag is never set to True
                # afterwards.
                # noinspection PyUnboundLocalVariable
                file.content = yaml.dumps(ruamel_data)

            if file.updated:
                file.write()

    def _do_transforms(
        self,
        file: Lintable,
        data: CommentedMap | CommentedSeq | str,
        file_is_yaml: bool,
        matches: list[MatchError],
    ) -> None:
        """Do Rule-Transforms handling any last-minute MatchError inspections.

        :param file: The file the matches belong to.
        :param data: Parsed YAML for YAML files, otherwise the raw file content.
        :param file_is_yaml: Whether ``data`` is parsed YAML rather than a string.
        :param matches: Matches reported for ``file``; processed in sorted order.
        """
        for match in sorted(matches):
            if not isinstance(match.rule, TransformMixin):
                # This rule does not provide a transform.
                continue
            if self.write_set != {"all"}:
                # A transform is selected by either the rule id or one of its tags.
                rule = cast(AnsibleLintRule, match.rule)
                rule_definition = set(rule.tags)
                rule_definition.add(rule.id)
                if rule_definition.isdisjoint(self.write_set):
                    # rule transform not requested. Skip it.
                    continue
            if file_is_yaml and not match.yaml_path:
                # Fill in the missing path to the matched play/task within the
                # YAML data before handing the match to the rule.
                data = cast(Union[CommentedMap, CommentedSeq], data)
                if match.match_type == "play":
                    match.yaml_path = get_path_to_play(file, match.linenumber, data)
                elif match.task or file.kind in (
                    "tasks",
                    "handlers",
                    "playbook",
                ):
                    match.yaml_path = get_path_to_task(file, match.linenumber, data)
            match.rule.transform(match, file, data)