diff options
Diffstat (limited to '')
-rw-r--r-- | src/ansiblelint/runner.py | 568 |
1 files changed, 568 insertions, 0 deletions
diff --git a/src/ansiblelint/runner.py b/src/ansiblelint/runner.py new file mode 100644 index 0000000..9d3500d --- /dev/null +++ b/src/ansiblelint/runner.py @@ -0,0 +1,568 @@ +"""Runner implementation.""" +from __future__ import annotations + +import json +import logging +import multiprocessing +import multiprocessing.pool +import os +import re +import subprocess +import tempfile +import warnings +from dataclasses import dataclass +from fnmatch import fnmatch +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from ansible.errors import AnsibleError +from ansible.parsing.splitter import split_args +from ansible.parsing.yaml.constructor import AnsibleMapping +from ansible.plugins.loader import add_all_plugin_dirs +from ansible_compat.runtime import AnsibleWarning + +import ansiblelint.skip_utils +import ansiblelint.utils +from ansiblelint._internal.rules import ( + BaseRule, + LoadingFailureRule, + RuntimeErrorRule, + WarningRule, +) +from ansiblelint.app import App, get_app +from ansiblelint.constants import States +from ansiblelint.errors import LintWarning, MatchError, WarnSource +from ansiblelint.file_utils import Lintable, expand_dirs_in_lintables +from ansiblelint.logger import timed_info +from ansiblelint.rules.syntax_check import OUTPUT_PATTERNS, AnsibleSyntaxCheckRule +from ansiblelint.text import strip_ansi_escape +from ansiblelint.utils import ( + PLAYBOOK_DIR, + _include_children, + _roles_children, + _taskshandlers_children, + template, +) + +if TYPE_CHECKING: + from collections.abc import Generator + from typing import Callable + + from ansiblelint.config import Options + from ansiblelint.constants import FileType + from ansiblelint.rules import RulesCollection + +_logger = logging.getLogger(__name__) + + +@dataclass +class LintResult: + """Class that tracks result of linting.""" + + matches: list[MatchError] + files: set[Lintable] + + +class Runner: + """Runner class performs the linting process.""" + + # pylint: disable=too-many-arguments,too-many-instance-attributes + def __init__( + self, + *lintables: Lintable | str | Path, + rules: RulesCollection, + tags: frozenset[Any] = frozenset(), + skip_list: list[str] | None = None, + exclude_paths: list[str] | None = None, + verbosity: int = 0, + checked_files: set[Lintable] | None = None, + project_dir: str | None = None, + ) -> None: + """Initialize a Runner instance.""" + self.rules = rules + self.lintables: set[Lintable] = set() + self.project_dir = os.path.abspath(project_dir) if project_dir else None + + if skip_list is None: + skip_list = [] + if exclude_paths is None: + exclude_paths = [] + + # Assure consistent type and configure given lintables as explicit (so + # excludes paths would not apply on them). + for item in lintables: + if not isinstance(item, Lintable): + item = Lintable(item) + item.explicit = True + self.lintables.add(item) + + # Expand folders (roles) to their components + expand_dirs_in_lintables(self.lintables) + + self.tags = tags + self.skip_list = skip_list + self._update_exclude_paths(exclude_paths) + self.verbosity = verbosity + if checked_files is None: + checked_files = set() + self.checked_files = checked_files + + def _update_exclude_paths(self, exclude_paths: list[str]) -> None: + if exclude_paths: + # These will be (potentially) relative paths + paths = ansiblelint.file_utils.expand_paths_vars(exclude_paths) + # Since ansiblelint.utils.find_children returns absolute paths, + # and the list of files we create in `Runner.run` can contain both + # relative and absolute paths, we need to cover both bases. + self.exclude_paths = paths + [os.path.abspath(p) for p in paths] + else: + self.exclude_paths = [] + + def is_excluded(self, lintable: Lintable) -> bool: + """Verify if a file path should be excluded.""" + # Any will short-circuit as soon as something returns True, but will + # be poor performance for the case where the path under question is + # not excluded. + + # Exclusions should be evaluated only using absolute paths in order + # to work correctly. + + # Explicit lintables are never excluded + if lintable.explicit: + return False + + abs_path = str(lintable.abspath) + if self.project_dir and not abs_path.startswith(self.project_dir): + _logger.debug( + "Skipping %s as it is outside of the project directory.", + abs_path, + ) + return True + + return any( + abs_path.startswith(path) + or lintable.path.match(path) + or fnmatch(str(abs_path), path) + or fnmatch(str(lintable), path) + for path in self.exclude_paths + ) + + def run(self) -> list[MatchError]: + """Execute the linting process.""" + matches: list[MatchError] = [] + with warnings.catch_warnings(record=True) as captured_warnings: + warnings.simplefilter("always") + matches = self._run() + for warn in captured_warnings: + # Silence Ansible runtime warnings that are unactionable + # https://github.com/ansible/ansible-lint/issues/3216 + if warn.category is AnsibleWarning and isinstance(warn.source, dict): + msg = warn.source["msg"] + if msg.startswith( + "Falling back to Ansible unique filter as Jinja2 one failed", + ): + continue + # For the moment we are ignoring deprecation warnings as Ansible + # modules outside current content can generate them and user + # might not be able to do anything about them. + if warn.category is DeprecationWarning: + continue + if warn.category is LintWarning: + filename: None | Lintable = None + if isinstance(warn.source, WarnSource): + match = MatchError( + message=warn.source.message or warn.category.__name__, + rule=WarningRule(), + filename=warn.source.filename.filename, + tag=warn.source.tag, + lineno=warn.source.lineno, + ) + else: + filename = warn.source + match = MatchError( + message=warn.message + if isinstance(warn.message, str) + else "?", + rule=WarningRule(), + filename=str(filename), + ) + matches.append(match) + continue + _logger.warning( + "%s:%s %s %s", + warn.filename, + warn.lineno or 1, + warn.category.__name__, + warn.message, + ) + return matches + + def _run(self) -> list[MatchError]: + """Run the linting (inner loop).""" + files: list[Lintable] = [] + matches: list[MatchError] = [] + + # remove exclusions + for lintable in self.lintables.copy(): + if self.is_excluded(lintable): + _logger.debug("Excluded %s", lintable) + self.lintables.remove(lintable) + continue + if isinstance(lintable.data, States) and lintable.exc: + lintable.exc.__class__.__name__.lower() + matches.append( + MatchError( + lintable=lintable, + message=str(lintable.exc), + details=str(lintable.exc.__cause__), + rule=LoadingFailureRule(), + tag=f"load-failure[{lintable.exc.__class__.__name__.lower()}]", + ), + ) + lintable.stop_processing = True + # identify missing files/folders + if not lintable.path.exists(): + matches.append( + MatchError( + lintable=lintable, + message="File or directory not found.", + rule=LoadingFailureRule(), + tag="load-failure[not-found]", + ), + ) + + # -- phase 1 : syntax check in parallel -- + app = get_app(offline=True) + + def worker(lintable: Lintable) -> list[MatchError]: + # pylint: disable=protected-access + return self._get_ansible_syntax_check_matches( + lintable=lintable, + app=app, + ) + + for lintable in self.lintables: + if lintable.kind not in ("playbook", "role") or lintable.stop_processing: + continue + files.append(lintable) + + # avoid resource leak warning, https://github.com/python/cpython/issues/90549 + # pylint: disable=unused-variable + global_resource = multiprocessing.Semaphore() # noqa: F841 + + pool = multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count()) + return_list = pool.map(worker, files, chunksize=1) + pool.close() + pool.join() + for data in return_list: + matches.extend(data) + + matches = self._filter_excluded_matches(matches) + # -- phase 2 --- + if not matches: + # do our processing only when ansible syntax check passed in order + # to avoid causing runtime exceptions. Our processing is not as + # resilient to be able process garbage. + matches.extend(self._emit_matches(files)) + + # remove duplicates from files list + files = [value for n, value in enumerate(files) if value not in files[:n]] + + for file in self.lintables: + if file in self.checked_files or not file.kind: + continue + _logger.debug( + "Examining %s of type %s", + ansiblelint.file_utils.normpath(file.path), + file.kind, + ) + + matches.extend( + self.rules.run(file, tags=set(self.tags), skip_list=self.skip_list), + ) + + # update list of checked files + self.checked_files.update(self.lintables) + + # remove any matches made inside excluded files + matches = self._filter_excluded_matches(matches) + + return sorted(set(matches)) + + # pylint: disable=too-many-locals + def _get_ansible_syntax_check_matches( + self, + lintable: Lintable, + app: App, + ) -> list[MatchError]: + """Run ansible syntax check and return a list of MatchError(s).""" + default_rule: BaseRule = AnsibleSyntaxCheckRule() + fh = None + results = [] + if lintable.kind not in ("playbook", "role"): + return [] + + with timed_info( + "Executing syntax check on %s %s", + lintable.kind, + lintable.path, + ): + if lintable.kind == "role": + playbook_text = f""" +--- +- name: Temporary playbook for role syntax check + hosts: localhost + tasks: + - ansible.builtin.import_role: + name: {lintable.path.expanduser()!s} +""" + # pylint: disable=consider-using-with + fh = tempfile.NamedTemporaryFile(mode="w", suffix=".yml", prefix="play") + fh.write(playbook_text) + fh.flush() + playbook_path = fh.name + else: + playbook_path = str(lintable.path.expanduser()) + # To avoid noisy warnings we pass localhost as current inventory: + # [WARNING]: No inventory was parsed, only implicit localhost is available + # [WARNING]: provided hosts list is empty, only localhost is available. Note that the implicit localhost does not match 'all' + cmd = [ + "ansible-playbook", + "-i", + "localhost,", + "--syntax-check", + playbook_path, + ] + if app.options.extra_vars: + cmd.extend(["--extra-vars", json.dumps(app.options.extra_vars)]) + + # To reduce noisy warnings like + # CryptographyDeprecationWarning: Blowfish has been deprecated + # https://github.com/paramiko/paramiko/issues/2038 + env = app.runtime.environ.copy() + env["PYTHONWARNINGS"] = "ignore" + + run = subprocess.run( + cmd, + stdin=subprocess.PIPE, + capture_output=True, + shell=False, # needed when command is a list # noqa: S603 + text=True, + check=False, + env=env, + ) + + if run.returncode != 0: + message = None + filename = lintable + lineno = 1 + column = None + + stderr = strip_ansi_escape(run.stderr) + stdout = strip_ansi_escape(run.stdout) + if stderr: + details = stderr + if stdout: + details += "\n" + stdout + else: + details = stdout + + for pattern in OUTPUT_PATTERNS: + rule = default_rule + match = re.search(pattern.regex, stderr) + if match: + groups = match.groupdict() + title = groups.get("title", match.group(0)) + details = groups.get("details", "") + lineno = int(groups.get("line", 1)) + + if "filename" in groups: + filename = Lintable(groups["filename"]) + else: + filename = lintable + column = int(groups.get("column", 1)) + results.append( + MatchError( + message=title, + lintable=filename, + lineno=lineno, + column=column, + rule=rule, + details=details, + tag=f"{rule.id}[{pattern.tag}]", + ), + ) + + if not results: + rule = RuntimeErrorRule() + message = ( + f"Unexpected error code {run.returncode} from " + f"execution of: {' '.join(cmd)}" + ) + results.append( + MatchError( + message=message, + lintable=filename, + lineno=lineno, + column=column, + rule=rule, + details=details, + tag="", + ), + ) + + if fh: + fh.close() + return results + + def _filter_excluded_matches(self, matches: list[MatchError]) -> list[MatchError]: + return [ + match + for match in matches + if not self.is_excluded(match.lintable) + and match.tag not in match.lintable.line_skips[match.lineno] + ] + + def _emit_matches(self, files: list[Lintable]) -> Generator[MatchError, None, None]: + visited: set[Lintable] = set() + while visited != self.lintables: + for lintable in self.lintables - visited: + try: + children = self.find_children(lintable) + for child in children: + if self.is_excluded(child): + continue + self.lintables.add(child) + files.append(child) + except MatchError as exc: + if not exc.filename: # pragma: no branch + exc.filename = str(lintable.path) + exc.rule = LoadingFailureRule() + yield exc + except AttributeError: + yield MatchError(lintable=lintable, rule=LoadingFailureRule()) + visited.add(lintable) + + def find_children(self, lintable: Lintable) -> list[Lintable]: + """Traverse children of a single file or folder.""" + if not lintable.path.exists(): + return [] + playbook_dir = str(lintable.path.parent) + ansiblelint.utils.set_collections_basedir(lintable.path.parent) + add_all_plugin_dirs(playbook_dir or ".") + if lintable.kind == "role": + playbook_ds = AnsibleMapping({"roles": [{"role": str(lintable.path)}]}) + elif lintable.kind not in ("playbook", "tasks"): + return [] + else: + try: + playbook_ds = ansiblelint.utils.parse_yaml_from_file(str(lintable.path)) + except AnsibleError as exc: + raise SystemExit(exc) from exc + results = [] + # playbook_ds can be an AnsibleUnicode string, which we consider invalid + if isinstance(playbook_ds, str): + raise MatchError(lintable=lintable, rule=LoadingFailureRule()) + for item in ansiblelint.utils.playbook_items(playbook_ds): + # if lintable.kind not in ["playbook"]: + for child in self.play_children( + lintable.path.parent, + item, + lintable.kind, + playbook_dir, + ): + # We avoid processing parametrized children + path_str = str(child.path) + if "$" in path_str or "{{" in path_str: + continue + + # Repair incorrect paths obtained when old syntax was used, like: + # - include: simpletask.yml tags=nginx + valid_tokens = [] + for token in split_args(path_str): + if "=" in token: + break + valid_tokens.append(token) + path = " ".join(valid_tokens) + if path != path_str: + child.path = Path(path) + child.name = child.path.name + + results.append(child) + return results + + def play_children( + self, + basedir: Path, + item: tuple[str, Any], + parent_type: FileType, + playbook_dir: str, + ) -> list[Lintable]: + """Flatten the traversed play tasks.""" + # pylint: disable=unused-argument + delegate_map: dict[str, Callable[[str, Any, Any, FileType], list[Lintable]]] = { + "tasks": _taskshandlers_children, + "pre_tasks": _taskshandlers_children, + "post_tasks": _taskshandlers_children, + "block": _taskshandlers_children, + "include": _include_children, + "ansible.builtin.include": _include_children, + "import_playbook": _include_children, + "ansible.builtin.import_playbook": _include_children, + "roles": _roles_children, + "dependencies": _roles_children, + "handlers": _taskshandlers_children, + "include_tasks": _include_children, + "ansible.builtin.include_tasks": _include_children, + "import_tasks": _include_children, + "ansible.builtin.import_tasks": _include_children, + } + (k, v) = item + add_all_plugin_dirs(str(basedir.resolve())) + + if k in delegate_map and v: + v = template( + basedir, + v, + {"playbook_dir": PLAYBOOK_DIR or str(basedir.resolve())}, + fail_on_undefined=False, + ) + return delegate_map[k](str(basedir), k, v, parent_type) + return [] + + +def _get_matches(rules: RulesCollection, options: Options) -> LintResult: + lintables = ansiblelint.utils.get_lintables(opts=options, args=options.lintables) + + for rule in rules: + if "unskippable" in rule.tags: + for entry in (*options.skip_list, *options.warn_list): + if rule.id == entry or entry.startswith(f"{rule.id}["): + msg = f"Rule '{rule.id}' is unskippable, you cannot use it in 'skip_list' or 'warn_list'. Still, you could exclude the file." + raise RuntimeError(msg) + matches = [] + checked_files: set[Lintable] = set() + runner = Runner( + *lintables, + rules=rules, + tags=frozenset(options.tags), + skip_list=options.skip_list, + exclude_paths=options.exclude_paths, + verbosity=options.verbosity, + checked_files=checked_files, + project_dir=options.project_dir, + ) + matches.extend(runner.run()) + + # Assure we do not print duplicates and the order is consistent + matches = sorted(set(matches)) + + # Convert reported filenames into human readable ones, so we hide the + # fact we used temporary files when processing input from stdin. + for match in matches: + for lintable in lintables: + if match.filename == lintable.filename: + match.filename = lintable.name + break + + return LintResult(matches=matches, files=checked_files) |