summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/app.py
blob: 52581b3e65e6a0614a393072d8e7d1c6bf7d8f49 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
"""Application."""
from __future__ import annotations

import copy
import itertools
import logging
import os
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any

from ansible_compat.runtime import Runtime
from rich.markup import escape
from rich.table import Table

from ansiblelint import formatters
from ansiblelint._mockings import _perform_mockings
from ansiblelint.color import console, console_stderr, render_yaml
from ansiblelint.config import PROFILES, Options, get_version_warning
from ansiblelint.config import options as default_options
from ansiblelint.constants import RC, RULE_DOC_URL
from ansiblelint.loaders import IGNORE_FILE
from ansiblelint.stats import SummarizedResults, TagStats

if TYPE_CHECKING:
    from ansiblelint._internal.rules import BaseRule
    from ansiblelint.errors import MatchError
    from ansiblelint.file_utils import Lintable
    from ansiblelint.runner import LintResult


# Module-level logger shared by everything in this file; it is named after
# the containing package (``__package__``) rather than this module.
_logger = logging.getLogger(__package__)


class App:
    """App class represents an execution of the linter.

    An instance keeps the effective options, the output formatter selected
    from those options and the ``Runtime`` object used by the run.
    """

    def __init__(self, options: Options):
        """Construct app run based on already loaded configuration.

        The ``skip_list`` and ``warn_list`` attributes of ``options`` are
        normalized in place before the object is stored on the instance.
        """
        options.skip_list = _sanitize_list_options(options.skip_list)
        options.warn_list = _sanitize_list_options(options.warn_list)

        self.options = options

        formatter_factory = choose_formatter_factory(options)
        self.formatter = formatter_factory(options.cwd, options.display_relative_path)

        # Without require_module, our _set_collections_basedir may fail
        self.runtime = Runtime(isolated=True, require_module=True)

    def render_matches(self, matches: list[MatchError]) -> None:
        """Display given matches (if they are not fixed).

        Matches flagged as ``fixed`` are dropped first. With a Codeclimate or
        SARIF formatter the remaining matches are printed as one document and
        nothing else happens. Otherwise ignored matches are listed first,
        followed by the fatal ones; annotations are additionally written to
        stderr when running under GitHub Actions, and a SARIF file is written
        when ``options.sarif_file`` is set.
        """
        matches = [match for match in matches if not match.fixed]

        if isinstance(
            self.formatter,
            (formatters.CodeclimateJSONFormatter, formatters.SarifFormatter),
        ):
            # If formatter CodeclimateJSONFormatter or SarifFormatter is chosen,
            # then print only the matches in JSON
            console.print(
                self.formatter.format_result(matches),
                markup=False,
                highlight=False,
            )
            return

        ignored_matches = [match for match in matches if match.ignored]
        fatal_matches = [match for match in matches if not match.ignored]
        # Display ignored matches first
        if ignored_matches:
            _logger.warning(
                "Listing %s violation(s) marked as ignored, likely already known",
                len(ignored_matches),
            )
            for match in ignored_matches:
                # highlight must be off or apostrophes may produce unexpected results
                console.print(self.formatter.apply(match), highlight=False)
        if fatal_matches:
            _logger.warning(
                "Listing %s violation(s) that are fatal",
                len(fatal_matches),
            )
            for match in fatal_matches:
                console.print(self.formatter.apply(match), highlight=False)

        # If run under GitHub Actions we also want to emit output recognized by it.
        if os.getenv("GITHUB_ACTIONS") == "true" and os.getenv("GITHUB_WORKFLOW"):
            _logger.info(
                "GitHub Actions environment detected, adding annotations output...",
            )
            formatter = formatters.AnnotationsFormatter(self.options.cwd, True)
            for match in itertools.chain(fatal_matches, ignored_matches):
                console_stderr.print(
                    formatter.apply(match),
                    markup=False,
                    highlight=False,
                )

        # If sarif_file is set, we also dump the results to a sarif file.
        if self.options.sarif_file:
            sarif = formatters.SarifFormatter(self.options.cwd, True)
            sarif_json = sarif.format_result(matches)
            # Path() accepts both str and Path values, so the option may be
            # either; "w" mode overwrites any previous report.
            with Path(self.options.sarif_file).open(
                "w",
                encoding="utf-8",
            ) as sarif_file:
                sarif_file.write(sarif_json)

    def count_results(self, matches: list[MatchError]) -> SummarizedResults:
        """Count failures and warnings in matches.

        Ignored matches only increment the warning counter. Every other match
        is recorded in the per-tag statistics and then counted as a (fixed)
        warning when its tag, rule id or one of the rule tags is listed in
        ``options.warn_list``, or as a (fixed) failure otherwise.
        """
        result = SummarizedResults()

        for match in matches:
            # any ignored match counts as a warning
            if match.ignored:
                result.warnings += 1
                continue
            # tag can include a sub-rule id: `yaml[document-start]`
            # rule.id is the generic rule id: `yaml`
            # *rule.tags is the list of the rule's tags (categories): `style`
            if match.tag not in result.tag_stats:
                result.tag_stats[match.tag] = TagStats(
                    tag=match.tag,
                    count=1,
                    associated_tags=match.rule.tags,
                )
            else:
                result.tag_stats[match.tag].count += 1

            if {match.tag, match.rule.id, *match.rule.tags}.isdisjoint(
                self.options.warn_list,
            ):
                # not in warn_list
                if match.fixed:
                    result.fixed_failures += 1
                else:
                    result.failures += 1
            else:
                result.tag_stats[match.tag].warning = True
                if match.fixed:
                    result.fixed_warnings += 1
                else:
                    result.warnings += 1
        return result

    @staticmethod
    def count_lintables(files: set[Lintable]) -> tuple[int, int]:
        """Count total and modified files.

        Returns a ``(total, changed)`` tuple, where ``changed`` is the number
        of files whose ``updated`` attribute is truthy.
        """
        changed_files_count = sum(1 for file in files if file.updated)
        return len(files), changed_files_count

    @staticmethod
    def _get_matched_skippable_rules(
        matches: list[MatchError],
    ) -> dict[str, BaseRule]:
        """Extract the list of matched rules, if skippable, from the list of matches.

        Ignored matches are not considered. The result maps ``match.tag`` (or
        the rule id when the tag is empty) to the rule object, leaving out
        every rule that carries the ``unskippable`` tag.
        """
        # match.tag is more specialized than match.rule.id
        matched_rules = {
            match.tag or match.rule.id: match.rule
            for match in matches
            if not match.ignored
        }
        # remove unskippable rules from the list
        return {
            rule_id: rule
            for rule_id, rule in matched_rules.items()
            if "unskippable" not in rule.tags
        }

    def report_outcome(
        self,
        result: LintResult,
        *,
        mark_as_success: bool = False,
    ) -> int:
        """Display information about how to skip found rules.

        Depending on the options this writes an ignore file or prints a hint
        about the documentation, then (unless ``quiet``) prints the summary.

        Returns the exit code: ``RC.VIOLATIONS_FOUND`` unless the run is marked
        as success and has no failures; otherwise ``RC.SUCCESS``, or
        ``RC.NO_FILES_MATCHED`` when no file was analyzed at all.
        """
        # msg is never populated in this method; it is still rendered below
        # so the emitted output stays unchanged.
        msg = ""

        summary = self.count_results(result.matches)
        files_count, changed_files_count = self.count_lintables(result.files)

        matched_rules = self._get_matched_skippable_rules(result.matches)

        if matched_rules and self.options.generate_ignore:
            # ANSIBLE_LINT_IGNORE_FILE environment variable overrides default
            # dumping location in linter and is not documented or supported. We
            # use this only for testing purposes.
            ignore_file_path = Path(
                os.environ.get("ANSIBLE_LINT_IGNORE_FILE", IGNORE_FILE.default),
            )
            console_stderr.print(f"Writing ignore file to {ignore_file_path}")
            # A set removes duplicated "<file> <tag>" entries before sorting.
            lines = {f"{match.filename} {match.tag}\n" for match in result.matches}
            with ignore_file_path.open("w", encoding="utf-8") as ignore_file:
                ignore_file.write(
                    "# This file contains ignores rule violations for ansible-lint\n",
                )
                ignore_file.writelines(sorted(lines))
        elif matched_rules and not self.options.quiet:
            console_stderr.print(
                "Read [link=https://ansible-lint.readthedocs.io/configuring/#ignoring-rules-for-entire-files]documentation[/link] for instructions on how to ignore specific rule violations.",
            )

        # Do not deprecate the old tags just yet. Why? Because it is not currently feasible
        # to migrate old tags to new tags. There are a lot of things out there that still
        # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we
        # replace the old tags, those tools will report warnings. If we do not replace them,
        # ansible-lint 5 will report warnings.
        #
        # We can do the deprecation once the ecosystem caught up at least a bit.

        if self.options.write_list and "yaml" in self.options.skip_list:
            _logger.warning(
                "You specified '--write', but no files can be modified "
                "because 'yaml' is in 'skip_list'.",
            )

        # A run with real failures can never be reported as successful.
        if mark_as_success and summary.failures:
            mark_as_success = False

        if not self.options.quiet:
            console_stderr.print(render_yaml(msg))
            self.report_summary(
                summary,
                changed_files_count,
                files_count,
                is_success=mark_as_success,
            )
        if mark_as_success:
            if not files_count:
                # success without any file being analyzed is reported as failure,
                # preventing accidents where the linter was running but
                # not doing anything due to misconfiguration.
                _logger.critical(
                    "Linter finished without analyzing any file, check configuration and arguments given.",
                )
                return RC.NO_FILES_MATCHED
            return RC.SUCCESS
        return RC.VIOLATIONS_FOUND

    def report_summary(  # pylint: disable=too-many-locals # noqa: C901
        self,
        summary: SummarizedResults,
        changed_files_count: int,
        files_count: int,
        is_success: bool,
    ) -> None:
        """Report match and file counts.

        Prints to stderr: the number of modified files (if any), a table of
        rule violations per tag (if any) and a final pass/fail line that also
        mentions the required and passed profiles. ``summary`` is updated in
        place (tag order/profile, sorting and ``passed_profile``).
        """
        # sort the stats by profiles
        idx = 0
        rule_order: dict[str, tuple[int, str]] = {}

        for profile, profile_config in PROFILES.items():
            for rule in profile_config["rules"]:
                rule_order[rule] = (idx, profile)
                idx += 1
        _logger.debug("Determined rule-profile order: %s", rule_order)
        failed_profiles = set()
        for tag, tag_stats in summary.tag_stats.items():
            if tag in rule_order:
                tag_stats.order, tag_stats.profile = rule_order[tag]
            elif "[" in tag:
                # fall back to the generic rule id of a sub-rule tag, and to
                # "after every known rule, no profile" when that is unknown too
                tag_stats.order, tag_stats.profile = rule_order.get(
                    tag.split("[")[0],
                    (idx, ""),
                )
            if tag_stats.profile:
                failed_profiles.add(tag_stats.profile)
        summary.sort()

        if changed_files_count:
            console_stderr.print(f"Modified {changed_files_count} files.")

        # determine which profile passed: walk the profiles in their declared
        # order and stop at the first one that has a violation
        summary.passed_profile = ""
        passed_profile_count = 0
        for profile in PROFILES:
            if profile in failed_profiles:
                break
            if profile != summary.passed_profile:
                summary.passed_profile = profile
                passed_profile_count += 1

        stars = ""
        if summary.tag_stats:
            table = Table(
                title="Rule Violation Summary",
                collapse_padding=True,
                box=None,
                show_lines=False,
            )
            table.add_column("count", justify="right")
            table.add_column("tag")
            table.add_column("profile")
            table.add_column("rule associated tags")
            for tag, stats in summary.tag_stats.items():
                # documentation links point to the generic rule id
                rule_id = tag.split("[")[0]
                table.add_row(
                    str(stats.count),
                    f"[link={RULE_DOC_URL}{rule_id}]{escape(tag)}[/link]",
                    stats.profile,
                    f"{', '.join(stats.associated_tags)}{' (warning)' if stats.warning else ''}",
                    style="yellow" if stats.warning else "red",
                )
            # star rating: one star is lost for every profile that did not
            # pass; a rating outside the 1..5 range is not displayed
            rating = 5 - (len(PROFILES) - passed_profile_count)
            if 0 < rating < 6:
                stars = f" Rating: {rating}/5 star"

            console_stderr.print(table)
            console_stderr.print()

        msg = "[green]Passed[/]" if is_success else "[red][bold]Failed[/][/]"

        msg += f": {summary.failures} failure(s), {summary.warnings} warning(s)"
        if summary.fixed:
            msg += f", and fixed {summary.fixed} issue(s)"
        msg += f" on {files_count} files."

        # Now we add some information about required and passed profile
        if self.options.profile:
            msg += f" Profile '{self.options.profile}' was required"
            if summary.passed_profile:
                msg += f", but only '{summary.passed_profile}' profile passed."
            else:
                msg += "."
        elif summary.passed_profile:
            msg += f" Last profile that met the validation criteria was '{summary.passed_profile}'."

        if stars:
            msg += stars

        # on offline mode and when run under pre-commit we do not want to
        # check for updates.
        if not self.options.offline and os.environ.get("PRE_COMMIT", "0") != "1":
            version_warning = get_version_warning()
            if version_warning:
                msg += f"\n{version_warning}"

        console_stderr.print(msg)


def choose_formatter_factory(
    options_list: Options,
) -> type[formatters.BaseFormatter[Any]]:
    """Pick the formatter class matching the requested output format.

    The ``format`` option is checked first (``quiet``, ``json``/``codeclimate``,
    ``sarif``); the parseable formatter is used when ``parseable`` is set or
    the format is ``pep8``. Anything else gets the default formatter.
    """
    requested = options_list.format
    if requested == "quiet":
        return formatters.QuietFormatter
    if requested in ("json", "codeclimate"):
        return formatters.CodeclimateJSONFormatter
    if requested == "sarif":
        return formatters.SarifFormatter
    if options_list.parseable or options_list.format == "pep8":
        return formatters.ParseableFormatter
    # no specific format requested: fall back to the default formatter
    return formatters.Formatter


def _sanitize_list_options(tag_list: list[str]) -> list[str]:
    """Normalize list options.

    Each entry is converted to ``str`` and split on commas, so both
    ``["a", "b"]`` and ``["a,b"]`` are accepted. Whitespace around every
    tag is removed and empty tags (for example from a trailing comma) are
    dropped, because neither could ever match a rule.

    Returns the unique tags as a sorted list.
    """
    # expand comma separated entries, trimming blanks around each tag
    tags = {part.strip() for tag in tag_list for part in str(tag).split(",")}
    # an empty tag carries no information
    tags.discard("")
    return sorted(tags)


@lru_cache
def get_app(*, offline: bool | None = None) -> App:
    """Build the application object; results are cached per ``offline`` value.

    When ``offline`` is ``None`` the value from the default options is used.
    If the requested value differs from the default options, a deep copy of
    them is made and adjusted; otherwise the default options object itself is
    used (and therefore updated in place below).
    """
    if offline is None:
        offline = default_options.offline

    options = default_options
    if options.offline != offline:
        # never alter the offline flag of the shared default options
        options = copy.deepcopy(default_options)
        options.offline = offline

    app = App(options=options)
    # Make linter use the cache dir from compat
    options.cache_dir = app.runtime.cache_dir

    # 1 when role-name is only a warning, 2 when it is skipped, 0 otherwise
    if "role-name" in app.options.warn_list:
        role_name_check = 1
    elif "role-name" in app.options.skip_list:
        role_name_check = 2
    else:
        role_name_check = 0

    # mocking must happen before prepare_environment or galaxy install might
    # fail.
    _perform_mockings(options=app.options)
    app.runtime.prepare_environment(
        install_local=not offline,
        offline=offline,
        role_name_check=role_name_check,
    )

    return app