summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/__main__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/__main__.py')
-rwxr-xr-xsrc/ansiblelint/__main__.py427
1 files changed, 427 insertions, 0 deletions
diff --git a/src/ansiblelint/__main__.py b/src/ansiblelint/__main__.py
new file mode 100755
index 0000000..af434d0
--- /dev/null
+++ b/src/ansiblelint/__main__.py
@@ -0,0 +1,427 @@
+#!/usr/bin/env python
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Command line implementation."""
+
+from __future__ import annotations
+
+import errno
+import logging
+import os
+import pathlib
+import shutil
+import site
+import sys
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, TextIO
+
+from ansible_compat.prerun import get_cache_dir
+from filelock import FileLock, Timeout
+from rich.markup import escape
+
+from ansiblelint import cli
+from ansiblelint._mockings import _perform_mockings_cleanup
+from ansiblelint.app import get_app
+from ansiblelint.color import (
+ console,
+ console_options,
+ console_stderr,
+ reconfigure,
+ render_yaml,
+)
+from ansiblelint.config import (
+ Options,
+ get_deps_versions,
+ get_version_warning,
+ log_entries,
+ options,
+)
+from ansiblelint.constants import RC
+from ansiblelint.loaders import load_ignore_txt
+from ansiblelint.skip_utils import normalize_tag
+from ansiblelint.version import __version__
+
+if TYPE_CHECKING:
+ # RulesCollection must be imported lazily or ansible gets imported too early.
+
+ from ansiblelint.rules import RulesCollection
+ from ansiblelint.runner import LintResult
+
+
+_logger = logging.getLogger(__name__)
+cache_dir_lock: None | FileLock = None
+
+
+class LintLogHandler(logging.Handler):
+ """Custom handler that uses our rich stderr console."""
+
+ def emit(self, record: logging.LogRecord) -> None:
+ try:
+ msg = self.format(record)
+ console_stderr.print(f"[dim]{msg}[/dim]", highlight=False)
+ except RecursionError: # See issue 36272
+ raise
+ except Exception: # pylint: disable=broad-exception-caught # noqa: BLE001
+ self.handleError(record)
+
+
+def initialize_logger(level: int = 0) -> None:
+ """Set up the global logging level based on the verbosity number."""
+ # We are about to act on the root logger, which defaults to logging.WARNING.
+ # That is where our 0 (default) value comes from.
+ verbosity_map = {
+ -2: logging.CRITICAL,
+ -1: logging.ERROR,
+ 0: logging.WARNING,
+ 1: logging.INFO,
+ 2: logging.DEBUG,
+ }
+
+ handler = LintLogHandler()
+ formatter = logging.Formatter("%(levelname)-8s %(message)s")
+ handler.setFormatter(formatter)
+ logger = logging.getLogger()
+ logger.addHandler(handler)
+ # Unknown logging level is treated as DEBUG
+ logging_level = verbosity_map.get(level, logging.DEBUG)
+ logger.setLevel(logging_level)
+ logging.captureWarnings(True) # pass all warnings.warn() messages through logging
+ # Use module-level _logger instance to validate it
+ _logger.debug("Logging initialized to level %s", logging_level)
+
+
+def initialize_options(arguments: list[str] | None = None) -> None:
+ """Load config options and store them inside options module."""
+ new_options = cli.get_config(arguments or [])
+ new_options.cwd = pathlib.Path.cwd()
+
+ if new_options.colored is None:
+ new_options.colored = should_do_markup()
+
+ # persist loaded configuration inside options module
+ for k, v in new_options.__dict__.items():
+ setattr(options, k, v)
+
+ # rename deprecated ids/tags to newer names
+ options.tags = [normalize_tag(tag) for tag in options.tags]
+ options.skip_list = [normalize_tag(tag) for tag in options.skip_list]
+ options.warn_list = [normalize_tag(tag) for tag in options.warn_list]
+
+ options.configured = True
+ options.cache_dir = get_cache_dir(pathlib.Path(options.project_dir))
+
+ # add a lock file so we do not have two instances running inside at the same time
+ if options.cache_dir:
+ options.cache_dir.mkdir(parents=True, exist_ok=True)
+
+ if not options.offline: # pragma: no cover
+ cache_dir_lock = FileLock( # pylint: disable=redefined-outer-name
+ f"{options.cache_dir}/.lock",
+ )
+ try:
+ cache_dir_lock.acquire(timeout=180)
+ except Timeout: # pragma: no cover
+ _logger.error(
+ "Timeout waiting for another instance of ansible-lint to release the lock.",
+ )
+ sys.exit(RC.LOCK_TIMEOUT)
+
+ # Avoid extra output noise from Ansible about using devel versions
+ if "ANSIBLE_DEVEL_WARNING" not in os.environ: # pragma: no branch
+ os.environ["ANSIBLE_DEVEL_WARNING"] = "false"
+
+
+def _do_list(rules: RulesCollection) -> int:
+ # On purpose lazy-imports to avoid pre-loading Ansible
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.generate_docs import rules_as_md, rules_as_rich, rules_as_str
+
+ if options.list_rules:
+ _rule_format_map: dict[str, Callable[..., Any]] = {
+ "brief": rules_as_str,
+ "full": rules_as_rich,
+ "md": rules_as_md,
+ }
+
+ console.print(
+ _rule_format_map.get(options.format, rules_as_str)(rules),
+ highlight=False,
+ )
+ return 0
+
+ if options.list_tags:
+ console.print(render_yaml(rules.list_tags()))
+ return 0
+
+ # we should not get here!
+ return 1
+
+
+# noinspection PyShadowingNames
+def _do_transform(result: LintResult, opts: Options) -> None:
+ """Create and run Transformer."""
+ if "yaml" in opts.skip_list:
+ # The transformer rewrites yaml files, but the user requested to skip
+ # the yaml rule or anything tagged with "yaml", so there is nothing to do.
+ return
+
+ # On purpose lazy-imports to avoid loading transforms unless requested
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.transformer import Transformer
+
+ transformer = Transformer(result, options)
+
+ # this will mark any matches as fixed if the transforms repaired the issue
+ transformer.run()
+
+
+def support_banner() -> None:
+ """Display support banner when running on unsupported platform."""
+ if sys.version_info < (3, 9, 0): # pragma: no cover
+ prefix = "::warning::" if "GITHUB_ACTION" in os.environ else "WARNING: "
+ console_stderr.print(
+ f"{prefix}ansible-lint is no longer tested under Python {sys.version_info.major}.{sys.version_info.minor} and will soon require 3.9. Do not report bugs for this version.",
+ style="bold red",
+ )
+
+
+# pylint: disable=too-many-statements,too-many-locals
+def main(argv: list[str] | None = None) -> int:
+ """Linter CLI entry point."""
+ # alter PATH if needed (venv support)
+ path_inject()
+
+ if argv is None: # pragma: no cover
+ argv = sys.argv
+ initialize_options(argv[1:])
+
+ console_options["force_terminal"] = options.colored
+ reconfigure(console_options)
+
+ if options.version:
+ deps = get_deps_versions()
+ msg = f"ansible-lint [repr.number]{__version__}[/] using[dim]"
+ for k, v in deps.items():
+ msg += f" {escape(k)}:[repr.number]{v}[/]"
+ msg += "[/]"
+ console.print(msg, markup=True, highlight=False)
+ msg = get_version_warning()
+ if msg:
+ console.print(msg)
+ support_banner()
+ sys.exit(0)
+ else:
+ support_banner()
+
+ initialize_logger(options.verbosity)
+ for level, message in log_entries:
+ _logger.log(level, message)
+ _logger.debug("Options: %s", options)
+ _logger.debug("CWD: %s", Path.cwd())
+
+ if not options.offline:
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.schemas.__main__ import refresh_schemas
+
+ refresh_schemas()
+
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.rules import RulesCollection
+ from ansiblelint.runner import _get_matches
+
+ if options.list_profiles:
+ from ansiblelint.generate_docs import profiles_as_rich
+
+ console.print(profiles_as_rich())
+ return 0
+
+ app = get_app(offline=None) # to be sure we use the offline value from settings
+ rules = RulesCollection(
+ options.rulesdirs,
+ profile_name=options.profile,
+ app=app,
+ options=options,
+ )
+
+ if options.list_rules or options.list_tags:
+ return _do_list(rules)
+
+ if isinstance(options.tags, str):
+ options.tags = options.tags.split(",") # pragma: no cover
+ result = _get_matches(rules, options)
+
+ if options.write_list:
+ ruamel_safe_version = "0.17.26"
+ from packaging.version import Version
+ from ruamel.yaml import __version__ as ruamel_yaml_version_str
+
+ if Version(ruamel_safe_version) > Version(ruamel_yaml_version_str):
+ _logger.warning(
+ "We detected use of `--write` feature with a buggy ruamel-yaml %s library instead of >=%s, upgrade it before reporting any bugs like dropped comments.",
+ ruamel_yaml_version_str,
+ ruamel_safe_version,
+ )
+ _do_transform(result, options)
+
+ mark_as_success = True
+
+ if options.strict and result.matches:
+ mark_as_success = False
+
+ # Remove skip_list items from the result
+ result.matches = [m for m in result.matches if m.tag not in app.options.skip_list]
+ # Mark matches as ignored inside ignore file
+ ignore_map = load_ignore_txt(options.ignore_file)
+ for match in result.matches:
+ if match.tag in ignore_map[match.filename]:
+ match.ignored = True
+
+ app.render_matches(result.matches)
+
+ _perform_mockings_cleanup(app.options)
+ if cache_dir_lock:
+ cache_dir_lock.release()
+ pathlib.Path(cache_dir_lock.lock_file).unlink(missing_ok=True)
+ if options.mock_filters:
+ _logger.warning(
+ "The following filters were mocked during the run: %s",
+ ",".join(options.mock_filters),
+ )
+
+ return app.report_outcome(result, mark_as_success=mark_as_success)
+
+
+def _run_cli_entrypoint() -> None:
+ """Invoke the main entrypoint with current CLI args.
+
+ This function also processes the runtime exceptions.
+ """
+ try:
+ sys.exit(main(sys.argv))
+ except OSError as exc:
+ # NOTE: Only "broken pipe" is acceptable to ignore
+ if exc.errno != errno.EPIPE: # pragma: no cover
+ raise
+ except KeyboardInterrupt: # pragma: no cover
+ sys.exit(RC.EXIT_CONTROL_C)
+ except RuntimeError as exc: # pragma: no cover
+ raise SystemExit(exc) from exc
+
+
+def path_inject() -> None:
+ """Add python interpreter path to top of PATH to fix outside venv calling."""
+ # This make it possible to call ansible-lint that was installed inside a
+ # virtualenv without having to pre-activate it. Otherwise subprocess will
+ # either fail to find ansible executables or call the wrong ones.
+ #
+ # This must be run before we do run any subprocesses, and loading config
+ # does this as part of the ansible detection.
+ paths = [x for x in os.environ.get("PATH", "").split(os.pathsep) if x]
+
+ # Expand ~ in PATH as it known to break many tools
+ expanded = False
+ for idx, path in enumerate(paths):
+ if path.startswith("~"): # pragma: no cover
+ paths[idx] = str(Path(path).expanduser())
+ expanded = True
+ if expanded: # pragma: no cover
+ print( # noqa: T201
+ "WARNING: PATH altered to expand ~ in it. Read https://stackoverflow.com/a/44704799/99834 and correct your system configuration.",
+ file=sys.stderr,
+ )
+
+ inject_paths = []
+
+ userbase_bin_path = Path(site.getuserbase()) / "bin"
+ if (
+ str(userbase_bin_path) not in paths
+ and (userbase_bin_path / "bin" / "ansible").exists()
+ ):
+ inject_paths.append(str(userbase_bin_path))
+
+ py_path = Path(sys.executable).parent
+ if str(py_path) not in paths and (py_path / "ansible").exists():
+ inject_paths.append(str(py_path))
+
+ if not os.environ.get("PYENV_VIRTUAL_ENV", None):
+ if inject_paths:
+ print( # noqa: T201
+ f"WARNING: PATH altered to include {', '.join(inject_paths)} :: This is usually a sign of broken local setup, which can cause unexpected behaviors.",
+ file=sys.stderr,
+ )
+ if inject_paths or expanded:
+ os.environ["PATH"] = os.pathsep.join([*inject_paths, *paths])
+
+ # We do know that finding ansible in PATH does not guarantee that it is
+ # functioning or that is in fact the same version that was installed as
+ # our dependency, but addressing this would be done by ansible-compat.
+ for cmd in ("ansible",):
+ if not shutil.which(cmd):
+ msg = f"Failed to find runtime dependency '{cmd}' in PATH"
+ raise RuntimeError(msg)
+
+
+# Based on Ansible implementation
+def to_bool(value: Any) -> bool: # pragma: no cover
+ """Return a bool for the arg."""
+ if value is None or isinstance(value, bool):
+ return bool(value)
+ if isinstance(value, str):
+ value = value.lower()
+ if value in ("yes", "on", "1", "true", 1):
+ return True
+ return False
+
+
+def should_do_markup(stream: TextIO = sys.stdout) -> bool: # pragma: no cover
+ """Decide about use of ANSI colors."""
+ py_colors = None
+
+ # https://xkcd.com/927/
+ for env_var in ["PY_COLORS", "CLICOLOR", "FORCE_COLOR", "ANSIBLE_FORCE_COLOR"]:
+ value = os.environ.get(env_var, None)
+ if value is not None:
+ py_colors = to_bool(value)
+ break
+
+ # If deliberately disabled colors
+ if os.environ.get("NO_COLOR", None):
+ return False
+
+ # User configuration requested colors
+ if py_colors is not None:
+ return to_bool(py_colors)
+
+ term = os.environ.get("TERM", "")
+ if "xterm" in term:
+ return True
+
+ if term == "dumb":
+ return False
+
+ # Use tty detection logic as last resort because there are numerous
+ # factors that can make isatty return a misleading value, including:
+ # - stdin.isatty() is the only one returning true, even on a real terminal
+ # - stderr returning false if user user uses a error stream coloring solution
+ return stream.isatty()
+
+
+if __name__ == "__main__":
+ _run_cli_entrypoint()