diff options
Diffstat (limited to 'test/units/module_utils/basic/test_atomic_move.py')
-rw-r--r-- | test/units/module_utils/basic/test_atomic_move.py | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/test/units/module_utils/basic/test_atomic_move.py b/test/units/module_utils/basic/test_atomic_move.py new file mode 100644 index 0000000..bbdb051 --- /dev/null +++ b/test/units/module_utils/basic/test_atomic_move.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import os +import errno +import json +from itertools import product + +import pytest + +from ansible.module_utils import basic + + +@pytest.fixture +def atomic_am(am, mocker): + am.selinux_enabled = mocker.MagicMock() + am.selinux_context = mocker.MagicMock() + am.selinux_default_context = mocker.MagicMock() + am.set_context_if_different = mocker.MagicMock() + am._unsafe_writes = mocker.MagicMock() + + yield am + + +@pytest.fixture +def atomic_mocks(mocker, monkeypatch): + environ = dict() + mocks = { + 'chmod': mocker.patch('os.chmod'), + 'chown': mocker.patch('os.chown'), + 'close': mocker.patch('os.close'), + 'environ': mocker.patch('os.environ', environ), + 'getlogin': mocker.patch('os.getlogin'), + 'getuid': mocker.patch('os.getuid'), + 'path_exists': mocker.patch('os.path.exists'), + 'rename': mocker.patch('os.rename'), + 'stat': mocker.patch('os.stat'), + 'umask': mocker.patch('os.umask'), + 'getpwuid': mocker.patch('pwd.getpwuid'), + 'copy2': mocker.patch('shutil.copy2'), + 'copyfileobj': mocker.patch('shutil.copyfileobj'), + 'move': mocker.patch('shutil.move'), + 'mkstemp': mocker.patch('tempfile.mkstemp'), + } + + mocks['getlogin'].return_value = 'root' + mocks['getuid'].return_value = 0 + mocks['getpwuid'].return_value = ('root', '', 0, 0, '', '', '') + mocks['umask'].side_effect = [18, 0] + mocks['rename'].return_value = None + + # normalize OS specific features + monkeypatch.delattr(os, 'chflags', raising=False) + + yield mocks + + +@pytest.fixture +def fake_stat(mocker): + stat1 = mocker.MagicMock() + stat1.st_mode = 0o0644 + stat1.st_uid = 0 + stat1.st_gid = 0 + stat1.st_flags = 0 + yield stat1 + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_new_file(atomic_am, atomic_mocks, mocker, selinux): + # test destination does not exist, login name = 'root', no environment, os.rename() succeeds + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_existing_file(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + # Test destination already present + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_no_tty_fallback(atomic_am, atomic_mocks, fake_stat, mocker): + """Raise OSError when using getlogin() to simulate no tty cornercase""" + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + atomic_mocks['getlogin'].side_effect = OSError() + atomic_mocks['environ']['LOGNAME'] = 'root' + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file in order to copy permissions propogates the error (unless EPERM)""" + atomic_mocks['stat'].side_effect = OSError() + atomic_mocks['path_exists'].return_value = True + + with pytest.raises(OSError): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_perms_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file to copy the permissions due to permissions passes fine""" + # and now have os.stat return EPERM, which should not fail + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].side_effect = OSError(errno.EPERM, 'testing os stat with EPERM') + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + # FIXME: Should atomic_move() set a default permission value when it cannot retrieve the + # existing file's permissions? (Right now it's up to the calling code. + # assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_failure(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EIO, causing it to bail out""" + atomic_mocks['path_exists'].side_effect = [False, False] + atomic_mocks['rename'].side_effect = OSError(errno.EIO, 'failing with EIO') + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'Could not replace file' in results['msg'] + assert 'failing with EIO' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_perms_fail_temp_creation_fails(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EPERM working but failure in mkstemp""" + atomic_mocks['path_exists'].return_value = False + atomic_mocks['close'].return_value = None + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['mkstemp'].return_value = None + atomic_mocks['mkstemp'].side_effect = OSError() + atomic_am.selinux_enabled.return_value = False + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'is not writable by the current user' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_rename_perms_fail_temp_succeeds(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + """Test os.rename raising an error but fallback to using mkstemp works""" + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['stat'].side_effect = None + atomic_mocks['mkstemp'].return_value = (None, '/path/to/tempfile') + atomic_mocks['mkstemp'].side_effect = None + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + assert atomic_mocks['rename'].call_args_list == [mocker.call(b'/path/to/src', b'/path/to/dest'), + mocker.call(b'/path/to/tempfile', b'/path/to/dest')] + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call(b'/path/to/tempfile', mock_context, False), + mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called |