diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-14 20:03:01 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-14 20:03:01 +0000 |
commit | a453ac31f3428614cceb99027f8efbdb9258a40b (patch) | |
tree | f61f87408f32a8511cbd91799f9cececb53e0374 /test/units/module_utils/basic | |
parent | Initial commit. (diff) | |
download | ansible-a453ac31f3428614cceb99027f8efbdb9258a40b.tar.xz ansible-a453ac31f3428614cceb99027f8efbdb9258a40b.zip |
Adding upstream version 2.10.7+merged+base+2.10.8+dfsg.upstream/2.10.7+merged+base+2.10.8+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/units/module_utils/basic')
23 files changed, 3483 insertions, 0 deletions
diff --git a/test/units/module_utils/basic/__init__.py b/test/units/module_utils/basic/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/units/module_utils/basic/__init__.py diff --git a/test/units/module_utils/basic/test__log_invocation.py b/test/units/module_utils/basic/test__log_invocation.py new file mode 100644 index 00000000..3beda8bd --- /dev/null +++ b/test/units/module_utils/basic/test__log_invocation.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# (c) 2016, James Cammarata <jimi@sngx.net> +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + + +ARGS = dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz') +ARGUMENT_SPEC = dict( + foo=dict(default=True, type='bool'), + bar=dict(default=[], type='list'), + bam=dict(default="bam"), + baz=dict(default=u"baz"), + password=dict(default=True), + no_log=dict(default="you shouldn't see me", no_log=True), +) + + +@pytest.mark.parametrize('am, stdin', [(ARGUMENT_SPEC, ARGS)], indirect=['am', 'stdin']) +def test_module_utils_basic__log_invocation(am, mocker): + + am.log = mocker.MagicMock() + am._log_invocation() + + # Message is generated from a dict so it will be in an unknown order. + # have to check this manually rather than with assert_called_with() + args = am.log.call_args[0] + assert len(args) == 1 + message = args[0] + + assert len(message) == \ + len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD') + + assert message.startswith('Invoked with ') + assert ' bam=bam' in message + assert ' bar=[1, 2, 3]' in message + assert ' foo=False' in message + assert ' baz=baz' in message + assert ' no_log=NOT_LOGGING_PARAMETER' in message + assert ' password=NOT_LOGGING_PASSWORD' in message + + kwargs = am.log.call_args[1] + assert kwargs == \ + dict(log_args={ + 'foo': 'False', + 'bar': '[1, 2, 3]', + 'bam': 'bam', + 'baz': 'baz', + 'password': 'NOT_LOGGING_PASSWORD', + 'no_log': 'NOT_LOGGING_PARAMETER', + }) diff --git a/test/units/module_utils/basic/test__symbolic_mode_to_octal.py b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py new file mode 100644 index 00000000..7793b348 --- /dev/null +++ b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright: +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016-2017 Ansible Project +# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible.module_utils.basic import AnsibleModule + + +# +# Info helpful for making new test cases: +# +# base_mode = {'dir no perms': 0o040000, +# 'file no perms': 0o100000, +# 'dir all perms': 0o400000 | 0o777, +# 'file all perms': 0o100000, | 0o777} +# +# perm_bits = {'x': 0b001, +# 'w': 0b010, +# 'r': 0b100} +# +# role_shift = {'u': 6, +# 'g': 3, +# 'o': 0} + +DATA = ( # Going from no permissions to setting all for user, group, and/or other + (0o040000, u'a+rwx', 0o0777), + (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777), + (0o040000, u'o+rwx', 0o0007), + (0o040000, u'g+rwx', 0o0070), + (0o040000, u'u+rwx', 0o0700), + + # Going from all permissions to none for user, group, and/or other + (0o040777, u'a-rwx', 0o0000), + (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000), + (0o040777, u'o-rwx', 0o0770), + (0o040777, u'g-rwx', 0o0707), + (0o040777, u'u-rwx', 0o0077), + + # now using absolute assignment from None to a set of perms + (0o040000, u'a=rwx', 0o0777), + (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777), + (0o040000, u'o=rwx', 0o0007), + (0o040000, u'g=rwx', 0o0070), + (0o040000, u'u=rwx', 0o0700), + + # X effect on files and dirs + (0o040000, u'a+X', 0o0111), + (0o100000, u'a+X', 0), + (0o040000, u'a=X', 0o0111), + (0o100000, u'a=X', 0), + (0o040777, u'a-X', 0o0666), + # Same as chmod but is it a bug? + # chmod a-X statfile <== removes execute from statfile + (0o100777, u'a-X', 0o0666), + + # Multiple permissions + (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755), + (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644), +) + +UMASK_DATA = ( + (0o100000, '+rwx', 0o770), + (0o100777, '-rwx', 0o007), +) + +INVALID_DATA = ( + (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"), + (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"), +) + + +@pytest.mark.parametrize('stat_info, mode_string, expected', DATA) +def test_good_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA) +def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_umask = mocker.patch('os.umask') + mock_umask.return_value = 0o7 + + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected + + +@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA) +def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info + with pytest.raises(ValueError) as exc: + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah' + assert exc.match(expected) diff --git a/test/units/module_utils/basic/test_argument_spec.py b/test/units/module_utils/basic/test_argument_spec.py new file mode 100644 index 00000000..6e297669 --- /dev/null +++ b/test/units/module_utils/basic/test_argument_spec.py @@ -0,0 +1,706 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# Copyright: Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import json +import os + +import pytest + +from units.compat.mock import MagicMock +from ansible.module_utils import basic +from ansible.module_utils.api import basic_auth_argument_spec, rate_limit_argument_spec, retry_argument_spec +from ansible.module_utils.common.warnings import get_deprecation_messages, get_warning_messages +from ansible.module_utils.six import integer_types, string_types +from ansible.module_utils.six.moves import builtins + + +MOCK_VALIDATOR_FAIL = MagicMock(side_effect=TypeError("bad conversion")) +# Data is argspec, argument, expected +VALID_SPECS = ( + # Simple type=int + ({'arg': {'type': 'int'}}, {'arg': 42}, 42), + # Simple type=int with a large value (will be of type long under Python 2) + ({'arg': {'type': 'int'}}, {'arg': 18765432109876543210}, 18765432109876543210), + # Simple type=list, elements=int + ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [42, 32]}, [42, 32]), + # Type=int with conversion from string + ({'arg': {'type': 'int'}}, {'arg': '42'}, 42), + # Type=list elements=int with conversion from string + ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': ['42', '32']}, [42, 32]), + # Simple type=float + ({'arg': {'type': 'float'}}, {'arg': 42.0}, 42.0), + # Simple type=list, elements=float + ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': [42.1, 32.2]}, [42.1, 32.2]), + # Type=float conversion from int + ({'arg': {'type': 'float'}}, {'arg': 42}, 42.0), + # type=list, elements=float conversion from int + ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': [42, 32]}, [42.0, 32.0]), + # Type=float conversion from string + ({'arg': {'type': 'float'}}, {'arg': '42.0'}, 42.0), + # type=list, elements=float conversion from string + ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': ['42.1', '32.2']}, [42.1, 32.2]), + # Type=float conversion from string without decimal point + ({'arg': {'type': 'float'}}, {'arg': '42'}, 42.0), + # Type=list elements=float conversion from string without decimal point + ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': ['42', '32.2']}, [42.0, 32.2]), + # Simple type=bool + ({'arg': {'type': 'bool'}}, {'arg': True}, True), + # Simple type=list elements=bool + ({'arg': {'type': 'list', 'elements': 'bool'}}, {'arg': [True, 'true', 1, 'yes', False, 'false', 'no', 0]}, + [True, True, True, True, False, False, False, False]), + # Type=int with conversion from string + ({'arg': {'type': 'bool'}}, {'arg': 'yes'}, True), + # Type=str converts to string + ({'arg': {'type': 'str'}}, {'arg': 42}, '42'), + # Type=list elements=str simple converts to string + ({'arg': {'type': 'list', 'elements': 'str'}}, {'arg': ['42', '32']}, ['42', '32']), + # Type is implicit, converts to string + ({'arg': {'type': 'str'}}, {'arg': 42}, '42'), + # Type=list elements=str implicit converts to string + ({'arg': {'type': 'list', 'elements': 'str'}}, {'arg': [42, 32]}, ['42', '32']), + # parameter is required + ({'arg': {'required': True}}, {'arg': 42}, '42'), +) + +INVALID_SPECS = ( + # Type is int; unable to convert this string + ({'arg': {'type': 'int'}}, {'arg': "wolf"}, "is of type {0} and we were unable to convert to int: {0} cannot be converted to an int".format(type('bad'))), + # Type is list elements is int; unable to convert this string + ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [1, "bad"]}, "is of type {0} and we were unable to convert to int: {0} cannot be converted to " + "an int".format(type('int'))), + # Type is int; unable to convert float + ({'arg': {'type': 'int'}}, {'arg': 42.1}, "'float'> cannot be converted to an int"), + # Type is list, elements is int; unable to convert float + ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [42.1, 32, 2]}, "'float'> cannot be converted to an int"), + # type is a callable that fails to convert + ({'arg': {'type': MOCK_VALIDATOR_FAIL}}, {'arg': "bad"}, "bad conversion"), + # type is a list, elements is callable that fails to convert + ({'arg': {'type': 'list', 'elements': MOCK_VALIDATOR_FAIL}}, {'arg': [1, "bad"]}, "bad conversion"), + # unknown parameter + ({'arg': {'type': 'int'}}, {'other': 'bad', '_ansible_module_name': 'ansible_unittest'}, + 'Unsupported parameters for (ansible_unittest) module: other Supported parameters include: arg'), + # parameter is required + ({'arg': {'required': True}}, {}, 'missing required arguments: arg'), +) + +BASIC_AUTH_VALID_ARGS = [ + {'api_username': 'user1', 'api_password': 'password1', 'api_url': 'http://example.com', 'validate_certs': False}, + {'api_username': 'user1', 'api_password': 'password1', 'api_url': 'http://example.com', 'validate_certs': True}, +] + +RATE_LIMIT_VALID_ARGS = [ + {'rate': 1, 'rate_limit': 1}, + {'rate': '1', 'rate_limit': 1}, + {'rate': 1, 'rate_limit': '1'}, + {'rate': '1', 'rate_limit': '1'}, +] + +RETRY_VALID_ARGS = [ + {'retries': 1, 'retry_pause': 1.5}, + {'retries': '1', 'retry_pause': '1.5'}, + {'retries': 1, 'retry_pause': '1.5'}, + {'retries': '1', 'retry_pause': 1.5}, +] + + +@pytest.fixture +def complex_argspec(): + arg_spec = dict( + foo=dict(required=True, aliases=['dup']), + bar=dict(), + bam=dict(), + bing=dict(), + bang=dict(), + bong=dict(), + baz=dict(fallback=(basic.env_fallback, ['BAZ'])), + bar1=dict(type='bool'), + bar3=dict(type='list', elements='path'), + bar_str=dict(type='list', elements=str), + zardoz=dict(choices=['one', 'two']), + zardoz2=dict(type='list', choices=['one', 'two', 'three']), + zardoz3=dict(type='str', aliases=['zodraz'], deprecated_aliases=[dict(name='zodraz', version='9.99')]), + ) + mut_ex = (('bar', 'bam'), ('bing', 'bang', 'bong')) + req_to = (('bam', 'baz'),) + + kwargs = dict( + argument_spec=arg_spec, + mutually_exclusive=mut_ex, + required_together=req_to, + no_log=True, + add_file_common_args=True, + supports_check_mode=True, + ) + return kwargs + + +@pytest.fixture +def options_argspec_list(): + options_spec = dict( + foo=dict(required=True, aliases=['dup']), + bar=dict(), + bar1=dict(type='list', elements='str'), + bar2=dict(type='list', elements='int'), + bar3=dict(type='list', elements='float'), + bar4=dict(type='list', elements='path'), + bam=dict(), + baz=dict(fallback=(basic.env_fallback, ['BAZ'])), + bam1=dict(), + bam2=dict(default='test'), + bam3=dict(type='bool'), + bam4=dict(type='str'), + ) + + arg_spec = dict( + foobar=dict( + type='list', + elements='dict', + options=options_spec, + mutually_exclusive=[ + ['bam', 'bam1'], + ], + required_if=[ + ['foo', 'hello', ['bam']], + ['foo', 'bam2', ['bam2']] + ], + required_one_of=[ + ['bar', 'bam'] + ], + required_together=[ + ['bam1', 'baz'] + ], + required_by={ + 'bam4': ('bam1', 'bam3'), + }, + ) + ) + + kwargs = dict( + argument_spec=arg_spec, + no_log=True, + add_file_common_args=True, + supports_check_mode=True + ) + return kwargs + + +@pytest.fixture +def options_argspec_dict(options_argspec_list): + # should test ok, for options in dict format. + kwargs = options_argspec_list + kwargs['argument_spec']['foobar']['type'] = 'dict' + kwargs['argument_spec']['foobar']['elements'] = None + + return kwargs + + +# +# Tests for one aspect of arg_spec +# + +@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in VALID_SPECS], + indirect=['stdin']) +def test_validator_basic_types(argspec, expected, stdin): + + am = basic.AnsibleModule(argspec) + + if 'type' in argspec['arg']: + if argspec['arg']['type'] == 'int': + type_ = integer_types + else: + type_ = getattr(builtins, argspec['arg']['type']) + else: + type_ = str + + assert isinstance(am.params['arg'], type_) + assert am.params['arg'] == expected + + +@pytest.mark.parametrize('stdin', [{'arg': 42}, {'arg': 18765432109876543210}], indirect=['stdin']) +def test_validator_function(mocker, stdin): + # Type is a callable + MOCK_VALIDATOR_SUCCESS = mocker.MagicMock(return_value=27) + argspec = {'arg': {'type': MOCK_VALIDATOR_SUCCESS}} + am = basic.AnsibleModule(argspec) + + assert isinstance(am.params['arg'], integer_types) + assert am.params['arg'] == 27 + + +@pytest.mark.parametrize('stdin', BASIC_AUTH_VALID_ARGS, indirect=['stdin']) +def test_validate_basic_auth_arg(mocker, stdin): + kwargs = dict( + argument_spec=basic_auth_argument_spec() + ) + am = basic.AnsibleModule(**kwargs) + assert isinstance(am.params['api_username'], string_types) + assert isinstance(am.params['api_password'], string_types) + assert isinstance(am.params['api_url'], string_types) + assert isinstance(am.params['validate_certs'], bool) + + +@pytest.mark.parametrize('stdin', RATE_LIMIT_VALID_ARGS, indirect=['stdin']) +def test_validate_rate_limit_argument_spec(mocker, stdin): + kwargs = dict( + argument_spec=rate_limit_argument_spec() + ) + am = basic.AnsibleModule(**kwargs) + assert isinstance(am.params['rate'], integer_types) + assert isinstance(am.params['rate_limit'], integer_types) + + +@pytest.mark.parametrize('stdin', RETRY_VALID_ARGS, indirect=['stdin']) +def test_validate_retry_argument_spec(mocker, stdin): + kwargs = dict( + argument_spec=retry_argument_spec() + ) + am = basic.AnsibleModule(**kwargs) + assert isinstance(am.params['retries'], integer_types) + assert isinstance(am.params['retry_pause'], float) + + +@pytest.mark.parametrize('stdin', [{'arg': '123'}, {'arg': 123}], indirect=['stdin']) +def test_validator_string_type(mocker, stdin): + # Custom callable that is 'str' + argspec = {'arg': {'type': str}} + am = basic.AnsibleModule(argspec) + + assert isinstance(am.params['arg'], string_types) + assert am.params['arg'] == '123' + + +@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in INVALID_SPECS], + indirect=['stdin']) +def test_validator_fail(stdin, capfd, argspec, expected): + with pytest.raises(SystemExit): + basic.AnsibleModule(argument_spec=argspec) + + out, err = capfd.readouterr() + assert not err + assert expected in json.loads(out)['msg'] + assert json.loads(out)['failed'] + + +class TestComplexArgSpecs: + """Test with a more complex arg_spec""" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello'}, {'dup': 'hello'}], indirect=['stdin']) + def test_complex_required(self, stdin, complex_argspec): + """Test that the complex argspec works if we give it its required param as either the canonical or aliased name""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['foo'], str) + assert am.params['foo'] == 'hello' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello1', 'dup': 'hello2'}], indirect=['stdin']) + def test_complex_duplicate_warning(self, stdin, complex_argspec): + """Test that the complex argspec issues a warning if we specify an option both with its canonical name and its alias""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['foo'], str) + assert 'Both option foo and its alias dup are set.' in get_warning_messages() + assert am.params['foo'] == 'hello2' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'test'}], indirect=['stdin']) + def test_complex_type_fallback(self, mocker, stdin, complex_argspec): + """Test that the complex argspec works if we get a required parameter via fallback""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + am = basic.AnsibleModule(**complex_argspec) + + assert isinstance(am.params['baz'], str) + assert am.params['baz'] == 'test data' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'bad', 'bam': 'bad2', 'bing': 'a', 'bang': 'b', 'bong': 'c'}], indirect=['stdin']) + def test_fail_mutually_exclusive(self, capfd, stdin, complex_argspec): + """Fail because of mutually exclusive parameters""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are mutually exclusive: bar|bam, bing|bang|bong" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'bad2'}], indirect=['stdin']) + def test_fail_required_together(self, capfd, stdin, complex_argspec): + """Fail because only one of a required_together pair of parameters was specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'hi'}], indirect=['stdin']) + def test_fail_required_together_and_default(self, capfd, stdin, complex_argspec): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + complex_argspec['argument_spec']['baz'] = {'default': 42} + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello'}], indirect=['stdin']) + def test_fail_required_together_and_fallback(self, capfd, mocker, stdin, complex_argspec): + """Fail because one of a required_together pair of parameters has a fallback and the other was not specified""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zardoz2': ['one', 'four', 'five']}], indirect=['stdin']) + def test_fail_list_with_choices(self, capfd, mocker, stdin, complex_argspec): + """Fail because one of the items is not in the choice""" + with pytest.raises(SystemExit): + basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "value of zardoz2 must be one or more of: one, two, three. Got no match for: four, five" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zardoz2': ['one', 'three']}], indirect=['stdin']) + def test_list_with_choices(self, capfd, mocker, stdin, complex_argspec): + """Test choices with list""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['zardoz2'], list) + assert am.params['zardoz2'] == ['one', 'three'] + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar3': ['~/test', 'test/']}], indirect=['stdin']) + def test_list_with_elements_path(self, capfd, mocker, stdin, complex_argspec): + """Test choices with list""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['bar3'], list) + assert am.params['bar3'][0].startswith('/') + assert am.params['bar3'][1] == 'test/' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zodraz': 'one'}], indirect=['stdin']) + def test_deprecated_alias(self, capfd, mocker, stdin, complex_argspec): + """Test a deprecated alias""" + am = basic.AnsibleModule(**complex_argspec) + + assert "Alias 'zodraz' is deprecated." in get_deprecation_messages()[0]['msg'] + assert get_deprecation_messages()[0]['version'] == '9.99' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar_str': [867, '5309']}], indirect=['stdin']) + def test_list_with_elements_callable_str(self, capfd, mocker, stdin, complex_argspec): + """Test choices with list""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['bar_str'], list) + assert isinstance(am.params['bar_str'][0], string_types) + assert isinstance(am.params['bar_str'][1], string_types) + assert am.params['bar_str'][0] == '867' + assert am.params['bar_str'][1] == '5309' + + +class TestComplexOptions: + """Test arg spec options""" + + # (Parameters, expected value of module.params['foobar']) + OPTIONS_PARAMS_LIST = ( + ({'foobar': [{"foo": "hello", "bam": "good"}, {"foo": "test", "bar": "good"}]}, + [{'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None, + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}, + {'foo': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None, + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}] + ), + # Alias for required param + ({'foobar': [{"dup": "test", "bar": "good"}]}, + [{'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None, + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}] + ), + # Required_if utilizing default value of the requirement + ({'foobar': [{"foo": "bam2", "bar": "required_one_of"}]}, + [{'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2', + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}] + ), + # Check that a bool option is converted + ({"foobar": [{"foo": "required", "bam": "good", "bam3": "yes"}]}, + [{'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bam4': None, 'bar': None, 'baz': None, 'foo': 'required', + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}] + ), + # Check required_by options + ({"foobar": [{"foo": "required", "bar": "good", "baz": "good", "bam4": "required_by", "bam1": "ok", "bam3": "yes"}]}, + [{'bar': 'good', 'baz': 'good', 'bam1': 'ok', 'bam2': 'test', 'bam3': True, 'bam4': 'required_by', 'bam': None, 'foo': 'required', + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}] + ), + # Check for elements in sub-options + ({"foobar": [{"foo": "good", "bam": "required_one_of", "bar1": [1, "good", "yes"], "bar2": ['1', 1], "bar3":['1.3', 1.3, 1]}]}, + [{'foo': 'good', 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': None, 'baz': None, 'bam': 'required_one_of', + 'bar1': ["1", "good", "yes"], 'bar2': [1, 1], 'bar3': [1.3, 1.3, 1.0], 'bar4': None}] + ), + ) + + # (Parameters, expected value of module.params['foobar']) + OPTIONS_PARAMS_DICT = ( + ({'foobar': {"foo": "hello", "bam": "good"}}, + {'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None, + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None} + ), + # Alias for required param + ({'foobar': {"dup": "test", "bar": "good"}}, + {'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None, + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None} + ), + # Required_if utilizing default value of the requirement + ({'foobar': {"foo": "bam2", "bar": "required_one_of"}}, + {'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2', + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None} + ), + # Check that a bool option is converted + ({"foobar": {"foo": "required", "bam": "good", "bam3": "yes"}}, + {'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bam4': None, 'bar': None, 'baz': None, 'foo': 'required', + 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None} + ), + # Check required_by options + ({"foobar": {"foo": "required", "bar": "good", "baz": "good", "bam4": "required_by", "bam1": "ok", "bam3": "yes"}}, + {'bar': 'good', 'baz': 'good', 'bam1': 'ok', 'bam2': 'test', 'bam3': True, 'bam4': 'required_by', 'bam': None, + 'foo': 'required', 'bar1': None, 'bar3': None, 'bar2': None, 'bar4': None} + ), + # Check for elements in sub-options + ({"foobar": {"foo": "good", "bam": "required_one_of", "bar1": [1, "good", "yes"], + "bar2": ['1', 1], "bar3": ['1.3', 1.3, 1]}}, + {'foo': 'good', 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': None, + 'baz': None, 'bam': 'required_one_of', + 'bar1': ["1", "good", "yes"], 'bar2': [1, 1], 'bar3': [1.3, 1.3, 1.0], 'bar4': None} + ), + ) + + # (Parameters, failure message) + FAILING_PARAMS_LIST = ( + # Missing required option + ({'foobar': [{}]}, 'missing required arguments: foo found in foobar'), + # Invalid option + ({'foobar': [{"foo": "hello", "bam": "good", "invalid": "bad"}]}, 'module: invalid found in foobar. Supported parameters include'), + # Mutually exclusive options found + ({'foobar': [{"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}]}, + 'parameters are mutually exclusive: bam|bam1 found in foobar'), + # required_if fails + ({'foobar': [{"foo": "hello", "bar": "bad"}]}, + 'foo is hello but all of the following are missing: bam found in foobar'), + # Missing required_one_of option + ({'foobar': [{"foo": "test"}]}, + 'one of the following is required: bar, bam found in foobar'), + # Missing required_together option + ({'foobar': [{"foo": "test", "bar": "required_one_of", "bam1": "bad"}]}, + 'parameters are required together: bam1, baz found in foobar'), + # Missing required_by options + ({'foobar': [{"foo": "test", "bar": "required_one_of", "bam4": "required_by"}]}, + "missing parameter(s) required by 'bam4': bam1, bam3"), + ) + + # (Parameters, failure message) + FAILING_PARAMS_DICT = ( + # Missing required option + ({'foobar': {}}, 'missing required arguments: foo found in foobar'), + # Invalid option + ({'foobar': {"foo": "hello", "bam": "good", "invalid": "bad"}}, + 'module: invalid found in foobar. Supported parameters include'), + # Mutually exclusive options found + ({'foobar': {"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}}, + 'parameters are mutually exclusive: bam|bam1 found in foobar'), + # required_if fails + ({'foobar': {"foo": "hello", "bar": "bad"}}, + 'foo is hello but all of the following are missing: bam found in foobar'), + # Missing required_one_of option + ({'foobar': {"foo": "test"}}, + 'one of the following is required: bar, bam found in foobar'), + # Missing required_together option + ({'foobar': {"foo": "test", "bar": "required_one_of", "bam1": "bad"}}, + 'parameters are required together: bam1, baz found in foobar'), + # Missing required_by options + ({'foobar': {"foo": "test", "bar": "required_one_of", "bam4": "required_by"}}, + "missing parameter(s) required by 'bam4': bam1, bam3"), + ) + + @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_DICT, indirect=['stdin']) + def test_options_type_dict(self, stdin, options_argspec_dict, expected): + """Test that a basic creation with required and required_if works""" + # should test ok, tests basic foo requirement and required_if + am = basic.AnsibleModule(**options_argspec_dict) + + assert isinstance(am.params['foobar'], dict) + assert am.params['foobar'] == expected + + @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_LIST, indirect=['stdin']) + def test_options_type_list(self, stdin, options_argspec_list, expected): + """Test that a basic creation with required and required_if works""" + # should test ok, tests basic foo requirement and required_if + am = basic.AnsibleModule(**options_argspec_list) + + assert isinstance(am.params['foobar'], list) + assert am.params['foobar'] == expected + + @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_DICT, indirect=['stdin']) + def test_fail_validate_options_dict(self, capfd, stdin, options_argspec_dict, expected): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**options_argspec_dict) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert expected in results['msg'] + + @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_LIST, indirect=['stdin']) + def test_fail_validate_options_list(self, capfd, stdin, options_argspec_list, expected): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**options_argspec_list) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert expected in results['msg'] + + @pytest.mark.parametrize('stdin', [{'foobar': {'foo': 'required', 'bam1': 'test', 'bar': 'case'}}], indirect=['stdin']) + def test_fallback_in_option(self, mocker, stdin, options_argspec_dict): + """Test that the complex argspec works if we get a required parameter via fallback""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + am = basic.AnsibleModule(**options_argspec_dict) + + assert isinstance(am.params['foobar']['baz'], str) + assert am.params['foobar']['baz'] == 'test data' + + @pytest.mark.parametrize('stdin', + [{'foobar': {'foo': 'required', 'bam1': 'test', 'baz': 'data', 'bar': 'case', 'bar4': '~/test'}}], + indirect=['stdin']) + def test_elements_path_in_option(self, mocker, stdin, options_argspec_dict): + """Test that the complex argspec works with elements path type""" + + am = basic.AnsibleModule(**options_argspec_dict) + + assert isinstance(am.params['foobar']['bar4'][0], str) + assert am.params['foobar']['bar4'][0].startswith('/') + + @pytest.mark.parametrize('stdin,spec,expected', [ + ({}, + {'one': {'type': 'dict', 'apply_defaults': True, 'options': {'two': {'default': True, 'type': 'bool'}}}}, + {'two': True}), + ({}, + {'one': {'type': 'dict', 'options': {'two': {'default': True, 'type': 'bool'}}}}, + None), + ], indirect=['stdin']) + def test_subspec_not_required_defaults(self, stdin, spec, expected): + # Check that top level not required, processed subspec defaults + am = basic.AnsibleModule(spec) + assert am.params['one'] == expected + + +class TestLoadFileCommonArguments: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_smoketest_load_file_common_args(self, am): + """With no file arguments, an empty dict is returned""" + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = True + am.selinux_default_context = MagicMock() + am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3) + + assert am.load_file_common_arguments(params={}) == {} + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_load_file_common_args(self, am, mocker): + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = True + am.selinux_default_context = MagicMock() + am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3) + + base_params = dict( + path='/path/to/file', + mode=0o600, + owner='root', + group='root', + seuser='_default', + serole='_default', + setype='_default', + selevel='_default', + ) + + extended_params = base_params.copy() + extended_params.update(dict( + follow=True, + foo='bar', + )) + + final_params = base_params.copy() + final_params.update(dict( + path='/path/to/real_file', + secontext=['unconfined_u', 'object_r', 'default_t', 's0'], + attributes=None, + )) + + # with the proper params specified, the returned dictionary should represent + # only those params which have something to do with the file arguments, excluding + # other params and updated as required with proper values which may have been + # massaged by the method + mocker.patch('os.path.islink', return_value=True) + mocker.patch('os.path.realpath', return_value='/path/to/real_file') + + res = am.load_file_common_arguments(params=extended_params) + + assert res == final_params + + +@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"]) +def test_no_log_true(stdin, capfd): + """Explicitly mask an argument (no_log=True).""" + arg_spec = { + "arg_pass": {"no_log": True} + } + am = basic.AnsibleModule(arg_spec) + # no_log=True is picked up by both am._log_invocation and list_no_log_values + # (called by am._handle_no_log_values). As a result, we can check for the + # value in am.no_log_values. + assert "testing" in am.no_log_values + + +@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"]) +def test_no_log_false(stdin, capfd): + """Explicitly log and display an argument (no_log=False).""" + arg_spec = { + "arg_pass": {"no_log": False} + } + am = basic.AnsibleModule(arg_spec) + assert "testing" not in am.no_log_values and not get_warning_messages() + + +@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"]) +def test_no_log_none(stdin, capfd): + """Allow Ansible to make the decision by matching the argument name + against PASSWORD_MATCH.""" + arg_spec = { + "arg_pass": {} + } + am = basic.AnsibleModule(arg_spec) + # Omitting no_log is only picked up by _log_invocation, so the value never + # makes it into am.no_log_values. Instead we can check for the warning + # emitted by am._log_invocation. + assert len(get_warning_messages()) > 0 diff --git a/test/units/module_utils/basic/test_atomic_move.py b/test/units/module_utils/basic/test_atomic_move.py new file mode 100644 index 00000000..bbdb0519 --- /dev/null +++ b/test/units/module_utils/basic/test_atomic_move.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import os +import errno +import json +from itertools import product + +import pytest + +from ansible.module_utils import basic + + +@pytest.fixture +def atomic_am(am, mocker): + am.selinux_enabled = mocker.MagicMock() + am.selinux_context = mocker.MagicMock() + am.selinux_default_context = mocker.MagicMock() + am.set_context_if_different = mocker.MagicMock() + am._unsafe_writes = mocker.MagicMock() + + yield am + + +@pytest.fixture +def atomic_mocks(mocker, monkeypatch): + environ = dict() + mocks = { + 'chmod': mocker.patch('os.chmod'), + 'chown': mocker.patch('os.chown'), + 'close': mocker.patch('os.close'), + 'environ': mocker.patch('os.environ', environ), + 'getlogin': mocker.patch('os.getlogin'), + 'getuid': mocker.patch('os.getuid'), + 'path_exists': mocker.patch('os.path.exists'), + 'rename': mocker.patch('os.rename'), + 'stat': mocker.patch('os.stat'), + 'umask': mocker.patch('os.umask'), + 'getpwuid': mocker.patch('pwd.getpwuid'), + 'copy2': mocker.patch('shutil.copy2'), + 'copyfileobj': mocker.patch('shutil.copyfileobj'), + 'move': mocker.patch('shutil.move'), + 'mkstemp': mocker.patch('tempfile.mkstemp'), + } + + mocks['getlogin'].return_value = 'root' + mocks['getuid'].return_value = 0 + mocks['getpwuid'].return_value = ('root', '', 0, 0, '', '', '') + mocks['umask'].side_effect = [18, 0] + mocks['rename'].return_value = None + + # normalize OS specific features + monkeypatch.delattr(os, 'chflags', raising=False) + + yield mocks + + +@pytest.fixture +def fake_stat(mocker): + stat1 = mocker.MagicMock() + stat1.st_mode = 0o0644 + stat1.st_uid = 0 + stat1.st_gid = 0 + stat1.st_flags = 0 + yield stat1 + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_new_file(atomic_am, atomic_mocks, mocker, selinux): + # test destination does not exist, login name = 'root', no environment, os.rename() succeeds + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_existing_file(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + # Test destination already present + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_no_tty_fallback(atomic_am, atomic_mocks, fake_stat, mocker): + """Raise OSError when using getlogin() to simulate no tty cornercase""" + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + atomic_mocks['getlogin'].side_effect = OSError() + atomic_mocks['environ']['LOGNAME'] = 'root' + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file in order to copy permissions propogates the error (unless EPERM)""" + atomic_mocks['stat'].side_effect = OSError() + atomic_mocks['path_exists'].return_value = True + + with pytest.raises(OSError): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_perms_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file to copy the permissions due to permissions passes fine""" + # and now have os.stat return EPERM, which should not fail + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].side_effect = OSError(errno.EPERM, 'testing os stat with EPERM') + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + # FIXME: Should atomic_move() set a default permission value when it cannot retrieve the + # existing file's permissions? (Right now it's up to the calling code. + # assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_failure(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EIO, causing it to bail out""" + atomic_mocks['path_exists'].side_effect = [False, False] + atomic_mocks['rename'].side_effect = OSError(errno.EIO, 'failing with EIO') + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'Could not replace file' in results['msg'] + assert 'failing with EIO' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_perms_fail_temp_creation_fails(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EPERM working but failure in mkstemp""" + atomic_mocks['path_exists'].return_value = False + atomic_mocks['close'].return_value = None + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['mkstemp'].return_value = None + atomic_mocks['mkstemp'].side_effect = OSError() + atomic_am.selinux_enabled.return_value = False + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'is not writable by the current user' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_rename_perms_fail_temp_succeeds(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + """Test os.rename raising an error but fallback to using mkstemp works""" + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['stat'].side_effect = None + atomic_mocks['mkstemp'].return_value = (None, '/path/to/tempfile') + atomic_mocks['mkstemp'].side_effect = None + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + assert atomic_mocks['rename'].call_args_list == [mocker.call(b'/path/to/src', b'/path/to/dest'), + mocker.call(b'/path/to/tempfile', b'/path/to/dest')] + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call(b'/path/to/tempfile', mock_context, False), + mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called diff --git a/test/units/module_utils/basic/test_deprecate_warn.py b/test/units/module_utils/basic/test_deprecate_warn.py new file mode 100644 index 00000000..351cf25b --- /dev/null +++ b/test/units/module_utils/basic/test_deprecate_warn.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_warn(am, capfd): + + am.warn('warning1') + + with pytest.raises(SystemExit): + am.exit_json(warnings=['warning2']) + out, err = capfd.readouterr() + assert json.loads(out)['warnings'] == ['warning1', 'warning2'] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_deprecate(am, capfd): + am.deprecate('deprecation1') + am.deprecate('deprecation2', '2.3') # pylint: disable=ansible-deprecated-no-collection-name + am.deprecate('deprecation3', version='2.4') # pylint: disable=ansible-deprecated-no-collection-name + am.deprecate('deprecation4', date='2020-03-10') # pylint: disable=ansible-deprecated-no-collection-name + am.deprecate('deprecation5', collection_name='ansible.builtin') + am.deprecate('deprecation6', '2.3', collection_name='ansible.builtin') + am.deprecate('deprecation7', version='2.4', collection_name='ansible.builtin') + am.deprecate('deprecation8', date='2020-03-10', collection_name='ansible.builtin') + + with pytest.raises(SystemExit): + am.exit_json(deprecations=['deprecation9', ('deprecation10', '2.4')]) + + out, err = capfd.readouterr() + output = json.loads(out) + assert ('warnings' not in output or output['warnings'] == []) + assert output['deprecations'] == [ + {u'msg': u'deprecation1', u'version': None, u'collection_name': None}, + {u'msg': u'deprecation2', u'version': '2.3', u'collection_name': None}, + {u'msg': u'deprecation3', u'version': '2.4', u'collection_name': None}, + {u'msg': u'deprecation4', u'date': '2020-03-10', u'collection_name': None}, + {u'msg': u'deprecation5', u'version': None, u'collection_name': 'ansible.builtin'}, + {u'msg': u'deprecation6', u'version': '2.3', u'collection_name': 'ansible.builtin'}, + {u'msg': u'deprecation7', u'version': '2.4', u'collection_name': 'ansible.builtin'}, + {u'msg': u'deprecation8', u'date': '2020-03-10', u'collection_name': 'ansible.builtin'}, + {u'msg': u'deprecation9', u'version': None, u'collection_name': None}, + {u'msg': u'deprecation10', u'version': '2.4', u'collection_name': None}, + ] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_deprecate_without_list(am, capfd): + with pytest.raises(SystemExit): + am.exit_json(deprecations='Simple deprecation warning') + + out, err = capfd.readouterr() + output = json.loads(out) + assert ('warnings' not in output or output['warnings'] == []) + assert output['deprecations'] == [ + {u'msg': u'Simple deprecation warning', u'version': None, u'collection_name': None}, + ] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_deprecate_without_list(am, capfd): + with pytest.raises(AssertionError) as ctx: + am.deprecate('Simple deprecation warning', date='', version='') + assert ctx.value.args[0] == "implementation error -- version and date must not both be set" diff --git a/test/units/module_utils/basic/test_dict_converters.py b/test/units/module_utils/basic/test_dict_converters.py new file mode 100644 index 00000000..f63ed9c6 --- /dev/null +++ b/test/units/module_utils/basic/test_dict_converters.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestTextifyContainers(ModuleTestCase): + def test_module_utils_basic_json_dict_converters(self): + from ansible.module_utils.basic import json_dict_unicode_to_bytes, json_dict_bytes_to_unicode + + test_data = dict( + item1=u"Fóo", + item2=[u"Bár", u"Bam"], + item3=dict(sub1=u"Súb"), + item4=(u"föo", u"bär", u"©"), + item5=42, + ) + res = json_dict_unicode_to_bytes(test_data) + res2 = json_dict_bytes_to_unicode(res) + + self.assertEqual(test_data, res2) diff --git a/test/units/module_utils/basic/test_exit_json.py b/test/units/module_utils/basic/test_exit_json.py new file mode 100644 index 00000000..240095c0 --- /dev/null +++ b/test/units/module_utils/basic/test_exit_json.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015-2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import sys + +import pytest + + +EMPTY_INVOCATION = {u'module_args': {}} + + +class TestAnsibleModuleExitJson: + """ + Test that various means of calling exitJson and FailJson return the messages they've been given + """ + DATA = ( + ({}, {'invocation': EMPTY_INVOCATION}), + ({'msg': 'message'}, {'msg': 'message', 'invocation': EMPTY_INVOCATION}), + ({'msg': 'success', 'changed': True}, + {'msg': 'success', 'changed': True, 'invocation': EMPTY_INVOCATION}), + ({'msg': 'nochange', 'changed': False}, + {'msg': 'nochange', 'changed': False, 'invocation': EMPTY_INVOCATION}), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('args, expected, stdin', ((a, e, {}) for a, e in DATA), indirect=['stdin']) + def test_exit_json_exits(self, am, capfd, args, expected): + with pytest.raises(SystemExit) as ctx: + am.exit_json(**args) + assert ctx.value.code == 0 + + out, err = capfd.readouterr() + return_val = json.loads(out) + assert return_val == expected + + # Fail_json is only legal if it's called with a message + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('args, expected, stdin', + ((a, e, {}) for a, e in DATA if 'msg' in a), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_fail_json_exits(self, am, capfd, args, expected): + with pytest.raises(SystemExit) as ctx: + am.fail_json(**args) + assert ctx.value.code == 1 + + out, err = capfd.readouterr() + return_val = json.loads(out) + # Fail_json should add failed=True + expected['failed'] = True + assert return_val == expected + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_fail_json_msg_positional(self, am, capfd): + with pytest.raises(SystemExit) as ctx: + am.fail_json('This is the msg') + assert ctx.value.code == 1 + + out, err = capfd.readouterr() + return_val = json.loads(out) + # Fail_json should add failed=True + assert return_val == {'msg': 'This is the msg', 'failed': True, + 'invocation': EMPTY_INVOCATION} + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_fail_json_msg_as_kwarg_after(self, am, capfd): + """Test that msg as a kwarg after other kwargs works""" + with pytest.raises(SystemExit) as ctx: + am.fail_json(arbitrary=42, msg='This is the msg') + assert ctx.value.code == 1 + + out, err = capfd.readouterr() + return_val = json.loads(out) + # Fail_json should add failed=True + assert return_val == {'msg': 'This is the msg', 'failed': True, + 'arbitrary': 42, + 'invocation': EMPTY_INVOCATION} + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_fail_json_no_msg(self, am): + with pytest.raises(TypeError) as ctx: + am.fail_json() + + if sys.version_info < (3,): + error_msg = "fail_json() takes exactly 2 arguments (1 given)" + else: + error_msg = "fail_json() missing 1 required positional argument: 'msg'" + + assert ctx.value.args[0] == error_msg + + +class TestAnsibleModuleExitValuesRemoved: + """ + Test that ExitJson and FailJson remove password-like values + """ + OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER' + + DATA = ( + ( + dict(username='person', password='$ecret k3y'), + dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/', + not_secret='following the leader', msg='here'), + dict(one=1, pwd=OMIT, url='https://username:password12345@foo.com/login/', + not_secret='following the leader', msg='here', + invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))), + ), + ( + dict(username='person', password='password12345'), + dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/', + not_secret='following the leader', msg='here'), + dict(one=1, pwd='$ecret k3y', url='https://username:********@foo.com/login/', + not_secret='following the leader', msg='here', + invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))), + ), + ( + dict(username='person', password='$ecret k3y'), + dict(one=1, pwd='$ecret k3y', url='https://username:$ecret k3y@foo.com/login/', + not_secret='following the leader', msg='here'), + dict(one=1, pwd=OMIT, url='https://username:********@foo.com/login/', + not_secret='following the leader', msg='here', + invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))), + ), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('am, stdin, return_val, expected', + (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e) + for s, r, e in DATA), # pylint: disable=undefined-variable + indirect=['am', 'stdin']) + def test_exit_json_removes_values(self, am, capfd, return_val, expected): + with pytest.raises(SystemExit): + am.exit_json(**return_val) + out, err = capfd.readouterr() + + assert json.loads(out) == expected + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('am, stdin, return_val, expected', + (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e) + for s, r, e in DATA), # pylint: disable=undefined-variable + indirect=['am', 'stdin']) + def test_fail_json_removes_values(self, am, capfd, return_val, expected): + expected['failed'] = True + with pytest.raises(SystemExit): + am.fail_json(**return_val) == expected + out, err = capfd.readouterr() + + assert json.loads(out) == expected diff --git a/test/units/module_utils/basic/test_filesystem.py b/test/units/module_utils/basic/test_filesystem.py new file mode 100644 index 00000000..37d1c553 --- /dev/null +++ b/test/units/module_utils/basic/test_filesystem.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from units.compat.mock import patch, MagicMock +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestOtherFilesystem(ModuleTestCase): + def test_module_utils_basic_ansible_module_user_and_group(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + mock_stat = MagicMock() + mock_stat.st_uid = 0 + mock_stat.st_gid = 0 + + with patch('os.lstat', return_value=mock_stat): + self.assertEqual(am.user_and_group('/path/to/file'), (0, 0)) + + def test_module_utils_basic_ansible_module_find_mount_point(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + def _mock_ismount(path): + if path == b'/': + return True + return False + + with patch('os.path.ismount', side_effect=_mock_ismount): + self.assertEqual(am.find_mount_point('/root/fs/../mounted/path/to/whatever'), '/') + + def _mock_ismount(path): + if path == b'/subdir/mount': + return True + if path == b'/': + return True + return False + + with patch('os.path.ismount', side_effect=_mock_ismount): + self.assertEqual(am.find_mount_point('/subdir/mount/path/to/whatever'), '/subdir/mount') + + def test_module_utils_basic_ansible_module_set_owner_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + self.assertEqual(am.set_owner_if_different('/path/to/file', None, True), True) + self.assertEqual(am.set_owner_if_different('/path/to/file', None, False), False) + + am.user_and_group = MagicMock(return_value=(500, 500)) + + with patch('os.lchown', return_value=None) as m: + self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) + m.assert_called_with(b'/path/to/file', 0, -1) + + def _mock_getpwnam(*args, **kwargs): + mock_pw = MagicMock() + mock_pw.pw_uid = 0 + return mock_pw + + m.reset_mock() + with patch('pwd.getpwnam', side_effect=_mock_getpwnam): + self.assertEqual(am.set_owner_if_different('/path/to/file', 'root', False), True) + m.assert_called_with(b'/path/to/file', 0, -1) + + with patch('pwd.getpwnam', side_effect=KeyError): + self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) + + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('os.lchown', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) + + def test_module_utils_basic_ansible_module_set_group_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + self.assertEqual(am.set_group_if_different('/path/to/file', None, True), True) + self.assertEqual(am.set_group_if_different('/path/to/file', None, False), False) + + am.user_and_group = MagicMock(return_value=(500, 500)) + + with patch('os.lchown', return_value=None) as m: + self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) + m.assert_called_with(b'/path/to/file', -1, 0) + + def _mock_getgrnam(*args, **kwargs): + mock_gr = MagicMock() + mock_gr.gr_gid = 0 + return mock_gr + + m.reset_mock() + with patch('grp.getgrnam', side_effect=_mock_getgrnam): + self.assertEqual(am.set_group_if_different('/path/to/file', 'root', False), True) + m.assert_called_with(b'/path/to/file', -1, 0) + + with patch('grp.getgrnam', side_effect=KeyError): + self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) + + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('os.lchown', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) diff --git a/test/units/module_utils/basic/test_get_file_attributes.py b/test/units/module_utils/basic/test_get_file_attributes.py new file mode 100644 index 00000000..5130a5fb --- /dev/null +++ b/test/units/module_utils/basic/test_get_file_attributes.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright: +# (c) 2017, Pierre-Louis Bonicoli <pierre-louis@libregerbil.fr> +# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from itertools import product + +from ansible.module_utils.basic import AnsibleModule + +import pytest + + +DATA = ( + ( + '3353595900 --------------e---- /usr/lib32', + {'attr_flags': 'e', 'version': '3353595900', 'attributes': ['extents']} + ), + # with e2fsprogs < 1.43, output isn't aligned + ( + '78053594 -----------I--e---- /usr/lib', + {'attr_flags': 'Ie', 'version': '78053594', 'attributes': ['indexed', 'extents']} + ), + ( + '15711607 -------A------e---- /tmp/test', + {'attr_flags': 'Ae', 'version': '15711607', 'attributes': ['noatime', 'extents']} + ), + # with e2fsprogs >= 1.43, output is aligned + ( + '78053594 -----------I--e---- /usr/lib', + {'attr_flags': 'Ie', 'version': '78053594', 'attributes': ['indexed', 'extents']} + ), + ( + '15711607 -------A------e---- /tmp/test', + {'attr_flags': 'Ae', 'version': '15711607', 'attributes': ['noatime', 'extents']} + ), +) + + +@pytest.mark.parametrize('stdin, data', product(({},), DATA), indirect=['stdin']) +def test_get_file_attributes(am, stdin, mocker, data): + # Test #18731 + mocker.patch.object(AnsibleModule, 'get_bin_path', return_value=(0, '/usr/bin/lsattr', '')) + mocker.patch.object(AnsibleModule, 'run_command', return_value=(0, data[0], '')) + result = am.get_file_attributes('/path/to/file') + for key, value in data[1].items(): + assert key in result and result[key] == value diff --git a/test/units/module_utils/basic/test_get_module_path.py b/test/units/module_utils/basic/test_get_module_path.py new file mode 100644 index 00000000..6ff4a3bc --- /dev/null +++ b/test/units/module_utils/basic/test_get_module_path.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from units.compat.mock import patch +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestGetModulePath(ModuleTestCase): + def test_module_utils_basic_get_module_path(self): + from ansible.module_utils.basic import get_module_path + with patch('os.path.realpath', return_value='/path/to/foo/'): + self.assertEqual(get_module_path(), '/path/to/foo') diff --git a/test/units/module_utils/basic/test_heuristic_log_sanitize.py b/test/units/module_utils/basic/test_heuristic_log_sanitize.py new file mode 100644 index 00000000..f8a0929d --- /dev/null +++ b/test/units/module_utils/basic/test_heuristic_log_sanitize.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from units.compat import unittest +from ansible.module_utils.basic import heuristic_log_sanitize + + +class TestHeuristicLogSanitize(unittest.TestCase): + def setUp(self): + self.URL_SECRET = 'http://username:pas:word@foo.com/data' + self.SSH_SECRET = 'username:pas:word@foo.com/data' + self.clean_data = repr(self._gen_data(3, True, True, 'no_secret_here')) + self.url_data = repr(self._gen_data(3, True, True, self.URL_SECRET)) + self.ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET)) + + def _gen_data(self, records, per_rec, top_level, secret_text): + hostvars = {'hostvars': {}} + for i in range(1, records, 1): + host_facts = { + 'host%s' % i: { + 'pstack': { + 'running': '875.1', + 'symlinked': '880.0', + 'tars': [], + 'versions': ['885.0'] + }, + } + } + if per_rec: + host_facts['host%s' % i]['secret'] = secret_text + hostvars['hostvars'].update(host_facts) + if top_level: + hostvars['secret'] = secret_text + return hostvars + + def test_did_not_hide_too_much(self): + self.assertEqual(heuristic_log_sanitize(self.clean_data), self.clean_data) + + def test_hides_url_secrets(self): + url_output = heuristic_log_sanitize(self.url_data) + # Basic functionality: Successfully hid the password + self.assertNotIn('pas:word', url_output) + + # Slightly more advanced, we hid all of the password despite the ":" + self.assertNotIn('pas', url_output) + + # In this implementation we replace the password with 8 "*" which is + # also the length of our password. The url fields should be able to + # accurately detect where the password ends so the length should be + # the same: + self.assertEqual(len(url_output), len(self.url_data)) + + def test_hides_ssh_secrets(self): + ssh_output = heuristic_log_sanitize(self.ssh_data) + self.assertNotIn('pas:word', ssh_output) + + # Slightly more advanced, we hid all of the password despite the ":" + self.assertNotIn('pas', ssh_output) + + # ssh checking is harder as the heuristic is overzealous in many + # cases. Since the input will have at least one ":" present before + # the password we can tell some things about the beginning and end of + # the data, though: + self.assertTrue(ssh_output.startswith("{'")) + self.assertTrue(ssh_output.endswith("}")) + self.assertIn(":********@foo.com/data'", ssh_output) + + def test_hides_parameter_secrets(self): + output = heuristic_log_sanitize('token="secret", user="person", token_entry="test=secret"', frozenset(['secret'])) + self.assertNotIn('secret', output) diff --git a/test/units/module_utils/basic/test_imports.py b/test/units/module_utils/basic/test_imports.py new file mode 100644 index 00000000..9d8ae68d --- /dev/null +++ b/test/units/module_utils/basic/test_imports.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import sys + +from units.mock.procenv import ModuleTestCase + +from units.compat import unittest +from units.compat.mock import patch +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestImports(ModuleTestCase): + + def clear_modules(self, mods): + for mod in mods: + if mod in sys.modules: + del sys.modules[mod] + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_syslog(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'syslog': + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAS_SYSLOG) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAS_SYSLOG) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_selinux(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'selinux': + raise ImportError + return realimport(name, *args, **kwargs) + + try: + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAVE_SELINUX) + except ImportError: + # no selinux on test system, so skip + pass + + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAVE_SELINUX) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_json(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'ansible.module_utils.common._json_compat': + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['json', 'ansible.module_utils.basic']) + builtins.__import__('ansible.module_utils.basic') + self.clear_modules(['json', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + with self.assertRaises(SystemExit): + builtins.__import__('ansible.module_utils.basic') + + # FIXME: doesn't work yet + # @patch.object(builtins, 'bytes') + # def test_module_utils_basic_bytes(self, mock_bytes): + # mock_bytes.side_effect = NameError() + # from ansible.module_utils import basic + + @patch.object(builtins, '__import__') + @unittest.skipIf(sys.version_info[0] >= 3, "literal_eval is available in every version of Python3") + def test_module_utils_basic_import_literal_eval(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'ast' and 'literal_eval' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + mock_import.side_effect = _mock_import + self.clear_modules(['ast', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1") + self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1) + self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1) + self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1, 2, 3)) + self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1]) + self.assertEqual(mod.module_utils.basic.literal_eval("True"), True) + self.assertEqual(mod.module_utils.basic.literal_eval("False"), False) + self.assertEqual(mod.module_utils.basic.literal_eval("None"), None) + # self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1)) + self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf") + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_systemd_journal(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'systemd' and 'journal' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.has_journal) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.has_journal) diff --git a/test/units/module_utils/basic/test_log.py b/test/units/module_utils/basic/test_log.py new file mode 100644 index 00000000..f3f764fc --- /dev/null +++ b/test/units/module_utils/basic/test_log.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import syslog +from itertools import product + +import pytest + +import ansible.module_utils.basic +from ansible.module_utils.six import PY3 + + +class TestAnsibleModuleLogSmokeTest: + DATA = [u'Text string', u'Toshio くらとみ non-ascii test'] + DATA = DATA + [d.encode('utf-8') for d in DATA] + DATA += [b'non-utf8 :\xff: test'] + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable + def test_smoketest_syslog(self, am, mocker, msg): + # These talk to the live daemons on the system. Need to do this to + # show that what we send doesn't cause an issue once it gets to the + # daemon. These are just smoketests to test that we don't fail. + mocker.patch('ansible.module_utils.basic.has_journal', False) + + am.log(u'Text string') + am.log(u'Toshio くらとみ non-ascii test') + + am.log(b'Byte string') + am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) + am.log(b'non-utf8 :\xff: test') + + @pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed') + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable + def test_smoketest_journal(self, am, mocker, msg): + # These talk to the live daemons on the system. Need to do this to + # show that what we send doesn't cause an issue once it gets to the + # daemon. These are just smoketests to test that we don't fail. + mocker.patch('ansible.module_utils.basic.has_journal', True) + + am.log(u'Text string') + am.log(u'Toshio くらとみ non-ascii test') + + am.log(b'Byte string') + am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8')) + am.log(b'non-utf8 :\xff: test') + + +class TestAnsibleModuleLogSyslog: + """Test the AnsibleModule Log Method""" + + PY2_OUTPUT_DATA = [ + (u'Text string', b'Text string'), + (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'.encode('utf-8')), + (b'Byte string', b'Byte string'), + (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'.encode('utf-8')), + (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace').encode('utf-8')), + ] + + PY3_OUTPUT_DATA = [ + (u'Text string', u'Text string'), + (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'), + (b'Byte string', u'Byte string'), + (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'), + (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace')), + ] + + OUTPUT_DATA = PY3_OUTPUT_DATA if PY3 else PY2_OUTPUT_DATA + + @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin']) + def test_no_log(self, am, mocker, no_log): + """Test that when no_log is set, logging does not occur""" + mock_syslog = mocker.patch('syslog.syslog', autospec=True) + mocker.patch('ansible.module_utils.basic.has_journal', False) + am.no_log = no_log + am.log('unittest no_log') + if no_log: + assert not mock_syslog.called + else: + mock_syslog.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log') + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, param, stdin', + ((m, p, {}) for m, p in OUTPUT_DATA), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_output_matches(self, am, mocker, msg, param): + """Check that log messages are sent correctly""" + mocker.patch('ansible.module_utils.basic.has_journal', False) + mock_syslog = mocker.patch('syslog.syslog', autospec=True) + + am.log(msg) + mock_syslog.assert_called_once_with(syslog.LOG_INFO, param) + + +@pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed') +class TestAnsibleModuleLogJournal: + """Test the AnsibleModule Log Method""" + + OUTPUT_DATA = [ + (u'Text string', u'Text string'), + (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'), + (b'Byte string', u'Byte string'), + (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'), + (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace')), + ] + + @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin']) + def test_no_log(self, am, mocker, no_log): + journal_send = mocker.patch('systemd.journal.send') + am.no_log = no_log + am.log('unittest no_log') + if no_log: + assert not journal_send.called + else: + assert journal_send.called == 1 + # Message + # call_args is a 2-tuple of (arg_list, kwarg_dict) + assert journal_send.call_args[1]['MESSAGE'].endswith('unittest no_log'), 'Message was not sent to log' + # log adds this journal field + assert 'MODULE' in journal_send.call_args[1] + assert 'basic.py' in journal_send.call_args[1]['MODULE'] + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + @pytest.mark.parametrize('msg, param, stdin', + ((m, p, {}) for m, p in OUTPUT_DATA), # pylint: disable=undefined-variable + indirect=['stdin']) + def test_output_matches(self, am, mocker, msg, param): + journal_send = mocker.patch('systemd.journal.send') + am.log(msg) + assert journal_send.call_count == 1, 'journal.send not called exactly once' + assert journal_send.call_args[1]['MESSAGE'].endswith(param) + + @pytest.mark.parametrize('stdin', ({},), indirect=['stdin']) + def test_log_args(self, am, mocker): + journal_send = mocker.patch('systemd.journal.send') + am.log('unittest log_args', log_args=dict(TEST='log unittest')) + assert journal_send.called == 1 + assert journal_send.call_args[1]['MESSAGE'].endswith('unittest log_args'), 'Message was not sent to log' + + # log adds this journal field + assert 'MODULE' in journal_send.call_args[1] + assert 'basic.py' in journal_send.call_args[1]['MODULE'] + + # We added this journal field + assert 'TEST' in journal_send.call_args[1] + assert 'log unittest' in journal_send.call_args[1]['TEST'] diff --git a/test/units/module_utils/basic/test_no_log.py b/test/units/module_utils/basic/test_no_log.py new file mode 100644 index 00000000..c4797028 --- /dev/null +++ b/test/units/module_utils/basic/test_no_log.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.compat import unittest + +from ansible.module_utils.basic import remove_values +from ansible.module_utils.common.parameters import _return_datastructure_name + + +class TestReturnValues(unittest.TestCase): + dataset = ( + ('string', frozenset(['string'])), + ('', frozenset()), + (1, frozenset(['1'])), + (1.0, frozenset(['1.0'])), + (False, frozenset()), + (['1', '2', '3'], frozenset(['1', '2', '3'])), + (('1', '2', '3'), frozenset(['1', '2', '3'])), + ({'one': 1, 'two': 'dos'}, frozenset(['1', 'dos'])), + ( + { + 'one': 1, + 'two': 'dos', + 'three': [ + 'amigos', 'musketeers', None, { + 'ping': 'pong', + 'base': ( + 'balls', 'raquets' + ) + } + ] + }, + frozenset(['1', 'dos', 'amigos', 'musketeers', 'pong', 'balls', 'raquets']) + ), + (u'Toshio くらとみ', frozenset(['Toshio くらとみ'])), + ('Toshio くらとみ', frozenset(['Toshio くらとみ'])), + ) + + def test_return_datastructure_name(self): + for data, expected in self.dataset: + self.assertEqual(frozenset(_return_datastructure_name(data)), expected) + + def test_unknown_type(self): + self.assertRaises(TypeError, frozenset, _return_datastructure_name(object())) + + +class TestRemoveValues(unittest.TestCase): + OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER' + dataset_no_remove = ( + ('string', frozenset(['nope'])), + (1234, frozenset(['4321'])), + (False, frozenset(['4321'])), + (1.0, frozenset(['4321'])), + (['string', 'strang', 'strung'], frozenset(['nope'])), + ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['nope'])), + ( + { + 'one': 1, + 'two': 'dos', + 'three': [ + 'amigos', 'musketeers', None, { + 'ping': 'pong', 'base': ['balls', 'raquets'] + } + ] + }, + frozenset(['nope']) + ), + (u'Toshio くら'.encode('utf-8'), frozenset([u'とみ'.encode('utf-8')])), + (u'Toshio くら', frozenset([u'とみ'])), + ) + dataset_remove = ( + ('string', frozenset(['string']), OMIT), + (1234, frozenset(['1234']), OMIT), + (1234, frozenset(['23']), OMIT), + (1.0, frozenset(['1.0']), OMIT), + (['string', 'strang', 'strung'], frozenset(['strang']), ['string', OMIT, 'strung']), + (['string', 'strang', 'strung'], frozenset(['strang', 'string', 'strung']), [OMIT, OMIT, OMIT]), + (('string', 'strang', 'strung'), frozenset(['string', 'strung']), [OMIT, 'strang', OMIT]), + ((1234567890, 345678, 987654321), frozenset(['1234567890']), [OMIT, 345678, 987654321]), + ((1234567890, 345678, 987654321), frozenset(['345678']), [OMIT, OMIT, 987654321]), + ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key']), {'one': 1, 'two': 'dos', 'secret': OMIT}), + ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']), {'one': OMIT, 'two': OMIT, 'secret': OMIT}), + ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']), {'one': OMIT, 'two': OMIT, 'secret': OMIT}), + ( + { + 'one': 1, + 'two': 'dos', + 'three': [ + 'amigos', 'musketeers', None, { + 'ping': 'pong', 'base': [ + 'balls', 'raquets' + ] + } + ] + }, + frozenset(['balls', 'base', 'pong', 'amigos']), + { + 'one': 1, + 'two': 'dos', + 'three': [ + OMIT, 'musketeers', None, { + 'ping': OMIT, + 'base': [ + OMIT, 'raquets' + ] + } + ] + } + ), + ( + 'This sentence has an enigma wrapped in a mystery inside of a secret. - mr mystery', + frozenset(['enigma', 'mystery', 'secret']), + 'This sentence has an ******** wrapped in a ******** inside of a ********. - mr ********' + ), + (u'Toshio くらとみ'.encode('utf-8'), frozenset([u'くらとみ'.encode('utf-8')]), u'Toshio ********'.encode('utf-8')), + (u'Toshio くらとみ', frozenset([u'くらとみ']), u'Toshio ********'), + ) + + def test_no_removal(self): + for value, no_log_strings in self.dataset_no_remove: + self.assertEqual(remove_values(value, no_log_strings), value) + + def test_strings_to_remove(self): + for value, no_log_strings, expected in self.dataset_remove: + self.assertEqual(remove_values(value, no_log_strings), expected) + + def test_unknown_type(self): + self.assertRaises(TypeError, remove_values, object(), frozenset()) + + def test_hit_recursion_limit(self): + """ Check that we do not hit a recursion limit""" + data_list = [] + inner_list = data_list + for i in range(0, 10000): + new_list = [] + inner_list.append(new_list) + inner_list = new_list + inner_list.append('secret') + + # Check that this does not hit a recursion limit + actual_data_list = remove_values(data_list, frozenset(('secret',))) + + levels = 0 + inner_list = actual_data_list + while inner_list: + if isinstance(inner_list, list): + self.assertEqual(len(inner_list), 1) + else: + levels -= 1 + break + inner_list = inner_list[0] + levels += 1 + + self.assertEqual(inner_list, self.OMIT) + self.assertEqual(levels, 10000) diff --git a/test/units/module_utils/basic/test_platform_distribution.py b/test/units/module_utils/basic/test_platform_distribution.py new file mode 100644 index 00000000..d7a4510c --- /dev/null +++ b/test/units/module_utils/basic/test_platform_distribution.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017-2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + +from units.compat.mock import patch + +from ansible.module_utils.six.moves import builtins + +# Functions being tested +from ansible.module_utils.basic import get_platform +from ansible.module_utils.basic import get_all_subclasses +from ansible.module_utils.basic import get_distribution +from ansible.module_utils.basic import get_distribution_version +from ansible.module_utils.basic import load_platform_subclass + + +realimport = builtins.__import__ + + +@pytest.fixture +def platform_linux(mocker): + mocker.patch('platform.system', return_value='Linux') + + +# +# get_platform tests +# + +def test_get_platform(): + with patch('platform.system', return_value='foo'): + assert get_platform() == 'foo' + + +# +# get_distribution tests +# + +def test_get_distribution_not_linux(): + """If it's not Linux, then it has no distribution""" + with patch('platform.system', return_value='Foo'): + assert get_distribution() is None + + +@pytest.mark.usefixtures("platform_linux") +class TestGetDistribution: + """Tests for get_distribution that have to find something""" + def test_distro_known(self): + with patch('ansible.module_utils.distro.id', return_value="alpine"): + assert get_distribution() == "Alpine" + + with patch('ansible.module_utils.distro.id', return_value="arch"): + assert get_distribution() == "Arch" + + with patch('ansible.module_utils.distro.id', return_value="centos"): + assert get_distribution() == "Centos" + + with patch('ansible.module_utils.distro.id', return_value="clear-linux-os"): + assert get_distribution() == "Clear-linux-os" + + with patch('ansible.module_utils.distro.id', return_value="coreos"): + assert get_distribution() == "Coreos" + + with patch('ansible.module_utils.distro.id', return_value="debian"): + assert get_distribution() == "Debian" + + with patch('ansible.module_utils.distro.id', return_value="flatcar"): + assert get_distribution() == "Flatcar" + + with patch('ansible.module_utils.distro.id', return_value="linuxmint"): + assert get_distribution() == "Linuxmint" + + with patch('ansible.module_utils.distro.id', return_value="opensuse"): + assert get_distribution() == "Opensuse" + + with patch('ansible.module_utils.distro.id', return_value="oracle"): + assert get_distribution() == "Oracle" + + with patch('ansible.module_utils.distro.id', return_value="raspian"): + assert get_distribution() == "Raspian" + + with patch('ansible.module_utils.distro.id', return_value="rhel"): + assert get_distribution() == "Redhat" + + with patch('ansible.module_utils.distro.id', return_value="ubuntu"): + assert get_distribution() == "Ubuntu" + + with patch('ansible.module_utils.distro.id', return_value="virtuozzo"): + assert get_distribution() == "Virtuozzo" + + with patch('ansible.module_utils.distro.id', return_value="foo"): + assert get_distribution() == "Foo" + + def test_distro_unknown(self): + with patch('ansible.module_utils.distro.id', return_value=""): + assert get_distribution() == "OtherLinux" + + def test_distro_amazon_linux_short(self): + with patch('ansible.module_utils.distro.id', return_value="amzn"): + assert get_distribution() == "Amazon" + + def test_distro_amazon_linux_long(self): + with patch('ansible.module_utils.distro.id', return_value="amazon"): + assert get_distribution() == "Amazon" + + +# +# get_distribution_version tests +# + +def test_get_distribution_version_not_linux(): + """If it's not Linux, then it has no distribution""" + with patch('platform.system', return_value='Foo'): + assert get_distribution_version() is None + + +@pytest.mark.usefixtures("platform_linux") +def test_distro_found(): + with patch('ansible.module_utils.distro.version', return_value="1"): + assert get_distribution_version() == "1" + + +# +# Tests for LoadPlatformSubclass +# + +class TestLoadPlatformSubclass: + class LinuxTest: + pass + + class Foo(LinuxTest): + platform = "Linux" + distribution = None + + class Bar(LinuxTest): + platform = "Linux" + distribution = "Bar" + + def test_not_linux(self): + # if neither match, the fallback should be the top-level class + with patch('platform.system', return_value="Foo"): + with patch('ansible.module_utils.common.sys_info.get_distribution', return_value=None): + assert isinstance(load_platform_subclass(self.LinuxTest), self.LinuxTest) + + @pytest.mark.usefixtures("platform_linux") + def test_get_distribution_none(self): + # match just the platform class, not a specific distribution + with patch('ansible.module_utils.common.sys_info.get_distribution', return_value=None): + assert isinstance(load_platform_subclass(self.LinuxTest), self.Foo) + + @pytest.mark.usefixtures("platform_linux") + def test_get_distribution_found(self): + # match both the distribution and platform class + with patch('ansible.module_utils.common.sys_info.get_distribution', return_value="Bar"): + assert isinstance(load_platform_subclass(self.LinuxTest), self.Bar) + + +# +# Tests for get_all_subclasses +# + +class TestGetAllSubclasses: + class Base: + pass + + class BranchI(Base): + pass + + class BranchII(Base): + pass + + class BranchIA(BranchI): + pass + + class BranchIB(BranchI): + pass + + class BranchIIA(BranchII): + pass + + class BranchIIB(BranchII): + pass + + def test_bottom_level(self): + assert get_all_subclasses(self.BranchIIB) == [] + + def test_one_inheritance(self): + assert set(get_all_subclasses(self.BranchII)) == set([self.BranchIIA, self.BranchIIB]) + + def test_toplevel(self): + assert set(get_all_subclasses(self.Base)) == set([self.BranchI, self.BranchII, + self.BranchIA, self.BranchIB, + self.BranchIIA, self.BranchIIB]) diff --git a/test/units/module_utils/basic/test_run_command.py b/test/units/module_utils/basic/test_run_command.py new file mode 100644 index 00000000..25f1c48e --- /dev/null +++ b/test/units/module_utils/basic/test_run_command.py @@ -0,0 +1,283 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import errno +from itertools import product +from io import BytesIO + +import pytest + +from ansible.module_utils._text import to_native +from ansible.module_utils.six import PY2 +from ansible.module_utils.compat import selectors + + +class OpenBytesIO(BytesIO): + """BytesIO with dummy close() method + + So that you can inspect the content after close() was called. + """ + + def close(self): + pass + + +@pytest.fixture +def mock_os(mocker): + def mock_os_chdir(path): + if path == '/inaccessible': + raise OSError(errno.EPERM, "Permission denied: '/inaccessible'") + + def mock_os_abspath(path): + if path.startswith('/'): + return path + else: + return os.getcwd.return_value + '/' + path + + os = mocker.patch('ansible.module_utils.basic.os') + + os.path.expandvars.side_effect = lambda x: x + os.path.expanduser.side_effect = lambda x: x + os.environ = {'PATH': '/bin'} + os.getcwd.return_value = '/home/foo' + os.path.isdir.return_value = True + os.chdir.side_effect = mock_os_chdir + os.path.abspath.side_effect = mock_os_abspath + + yield os + + +class DummyFileObj(): + def __init__(self, fileobj): + self.fileobj = fileobj + + +class SpecialBytesIO(BytesIO): + def __init__(self, *args, **kwargs): + fh = kwargs.pop('fh', None) + super(SpecialBytesIO, self).__init__(*args, **kwargs) + self.fh = fh + + def fileno(self): + return self.fh + + # We need to do this because some of our tests create a new value for stdout and stderr + # The new value is able to affect the string that is returned by the subprocess stdout and + # stderr but by the time the test gets it, it is too late to change the SpecialBytesIO that + # subprocess.Popen returns for stdout and stderr. If we could figure out how to change those as + # well, then we wouldn't need this. + def __eq__(self, other): + if id(self) == id(other) or self.fh == other.fileno(): + return True + return False + + +class DummyKey: + def __init__(self, fileobj): + self.fileobj = fileobj + + +@pytest.fixture +def mock_subprocess(mocker): + + class MockSelector(selectors.BaseSelector): + def __init__(self): + super(MockSelector, self).__init__() + self._file_objs = [] + + def register(self, fileobj, events, data=None): + self._file_objs.append(fileobj) + + def unregister(self, fileobj): + self._file_objs.remove(fileobj) + + def select(self, timeout=None): + ready = [] + for file_obj in self._file_objs: + ready.append((DummyKey(subprocess._output[file_obj.fileno()]), selectors.EVENT_READ)) + return ready + + def get_map(self): + return self._file_objs + + def close(self): + super(MockSelector, self).close() + self._file_objs = [] + + selectors.DefaultSelector = MockSelector + + subprocess = mocker.patch('ansible.module_utils.basic.subprocess') + subprocess._output = {mocker.sentinel.stdout: SpecialBytesIO(b'', fh=mocker.sentinel.stdout), + mocker.sentinel.stderr: SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} + + cmd = mocker.MagicMock() + cmd.returncode = 0 + cmd.stdin = OpenBytesIO() + cmd.stdout = subprocess._output[mocker.sentinel.stdout] + cmd.stderr = subprocess._output[mocker.sentinel.stderr] + subprocess.Popen.return_value = cmd + + yield subprocess + + +@pytest.fixture() +def rc_am(mocker, am, mock_os, mock_subprocess): + am.fail_json = mocker.MagicMock(side_effect=SystemExit) + am._os = mock_os + am._subprocess = mock_subprocess + yield am + + +class TestRunCommandArgs: + # Format is command as passed to run_command, command to Popen as list, command to Popen as string + ARGS_DATA = ( + (['/bin/ls', 'a', 'b', 'c'], [b'/bin/ls', b'a', b'b', b'c'], b'/bin/ls a b c'), + ('/bin/ls a " b" "c "', [b'/bin/ls', b'a', b' b', b'c '], b'/bin/ls a " b" "c "'), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('cmd, expected, shell, stdin', + ((arg, cmd_str if sh else cmd_lst, sh, {}) + for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))), + indirect=['stdin']) + def test_args(self, cmd, expected, shell, rc_am): + rc_am.run_command(cmd, use_unsafe_shell=shell) + assert rc_am._subprocess.Popen.called + args, kwargs = rc_am._subprocess.Popen.call_args + assert args == (expected, ) + assert kwargs['shell'] == shell + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_tuple_as_args(self, rc_am): + with pytest.raises(SystemExit): + rc_am.run_command(('ls', '/')) + assert rc_am.fail_json.called + + +class TestRunCommandCwd: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am.run_command('/bin/ls', cwd='/new') + assert rc_am._os.chdir.mock_calls == [mocker.call(b'/new'), mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_relative_path(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am.run_command('/bin/ls', cwd='sub-dir') + assert rc_am._os.chdir.mock_calls == [mocker.call(b'/old/sub-dir'), mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_not_a_dir(self, mocker, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir' + rc_am.run_command('/bin/ls', cwd='/not-a-dir') + assert rc_am._os.chdir.mock_calls == [mocker.call('/old'), ] + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_cwd_not_a_dir_noignore(self, rc_am): + rc_am._os.getcwd.return_value = '/old' + rc_am._os.path.isdir.side_effect = lambda d: d != '/not-a-dir' + with pytest.raises(SystemExit): + rc_am.run_command('/bin/ls', cwd='/not-a-dir', ignore_invalid_cwd=False) + assert rc_am.fail_json.called + + +class TestRunCommandPrompt: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_bad_regex(self, rc_am): + with pytest.raises(SystemExit): + rc_am.run_command('foo', prompt_regex='[pP)assword:') + assert rc_am.fail_json.called + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_no_match(self, mocker, rc_am): + rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello') + (rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:') + assert rc == 0 + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_prompt_match_wo_data(self, mocker, rc_am): + rc_am._subprocess._output = {mocker.sentinel.stdout: + SpecialBytesIO(b'Authentication required!\nEnter password: ', + fh=mocker.sentinel.stdout), + mocker.sentinel.stderr: + SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} + (rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None) + assert rc == 257 + + +class TestRunCommandRc: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_check_rc_false(self, rc_am): + rc_am._subprocess.Popen.return_value.returncode = 1 + (rc, _, _) = rc_am.run_command('/bin/false', check_rc=False) + assert rc == 1 + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_check_rc_true(self, rc_am): + rc_am._subprocess.Popen.return_value.returncode = 1 + with pytest.raises(SystemExit): + rc_am.run_command('/bin/false', check_rc=True) + assert rc_am.fail_json.called + args, kwargs = rc_am.fail_json.call_args + assert kwargs['rc'] == 1 + + +class TestRunCommandOutput: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_text_stdin(self, rc_am): + (rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world') + assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n' + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_ascii_stdout(self, mocker, rc_am): + rc_am._subprocess._output = {mocker.sentinel.stdout: + SpecialBytesIO(b'hello', fh=mocker.sentinel.stdout), + mocker.sentinel.stderr: + SpecialBytesIO(b'', fh=mocker.sentinel.stderr)} + (rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt') + assert rc == 0 + # module_utils function. On py3 it returns text and py2 it returns + # bytes because it's returning native strings + assert stdout == 'hello' + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_utf8_output(self, mocker, rc_am): + rc_am._subprocess._output = {mocker.sentinel.stdout: + SpecialBytesIO(u'Žarn§'.encode('utf-8'), + fh=mocker.sentinel.stdout), + mocker.sentinel.stderr: + SpecialBytesIO(u'لرئيسية'.encode('utf-8'), + fh=mocker.sentinel.stderr)} + (rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly') + assert rc == 0 + # module_utils function. On py3 it returns text and py2 it returns + # bytes because it's returning native strings + assert stdout == to_native(u'Žarn§') + assert stderr == to_native(u'لرئيسية') + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_run_command_fds(mocker, rc_am): + subprocess_mock = mocker.patch('ansible.module_utils.basic.subprocess') + subprocess_mock.Popen.side_effect = AssertionError + + try: + rc_am.run_command('synchronize', pass_fds=(101, 42)) + except SystemExit: + pass + + if PY2: + assert subprocess_mock.Popen.call_args[1]['close_fds'] is False + assert 'pass_fds' not in subprocess_mock.Popen.call_args[1] + + else: + assert subprocess_mock.Popen.call_args[1]['pass_fds'] == (101, 42) + assert subprocess_mock.Popen.call_args[1]['close_fds'] is True diff --git a/test/units/module_utils/basic/test_safe_eval.py b/test/units/module_utils/basic/test_safe_eval.py new file mode 100644 index 00000000..e8538ca9 --- /dev/null +++ b/test/units/module_utils/basic/test_safe_eval.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# (c) 2015-2017, Toshio Kuratomi <tkuratomi@ansible.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from itertools import chain +import pytest + + +# Strings that should be converted into a typed value +VALID_STRINGS = ( + ("'a'", 'a'), + ("'1'", '1'), + ("1", 1), + ("True", True), + ("False", False), + ("{}", {}), +) + +# Passing things that aren't strings should just return the object +NONSTRINGS = ( + ({'a': 1}, {'a': 1}), +) + +# These strings are not basic types. For security, these should not be +# executed. We return the same string and get an exception for some +INVALID_STRINGS = ( + ("a=1", "a=1", SyntaxError), + ("a.foo()", "a.foo()", None), + ("import foo", "import foo", None), + ("__import__('foo')", "__import__('foo')", ValueError), +) + + +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)), + indirect=['stdin']) +def test_simple_types(am, code, expected): + # test some basic usage for various types + assert am.safe_eval(code) == expected + + +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)), + indirect=['stdin']) +def test_simple_types_with_exceptions(am, code, expected): + # Test simple types with exceptions requested + assert am.safe_eval(code, include_exceptions=True), (expected, None) + + +@pytest.mark.parametrize('code, expected, stdin', + ((c, e, {}) for c, e, dummy in INVALID_STRINGS), + indirect=['stdin']) +def test_invalid_strings(am, code, expected): + assert am.safe_eval(code) == expected + + +@pytest.mark.parametrize('code, expected, exception, stdin', + ((c, e, ex, {}) for c, e, ex in INVALID_STRINGS), + indirect=['stdin']) +def test_invalid_strings_with_exceptions(am, code, expected, exception): + res = am.safe_eval(code, include_exceptions=True) + assert res[0] == expected + if exception is None: + assert res[1] == exception + else: + assert type(res[1]) == exception diff --git a/test/units/module_utils/basic/test_sanitize_keys.py b/test/units/module_utils/basic/test_sanitize_keys.py new file mode 100644 index 00000000..180f8662 --- /dev/null +++ b/test/units/module_utils/basic/test_sanitize_keys.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# (c) 2020, Red Hat +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest +from ansible.module_utils.basic import sanitize_keys + + +def test_sanitize_keys_non_dict_types(): + """ Test that non-dict-like objects return the same data. """ + + type_exception = 'Unsupported type for key sanitization.' + no_log_strings = set() + + assert 'string value' == sanitize_keys('string value', no_log_strings) + + assert sanitize_keys(None, no_log_strings) is None + + assert set(['x', 'y']) == sanitize_keys(set(['x', 'y']), no_log_strings) + + assert not sanitize_keys(False, no_log_strings) + + +def _run_comparison(obj): + no_log_strings = set(['secret', 'password']) + + ret = sanitize_keys(obj, no_log_strings) + + expected = [ + None, + True, + 100, + "some string", + set([1, 2]), + [1, 2], + + {'key1': ['value1a', 'value1b'], + 'some-********': 'value-for-some-password', + 'key2': {'key3': set(['value3a', 'value3b']), + 'i-have-a-********': {'********-********': 'value-for-secret-password', 'key4': 'value4'} + } + }, + + {'foo': [{'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER': 1}]} + ] + + assert ret == expected + + +def test_sanitize_keys_dict(): + """ Test that santize_keys works with a dict. """ + + d = [ + None, + True, + 100, + "some string", + set([1, 2]), + [1, 2], + + {'key1': ['value1a', 'value1b'], + 'some-password': 'value-for-some-password', + 'key2': {'key3': set(['value3a', 'value3b']), + 'i-have-a-secret': {'secret-password': 'value-for-secret-password', 'key4': 'value4'} + } + }, + + {'foo': [{'secret': 1}]} + ] + + _run_comparison(d) + + +def test_sanitize_keys_with_ignores(): + """ Test that we can actually ignore keys. """ + + no_log_strings = set(['secret', 'rc']) + ignore_keys = set(['changed', 'rc', 'status']) + + value = {'changed': True, + 'rc': 0, + 'test-rc': 1, + 'another-secret': 2, + 'status': 'okie dokie'} + + # We expect to change 'test-rc' but NOT 'rc'. + expected = {'changed': True, + 'rc': 0, + 'test-********': 1, + 'another-********': 2, + 'status': 'okie dokie'} + + ret = sanitize_keys(value, no_log_strings, ignore_keys) + assert ret == expected diff --git a/test/units/module_utils/basic/test_selinux.py b/test/units/module_utils/basic/test_selinux.py new file mode 100644 index 00000000..8562eb88 --- /dev/null +++ b/test/units/module_utils/basic/test_selinux.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import errno +import json + +from units.mock.procenv import ModuleTestCase, swap_stdin_and_argv + +from units.compat.mock import patch, MagicMock, mock_open, Mock +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestSELinux(ModuleTestCase): + def test_module_utils_basic_ansible_module_selinux_mls_enabled(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_mls_enabled(), False) + + basic.HAVE_SELINUX = True + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.is_selinux_mls_enabled', return_value=0): + self.assertEqual(am.selinux_mls_enabled(), False) + with patch('selinux.is_selinux_mls_enabled', return_value=1): + self.assertEqual(am.selinux_mls_enabled(), True) + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_initial_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = False + self.assertEqual(am.selinux_initial_context(), [None, None, None]) + am.selinux_mls_enabled.return_value = True + self.assertEqual(am.selinux_initial_context(), [None, None, None, None]) + + def test_module_utils_basic_ansible_module_selinux_enabled(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + # we first test the cases where the python selinux lib is + # not installed, which has two paths: one in which the system + # does have selinux installed (and the selinuxenabled command + # is present and returns 0 when run), or selinux is not installed + basic.HAVE_SELINUX = False + am.get_bin_path = MagicMock() + am.get_bin_path.return_value = '/path/to/selinuxenabled' + am.run_command = MagicMock() + am.run_command.return_value = (0, '', '') + self.assertRaises(SystemExit, am.selinux_enabled) + am.get_bin_path.return_value = None + self.assertEqual(am.selinux_enabled(), False) + + # finally we test the case where the python selinux lib is installed, + # and both possibilities there (enabled vs. disabled) + basic.HAVE_SELINUX = True + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.is_selinux_enabled', return_value=0): + self.assertEqual(am.selinux_enabled(), False) + with patch('selinux.is_selinux_enabled', return_value=1): + self.assertEqual(am.selinux_enabled(), True) + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_default_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) + am.selinux_enabled = MagicMock(return_value=True) + + # we first test the cases where the python selinux lib is not installed + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + # all following tests assume the python selinux bindings are installed + basic.HAVE_SELINUX = True + + basic.selinux = Mock() + + with patch.dict('sys.modules', {'selinux': basic.selinux}): + # next, we test with a mocked implementation of selinux.matchpathcon to simulate + # an actual context being found + with patch('selinux.matchpathcon', return_value=[0, 'unconfined_u:object_r:default_t:s0']): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) + + # we also test the case where matchpathcon returned a failure + with patch('selinux.matchpathcon', return_value=[-1, '']): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + # finally, we test where an OSError occurred during matchpathcon's call + with patch('selinux.matchpathcon', side_effect=OSError): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) + am.selinux_enabled = MagicMock(return_value=True) + + # we first test the cases where the python selinux lib is not installed + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) + + # all following tests assume the python selinux bindings are installed + basic.HAVE_SELINUX = True + + basic.selinux = Mock() + + with patch.dict('sys.modules', {'selinux': basic.selinux}): + # next, we test with a mocked implementation of selinux.lgetfilecon_raw to simulate + # an actual context being found + with patch('selinux.lgetfilecon_raw', return_value=[0, 'unconfined_u:object_r:default_t:s0']): + self.assertEqual(am.selinux_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) + + # we also test the case where matchpathcon returned a failure + with patch('selinux.lgetfilecon_raw', return_value=[-1, '']): + self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) + + # finally, we test where an OSError occurred during matchpathcon's call + e = OSError() + e.errno = errno.ENOENT + with patch('selinux.lgetfilecon_raw', side_effect=e): + self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') + + e = OSError() + with patch('selinux.lgetfilecon_raw', side_effect=e): + self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') + + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_is_special_selinux_path(self): + from ansible.module_utils import basic + + args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'_ansible_selinux_special_fs': "nfs,nfsd,foos", + '_ansible_remote_tmp': "/tmp", + '_ansible_keep_remote_files': False})) + + with swap_stdin_and_argv(stdin_data=args): + basic._ANSIBLE_ARGS = None + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + def _mock_find_mount_point(path): + if path.startswith('/some/path'): + return '/some/path' + elif path.startswith('/weird/random/fstype'): + return '/weird/random/fstype' + return '/' + + am.find_mount_point = MagicMock(side_effect=_mock_find_mount_point) + am.selinux_context = MagicMock(return_value=['foo_u', 'foo_r', 'foo_t', 's0']) + + m = mock_open() + m.side_effect = OSError + + with patch.object(builtins, 'open', m, create=True): + self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (False, None)) + + mount_data = [ + '/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n', + '1.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n', + 'whatever /weird/random/fstype foos rw 0 0\n', + ] + + # mock_open has a broken readlines() implementation apparently... + # this should work by default but doesn't, so we fix it + m = mock_open(read_data=''.join(mount_data)) + m.return_value.readlines.return_value = mount_data + + with patch.object(builtins, 'open', m, create=True): + self.assertEqual(am.is_special_selinux_path('/some/random/path'), (False, None)) + self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) + self.assertEqual(am.is_special_selinux_path('/weird/random/fstype/path'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) + + def test_module_utils_basic_ansible_module_set_context_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + basic.HAVE_SELINUX = False + + am.selinux_enabled = MagicMock(return_value=False) + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True), True) + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), False) + + basic.HAVE_SELINUX = True + + am.selinux_enabled = MagicMock(return_value=True) + am.selinux_context = MagicMock(return_value=['bar_u', 'bar_r', None, None]) + am.is_special_selinux_path = MagicMock(return_value=(False, None)) + + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.lsetfilecon', return_value=0) as m: + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + m.assert_called_with('/path/to/file', 'foo_u:foo_r:foo_t:s0') + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('selinux.lsetfilecon', return_value=1) as m: + self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) + + with patch('selinux.lsetfilecon', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) + + am.is_special_selinux_path = MagicMock(return_value=(True, ['sp_u', 'sp_r', 'sp_t', 's0'])) + + with patch('selinux.lsetfilecon', return_value=0) as m: + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + m.assert_called_with('/path/to/file', 'sp_u:sp_r:sp_t:s0') + + delattr(basic, 'selinux') diff --git a/test/units/module_utils/basic/test_set_cwd.py b/test/units/module_utils/basic/test_set_cwd.py new file mode 100644 index 00000000..159236b7 --- /dev/null +++ b/test/units/module_utils/basic/test_set_cwd.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import os +import shutil +import tempfile + +import pytest + +from units.compat.mock import patch, MagicMock +from ansible.module_utils._text import to_bytes + +from ansible.module_utils import basic + + +class TestAnsibleModuleSetCwd: + + def test_set_cwd(self, monkeypatch): + + '''make sure /tmp is used''' + + def mock_getcwd(): + return '/tmp' + + def mock_access(path, perm): + return True + + def mock_chdir(path): + pass + + monkeypatch.setattr(os, 'getcwd', mock_getcwd) + monkeypatch.setattr(os, 'access', mock_access) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}}))) + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + + result = am._set_cwd() + assert result == '/tmp' + + def test_set_cwd_unreadable_use_self_tmpdir(self, monkeypatch): + + '''pwd is not readable, use instance's tmpdir property''' + + def mock_getcwd(): + return '/tmp' + + def mock_access(path, perm): + if path == '/tmp' and perm == 4: + return False + return True + + def mock_expandvars(var): + if var == '$HOME': + return '/home/foobar' + return var + + def mock_gettempdir(): + return '/tmp/testdir' + + def mock_chdir(path): + if path == '/tmp': + raise Exception() + return + + monkeypatch.setattr(os, 'getcwd', mock_getcwd) + monkeypatch.setattr(os, 'chdir', mock_chdir) + monkeypatch.setattr(os, 'access', mock_access) + monkeypatch.setattr(os.path, 'expandvars', mock_expandvars) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}}))) + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + + am._tmpdir = '/tmp2' + result = am._set_cwd() + assert result == am._tmpdir + + def test_set_cwd_unreadable_use_home(self, monkeypatch): + + '''cwd and instance tmpdir are unreadable, use home''' + + def mock_getcwd(): + return '/tmp' + + def mock_access(path, perm): + if path in ['/tmp', '/tmp2'] and perm == 4: + return False + return True + + def mock_expandvars(var): + if var == '$HOME': + return '/home/foobar' + return var + + def mock_gettempdir(): + return '/tmp/testdir' + + def mock_chdir(path): + if path == '/tmp': + raise Exception() + return + + monkeypatch.setattr(os, 'getcwd', mock_getcwd) + monkeypatch.setattr(os, 'chdir', mock_chdir) + monkeypatch.setattr(os, 'access', mock_access) + monkeypatch.setattr(os.path, 'expandvars', mock_expandvars) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}}))) + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + + am._tmpdir = '/tmp2' + result = am._set_cwd() + assert result == '/home/foobar' + + def test_set_cwd_unreadable_use_gettempdir(self, monkeypatch): + + '''fallback to tempfile.gettempdir''' + + thisdir = None + + def mock_getcwd(): + return '/tmp' + + def mock_access(path, perm): + if path in ['/tmp', '/tmp2', '/home/foobar'] and perm == 4: + return False + return True + + def mock_expandvars(var): + if var == '$HOME': + return '/home/foobar' + return var + + def mock_gettempdir(): + return '/tmp3' + + def mock_chdir(path): + if path == '/tmp': + raise Exception() + thisdir = path + + monkeypatch.setattr(os, 'getcwd', mock_getcwd) + monkeypatch.setattr(os, 'chdir', mock_chdir) + monkeypatch.setattr(os, 'access', mock_access) + monkeypatch.setattr(os.path, 'expandvars', mock_expandvars) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}}))) + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + + am._tmpdir = '/tmp2' + monkeypatch.setattr(tempfile, 'gettempdir', mock_gettempdir) + result = am._set_cwd() + assert result == '/tmp3' + + def test_set_cwd_unreadable_use_None(self, monkeypatch): + + '''all paths are unreable, should return None and not an exception''' + + def mock_getcwd(): + return '/tmp' + + def mock_access(path, perm): + if path in ['/tmp', '/tmp2', '/tmp3', '/home/foobar'] and perm == 4: + return False + return True + + def mock_expandvars(var): + if var == '$HOME': + return '/home/foobar' + return var + + def mock_gettempdir(): + return '/tmp3' + + def mock_chdir(path): + if path == '/tmp': + raise Exception() + + monkeypatch.setattr(os, 'getcwd', mock_getcwd) + monkeypatch.setattr(os, 'chdir', mock_chdir) + monkeypatch.setattr(os, 'access', mock_access) + monkeypatch.setattr(os.path, 'expandvars', mock_expandvars) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}}))) + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + + am._tmpdir = '/tmp2' + monkeypatch.setattr(tempfile, 'gettempdir', mock_gettempdir) + result = am._set_cwd() + assert result is None diff --git a/test/units/module_utils/basic/test_set_mode_if_different.py b/test/units/module_utils/basic/test_set_mode_if_different.py new file mode 100644 index 00000000..93fe2467 --- /dev/null +++ b/test/units/module_utils/basic/test_set_mode_if_different.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com> +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import errno +import os + +from itertools import product + +try: + import builtins +except ImportError: + import __builtin__ as builtins + +import pytest + + +SYNONYMS_0660 = ( + 0o660, + '0o660', + '660', + 'u+rw-x,g+rw-x,o-rwx', + 'u=rw,g=rw,o-rwx', +) + + +@pytest.fixture +def mock_stats(mocker): + mock_stat1 = mocker.MagicMock() + mock_stat1.st_mode = 0o444 + mock_stat2 = mocker.MagicMock() + mock_stat2.st_mode = 0o660 + yield {"before": mock_stat1, "after": mock_stat2} + + +@pytest.fixture +def am_check_mode(am): + am.check_mode = True + yield am + am.check_mode = False + + +@pytest.fixture +def mock_lchmod(mocker): + m_lchmod = mocker.patch('ansible.module_utils.basic.os.lchmod', return_value=None, create=True) + yield m_lchmod + + +@pytest.mark.parametrize('previous_changes, check_mode, exists, stdin', + product((True, False), (True, False), (True, False), ({},)), + indirect=['stdin']) +def test_no_mode_given_returns_previous_changes(am, mock_stats, mock_lchmod, mocker, previous_changes, check_mode, exists): + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['before']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) + m_path_exists = mocker.patch('os.path.exists', return_value=exists) + + assert am.set_mode_if_different('/path/to/file', None, previous_changes) == previous_changes + assert not m_lchmod.called + assert not m_path_exists.called + + +@pytest.mark.parametrize('mode, check_mode, stdin', + product(SYNONYMS_0660, (True, False), ({},)), + indirect=['stdin']) +def test_mode_changed_to_0660(am, mock_stats, mocker, mode, check_mode): + # Note: This is for checking that all the different ways of specifying + # 0660 mode work. It cannot be used to check that setting a mode that is + # not equivalent to 0660 works. + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after'], mock_stats['after']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) + mocker.patch('os.path.exists', return_value=True) + + assert am.set_mode_if_different('/path/to/file', mode, False) + if check_mode: + assert not m_lchmod.called + else: + m_lchmod.assert_called_with(b'/path/to/file', 0o660) + + +@pytest.mark.parametrize('mode, check_mode, stdin', + product(SYNONYMS_0660, (True, False), ({},)), + indirect=['stdin']) +def test_mode_unchanged_when_already_0660(am, mock_stats, mocker, mode, check_mode): + # Note: This is for checking that all the different ways of specifying + # 0660 mode work. It cannot be used to check that setting a mode that is + # not equivalent to 0660 works. + am.check_mode = check_mode + mocker.patch('os.lstat', side_effect=[mock_stats['after'], mock_stats['after'], mock_stats['after']]) + m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True) + mocker.patch('os.path.exists', return_value=True) + + assert not am.set_mode_if_different('/path/to/file', mode, False) + assert not m_lchmod.called + + +@pytest.mark.parametrize('check_mode, stdin', + product((True, False), ({},)), + indirect=['stdin']) +def test_missing_lchmod_is_not_link(am, mock_stats, mocker, monkeypatch, check_mode): + """Some platforms have lchmod (*BSD) others do not (Linux)""" + + am.check_mode = check_mode + original_hasattr = hasattr + + monkeypatch.delattr(os, 'lchmod', raising=False) + + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']]) + mocker.patch('os.path.islink', return_value=False) + mocker.patch('os.path.exists', return_value=True) + m_chmod = mocker.patch('os.chmod', return_value=None) + + assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False) + if check_mode: + assert not m_chmod.called + else: + m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660) + + +@pytest.mark.parametrize('check_mode, stdin', + product((True, False), ({},)), + indirect=['stdin']) +def test_missing_lchmod_is_link(am, mock_stats, mocker, monkeypatch, check_mode): + """Some platforms have lchmod (*BSD) others do not (Linux)""" + + am.check_mode = check_mode + original_hasattr = hasattr + + monkeypatch.delattr(os, 'lchmod', raising=False) + + mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']]) + mocker.patch('os.path.islink', return_value=True) + mocker.patch('os.path.exists', return_value=True) + m_chmod = mocker.patch('os.chmod', return_value=None) + mocker.patch('os.stat', return_value=mock_stats['after']) + + assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False) + if check_mode: + assert not m_chmod.called + else: + m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660) + + mocker.resetall() + mocker.stopall() + + +@pytest.mark.parametrize('stdin,', + ({},), + indirect=['stdin']) +def test_missing_lchmod_is_link_in_sticky_dir(am, mock_stats, mocker): + """Some platforms have lchmod (*BSD) others do not (Linux)""" + + am.check_mode = False + original_hasattr = hasattr + + def _hasattr(obj, name): + if obj == os and name == 'lchmod': + return False + return original_hasattr(obj, name) + + mock_lstat = mocker.MagicMock() + mock_lstat.st_mode = 0o777 + + mocker.patch('os.lstat', side_effect=[mock_lstat, mock_lstat]) + mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr) + mocker.patch('os.path.islink', return_value=True) + mocker.patch('os.path.exists', return_value=True) + m_stat = mocker.patch('os.stat', side_effect=OSError(errno.EACCES, 'Permission denied')) + m_chmod = mocker.patch('os.chmod', return_value=None) + + # not changed: can't set mode on symbolic links + assert not am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False) + m_stat.assert_called_with(b'/path/to/file/no_lchmod') + m_chmod.assert_not_called() + + mocker.resetall() + mocker.stopall() diff --git a/test/units/module_utils/basic/test_tmpdir.py b/test/units/module_utils/basic/test_tmpdir.py new file mode 100644 index 00000000..818cb9b1 --- /dev/null +++ b/test/units/module_utils/basic/test_tmpdir.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import os +import shutil +import tempfile + +import pytest + +from units.compat.mock import patch, MagicMock +from ansible.module_utils._text import to_bytes + +from ansible.module_utils import basic + + +class TestAnsibleModuleTmpDir: + + DATA = ( + ( + { + "_ansible_tmpdir": "/path/to/dir", + "_ansible_remote_tmp": "/path/tmpdir", + "_ansible_keep_remote_files": False, + }, + True, + "/path/to/dir" + ), + ( + { + "_ansible_tmpdir": None, + "_ansible_remote_tmp": "/path/tmpdir", + "_ansible_keep_remote_files": False + }, + False, + "/path/tmpdir/ansible-moduletmp-42-" + ), + ( + { + "_ansible_tmpdir": None, + "_ansible_remote_tmp": "/path/tmpdir", + "_ansible_keep_remote_files": False + }, + True, + "/path/tmpdir/ansible-moduletmp-42-" + ), + ( + { + "_ansible_tmpdir": None, + "_ansible_remote_tmp": "$HOME/.test", + "_ansible_keep_remote_files": False + }, + False, + os.path.join(os.environ['HOME'], ".test/ansible-moduletmp-42-") + ), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('args, expected, stat_exists', ((s, e, t) for s, t, e in DATA)) + def test_tmpdir_property(self, monkeypatch, args, expected, stat_exists): + makedirs = {'called': False} + + def mock_mkdtemp(prefix, dir): + return os.path.join(dir, prefix) + + def mock_makedirs(path, mode): + makedirs['called'] = True + makedirs['path'] = path + makedirs['mode'] = mode + return + + monkeypatch.setattr(tempfile, 'mkdtemp', mock_mkdtemp) + monkeypatch.setattr(os.path, 'exists', lambda x: stat_exists) + monkeypatch.setattr(os, 'makedirs', mock_makedirs) + monkeypatch.setattr(shutil, 'rmtree', lambda x: None) + monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': args}))) + + with patch('time.time', return_value=42): + am = basic.AnsibleModule(argument_spec={}) + actual_tmpdir = am.tmpdir + + assert actual_tmpdir == expected + + # verify subsequent calls always produces the same tmpdir + assert am.tmpdir == actual_tmpdir + + if not stat_exists: + assert makedirs['called'] + expected = os.path.expanduser(os.path.expandvars(am._remote_tmp)) + assert makedirs['path'] == expected + assert makedirs['mode'] == 0o700 + + @pytest.mark.parametrize('stdin', ({"_ansible_tmpdir": None, + "_ansible_remote_tmp": "$HOME/.test", + "_ansible_keep_remote_files": True},), + indirect=['stdin']) + def test_tmpdir_makedirs_failure(self, am, monkeypatch): + + mock_mkdtemp = MagicMock(return_value="/tmp/path") + mock_makedirs = MagicMock(side_effect=OSError("Some OS Error here")) + + monkeypatch.setattr(tempfile, 'mkdtemp', mock_mkdtemp) + monkeypatch.setattr(os.path, 'exists', lambda x: False) + monkeypatch.setattr(os, 'makedirs', mock_makedirs) + + actual = am.tmpdir + assert actual == "/tmp/path" + assert mock_makedirs.call_args[0] == (os.path.expanduser(os.path.expandvars("$HOME/.test")),) + assert mock_makedirs.call_args[1] == {"mode": 0o700} + + # because makedirs failed the dir should be None so it uses the System tmp + assert mock_mkdtemp.call_args[1]['dir'] is None + assert mock_mkdtemp.call_args[1]['prefix'].startswith("ansible-moduletmp-") |