summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/app.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/app.py')
-rw-r--r--src/ansiblelint/app.py360
1 files changed, 360 insertions, 0 deletions
diff --git a/src/ansiblelint/app.py b/src/ansiblelint/app.py
new file mode 100644
index 0000000..7d14504
--- /dev/null
+++ b/src/ansiblelint/app.py
@@ -0,0 +1,360 @@
+"""Application."""
+from __future__ import annotations
+
+import itertools
+import logging
+import os
+from functools import lru_cache
+from typing import TYPE_CHECKING, Any
+
+from ansible_compat.runtime import Runtime
+from rich.markup import escape
+from rich.table import Table
+
+from ansiblelint import formatters
+from ansiblelint._mockings import _perform_mockings
+from ansiblelint.color import console, console_stderr, render_yaml
+from ansiblelint.config import PROFILES, get_version_warning
+from ansiblelint.config import options as default_options
+from ansiblelint.constants import RULE_DOC_URL, SUCCESS_RC, VIOLATIONS_FOUND_RC
+from ansiblelint.errors import MatchError
+from ansiblelint.loaders import IGNORE_TXT
+from ansiblelint.stats import SummarizedResults, TagStats
+
+if TYPE_CHECKING:
+ from argparse import Namespace
+ from typing import Dict, Set # pylint: disable=ungrouped-imports
+
+ from ansiblelint._internal.rules import BaseRule
+ from ansiblelint.file_utils import Lintable
+ from ansiblelint.runner import LintResult
+
+
+_logger = logging.getLogger(__package__)
+
+
+class App:
+ """App class represents an execution of the linter."""
+
+ def __init__(self, options: Namespace):
+ """Construct app run based on already loaded configuration."""
+ options.skip_list = _sanitize_list_options(options.skip_list)
+ options.warn_list = _sanitize_list_options(options.warn_list)
+
+ self.options = options
+
+ formatter_factory = choose_formatter_factory(options)
+ self.formatter = formatter_factory(options.cwd, options.display_relative_path)
+
+ # Without require_module, our _set_collections_basedir may fail
+ self.runtime = Runtime(isolated=True, require_module=True)
+
+ def render_matches(self, matches: list[MatchError]) -> None:
+ """Display given matches (if they are not fixed)."""
+ matches = [match for match in matches if not match.fixed]
+
+ if isinstance(
+ self.formatter,
+ (formatters.CodeclimateJSONFormatter, formatters.SarifFormatter),
+ ):
+ # If formatter CodeclimateJSONFormatter or SarifFormatter is chosen,
+ # then print only the matches in JSON
+ console.print(
+ self.formatter.format_result(matches), markup=False, highlight=False
+ )
+ return
+
+ ignored_matches = [match for match in matches if match.ignored]
+ fatal_matches = [match for match in matches if not match.ignored]
+ # Displayed ignored matches first
+ if ignored_matches:
+ _logger.warning(
+ "Listing %s violation(s) marked as ignored, likely already known",
+ len(ignored_matches),
+ )
+ for match in ignored_matches:
+ if match.ignored:
+ # highlight must be off or apostrophes may produce unexpected results
+ console.print(self.formatter.format(match), highlight=False)
+ if fatal_matches:
+ _logger.warning(
+ "Listing %s violation(s) that are fatal", len(fatal_matches)
+ )
+ for match in fatal_matches:
+ if not match.ignored:
+ console.print(self.formatter.format(match), highlight=False)
+
+ # If run under GitHub Actions we also want to emit output recognized by it.
+ if os.getenv("GITHUB_ACTIONS") == "true" and os.getenv("GITHUB_WORKFLOW"):
+ formatter = formatters.AnnotationsFormatter(self.options.cwd, True)
+ for match in itertools.chain(fatal_matches, ignored_matches):
+ console.print(formatter.format(match), markup=False, highlight=False)
+
+ # If sarif_file is set, we also dump the results to a sarif file.
+ if self.options.sarif_file:
+ sarif = formatters.SarifFormatter(self.options.cwd, True)
+ json = sarif.format_result(matches)
+ with open(self.options.sarif_file, "w", encoding="utf-8") as sarif_file:
+ sarif_file.write(json)
+
+ def count_results(self, matches: list[MatchError]) -> SummarizedResults:
+ """Count failures and warnings in matches."""
+ result = SummarizedResults()
+
+ for match in matches:
+ # any ignores match counts as a warning
+ if match.ignored:
+ result.warnings += 1
+ continue
+ # tag can include a sub-rule id: `yaml[document-start]`
+ # rule.id is the generic rule id: `yaml`
+ # *rule.tags is the list of the rule's tags (categories): `style`
+ if match.tag not in result.tag_stats:
+ result.tag_stats[match.tag] = TagStats(
+ tag=match.tag, count=1, associated_tags=match.rule.tags
+ )
+ else:
+ result.tag_stats[match.tag].count += 1
+
+ if {match.tag, match.rule.id, *match.rule.tags}.isdisjoint(
+ self.options.warn_list
+ ):
+ # not in warn_list
+ if match.fixed:
+ result.fixed_failures += 1
+ else:
+ result.failures += 1
+ else:
+ result.tag_stats[match.tag].warning = True
+ if match.fixed:
+ result.fixed_warnings += 1
+ else:
+ result.warnings += 1
+ return result
+
+ @staticmethod
+ def count_lintables(files: set[Lintable]) -> tuple[int, int]:
+ """Count total and modified files."""
+ files_count = len(files)
+ changed_files_count = len([file for file in files if file.updated])
+ return files_count, changed_files_count
+
+ @staticmethod
+ def _get_matched_skippable_rules(
+ matches: list[MatchError],
+ ) -> dict[str, BaseRule]:
+ """Extract the list of matched rules, if skippable, from the list of matches."""
+ matches_unignored = [match for match in matches if not match.ignored]
+ # match.tag is more specialized than match.rule.id
+ matched_rules = {
+ match.tag or match.rule.id: match.rule for match in matches_unignored
+ }
+ # remove unskippable rules from the list
+ for rule_id in list(matched_rules.keys()):
+ if "unskippable" in matched_rules[rule_id].tags:
+ matched_rules.pop(rule_id)
+ return matched_rules
+
+ def report_outcome(self, result: LintResult, mark_as_success: bool = False) -> int:
+ """Display information about how to skip found rules.
+
+ Returns exit code, 2 if errors were found, 0 when only warnings were found.
+ """
+ msg = ""
+
+ summary = self.count_results(result.matches)
+ files_count, changed_files_count = self.count_lintables(result.files)
+
+ matched_rules = self._get_matched_skippable_rules(result.matches)
+
+ if matched_rules and self.options.generate_ignore:
+ console_stderr.print(f"Writing ignore file to {IGNORE_TXT}")
+ lines = set()
+ for rule in result.matches:
+ lines.add(f"{rule.filename} {rule.tag}\n")
+ with open(IGNORE_TXT, "w", encoding="utf-8") as ignore_file:
+ ignore_file.write(
+ "# This file contains ignores rule violations for ansible-lint\n"
+ )
+ ignore_file.writelines(sorted(list(lines)))
+ elif matched_rules and not self.options.quiet:
+ console_stderr.print(
+ "Read [link=https://ansible-lint.readthedocs.io/configuring/#ignoring-rules-for-entire-files]documentation[/link] for instructions on how to ignore specific rule violations."
+ )
+
+ # Do not deprecate the old tags just yet. Why? Because it is not currently feasible
+ # to migrate old tags to new tags. There are a lot of things out there that still
+ # use ansible-lint 4 (for example, Ansible Galaxy and Automation Hub imports). If we
+ # replace the old tags, those tools will report warnings. If we do not replace them,
+ # ansible-lint 5 will report warnings.
+ #
+ # We can do the deprecation once the ecosystem caught up at least a bit.
+ # for k, v in used_old_tags.items():
+ # _logger.warning(
+ # "Replaced deprecated tag '%s' with '%s' but it will become an "
+ # "error in the future.",
+ # k,
+ # v,
+ # )
+
+ if self.options.write_list and "yaml" in self.options.skip_list:
+ _logger.warning(
+ "You specified '--write', but no files can be modified "
+ "because 'yaml' is in 'skip_list'."
+ )
+
+ if mark_as_success and summary.failures and not self.options.progressive:
+ mark_as_success = False
+
+ if not self.options.quiet:
+ console_stderr.print(render_yaml(msg))
+ self.report_summary(
+ summary, changed_files_count, files_count, is_success=mark_as_success
+ )
+
+ return SUCCESS_RC if mark_as_success else VIOLATIONS_FOUND_RC
+
+ def report_summary( # pylint: disable=too-many-branches,too-many-locals
+ self,
+ summary: SummarizedResults,
+ changed_files_count: int,
+ files_count: int,
+ is_success: bool,
+ ) -> None:
+ """Report match and file counts."""
+ # sort the stats by profiles
+ idx = 0
+ rule_order = {}
+
+ for profile, profile_config in PROFILES.items():
+ for rule in profile_config["rules"]:
+ # print(profile, rule)
+ rule_order[rule] = (idx, profile)
+ idx += 1
+ _logger.debug("Determined rule-profile order: %s", rule_order)
+ failed_profiles = set()
+ for tag, tag_stats in summary.tag_stats.items():
+ if tag in rule_order:
+ tag_stats.order, tag_stats.profile = rule_order.get(tag, (idx, ""))
+ elif "[" in tag:
+ tag_stats.order, tag_stats.profile = rule_order.get(
+ tag.split("[")[0], (idx, "")
+ )
+ if tag_stats.profile:
+ failed_profiles.add(tag_stats.profile)
+ summary.sort()
+
+ if changed_files_count:
+ console_stderr.print(f"Modified {changed_files_count} files.")
+
+ # determine which profile passed
+ summary.passed_profile = ""
+ passed_profile_count = 0
+ for profile in PROFILES.keys():
+ if profile in failed_profiles:
+ break
+ if profile != summary.passed_profile:
+ summary.passed_profile = profile
+ passed_profile_count += 1
+
+ stars = ""
+ if summary.tag_stats:
+ table = Table(
+ title="Rule Violation Summary",
+ collapse_padding=True,
+ box=None,
+ show_lines=False,
+ )
+ table.add_column("count", justify="right")
+ table.add_column("tag")
+ table.add_column("profile")
+ table.add_column("rule associated tags")
+ for tag, stats in summary.tag_stats.items():
+ table.add_row(
+ str(stats.count),
+ f"[link={RULE_DOC_URL}{ tag.split('[')[0] }]{escape(tag)}[/link]",
+ stats.profile,
+ f"{', '.join(stats.associated_tags)}{' (warning)' if stats.warning else ''}",
+ style="yellow" if stats.warning else "red",
+ )
+ # rate stars for the top 5 profiles (min would not get
+ rating = 5 - (len(PROFILES.keys()) - passed_profile_count)
+ if 0 < rating < 6:
+ stars = f", {rating}/5 star rating"
+
+ console_stderr.print(table)
+ console_stderr.print()
+
+ if is_success:
+ msg = "[green]Passed[/] with "
+ else:
+ msg = "[red][bold]Failed[/][/] after "
+
+ if summary.passed_profile:
+ msg += f"[bold]{summary.passed_profile}[/] profile"
+ if stars:
+ msg += stars
+
+ msg += f": {summary.failures} failure(s), {summary.warnings} warning(s)"
+ if summary.fixed:
+ msg += f", and fixed {summary.fixed} issue(s)"
+ msg += f" on {files_count} files."
+
+ # on offline mode and when run under pre-commit we do not want to
+ # check for updates.
+ if not self.options.offline and os.environ.get("PRE_COMMIT", "0") != "1":
+ version_warning = get_version_warning()
+ if version_warning:
+ msg += f"\n{version_warning}"
+
+ console_stderr.print(msg)
+
+
+def choose_formatter_factory(
+ options_list: Namespace,
+) -> type[formatters.BaseFormatter[Any]]:
+ """Select an output formatter based on the incoming command line arguments."""
+ r: type[formatters.BaseFormatter[Any]] = formatters.Formatter
+ if options_list.format == "quiet":
+ r = formatters.QuietFormatter
+ elif options_list.format in ("json", "codeclimate"):
+ r = formatters.CodeclimateJSONFormatter
+ elif options_list.format == "sarif":
+ r = formatters.SarifFormatter
+ elif options_list.parseable or options_list.format == "pep8":
+ r = formatters.ParseableFormatter
+ return r
+
+
+def _sanitize_list_options(tag_list: list[str]) -> list[str]:
+ """Normalize list options."""
+ # expand comma separated entries
+ tags = set()
+ for tag in tag_list:
+ tags.update(str(tag).split(","))
+ # remove duplicates, and return as sorted list
+ return sorted(set(tags))
+
+
+@lru_cache
+def get_app() -> App:
+ """Return the application instance, caching the return value."""
+ offline = default_options.offline
+ app = App(options=default_options)
+ # Make linter use the cache dir from compat
+ default_options.cache_dir = app.runtime.cache_dir
+
+ role_name_check = 0
+ if "role-name" in app.options.warn_list:
+ role_name_check = 1
+ elif "role-name" in app.options.skip_list:
+ role_name_check = 2
+
+ # mocking must happen before prepare_environment or galaxy install might
+ # fail.
+ _perform_mockings()
+ app.runtime.prepare_environment(
+ install_local=(not offline), offline=offline, role_name_check=role_name_check
+ )
+
+ return app