diff options
Diffstat (limited to 'test/units/module_utils/facts/test_ansible_collector.py')
-rw-r--r-- | test/units/module_utils/facts/test_ansible_collector.py | 524 |
1 files changed, 524 insertions, 0 deletions
diff --git a/test/units/module_utils/facts/test_ansible_collector.py b/test/units/module_utils/facts/test_ansible_collector.py new file mode 100644 index 0000000..47d88df --- /dev/null +++ b/test/units/module_utils/facts/test_ansible_collector.py @@ -0,0 +1,524 @@ +# -*- coding: utf-8 -*- +# +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +# for testing +from units.compat import unittest +from units.compat.mock import Mock, patch + +from ansible.module_utils.facts import collector +from ansible.module_utils.facts import ansible_collector +from ansible.module_utils.facts import namespace + +from ansible.module_utils.facts.other.facter import FacterFactCollector +from ansible.module_utils.facts.other.ohai import OhaiFactCollector + +from ansible.module_utils.facts.system.apparmor import ApparmorFactCollector +from ansible.module_utils.facts.system.caps import SystemCapabilitiesFactCollector +from ansible.module_utils.facts.system.date_time import DateTimeFactCollector +from ansible.module_utils.facts.system.env import EnvFactCollector +from ansible.module_utils.facts.system.distribution import DistributionFactCollector +from ansible.module_utils.facts.system.dns import DnsFactCollector +from ansible.module_utils.facts.system.fips import FipsFactCollector +from ansible.module_utils.facts.system.local import LocalFactCollector +from ansible.module_utils.facts.system.lsb import LSBFactCollector +from ansible.module_utils.facts.system.pkg_mgr import PkgMgrFactCollector, OpenBSDPkgMgrFactCollector +from ansible.module_utils.facts.system.platform import PlatformFactCollector +from ansible.module_utils.facts.system.python import PythonFactCollector +from ansible.module_utils.facts.system.selinux import SelinuxFactCollector +from ansible.module_utils.facts.system.service_mgr import ServiceMgrFactCollector +from ansible.module_utils.facts.system.user import UserFactCollector + +# from ansible.module_utils.facts.hardware.base import HardwareCollector +from ansible.module_utils.facts.network.base import NetworkCollector +from ansible.module_utils.facts.virtual.base import VirtualCollector + +ALL_COLLECTOR_CLASSES = \ + [PlatformFactCollector, + DistributionFactCollector, + SelinuxFactCollector, + ApparmorFactCollector, + SystemCapabilitiesFactCollector, + FipsFactCollector, + PkgMgrFactCollector, + OpenBSDPkgMgrFactCollector, + ServiceMgrFactCollector, + LSBFactCollector, + DateTimeFactCollector, + UserFactCollector, + LocalFactCollector, + EnvFactCollector, + DnsFactCollector, + PythonFactCollector, + # FIXME: re-enable when hardware doesnt Hardware() doesnt munge self.facts + # HardwareCollector + NetworkCollector, + VirtualCollector, + OhaiFactCollector, + FacterFactCollector] + + +def mock_module(gather_subset=None, + filter=None): + if gather_subset is None: + gather_subset = ['all', '!facter', '!ohai'] + if filter is None: + filter = '*' + mock_module = Mock() + mock_module.params = {'gather_subset': gather_subset, + 'gather_timeout': 5, + 'filter': filter} + mock_module.get_bin_path = Mock(return_value=None) + return mock_module + + +def _collectors(module, + all_collector_classes=None, + minimal_gather_subset=None): + gather_subset = module.params.get('gather_subset') + if all_collector_classes is None: + all_collector_classes = ALL_COLLECTOR_CLASSES + if minimal_gather_subset is None: + minimal_gather_subset = frozenset([]) + + collector_classes = \ + collector.collector_classes_from_gather_subset(all_collector_classes=all_collector_classes, + minimal_gather_subset=minimal_gather_subset, + gather_subset=gather_subset) + + collectors = [] + for collector_class in collector_classes: + collector_obj = collector_class() + collectors.append(collector_obj) + + # Add a collector that knows what gather_subset we used so it it can provide a fact + collector_meta_data_collector = \ + ansible_collector.CollectorMetaDataCollector(gather_subset=gather_subset, + module_setup=True) + collectors.append(collector_meta_data_collector) + + return collectors + + +ns = namespace.PrefixFactNamespace('ansible_facts', 'ansible_') + + +# FIXME: this is brute force, but hopefully enough to get some refactoring to make facts testable +class TestInPlace(unittest.TestCase): + def _mock_module(self, gather_subset=None): + return mock_module(gather_subset=gather_subset) + + def _collectors(self, module, + all_collector_classes=None, + minimal_gather_subset=None): + return _collectors(module=module, + all_collector_classes=all_collector_classes, + minimal_gather_subset=minimal_gather_subset) + + def test(self): + gather_subset = ['all'] + mock_module = self._mock_module(gather_subset=gather_subset) + all_collector_classes = [EnvFactCollector] + collectors = self._collectors(mock_module, + all_collector_classes=all_collector_classes) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=collectors, + namespace=ns) + + res = fact_collector.collect(module=mock_module) + self.assertIsInstance(res, dict) + self.assertIn('env', res) + self.assertIn('gather_subset', res) + self.assertEqual(res['gather_subset'], ['all']) + + def test1(self): + gather_subset = ['all'] + mock_module = self._mock_module(gather_subset=gather_subset) + collectors = self._collectors(mock_module) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=collectors, + namespace=ns) + + res = fact_collector.collect(module=mock_module) + self.assertIsInstance(res, dict) + # just assert it's not almost empty + # with run_command and get_file_content mock, many facts are empty, like network + self.assertGreater(len(res), 20) + + def test_empty_all_collector_classes(self): + mock_module = self._mock_module() + all_collector_classes = [] + + collectors = self._collectors(mock_module, + all_collector_classes=all_collector_classes) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=collectors, + namespace=ns) + + res = fact_collector.collect() + self.assertIsInstance(res, dict) + # just assert it's not almost empty + self.assertLess(len(res), 3) + +# def test_facts_class(self): +# mock_module = self._mock_module() +# Facts(mock_module) + +# def test_facts_class_load_on_init_false(self): +# mock_module = self._mock_module() +# Facts(mock_module, load_on_init=False) +# # FIXME: assert something + + +class TestCollectedFacts(unittest.TestCase): + gather_subset = ['all', '!facter', '!ohai'] + min_fact_count = 30 + max_fact_count = 1000 + + # TODO: add ansible_cmdline, ansible_*_pubkey* back when TempFactCollector goes away + expected_facts = ['date_time', + 'user_id', 'distribution', + 'gather_subset', 'module_setup', + 'env'] + not_expected_facts = ['facter', 'ohai'] + + collected_facts = {} + + def _mock_module(self, gather_subset=None): + return mock_module(gather_subset=self.gather_subset) + + @patch('platform.system', return_value='Linux') + @patch('ansible.module_utils.facts.system.service_mgr.get_file_content', return_value='systemd') + def setUp(self, mock_gfc, mock_ps): + mock_module = self._mock_module() + collectors = self._collectors(mock_module) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=collectors, + namespace=ns) + self.facts = fact_collector.collect(module=mock_module, + collected_facts=self.collected_facts) + + def _collectors(self, module, + all_collector_classes=None, + minimal_gather_subset=None): + return _collectors(module=module, + all_collector_classes=all_collector_classes, + minimal_gather_subset=minimal_gather_subset) + + def test_basics(self): + self._assert_basics(self.facts) + + def test_expected_facts(self): + self._assert_expected_facts(self.facts) + + def test_not_expected_facts(self): + self._assert_not_expected_facts(self.facts) + + def _assert_basics(self, facts): + self.assertIsInstance(facts, dict) + # just assert it's not almost empty + self.assertGreaterEqual(len(facts), self.min_fact_count) + # and that is not huge number of keys + self.assertLess(len(facts), self.max_fact_count) + + # everything starts with ansible_ namespace + def _assert_ansible_namespace(self, facts): + + # FIXME: kluge for non-namespace fact + facts.pop('module_setup', None) + facts.pop('gather_subset', None) + + for fact_key in facts: + self.assertTrue(fact_key.startswith('ansible_'), + 'The fact name "%s" does not startwith "ansible_"' % fact_key) + + def _assert_expected_facts(self, facts): + + facts_keys = sorted(facts.keys()) + for expected_fact in self.expected_facts: + self.assertIn(expected_fact, facts_keys) + + def _assert_not_expected_facts(self, facts): + + facts_keys = sorted(facts.keys()) + for not_expected_fact in self.not_expected_facts: + self.assertNotIn(not_expected_fact, facts_keys) + + +class ProvidesOtherFactCollector(collector.BaseFactCollector): + name = 'provides_something' + _fact_ids = set(['needed_fact']) + + def collect(self, module=None, collected_facts=None): + return {'needed_fact': 'THE_NEEDED_FACT_VALUE'} + + +class RequiresOtherFactCollector(collector.BaseFactCollector): + name = 'requires_something' + + def collect(self, module=None, collected_facts=None): + collected_facts = collected_facts or {} + fact_dict = {} + fact_dict['needed_fact'] = collected_facts['needed_fact'] + fact_dict['compound_fact'] = "compound-%s" % collected_facts['needed_fact'] + return fact_dict + + +class ConCatFactCollector(collector.BaseFactCollector): + name = 'concat_collected' + + def collect(self, module=None, collected_facts=None): + collected_facts = collected_facts or {} + fact_dict = {} + con_cat_list = [] + for key, value in collected_facts.items(): + con_cat_list.append(value) + + fact_dict['concat_fact'] = '-'.join(con_cat_list) + return fact_dict + + +class TestCollectorDepsWithFilter(unittest.TestCase): + gather_subset = ['all', '!facter', '!ohai'] + + def _mock_module(self, gather_subset=None, filter=None): + return mock_module(gather_subset=self.gather_subset, + filter=filter) + + def setUp(self): + self.mock_module = self._mock_module() + self.collectors = self._collectors(mock_module) + + def _collectors(self, module, + all_collector_classes=None, + minimal_gather_subset=None): + return [ProvidesOtherFactCollector(), + RequiresOtherFactCollector()] + + def test_no_filter(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai']) + + facts_dict = self._collect(_mock_module) + + expected = {'needed_fact': 'THE_NEEDED_FACT_VALUE', + 'compound_fact': 'compound-THE_NEEDED_FACT_VALUE'} + + self.assertEqual(expected, facts_dict) + + def test_with_filter_on_compound_fact(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai'], + filter='compound_fact') + + facts_dict = self._collect(_mock_module) + + expected = {'compound_fact': 'compound-THE_NEEDED_FACT_VALUE'} + + self.assertEqual(expected, facts_dict) + + def test_with_filter_on_needed_fact(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai'], + filter='needed_fact') + + facts_dict = self._collect(_mock_module) + + expected = {'needed_fact': 'THE_NEEDED_FACT_VALUE'} + + self.assertEqual(expected, facts_dict) + + def test_with_filter_on_compound_gather_compound(self): + _mock_module = mock_module(gather_subset=['!all', '!any', 'compound_fact'], + filter='compound_fact') + + facts_dict = self._collect(_mock_module) + + expected = {'compound_fact': 'compound-THE_NEEDED_FACT_VALUE'} + + self.assertEqual(expected, facts_dict) + + def test_with_filter_no_match(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai'], + filter='ansible_this_doesnt_exist') + + facts_dict = self._collect(_mock_module) + + expected = {} + self.assertEqual(expected, facts_dict) + + def test_concat_collector(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai']) + + _collectors = self._collectors(_mock_module) + _collectors.append(ConCatFactCollector()) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=_collectors, + namespace=ns, + filter_spec=_mock_module.params['filter']) + + collected_facts = {} + facts_dict = fact_collector.collect(module=_mock_module, + collected_facts=collected_facts) + self.assertIn('concat_fact', facts_dict) + self.assertTrue('THE_NEEDED_FACT_VALUE' in facts_dict['concat_fact']) + + def test_concat_collector_with_filter_on_concat(self): + _mock_module = mock_module(gather_subset=['all', '!facter', '!ohai'], + filter='concat_fact') + + _collectors = self._collectors(_mock_module) + _collectors.append(ConCatFactCollector()) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=_collectors, + namespace=ns, + filter_spec=_mock_module.params['filter']) + + collected_facts = {} + facts_dict = fact_collector.collect(module=_mock_module, + collected_facts=collected_facts) + self.assertIn('concat_fact', facts_dict) + self.assertTrue('THE_NEEDED_FACT_VALUE' in facts_dict['concat_fact']) + self.assertTrue('compound' in facts_dict['concat_fact']) + + def _collect(self, _mock_module, collected_facts=None): + _collectors = self._collectors(_mock_module) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=_collectors, + namespace=ns, + filter_spec=_mock_module.params['filter']) + facts_dict = fact_collector.collect(module=_mock_module, + collected_facts=collected_facts) + return facts_dict + + +class ExceptionThrowingCollector(collector.BaseFactCollector): + def collect(self, module=None, collected_facts=None): + raise Exception('A collector failed') + + +class TestExceptionCollectedFacts(TestCollectedFacts): + + def _collectors(self, module, + all_collector_classes=None, + minimal_gather_subset=None): + collectors = _collectors(module=module, + all_collector_classes=all_collector_classes, + minimal_gather_subset=minimal_gather_subset) + + c = [ExceptionThrowingCollector()] + collectors + return c + + +class TestOnlyExceptionCollector(TestCollectedFacts): + expected_facts = [] + min_fact_count = 0 + + def _collectors(self, module, + all_collector_classes=None, + minimal_gather_subset=None): + return [ExceptionThrowingCollector()] + + +class TestMinimalCollectedFacts(TestCollectedFacts): + gather_subset = ['!all'] + min_fact_count = 1 + max_fact_count = 10 + expected_facts = ['gather_subset', + 'module_setup'] + not_expected_facts = ['lsb'] + + +class TestFacterCollectedFacts(TestCollectedFacts): + gather_subset = ['!all', 'facter'] + min_fact_count = 1 + max_fact_count = 10 + expected_facts = ['gather_subset', + 'module_setup'] + not_expected_facts = ['lsb'] + + +class TestOhaiCollectedFacts(TestCollectedFacts): + gather_subset = ['!all', 'ohai'] + min_fact_count = 1 + max_fact_count = 10 + expected_facts = ['gather_subset', + 'module_setup'] + not_expected_facts = ['lsb'] + + +class TestPkgMgrFacts(TestCollectedFacts): + gather_subset = ['pkg_mgr'] + min_fact_count = 1 + max_fact_count = 20 + expected_facts = ['gather_subset', + 'module_setup', + 'pkg_mgr'] + collected_facts = { + "ansible_distribution": "Fedora", + "ansible_distribution_major_version": "28", + "ansible_os_family": "RedHat" + } + + +class TestPkgMgrOSTreeFacts(TestPkgMgrFacts): + @patch( + 'ansible.module_utils.facts.system.pkg_mgr.os.path.exists', + side_effect=lambda x: x == '/run/ostree-booted') + def _recollect_facts(self, distribution, version, mock_exists): + self.collected_facts['ansible_distribution'] = distribution + self.collected_facts['ansible_distribution_major_version'] = \ + str(version) + # Recollect facts + self.setUp() + self.assertIn('pkg_mgr', self.facts) + self.assertEqual(self.facts['pkg_mgr'], 'atomic_container') + + def test_is_rhel_edge_ostree(self): + self._recollect_facts('RedHat', 8) + + def test_is_fedora_ostree(self): + self._recollect_facts('Fedora', 33) + + +class TestOpenBSDPkgMgrFacts(TestPkgMgrFacts): + def test_is_openbsd_pkg(self): + self.assertIn('pkg_mgr', self.facts) + self.assertEqual(self.facts['pkg_mgr'], 'openbsd_pkg') + + def setUp(self): + self.patcher = patch('platform.system') + mock_platform = self.patcher.start() + mock_platform.return_value = 'OpenBSD' + + mock_module = self._mock_module() + collectors = self._collectors(mock_module) + + fact_collector = \ + ansible_collector.AnsibleFactCollector(collectors=collectors, + namespace=ns) + self.facts = fact_collector.collect(module=mock_module) + + def tearDown(self): + self.patcher.stop() |