diff options
Diffstat (limited to 'lib/ansiblelint/rules/__init__.py')
-rw-r--r-- | lib/ansiblelint/rules/__init__.py | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/lib/ansiblelint/rules/__init__.py b/lib/ansiblelint/rules/__init__.py new file mode 100644 index 0000000..fd3e92d --- /dev/null +++ b/lib/ansiblelint/rules/__init__.py @@ -0,0 +1,254 @@ +"""All internal ansible-lint rules.""" +import glob +import importlib.util +import logging +import os +import re +from collections import defaultdict +from importlib.abc import Loader +from time import sleep +from typing import List + +import ansiblelint.utils +from ansiblelint.errors import MatchError +from ansiblelint.skip_utils import append_skipped_rules, get_rule_skips_from_line + +_logger = logging.getLogger(__name__) + + +class AnsibleLintRule(object): + + def __repr__(self) -> str: + """Return a AnsibleLintRule instance representation.""" + return self.id + ": " + self.shortdesc + + def verbose(self) -> str: + return self.id + ": " + self.shortdesc + "\n " + self.description + + id: str = "" + tags: List[str] = [] + shortdesc: str = "" + description: str = "" + version_added: str = "" + severity: str = "" + match = None + matchtask = None + matchplay = None + + @staticmethod + def unjinja(text): + text = re.sub(r"{{.+?}}", "JINJA_EXPRESSION", text) + text = re.sub(r"{%.+?%}", "JINJA_STATEMENT", text) + text = re.sub(r"{#.+?#}", "JINJA_COMMENT", text) + return text + + def matchlines(self, file, text) -> List[MatchError]: + matches: List[MatchError] = [] + if not self.match: + return matches + # arrays are 0-based, line numbers are 1-based + # so use prev_line_no as the counter + for (prev_line_no, line) in enumerate(text.split("\n")): + if line.lstrip().startswith('#'): + continue + + rule_id_list = get_rule_skips_from_line(line) + if self.id in rule_id_list: + continue + + result = self.match(file, line) + if not result: + continue + message = None + if isinstance(result, str): + message = result + m = MatchError( + message=message, + linenumber=prev_line_no + 1, + details=line, + filename=file['path'], + rule=self) + matches.append(m) + return matches + + # TODO(ssbarnea): Reduce mccabe complexity + # https://github.com/ansible/ansible-lint/issues/744 + def matchtasks(self, file: str, text: str) -> List[MatchError]: # noqa: C901 + matches: List[MatchError] = [] + if not self.matchtask: + return matches + + if file['type'] == 'meta': + return matches + + yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path']) + if not yaml: + return matches + + yaml = append_skipped_rules(yaml, text, file['type']) + + try: + tasks = ansiblelint.utils.get_normalized_tasks(yaml, file) + except MatchError as e: + return [e] + + for task in tasks: + if self.id in task.get('skipped_rules', ()): + continue + + if 'action' not in task: + continue + result = self.matchtask(file, task) + if not result: + continue + + message = None + if isinstance(result, str): + message = result + task_msg = "Task/Handler: " + ansiblelint.utils.task_to_str(task) + m = MatchError( + message=message, + linenumber=task[ansiblelint.utils.LINE_NUMBER_KEY], + details=task_msg, + filename=file['path'], + rule=self) + matches.append(m) + return matches + + @staticmethod + def _matchplay_linenumber(play, optional_linenumber): + try: + linenumber, = optional_linenumber + except ValueError: + linenumber = play[ansiblelint.utils.LINE_NUMBER_KEY] + return linenumber + + def matchyaml(self, file: str, text: str) -> List[MatchError]: + matches: List[MatchError] = [] + if not self.matchplay: + return matches + + yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path']) + if not yaml: + return matches + + if isinstance(yaml, dict): + yaml = [yaml] + + yaml = ansiblelint.skip_utils.append_skipped_rules(yaml, text, file['type']) + + for play in yaml: + if self.id in play.get('skipped_rules', ()): + continue + + result = self.matchplay(file, play) + if not result: + continue + + if isinstance(result, tuple): + result = [result] + + if not isinstance(result, list): + raise TypeError("{} is not a list".format(result)) + + for section, message, *optional_linenumber in result: + linenumber = self._matchplay_linenumber(play, optional_linenumber) + m = MatchError( + message=message, + linenumber=linenumber, + details=str(section), + filename=file['path'], + rule=self) + matches.append(m) + return matches + + +def load_plugins(directory: str) -> List[AnsibleLintRule]: + """Return a list of rule classes.""" + result = [] + + for pluginfile in glob.glob(os.path.join(directory, '[A-Za-z]*.py')): + + pluginname = os.path.basename(pluginfile.replace('.py', '')) + spec = importlib.util.spec_from_file_location(pluginname, pluginfile) + # https://github.com/python/typeshed/issues/2793 + if spec and isinstance(spec.loader, Loader): + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + obj = getattr(module, pluginname)() + result.append(obj) + return result + + +class RulesCollection(object): + + def __init__(self, rulesdirs=None) -> None: + """Initialize a RulesCollection instance.""" + if rulesdirs is None: + rulesdirs = [] + self.rulesdirs = ansiblelint.utils.expand_paths_vars(rulesdirs) + self.rules: List[AnsibleLintRule] = [] + for rulesdir in self.rulesdirs: + _logger.debug("Loading rules from %s", rulesdir) + self.extend(load_plugins(rulesdir)) + self.rules = sorted(self.rules, key=lambda r: r.id) + + def register(self, obj: AnsibleLintRule): + self.rules.append(obj) + + def __iter__(self): + """Return the iterator over the rules in the RulesCollection.""" + return iter(self.rules) + + def __len__(self): + """Return the length of the RulesCollection data.""" + return len(self.rules) + + def extend(self, more: List[AnsibleLintRule]) -> None: + self.rules.extend(more) + + def run(self, playbookfile, tags=set(), skip_list=frozenset()) -> List: + text = "" + matches: List = list() + + for i in range(3): + try: + with open(playbookfile['path'], mode='r', encoding='utf-8') as f: + text = f.read() + break + except IOError as e: + _logger.warning( + "Couldn't open %s - %s [try:%s]", + playbookfile['path'], + e.strerror, + i) + sleep(1) + continue + if i and not text: + return matches + + for rule in self.rules: + if not tags or not set(rule.tags).union([rule.id]).isdisjoint(tags): + rule_definition = set(rule.tags) + rule_definition.add(rule.id) + if set(rule_definition).isdisjoint(skip_list): + matches.extend(rule.matchlines(playbookfile, text)) + matches.extend(rule.matchtasks(playbookfile, text)) + matches.extend(rule.matchyaml(playbookfile, text)) + + return matches + + def __repr__(self) -> str: + """Return a RulesCollection instance representation.""" + return "\n".join([rule.verbose() + for rule in sorted(self.rules, key=lambda x: x.id)]) + + def listtags(self) -> str: + tags = defaultdict(list) + for rule in self.rules: + for tag in rule.tags: + tags[tag].append("[{0}]".format(rule.id)) + results = [] + for tag in sorted(tags): + results.append("{0} {1}".format(tag, tags[tag])) + return "\n".join(results) |