summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/runner.py')
-rw-r--r--src/ansiblelint/runner.py249
1 files changed, 249 insertions, 0 deletions
diff --git a/src/ansiblelint/runner.py b/src/ansiblelint/runner.py
new file mode 100644
index 0000000..a98fe51
--- /dev/null
+++ b/src/ansiblelint/runner.py
@@ -0,0 +1,249 @@
+"""Runner implementation."""
+from __future__ import annotations
+
+import logging
+import multiprocessing
+import multiprocessing.pool
+import os
+from dataclasses import dataclass
+from fnmatch import fnmatch
+from typing import TYPE_CHECKING, Any, Generator
+
+import ansiblelint.skip_utils
+import ansiblelint.utils
+from ansiblelint._internal.rules import LoadingFailureRule
+from ansiblelint.constants import States
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import Lintable, expand_dirs_in_lintables
+from ansiblelint.rules.syntax_check import AnsibleSyntaxCheckRule
+
+if TYPE_CHECKING:
+ from argparse import Namespace
+
+ from ansiblelint.rules import RulesCollection
+
+_logger = logging.getLogger(__name__)
+
+
+@dataclass
+class LintResult:
+ """Class that tracks result of linting."""
+
+ matches: list[MatchError]
+ files: set[Lintable]
+
+
+class Runner:
+ """Runner class performs the linting process."""
+
+ # pylint: disable=too-many-arguments,too-many-instance-attributes
+ def __init__(
+ self,
+ *lintables: Lintable | str,
+ rules: RulesCollection,
+ tags: frozenset[Any] = frozenset(),
+ skip_list: list[str] | None = None,
+ exclude_paths: list[str] | None = None,
+ verbosity: int = 0,
+ checked_files: set[Lintable] | None = None,
+ project_dir: str | None = None,
+ ) -> None:
+ """Initialize a Runner instance."""
+ self.rules = rules
+ self.lintables: set[Lintable] = set()
+ self.project_dir = os.path.abspath(project_dir) if project_dir else None
+
+ if skip_list is None:
+ skip_list = []
+ if exclude_paths is None:
+ exclude_paths = []
+
+ # Assure consistent type
+ for item in lintables:
+ if not isinstance(item, Lintable):
+ item = Lintable(item)
+ self.lintables.add(item)
+
+ # Expand folders (roles) to their components
+ expand_dirs_in_lintables(self.lintables)
+
+ self.tags = tags
+ self.skip_list = skip_list
+ self._update_exclude_paths(exclude_paths)
+ self.verbosity = verbosity
+ if checked_files is None:
+ checked_files = set()
+ self.checked_files = checked_files
+
+ def _update_exclude_paths(self, exclude_paths: list[str]) -> None:
+ if exclude_paths:
+ # These will be (potentially) relative paths
+ paths = ansiblelint.file_utils.expand_paths_vars(exclude_paths)
+ # Since ansiblelint.utils.find_children returns absolute paths,
+ # and the list of files we create in `Runner.run` can contain both
+ # relative and absolute paths, we need to cover both bases.
+ self.exclude_paths = paths + [os.path.abspath(p) for p in paths]
+ else:
+ self.exclude_paths = []
+
+ def is_excluded(self, lintable: Lintable) -> bool:
+ """Verify if a file path should be excluded."""
+ # Any will short-circuit as soon as something returns True, but will
+ # be poor performance for the case where the path under question is
+ # not excluded.
+
+ # Exclusions should be evaluated only using absolute paths in order
+ # to work correctly.
+ abs_path = str(lintable.abspath)
+ if self.project_dir and not abs_path.startswith(self.project_dir):
+ _logger.debug(
+ "Skipping %s as it is outside of the project directory.", abs_path
+ )
+ return True
+
+ return any(
+ abs_path.startswith(path)
+ or lintable.path.match(path)
+ or fnmatch(str(abs_path), path)
+ or fnmatch(str(lintable), path)
+ for path in self.exclude_paths
+ )
+
+ def run(self) -> list[MatchError]: # noqa: C901
+ """Execute the linting process."""
+ files: list[Lintable] = []
+ matches: list[MatchError] = []
+
+ # remove exclusions
+ for lintable in self.lintables.copy():
+ if self.is_excluded(lintable):
+ _logger.debug("Excluded %s", lintable)
+ self.lintables.remove(lintable)
+ continue
+ if isinstance(lintable.data, States) and lintable.exc:
+ matches.append(
+ MatchError(
+ filename=lintable,
+ message=str(lintable.exc),
+ details=str(lintable.exc.__cause__),
+ rule=LoadingFailureRule(),
+ )
+ )
+ lintable.stop_processing = True
+
+ # -- phase 1 : syntax check in parallel --
+ def worker(lintable: Lintable) -> list[MatchError]:
+ # pylint: disable=protected-access
+ return AnsibleSyntaxCheckRule._get_ansible_syntax_check_matches(lintable)
+
+ # playbooks: List[Lintable] = []
+ for lintable in self.lintables:
+ if lintable.kind != "playbook" or lintable.stop_processing:
+ continue
+ files.append(lintable)
+
+ # avoid resource leak warning, https://github.com/python/cpython/issues/90549
+ # pylint: disable=unused-variable
+ global_resource = multiprocessing.Semaphore()
+
+ pool = multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count())
+ return_list = pool.map(worker, files, chunksize=1)
+ pool.close()
+ pool.join()
+ for data in return_list:
+ matches.extend(data)
+
+ # -- phase 2 ---
+ if not matches:
+ # do our processing only when ansible syntax check passed in order
+ # to avoid causing runtime exceptions. Our processing is not as
+ # resilient to be able process garbage.
+ matches.extend(self._emit_matches(files))
+
+ # remove duplicates from files list
+ files = [value for n, value in enumerate(files) if value not in files[:n]]
+
+ for file in self.lintables:
+ if file in self.checked_files or not file.kind:
+ continue
+ _logger.debug(
+ "Examining %s of type %s",
+ ansiblelint.file_utils.normpath(file.path),
+ file.kind,
+ )
+
+ matches.extend(
+ self.rules.run(file, tags=set(self.tags), skip_list=self.skip_list)
+ )
+
+ # update list of checked files
+ self.checked_files.update(self.lintables)
+
+ # remove any matches made inside excluded files
+ matches = list(
+ filter(
+ lambda match: not self.is_excluded(Lintable(match.filename))
+ and hasattr(match, "lintable")
+ and match.tag not in match.lintable.line_skips[match.linenumber],
+ matches,
+ )
+ )
+
+ return sorted(set(matches))
+
+ def _emit_matches(self, files: list[Lintable]) -> Generator[MatchError, None, None]:
+ visited: set[Lintable] = set()
+ while visited != self.lintables:
+ for lintable in self.lintables - visited:
+ try:
+ for child in ansiblelint.utils.find_children(lintable):
+ if self.is_excluded(child):
+ continue
+ self.lintables.add(child)
+ files.append(child)
+ except MatchError as exc:
+ if not exc.filename: # pragma: no branch
+ exc.filename = str(lintable.path)
+ exc.rule = LoadingFailureRule()
+ yield exc
+ except AttributeError:
+ yield MatchError(filename=lintable, rule=LoadingFailureRule())
+ visited.add(lintable)
+
+
+def _get_matches(rules: RulesCollection, options: Namespace) -> LintResult:
+ lintables = ansiblelint.utils.get_lintables(opts=options, args=options.lintables)
+
+ for rule in rules:
+ if "unskippable" in rule.tags:
+ for entry in (*options.skip_list, *options.warn_list):
+ if rule.id == entry or entry.startswith(f"{rule.id}["):
+ raise RuntimeError(
+ f"Rule '{rule.id}' is unskippable, you cannot use it in 'skip_list' or 'warn_list'. Still, you could exclude the file."
+ )
+ matches = []
+ checked_files: set[Lintable] = set()
+ runner = Runner(
+ *lintables,
+ rules=rules,
+ tags=options.tags,
+ skip_list=options.skip_list,
+ exclude_paths=options.exclude_paths,
+ verbosity=options.verbosity,
+ checked_files=checked_files,
+ project_dir=options.project_dir,
+ )
+ matches.extend(runner.run())
+
+ # Assure we do not print duplicates and the order is consistent
+ matches = sorted(set(matches))
+
+ # Convert reported filenames into human readable ones, so we hide the
+ # fact we used temporary files when processing input from stdin.
+ for match in matches:
+ for lintable in lintables:
+ if match.filename == lintable.filename:
+ match.filename = lintable.name
+ break
+
+ return LintResult(matches=matches, files=checked_files)