diff options
Diffstat (limited to 'src/ansiblelint/__main__.py')
-rwxr-xr-x | src/ansiblelint/__main__.py | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/src/ansiblelint/__main__.py b/src/ansiblelint/__main__.py new file mode 100755 index 0000000..0437bc0 --- /dev/null +++ b/src/ansiblelint/__main__.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Command line implementation.""" + +from __future__ import annotations + +import errno +import logging +import os +import pathlib +import shutil +import site +import subprocess +import sys +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, Callable, Iterator, TextIO + +from ansible_compat.config import ansible_version +from ansible_compat.prerun import get_cache_dir +from filelock import FileLock, Timeout + +from ansiblelint import cli +from ansiblelint._mockings import _perform_mockings_cleanup +from ansiblelint.app import get_app +from ansiblelint.color import ( + console, + console_options, + console_stderr, + reconfigure, + render_yaml, +) +from ansiblelint.config import get_version_warning, options +from ansiblelint.constants import EXIT_CONTROL_C_RC, GIT_CMD, LOCK_TIMEOUT_RC +from ansiblelint.file_utils import abspath, cwd, normpath +from ansiblelint.loaders import load_ignore_txt +from ansiblelint.skip_utils import normalize_tag +from ansiblelint.version import __version__ + +if TYPE_CHECKING: + from argparse import Namespace + + # RulesCollection must be imported lazily or ansible gets imported too early. + from ansiblelint.rules import RulesCollection + from ansiblelint.runner import LintResult + + +_logger = logging.getLogger(__name__) + + +def initialize_logger(level: int = 0) -> None: + """Set up the global logging level based on the verbosity number.""" + # We are about to act on the root logger, which defaults to logging.WARNING. + # That is where our 0 (default) value comes from. + verbosity_map = { + -2: logging.CRITICAL, + -1: logging.ERROR, + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + + handler = logging.StreamHandler() + formatter = logging.Formatter("%(levelname)-8s %(message)s") + handler.setFormatter(formatter) + logger = logging.getLogger() + logger.addHandler(handler) + # Unknown logging level is treated as DEBUG + logging_level = verbosity_map.get(level, logging.DEBUG) + logger.setLevel(logging_level) + logging.captureWarnings(True) # pass all warnings.warn() messages through logging + # Use module-level _logger instance to validate it + _logger.debug("Logging initialized to level %s", logging_level) + + +def initialize_options(arguments: list[str] | None = None) -> None: + """Load config options and store them inside options module.""" + new_options = cli.get_config(arguments or []) + new_options.cwd = pathlib.Path.cwd() + + if new_options.colored is None: + new_options.colored = should_do_markup() + + # persist loaded configuration inside options module + for k, v in new_options.__dict__.items(): + setattr(options, k, v) + + # rename deprecated ids/tags to newer names + options.tags = [normalize_tag(tag) for tag in options.tags] + options.skip_list = [normalize_tag(tag) for tag in options.skip_list] + options.warn_list = [normalize_tag(tag) for tag in options.warn_list] + + options.configured = True + options.cache_dir = get_cache_dir(options.project_dir) + + # add a lock file so we do not have two instances running inside at the same time + os.makedirs(options.cache_dir, exist_ok=True) + + options.cache_dir_lock = None + if not options.offline: # pragma: no cover + options.cache_dir_lock = FileLock(f"{options.cache_dir}/.lock") + try: + options.cache_dir_lock.acquire(timeout=180) + except Timeout: # pragma: no cover + _logger.error( + "Timeout waiting for another instance of ansible-lint to release the lock." + ) + sys.exit(LOCK_TIMEOUT_RC) + + # Avoid extra output noise from Ansible about using devel versions + if "ANSIBLE_DEVEL_WARNING" not in os.environ: # pragma: no branch + os.environ["ANSIBLE_DEVEL_WARNING"] = "false" + + +def _do_list(rules: RulesCollection) -> int: + # On purpose lazy-imports to avoid pre-loading Ansible + # pylint: disable=import-outside-toplevel + from ansiblelint.generate_docs import ( + rules_as_docs, + rules_as_md, + rules_as_rich, + rules_as_str, + ) + + if options.list_rules: + _rule_format_map: dict[str, Callable[..., Any]] = { + "brief": rules_as_str, + "full": rules_as_rich, + "md": rules_as_md, + "docs": rules_as_docs, + } + + console.print( + _rule_format_map.get(options.format, rules_as_str)(rules), highlight=False + ) + return 0 + + if options.list_tags: + console.print(render_yaml(rules.list_tags())) + return 0 + + # we should not get here! + return 1 + + +# noinspection PyShadowingNames +def _do_transform(result: LintResult, opts: Namespace) -> None: + """Create and run Transformer.""" + if "yaml" in opts.skip_list: + # The transformer rewrites yaml files, but the user requested to skip + # the yaml rule or anything tagged with "yaml", so there is nothing to do. + return + + # On purpose lazy-imports to avoid loading transforms unless requested + # pylint: disable=import-outside-toplevel + from ansiblelint.transformer import Transformer + + transformer = Transformer(result, options) + + # this will mark any matches as fixed if the transforms repaired the issue + transformer.run() + + +def support_banner() -> None: + """Display support banner when running on unsupported platform.""" + if sys.version_info < (3, 9, 0): # pragma: no cover + prefix = "::warning::" if "GITHUB_ACTION" in os.environ else "WARNING: " + console_stderr.print( + f"{prefix}ansible-lint is no longer tested under Python {sys.version_info.major}.{sys.version_info.minor} and will soon require 3.9. Do not report bugs for this version.", + style="bold red", + ) + + +# pylint: disable=too-many-branches,too-many-statements +def main(argv: list[str] | None = None) -> int: # noqa: C901 + """Linter CLI entry point.""" + # alter PATH if needed (venv support) + path_inject() + + if argv is None: # pragma: no cover + argv = sys.argv + initialize_options(argv[1:]) + + console_options["force_terminal"] = options.colored + reconfigure(console_options) + + if options.version: + console.print( + f"ansible-lint [repr.number]{__version__}[/] using ansible [repr.number]{ansible_version()}[/]" + ) + msg = get_version_warning() + if msg: + console.print(msg) + support_banner() + sys.exit(0) + else: + support_banner() + + initialize_logger(options.verbosity) + _logger.debug("Options: %s", options) + _logger.debug(os.getcwd()) + + if options.progressive: + _logger.warning( + "Progressive mode is deprecated and will be removed in next major version, use ignore files instead: https://ansible-lint.readthedocs.io/configuring/#ignoring-rules-for-entire-files" + ) + + if not options.offline: + # pylint: disable=import-outside-toplevel + from ansiblelint.schemas import refresh_schemas + + refresh_schemas() + + # pylint: disable=import-outside-toplevel + from ansiblelint.rules import RulesCollection + from ansiblelint.runner import _get_matches + + rules = RulesCollection(options.rulesdirs, profile_name=options.profile) + + if options.list_profiles: + from ansiblelint.generate_docs import profiles_as_rich + + console.print(profiles_as_rich()) + return 0 + + if options.list_rules or options.list_tags: + return _do_list(rules) + + app = get_app() + if isinstance(options.tags, str): + options.tags = options.tags.split(",") # pragma: no cover + result = _get_matches(rules, options) + + if options.write_list: + _do_transform(result, options) + + mark_as_success = True + if result.matches and options.progressive: + mark_as_success = False + _logger.info( + "Matches found, running again on previous revision in order to detect regressions" + ) + with _previous_revision(): + _logger.debug("Options: %s", options) + _logger.debug(os.getcwd()) + old_result = _get_matches(rules, options) + # remove old matches from current list + matches_delta = list(set(result.matches) - set(old_result.matches)) + if len(matches_delta) == 0: + _logger.warning( + "Total violations not increased since previous " + "commit, will mark result as success. (%s -> %s)", + len(old_result.matches), + len(matches_delta), + ) + mark_as_success = True + + ignored = 0 + for match in result.matches: + # if match is not new, mark is as ignored + if match not in matches_delta: + match.ignored = True + ignored += 1 + if ignored: + _logger.warning( + "Marked %s previously known violation(s) as ignored due to" + " progressive mode.", + ignored, + ) + + if options.strict and result.matches: + mark_as_success = False + + # Remove skip_list items from the result + result.matches = [m for m in result.matches if m.tag not in app.options.skip_list] + # Mark matches as ignored inside ignore file + ignore_map = load_ignore_txt() + for match in result.matches: + if match.tag in ignore_map[match.filename]: + match.ignored = True + + app.render_matches(result.matches) + + _perform_mockings_cleanup() + if options.cache_dir_lock: + options.cache_dir_lock.release() + pathlib.Path(options.cache_dir_lock.lock_file).unlink(missing_ok=True) + if options.mock_filters: + _logger.warning( + "The following filters were mocked during the run: %s", + ",".join(options.mock_filters), + ) + + return app.report_outcome(result, mark_as_success=mark_as_success) + + +@contextmanager +def _previous_revision() -> Iterator[None]: + """Create or update a temporary workdir containing the previous revision.""" + worktree_dir = f"{options.cache_dir}/old-rev" + # Update options.exclude_paths to include use the temporary workdir. + rel_exclude_paths = [normpath(p) for p in options.exclude_paths] + options.exclude_paths = [abspath(p, worktree_dir) for p in rel_exclude_paths] + revision = subprocess.run( + [*GIT_CMD, "rev-parse", "HEAD^1"], + check=True, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).stdout.strip() + _logger.info("Previous revision SHA: %s", revision) + path = pathlib.Path(worktree_dir) + if path.exists(): + shutil.rmtree(worktree_dir) + path.mkdir(parents=True, exist_ok=True) + # Run check will fail if worktree_dir already exists + # pylint: disable=subprocess-run-check + subprocess.run( + [*GIT_CMD, "worktree", "add", "-f", worktree_dir], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + try: + with cwd(worktree_dir): + subprocess.run( + [*GIT_CMD, "checkout", revision], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + yield + finally: + options.exclude_paths = [abspath(p, os.getcwd()) for p in rel_exclude_paths] + + +def _run_cli_entrypoint() -> None: + """Invoke the main entrypoint with current CLI args. + + This function also processes the runtime exceptions. + """ + try: + sys.exit(main(sys.argv)) + except OSError as exc: + # NOTE: Only "broken pipe" is acceptable to ignore + if exc.errno != errno.EPIPE: # pragma: no cover + raise + except KeyboardInterrupt: # pragma: no cover + sys.exit(EXIT_CONTROL_C_RC) + except RuntimeError as exc: # pragma: no cover + raise SystemExit(exc) from exc + + +def path_inject() -> None: + """Add python interpreter path to top of PATH to fix outside venv calling.""" + # This make it possible to call ansible-lint that was installed inside a + # virtualenv without having to pre-activate it. Otherwise subprocess will + # either fail to find ansible executables or call the wrong ones. + # + # This must be run before we do run any subprocesses, and loading config + # does this as part of the ansible detection. + paths = [x for x in os.environ.get("PATH", "").split(os.pathsep) if x] + + # Expand ~ in PATH as it known to break many tools + expanded = False + for idx, path in enumerate(paths): + if "~" in path: # pragma: no cover + paths[idx] = os.path.expanduser(path) + expanded = True + if expanded: # pragma: no cover + # flake8: noqa: T201 + print( + "WARNING: PATH altered to expand ~ in it. Read https://stackoverflow.com/a/44704799/99834 and correct your system configuration.", + file=sys.stderr, + ) + + inject_paths = [] + + userbase_bin_path = f"{site.getuserbase()}/bin" + if userbase_bin_path not in paths and os.path.exists( + f"{userbase_bin_path}/bin/ansible" + ): + inject_paths.append(userbase_bin_path) + + py_path = os.path.dirname(sys.executable) + if py_path not in paths and os.path.exists(f"{py_path}/ansible"): + inject_paths.append(py_path) + + if inject_paths: + # flake8: noqa: T201 + print( + f"WARNING: PATH altered to include {', '.join(inject_paths)} :: This is usually a sign of broken local setup, which can cause unexpected behaviors.", + file=sys.stderr, + ) + if inject_paths or expanded: + os.environ["PATH"] = os.pathsep.join([*inject_paths, *paths]) + + # We do know that finding ansible in PATH does not guarantee that it is + # functioning or that is in fact the same version that was installed as + # our dependency, but addressing this would be done by ansible-compat. + for cmd in ("ansible", "git"): + if not shutil.which(cmd): + raise RuntimeError(f"Failed to find runtime dependency '{cmd}' in PATH") + + +# Based on Ansible implementation +def to_bool(value: Any) -> bool: # pragma: no cover + """Return a bool for the arg.""" + if value is None or isinstance(value, bool): + return bool(value) + if isinstance(value, str): + value = value.lower() + if value in ("yes", "on", "1", "true", 1): + return True + return False + + +def should_do_markup(stream: TextIO = sys.stdout) -> bool: # pragma: no cover + """Decide about use of ANSI colors.""" + py_colors = None + + # https://xkcd.com/927/ + for env_var in ["PY_COLORS", "CLICOLOR", "FORCE_COLOR", "ANSIBLE_FORCE_COLOR"]: + value = os.environ.get(env_var, None) + if value is not None: + py_colors = to_bool(value) + break + + # If deliberately disabled colors + if os.environ.get("NO_COLOR", None): + return False + + # User configuration requested colors + if py_colors is not None: + return to_bool(py_colors) + + term = os.environ.get("TERM", "") + if "xterm" in term: + return True + + if term == "dumb": + return False + + # Use tty detection logic as last resort because there are numerous + # factors that can make isatty return a misleading value, including: + # - stdin.isatty() is the only one returning true, even on a real terminal + # - stderr returning false if user user uses a error stream coloring solution + return stream.isatty() + + +if __name__ == "__main__": + _run_cli_entrypoint() |