summaryrefslogtreecommitdiffstats
path: root/lib/ansiblelint
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/ansiblelint/__init__.py28
-rwxr-xr-xlib/ansiblelint/__main__.py270
-rw-r--r--lib/ansiblelint/cli.py219
-rw-r--r--lib/ansiblelint/color.py31
-rw-r--r--lib/ansiblelint/constants.py18
-rw-r--r--lib/ansiblelint/errors.py81
-rw-r--r--lib/ansiblelint/file_utils.py25
-rw-r--r--lib/ansiblelint/formatters/__init__.py167
-rw-r--r--lib/ansiblelint/generate_docs.py66
-rw-r--r--lib/ansiblelint/rules/AlwaysRunRule.py33
-rw-r--r--lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py80
-rw-r--r--lib/ansiblelint/rules/CommandHasChangesCheckRule.py45
-rw-r--r--lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py65
-rw-r--r--lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py86
-rw-r--r--lib/ansiblelint/rules/ComparisonToEmptyStringRule.py23
-rw-r--r--lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py23
-rw-r--r--lib/ansiblelint/rules/DeprecatedModuleRule.py37
-rw-r--r--lib/ansiblelint/rules/EnvVarsInCommandRule.py48
-rw-r--r--lib/ansiblelint/rules/GitHasVersionRule.py37
-rw-r--r--lib/ansiblelint/rules/IncludeMissingFileRule.py67
-rw-r--r--lib/ansiblelint/rules/LineTooLongRule.py19
-rw-r--r--lib/ansiblelint/rules/LoadingFailureRule.py14
-rw-r--r--lib/ansiblelint/rules/MercurialHasRevisionRule.py37
-rw-r--r--lib/ansiblelint/rules/MetaChangeFromDefaultRule.py40
-rw-r--r--lib/ansiblelint/rules/MetaMainHasInfoRule.py66
-rw-r--r--lib/ansiblelint/rules/MetaTagValidRule.py81
-rw-r--r--lib/ansiblelint/rules/MetaVideoLinksRule.py65
-rw-r--r--lib/ansiblelint/rules/MissingFilePermissionsRule.py95
-rw-r--r--lib/ansiblelint/rules/NestedJinjaRule.py53
-rw-r--r--lib/ansiblelint/rules/NoFormattingInWhenRule.py34
-rw-r--r--lib/ansiblelint/rules/NoTabsRule.py16
-rw-r--r--lib/ansiblelint/rules/OctalPermissionsRule.py73
-rw-r--r--lib/ansiblelint/rules/PackageIsNotLatestRule.py67
-rw-r--r--lib/ansiblelint/rules/PlaybookExtension.py28
-rw-r--r--lib/ansiblelint/rules/RoleNames.py74
-rw-r--r--lib/ansiblelint/rules/RoleRelativePath.py32
-rw-r--r--lib/ansiblelint/rules/ShellWithoutPipefail.py38
-rw-r--r--lib/ansiblelint/rules/SudoRule.py36
-rw-r--r--lib/ansiblelint/rules/TaskHasNameRule.py40
-rw-r--r--lib/ansiblelint/rules/TaskNoLocalAction.py18
-rw-r--r--lib/ansiblelint/rules/TrailingWhitespaceRule.py34
-rw-r--r--lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py45
-rw-r--r--lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py52
-rw-r--r--lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py75
-rw-r--r--lib/ansiblelint/rules/VariableHasSpacesRule.py24
-rw-r--r--lib/ansiblelint/rules/__init__.py254
-rw-r--r--lib/ansiblelint/rules/custom/__init__.py1
-rw-r--r--lib/ansiblelint/runner.py111
-rw-r--r--lib/ansiblelint/skip_utils.py189
-rw-r--r--lib/ansiblelint/testing/__init__.py84
-rw-r--r--lib/ansiblelint/utils.py836
-rw-r--r--lib/ansiblelint/version.py12
52 files changed, 4092 insertions, 0 deletions
diff --git a/lib/ansiblelint/__init__.py b/lib/ansiblelint/__init__.py
new file mode 100644
index 0000000..d8a4cef
--- /dev/null
+++ b/lib/ansiblelint/__init__.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Main ansible-lint package."""
+
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.version import __version__
+
+__all__ = (
+ "__version__",
+ "AnsibleLintRule" # deprecated, import it directly from rules
+)
diff --git a/lib/ansiblelint/__main__.py b/lib/ansiblelint/__main__.py
new file mode 100755
index 0000000..ff8d477
--- /dev/null
+++ b/lib/ansiblelint/__main__.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Command line implementation."""
+
+import errno
+import logging
+import os
+import pathlib
+import subprocess
+import sys
+from contextlib import contextmanager
+from typing import TYPE_CHECKING, Any, List, Set, Type, Union
+
+from rich.markdown import Markdown
+
+from ansiblelint import cli, formatters
+from ansiblelint.color import console, console_stderr
+from ansiblelint.file_utils import cwd
+from ansiblelint.generate_docs import rules_as_rich, rules_as_rst
+from ansiblelint.rules import RulesCollection
+from ansiblelint.runner import Runner
+from ansiblelint.utils import get_playbooks_and_roles, get_rules_dirs
+
+if TYPE_CHECKING:
+ from argparse import Namespace
+
+ from ansiblelint.errors import MatchError
+
+_logger = logging.getLogger(__name__)
+
+_rule_format_map = {
+ 'plain': str,
+ 'rich': rules_as_rich,
+ 'rst': rules_as_rst
+}
+
+
+def initialize_logger(level: int = 0) -> None:
+ """Set up the global logging level based on the verbosity number."""
+ VERBOSITY_MAP = {
+ 0: logging.NOTSET,
+ 1: logging.INFO,
+ 2: logging.DEBUG
+ }
+
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter('%(levelname)-8s %(message)s')
+ handler.setFormatter(formatter)
+ logger = logging.getLogger(__package__)
+ logger.addHandler(handler)
+ # Unknown logging level is treated as DEBUG
+ logging_level = VERBOSITY_MAP.get(level, logging.DEBUG)
+ logger.setLevel(logging_level)
+ # Use module-level _logger instance to validate it
+ _logger.debug("Logging initialized to level %s", logging_level)
+
+
+def choose_formatter_factory(
+ options_list: "Namespace"
+) -> Type[formatters.BaseFormatter]:
+ """Select an output formatter based on the incoming command line arguments."""
+ r: Type[formatters.BaseFormatter] = formatters.Formatter
+ if options_list.quiet:
+ r = formatters.QuietFormatter
+ elif options_list.parseable:
+ r = formatters.ParseableFormatter
+ elif options_list.parseable_severity:
+ r = formatters.ParseableSeverityFormatter
+ return r
+
+
+def report_outcome(matches: List["MatchError"], options) -> int:
+ """Display information about how to skip found rules.
+
+ Returns exit code, 2 if errors were found, 0 when only warnings were found.
+ """
+ failure = False
+ msg = """\
+You can skip specific rules or tags by adding them to your configuration file:
+```yaml
+# .ansible-lint
+warn_list: # or 'skip_list' to silence them completely
+"""
+ matches_unignored = [match for match in matches if not match.ignored]
+
+ matched_rules = {match.rule.id: match.rule for match in matches_unignored}
+ for id in sorted(matched_rules.keys()):
+ if {id, *matched_rules[id].tags}.isdisjoint(options.warn_list):
+ msg += f" - '{id}' # {matched_rules[id].shortdesc}\n"
+ failure = True
+ for match in matches:
+ if "experimental" in match.rule.tags:
+ msg += " - experimental # all rules tagged as experimental\n"
+ break
+ msg += "```"
+
+ if matches and not options.quiet:
+ console_stderr.print(Markdown(msg))
+
+ if failure:
+ return 2
+ else:
+ return 0
+
+
+def main() -> int:
+ """Linter CLI entry point."""
+ cwd = pathlib.Path.cwd()
+
+ options = cli.get_config(sys.argv[1:])
+
+ initialize_logger(options.verbosity)
+ _logger.debug("Options: %s", options)
+
+ formatter_factory = choose_formatter_factory(options)
+ formatter = formatter_factory(cwd, options.display_relative_path)
+
+ rulesdirs = get_rules_dirs([str(rdir) for rdir in options.rulesdir],
+ options.use_default_rules)
+ rules = RulesCollection(rulesdirs)
+
+ if options.listrules:
+ console.print(
+ _rule_format_map[options.format](rules),
+ highlight=False)
+ return 0
+
+ if options.listtags:
+ print(rules.listtags())
+ return 0
+
+ if isinstance(options.tags, str):
+ options.tags = options.tags.split(',')
+
+ skip = set()
+ for s in options.skip_list:
+ skip.update(str(s).split(','))
+ options.skip_list = frozenset(skip)
+
+ matches = _get_matches(rules, options)
+
+ # Assure we do not print duplicates and the order is consistent
+ matches = sorted(set(matches))
+
+ mark_as_success = False
+ if matches and options.progressive:
+ _logger.info(
+ "Matches found, running again on previous revision in order to detect regressions")
+ with _previous_revision():
+ old_matches = _get_matches(rules, options)
+ # remove old matches from current list
+ matches_delta = list(set(matches) - set(old_matches))
+ if len(matches_delta) == 0:
+ _logger.warning(
+ "Total violations not increased since previous "
+ "commit, will mark result as success. (%s -> %s)",
+ len(old_matches), len(matches_delta))
+ mark_as_success = True
+
+ ignored = 0
+ for match in matches:
+ # if match is not new, mark is as ignored
+ if match not in matches_delta:
+ match.ignored = True
+ ignored += 1
+ if ignored:
+ _logger.warning(
+ "Marked %s previously known violation(s) as ignored due to"
+ " progressive mode.", ignored)
+
+ _render_matches(matches, options, formatter, cwd)
+
+ if matches and not mark_as_success:
+ return report_outcome(matches, options=options)
+ else:
+ return 0
+
+
+def _render_matches(
+ matches: List,
+ options: "Namespace",
+ formatter: Any,
+ cwd: Union[str, pathlib.Path]):
+
+ ignored_matches = [match for match in matches if match.ignored]
+ fatal_matches = [match for match in matches if not match.ignored]
+ # Displayed ignored matches first
+ if ignored_matches:
+ _logger.warning(
+ "Listing %s violation(s) marked as ignored, likely already known",
+ len(ignored_matches))
+ for match in ignored_matches:
+ if match.ignored:
+ print(formatter.format(match, options.colored))
+ if fatal_matches:
+ _logger.warning("Listing %s violation(s) that are fatal", len(fatal_matches))
+ for match in fatal_matches:
+ if not match.ignored:
+ print(formatter.format(match, options.colored))
+
+ # If run under GitHub Actions we also want to emit output recognized by it.
+ if os.getenv('GITHUB_ACTIONS') == 'true' and os.getenv('GITHUB_WORKFLOW'):
+ formatter = formatters.AnnotationsFormatter(cwd, True)
+ for match in matches:
+ print(formatter.format(match))
+
+
+def _get_matches(rules: RulesCollection, options: "Namespace") -> list:
+
+ if not options.playbook:
+ # no args triggers auto-detection mode
+ playbooks = get_playbooks_and_roles(options=options)
+ else:
+ playbooks = sorted(set(options.playbook))
+
+ matches = list()
+ checked_files: Set[str] = set()
+ for playbook in playbooks:
+ runner = Runner(rules, playbook, options.tags,
+ options.skip_list, options.exclude_paths,
+ options.verbosity, checked_files)
+ matches.extend(runner.run())
+ return matches
+
+
+@contextmanager
+def _previous_revision():
+ """Create or update a temporary workdir containing the previous revision."""
+ worktree_dir = ".cache/old-rev"
+ revision = subprocess.run(
+ ["git", "rev-parse", "HEAD^1"],
+ check=True,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ ).stdout
+ p = pathlib.Path(worktree_dir)
+ p.mkdir(parents=True, exist_ok=True)
+ os.system(f"git worktree add -f {worktree_dir} 2>/dev/null")
+ with cwd(worktree_dir):
+ os.system(f"git checkout {revision}")
+ yield
+
+
+if __name__ == "__main__":
+ try:
+ sys.exit(main())
+ except IOError as exc:
+ if exc.errno != errno.EPIPE:
+ raise
+ except RuntimeError as e:
+ raise SystemExit(str(e))
diff --git a/lib/ansiblelint/cli.py b/lib/ansiblelint/cli.py
new file mode 100644
index 0000000..6b39561
--- /dev/null
+++ b/lib/ansiblelint/cli.py
@@ -0,0 +1,219 @@
+# -*- coding: utf-8 -*-
+"""CLI parser setup and helpers."""
+import argparse
+import logging
+import os
+import sys
+from pathlib import Path
+from typing import List, NamedTuple
+
+import yaml
+
+from ansiblelint.constants import DEFAULT_RULESDIR, INVALID_CONFIG_RC
+from ansiblelint.utils import expand_path_vars
+from ansiblelint.version import __version__
+
+_logger = logging.getLogger(__name__)
+_PATH_VARS = ['exclude_paths', 'rulesdir', ]
+
+
+def abspath(path: str, base_dir: str) -> str:
+ """Make relative path absolute relative to given directory.
+
+ Args:
+ path (str): the path to make absolute
+ base_dir (str): the directory from which make relative paths
+ absolute
+ default_drive: Windows drive to use to make the path
+ absolute if none is given.
+ """
+ if not os.path.isabs(path):
+ # Don't use abspath as it assumes path is relative to cwd.
+ # We want it relative to base_dir.
+ path = os.path.join(base_dir, path)
+
+ return os.path.normpath(path)
+
+
+def expand_to_normalized_paths(config: dict, base_dir: str = None) -> None:
+ # config can be None (-c /dev/null)
+ if not config:
+ return
+ base_dir = base_dir or os.getcwd()
+ for paths_var in _PATH_VARS:
+ if paths_var not in config:
+ continue # Cause we don't want to add a variable not present
+
+ normalized_paths = []
+ for path in config.pop(paths_var):
+ normalized_path = abspath(expand_path_vars(path), base_dir=base_dir)
+
+ normalized_paths.append(normalized_path)
+
+ config[paths_var] = normalized_paths
+
+
+def load_config(config_file: str) -> dict:
+ config_path = os.path.abspath(config_file or '.ansible-lint')
+
+ if config_file:
+ if not os.path.exists(config_path):
+ _logger.error("Config file not found '%s'", config_path)
+ sys.exit(INVALID_CONFIG_RC)
+ elif not os.path.exists(config_path):
+ # a missing default config file should not trigger an error
+ return {}
+
+ try:
+ with open(config_path, "r") as stream:
+ config = yaml.safe_load(stream)
+ except yaml.YAMLError as e:
+ _logger.error(e)
+ sys.exit(INVALID_CONFIG_RC)
+ # TODO(ssbarnea): implement schema validation for config file
+ if isinstance(config, list):
+ _logger.error(
+ "Invalid configuration '%s', expected YAML mapping in the config file.",
+ config_path)
+ sys.exit(INVALID_CONFIG_RC)
+
+ config_dir = os.path.dirname(config_path)
+ expand_to_normalized_paths(config, config_dir)
+ return config
+
+
+class AbspathArgAction(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ if isinstance(values, (str, Path)):
+ values = [values]
+ normalized_values = [Path(expand_path_vars(path)).resolve() for path in values]
+ previous_values = getattr(namespace, self.dest, [])
+ setattr(namespace, self.dest, previous_values + normalized_values)
+
+
+def get_cli_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('-L', dest='listrules', default=False,
+ action='store_true', help="list all the rules")
+ parser.add_argument('-f', dest='format', default='rich',
+ choices=['rich', 'plain', 'rst'],
+ help="Format used rules output, (default: %(default)s)")
+ parser.add_argument('-q', dest='quiet',
+ default=False,
+ action='store_true',
+ help="quieter, although not silent output")
+ parser.add_argument('-p', dest='parseable',
+ default=False,
+ action='store_true',
+ help="parseable output in the format of pep8")
+ parser.add_argument('--parseable-severity', dest='parseable_severity',
+ default=False,
+ action='store_true',
+ help="parseable output including severity of rule")
+ parser.add_argument('--progressive', dest='progressive',
+ default=False,
+ action='store_true',
+ help="Return success if it detects a reduction in number"
+ " of violations compared with previous git commit. This "
+ "feature works only in git repositories.")
+ parser.add_argument('-r', action=AbspathArgAction, dest='rulesdir',
+ default=[], type=Path,
+ help="Specify custom rule directories. Add -R "
+ f"to keep using embedded rules from {DEFAULT_RULESDIR}")
+ parser.add_argument('-R', action='store_true',
+ default=False,
+ dest='use_default_rules',
+ help="Keep default rules when using -r")
+ parser.add_argument('--show-relpath', dest='display_relative_path', action='store_false',
+ default=True,
+ help="Display path relative to CWD")
+ parser.add_argument('-t', dest='tags',
+ action='append',
+ default=[],
+ help="only check rules whose id/tags match these values")
+ parser.add_argument('-T', dest='listtags', action='store_true',
+ help="list all the tags")
+ parser.add_argument('-v', dest='verbosity', action='count',
+ help="Increase verbosity level",
+ default=0)
+ parser.add_argument('-x', dest='skip_list', default=[], action='append',
+ help="only check rules whose id/tags do not "
+ "match these values")
+ parser.add_argument('-w', dest='warn_list', default=[], action='append',
+ help="only warn about these rules, unless overridden in "
+ "config file defaults to 'experimental'")
+ parser.add_argument('--nocolor', dest='colored',
+ default=hasattr(sys.stdout, 'isatty') and sys.stdout.isatty(),
+ action='store_false',
+ help="disable colored output")
+ parser.add_argument('--force-color', dest='colored',
+ action='store_true',
+ help="Try force colored output (relying on ansible's code)")
+ parser.add_argument('--exclude', dest='exclude_paths',
+ action=AbspathArgAction,
+ type=Path, default=[],
+ help='path to directories or files to skip. '
+ 'This option is repeatable.',
+ )
+ parser.add_argument('-c', dest='config_file',
+ help='Specify configuration file to use. '
+ 'Defaults to ".ansible-lint"')
+ parser.add_argument('--version', action='version',
+ version='%(prog)s {ver!s}'.format(ver=__version__),
+ )
+ parser.add_argument(dest='playbook', nargs='*',
+ help="One or more files or paths. When missing it will "
+ " enable auto-detection mode.")
+
+ return parser
+
+
+def merge_config(file_config, cli_config) -> NamedTuple:
+ bools = (
+ 'display_relative_path',
+ 'parseable',
+ 'parseable_severity',
+ 'quiet',
+ 'use_default_rules',
+ )
+ # maps lists to their default config values
+ lists_map = {
+ 'exclude_paths': [],
+ 'rulesdir': [],
+ 'skip_list': [],
+ 'tags': [],
+ 'warn_list': ['experimental'],
+ }
+
+ if not file_config:
+ return cli_config
+
+ for entry in bools:
+ x = getattr(cli_config, entry) or file_config.get(entry, False)
+ setattr(cli_config, entry, x)
+
+ for entry, default in lists_map.items():
+ getattr(cli_config, entry).extend(file_config.get(entry, default))
+
+ if 'verbosity' in file_config:
+ cli_config.verbosity = (cli_config.verbosity +
+ file_config['verbosity'])
+
+ return cli_config
+
+
+def get_config(arguments: List[str]):
+ parser = get_cli_parser()
+ options = parser.parse_args(arguments)
+
+ config = load_config(options.config_file)
+
+ return merge_config(config, options)
+
+
+def print_help(file=sys.stdout):
+ get_cli_parser().print_help(file=file)
+
+
+# vim: et:sw=4:syntax=python:ts=4:
diff --git a/lib/ansiblelint/color.py b/lib/ansiblelint/color.py
new file mode 100644
index 0000000..b30a89c
--- /dev/null
+++ b/lib/ansiblelint/color.py
@@ -0,0 +1,31 @@
+"""Console coloring and terminal support."""
+import sys
+from enum import Enum
+
+from rich.console import Console
+from rich.theme import Theme
+
+_theme = Theme({
+ "info": "cyan",
+ "warning": "dim yellow",
+ "danger": "bold red",
+ "title": "yellow"
+})
+console = Console(theme=_theme)
+console_stderr = Console(file=sys.stderr, theme=_theme)
+
+
+class Color(Enum):
+ """Color styles."""
+
+ reset = "0"
+ error_code = "1;31" # bright red
+ error_title = "0;31" # red
+ filename = "0;34" # blue
+ linenumber = "0;36" # cyan
+ line = "0;35" # purple
+
+
+def colorize(text: str, color: Color) -> str:
+ """Return ANSI formated string."""
+ return f"\u001b[{color.value}m{text}\u001b[{Color.reset.value}m"
diff --git a/lib/ansiblelint/constants.py b/lib/ansiblelint/constants.py
new file mode 100644
index 0000000..89094f9
--- /dev/null
+++ b/lib/ansiblelint/constants.py
@@ -0,0 +1,18 @@
+"""Constants used by AnsibleLint."""
+import os.path
+import sys
+
+# mypy/pylint idiom for py36-py38 compatibility
+# https://github.com/python/typeshed/issues/3500#issuecomment-560958608
+if sys.version_info >= (3, 8):
+ from typing import Literal # pylint: disable=no-name-in-module
+else:
+ from typing_extensions import Literal
+
+DEFAULT_RULESDIR = os.path.join(os.path.dirname(__file__), 'rules')
+CUSTOM_RULESDIR_ENVVAR = "ANSIBLE_LINT_CUSTOM_RULESDIR"
+
+INVALID_CONFIG_RC = 2
+ANSIBLE_FAILURE_RC = 3
+
+FileType = Literal["playbook", "pre_tasks", "post_tasks"]
diff --git a/lib/ansiblelint/errors.py b/lib/ansiblelint/errors.py
new file mode 100644
index 0000000..8569dca
--- /dev/null
+++ b/lib/ansiblelint/errors.py
@@ -0,0 +1,81 @@
+"""Exceptions and error representations."""
+import functools
+
+from ansiblelint.file_utils import normpath
+
+
+@functools.total_ordering
+class MatchError(ValueError):
+ """Rule violation detected during linting.
+
+ It can be raised as Exception but also just added to the list of found
+ rules violations.
+
+ Note that line argument is not considered when building hash of an
+ instance.
+ """
+
+ # IMPORTANT: any additional comparison protocol methods must return
+ # IMPORTANT: `NotImplemented` singleton to allow the check to use the
+ # IMPORTANT: other object's fallbacks.
+ # Ref: https://docs.python.org/3/reference/datamodel.html#object.__lt__
+
+ def __init__(
+ self,
+ message=None,
+ linenumber=0,
+ details: str = "",
+ filename=None,
+ rule=None) -> None:
+ """Initialize a MatchError instance."""
+ super().__init__(message)
+
+ if not (message or rule):
+ raise TypeError(
+ f'{self.__class__.__name__}() missing a '
+ "required argument: one of 'message' or 'rule'",
+ )
+
+ self.message = message or getattr(rule, 'shortdesc', "")
+ self.linenumber = linenumber
+ self.details = details
+ self.filename = normpath(filename) if filename else None
+ self.rule = rule
+ self.ignored = False # If set it will be displayed but not counted as failure
+
+ def __repr__(self):
+ """Return a MatchError instance representation."""
+ formatstr = u"[{0}] ({1}) matched {2}:{3} {4}"
+ # note that `rule.id` can be int, str or even missing, as users
+ # can defined their own custom rules.
+ _id = getattr(self.rule, "id", "000")
+
+ return formatstr.format(_id, self.message,
+ self.filename, self.linenumber, self.details)
+
+ @property
+ def _hash_key(self):
+ # line attr is knowingly excluded, as dict is not hashable
+ return (
+ self.filename,
+ self.linenumber,
+ str(getattr(self.rule, 'id', 0)),
+ self.message,
+ self.details,
+ )
+
+ def __lt__(self, other):
+ """Return whether the current object is less than the other."""
+ if not isinstance(other, self.__class__):
+ return NotImplemented
+ return self._hash_key < other._hash_key
+
+ def __hash__(self):
+ """Return a hash value of the MatchError instance."""
+ return hash(self._hash_key)
+
+ def __eq__(self, other):
+ """Identify whether the other object represents the same rule match."""
+ if not isinstance(other, self.__class__):
+ return NotImplemented
+ return self.__hash__() == other.__hash__()
diff --git a/lib/ansiblelint/file_utils.py b/lib/ansiblelint/file_utils.py
new file mode 100644
index 0000000..f25382f
--- /dev/null
+++ b/lib/ansiblelint/file_utils.py
@@ -0,0 +1,25 @@
+"""Utility functions related to file operations."""
+import os
+from contextlib import contextmanager
+
+
+def normpath(path) -> str:
+ """
+ Normalize a path in order to provide a more consistent output.
+
+ Currently it generates a relative path but in the future we may want to
+ make this user configurable.
+ """
+ # convertion to string in order to allow receiving non string objects
+ return os.path.relpath(str(path))
+
+
+@contextmanager
+def cwd(path):
+ """Context manager for temporary changing current working directory."""
+ old_pwd = os.getcwd()
+ os.chdir(path)
+ try:
+ yield
+ finally:
+ os.chdir(old_pwd)
diff --git a/lib/ansiblelint/formatters/__init__.py b/lib/ansiblelint/formatters/__init__.py
new file mode 100644
index 0000000..7395183
--- /dev/null
+++ b/lib/ansiblelint/formatters/__init__.py
@@ -0,0 +1,167 @@
+"""Output formatters."""
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Generic, TypeVar, Union
+
+from ansiblelint.color import Color, colorize
+
+if TYPE_CHECKING:
+ from ansiblelint.errors import MatchError
+
+T = TypeVar('T', bound='BaseFormatter')
+
+
+class BaseFormatter(Generic[T]):
+ """Formatter of ansible-lint output.
+
+ Base class for output formatters.
+
+ Args:
+ base_dir (str|Path): reference directory against which display relative path.
+ display_relative_path (bool): whether to show path as relative or absolute
+ """
+
+ def __init__(self, base_dir: Union[str, Path], display_relative_path: bool) -> None:
+ """Initialize a BaseFormatter instance."""
+ if isinstance(base_dir, str):
+ base_dir = Path(base_dir)
+ if base_dir: # can be None
+ base_dir = base_dir.absolute()
+
+ # Required 'cause os.path.relpath() does not accept Path before 3.6
+ if isinstance(base_dir, Path):
+ base_dir = str(base_dir) # Drop when Python 3.5 is no longer supported
+
+ self._base_dir = base_dir if display_relative_path else None
+
+ def _format_path(self, path: Union[str, Path]) -> str:
+ # Required 'cause os.path.relpath() does not accept Path before 3.6
+ if isinstance(path, Path):
+ path = str(path) # Drop when Python 3.5 is no longer supported
+
+ if not self._base_dir:
+ return path
+ # Use os.path.relpath 'cause Path.relative_to() misbehaves
+ return os.path.relpath(path, start=self._base_dir)
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ return str(match)
+
+
+class Formatter(BaseFormatter):
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ formatstr = u"{0} {1}\n{2}:{3}\n{4}\n"
+ _id = getattr(match.rule, 'id', '000')
+ if colored:
+ return formatstr.format(
+ colorize(u"[{0}]".format(_id), Color.error_code),
+ colorize(match.message, Color.error_title),
+ colorize(self._format_path(match.filename or ""), Color.filename),
+ colorize(str(match.linenumber), Color.linenumber),
+ colorize(u"{0}".format(match.details), Color.line))
+ else:
+ return formatstr.format(_id,
+ match.message,
+ match.filename or "",
+ match.linenumber,
+ match.details)
+
+
+class QuietFormatter(BaseFormatter):
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ formatstr = u"{0} {1}:{2}"
+ if colored:
+ return formatstr.format(
+ colorize(u"[{0}]".format(match.rule.id), Color.error_code),
+ colorize(self._format_path(match.filename or ""), Color.filename),
+ colorize(str(match.linenumber), Color.linenumber))
+ else:
+ return formatstr.format(match.rule.id, self._format_path(match.filename or ""),
+ match.linenumber)
+
+
+class ParseableFormatter(BaseFormatter):
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ formatstr = u"{0}:{1}: [{2}] {3}"
+ if colored:
+ return formatstr.format(
+ colorize(self._format_path(match.filename or ""), Color.filename),
+ colorize(str(match.linenumber), Color.linenumber),
+ colorize(u"E{0}".format(match.rule.id), Color.error_code),
+ colorize(u"{0}".format(match.message), Color.error_title))
+ else:
+ return formatstr.format(self._format_path(match.filename or ""),
+ match.linenumber,
+ "E" + match.rule.id,
+ match.message)
+
+
+class AnnotationsFormatter(BaseFormatter):
+ # https://docs.github.com/en/actions/reference/workflow-commands-for-github-actions#setting-a-warning-message
+ """Formatter for emitting violations as GitHub Workflow Commands.
+
+ These commands trigger the GHA Workflow runners platform to post violations
+ in a form of GitHub Checks API annotations that appear rendered in pull-
+ request files view.
+
+ ::debug file={name},line={line},col={col},severity={severity}::{message}
+ ::warning file={name},line={line},col={col},severity={severity}::{message}
+ ::error file={name},line={line},col={col},severity={severity}::{message}
+
+ Supported levels: debug, warning, error
+ """
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ """Prepare a match instance for reporting as a GitHub Actions annotation."""
+ if colored:
+ raise ValueError('The colored mode is not supported.')
+
+ level = self._severity_to_level(match.rule.severity)
+ file_path = self._format_path(match.filename or "")
+ line_num = match.linenumber
+ rule_id = match.rule.id
+ severity = match.rule.severity
+ violation_details = match.message
+ return (
+ f"::{level} file={file_path},line={line_num},severity={severity}"
+ f"::[E{rule_id}] {violation_details}"
+ )
+
+ @staticmethod
+ def _severity_to_level(severity: str) -> str:
+ if severity in ['VERY_LOW', 'LOW']:
+ return 'warning'
+ elif severity in ['INFO']:
+ return 'debug'
+ # ['MEDIUM', 'HIGH', 'VERY_HIGH'] or anything else
+ return 'error'
+
+
+class ParseableSeverityFormatter(BaseFormatter):
+
+ def format(self, match: "MatchError", colored: bool = False) -> str:
+ formatstr = u"{0}:{1}: [{2}] [{3}] {4}"
+
+ filename = self._format_path(match.filename or "")
+ linenumber = str(match.linenumber)
+ rule_id = u"E{0}".format(match.rule.id)
+ severity = match.rule.severity
+ message = str(match.message)
+
+ if colored:
+ filename = colorize(filename, Color.filename)
+ linenumber = colorize(linenumber, Color.linenumber)
+ rule_id = colorize(rule_id, Color.error_code)
+ severity = colorize(severity, Color.error_code)
+ message = colorize(message, Color.error_title)
+
+ return formatstr.format(
+ filename,
+ linenumber,
+ rule_id,
+ severity,
+ message,
+ )
diff --git a/lib/ansiblelint/generate_docs.py b/lib/ansiblelint/generate_docs.py
new file mode 100644
index 0000000..b735b1f
--- /dev/null
+++ b/lib/ansiblelint/generate_docs.py
@@ -0,0 +1,66 @@
+"""Utils to generate rule table .rst documentation."""
+import logging
+from typing import Iterable
+
+from rich import box
+from rich.console import render_group
+from rich.markdown import Markdown
+from rich.table import Table
+
+from ansiblelint.rules import RulesCollection
+
+DOC_HEADER = """
+.. _lint_default_rules:
+
+Default Rules
+=============
+
+.. contents::
+ :local:
+
+Below you can see the list of default rules Ansible Lint use to evaluate playbooks and roles:
+
+"""
+
+_logger = logging.getLogger(__name__)
+
+
+def rules_as_rst(rules: RulesCollection) -> str:
+ """Return RST documentation for a list of rules."""
+ r = DOC_HEADER
+
+ for d in rules:
+ if not hasattr(d, 'id'):
+ _logger.warning(
+ "Rule %s skipped from being documented as it does not have an `id` attribute.",
+ d.__class__.__name__)
+ continue
+
+ if d.id.endswith('01'):
+
+ section = '{} Rules ({}xx)'.format(
+ d.tags[0].title(),
+ d.id[-3:-2])
+ r += f'\n\n{section}\n{ "-" * len(section) }'
+
+ title = f"{d.id}: {d.shortdesc}"
+ r += f"\n\n.. _{d.id}:\n\n{title}\n{'*' * len(title)}\n\n{d.description}"
+
+ return r
+
+
+@render_group()
+def rules_as_rich(rules: RulesCollection) -> Iterable[Table]:
+ """Print documentation for a list of rules, returns empty string."""
+ for rule in rules:
+ table = Table(show_header=True, header_style="title", box=box.MINIMAL)
+ table.add_column(rule.id, style="dim", width=16)
+ table.add_column(Markdown(rule.shortdesc))
+ table.add_row("description", Markdown(rule.description))
+ if rule.version_added:
+ table.add_row("version_added", rule.version_added)
+ if rule.tags:
+ table.add_row("tags", ", ".join(rule.tags))
+ if rule.severity:
+ table.add_row("severity", rule.severity)
+ yield table
diff --git a/lib/ansiblelint/rules/AlwaysRunRule.py b/lib/ansiblelint/rules/AlwaysRunRule.py
new file mode 100644
index 0000000..8d811ff
--- /dev/null
+++ b/lib/ansiblelint/rules/AlwaysRunRule.py
@@ -0,0 +1,33 @@
+# Copyright (c) 2017 Anth Courtney <anthcourtney@gmail.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class AlwaysRunRule(AnsibleLintRule):
+ id = '101'
+ shortdesc = 'Deprecated always_run'
+ description = 'Instead of ``always_run``, use ``check_mode``'
+ severity = 'MEDIUM'
+ tags = ['deprecated', 'ANSIBLE0018']
+ version_added = 'historic'
+
+ def matchtask(self, file, task):
+ return 'always_run' in task
diff --git a/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py b/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py
new file mode 100644
index 0000000..e6f3259
--- /dev/null
+++ b/lib/ansiblelint/rules/BecomeUserWithoutBecomeRule.py
@@ -0,0 +1,80 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from functools import reduce
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+def _get_subtasks(data):
+ result = []
+ block_names = [
+ 'tasks',
+ 'pre_tasks',
+ 'post_tasks',
+ 'handlers',
+ 'block',
+ 'always',
+ 'rescue']
+ for name in block_names:
+ if data and name in data:
+ result += (data[name] or [])
+ return result
+
+
+def _nested_search(term, data):
+ if data and term in data:
+ return True
+ return reduce((lambda x, y: x or _nested_search(term, y)), _get_subtasks(data), False)
+
+
+def _become_user_without_become(becomeuserabove, data):
+ if 'become' in data:
+ # If become is in lineage of tree then correct
+ return False
+ if ('become_user' in data and _nested_search('become', data)):
+ # If 'become_user' on tree and become somewhere below
+ # we must check for a case of a second 'become_user' without a
+ # 'become' in its lineage
+ subtasks = _get_subtasks(data)
+ return reduce((lambda x, y: x or _become_user_without_become(False, y)), subtasks, False)
+ if _nested_search('become_user', data):
+ # Keep searching down if 'become_user' exists in the tree below current task
+ subtasks = _get_subtasks(data)
+ return (len(subtasks) == 0 or
+ reduce((lambda x, y: x or
+ _become_user_without_become(
+ becomeuserabove or 'become_user' in data, y)), subtasks, False))
+ # If at bottom of tree, flag up if 'become_user' existed in the lineage of the tree and
+ # 'become' was not. This is an error if any lineage has a 'become_user' but no become
+ return becomeuserabove
+
+
+class BecomeUserWithoutBecomeRule(AnsibleLintRule):
+ id = '501'
+ shortdesc = 'become_user requires become to work as expected'
+ description = '``become_user`` without ``become`` will not actually change user'
+ severity = 'VERY_HIGH'
+ tags = ['task', 'oddity', 'ANSIBLE0017']
+ version_added = 'historic'
+
+ def matchplay(self, file, data):
+ if file['type'] == 'playbook' and _become_user_without_become(False, data):
+ return ({'become_user': data}, self.shortdesc)
diff --git a/lib/ansiblelint/rules/CommandHasChangesCheckRule.py b/lib/ansiblelint/rules/CommandHasChangesCheckRule.py
new file mode 100644
index 0000000..26087b8
--- /dev/null
+++ b/lib/ansiblelint/rules/CommandHasChangesCheckRule.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class CommandHasChangesCheckRule(AnsibleLintRule):
+ id = '301'
+ shortdesc = 'Commands should not change things if nothing needs doing'
+ description = (
+ 'Commands should either read information (and thus set '
+ '``changed_when``) or not do something if it has already been '
+ 'done (using creates/removes) or only do it if another '
+ 'check has a particular result (``when``)'
+ )
+ severity = 'HIGH'
+ tags = ['command-shell', 'idempotency', 'ANSIBLE0012']
+ version_added = 'historic'
+
+ _commands = ['command', 'shell', 'raw']
+
+ def matchtask(self, file, task):
+ if task["__ansible_action_type__"] == 'task':
+ if task["action"]["__ansible_module__"] in self._commands:
+ return 'changed_when' not in task and \
+ 'when' not in task and \
+ 'creates' not in task['action'] and \
+ 'removes' not in task['action']
diff --git a/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py b/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py
new file mode 100644
index 0000000..f1adffa
--- /dev/null
+++ b/lib/ansiblelint/rules/CommandsInsteadOfArgumentsRule.py
@@ -0,0 +1,65 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import os
+
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.utils import get_first_cmd_arg
+
+try:
+ from ansible.module_utils.parsing.convert_bool import boolean
+except ImportError:
+ try:
+ from ansible.utils.boolean import boolean
+ except ImportError:
+ try:
+ from ansible.utils import boolean
+ except ImportError:
+ from ansible import constants
+ boolean = constants.mk_boolean
+
+
+class CommandsInsteadOfArgumentsRule(AnsibleLintRule):
+ id = '302'
+ shortdesc = 'Using command rather than an argument to e.g. file'
+ description = (
+ 'Executing a command when there are arguments to modules '
+ 'is generally a bad idea'
+ )
+ severity = 'VERY_HIGH'
+ tags = ['command-shell', 'resources', 'ANSIBLE0007']
+ version_added = 'historic'
+
+ _commands = ['command', 'shell', 'raw']
+ _arguments = {'chown': 'owner', 'chmod': 'mode', 'chgrp': 'group',
+ 'ln': 'state=link', 'mkdir': 'state=directory',
+ 'rmdir': 'state=absent', 'rm': 'state=absent'}
+
+ def matchtask(self, file, task):
+ if task["action"]["__ansible_module__"] in self._commands:
+ first_cmd_arg = get_first_cmd_arg(task)
+ if not first_cmd_arg:
+ return
+
+ executable = os.path.basename(first_cmd_arg)
+ if executable in self._arguments and \
+ boolean(task['action'].get('warn', True)):
+ message = "{0} used in place of argument {1} to file module"
+ return message.format(executable, self._arguments[executable])
diff --git a/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py b/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py
new file mode 100644
index 0000000..b19c5c2
--- /dev/null
+++ b/lib/ansiblelint/rules/CommandsInsteadOfModulesRule.py
@@ -0,0 +1,86 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import os
+
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.utils import get_first_cmd_arg
+
+try:
+ from ansible.module_utils.parsing.convert_bool import boolean
+except ImportError:
+ try:
+ from ansible.utils.boolean import boolean
+ except ImportError:
+ try:
+ from ansible.utils import boolean
+ except ImportError:
+ from ansible import constants
+ boolean = constants.mk_boolean
+
+
+class CommandsInsteadOfModulesRule(AnsibleLintRule):
+ id = '303'
+ shortdesc = 'Using command rather than module'
+ description = (
+ 'Executing a command when there is an Ansible module '
+ 'is generally a bad idea'
+ )
+ severity = 'HIGH'
+ tags = ['command-shell', 'resources', 'ANSIBLE0006']
+ version_added = 'historic'
+
+ _commands = ['command', 'shell']
+ _modules = {
+ 'apt-get': 'apt-get',
+ 'chkconfig': 'service',
+ 'curl': 'get_url or uri',
+ 'git': 'git',
+ 'hg': 'hg',
+ 'letsencrypt': 'acme_certificate',
+ 'mktemp': 'tempfile',
+ 'mount': 'mount',
+ 'patch': 'patch',
+ 'rpm': 'yum or rpm_key',
+ 'rsync': 'synchronize',
+ 'sed': 'template, replace or lineinfile',
+ 'service': 'service',
+ 'supervisorctl': 'supervisorctl',
+ 'svn': 'subversion',
+ 'systemctl': 'systemd',
+ 'tar': 'unarchive',
+ 'unzip': 'unarchive',
+ 'wget': 'get_url or uri',
+ 'yum': 'yum',
+ }
+
+ def matchtask(self, file, task):
+ if task['action']['__ansible_module__'] not in self._commands:
+ return
+
+ first_cmd_arg = get_first_cmd_arg(task)
+ if not first_cmd_arg:
+ return
+
+ executable = os.path.basename(first_cmd_arg)
+ if executable in self._modules and \
+ boolean(task['action'].get('warn', True)):
+ message = '{0} used in place of {1} module'
+ return message.format(executable, self._modules[executable])
diff --git a/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py b/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py
new file mode 100644
index 0000000..a43c4f7
--- /dev/null
+++ b/lib/ansiblelint/rules/ComparisonToEmptyStringRule.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class ComparisonToEmptyStringRule(AnsibleLintRule):
+ id = '602'
+ shortdesc = "Don't compare to empty string"
+ description = (
+ 'Use ``when: var|length > 0`` rather than ``when: var != ""`` (or '
+ 'conversely ``when: var|length == 0`` rather than ``when: var == ""``)'
+ )
+ severity = 'HIGH'
+ tags = ['idiom']
+ version_added = 'v4.0.0'
+
+ empty_string_compare = re.compile("[=!]= ?(\"{2}|'{2})")
+
+ def match(self, file, line):
+ return self.empty_string_compare.search(line)
diff --git a/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py b/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py
new file mode 100644
index 0000000..46668d1
--- /dev/null
+++ b/lib/ansiblelint/rules/ComparisonToLiteralBoolRule.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class ComparisonToLiteralBoolRule(AnsibleLintRule):
+ id = '601'
+ shortdesc = "Don't compare to literal True/False"
+ description = (
+ 'Use ``when: var`` rather than ``when: var == True`` '
+ '(or conversely ``when: not var``)'
+ )
+ severity = 'HIGH'
+ tags = ['idiom']
+ version_added = 'v4.0.0'
+
+ literal_bool_compare = re.compile("[=!]= ?(True|true|False|false)")
+
+ def match(self, file, line):
+ return self.literal_bool_compare.search(line)
diff --git a/lib/ansiblelint/rules/DeprecatedModuleRule.py b/lib/ansiblelint/rules/DeprecatedModuleRule.py
new file mode 100644
index 0000000..dc019ed
--- /dev/null
+++ b/lib/ansiblelint/rules/DeprecatedModuleRule.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class DeprecatedModuleRule(AnsibleLintRule):
+ id = '105'
+ shortdesc = 'Deprecated module'
+ description = (
+ 'These are deprecated modules, some modules are kept '
+ 'temporarily for backwards compatibility but usage is discouraged. '
+ 'For more details see: '
+ 'https://docs.ansible.com/ansible/latest/modules/list_of_all_modules.html'
+ )
+ severity = 'HIGH'
+ tags = ['deprecated']
+ version_added = 'v4.0.0'
+
+ _modules = [
+ 'accelerate', 'aos_asn_pool', 'aos_blueprint', 'aos_blueprint_param',
+ 'aos_blueprint_virtnet', 'aos_device', 'aos_external_router',
+ 'aos_ip_pool', 'aos_logical_device', 'aos_logical_device_map',
+ 'aos_login', 'aos_rack_type', 'aos_template', 'azure', 'cl_bond',
+ 'cl_bridge', 'cl_img_install', 'cl_interface', 'cl_interface_policy',
+ 'cl_license', 'cl_ports', 'cs_nic', 'docker', 'ec2_ami_find',
+ 'ec2_ami_search', 'ec2_remote_facts', 'ec2_vpc', 'kubernetes',
+ 'netscaler', 'nxos_ip_interface', 'nxos_mtu', 'nxos_portchannel',
+ 'nxos_switchport', 'oc', 'panos_nat_policy', 'panos_security_policy',
+ 'vsphere_guest', 'win_msi', 'include'
+ ]
+
+ def matchtask(self, file, task):
+ module = task["action"]["__ansible_module__"]
+ if module in self._modules:
+ message = '{0} {1}'
+ return message.format(self.shortdesc, module)
+ return False
diff --git a/lib/ansiblelint/rules/EnvVarsInCommandRule.py b/lib/ansiblelint/rules/EnvVarsInCommandRule.py
new file mode 100644
index 0000000..58dba90
--- /dev/null
+++ b/lib/ansiblelint/rules/EnvVarsInCommandRule.py
@@ -0,0 +1,48 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.utils import FILENAME_KEY, LINE_NUMBER_KEY, get_first_cmd_arg
+
+
+class EnvVarsInCommandRule(AnsibleLintRule):
+ id = '304'
+ shortdesc = "Environment variables don't work as part of command"
+ description = (
+ 'Environment variables should be passed to ``shell`` or ``command`` '
+ 'through environment argument'
+ )
+ severity = 'VERY_HIGH'
+ tags = ['command-shell', 'bug', 'ANSIBLE0014']
+ version_added = 'historic'
+
+ expected_args = ['chdir', 'creates', 'executable', 'removes', 'stdin', 'warn',
+ 'stdin_add_newline', 'strip_empty_ends',
+ 'cmd', '__ansible_module__', '__ansible_arguments__',
+ LINE_NUMBER_KEY, FILENAME_KEY]
+
+ def matchtask(self, file, task):
+ if task["action"]["__ansible_module__"] in ['command']:
+ first_cmd_arg = get_first_cmd_arg(task)
+ if not first_cmd_arg:
+ return
+
+ return any([arg not in self.expected_args for arg in task['action']] +
+ ["=" in first_cmd_arg])
diff --git a/lib/ansiblelint/rules/GitHasVersionRule.py b/lib/ansiblelint/rules/GitHasVersionRule.py
new file mode 100644
index 0000000..f0f3680
--- /dev/null
+++ b/lib/ansiblelint/rules/GitHasVersionRule.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class GitHasVersionRule(AnsibleLintRule):
+ id = '401'
+ shortdesc = 'Git checkouts must contain explicit version'
+ description = (
+ 'All version control checkouts must point to '
+ 'an explicit commit or tag, not just ``latest``'
+ )
+ severity = 'MEDIUM'
+ tags = ['module', 'repeatability', 'ANSIBLE0004']
+ version_added = 'historic'
+
+ def matchtask(self, file, task):
+ return (task['action']['__ansible_module__'] == 'git' and
+ task['action'].get('version', 'HEAD') == 'HEAD')
diff --git a/lib/ansiblelint/rules/IncludeMissingFileRule.py b/lib/ansiblelint/rules/IncludeMissingFileRule.py
new file mode 100644
index 0000000..57508fa
--- /dev/null
+++ b/lib/ansiblelint/rules/IncludeMissingFileRule.py
@@ -0,0 +1,67 @@
+# Copyright (c) 2020, Joachim Lusiardi
+# Copyright (c) 2020, Ansible Project
+
+import os.path
+
+import ansible.parsing.yaml.objects
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class IncludeMissingFileRule(AnsibleLintRule):
+ id = '505'
+ shortdesc = 'referenced files must exist'
+ description = (
+ 'All files referenced by by include / import tasks '
+ 'must exist. The check excludes files with jinja2 '
+ 'templates in the filename.'
+ )
+ severity = 'MEDIUM'
+ tags = ['task', 'bug']
+ version_added = 'v4.3.0'
+
+ def matchplay(self, file, data):
+ absolute_directory = file.get('absolute_directory', None)
+ results = []
+
+ # avoid failing with a playbook having tasks: null
+ for task in (data.get('tasks', []) or []):
+
+ # ignore None tasks or
+ # if the id of the current rule is not in list of skipped rules for this play
+ if not task or self.id in task.get('skipped_rules', ()):
+ continue
+
+ # collect information which file was referenced for include / import
+ referenced_file = None
+ for key, val in task.items():
+ if not (key.startswith('include_') or
+ key.startswith('import_') or
+ key == 'include'):
+ continue
+ if isinstance(val, ansible.parsing.yaml.objects.AnsibleMapping):
+ referenced_file = val.get('file', None)
+ else:
+ referenced_file = val
+ # take the file and skip the remaining keys
+ if referenced_file:
+ break
+
+ if referenced_file is None or absolute_directory is None:
+ continue
+
+ # make sure we have a absolute path here and check if it is a file
+ referenced_file = os.path.join(absolute_directory, referenced_file)
+
+ # skip if this is a jinja2 templated reference
+ if '{{' in referenced_file:
+ continue
+
+ # existing files do not produce any error
+ if os.path.isfile(referenced_file):
+ continue
+
+ results.append(({'referenced_file': referenced_file},
+ 'referenced missing file in %s:%i'
+ % (task['__file__'], task['__line__'])))
+ return results
diff --git a/lib/ansiblelint/rules/LineTooLongRule.py b/lib/ansiblelint/rules/LineTooLongRule.py
new file mode 100644
index 0000000..007857e
--- /dev/null
+++ b/lib/ansiblelint/rules/LineTooLongRule.py
@@ -0,0 +1,19 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class LineTooLongRule(AnsibleLintRule):
+ id = '204'
+ shortdesc = 'Lines should be no longer than 160 chars'
+ description = (
+ 'Long lines make code harder to read and '
+ 'code review more difficult'
+ )
+ severity = 'VERY_LOW'
+ tags = ['formatting']
+ version_added = 'v4.0.0'
+
+ def match(self, file, line):
+ return len(line) > 160
diff --git a/lib/ansiblelint/rules/LoadingFailureRule.py b/lib/ansiblelint/rules/LoadingFailureRule.py
new file mode 100644
index 0000000..7c37498
--- /dev/null
+++ b/lib/ansiblelint/rules/LoadingFailureRule.py
@@ -0,0 +1,14 @@
+"""Rule definition for a failure to load a file."""
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class LoadingFailureRule(AnsibleLintRule):
+ """File loading failure."""
+
+ id = '901'
+ shortdesc = 'Failed to load or parse file'
+ description = 'Linter failed to process a YAML file, possible not an Ansible file.'
+ severity = 'VERY_HIGH'
+ tags = ['core']
+ version_added = 'v4.3.0'
diff --git a/lib/ansiblelint/rules/MercurialHasRevisionRule.py b/lib/ansiblelint/rules/MercurialHasRevisionRule.py
new file mode 100644
index 0000000..fcfe0a8
--- /dev/null
+++ b/lib/ansiblelint/rules/MercurialHasRevisionRule.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class MercurialHasRevisionRule(AnsibleLintRule):
+ id = '402'
+ shortdesc = 'Mercurial checkouts must contain explicit revision'
+ description = (
+ 'All version control checkouts must point to '
+ 'an explicit commit or tag, not just ``latest``'
+ )
+ severity = 'MEDIUM'
+ tags = ['module', 'repeatability', 'ANSIBLE0005']
+ version_added = 'historic'
+
+ def matchtask(self, file, task):
+ return (task['action']['__ansible_module__'] == 'hg' and
+ task['action'].get('revision', 'default') == 'default')
diff --git a/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py b/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py
new file mode 100644
index 0000000..db52db3
--- /dev/null
+++ b/lib/ansiblelint/rules/MetaChangeFromDefaultRule.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class MetaChangeFromDefaultRule(AnsibleLintRule):
+ id = '703'
+ shortdesc = 'meta/main.yml default values should be changed'
+ field_defaults = [
+ ('author', 'your name'),
+ ('description', 'your description'),
+ ('company', 'your company (optional)'),
+ ('license', 'license (GPLv2, CC-BY, etc)'),
+ ('license', 'license (GPL-2.0-or-later, MIT, etc)'),
+ ]
+ description = (
+ 'meta/main.yml default values should be changed for: ``{}``'.format(
+ ', '.join(f[0] for f in field_defaults)
+ )
+ )
+ severity = 'HIGH'
+ tags = ['metadata']
+ version_added = 'v4.0.0'
+
+ def matchplay(self, file, data):
+ if file['type'] != 'meta':
+ return False
+
+ galaxy_info = data.get('galaxy_info', None)
+ if not galaxy_info:
+ return False
+
+ results = []
+ for field, default in self.field_defaults:
+ value = galaxy_info.get(field, None)
+ if value and value == default:
+ results.append(({'meta/main.yml': data},
+ 'Should change default metadata: %s' % field))
+
+ return results
diff --git a/lib/ansiblelint/rules/MetaMainHasInfoRule.py b/lib/ansiblelint/rules/MetaMainHasInfoRule.py
new file mode 100644
index 0000000..f05f240
--- /dev/null
+++ b/lib/ansiblelint/rules/MetaMainHasInfoRule.py
@@ -0,0 +1,66 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+META_STR_INFO = (
+ 'author',
+ 'description'
+)
+META_INFO = tuple(list(META_STR_INFO) + [
+ 'license',
+ 'min_ansible_version',
+ 'platforms',
+])
+
+
+def _platform_info_errors_itr(platforms):
+ if not isinstance(platforms, list):
+ yield 'Platforms should be a list of dictionaries'
+ return
+
+ for platform in platforms:
+ if not isinstance(platform, dict):
+ yield 'Platforms should be a list of dictionaries'
+ elif 'name' not in platform:
+ yield 'Platform should contain name'
+
+
+def _galaxy_info_errors_itr(galaxy_info,
+ info_list=META_INFO,
+ str_info_list=META_STR_INFO):
+ for info in info_list:
+ ginfo = galaxy_info.get(info, False)
+ if ginfo:
+ if info in str_info_list and not isinstance(ginfo, str):
+ yield '{info} should be a string'.format(info=info)
+ elif info == 'platforms':
+ for err in _platform_info_errors_itr(ginfo):
+ yield err
+ else:
+ yield 'Role info should contain {info}'.format(info=info)
+
+
+class MetaMainHasInfoRule(AnsibleLintRule):
+ id = '701'
+ shortdesc = 'meta/main.yml should contain relevant info'
+ str_info = META_STR_INFO
+ info = META_INFO
+ description = (
+ 'meta/main.yml should contain: ``{}``'.format(', '.join(info))
+ )
+ severity = 'HIGH'
+ tags = ['metadata']
+ version_added = 'v4.0.0'
+
+ def matchplay(self, file, data):
+ if file['type'] != 'meta':
+ return False
+
+ meta = {'meta/main.yml': data}
+ galaxy_info = data.get('galaxy_info', False)
+ if galaxy_info:
+ return [(meta, err) for err
+ in _galaxy_info_errors_itr(galaxy_info)]
+
+ return [(meta, "No 'galaxy_info' found")]
diff --git a/lib/ansiblelint/rules/MetaTagValidRule.py b/lib/ansiblelint/rules/MetaTagValidRule.py
new file mode 100644
index 0000000..0739ca3
--- /dev/null
+++ b/lib/ansiblelint/rules/MetaTagValidRule.py
@@ -0,0 +1,81 @@
+# Copyright (c) 2018, Ansible Project
+
+import re
+import sys
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class MetaTagValidRule(AnsibleLintRule):
+ id = '702'
+ shortdesc = 'Tags must contain lowercase letters and digits only'
+ description = (
+ 'Tags must contain lowercase letters and digits only, '
+ 'and ``galaxy_tags`` is expected to be a list'
+ )
+ severity = 'HIGH'
+ tags = ['metadata']
+ version_added = 'v4.0.0'
+
+ TAG_REGEXP = re.compile('^[a-z0-9]+$')
+
+ def matchplay(self, file, data):
+ if file['type'] != 'meta':
+ return False
+
+ galaxy_info = data.get('galaxy_info', None)
+ if not galaxy_info:
+ return False
+
+ tags = []
+ results = []
+
+ if 'galaxy_tags' in galaxy_info:
+ if isinstance(galaxy_info['galaxy_tags'], list):
+ tags += galaxy_info['galaxy_tags']
+ else:
+ results.append(({'meta/main.yml': data},
+ "Expected 'galaxy_tags' to be a list"))
+
+ if 'categories' in galaxy_info:
+ results.append(({'meta/main.yml': data},
+ "Use 'galaxy_tags' rather than 'categories'"))
+ if isinstance(galaxy_info['categories'], list):
+ tags += galaxy_info['categories']
+ else:
+ results.append(({'meta/main.yml': data},
+ "Expected 'categories' to be a list"))
+
+ for tag in tags:
+ msg = self.shortdesc
+ if not isinstance(tag, str):
+ results.append((
+ {'meta/main.yml': data},
+ "Tags must be strings: '{}'".format(tag)))
+ continue
+ if not re.match(self.TAG_REGEXP, tag):
+ results.append(({'meta/main.yml': data},
+ "{}, invalid: '{}'".format(msg, tag)))
+
+ return results
+
+
+META_TAG_VALID = '''
+galaxy_info:
+ galaxy_tags: ['database', 'my s q l', 'MYTAG']
+ categories: 'my_category_not_in_a_list'
+'''
+
+# testing code to be loaded only with pytest or when executed the rule file
+if "pytest" in sys.modules:
+
+ import pytest
+
+ @pytest.mark.parametrize('rule_runner', (MetaTagValidRule, ), indirect=['rule_runner'])
+ def test_valid_tag_rule(rule_runner):
+ """Test rule matches."""
+ results = rule_runner.run_role_meta_main(META_TAG_VALID)
+ assert "Use 'galaxy_tags' rather than 'categories'" in str(results)
+ assert "Expected 'categories' to be a list" in str(results)
+ assert "invalid: 'my s q l'" in str(results)
+ assert "invalid: 'MYTAG'" in str(results)
diff --git a/lib/ansiblelint/rules/MetaVideoLinksRule.py b/lib/ansiblelint/rules/MetaVideoLinksRule.py
new file mode 100644
index 0000000..aa34012
--- /dev/null
+++ b/lib/ansiblelint/rules/MetaVideoLinksRule.py
@@ -0,0 +1,65 @@
+# Copyright (c) 2018, Ansible Project
+
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class MetaVideoLinksRule(AnsibleLintRule):
+ id = '704'
+ shortdesc = "meta/main.yml video_links should be formatted correctly"
+ description = (
+ 'Items in ``video_links`` in meta/main.yml should be '
+ 'dictionaries, and contain only keys ``url`` and ``title``, '
+ 'and have a shared link from a supported provider'
+ )
+ severity = 'LOW'
+ tags = ['metadata']
+ version_added = 'v4.0.0'
+
+ VIDEO_REGEXP = {
+ 'google': re.compile(
+ r'https://drive\.google\.com.*file/d/([0-9A-Za-z-_]+)/.*'),
+ 'vimeo': re.compile(
+ r'https://vimeo\.com/([0-9]+)'),
+ 'youtube': re.compile(
+ r'https://youtu\.be/([0-9A-Za-z-_]+)'),
+ }
+
+ def matchplay(self, file, data):
+ if file['type'] != 'meta':
+ return False
+
+ galaxy_info = data.get('galaxy_info', None)
+ if not galaxy_info:
+ return False
+
+ video_links = galaxy_info.get('video_links', None)
+ if not video_links:
+ return False
+
+ results = []
+
+ for video in video_links:
+ if not isinstance(video, dict):
+ results.append(({'meta/main.yml': data},
+ "Expected item in 'video_links' to be "
+ "a dictionary"))
+ continue
+
+ if set(video) != {'url', 'title', '__file__', '__line__'}:
+ results.append(({'meta/main.yml': data},
+ "Expected item in 'video_links' to contain "
+ "only keys 'url' and 'title'"))
+ continue
+
+ for name, expr in self.VIDEO_REGEXP.items():
+ if expr.match(video['url']):
+ break
+ else:
+ msg = ("URL format '{0}' is not recognized. "
+ "Expected it be a shared link from Vimeo, YouTube, "
+ "or Google Drive.".format(video['url']))
+ results.append(({'meta/main.yml': data}, msg))
+
+ return results
diff --git a/lib/ansiblelint/rules/MissingFilePermissionsRule.py b/lib/ansiblelint/rules/MissingFilePermissionsRule.py
new file mode 100644
index 0000000..bc11cc7
--- /dev/null
+++ b/lib/ansiblelint/rules/MissingFilePermissionsRule.py
@@ -0,0 +1,95 @@
+# Copyright (c) 2020 Sorin Sbarnea <sorin.sbarnea@gmail.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+from ansiblelint.rules import AnsibleLintRule
+
+# Despite documentation mentioning 'preserve' only these modules support it:
+_modules_with_preserve = (
+ 'copy',
+ 'template',
+)
+
+
+class MissingFilePermissionsRule(AnsibleLintRule):
+ id = "208"
+ shortdesc = 'File permissions unset or incorrect'
+ description = (
+ "Missing or unsupported mode parameter can cause unexpected file "
+ "permissions based "
+ "on version of Ansible being used. Be explicit, like ``mode: 0644`` to "
+ "avoid hitting this rule. Special ``preserve`` value is accepted "
+ f"only by {', '.join(_modules_with_preserve)} modules. "
+ "See https://github.com/ansible/ansible/issues/71200"
+ )
+ severity = 'VERY_HIGH'
+ tags = ['unpredictability', 'experimental']
+ version_added = 'v4.3.0'
+
+ _modules = {
+ 'archive',
+ 'assemble',
+ 'copy', # supports preserve
+ 'file',
+ 'replace', # implicit preserve behavior but mode: preserve is invalid
+ 'template', # supports preserve
+ # 'unarchive', # disabled because .tar.gz files can have permissions inside
+ }
+
+ _modules_with_create = {
+ 'blockinfile': False,
+ 'htpasswd': True,
+ 'ini_file': True,
+ 'lineinfile': False,
+ }
+
+ def matchtask(self, file, task):
+ module = task["action"]["__ansible_module__"]
+ mode = task['action'].get('mode', None)
+
+ if module not in self._modules and \
+ module not in self._modules_with_create:
+ return False
+
+ if mode == 'preserve' and module not in _modules_with_preserve:
+ return True
+
+ if module in self._modules_with_create:
+ create = task["action"].get("create", self._modules_with_create[module])
+ return create and mode is None
+
+ # A file that doesn't exist cannot have a mode
+ if task['action'].get('state', None) == "absent":
+ return False
+
+ # A symlink always has mode 0o777
+ if task['action'].get('state', None) == "link":
+ return False
+
+ # The file module does not create anything when state==file (default)
+ if module == "file" and \
+ task['action'].get('state', 'file') == 'file':
+ return False
+
+ # replace module is the only one that has a valid default preserve
+ # behavior, but we want to trigger rule if user used incorrect
+ # documentation and put 'preserve', which is not supported.
+ if module == 'replace' and mode is None:
+ return False
+
+ return mode is None
diff --git a/lib/ansiblelint/rules/NestedJinjaRule.py b/lib/ansiblelint/rules/NestedJinjaRule.py
new file mode 100644
index 0000000..c10d4ec
--- /dev/null
+++ b/lib/ansiblelint/rules/NestedJinjaRule.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# Author: Adrián Tóth <adtoth@redhat.com>
+#
+# Copyright (c) 2020, Red Hat, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class NestedJinjaRule(AnsibleLintRule):
+ id = '207'
+ shortdesc = 'Nested jinja pattern'
+ description = (
+ "There should not be any nested jinja pattern. "
+ "Example (bad): ``{{ list_one + {{ list_two | max }} }}``, "
+ "example (good): ``{{ list_one + max(list_two) }}``"
+ )
+ severity = 'VERY_HIGH'
+ tags = ['formatting']
+ version_added = 'v4.3.0'
+
+ pattern = re.compile(r"{{(?:[^{}]*)?{{")
+
+ def matchtask(self, file, task):
+
+ command = "".join(
+ str(value)
+ # task properties are stored in the 'action' key
+ for key, value in task['action'].items()
+ # exclude useless values of '__file__', '__ansible_module__', '__*__', etc.
+ if not key.startswith('__') and not key.endswith('__')
+ )
+
+ return bool(self.pattern.search(command))
diff --git a/lib/ansiblelint/rules/NoFormattingInWhenRule.py b/lib/ansiblelint/rules/NoFormattingInWhenRule.py
new file mode 100644
index 0000000..a665311
--- /dev/null
+++ b/lib/ansiblelint/rules/NoFormattingInWhenRule.py
@@ -0,0 +1,34 @@
+from ansiblelint.rules import AnsibleLintRule
+
+
+class NoFormattingInWhenRule(AnsibleLintRule):
+ id = '102'
+ shortdesc = 'No Jinja2 in when'
+ description = '``when`` lines should not include Jinja2 variables'
+ severity = 'HIGH'
+ tags = ['deprecated', 'ANSIBLE0019']
+ version_added = 'historic'
+
+ def _is_valid(self, when):
+ if not isinstance(when, str):
+ return True
+ return when.find('{{') == -1 and when.find('}}') == -1
+
+ def matchplay(self, file, play):
+ errors = []
+ if isinstance(play, dict):
+ if 'roles' not in play or play['roles'] is None:
+ return errors
+ for role in play['roles']:
+ if self.matchtask(file, role):
+ errors.append(({'when': role},
+ 'role "when" clause has Jinja2 templates'))
+ if isinstance(play, list):
+ for play_item in play:
+ sub_errors = self.matchplay(file, play_item)
+ if sub_errors:
+ errors = errors + sub_errors
+ return errors
+
+ def matchtask(self, file, task):
+ return 'when' in task and not self._is_valid(task['when'])
diff --git a/lib/ansiblelint/rules/NoTabsRule.py b/lib/ansiblelint/rules/NoTabsRule.py
new file mode 100644
index 0000000..78222c8
--- /dev/null
+++ b/lib/ansiblelint/rules/NoTabsRule.py
@@ -0,0 +1,16 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class NoTabsRule(AnsibleLintRule):
+ id = '203'
+ shortdesc = 'Most files should not contain tabs'
+ description = 'Tabs can cause unexpected display issues, use spaces'
+ severity = 'LOW'
+ tags = ['formatting']
+ version_added = 'v4.0.0'
+
+ def match(self, file, line):
+ return '\t' in line
diff --git a/lib/ansiblelint/rules/OctalPermissionsRule.py b/lib/ansiblelint/rules/OctalPermissionsRule.py
new file mode 100644
index 0000000..b95c322
--- /dev/null
+++ b/lib/ansiblelint/rules/OctalPermissionsRule.py
@@ -0,0 +1,73 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class OctalPermissionsRule(AnsibleLintRule):
+ id = '202'
+ shortdesc = 'Octal file permissions must contain leading zero or be a string'
+ description = (
+ 'Numeric file permissions without leading zero can behave '
+ 'in unexpected ways. See '
+ 'http://docs.ansible.com/ansible/file_module.html'
+ )
+ severity = 'VERY_HIGH'
+ tags = ['formatting', 'ANSIBLE0009']
+ version_added = 'historic'
+
+ _modules = ['assemble', 'copy', 'file', 'ini_file', 'lineinfile',
+ 'replace', 'synchronize', 'template', 'unarchive']
+
+ def is_invalid_permission(self, mode):
+ # sensible file permission modes don't
+ # have write bit set when read bit is
+ # not set and don't have execute bit set
+ # when user execute bit is not set.
+ # also, user permissions are more generous than
+ # group permissions and user and group permissions
+ # are more generous than world permissions
+
+ other_write_without_read = (mode % 8 and mode % 8 < 4 and
+ not (mode % 8 == 1 and (mode >> 6) % 2 == 1))
+ group_write_without_read = ((mode >> 3) % 8 and (mode >> 3) % 8 < 4 and
+ not ((mode >> 3) % 8 == 1 and (mode >> 6) % 2 == 1))
+ user_write_without_read = ((mode >> 6) % 8 and (mode >> 6) % 8 < 4 and
+ not (mode >> 6) % 8 == 1)
+ other_more_generous_than_group = mode % 8 > (mode >> 3) % 8
+ other_more_generous_than_user = mode % 8 > (mode >> 6) % 8
+ group_more_generous_than_user = (mode >> 3) % 8 > (mode >> 6) % 8
+
+ return (other_write_without_read or
+ group_write_without_read or
+ user_write_without_read or
+ other_more_generous_than_group or
+ other_more_generous_than_user or
+ group_more_generous_than_user)
+
+ def matchtask(self, file, task):
+ if task["action"]["__ansible_module__"] in self._modules:
+ mode = task['action'].get('mode', None)
+
+ if isinstance(mode, str):
+ return False
+
+ if isinstance(mode, int):
+ return self.is_invalid_permission(mode)
diff --git a/lib/ansiblelint/rules/PackageIsNotLatestRule.py b/lib/ansiblelint/rules/PackageIsNotLatestRule.py
new file mode 100644
index 0000000..9fddaf4
--- /dev/null
+++ b/lib/ansiblelint/rules/PackageIsNotLatestRule.py
@@ -0,0 +1,67 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class PackageIsNotLatestRule(AnsibleLintRule):
+ id = '403'
+ shortdesc = 'Package installs should not use latest'
+ description = (
+ 'Package installs should use ``state=present`` '
+ 'with or without a version'
+ )
+ severity = 'VERY_LOW'
+ tags = ['module', 'repeatability', 'ANSIBLE0010']
+ version_added = 'historic'
+
+ _package_managers = [
+ 'apk',
+ 'apt',
+ 'bower',
+ 'bundler',
+ 'dnf',
+ 'easy_install',
+ 'gem',
+ 'homebrew',
+ 'jenkins_plugin',
+ 'npm',
+ 'openbsd_package',
+ 'openbsd_pkg',
+ 'package',
+ 'pacman',
+ 'pear',
+ 'pip',
+ 'pkg5',
+ 'pkgutil',
+ 'portage',
+ 'slackpkg',
+ 'sorcery',
+ 'swdepot',
+ 'win_chocolatey',
+ 'yarn',
+ 'yum',
+ 'zypper',
+ ]
+
+ def matchtask(self, file, task):
+ return (task['action']['__ansible_module__'] in self._package_managers and
+ not task['action'].get('version') and
+ task['action'].get('state') == 'latest')
diff --git a/lib/ansiblelint/rules/PlaybookExtension.py b/lib/ansiblelint/rules/PlaybookExtension.py
new file mode 100644
index 0000000..593e5ae
--- /dev/null
+++ b/lib/ansiblelint/rules/PlaybookExtension.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp>
+# Copyright (c) 2018, Ansible Project
+
+import os
+from typing import List
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class PlaybookExtension(AnsibleLintRule):
+ id = '205'
+ shortdesc = 'Use ".yml" or ".yaml" playbook extension'
+ description = 'Playbooks should have the ".yml" or ".yaml" extension'
+ severity = 'MEDIUM'
+ tags = ['formatting']
+ done = [] # type: List # already noticed path list
+ version_added = 'v4.0.0'
+
+ def match(self, file, text):
+ if file['type'] != 'playbook':
+ return False
+
+ path = file['path']
+ ext = os.path.splitext(path)
+ if ext[1] not in ['.yml', '.yaml'] and path not in self.done:
+ self.done.append(path)
+ return True
+ return False
diff --git a/lib/ansiblelint/rules/RoleNames.py b/lib/ansiblelint/rules/RoleNames.py
new file mode 100644
index 0000000..3d790b3
--- /dev/null
+++ b/lib/ansiblelint/rules/RoleNames.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2020 Gael Chamoulaud <gchamoul@redhat.com>
+# Copyright (c) 2020 Sorin Sbarnea <ssbarnea@redhat.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import re
+from pathlib import Path
+from typing import List
+
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.utils import parse_yaml_from_file
+
+ROLE_NAME_REGEX = '^[a-z][a-z0-9_]+$'
+
+
+def _remove_prefix(text, prefix):
+ return re.sub(r'^{0}'.format(re.escape(prefix)), '', text)
+
+
+class RoleNames(AnsibleLintRule):
+ id = '106'
+ shortdesc = (
+ "Role name {} does not match ``%s`` pattern" % ROLE_NAME_REGEX
+ )
+ description = (
+ "Role names are now limited to contain only lowercase alphanumeric "
+ "characters, plus '_' and start with an alpha character. See "
+ "`developing collections <https://docs.ansible.com/ansible/devel/dev_guide/developing_"
+ "collections.html#roles-directory>`_"
+ )
+ severity = 'HIGH'
+ done: List[str] = [] # already noticed roles list
+ tags = ['deprecated']
+ version_added = 'v4.3.0'
+
+ ROLE_NAME_REGEXP = re.compile(ROLE_NAME_REGEX)
+
+ def match(self, file, text):
+ path = file['path'].split("/")
+ if "tasks" in path:
+ role_name = _remove_prefix(path[path.index("tasks") - 1], "ansible-role-")
+ role_root = path[:path.index("tasks")]
+ meta = Path("/".join(role_root)) / "meta" / "main.yml"
+
+ if meta.is_file():
+ meta_data = parse_yaml_from_file(str(meta))
+ if meta_data:
+ try:
+ role_name = meta_data['galaxy_info']['role_name']
+ except KeyError:
+ pass
+
+ if role_name in self.done:
+ return False
+ self.done.append(role_name)
+ if not re.match(self.ROLE_NAME_REGEXP, role_name):
+ return self.shortdesc.format(role_name)
+ return False
diff --git a/lib/ansiblelint/rules/RoleRelativePath.py b/lib/ansiblelint/rules/RoleRelativePath.py
new file mode 100644
index 0000000..87d7ac8
--- /dev/null
+++ b/lib/ansiblelint/rules/RoleRelativePath.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp>
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class RoleRelativePath(AnsibleLintRule):
+ id = '404'
+ shortdesc = "Doesn't need a relative path in role"
+ description = '``copy`` and ``template`` do not need to use relative path for ``src``'
+ severity = 'HIGH'
+ tags = ['module']
+ version_added = 'v4.0.0'
+
+ _module_to_path_folder = {
+ 'copy': 'files',
+ 'win_copy': 'files',
+ 'template': 'templates',
+ 'win_template': 'win_templates',
+ }
+
+ def matchtask(self, file, task):
+ module = task['action']['__ansible_module__']
+ if module not in self._module_to_path_folder:
+ return False
+
+ if 'src' not in task['action']:
+ return False
+
+ path_to_check = '../{}'.format(self._module_to_path_folder[module])
+ if path_to_check in task['action']['src']:
+ return True
diff --git a/lib/ansiblelint/rules/ShellWithoutPipefail.py b/lib/ansiblelint/rules/ShellWithoutPipefail.py
new file mode 100644
index 0000000..678e5a2
--- /dev/null
+++ b/lib/ansiblelint/rules/ShellWithoutPipefail.py
@@ -0,0 +1,38 @@
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class ShellWithoutPipefail(AnsibleLintRule):
+ id = '306'
+ shortdesc = 'Shells that use pipes should set the pipefail option'
+ description = (
+ 'Without the pipefail option set, a shell command that '
+ 'implements a pipeline can fail and still return 0. If '
+ 'any part of the pipeline other than the terminal command '
+ 'fails, the whole pipeline will still return 0, which may '
+ 'be considered a success by Ansible. '
+ 'Pipefail is available in the bash shell.'
+ )
+ severity = 'MEDIUM'
+ tags = ['command-shell']
+ version_added = 'v4.1.0'
+
+ _pipefail_re = re.compile(r"^\s*set.*[+-][A-z]*o\s*pipefail")
+ _pipe_re = re.compile(r"(?<!\|)\|(?!\|)")
+
+ def matchtask(self, file, task):
+ if task["__ansible_action_type__"] != "task":
+ return False
+
+ if task["action"]["__ansible_module__"] != "shell":
+ return False
+
+ if task.get("ignore_errors"):
+ return False
+
+ unjinjad_cmd = self.unjinja(
+ ' '.join(task["action"].get("__ansible_arguments__", [])))
+
+ return (self._pipe_re.search(unjinjad_cmd) and
+ not self._pipefail_re.match(unjinjad_cmd))
diff --git a/lib/ansiblelint/rules/SudoRule.py b/lib/ansiblelint/rules/SudoRule.py
new file mode 100644
index 0000000..8ea554e
--- /dev/null
+++ b/lib/ansiblelint/rules/SudoRule.py
@@ -0,0 +1,36 @@
+from ansiblelint.rules import AnsibleLintRule
+
+
+class SudoRule(AnsibleLintRule):
+ id = '103'
+ shortdesc = 'Deprecated sudo'
+ description = 'Instead of ``sudo``/``sudo_user``, use ``become``/``become_user``.'
+ severity = 'VERY_HIGH'
+ tags = ['deprecated', 'ANSIBLE0008']
+ version_added = 'historic'
+
+ def _check_value(self, play_frag):
+ results = []
+
+ if isinstance(play_frag, dict):
+ if 'sudo' in play_frag:
+ results.append(({'sudo': play_frag['sudo']},
+ 'Deprecated sudo feature', play_frag['__line__']))
+ if 'sudo_user' in play_frag:
+ results.append(({'sudo_user': play_frag['sudo_user']},
+ 'Deprecated sudo_user feature', play_frag['__line__']))
+ if 'tasks' in play_frag:
+ output = self._check_value(play_frag['tasks'])
+ if output:
+ results += output
+
+ if isinstance(play_frag, list):
+ for item in play_frag:
+ output = self._check_value(item)
+ if output:
+ results += output
+
+ return results
+
+ def matchplay(self, file, play):
+ return self._check_value(play)
diff --git a/lib/ansiblelint/rules/TaskHasNameRule.py b/lib/ansiblelint/rules/TaskHasNameRule.py
new file mode 100644
index 0000000..8757b03
--- /dev/null
+++ b/lib/ansiblelint/rules/TaskHasNameRule.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class TaskHasNameRule(AnsibleLintRule):
+ id = '502'
+ shortdesc = 'All tasks should be named'
+ description = (
+ 'All tasks should have a distinct name for readability '
+ 'and for ``--start-at-task`` to work'
+ )
+ severity = 'MEDIUM'
+ tags = ['task', 'readability', 'ANSIBLE0011']
+ version_added = 'historic'
+
+ _nameless_tasks = ['meta', 'debug', 'include_role', 'import_role',
+ 'include_tasks', 'import_tasks']
+
+ def matchtask(self, file, task):
+ return (not task.get('name') and
+ task["action"]["__ansible_module__"] not in self._nameless_tasks)
diff --git a/lib/ansiblelint/rules/TaskNoLocalAction.py b/lib/ansiblelint/rules/TaskNoLocalAction.py
new file mode 100644
index 0000000..294bb9d
--- /dev/null
+++ b/lib/ansiblelint/rules/TaskNoLocalAction.py
@@ -0,0 +1,18 @@
+# Copyright (c) 2016, Tsukinowa Inc. <info@tsukinowa.jp>
+# Copyright (c) 2018, Ansible Project
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class TaskNoLocalAction(AnsibleLintRule):
+ id = '504'
+ shortdesc = "Do not use 'local_action', use 'delegate_to: localhost'"
+ description = 'Do not use ``local_action``, use ``delegate_to: localhost``'
+ severity = 'MEDIUM'
+ tags = ['task']
+ version_added = 'v4.0.0'
+
+ def match(self, file, text):
+ if 'local_action' in text:
+ return True
+ return False
diff --git a/lib/ansiblelint/rules/TrailingWhitespaceRule.py b/lib/ansiblelint/rules/TrailingWhitespaceRule.py
new file mode 100644
index 0000000..ac0f1c2
--- /dev/null
+++ b/lib/ansiblelint/rules/TrailingWhitespaceRule.py
@@ -0,0 +1,34 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class TrailingWhitespaceRule(AnsibleLintRule):
+ id = '201'
+ shortdesc = 'Trailing whitespace'
+ description = 'There should not be any trailing whitespace'
+ severity = 'INFO'
+ tags = ['formatting', 'ANSIBLE0002']
+ version_added = 'historic'
+
+ def match(self, file, line):
+ line = line.replace("\r", "")
+ return line.rstrip() != line
diff --git a/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py b/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py
new file mode 100644
index 0000000..48babcf
--- /dev/null
+++ b/lib/ansiblelint/rules/UseCommandInsteadOfShellRule.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class UseCommandInsteadOfShellRule(AnsibleLintRule):
+ id = '305'
+ shortdesc = 'Use shell only when shell functionality is required'
+ description = (
+ 'Shell should only be used when piping, redirecting '
+ 'or chaining commands (and Ansible would be preferred '
+ 'for some of those!)'
+ )
+ severity = 'HIGH'
+ tags = ['command-shell', 'safety', 'ANSIBLE0013']
+ version_added = 'historic'
+
+ def matchtask(self, file, task):
+ # Use unjinja so that we don't match on jinja filters
+ # rather than pipes
+ if task["action"]["__ansible_module__"] == 'shell':
+ if 'cmd' in task['action']:
+ unjinjad_cmd = self.unjinja(task["action"].get("cmd", []))
+ else:
+ unjinjad_cmd = self.unjinja(
+ ' '.join(task["action"].get("__ansible_arguments__", [])))
+ return not any([ch in unjinjad_cmd for ch in '&|<>;$\n*[]{}?`'])
diff --git a/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py b/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py
new file mode 100644
index 0000000..53b389d
--- /dev/null
+++ b/lib/ansiblelint/rules/UseHandlerRatherThanWhenChangedRule.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2016 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+def _changed_in_when(item):
+ if not isinstance(item, str):
+ return False
+ return any(changed in item for changed in
+ ['.changed', '|changed', '["changed"]', "['changed']"])
+
+
+class UseHandlerRatherThanWhenChangedRule(AnsibleLintRule):
+ id = '503'
+ shortdesc = 'Tasks that run when changed should likely be handlers'
+ description = (
+ 'If a task has a ``when: result.changed`` setting, it is effectively '
+ 'acting as a handler'
+ )
+ severity = 'MEDIUM'
+ tags = ['task', 'behaviour', 'ANSIBLE0016']
+ version_added = 'historic'
+
+ def matchtask(self, file, task):
+ if task["__ansible_action_type__"] != 'task':
+ return False
+
+ when = task.get('when')
+
+ if isinstance(when, list):
+ for item in when:
+ return _changed_in_when(item)
+ else:
+ return _changed_in_when(when)
diff --git a/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py b/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py
new file mode 100644
index 0000000..a0721ac
--- /dev/null
+++ b/lib/ansiblelint/rules/UsingBareVariablesIsDeprecatedRule.py
@@ -0,0 +1,75 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import os
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class UsingBareVariablesIsDeprecatedRule(AnsibleLintRule):
+ id = '104'
+ shortdesc = 'Using bare variables is deprecated'
+ description = (
+ 'Using bare variables is deprecated. Update your '
+ 'playbooks so that the environment value uses the full variable '
+ 'syntax ``{{ your_variable }}``'
+ )
+ severity = 'VERY_HIGH'
+ tags = ['deprecated', 'formatting', 'ANSIBLE0015']
+ version_added = 'historic'
+
+ _jinja = re.compile(r"{{.*}}", re.DOTALL)
+ _glob = re.compile('[][*?]')
+
+ def matchtask(self, file, task):
+ loop_type = next((key for key in task
+ if key.startswith("with_")), None)
+ if loop_type:
+ if loop_type in ["with_nested", "with_together", "with_flattened", "with_filetree"]:
+ # These loops can either take a list defined directly in the task
+ # or a variable that is a list itself. When a single variable is used
+ # we just need to check that one variable, and not iterate over it like
+ # it's a list. Otherwise, loop through and check all items.
+ items = task[loop_type]
+ if not isinstance(items, (list, tuple)):
+ items = [items]
+ for var in items:
+ return self._matchvar(var, task, loop_type)
+ elif loop_type == "with_subelements":
+ return self._matchvar(task[loop_type][0], task, loop_type)
+ elif loop_type in ["with_sequence", "with_ini",
+ "with_inventory_hostnames"]:
+ pass
+ else:
+ return self._matchvar(task[loop_type], task, loop_type)
+
+ def _matchvar(self, varstring, task, loop_type):
+ if (isinstance(varstring, str) and
+ not self._jinja.match(varstring)):
+ valid = loop_type == 'with_fileglob' and bool(self._jinja.search(varstring) or
+ self._glob.search(varstring))
+
+ valid |= loop_type == 'with_filetree' and bool(self._jinja.search(varstring) or
+ varstring.endswith(os.sep))
+ if not valid:
+ message = "Found a bare variable '{0}' used in a '{1}' loop." + \
+ " You should use the full variable syntax ('{{{{ {0} }}}}')"
+ return message.format(task[loop_type], loop_type)
diff --git a/lib/ansiblelint/rules/VariableHasSpacesRule.py b/lib/ansiblelint/rules/VariableHasSpacesRule.py
new file mode 100644
index 0000000..dd4f441
--- /dev/null
+++ b/lib/ansiblelint/rules/VariableHasSpacesRule.py
@@ -0,0 +1,24 @@
+# Copyright (c) 2016, Will Thames and contributors
+# Copyright (c) 2018, Ansible Project
+
+import re
+
+from ansiblelint.rules import AnsibleLintRule
+
+
+class VariableHasSpacesRule(AnsibleLintRule):
+ id = '206'
+ shortdesc = 'Variables should have spaces before and after: {{ var_name }}'
+ description = 'Variables should have spaces before and after: ``{{ var_name }}``'
+ severity = 'LOW'
+ tags = ['formatting']
+ version_added = 'v4.0.0'
+
+ variable_syntax = re.compile(r"{{.*}}")
+ bracket_regex = re.compile(r"{{[^{' -]|[^ '}-]}}")
+
+ def match(self, file, line):
+ if not self.variable_syntax.search(line):
+ return
+ line_exclude_json = re.sub(r"[^{]{'\w+': ?[^{]{.*?}}", "", line)
+ return self.bracket_regex.search(line_exclude_json)
diff --git a/lib/ansiblelint/rules/__init__.py b/lib/ansiblelint/rules/__init__.py
new file mode 100644
index 0000000..fd3e92d
--- /dev/null
+++ b/lib/ansiblelint/rules/__init__.py
@@ -0,0 +1,254 @@
+"""All internal ansible-lint rules."""
+import glob
+import importlib.util
+import logging
+import os
+import re
+from collections import defaultdict
+from importlib.abc import Loader
+from time import sleep
+from typing import List
+
+import ansiblelint.utils
+from ansiblelint.errors import MatchError
+from ansiblelint.skip_utils import append_skipped_rules, get_rule_skips_from_line
+
+_logger = logging.getLogger(__name__)
+
+
+class AnsibleLintRule(object):
+
+ def __repr__(self) -> str:
+ """Return a AnsibleLintRule instance representation."""
+ return self.id + ": " + self.shortdesc
+
+ def verbose(self) -> str:
+ return self.id + ": " + self.shortdesc + "\n " + self.description
+
+ id: str = ""
+ tags: List[str] = []
+ shortdesc: str = ""
+ description: str = ""
+ version_added: str = ""
+ severity: str = ""
+ match = None
+ matchtask = None
+ matchplay = None
+
+ @staticmethod
+ def unjinja(text):
+ text = re.sub(r"{{.+?}}", "JINJA_EXPRESSION", text)
+ text = re.sub(r"{%.+?%}", "JINJA_STATEMENT", text)
+ text = re.sub(r"{#.+?#}", "JINJA_COMMENT", text)
+ return text
+
+ def matchlines(self, file, text) -> List[MatchError]:
+ matches: List[MatchError] = []
+ if not self.match:
+ return matches
+ # arrays are 0-based, line numbers are 1-based
+ # so use prev_line_no as the counter
+ for (prev_line_no, line) in enumerate(text.split("\n")):
+ if line.lstrip().startswith('#'):
+ continue
+
+ rule_id_list = get_rule_skips_from_line(line)
+ if self.id in rule_id_list:
+ continue
+
+ result = self.match(file, line)
+ if not result:
+ continue
+ message = None
+ if isinstance(result, str):
+ message = result
+ m = MatchError(
+ message=message,
+ linenumber=prev_line_no + 1,
+ details=line,
+ filename=file['path'],
+ rule=self)
+ matches.append(m)
+ return matches
+
+ # TODO(ssbarnea): Reduce mccabe complexity
+ # https://github.com/ansible/ansible-lint/issues/744
+ def matchtasks(self, file: str, text: str) -> List[MatchError]: # noqa: C901
+ matches: List[MatchError] = []
+ if not self.matchtask:
+ return matches
+
+ if file['type'] == 'meta':
+ return matches
+
+ yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path'])
+ if not yaml:
+ return matches
+
+ yaml = append_skipped_rules(yaml, text, file['type'])
+
+ try:
+ tasks = ansiblelint.utils.get_normalized_tasks(yaml, file)
+ except MatchError as e:
+ return [e]
+
+ for task in tasks:
+ if self.id in task.get('skipped_rules', ()):
+ continue
+
+ if 'action' not in task:
+ continue
+ result = self.matchtask(file, task)
+ if not result:
+ continue
+
+ message = None
+ if isinstance(result, str):
+ message = result
+ task_msg = "Task/Handler: " + ansiblelint.utils.task_to_str(task)
+ m = MatchError(
+ message=message,
+ linenumber=task[ansiblelint.utils.LINE_NUMBER_KEY],
+ details=task_msg,
+ filename=file['path'],
+ rule=self)
+ matches.append(m)
+ return matches
+
+ @staticmethod
+ def _matchplay_linenumber(play, optional_linenumber):
+ try:
+ linenumber, = optional_linenumber
+ except ValueError:
+ linenumber = play[ansiblelint.utils.LINE_NUMBER_KEY]
+ return linenumber
+
+ def matchyaml(self, file: str, text: str) -> List[MatchError]:
+ matches: List[MatchError] = []
+ if not self.matchplay:
+ return matches
+
+ yaml = ansiblelint.utils.parse_yaml_linenumbers(text, file['path'])
+ if not yaml:
+ return matches
+
+ if isinstance(yaml, dict):
+ yaml = [yaml]
+
+ yaml = ansiblelint.skip_utils.append_skipped_rules(yaml, text, file['type'])
+
+ for play in yaml:
+ if self.id in play.get('skipped_rules', ()):
+ continue
+
+ result = self.matchplay(file, play)
+ if not result:
+ continue
+
+ if isinstance(result, tuple):
+ result = [result]
+
+ if not isinstance(result, list):
+ raise TypeError("{} is not a list".format(result))
+
+ for section, message, *optional_linenumber in result:
+ linenumber = self._matchplay_linenumber(play, optional_linenumber)
+ m = MatchError(
+ message=message,
+ linenumber=linenumber,
+ details=str(section),
+ filename=file['path'],
+ rule=self)
+ matches.append(m)
+ return matches
+
+
+def load_plugins(directory: str) -> List[AnsibleLintRule]:
+ """Return a list of rule classes."""
+ result = []
+
+ for pluginfile in glob.glob(os.path.join(directory, '[A-Za-z]*.py')):
+
+ pluginname = os.path.basename(pluginfile.replace('.py', ''))
+ spec = importlib.util.spec_from_file_location(pluginname, pluginfile)
+ # https://github.com/python/typeshed/issues/2793
+ if spec and isinstance(spec.loader, Loader):
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ obj = getattr(module, pluginname)()
+ result.append(obj)
+ return result
+
+
+class RulesCollection(object):
+
+ def __init__(self, rulesdirs=None) -> None:
+ """Initialize a RulesCollection instance."""
+ if rulesdirs is None:
+ rulesdirs = []
+ self.rulesdirs = ansiblelint.utils.expand_paths_vars(rulesdirs)
+ self.rules: List[AnsibleLintRule] = []
+ for rulesdir in self.rulesdirs:
+ _logger.debug("Loading rules from %s", rulesdir)
+ self.extend(load_plugins(rulesdir))
+ self.rules = sorted(self.rules, key=lambda r: r.id)
+
+ def register(self, obj: AnsibleLintRule):
+ self.rules.append(obj)
+
+ def __iter__(self):
+ """Return the iterator over the rules in the RulesCollection."""
+ return iter(self.rules)
+
+ def __len__(self):
+ """Return the length of the RulesCollection data."""
+ return len(self.rules)
+
+ def extend(self, more: List[AnsibleLintRule]) -> None:
+ self.rules.extend(more)
+
+ def run(self, playbookfile, tags=set(), skip_list=frozenset()) -> List:
+ text = ""
+ matches: List = list()
+
+ for i in range(3):
+ try:
+ with open(playbookfile['path'], mode='r', encoding='utf-8') as f:
+ text = f.read()
+ break
+ except IOError as e:
+ _logger.warning(
+ "Couldn't open %s - %s [try:%s]",
+ playbookfile['path'],
+ e.strerror,
+ i)
+ sleep(1)
+ continue
+ if i and not text:
+ return matches
+
+ for rule in self.rules:
+ if not tags or not set(rule.tags).union([rule.id]).isdisjoint(tags):
+ rule_definition = set(rule.tags)
+ rule_definition.add(rule.id)
+ if set(rule_definition).isdisjoint(skip_list):
+ matches.extend(rule.matchlines(playbookfile, text))
+ matches.extend(rule.matchtasks(playbookfile, text))
+ matches.extend(rule.matchyaml(playbookfile, text))
+
+ return matches
+
+ def __repr__(self) -> str:
+ """Return a RulesCollection instance representation."""
+ return "\n".join([rule.verbose()
+ for rule in sorted(self.rules, key=lambda x: x.id)])
+
+ def listtags(self) -> str:
+ tags = defaultdict(list)
+ for rule in self.rules:
+ for tag in rule.tags:
+ tags[tag].append("[{0}]".format(rule.id))
+ results = []
+ for tag in sorted(tags):
+ results.append("{0} {1}".format(tag, tags[tag]))
+ return "\n".join(results)
diff --git a/lib/ansiblelint/rules/custom/__init__.py b/lib/ansiblelint/rules/custom/__init__.py
new file mode 100644
index 0000000..8c3e048
--- /dev/null
+++ b/lib/ansiblelint/rules/custom/__init__.py
@@ -0,0 +1 @@
+"""A placeholder package for putting custom rules under this dir."""
diff --git a/lib/ansiblelint/runner.py b/lib/ansiblelint/runner.py
new file mode 100644
index 0000000..f73945f
--- /dev/null
+++ b/lib/ansiblelint/runner.py
@@ -0,0 +1,111 @@
+"""Runner implementation."""
+import logging
+import os
+from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set
+
+import ansiblelint.file_utils
+import ansiblelint.skip_utils
+import ansiblelint.utils
+from ansiblelint.errors import MatchError
+from ansiblelint.rules.LoadingFailureRule import LoadingFailureRule
+
+if TYPE_CHECKING:
+ from ansiblelint.rules import RulesCollection
+
+
+_logger = logging.getLogger(__name__)
+
+
+class Runner(object):
+ """Runner class performs the linting process."""
+
+ def __init__(
+ self,
+ rules: "RulesCollection",
+ playbook: str,
+ tags: FrozenSet[Any] = frozenset(),
+ skip_list: Optional[FrozenSet[Any]] = frozenset(),
+ exclude_paths: List[str] = [],
+ verbosity: int = 0,
+ checked_files: Set[str] = None) -> None:
+ """Initialize a Runner instance."""
+ self.rules = rules
+ self.playbooks = set()
+ # assume role if directory
+ if os.path.isdir(playbook):
+ self.playbooks.add((os.path.join(playbook, ''), 'role'))
+ self.playbook_dir = playbook
+ else:
+ self.playbooks.add((playbook, 'playbook'))
+ self.playbook_dir = os.path.dirname(playbook)
+ self.tags = tags
+ self.skip_list = skip_list
+ self._update_exclude_paths(exclude_paths)
+ self.verbosity = verbosity
+ if checked_files is None:
+ checked_files = set()
+ self.checked_files = checked_files
+
+ def _update_exclude_paths(self, exclude_paths: List[str]) -> None:
+ if exclude_paths:
+ # These will be (potentially) relative paths
+ paths = ansiblelint.utils.expand_paths_vars(exclude_paths)
+ # Since ansiblelint.utils.find_children returns absolute paths,
+ # and the list of files we create in `Runner.run` can contain both
+ # relative and absolute paths, we need to cover both bases.
+ self.exclude_paths = paths + [os.path.abspath(p) for p in paths]
+ else:
+ self.exclude_paths = []
+
+ def is_excluded(self, file_path: str) -> bool:
+ """Verify if a file path should be excluded."""
+ # Any will short-circuit as soon as something returns True, but will
+ # be poor performance for the case where the path under question is
+ # not excluded.
+ return any(file_path.startswith(path) for path in self.exclude_paths)
+
+ def run(self) -> List[MatchError]:
+ """Execute the linting process."""
+ files = list()
+ for playbook in self.playbooks:
+ if self.is_excluded(playbook[0]) or playbook[1] == 'role':
+ continue
+ files.append({'path': ansiblelint.file_utils.normpath(playbook[0]),
+ 'type': playbook[1],
+ # add an absolute path here, so rules are able to validate if
+ # referenced files exist
+ 'absolute_directory': os.path.dirname(playbook[0])})
+ matches = set(self._emit_matches(files))
+
+ # remove duplicates from files list
+ files = [value for n, value in enumerate(files) if value not in files[:n]]
+
+ # remove files that have already been checked
+ files = [x for x in files if x['path'] not in self.checked_files]
+ for file in files:
+ _logger.debug(
+ "Examining %s of type %s",
+ ansiblelint.file_utils.normpath(file['path']),
+ file['type'])
+ matches = matches.union(
+ self.rules.run(file, tags=set(self.tags),
+ skip_list=self.skip_list))
+ # update list of checked files
+ self.checked_files.update([x['path'] for x in files])
+
+ return sorted(matches)
+
+ def _emit_matches(self, files: List) -> Generator[MatchError, None, None]:
+ visited: Set = set()
+ while visited != self.playbooks:
+ for arg in self.playbooks - visited:
+ try:
+ for child in ansiblelint.utils.find_children(arg, self.playbook_dir):
+ if self.is_excluded(child['path']):
+ continue
+ self.playbooks.add((child['path'], child['type']))
+ files.append(child)
+ except MatchError as e:
+ e.rule = LoadingFailureRule
+ yield e
+ visited.add(arg)
diff --git a/lib/ansiblelint/skip_utils.py b/lib/ansiblelint/skip_utils.py
new file mode 100644
index 0000000..c3c0a88
--- /dev/null
+++ b/lib/ansiblelint/skip_utils.py
@@ -0,0 +1,189 @@
+# (c) 2019–2020, Ansible by Red Hat
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""Utils related to inline skipping of rules."""
+import logging
+from functools import lru_cache
+from itertools import product
+from typing import Any, Generator, List, Sequence
+
+import ruamel.yaml
+
+from ansiblelint.constants import FileType
+
+INLINE_SKIP_FLAG = '# noqa '
+
+_logger = logging.getLogger(__name__)
+
+
+# playbook: Sequence currently expects only instances of one of the two
+# classes below but we should consider avoiding this chimera.
+# ruamel.yaml.comments.CommentedSeq
+# ansible.parsing.yaml.objects.AnsibleSequence
+
+
+def get_rule_skips_from_line(line: str) -> List:
+ """Return list of rule ids skipped via comment on the line of yaml."""
+ _before_noqa, _noqa_marker, noqa_text = line.partition(INLINE_SKIP_FLAG)
+ return noqa_text.split()
+
+
+def append_skipped_rules(pyyaml_data: str, file_text: str, file_type: FileType) -> Sequence:
+ """Append 'skipped_rules' to individual tasks or single metadata block.
+
+ For a file, uses 2nd parser (ruamel.yaml) to pull comments out of
+ yaml subsets, check for '# noqa' skipped rules, and append any skips to the
+ original parser (pyyaml) data relied on by remainder of ansible-lint.
+
+ :param pyyaml_data: file text parsed via ansible and pyyaml.
+ :param file_text: raw file text.
+ :param file_type: type of file: tasks, handlers or meta.
+ :returns: original pyyaml_data altered with a 'skipped_rules' list added
+ to individual tasks, or added to the single metadata block.
+ """
+ try:
+ yaml_skip = _append_skipped_rules(pyyaml_data, file_text, file_type)
+ except RuntimeError:
+ # Notify user of skip error, do not stop, do not change exit code
+ _logger.error('Error trying to append skipped rules', exc_info=True)
+ return pyyaml_data
+ return yaml_skip
+
+
+@lru_cache(maxsize=128)
+def load_data(file_text: str) -> Any:
+ """Parse `file_text` as yaml and return parsed structure.
+
+ This is the main culprit for slow performance, each rule asks for loading yaml again and again
+ ideally the `maxsize` on the decorator above MUST be great or equal total number of rules
+ :param file_text: raw text to parse
+ :return: Parsed yaml
+ """
+ yaml = ruamel.yaml.YAML()
+ return yaml.load(file_text)
+
+
+def _append_skipped_rules(pyyaml_data: Sequence, file_text: str, file_type: FileType) -> Sequence:
+ # parse file text using 2nd parser library
+ ruamel_data = load_data(file_text)
+
+ if file_type == 'meta':
+ pyyaml_data[0]['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_data)
+ return pyyaml_data
+
+ # create list of blocks of tasks or nested tasks
+ if file_type in ('tasks', 'handlers'):
+ ruamel_task_blocks = ruamel_data
+ pyyaml_task_blocks = pyyaml_data
+ elif file_type in ('playbook', 'pre_tasks', 'post_tasks'):
+ try:
+ pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data)
+ ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data)
+ except (AttributeError, TypeError):
+ # TODO(awcrosby): running ansible-lint on any .yml file will
+ # assume it is a playbook, check needs to be added higher in the
+ # call stack, and can remove this except
+ return pyyaml_data
+ else:
+ raise RuntimeError('Unexpected file type: {}'.format(file_type))
+
+ # get tasks from blocks of tasks
+ pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks)
+ ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks)
+
+ # append skipped_rules for each task
+ for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks):
+
+ # ignore empty tasks
+ if not pyyaml_task and not ruamel_task:
+ continue
+
+ if pyyaml_task.get('name') != ruamel_task.get('name'):
+ raise RuntimeError('Error in matching skip comment to a task')
+ pyyaml_task['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_task)
+
+ return pyyaml_data
+
+
+def _get_task_blocks_from_playbook(playbook: Sequence) -> List:
+ """Return parts of playbook that contains tasks, and nested tasks.
+
+ :param playbook: playbook yaml from yaml parser.
+ :returns: list of task dictionaries.
+ """
+ PLAYBOOK_TASK_KEYWORDS = [
+ 'tasks',
+ 'pre_tasks',
+ 'post_tasks',
+ 'handlers',
+ ]
+
+ task_blocks = []
+ for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS):
+ task_blocks.extend(play.get(key, []))
+ return task_blocks
+
+
+def _get_tasks_from_blocks(task_blocks: Sequence) -> Generator:
+ """Get list of tasks from list made of tasks and nested tasks."""
+ NESTED_TASK_KEYS = [
+ 'block',
+ 'always',
+ 'rescue',
+ ]
+
+ def get_nested_tasks(task: Any) -> Generator[Any, None, None]:
+ return (
+ subtask
+ for k in NESTED_TASK_KEYS if task and k in task
+ for subtask in task[k]
+ )
+
+ for task in task_blocks:
+ for sub_task in get_nested_tasks(task):
+ yield sub_task
+ yield task
+
+
+def _get_rule_skips_from_yaml(yaml_input: Sequence) -> Sequence:
+ """Traverse yaml for comments with rule skips and return list of rules."""
+ yaml_comment_obj_strs = []
+
+ def traverse_yaml(obj: Any) -> None:
+ yaml_comment_obj_strs.append(str(obj.ca.items))
+ if isinstance(obj, dict):
+ for key, val in obj.items():
+ if isinstance(val, (dict, list)):
+ traverse_yaml(val)
+ elif isinstance(obj, list):
+ for e in obj:
+ if isinstance(e, (dict, list)):
+ traverse_yaml(e)
+ else:
+ return
+
+ traverse_yaml(yaml_input)
+
+ rule_id_list = []
+ for comment_obj_str in yaml_comment_obj_strs:
+ for line in comment_obj_str.split(r'\n'):
+ rule_id_list.extend(get_rule_skips_from_line(line))
+
+ return rule_id_list
diff --git a/lib/ansiblelint/testing/__init__.py b/lib/ansiblelint/testing/__init__.py
new file mode 100644
index 0000000..1ed686f
--- /dev/null
+++ b/lib/ansiblelint/testing/__init__.py
@@ -0,0 +1,84 @@
+"""Test utils for ansible-lint."""
+
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+from typing import TYPE_CHECKING, Dict, List
+
+from ansible import __version__ as ansible_version_str
+
+from ansiblelint.runner import Runner
+
+if TYPE_CHECKING:
+ from ansiblelint.errors import MatchError
+
+
+ANSIBLE_MAJOR_VERSION = tuple(map(int, ansible_version_str.split('.')[:2]))
+
+
+class RunFromText(object):
+ """Use Runner on temp files created from unittest text snippets."""
+
+ def __init__(self, collection):
+ """Initialize a RunFromText instance with rules collection."""
+ self.collection = collection
+
+ def _call_runner(self, path) -> List["MatchError"]:
+ runner = Runner(self.collection, path)
+ return runner.run()
+
+ def run_playbook(self, playbook_text):
+ """Lints received text as a playbook."""
+ with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", prefix="playbook") as fp:
+ fp.write(playbook_text)
+ fp.flush()
+ results = self._call_runner(fp.name)
+ return results
+
+ def run_role_tasks_main(self, tasks_main_text):
+ """Lints received text as tasks."""
+ role_path = tempfile.mkdtemp(prefix='role_')
+ tasks_path = os.path.join(role_path, 'tasks')
+ os.makedirs(tasks_path)
+ with open(os.path.join(tasks_path, 'main.yml'), 'w') as fp:
+ fp.write(tasks_main_text)
+ results = self._call_runner(role_path)
+ shutil.rmtree(role_path)
+ return results
+
+ def run_role_meta_main(self, meta_main_text):
+ """Lints received text as meta."""
+ role_path = tempfile.mkdtemp(prefix='role_')
+ meta_path = os.path.join(role_path, 'meta')
+ os.makedirs(meta_path)
+ with open(os.path.join(meta_path, 'main.yml'), 'w') as fp:
+ fp.write(meta_main_text)
+ results = self._call_runner(role_path)
+ shutil.rmtree(role_path)
+ return results
+
+
+def run_ansible_lint(
+ *argv: str,
+ cwd: str = None,
+ bin: str = None,
+ env: Dict[str, str] = None) -> subprocess.CompletedProcess:
+ """Run ansible-lint on a given path and returns its output."""
+ if not bin:
+ bin = sys.executable
+ args = [sys.executable, "-m", "ansiblelint", *argv]
+ else:
+ args = [bin, *argv]
+
+ return subprocess.run(
+ args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False, # needed when command is a list
+ check=False,
+ cwd=cwd,
+ env=env,
+ universal_newlines=True
+ )
diff --git a/lib/ansiblelint/utils.py b/lib/ansiblelint/utils.py
new file mode 100644
index 0000000..feac4d7
--- /dev/null
+++ b/lib/ansiblelint/utils.py
@@ -0,0 +1,836 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Generic utility helpers."""
+
+import contextlib
+import inspect
+import logging
+import os
+import pprint
+import subprocess
+from argparse import Namespace
+from collections import OrderedDict
+from functools import lru_cache
+from pathlib import Path
+from typing import Callable, ItemsView, List, Optional, Tuple
+
+import yaml
+from ansible import constants
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.parsing.dataloader import DataLoader
+from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.parsing.splitter import split_args
+from ansible.parsing.yaml.constructor import AnsibleConstructor
+from ansible.parsing.yaml.loader import AnsibleLoader
+from ansible.parsing.yaml.objects import AnsibleSequence
+from ansible.plugins.loader import add_all_plugin_dirs
+from ansible.template import Templar
+from yaml.composer import Composer
+from yaml.representer import RepresenterError
+
+from ansiblelint.constants import (
+ ANSIBLE_FAILURE_RC, CUSTOM_RULESDIR_ENVVAR, DEFAULT_RULESDIR, FileType,
+)
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import normpath
+
+# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
+# string as the password to enable such yaml files to be opened and parsed
+# successfully.
+DEFAULT_VAULT_PASSWORD = 'x'
+
+PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None)
+
+
+_logger = logging.getLogger(__name__)
+
+
+def parse_yaml_from_file(filepath: str) -> dict:
+ dl = DataLoader()
+ if hasattr(dl, 'set_vault_password'):
+ dl.set_vault_password(DEFAULT_VAULT_PASSWORD)
+ return dl.load_from_file(filepath)
+
+
+def path_dwim(basedir: str, given: str) -> str:
+ dl = DataLoader()
+ dl.set_basedir(basedir)
+ return dl.path_dwim(given)
+
+
+def ansible_template(basedir, varname, templatevars, **kwargs):
+ dl = DataLoader()
+ dl.set_basedir(basedir)
+ templar = Templar(dl, variables=templatevars)
+ return templar.template(varname, **kwargs)
+
+
+LINE_NUMBER_KEY = '__line__'
+FILENAME_KEY = '__file__'
+
+VALID_KEYS = [
+ 'name', 'action', 'when', 'async', 'poll', 'notify',
+ 'first_available_file', 'include', 'include_tasks', 'import_tasks', 'import_playbook',
+ 'tags', 'register', 'ignore_errors', 'delegate_to',
+ 'local_action', 'transport', 'remote_user', 'sudo',
+ 'sudo_user', 'sudo_pass', 'when', 'connection', 'environment', 'args', 'always_run',
+ 'any_errors_fatal', 'changed_when', 'failed_when', 'check_mode', 'delay',
+ 'retries', 'until', 'su', 'su_user', 'su_pass', 'no_log', 'run_once',
+ 'become', 'become_user', 'become_method', FILENAME_KEY,
+]
+
+BLOCK_NAME_TO_ACTION_TYPE_MAP = {
+ 'tasks': 'task',
+ 'handlers': 'handler',
+ 'pre_tasks': 'task',
+ 'post_tasks': 'task',
+ 'block': 'meta',
+ 'rescue': 'meta',
+ 'always': 'meta',
+}
+
+
+def tokenize(line):
+ tokens = line.lstrip().split(" ")
+ if tokens[0] == '-':
+ tokens = tokens[1:]
+ if tokens[0] == 'action:' or tokens[0] == 'local_action:':
+ tokens = tokens[1:]
+ command = tokens[0].replace(":", "")
+
+ args = list()
+ kwargs = dict()
+ nonkvfound = False
+ for arg in tokens[1:]:
+ if "=" in arg and not nonkvfound:
+ kv = arg.split("=", 1)
+ kwargs[kv[0]] = kv[1]
+ else:
+ nonkvfound = True
+ args.append(arg)
+ return (command, args, kwargs)
+
+
+def _playbook_items(pb_data: dict) -> ItemsView:
+ if isinstance(pb_data, dict):
+ return pb_data.items()
+ elif not pb_data:
+ return []
+ else:
+ return [item for play in pb_data for item in play.items()]
+
+
+def _rebind_match_filename(filename: str, func) -> Callable:
+ def func_wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except MatchError as e:
+ e.filename = filename
+ raise e
+ return func_wrapper
+
+
+def _set_collections_basedir(basedir: str):
+ # Sets the playbook directory as playbook_paths for the collection loader
+ try:
+ # Ansible 2.10+
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansible.utils.collection_loader import AnsibleCollectionConfig
+
+ AnsibleCollectionConfig.playbook_paths = basedir
+ except ImportError:
+ # Ansible 2.8 or 2.9
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansible.utils.collection_loader import set_collection_playbook_paths
+
+ set_collection_playbook_paths(basedir)
+
+
+def find_children(playbook: Tuple[str, str], playbook_dir: str) -> List:
+ if not os.path.exists(playbook[0]):
+ return []
+ _set_collections_basedir(playbook_dir or '.')
+ add_all_plugin_dirs(playbook_dir or '.')
+ if playbook[1] == 'role':
+ playbook_ds = {'roles': [{'role': playbook[0]}]}
+ else:
+ try:
+ playbook_ds = parse_yaml_from_file(playbook[0])
+ except AnsibleError as e:
+ raise SystemExit(str(e))
+ results = []
+ basedir = os.path.dirname(playbook[0])
+ items = _playbook_items(playbook_ds)
+ for item in items:
+ for child in _rebind_match_filename(playbook[0], play_children)(
+ basedir, item, playbook[1], playbook_dir):
+ if "$" in child['path'] or "{{" in child['path']:
+ continue
+ valid_tokens = list()
+ for token in split_args(child['path']):
+ if '=' in token:
+ break
+ valid_tokens.append(token)
+ path = ' '.join(valid_tokens)
+ results.append({
+ 'path': path_dwim(basedir, path),
+ 'type': child['type']
+ })
+ return results
+
+
+def template(basedir, value, vars, fail_on_undefined=False, **kwargs):
+ try:
+ value = ansible_template(os.path.abspath(basedir), value, vars,
+ **dict(kwargs, fail_on_undefined=fail_on_undefined))
+ # Hack to skip the following exception when using to_json filter on a variable.
+ # I guess the filter doesn't like empty vars...
+ except (AnsibleError, ValueError, RepresenterError):
+ # templating failed, so just keep value as is.
+ pass
+ return value
+
+
+def play_children(basedir, item, parent_type, playbook_dir):
+ delegate_map = {
+ 'tasks': _taskshandlers_children,
+ 'pre_tasks': _taskshandlers_children,
+ 'post_tasks': _taskshandlers_children,
+ 'block': _taskshandlers_children,
+ 'include': _include_children,
+ 'import_playbook': _include_children,
+ 'roles': _roles_children,
+ 'dependencies': _roles_children,
+ 'handlers': _taskshandlers_children,
+ 'include_tasks': _include_children,
+ 'import_tasks': _include_children,
+ }
+ (k, v) = item
+ add_all_plugin_dirs(os.path.abspath(basedir))
+
+ if k in delegate_map:
+ if v:
+ v = template(os.path.abspath(basedir),
+ v,
+ dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)),
+ fail_on_undefined=False)
+ return delegate_map[k](basedir, k, v, parent_type)
+ return []
+
+
+def _include_children(basedir, k, v, parent_type):
+ # handle special case include_tasks: name=filename.yml
+ if k == 'include_tasks' and isinstance(v, dict) and 'file' in v:
+ v = v['file']
+
+ # handle include: filename.yml tags=blah
+ (command, args, kwargs) = tokenize("{0}: {1}".format(k, v))
+
+ result = path_dwim(basedir, args[0])
+ if not os.path.exists(result):
+ result = path_dwim(os.path.join(os.path.dirname(basedir)), v)
+ return [{'path': result, 'type': parent_type}]
+
+
+def _taskshandlers_children(basedir, k, v, parent_type: FileType) -> List:
+ results = []
+ for th in v:
+
+ # ignore empty tasks, `-`
+ if not th:
+ continue
+
+ with contextlib.suppress(LookupError):
+ children = _get_task_handler_children_for_tasks_or_playbooks(
+ th, basedir, k, parent_type,
+ )
+ results.append(children)
+ continue
+
+ if 'include_role' in th or 'import_role' in th: # lgtm [py/unreachable-statement]
+ th = normalize_task_v2(th)
+ _validate_task_handler_action_for_role(th['action'])
+ results.extend(_roles_children(basedir, k, [th['action'].get("name")],
+ parent_type,
+ main=th['action'].get('tasks_from', 'main')))
+ continue
+
+ if 'block' not in th:
+ continue
+
+ results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type))
+ if 'rescue' in th:
+ results.extend(_taskshandlers_children(basedir, k, th['rescue'], parent_type))
+ if 'always' in th:
+ results.extend(_taskshandlers_children(basedir, k, th['always'], parent_type))
+
+ return results
+
+
+def _get_task_handler_children_for_tasks_or_playbooks(
+ task_handler, basedir: str, k, parent_type: FileType,
+) -> dict:
+ """Try to get children of taskhandler for include/import tasks/playbooks."""
+ child_type = k if parent_type == 'playbook' else parent_type
+
+ task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks'
+ for task_handler_key in task_include_keys:
+
+ with contextlib.suppress(KeyError):
+
+ # ignore empty tasks
+ if not task_handler:
+ continue
+
+ return {
+ 'path': path_dwim(basedir, task_handler[task_handler_key]),
+ 'type': child_type,
+ }
+
+ raise LookupError(
+ f'The node contains none of: {", ".join(task_include_keys)}',
+ )
+
+
+def _validate_task_handler_action_for_role(th_action: dict) -> None:
+ """Verify that the task handler action is valid for role include."""
+ module = th_action['__ansible_module__']
+
+ if 'name' not in th_action:
+ raise MatchError(f"Failed to find required 'name' key in {module!s}")
+
+ if not isinstance(th_action['name'], str):
+ raise RuntimeError(
+ f"Value assigned to 'name' key on '{module!s}' is not a string.",
+ )
+
+
+def _roles_children(basedir: str, k, v, parent_type: FileType, main='main') -> list:
+ results = []
+ for role in v:
+ if isinstance(role, dict):
+ if 'role' in role or 'name' in role:
+ if 'tags' not in role or 'skip_ansible_lint' not in role['tags']:
+ results.extend(_look_for_role_files(basedir,
+ role.get('role', role.get('name')),
+ main=main))
+ elif k != 'dependencies':
+ raise SystemExit('role dict {0} does not contain a "role" '
+ 'or "name" key'.format(role))
+ else:
+ results.extend(_look_for_role_files(basedir, role, main=main))
+ return results
+
+
+def _rolepath(basedir: str, role: str) -> Optional[str]:
+ role_path = None
+
+ possible_paths = [
+ # if included from a playbook
+ path_dwim(basedir, os.path.join('roles', role)),
+ path_dwim(basedir, role),
+ # if included from roles/[role]/meta/main.yml
+ path_dwim(
+ basedir, os.path.join('..', '..', '..', 'roles', role)
+ ),
+ path_dwim(basedir, os.path.join('..', '..', role)),
+ ]
+
+ if constants.DEFAULT_ROLES_PATH:
+ search_locations = constants.DEFAULT_ROLES_PATH
+ if isinstance(search_locations, str):
+ search_locations = search_locations.split(os.pathsep)
+ for loc in search_locations:
+ loc = os.path.expanduser(loc)
+ possible_paths.append(path_dwim(loc, role))
+
+ possible_paths.append(path_dwim(basedir, ''))
+
+ for path_option in possible_paths:
+ if os.path.isdir(path_option):
+ role_path = path_option
+ break
+
+ if role_path:
+ add_all_plugin_dirs(role_path)
+
+ return role_path
+
+
+def _look_for_role_files(basedir: str, role: str, main='main') -> list:
+ role_path = _rolepath(basedir, role)
+ if not role_path:
+ return []
+
+ results = []
+
+ for th in ['tasks', 'handlers', 'meta']:
+ current_path = os.path.join(role_path, th)
+ for dir, subdirs, files in os.walk(current_path):
+ for file in files:
+ file_ignorecase = file.lower()
+ if file_ignorecase.endswith(('.yml', '.yaml')):
+ thpath = os.path.join(dir, file)
+ results.append({'path': thpath, 'type': th})
+
+ return results
+
+
+def rolename(filepath):
+ idx = filepath.find('roles/')
+ if idx < 0:
+ return ''
+ role = filepath[idx + 6:]
+ role = role[:role.find('/')]
+ return role
+
+
+def _kv_to_dict(v):
+ (command, args, kwargs) = tokenize(v)
+ return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs)
+
+
+def _sanitize_task(task: dict) -> dict:
+ """Return a stripped-off task structure compatible with new Ansible.
+
+ This helper takes a copy of the incoming task and drops
+ any internally used keys from it.
+ """
+ result = task.copy()
+ # task is an AnsibleMapping which inherits from OrderedDict, so we need
+ # to use `del` to remove unwanted keys.
+ for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]:
+ if k in result:
+ del result[k]
+ return result
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def normalize_task_v2(task: dict) -> dict: # noqa: C901
+ """Ensure tasks have an action key and strings are converted to python objects."""
+ result = dict()
+ if 'always_run' in task:
+ # FIXME(ssbarnea): Delayed import to avoid circular import
+ # See https://github.com/ansible/ansible-lint/issues/880
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansiblelint.rules.AlwaysRunRule import AlwaysRunRule
+
+ raise MatchError(
+ rule=AlwaysRunRule,
+ filename=task[FILENAME_KEY],
+ linenumber=task[LINE_NUMBER_KEY])
+
+ sanitized_task = _sanitize_task(task)
+ mod_arg_parser = ModuleArgsParser(sanitized_task)
+ try:
+ action, arguments, result['delegate_to'] = mod_arg_parser.parse()
+ except AnsibleParserError as e:
+ try:
+ task_info = "%s:%s" % (task[FILENAME_KEY], task[LINE_NUMBER_KEY])
+ except KeyError:
+ task_info = "Unknown"
+ pp = pprint.PrettyPrinter(indent=2)
+ task_pprint = pp.pformat(sanitized_task)
+
+ _logger.critical("Couldn't parse task at %s (%s)\n%s", task_info, e.message, task_pprint)
+ raise SystemExit(ANSIBLE_FAILURE_RC)
+
+ # denormalize shell -> command conversion
+ if '_uses_shell' in arguments:
+ action = 'shell'
+ del arguments['_uses_shell']
+
+ for (k, v) in list(task.items()):
+ if k in ('action', 'local_action', 'args', 'delegate_to') or k == action:
+ # we don't want to re-assign these values, which were
+ # determined by the ModuleArgsParser() above
+ continue
+ else:
+ result[k] = v
+
+ result['action'] = dict(__ansible_module__=action)
+
+ if '_raw_params' in arguments:
+ result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ')
+ del arguments['_raw_params']
+ else:
+ result['action']['__ansible_arguments__'] = list()
+
+ if 'argv' in arguments and not result['action']['__ansible_arguments__']:
+ result['action']['__ansible_arguments__'] = arguments['argv']
+ del arguments['argv']
+
+ result['action'].update(arguments)
+ return result
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def normalize_task_v1(task): # noqa: C901
+ result = dict()
+ for (k, v) in task.items():
+ if k in VALID_KEYS or k.startswith('with_'):
+ if k == 'local_action' or k == 'action':
+ if not isinstance(v, dict):
+ v = _kv_to_dict(v)
+ v['__ansible_arguments__'] = v.get('__ansible_arguments__', list())
+ result['action'] = v
+ else:
+ result[k] = v
+ else:
+ if isinstance(v, str):
+ v = _kv_to_dict(k + ' ' + v)
+ elif not v:
+ v = dict(__ansible_module__=k)
+ else:
+ if isinstance(v, dict):
+ v.update(dict(__ansible_module__=k))
+ else:
+ if k == '__line__':
+ # Keep the line number stored
+ result[k] = v
+ continue
+
+ else:
+ # Tasks that include playbooks (rather than task files)
+ # can get here
+ # https://github.com/ansible/ansible-lint/issues/138
+ raise RuntimeError("Was not expecting value %s of type %s for key %s\n"
+ "Task: %s. Check the syntax of your playbook using "
+ "ansible-playbook --syntax-check" %
+ (str(v), type(v), k, str(task)))
+ v['__ansible_arguments__'] = v.get('__ansible_arguments__', list())
+ result['action'] = v
+ if 'module' in result['action']:
+ # this happens when a task uses
+ # local_action:
+ # module: ec2
+ # etc...
+ result['action']['__ansible_module__'] = result['action']['module']
+ del result['action']['module']
+ if 'args' in result:
+ result['action'].update(result.get('args'))
+ del result['args']
+ return result
+
+
+def normalize_task(task, filename):
+ ansible_action_type = task.get('__ansible_action_type__', 'task')
+ if '__ansible_action_type__' in task:
+ del task['__ansible_action_type__']
+ task = normalize_task_v2(task)
+ task[FILENAME_KEY] = filename
+ task['__ansible_action_type__'] = ansible_action_type
+ return task
+
+
+def task_to_str(task):
+ name = task.get("name")
+ if name:
+ return name
+ action = task.get("action")
+ args = " ".join([u"{0}={1}".format(k, v) for (k, v) in action.items()
+ if k not in ["__ansible_module__", "__ansible_arguments__"]] +
+ action.get("__ansible_arguments__"))
+ return u"{0} {1}".format(action["__ansible_module__"], args)
+
+
+def extract_from_list(blocks, candidates):
+ results = list()
+ for block in blocks:
+ for candidate in candidates:
+ if isinstance(block, dict) and candidate in block:
+ if isinstance(block[candidate], list):
+ results.extend(add_action_type(block[candidate], candidate))
+ elif block[candidate] is not None:
+ raise RuntimeError(
+ "Key '%s' defined, but bad value: '%s'" %
+ (candidate, str(block[candidate])))
+ return results
+
+
+def add_action_type(actions, action_type):
+ results = list()
+ for action in actions:
+ # ignore empty task
+ if not action:
+ continue
+ action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
+ results.append(action)
+ return results
+
+
+def get_action_tasks(yaml, file):
+ tasks = list()
+ if file['type'] in ['tasks', 'handlers']:
+ tasks = add_action_type(yaml, file['type'])
+ else:
+ tasks.extend(extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks']))
+
+ # Add sub-elements of block/rescue/always to tasks list
+ tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always']))
+ # Remove block/rescue/always elements from tasks list
+ block_rescue_always = ('block', 'rescue', 'always')
+ tasks[:] = [task for task in tasks if all(k not in task for k in block_rescue_always)]
+
+ return [task for task in tasks if
+ set(['include', 'include_tasks',
+ 'import_playbook', 'import_tasks']).isdisjoint(task.keys())]
+
+
+def get_normalized_tasks(yaml, file):
+ tasks = get_action_tasks(yaml, file)
+ res = []
+ for task in tasks:
+ # An empty `tags` block causes `None` to be returned if
+ # the `or []` is not present - `task.get('tags', [])`
+ # does not suffice.
+ if 'skip_ansible_lint' in (task.get('tags') or []):
+ # No need to normalize_task is we are skipping it.
+ continue
+ res.append(normalize_task(task, file['path']))
+
+ return res
+
+
+@lru_cache(maxsize=128)
+def parse_yaml_linenumbers(data, filename):
+ """Parse yaml as ansible.utils.parse_yaml but with linenumbers.
+
+ The line numbers are stored in each node's LINE_NUMBER_KEY key.
+ """
+ def compose_node(parent, index):
+ # the line number where the previous token has ended (plus empty lines)
+ line = loader.line
+ node = Composer.compose_node(loader, parent, index)
+ node.__line__ = line + 1
+ return node
+
+ def construct_mapping(node, deep=False):
+ mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
+ if hasattr(node, '__line__'):
+ mapping[LINE_NUMBER_KEY] = node.__line__
+ else:
+ mapping[LINE_NUMBER_KEY] = mapping._line_number
+ mapping[FILENAME_KEY] = filename
+ return mapping
+
+ try:
+ kwargs = {}
+ if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args:
+ kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD
+ loader = AnsibleLoader(data, **kwargs)
+ loader.compose_node = compose_node
+ loader.construct_mapping = construct_mapping
+ data = loader.get_single_data()
+ except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
+ raise SystemExit("Failed to parse YAML in %s: %s" % (filename, str(e)))
+ return data
+
+
+def get_first_cmd_arg(task):
+ try:
+ if 'cmd' in task['action']:
+ first_cmd_arg = task['action']['cmd'].split()[0]
+ else:
+ first_cmd_arg = task['action']['__ansible_arguments__'][0]
+ except IndexError:
+ return None
+ return first_cmd_arg
+
+
+def is_playbook(filename: str) -> bool:
+ """
+ Check if the file is a playbook.
+
+ Given a filename, it should return true if it looks like a playbook. The
+ function is not supposed to raise exceptions.
+ """
+ # we assume is a playbook if we loaded a sequence of dictionaries where
+ # at least one of these keys is present:
+ playbooks_keys = {
+ "gather_facts",
+ "hosts",
+ "import_playbook",
+ "post_tasks",
+ "pre_tasks",
+ "roles"
+ "tasks",
+ }
+
+ # makes it work with Path objects by converting them to strings
+ if not isinstance(filename, str):
+ filename = str(filename)
+
+ try:
+ f = parse_yaml_from_file(filename)
+ except Exception as e:
+ _logger.warning(
+ "Failed to load %s with %s, assuming is not a playbook.",
+ filename, e)
+ else:
+ if (
+ isinstance(f, AnsibleSequence) and
+ hasattr(f, 'keys') and
+ playbooks_keys.intersection(next(iter(f), {}).keys())
+ ):
+ return True
+ return False
+
+
+def get_yaml_files(options: Namespace) -> dict:
+ """Find all yaml files."""
+ # git is preferred as it also considers .gitignore
+ git_command = ['git', 'ls-files', '*.yaml', '*.yml']
+ _logger.info("Discovering files to lint: %s", ' '.join(git_command))
+
+ out = None
+
+ try:
+ out = subprocess.check_output(
+ git_command,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True
+ ).split()
+ except subprocess.CalledProcessError as exc:
+ _logger.warning(
+ "Failed to discover yaml files to lint using git: %s",
+ exc.output.rstrip('\n')
+ )
+ except FileNotFoundError as exc:
+ if options.verbosity:
+ _logger.warning(
+ "Failed to locate command: %s", exc
+ )
+
+ if out is None:
+ out = [
+ os.path.join(root, name)
+ for root, dirs, files in os.walk('.')
+ for name in files
+ if name.endswith('.yaml') or name.endswith('.yml')
+ ]
+
+ return OrderedDict.fromkeys(sorted(out))
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def get_playbooks_and_roles(options=None) -> List[str]: # noqa: C901
+ """Find roles and playbooks."""
+ if options is None:
+ options = {}
+
+ files = get_yaml_files(options)
+
+ playbooks = []
+ role_dirs = []
+ role_internals = {
+ 'defaults',
+ 'files',
+ 'handlers',
+ 'meta',
+ 'tasks',
+ 'templates',
+ 'vars',
+ }
+
+ # detect role in repository root:
+ if 'tasks/main.yml' in files or 'tasks/main.yaml' in files:
+ role_dirs.append('.')
+
+ for p in map(Path, files):
+
+ try:
+ for file_path in options.exclude_paths:
+ if str(p.resolve()).startswith(str(file_path)):
+ raise FileNotFoundError(
+ f'File {file_path} matched exclusion entry: {p}')
+ except FileNotFoundError as e:
+ _logger.debug('Ignored %s due to: %s', p, e)
+ continue
+
+ if (next((i for i in p.parts if i.endswith('playbooks')), None) or
+ 'playbook' in p.parts[-1]):
+ playbooks.append(normpath(p))
+ continue
+
+ # ignore if any folder ends with _vars
+ if next((i for i in p.parts if i.endswith('_vars')), None):
+ continue
+ elif 'roles' in p.parts or '.' in role_dirs:
+ if 'tasks' in p.parts and p.parts[-1] in ['main.yaml', 'main.yml']:
+ role_dirs.append(str(p.parents[1]))
+ continue
+ elif role_internals.intersection(p.parts):
+ continue
+ elif 'tests' in p.parts:
+ playbooks.append(normpath(p))
+ if 'molecule' in p.parts:
+ if p.parts[-1] != 'molecule.yml':
+ playbooks.append(normpath(p))
+ continue
+ # hidden files are clearly not playbooks, likely config files.
+ if p.parts[-1].startswith('.'):
+ continue
+
+ if is_playbook(str(p)):
+ playbooks.append(normpath(p))
+ continue
+
+ _logger.info('Unknown file type: %s', normpath(p))
+
+ _logger.info('Found roles: %s', ' '.join(role_dirs))
+ _logger.info('Found playbooks: %s', ' '.join(playbooks))
+
+ return role_dirs + playbooks
+
+
+def expand_path_vars(path: str) -> str:
+ """Expand the environment or ~ variables in a path string."""
+ # It may be possible for function to be called with a Path object
+ path = str(path).strip()
+ path = os.path.expanduser(path)
+ path = os.path.expandvars(path)
+ return path
+
+
+def expand_paths_vars(paths: List[str]) -> List[str]:
+ """Expand the environment or ~ variables in a list."""
+ paths = [expand_path_vars(p) for p in paths]
+ return paths
+
+
+def get_rules_dirs(rulesdir: List[str], use_default: bool) -> List[str]:
+ """Return a list of rules dirs."""
+ default_ruledirs = [DEFAULT_RULESDIR]
+ default_custom_rulesdir = os.environ.get(
+ CUSTOM_RULESDIR_ENVVAR, os.path.join(DEFAULT_RULESDIR, "custom")
+ )
+ custom_ruledirs = sorted(
+ str(rdir.resolve())
+ for rdir in Path(default_custom_rulesdir).iterdir()
+ if rdir.is_dir() and (rdir / "__init__.py").exists()
+ )
+ if use_default:
+ return rulesdir + custom_ruledirs + default_ruledirs
+
+ return rulesdir or custom_ruledirs + default_ruledirs
diff --git a/lib/ansiblelint/version.py b/lib/ansiblelint/version.py
new file mode 100644
index 0000000..7bbf973
--- /dev/null
+++ b/lib/ansiblelint/version.py
@@ -0,0 +1,12 @@
+"""Ansible-lint version information."""
+
+try:
+ import pkg_resources
+except ImportError:
+ pass
+
+
+try:
+ __version__ = pkg_resources.get_distribution('ansible-lint').version
+except Exception:
+ __version__ = 'unknown'