diff options
Diffstat (limited to 'src/ansiblelint/__main__.py')
-rwxr-xr-x | src/ansiblelint/__main__.py | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/src/ansiblelint/__main__.py b/src/ansiblelint/__main__.py new file mode 100755 index 0000000..af434d0 --- /dev/null +++ b/src/ansiblelint/__main__.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Command line implementation.""" + +from __future__ import annotations + +import errno +import logging +import os +import pathlib +import shutil +import site +import sys +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, TextIO + +from ansible_compat.prerun import get_cache_dir +from filelock import FileLock, Timeout +from rich.markup import escape + +from ansiblelint import cli +from ansiblelint._mockings import _perform_mockings_cleanup +from ansiblelint.app import get_app +from ansiblelint.color import ( + console, + console_options, + console_stderr, + reconfigure, + render_yaml, +) +from ansiblelint.config import ( + Options, + get_deps_versions, + get_version_warning, + log_entries, + options, +) +from ansiblelint.constants import RC +from ansiblelint.loaders import load_ignore_txt +from ansiblelint.skip_utils import normalize_tag +from ansiblelint.version import __version__ + +if TYPE_CHECKING: + # RulesCollection must be imported lazily or ansible gets imported too early. + + from ansiblelint.rules import RulesCollection + from ansiblelint.runner import LintResult + + +_logger = logging.getLogger(__name__) +cache_dir_lock: None | FileLock = None + + +class LintLogHandler(logging.Handler): + """Custom handler that uses our rich stderr console.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + msg = self.format(record) + console_stderr.print(f"[dim]{msg}[/dim]", highlight=False) + except RecursionError: # See issue 36272 + raise + except Exception: # pylint: disable=broad-exception-caught # noqa: BLE001 + self.handleError(record) + + +def initialize_logger(level: int = 0) -> None: + """Set up the global logging level based on the verbosity number.""" + # We are about to act on the root logger, which defaults to logging.WARNING. + # That is where our 0 (default) value comes from. + verbosity_map = { + -2: logging.CRITICAL, + -1: logging.ERROR, + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + + handler = LintLogHandler() + formatter = logging.Formatter("%(levelname)-8s %(message)s") + handler.setFormatter(formatter) + logger = logging.getLogger() + logger.addHandler(handler) + # Unknown logging level is treated as DEBUG + logging_level = verbosity_map.get(level, logging.DEBUG) + logger.setLevel(logging_level) + logging.captureWarnings(True) # pass all warnings.warn() messages through logging + # Use module-level _logger instance to validate it + _logger.debug("Logging initialized to level %s", logging_level) + + +def initialize_options(arguments: list[str] | None = None) -> None: + """Load config options and store them inside options module.""" + new_options = cli.get_config(arguments or []) + new_options.cwd = pathlib.Path.cwd() + + if new_options.colored is None: + new_options.colored = should_do_markup() + + # persist loaded configuration inside options module + for k, v in new_options.__dict__.items(): + setattr(options, k, v) + + # rename deprecated ids/tags to newer names + options.tags = [normalize_tag(tag) for tag in options.tags] + options.skip_list = [normalize_tag(tag) for tag in options.skip_list] + options.warn_list = [normalize_tag(tag) for tag in options.warn_list] + + options.configured = True + options.cache_dir = get_cache_dir(pathlib.Path(options.project_dir)) + + # add a lock file so we do not have two instances running inside at the same time + if options.cache_dir: + options.cache_dir.mkdir(parents=True, exist_ok=True) + + if not options.offline: # pragma: no cover + cache_dir_lock = FileLock( # pylint: disable=redefined-outer-name + f"{options.cache_dir}/.lock", + ) + try: + cache_dir_lock.acquire(timeout=180) + except Timeout: # pragma: no cover + _logger.error( + "Timeout waiting for another instance of ansible-lint to release the lock.", + ) + sys.exit(RC.LOCK_TIMEOUT) + + # Avoid extra output noise from Ansible about using devel versions + if "ANSIBLE_DEVEL_WARNING" not in os.environ: # pragma: no branch + os.environ["ANSIBLE_DEVEL_WARNING"] = "false" + + +def _do_list(rules: RulesCollection) -> int: + # On purpose lazy-imports to avoid pre-loading Ansible + # pylint: disable=import-outside-toplevel + from ansiblelint.generate_docs import rules_as_md, rules_as_rich, rules_as_str + + if options.list_rules: + _rule_format_map: dict[str, Callable[..., Any]] = { + "brief": rules_as_str, + "full": rules_as_rich, + "md": rules_as_md, + } + + console.print( + _rule_format_map.get(options.format, rules_as_str)(rules), + highlight=False, + ) + return 0 + + if options.list_tags: + console.print(render_yaml(rules.list_tags())) + return 0 + + # we should not get here! + return 1 + + +# noinspection PyShadowingNames +def _do_transform(result: LintResult, opts: Options) -> None: + """Create and run Transformer.""" + if "yaml" in opts.skip_list: + # The transformer rewrites yaml files, but the user requested to skip + # the yaml rule or anything tagged with "yaml", so there is nothing to do. + return + + # On purpose lazy-imports to avoid loading transforms unless requested + # pylint: disable=import-outside-toplevel + from ansiblelint.transformer import Transformer + + transformer = Transformer(result, options) + + # this will mark any matches as fixed if the transforms repaired the issue + transformer.run() + + +def support_banner() -> None: + """Display support banner when running on unsupported platform.""" + if sys.version_info < (3, 9, 0): # pragma: no cover + prefix = "::warning::" if "GITHUB_ACTION" in os.environ else "WARNING: " + console_stderr.print( + f"{prefix}ansible-lint is no longer tested under Python {sys.version_info.major}.{sys.version_info.minor} and will soon require 3.9. Do not report bugs for this version.", + style="bold red", + ) + + +# pylint: disable=too-many-statements,too-many-locals +def main(argv: list[str] | None = None) -> int: + """Linter CLI entry point.""" + # alter PATH if needed (venv support) + path_inject() + + if argv is None: # pragma: no cover + argv = sys.argv + initialize_options(argv[1:]) + + console_options["force_terminal"] = options.colored + reconfigure(console_options) + + if options.version: + deps = get_deps_versions() + msg = f"ansible-lint [repr.number]{__version__}[/] using[dim]" + for k, v in deps.items(): + msg += f" {escape(k)}:[repr.number]{v}[/]" + msg += "[/]" + console.print(msg, markup=True, highlight=False) + msg = get_version_warning() + if msg: + console.print(msg) + support_banner() + sys.exit(0) + else: + support_banner() + + initialize_logger(options.verbosity) + for level, message in log_entries: + _logger.log(level, message) + _logger.debug("Options: %s", options) + _logger.debug("CWD: %s", Path.cwd()) + + if not options.offline: + # pylint: disable=import-outside-toplevel + from ansiblelint.schemas.__main__ import refresh_schemas + + refresh_schemas() + + # pylint: disable=import-outside-toplevel + from ansiblelint.rules import RulesCollection + from ansiblelint.runner import _get_matches + + if options.list_profiles: + from ansiblelint.generate_docs import profiles_as_rich + + console.print(profiles_as_rich()) + return 0 + + app = get_app(offline=None) # to be sure we use the offline value from settings + rules = RulesCollection( + options.rulesdirs, + profile_name=options.profile, + app=app, + options=options, + ) + + if options.list_rules or options.list_tags: + return _do_list(rules) + + if isinstance(options.tags, str): + options.tags = options.tags.split(",") # pragma: no cover + result = _get_matches(rules, options) + + if options.write_list: + ruamel_safe_version = "0.17.26" + from packaging.version import Version + from ruamel.yaml import __version__ as ruamel_yaml_version_str + + if Version(ruamel_safe_version) > Version(ruamel_yaml_version_str): + _logger.warning( + "We detected use of `--write` feature with a buggy ruamel-yaml %s library instead of >=%s, upgrade it before reporting any bugs like dropped comments.", + ruamel_yaml_version_str, + ruamel_safe_version, + ) + _do_transform(result, options) + + mark_as_success = True + + if options.strict and result.matches: + mark_as_success = False + + # Remove skip_list items from the result + result.matches = [m for m in result.matches if m.tag not in app.options.skip_list] + # Mark matches as ignored inside ignore file + ignore_map = load_ignore_txt(options.ignore_file) + for match in result.matches: + if match.tag in ignore_map[match.filename]: + match.ignored = True + + app.render_matches(result.matches) + + _perform_mockings_cleanup(app.options) + if cache_dir_lock: + cache_dir_lock.release() + pathlib.Path(cache_dir_lock.lock_file).unlink(missing_ok=True) + if options.mock_filters: + _logger.warning( + "The following filters were mocked during the run: %s", + ",".join(options.mock_filters), + ) + + return app.report_outcome(result, mark_as_success=mark_as_success) + + +def _run_cli_entrypoint() -> None: + """Invoke the main entrypoint with current CLI args. + + This function also processes the runtime exceptions. + """ + try: + sys.exit(main(sys.argv)) + except OSError as exc: + # NOTE: Only "broken pipe" is acceptable to ignore + if exc.errno != errno.EPIPE: # pragma: no cover + raise + except KeyboardInterrupt: # pragma: no cover + sys.exit(RC.EXIT_CONTROL_C) + except RuntimeError as exc: # pragma: no cover + raise SystemExit(exc) from exc + + +def path_inject() -> None: + """Add python interpreter path to top of PATH to fix outside venv calling.""" + # This make it possible to call ansible-lint that was installed inside a + # virtualenv without having to pre-activate it. Otherwise subprocess will + # either fail to find ansible executables or call the wrong ones. + # + # This must be run before we do run any subprocesses, and loading config + # does this as part of the ansible detection. + paths = [x for x in os.environ.get("PATH", "").split(os.pathsep) if x] + + # Expand ~ in PATH as it known to break many tools + expanded = False + for idx, path in enumerate(paths): + if path.startswith("~"): # pragma: no cover + paths[idx] = str(Path(path).expanduser()) + expanded = True + if expanded: # pragma: no cover + print( # noqa: T201 + "WARNING: PATH altered to expand ~ in it. Read https://stackoverflow.com/a/44704799/99834 and correct your system configuration.", + file=sys.stderr, + ) + + inject_paths = [] + + userbase_bin_path = Path(site.getuserbase()) / "bin" + if ( + str(userbase_bin_path) not in paths + and (userbase_bin_path / "bin" / "ansible").exists() + ): + inject_paths.append(str(userbase_bin_path)) + + py_path = Path(sys.executable).parent + if str(py_path) not in paths and (py_path / "ansible").exists(): + inject_paths.append(str(py_path)) + + if not os.environ.get("PYENV_VIRTUAL_ENV", None): + if inject_paths: + print( # noqa: T201 + f"WARNING: PATH altered to include {', '.join(inject_paths)} :: This is usually a sign of broken local setup, which can cause unexpected behaviors.", + file=sys.stderr, + ) + if inject_paths or expanded: + os.environ["PATH"] = os.pathsep.join([*inject_paths, *paths]) + + # We do know that finding ansible in PATH does not guarantee that it is + # functioning or that is in fact the same version that was installed as + # our dependency, but addressing this would be done by ansible-compat. + for cmd in ("ansible",): + if not shutil.which(cmd): + msg = f"Failed to find runtime dependency '{cmd}' in PATH" + raise RuntimeError(msg) + + +# Based on Ansible implementation +def to_bool(value: Any) -> bool: # pragma: no cover + """Return a bool for the arg.""" + if value is None or isinstance(value, bool): + return bool(value) + if isinstance(value, str): + value = value.lower() + if value in ("yes", "on", "1", "true", 1): + return True + return False + + +def should_do_markup(stream: TextIO = sys.stdout) -> bool: # pragma: no cover + """Decide about use of ANSI colors.""" + py_colors = None + + # https://xkcd.com/927/ + for env_var in ["PY_COLORS", "CLICOLOR", "FORCE_COLOR", "ANSIBLE_FORCE_COLOR"]: + value = os.environ.get(env_var, None) + if value is not None: + py_colors = to_bool(value) + break + + # If deliberately disabled colors + if os.environ.get("NO_COLOR", None): + return False + + # User configuration requested colors + if py_colors is not None: + return to_bool(py_colors) + + term = os.environ.get("TERM", "") + if "xterm" in term: + return True + + if term == "dumb": + return False + + # Use tty detection logic as last resort because there are numerous + # factors that can make isatty return a misleading value, including: + # - stdin.isatty() is the only one returning true, even on a real terminal + # - stderr returning false if user user uses a error stream coloring solution + return stream.isatty() + + +if __name__ == "__main__": + _run_cli_entrypoint() |