1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
"""Optional rule for avoiding keeping owner/group when transferring files."""
from __future__ import annotations
import re
import sys
from typing import TYPE_CHECKING, Any
from ansible.utils.sentinel import Sentinel
from ansiblelint.rules import AnsibleLintRule
if TYPE_CHECKING:
from ansiblelint.file_utils import Lintable
class NoSameOwnerRule(AnsibleLintRule):
"""Do not preserve the owner and group when transferring files across hosts."""
id = "no-same-owner"
description = """
Optional rule that highlights dangers of assuming that user/group on the remote
machines may not exist on ansible controller or vice versa. Owner and group
should not be preserved when transferring files between them.
"""
severity = "LOW"
tags = ["opt-in"]
def matchtask(
self, task: dict[str, Any], file: Lintable | None = None
) -> bool | str:
"""Return matches for a task."""
action = task.get("action")
if not isinstance(action, dict):
return False
module = action["__ansible_module__"]
if module in ["synchronize", "ansible.posix.synchronize"]:
return self.handle_synchronize(task, action)
if module in ["unarchive", "ansible.builtin.unarchive"]:
return self.handle_unarchive(task, action)
return False
@staticmethod
def handle_synchronize(task: Any, action: dict[str, Any]) -> bool:
"""Process a synchronize task."""
if task.get("delegate_to") != Sentinel:
return False
archive = action.get("archive", True)
if action.get("owner", archive) or action.get("group", archive):
return True
return False
@staticmethod
def handle_unarchive(task: Any, action: dict[str, Any]) -> bool:
"""Process unarchive task."""
delegate_to = task.get("delegate_to")
if (
delegate_to == "localhost"
or delegate_to != "localhost"
and not action.get("remote_src")
):
src = action.get("src")
if not isinstance(src, str):
return False
if src.endswith("zip"):
if "-X" in action.get("extra_opts", []):
return True
if re.search(r".*\.tar(\.(gz|bz2|xz))?$", src):
if "--no-same-owner" not in action.get("extra_opts", []):
return True
return False
# testing code to be loaded only with pytest or when executed the rule file
if "pytest" in sys.modules:
import pytest
from ansiblelint.rules import RulesCollection # pylint: disable=ungrouped-imports
from ansiblelint.runner import Runner # pylint: disable=ungrouped-imports
@pytest.mark.parametrize(
("test_file", "failures"),
(
pytest.param(
"examples/roles/role_for_no_same_owner/tasks/fail.yml", 12, id="fail"
),
pytest.param(
"examples/roles/role_for_no_same_owner/tasks/pass.yml", 0, id="pass"
),
),
)
def test_no_same_owner_rule(
default_rules_collection: RulesCollection, test_file: str, failures: int
) -> None:
"""Test rule matches."""
results = Runner(test_file, rules=default_rules_collection).run()
assert len(results) == failures
for result in results:
assert result.message == NoSameOwnerRule().shortdesc
|