diff options
Diffstat (limited to '')
-rw-r--r-- | test/test_utils.py | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 0000000..1b9a2dc --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,449 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Tests for generic utility functions.""" +from __future__ import annotations + +import logging +import subprocess +import sys +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import pytest +from ansible.utils.sentinel import Sentinel +from ansible_compat.runtime import Runtime + +from ansiblelint import cli, constants, utils +from ansiblelint.__main__ import initialize_logger +from ansiblelint.cli import get_rules_dirs +from ansiblelint.constants import RC +from ansiblelint.file_utils import Lintable, cwd +from ansiblelint.runner import Runner + +if TYPE_CHECKING: + from collections.abc import Sequence + + from _pytest.capture import CaptureFixture + from _pytest.logging import LogCaptureFixture + from _pytest.monkeypatch import MonkeyPatch + + from ansiblelint.rules import RulesCollection + + +runtime = Runtime(require_module=True) + + +@pytest.mark.parametrize( + ("string", "expected_cmd", "expected_args", "expected_kwargs"), + ( + pytest.param("", "", [], {}, id="blank"), + pytest.param("vars:", "vars", [], {}, id="single_word"), + pytest.param("hello: a=1", "hello", [], {"a": "1"}, id="string_module_and_arg"), + pytest.param("action: hello a=1", "hello", [], {"a": "1"}, id="strips_action"), + pytest.param( + "action: whatever bobbins x=y z=x c=3", + "whatever", + ["bobbins", "x=y", "z=x", "c=3"], + {}, + id="more_than_one_arg", + ), + pytest.param( + "action: command chdir=wxy creates=zyx tar xzf zyx.tgz", + "command", + ["tar", "xzf", "zyx.tgz"], + {"chdir": "wxy", "creates": "zyx"}, + id="command_with_args", + ), + ), +) +def test_tokenize( + string: str, + expected_cmd: str, + expected_args: Sequence[str], + expected_kwargs: dict[str, Any], +) -> None: + """Test that tokenize works for different input types.""" + (cmd, args, kwargs) = utils.tokenize(string) + assert cmd == expected_cmd + assert args == expected_args + assert kwargs == expected_kwargs + + +@pytest.mark.parametrize( + ("reference_form", "alternate_forms"), + ( + pytest.param( + {"name": "hello", "action": "command chdir=abc echo hello world"}, + ({"name": "hello", "command": "chdir=abc echo hello world"},), + id="simple_command", + ), + pytest.param( + {"git": {"version": "abc"}, "args": {"repo": "blah", "dest": "xyz"}}, + ( + {"git": {"version": "abc", "repo": "blah", "dest": "xyz"}}, + {"git": "version=abc repo=blah dest=xyz"}, + { + "git": None, + "args": {"repo": "blah", "dest": "xyz", "version": "abc"}, + }, + ), + id="args", + ), + ), +) +def test_normalize( + reference_form: dict[str, Any], + alternate_forms: tuple[dict[str, Any]], +) -> None: + """Test that tasks specified differently are normalized same way.""" + normal_form = utils.normalize_task(reference_form, "tasks.yml") + + for form in alternate_forms: + assert normal_form == utils.normalize_task(form, "tasks.yml") + + +def test_normalize_complex_command() -> None: + """Test that tasks specified differently are normalized same way.""" + task1 = { + "name": "hello", + "action": {"module": "pip", "name": "df", "editable": "false"}, + } + task2 = {"name": "hello", "pip": {"name": "df", "editable": "false"}} + task3 = {"name": "hello", "pip": "name=df editable=false"} + task4 = {"name": "hello", "action": "pip name=df editable=false"} + assert utils.normalize_task(task1, "tasks.yml") == utils.normalize_task( + task2, + "tasks.yml", + ) + assert utils.normalize_task(task2, "tasks.yml") == utils.normalize_task( + task3, + "tasks.yml", + ) + assert utils.normalize_task(task3, "tasks.yml") == utils.normalize_task( + task4, + "tasks.yml", + ) + + +@pytest.mark.parametrize( + ("task", "expected_form"), + ( + pytest.param( + { + "name": "ensure apache is at the latest version", + "yum": {"name": "httpd", "state": "latest"}, + }, + { + "delegate_to": Sentinel, + "name": "ensure apache is at the latest version", + "action": { + "__ansible_module__": "yum", + "__ansible_module_original__": "yum", + "name": "httpd", + "state": "latest", + }, + }, + id="0", + ), + pytest.param( + { + "name": "Attempt and graceful roll back", + "block": [ + { + "name": "Install httpd and memcached", + "ansible.builtin.yum": ["httpd", "memcached"], + "state": "present", + }, + ], + }, + { + "name": "Attempt and graceful roll back", + "block": [ + { + "name": "Install httpd and memcached", + "ansible.builtin.yum": ["httpd", "memcached"], + "state": "present", + }, + ], + "action": { + "__ansible_module__": "block/always/rescue", + "__ansible_module_original__": "block/always/rescue", + }, + }, + id="1", + ), + ), +) +def test_normalize_task_v2(task: dict[str, Any], expected_form: dict[str, Any]) -> None: + """Check that it normalizes task and returns the expected form.""" + assert utils.normalize_task_v2(task) == expected_form + + +def test_extract_from_list() -> None: + """Check that tasks get extracted from blocks if present.""" + block = { + "block": [{"tasks": {"name": "hello", "command": "whoami"}}], + "test_none": None, + "test_string": "foo", + } + blocks = [block] + + test_list = utils.extract_from_list(blocks, ["block"]) + test_none = utils.extract_from_list(blocks, ["test_none"]) + + assert list(block["block"]) == test_list # type: ignore[arg-type] + assert not test_none + with pytest.raises(RuntimeError): + utils.extract_from_list(blocks, ["test_string"]) + + +def test_extract_from_list_recursive() -> None: + """Check that tasks get extracted from blocks if present.""" + block = { + "block": [{"block": [{"name": "hello", "command": "whoami"}]}], + } + blocks = [block] + + test_list = utils.extract_from_list(blocks, ["block"]) + assert list(block["block"]) == test_list + + test_list_recursive = utils.extract_from_list(blocks, ["block"], recursive=True) + assert block["block"] + block["block"][0]["block"] == test_list_recursive + + +@pytest.mark.parametrize( + ("template", "output"), + ( + pytest.param("{{ playbook_dir }}", "/a/b/c", id="simple"), + pytest.param( + "{{ 'hello' | doesnotexist }}", + "hello", # newer implementation ignores unknown filters + id="unknown_filter", + ), + pytest.param( + "{{ hello | to_json }}", + "{{ hello | to_json }}", + id="to_json_filter_on_undefined_variable", + ), + pytest.param( + "{{ hello | to_nice_yaml }}", + "{{ hello | to_nice_yaml }}", + id="to_nice_yaml_filter_on_undefined_variable", + ), + ), +) +def test_template(template: str, output: str) -> None: + """Verify that resolvable template vars and filters get rendered.""" + result = utils.template( + basedir=Path("/base/dir"), + value=template, + variables={"playbook_dir": "/a/b/c"}, + fail_on_error=False, + ) + assert result == output + + +def test_task_to_str_unicode() -> None: + """Ensure that extracting messages from tasks preserves Unicode.""" + task = {"fail": {"msg": "unicode é ô à"}} + result = utils.task_to_str(utils.normalize_task(task, "filename.yml")) + assert result == "fail msg=unicode é ô à" + + +def test_logger_debug(caplog: LogCaptureFixture) -> None: + """Test that the double verbosity arg causes logger to be DEBUG.""" + options = cli.get_config(["-vv"]) + initialize_logger(options.verbosity) + + expected_info = ( + "ansiblelint.__main__", + logging.DEBUG, + "Logging initialized to level 10", + ) + + assert expected_info in caplog.record_tuples + + +def test_cli_auto_detect(capfd: CaptureFixture[str]) -> None: + """Test that run without arguments it will detect and lint the entire repository.""" + cmd = [ + sys.executable, + "-m", + "ansiblelint", + "-x", + "schema", # exclude schema as our test file would fail it + "-v", + "-p", + "--nocolor", + ] + result = subprocess.run(cmd, check=False).returncode + + # We de expect to fail on our own repo due to test examples we have + assert result == RC.VIOLATIONS_FOUND + + out, err = capfd.readouterr() + + # An expected rule match from our examples + assert ( + "examples/playbooks/empty_playbook.yml:1:1: " + "syntax-check[empty-playbook]: Empty playbook, nothing to do" in out + ) + # assures that our ansible-lint config exclude was effective in excluding github files + assert "Identified: .github/" not in out + # assures that we can parse playbooks as playbooks + assert "Identified: test/test/always-run-success.yml" not in err + assert ( + "Executing syntax check on playbook examples/playbooks/mocked_dependency.yml" + in err + ) + + +def test_is_playbook() -> None: + """Verify that we can detect a playbook as a playbook.""" + assert utils.is_playbook("examples/playbooks/always-run-success.yml") + + +@pytest.mark.parametrize( + "exclude", + (pytest.param("foo", id="1"), pytest.param("foo/", id="2")), +) +def test_auto_detect_exclude(tmp_path: Path, exclude: str) -> None: + """Verify that exclude option can be used to narrow down detection.""" + with cwd(tmp_path): + subprocess.check_output( + "git init", + stderr=subprocess.STDOUT, + text=True, + shell=True, + cwd=tmp_path, + ) + (tmp_path / "foo").mkdir() + (tmp_path / "bar").mkdir() + (tmp_path / "foo" / "playbook.yml").touch() + (tmp_path / "bar" / "playbook.yml").touch() + + options = cli.get_config(["--exclude", exclude]) + options.cwd = tmp_path + result = utils.get_lintables(options) + assert result == [Lintable("bar/playbook.yml", kind="playbook")] + + # now we also test with .gitignore exclude approach + (tmp_path / ".gitignore").write_text(f".gitignore\n{exclude}\n") + options = cli.get_config([]) + options.cwd = tmp_path + result = utils.get_lintables(options) + assert result == [Lintable("bar/playbook.yml", kind="playbook")] + + +_DEFAULT_RULEDIRS = [constants.DEFAULT_RULESDIR] +_CUSTOM_RULESDIR = Path(__file__).parent / "custom_rules" +_CUSTOM_RULEDIRS = [ + _CUSTOM_RULESDIR / "example_inc", + _CUSTOM_RULESDIR / "example_com", +] + + +@pytest.mark.parametrize( + ("user_ruledirs", "use_default", "expected"), + ( + ([], True, _DEFAULT_RULEDIRS), + ([], False, _DEFAULT_RULEDIRS), + (_CUSTOM_RULEDIRS, True, _CUSTOM_RULEDIRS + _DEFAULT_RULEDIRS), + (_CUSTOM_RULEDIRS, False, _CUSTOM_RULEDIRS), + ), +) +def test_get_rules_dirs( + user_ruledirs: list[Path], + use_default: bool, + expected: list[Path], +) -> None: + """Test it returns expected dir lists.""" + assert get_rules_dirs(user_ruledirs, use_default=use_default) == expected + + +@pytest.mark.parametrize( + ("user_ruledirs", "use_default", "expected"), + ( + ([], True, sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS), + ([], False, sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS), + ( + _CUSTOM_RULEDIRS, + True, + _CUSTOM_RULEDIRS + sorted(_CUSTOM_RULEDIRS) + _DEFAULT_RULEDIRS, + ), + (_CUSTOM_RULEDIRS, False, _CUSTOM_RULEDIRS), + ), +) +def test_get_rules_dirs_with_custom_rules( + user_ruledirs: list[Path], + use_default: bool, + expected: list[Path], + monkeypatch: MonkeyPatch, +) -> None: + """Test it returns expected dir lists when custom rules exist.""" + monkeypatch.setenv(constants.CUSTOM_RULESDIR_ENVVAR, str(_CUSTOM_RULESDIR)) + assert get_rules_dirs(user_ruledirs, use_default=use_default) == expected + + +def test_find_children(default_rules_collection: RulesCollection) -> None: + """Verify correct function of find_children().""" + Runner( + rules=default_rules_collection, + ).find_children(Lintable("examples/playbooks/find_children.yml")) + + +def test_find_children_in_task(default_rules_collection: RulesCollection) -> None: + """Verify correct function of find_children() in tasks.""" + Runner( + Lintable("examples/playbooks/tasks/bug-2875.yml"), + rules=default_rules_collection, + ).run() + + +@pytest.mark.parametrize( + ("file", "names", "positions"), + ( + pytest.param( + "examples/playbooks/task_in_list-0.yml", + ["A", "B", "C", "D", "E", "F", "G"], + [ + ".[0].tasks[0]", + ".[0].tasks[1]", + ".[0].pre_tasks[0]", + ".[0].post_tasks[0]", + ".[0].post_tasks[0].block[0]", + ".[0].post_tasks[0].rescue[0]", + ".[0].post_tasks[0].always[0]", + ], + id="0", + ), + ), +) +def test_task_in_list(file: str, names: list[str], positions: list[str]) -> None: + """Check that tasks get extracted from blocks if present.""" + lintable = Lintable(file) + assert lintable.kind + tasks = list( + utils.task_in_list(data=lintable.data, file=lintable, kind=lintable.kind), + ) + assert len(tasks) == len(names) + for index, task in enumerate(tasks): + assert task.name == names[index] + assert task.position == positions[index] |