summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/rules/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/rules/__init__.py')
-rw-r--r--src/ansiblelint/rules/__init__.py83
1 files changed, 61 insertions, 22 deletions
diff --git a/src/ansiblelint/rules/__init__.py b/src/ansiblelint/rules/__init__.py
index acb7df1..a1743a0 100644
--- a/src/ansiblelint/rules/__init__.py
+++ b/src/ansiblelint/rules/__init__.py
@@ -1,4 +1,5 @@
"""All internal ansible-lint rules."""
+
from __future__ import annotations
import copy
@@ -23,7 +24,7 @@ from ansiblelint._internal.rules import (
WarningRule,
)
from ansiblelint.app import App, get_app
-from ansiblelint.config import PROFILES, Options, get_rule_config
+from ansiblelint.config import PROFILES, Options
from ansiblelint.config import options as default_options
from ansiblelint.constants import LINE_NUMBER_KEY, RULE_DOC_URL, SKIPPED_RULES_KEY
from ansiblelint.errors import MatchError
@@ -32,6 +33,8 @@ from ansiblelint.file_utils import Lintable, expand_paths_vars
if TYPE_CHECKING:
from ruamel.yaml.comments import CommentedMap, CommentedSeq
+ from ansiblelint.errors import RuleMatchTransformMeta
+
_logger = logging.getLogger(__name__)
match_types = {
@@ -53,11 +56,6 @@ class AnsibleLintRule(BaseRule):
"""Return rule documentation url."""
return RULE_DOC_URL + self.id + "/"
- @property
- def rule_config(self) -> dict[str, Any]:
- """Retrieve rule specific configuration."""
- return get_rule_config(self.id)
-
def get_config(self, key: str) -> Any:
"""Return a configured value for given key string."""
return self.rule_config.get(key, None)
@@ -78,6 +76,7 @@ class AnsibleLintRule(BaseRule):
details: str = "",
filename: Lintable | None = None,
tag: str = "",
+ transform_meta: RuleMatchTransformMeta | None = None,
) -> MatchError:
"""Instantiate a new MatchError."""
match = MatchError(
@@ -87,13 +86,14 @@ class AnsibleLintRule(BaseRule):
lintable=filename or Lintable(""),
rule=copy.copy(self),
tag=tag,
+ transform_meta=transform_meta,
)
# search through callers to find one of the match* methods
frame = inspect.currentframe()
match_type: str | None = None
while not match_type and frame is not None:
func_name = frame.f_code.co_name
- match_type = match_types.get(func_name, None)
+ match_type = match_types.get(func_name)
if match_type:
# add the match_type to the match
match.match_type = match_type
@@ -109,8 +109,8 @@ class AnsibleLintRule(BaseRule):
match.task = task
if not match.details:
match.details = "Task/Handler: " + ansiblelint.utils.task_to_str(task)
- if match.lineno < task[LINE_NUMBER_KEY]:
- match.lineno = task[LINE_NUMBER_KEY]
+
+ match.lineno = max(match.lineno, task[LINE_NUMBER_KEY])
def matchlines(self, file: Lintable) -> list[MatchError]:
matches: list[MatchError] = []
@@ -224,7 +224,16 @@ class AnsibleLintRule(BaseRule):
if isinstance(yaml, str):
if yaml.startswith("$ANSIBLE_VAULT"):
return []
- return [MatchError(lintable=file, rule=LoadingFailureRule())]
+ if self._collection is None:
+ msg = f"Rule {self.id} was not added to a collection."
+ raise RuntimeError(msg)
+ return [
+ # pylint: disable=E1136
+ MatchError(
+ lintable=file,
+ rule=self._collection["load-failure"],
+ ),
+ ]
if not yaml:
return matches
@@ -250,7 +259,7 @@ class AnsibleLintRule(BaseRule):
class TransformMixin:
"""A mixin for AnsibleLintRule to enable transforming files.
- If ansible-lint is started with the ``--write`` option, then the ``Transformer``
+ If ansible-lint is started with the ``--fix`` option, then the ``Transformer``
will call the ``transform()`` method for every MatchError identified if the rule
that identified it subclasses this ``TransformMixin``. Only the rule that identified
a MatchError can do transforms to fix that match.
@@ -324,7 +333,6 @@ class TransformMixin:
return target
-# pylint: disable=too-many-nested-blocks
def load_plugins(
dirs: list[str],
) -> Iterator[AnsibleLintRule]:
@@ -370,7 +378,7 @@ def load_plugins(
class RulesCollection:
"""Container for a collection of rules."""
- def __init__(
+ def __init__( # pylint: disable=too-many-arguments
self,
rulesdirs: list[str] | list[Path] | None = None,
options: Options | None = None,
@@ -388,7 +396,7 @@ class RulesCollection:
else:
self.options = options
self.profile = []
- self.app = app or get_app(offline=True)
+ self.app = app or get_app(cached=True)
if profile_name:
self.profile = PROFILES[profile_name]
@@ -405,6 +413,8 @@ class RulesCollection:
WarningRule(),
],
)
+ for rule in self.rules:
+ rule._collection = self # noqa: SLF001
for rule in load_plugins(rulesdirs_str):
self.register(rule, conditional=conditional)
self.rules = sorted(self.rules)
@@ -443,6 +453,17 @@ class RulesCollection:
"""Return the length of the RulesCollection data."""
return len(self.rules)
+ def __getitem__(self, item: Any) -> BaseRule:
+ """Return a rule from inside the collection based on its id."""
+ if not isinstance(item, str):
+ msg = f"Expected str but got {type(item)} when trying to access rule by it's id"
+ raise TypeError(msg)
+ for rule in self.rules:
+ if rule.id == item:
+ return rule
+ msg = f"Rule {item} is not present inside this collection."
+ raise ValueError(msg)
+
def extend(self, more: list[AnsibleLintRule]) -> None:
"""Combine rules."""
self.rules.extend(more)
@@ -469,7 +490,7 @@ class RulesCollection:
MatchError(
message=str(exc),
lintable=file,
- rule=LoadingFailureRule(),
+ rule=self["load-failure"],
tag=f"{LoadingFailureRule.id}[{exc.__class__.__name__.lower()}]",
),
]
@@ -482,10 +503,18 @@ class RulesCollection:
or rule.has_dynamic_tags
or not set(rule.tags).union([rule.id]).isdisjoint(tags)
):
- rule_definition = set(rule.tags)
- rule_definition.add(rule.id)
- if set(rule_definition).isdisjoint(skip_list):
- matches.extend(rule.getmatches(file))
+ if tags and set(rule.tags).union(list(rule.ids().keys())).isdisjoint(
+ tags,
+ ):
+ _logger.debug("Skipping rule %s", rule.id)
+ else:
+ _logger.debug("Running rule %s", rule.id)
+ rule_definition = set(rule.tags)
+ rule_definition.add(rule.id)
+ if set(rule_definition).isdisjoint(skip_list):
+ matches.extend(rule.getmatches(file))
+ else:
+ _logger.debug("Skipping rule %s", rule.id)
# some rules can produce matches with tags that are inside our
# skip_list, so we need to cleanse the matches
@@ -499,6 +528,15 @@ class RulesCollection:
[rule.verbose() for rule in sorted(self.rules, key=lambda x: x.id)],
)
+ def known_tags(self) -> list[str]:
+ """Return a list of known tags, without returning no sub-tags."""
+ tags = set()
+ for rule in self.rules:
+ tags.add(rule.id)
+ for tag in rule.tags:
+ tags.add(tag)
+ return sorted(tags)
+
def list_tags(self) -> str:
"""Return a string with all the tags in the RulesCollection."""
tag_desc = {
@@ -525,11 +563,10 @@ class RulesCollection:
msg = f"Rule {rule} does not have any of the required tags: {', '.join(tag_desc.keys())}"
raise RuntimeError(msg)
for tag in rule.tags:
- for id_ in rule.ids():
- tags[tag].append(id_)
+ tags[tag] = list(rule.ids())
result = "# List of tags and rules they cover\n"
for tag in sorted(tags):
- desc = tag_desc.get(tag, None)
+ desc = tag_desc.get(tag)
if desc:
result += f"{tag}: # {desc}\n"
else:
@@ -550,6 +587,8 @@ def filter_rules_with_profile(rule_col: list[BaseRule], profile: str) -> None:
included.add(rule)
extends = PROFILES[extends].get("extends", None)
for rule in rule_col.copy():
+ if rule.unloadable:
+ continue
if rule.id not in included:
_logger.debug(
"Unloading %s rule due to not being part of %s profile.",