diff options
Diffstat (limited to 'lib/ansiblelint')
52 files changed, 4092 insertions, 0 deletions
diff --git a/lib/ansiblelint/__init__.py b/lib/ansiblelint/__init__.py new file mode 100644 index 0000000..d8a4cef --- /dev/null +++ b/lib/ansiblelint/__init__.py @@ -0,0 +1,28 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Main ansible-lint package.""" + +from ansiblelint.rules import AnsibleLintRule +from ansiblelint.version import __version__ + +__all__ = ( + "__version__", + "AnsibleLintRule" # deprecated, import it directly from rules +) diff --git a/lib/ansiblelint/__main__.py b/lib/ansiblelint/__main__.py new file mode 100755 index 0000000..ff8d477 --- /dev/null +++ b/lib/ansiblelint/__main__.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Command line implementation.""" + +import errno +import logging +import os +import pathlib +import subprocess +import sys +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, List, Set, Type, Union + +from rich.markdown import Markdown + +from ansiblelint import cli, formatters +from ansiblelint.color import console, console_stderr +from ansiblelint.file_utils import cwd +from ansiblelint.generate_docs import rules_as_rich, rules_as_rst +from ansiblelint.rules import RulesCollection +from ansiblelint.runner import Runner +from ansiblelint.utils import get_playbooks_and_roles, get_rules_dirs + +if TYPE_CHECKING: + from argparse import Namespace + + from ansiblelint.errors import MatchError + +_logger = logging.getLogger(__name__) + +_rule_format_map = { + 'plain': str, + 'rich': rules_as_rich, + 'rst': rules_as_rst +} + + +def initialize_logger(level: int = 0) -> None: + """Set up the global logging level based on the verbosity number.""" + VERBOSITY_MAP = { + 0: logging.NOTSET, + 1: logging.INFO, + 2: logging.DEBUG + } + + handler = logging.StreamHandler() + formatter = logging.Formatter('%(levelname)-8s %(message)s') + handler.setFormatter(formatter) + logger = logging.getLogger(__package__) + logger.addHandler(handler) + # Unknown logging level is treated as DEBUG + logging_level = VERBOSITY_MAP.get(level, logging.DEBUG) + logger.setLevel(logging_level) + # Use module-level _logger instance to validate it + _logger.debug("Logging initialized to level %s", logging_level) + + +def choose_formatter_factory( + options_list: "Namespace" +) -> Type[formatters.BaseFormatter]: + """Select an output formatter based on the incoming command line arguments.""" + r: Type[formatters.BaseFormatter] = formatters.Formatter + if options_list.quiet: + r = formatters.QuietFormatter + elif options_list.parseable: + r = formatters.ParseableFormatter + elif options_list.parseable_severity: + r = formatters.ParseableSeverityFormatter + return r + + +def report_outcome(matches: List["MatchError"], options) -> int: + """Display information about how to skip found rules. + + Returns exit code, 2 if errors were found, 0 when only warnings were found. + """ + failure = False + msg = """\ +You can skip specific rules or tags by adding them to your configuration file: +```yaml +# .ansible-lint +warn_list: # or 'skip_list' to silence them completely +""" + matches_unignored = [match for match in matches if not match.ignored] + + matched_rules = {match.rule.id: match.rule for match in matches_unignored} + for id in sorted(matched_rules.keys()): + if {id, *matched_rules[id].tags}.isdisjoint(options.warn_list): + msg += f" - '{id}' # {matched_rules[id].shortdesc}\n" + failure = True + for match in matches: + if "experimental" in match.rule.tags: + msg += " - experimental # all rules tagged as experimental\n" + break + msg += "```" + + if matches and not options.quiet: + console_stderr.print(Markdown(msg)) + + if failure: + return 2 + else: + return 0 + + +def main() -> int: + """Linter CLI entry point.""" + cwd = pathlib.Path.cwd() + + options = cli.get_config(sys.argv[1:]) + + initialize_logger(options.verbosity) + _logger.debug("Options: %s", options) + + formatter_factory = choose_formatter_factory(options) + formatter = formatter_factory(cwd, options.display_relative_path) + + rulesdirs = get_rules_dirs([str(rdir) for rdir in options.rulesdir], + options.use_default_rules) + rules = RulesCollection(rulesdirs) + + if options.listrules: + console.print( + _rule_format_map[options.format](rules), + highlight=False) + return 0 + + if options.listtags: + print(rules.listtags()) + return 0 + + if isinstance(options.tags, str): + options.tags = options.tags.split(',') + + skip = set() + for s in options.skip_list: + skip.update(str(s).split(',')) + options.skip_list = frozenset(skip) + + matches = _get_matches(rules, options) + + # Assure we do not print duplicates and the order is consistent + matches = sorted(set(matches)) + + mark_as_success = False + if matches and options.progressive: + _logger.info( + "Matches found, running again on previous revision in order to detect regressions") + with _previous_revision(): + old_matches = _get_matches(rules, options) + # remove old matches from current list + matches_delta = list(set(matches) - set(old_matches)) + if len(matches_delta) == 0: + _logger.warning( + "Total violations not increased since previous " + "commit, will mark result as success. (%s -> %s)", + len(old_matches), len(matches_delta)) + mark_as_success = True + + ignored = 0 + for match in matches: + # if match is not new, mark is as ignored + if match not in matches_delta: + match.ignored = True + ignored += 1 + if ignored: + _logger.warning( + "Marked %s previously known violation(s) as ignored due to" + " progressive mode.", ignored) + + _render_matches(matches, options, formatter, cwd) + + if matches and not mark_as_success: + return report_outcome(matches, options=options) + else: + return 0 + + +def _render_matches( + matches: List, + options: "Namespace", + formatter: Any, + cwd: Union[str, pathlib.Path]): + + ignored_matches = [match for match in matches if match.ignored] + fatal_matches = [match for match in matches if not match.ignored] + # Displayed ignored matches first + if ignored_matches: + _logger.warning( + "Listing %s violation(s) marked as ignored, likely already known", + len(ignored_matches)) + for match in ignored_matches: + if match.ignored: + print(formatter.format(match, options.colored)) + if fatal_matches: + _logger.warning("Listing %s violation(s) that are fatal", len(fatal_matches)) + for match in fatal_matches: + if not match.ignored: + print(formatter.format(match, options.colored)) + + # If run under GitHub Actions we also want to emit output recognized by it. + if os.getenv('GITHUB_ACTIONS') == 'true' and os.getenv('GITHUB_WORKFLOW'): + formatter = formatters.AnnotationsFormatter(cwd, True) + for match in matches: + print(formatter.format(match)) + + +def _get_matches(rules: RulesCollection, options: "Namespace") -> list: + + if not options.playbook: + # no args triggers auto-detection mode + playbooks = get_playbooks_and_roles(options=options) + else: + playbooks = sorted(set(options.playbook)) + + matches = list() + checked_files: Set[str] = set() + for playbook in playbooks: + runner = Runner(rules, playbook, options.tags, + options.skip_list, options.exclude_paths, + options.verbosity, checked_files) + matches.extend(runner.run()) + return matches + + +@contextmanager +def _previous_revision(): + """Create or update a temporary workdir containing the previous revision.""" + worktree_dir = ".cache/old-rev" + revision = subprocess.run( + ["git", "rev-parse", "HEAD^1"], + check=True, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).stdout + p = pathlib.Path(worktree_dir) + p.mkdir(parents=True, exist_ok=True) + os.system(f"git worktree add -f {worktree_dir} 2>/dev/null") + with cwd(worktree_dir): + os.system(f"git checkout {revision}") + yield + + +if __name__ == "__main__": + try: + sys.exit(main()) + except IOError as exc: + if exc.errno != errno.EPIPE: + raise + except RuntimeError as e: + raise SystemExit(str(e)) diff --git a/lib/ansiblelint/cli.py b/lib/ansiblelint/cli.py new file mode 100644 index 0000000..6b39561 --- /dev/null +++ b/lib/ansiblelint/cli.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +"""CLI parser setup and helpers.""" +import argparse +import logging +import os +import sys +from pathlib import Path +from typing import List, NamedTuple + +import yaml + +from ansiblelint.constants import DEFAULT_RULESDIR, INVALID_CONFIG_RC +from ansiblelint.utils import expand_path_vars +from ansiblelint.version import __version__ + +_logger = logging.getLogger(__name__) +_PATH_VARS = ['exclude_paths', 'rulesdir', ] + + +def abspath(path: str, base_dir: str) -> str: + """Make relative path absolute relative to given directory. + + Args: + path (str): the path to make absolute + base_dir (str): the directory from which make relative paths + absolute + default_drive: Windows drive to use to make the path + absolute if none is given. + """ + if not os.path.isabs(path): + # Don't use abspath as it assumes path is relative to cwd. + # We want it relative to base_dir. + path = os.path.join(base_dir, path) + + return os.path.normpath(path) + + +def expand_to_normalized_paths(config: dict, base_dir: str = None) -> None: + # config can be None (-c /dev/null) + if not config: + return + base_dir = base_dir or os.getcwd() + for paths_var in _PATH_VARS: + if paths_var not in config: + continue # Cause we don't want to add a variable not present + + normalized_paths = [] + for path in config.pop(paths_var): + normalized_path = abspath(expand_path_vars(path), base_dir=base_dir) + + normalized_paths.append(normalized_path) + + config[paths_var] = normalized_paths + + +def load_config(config_file: str) -> dict: + config_path = os.path.abspath(config_file or '.ansible-lint') + + if config_file: + if not os.path.exists(config_path): + _logger.error("Config file not found '%s'", config_path) + sys.exit(INVALID_CONFIG_RC) + elif not os.path.exists(config_path): + # a missing default config file should not trigger an error + return {} + + try: + with open(config_path, "r") as stream: + config = yaml.safe_load(stream) + except yaml.YAMLError as e: + _logger.error(e) + sys.exit(INVALID_CONFIG_RC) + # TODO(ssbarnea): implement schema validation for config file + if isinstance(config, list): + _logger.error( + "Invalid configuration '%s', expected YAML mapping in the config file.", + config_path) + sys.exit(INVALID_CONFIG_RC) + + config_dir = os.path.dirname(config_path) + expand_to_normalized_paths(config, config_dir) + return config + + +class AbspathArgAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + if isinstance(values, (str, Path)): + values = [values] + normalized_values = [Path(expand_path_vars(path)).resolve() for path in values] + previous_values = getattr(namespace, self.dest, []) + setattr(namespace, self.dest, previous_values + normalized_values) + + +def get_cli_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + + parser.add_argument('-L', dest='listrules', default=False, + action='store_true', help="list all the rules") + parser.add_argument('-f', dest='format', default='rich', + choices=['rich', 'plain', 'rst'], + help="Format used rules output, (default: %(default)s)") + parser.add_argument('-q', dest='quiet', + default=False, + action='store_true', + help="quieter, although not silent output") + parser.add_argument('-p', dest='parseable', + default=False, + action='store_true', + help="parseable output in the format of pep8") + parser.add_argument('--parseable-severity', dest='parseable_severity', + default=False, + action='store_true', + help="parseable output including severity of rule") + parser.add_argument('--progressive', dest='progressive', + default=False, + action='store_true', + help="Return success if it detects a reduction in number" + " of violations compared with previous git commit. This " + "feature works only in git repositories.") + parser.add_argument('-r', action=AbspathArgAction, dest='rulesdir', + default=[], type=Path, + help="Specify custom rule directories. Add -R " + f"to keep using embedded rules from {DEFAULT_RULESDIR}") + parser.add_argument('-R', action='store_true', + default=False, + dest='use_default_rules', + help="Keep default rules when using -r") + parser.add_argument('--show-relpath', dest='display_relative_path', action='store_false', + default=True, + help="Display path relative to CWD") + parser.add_argument('-t', dest='tags', + action='append', + default=[], + help="only check rules whose id/tags match these values") + parser.add_argument('-T', dest='listtags', action='store_true', + help="list all the tags") + parser.add_argument('-v', dest='verbosity', action='count', + help="Increase verbosity level", + default=0) + parser.add_argument('-x', dest='skip_list', default=[], action='append', + help="only check rules whose id/tags do not " + "match these values") + parser.add_argument('-w', dest='warn_list', default=[], action='append', + help="only warn about these rules, unless overridden in " + "config file defaults to 'experimental'") + parser.add_argument('--nocolor', dest='colored', + default=hasattr(sys.stdout, 'isatty') and sys.stdout.isatty(), + action='store_false', + help="disable colored output") + parser.add_argument('--force-color', dest='colored', + action='store_true', + help="Try force colored output (relying on ansible's code)") + parser.add_argument('--exclude', dest='exclude_paths', + action=AbspathArgAction, + type=Path, default=[], + help='path to directories or files to skip. ' + 'This option is repeatable.', + ) + parser.add_argument('-c', dest='config_file', + help='Specify configuration file to use. ' + 'Defaults to ".ansible-lint"') + parser.add_argument('--version', action='version', + version='%(prog)s {ver!s}'.format(ver=__version__), + ) + parser.add_argument(dest='playbook', nargs='*', + help="One or more files or paths. When missing it will " + " enable auto-detection mode.") + + return parser + + +def merge_config(file_config, cli_config) -> NamedTuple: + bools = ( + 'display_relative_path', + 'parseable', + 'parseable_severity', + 'quiet', + 'use_default_rules', + ) + # maps lists to their default config values + lists_map = { + 'exclude_paths': [], + 'rulesdir': [], + 'skip_list': [], + 'tags': [], + 'warn_list': ['experimental'], + } + + if not file_config: + return cli_config + + for entry in bools: + x = getattr(cli_config, entry) or file_config.get(entry, False) + setattr(cli_config, entry, x) + + for entry, default in lists_map.items(): + getattr(cli_config, entry).extend(file_config.get(entry, default)) + + if 'verbosity' in file_config: + cli_config.verbosity = (cli_config.verbosity + + file_config['verbosity']) + + return cli_config + + +def get_config(arguments: List[str]): + parser = get_cli_parser() + options = parser.parse_args(arguments) + + config = load_config(options.config_file) + + return merge_config(config, options) + + +def print_help(file=sys.stdout): + get_cli_parser().print_help(file=file) + + +# vim: et:sw=4:syntax=python:ts=4: diff --git a/lib/ansiblelint/color.py b/lib/ansiblelint/color.py new file mode 100644 index 0000000..b30a89c --- /dev/null +++ b/lib/ansiblelint/color.py @@ -0,0 +1,31 @@ +"""Console coloring and terminal support.""" +import sys +from enum import Enum + +from rich.console import Console +from rich.theme import Theme + +_theme = Theme({ + "info": "cyan", + "warning": "dim yellow", + "danger": "bold red", + "title": "yellow" +}) +console = Console(theme=_theme) +console_stderr = Console(file=sys.stderr, theme=_theme) + + +class Color(Enum): + """Color styles.""" + + reset = "0" + error_code = "1;31" # bright red + error_title = "0;31" # red + filename = "0;34" # blue + linenumber = "0;36" # cyan + line = "0;35" # purple + + +def colorize(text: str, color: Color) -> str: + """Return ANSI formated string.""" + return f"\u001b[{color.value}m{text}\u001b[{Color.reset.value}m" diff --git a/lib/ansiblelint/constants.py b/lib/ansiblelint/constants.py new file mode 100644 index 0000000..89094f9 --- /dev/null +++ b/lib/ansiblelint/constants.py @@ -0,0 +1,18 @@ +"""Constants used by AnsibleLint.""" +import os.path +import sys + +# mypy/pylint idiom for py36-py38 compatibility +# https://github.com/python/typeshed/issues/3500#issuecomment-560958608 +if sys.version_info >= (3, 8): + from typing import Literal # pylint: disable=no-name-in-module +else: + from typing_extensions import Literal + +DEFAULT_RULESDIR = os.path.join(os.path.dirname(__file__), 'rules') +CUSTOM_RULESDIR_ENVVAR = "ANSIBLE_LINT_CUSTOM_RULESDIR" + +INVALID_CONFIG_RC = 2 +ANSIBLE_FAILURE_RC = 3 + +FileType = Literal["playbook", "pre_tasks", "post_tasks"] diff --git a/lib/ansiblelint/errors.py b/lib/ansiblelint/errors.py new file mode 100644 index 0000000..8569dca --- /dev/null +++ b/lib/ansiblelint/errors.py @@ -0,0 +1,81 @@ +"""Exceptions and error representations.""" +import functools + +from ansiblelint.file_utils import normpath + + +@functools.total_ordering +class MatchError(ValueError): + """Rule violation detected during linting. + + It can be raised as Exception but also just added to the list of found + rules violations. + + Note that line argument is not considered when building hash of an + instance. + """ + + # IMPORTANT: any additional comparison protocol methods must return + # IMPORTANT: `NotImplemented` singleton to allow the check to use the + # IMPORTANT: other object's fallbacks. + # Ref: https://docs.python.org/3/reference/datamodel.html#object.__lt__ + + def __init__( + self, + message=None, + linenumber=0, + details: str = "", + filename=None, + rule=None) -> None: + """Initialize a MatchError instance.""" + super().__init__(message) + + if not (message or rule): + raise TypeError( + f'{self.__class__.__name__}() missing a ' + "required argument: one of 'message' or 'rule'", + ) + + self.message = message or getattr(rule, 'shortdesc', "") + self.linenumber = linenumber + self.details = details + self.filename = normpath(filename) if filename else None + self.rule = rule + self.ignored = False # If set it will be displayed but not counted as failure + + def __repr__(self): + """Return a MatchError instance representation.""" + formatstr = u"[{0}] ({1}) matched {2}:{3} {4}" + # note that `rule.id` can be int, str or even missing, as users + # can defined their own custom rules. + _id = getattr(self.rule, "id", "000") + + return formatstr.format(_id, self.message, + self.filename, self.linenumber, self.details) + + @property + def _hash_key(self): + # line attr is knowingly excluded, as dict is not hashable + return ( + self.filename, + self.linenumber, + str(getattr(self.rule, 'id', 0)), + self.message, + self.details, + ) + + def __lt__(self, other): + """Return whether the current object is less than the other.""" + if not isinstance(other, self.__class__): + return NotImplemented + return self._hash_key < other._hash_key + + def __hash__(self): + """Return a hash value of the MatchError instance.""" + return hash(self._hash_key) + + def __eq__(self, other): + """Identify whether the other object represents the same rule match.""" + if not isinstance(other, self.__class__): + return NotImplemented + return self.__hash__() == other.__hash__() diff --git a/lib/ansiblelint/file_utils.py b/lib/ansiblelint/file_utils.py new file mode 100644 index 0000000..f25382f --- /dev/null +++ b/lib/ansiblelint/file_utils.py @@ -0,0 +1,25 @@ +"""Utility functions related to file operations.""" +import os +from contextlib import contextmanager + + +def normpath(path) -> str: + """ + Normalize a path in order to provide a more consistent output. + + Currently it generates a relative path but in the future we may want to + make this user configurable. + """ + # convertion to string in order to allow receiving non string objects + return os.path.relpath(str(path)) + + +@contextmanager +def cwd(path): + """Context manager for temporary changing current working directory.""" + old_pwd = os.getcwd() + os.chdir(path) + try: + yield + finally: + os.chdir(old_pwd) diff --git a/lib/ansiblelint/formatters/__init__.py b/lib/ansiblelint/formatters/__init__.py new file mode 100644 index 0000000..7395183 --- /dev/null +++ b/lib/ansiblelint/formatters/__init__.py @@ -0,0 +1,167 @@ +"""Output formatters.""" +import os +from pathlib import Path +from typing import TYPE_CHECKING, Generic, TypeVar, Union + +from ansiblelint.color import Color, colorize + +if TYPE_CHECKING: + from ansiblelint.errors import MatchError + +T = TypeVar('T', bound='BaseFormatter') + + +class BaseFormatter(Generic[T]): + """Formatter of ansible-lint output. + + Base class for output formatters. + + Args: + base_dir (str|Path): reference directory against which display relative path. + display_relative_path (bool): whether to show path as relative or absolute + """ + + def __init__(self, base_dir: Union[str, Path], display_relative_path: bool) -> None: + """Initialize a BaseFormatter instance.""" + if isinstance(base_dir, str): + base_dir = Path(base_dir) + if base_dir: # can be None + base_dir = base_dir.absolute() + + # Required 'cause os.path.relpath() does not accept Path before 3.6 + if isinstance(base_dir, Path): + base_dir = str(base_dir) # Drop when Python 3.5 is no longer supported + + self._base_dir = base_dir if display_relative_path else None + + def _format_path(self, path: Union[str, Path]) -> str: + # Required 'cause os.path.relpath() does not accept Path before 3.6 + if isinstance(path, Path): + path = str(path) # Drop when Python 3.5 is no longer supported + + if not self._base_dir: + return path + # Use os.path.relpath 'cause Path.relative_to() misbehaves + return os.path.relpath(path, start=self._base_dir) + + def format(self, match: "MatchError", colored: bool = False) -> str: + return str(match) + + +class Formatter(BaseFormatter): + + def format(self, match: "MatchError", colored: bool = False) -> str: + formatstr = u"{0} {1}\n{2}:{3}\n{4}\n" + _id = getattr(match.rule, 'id', '000') + if colored: + return formatstr.format( + colorize(u"[{0}]".format(_id), Color.error_code), + colorize(match.message, Color.error_title), + colorize(self._format_path(match.filename or ""), Color.filename), + colorize(str(match.linenumber), Color.linenumber), + colorize(u"{0}".format(match.details), Color.line)) + else: + return formatstr.format(_id, + match.message, + match.filename or "", + match.linenumber, + match.details) + + +class QuietFormatter(BaseFormatter): + + def format(self, match: "MatchError", colored: bool = False) -> str: + formatstr = u"{0} {1}:{2}" + if colored: + return formatstr.format( + colorize(u"[{0}]".format(match.rule.id), Color.error_code), + colorize(self._format_path(match.filename or ""), Color.filename), + colorize(str(match.linenumber), Color.linenumber)) + else: + return formatstr.format(match.rule.id, self._format_path(match.filename or ""), + match.linenumber) + + +class ParseableFormatter(BaseFormatter): + + def format(self, match: "MatchError", colored: bool = False) -> str: + formatstr = u"{0}:{1}: [{2}] {3}" + if colored: + return formatstr.format( + colorize(self._format_path(match.filename or ""), Color.filename), + colorize(str(match.linenumber), Color.linenumber), + colorize(u"E{0}".format(match.rule.id), Color.error_code), + colorize(u"{0}".format(match.message), Color.error_title)) + else: + return formatstr.format(self._format_path(match.filename or ""), + match.linenumber, + "E" + match.rule.id, + match.message) + + +class AnnotationsFormatter(BaseFormatter): + # https://docs.github.com/en/actions/reference/workflow-commands-for-github-actions#setting-a-warning-message + """Formatter for emitting violations as GitHub Workflow Commands. + + These commands trigger the GHA Workflow runners platform to post violations + in a form of GitHub Checks API annotations that appear rendered in pull- + request files view. + + ::debug file={name},line={line},col={col},severity={severity}::{message} + ::warning file={name},line={line},col={col},severity={severity}::{message} + ::error file={name},line={line},col={col},severity={severity}::{message} + + Supported levels: debug, warning, error + """ + + def format(self, match: "MatchError", colored: bool = False) -> str: + """Prepare a match instance for reporting as a GitHub Actions annotation.""" + if colored: + raise ValueError('The colored mode is not supported.') + + level = self._severity_to_level(match.rule.severity) + file_path = self._format_path(match.filename or "") + line_num = match.linenumber + rule_id = match.rule.id + severity = match.rule.severity + violation_details = match.message + return ( + f"::{level} file={file_path},line={line_num},severity={severity}" + f"::[E{rule_id}] {violation_details}" + ) + + @staticmethod + def _severity_to_level(severity: str) -> str: + if severity in ['VERY_LOW', 'LOW']: + return 'warning' + elif severity in ['INFO']: + return 'debug' + # ['MEDIUM', 'HIGH', 'VERY_HIGH'] or anything else + return 'error' + + +class ParseableSeverityFormatter(BaseFormatter): + + def format(self, match: "MatchError", colored: bool = False) -> str: + formatstr = u"{0}:{1}: [{2}] [{3}] {4}" + + filename = self._format_path(match.filename or "") + linenumber = str(match.linenumber) + rule_id = u"E{0}".format(match.rule.id) + severity = match.rule.severity + message = str(match.message) + + if colored: + filename = colorize(filename, Color.filename) + linenumber = colorize(linenumber, Color.linenumber) + rule_id = colorize(rule_id, Color.error_code) + severity = colorize(severity, Color.error_code) + message = colorize(message, Color.error_title) + + return formatstr.format( + filename, + linenumber, + rule_id, + severity, + message, + ) diff --git a/lib/ansiblelint/generate_docs.py b/lib/ansiblelint/generate_docs.py new file mode 100644 index 0000000..b735b1f --- /dev/null +++ b/lib/ansiblelint/generate_docs.py @@ -0,0 +1,66 @@ +"""Utils to generate rule table .rst documentation.""" +import logging +from typing import Iterable + +from rich import box +from rich.console import render_group +from rich.markdown import Markdown +from rich.table import Table + +from ansiblelint.rules import RulesCollection + +DOC_HEADER = """ +.. _lint_default_rules: + +Default Rules +============= + +.. contents:: + :local: + +Below you can see the list of default rules Ansible Lint use to evaluate playbooks and roles: + +""" + +_logger = logging.getLogger(__name__) + + +def rules_as_rst(rules: RulesCollection) -> str: + """Return RST documentation for a list of rules.""" + r = DOC_HEADER + + for d in rules: + if not hasattr(d, 'id'): + _logger.warning( + "Rule %s skipped from being documented as it does not have an `id` attribute.", + d.__class__.__name__) + continue + + if d.id.endswith('01'): + + section = '{} Rules ({}xx)'.format( + d.tags[0].title(), + d.id[-3:-2]) + r += f'\n\n{section}\n{ "-" * len(section) }' + + title = f"{d.id}: {d.shortdesc}" + r += f"\n\n.. _{d.id}:\n\n{title}\n{'*' * len(title)}\n\n{d.description}" + + return r + + +@render_group() +def rules_as_rich(rules: RulesCollection) -> Iterable[Table]: + """Print documentation for a list of rules, returns empty string.""" + for rule in rules: + table = Table(show_header=True, header_style="title", box=box.MINIMAL) + table.add_column(rule.id, style="dim", width=16) + table.add_column(Markdown(rule.shortdesc)) + table.add_row("description", Markdown(rule.description)) + if rule.version_added: + table.add_row("version_added", rule.version_added) + if rule.tags: + table.add_row("tags", ", ".join(rule.tags)) + if rule.severity: + table.add_row("severity", rule.severity) + yield table diff --git a/lib/ansiblelint/rules/AlwaysRunRule.py b/lib/ansiblelint/rules/AlwaysRunRule.py new file mode 100644 index 0000000..8d811ff --- /dev/null +++ b/lib/ansiblelint/rules/AlwaysRunRule.py @@ -0,0 +1,33 @@ +# Copyright (c) 2017 Anth Courtney <anthcourtney@gmail.com> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class AlwaysRunRule(AnsibleLintRule): + id = '101' + shortdesc = 'Deprecated always_run' + description = 'Instead of ``always_run``, use ``check_mode``' + severity = 'MEDIUM' + tags = ['deprecated', 'ANSIBLE0018'] + version_added = 'historic' + + def matchtask(self, file, task): + return 'always_run' in task diff --git a/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py b/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py new file mode 100644 index 0000000..e6f3259 --- /dev/null +++ b/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py @@ -0,0 +1,80 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from functools import reduce + +from ansiblelint.rules import AnsibleLintRule + + +def _get_subtasks(data): + result = [] + block_names = [ + 'tasks', + 'pre_tasks', + 'post_tasks', + 'handlers', + 'block', + 'always', + 'rescue'] + for name in block_names: + if data and name in data: + result += (data[name] or []) + return result + + +def _nested_search(term, data): + if data and term in data: + return True + return reduce((lambda x, y: x or _nested_search(term, y)), _get_subtasks(data), False) + + +def _become_user_without_become(becomeuserabove, data): + if 'become' in data: + # If become is in lineage of tree then correct + return False + if ('become_user' in data and _nested_search('become', data)): + # If 'become_user' on tree and become somewhere below + # we must check for a case of a second 'become_user' without a + # 'become' in its lineage + subtasks = _get_subtasks(data) + return reduce((lambda x, y: x or _become_user_without_become(False, y)), subtasks, False) + if _nested_search('become_user', data): + # Keep searching down if 'become_user' exists in the tree below current task + subtasks = _get_subtasks(data) + return (len(subtasks) == 0 or + reduce((lambda x, y: x or + _become_user_without_become( + becomeuserabove or 'become_user' in data, y)), subtasks, False)) + # If at bottom of tree, flag up if 'become_user' existed in the lineage of the tree and + # 'become' was not. This is an error if any lineage has a 'become_user' but no become + return becomeuserabove + + +class BecomeUserWithoutBecomeRule(AnsibleLintRule): + id = '501' + shortdesc = 'become_user requires become to work as expected' + description = '``become_user`` without ``become`` will not actually change user' + severity = 'VERY_HIGH' + tags = ['task', 'oddity', 'ANSIBLE0017'] + version_added = 'historic' + + def matchplay(self, file, data): + if file['type'] == 'playbook' and _become_user_without_become(False, data): + return ({'become_user': data}, self.shortdesc) diff --git a/lib/ansiblelint/rules/CommandHasChangesCheckRule.py b/lib/ansiblelint/rules/CommandHasChangesCheckRule.py new file mode 100644 index 0000000..26087b8 --- /dev/null +++ b/lib/ansiblelint/rules/CommandHasChangesCheckRule.py @@ -0,0 +1,45 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class CommandHasChangesCheckRule(AnsibleLintRule): + id = '301' + shortdesc = 'Commands should not change things if nothing needs doing' + description = ( + 'Commands should either read information (and thus set ' + '``changed_when``) or not do something if it has already been ' + 'done (using creates/removes) or only do it if another ' + 'check has a particular result (``when``)' + ) + severity = 'HIGH' + tags = ['command-shell', 'idempotency', 'ANSIBLE0012'] + version_added = 'historic' + + _commands = ['command', 'shell', 'raw'] + + def matchtask(self, file, task): + if task["__ansible_action_type__"] == 'task': + if task["action"]["__ansible_module__"] in self._commands: + return 'changed_when' not in task and \ + 'when' not in task and \ + 'creates' not in task['action'] and \ + 'removes' not in task['action'] diff --git a/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py b/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py new file mode 100644 index 0000000..f1adffa --- /dev/null +++ b/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py @@ -0,0 +1,65 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import os + +from ansiblelint.rules import AnsibleLintRule +from ansiblelint.utils import get_first_cmd_arg + +try: + from ansible.module_utils.parsing.convert_bool import boolean +except ImportError: + try: + from ansible.utils.boolean import boolean + except ImportError: + try: + from ansible.utils import boolean + except ImportError: + from ansible import constants + boolean = constants.mk_boolean + + +class CommandsInsteadOfArgumentsRule(AnsibleLintRule): + id = '302' + shortdesc = 'Using command rather than an argument to e.g. file' + description = ( + 'Executing a command when there are arguments to modules ' + 'is generally a bad idea' + ) + severity = 'VERY_HIGH' + tags = ['command-shell', 'resources', 'ANSIBLE0007'] + version_added = 'historic' + + _commands = ['command', 'shell', 'raw'] + _arguments = {'chown': 'owner', 'chmod': 'mode', 'chgrp': 'group', + 'ln': 'state=link', 'mkdir': 'state=directory', + 'rmdir': 'state=absent', 'rm': 'state=absent'} + + def matchtask(self, file, task): + if task["action"]["__ansible_module__"] in self._commands: + first_cmd_arg = get_first_cmd_arg(task) + if not first_cmd_arg: + return + + executable = os.path.basename(first_cmd_arg) + if executable in self._arguments and \ + boolean(task['action'].get('warn', True)): + message = "{0} used in place of argument {1} to file module" + return message.format(executable, self._arguments[executable]) diff --git a/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py b/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py new file mode 100644 index 0000000..b19c5c2 --- /dev/null +++ b/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py @@ -0,0 +1,86 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import os + +from ansiblelint.rules import AnsibleLintRule +from ansiblelint.utils import get_first_cmd_arg + +try: + from ansible.module_utils.parsing.convert_bool import boolean +except ImportError: + try: + from ansible.utils.boolean import boolean + except ImportError: + try: + from ansible.utils import boolean + except ImportError: + from ansible import constants + boolean = constants.mk_boolean + + +class CommandsInsteadOfModulesRule(AnsibleLintRule): + id = '303' + shortdesc = 'Using command rather than module' + description = ( + 'Executing a command when there is an Ansible module ' + 'is generally a bad idea' + ) + severity = 'HIGH' + tags = ['command-shell', 'resources', 'ANSIBLE0006'] + version_added = 'historic' + + _commands = ['command', 'shell'] + _modules = { + 'apt-get': 'apt-get', + 'chkconfig': 'service', + 'curl': 'get_url or uri', + 'git': 'git', + 'hg': 'hg', + 'letsencrypt': 'acme_certificate', + 'mktemp': 'tempfile', + 'mount': 'mount', + 'patch': 'patch', + 'rpm': 'yum or rpm_key', + 'rsync': 'synchronize', + 'sed': 'template, replace or lineinfile', + 'service': 'service', + 'supervisorctl': 'supervisorctl', + 'svn': 'subversion', + 'systemctl': 'systemd', + 'tar': 'unarchive', + 'unzip': 'unarchive', + 'wget': 'get_url or uri', + 'yum': 'yum', + } + + def matchtask(self, file, task): + if task['action']['__ansible_module__'] not in self._commands: + return + + first_cmd_arg = get_first_cmd_arg(task) + if not first_cmd_arg: + return + + executable = os.path.basename(first_cmd_arg) + if executable in self._modules and \ + boolean(task['action'].get('warn', True)): + message = '{0} used in place of {1} module' + return message.format(executable, self._modules[executable]) diff --git a/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py b/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py new file mode 100644 index 0000000..a43c4f7 --- /dev/null +++ b/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py @@ -0,0 +1,23 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +import re + +from ansiblelint.rules import AnsibleLintRule + + +class ComparisonToEmptyStringRule(AnsibleLintRule): + id = '602' + shortdesc = "Don't compare to empty string" + description = ( + 'Use ``when: var|length > 0`` rather than ``when: var != ""`` (or ' + 'conversely ``when: var|length == 0`` rather than ``when: var == ""``)' + ) + severity = 'HIGH' + tags = ['idiom'] + version_added = 'v4.0.0' + + empty_string_compare = re.compile("[=!]= ?(\"{2}|'{2})") + + def match(self, file, line): + return self.empty_string_compare.search(line) diff --git a/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py b/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py new file mode 100644 index 0000000..46668d1 --- /dev/null +++ b/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py @@ -0,0 +1,23 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +import re + +from ansiblelint.rules import AnsibleLintRule + + +class ComparisonToLiteralBoolRule(AnsibleLintRule): + id = '601' + shortdesc = "Don't compare to literal True/False" + description = ( + 'Use ``when: var`` rather than ``when: var == True`` ' + '(or conversely ``when: not var``)' + ) + severity = 'HIGH' + tags = ['idiom'] + version_added = 'v4.0.0' + + literal_bool_compare = re.compile("[=!]= ?(True|true|False|false)") + + def match(self, file, line): + return self.literal_bool_compare.search(line) diff --git a/lib/ansiblelint/rules/DeprecatedModuleRule.py b/lib/ansiblelint/rules/DeprecatedModuleRule.py new file mode 100644 index 0000000..dc019ed --- /dev/null +++ b/lib/ansiblelint/rules/DeprecatedModuleRule.py @@ -0,0 +1,37 @@ +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class DeprecatedModuleRule(AnsibleLintRule): + id = '105' + shortdesc = 'Deprecated module' + description = ( + 'These are deprecated modules, some modules are kept ' + 'temporarily for backwards compatibility but usage is discouraged. ' + 'For more details see: ' + 'https://docs.ansible.com/ansible/latest/modules/list_of_all_modules.html' + ) + severity = 'HIGH' + tags = ['deprecated'] + version_added = 'v4.0.0' + + _modules = [ + 'accelerate', 'aos_asn_pool', 'aos_blueprint', 'aos_blueprint_param', + 'aos_blueprint_virtnet', 'aos_device', 'aos_external_router', + 'aos_ip_pool', 'aos_logical_device', 'aos_logical_device_map', + 'aos_login', 'aos_rack_type', 'aos_template', 'azure', 'cl_bond', + 'cl_bridge', 'cl_img_install', 'cl_interface', 'cl_interface_policy', + 'cl_license', 'cl_ports', 'cs_nic', 'docker', 'ec2_ami_find', + 'ec2_ami_search', 'ec2_remote_facts', 'ec2_vpc', 'kubernetes', + 'netscaler', 'nxos_ip_interface', 'nxos_mtu', 'nxos_portchannel', + 'nxos_switchport', 'oc', 'panos_nat_policy', 'panos_security_policy', + 'vsphere_guest', 'win_msi', 'include' + ] + + def matchtask(self, file, task): + module = task["action"]["__ansible_module__"] + if module in self._modules: + message = '{0} {1}' + return message.format(self.shortdesc, module) + return False diff --git a/lib/ansiblelint/rules/EnvVarsInCommandRule.py b/lib/ansiblelint/rules/EnvVarsInCommandRule.py new file mode 100644 index 0000000..58dba90 --- /dev/null +++ b/lib/ansiblelint/rules/EnvVarsInCommandRule.py @@ -0,0 +1,48 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule +from ansiblelint.utils import FILENAME_KEY, LINE_NUMBER_KEY, get_first_cmd_arg + + +class EnvVarsInCommandRule(AnsibleLintRule): + id = '304' + shortdesc = "Environment variables don't work as part of command" + description = ( + 'Environment variables should be passed to ``shell`` or ``command`` ' + 'through environment argument' + ) + severity = 'VERY_HIGH' + tags = ['command-shell', 'bug', 'ANSIBLE0014'] + version_added = 'historic' + + expected_args = ['chdir', 'creates', 'executable', 'removes', 'stdin', 'warn', + 'stdin_add_newline', 'strip_empty_ends', + 'cmd', '__ansible_module__', '__ansible_arguments__', + LINE_NUMBER_KEY, FILENAME_KEY] + + def matchtask(self, file, task): + if task["action"]["__ansible_module__"] in ['command']: + first_cmd_arg = get_first_cmd_arg(task) + if not first_cmd_arg: + return + + return any([arg not in self.expected_args for arg in task['action']] + + ["=" in first_cmd_arg]) diff --git a/lib/ansiblelint/rules/GitHasVersionRule.py b/lib/ansiblelint/rules/GitHasVersionRule.py new file mode 100644 index 0000000..f0f3680 --- /dev/null +++ b/lib/ansiblelint/rules/GitHasVersionRule.py @@ -0,0 +1,37 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class GitHasVersionRule(AnsibleLintRule): + id = '401' + shortdesc = 'Git checkouts must contain explicit version' + description = ( + 'All version control checkouts must point to ' + 'an explicit commit or tag, not just ``latest``' + ) + severity = 'MEDIUM' + tags = ['module', 'repeatability', 'ANSIBLE0004'] + version_added = 'historic' + + def matchtask(self, file, task): + return (task['action']['__ansible_module__'] == 'git' and + task['action'].get('version', 'HEAD') == 'HEAD') diff --git a/lib/ansiblelint/rules/IncludeMissingFileRule.py b/lib/ansiblelint/rules/IncludeMissingFileRule.py new file mode 100644 index 0000000..57508fa --- /dev/null +++ b/lib/ansiblelint/rules/IncludeMissingFileRule.py @@ -0,0 +1,67 @@ +# Copyright (c) 2020, Joachim Lusiardi +# Copyright (c) 2020, Ansible Project + +import os.path + +import ansible.parsing.yaml.objects + +from ansiblelint.rules import AnsibleLintRule + + +class IncludeMissingFileRule(AnsibleLintRule): + id = '505' + shortdesc = 'referenced files must exist' + description = ( + 'All files referenced by by include / import tasks ' + 'must exist. The check excludes files with jinja2 ' + 'templates in the filename.' + ) + severity = 'MEDIUM' + tags = ['task', 'bug'] + version_added = 'v4.3.0' + + def matchplay(self, file, data): + absolute_directory = file.get('absolute_directory', None) + results = [] + + # avoid failing with a playbook having tasks: null + for task in (data.get('tasks', []) or []): + + # ignore None tasks or + # if the id of the current rule is not in list of skipped rules for this play + if not task or self.id in task.get('skipped_rules', ()): + continue + + # collect information which file was referenced for include / import + referenced_file = None + for key, val in task.items(): + if not (key.startswith('include_') or + key.startswith('import_') or + key == 'include'): + continue + if isinstance(val, ansible.parsing.yaml.objects.AnsibleMapping): + referenced_file = val.get('file', None) + else: + referenced_file = val + # take the file and skip the remaining keys + if referenced_file: + break + + if referenced_file is None or absolute_directory is None: + continue + + # make sure we have a absolute path here and check if it is a file + referenced_file = os.path.join(absolute_directory, referenced_file) + + # skip if this is a jinja2 templated reference + if '{{' in referenced_file: + continue + + # existing files do not produce any error + if os.path.isfile(referenced_file): + continue + + results.append(({'referenced_file': referenced_file}, + 'referenced missing file in %s:%i' + % (task['__file__'], task['__line__']))) + return results diff --git a/lib/ansiblelint/rules/LineTooLongRule.py b/lib/ansiblelint/rules/LineTooLongRule.py new file mode 100644 index 0000000..007857e --- /dev/null +++ b/lib/ansiblelint/rules/LineTooLongRule.py @@ -0,0 +1,19 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class LineTooLongRule(AnsibleLintRule): + id = '204' + shortdesc = 'Lines should be no longer than 160 chars' + description = ( + 'Long lines make code harder to read and ' + 'code review more difficult' + ) + severity = 'VERY_LOW' + tags = ['formatting'] + version_added = 'v4.0.0' + + def match(self, file, line): + return len(line) > 160 diff --git a/lib/ansiblelint/rules/LoadingFailureRule.py b/lib/ansiblelint/rules/LoadingFailureRule.py new file mode 100644 index 0000000..7c37498 --- /dev/null +++ b/lib/ansiblelint/rules/LoadingFailureRule.py @@ -0,0 +1,14 @@ +"""Rule definition for a failure to load a file.""" + +from ansiblelint.rules import AnsibleLintRule + + +class LoadingFailureRule(AnsibleLintRule): + """File loading failure.""" + + id = '901' + shortdesc = 'Failed to load or parse file' + description = 'Linter failed to process a YAML file, possible not an Ansible file.' + severity = 'VERY_HIGH' + tags = ['core'] + version_added = 'v4.3.0' diff --git a/lib/ansiblelint/rules/MercurialHasRevisionRule.py b/lib/ansiblelint/rules/MercurialHasRevisionRule.py new file mode 100644 index 0000000..fcfe0a8 --- /dev/null +++ b/lib/ansiblelint/rules/MercurialHasRevisionRule.py @@ -0,0 +1,37 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class MercurialHasRevisionRule(AnsibleLintRule): + id = '402' + shortdesc = 'Mercurial checkouts must contain explicit revision' + description = ( + 'All version control checkouts must point to ' + 'an explicit commit or tag, not just ``latest``' + ) + severity = 'MEDIUM' + tags = ['module', 'repeatability', 'ANSIBLE0005'] + version_added = 'historic' + + def matchtask(self, file, task): + return (task['action']['__ansible_module__'] == 'hg' and + task['action'].get('revision', 'default') == 'default') diff --git a/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py b/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py new file mode 100644 index 0000000..db52db3 --- /dev/null +++ b/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py @@ -0,0 +1,40 @@ +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class MetaChangeFromDefaultRule(AnsibleLintRule): + id = '703' + shortdesc = 'meta/main.yml default values should be changed' + field_defaults = [ + ('author', 'your name'), + ('description', 'your description'), + ('company', 'your company (optional)'), + ('license', 'license (GPLv2, CC-BY, etc)'), + ('license', 'license (GPL-2.0-or-later, MIT, etc)'), + ] + description = ( + 'meta/main.yml default values should be changed for: ``{}``'.format( + ', '.join(f[0] for f in field_defaults) + ) + ) + severity = 'HIGH' + tags = ['metadata'] + version_added = 'v4.0.0' + + def matchplay(self, file, data): + if file['type'] != 'meta': + return False + + galaxy_info = data.get('galaxy_info', None) + if not galaxy_info: + return False + + results = [] + for field, default in self.field_defaults: + value = galaxy_info.get(field, None) + if value and value == default: + results.append(({'meta/main.yml': data}, + 'Should change default metadata: %s' % field)) + + return results diff --git a/lib/ansiblelint/rules/MetaMainHasInfoRule.py b/lib/ansiblelint/rules/MetaMainHasInfoRule.py new file mode 100644 index 0000000..f05f240 --- /dev/null +++ b/lib/ansiblelint/rules/MetaMainHasInfoRule.py @@ -0,0 +1,66 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + +META_STR_INFO = ( + 'author', + 'description' +) +META_INFO = tuple(list(META_STR_INFO) + [ + 'license', + 'min_ansible_version', + 'platforms', +]) + + +def _platform_info_errors_itr(platforms): + if not isinstance(platforms, list): + yield 'Platforms should be a list of dictionaries' + return + + for platform in platforms: + if not isinstance(platform, dict): + yield 'Platforms should be a list of dictionaries' + elif 'name' not in platform: + yield 'Platform should contain name' + + +def _galaxy_info_errors_itr(galaxy_info, + info_list=META_INFO, + str_info_list=META_STR_INFO): + for info in info_list: + ginfo = galaxy_info.get(info, False) + if ginfo: + if info in str_info_list and not isinstance(ginfo, str): + yield '{info} should be a string'.format(info=info) + elif info == 'platforms': + for err in _platform_info_errors_itr(ginfo): + yield err + else: + yield 'Role info should contain {info}'.format(info=info) + + +class MetaMainHasInfoRule(AnsibleLintRule): + id = '701' + shortdesc = 'meta/main.yml should contain relevant info' + str_info = META_STR_INFO + info = META_INFO + description = ( + 'meta/main.yml should contain: ``{}``'.format(', '.join(info)) + ) + severity = 'HIGH' + tags = ['metadata'] + version_added = 'v4.0.0' + + def matchplay(self, file, data): + if file['type'] != 'meta': + return False + + meta = {'meta/main.yml': data} + galaxy_info = data.get('galaxy_info', False) + if galaxy_info: + return [(meta, err) for err + in _galaxy_info_errors_itr(galaxy_info)] + + return [(meta, "No 'galaxy_info' found")] diff --git a/lib/ansiblelint/rules/MetaTagValidRule.py b/lib/ansiblelint/rules/MetaTagValidRule.py new file mode 100644 index 0000000..0739ca3 --- /dev/null +++ b/lib/ansiblelint/rules/MetaTagValidRule.py @@ -0,0 +1,81 @@ +# Copyright (c) 2018, Ansible Project + +import re +import sys + +from ansiblelint.rules import AnsibleLintRule + + +class MetaTagValidRule(AnsibleLintRule): + id = '702' + shortdesc = 'Tags must contain lowercase letters and digits only' + description = ( + 'Tags must contain lowercase letters and digits only, ' + 'and ``galaxy_tags`` is expected to be a list' + ) + severity = 'HIGH' + tags = ['metadata'] + version_added = 'v4.0.0' + + TAG_REGEXP = re.compile('^[a-z0-9]+$') + + def matchplay(self, file, data): + if file['type'] != 'meta': + return False + + galaxy_info = data.get('galaxy_info', None) + if not galaxy_info: + return False + + tags = [] + results = [] + + if 'galaxy_tags' in galaxy_info: + if isinstance(galaxy_info['galaxy_tags'], list): + tags += galaxy_info['galaxy_tags'] + else: + results.append(({'meta/main.yml': data}, + "Expected 'galaxy_tags' to be a list")) + + if 'categories' in galaxy_info: + results.append(({'meta/main.yml': data}, + "Use 'galaxy_tags' rather than 'categories'")) + if isinstance(galaxy_info['categories'], list): + tags += galaxy_info['categories'] + else: + results.append(({'meta/main.yml': data}, + "Expected 'categories' to be a list")) + + for tag in tags: + msg = self.shortdesc + if not isinstance(tag, str): + results.append(( + {'meta/main.yml': data}, + "Tags must be strings: '{}'".format(tag))) + continue + if not re.match(self.TAG_REGEXP, tag): + results.append(({'meta/main.yml': data}, + "{}, invalid: '{}'".format(msg, tag))) + + return results + + +META_TAG_VALID = ''' +galaxy_info: + galaxy_tags: ['database', 'my s q l', 'MYTAG'] + categories: 'my_category_not_in_a_list' +''' + +# testing code to be loaded only with pytest or when executed the rule file +if "pytest" in sys.modules: + + import pytest + + @pytest.mark.parametrize('rule_runner', (MetaTagValidRule, ), indirect=['rule_runner']) + def test_valid_tag_rule(rule_runner): + """Test rule matches.""" + results = rule_runner.run_role_meta_main(META_TAG_VALID) + assert "Use 'galaxy_tags' rather than 'categories'" in str(results) + assert "Expected 'categories' to be a list" in str(results) + assert "invalid: 'my s q l'" in str(results) + assert "invalid: 'MYTAG'" in str(results) diff --git a/lib/ansiblelint/rules/MetaVideoLinksRule.py b/lib/ansiblelint/rules/MetaVideoLinksRule.py new file mode 100644 index 0000000..aa34012 --- /dev/null +++ b/lib/ansiblelint/rules/MetaVideoLinksRule.py @@ -0,0 +1,65 @@ +# Copyright (c) 2018, Ansible Project + +import re + +from ansiblelint.rules import AnsibleLintRule + + +class MetaVideoLinksRule(AnsibleLintRule): + id = '704' + shortdesc = "meta/main.yml video_links should be formatted correctly" + description = ( + 'Items in ``video_links`` in meta/main.yml should be ' + 'dictionaries, and contain only keys ``url`` and ``title``, ' + 'and have a shared link from a supported provider' + ) + severity = 'LOW' + tags = ['metadata'] + version_added = 'v4.0.0' + + VIDEO_REGEXP = { + 'google': re.compile( + r'https://drive\.google\.com.*file/d/([0-9A-Za-z-_]+)/.*'), + 'vimeo': re.compile( + r'https://vimeo\.com/([0-9]+)'), + 'youtube': re.compile( + r'https://youtu\.be/([0-9A-Za-z-_]+)'), + } + + def matchplay(self, file, data): + if file['type'] != 'meta': + return False + + galaxy_info = data.get('galaxy_info', None) + if not galaxy_info: + return False + + video_links = galaxy_info.get('video_links', None) + if not video_links: + return False + + results = [] + + for video in video_links: + if not isinstance(video, dict): + results.append(({'meta/main.yml': data}, + "Expected item in 'video_links' to be " + "a dictionary")) + continue + + if set(video) != {'url', 'title', '__file__', '__line__'}: + results.append(({'meta/main.yml': data}, + "Expected item in 'video_links' to contain " + "only keys 'url' and 'title'")) + continue + + for name, expr in self.VIDEO_REGEXP.items(): + if expr.match(video['url']): + break + else: + msg = ("URL format '{0}' is not recognized. " + "Expected it be a shared link from Vimeo, YouTube, " + "or Google Drive.".format(video['url'])) + results.append(({'meta/main.yml': data}, msg)) + + return results diff --git a/lib/ansiblelint/rules/MissingFilePermissionsRule.py b/lib/ansiblelint/rules/MissingFilePermissionsRule.py new file mode 100644 index 0000000..bc11cc7 --- /dev/null +++ b/lib/ansiblelint/rules/MissingFilePermissionsRule.py @@ -0,0 +1,95 @@ +# Copyright (c) 2020 Sorin Sbarnea <sorin.sbarnea@gmail.com> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from ansiblelint.rules import AnsibleLintRule + +# Despite documentation mentioning 'preserve' only these modules support it: +_modules_with_preserve = ( + 'copy', + 'template', +) + + +class MissingFilePermissionsRule(AnsibleLintRule): + id = "208" + shortdesc = 'File permissions unset or incorrect' + description = ( + "Missing or unsupported mode parameter can cause unexpected file " + "permissions based " + "on version of Ansible being used. Be explicit, like ``mode: 0644`` to " + "avoid hitting this rule. Special ``preserve`` value is accepted " + f"only by {', '.join(_modules_with_preserve)} modules. " + "See https://github.com/ansible/ansible/issues/71200" + ) + severity = 'VERY_HIGH' + tags = ['unpredictability', 'experimental'] + version_added = 'v4.3.0' + + _modules = { + 'archive', + 'assemble', + 'copy', # supports preserve + 'file', + 'replace', # implicit preserve behavior but mode: preserve is invalid + 'template', # supports preserve + # 'unarchive', # disabled because .tar.gz files can have permissions inside + } + + _modules_with_create = { + 'blockinfile': False, + 'htpasswd': True, + 'ini_file': True, + 'lineinfile': False, + } + + def matchtask(self, file, task): + module = task["action"]["__ansible_module__"] + mode = task['action'].get('mode', None) + + if module not in self._modules and \ + module not in self._modules_with_create: + return False + + if mode == 'preserve' and module not in _modules_with_preserve: + return True + + if module in self._modules_with_create: + create = task["action"].get("create", self._modules_with_create[module]) + return create and mode is None + + # A file that doesn't exist cannot have a mode + if task['action'].get('state', None) == "absent": + return False + + # A symlink always has mode 0o777 + if task['action'].get('state', None) == "link": + return False + + # The file module does not create anything when state==file (default) + if module == "file" and \ + task['action'].get('state', 'file') == 'file': + return False + + # replace module is the only one that has a valid default preserve + # behavior, but we want to trigger rule if user used incorrect + # documentation and put 'preserve', which is not supported. + if module == 'replace' and mode is None: + return False + + return mode is None diff --git a/lib/ansiblelint/rules/NestedJinjaRule.py b/lib/ansiblelint/rules/NestedJinjaRule.py new file mode 100644 index 0000000..c10d4ec --- /dev/null +++ b/lib/ansiblelint/rules/NestedJinjaRule.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# Author: Adrián Tóth <adtoth@redhat.com> +# +# Copyright (c) 2020, Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import re + +from ansiblelint.rules import AnsibleLintRule + + +class NestedJinjaRule(AnsibleLintRule): + id = '207' + shortdesc = 'Nested jinja pattern' + description = ( + "There should not be any nested jinja pattern. " + "Example (bad): ``{{ list_one + {{ list_two | max }} }}``, " + "example (good): ``{{ list_one + max(list_two) }}``" + ) + severity = 'VERY_HIGH' + tags = ['formatting'] + version_added = 'v4.3.0' + + pattern = re.compile(r"{{(?:[^{}]*)?{{") + + def matchtask(self, file, task): + + command = "".join( + str(value) + # task properties are stored in the 'action' key + for key, value in task['action'].items() + # exclude useless values of '__file__', '__ansible_module__', '__*__', etc. + if not key.startswith('__') and not key.endswith('__') + ) + + return bool(self.pattern.search(command)) diff --git a/lib/ansiblelint/rules/NoFormattingInWhenRule.py b/lib/ansiblelint/rules/NoFormattingInWhenRule.py new file mode 100644 index 0000000..a665311 --- /dev/null +++ b/lib/ansiblelint/rules/NoFormattingInWhenRule.py @@ -0,0 +1,34 @@ +from ansiblelint.rules import AnsibleLintRule + + +class NoFormattingInWhenRule(AnsibleLintRule): + id = '102' + shortdesc = 'No Jinja2 in when' + description = '``when`` lines should not include Jinja2 variables' + severity = 'HIGH' + tags = ['deprecated', 'ANSIBLE0019'] + version_added = 'historic' + + def _is_valid(self, when): + if not isinstance(when, str): + return True + return when.find('{{') == -1 and when.find('}}') == -1 + + def matchplay(self, file, play): + errors = [] + if isinstance(play, dict): + if 'roles' not in play or play['roles'] is None: + return errors + for role in play['roles']: + if self.matchtask(file, role): + errors.append(({'when': role}, + 'role "when" clause has Jinja2 templates')) + if isinstance(play, list): + for play_item in play: + sub_errors = self.matchplay(file, play_item) + if sub_errors: + errors = errors + sub_errors + return errors + + def matchtask(self, file, task): + return 'when' in task and not self._is_valid(task['when']) diff --git a/lib/ansiblelint/rules/NoTabsRule.py b/lib/ansiblelint/rules/NoTabsRule.py new file mode 100644 index 0000000..78222c8 --- /dev/null +++ b/lib/ansiblelint/rules/NoTabsRule.py @@ -0,0 +1,16 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class NoTabsRule(AnsibleLintRule): + id = '203' + shortdesc = 'Most files should not contain tabs' + description = 'Tabs can cause unexpected display issues, use spaces' + severity = 'LOW' + tags = ['formatting'] + version_added = 'v4.0.0' + + def match(self, file, line): + return '\t' in line diff --git a/lib/ansiblelint/rules/OctalPermissionsRule.py b/lib/ansiblelint/rules/OctalPermissionsRule.py new file mode 100644 index 0000000..b95c322 --- /dev/null +++ b/lib/ansiblelint/rules/OctalPermissionsRule.py @@ -0,0 +1,73 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class OctalPermissionsRule(AnsibleLintRule): + id = '202' + shortdesc = 'Octal file permissions must contain leading zero or be a string' + description = ( + 'Numeric file permissions without leading zero can behave ' + 'in unexpected ways. See ' + 'http://docs.ansible.com/ansible/file_module.html' + ) + severity = 'VERY_HIGH' + tags = ['formatting', 'ANSIBLE0009'] + version_added = 'historic' + + _modules = ['assemble', 'copy', 'file', 'ini_file', 'lineinfile', + 'replace', 'synchronize', 'template', 'unarchive'] + + def is_invalid_permission(self, mode): + # sensible file permission modes don't + # have write bit set when read bit is + # not set and don't have execute bit set + # when user execute bit is not set. + # also, user permissions are more generous than + # group permissions and user and group permissions + # are more generous than world permissions + + other_write_without_read = (mode % 8 and mode % 8 < 4 and + not (mode % 8 == 1 and (mode >> 6) % 2 == 1)) + group_write_without_read = ((mode >> 3) % 8 and (mode >> 3) % 8 < 4 and + not ((mode >> 3) % 8 == 1 and (mode >> 6) % 2 == 1)) + user_write_without_read = ((mode >> 6) % 8 and (mode >> 6) % 8 < 4 and + not (mode >> 6) % 8 == 1) + other_more_generous_than_group = mode % 8 > (mode >> 3) % 8 + other_more_generous_than_user = mode % 8 > (mode >> 6) % 8 + group_more_generous_than_user = (mode >> 3) % 8 > (mode >> 6) % 8 + + return (other_write_without_read or + group_write_without_read or + user_write_without_read or + other_more_generous_than_group or + other_more_generous_than_user or + group_more_generous_than_user) + + def matchtask(self, file, task): + if task["action"]["__ansible_module__"] in self._modules: + mode = task['action'].get('mode', None) + + if isinstance(mode, str): + return False + + if isinstance(mode, int): + return self.is_invalid_permission(mode) diff --git a/lib/ansiblelint/rules/PackageIsNotLatestRule.py b/lib/ansiblelint/rules/PackageIsNotLatestRule.py new file mode 100644 index 0000000..9fddaf4 --- /dev/null +++ b/lib/ansiblelint/rules/PackageIsNotLatestRule.py @@ -0,0 +1,67 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class PackageIsNotLatestRule(AnsibleLintRule): + id = '403' + shortdesc = 'Package installs should not use latest' + description = ( + 'Package installs should use ``state=present`` ' + 'with or without a version' + ) + severity = 'VERY_LOW' + tags = ['module', 'repeatability', 'ANSIBLE0010'] + version_added = 'historic' + + _package_managers = [ + 'apk', + 'apt', + 'bower', + 'bundler', + 'dnf', + 'easy_install', + 'gem', + 'homebrew', + 'jenkins_plugin', + 'npm', + 'openbsd_package', + 'openbsd_pkg', + 'package', + 'pacman', + 'pear', + 'pip', + 'pkg5', + 'pkgutil', + 'portage', + 'slackpkg', + 'sorcery', + 'swdepot', + 'win_chocolatey', + 'yarn', + 'yum', + 'zypper', + ] + + def matchtask(self, file, task): + return (task['action']['__ansible_module__'] in self._package_managers and + not task['action'].get('version') and + task['action'].get('state') == 'latest') diff --git a/lib/ansiblelint/rules/PlaybookExtension.py b/lib/ansiblelint/rules/PlaybookExtension.py new file mode 100644 index 0000000..593e5ae --- /dev/null +++ b/lib/ansiblelint/rules/PlaybookExtension.py @@ -0,0 +1,28 @@ +# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp> +# Copyright (c) 2018, Ansible Project + +import os +from typing import List + +from ansiblelint.rules import AnsibleLintRule + + +class PlaybookExtension(AnsibleLintRule): + id = '205' + shortdesc = 'Use ".yml" or ".yaml" playbook extension' + description = 'Playbooks should have the ".yml" or ".yaml" extension' + severity = 'MEDIUM' + tags = ['formatting'] + done = [] # type: List # already noticed path list + version_added = 'v4.0.0' + + def match(self, file, text): + if file['type'] != 'playbook': + return False + + path = file['path'] + ext = os.path.splitext(path) + if ext[1] not in ['.yml', '.yaml'] and path not in self.done: + self.done.append(path) + return True + return False diff --git a/lib/ansiblelint/rules/RoleNames.py b/lib/ansiblelint/rules/RoleNames.py new file mode 100644 index 0000000..3d790b3 --- /dev/null +++ b/lib/ansiblelint/rules/RoleNames.py @@ -0,0 +1,74 @@ +# Copyright (c) 2020 Gael Chamoulaud <gchamoul@redhat.com> +# Copyright (c) 2020 Sorin Sbarnea <ssbarnea@redhat.com> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import re +from pathlib import Path +from typing import List + +from ansiblelint.rules import AnsibleLintRule +from ansiblelint.utils import parse_yaml_from_file + +ROLE_NAME_REGEX = '^[a-z][a-z0-9_]+$' + + +def _remove_prefix(text, prefix): + return re.sub(r'^{0}'.format(re.escape(prefix)), '', text) + + +class RoleNames(AnsibleLintRule): + id = '106' + shortdesc = ( + "Role name {} does not match ``%s`` pattern" % ROLE_NAME_REGEX + ) + description = ( + "Role names are now limited to contain only lowercase alphanumeric " + "characters, plus '_' and start with an alpha character. See " + "`developing collections <https://docs.ansible.com/ansible/devel/dev_guide/developing_" + "collections.html#roles-directory>`_" + ) + severity = 'HIGH' + done: List[str] = [] # already noticed roles list + tags = ['deprecated'] + version_added = 'v4.3.0' + + ROLE_NAME_REGEXP = re.compile(ROLE_NAME_REGEX) + + def match(self, file, text): + path = file['path'].split("/") + if "tasks" in path: + role_name = _remove_prefix(path[path.index("tasks") - 1], "ansible-role-") + role_root = path[:path.index("tasks")] + meta = Path("/".join(role_root)) / "meta" / "main.yml" + + if meta.is_file(): + meta_data = parse_yaml_from_file(str(meta)) + if meta_data: + try: + role_name = meta_data['galaxy_info']['role_name'] + except KeyError: + pass + + if role_name in self.done: + return False + self.done.append(role_name) + if not re.match(self.ROLE_NAME_REGEXP, role_name): + return self.shortdesc.format(role_name) + return False diff --git a/lib/ansiblelint/rules/RoleRelativePath.py b/lib/ansiblelint/rules/RoleRelativePath.py new file mode 100644 index 0000000..87d7ac8 --- /dev/null +++ b/lib/ansiblelint/rules/RoleRelativePath.py @@ -0,0 +1,32 @@ +# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp> +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class RoleRelativePath(AnsibleLintRule): + id = '404' + shortdesc = "Doesn't need a relative path in role" + description = '``copy`` and ``template`` do not need to use relative path for ``src``' + severity = 'HIGH' + tags = ['module'] + version_added = 'v4.0.0' + + _module_to_path_folder = { + 'copy': 'files', + 'win_copy': 'files', + 'template': 'templates', + 'win_template': 'win_templates', + } + + def matchtask(self, file, task): + module = task['action']['__ansible_module__'] + if module not in self._module_to_path_folder: + return False + + if 'src' not in task['action']: + return False + + path_to_check = '../{}'.format(self._module_to_path_folder[module]) + if path_to_check in task['action']['src']: + return True diff --git a/lib/ansiblelint/rules/ShellWithoutPipefail.py b/lib/ansiblelint/rules/ShellWithoutPipefail.py new file mode 100644 index 0000000..678e5a2 --- /dev/null +++ b/lib/ansiblelint/rules/ShellWithoutPipefail.py @@ -0,0 +1,38 @@ +import re + +from ansiblelint.rules import AnsibleLintRule + + +class ShellWithoutPipefail(AnsibleLintRule): + id = '306' + shortdesc = 'Shells that use pipes should set the pipefail option' + description = ( + 'Without the pipefail option set, a shell command that ' + 'implements a pipeline can fail and still return 0. If ' + 'any part of the pipeline other than the terminal command ' + 'fails, the whole pipeline will still return 0, which may ' + 'be considered a success by Ansible. ' + 'Pipefail is available in the bash shell.' + ) + severity = 'MEDIUM' + tags = ['command-shell'] + version_added = 'v4.1.0' + + _pipefail_re = re.compile(r"^\s*set.*[+-][A-z]*o\s*pipefail") + _pipe_re = re.compile(r"(?<!\|)\|(?!\|)") + + def matchtask(self, file, task): + if task["__ansible_action_type__"] != "task": + return False + + if task["action"]["__ansible_module__"] != "shell": + return False + + if task.get("ignore_errors"): + return False + + unjinjad_cmd = self.unjinja( + ' '.join(task["action"].get("__ansible_arguments__", []))) + + return (self._pipe_re.search(unjinjad_cmd) and + not self._pipefail_re.match(unjinjad_cmd)) diff --git a/lib/ansiblelint/rules/SudoRule.py b/lib/ansiblelint/rules/SudoRule.py new file mode 100644 index 0000000..8ea554e --- /dev/null +++ b/lib/ansiblelint/rules/SudoRule.py @@ -0,0 +1,36 @@ +from ansiblelint.rules import AnsibleLintRule + + +class SudoRule(AnsibleLintRule): + id = '103' + shortdesc = 'Deprecated sudo' + description = 'Instead of ``sudo``/``sudo_user``, use ``become``/``become_user``.' + severity = 'VERY_HIGH' + tags = ['deprecated', 'ANSIBLE0008'] + version_added = 'historic' + + def _check_value(self, play_frag): + results = [] + + if isinstance(play_frag, dict): + if 'sudo' in play_frag: + results.append(({'sudo': play_frag['sudo']}, + 'Deprecated sudo feature', play_frag['__line__'])) + if 'sudo_user' in play_frag: + results.append(({'sudo_user': play_frag['sudo_user']}, + 'Deprecated sudo_user feature', play_frag['__line__'])) + if 'tasks' in play_frag: + output = self._check_value(play_frag['tasks']) + if output: + results += output + + if isinstance(play_frag, list): + for item in play_frag: + output = self._check_value(item) + if output: + results += output + + return results + + def matchplay(self, file, play): + return self._check_value(play) diff --git a/lib/ansiblelint/rules/TaskHasNameRule.py b/lib/ansiblelint/rules/TaskHasNameRule.py new file mode 100644 index 0000000..8757b03 --- /dev/null +++ b/lib/ansiblelint/rules/TaskHasNameRule.py @@ -0,0 +1,40 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class TaskHasNameRule(AnsibleLintRule): + id = '502' + shortdesc = 'All tasks should be named' + description = ( + 'All tasks should have a distinct name for readability ' + 'and for ``--start-at-task`` to work' + ) + severity = 'MEDIUM' + tags = ['task', 'readability', 'ANSIBLE0011'] + version_added = 'historic' + + _nameless_tasks = ['meta', 'debug', 'include_role', 'import_role', + 'include_tasks', 'import_tasks'] + + def matchtask(self, file, task): + return (not task.get('name') and + task["action"]["__ansible_module__"] not in self._nameless_tasks) diff --git a/lib/ansiblelint/rules/TaskNoLocalAction.py b/lib/ansiblelint/rules/TaskNoLocalAction.py new file mode 100644 index 0000000..294bb9d --- /dev/null +++ b/lib/ansiblelint/rules/TaskNoLocalAction.py @@ -0,0 +1,18 @@ +# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp> +# Copyright (c) 2018, Ansible Project + +from ansiblelint.rules import AnsibleLintRule + + +class TaskNoLocalAction(AnsibleLintRule): + id = '504' + shortdesc = "Do not use 'local_action', use 'delegate_to: localhost'" + description = 'Do not use ``local_action``, use ``delegate_to: localhost``' + severity = 'MEDIUM' + tags = ['task'] + version_added = 'v4.0.0' + + def match(self, file, text): + if 'local_action' in text: + return True + return False diff --git a/lib/ansiblelint/rules/TrailingWhitespaceRule.py b/lib/ansiblelint/rules/TrailingWhitespaceRule.py new file mode 100644 index 0000000..ac0f1c2 --- /dev/null +++ b/lib/ansiblelint/rules/TrailingWhitespaceRule.py @@ -0,0 +1,34 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class TrailingWhitespaceRule(AnsibleLintRule): + id = '201' + shortdesc = 'Trailing whitespace' + description = 'There should not be any trailing whitespace' + severity = 'INFO' + tags = ['formatting', 'ANSIBLE0002'] + version_added = 'historic' + + def match(self, file, line): + line = line.replace("\r", "") + return line.rstrip() != line diff --git a/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py b/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py new file mode 100644 index 0000000..48babcf --- /dev/null +++ b/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py @@ -0,0 +1,45 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +class UseCommandInsteadOfShellRule(AnsibleLintRule): + id = '305' + shortdesc = 'Use shell only when shell functionality is required' + description = ( + 'Shell should only be used when piping, redirecting ' + 'or chaining commands (and Ansible would be preferred ' + 'for some of those!)' + ) + severity = 'HIGH' + tags = ['command-shell', 'safety', 'ANSIBLE0013'] + version_added = 'historic' + + def matchtask(self, file, task): + # Use unjinja so that we don't match on jinja filters + # rather than pipes + if task["action"]["__ansible_module__"] == 'shell': + if 'cmd' in task['action']: + unjinjad_cmd = self.unjinja(task["action"].get("cmd", [])) + else: + unjinjad_cmd = self.unjinja( + ' '.join(task["action"].get("__ansible_arguments__", []))) + return not any([ch in unjinjad_cmd for ch in '&|<>;$\n*[]{}?`']) diff --git a/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py b/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py new file mode 100644 index 0000000..53b389d --- /dev/null +++ b/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py @@ -0,0 +1,52 @@ +# Copyright (c) 2016 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from ansiblelint.rules import AnsibleLintRule + + +def _changed_in_when(item): + if not isinstance(item, str): + return False + return any(changed in item for changed in + ['.changed', '|changed', '["changed"]', "['changed']"]) + + +class UseHandlerRatherThanWhenChangedRule(AnsibleLintRule): + id = '503' + shortdesc = 'Tasks that run when changed should likely be handlers' + description = ( + 'If a task has a ``when: result.changed`` setting, it is effectively ' + 'acting as a handler' + ) + severity = 'MEDIUM' + tags = ['task', 'behaviour', 'ANSIBLE0016'] + version_added = 'historic' + + def matchtask(self, file, task): + if task["__ansible_action_type__"] != 'task': + return False + + when = task.get('when') + + if isinstance(when, list): + for item in when: + return _changed_in_when(item) + else: + return _changed_in_when(when) diff --git a/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py b/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py new file mode 100644 index 0000000..a0721ac --- /dev/null +++ b/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py @@ -0,0 +1,75 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import os +import re + +from ansiblelint.rules import AnsibleLintRule + + +class UsingBareVariablesIsDeprecatedRule(AnsibleLintRule): + id = '104' + shortdesc = 'Using bare variables is deprecated' + description = ( + 'Using bare variables is deprecated. Update your ' + 'playbooks so that the environment value uses the full variable ' + 'syntax ``{{ your_variable }}``' + ) + severity = 'VERY_HIGH' + tags = ['deprecated', 'formatting', 'ANSIBLE0015'] + version_added = 'historic' + + _jinja = re.compile(r"{{.*}}", re.DOTALL) + _glob = re.compile('[][*?]') + + def matchtask(self, file, task): + loop_type = next((key for key in task + if key.startswith("with_")), None) + if loop_type: + if loop_type in ["with_nested", "with_together", "with_flattened", "with_filetree"]: + # These loops can either take a list defined directly in the task + # or a variable that is a list itself. When a single variable is used + # we just need to check that one variable, and not iterate over it like + # it's a list. Otherwise, loop through and check all items. + items = task[loop_type] + if not isinstance(items, (list, tuple)): + items = [items] + for var in items: + return self._matchvar(var, task, loop_type) + elif loop_type == "with_subelements": + return self._matchvar(task[loop_type][0], task, loop_type) + elif loop_type in ["with_sequence", "with_ini", + "with_inventory_hostnames"]: + pass + else: + return self._matchvar(task[loop_type], task, loop_type) + + def _matchvar(self, varstring, task, loop_type): + if (isinstance(varstring, str) and + not self._jinja.match(varstring)): + valid = loop_type == 'with_fileglob' and bool(self._jinja.search(varstring) or + self._glob.search(varstring)) + + valid |= loop_type == 'with_filetree' and bool(self._jinja.search(varstring) or + varstring.endswith(os.sep)) + if not valid: + message = "Found a bare variable '{0}' used in a '{1}' loop." + \ + " You should use the full variable syntax ('{{{{ {0} }}}}')" + return message.format(task[loop_type], loop_type) diff --git a/lib/ansiblelint/rules/VariableHasSpacesRule.py b/lib/ansiblelint/rules/VariableHasSpacesRule.py new file mode 100644 index 0000000..dd4f441 --- /dev/null +++ b/lib/ansiblelint/rules/VariableHasSpacesRule.py @@ -0,0 +1,24 @@ +# Copyright (c) 2016, Will Thames and contributors +# Copyright (c) 2018, Ansible Project + +import re + +from ansiblelint.rules import AnsibleLintRule + + +class VariableHasSpacesRule(AnsibleLintRule): + id = '206' + shortdesc = 'Variables should have spaces before and after: {{ var_name }}' + description = 'Variables should have spaces before and after: ``{{ var_name }}``' + severity = 'LOW' + tags = ['formatting'] + version_added = 'v4.0.0' + + variable_syntax = re.compile(r"{{.*}}") + bracket_regex = re.compile(r"{{[^{' -]|[^ '}-]}}") + + def match(self, file, line): + if not self.variable_syntax.search(line): + return + line_exclude_json = re.sub(r"[^{]{'\w+': ?[^{]{.*?}}", "", line) + return self.bracket_regex.search(line_exclude_json) diff --git a/lib/ansiblelint/rules/__init__.py b/lib/ansiblelint/rules/__init__.py new file mode 100644 index 0000000..fd3e92d --- /dev/null +++ b/lib/ansiblelint/rules/__init__.py @@ -0,0 +1,254 @@ +"""All internal ansible-lint rules.""" +import glob +import importlib.util +import logging +import os +import re +from collections import defaultdict +from importlib.abc import Loader +from time import sleep +from typing import List + +import ansiblelint.utils +from ansiblelint.errors import MatchError +from ansiblelint.skip_utils import append_skipped_rules, get_rule_skips_from_line + +_logger = logging.getLogger(__name__) + + +class AnsibleLintRule(object): + + def __repr__(self) -> str: + """Return a AnsibleLintRule instance representation.""" + return self.id + ": " + self.shortdesc + + def verbose(self) -> str: + return self.id + ": " + self.shortdesc + "\n " + self.description + + id: str = "" + tags: List[str] = [] + shortdesc: str = "" + description: str = "" + version_added: str = "" + severity: str = "" + match = None + matchtask = None + matchplay = None + + @staticmethod + def unjinja(text): + text = re.sub(r"{{.+?}}", "JINJA_EXPRESSION", text) + text = re.sub(r"{%.+?%}", "JINJA_STATEMENT", text) + text = re.sub(r"{#.+?#}", "JINJA_COMMENT", text) + return text + + def matchlines(self, file, text) -> List[MatchError]: + matches: List[MatchError] = [] + if not self.match: + return matches + # arrays are 0-based, line numbers are 1-based + # so use prev_line_no as the counter + for (prev_line_no, line) in enumerate(text.split("\n")): + if line.lstrip().startswith('#'): + continue + + rule_id_list = get_rule_skips_from_line(line) + if self.id in rule_id_list: + continue + + result = self.match(file, line) + if not result: + continue + message = None + if isinstance(result, str): + message = result + m = MatchError( + message=message, + linenumber=prev_line_no + 1, + details=line, + filename=file['path'], + rule=self) + matches.append(m) + return matches + + # TODO(ssbarnea): Reduce mccabe complexity + # https://github.com/ansible/ansible-lint/issues/744 + def matchtasks(self, file: str, text: str) -> List[MatchError]: # noqa: C901 + matches: List[MatchError] = [] + if not self.matchtask: + return matches + + if file['type'] == 'meta': + return matches + + yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path']) + if not yaml: + return matches + + yaml = append_skipped_rules(yaml, text, file['type']) + + try: + tasks = ansiblelint.utils.get_normalized_tasks(yaml, file) + except MatchError as e: + return [e] + + for task in tasks: + if self.id in task.get('skipped_rules', ()): + continue + + if 'action' not in task: + continue + result = self.matchtask(file, task) + if not result: + continue + + message = None + if isinstance(result, str): + message = result + task_msg = "Task/Handler: " + ansiblelint.utils.task_to_str(task) + m = MatchError( + message=message, + linenumber=task[ansiblelint.utils.LINE_NUMBER_KEY], + details=task_msg, + filename=file['path'], + rule=self) + matches.append(m) + return matches + + @staticmethod + def _matchplay_linenumber(play, optional_linenumber): + try: + linenumber, = optional_linenumber + except ValueError: + linenumber = play[ansiblelint.utils.LINE_NUMBER_KEY] + return linenumber + + def matchyaml(self, file: str, text: str) -> List[MatchError]: + matches: List[MatchError] = [] + if not self.matchplay: + return matches + + yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path']) + if not yaml: + return matches + + if isinstance(yaml, dict): + yaml = [yaml] + + yaml = ansiblelint.skip_utils.append_skipped_rules(yaml, text, file['type']) + + for play in yaml: + if self.id in play.get('skipped_rules', ()): + continue + + result = self.matchplay(file, play) + if not result: + continue + + if isinstance(result, tuple): + result = [result] + + if not isinstance(result, list): + raise TypeError("{} is not a list".format(result)) + + for section, message, *optional_linenumber in result: + linenumber = self._matchplay_linenumber(play, optional_linenumber) + m = MatchError( + message=message, + linenumber=linenumber, + details=str(section), + filename=file['path'], + rule=self) + matches.append(m) + return matches + + +def load_plugins(directory: str) -> List[AnsibleLintRule]: + """Return a list of rule classes.""" + result = [] + + for pluginfile in glob.glob(os.path.join(directory, '[A-Za-z]*.py')): + + pluginname = os.path.basename(pluginfile.replace('.py', '')) + spec = importlib.util.spec_from_file_location(pluginname, pluginfile) + # https://github.com/python/typeshed/issues/2793 + if spec and isinstance(spec.loader, Loader): + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + obj = getattr(module, pluginname)() + result.append(obj) + return result + + +class RulesCollection(object): + + def __init__(self, rulesdirs=None) -> None: + """Initialize a RulesCollection instance.""" + if rulesdirs is None: + rulesdirs = [] + self.rulesdirs = ansiblelint.utils.expand_paths_vars(rulesdirs) + self.rules: List[AnsibleLintRule] = [] + for rulesdir in self.rulesdirs: + _logger.debug("Loading rules from %s", rulesdir) + self.extend(load_plugins(rulesdir)) + self.rules = sorted(self.rules, key=lambda r: r.id) + + def register(self, obj: AnsibleLintRule): + self.rules.append(obj) + + def __iter__(self): + """Return the iterator over the rules in the RulesCollection.""" + return iter(self.rules) + + def __len__(self): + """Return the length of the RulesCollection data.""" + return len(self.rules) + + def extend(self, more: List[AnsibleLintRule]) -> None: + self.rules.extend(more) + + def run(self, playbookfile, tags=set(), skip_list=frozenset()) -> List: + text = "" + matches: List = list() + + for i in range(3): + try: + with open(playbookfile['path'], mode='r', encoding='utf-8') as f: + text = f.read() + break + except IOError as e: + _logger.warning( + "Couldn't open %s - %s [try:%s]", + playbookfile['path'], + e.strerror, + i) + sleep(1) + continue + if i and not text: + return matches + + for rule in self.rules: + if not tags or not set(rule.tags).union([rule.id]).isdisjoint(tags): + rule_definition = set(rule.tags) + rule_definition.add(rule.id) + if set(rule_definition).isdisjoint(skip_list): + matches.extend(rule.matchlines(playbookfile, text)) + matches.extend(rule.matchtasks(playbookfile, text)) + matches.extend(rule.matchyaml(playbookfile, text)) + + return matches + + def __repr__(self) -> str: + """Return a RulesCollection instance representation.""" + return "\n".join([rule.verbose() + for rule in sorted(self.rules, key=lambda x: x.id)]) + + def listtags(self) -> str: + tags = defaultdict(list) + for rule in self.rules: + for tag in rule.tags: + tags[tag].append("[{0}]".format(rule.id)) + results = [] + for tag in sorted(tags): + results.append("{0} {1}".format(tag, tags[tag])) + return "\n".join(results) diff --git a/lib/ansiblelint/rules/custom/__init__.py b/lib/ansiblelint/rules/custom/__init__.py new file mode 100644 index 0000000..8c3e048 --- /dev/null +++ b/lib/ansiblelint/rules/custom/__init__.py @@ -0,0 +1 @@ +"""A placeholder package for putting custom rules under this dir.""" diff --git a/lib/ansiblelint/runner.py b/lib/ansiblelint/runner.py new file mode 100644 index 0000000..f73945f --- /dev/null +++ b/lib/ansiblelint/runner.py @@ -0,0 +1,111 @@ +"""Runner implementation.""" +import logging +import os +from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set + +import ansiblelint.file_utils +import ansiblelint.skip_utils +import ansiblelint.utils +from ansiblelint.errors import MatchError +from ansiblelint.rules.LoadingFailureRule import LoadingFailureRule + +if TYPE_CHECKING: + from ansiblelint.rules import RulesCollection + + +_logger = logging.getLogger(__name__) + + +class Runner(object): + """Runner class performs the linting process.""" + + def __init__( + self, + rules: "RulesCollection", + playbook: str, + tags: FrozenSet[Any] = frozenset(), + skip_list: Optional[FrozenSet[Any]] = frozenset(), + exclude_paths: List[str] = [], + verbosity: int = 0, + checked_files: Set[str] = None) -> None: + """Initialize a Runner instance.""" + self.rules = rules + self.playbooks = set() + # assume role if directory + if os.path.isdir(playbook): + self.playbooks.add((os.path.join(playbook, ''), 'role')) + self.playbook_dir = playbook + else: + self.playbooks.add((playbook, 'playbook')) + self.playbook_dir = os.path.dirname(playbook) + self.tags = tags + self.skip_list = skip_list + self._update_exclude_paths(exclude_paths) + self.verbosity = verbosity + if checked_files is None: + checked_files = set() + self.checked_files = checked_files + + def _update_exclude_paths(self, exclude_paths: List[str]) -> None: + if exclude_paths: + # These will be (potentially) relative paths + paths = ansiblelint.utils.expand_paths_vars(exclude_paths) + # Since ansiblelint.utils.find_children returns absolute paths, + # and the list of files we create in `Runner.run` can contain both + # relative and absolute paths, we need to cover both bases. + self.exclude_paths = paths + [os.path.abspath(p) for p in paths] + else: + self.exclude_paths = [] + + def is_excluded(self, file_path: str) -> bool: + """Verify if a file path should be excluded.""" + # Any will short-circuit as soon as something returns True, but will + # be poor performance for the case where the path under question is + # not excluded. + return any(file_path.startswith(path) for path in self.exclude_paths) + + def run(self) -> List[MatchError]: + """Execute the linting process.""" + files = list() + for playbook in self.playbooks: + if self.is_excluded(playbook[0]) or playbook[1] == 'role': + continue + files.append({'path': ansiblelint.file_utils.normpath(playbook[0]), + 'type': playbook[1], + # add an absolute path here, so rules are able to validate if + # referenced files exist + 'absolute_directory': os.path.dirname(playbook[0])}) + matches = set(self._emit_matches(files)) + + # remove duplicates from files list + files = [value for n, value in enumerate(files) if value not in files[:n]] + + # remove files that have already been checked + files = [x for x in files if x['path'] not in self.checked_files] + for file in files: + _logger.debug( + "Examining %s of type %s", + ansiblelint.file_utils.normpath(file['path']), + file['type']) + matches = matches.union( + self.rules.run(file, tags=set(self.tags), + skip_list=self.skip_list)) + # update list of checked files + self.checked_files.update([x['path'] for x in files]) + + return sorted(matches) + + def _emit_matches(self, files: List) -> Generator[MatchError, None, None]: + visited: Set = set() + while visited != self.playbooks: + for arg in self.playbooks - visited: + try: + for child in ansiblelint.utils.find_children(arg, self.playbook_dir): + if self.is_excluded(child['path']): + continue + self.playbooks.add((child['path'], child['type'])) + files.append(child) + except MatchError as e: + e.rule = LoadingFailureRule + yield e + visited.add(arg) diff --git a/lib/ansiblelint/skip_utils.py b/lib/ansiblelint/skip_utils.py new file mode 100644 index 0000000..c3c0a88 --- /dev/null +++ b/lib/ansiblelint/skip_utils.py @@ -0,0 +1,189 @@ +# (c) 2019–2020, Ansible by Red Hat +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Utils related to inline skipping of rules.""" +import logging +from functools import lru_cache +from itertools import product +from typing import Any, Generator, List, Sequence + +import ruamel.yaml + +from ansiblelint.constants import FileType + +INLINE_SKIP_FLAG = '# noqa ' + +_logger = logging.getLogger(__name__) + + +# playbook: Sequence currently expects only instances of one of the two +# classes below but we should consider avoiding this chimera. +# ruamel.yaml.comments.CommentedSeq +# ansible.parsing.yaml.objects.AnsibleSequence + + +def get_rule_skips_from_line(line: str) -> List: + """Return list of rule ids skipped via comment on the line of yaml.""" + _before_noqa, _noqa_marker, noqa_text = line.partition(INLINE_SKIP_FLAG) + return noqa_text.split() + + +def append_skipped_rules(pyyaml_data: str, file_text: str, file_type: FileType) -> Sequence: + """Append 'skipped_rules' to individual tasks or single metadata block. + + For a file, uses 2nd parser (ruamel.yaml) to pull comments out of + yaml subsets, check for '# noqa' skipped rules, and append any skips to the + original parser (pyyaml) data relied on by remainder of ansible-lint. + + :param pyyaml_data: file text parsed via ansible and pyyaml. + :param file_text: raw file text. + :param file_type: type of file: tasks, handlers or meta. + :returns: original pyyaml_data altered with a 'skipped_rules' list added + to individual tasks, or added to the single metadata block. + """ + try: + yaml_skip = _append_skipped_rules(pyyaml_data, file_text, file_type) + except RuntimeError: + # Notify user of skip error, do not stop, do not change exit code + _logger.error('Error trying to append skipped rules', exc_info=True) + return pyyaml_data + return yaml_skip + + +@lru_cache(maxsize=128) +def load_data(file_text: str) -> Any: + """Parse `file_text` as yaml and return parsed structure. + + This is the main culprit for slow performance, each rule asks for loading yaml again and again + ideally the `maxsize` on the decorator above MUST be great or equal total number of rules + :param file_text: raw text to parse + :return: Parsed yaml + """ + yaml = ruamel.yaml.YAML() + return yaml.load(file_text) + + +def _append_skipped_rules(pyyaml_data: Sequence, file_text: str, file_type: FileType) -> Sequence: + # parse file text using 2nd parser library + ruamel_data = load_data(file_text) + + if file_type == 'meta': + pyyaml_data[0]['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_data) + return pyyaml_data + + # create list of blocks of tasks or nested tasks + if file_type in ('tasks', 'handlers'): + ruamel_task_blocks = ruamel_data + pyyaml_task_blocks = pyyaml_data + elif file_type in ('playbook', 'pre_tasks', 'post_tasks'): + try: + pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data) + ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data) + except (AttributeError, TypeError): + # TODO(awcrosby): running ansible-lint on any .yml file will + # assume it is a playbook, check needs to be added higher in the + # call stack, and can remove this except + return pyyaml_data + else: + raise RuntimeError('Unexpected file type: {}'.format(file_type)) + + # get tasks from blocks of tasks + pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks) + ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks) + + # append skipped_rules for each task + for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks): + + # ignore empty tasks + if not pyyaml_task and not ruamel_task: + continue + + if pyyaml_task.get('name') != ruamel_task.get('name'): + raise RuntimeError('Error in matching skip comment to a task') + pyyaml_task['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_task) + + return pyyaml_data + + +def _get_task_blocks_from_playbook(playbook: Sequence) -> List: + """Return parts of playbook that contains tasks, and nested tasks. + + :param playbook: playbook yaml from yaml parser. + :returns: list of task dictionaries. + """ + PLAYBOOK_TASK_KEYWORDS = [ + 'tasks', + 'pre_tasks', + 'post_tasks', + 'handlers', + ] + + task_blocks = [] + for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS): + task_blocks.extend(play.get(key, [])) + return task_blocks + + +def _get_tasks_from_blocks(task_blocks: Sequence) -> Generator: + """Get list of tasks from list made of tasks and nested tasks.""" + NESTED_TASK_KEYS = [ + 'block', + 'always', + 'rescue', + ] + + def get_nested_tasks(task: Any) -> Generator[Any, None, None]: + return ( + subtask + for k in NESTED_TASK_KEYS if task and k in task + for subtask in task[k] + ) + + for task in task_blocks: + for sub_task in get_nested_tasks(task): + yield sub_task + yield task + + +def _get_rule_skips_from_yaml(yaml_input: Sequence) -> Sequence: + """Traverse yaml for comments with rule skips and return list of rules.""" + yaml_comment_obj_strs = [] + + def traverse_yaml(obj: Any) -> None: + yaml_comment_obj_strs.append(str(obj.ca.items)) + if isinstance(obj, dict): + for key, val in obj.items(): + if isinstance(val, (dict, list)): + traverse_yaml(val) + elif isinstance(obj, list): + for e in obj: + if isinstance(e, (dict, list)): + traverse_yaml(e) + else: + return + + traverse_yaml(yaml_input) + + rule_id_list = [] + for comment_obj_str in yaml_comment_obj_strs: + for line in comment_obj_str.split(r'\n'): + rule_id_list.extend(get_rule_skips_from_line(line)) + + return rule_id_list diff --git a/lib/ansiblelint/testing/__init__.py b/lib/ansiblelint/testing/__init__.py new file mode 100644 index 0000000..1ed686f --- /dev/null +++ b/lib/ansiblelint/testing/__init__.py @@ -0,0 +1,84 @@ +"""Test utils for ansible-lint.""" + +import os +import shutil +import subprocess +import sys +import tempfile +from typing import TYPE_CHECKING, Dict, List + +from ansible import __version__ as ansible_version_str + +from ansiblelint.runner import Runner + +if TYPE_CHECKING: + from ansiblelint.errors import MatchError + + +ANSIBLE_MAJOR_VERSION = tuple(map(int, ansible_version_str.split('.')[:2])) + + +class RunFromText(object): + """Use Runner on temp files created from unittest text snippets.""" + + def __init__(self, collection): + """Initialize a RunFromText instance with rules collection.""" + self.collection = collection + + def _call_runner(self, path) -> List["MatchError"]: + runner = Runner(self.collection, path) + return runner.run() + + def run_playbook(self, playbook_text): + """Lints received text as a playbook.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", prefix="playbook") as fp: + fp.write(playbook_text) + fp.flush() + results = self._call_runner(fp.name) + return results + + def run_role_tasks_main(self, tasks_main_text): + """Lints received text as tasks.""" + role_path = tempfile.mkdtemp(prefix='role_') + tasks_path = os.path.join(role_path, 'tasks') + os.makedirs(tasks_path) + with open(os.path.join(tasks_path, 'main.yml'), 'w') as fp: + fp.write(tasks_main_text) + results = self._call_runner(role_path) + shutil.rmtree(role_path) + return results + + def run_role_meta_main(self, meta_main_text): + """Lints received text as meta.""" + role_path = tempfile.mkdtemp(prefix='role_') + meta_path = os.path.join(role_path, 'meta') + os.makedirs(meta_path) + with open(os.path.join(meta_path, 'main.yml'), 'w') as fp: + fp.write(meta_main_text) + results = self._call_runner(role_path) + shutil.rmtree(role_path) + return results + + +def run_ansible_lint( + *argv: str, + cwd: str = None, + bin: str = None, + env: Dict[str, str] = None) -> subprocess.CompletedProcess: + """Run ansible-lint on a given path and returns its output.""" + if not bin: + bin = sys.executable + args = [sys.executable, "-m", "ansiblelint", *argv] + else: + args = [bin, *argv] + + return subprocess.run( + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, # needed when command is a list + check=False, + cwd=cwd, + env=env, + universal_newlines=True + ) diff --git a/lib/ansiblelint/utils.py b/lib/ansiblelint/utils.py new file mode 100644 index 0000000..feac4d7 --- /dev/null +++ b/lib/ansiblelint/utils.py @@ -0,0 +1,836 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Generic utility helpers.""" + +import contextlib +import inspect +import logging +import os +import pprint +import subprocess +from argparse import Namespace +from collections import OrderedDict +from functools import lru_cache +from pathlib import Path +from typing import Callable, ItemsView, List, Optional, Tuple + +import yaml +from ansible import constants +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.parsing.dataloader import DataLoader +from ansible.parsing.mod_args import ModuleArgsParser +from ansible.parsing.splitter import split_args +from ansible.parsing.yaml.constructor import AnsibleConstructor +from ansible.parsing.yaml.loader import AnsibleLoader +from ansible.parsing.yaml.objects import AnsibleSequence +from ansible.plugins.loader import add_all_plugin_dirs +from ansible.template import Templar +from yaml.composer import Composer +from yaml.representer import RepresenterError + +from ansiblelint.constants import ( + ANSIBLE_FAILURE_RC, CUSTOM_RULESDIR_ENVVAR, DEFAULT_RULESDIR, FileType, +) +from ansiblelint.errors import MatchError +from ansiblelint.file_utils import normpath + +# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a +# string as the password to enable such yaml files to be opened and parsed +# successfully. +DEFAULT_VAULT_PASSWORD = 'x' + +PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None) + + +_logger = logging.getLogger(__name__) + + +def parse_yaml_from_file(filepath: str) -> dict: + dl = DataLoader() + if hasattr(dl, 'set_vault_password'): + dl.set_vault_password(DEFAULT_VAULT_PASSWORD) + return dl.load_from_file(filepath) + + +def path_dwim(basedir: str, given: str) -> str: + dl = DataLoader() + dl.set_basedir(basedir) + return dl.path_dwim(given) + + +def ansible_template(basedir, varname, templatevars, **kwargs): + dl = DataLoader() + dl.set_basedir(basedir) + templar = Templar(dl, variables=templatevars) + return templar.template(varname, **kwargs) + + +LINE_NUMBER_KEY = '__line__' +FILENAME_KEY = '__file__' + +VALID_KEYS = [ + 'name', 'action', 'when', 'async', 'poll', 'notify', + 'first_available_file', 'include', 'include_tasks', 'import_tasks', 'import_playbook', + 'tags', 'register', 'ignore_errors', 'delegate_to', + 'local_action', 'transport', 'remote_user', 'sudo', + 'sudo_user', 'sudo_pass', 'when', 'connection', 'environment', 'args', 'always_run', + 'any_errors_fatal', 'changed_when', 'failed_when', 'check_mode', 'delay', + 'retries', 'until', 'su', 'su_user', 'su_pass', 'no_log', 'run_once', + 'become', 'become_user', 'become_method', FILENAME_KEY, +] + +BLOCK_NAME_TO_ACTION_TYPE_MAP = { + 'tasks': 'task', + 'handlers': 'handler', + 'pre_tasks': 'task', + 'post_tasks': 'task', + 'block': 'meta', + 'rescue': 'meta', + 'always': 'meta', +} + + +def tokenize(line): + tokens = line.lstrip().split(" ") + if tokens[0] == '-': + tokens = tokens[1:] + if tokens[0] == 'action:' or tokens[0] == 'local_action:': + tokens = tokens[1:] + command = tokens[0].replace(":", "") + + args = list() + kwargs = dict() + nonkvfound = False + for arg in tokens[1:]: + if "=" in arg and not nonkvfound: + kv = arg.split("=", 1) + kwargs[kv[0]] = kv[1] + else: + nonkvfound = True + args.append(arg) + return (command, args, kwargs) + + +def _playbook_items(pb_data: dict) -> ItemsView: + if isinstance(pb_data, dict): + return pb_data.items() + elif not pb_data: + return [] + else: + return [item for play in pb_data for item in play.items()] + + +def _rebind_match_filename(filename: str, func) -> Callable: + def func_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except MatchError as e: + e.filename = filename + raise e + return func_wrapper + + +def _set_collections_basedir(basedir: str): + # Sets the playbook directory as playbook_paths for the collection loader + try: + # Ansible 2.10+ + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansible.utils.collection_loader import AnsibleCollectionConfig + + AnsibleCollectionConfig.playbook_paths = basedir + except ImportError: + # Ansible 2.8 or 2.9 + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansible.utils.collection_loader import set_collection_playbook_paths + + set_collection_playbook_paths(basedir) + + +def find_children(playbook: Tuple[str, str], playbook_dir: str) -> List: + if not os.path.exists(playbook[0]): + return [] + _set_collections_basedir(playbook_dir or '.') + add_all_plugin_dirs(playbook_dir or '.') + if playbook[1] == 'role': + playbook_ds = {'roles': [{'role': playbook[0]}]} + else: + try: + playbook_ds = parse_yaml_from_file(playbook[0]) + except AnsibleError as e: + raise SystemExit(str(e)) + results = [] + basedir = os.path.dirname(playbook[0]) + items = _playbook_items(playbook_ds) + for item in items: + for child in _rebind_match_filename(playbook[0], play_children)( + basedir, item, playbook[1], playbook_dir): + if "$" in child['path'] or "{{" in child['path']: + continue + valid_tokens = list() + for token in split_args(child['path']): + if '=' in token: + break + valid_tokens.append(token) + path = ' '.join(valid_tokens) + results.append({ + 'path': path_dwim(basedir, path), + 'type': child['type'] + }) + return results + + +def template(basedir, value, vars, fail_on_undefined=False, **kwargs): + try: + value = ansible_template(os.path.abspath(basedir), value, vars, + **dict(kwargs, fail_on_undefined=fail_on_undefined)) + # Hack to skip the following exception when using to_json filter on a variable. + # I guess the filter doesn't like empty vars... + except (AnsibleError, ValueError, RepresenterError): + # templating failed, so just keep value as is. + pass + return value + + +def play_children(basedir, item, parent_type, playbook_dir): + delegate_map = { + 'tasks': _taskshandlers_children, + 'pre_tasks': _taskshandlers_children, + 'post_tasks': _taskshandlers_children, + 'block': _taskshandlers_children, + 'include': _include_children, + 'import_playbook': _include_children, + 'roles': _roles_children, + 'dependencies': _roles_children, + 'handlers': _taskshandlers_children, + 'include_tasks': _include_children, + 'import_tasks': _include_children, + } + (k, v) = item + add_all_plugin_dirs(os.path.abspath(basedir)) + + if k in delegate_map: + if v: + v = template(os.path.abspath(basedir), + v, + dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)), + fail_on_undefined=False) + return delegate_map[k](basedir, k, v, parent_type) + return [] + + +def _include_children(basedir, k, v, parent_type): + # handle special case include_tasks: name=filename.yml + if k == 'include_tasks' and isinstance(v, dict) and 'file' in v: + v = v['file'] + + # handle include: filename.yml tags=blah + (command, args, kwargs) = tokenize("{0}: {1}".format(k, v)) + + result = path_dwim(basedir, args[0]) + if not os.path.exists(result): + result = path_dwim(os.path.join(os.path.dirname(basedir)), v) + return [{'path': result, 'type': parent_type}] + + +def _taskshandlers_children(basedir, k, v, parent_type: FileType) -> List: + results = [] + for th in v: + + # ignore empty tasks, `-` + if not th: + continue + + with contextlib.suppress(LookupError): + children = _get_task_handler_children_for_tasks_or_playbooks( + th, basedir, k, parent_type, + ) + results.append(children) + continue + + if 'include_role' in th or 'import_role' in th: # lgtm [py/unreachable-statement] + th = normalize_task_v2(th) + _validate_task_handler_action_for_role(th['action']) + results.extend(_roles_children(basedir, k, [th['action'].get("name")], + parent_type, + main=th['action'].get('tasks_from', 'main'))) + continue + + if 'block' not in th: + continue + + results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type)) + if 'rescue' in th: + results.extend(_taskshandlers_children(basedir, k, th['rescue'], parent_type)) + if 'always' in th: + results.extend(_taskshandlers_children(basedir, k, th['always'], parent_type)) + + return results + + +def _get_task_handler_children_for_tasks_or_playbooks( + task_handler, basedir: str, k, parent_type: FileType, +) -> dict: + """Try to get children of taskhandler for include/import tasks/playbooks.""" + child_type = k if parent_type == 'playbook' else parent_type + + task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks' + for task_handler_key in task_include_keys: + + with contextlib.suppress(KeyError): + + # ignore empty tasks + if not task_handler: + continue + + return { + 'path': path_dwim(basedir, task_handler[task_handler_key]), + 'type': child_type, + } + + raise LookupError( + f'The node contains none of: {", ".join(task_include_keys)}', + ) + + +def _validate_task_handler_action_for_role(th_action: dict) -> None: + """Verify that the task handler action is valid for role include.""" + module = th_action['__ansible_module__'] + + if 'name' not in th_action: + raise MatchError(f"Failed to find required 'name' key in {module!s}") + + if not isinstance(th_action['name'], str): + raise RuntimeError( + f"Value assigned to 'name' key on '{module!s}' is not a string.", + ) + + +def _roles_children(basedir: str, k, v, parent_type: FileType, main='main') -> list: + results = [] + for role in v: + if isinstance(role, dict): + if 'role' in role or 'name' in role: + if 'tags' not in role or 'skip_ansible_lint' not in role['tags']: + results.extend(_look_for_role_files(basedir, + role.get('role', role.get('name')), + main=main)) + elif k != 'dependencies': + raise SystemExit('role dict {0} does not contain a "role" ' + 'or "name" key'.format(role)) + else: + results.extend(_look_for_role_files(basedir, role, main=main)) + return results + + +def _rolepath(basedir: str, role: str) -> Optional[str]: + role_path = None + + possible_paths = [ + # if included from a playbook + path_dwim(basedir, os.path.join('roles', role)), + path_dwim(basedir, role), + # if included from roles/[role]/meta/main.yml + path_dwim( + basedir, os.path.join('..', '..', '..', 'roles', role) + ), + path_dwim(basedir, os.path.join('..', '..', role)), + ] + + if constants.DEFAULT_ROLES_PATH: + search_locations = constants.DEFAULT_ROLES_PATH + if isinstance(search_locations, str): + search_locations = search_locations.split(os.pathsep) + for loc in search_locations: + loc = os.path.expanduser(loc) + possible_paths.append(path_dwim(loc, role)) + + possible_paths.append(path_dwim(basedir, '')) + + for path_option in possible_paths: + if os.path.isdir(path_option): + role_path = path_option + break + + if role_path: + add_all_plugin_dirs(role_path) + + return role_path + + +def _look_for_role_files(basedir: str, role: str, main='main') -> list: + role_path = _rolepath(basedir, role) + if not role_path: + return [] + + results = [] + + for th in ['tasks', 'handlers', 'meta']: + current_path = os.path.join(role_path, th) + for dir, subdirs, files in os.walk(current_path): + for file in files: + file_ignorecase = file.lower() + if file_ignorecase.endswith(('.yml', '.yaml')): + thpath = os.path.join(dir, file) + results.append({'path': thpath, 'type': th}) + + return results + + +def rolename(filepath): + idx = filepath.find('roles/') + if idx < 0: + return '' + role = filepath[idx + 6:] + role = role[:role.find('/')] + return role + + +def _kv_to_dict(v): + (command, args, kwargs) = tokenize(v) + return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs) + + +def _sanitize_task(task: dict) -> dict: + """Return a stripped-off task structure compatible with new Ansible. + + This helper takes a copy of the incoming task and drops + any internally used keys from it. + """ + result = task.copy() + # task is an AnsibleMapping which inherits from OrderedDict, so we need + # to use `del` to remove unwanted keys. + for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]: + if k in result: + del result[k] + return result + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def normalize_task_v2(task: dict) -> dict: # noqa: C901 + """Ensure tasks have an action key and strings are converted to python objects.""" + result = dict() + if 'always_run' in task: + # FIXME(ssbarnea): Delayed import to avoid circular import + # See https://github.com/ansible/ansible-lint/issues/880 + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansiblelint.rules.AlwaysRunRule import AlwaysRunRule + + raise MatchError( + rule=AlwaysRunRule, + filename=task[FILENAME_KEY], + linenumber=task[LINE_NUMBER_KEY]) + + sanitized_task = _sanitize_task(task) + mod_arg_parser = ModuleArgsParser(sanitized_task) + try: + action, arguments, result['delegate_to'] = mod_arg_parser.parse() + except AnsibleParserError as e: + try: + task_info = "%s:%s" % (task[FILENAME_KEY], task[LINE_NUMBER_KEY]) + except KeyError: + task_info = "Unknown" + pp = pprint.PrettyPrinter(indent=2) + task_pprint = pp.pformat(sanitized_task) + + _logger.critical("Couldn't parse task at %s (%s)\n%s", task_info, e.message, task_pprint) + raise SystemExit(ANSIBLE_FAILURE_RC) + + # denormalize shell -> command conversion + if '_uses_shell' in arguments: + action = 'shell' + del arguments['_uses_shell'] + + for (k, v) in list(task.items()): + if k in ('action', 'local_action', 'args', 'delegate_to') or k == action: + # we don't want to re-assign these values, which were + # determined by the ModuleArgsParser() above + continue + else: + result[k] = v + + result['action'] = dict(__ansible_module__=action) + + if '_raw_params' in arguments: + result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ') + del arguments['_raw_params'] + else: + result['action']['__ansible_arguments__'] = list() + + if 'argv' in arguments and not result['action']['__ansible_arguments__']: + result['action']['__ansible_arguments__'] = arguments['argv'] + del arguments['argv'] + + result['action'].update(arguments) + return result + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def normalize_task_v1(task): # noqa: C901 + result = dict() + for (k, v) in task.items(): + if k in VALID_KEYS or k.startswith('with_'): + if k == 'local_action' or k == 'action': + if not isinstance(v, dict): + v = _kv_to_dict(v) + v['__ansible_arguments__'] = v.get('__ansible_arguments__', list()) + result['action'] = v + else: + result[k] = v + else: + if isinstance(v, str): + v = _kv_to_dict(k + ' ' + v) + elif not v: + v = dict(__ansible_module__=k) + else: + if isinstance(v, dict): + v.update(dict(__ansible_module__=k)) + else: + if k == '__line__': + # Keep the line number stored + result[k] = v + continue + + else: + # Tasks that include playbooks (rather than task files) + # can get here + # https://github.com/ansible/ansible-lint/issues/138 + raise RuntimeError("Was not expecting value %s of type %s for key %s\n" + "Task: %s. Check the syntax of your playbook using " + "ansible-playbook --syntax-check" % + (str(v), type(v), k, str(task))) + v['__ansible_arguments__'] = v.get('__ansible_arguments__', list()) + result['action'] = v + if 'module' in result['action']: + # this happens when a task uses + # local_action: + # module: ec2 + # etc... + result['action']['__ansible_module__'] = result['action']['module'] + del result['action']['module'] + if 'args' in result: + result['action'].update(result.get('args')) + del result['args'] + return result + + +def normalize_task(task, filename): + ansible_action_type = task.get('__ansible_action_type__', 'task') + if '__ansible_action_type__' in task: + del task['__ansible_action_type__'] + task = normalize_task_v2(task) + task[FILENAME_KEY] = filename + task['__ansible_action_type__'] = ansible_action_type + return task + + +def task_to_str(task): + name = task.get("name") + if name: + return name + action = task.get("action") + args = " ".join([u"{0}={1}".format(k, v) for (k, v) in action.items() + if k not in ["__ansible_module__", "__ansible_arguments__"]] + + action.get("__ansible_arguments__")) + return u"{0} {1}".format(action["__ansible_module__"], args) + + +def extract_from_list(blocks, candidates): + results = list() + for block in blocks: + for candidate in candidates: + if isinstance(block, dict) and candidate in block: + if isinstance(block[candidate], list): + results.extend(add_action_type(block[candidate], candidate)) + elif block[candidate] is not None: + raise RuntimeError( + "Key '%s' defined, but bad value: '%s'" % + (candidate, str(block[candidate]))) + return results + + +def add_action_type(actions, action_type): + results = list() + for action in actions: + # ignore empty task + if not action: + continue + action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type] + results.append(action) + return results + + +def get_action_tasks(yaml, file): + tasks = list() + if file['type'] in ['tasks', 'handlers']: + tasks = add_action_type(yaml, file['type']) + else: + tasks.extend(extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks'])) + + # Add sub-elements of block/rescue/always to tasks list + tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always'])) + # Remove block/rescue/always elements from tasks list + block_rescue_always = ('block', 'rescue', 'always') + tasks[:] = [task for task in tasks if all(k not in task for k in block_rescue_always)] + + return [task for task in tasks if + set(['include', 'include_tasks', + 'import_playbook', 'import_tasks']).isdisjoint(task.keys())] + + +def get_normalized_tasks(yaml, file): + tasks = get_action_tasks(yaml, file) + res = [] + for task in tasks: + # An empty `tags` block causes `None` to be returned if + # the `or []` is not present - `task.get('tags', [])` + # does not suffice. + if 'skip_ansible_lint' in (task.get('tags') or []): + # No need to normalize_task is we are skipping it. + continue + res.append(normalize_task(task, file['path'])) + + return res + + +@lru_cache(maxsize=128) +def parse_yaml_linenumbers(data, filename): + """Parse yaml as ansible.utils.parse_yaml but with linenumbers. + + The line numbers are stored in each node's LINE_NUMBER_KEY key. + """ + def compose_node(parent, index): + # the line number where the previous token has ended (plus empty lines) + line = loader.line + node = Composer.compose_node(loader, parent, index) + node.__line__ = line + 1 + return node + + def construct_mapping(node, deep=False): + mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep) + if hasattr(node, '__line__'): + mapping[LINE_NUMBER_KEY] = node.__line__ + else: + mapping[LINE_NUMBER_KEY] = mapping._line_number + mapping[FILENAME_KEY] = filename + return mapping + + try: + kwargs = {} + if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args: + kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD + loader = AnsibleLoader(data, **kwargs) + loader.compose_node = compose_node + loader.construct_mapping = construct_mapping + data = loader.get_single_data() + except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e: + raise SystemExit("Failed to parse YAML in %s: %s" % (filename, str(e))) + return data + + +def get_first_cmd_arg(task): + try: + if 'cmd' in task['action']: + first_cmd_arg = task['action']['cmd'].split()[0] + else: + first_cmd_arg = task['action']['__ansible_arguments__'][0] + except IndexError: + return None + return first_cmd_arg + + +def is_playbook(filename: str) -> bool: + """ + Check if the file is a playbook. + + Given a filename, it should return true if it looks like a playbook. The + function is not supposed to raise exceptions. + """ + # we assume is a playbook if we loaded a sequence of dictionaries where + # at least one of these keys is present: + playbooks_keys = { + "gather_facts", + "hosts", + "import_playbook", + "post_tasks", + "pre_tasks", + "roles" + "tasks", + } + + # makes it work with Path objects by converting them to strings + if not isinstance(filename, str): + filename = str(filename) + + try: + f = parse_yaml_from_file(filename) + except Exception as e: + _logger.warning( + "Failed to load %s with %s, assuming is not a playbook.", + filename, e) + else: + if ( + isinstance(f, AnsibleSequence) and + hasattr(f, 'keys') and + playbooks_keys.intersection(next(iter(f), {}).keys()) + ): + return True + return False + + +def get_yaml_files(options: Namespace) -> dict: + """Find all yaml files.""" + # git is preferred as it also considers .gitignore + git_command = ['git', 'ls-files', '*.yaml', '*.yml'] + _logger.info("Discovering files to lint: %s", ' '.join(git_command)) + + out = None + + try: + out = subprocess.check_output( + git_command, + stderr=subprocess.STDOUT, + universal_newlines=True + ).split() + except subprocess.CalledProcessError as exc: + _logger.warning( + "Failed to discover yaml files to lint using git: %s", + exc.output.rstrip('\n') + ) + except FileNotFoundError as exc: + if options.verbosity: + _logger.warning( + "Failed to locate command: %s", exc + ) + + if out is None: + out = [ + os.path.join(root, name) + for root, dirs, files in os.walk('.') + for name in files + if name.endswith('.yaml') or name.endswith('.yml') + ] + + return OrderedDict.fromkeys(sorted(out)) + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def get_playbooks_and_roles(options=None) -> List[str]: # noqa: C901 + """Find roles and playbooks.""" + if options is None: + options = {} + + files = get_yaml_files(options) + + playbooks = [] + role_dirs = [] + role_internals = { + 'defaults', + 'files', + 'handlers', + 'meta', + 'tasks', + 'templates', + 'vars', + } + + # detect role in repository root: + if 'tasks/main.yml' in files or 'tasks/main.yaml' in files: + role_dirs.append('.') + + for p in map(Path, files): + + try: + for file_path in options.exclude_paths: + if str(p.resolve()).startswith(str(file_path)): + raise FileNotFoundError( + f'File {file_path} matched exclusion entry: {p}') + except FileNotFoundError as e: + _logger.debug('Ignored %s due to: %s', p, e) + continue + + if (next((i for i in p.parts if i.endswith('playbooks')), None) or + 'playbook' in p.parts[-1]): + playbooks.append(normpath(p)) + continue + + # ignore if any folder ends with _vars + if next((i for i in p.parts if i.endswith('_vars')), None): + continue + elif 'roles' in p.parts or '.' in role_dirs: + if 'tasks' in p.parts and p.parts[-1] in ['main.yaml', 'main.yml']: + role_dirs.append(str(p.parents[1])) + continue + elif role_internals.intersection(p.parts): + continue + elif 'tests' in p.parts: + playbooks.append(normpath(p)) + if 'molecule' in p.parts: + if p.parts[-1] != 'molecule.yml': + playbooks.append(normpath(p)) + continue + # hidden files are clearly not playbooks, likely config files. + if p.parts[-1].startswith('.'): + continue + + if is_playbook(str(p)): + playbooks.append(normpath(p)) + continue + + _logger.info('Unknown file type: %s', normpath(p)) + + _logger.info('Found roles: %s', ' '.join(role_dirs)) + _logger.info('Found playbooks: %s', ' '.join(playbooks)) + + return role_dirs + playbooks + + +def expand_path_vars(path: str) -> str: + """Expand the environment or ~ variables in a path string.""" + # It may be possible for function to be called with a Path object + path = str(path).strip() + path = os.path.expanduser(path) + path = os.path.expandvars(path) + return path + + +def expand_paths_vars(paths: List[str]) -> List[str]: + """Expand the environment or ~ variables in a list.""" + paths = [expand_path_vars(p) for p in paths] + return paths + + +def get_rules_dirs(rulesdir: List[str], use_default: bool) -> List[str]: + """Return a list of rules dirs.""" + default_ruledirs = [DEFAULT_RULESDIR] + default_custom_rulesdir = os.environ.get( + CUSTOM_RULESDIR_ENVVAR, os.path.join(DEFAULT_RULESDIR, "custom") + ) + custom_ruledirs = sorted( + str(rdir.resolve()) + for rdir in Path(default_custom_rulesdir).iterdir() + if rdir.is_dir() and (rdir / "__init__.py").exists() + ) + if use_default: + return rulesdir + custom_ruledirs + default_ruledirs + + return rulesdir or custom_ruledirs + default_ruledirs diff --git a/lib/ansiblelint/version.py b/lib/ansiblelint/version.py new file mode 100644 index 0000000..7bbf973 --- /dev/null +++ b/lib/ansiblelint/version.py @@ -0,0 +1,12 @@ +"""Ansible-lint version information.""" + +try: + import pkg_resources +except ImportError: + pass + + +try: + __version__ = pkg_resources.get_distribution('ansible-lint').version +except Exception: + __version__ = 'unknown' |