summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/rules/no_same_owner.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/rules/no_same_owner.py')
-rw-r--r--src/ansiblelint/rules/no_same_owner.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/src/ansiblelint/rules/no_same_owner.py b/src/ansiblelint/rules/no_same_owner.py
new file mode 100644
index 0000000..021900e
--- /dev/null
+++ b/src/ansiblelint/rules/no_same_owner.py
@@ -0,0 +1,114 @@
+"""Optional rule for avoiding keeping owner/group when transferring files."""
+from __future__ import annotations
+
+import re
+import sys
+from typing import TYPE_CHECKING, Any
+
+from ansible.utils.sentinel import Sentinel
+
+from ansiblelint.rules import AnsibleLintRule
+
+if TYPE_CHECKING:
+ from ansiblelint.file_utils import Lintable
+ from ansiblelint.utils import Task
+
+
+class NoSameOwnerRule(AnsibleLintRule):
+ """Do not preserve the owner and group when transferring files across hosts."""
+
+ id = "no-same-owner"
+ description = """
+Optional rule that highlights dangers of assuming that user/group on the remote
+machines may not exist on ansible controller or vice versa. Owner and group
+should not be preserved when transferring files between them.
+"""
+ severity = "LOW"
+ tags = ["opt-in"]
+
+ def matchtask(
+ self,
+ task: Task,
+ file: Lintable | None = None,
+ ) -> bool | str:
+ """Return matches for a task."""
+ action = task.get("action")
+ if not isinstance(action, dict): # pragma: no cover
+ return False
+
+ module = action["__ansible_module__"]
+
+ if module in ["synchronize", "ansible.posix.synchronize"]:
+ return self.handle_synchronize(task, action)
+
+ if module in ["unarchive", "ansible.builtin.unarchive"]:
+ return self.handle_unarchive(task, action)
+
+ return False
+
+ @staticmethod
+ def handle_synchronize(task: Any, action: dict[str, Any]) -> bool:
+ """Process a synchronize task."""
+ if task.get("delegate_to") != Sentinel:
+ return False
+
+ archive = action.get("archive", True)
+ if action.get("owner", archive) or action.get("group", archive):
+ return True
+ return False
+
+ @staticmethod
+ def handle_unarchive(task: Any, action: dict[str, Any]) -> bool:
+ """Process unarchive task."""
+ delegate_to = task.get("delegate_to")
+ if (
+ delegate_to == "localhost"
+ or delegate_to != "localhost"
+ and not action.get("remote_src")
+ ):
+ src = action.get("src")
+ if not isinstance(src, str):
+ return False
+
+ if src.endswith("zip") and "-X" in action.get("extra_opts", []):
+ return True
+ if re.search(
+ r".*\.tar(\.(gz|bz2|xz))?$",
+ src,
+ ) and "--no-same-owner" not in action.get("extra_opts", []):
+ return True
+ return False
+
+
+# testing code to be loaded only with pytest or when executed the rule file
+if "pytest" in sys.modules:
+ import pytest
+
+ from ansiblelint.rules import RulesCollection # pylint: disable=ungrouped-imports
+ from ansiblelint.runner import Runner # pylint: disable=ungrouped-imports
+
+ @pytest.mark.parametrize(
+ ("test_file", "failures"),
+ (
+ pytest.param(
+ "examples/roles/role_for_no_same_owner/tasks/fail.yml",
+ 12,
+ id="fail",
+ ),
+ pytest.param(
+ "examples/roles/role_for_no_same_owner/tasks/pass.yml",
+ 0,
+ id="pass",
+ ),
+ ),
+ )
+ def test_no_same_owner_rule(
+ default_rules_collection: RulesCollection,
+ test_file: str,
+ failures: int,
+ ) -> None:
+ """Test rule matches."""
+ results = Runner(test_file, rules=default_rules_collection).run()
+ assert len(results) == failures
+ for result in results:
+ assert result.message == NoSameOwnerRule().shortdesc