diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /test/units/plugins/action | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream/2.14.3.tar.xz ansible-core-upstream/2.14.3.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/units/plugins/action')
-rw-r--r-- | test/units/plugins/action/__init__.py | 0 | ||||
-rw-r--r-- | test/units/plugins/action/test_action.py | 912 | ||||
-rw-r--r-- | test/units/plugins/action/test_gather_facts.py | 98 | ||||
-rw-r--r-- | test/units/plugins/action/test_pause.py | 89 | ||||
-rw-r--r-- | test/units/plugins/action/test_raw.py | 105 |
5 files changed, 1204 insertions, 0 deletions
diff --git a/test/units/plugins/action/__init__.py b/test/units/plugins/action/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/units/plugins/action/__init__.py diff --git a/test/units/plugins/action/test_action.py b/test/units/plugins/action/test_action.py new file mode 100644 index 0000000..f2bbe19 --- /dev/null +++ b/test/units/plugins/action/test_action.py @@ -0,0 +1,912 @@ +# -*- coding: utf-8 -*- +# (c) 2015, Florian Apolloner <florian@apolloner.eu> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import re + +from ansible import constants as C +from units.compat import unittest +from unittest.mock import patch, MagicMock, mock_open + +from ansible.errors import AnsibleError, AnsibleAuthenticationFailure +from ansible.module_utils.six import text_type +from ansible.module_utils.six.moves import shlex_quote, builtins +from ansible.module_utils._text import to_bytes +from ansible.playbook.play_context import PlayContext +from ansible.plugins.action import ActionBase +from ansible.template import Templar +from ansible.vars.clean import clean_facts + +from units.mock.loader import DictDataLoader + + +python_module_replacers = br""" +#!/usr/bin/python + +#ANSIBLE_VERSION = "<<ANSIBLE_VERSION>>" +#MODULE_COMPLEX_ARGS = "<<INCLUDE_ANSIBLE_MODULE_COMPLEX_ARGS>>" +#SELINUX_SPECIAL_FS="<<SELINUX_SPECIAL_FILESYSTEMS>>" + +test = u'Toshio \u304f\u3089\u3068\u307f' +from ansible.module_utils.basic import * +""" + +powershell_module_replacers = b""" +WINDOWS_ARGS = "<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>" +# POWERSHELL_COMMON +""" + + +def _action_base(): + fake_loader = DictDataLoader({ + }) + mock_module_loader = MagicMock() + mock_shared_loader_obj = MagicMock() + mock_shared_loader_obj.module_loader = mock_module_loader + mock_connection_loader = MagicMock() + + mock_shared_loader_obj.connection_loader = mock_connection_loader + mock_connection = MagicMock() + + play_context = MagicMock() + + action_base = DerivedActionBase(task=None, + connection=mock_connection, + play_context=play_context, + loader=fake_loader, + templar=None, + shared_loader_obj=mock_shared_loader_obj) + return action_base + + +class DerivedActionBase(ActionBase): + TRANSFERS_FILES = False + + def run(self, tmp=None, task_vars=None): + # We're not testing the plugin run() method, just the helper + # methods ActionBase defines + return super(DerivedActionBase, self).run(tmp=tmp, task_vars=task_vars) + + +class TestActionBase(unittest.TestCase): + + def test_action_base_run(self): + mock_task = MagicMock() + mock_task.action = "foo" + mock_task.args = dict(a=1, b=2, c=3) + + mock_connection = MagicMock() + + play_context = PlayContext() + + mock_task.async_val = None + action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None) + results = action_base.run() + self.assertEqual(results, dict()) + + mock_task.async_val = 0 + action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None) + results = action_base.run() + self.assertEqual(results, {}) + + def test_action_base__configure_module(self): + fake_loader = DictDataLoader({ + }) + + # create our fake task + mock_task = MagicMock() + mock_task.action = "copy" + mock_task.async_val = 0 + mock_task.delegate_to = None + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + + # create a mock shared loader object + def mock_find_plugin_with_context(name, options, collection_list=None): + mockctx = MagicMock() + if name == 'badmodule': + mockctx.resolved = False + mockctx.plugin_resolved_path = None + elif '.ps1' in options: + mockctx.resolved = True + mockctx.plugin_resolved_path = '/fake/path/to/%s.ps1' % name + else: + mockctx.resolved = True + mockctx.plugin_resolved_path = '/fake/path/to/%s' % name + return mockctx + + mock_module_loader = MagicMock() + mock_module_loader.find_plugin_with_context.side_effect = mock_find_plugin_with_context + mock_shared_obj_loader = MagicMock() + mock_shared_obj_loader.module_loader = mock_module_loader + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=fake_loader, + templar=Templar(loader=fake_loader), + shared_loader_obj=mock_shared_obj_loader, + ) + + # test python module formatting + with patch.object(builtins, 'open', mock_open(read_data=to_bytes(python_module_replacers.strip(), encoding='utf-8'))): + with patch.object(os, 'rename'): + mock_task.args = dict(a=1, foo='fö〩') + mock_connection.module_implementation_preferences = ('',) + (style, shebang, data, path) = action_base._configure_module(mock_task.action, mock_task.args, + task_vars=dict(ansible_python_interpreter='/usr/bin/python', + ansible_playbook_python='/usr/bin/python')) + self.assertEqual(style, "new") + self.assertEqual(shebang, u"#!/usr/bin/python") + + # test module not found + self.assertRaises(AnsibleError, action_base._configure_module, 'badmodule', mock_task.args, {}) + + # test powershell module formatting + with patch.object(builtins, 'open', mock_open(read_data=to_bytes(powershell_module_replacers.strip(), encoding='utf-8'))): + mock_task.action = 'win_copy' + mock_task.args = dict(b=2) + mock_connection.module_implementation_preferences = ('.ps1',) + (style, shebang, data, path) = action_base._configure_module('stat', mock_task.args, {}) + self.assertEqual(style, "new") + self.assertEqual(shebang, u'#!powershell') + + # test module not found + self.assertRaises(AnsibleError, action_base._configure_module, 'badmodule', mock_task.args, {}) + + def test_action_base__compute_environment_string(self): + fake_loader = DictDataLoader({ + }) + + # create our fake task + mock_task = MagicMock() + mock_task.action = "copy" + mock_task.args = dict(a=1) + + # create a mock connection, so we don't actually try and connect to things + def env_prefix(**args): + return ' '.join(['%s=%s' % (k, shlex_quote(text_type(v))) for k, v in args.items()]) + mock_connection = MagicMock() + mock_connection._shell.env_prefix.side_effect = env_prefix + + # we're using a real play context here + play_context = PlayContext() + + # and we're using a real templar here too + templar = Templar(loader=fake_loader) + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=fake_loader, + templar=templar, + shared_loader_obj=None, + ) + + # test standard environment setup + mock_task.environment = [dict(FOO='foo'), None] + env_string = action_base._compute_environment_string() + self.assertEqual(env_string, "FOO=foo") + + # test where environment is not a list + mock_task.environment = dict(FOO='foo') + env_string = action_base._compute_environment_string() + self.assertEqual(env_string, "FOO=foo") + + # test environment with a variable in it + templar.available_variables = dict(the_var='bar') + mock_task.environment = [dict(FOO='{{the_var}}')] + env_string = action_base._compute_environment_string() + self.assertEqual(env_string, "FOO=bar") + + # test with a bad environment set + mock_task.environment = dict(FOO='foo') + mock_task.environment = ['hi there'] + self.assertRaises(AnsibleError, action_base._compute_environment_string) + + def test_action_base__early_needs_tmp_path(self): + # create our fake task + mock_task = MagicMock() + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + self.assertFalse(action_base._early_needs_tmp_path()) + + action_base.TRANSFERS_FILES = True + self.assertTrue(action_base._early_needs_tmp_path()) + + def test_action_base__make_tmp_path(self): + # create our fake task + mock_task = MagicMock() + + def get_shell_opt(opt): + + ret = None + if opt == 'admin_users': + ret = ['root', 'toor', 'Administrator'] + elif opt == 'remote_tmp': + ret = '~/.ansible/tmp' + + return ret + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + mock_connection.transport = 'ssh' + mock_connection._shell.mkdtemp.return_value = 'mkdir command' + mock_connection._shell.join_path.side_effect = os.path.join + mock_connection._shell.get_option = get_shell_opt + mock_connection._shell.HOMES_RE = re.compile(r'(\'|\")?(~|\$HOME)(.*)') + + # we're using a real play context here + play_context = PlayContext() + play_context.become = True + play_context.become_user = 'foo' + + mock_task.become = True + mock_task.become_user = True + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + action_base._low_level_execute_command = MagicMock() + action_base._low_level_execute_command.return_value = dict(rc=0, stdout='/some/path') + self.assertEqual(action_base._make_tmp_path('root'), '/some/path/') + + # empty path fails + action_base._low_level_execute_command.return_value = dict(rc=0, stdout='') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + + # authentication failure + action_base._low_level_execute_command.return_value = dict(rc=5, stdout='') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + + # ssh error + action_base._low_level_execute_command.return_value = dict(rc=255, stdout='', stderr='') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + + # general error + action_base._low_level_execute_command.return_value = dict(rc=1, stdout='some stuff here', stderr='') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + action_base._low_level_execute_command.return_value = dict(rc=1, stdout='some stuff here', stderr='No space left on device') + self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root') + + def test_action_base__fixup_perms2(self): + mock_task = MagicMock() + mock_connection = MagicMock() + play_context = PlayContext() + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + action_base._low_level_execute_command = MagicMock() + remote_paths = ['/tmp/foo/bar.txt', '/tmp/baz.txt'] + remote_user = 'remoteuser1' + + # Used for skipping down to common group dir. + CHMOD_ACL_FLAGS = ('+a', 'A+user:remoteuser2:r:allow') + + def runWithNoExpectation(execute=False): + return action_base._fixup_perms2( + remote_paths, + remote_user=remote_user, + execute=execute) + + def assertSuccess(execute=False): + self.assertEqual(runWithNoExpectation(execute), remote_paths) + + def assertThrowRegex(regex, execute=False): + self.assertRaisesRegex( + AnsibleError, + regex, + action_base._fixup_perms2, + remote_paths, + remote_user=remote_user, + execute=execute) + + def get_shell_option_for_arg(args_kv, default): + '''A helper for get_shell_option. Returns a function that, if + called with ``option`` that exists in args_kv, will return the + value, else will return ``default`` for every other given arg''' + def _helper(option, *args, **kwargs): + return args_kv.get(option, default) + return _helper + + action_base.get_become_option = MagicMock() + action_base.get_become_option.return_value = 'remoteuser2' + + # Step 1: On windows, we just return remote_paths + action_base._connection._shell._IS_WINDOWS = True + assertSuccess(execute=False) + assertSuccess(execute=True) + + # But if we're not on windows....we have more work to do. + action_base._connection._shell._IS_WINDOWS = False + + # Step 2: We're /not/ becoming an unprivileged user + action_base._remote_chmod = MagicMock() + action_base._is_become_unprivileged = MagicMock() + action_base._is_become_unprivileged.return_value = False + # Two subcases: + # - _remote_chmod rc is 0 + # - _remote-chmod rc is not 0, something failed + action_base._remote_chmod.return_value = { + 'rc': 0, + 'stdout': 'some stuff here', + 'stderr': '', + } + assertSuccess(execute=True) + + # When execute=False, we just get the list back. But add it here for + # completion. chmod is never called. + assertSuccess() + + action_base._remote_chmod.return_value = { + 'rc': 1, + 'stdout': 'some stuff here', + 'stderr': 'and here', + } + assertThrowRegex( + 'Failed to set execute bit on remote files', + execute=True) + + # Step 3: we are becoming unprivileged + action_base._is_become_unprivileged.return_value = True + + # Step 3a: setfacl + action_base._remote_set_user_facl = MagicMock() + action_base._remote_set_user_facl.return_value = { + 'rc': 0, + 'stdout': '', + 'stderr': '', + } + assertSuccess() + + # Step 3b: chmod +x if we need to + # To get here, setfacl failed, so mock it as such. + action_base._remote_set_user_facl.return_value = { + 'rc': 1, + 'stdout': '', + 'stderr': '', + } + action_base._remote_chmod.return_value = { + 'rc': 1, + 'stdout': 'some stuff here', + 'stderr': '', + } + assertThrowRegex( + 'Failed to set file mode or acl on remote temporary files', + execute=True) + action_base._remote_chmod.return_value = { + 'rc': 0, + 'stdout': 'some stuff here', + 'stderr': '', + } + assertSuccess(execute=True) + + # Step 3c: chown + action_base._remote_chown = MagicMock() + action_base._remote_chown.return_value = { + 'rc': 0, + 'stdout': '', + 'stderr': '', + } + assertSuccess() + action_base._remote_chown.return_value = { + 'rc': 1, + 'stdout': '', + 'stderr': '', + } + remote_user = 'root' + action_base._get_admin_users = MagicMock() + action_base._get_admin_users.return_value = ['root'] + assertThrowRegex('user would be unable to read the file.') + remote_user = 'remoteuser1' + + # Step 3d: chmod +a on osx + assertSuccess() + action_base._remote_chmod.assert_called_with( + ['remoteuser2 allow read'] + remote_paths, + '+a') + + # This case can cause Solaris chmod to return 5 which the ssh plugin + # treats as failure. To prevent a regression and ensure we still try the + # rest of the cases below, we mock the thrown exception here. + # This function ensures that only the macOS case (+a) throws this. + def raise_if_plus_a(definitely_not_underscore, mode): + if mode == '+a': + raise AnsibleAuthenticationFailure() + return {'rc': 0, 'stdout': '', 'stderr': ''} + + action_base._remote_chmod.side_effect = raise_if_plus_a + assertSuccess() + + # Step 3e: chmod A+ on Solaris + # We threw AnsibleAuthenticationFailure above, try Solaris fallback. + # Based on our lambda above, it should be successful. + action_base._remote_chmod.assert_called_with( + remote_paths, + 'A+user:remoteuser2:r:allow') + assertSuccess() + + # Step 3f: Common group + def rc_1_if_chmod_acl(definitely_not_underscore, mode): + rc = 0 + if mode in CHMOD_ACL_FLAGS: + rc = 1 + return {'rc': rc, 'stdout': '', 'stderr': ''} + + action_base._remote_chmod = MagicMock() + action_base._remote_chmod.side_effect = rc_1_if_chmod_acl + + get_shell_option = action_base.get_shell_option + action_base.get_shell_option = MagicMock() + action_base.get_shell_option.side_effect = get_shell_option_for_arg( + { + 'common_remote_group': 'commongroup', + }, + None) + action_base._remote_chgrp = MagicMock() + action_base._remote_chgrp.return_value = { + 'rc': 0, + 'stdout': '', + 'stderr': '', + } + # TODO: Add test to assert warning is shown if + # world_readable_temp is set in this case. + assertSuccess() + action_base._remote_chgrp.assert_called_once_with( + remote_paths, + 'commongroup') + + # Step 4: world-readable tmpdir + action_base.get_shell_option.side_effect = get_shell_option_for_arg( + { + 'world_readable_temp': True, + 'common_remote_group': None, + }, + None) + action_base._remote_chmod.return_value = { + 'rc': 0, + 'stdout': 'some stuff here', + 'stderr': '', + } + assertSuccess() + action_base._remote_chmod = MagicMock() + action_base._remote_chmod.return_value = { + 'rc': 1, + 'stdout': 'some stuff here', + 'stderr': '', + } + assertThrowRegex('Failed to set file mode on remote files') + + # Otherwise if we make it here in this state, we hit the catch-all + action_base.get_shell_option.side_effect = get_shell_option_for_arg( + {}, + None) + assertThrowRegex('on the temporary files Ansible needs to create') + + def test_action_base__remove_tmp_path(self): + # create our fake task + mock_task = MagicMock() + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + mock_connection._shell.remove.return_value = 'rm some stuff' + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + action_base._low_level_execute_command = MagicMock() + # these don't really return anything or raise errors, so + # we're pretty much calling these for coverage right now + action_base._remove_tmp_path('/bad/path/dont/remove') + action_base._remove_tmp_path('/good/path/to/ansible-tmp-thing') + + @patch('os.unlink') + @patch('os.fdopen') + @patch('tempfile.mkstemp') + def test_action_base__transfer_data(self, mock_mkstemp, mock_fdopen, mock_unlink): + # create our fake task + mock_task = MagicMock() + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + mock_connection.put_file.return_value = None + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + mock_afd = MagicMock() + mock_afile = MagicMock() + mock_mkstemp.return_value = (mock_afd, mock_afile) + + mock_unlink.return_value = None + + mock_afo = MagicMock() + mock_afo.write.return_value = None + mock_afo.flush.return_value = None + mock_afo.close.return_value = None + mock_fdopen.return_value = mock_afo + + self.assertEqual(action_base._transfer_data('/path/to/remote/file', 'some data'), '/path/to/remote/file') + self.assertEqual(action_base._transfer_data('/path/to/remote/file', 'some mixed data: fö〩'), '/path/to/remote/file') + self.assertEqual(action_base._transfer_data('/path/to/remote/file', dict(some_key='some value')), '/path/to/remote/file') + self.assertEqual(action_base._transfer_data('/path/to/remote/file', dict(some_key='fö〩')), '/path/to/remote/file') + + mock_afo.write.side_effect = Exception() + self.assertRaises(AnsibleError, action_base._transfer_data, '/path/to/remote/file', '') + + def test_action_base__execute_remote_stat(self): + # create our fake task + mock_task = MagicMock() + + # create a mock connection, so we don't actually try and connect to things + mock_connection = MagicMock() + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + action_base._execute_module = MagicMock() + + # test normal case + action_base._execute_module.return_value = dict(stat=dict(checksum='1111111111111111111111111111111111', exists=True)) + res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False) + self.assertEqual(res['checksum'], '1111111111111111111111111111111111') + + # test does not exist + action_base._execute_module.return_value = dict(stat=dict(exists=False)) + res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False) + self.assertFalse(res['exists']) + self.assertEqual(res['checksum'], '1') + + # test no checksum in result from _execute_module + action_base._execute_module.return_value = dict(stat=dict(exists=True)) + res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False) + self.assertTrue(res['exists']) + self.assertEqual(res['checksum'], '') + + # test stat call failed + action_base._execute_module.return_value = dict(failed=True, msg="because I said so") + self.assertRaises(AnsibleError, action_base._execute_remote_stat, path='/path/to/file', all_vars=dict(), follow=False) + + def test_action_base__execute_module(self): + # create our fake task + mock_task = MagicMock() + mock_task.action = 'copy' + mock_task.args = dict(a=1, b=2, c=3) + mock_task.diff = False + mock_task.check_mode = False + mock_task.no_log = False + + # create a mock connection, so we don't actually try and connect to things + def build_module_command(env_string, shebang, cmd, arg_path=None): + to_run = [env_string, cmd] + if arg_path: + to_run.append(arg_path) + return " ".join(to_run) + + def get_option(option): + return {'admin_users': ['root', 'toor']}.get(option) + + mock_connection = MagicMock() + mock_connection.build_module_command.side_effect = build_module_command + mock_connection.socket_path = None + mock_connection._shell.get_remote_filename.return_value = 'copy.py' + mock_connection._shell.join_path.side_effect = os.path.join + mock_connection._shell.tmpdir = '/var/tmp/mytempdir' + mock_connection._shell.get_option = get_option + + # we're using a real play context here + play_context = PlayContext() + + # our test class + action_base = DerivedActionBase( + task=mock_task, + connection=mock_connection, + play_context=play_context, + loader=None, + templar=None, + shared_loader_obj=None, + ) + + # fake a lot of methods as we test those elsewhere + action_base._configure_module = MagicMock() + action_base._supports_check_mode = MagicMock() + action_base._is_pipelining_enabled = MagicMock() + action_base._make_tmp_path = MagicMock() + action_base._transfer_data = MagicMock() + action_base._compute_environment_string = MagicMock() + action_base._low_level_execute_command = MagicMock() + action_base._fixup_perms2 = MagicMock() + + action_base._configure_module.return_value = ('new', '#!/usr/bin/python', 'this is the module data', 'path') + action_base._is_pipelining_enabled.return_value = False + action_base._compute_environment_string.return_value = '' + action_base._connection.has_pipelining = False + action_base._make_tmp_path.return_value = '/the/tmp/path' + action_base._low_level_execute_command.return_value = dict(stdout='{"rc": 0, "stdout": "ok"}') + self.assertEqual(action_base._execute_module(module_name=None, module_args=None), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok'])) + self.assertEqual( + action_base._execute_module( + module_name='foo', + module_args=dict(z=9, y=8, x=7), + task_vars=dict(a=1) + ), + dict( + _ansible_parsed=True, + rc=0, + stdout="ok", + stdout_lines=['ok'], + ) + ) + + # test with needing/removing a remote tmp path + action_base._configure_module.return_value = ('old', '#!/usr/bin/python', 'this is the module data', 'path') + action_base._is_pipelining_enabled.return_value = False + action_base._make_tmp_path.return_value = '/the/tmp/path' + self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok'])) + + action_base._configure_module.return_value = ('non_native_want_json', '#!/usr/bin/python', 'this is the module data', 'path') + self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok'])) + + play_context.become = True + play_context.become_user = 'foo' + mock_task.become = True + mock_task.become_user = True + self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok'])) + + # test an invalid shebang return + action_base._configure_module.return_value = ('new', '', 'this is the module data', 'path') + action_base._is_pipelining_enabled.return_value = False + action_base._make_tmp_path.return_value = '/the/tmp/path' + self.assertRaises(AnsibleError, action_base._execute_module) + + # test with check mode enabled, once with support for check + # mode and once with support disabled to raise an error + play_context.check_mode = True + mock_task.check_mode = True + action_base._configure_module.return_value = ('new', '#!/usr/bin/python', 'this is the module data', 'path') + self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok'])) + action_base._supports_check_mode = False + self.assertRaises(AnsibleError, action_base._execute_module) + + def test_action_base_sudo_only_if_user_differs(self): + fake_loader = MagicMock() + fake_loader.get_basedir.return_value = os.getcwd() + play_context = PlayContext() + + action_base = DerivedActionBase(None, None, play_context, fake_loader, None, None) + action_base.get_become_option = MagicMock(return_value='root') + action_base._get_remote_user = MagicMock(return_value='root') + + action_base._connection = MagicMock(exec_command=MagicMock(return_value=(0, '', ''))) + + action_base._connection._shell = shell = MagicMock(append_command=MagicMock(return_value=('JOINED CMD'))) + + action_base._connection.become = become = MagicMock() + become.build_become_command.return_value = 'foo' + + action_base._low_level_execute_command('ECHO', sudoable=True) + become.build_become_command.assert_not_called() + + action_base._get_remote_user.return_value = 'apo' + action_base._low_level_execute_command('ECHO', sudoable=True, executable='/bin/csh') + become.build_become_command.assert_called_once_with("ECHO", shell) + + become.build_become_command.reset_mock() + + with patch.object(C, 'BECOME_ALLOW_SAME_USER', new=True): + action_base._get_remote_user.return_value = 'root' + action_base._low_level_execute_command('ECHO SAME', sudoable=True) + become.build_become_command.assert_called_once_with("ECHO SAME", shell) + + def test__remote_expand_user_relative_pathing(self): + action_base = _action_base() + action_base._play_context.remote_addr = 'bar' + action_base._connection.get_option.return_value = 'bar' + action_base._low_level_execute_command = MagicMock(return_value={'stdout': b'../home/user'}) + action_base._connection._shell.join_path.return_value = '../home/user/foo' + with self.assertRaises(AnsibleError) as cm: + action_base._remote_expand_user('~/foo') + self.assertEqual( + cm.exception.message, + "'bar' returned an invalid relative home directory path containing '..'" + ) + + +class TestActionBaseCleanReturnedData(unittest.TestCase): + def test(self): + + fake_loader = DictDataLoader({ + }) + mock_module_loader = MagicMock() + mock_shared_loader_obj = MagicMock() + mock_shared_loader_obj.module_loader = mock_module_loader + connection_loader_paths = ['/tmp/asdfadf', '/usr/lib64/whatever', + 'dfadfasf', + 'foo.py', + '.*', + # FIXME: a path with parans breaks the regex + # '(.*)', + '/path/to/ansible/lib/ansible/plugins/connection/custom_connection.py', + '/path/to/ansible/lib/ansible/plugins/connection/ssh.py'] + + def fake_all(path_only=None): + for path in connection_loader_paths: + yield path + + mock_connection_loader = MagicMock() + mock_connection_loader.all = fake_all + + mock_shared_loader_obj.connection_loader = mock_connection_loader + mock_connection = MagicMock() + # mock_connection._shell.env_prefix.side_effect = env_prefix + + # action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None) + action_base = DerivedActionBase(task=None, + connection=mock_connection, + play_context=None, + loader=fake_loader, + templar=None, + shared_loader_obj=mock_shared_loader_obj) + data = {'ansible_playbook_python': '/usr/bin/python', + # 'ansible_rsync_path': '/usr/bin/rsync', + 'ansible_python_interpreter': '/usr/bin/python', + 'ansible_ssh_some_var': 'whatever', + 'ansible_ssh_host_key_somehost': 'some key here', + 'some_other_var': 'foo bar'} + data = clean_facts(data) + self.assertNotIn('ansible_playbook_python', data) + self.assertNotIn('ansible_python_interpreter', data) + self.assertIn('ansible_ssh_host_key_somehost', data) + self.assertIn('some_other_var', data) + + +class TestActionBaseParseReturnedData(unittest.TestCase): + + def test_fail_no_json(self): + action_base = _action_base() + rc = 0 + stdout = 'foo\nbar\n' + err = 'oopsy' + returned_data = {'rc': rc, + 'stdout': stdout, + 'stdout_lines': stdout.splitlines(), + 'stderr': err} + res = action_base._parse_returned_data(returned_data) + self.assertFalse(res['_ansible_parsed']) + self.assertTrue(res['failed']) + self.assertEqual(res['module_stderr'], err) + + def test_json_empty(self): + action_base = _action_base() + rc = 0 + stdout = '{}\n' + err = '' + returned_data = {'rc': rc, + 'stdout': stdout, + 'stdout_lines': stdout.splitlines(), + 'stderr': err} + res = action_base._parse_returned_data(returned_data) + del res['_ansible_parsed'] # we always have _ansible_parsed + self.assertEqual(len(res), 0) + self.assertFalse(res) + + def test_json_facts(self): + action_base = _action_base() + rc = 0 + stdout = '{"ansible_facts": {"foo": "bar", "ansible_blip": "blip_value"}}\n' + err = '' + + returned_data = {'rc': rc, + 'stdout': stdout, + 'stdout_lines': stdout.splitlines(), + 'stderr': err} + res = action_base._parse_returned_data(returned_data) + self.assertTrue(res['ansible_facts']) + self.assertIn('ansible_blip', res['ansible_facts']) + # TODO: Should this be an AnsibleUnsafe? + # self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe) + + def test_json_facts_add_host(self): + action_base = _action_base() + rc = 0 + stdout = '''{"ansible_facts": {"foo": "bar", "ansible_blip": "blip_value"}, + "add_host": {"host_vars": {"some_key": ["whatever the add_host object is"]} + } + }\n''' + err = '' + + returned_data = {'rc': rc, + 'stdout': stdout, + 'stdout_lines': stdout.splitlines(), + 'stderr': err} + res = action_base._parse_returned_data(returned_data) + self.assertTrue(res['ansible_facts']) + self.assertIn('ansible_blip', res['ansible_facts']) + self.assertIn('add_host', res) + # TODO: Should this be an AnsibleUnsafe? + # self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe) diff --git a/test/units/plugins/action/test_gather_facts.py b/test/units/plugins/action/test_gather_facts.py new file mode 100644 index 0000000..20225aa --- /dev/null +++ b/test/units/plugins/action/test_gather_facts.py @@ -0,0 +1,98 @@ +# (c) 2016, Saran Ahluwalia <ahlusar.ahluwalia@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from unittest.mock import MagicMock, patch + +from ansible import constants as C +from ansible.playbook.task import Task +from ansible.plugins.action.gather_facts import ActionModule as GatherFactsAction +from ansible.template import Templar +from ansible.executor import module_common + +from units.mock.loader import DictDataLoader + + +class TestNetworkFacts(unittest.TestCase): + task = MagicMock(Task) + play_context = MagicMock() + play_context.check_mode = False + connection = MagicMock() + fake_loader = DictDataLoader({ + }) + templar = Templar(loader=fake_loader) + + def setUp(self): + pass + + def tearDown(self): + pass + + @patch.object(module_common, '_get_collection_metadata', return_value={}) + def test_network_gather_facts_smart_facts_module(self, mock_collection_metadata): + self.fqcn_task_vars = {'ansible_network_os': 'ios'} + self.task.action = 'gather_facts' + self.task.async_val = False + self.task.args = {} + + plugin = GatherFactsAction(self.task, self.connection, self.play_context, loader=None, templar=self.templar, shared_loader_obj=None) + get_module_args = MagicMock() + plugin._get_module_args = get_module_args + plugin._execute_module = MagicMock() + + res = plugin.run(task_vars=self.fqcn_task_vars) + + # assert the gather_facts config is 'smart' + facts_modules = C.config.get_config_value('FACTS_MODULES', variables=self.fqcn_task_vars) + self.assertEqual(facts_modules, ['smart']) + + # assert the correct module was found + self.assertEqual(get_module_args.call_count, 1) + + self.assertEqual( + get_module_args.call_args.args, + ('ansible.legacy.ios_facts', {'ansible_network_os': 'ios'},) + ) + + @patch.object(module_common, '_get_collection_metadata', return_value={}) + def test_network_gather_facts_smart_facts_module_fqcn(self, mock_collection_metadata): + self.fqcn_task_vars = {'ansible_network_os': 'cisco.ios.ios'} + self.task.action = 'gather_facts' + self.task.async_val = False + self.task.args = {} + + plugin = GatherFactsAction(self.task, self.connection, self.play_context, loader=None, templar=self.templar, shared_loader_obj=None) + get_module_args = MagicMock() + plugin._get_module_args = get_module_args + plugin._execute_module = MagicMock() + + res = plugin.run(task_vars=self.fqcn_task_vars) + + # assert the gather_facts config is 'smart' + facts_modules = C.config.get_config_value('FACTS_MODULES', variables=self.fqcn_task_vars) + self.assertEqual(facts_modules, ['smart']) + + # assert the correct module was found + self.assertEqual(get_module_args.call_count, 1) + + self.assertEqual( + get_module_args.call_args.args, + ('cisco.ios.ios_facts', {'ansible_network_os': 'cisco.ios.ios'},) + ) diff --git a/test/units/plugins/action/test_pause.py b/test/units/plugins/action/test_pause.py new file mode 100644 index 0000000..8ad6db7 --- /dev/null +++ b/test/units/plugins/action/test_pause.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2021 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import curses +import importlib +import io +import pytest +import sys + +from ansible.plugins.action import pause # noqa: F401 +from ansible.module_utils.six import PY2 + +builtin_import = 'builtins.__import__' +if PY2: + builtin_import = '__builtin__.__import__' + + +def test_pause_curses_tigetstr_none(mocker, monkeypatch): + monkeypatch.delitem(sys.modules, 'ansible.plugins.action.pause') + + dunder_import = __import__ + + def _import(*args, **kwargs): + if args[0] == 'curses': + mock_curses = mocker.Mock() + mock_curses.setupterm = mocker.Mock(return_value=True) + mock_curses.tigetstr = mocker.Mock(return_value=None) + return mock_curses + else: + return dunder_import(*args, **kwargs) + + mocker.patch(builtin_import, _import) + + mod = importlib.import_module('ansible.plugins.action.pause') + + assert mod.HAS_CURSES is True + assert mod.MOVE_TO_BOL == b'\r' + assert mod.CLEAR_TO_EOL == b'\x1b[K' + + +def test_pause_missing_curses(mocker, monkeypatch): + monkeypatch.delitem(sys.modules, 'ansible.plugins.action.pause') + + dunder_import = __import__ + + def _import(*args, **kwargs): + if args[0] == 'curses': + raise ImportError + else: + return dunder_import(*args, **kwargs) + + mocker.patch(builtin_import, _import) + + mod = importlib.import_module('ansible.plugins.action.pause') + + with pytest.raises(AttributeError): + mod.curses + + assert mod.HAS_CURSES is False + assert mod.MOVE_TO_BOL == b'\r' + assert mod.CLEAR_TO_EOL == b'\x1b[K' + + +@pytest.mark.parametrize('exc', (curses.error, TypeError, io.UnsupportedOperation)) +def test_pause_curses_setupterm_error(mocker, monkeypatch, exc): + monkeypatch.delitem(sys.modules, 'ansible.plugins.action.pause') + + dunder_import = __import__ + + def _import(*args, **kwargs): + if args[0] == 'curses': + mock_curses = mocker.Mock() + mock_curses.setupterm = mocker.Mock(side_effect=exc) + mock_curses.error = curses.error + return mock_curses + else: + return dunder_import(*args, **kwargs) + + mocker.patch(builtin_import, _import) + + mod = importlib.import_module('ansible.plugins.action.pause') + + assert mod.HAS_CURSES is False + assert mod.MOVE_TO_BOL == b'\r' + assert mod.CLEAR_TO_EOL == b'\x1b[K' diff --git a/test/units/plugins/action/test_raw.py b/test/units/plugins/action/test_raw.py new file mode 100644 index 0000000..3348051 --- /dev/null +++ b/test/units/plugins/action/test_raw.py @@ -0,0 +1,105 @@ +# (c) 2016, Saran Ahluwalia <ahlusar.ahluwalia@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from ansible.errors import AnsibleActionFail +from units.compat import unittest +from unittest.mock import MagicMock, Mock +from ansible.plugins.action.raw import ActionModule +from ansible.playbook.task import Task +from ansible.plugins.loader import connection_loader + + +class TestCopyResultExclude(unittest.TestCase): + + def setUp(self): + self.play_context = Mock() + self.play_context.shell = 'sh' + self.connection = connection_loader.get('local', self.play_context, os.devnull) + + def tearDown(self): + pass + + # The current behavior of the raw aciton in regards to executable is currently in question; + # the test_raw_executable_is_not_empty_string verifies the current behavior (whether it is desireed or not. + # Please refer to the following for context: + # Issue: https://github.com/ansible/ansible/issues/16054 + # PR: https://github.com/ansible/ansible/pull/16085 + + def test_raw_executable_is_not_empty_string(self): + + task = MagicMock(Task) + task.async_val = False + + task.args = {'_raw_params': 'Args1'} + self.play_context.check_mode = False + + self.mock_am = ActionModule(task, self.connection, self.play_context, loader=None, templar=None, shared_loader_obj=None) + self.mock_am._low_level_execute_command = Mock(return_value={}) + self.mock_am.display = Mock() + self.mock_am._admin_users = ['root', 'toor'] + + self.mock_am.run() + self.mock_am._low_level_execute_command.assert_called_with('Args1', executable=False) + + def test_raw_check_mode_is_True(self): + + task = MagicMock(Task) + task.async_val = False + + task.args = {'_raw_params': 'Args1'} + self.play_context.check_mode = True + + try: + self.mock_am = ActionModule(task, self.connection, self.play_context, loader=None, templar=None, shared_loader_obj=None) + except AnsibleActionFail: + pass + + def test_raw_test_environment_is_None(self): + + task = MagicMock(Task) + task.async_val = False + + task.args = {'_raw_params': 'Args1'} + task.environment = None + self.play_context.check_mode = False + + self.mock_am = ActionModule(task, self.connection, self.play_context, loader=None, templar=None, shared_loader_obj=None) + self.mock_am._low_level_execute_command = Mock(return_value={}) + self.mock_am.display = Mock() + + self.assertEqual(task.environment, None) + + def test_raw_task_vars_is_not_None(self): + + task = MagicMock(Task) + task.async_val = False + + task.args = {'_raw_params': 'Args1'} + task.environment = None + self.play_context.check_mode = False + + self.mock_am = ActionModule(task, self.connection, self.play_context, loader=None, templar=None, shared_loader_obj=None) + self.mock_am._low_level_execute_command = Mock(return_value={}) + self.mock_am.display = Mock() + + self.mock_am.run(task_vars={'a': 'b'}) + self.assertEqual(task.environment, None) |