summaryrefslogtreecommitdiffstats
path: root/test/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_utils.py')
-rw-r--r--test/test_utils.py449
1 files changed, 449 insertions, 0 deletions
diff --git a/test/test_utils.py b/test/test_utils.py
new file mode 100644
index 0000000..1b9a2dc
--- /dev/null
+++ b/test/test_utils.py
@@ -0,0 +1,449 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Tests for generic utility functions."""
+from __future__ import annotations
+
+import logging
+import subprocess
+import sys
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import pytest
+from ansible.utils.sentinel import Sentinel
+from ansible_compat.runtime import Runtime
+
+from ansiblelint import cli, constants, utils
+from ansiblelint.__main__ import initialize_logger
+from ansiblelint.cli import get_rules_dirs
+from ansiblelint.constants import RC
+from ansiblelint.file_utils import Lintable, cwd
+from ansiblelint.runner import Runner
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+ from _pytest.capture import CaptureFixture
+ from _pytest.logging import LogCaptureFixture
+ from _pytest.monkeypatch import MonkeyPatch
+
+ from ansiblelint.rules import RulesCollection
+
+
+runtime = Runtime(require_module=True)
+
+
+@pytest.mark.parametrize(
+ ("string", "expected_cmd", "expected_args", "expected_kwargs"),
+ (
+ pytest.param("", "", [], {}, id="blank"),
+ pytest.param("vars:", "vars", [], {}, id="single_word"),
+ pytest.param("hello: a=1", "hello", [], {"a": "1"}, id="string_module_and_arg"),
+ pytest.param("action: hello a=1", "hello", [], {"a": "1"}, id="strips_action"),
+ pytest.param(
+ "action: whatever bobbins x=y z=x c=3",
+ "whatever",
+ ["bobbins", "x=y", "z=x", "c=3"],
+ {},
+ id="more_than_one_arg",
+ ),
+ pytest.param(
+ "action: command chdir=wxy creates=zyx tar xzf zyx.tgz",
+ "command",
+ ["tar", "xzf", "zyx.tgz"],
+ {"chdir": "wxy", "creates": "zyx"},
+ id="command_with_args",
+ ),
+ ),
+)
+def test_tokenize(
+ string: str,
+ expected_cmd: str,
+ expected_args: Sequence[str],
+ expected_kwargs: dict[str, Any],
+) -> None:
+ """Test that tokenize works for different input types."""
+ (cmd, args, kwargs) = utils.tokenize(string)
+ assert cmd == expected_cmd
+ assert args == expected_args
+ assert kwargs == expected_kwargs
+
+
+@pytest.mark.parametrize(
+ ("reference_form", "alternate_forms"),
+ (
+ pytest.param(
+ {"name": "hello", "action": "command chdir=abc echo hello world"},
+ ({"name": "hello", "command": "chdir=abc echo hello world"},),
+ id="simple_command",
+ ),
+ pytest.param(
+ {"git": {"version": "abc"}, "args": {"repo": "blah", "dest": "xyz"}},
+ (
+ {"git": {"version": "abc", "repo": "blah", "dest": "xyz"}},
+ {"git": "version=abc repo=blah dest=xyz"},
+ {
+ "git": None,
+ "args": {"repo": "blah", "dest": "xyz", "version": "abc"},
+ },
+ ),
+ id="args",
+ ),
+ ),
+)
+def test_normalize(
+ reference_form: dict[str, Any],
+ alternate_forms: tuple[dict[str, Any]],
+) -> None:
+ """Test that tasks specified differently are normalized same way."""
+ normal_form = utils.normalize_task(reference_form, "tasks.yml")
+
+ for form in alternate_forms:
+ assert normal_form == utils.normalize_task(form, "tasks.yml")
+
+
+def test_normalize_complex_command() -> None:
+ """Test that tasks specified differently are normalized same way."""
+ task1 = {
+ "name": "hello",
+ "action": {"module": "pip", "name": "df", "editable": "false"},
+ }
+ task2 = {"name": "hello", "pip": {"name": "df", "editable": "false"}}
+ task3 = {"name": "hello", "pip": "name=df editable=false"}
+ task4 = {"name": "hello", "action": "pip name=df editable=false"}
+ assert utils.normalize_task(task1, "tasks.yml") == utils.normalize_task(
+ task2,
+ "tasks.yml",
+ )
+ assert utils.normalize_task(task2, "tasks.yml") == utils.normalize_task(
+ task3,
+ "tasks.yml",
+ )
+ assert utils.normalize_task(task3, "tasks.yml") == utils.normalize_task(
+ task4,
+ "tasks.yml",
+ )
+
+
+@pytest.mark.parametrize(
+ ("task", "expected_form"),
+ (
+ pytest.param(
+ {
+ "name": "ensure apache is at the latest version",
+ "yum": {"name": "httpd", "state": "latest"},
+ },
+ {
+ "delegate_to": Sentinel,
+ "name": "ensure apache is at the latest version",
+ "action": {
+ "__ansible_module__": "yum",
+ "__ansible_module_original__": "yum",
+ "name": "httpd",
+ "state": "latest",
+ },
+ },
+ id="0",
+ ),
+ pytest.param(
+ {
+ "name": "Attempt and graceful roll back",
+ "block": [
+ {
+ "name": "Install httpd and memcached",
+ "ansible.builtin.yum": ["httpd", "memcached"],
+ "state": "present",
+ },
+ ],
+ },
+ {
+ "name": "Attempt and graceful roll back",
+ "block": [
+ {
+ "name": "Install httpd and memcached",
+ "ansible.builtin.yum": ["httpd", "memcached"],
+ "state": "present",
+ },
+ ],
+ "action": {
+ "__ansible_module__": "block/always/rescue",
+ "__ansible_module_original__": "block/always/rescue",
+ },
+ },
+ id="1",
+ ),
+ ),
+)
+def test_normalize_task_v2(task: dict[str, Any], expected_form: dict[str, Any]) -> None:
+ """Check that it normalizes task and returns the expected form."""
+ assert utils.normalize_task_v2(task) == expected_form
+
+
+def test_extract_from_list() -> None:
+ """Check that tasks get extracted from blocks if present."""
+ block = {
+ "block": [{"tasks": {"name": "hello", "command": "whoami"}}],
+ "test_none": None,
+ "test_string": "foo",
+ }
+ blocks = [block]
+
+ test_list = utils.extract_from_list(blocks, ["block"])
+ test_none = utils.extract_from_list(blocks, ["test_none"])
+
+ assert list(block["block"]) == test_list # type: ignore[arg-type]
+ assert not test_none
+ with pytest.raises(RuntimeError):
+ utils.extract_from_list(blocks, ["test_string"])
+
+
+def test_extract_from_list_recursive() -> None:
+ """Check that tasks get extracted from blocks if present."""
+ block = {
+ "block": [{"block": [{"name": "hello", "command": "whoami"}]}],
+ }
+ blocks = [block]
+
+ test_list = utils.extract_from_list(blocks, ["block"])
+ assert list(block["block"]) == test_list
+
+ test_list_recursive = utils.extract_from_list(blocks, ["block"], recursive=True)
+ assert block["block"] + block["block"][0]["block"] == test_list_recursive
+
+
+@pytest.mark.parametrize(
+ ("template", "output"),
+ (
+ pytest.param("{{ playbook_dir }}", "/a/b/c", id="simple"),
+ pytest.param(
+ "{{ 'hello' | doesnotexist }}",
+ "hello", # newer implementation ignores unknown filters
+ id="unknown_filter",
+ ),
+ pytest.param(
+ "{{ hello | to_json }}",
+ "{{ hello | to_json }}",
+ id="to_json_filter_on_undefined_variable",
+ ),
+ pytest.param(
+ "{{ hello | to_nice_yaml }}",
+ "{{ hello | to_nice_yaml }}",
+ id="to_nice_yaml_filter_on_undefined_variable",
+ ),
+ ),
+)
+def test_template(template: str, output: str) -> None:
+ """Verify that resolvable template vars and filters get rendered."""
+ result = utils.template(
+ basedir=Path("/base/dir"),
+ value=template,
+ variables={"playbook_dir": "/a/b/c"},
+ fail_on_error=False,
+ )
+ assert result == output
+
+
+def test_task_to_str_unicode() -> None:
+ """Ensure that extracting messages from tasks preserves Unicode."""
+ task = {"fail": {"msg": "unicode é ô à"}}
+ result = utils.task_to_str(utils.normalize_task(task, "filename.yml"))
+ assert result == "fail msg=unicode é ô à"
+
+
+def test_logger_debug(caplog: LogCaptureFixture) -> None:
+ """Test that the double verbosity arg causes logger to be DEBUG."""
+ options = cli.get_config(["-vv"])
+ initialize_logger(options.verbosity)
+
+ expected_info = (
+ "ansiblelint.__main__",
+ logging.DEBUG,
+ "Logging initialized to level 10",
+ )
+
+ assert expected_info in caplog.record_tuples
+
+
+def test_cli_auto_detect(capfd: CaptureFixture[str]) -> None:
+ """Test that run without arguments it will detect and lint the entire repository."""
+ cmd = [
+ sys.executable,
+ "-m",
+ "ansiblelint",
+ "-x",
+ "schema", # exclude schema as our test file would fail it
+ "-v",
+ "-p",
+ "--nocolor",
+ ]
+ result = subprocess.run(cmd, check=False).returncode
+
+ # We de expect to fail on our own repo due to test examples we have
+ assert result == RC.VIOLATIONS_FOUND
+
+ out, err = capfd.readouterr()
+
+ # An expected rule match from our examples
+ assert (
+ "examples/playbooks/empty_playbook.yml:1:1: "
+ "syntax-check[empty-playbook]: Empty playbook, nothing to do" in out
+ )
+ # assures that our ansible-lint config exclude was effective in excluding github files
+ assert "Identified: .github/" not in out
+ # assures that we can parse playbooks as playbooks
+ assert "Identified: test/test/always-run-success.yml" not in err
+ assert (
+ "Executing syntax check on playbook examples/playbooks/mocked_dependency.yml"
+ in err
+ )
+
+
+def test_is_playbook() -> None:
+ """Verify that we can detect a playbook as a playbook."""
+ assert utils.is_playbook("examples/playbooks/always-run-success.yml")
+
+
+@pytest.mark.parametrize(
+ "exclude",
+ (pytest.param("foo", id="1"), pytest.param("foo/", id="2")),
+)
+def test_auto_detect_exclude(tmp_path: Path, exclude: str) -> None:
+ """Verify that exclude option can be used to narrow down detection."""
+ with cwd(tmp_path):
+ subprocess.check_output(
+ "git init",
+ stderr=subprocess.STDOUT,
+ text=True,
+ shell=True,
+ cwd=tmp_path,
+ )
+ (tmp_path / "foo").mkdir()
+ (tmp_path / "bar").mkdir()
+ (tmp_path / "foo" / "playbook.yml").touch()
+ (tmp_path / "bar" / "playbook.yml").touch()
+
+ options = cli.get_config(["--exclude", exclude])
+ options.cwd = tmp_path
+ result = utils.get_lintables(options)
+ assert result == [Lintable("bar/playbook.yml", kind="playbook")]
+
+ # now we also test with .gitignore exclude approach
+ (tmp_path / ".gitignore").write_text(f".gitignore\n{exclude}\n")
+ options = cli.get_config([])
+ options.cwd = tmp_path
+ result = utils.get_lintables(options)
+ assert result == [Lintable("bar/playbook.yml", kind="playbook")]
+
+
+_DEFAULT_RULEDIRS = [constants.DEFAULT_RULESDIR]
+_CUSTOM_RULESDIR = Path(__file__).parent / "custom_rules"
+_CUSTOM_RULEDIRS = [
+ _CUSTOM_RULESDIR / "example_inc",
+ _CUSTOM_RULESDIR / "example_com",
+]
+
+
+@pytest.mark.parametrize(
+ ("user_ruledirs", "use_default", "expected"),
+ (
+ ([], True, _DEFAULT_RULEDIRS),
+ ([], False, _DEFAULT_RULEDIRS),
+ (_CUSTOM_RULEDIRS, True, _CUSTOM_RULEDIRS + _DEFAULT_RULEDIRS),
+ (_CUSTOM_RULEDIRS, False, _CUSTOM_RULEDIRS),
+ ),
+)
+def test_get_rules_dirs(
+ user_ruledirs: list[Path],
+ use_default: bool,
+ expected: list[Path],
+) -> None:
+ """Test it returns expected dir lists."""
+ assert get_rules_dirs(user_ruledirs, use_default=use_default) == expected
+
+
+@pytest.mark.parametrize(
+ ("user_ruledirs", "use_default", "expected"),
+ (
+ ([], True, sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS),
+ ([], False, sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS),
+ (
+ _CUSTOM_RULEDIRS,
+ True,
+ _CUSTOM_RULEDIRS + sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS,
+ ),
+ (_CUSTOM_RULEDIRS, False, _CUSTOM_RULEDIRS),
+ ),
+)
+def test_get_rules_dirs_with_custom_rules(
+ user_ruledirs: list[Path],
+ use_default: bool,
+ expected: list[Path],
+ monkeypatch: MonkeyPatch,
+) -> None:
+ """Test it returns expected dir lists when custom rules exist."""
+ monkeypatch.setenv(constants.CUSTOM_RULESDIR_ENVVAR, str(_CUSTOM_RULESDIR))
+ assert get_rules_dirs(user_ruledirs, use_default=use_default) == expected
+
+
+def test_find_children(default_rules_collection: RulesCollection) -> None:
+ """Verify correct function of find_children()."""
+ Runner(
+ rules=default_rules_collection,
+ ).find_children(Lintable("examples/playbooks/find_children.yml"))
+
+
+def test_find_children_in_task(default_rules_collection: RulesCollection) -> None:
+ """Verify correct function of find_children() in tasks."""
+ Runner(
+ Lintable("examples/playbooks/tasks/bug-2875.yml"),
+ rules=default_rules_collection,
+ ).run()
+
+
+@pytest.mark.parametrize(
+ ("file", "names", "positions"),
+ (
+ pytest.param(
+ "examples/playbooks/task_in_list-0.yml",
+ ["A", "B", "C", "D", "E", "F", "G"],
+ [
+ ".[0].tasks[0]",
+ ".[0].tasks[1]",
+ ".[0].pre_tasks[0]",
+ ".[0].post_tasks[0]",
+ ".[0].post_tasks[0].block[0]",
+ ".[0].post_tasks[0].rescue[0]",
+ ".[0].post_tasks[0].always[0]",
+ ],
+ id="0",
+ ),
+ ),
+)
+def test_task_in_list(file: str, names: list[str], positions: list[str]) -> None:
+ """Check that tasks get extracted from blocks if present."""
+ lintable = Lintable(file)
+ assert lintable.kind
+ tasks = list(
+ utils.task_in_list(data=lintable.data, file=lintable, kind=lintable.kind),
+ )
+ assert len(tasks) == len(names)
+ for index, task in enumerate(tasks):
+ assert task.name == names[index]
+ assert task.position == positions[index]