diff options
Diffstat (limited to 'src/ansiblelint/app.py')
-rw-r--r-- | src/ansiblelint/app.py | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/src/ansiblelint/app.py b/src/ansiblelint/app.py new file mode 100644 index 0000000..52581b3 --- /dev/null +++ b/src/ansiblelint/app.py @@ -0,0 +1,411 @@ +"""Application.""" +from __future__ import annotations + +import copy +import itertools +import logging +import os +from functools import lru_cache +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from ansible_compat.runtime import Runtime +from rich.markup import escape +from rich.table import Table + +from ansiblelint import formatters +from ansiblelint._mockings import _perform_mockings +from ansiblelint.color import console, console_stderr, render_yaml +from ansiblelint.config import PROFILES, Options, get_version_warning +from ansiblelint.config import options as default_options +from ansiblelint.constants import RC, RULE_DOC_URL +from ansiblelint.loaders import IGNORE_FILE +from ansiblelint.stats import SummarizedResults, TagStats + +if TYPE_CHECKING: + from ansiblelint._internal.rules import BaseRule + from ansiblelint.errors import MatchError + from ansiblelint.file_utils import Lintable + from ansiblelint.runner import LintResult + + +_logger = logging.getLogger(__package__) + + +class App: + """App class represents an execution of the linter.""" + + def __init__(self, options: Options): + """Construct app run based on already loaded configuration.""" + options.skip_list = _sanitize_list_options(options.skip_list) + options.warn_list = _sanitize_list_options(options.warn_list) + + self.options = options + + formatter_factory = choose_formatter_factory(options) + self.formatter = formatter_factory(options.cwd, options.display_relative_path) + + # Without require_module, our _set_collections_basedir may fail + self.runtime = Runtime(isolated=True, require_module=True) + + def render_matches(self, matches: list[MatchError]) -> None: + """Display given matches (if they are not fixed).""" + matches = [match for match in matches if not match.fixed] + + if isinstance( + self.formatter, + (formatters.CodeclimateJSONFormatter, formatters.SarifFormatter), + ): + # If formatter CodeclimateJSONFormatter or SarifFormatter is chosen, + # then print only the matches in JSON + console.print( + self.formatter.format_result(matches), + markup=False, + highlight=False, + ) + return + + ignored_matches = [match for match in matches if match.ignored] + fatal_matches = [match for match in matches if not match.ignored] + # Displayed ignored matches first + if ignored_matches: + _logger.warning( + "Listing %s violation(s) marked as ignored, likely already known", + len(ignored_matches), + ) + for match in ignored_matches: + if match.ignored: + # highlight must be off or apostrophes may produce unexpected results + console.print(self.formatter.apply(match), highlight=False) + if fatal_matches: + _logger.warning( + "Listing %s violation(s) that are fatal", + len(fatal_matches), + ) + for match in fatal_matches: + if not match.ignored: + console.print(self.formatter.apply(match), highlight=False) + + # If run under GitHub Actions we also want to emit output recognized by it. + if os.getenv("GITHUB_ACTIONS") == "true" and os.getenv("GITHUB_WORKFLOW"): + _logger.info( + "GitHub Actions environment detected, adding annotations output...", + ) + formatter = formatters.AnnotationsFormatter(self.options.cwd, True) + for match in itertools.chain(fatal_matches, ignored_matches): + console_stderr.print( + formatter.apply(match), + markup=False, + highlight=False, + ) + + # If sarif_file is set, we also dump the results to a sarif file. + if self.options.sarif_file: + sarif = formatters.SarifFormatter(self.options.cwd, True) + json = sarif.format_result(matches) + with Path.open( + self.options.sarif_file, + "w", + encoding="utf-8", + ) as sarif_file: + sarif_file.write(json) + + def count_results(self, matches: list[MatchError]) -> SummarizedResults: + """Count failures and warnings in matches.""" + result = SummarizedResults() + + for match in matches: + # any ignores match counts as a warning + if match.ignored: + result.warnings += 1 + continue + # tag can include a sub-rule id: `yaml[document-start]` + # rule.id is the generic rule id: `yaml` + # *rule.tags is the list of the rule's tags (categories): `style` + if match.tag not in result.tag_stats: + result.tag_stats[match.tag] = TagStats( + tag=match.tag, + count=1, + associated_tags=match.rule.tags, + ) + else: + result.tag_stats[match.tag].count += 1 + + if {match.tag, match.rule.id, *match.rule.tags}.isdisjoint( + self.options.warn_list, + ): + # not in warn_list + if match.fixed: + result.fixed_failures += 1 + else: + result.failures += 1 + else: + result.tag_stats[match.tag].warning = True + if match.fixed: + result.fixed_warnings += 1 + else: + result.warnings += 1 + return result + + @staticmethod + def count_lintables(files: set[Lintable]) -> tuple[int, int]: + """Count total and modified files.""" + files_count = len(files) + changed_files_count = len([file for file in files if file.updated]) + return files_count, changed_files_count + + @staticmethod + def _get_matched_skippable_rules( + matches: list[MatchError], + ) -> dict[str, BaseRule]: + """Extract the list of matched rules, if skippable, from the list of matches.""" + matches_unignored = [match for match in matches if not match.ignored] + # match.tag is more specialized than match.rule.id + matched_rules = { + match.tag or match.rule.id: match.rule for match in matches_unignored + } + # remove unskippable rules from the list + for rule_id in list(matched_rules.keys()): + if "unskippable" in matched_rules[rule_id].tags: + matched_rules.pop(rule_id) + return matched_rules + + def report_outcome( + self, + result: LintResult, + *, + mark_as_success: bool = False, + ) -> int: + """Display information about how to skip found rules. + + Returns exit code, 2 if errors were found, 0 when only warnings were found. + """ + msg = "" + + summary = self.count_results(result.matches) + files_count, changed_files_count = self.count_lintables(result.files) + + matched_rules = self._get_matched_skippable_rules(result.matches) + + if matched_rules and self.options.generate_ignore: + # ANSIBLE_LINT_IGNORE_FILE environment variable overrides default + # dumping location in linter and is not documented or supported. We + # use this only for testing purposes. + ignore_file_path = Path( + os.environ.get("ANSIBLE_LINT_IGNORE_FILE", IGNORE_FILE.default), + ) + console_stderr.print(f"Writing ignore file to {ignore_file_path}") + lines = set() + for rule in result.matches: + lines.add(f"{rule.filename} {rule.tag}\n") + with ignore_file_path.open("w", encoding="utf-8") as ignore_file: + ignore_file.write( + "# This file contains ignores rule violations for ansible-lint\n", + ) + ignore_file.writelines(sorted(lines)) + elif matched_rules and not self.options.quiet: + console_stderr.print( + "Read [link=https://ansible-lint.readthedocs.io/configuring/#ignoring-rules-for-entire-files]documentation[/link] for instructions on how to ignore specific rule violations.", + ) + + # Do not deprecate the old tags just yet. Why? Because it is not currently feasible + # to migrate old tags to new tags. There are a lot of things out there that still + # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we + # replace the old tags, those tools will report warnings. If we do not replace them, + # ansible-lint 5 will report warnings. + # + # We can do the deprecation once the ecosystem caught up at least a bit. + # for k, v in used_old_tags.items(): + # _logger.warning( + # "error in the future.", + # k, + # v, + + if self.options.write_list and "yaml" in self.options.skip_list: + _logger.warning( + "You specified '--write', but no files can be modified " + "because 'yaml' is in 'skip_list'.", + ) + + if mark_as_success and summary.failures: + mark_as_success = False + + if not self.options.quiet: + console_stderr.print(render_yaml(msg)) + self.report_summary( + summary, + changed_files_count, + files_count, + is_success=mark_as_success, + ) + if mark_as_success: + if not files_count: + # success without any file being analyzed is reported as failure + # to match match, preventing accidents where linter was running + # not doing anything due to misconfiguration. + _logger.critical( + "Linter finished without analyzing any file, check configuration and arguments given.", + ) + return RC.NO_FILES_MATCHED + return RC.SUCCESS + return RC.VIOLATIONS_FOUND + + def report_summary( # pylint: disable=too-many-locals # noqa: C901 + self, + summary: SummarizedResults, + changed_files_count: int, + files_count: int, + is_success: bool, + ) -> None: + """Report match and file counts.""" + # sort the stats by profiles + idx = 0 + rule_order = {} + + for profile, profile_config in PROFILES.items(): + for rule in profile_config["rules"]: + rule_order[rule] = (idx, profile) + idx += 1 + _logger.debug("Determined rule-profile order: %s", rule_order) + failed_profiles = set() + for tag, tag_stats in summary.tag_stats.items(): + if tag in rule_order: + tag_stats.order, tag_stats.profile = rule_order.get(tag, (idx, "")) + elif "[" in tag: + tag_stats.order, tag_stats.profile = rule_order.get( + tag.split("[")[0], + (idx, ""), + ) + if tag_stats.profile: + failed_profiles.add(tag_stats.profile) + summary.sort() + + if changed_files_count: + console_stderr.print(f"Modified {changed_files_count} files.") + + # determine which profile passed + summary.passed_profile = "" + passed_profile_count = 0 + for profile in PROFILES: + if profile in failed_profiles: + break + if profile != summary.passed_profile: + summary.passed_profile = profile + passed_profile_count += 1 + + stars = "" + if summary.tag_stats: + table = Table( + title="Rule Violation Summary", + collapse_padding=True, + box=None, + show_lines=False, + ) + table.add_column("count", justify="right") + table.add_column("tag") + table.add_column("profile") + table.add_column("rule associated tags") + for tag, stats in summary.tag_stats.items(): + table.add_row( + str(stats.count), + f"[link={RULE_DOC_URL}{ tag.split('[')[0] }]{escape(tag)}[/link]", + stats.profile, + f"{', '.join(stats.associated_tags)}{' (warning)' if stats.warning else ''}", + style="yellow" if stats.warning else "red", + ) + # rate stars for the top 5 profiles (min would not get + rating = 5 - (len(PROFILES.keys()) - passed_profile_count) + if 0 < rating < 6: + stars = f" Rating: {rating}/5 star" + + console_stderr.print(table) + console_stderr.print() + + msg = "[green]Passed[/]" if is_success else "[red][bold]Failed[/][/]" + + msg += f": {summary.failures} failure(s), {summary.warnings} warning(s)" + if summary.fixed: + msg += f", and fixed {summary.fixed} issue(s)" + msg += f" on {files_count} files." + + # Now we add some information about required and passed profile + if self.options.profile: + msg += f" Profile '{self.options.profile}' was required" + if summary.passed_profile: + msg += f", but only '{summary.passed_profile}' profile passed." + else: + msg += "." + elif summary.passed_profile: + msg += f" Last profile that met the validation criteria was '{summary.passed_profile}'." + + if stars: + msg += stars + + # on offline mode and when run under pre-commit we do not want to + # check for updates. + if not self.options.offline and os.environ.get("PRE_COMMIT", "0") != "1": + version_warning = get_version_warning() + if version_warning: + msg += f"\n{version_warning}" + + console_stderr.print(msg) + + +def choose_formatter_factory( + options_list: Options, +) -> type[formatters.BaseFormatter[Any]]: + """Select an output formatter based on the incoming command line arguments.""" + r: type[formatters.BaseFormatter[Any]] = formatters.Formatter + if options_list.format == "quiet": + r = formatters.QuietFormatter + elif options_list.format in ("json", "codeclimate"): + r = formatters.CodeclimateJSONFormatter + elif options_list.format == "sarif": + r = formatters.SarifFormatter + elif options_list.parseable or options_list.format == "pep8": + r = formatters.ParseableFormatter + return r + + +def _sanitize_list_options(tag_list: list[str]) -> list[str]: + """Normalize list options.""" + # expand comma separated entries + tags = set() + for tag in tag_list: + tags.update(str(tag).split(",")) + # remove duplicates, and return as sorted list + return sorted(set(tags)) + + +@lru_cache +def get_app(*, offline: bool | None = None) -> App: + """Return the application instance, caching the return value.""" + if offline is None: + offline = default_options.offline + + if default_options.offline != offline: + options = copy.deepcopy(default_options) + options.offline = offline + else: + options = default_options + + app = App(options=options) + # Make linter use the cache dir from compat + options.cache_dir = app.runtime.cache_dir + + role_name_check = 0 + if "role-name" in app.options.warn_list: + role_name_check = 1 + elif "role-name" in app.options.skip_list: + role_name_check = 2 + + # mocking must happen before prepare_environment or galaxy install might + # fail. + _perform_mockings(options=app.options) + app.runtime.prepare_environment( + install_local=(not offline), + offline=offline, + role_name_check=role_name_check, + ) + + return app |