summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/rules/complexity.py
blob: 04d92d081eaf709823f6981046e34ac9a86049dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Implementation of limiting number of tasks."""

from __future__ import annotations

import re
import sys
from typing import TYPE_CHECKING, Any

from ansiblelint.constants import LINE_NUMBER_KEY
from ansiblelint.rules import AnsibleLintRule, RulesCollection

if TYPE_CHECKING:
    from ansiblelint.config import Options
    from ansiblelint.errors import MatchError
    from ansiblelint.file_utils import Lintable
    from ansiblelint.utils import Task


class ComplexityRule(AnsibleLintRule):
    """Rule for limiting number of tasks inside a file."""

    id = "complexity"
    description = "There should be limited tasks executed inside any file"
    severity = "MEDIUM"
    tags = ["experimental", "idiom"]
    version_added = "v6.18.0 (last update)"
    _re_templated_inside = re.compile(r".*\{\{.*\}\}.*\w.*$")

    def matchplay(self, file: Lintable, data: dict[str, Any]) -> list[MatchError]:
        """Call matchplay for up to no_of_max_tasks inside file and return aggregate results."""
        results: list[MatchError] = []

        if file.kind != "playbook":
            return []
        tasks = data.get("tasks", [])
        if not isinstance(self._collection, RulesCollection):
            msg = "Rules cannot be run outside a rule collection."
            raise TypeError(msg)
        if len(tasks) > self._collection.options.max_tasks:
            results.append(
                self.create_matcherror(
                    message=f"Maximum tasks allowed in a play is {self._collection.options.max_tasks}.",
                    lineno=data[LINE_NUMBER_KEY],
                    tag=f"{self.id}[play]",
                    filename=file,
                ),
            )
        return results

    def matchtask(self, task: Task, file: Lintable | None = None) -> list[MatchError]:
        """Check if the task is a block and count the number of items inside it."""
        results: list[MatchError] = []

        if not isinstance(self._collection, RulesCollection):
            msg = "Rules cannot be run outside a rule collection."
            raise TypeError(msg)

        if task.action == "block/always/rescue":
            block_depth = self.calculate_block_depth(task)
            if block_depth > self._collection.options.max_block_depth:
                results.append(
                    self.create_matcherror(
                        message=f"Replace nested block with an include_tasks to make code easier to maintain. Maximum block depth allowed is {self._collection.options.max_block_depth}.",
                        lineno=task[LINE_NUMBER_KEY],
                        tag=f"{self.id}[nesting]",
                        filename=file,
                    ),
                )
        return results

    def calculate_block_depth(self, task: Task) -> int:
        """Recursively calculate the block depth of a task."""
        if not isinstance(task.position, str):
            raise NotImplementedError
        return task.position.count(".block")


if "pytest" in sys.modules:
    import pytest

    # pylint: disable=ungrouped-imports
    from ansiblelint.runner import Runner

    @pytest.mark.parametrize(
        ("file", "expected_results"),
        (
            pytest.param(
                "examples/playbooks/rule-complexity-pass.yml",
                [],
                id="pass",
            ),
            pytest.param(
                "examples/playbooks/rule-complexity-fail.yml",
                ["complexity[play]", "complexity[nesting]"],
                id="fail",
            ),
        ),
    )
    def test_complexity(
        file: str,
        expected_results: list[str],
        monkeypatch: pytest.MonkeyPatch,
        config_options: Options,
    ) -> None:
        """Test rule."""
        monkeypatch.setattr(config_options, "max_tasks", 5)
        monkeypatch.setattr(config_options, "max_block_depth", 3)
        collection = RulesCollection(options=config_options)
        collection.register(ComplexityRule())
        results = Runner(file, rules=collection).run()

        assert len(results) == len(expected_results)
        for i, result in enumerate(results):
            assert result.rule.id == ComplexityRule.id, result
            assert result.tag == expected_results[i]