summaryrefslogtreecommitdiffstats
path: root/lib/ansiblelint/runner.py
blob: f73945fb4f38c36443ab7de794cb5023b929e58a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Runner implementation."""
import logging
import os
from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set

import ansiblelint.file_utils
import ansiblelint.skip_utils
import ansiblelint.utils
from ansiblelint.errors import MatchError
from ansiblelint.rules.LoadingFailureRule import LoadingFailureRule

if TYPE_CHECKING:
    from ansiblelint.rules import RulesCollection


_logger = logging.getLogger(__name__)


class Runner(object):
    """Runner class performs the linting process."""

    def __init__(
            self,
            rules: "RulesCollection",
            playbook: str,
            tags: FrozenSet[Any] = frozenset(),
            skip_list: Optional[FrozenSet[Any]] = frozenset(),
            exclude_paths: List[str] = [],
            verbosity: int = 0,
            checked_files: Set[str] = None) -> None:
        """Initialize a Runner instance."""
        self.rules = rules
        self.playbooks = set()
        # assume role if directory
        if os.path.isdir(playbook):
            self.playbooks.add((os.path.join(playbook, ''), 'role'))
            self.playbook_dir = playbook
        else:
            self.playbooks.add((playbook, 'playbook'))
            self.playbook_dir = os.path.dirname(playbook)
        self.tags = tags
        self.skip_list = skip_list
        self._update_exclude_paths(exclude_paths)
        self.verbosity = verbosity
        if checked_files is None:
            checked_files = set()
        self.checked_files = checked_files

    def _update_exclude_paths(self, exclude_paths: List[str]) -> None:
        if exclude_paths:
            # These will be (potentially) relative paths
            paths = ansiblelint.utils.expand_paths_vars(exclude_paths)
            # Since ansiblelint.utils.find_children returns absolute paths,
            # and the list of files we create in `Runner.run` can contain both
            # relative and absolute paths, we need to cover both bases.
            self.exclude_paths = paths + [os.path.abspath(p) for p in paths]
        else:
            self.exclude_paths = []

    def is_excluded(self, file_path: str) -> bool:
        """Verify if a file path should be excluded."""
        # Any will short-circuit as soon as something returns True, but will
        # be poor performance for the case where the path under question is
        # not excluded.
        return any(file_path.startswith(path) for path in self.exclude_paths)

    def run(self) -> List[MatchError]:
        """Execute the linting process."""
        files = list()
        for playbook in self.playbooks:
            if self.is_excluded(playbook[0]) or playbook[1] == 'role':
                continue
            files.append({'path': ansiblelint.file_utils.normpath(playbook[0]),
                          'type': playbook[1],
                          # add an absolute path here, so rules are able to validate if
                          # referenced files exist
                          'absolute_directory': os.path.dirname(playbook[0])})
        matches = set(self._emit_matches(files))

        # remove duplicates from files list
        files = [value for n, value in enumerate(files) if value not in files[:n]]

        # remove files that have already been checked
        files = [x for x in files if x['path'] not in self.checked_files]
        for file in files:
            _logger.debug(
                "Examining %s of type %s",
                ansiblelint.file_utils.normpath(file['path']),
                file['type'])
            matches = matches.union(
                self.rules.run(file, tags=set(self.tags),
                               skip_list=self.skip_list))
        # update list of checked files
        self.checked_files.update([x['path'] for x in files])

        return sorted(matches)

    def _emit_matches(self, files: List) -> Generator[MatchError, None, None]:
        visited: Set = set()
        while visited != self.playbooks:
            for arg in self.playbooks - visited:
                try:
                    for child in ansiblelint.utils.find_children(arg, self.playbook_dir):
                        if self.is_excluded(child['path']):
                            continue
                        self.playbooks.add((child['path'], child['type']))
                        files.append(child)
                except MatchError as e:
                    e.rule = LoadingFailureRule
                    yield e
                visited.add(arg)