diff options
Diffstat (limited to 'test/units/plugins/inventory')
-rw-r--r-- | test/units/plugins/inventory/__init__.py | 0 | ||||
-rw-r--r-- | test/units/plugins/inventory/test_constructed.py | 206 | ||||
-rw-r--r-- | test/units/plugins/inventory/test_inventory.py | 207 | ||||
-rw-r--r-- | test/units/plugins/inventory/test_script.py | 105 |
4 files changed, 518 insertions, 0 deletions
diff --git a/test/units/plugins/inventory/__init__.py b/test/units/plugins/inventory/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/plugins/inventory/__init__.py diff --git a/test/units/plugins/inventory/test_constructed.py b/test/units/plugins/inventory/test_constructed.py new file mode 100644 index 00000000..6d521982 --- /dev/null +++ b/test/units/plugins/inventory/test_constructed.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- + +# Copyright 2019 Alan Rominger <arominge@redhat.net> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.errors import AnsibleParserError +from ansible.plugins.inventory.constructed import InventoryModule +from ansible.inventory.data import InventoryData +from ansible.template import Templar + + +@pytest.fixture() +def inventory_module(): + r = InventoryModule() + r.inventory = InventoryData() + r.templar = Templar(None) + return r + + +def test_group_by_value_only(inventory_module): + inventory_module.inventory.add_host('foohost') + inventory_module.inventory.set_variable('foohost', 'bar', 'my_group_name') + host = inventory_module.inventory.get_host('foohost') + keyed_groups = [ + { + 'prefix': '', + 'separator': '', + 'key': 'bar' + } + ] + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=False + ) + assert 'my_group_name' in inventory_module.inventory.groups + group = inventory_module.inventory.groups['my_group_name'] + assert group.hosts == [host] + + +def test_keyed_group_separator(inventory_module): + inventory_module.inventory.add_host('farm') + inventory_module.inventory.set_variable('farm', 'farmer', 'mcdonald') + inventory_module.inventory.set_variable('farm', 'barn', {'cow': 'betsy'}) + host = inventory_module.inventory.get_host('farm') + keyed_groups = [ + { + 'prefix': 'farmer', + 'separator': '_old_', + 'key': 'farmer' + }, + { + 'separator': 'mmmmmmmmmm', + 'key': 'barn' + } + ] + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=False + ) + for group_name in ('farmer_old_mcdonald', 'mmmmmmmmmmcowmmmmmmmmmmbetsy'): + assert group_name in inventory_module.inventory.groups + group = inventory_module.inventory.groups[group_name] + assert group.hosts == [host] + + +def test_keyed_group_empty_construction(inventory_module): + inventory_module.inventory.add_host('farm') + inventory_module.inventory.set_variable('farm', 'barn', {}) + host = inventory_module.inventory.get_host('farm') + keyed_groups = [ + { + 'separator': 'mmmmmmmmmm', + 'key': 'barn' + } + ] + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=True + ) + assert host.groups == [] + + +def test_keyed_group_host_confusion(inventory_module): + inventory_module.inventory.add_host('cow') + inventory_module.inventory.add_group('cow') + host = inventory_module.inventory.get_host('cow') + host.vars['species'] = 'cow' + keyed_groups = [ + { + 'separator': '', + 'prefix': '', + 'key': 'species' + } + ] + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=True + ) + group = inventory_module.inventory.groups['cow'] + # group cow has host of cow + assert group.hosts == [host] + + +def test_keyed_parent_groups(inventory_module): + inventory_module.inventory.add_host('web1') + inventory_module.inventory.add_host('web2') + inventory_module.inventory.set_variable('web1', 'region', 'japan') + inventory_module.inventory.set_variable('web2', 'region', 'japan') + host1 = inventory_module.inventory.get_host('web1') + host2 = inventory_module.inventory.get_host('web2') + keyed_groups = [ + { + 'prefix': 'region', + 'key': 'region', + 'parent_group': 'region_list' + } + ] + for host in [host1, host2]: + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=False + ) + assert 'region_japan' in inventory_module.inventory.groups + assert 'region_list' in inventory_module.inventory.groups + region_group = inventory_module.inventory.groups['region_japan'] + all_regions = inventory_module.inventory.groups['region_list'] + assert all_regions.child_groups == [region_group] + assert region_group.hosts == [host1, host2] + + +def test_parent_group_templating(inventory_module): + inventory_module.inventory.add_host('cow') + inventory_module.inventory.set_variable('cow', 'sound', 'mmmmmmmmmm') + inventory_module.inventory.set_variable('cow', 'nickname', 'betsy') + host = inventory_module.inventory.get_host('cow') + keyed_groups = [ + { + 'key': 'sound', + 'prefix': 'sound', + 'parent_group': '{{ nickname }}' + }, + { + 'key': 'nickname', + 'prefix': '', + 'separator': '', + 'parent_group': 'nickname' # statically-named parent group, conflicting with hostvar + }, + { + 'key': 'nickname', + 'separator': '', + 'parent_group': '{{ location | default("field") }}' + } + ] + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=True + ) + # first keyed group, "betsy" is a parent group name dynamically generated + betsys_group = inventory_module.inventory.groups['betsy'] + assert [child.name for child in betsys_group.child_groups] == ['sound_mmmmmmmmmm'] + # second keyed group, "nickname" is a statically-named root group + nicknames_group = inventory_module.inventory.groups['nickname'] + assert [child.name for child in nicknames_group.child_groups] == ['betsy'] + # second keyed group actually generated the parent group of the first keyed group + # assert that these are, in fact, the same object + assert nicknames_group.child_groups[0] == betsys_group + # second keyed group has two parents + locations_group = inventory_module.inventory.groups['field'] + assert [child.name for child in locations_group.child_groups] == ['betsy'] + + +def test_parent_group_templating_error(inventory_module): + inventory_module.inventory.add_host('cow') + inventory_module.inventory.set_variable('cow', 'nickname', 'betsy') + host = inventory_module.inventory.get_host('cow') + keyed_groups = [ + { + 'key': 'nickname', + 'separator': '', + 'parent_group': '{{ location.barn-yard }}' + } + ] + with pytest.raises(AnsibleParserError) as err_message: + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=True + ) + assert 'Could not generate parent group' in err_message + # invalid parent group did not raise an exception with strict=False + inventory_module._add_host_to_keyed_groups( + keyed_groups, host.vars, host.name, strict=False + ) + # assert group was never added with invalid parent + assert 'betsy' not in inventory_module.inventory.groups diff --git a/test/units/plugins/inventory/test_inventory.py b/test/units/plugins/inventory/test_inventory.py new file mode 100644 index 00000000..66b5ec37 --- /dev/null +++ b/test/units/plugins/inventory/test_inventory.py @@ -0,0 +1,207 @@ +# Copyright 2015 Abhijit Menon-Sen <ams@2ndQuadrant.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import string +import textwrap + +from ansible import constants as C +from units.compat import mock +from units.compat import unittest +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_text +from units.mock.path import mock_unfrackpath_noop + +from ansible.inventory.manager import InventoryManager, split_host_pattern + +from units.mock.loader import DictDataLoader + + +class TestInventory(unittest.TestCase): + + patterns = { + 'a': ['a'], + 'a, b': ['a', 'b'], + 'a , b': ['a', 'b'], + ' a,b ,c[1:2] ': ['a', 'b', 'c[1:2]'], + '9a01:7f8:191:7701::9': ['9a01:7f8:191:7701::9'], + '9a01:7f8:191:7701::9,9a01:7f8:191:7701::9': ['9a01:7f8:191:7701::9', '9a01:7f8:191:7701::9'], + '9a01:7f8:191:7701::9,9a01:7f8:191:7701::9,foo': ['9a01:7f8:191:7701::9', '9a01:7f8:191:7701::9', 'foo'], + 'foo[1:2]': ['foo[1:2]'], + 'a::b': ['a::b'], + 'a:b': ['a', 'b'], + ' a : b ': ['a', 'b'], + 'foo:bar:baz[1:2]': ['foo', 'bar', 'baz[1:2]'], + 'a,,b': ['a', 'b'], + 'a, ,b,,c, ,': ['a', 'b', 'c'], + ',': [], + '': [], + } + + pattern_lists = [ + [['a'], ['a']], + [['a', 'b'], ['a', 'b']], + [['a, b'], ['a', 'b']], + [['9a01:7f8:191:7701::9', '9a01:7f8:191:7701::9,foo'], + ['9a01:7f8:191:7701::9', '9a01:7f8:191:7701::9', 'foo']] + ] + + # pattern_string: [ ('base_pattern', (a,b)), ['x','y','z'] ] + # a,b are the bounds of the subscript; x..z are the results of the subscript + # when applied to string.ascii_letters. + + subscripts = { + 'a': [('a', None), list(string.ascii_letters)], + 'a[0]': [('a', (0, None)), ['a']], + 'a[1]': [('a', (1, None)), ['b']], + 'a[2:3]': [('a', (2, 3)), ['c', 'd']], + 'a[-1]': [('a', (-1, None)), ['Z']], + 'a[-2]': [('a', (-2, None)), ['Y']], + 'a[48:]': [('a', (48, -1)), ['W', 'X', 'Y', 'Z']], + 'a[49:]': [('a', (49, -1)), ['X', 'Y', 'Z']], + 'a[1:]': [('a', (1, -1)), list(string.ascii_letters[1:])], + } + + ranges_to_expand = { + 'a[1:2]': ['a1', 'a2'], + 'a[1:10:2]': ['a1', 'a3', 'a5', 'a7', 'a9'], + 'a[a:b]': ['aa', 'ab'], + 'a[a:i:3]': ['aa', 'ad', 'ag'], + 'a[a:b][c:d]': ['aac', 'aad', 'abc', 'abd'], + 'a[0:1][2:3]': ['a02', 'a03', 'a12', 'a13'], + 'a[a:b][2:3]': ['aa2', 'aa3', 'ab2', 'ab3'], + } + + def setUp(self): + fake_loader = DictDataLoader({}) + + self.i = InventoryManager(loader=fake_loader, sources=[None]) + + def test_split_patterns(self): + + for p in self.patterns: + r = self.patterns[p] + self.assertEqual(r, split_host_pattern(p)) + + for p, r in self.pattern_lists: + self.assertEqual(r, split_host_pattern(p)) + + def test_ranges(self): + + for s in self.subscripts: + r = self.subscripts[s] + self.assertEqual(r[0], self.i._split_subscript(s)) + self.assertEqual( + r[1], + self.i._apply_subscript( + list(string.ascii_letters), + r[0][1] + ) + ) + + +class TestInventoryPlugins(unittest.TestCase): + + def test_empty_inventory(self): + inventory = self._get_inventory('') + + self.assertIn('all', inventory.groups) + self.assertIn('ungrouped', inventory.groups) + self.assertFalse(inventory.groups['all'].get_hosts()) + self.assertFalse(inventory.groups['ungrouped'].get_hosts()) + + def test_ini(self): + self._test_default_groups(""" + host1 + host2 + host3 + [servers] + host3 + host4 + host5 + """) + + def test_ini_explicit_ungrouped(self): + self._test_default_groups(""" + [ungrouped] + host1 + host2 + host3 + [servers] + host3 + host4 + host5 + """) + + def test_ini_variables_stringify(self): + values = ['string', 'no', 'No', 'false', 'FALSE', [], False, 0] + + inventory_content = "host1 " + inventory_content += ' '.join(['var%s=%s' % (i, to_text(x)) for i, x in enumerate(values)]) + inventory = self._get_inventory(inventory_content) + + variables = inventory.get_host('host1').vars + for i in range(len(values)): + if isinstance(values[i], string_types): + self.assertIsInstance(variables['var%s' % i], string_types) + else: + self.assertIsInstance(variables['var%s' % i], type(values[i])) + + @mock.patch('ansible.inventory.manager.unfrackpath', mock_unfrackpath_noop) + @mock.patch('os.path.exists', lambda x: True) + @mock.patch('os.access', lambda x, y: True) + def test_yaml_inventory(self, filename="test.yaml"): + inventory_content = {filename: textwrap.dedent("""\ + --- + all: + hosts: + test1: + test2: + """)} + C.INVENTORY_ENABLED = ['yaml'] + fake_loader = DictDataLoader(inventory_content) + im = InventoryManager(loader=fake_loader, sources=filename) + self.assertTrue(im._inventory.hosts) + self.assertIn('test1', im._inventory.hosts) + self.assertIn('test2', im._inventory.hosts) + self.assertIn(im._inventory.get_host('test1'), im._inventory.groups['all'].hosts) + self.assertIn(im._inventory.get_host('test2'), im._inventory.groups['all'].hosts) + self.assertEqual(len(im._inventory.groups['all'].hosts), 2) + self.assertIn(im._inventory.get_host('test1'), im._inventory.groups['ungrouped'].hosts) + self.assertIn(im._inventory.get_host('test2'), im._inventory.groups['ungrouped'].hosts) + self.assertEqual(len(im._inventory.groups['ungrouped'].hosts), 2) + + def _get_inventory(self, inventory_content): + + fake_loader = DictDataLoader({__file__: inventory_content}) + + return InventoryManager(loader=fake_loader, sources=[__file__]) + + def _test_default_groups(self, inventory_content): + inventory = self._get_inventory(inventory_content) + + self.assertIn('all', inventory.groups) + self.assertIn('ungrouped', inventory.groups) + all_hosts = set(host.name for host in inventory.groups['all'].get_hosts()) + self.assertEqual(set(['host1', 'host2', 'host3', 'host4', 'host5']), all_hosts) + ungrouped_hosts = set(host.name for host in inventory.groups['ungrouped'].get_hosts()) + self.assertEqual(set(['host1', 'host2']), ungrouped_hosts) + servers_hosts = set(host.name for host in inventory.groups['servers'].get_hosts()) + self.assertEqual(set(['host3', 'host4', 'host5']), servers_hosts) diff --git a/test/units/plugins/inventory/test_script.py b/test/units/plugins/inventory/test_script.py new file mode 100644 index 00000000..5f054813 --- /dev/null +++ b/test/units/plugins/inventory/test_script.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- + +# Copyright 2017 Chris Meyers <cmeyers@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.plugins.loader import PluginLoader +from units.compat import mock +from units.compat import unittest +from ansible.module_utils._text import to_bytes, to_native + + +class TestInventoryModule(unittest.TestCase): + + def setUp(self): + + class Inventory(): + cache = dict() + + class PopenResult(): + returncode = 0 + stdout = b"" + stderr = b"" + + def communicate(self): + return (self.stdout, self.stderr) + + self.popen_result = PopenResult() + self.inventory = Inventory() + self.loader = mock.MagicMock() + self.loader.load = mock.MagicMock() + + inv_loader = PluginLoader('InventoryModule', 'ansible.plugins.inventory', C.DEFAULT_INVENTORY_PLUGIN_PATH, 'inventory_plugins') + self.inventory_module = inv_loader.get('script') + self.inventory_module.set_options() + + def register_patch(name): + patcher = mock.patch(name) + self.addCleanup(patcher.stop) + return patcher.start() + + self.popen = register_patch('subprocess.Popen') + self.popen.return_value = self.popen_result + + self.BaseInventoryPlugin = register_patch('ansible.plugins.inventory.BaseInventoryPlugin') + self.BaseInventoryPlugin.get_cache_prefix.return_value = 'abc123' + + def test_parse_subprocess_path_not_found_fail(self): + self.popen.side_effect = OSError("dummy text") + + with pytest.raises(AnsibleError) as e: + self.inventory_module.parse(self.inventory, self.loader, '/foo/bar/foobar.py') + assert e.value.message == "problem running /foo/bar/foobar.py --list (dummy text)" + + def test_parse_subprocess_err_code_fail(self): + self.popen_result.stdout = to_bytes(u"fooébar", errors='surrogate_escape') + self.popen_result.stderr = to_bytes(u"dummyédata") + + self.popen_result.returncode = 1 + + with pytest.raises(AnsibleError) as e: + self.inventory_module.parse(self.inventory, self.loader, '/foo/bar/foobar.py') + assert e.value.message == to_native("Inventory script (/foo/bar/foobar.py) had an execution error: " + "dummyédata\n ") + + def test_parse_utf8_fail(self): + self.popen_result.returncode = 0 + self.popen_result.stderr = to_bytes("dummyédata") + self.loader.load.side_effect = TypeError('obj must be string') + + with pytest.raises(AnsibleError) as e: + self.inventory_module.parse(self.inventory, self.loader, '/foo/bar/foobar.py') + assert e.value.message == to_native("failed to parse executable inventory script results from " + "/foo/bar/foobar.py: obj must be string\ndummyédata\n") + + def test_parse_dict_fail(self): + self.popen_result.returncode = 0 + self.popen_result.stderr = to_bytes("dummyédata") + self.loader.load.return_value = 'i am not a dict' + + with pytest.raises(AnsibleError) as e: + self.inventory_module.parse(self.inventory, self.loader, '/foo/bar/foobar.py') + assert e.value.message == to_native("failed to parse executable inventory script results from " + "/foo/bar/foobar.py: needs to be a json dict\ndummyédata\n") |