From 8a754e0858d922e955e71b253c139e071ecec432 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 18:04:21 +0200 Subject: Adding upstream version 2.14.3. Signed-off-by: Daniel Baumann --- test/units/playbook/__init__.py | 0 test/units/playbook/role/__init__.py | 0 test/units/playbook/role/test_include_role.py | 251 +++++++++++ test/units/playbook/role/test_role.py | 423 ++++++++++++++++++ test/units/playbook/test_attribute.py | 57 +++ test/units/playbook/test_base.py | 615 ++++++++++++++++++++++++++ test/units/playbook/test_block.py | 82 ++++ test/units/playbook/test_collectionsearch.py | 78 ++++ test/units/playbook/test_conditional.py | 212 +++++++++ test/units/playbook/test_helpers.py | 373 ++++++++++++++++ test/units/playbook/test_included_file.py | 332 ++++++++++++++ test/units/playbook/test_play.py | 291 ++++++++++++ test/units/playbook/test_play_context.py | 94 ++++ test/units/playbook/test_playbook.py | 61 +++ test/units/playbook/test_taggable.py | 105 +++++ test/units/playbook/test_task.py | 114 +++++ 16 files changed, 3088 insertions(+) create mode 100644 test/units/playbook/__init__.py create mode 100644 test/units/playbook/role/__init__.py create mode 100644 test/units/playbook/role/test_include_role.py create mode 100644 test/units/playbook/role/test_role.py create mode 100644 test/units/playbook/test_attribute.py create mode 100644 test/units/playbook/test_base.py create mode 100644 test/units/playbook/test_block.py create mode 100644 test/units/playbook/test_collectionsearch.py create mode 100644 test/units/playbook/test_conditional.py create mode 100644 test/units/playbook/test_helpers.py create mode 100644 test/units/playbook/test_included_file.py create mode 100644 test/units/playbook/test_play.py create mode 100644 test/units/playbook/test_play_context.py create mode 100644 test/units/playbook/test_playbook.py create mode 100644 test/units/playbook/test_taggable.py create mode 100644 test/units/playbook/test_task.py (limited to 'test/units/playbook') diff --git a/test/units/playbook/__init__.py b/test/units/playbook/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/units/playbook/role/__init__.py b/test/units/playbook/role/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/units/playbook/role/test_include_role.py b/test/units/playbook/role/test_include_role.py new file mode 100644 index 0000000..5e7625b --- /dev/null +++ b/test/units/playbook/role/test_include_role.py @@ -0,0 +1,251 @@ +# (c) 2016, Daniel Miranda +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from unittest.mock import patch + +from ansible.playbook import Play +from ansible.playbook.role_include import IncludeRole +from ansible.playbook.task import Task +from ansible.vars.manager import VariableManager + +from units.mock.loader import DictDataLoader +from units.mock.path import mock_unfrackpath_noop + + +class TestIncludeRole(unittest.TestCase): + + def setUp(self): + + self.loader = DictDataLoader({ + '/etc/ansible/roles/l1/tasks/main.yml': """ + - shell: echo 'hello world from l1' + - include_role: name=l2 + """, + '/etc/ansible/roles/l1/tasks/alt.yml': """ + - shell: echo 'hello world from l1 alt' + - include_role: name=l2 tasks_from=alt defaults_from=alt + """, + '/etc/ansible/roles/l1/defaults/main.yml': """ + test_variable: l1-main + l1_variable: l1-main + """, + '/etc/ansible/roles/l1/defaults/alt.yml': """ + test_variable: l1-alt + l1_variable: l1-alt + """, + '/etc/ansible/roles/l2/tasks/main.yml': """ + - shell: echo 'hello world from l2' + - include_role: name=l3 + """, + '/etc/ansible/roles/l2/tasks/alt.yml': """ + - shell: echo 'hello world from l2 alt' + - include_role: name=l3 tasks_from=alt defaults_from=alt + """, + '/etc/ansible/roles/l2/defaults/main.yml': """ + test_variable: l2-main + l2_variable: l2-main + """, + '/etc/ansible/roles/l2/defaults/alt.yml': """ + test_variable: l2-alt + l2_variable: l2-alt + """, + '/etc/ansible/roles/l3/tasks/main.yml': """ + - shell: echo 'hello world from l3' + """, + '/etc/ansible/roles/l3/tasks/alt.yml': """ + - shell: echo 'hello world from l3 alt' + """, + '/etc/ansible/roles/l3/defaults/main.yml': """ + test_variable: l3-main + l3_variable: l3-main + """, + '/etc/ansible/roles/l3/defaults/alt.yml': """ + test_variable: l3-alt + l3_variable: l3-alt + """ + }) + + self.var_manager = VariableManager(loader=self.loader) + + def tearDown(self): + pass + + def flatten_tasks(self, tasks): + for task in tasks: + if isinstance(task, IncludeRole): + blocks, handlers = task.get_block_list(loader=self.loader) + for block in blocks: + for t in self.flatten_tasks(block.block): + yield t + elif isinstance(task, Task): + yield task + else: + for t in self.flatten_tasks(task.block): + yield t + + def get_tasks_vars(self, play, tasks): + for task in self.flatten_tasks(tasks): + if task.implicit: + # skip meta: role_complete + continue + role = task._role + if not role: + continue + + yield (role.get_name(), + self.var_manager.get_vars(play=play, task=task)) + + @patch('ansible.playbook.role.definition.unfrackpath', + mock_unfrackpath_noop) + def test_simple(self): + + """Test one-level include with default tasks and variables""" + + play = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[ + {'include_role': 'name=l3'} + ] + ), loader=self.loader, variable_manager=self.var_manager) + + tasks = play.compile() + tested = False + for role, task_vars in self.get_tasks_vars(play, tasks): + tested = True + self.assertEqual(task_vars.get('l3_variable'), 'l3-main') + self.assertEqual(task_vars.get('test_variable'), 'l3-main') + self.assertTrue(tested) + + @patch('ansible.playbook.role.definition.unfrackpath', + mock_unfrackpath_noop) + def test_simple_alt_files(self): + + """Test one-level include with alternative tasks and variables""" + + play = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[{'include_role': 'name=l3 tasks_from=alt defaults_from=alt'}]), + loader=self.loader, variable_manager=self.var_manager) + + tasks = play.compile() + tested = False + for role, task_vars in self.get_tasks_vars(play, tasks): + tested = True + self.assertEqual(task_vars.get('l3_variable'), 'l3-alt') + self.assertEqual(task_vars.get('test_variable'), 'l3-alt') + self.assertTrue(tested) + + @patch('ansible.playbook.role.definition.unfrackpath', + mock_unfrackpath_noop) + def test_nested(self): + + """ + Test nested includes with default tasks and variables. + + Variables from outer roles should be inherited, but overridden in inner + roles. + """ + + play = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[ + {'include_role': 'name=l1'} + ] + ), loader=self.loader, variable_manager=self.var_manager) + + tasks = play.compile() + expected_roles = ['l1', 'l2', 'l3'] + for role, task_vars in self.get_tasks_vars(play, tasks): + expected_roles.remove(role) + # Outer-most role must not have variables from inner roles yet + if role == 'l1': + self.assertEqual(task_vars.get('l1_variable'), 'l1-main') + self.assertEqual(task_vars.get('l2_variable'), None) + self.assertEqual(task_vars.get('l3_variable'), None) + self.assertEqual(task_vars.get('test_variable'), 'l1-main') + # Middle role must have variables from outer role, but not inner + elif role == 'l2': + self.assertEqual(task_vars.get('l1_variable'), 'l1-main') + self.assertEqual(task_vars.get('l2_variable'), 'l2-main') + self.assertEqual(task_vars.get('l3_variable'), None) + self.assertEqual(task_vars.get('test_variable'), 'l2-main') + # Inner role must have variables from both outer roles + elif role == 'l3': + self.assertEqual(task_vars.get('l1_variable'), 'l1-main') + self.assertEqual(task_vars.get('l2_variable'), 'l2-main') + self.assertEqual(task_vars.get('l3_variable'), 'l3-main') + self.assertEqual(task_vars.get('test_variable'), 'l3-main') + else: + self.fail() + self.assertFalse(expected_roles) + + @patch('ansible.playbook.role.definition.unfrackpath', + mock_unfrackpath_noop) + def test_nested_alt_files(self): + + """ + Test nested includes with alternative tasks and variables. + + Variables from outer roles should be inherited, but overridden in inner + roles. + """ + + play = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[ + {'include_role': 'name=l1 tasks_from=alt defaults_from=alt'} + ] + ), loader=self.loader, variable_manager=self.var_manager) + + tasks = play.compile() + expected_roles = ['l1', 'l2', 'l3'] + for role, task_vars in self.get_tasks_vars(play, tasks): + expected_roles.remove(role) + # Outer-most role must not have variables from inner roles yet + if role == 'l1': + self.assertEqual(task_vars.get('l1_variable'), 'l1-alt') + self.assertEqual(task_vars.get('l2_variable'), None) + self.assertEqual(task_vars.get('l3_variable'), None) + self.assertEqual(task_vars.get('test_variable'), 'l1-alt') + # Middle role must have variables from outer role, but not inner + elif role == 'l2': + self.assertEqual(task_vars.get('l1_variable'), 'l1-alt') + self.assertEqual(task_vars.get('l2_variable'), 'l2-alt') + self.assertEqual(task_vars.get('l3_variable'), None) + self.assertEqual(task_vars.get('test_variable'), 'l2-alt') + # Inner role must have variables from both outer roles + elif role == 'l3': + self.assertEqual(task_vars.get('l1_variable'), 'l1-alt') + self.assertEqual(task_vars.get('l2_variable'), 'l2-alt') + self.assertEqual(task_vars.get('l3_variable'), 'l3-alt') + self.assertEqual(task_vars.get('test_variable'), 'l3-alt') + else: + self.fail() + self.assertFalse(expected_roles) diff --git a/test/units/playbook/role/test_role.py b/test/units/playbook/role/test_role.py new file mode 100644 index 0000000..5d47631 --- /dev/null +++ b/test/units/playbook/role/test_role.py @@ -0,0 +1,423 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from collections.abc import Container + +from units.compat import unittest +from unittest.mock import patch, MagicMock + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.playbook.block import Block + +from units.mock.loader import DictDataLoader +from units.mock.path import mock_unfrackpath_noop + +from ansible.playbook.role import Role +from ansible.playbook.role.include import RoleInclude +from ansible.playbook.role import hash_params + + +class TestHashParams(unittest.TestCase): + def test(self): + params = {'foo': 'bar'} + res = hash_params(params) + self._assert_set(res) + self._assert_hashable(res) + + def _assert_hashable(self, res): + a_dict = {} + try: + a_dict[res] = res + except TypeError as e: + self.fail('%s is not hashable: %s' % (res, e)) + + def _assert_set(self, res): + self.assertIsInstance(res, frozenset) + + def test_dict_tuple(self): + params = {'foo': (1, 'bar',)} + res = hash_params(params) + self._assert_set(res) + + def test_tuple(self): + params = (1, None, 'foo') + res = hash_params(params) + self._assert_hashable(res) + + def test_tuple_dict(self): + params = ({'foo': 'bar'}, 37) + res = hash_params(params) + self._assert_hashable(res) + + def test_list(self): + params = ['foo', 'bar', 1, 37, None] + res = hash_params(params) + self._assert_set(res) + self._assert_hashable(res) + + def test_dict_with_list_value(self): + params = {'foo': [1, 4, 'bar']} + res = hash_params(params) + self._assert_set(res) + self._assert_hashable(res) + + def test_empty_set(self): + params = set([]) + res = hash_params(params) + self._assert_hashable(res) + self._assert_set(res) + + def test_generator(self): + def my_generator(): + for i in ['a', 1, None, {}]: + yield i + + params = my_generator() + res = hash_params(params) + self._assert_hashable(res) + + def test_container_but_not_iterable(self): + # This is a Container that is not iterable, which is unlikely but... + class MyContainer(Container): + def __init__(self, some_thing): + self.data = [] + self.data.append(some_thing) + + def __contains__(self, item): + return item in self.data + + def __hash__(self): + return hash(self.data) + + def __len__(self): + return len(self.data) + + def __call__(self): + return False + + foo = MyContainer('foo bar') + params = foo + + self.assertRaises(TypeError, hash_params, params) + + def test_param_dict_dupe_values(self): + params1 = {'foo': False} + params2 = {'bar': False} + + res1 = hash_params(params1) + res2 = hash_params(params2) + + hash1 = hash(res1) + hash2 = hash(res2) + self.assertNotEqual(res1, res2) + self.assertNotEqual(hash1, hash2) + + def test_param_dupe(self): + params1 = { + # 'from_files': {}, + 'tags': [], + u'testvalue': False, + u'testvalue2': True, + # 'when': [] + } + params2 = { + # 'from_files': {}, + 'tags': [], + u'testvalue': True, + u'testvalue2': False, + # 'when': [] + } + res1 = hash_params(params1) + res2 = hash_params(params2) + + self.assertNotEqual(hash(res1), hash(res2)) + self.assertNotEqual(res1, res2) + + foo = {} + foo[res1] = 'params1' + foo[res2] = 'params2' + + self.assertEqual(len(foo), 2) + + del foo[res2] + self.assertEqual(len(foo), 1) + + for key in foo: + self.assertTrue(key in foo) + self.assertIn(key, foo) + + +class TestRole(unittest.TestCase): + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_tasks(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_tasks/tasks/main.yml": """ + - shell: echo 'hello world' + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_tasks', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(str(r), 'foo_tasks') + self.assertEqual(len(r._task_blocks), 1) + assert isinstance(r._task_blocks[0], Block) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_tasks_dir_vs_file(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_tasks/tasks/custom_main/foo.yml": """ + - command: bar + """, + "/etc/ansible/roles/foo_tasks/tasks/custom_main.yml": """ + - command: baz + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_tasks', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play, from_files=dict(tasks='custom_main')) + + self.assertEqual(r._task_blocks[0]._ds[0]['command'], 'baz') + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_handlers(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_handlers/handlers/main.yml": """ + - name: test handler + shell: echo 'hello world' + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_handlers', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(len(r._handler_blocks), 1) + assert isinstance(r._handler_blocks[0], Block) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_vars(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_vars/defaults/main.yml": """ + foo: bar + """, + "/etc/ansible/roles/foo_vars/vars/main.yml": """ + foo: bam + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_vars', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r._default_vars, dict(foo='bar')) + self.assertEqual(r._role_vars, dict(foo='bam')) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_vars_dirs(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_vars/defaults/main/foo.yml": """ + foo: bar + """, + "/etc/ansible/roles/foo_vars/vars/main/bar.yml": """ + foo: bam + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_vars', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r._default_vars, dict(foo='bar')) + self.assertEqual(r._role_vars, dict(foo='bam')) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_vars_nested_dirs(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_vars/defaults/main/foo/bar.yml": """ + foo: bar + """, + "/etc/ansible/roles/foo_vars/vars/main/bar/foo.yml": """ + foo: bam + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_vars', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r._default_vars, dict(foo='bar')) + self.assertEqual(r._role_vars, dict(foo='bam')) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_vars_nested_dirs_combined(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_vars/defaults/main/foo/bar.yml": """ + foo: bar + a: 1 + """, + "/etc/ansible/roles/foo_vars/defaults/main/bar/foo.yml": """ + foo: bam + b: 2 + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_vars', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r._default_vars, dict(foo='bar', a=1, b=2)) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_vars_dir_vs_file(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_vars/vars/main/foo.yml": """ + foo: bar + """, + "/etc/ansible/roles/foo_vars/vars/main.yml": """ + foo: bam + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_vars', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r._role_vars, dict(foo='bam')) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_with_metadata(self): + + fake_loader = DictDataLoader({ + '/etc/ansible/roles/foo_metadata/meta/main.yml': """ + allow_duplicates: true + dependencies: + - bar_metadata + galaxy_info: + a: 1 + b: 2 + c: 3 + """, + '/etc/ansible/roles/bar_metadata/meta/main.yml': """ + dependencies: + - baz_metadata + """, + '/etc/ansible/roles/baz_metadata/meta/main.yml': """ + dependencies: + - bam_metadata + """, + '/etc/ansible/roles/bam_metadata/meta/main.yml': """ + dependencies: [] + """, + '/etc/ansible/roles/bad1_metadata/meta/main.yml': """ + 1 + """, + '/etc/ansible/roles/bad2_metadata/meta/main.yml': """ + foo: bar + """, + '/etc/ansible/roles/recursive1_metadata/meta/main.yml': """ + dependencies: ['recursive2_metadata'] + """, + '/etc/ansible/roles/recursive2_metadata/meta/main.yml': """ + dependencies: ['recursive1_metadata'] + """, + }) + + mock_play = MagicMock() + mock_play.collections = None + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load('foo_metadata', play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + role_deps = r.get_direct_dependencies() + + self.assertEqual(len(role_deps), 1) + self.assertEqual(type(role_deps[0]), Role) + self.assertEqual(len(role_deps[0].get_parents()), 1) + self.assertEqual(role_deps[0].get_parents()[0], r) + self.assertEqual(r._metadata.allow_duplicates, True) + self.assertEqual(r._metadata.galaxy_info, dict(a=1, b=2, c=3)) + + all_deps = r.get_all_dependencies() + self.assertEqual(len(all_deps), 3) + self.assertEqual(all_deps[0].get_name(), 'bam_metadata') + self.assertEqual(all_deps[1].get_name(), 'baz_metadata') + self.assertEqual(all_deps[2].get_name(), 'bar_metadata') + + i = RoleInclude.load('bad1_metadata', play=mock_play, loader=fake_loader) + self.assertRaises(AnsibleParserError, Role.load, i, play=mock_play) + + i = RoleInclude.load('bad2_metadata', play=mock_play, loader=fake_loader) + self.assertRaises(AnsibleParserError, Role.load, i, play=mock_play) + + # TODO: re-enable this test once Ansible has proper role dep cycle detection + # that doesn't rely on stack overflows being recoverable (as they aren't in Py3.7+) + # see https://github.com/ansible/ansible/issues/61527 + # i = RoleInclude.load('recursive1_metadata', play=mock_play, loader=fake_loader) + # self.assertRaises(AnsibleError, Role.load, i, play=mock_play) + + @patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop) + def test_load_role_complex(self): + + # FIXME: add tests for the more complex uses of + # params and tags/when statements + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo_complex/tasks/main.yml": """ + - shell: echo 'hello world' + """, + }) + + mock_play = MagicMock() + mock_play.ROLE_CACHE = {} + + i = RoleInclude.load(dict(role='foo_complex'), play=mock_play, loader=fake_loader) + r = Role.load(i, play=mock_play) + + self.assertEqual(r.get_name(), "foo_complex") diff --git a/test/units/playbook/test_attribute.py b/test/units/playbook/test_attribute.py new file mode 100644 index 0000000..bdb37c1 --- /dev/null +++ b/test/units/playbook/test_attribute.py @@ -0,0 +1,57 @@ +# (c) 2015, Marius Gedminas +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.playbook.attribute import Attribute + + +class TestAttribute(unittest.TestCase): + + def setUp(self): + self.one = Attribute(priority=100) + self.two = Attribute(priority=0) + + def test_eq(self): + self.assertTrue(self.one == self.one) + self.assertFalse(self.one == self.two) + + def test_ne(self): + self.assertFalse(self.one != self.one) + self.assertTrue(self.one != self.two) + + def test_lt(self): + self.assertFalse(self.one < self.one) + self.assertTrue(self.one < self.two) + self.assertFalse(self.two < self.one) + + def test_gt(self): + self.assertFalse(self.one > self.one) + self.assertFalse(self.one > self.two) + self.assertTrue(self.two > self.one) + + def test_le(self): + self.assertTrue(self.one <= self.one) + self.assertTrue(self.one <= self.two) + self.assertFalse(self.two <= self.one) + + def test_ge(self): + self.assertTrue(self.one >= self.one) + self.assertFalse(self.one >= self.two) + self.assertTrue(self.two >= self.one) diff --git a/test/units/playbook/test_base.py b/test/units/playbook/test_base.py new file mode 100644 index 0000000..d5810e7 --- /dev/null +++ b/test/units/playbook/test_base.py @@ -0,0 +1,615 @@ +# (c) 2016, Adrian Likins +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest + +from ansible.errors import AnsibleParserError +from ansible.module_utils.six import string_types +from ansible.playbook.attribute import FieldAttribute, NonInheritableFieldAttribute +from ansible.template import Templar +from ansible.playbook import base +from ansible.utils.unsafe_proxy import AnsibleUnsafeBytes, AnsibleUnsafeText +from ansible.utils.sentinel import Sentinel + +from units.mock.loader import DictDataLoader + + +class TestBase(unittest.TestCase): + ClassUnderTest = base.Base + + def setUp(self): + self.assorted_vars = {'var_2_key': 'var_2_value', + 'var_1_key': 'var_1_value', + 'a_list': ['a_list_1', 'a_list_2'], + 'a_dict': {'a_dict_key': 'a_dict_value'}, + 'a_set': set(['set_1', 'set_2']), + 'a_int': 42, + 'a_float': 37.371, + 'a_bool': True, + 'a_none': None, + } + self.b = self.ClassUnderTest() + + def _base_validate(self, ds): + bsc = self.ClassUnderTest() + parent = ExampleParentBaseSubClass() + bsc._parent = parent + bsc._dep_chain = [parent] + parent._dep_chain = None + bsc.load_data(ds) + fake_loader = DictDataLoader({}) + templar = Templar(loader=fake_loader) + bsc.post_validate(templar) + return bsc + + def test(self): + self.assertIsInstance(self.b, base.Base) + self.assertIsInstance(self.b, self.ClassUnderTest) + + # dump me doesnt return anything or change anything so not much to assert + def test_dump_me_empty(self): + self.b.dump_me() + + def test_dump_me(self): + ds = {'environment': [], + 'vars': {'var_2_key': 'var_2_value', + 'var_1_key': 'var_1_value'}} + b = self._base_validate(ds) + b.dump_me() + + def _assert_copy(self, orig, copy): + self.assertIsInstance(copy, self.ClassUnderTest) + self.assertIsInstance(copy, base.Base) + self.assertEqual(len(orig.fattributes), len(copy.fattributes)) + + sentinel = 'Empty DS' + self.assertEqual(getattr(orig, '_ds', sentinel), getattr(copy, '_ds', sentinel)) + + def test_copy_empty(self): + copy = self.b.copy() + self._assert_copy(self.b, copy) + + def test_copy_with_vars(self): + ds = {'vars': self.assorted_vars} + b = self._base_validate(ds) + + copy = b.copy() + self._assert_copy(b, copy) + + def test_serialize(self): + ds = {} + ds = {'environment': [], + 'vars': self.assorted_vars + } + b = self._base_validate(ds) + ret = b.serialize() + self.assertIsInstance(ret, dict) + + def test_deserialize(self): + data = {} + + d = self.ClassUnderTest() + d.deserialize(data) + self.assertIn('_run_once', d.__dict__) + self.assertIn('_check_mode', d.__dict__) + + data = {'no_log': False, + 'remote_user': None, + 'vars': self.assorted_vars, + 'environment': [], + 'run_once': False, + 'connection': None, + 'ignore_errors': False, + 'port': 22, + 'a_sentinel_with_an_unlikely_name': ['sure, a list']} + + d = self.ClassUnderTest() + d.deserialize(data) + self.assertNotIn('_a_sentinel_with_an_unlikely_name', d.__dict__) + self.assertIn('_run_once', d.__dict__) + self.assertIn('_check_mode', d.__dict__) + + def test_serialize_then_deserialize(self): + ds = {'environment': [], + 'vars': self.assorted_vars} + b = self._base_validate(ds) + copy = b.copy() + ret = b.serialize() + b.deserialize(ret) + c = self.ClassUnderTest() + c.deserialize(ret) + # TODO: not a great test, but coverage... + self.maxDiff = None + self.assertDictEqual(b.serialize(), copy.serialize()) + self.assertDictEqual(c.serialize(), copy.serialize()) + + def test_post_validate_empty(self): + fake_loader = DictDataLoader({}) + templar = Templar(loader=fake_loader) + ret = self.b.post_validate(templar) + self.assertIsNone(ret) + + def test_get_ds_none(self): + ds = self.b.get_ds() + self.assertIsNone(ds) + + def test_load_data_ds_is_none(self): + self.assertRaises(AssertionError, self.b.load_data, None) + + def test_load_data_invalid_attr(self): + ds = {'not_a_valid_attr': [], + 'other': None} + + self.assertRaises(AnsibleParserError, self.b.load_data, ds) + + def test_load_data_invalid_attr_type(self): + ds = {'environment': True} + + # environment is supposed to be a list. This + # seems like it shouldn't work? + ret = self.b.load_data(ds) + self.assertEqual(True, ret._environment) + + def test_post_validate(self): + ds = {'environment': [], + 'port': 443} + b = self._base_validate(ds) + self.assertEqual(b.port, 443) + self.assertEqual(b.environment, []) + + def test_post_validate_invalid_attr_types(self): + ds = {'environment': [], + 'port': 'some_port'} + b = self._base_validate(ds) + self.assertEqual(b.port, 'some_port') + + def test_squash(self): + data = self.b.serialize() + self.b.squash() + squashed_data = self.b.serialize() + # TODO: assert something + self.assertFalse(data['squashed']) + self.assertTrue(squashed_data['squashed']) + + def test_vars(self): + # vars as a dict. + ds = {'environment': [], + 'vars': {'var_2_key': 'var_2_value', + 'var_1_key': 'var_1_value'}} + b = self._base_validate(ds) + self.assertEqual(b.vars['var_1_key'], 'var_1_value') + + def test_vars_list_of_dicts(self): + ds = {'environment': [], + 'vars': [{'var_2_key': 'var_2_value'}, + {'var_1_key': 'var_1_value'}] + } + b = self._base_validate(ds) + self.assertEqual(b.vars['var_1_key'], 'var_1_value') + + def test_vars_not_dict_or_list(self): + ds = {'environment': [], + 'vars': 'I am a string, not a dict or a list of dicts'} + self.assertRaises(AnsibleParserError, self.b.load_data, ds) + + def test_vars_not_valid_identifier(self): + ds = {'environment': [], + 'vars': [{'var_2_key': 'var_2_value'}, + {'1an-invalid identifer': 'var_1_value'}] + } + self.assertRaises(AnsibleParserError, self.b.load_data, ds) + + def test_vars_is_list_but_not_of_dicts(self): + ds = {'environment': [], + 'vars': ['foo', 'bar', 'this is a string not a dict'] + } + self.assertRaises(AnsibleParserError, self.b.load_data, ds) + + def test_vars_is_none(self): + # If vars is None, we should get a empty dict back + ds = {'environment': [], + 'vars': None + } + b = self._base_validate(ds) + self.assertEqual(b.vars, {}) + + def test_validate_empty(self): + self.b.validate() + self.assertTrue(self.b._validated) + + def test_getters(self): + # not sure why these exist, but here are tests anyway + loader = self.b.get_loader() + variable_manager = self.b.get_variable_manager() + self.assertEqual(loader, self.b._loader) + self.assertEqual(variable_manager, self.b._variable_manager) + + +class TestExtendValue(unittest.TestCase): + # _extend_value could be a module or staticmethod but since its + # not, the test is here. + def test_extend_value_list_newlist(self): + b = base.Base() + value_list = ['first', 'second'] + new_value_list = ['new_first', 'new_second'] + ret = b._extend_value(value_list, new_value_list) + self.assertEqual(value_list + new_value_list, ret) + + def test_extend_value_list_newlist_prepend(self): + b = base.Base() + value_list = ['first', 'second'] + new_value_list = ['new_first', 'new_second'] + ret_prepend = b._extend_value(value_list, new_value_list, prepend=True) + self.assertEqual(new_value_list + value_list, ret_prepend) + + def test_extend_value_newlist_list(self): + b = base.Base() + value_list = ['first', 'second'] + new_value_list = ['new_first', 'new_second'] + ret = b._extend_value(new_value_list, value_list) + self.assertEqual(new_value_list + value_list, ret) + + def test_extend_value_newlist_list_prepend(self): + b = base.Base() + value_list = ['first', 'second'] + new_value_list = ['new_first', 'new_second'] + ret = b._extend_value(new_value_list, value_list, prepend=True) + self.assertEqual(value_list + new_value_list, ret) + + def test_extend_value_string_newlist(self): + b = base.Base() + some_string = 'some string' + new_value_list = ['new_first', 'new_second'] + ret = b._extend_value(some_string, new_value_list) + self.assertEqual([some_string] + new_value_list, ret) + + def test_extend_value_string_newstring(self): + b = base.Base() + some_string = 'some string' + new_value_string = 'this is the new values' + ret = b._extend_value(some_string, new_value_string) + self.assertEqual([some_string, new_value_string], ret) + + def test_extend_value_list_newstring(self): + b = base.Base() + value_list = ['first', 'second'] + new_value_string = 'this is the new values' + ret = b._extend_value(value_list, new_value_string) + self.assertEqual(value_list + [new_value_string], ret) + + def test_extend_value_none_none(self): + b = base.Base() + ret = b._extend_value(None, None) + self.assertEqual(len(ret), 0) + self.assertFalse(ret) + + def test_extend_value_none_list(self): + b = base.Base() + ret = b._extend_value(None, ['foo']) + self.assertEqual(ret, ['foo']) + + +class ExampleException(Exception): + pass + + +# naming fails me... +class ExampleParentBaseSubClass(base.Base): + test_attr_parent_string = FieldAttribute(isa='string', default='A string attr for a class that may be a parent for testing') + + def __init__(self): + + super(ExampleParentBaseSubClass, self).__init__() + self._dep_chain = None + + def get_dep_chain(self): + return self._dep_chain + + +class ExampleSubClass(base.Base): + test_attr_blip = NonInheritableFieldAttribute(isa='string', default='example sub class test_attr_blip', + always_post_validate=True) + + def __init__(self): + super(ExampleSubClass, self).__init__() + + def get_dep_chain(self): + if self._parent: + return self._parent.get_dep_chain() + else: + return None + + +class BaseSubClass(base.Base): + name = FieldAttribute(isa='string', default='', always_post_validate=True) + test_attr_bool = FieldAttribute(isa='bool', always_post_validate=True) + test_attr_int = FieldAttribute(isa='int', always_post_validate=True) + test_attr_float = FieldAttribute(isa='float', default=3.14159, always_post_validate=True) + test_attr_list = FieldAttribute(isa='list', listof=string_types, always_post_validate=True) + test_attr_list_no_listof = FieldAttribute(isa='list', always_post_validate=True) + test_attr_list_required = FieldAttribute(isa='list', listof=string_types, required=True, + default=list, always_post_validate=True) + test_attr_string = FieldAttribute(isa='string', default='the_test_attr_string_default_value') + test_attr_string_required = FieldAttribute(isa='string', required=True, + default='the_test_attr_string_default_value') + test_attr_percent = FieldAttribute(isa='percent', always_post_validate=True) + test_attr_set = FieldAttribute(isa='set', default=set, always_post_validate=True) + test_attr_dict = FieldAttribute(isa='dict', default=lambda: {'a_key': 'a_value'}, always_post_validate=True) + test_attr_class = FieldAttribute(isa='class', class_type=ExampleSubClass) + test_attr_class_post_validate = FieldAttribute(isa='class', class_type=ExampleSubClass, + always_post_validate=True) + test_attr_unknown_isa = FieldAttribute(isa='not_a_real_isa', always_post_validate=True) + test_attr_example = FieldAttribute(isa='string', default='the_default', + always_post_validate=True) + test_attr_none = FieldAttribute(isa='string', always_post_validate=True) + test_attr_preprocess = FieldAttribute(isa='string', default='the default for preprocess') + test_attr_method = FieldAttribute(isa='string', default='some attr with a getter', + always_post_validate=True) + test_attr_method_missing = FieldAttribute(isa='string', default='some attr with a missing getter', + always_post_validate=True) + + def _get_attr_test_attr_method(self): + return 'foo bar' + + def _validate_test_attr_example(self, attr, name, value): + if not isinstance(value, str): + raise ExampleException('test_attr_example is not a string: %s type=%s' % (value, type(value))) + + def _post_validate_test_attr_example(self, attr, value, templar): + after_template_value = templar.template(value) + return after_template_value + + def _post_validate_test_attr_none(self, attr, value, templar): + return None + + +# terrible name, but it is a TestBase subclass for testing subclasses of Base +class TestBaseSubClass(TestBase): + ClassUnderTest = BaseSubClass + + def _base_validate(self, ds): + ds['test_attr_list_required'] = [] + return super(TestBaseSubClass, self)._base_validate(ds) + + def test_attr_bool(self): + ds = {'test_attr_bool': True} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_bool, True) + + def test_attr_int(self): + MOST_RANDOM_NUMBER = 37 + ds = {'test_attr_int': MOST_RANDOM_NUMBER} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_int, MOST_RANDOM_NUMBER) + + def test_attr_int_del(self): + MOST_RANDOM_NUMBER = 37 + ds = {'test_attr_int': MOST_RANDOM_NUMBER} + bsc = self._base_validate(ds) + del bsc.test_attr_int + self.assertNotIn('_test_attr_int', bsc.__dict__) + + def test_attr_float(self): + roughly_pi = 4.0 + ds = {'test_attr_float': roughly_pi} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_float, roughly_pi) + + def test_attr_percent(self): + percentage = '90%' + percentage_float = 90.0 + ds = {'test_attr_percent': percentage} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_percent, percentage_float) + + # This method works hard and gives it its all and everything it's got. It doesn't + # leave anything on the field. It deserves to pass. It has earned it. + def test_attr_percent_110_percent(self): + percentage = '110.11%' + percentage_float = 110.11 + ds = {'test_attr_percent': percentage} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_percent, percentage_float) + + # This method is just here for the paycheck. + def test_attr_percent_60_no_percent_sign(self): + percentage = '60' + percentage_float = 60.0 + ds = {'test_attr_percent': percentage} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_percent, percentage_float) + + def test_attr_set(self): + test_set = set(['first_string_in_set', 'second_string_in_set']) + ds = {'test_attr_set': test_set} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_set, test_set) + + def test_attr_set_string(self): + test_data = ['something', 'other'] + test_value = ','.join(test_data) + ds = {'test_attr_set': test_value} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_set, set(test_data)) + + def test_attr_set_not_string_or_list(self): + test_value = 37.1 + ds = {'test_attr_set': test_value} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_set, set([test_value])) + + def test_attr_dict(self): + test_dict = {'a_different_key': 'a_different_value'} + ds = {'test_attr_dict': test_dict} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_dict, test_dict) + + def test_attr_dict_string(self): + test_value = 'just_some_random_string' + ds = {'test_attr_dict': test_value} + self.assertRaisesRegex(AnsibleParserError, 'is not a dictionary', self._base_validate, ds) + + def test_attr_class(self): + esc = ExampleSubClass() + ds = {'test_attr_class': esc} + bsc = self._base_validate(ds) + self.assertIs(bsc.test_attr_class, esc) + + def test_attr_class_wrong_type(self): + not_a_esc = ExampleSubClass + ds = {'test_attr_class': not_a_esc} + bsc = self._base_validate(ds) + self.assertIs(bsc.test_attr_class, not_a_esc) + + def test_attr_class_post_validate(self): + esc = ExampleSubClass() + ds = {'test_attr_class_post_validate': esc} + bsc = self._base_validate(ds) + self.assertIs(bsc.test_attr_class_post_validate, esc) + + def test_attr_class_post_validate_class_not_instance(self): + not_a_esc = ExampleSubClass + ds = {'test_attr_class_post_validate': not_a_esc} + self.assertRaisesRegex(AnsibleParserError, "is not a valid.*got a instead", + self._base_validate, ds) + + def test_attr_class_post_validate_wrong_class(self): + not_a_esc = 37 + ds = {'test_attr_class_post_validate': not_a_esc} + self.assertRaisesRegex(AnsibleParserError, 'is not a valid.*got a.*int.*instead', + self._base_validate, ds) + + def test_attr_remote_user(self): + ds = {'remote_user': 'testuser'} + bsc = self._base_validate(ds) + # TODO: attemp to verify we called parent gettters etc + self.assertEqual(bsc.remote_user, 'testuser') + + def test_attr_example_undefined(self): + ds = {'test_attr_example': '{{ some_var_that_shouldnt_exist_to_test_omit }}'} + exc_regex_str = 'test_attr_example.*has an invalid value, which includes an undefined variable.*some_var_that_shouldnt*' + self.assertRaises(AnsibleParserError) + + def test_attr_name_undefined(self): + ds = {'name': '{{ some_var_that_shouldnt_exist_to_test_omit }}'} + bsc = self._base_validate(ds) + # the attribute 'name' is special cases in post_validate + self.assertEqual(bsc.name, '{{ some_var_that_shouldnt_exist_to_test_omit }}') + + def test_subclass_validate_method(self): + ds = {'test_attr_list': ['string_list_item_1', 'string_list_item_2'], + 'test_attr_example': 'the_test_attr_example_value_string'} + # Not throwing an exception here is the test + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_example, 'the_test_attr_example_value_string') + + def test_subclass_validate_method_invalid(self): + ds = {'test_attr_example': [None]} + self.assertRaises(ExampleException, self._base_validate, ds) + + def test_attr_none(self): + ds = {'test_attr_none': 'foo'} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_none, None) + + def test_attr_string(self): + the_string_value = "the new test_attr_string_value" + ds = {'test_attr_string': the_string_value} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_string, the_string_value) + + def test_attr_string_invalid_list(self): + ds = {'test_attr_string': ['The new test_attr_string', 'value, however in a list']} + self.assertRaises(AnsibleParserError, self._base_validate, ds) + + def test_attr_string_required(self): + the_string_value = "the new test_attr_string_required_value" + ds = {'test_attr_string_required': the_string_value} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_string_required, the_string_value) + + def test_attr_list_invalid(self): + ds = {'test_attr_list': {}} + self.assertRaises(AnsibleParserError, self._base_validate, ds) + + def test_attr_list(self): + string_list = ['foo', 'bar'] + ds = {'test_attr_list': string_list} + bsc = self._base_validate(ds) + self.assertEqual(string_list, bsc._test_attr_list) + + def test_attr_list_none(self): + ds = {'test_attr_list': None} + bsc = self._base_validate(ds) + self.assertEqual(None, bsc._test_attr_list) + + def test_attr_list_no_listof(self): + test_list = ['foo', 'bar', 123] + ds = {'test_attr_list_no_listof': test_list} + bsc = self._base_validate(ds) + self.assertEqual(test_list, bsc._test_attr_list_no_listof) + + def test_attr_list_required(self): + string_list = ['foo', 'bar'] + ds = {'test_attr_list_required': string_list} + bsc = self.ClassUnderTest() + bsc.load_data(ds) + fake_loader = DictDataLoader({}) + templar = Templar(loader=fake_loader) + bsc.post_validate(templar) + self.assertEqual(string_list, bsc._test_attr_list_required) + + def test_attr_list_required_empty_string(self): + string_list = [""] + ds = {'test_attr_list_required': string_list} + bsc = self.ClassUnderTest() + bsc.load_data(ds) + fake_loader = DictDataLoader({}) + templar = Templar(loader=fake_loader) + self.assertRaisesRegex(AnsibleParserError, 'cannot have empty values', + bsc.post_validate, templar) + + def test_attr_unknown(self): + a_list = ['some string'] + ds = {'test_attr_unknown_isa': a_list} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_unknown_isa, a_list) + + def test_attr_method(self): + ds = {'test_attr_method': 'value from the ds'} + bsc = self._base_validate(ds) + # The value returned by the subclasses _get_attr_test_attr_method + self.assertEqual(bsc.test_attr_method, 'foo bar') + + def test_attr_method_missing(self): + a_string = 'The value set from the ds' + ds = {'test_attr_method_missing': a_string} + bsc = self._base_validate(ds) + self.assertEqual(bsc.test_attr_method_missing, a_string) + + def test_get_validated_value_string_rewrap_unsafe(self): + attribute = FieldAttribute(isa='string') + value = AnsibleUnsafeText(u'bar') + templar = Templar(None) + bsc = self.ClassUnderTest() + result = bsc.get_validated_value('foo', attribute, value, templar) + self.assertIsInstance(result, AnsibleUnsafeText) + self.assertEqual(result, AnsibleUnsafeText(u'bar')) diff --git a/test/units/playbook/test_block.py b/test/units/playbook/test_block.py new file mode 100644 index 0000000..4847123 --- /dev/null +++ b/test/units/playbook/test_block.py @@ -0,0 +1,82 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.playbook.block import Block +from ansible.playbook.task import Task + + +class TestBlock(unittest.TestCase): + + def test_construct_empty_block(self): + b = Block() + + def test_construct_block_with_role(self): + pass + + def test_load_block_simple(self): + ds = dict( + block=[], + rescue=[], + always=[], + # otherwise=[], + ) + b = Block.load(ds) + self.assertEqual(b.block, []) + self.assertEqual(b.rescue, []) + self.assertEqual(b.always, []) + # not currently used + # self.assertEqual(b.otherwise, []) + + def test_load_block_with_tasks(self): + ds = dict( + block=[dict(action='block')], + rescue=[dict(action='rescue')], + always=[dict(action='always')], + # otherwise=[dict(action='otherwise')], + ) + b = Block.load(ds) + self.assertEqual(len(b.block), 1) + self.assertIsInstance(b.block[0], Task) + self.assertEqual(len(b.rescue), 1) + self.assertIsInstance(b.rescue[0], Task) + self.assertEqual(len(b.always), 1) + self.assertIsInstance(b.always[0], Task) + # not currently used + # self.assertEqual(len(b.otherwise), 1) + # self.assertIsInstance(b.otherwise[0], Task) + + def test_load_implicit_block(self): + ds = [dict(action='foo')] + b = Block.load(ds) + self.assertEqual(len(b.block), 1) + self.assertIsInstance(b.block[0], Task) + + def test_deserialize(self): + ds = dict( + block=[dict(action='block')], + rescue=[dict(action='rescue')], + always=[dict(action='always')], + ) + b = Block.load(ds) + data = dict(parent=ds, parent_type='Block') + b.deserialize(data) + self.assertIsInstance(b._parent, Block) diff --git a/test/units/playbook/test_collectionsearch.py b/test/units/playbook/test_collectionsearch.py new file mode 100644 index 0000000..be40d85 --- /dev/null +++ b/test/units/playbook/test_collectionsearch.py @@ -0,0 +1,78 @@ +# (c) 2020 Ansible Project +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.errors import AnsibleParserError +from ansible.playbook.play import Play +from ansible.playbook.task import Task +from ansible.playbook.block import Block +from ansible.playbook.collectionsearch import CollectionSearch + +import pytest + + +def test_collection_static_warning(capsys): + """Test that collection name is not templated. + + Also, make sure that users see the warning message for the referenced name. + """ + collection_name = "foo.{{bar}}" + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + connection='local', + collections=collection_name, + )) + assert collection_name in p.collections + std_out, std_err = capsys.readouterr() + assert '[WARNING]: "collections" is not templatable, but we found: %s' % collection_name in std_err + assert '' == std_out + + +def test_collection_invalid_data_play(): + """Test that collection as a dict at the play level fails with parser error""" + collection_name = {'name': 'foo'} + with pytest.raises(AnsibleParserError): + Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + connection='local', + collections=collection_name, + )) + + +def test_collection_invalid_data_task(): + """Test that collection as a dict at the task level fails with parser error""" + collection_name = {'name': 'foo'} + with pytest.raises(AnsibleParserError): + Task.load(dict( + name="test task", + collections=collection_name, + )) + + +def test_collection_invalid_data_block(): + """Test that collection as a dict at the block level fails with parser error""" + collection_name = {'name': 'foo'} + with pytest.raises(AnsibleParserError): + Block.load(dict( + block=[dict(name="test task", collections=collection_name)] + )) diff --git a/test/units/playbook/test_conditional.py b/test/units/playbook/test_conditional.py new file mode 100644 index 0000000..8231d16 --- /dev/null +++ b/test/units/playbook/test_conditional.py @@ -0,0 +1,212 @@ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from units.mock.loader import DictDataLoader +from unittest.mock import MagicMock + +from ansible.template import Templar +from ansible import errors + +from ansible.playbook import conditional + + +class TestConditional(unittest.TestCase): + def setUp(self): + self.loader = DictDataLoader({}) + self.cond = conditional.Conditional(loader=self.loader) + self.templar = Templar(loader=self.loader, variables={}) + + def _eval_con(self, when=None, variables=None): + when = when or [] + variables = variables or {} + self.cond.when = when + ret = self.cond.evaluate_conditional(self.templar, variables) + return ret + + def test_false(self): + when = [u"False"] + ret = self._eval_con(when, {}) + self.assertFalse(ret) + + def test_true(self): + when = [u"True"] + ret = self._eval_con(when, {}) + self.assertTrue(ret) + + def test_true_boolean(self): + self.cond.when = [True] + m = MagicMock() + ret = self.cond.evaluate_conditional(m, {}) + self.assertTrue(ret) + self.assertFalse(m.is_template.called) + + def test_false_boolean(self): + self.cond.when = [False] + m = MagicMock() + ret = self.cond.evaluate_conditional(m, {}) + self.assertFalse(ret) + self.assertFalse(m.is_template.called) + + def test_undefined(self): + when = [u"{{ some_undefined_thing }}"] + self.assertRaisesRegex(errors.AnsibleError, "The conditional check '{{ some_undefined_thing }}' failed", + self._eval_con, when, {}) + + def test_defined(self): + variables = {'some_defined_thing': True} + when = [u"{{ some_defined_thing }}"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_dict_defined_values(self): + variables = {'dict_value': 1, + 'some_defined_dict': {'key1': 'value1', + 'key2': '{{ dict_value }}'}} + + when = [u"some_defined_dict"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_dict_defined_values_is_defined(self): + variables = {'dict_value': 1, + 'some_defined_dict': {'key1': 'value1', + 'key2': '{{ dict_value }}'}} + + when = [u"some_defined_dict.key1 is defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_dict_defined_multiple_values_is_defined(self): + variables = {'dict_value': 1, + 'some_defined_dict': {'key1': 'value1', + 'key2': '{{ dict_value }}'}} + + when = [u"some_defined_dict.key1 is defined", + u"some_defined_dict.key2 is not undefined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_nested_hostvars_undefined_values(self): + variables = {'dict_value': 1, + 'hostvars': {'host1': {'key1': 'value1', + 'key2': '{{ dict_value }}'}, + 'host2': '{{ dict_value }}', + 'host3': '{{ undefined_dict_value }}', + # no host4 + }, + 'some_dict': {'some_dict_key1': '{{ hostvars["host3"] }}'} + } + + when = [u"some_dict.some_dict_key1 == hostvars['host3']"] + # self._eval_con(when, variables) + self.assertRaisesRegex(errors.AnsibleError, + r"The conditional check 'some_dict.some_dict_key1 == hostvars\['host3'\]' failed", + # "The conditional check 'some_dict.some_dict_key1 == hostvars['host3']' failed", + # "The conditional check 'some_dict.some_dict_key1 == hostvars['host3']' failed.", + self._eval_con, + when, variables) + + def test_dict_undefined_values_bare(self): + variables = {'dict_value': 1, + 'some_defined_dict_with_undefined_values': {'key1': 'value1', + 'key2': '{{ dict_value }}', + 'key3': '{{ undefined_dict_value }}' + }} + + # raises an exception when a non-string conditional is passed to extract_defined_undefined() + when = [u"some_defined_dict_with_undefined_values"] + self.assertRaisesRegex(errors.AnsibleError, + "The conditional check 'some_defined_dict_with_undefined_values' failed.", + self._eval_con, + when, variables) + + def test_is_defined(self): + variables = {'some_defined_thing': True} + when = [u"some_defined_thing is defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_undefined(self): + variables = {'some_defined_thing': True} + when = [u"some_defined_thing is undefined"] + ret = self._eval_con(when, variables) + self.assertFalse(ret) + + def test_is_undefined_and_defined(self): + variables = {'some_defined_thing': True} + when = [u"some_defined_thing is undefined", u"some_defined_thing is defined"] + ret = self._eval_con(when, variables) + self.assertFalse(ret) + + def test_is_undefined_and_defined_reversed(self): + variables = {'some_defined_thing': True} + when = [u"some_defined_thing is defined", u"some_defined_thing is undefined"] + ret = self._eval_con(when, variables) + self.assertFalse(ret) + + def test_is_not_undefined(self): + variables = {'some_defined_thing': True} + when = [u"some_defined_thing is not undefined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_not_defined(self): + variables = {'some_defined_thing': True} + when = [u"some_undefined_thing is not defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_hostvars_quotes_is_defined(self): + variables = {'hostvars': {'some_host': {}}, + 'compare_targets_single': "hostvars['some_host']", + 'compare_targets_double': 'hostvars["some_host"]', + 'compare_targets': {'double': '{{ compare_targets_double }}', + 'single': "{{ compare_targets_single }}"}, + } + when = [u"hostvars['some_host'] is defined", + u'hostvars["some_host"] is defined', + u"{{ compare_targets.double }} is defined", + u"{{ compare_targets.single }} is defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_hostvars_quotes_is_defined_but_is_not_defined(self): + variables = {'hostvars': {'some_host': {}}, + 'compare_targets_single': "hostvars['some_host']", + 'compare_targets_double': 'hostvars["some_host"]', + 'compare_targets': {'double': '{{ compare_targets_double }}', + 'single': "{{ compare_targets_single }}"}, + } + when = [u"hostvars['some_host'] is defined", + u'hostvars["some_host"] is defined', + u"{{ compare_targets.triple }} is defined", + u"{{ compare_targets.quadruple }} is defined"] + self.assertRaisesRegex(errors.AnsibleError, + "The conditional check '{{ compare_targets.triple }} is defined' failed", + self._eval_con, + when, variables) + + def test_is_hostvars_host_is_defined(self): + variables = {'hostvars': {'some_host': {}, }} + when = [u"hostvars['some_host'] is defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_hostvars_host_undefined_is_defined(self): + variables = {'hostvars': {'some_host': {}, }} + when = [u"hostvars['some_undefined_host'] is defined"] + ret = self._eval_con(when, variables) + self.assertFalse(ret) + + def test_is_hostvars_host_undefined_is_undefined(self): + variables = {'hostvars': {'some_host': {}, }} + when = [u"hostvars['some_undefined_host'] is undefined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) + + def test_is_hostvars_host_undefined_is_not_defined(self): + variables = {'hostvars': {'some_host': {}, }} + when = [u"hostvars['some_undefined_host'] is not defined"] + ret = self._eval_con(when, variables) + self.assertTrue(ret) diff --git a/test/units/playbook/test_helpers.py b/test/units/playbook/test_helpers.py new file mode 100644 index 0000000..a89730c --- /dev/null +++ b/test/units/playbook/test_helpers.py @@ -0,0 +1,373 @@ +# (c) 2016, Adrian Likins +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from units.compat import unittest +from unittest.mock import MagicMock +from units.mock.loader import DictDataLoader + +from ansible import errors +from ansible.playbook.block import Block +from ansible.playbook.handler import Handler +from ansible.playbook.task import Task +from ansible.playbook.task_include import TaskInclude +from ansible.playbook.role.include import RoleInclude + +from ansible.playbook import helpers + + +class MixinForMocks(object): + def _setup(self): + # This is not a very good mixin, lots of side effects + self.fake_loader = DictDataLoader({'include_test.yml': "", + 'other_include_test.yml': ""}) + self.mock_tqm = MagicMock(name='MockTaskQueueManager') + + self.mock_play = MagicMock(name='MockPlay') + self.mock_play._attributes = [] + self.mock_play._collections = None + + self.mock_iterator = MagicMock(name='MockIterator') + self.mock_iterator._play = self.mock_play + + self.mock_inventory = MagicMock(name='MockInventory') + self.mock_inventory._hosts_cache = dict() + + def _get_host(host_name): + return None + + self.mock_inventory.get_host.side_effect = _get_host + # TODO: can we use a real VariableManager? + self.mock_variable_manager = MagicMock(name='MockVariableManager') + self.mock_variable_manager.get_vars.return_value = dict() + + self.mock_block = MagicMock(name='MockBlock') + + # On macOS /etc is actually /private/etc, tests fail when performing literal /etc checks + self.fake_role_loader = DictDataLoader({os.path.join(os.path.realpath("/etc"), "ansible/roles/bogus_role/tasks/main.yml"): """ + - shell: echo 'hello world' + """}) + + self._test_data_path = os.path.dirname(__file__) + self.fake_include_loader = DictDataLoader({"/dev/null/includes/test_include.yml": """ + - include: other_test_include.yml + - shell: echo 'hello world' + """, + "/dev/null/includes/static_test_include.yml": """ + - include: other_test_include.yml + - shell: echo 'hello static world' + """, + "/dev/null/includes/other_test_include.yml": """ + - debug: + msg: other_test_include_debug + """}) + + +class TestLoadListOfTasks(unittest.TestCase, MixinForMocks): + def setUp(self): + self._setup() + + def _assert_is_task_list(self, results): + for result in results: + self.assertIsInstance(result, Task) + + def _assert_is_task_list_or_blocks(self, results): + self.assertIsInstance(results, list) + for result in results: + self.assertIsInstance(result, (Task, Block)) + + def test_ds_not_list(self): + ds = {} + self.assertRaises(AssertionError, helpers.load_list_of_tasks, + ds, self.mock_play, block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None) + + def test_ds_not_dict(self): + ds = [[]] + self.assertRaises(AssertionError, helpers.load_list_of_tasks, + ds, self.mock_play, block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None) + + def test_empty_task(self): + ds = [{}] + self.assertRaisesRegex(errors.AnsibleParserError, + "no module/action detected in task", + helpers.load_list_of_tasks, + ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + + def test_empty_task_use_handlers(self): + ds = [{}] + self.assertRaisesRegex(errors.AnsibleParserError, + "no module/action detected in task.", + helpers.load_list_of_tasks, + ds, + use_handlers=True, + play=self.mock_play, + variable_manager=self.mock_variable_manager, + loader=self.fake_loader) + + def test_one_bogus_block(self): + ds = [{'block': None}] + self.assertRaisesRegex(errors.AnsibleParserError, + "A malformed block was encountered", + helpers.load_list_of_tasks, + ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + + def test_unknown_action(self): + action_name = 'foo_test_unknown_action' + ds = [{'action': action_name}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + self._assert_is_task_list_or_blocks(res) + self.assertEqual(res[0].action, action_name) + + def test_block_unknown_action(self): + action_name = 'foo_test_block_unknown_action' + ds = [{ + 'block': [{'action': action_name}] + }] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Block) + self._assert_default_block(res[0]) + + def _assert_default_block(self, block): + # the expected defaults + self.assertIsInstance(block.block, list) + self.assertEqual(len(block.block), 1) + self.assertIsInstance(block.rescue, list) + self.assertEqual(len(block.rescue), 0) + self.assertIsInstance(block.always, list) + self.assertEqual(len(block.always), 0) + + def test_block_use_handlers(self): + ds = [{'block': True}] + self.assertRaisesRegex(errors.AnsibleParserError, + "Using a block as a handler is not supported.", + helpers.load_list_of_tasks, + ds, play=self.mock_play, use_handlers=True, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + + def test_one_bogus_include(self): + ds = [{'include': 'somefile.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + self.assertIsInstance(res, list) + self.assertEqual(len(res), 0) + + def test_one_bogus_include_use_handlers(self): + ds = [{'include': 'somefile.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, use_handlers=True, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + self.assertIsInstance(res, list) + self.assertEqual(len(res), 0) + + def test_one_bogus_include_static(self): + ds = [{'import_tasks': 'somefile.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_loader) + self.assertIsInstance(res, list) + self.assertEqual(len(res), 0) + + def test_one_include(self): + ds = [{'include': '/dev/null/includes/other_test_include.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self.assertEqual(len(res), 1) + self._assert_is_task_list_or_blocks(res) + + def test_one_parent_include(self): + ds = [{'include': '/dev/null/includes/test_include.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Block) + self.assertIsInstance(res[0]._parent, TaskInclude) + + # TODO/FIXME: do this non deprecated way + def test_one_include_tags(self): + ds = [{'include': '/dev/null/includes/other_test_include.yml', + 'tags': ['test_one_include_tags_tag1', 'and_another_tagB'] + }] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Block) + self.assertIn('test_one_include_tags_tag1', res[0].tags) + self.assertIn('and_another_tagB', res[0].tags) + + # TODO/FIXME: do this non deprecated way + def test_one_parent_include_tags(self): + ds = [{'include': '/dev/null/includes/test_include.yml', + # 'vars': {'tags': ['test_one_parent_include_tags_tag1', 'and_another_tag2']} + 'tags': ['test_one_parent_include_tags_tag1', 'and_another_tag2'] + } + ] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Block) + self.assertIn('test_one_parent_include_tags_tag1', res[0].tags) + self.assertIn('and_another_tag2', res[0].tags) + + def test_one_include_use_handlers(self): + ds = [{'include': '/dev/null/includes/other_test_include.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + use_handlers=True, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Handler) + + def test_one_parent_include_use_handlers(self): + ds = [{'include': '/dev/null/includes/test_include.yml'}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + use_handlers=True, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Handler) + + # default for Handler + self.assertEqual(res[0].listen, []) + + # TODO/FIXME: this doesn't seen right + # figure out how to get the non-static errors to be raised, this seems to just ignore everything + def test_one_include_not_static(self): + ds = [{ + 'include_tasks': '/dev/null/includes/static_test_include.yml', + }] + # a_block = Block() + ti_ds = {'include_tasks': '/dev/null/includes/ssdftatic_test_include.yml'} + a_task_include = TaskInclude() + ti = a_task_include.load(ti_ds) + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + block=ti, + variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) + self._assert_is_task_list_or_blocks(res) + self.assertIsInstance(res[0], Task) + self.assertEqual(res[0].args['_raw_params'], '/dev/null/includes/static_test_include.yml') + + # TODO/FIXME: This two get stuck trying to make a mock_block into a TaskInclude +# def test_one_include(self): +# ds = [{'include': 'other_test_include.yml'}] +# res = helpers.load_list_of_tasks(ds, play=self.mock_play, +# block=self.mock_block, +# variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) +# print(res) + +# def test_one_parent_include(self): +# ds = [{'include': 'test_include.yml'}] +# res = helpers.load_list_of_tasks(ds, play=self.mock_play, +# block=self.mock_block, +# variable_manager=self.mock_variable_manager, loader=self.fake_include_loader) +# print(res) + + def test_one_bogus_include_role(self): + ds = [{'include_role': {'name': 'bogus_role'}, 'collections': []}] + res = helpers.load_list_of_tasks(ds, play=self.mock_play, + block=self.mock_block, + variable_manager=self.mock_variable_manager, loader=self.fake_role_loader) + self.assertEqual(len(res), 1) + self._assert_is_task_list_or_blocks(res) + + def test_one_bogus_include_role_use_handlers(self): + ds = [{'include_role': {'name': 'bogus_role'}, 'collections': []}] + + self.assertRaises(errors.AnsibleError, helpers.load_list_of_tasks, + ds, + self.mock_play, + True, # use_handlers + self.mock_block, + self.mock_variable_manager, + self.fake_role_loader) + + +class TestLoadListOfRoles(unittest.TestCase, MixinForMocks): + def setUp(self): + self._setup() + + def test_ds_not_list(self): + ds = {} + self.assertRaises(AssertionError, helpers.load_list_of_roles, + ds, self.mock_play) + + def test_empty_role(self): + ds = [{}] + self.assertRaisesRegex(errors.AnsibleError, + "role definitions must contain a role name", + helpers.load_list_of_roles, + ds, self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_role_loader) + + def test_empty_role_just_name(self): + ds = [{'name': 'bogus_role'}] + res = helpers.load_list_of_roles(ds, self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_role_loader) + self.assertIsInstance(res, list) + for r in res: + self.assertIsInstance(r, RoleInclude) + + def test_block_unknown_action(self): + ds = [{ + 'block': [{'action': 'foo_test_block_unknown_action'}] + }] + ds = [{'name': 'bogus_role'}] + res = helpers.load_list_of_roles(ds, self.mock_play, + variable_manager=self.mock_variable_manager, loader=self.fake_role_loader) + self.assertIsInstance(res, list) + for r in res: + self.assertIsInstance(r, RoleInclude) + + +class TestLoadListOfBlocks(unittest.TestCase, MixinForMocks): + def setUp(self): + self._setup() + + def test_ds_not_list(self): + ds = {} + mock_play = MagicMock(name='MockPlay') + self.assertRaises(AssertionError, helpers.load_list_of_blocks, + ds, mock_play, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None) + + def test_empty_block(self): + ds = [{}] + mock_play = MagicMock(name='MockPlay') + self.assertRaisesRegex(errors.AnsibleParserError, + "no module/action detected in task", + helpers.load_list_of_blocks, + ds, mock_play, + parent_block=None, + role=None, + task_include=None, + use_handlers=False, + variable_manager=None, + loader=None) + + def test_block_unknown_action(self): + ds = [{'action': 'foo', 'collections': []}] + mock_play = MagicMock(name='MockPlay') + res = helpers.load_list_of_blocks(ds, mock_play, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, + loader=None) + + self.assertIsInstance(res, list) + for block in res: + self.assertIsInstance(block, Block) diff --git a/test/units/playbook/test_included_file.py b/test/units/playbook/test_included_file.py new file mode 100644 index 0000000..7341dff --- /dev/null +++ b/test/units/playbook/test_included_file.py @@ -0,0 +1,332 @@ +# (c) 2016, Adrian Likins +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +import pytest + +from unittest.mock import MagicMock +from units.mock.loader import DictDataLoader + +from ansible.playbook.block import Block +from ansible.playbook.task import Task +from ansible.playbook.task_include import TaskInclude +from ansible.playbook.role_include import IncludeRole +from ansible.executor import task_result + +from ansible.playbook.included_file import IncludedFile +from ansible.errors import AnsibleParserError + + +@pytest.fixture +def mock_iterator(): + mock_iterator = MagicMock(name='MockIterator') + mock_iterator._play = MagicMock(name='MockPlay') + return mock_iterator + + +@pytest.fixture +def mock_variable_manager(): + # TODO: can we use a real VariableManager? + mock_variable_manager = MagicMock(name='MockVariableManager') + mock_variable_manager.get_vars.return_value = dict() + return mock_variable_manager + + +def test_equals_ok(): + uuid = '111-111' + parent = MagicMock(name='MockParent') + parent._uuid = uuid + task = MagicMock(name='MockTask') + task._uuid = uuid + task._parent = parent + inc_a = IncludedFile('a.yml', {}, {}, task) + inc_b = IncludedFile('a.yml', {}, {}, task) + assert inc_a == inc_b + + +def test_equals_different_tasks(): + parent = MagicMock(name='MockParent') + parent._uuid = '111-111' + task_a = MagicMock(name='MockTask') + task_a._uuid = '11-11' + task_a._parent = parent + task_b = MagicMock(name='MockTask') + task_b._uuid = '22-22' + task_b._parent = parent + inc_a = IncludedFile('a.yml', {}, {}, task_a) + inc_b = IncludedFile('a.yml', {}, {}, task_b) + assert inc_a != inc_b + + +def test_equals_different_parents(): + parent_a = MagicMock(name='MockParent') + parent_a._uuid = '111-111' + parent_b = MagicMock(name='MockParent') + parent_b._uuid = '222-222' + task_a = MagicMock(name='MockTask') + task_a._uuid = '11-11' + task_a._parent = parent_a + task_b = MagicMock(name='MockTask') + task_b._uuid = '11-11' + task_b._parent = parent_b + inc_a = IncludedFile('a.yml', {}, {}, task_a) + inc_b = IncludedFile('a.yml', {}, {}, task_b) + assert inc_a != inc_b + + +def test_included_file_instantiation(): + filename = 'somefile.yml' + + inc_file = IncludedFile(filename=filename, args={}, vars={}, task=None) + + assert isinstance(inc_file, IncludedFile) + assert inc_file._filename == filename + assert inc_file._args == {} + assert inc_file._vars == {} + assert inc_file._task is None + + +def test_process_include_results(mock_iterator, mock_variable_manager): + hostname = "testhost1" + hostname2 = "testhost2" + + parent_task_ds = {'debug': 'msg=foo'} + parent_task = Task.load(parent_task_ds) + parent_task._play = None + + task_ds = {'include': 'include_test.yml'} + loaded_task = TaskInclude.load(task_ds, task_include=parent_task) + + return_data = {'include': 'include_test.yml'} + # The task in the TaskResult has to be a TaskInclude so it has a .static attr + result1 = task_result.TaskResult(host=hostname, task=loaded_task, return_data=return_data) + result2 = task_result.TaskResult(host=hostname2, task=loaded_task, return_data=return_data) + results = [result1, result2] + + fake_loader = DictDataLoader({'include_test.yml': ""}) + + res = IncludedFile.process_include_results(results, mock_iterator, fake_loader, mock_variable_manager) + assert isinstance(res, list) + assert len(res) == 1 + assert res[0]._filename == os.path.join(os.getcwd(), 'include_test.yml') + assert res[0]._hosts == ['testhost1', 'testhost2'] + assert res[0]._args == {} + assert res[0]._vars == {} + + +def test_process_include_diff_files(mock_iterator, mock_variable_manager): + hostname = "testhost1" + hostname2 = "testhost2" + + parent_task_ds = {'debug': 'msg=foo'} + parent_task = Task.load(parent_task_ds) + parent_task._play = None + + task_ds = {'include': 'include_test.yml'} + loaded_task = TaskInclude.load(task_ds, task_include=parent_task) + loaded_task._play = None + + child_task_ds = {'include': 'other_include_test.yml'} + loaded_child_task = TaskInclude.load(child_task_ds, task_include=loaded_task) + loaded_child_task._play = None + + return_data = {'include': 'include_test.yml'} + # The task in the TaskResult has to be a TaskInclude so it has a .static attr + result1 = task_result.TaskResult(host=hostname, task=loaded_task, return_data=return_data) + + return_data = {'include': 'other_include_test.yml'} + result2 = task_result.TaskResult(host=hostname2, task=loaded_child_task, return_data=return_data) + results = [result1, result2] + + fake_loader = DictDataLoader({'include_test.yml': "", + 'other_include_test.yml': ""}) + + res = IncludedFile.process_include_results(results, mock_iterator, fake_loader, mock_variable_manager) + assert isinstance(res, list) + assert res[0]._filename == os.path.join(os.getcwd(), 'include_test.yml') + assert res[1]._filename == os.path.join(os.getcwd(), 'other_include_test.yml') + + assert res[0]._hosts == ['testhost1'] + assert res[1]._hosts == ['testhost2'] + + assert res[0]._args == {} + assert res[1]._args == {} + + assert res[0]._vars == {} + assert res[1]._vars == {} + + +def test_process_include_simulate_free(mock_iterator, mock_variable_manager): + hostname = "testhost1" + hostname2 = "testhost2" + + parent_task_ds = {'debug': 'msg=foo'} + parent_task1 = Task.load(parent_task_ds) + parent_task2 = Task.load(parent_task_ds) + + parent_task1._play = None + parent_task2._play = None + + task_ds = {'include': 'include_test.yml'} + loaded_task1 = TaskInclude.load(task_ds, task_include=parent_task1) + loaded_task2 = TaskInclude.load(task_ds, task_include=parent_task2) + + return_data = {'include': 'include_test.yml'} + # The task in the TaskResult has to be a TaskInclude so it has a .static attr + result1 = task_result.TaskResult(host=hostname, task=loaded_task1, return_data=return_data) + result2 = task_result.TaskResult(host=hostname2, task=loaded_task2, return_data=return_data) + results = [result1, result2] + + fake_loader = DictDataLoader({'include_test.yml': ""}) + + res = IncludedFile.process_include_results(results, mock_iterator, fake_loader, mock_variable_manager) + assert isinstance(res, list) + assert len(res) == 2 + assert res[0]._filename == os.path.join(os.getcwd(), 'include_test.yml') + assert res[1]._filename == os.path.join(os.getcwd(), 'include_test.yml') + + assert res[0]._hosts == ['testhost1'] + assert res[1]._hosts == ['testhost2'] + + assert res[0]._args == {} + assert res[1]._args == {} + + assert res[0]._vars == {} + assert res[1]._vars == {} + + +def test_process_include_simulate_free_block_role_tasks(mock_iterator, + mock_variable_manager): + """Test loading the same role returns different included files + + In the case of free, we may end up with included files from roles that + have the same parent but are different tasks. Previously the comparison + for equality did not check if the tasks were the same and only checked + that the parents were the same. This lead to some tasks being run + incorrectly and some tasks being silient dropped.""" + + fake_loader = DictDataLoader({ + 'include_test.yml': "", + '/etc/ansible/roles/foo_role/tasks/task1.yml': """ + - debug: msg=task1 + """, + '/etc/ansible/roles/foo_role/tasks/task2.yml': """ + - debug: msg=task2 + """, + }) + + hostname = "testhost1" + hostname2 = "testhost2" + + role1_ds = { + 'name': 'task1 include', + 'include_role': { + 'name': 'foo_role', + 'tasks_from': 'task1.yml' + } + } + role2_ds = { + 'name': 'task2 include', + 'include_role': { + 'name': 'foo_role', + 'tasks_from': 'task2.yml' + } + } + parent_task_ds = { + 'block': [ + role1_ds, + role2_ds + ] + } + parent_block = Block.load(parent_task_ds, loader=fake_loader) + + parent_block._play = None + + include_role1_ds = { + 'include_args': { + 'name': 'foo_role', + 'tasks_from': 'task1.yml' + } + } + include_role2_ds = { + 'include_args': { + 'name': 'foo_role', + 'tasks_from': 'task2.yml' + } + } + + include_role1 = IncludeRole.load(role1_ds, + block=parent_block, + loader=fake_loader) + include_role2 = IncludeRole.load(role2_ds, + block=parent_block, + loader=fake_loader) + + result1 = task_result.TaskResult(host=hostname, + task=include_role1, + return_data=include_role1_ds) + result2 = task_result.TaskResult(host=hostname2, + task=include_role2, + return_data=include_role2_ds) + results = [result1, result2] + + res = IncludedFile.process_include_results(results, + mock_iterator, + fake_loader, + mock_variable_manager) + assert isinstance(res, list) + # we should get two different includes + assert len(res) == 2 + assert res[0]._filename == 'foo_role' + assert res[1]._filename == 'foo_role' + # with different tasks + assert res[0]._task != res[1]._task + + assert res[0]._hosts == ['testhost1'] + assert res[1]._hosts == ['testhost2'] + + assert res[0]._args == {} + assert res[1]._args == {} + + assert res[0]._vars == {} + assert res[1]._vars == {} + + +def test_empty_raw_params(): + parent_task_ds = {'debug': 'msg=foo'} + parent_task = Task.load(parent_task_ds) + parent_task._play = None + + task_ds_list = [ + { + 'include': '' + }, + { + 'include_tasks': '' + }, + { + 'import_tasks': '' + } + ] + for task_ds in task_ds_list: + with pytest.raises(AnsibleParserError): + TaskInclude.load(task_ds, task_include=parent_task) diff --git a/test/units/playbook/test_play.py b/test/units/playbook/test_play.py new file mode 100644 index 0000000..bcc1e5e --- /dev/null +++ b/test/units/playbook/test_play.py @@ -0,0 +1,291 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.errors import AnsibleAssertionError, AnsibleParserError +from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode +from ansible.playbook.block import Block +from ansible.playbook.play import Play +from ansible.playbook.role import Role +from ansible.playbook.task import Task + +from units.mock.loader import DictDataLoader + + +def test_empty_play(): + p = Play.load({}) + + assert str(p) == '' + + +def test_play_with_hosts_string(): + p = Play.load({'hosts': 'foo'}) + + assert str(p) == 'foo' + + # Test the caching since self.name should be set by previous call. + assert p.get_name() == 'foo' + + +def test_basic_play(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + connection='local', + remote_user="root", + become=True, + become_user="testing", + )) + + assert p.name == 'test play' + assert p.hosts == ['foo'] + assert p.connection == 'local' + + +def test_play_with_remote_user(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + user="testing", + gather_facts=False, + )) + + assert p.remote_user == "testing" + + +def test_play_with_user_conflict(): + play_data = dict( + name="test play", + hosts=['foo'], + user="testing", + remote_user="testing", + ) + + with pytest.raises(AnsibleParserError): + Play.load(play_data) + + +def test_play_with_bad_ds_type(): + play_data = [] + with pytest.raises(AnsibleAssertionError, match=r"while preprocessing data \(\[\]\), ds should be a dict but was a <(?:class|type) 'list'>"): + Play.load(play_data) + + +def test_play_with_tasks(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[dict(action='shell echo "hello world"')], + )) + + assert len(p.tasks) == 1 + assert isinstance(p.tasks[0], Block) + assert p.tasks[0].has_tasks() is True + + +def test_play_with_handlers(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + handlers=[dict(action='shell echo "hello world"')], + )) + + assert len(p.handlers) >= 1 + assert len(p.get_handlers()) >= 1 + assert isinstance(p.handlers[0], Block) + assert p.handlers[0].has_tasks() is True + + +def test_play_with_pre_tasks(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + pre_tasks=[dict(action='shell echo "hello world"')], + )) + + assert len(p.pre_tasks) >= 1 + assert isinstance(p.pre_tasks[0], Block) + assert p.pre_tasks[0].has_tasks() is True + + assert len(p.get_tasks()) >= 1 + assert isinstance(p.get_tasks()[0][0], Task) + assert p.get_tasks()[0][0].action == 'shell' + + +def test_play_with_post_tasks(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + post_tasks=[dict(action='shell echo "hello world"')], + )) + + assert len(p.post_tasks) >= 1 + assert isinstance(p.post_tasks[0], Block) + assert p.post_tasks[0].has_tasks() is True + + +def test_play_with_roles(mocker): + mocker.patch('ansible.playbook.role.definition.RoleDefinition._load_role_path', return_value=('foo', '/etc/ansible/roles/foo')) + fake_loader = DictDataLoader({ + '/etc/ansible/roles/foo/tasks.yml': """ + - name: role task + shell: echo "hello world" + """, + }) + + mock_var_manager = mocker.MagicMock() + mock_var_manager.get_vars.return_value = {} + + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + roles=['foo'], + ), loader=fake_loader, variable_manager=mock_var_manager) + + blocks = p.compile() + assert len(blocks) > 1 + assert all(isinstance(block, Block) for block in blocks) + assert isinstance(p.get_roles()[0], Role) + + +def test_play_compile(): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[dict(action='shell echo "hello world"')], + )) + + blocks = p.compile() + + # with a single block, there will still be three + # implicit meta flush_handler blocks inserted + assert len(blocks) == 4 + + +@pytest.mark.parametrize( + 'value, expected', + ( + ('my_vars.yml', ['my_vars.yml']), + (['my_vars.yml'], ['my_vars.yml']), + (['my_vars1.yml', 'my_vars2.yml'], ['my_vars1.yml', 'my_vars2.yml']), + (None, []), + ) +) +def test_play_with_vars_files(value, expected): + play = Play.load({ + 'name': 'Play with vars_files', + 'hosts': ['testhost1'], + 'vars_files': value, + }) + + assert play.vars_files == value + assert play.get_vars_files() == expected + + +@pytest.mark.parametrize('value', ([], tuple(), set(), {}, '', None, False, 0)) +def test_play_empty_hosts(value): + with pytest.raises(AnsibleParserError, match='Hosts list cannot be empty'): + Play.load({'hosts': value}) + + +@pytest.mark.parametrize('value', ([None], (None,), ['one', None])) +def test_play_none_hosts(value): + with pytest.raises(AnsibleParserError, match="Hosts list cannot contain values of 'None'"): + Play.load({'hosts': value}) + + +@pytest.mark.parametrize( + 'value', + ( + {'one': None}, + {'one': 'two'}, + True, + 1, + 1.75, + AnsibleVaultEncryptedUnicode('secret'), + ) +) +def test_play_invalid_hosts_sequence(value): + with pytest.raises(AnsibleParserError, match='Hosts list must be a sequence or string'): + Play.load({'hosts': value}) + + +@pytest.mark.parametrize( + 'value', + ( + [[1, 'two']], + [{'one': None}], + [set((None, 'one'))], + ['one', 'two', {'three': None}], + ['one', 'two', {'three': 'four'}], + [AnsibleVaultEncryptedUnicode('secret')], + ) +) +def test_play_invalid_hosts_value(value): + with pytest.raises(AnsibleParserError, match='Hosts list contains an invalid host value'): + Play.load({'hosts': value}) + + +def test_play_with_vars(): + play = Play.load({}, vars={'var1': 'val1'}) + + assert play.get_name() == '' + assert play.vars == {'var1': 'val1'} + assert play.get_vars() == {'var1': 'val1'} + + +def test_play_no_name_hosts_sequence(): + play = Play.load({'hosts': ['host1', 'host2']}) + + assert play.get_name() == 'host1,host2' + + +def test_play_hosts_template_expression(): + play = Play.load({'hosts': "{{ target_hosts }}"}) + + assert play.get_name() == '{{ target_hosts }}' + + +@pytest.mark.parametrize( + 'call', + ( + '_load_tasks', + '_load_pre_tasks', + '_load_post_tasks', + '_load_handlers', + '_load_roles', + ) +) +def test_bad_blocks_roles(mocker, call): + mocker.patch('ansible.playbook.play.load_list_of_blocks', side_effect=AssertionError('Raised intentionally')) + mocker.patch('ansible.playbook.play.load_list_of_roles', side_effect=AssertionError('Raised intentionally')) + + play = Play.load({}) + with pytest.raises(AnsibleParserError, match='A malformed (block|(role declaration)) was encountered'): + getattr(play, call)('', None) diff --git a/test/units/playbook/test_play_context.py b/test/units/playbook/test_play_context.py new file mode 100644 index 0000000..7c24de5 --- /dev/null +++ b/test/units/playbook/test_play_context.py @@ -0,0 +1,94 @@ +# (c) 2012-2014, Michael DeHaan +# (c) 2017 Ansible Project +# +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible import constants as C +from ansible import context +from ansible.cli.arguments import option_helpers as opt_help +from ansible.errors import AnsibleError +from ansible.playbook.play_context import PlayContext +from ansible.playbook.play import Play +from ansible.plugins.loader import become_loader +from ansible.utils import context_objects as co + + +@pytest.fixture +def parser(): + parser = opt_help.create_base_parser('testparser') + + opt_help.add_runas_options(parser) + opt_help.add_meta_options(parser) + opt_help.add_runtask_options(parser) + opt_help.add_vault_options(parser) + opt_help.add_async_options(parser) + opt_help.add_connect_options(parser) + opt_help.add_subset_options(parser) + opt_help.add_check_options(parser) + opt_help.add_inventory_options(parser) + + return parser + + +@pytest.fixture +def reset_cli_args(): + co.GlobalCLIArgs._Singleton__instance = None + yield + co.GlobalCLIArgs._Singleton__instance = None + + +def test_play_context(mocker, parser, reset_cli_args): + options = parser.parse_args(['-vv', '--check']) + context._init_global_context(options) + play = Play.load({}) + play_context = PlayContext(play=play) + + assert play_context.remote_addr is None + assert play_context.remote_user is None + assert play_context.password == '' + assert play_context.private_key_file == C.DEFAULT_PRIVATE_KEY_FILE + assert play_context.timeout == C.DEFAULT_TIMEOUT + assert play_context.verbosity == 2 + assert play_context.check_mode is True + + mock_play = mocker.MagicMock() + mock_play.force_handlers = True + + play_context = PlayContext(play=mock_play) + assert play_context.force_handlers is True + + mock_task = mocker.MagicMock() + mock_task.connection = 'mocktask' + mock_task.remote_user = 'mocktask' + mock_task.port = 1234 + mock_task.no_log = True + mock_task.become = True + mock_task.become_method = 'mocktask' + mock_task.become_user = 'mocktaskroot' + mock_task.become_pass = 'mocktaskpass' + mock_task._local_action = False + mock_task.delegate_to = None + + all_vars = dict( + ansible_connection='mock_inventory', + ansible_ssh_port=4321, + ) + + mock_templar = mocker.MagicMock() + + play_context = PlayContext() + play_context = play_context.set_task_and_variable_override(task=mock_task, variables=all_vars, templar=mock_templar) + + assert play_context.connection == 'mock_inventory' + assert play_context.remote_user == 'mocktask' + assert play_context.no_log is True + + mock_task.no_log = False + play_context = play_context.set_task_and_variable_override(task=mock_task, variables=all_vars, templar=mock_templar) + assert play_context.no_log is False diff --git a/test/units/playbook/test_playbook.py b/test/units/playbook/test_playbook.py new file mode 100644 index 0000000..68a9fb7 --- /dev/null +++ b/test/units/playbook/test_playbook.py @@ -0,0 +1,61 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.errors import AnsibleParserError +from ansible.playbook import Playbook +from ansible.vars.manager import VariableManager + +from units.mock.loader import DictDataLoader + + +class TestPlaybook(unittest.TestCase): + + def test_empty_playbook(self): + fake_loader = DictDataLoader({}) + p = Playbook(loader=fake_loader) + + def test_basic_playbook(self): + fake_loader = DictDataLoader({ + "test_file.yml": """ + - hosts: all + """, + }) + p = Playbook.load("test_file.yml", loader=fake_loader) + plays = p.get_plays() + + def test_bad_playbook_files(self): + fake_loader = DictDataLoader({ + # represents a playbook which is not a list of plays + "bad_list.yml": """ + foo: bar + + """, + # represents a playbook where a play entry is mis-formatted + "bad_entry.yml": """ + - + - "This should be a mapping..." + + """, + }) + vm = VariableManager() + self.assertRaises(AnsibleParserError, Playbook.load, "bad_list.yml", vm, fake_loader) + self.assertRaises(AnsibleParserError, Playbook.load, "bad_entry.yml", vm, fake_loader) diff --git a/test/units/playbook/test_taggable.py b/test/units/playbook/test_taggable.py new file mode 100644 index 0000000..3881e17 --- /dev/null +++ b/test/units/playbook/test_taggable.py @@ -0,0 +1,105 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.playbook.taggable import Taggable +from units.mock.loader import DictDataLoader + + +class TaggableTestObj(Taggable): + + def __init__(self): + self._loader = DictDataLoader({}) + self.tags = [] + + +class TestTaggable(unittest.TestCase): + + def assert_evaluate_equal(self, test_value, tags, only_tags, skip_tags): + taggable_obj = TaggableTestObj() + taggable_obj.tags = tags + + evaluate = taggable_obj.evaluate_tags(only_tags, skip_tags, {}) + + self.assertEqual(test_value, evaluate) + + def test_evaluate_tags_tag_in_only_tags(self): + self.assert_evaluate_equal(True, ['tag1', 'tag2'], ['tag1'], []) + + def test_evaluate_tags_tag_in_skip_tags(self): + self.assert_evaluate_equal(False, ['tag1', 'tag2'], [], ['tag1']) + + def test_evaluate_tags_special_always_in_object_tags(self): + self.assert_evaluate_equal(True, ['tag', 'always'], ['random'], []) + + def test_evaluate_tags_tag_in_skip_tags_special_always_in_object_tags(self): + self.assert_evaluate_equal(False, ['tag', 'always'], ['random'], ['tag']) + + def test_evaluate_tags_special_always_in_skip_tags_and_always_in_tags(self): + self.assert_evaluate_equal(False, ['tag', 'always'], [], ['always']) + + def test_evaluate_tags_special_tagged_in_only_tags_and_object_tagged(self): + self.assert_evaluate_equal(True, ['tag'], ['tagged'], []) + + def test_evaluate_tags_special_tagged_in_only_tags_and_object_untagged(self): + self.assert_evaluate_equal(False, [], ['tagged'], []) + + def test_evaluate_tags_special_tagged_in_skip_tags_and_object_tagged(self): + self.assert_evaluate_equal(False, ['tag'], [], ['tagged']) + + def test_evaluate_tags_special_tagged_in_skip_tags_and_object_untagged(self): + self.assert_evaluate_equal(True, [], [], ['tagged']) + + def test_evaluate_tags_special_untagged_in_only_tags_and_object_tagged(self): + self.assert_evaluate_equal(False, ['tag'], ['untagged'], []) + + def test_evaluate_tags_special_untagged_in_only_tags_and_object_untagged(self): + self.assert_evaluate_equal(True, [], ['untagged'], []) + + def test_evaluate_tags_special_untagged_in_skip_tags_and_object_tagged(self): + self.assert_evaluate_equal(True, ['tag'], [], ['untagged']) + + def test_evaluate_tags_special_untagged_in_skip_tags_and_object_untagged(self): + self.assert_evaluate_equal(False, [], [], ['untagged']) + + def test_evaluate_tags_special_all_in_only_tags(self): + self.assert_evaluate_equal(True, ['tag'], ['all'], ['untagged']) + + def test_evaluate_tags_special_all_in_only_tags_and_object_untagged(self): + self.assert_evaluate_equal(True, [], ['all'], []) + + def test_evaluate_tags_special_all_in_skip_tags(self): + self.assert_evaluate_equal(False, ['tag'], ['tag'], ['all']) + + def test_evaluate_tags_special_all_in_only_tags_and_special_all_in_skip_tags(self): + self.assert_evaluate_equal(False, ['tag'], ['all'], ['all']) + + def test_evaluate_tags_special_all_in_skip_tags_and_always_in_object_tags(self): + self.assert_evaluate_equal(True, ['tag', 'always'], [], ['all']) + + def test_evaluate_tags_special_all_in_skip_tags_and_special_always_in_skip_tags_and_always_in_object_tags(self): + self.assert_evaluate_equal(False, ['tag', 'always'], [], ['all', 'always']) + + def test_evaluate_tags_accepts_lists(self): + self.assert_evaluate_equal(True, ['tag1', 'tag2'], ['tag2'], []) + + def test_evaluate_tags_with_repeated_tags(self): + self.assert_evaluate_equal(False, ['tag', 'tag'], [], ['tag']) diff --git a/test/units/playbook/test_task.py b/test/units/playbook/test_task.py new file mode 100644 index 0000000..070d7aa --- /dev/null +++ b/test/units/playbook/test_task.py @@ -0,0 +1,114 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from unittest.mock import patch +from ansible.playbook.task import Task +from ansible.parsing.yaml import objects +from ansible import errors + + +basic_command_task = dict( + name='Test Task', + command='echo hi' +) + +kv_command_task = dict( + action='command echo hi' +) + +# See #36848 +kv_bad_args_str = '- apk: sdfs sf sdf 37' +kv_bad_args_ds = {'apk': 'sdfs sf sdf 37'} + + +class TestTask(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_construct_empty_task(self): + Task() + + def test_construct_task_with_role(self): + pass + + def test_construct_task_with_block(self): + pass + + def test_construct_task_with_role_and_block(self): + pass + + def test_load_task_simple(self): + t = Task.load(basic_command_task) + assert t is not None + self.assertEqual(t.name, basic_command_task['name']) + self.assertEqual(t.action, 'command') + self.assertEqual(t.args, dict(_raw_params='echo hi')) + + def test_load_task_kv_form(self): + t = Task.load(kv_command_task) + self.assertEqual(t.action, 'command') + self.assertEqual(t.args, dict(_raw_params='echo hi')) + + @patch.object(errors.AnsibleError, '_get_error_lines_from_file') + def test_load_task_kv_form_error_36848(self, mock_get_err_lines): + ds = objects.AnsibleMapping(kv_bad_args_ds) + ds.ansible_pos = ('test_task_faux_playbook.yml', 1, 1) + mock_get_err_lines.return_value = (kv_bad_args_str, '') + + with self.assertRaises(errors.AnsibleParserError) as cm: + Task.load(ds) + + self.assertIsInstance(cm.exception, errors.AnsibleParserError) + self.assertEqual(cm.exception.obj, ds) + self.assertEqual(cm.exception.obj, kv_bad_args_ds) + self.assertIn("The error appears to be in 'test_task_faux_playbook.yml", cm.exception.message) + self.assertIn(kv_bad_args_str, cm.exception.message) + self.assertIn('apk', cm.exception.message) + self.assertEqual(cm.exception.message.count('The offending line'), 1) + self.assertEqual(cm.exception.message.count('The error appears to be in'), 1) + + def test_task_auto_name(self): + assert 'name' not in kv_command_task + Task.load(kv_command_task) + # self.assertEqual(t.name, 'shell echo hi') + + def test_task_auto_name_with_role(self): + pass + + def test_load_task_complex_form(self): + pass + + def test_can_load_module_complex_form(self): + pass + + def test_local_action_implies_delegate(self): + pass + + def test_local_action_conflicts_with_delegate(self): + pass + + def test_delegate_to_parses(self): + pass -- cgit v1.2.3