diff options
Diffstat (limited to 'lib/ansiblelint/runner.py')
-rw-r--r-- | lib/ansiblelint/runner.py | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/lib/ansiblelint/runner.py b/lib/ansiblelint/runner.py new file mode 100644 index 0000000..f73945f --- /dev/null +++ b/lib/ansiblelint/runner.py @@ -0,0 +1,111 @@ +"""Runner implementation.""" +import logging +import os +from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set + +import ansiblelint.file_utils +import ansiblelint.skip_utils +import ansiblelint.utils +from ansiblelint.errors import MatchError +from ansiblelint.rules.LoadingFailureRule import LoadingFailureRule + +if TYPE_CHECKING: + from ansiblelint.rules import RulesCollection + + +_logger = logging.getLogger(__name__) + + +class Runner(object): + """Runner class performs the linting process.""" + + def __init__( + self, + rules: "RulesCollection", + playbook: str, + tags: FrozenSet[Any] = frozenset(), + skip_list: Optional[FrozenSet[Any]] = frozenset(), + exclude_paths: List[str] = [], + verbosity: int = 0, + checked_files: Set[str] = None) -> None: + """Initialize a Runner instance.""" + self.rules = rules + self.playbooks = set() + # assume role if directory + if os.path.isdir(playbook): + self.playbooks.add((os.path.join(playbook, ''), 'role')) + self.playbook_dir = playbook + else: + self.playbooks.add((playbook, 'playbook')) + self.playbook_dir = os.path.dirname(playbook) + self.tags = tags + self.skip_list = skip_list + self._update_exclude_paths(exclude_paths) + self.verbosity = verbosity + if checked_files is None: + checked_files = set() + self.checked_files = checked_files + + def _update_exclude_paths(self, exclude_paths: List[str]) -> None: + if exclude_paths: + # These will be (potentially) relative paths + paths = ansiblelint.utils.expand_paths_vars(exclude_paths) + # Since ansiblelint.utils.find_children returns absolute paths, + # and the list of files we create in `Runner.run` can contain both + # relative and absolute paths, we need to cover both bases. + self.exclude_paths = paths + [os.path.abspath(p) for p in paths] + else: + self.exclude_paths = [] + + def is_excluded(self, file_path: str) -> bool: + """Verify if a file path should be excluded.""" + # Any will short-circuit as soon as something returns True, but will + # be poor performance for the case where the path under question is + # not excluded. + return any(file_path.startswith(path) for path in self.exclude_paths) + + def run(self) -> List[MatchError]: + """Execute the linting process.""" + files = list() + for playbook in self.playbooks: + if self.is_excluded(playbook[0]) or playbook[1] == 'role': + continue + files.append({'path': ansiblelint.file_utils.normpath(playbook[0]), + 'type': playbook[1], + # add an absolute path here, so rules are able to validate if + # referenced files exist + 'absolute_directory': os.path.dirname(playbook[0])}) + matches = set(self._emit_matches(files)) + + # remove duplicates from files list + files = [value for n, value in enumerate(files) if value not in files[:n]] + + # remove files that have already been checked + files = [x for x in files if x['path'] not in self.checked_files] + for file in files: + _logger.debug( + "Examining %s of type %s", + ansiblelint.file_utils.normpath(file['path']), + file['type']) + matches = matches.union( + self.rules.run(file, tags=set(self.tags), + skip_list=self.skip_list)) + # update list of checked files + self.checked_files.update([x['path'] for x in files]) + + return sorted(matches) + + def _emit_matches(self, files: List) -> Generator[MatchError, None, None]: + visited: Set = set() + while visited != self.playbooks: + for arg in self.playbooks - visited: + try: + for child in ansiblelint.utils.find_children(arg, self.playbook_dir): + if self.is_excluded(child['path']): + continue + self.playbooks.add((child['path'], child['type'])) + files.append(child) + except MatchError as e: + e.rule = LoadingFailureRule + yield e + visited.add(arg) |