summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/app.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/app.py')
-rw-r--r--src/ansiblelint/app.py411
1 files changed, 411 insertions, 0 deletions
diff --git a/src/ansiblelint/app.py b/src/ansiblelint/app.py
new file mode 100644
index 0000000..52581b3
--- /dev/null
+++ b/src/ansiblelint/app.py
@@ -0,0 +1,411 @@
+"""Application."""
+from __future__ import annotations
+
+import copy
+import itertools
+import logging
+import os
+from functools import lru_cache
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from ansible_compat.runtime import Runtime
+from rich.markup import escape
+from rich.table import Table
+
+from ansiblelint import formatters
+from ansiblelint._mockings import _perform_mockings
+from ansiblelint.color import console, console_stderr, render_yaml
+from ansiblelint.config import PROFILES, Options, get_version_warning
+from ansiblelint.config import options as default_options
+from ansiblelint.constants import RC, RULE_DOC_URL
+from ansiblelint.loaders import IGNORE_FILE
+from ansiblelint.stats import SummarizedResults, TagStats
+
+if TYPE_CHECKING:
+ from ansiblelint._internal.rules import BaseRule
+ from ansiblelint.errors import MatchError
+ from ansiblelint.file_utils import Lintable
+ from ansiblelint.runner import LintResult
+
+
+_logger = logging.getLogger(__package__)
+
+
+class App:
+ """App class represents an execution of the linter."""
+
+ def __init__(self, options: Options):
+ """Construct app run based on already loaded configuration."""
+ options.skip_list = _sanitize_list_options(options.skip_list)
+ options.warn_list = _sanitize_list_options(options.warn_list)
+
+ self.options = options
+
+ formatter_factory = choose_formatter_factory(options)
+ self.formatter = formatter_factory(options.cwd, options.display_relative_path)
+
+ # Without require_module, our _set_collections_basedir may fail
+ self.runtime = Runtime(isolated=True, require_module=True)
+
+ def render_matches(self, matches: list[MatchError]) -> None:
+ """Display given matches (if they are not fixed)."""
+ matches = [match for match in matches if not match.fixed]
+
+ if isinstance(
+ self.formatter,
+ (formatters.CodeclimateJSONFormatter, formatters.SarifFormatter),
+ ):
+ # If formatter CodeclimateJSONFormatter or SarifFormatter is chosen,
+ # then print only the matches in JSON
+ console.print(
+ self.formatter.format_result(matches),
+ markup=False,
+ highlight=False,
+ )
+ return
+
+ ignored_matches = [match for match in matches if match.ignored]
+ fatal_matches = [match for match in matches if not match.ignored]
+ # Displayed ignored matches first
+ if ignored_matches:
+ _logger.warning(
+ "Listing %s violation(s) marked as ignored, likely already known",
+ len(ignored_matches),
+ )
+ for match in ignored_matches:
+ if match.ignored:
+ # highlight must be off or apostrophes may produce unexpected results
+ console.print(self.formatter.apply(match), highlight=False)
+ if fatal_matches:
+ _logger.warning(
+ "Listing %s violation(s) that are fatal",
+ len(fatal_matches),
+ )
+ for match in fatal_matches:
+ if not match.ignored:
+ console.print(self.formatter.apply(match), highlight=False)
+
+ # If run under GitHub Actions we also want to emit output recognized by it.
+ if os.getenv("GITHUB_ACTIONS") == "true" and os.getenv("GITHUB_WORKFLOW"):
+ _logger.info(
+ "GitHub Actions environment detected, adding annotations output...",
+ )
+ formatter = formatters.AnnotationsFormatter(self.options.cwd, True)
+ for match in itertools.chain(fatal_matches, ignored_matches):
+ console_stderr.print(
+ formatter.apply(match),
+ markup=False,
+ highlight=False,
+ )
+
+ # If sarif_file is set, we also dump the results to a sarif file.
+ if self.options.sarif_file:
+ sarif = formatters.SarifFormatter(self.options.cwd, True)
+ json = sarif.format_result(matches)
+ with Path.open(
+ self.options.sarif_file,
+ "w",
+ encoding="utf-8",
+ ) as sarif_file:
+ sarif_file.write(json)
+
+ def count_results(self, matches: list[MatchError]) -> SummarizedResults:
+ """Count failures and warnings in matches."""
+ result = SummarizedResults()
+
+ for match in matches:
+ # any ignores match counts as a warning
+ if match.ignored:
+ result.warnings += 1
+ continue
+ # tag can include a sub-rule id: `yaml[document-start]`
+ # rule.id is the generic rule id: `yaml`
+ # *rule.tags is the list of the rule's tags (categories): `style`
+ if match.tag not in result.tag_stats:
+ result.tag_stats[match.tag] = TagStats(
+ tag=match.tag,
+ count=1,
+ associated_tags=match.rule.tags,
+ )
+ else:
+ result.tag_stats[match.tag].count += 1
+
+ if {match.tag, match.rule.id, *match.rule.tags}.isdisjoint(
+ self.options.warn_list,
+ ):
+ # not in warn_list
+ if match.fixed:
+ result.fixed_failures += 1
+ else:
+ result.failures += 1
+ else:
+ result.tag_stats[match.tag].warning = True
+ if match.fixed:
+ result.fixed_warnings += 1
+ else:
+ result.warnings += 1
+ return result
+
+ @staticmethod
+ def count_lintables(files: set[Lintable]) -> tuple[int, int]:
+ """Count total and modified files."""
+ files_count = len(files)
+ changed_files_count = len([file for file in files if file.updated])
+ return files_count, changed_files_count
+
+ @staticmethod
+ def _get_matched_skippable_rules(
+ matches: list[MatchError],
+ ) -> dict[str, BaseRule]:
+ """Extract the list of matched rules, if skippable, from the list of matches."""
+ matches_unignored = [match for match in matches if not match.ignored]
+ # match.tag is more specialized than match.rule.id
+ matched_rules = {
+ match.tag or match.rule.id: match.rule for match in matches_unignored
+ }
+ # remove unskippable rules from the list
+ for rule_id in list(matched_rules.keys()):
+ if "unskippable" in matched_rules[rule_id].tags:
+ matched_rules.pop(rule_id)
+ return matched_rules
+
+ def report_outcome(
+ self,
+ result: LintResult,
+ *,
+ mark_as_success: bool = False,
+ ) -> int:
+ """Display information about how to skip found rules.
+
+ Returns exit code, 2 if errors were found, 0 when only warnings were found.
+ """
+ msg = ""
+
+ summary = self.count_results(result.matches)
+ files_count, changed_files_count = self.count_lintables(result.files)
+
+ matched_rules = self._get_matched_skippable_rules(result.matches)
+
+ if matched_rules and self.options.generate_ignore:
+ # ANSIBLE_LINT_IGNORE_FILE environment variable overrides default
+ # dumping location in linter and is not documented or supported. We
+ # use this only for testing purposes.
+ ignore_file_path = Path(
+ os.environ.get("ANSIBLE_LINT_IGNORE_FILE", IGNORE_FILE.default),
+ )
+ console_stderr.print(f"Writing ignore file to {ignore_file_path}")
+ lines = set()
+ for rule in result.matches:
+ lines.add(f"{rule.filename} {rule.tag}\n")
+ with ignore_file_path.open("w", encoding="utf-8") as ignore_file:
+ ignore_file.write(
+ "# This file contains ignores rule violations for ansible-lint\n",
+ )
+ ignore_file.writelines(sorted(lines))
+ elif matched_rules and not self.options.quiet:
+ console_stderr.print(
+ "Read [link=https://ansible-lint.readthedocs.io/configuring/#ignoring-rules-for-entire-files]documentation[/link] for instructions on how to ignore specific rule violations.",
+ )
+
+ # Do not deprecate the old tags just yet. Why? Because it is not currently feasible
+ # to migrate old tags to new tags. There are a lot of things out there that still
+ # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we
+ # replace the old tags, those tools will report warnings. If we do not replace them,
+ # ansible-lint 5 will report warnings.
+ #
+ # We can do the deprecation once the ecosystem caught up at least a bit.
+ # for k, v in used_old_tags.items():
+ # _logger.warning(
+ # "error in the future.",
+ # k,
+ # v,
+
+ if self.options.write_list and "yaml" in self.options.skip_list:
+ _logger.warning(
+ "You specified '--write', but no files can be modified "
+ "because 'yaml' is in 'skip_list'.",
+ )
+
+ if mark_as_success and summary.failures:
+ mark_as_success = False
+
+ if not self.options.quiet:
+ console_stderr.print(render_yaml(msg))
+ self.report_summary(
+ summary,
+ changed_files_count,
+ files_count,
+ is_success=mark_as_success,
+ )
+ if mark_as_success:
+ if not files_count:
+ # success without any file being analyzed is reported as failure
+ # to match match, preventing accidents where linter was running
+ # not doing anything due to misconfiguration.
+ _logger.critical(
+ "Linter finished without analyzing any file, check configuration and arguments given.",
+ )
+ return RC.NO_FILES_MATCHED
+ return RC.SUCCESS
+ return RC.VIOLATIONS_FOUND
+
+ def report_summary( # pylint: disable=too-many-locals # noqa: C901
+ self,
+ summary: SummarizedResults,
+ changed_files_count: int,
+ files_count: int,
+ is_success: bool,
+ ) -> None:
+ """Report match and file counts."""
+ # sort the stats by profiles
+ idx = 0
+ rule_order = {}
+
+ for profile, profile_config in PROFILES.items():
+ for rule in profile_config["rules"]:
+ rule_order[rule] = (idx, profile)
+ idx += 1
+ _logger.debug("Determined rule-profile order: %s", rule_order)
+ failed_profiles = set()
+ for tag, tag_stats in summary.tag_stats.items():
+ if tag in rule_order:
+ tag_stats.order, tag_stats.profile = rule_order.get(tag, (idx, ""))
+ elif "[" in tag:
+ tag_stats.order, tag_stats.profile = rule_order.get(
+ tag.split("[")[0],
+ (idx, ""),
+ )
+ if tag_stats.profile:
+ failed_profiles.add(tag_stats.profile)
+ summary.sort()
+
+ if changed_files_count:
+ console_stderr.print(f"Modified {changed_files_count} files.")
+
+ # determine which profile passed
+ summary.passed_profile = ""
+ passed_profile_count = 0
+ for profile in PROFILES:
+ if profile in failed_profiles:
+ break
+ if profile != summary.passed_profile:
+ summary.passed_profile = profile
+ passed_profile_count += 1
+
+ stars = ""
+ if summary.tag_stats:
+ table = Table(
+ title="Rule Violation Summary",
+ collapse_padding=True,
+ box=None,
+ show_lines=False,
+ )
+ table.add_column("count", justify="right")
+ table.add_column("tag")
+ table.add_column("profile")
+ table.add_column("rule associated tags")
+ for tag, stats in summary.tag_stats.items():
+ table.add_row(
+ str(stats.count),
+ f"[link={RULE_DOC_URL}{ tag.split('[')[0] }]{escape(tag)}[/link]",
+ stats.profile,
+ f"{', '.join(stats.associated_tags)}{' (warning)' if stats.warning else ''}",
+ style="yellow" if stats.warning else "red",
+ )
+ # rate stars for the top 5 profiles (min would not get
+ rating = 5 - (len(PROFILES.keys()) - passed_profile_count)
+ if 0 < rating < 6:
+ stars = f" Rating: {rating}/5 star"
+
+ console_stderr.print(table)
+ console_stderr.print()
+
+ msg = "[green]Passed[/]" if is_success else "[red][bold]Failed[/][/]"
+
+ msg += f": {summary.failures} failure(s), {summary.warnings} warning(s)"
+ if summary.fixed:
+ msg += f", and fixed {summary.fixed} issue(s)"
+ msg += f" on {files_count} files."
+
+ # Now we add some information about required and passed profile
+ if self.options.profile:
+ msg += f" Profile '{self.options.profile}' was required"
+ if summary.passed_profile:
+ msg += f", but only '{summary.passed_profile}' profile passed."
+ else:
+ msg += "."
+ elif summary.passed_profile:
+ msg += f" Last profile that met the validation criteria was '{summary.passed_profile}'."
+
+ if stars:
+ msg += stars
+
+ # on offline mode and when run under pre-commit we do not want to
+ # check for updates.
+ if not self.options.offline and os.environ.get("PRE_COMMIT", "0") != "1":
+ version_warning = get_version_warning()
+ if version_warning:
+ msg += f"\n{version_warning}"
+
+ console_stderr.print(msg)
+
+
+def choose_formatter_factory(
+ options_list: Options,
+) -> type[formatters.BaseFormatter[Any]]:
+ """Select an output formatter based on the incoming command line arguments."""
+ r: type[formatters.BaseFormatter[Any]] = formatters.Formatter
+ if options_list.format == "quiet":
+ r = formatters.QuietFormatter
+ elif options_list.format in ("json", "codeclimate"):
+ r = formatters.CodeclimateJSONFormatter
+ elif options_list.format == "sarif":
+ r = formatters.SarifFormatter
+ elif options_list.parseable or options_list.format == "pep8":
+ r = formatters.ParseableFormatter
+ return r
+
+
+def _sanitize_list_options(tag_list: list[str]) -> list[str]:
+ """Normalize list options."""
+ # expand comma separated entries
+ tags = set()
+ for tag in tag_list:
+ tags.update(str(tag).split(","))
+ # remove duplicates, and return as sorted list
+ return sorted(set(tags))
+
+
+@lru_cache
+def get_app(*, offline: bool | None = None) -> App:
+ """Return the application instance, caching the return value."""
+ if offline is None:
+ offline = default_options.offline
+
+ if default_options.offline != offline:
+ options = copy.deepcopy(default_options)
+ options.offline = offline
+ else:
+ options = default_options
+
+ app = App(options=options)
+ # Make linter use the cache dir from compat
+ options.cache_dir = app.runtime.cache_dir
+
+ role_name_check = 0
+ if "role-name" in app.options.warn_list:
+ role_name_check = 1
+ elif "role-name" in app.options.skip_list:
+ role_name_check = 2
+
+ # mocking must happen before prepare_environment or galaxy install might
+ # fail.
+ _perform_mockings(options=app.options)
+ app.runtime.prepare_environment(
+ install_local=(not offline),
+ offline=offline,
+ role_name_check=role_name_check,
+ )
+
+ return app