1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
|
"""All internal ansible-lint rules."""
from __future__ import annotations
import copy
import inspect
import logging
import re
import sys
from collections import defaultdict
from collections.abc import Iterable, Iterator, MutableMapping, MutableSequence
from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
import ansiblelint.skip_utils
import ansiblelint.utils
import ansiblelint.yaml_utils
from ansiblelint._internal.rules import (
AnsibleParserErrorRule,
BaseRule,
LoadingFailureRule,
RuntimeErrorRule,
WarningRule,
)
from ansiblelint.app import App, get_app
from ansiblelint.config import PROFILES, Options
from ansiblelint.config import options as default_options
from ansiblelint.constants import LINE_NUMBER_KEY, RULE_DOC_URL, SKIPPED_RULES_KEY
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable, expand_paths_vars
if TYPE_CHECKING:
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from ansiblelint.errors import RuleMatchTransformMeta
_logger = logging.getLogger(__name__)
match_types = {
"matchlines": "line",
"match": "line", # called by matchlines
"matchtasks": "task",
"matchtask": "task", # called by matchtasks
"matchyaml": "yaml",
"matchplay": "play", # called by matchyaml
"matchdir": "dir",
}
class AnsibleLintRule(BaseRule):
"""AnsibleLintRule should be used as base for writing new rules."""
@property
def url(self) -> str:
"""Return rule documentation url."""
return RULE_DOC_URL + self.id + "/"
def get_config(self, key: str) -> Any:
"""Return a configured value for given key string."""
return self.rule_config.get(key, None)
@staticmethod
def unjinja(text: str) -> str:
"""Remove jinja2 bits from a string."""
text = re.sub(r"{{.+?}}", "JINJA_EXPRESSION", text)
text = re.sub(r"{%.+?%}", "JINJA_STATEMENT", text)
text = re.sub(r"{#.+?#}", "JINJA_COMMENT", text)
return text
# pylint: disable=too-many-arguments
def create_matcherror(
self,
message: str = "",
lineno: int = 1,
details: str = "",
filename: Lintable | None = None,
tag: str = "",
transform_meta: RuleMatchTransformMeta | None = None,
) -> MatchError:
"""Instantiate a new MatchError."""
match = MatchError(
message=message,
lineno=lineno,
details=details,
lintable=filename or Lintable(""),
rule=copy.copy(self),
tag=tag,
transform_meta=transform_meta,
)
# search through callers to find one of the match* methods
frame = inspect.currentframe()
match_type: str | None = None
while not match_type and frame is not None:
func_name = frame.f_code.co_name
match_type = match_types.get(func_name)
if match_type:
# add the match_type to the match
match.match_type = match_type
break
frame = frame.f_back # get the parent frame for the next iteration
return match
@staticmethod
def _enrich_matcherror_with_task_details(
match: MatchError,
task: ansiblelint.utils.Task,
) -> None:
match.task = task
if not match.details:
match.details = "Task/Handler: " + ansiblelint.utils.task_to_str(task)
match.lineno = max(match.lineno, task[LINE_NUMBER_KEY])
def matchlines(self, file: Lintable) -> list[MatchError]:
matches: list[MatchError] = []
# arrays are 0-based, line numbers are 1-based
# so use prev_line_no as the counter
for prev_line_no, line in enumerate(file.content.split("\n")):
if line.lstrip().startswith("#"):
continue
rule_id_list = ansiblelint.skip_utils.get_rule_skips_from_line(
line,
lintable=file,
)
if self.id in rule_id_list:
continue
result = self.match(line)
if not result:
continue
message = ""
if isinstance(result, str):
message = result
matcherror = self.create_matcherror(
message=message,
lineno=prev_line_no + 1,
details=line,
filename=file,
)
matches.append(matcherror)
return matches
def matchtasks(self, file: Lintable) -> list[MatchError]:
"""Call matchtask for each task inside file and return aggregate results.
Most rules will never need to override matchtasks because its main
purpose is to call matchtask for each task/handlers in the same file,
and to aggregate the results.
"""
matches: list[MatchError] = []
if (
file.kind not in ["handlers", "tasks", "playbook"]
or str(file.base_kind) != "text/yaml"
):
return matches
for task in ansiblelint.utils.task_in_list(
data=file.data,
kind=file.kind,
file=file,
):
if task.error is not None:
# normalize_task converts AnsibleParserError to MatchError
return [task.error]
if (
self.id in task.skip_tags
or ("action" not in task.normalized_task)
or "skip_ansible_lint" in task.normalized_task.get("tags", [])
):
continue
if self.needs_raw_task:
task.normalized_task["__raw_task__"] = task.raw_task
result = self.matchtask(task, file=file)
if not result:
continue
if isinstance(result, Iterable) and not isinstance(
result,
str,
): # list[MatchError]
# https://github.com/PyCQA/pylint/issues/6044
# pylint: disable=not-an-iterable
for match in result:
if match.tag in task.skip_tags:
continue
self._enrich_matcherror_with_task_details(
match,
task,
)
matches.append(match)
continue
if isinstance(result, MatchError):
if result.tag in task.skip_tags:
continue
match = result
else: # bool or string
message = ""
if isinstance(result, str):
message = result
match = self.create_matcherror(
message=message,
lineno=task.normalized_task[LINE_NUMBER_KEY],
filename=file,
)
self._enrich_matcherror_with_task_details(match, task)
matches.append(match)
return matches
def matchyaml(self, file: Lintable) -> list[MatchError]:
matches: list[MatchError] = []
if str(file.base_kind) != "text/yaml":
return matches
yaml = file.data
# yaml returned can be an AnsibleUnicode (a string) when the yaml
# file contains a single string. YAML spec allows this but we consider
# this an fatal error.
if isinstance(yaml, str):
if yaml.startswith("$ANSIBLE_VAULT"):
return []
if self._collection is None:
msg = f"Rule {self.id} was not added to a collection."
raise RuntimeError(msg)
return [
# pylint: disable=E1136
MatchError(
lintable=file,
rule=self._collection["load-failure"],
),
]
if not yaml:
return matches
if isinstance(yaml, dict):
yaml = [yaml]
for play in yaml:
# Bug #849
if play is None:
continue
if self.id in play.get(SKIPPED_RULES_KEY, ()):
continue
if "skip_ansible_lint" in play.get("tags", []):
continue
matches.extend(self.matchplay(file, play))
return matches
class TransformMixin:
"""A mixin for AnsibleLintRule to enable transforming files.
If ansible-lint is started with the ``--fix`` option, then the ``Transformer``
will call the ``transform()`` method for every MatchError identified if the rule
that identified it subclasses this ``TransformMixin``. Only the rule that identified
a MatchError can do transforms to fix that match.
"""
def transform(
self,
match: MatchError,
lintable: Lintable,
data: CommentedMap | CommentedSeq | str,
) -> None:
"""Transform ``data`` to try to fix the MatchError identified by this rule.
The ``match`` was generated by this rule in the ``lintable`` file.
When ``transform()`` is called on a rule, the rule should either fix the
issue, if possible, or make modifications that make it easier to fix manually.
The transform must set ``match.fixed = True`` when data has been transformed to
fix the error.
For YAML files, ``data`` is an editable YAML dict/array that preserves
any comments that were in the original file.
.. code:: python
data[0]["tasks"][0]["when"] = False
This is easier with the ``seek()`` utility method:
.. code :: python
target_task = self.seek(match.yaml_path, data)
target_task["when"] = False
For any files that aren't YAML, ``data`` is the loaded file's content as a string.
To edit non-YAML files, save the updated contents in ``lintable.content``:
.. code:: python
new_data = self.do_something_to_fix_the_match(data)
lintable.content = new_data
"""
@staticmethod
def seek(
yaml_path: list[int | str],
data: MutableMapping[str, Any] | MutableSequence[Any] | str,
) -> Any:
"""Get the element identified by ``yaml_path`` in ``data``.
Rules that work with YAML need to seek, or descend, into nested YAML data
structures to perform the relevant transforms. For example:
.. code:: python
def transform(self, match, lintable, data):
target_task = self.seek(match.yaml_path, data)
# transform target_task
"""
if isinstance(data, str):
# can't descend into a string
return data
target = data
for segment in yaml_path:
# The cast() calls tell mypy what types we expect.
# Essentially this does:
if isinstance(segment, str):
target = cast(MutableMapping[str, Any], target)[segment]
elif isinstance(segment, int):
target = cast(MutableSequence[Any], target)[segment]
return target
def load_plugins(
dirs: list[str],
) -> Iterator[AnsibleLintRule]:
"""Yield a rule class."""
def all_subclasses(cls: type) -> set[type]:
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)],
)
orig_sys_path = sys.path.copy()
for directory in dirs:
if directory not in sys.path:
sys.path.append(str(directory))
# load all modules in the directory
for f in Path(directory).glob("*.py"):
if "__" not in f.stem and f.stem not in "conftest":
import_module(f"{f.stem}")
# restore sys.path
sys.path = orig_sys_path
rules: dict[str, BaseRule] = {}
for rule in all_subclasses(BaseRule):
# we do not return the rules that are not loaded from passed 'directory'
# or rules that do not have a valid id. For example, during testing
# python may load other rule classes, some outside the tested rule
# directories.
if (
rule.id # type: ignore[attr-defined]
and Path(inspect.getfile(rule)).parent.absolute()
in [Path(x).absolute() for x in dirs]
and issubclass(rule, BaseRule)
and rule.id not in rules
):
rules[rule.id] = rule()
for rule in rules.values(): # type: ignore[assignment]
if isinstance(rule, AnsibleLintRule) and bool(rule.id):
yield rule
class RulesCollection:
"""Container for a collection of rules."""
def __init__( # pylint: disable=too-many-arguments
self,
rulesdirs: list[str] | list[Path] | None = None,
options: Options | None = None,
profile_name: str | None = None,
*,
conditional: bool = True,
app: App | None = None,
) -> None:
"""Initialize a RulesCollection instance."""
if options is None:
self.options = copy.deepcopy(default_options)
# When initialized without options argument we want it to always
# be offline as this is done only during testing.
self.options.offline = True
else:
self.options = options
self.profile = []
self.app = app or get_app(cached=True)
if profile_name:
self.profile = PROFILES[profile_name]
rulesdirs_str = [] if rulesdirs is None else [str(r) for r in rulesdirs]
self.rulesdirs = expand_paths_vars(rulesdirs_str)
self.rules: list[BaseRule] = []
# internal rules included in order to expose them for docs as they are
# not directly loaded by our rule loader.
self.rules.extend(
[
RuntimeErrorRule(),
AnsibleParserErrorRule(),
LoadingFailureRule(),
WarningRule(),
],
)
for rule in self.rules:
rule._collection = self # noqa: SLF001
for rule in load_plugins(rulesdirs_str):
self.register(rule, conditional=conditional)
self.rules = sorted(self.rules)
# When we have a profile we unload some of the rules
# But we do include all rules when listing all rules or tags
if profile_name and not (self.options.list_rules or self.options.list_tags):
filter_rules_with_profile(self.rules, profile_name)
def register(self, obj: AnsibleLintRule, *, conditional: bool = False) -> None:
"""Register a rule."""
# We skip opt-in rules which were not manually enabled.
# But we do include opt-in rules when listing all rules or tags
obj._collection = self # pylint: disable=protected-access # noqa: SLF001
if any(
[
not conditional,
self.profile, # when profile is used we load all rules and filter later
"opt-in" not in obj.tags,
obj.id in self.options.enable_list,
self.options.list_rules,
self.options.list_tags,
],
):
self.rules.append(obj)
def __iter__(self) -> Iterator[BaseRule]:
"""Return the iterator over the rules in the RulesCollection."""
return iter(sorted(self.rules))
def alphabetical(self) -> Iterator[BaseRule]:
"""Return an iterator over the rules in the RulesCollection in alphabetical order."""
return iter(sorted(self.rules, key=lambda x: x.id))
def __len__(self) -> int:
"""Return the length of the RulesCollection data."""
return len(self.rules)
def __getitem__(self, item: Any) -> BaseRule:
"""Return a rule from inside the collection based on its id."""
if not isinstance(item, str):
msg = f"Expected str but got {type(item)} when trying to access rule by it's id"
raise TypeError(msg)
for rule in self.rules:
if rule.id == item:
return rule
msg = f"Rule {item} is not present inside this collection."
raise ValueError(msg)
def extend(self, more: list[AnsibleLintRule]) -> None:
"""Combine rules."""
self.rules.extend(more)
def run(
self,
file: Lintable,
tags: set[str] | None = None,
skip_list: list[str] | None = None,
) -> list[MatchError]:
"""Run all the rules against the given lintable."""
matches: list[MatchError] = []
if tags is None:
tags = set()
if skip_list is None:
skip_list = []
if not file.path.is_dir():
try:
if file.content is not None: # loads the file content
pass
except (OSError, UnicodeDecodeError) as exc:
return [
MatchError(
message=str(exc),
lintable=file,
rule=self["load-failure"],
tag=f"{LoadingFailureRule.id}[{exc.__class__.__name__.lower()}]",
),
]
for rule in self.rules:
if rule.id == "syntax-check":
continue
if (
not tags
or rule.has_dynamic_tags
or not set(rule.tags).union([rule.id]).isdisjoint(tags)
):
if tags and set(rule.tags).union(list(rule.ids().keys())).isdisjoint(
tags,
):
_logger.debug("Skipping rule %s", rule.id)
else:
_logger.debug("Running rule %s", rule.id)
rule_definition = set(rule.tags)
rule_definition.add(rule.id)
if set(rule_definition).isdisjoint(skip_list):
matches.extend(rule.getmatches(file))
else:
_logger.debug("Skipping rule %s", rule.id)
# some rules can produce matches with tags that are inside our
# skip_list, so we need to cleanse the matches
matches = [m for m in matches if m.tag not in skip_list]
return matches
def __repr__(self) -> str:
"""Return a RulesCollection instance representation."""
return "\n".join(
[rule.verbose() for rule in sorted(self.rules, key=lambda x: x.id)],
)
def known_tags(self) -> list[str]:
"""Return a list of known tags, without returning no sub-tags."""
tags = set()
for rule in self.rules:
tags.add(rule.id)
for tag in rule.tags:
tags.add(tag)
return sorted(tags)
def list_tags(self) -> str:
"""Return a string with all the tags in the RulesCollection."""
tag_desc = {
"command-shell": "Specific to use of command and shell modules",
"core": "Related to internal implementation of the linter",
"deprecations": "Indicate use of features that are removed from Ansible",
"experimental": "Newly introduced rules, by default triggering only warnings",
"formatting": "Related to code-style",
"idempotency": "Possible indication that consequent runs would produce different results",
"idiom": "Anti-pattern detected, likely to cause undesired behavior",
"metadata": "Invalid metadata, likely related to galaxy, collections or roles",
"opt-in": "Rules that are not used unless manually added to `enable_list`",
"security": "Rules related o potentially security issues, like exposing credentials",
"syntax": "Related to wrong or deprecated syntax",
"unpredictability": "Warn about code that might not work in a predictable way",
"unskippable": "Indicate a fatal error that cannot be ignored or disabled",
"yaml": "External linter which will also produce its own rule codes",
}
tags = defaultdict(list)
for rule in self.rules:
# Fail early if a rule does not have any of our required tags
if not set(rule.tags).intersection(tag_desc.keys()):
msg = f"Rule {rule} does not have any of the required tags: {', '.join(tag_desc.keys())}"
raise RuntimeError(msg)
for tag in rule.tags:
tags[tag] = list(rule.ids())
result = "# List of tags and rules they cover\n"
for tag in sorted(tags):
desc = tag_desc.get(tag)
if desc:
result += f"{tag}: # {desc}\n"
else:
result += f"{tag}:\n"
for name in sorted(tags[tag]):
result += f" - {name}\n"
return result
def filter_rules_with_profile(rule_col: list[BaseRule], profile: str) -> None:
"""Unload rules that are not part of the specified profile."""
included = set()
extends = profile
total_rules = len(rule_col)
while extends:
for rule in PROFILES[extends]["rules"]:
_logger.debug("Activating rule `%s` due to profile `%s`", rule, extends)
included.add(rule)
extends = PROFILES[extends].get("extends", None)
for rule in rule_col.copy():
if rule.unloadable:
continue
if rule.id not in included:
_logger.debug(
"Unloading %s rule due to not being part of %s profile.",
rule.id,
profile,
)
rule_col.remove(rule)
_logger.debug("%s/%s rules included in the profile", len(rule_col), total_rules)
|