summaryrefslogtreecommitdiffstats
path: root/test/units/module_utils/basic
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/module_utils/basic')
-rw-r--r--test/units/module_utils/basic/__init__.py0
-rw-r--r--test/units/module_utils/basic/test__log_invocation.py55
-rw-r--r--test/units/module_utils/basic/test__symbolic_mode_to_octal.py103
-rw-r--r--test/units/module_utils/basic/test_argument_spec.py724
-rw-r--r--test/units/module_utils/basic/test_atomic_move.py223
-rw-r--r--test/units/module_utils/basic/test_command_nonexisting.py31
-rw-r--r--test/units/module_utils/basic/test_deprecate_warn.py77
-rw-r--r--test/units/module_utils/basic/test_dict_converters.py31
-rw-r--r--test/units/module_utils/basic/test_exit_json.py173
-rw-r--r--test/units/module_utils/basic/test_filesystem.py160
-rw-r--r--test/units/module_utils/basic/test_get_file_attributes.py75
-rw-r--r--test/units/module_utils/basic/test_get_module_path.py22
-rw-r--r--test/units/module_utils/basic/test_heuristic_log_sanitize.py92
-rw-r--r--test/units/module_utils/basic/test_imports.py128
-rw-r--r--test/units/module_utils/basic/test_log.py152
-rw-r--r--test/units/module_utils/basic/test_no_log.py160
-rw-r--r--test/units/module_utils/basic/test_platform_distribution.py188
-rw-r--r--test/units/module_utils/basic/test_run_command.py278
-rw-r--r--test/units/module_utils/basic/test_safe_eval.py70
-rw-r--r--test/units/module_utils/basic/test_sanitize_keys.py98
-rw-r--r--test/units/module_utils/basic/test_selinux.py190
-rw-r--r--test/units/module_utils/basic/test_set_cwd.py195
-rw-r--r--test/units/module_utils/basic/test_set_mode_if_different.py190
-rw-r--r--test/units/module_utils/basic/test_tmpdir.py119
24 files changed, 3534 insertions, 0 deletions
diff --git a/test/units/module_utils/basic/__init__.py b/test/units/module_utils/basic/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/units/module_utils/basic/__init__.py
diff --git a/test/units/module_utils/basic/test__log_invocation.py b/test/units/module_utils/basic/test__log_invocation.py
new file mode 100644
index 0000000..3beda8b
--- /dev/null
+++ b/test/units/module_utils/basic/test__log_invocation.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+# (c) 2016, James Cammarata <jimi@sngx.net>
+# (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import pytest
+
+
+ARGS = dict(foo=False, bar=[1, 2, 3], bam="bam", baz=u'baz')
+ARGUMENT_SPEC = dict(
+ foo=dict(default=True, type='bool'),
+ bar=dict(default=[], type='list'),
+ bam=dict(default="bam"),
+ baz=dict(default=u"baz"),
+ password=dict(default=True),
+ no_log=dict(default="you shouldn't see me", no_log=True),
+)
+
+
+@pytest.mark.parametrize('am, stdin', [(ARGUMENT_SPEC, ARGS)], indirect=['am', 'stdin'])
+def test_module_utils_basic__log_invocation(am, mocker):
+
+ am.log = mocker.MagicMock()
+ am._log_invocation()
+
+ # Message is generated from a dict so it will be in an unknown order.
+ # have to check this manually rather than with assert_called_with()
+ args = am.log.call_args[0]
+ assert len(args) == 1
+ message = args[0]
+
+ assert len(message) == \
+ len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD')
+
+ assert message.startswith('Invoked with ')
+ assert ' bam=bam' in message
+ assert ' bar=[1, 2, 3]' in message
+ assert ' foo=False' in message
+ assert ' baz=baz' in message
+ assert ' no_log=NOT_LOGGING_PARAMETER' in message
+ assert ' password=NOT_LOGGING_PASSWORD' in message
+
+ kwargs = am.log.call_args[1]
+ assert kwargs == \
+ dict(log_args={
+ 'foo': 'False',
+ 'bar': '[1, 2, 3]',
+ 'bam': 'bam',
+ 'baz': 'baz',
+ 'password': 'NOT_LOGGING_PASSWORD',
+ 'no_log': 'NOT_LOGGING_PARAMETER',
+ })
diff --git a/test/units/module_utils/basic/test__symbolic_mode_to_octal.py b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py
new file mode 100644
index 0000000..7793b34
--- /dev/null
+++ b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright:
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016-2017 Ansible Project
+# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible.module_utils.basic import AnsibleModule
+
+
+#
+# Info helpful for making new test cases:
+#
+# base_mode = {'dir no perms': 0o040000,
+# 'file no perms': 0o100000,
+# 'dir all perms': 0o400000 | 0o777,
+# 'file all perms': 0o100000, | 0o777}
+#
+# perm_bits = {'x': 0b001,
+# 'w': 0b010,
+# 'r': 0b100}
+#
+# role_shift = {'u': 6,
+# 'g': 3,
+# 'o': 0}
+
+DATA = ( # Going from no permissions to setting all for user, group, and/or other
+ (0o040000, u'a+rwx', 0o0777),
+ (0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777),
+ (0o040000, u'o+rwx', 0o0007),
+ (0o040000, u'g+rwx', 0o0070),
+ (0o040000, u'u+rwx', 0o0700),
+
+ # Going from all permissions to none for user, group, and/or other
+ (0o040777, u'a-rwx', 0o0000),
+ (0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000),
+ (0o040777, u'o-rwx', 0o0770),
+ (0o040777, u'g-rwx', 0o0707),
+ (0o040777, u'u-rwx', 0o0077),
+
+ # now using absolute assignment from None to a set of perms
+ (0o040000, u'a=rwx', 0o0777),
+ (0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777),
+ (0o040000, u'o=rwx', 0o0007),
+ (0o040000, u'g=rwx', 0o0070),
+ (0o040000, u'u=rwx', 0o0700),
+
+ # X effect on files and dirs
+ (0o040000, u'a+X', 0o0111),
+ (0o100000, u'a+X', 0),
+ (0o040000, u'a=X', 0o0111),
+ (0o100000, u'a=X', 0),
+ (0o040777, u'a-X', 0o0666),
+ # Same as chmod but is it a bug?
+ # chmod a-X statfile <== removes execute from statfile
+ (0o100777, u'a-X', 0o0666),
+
+ # Multiple permissions
+ (0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755),
+ (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644),
+)
+
+UMASK_DATA = (
+ (0o100000, '+rwx', 0o770),
+ (0o100777, '-rwx', 0o007),
+)
+
+INVALID_DATA = (
+ (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"),
+ (0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"),
+)
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', DATA)
+def test_good_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA)
+def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_umask = mocker.patch('os.umask')
+ mock_umask.return_value = 0o7
+
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
+
+
+@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA)
+def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected):
+ mock_stat = mocker.MagicMock()
+ mock_stat.st_mode = stat_info
+ with pytest.raises(ValueError) as exc:
+ assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah'
+ assert exc.match(expected)
diff --git a/test/units/module_utils/basic/test_argument_spec.py b/test/units/module_utils/basic/test_argument_spec.py
new file mode 100644
index 0000000..211d65a
--- /dev/null
+++ b/test/units/module_utils/basic/test_argument_spec.py
@@ -0,0 +1,724 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# Copyright: Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import json
+import os
+
+import pytest
+
+from units.compat.mock import MagicMock
+from ansible.module_utils import basic
+from ansible.module_utils.api import basic_auth_argument_spec, rate_limit_argument_spec, retry_argument_spec
+from ansible.module_utils.common import warnings
+from ansible.module_utils.common.warnings import get_deprecation_messages, get_warning_messages
+from ansible.module_utils.six import integer_types, string_types
+from ansible.module_utils.six.moves import builtins
+
+
+MOCK_VALIDATOR_FAIL = MagicMock(side_effect=TypeError("bad conversion"))
+# Data is argspec, argument, expected
+VALID_SPECS = (
+ # Simple type=int
+ ({'arg': {'type': 'int'}}, {'arg': 42}, 42),
+ # Simple type=int with a large value (will be of type long under Python 2)
+ ({'arg': {'type': 'int'}}, {'arg': 18765432109876543210}, 18765432109876543210),
+ # Simple type=list, elements=int
+ ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [42, 32]}, [42, 32]),
+ # Type=int with conversion from string
+ ({'arg': {'type': 'int'}}, {'arg': '42'}, 42),
+ # Type=list elements=int with conversion from string
+ ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': ['42', '32']}, [42, 32]),
+ # Simple type=float
+ ({'arg': {'type': 'float'}}, {'arg': 42.0}, 42.0),
+ # Simple type=list, elements=float
+ ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': [42.1, 32.2]}, [42.1, 32.2]),
+ # Type=float conversion from int
+ ({'arg': {'type': 'float'}}, {'arg': 42}, 42.0),
+ # type=list, elements=float conversion from int
+ ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': [42, 32]}, [42.0, 32.0]),
+ # Type=float conversion from string
+ ({'arg': {'type': 'float'}}, {'arg': '42.0'}, 42.0),
+ # type=list, elements=float conversion from string
+ ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': ['42.1', '32.2']}, [42.1, 32.2]),
+ # Type=float conversion from string without decimal point
+ ({'arg': {'type': 'float'}}, {'arg': '42'}, 42.0),
+ # Type=list elements=float conversion from string without decimal point
+ ({'arg': {'type': 'list', 'elements': 'float'}}, {'arg': ['42', '32.2']}, [42.0, 32.2]),
+ # Simple type=bool
+ ({'arg': {'type': 'bool'}}, {'arg': True}, True),
+ # Simple type=list elements=bool
+ ({'arg': {'type': 'list', 'elements': 'bool'}}, {'arg': [True, 'true', 1, 'yes', False, 'false', 'no', 0]},
+ [True, True, True, True, False, False, False, False]),
+ # Type=int with conversion from string
+ ({'arg': {'type': 'bool'}}, {'arg': 'yes'}, True),
+ # Type=str converts to string
+ ({'arg': {'type': 'str'}}, {'arg': 42}, '42'),
+ # Type=list elements=str simple converts to string
+ ({'arg': {'type': 'list', 'elements': 'str'}}, {'arg': ['42', '32']}, ['42', '32']),
+ # Type is implicit, converts to string
+ ({'arg': {'type': 'str'}}, {'arg': 42}, '42'),
+ # Type=list elements=str implicit converts to string
+ ({'arg': {'type': 'list', 'elements': 'str'}}, {'arg': [42, 32]}, ['42', '32']),
+ # parameter is required
+ ({'arg': {'required': True}}, {'arg': 42}, '42'),
+)
+
+INVALID_SPECS = (
+ # Type is int; unable to convert this string
+ ({'arg': {'type': 'int'}}, {'arg': "wolf"}, "is of type {0} and we were unable to convert to int: {0} cannot be converted to an int".format(type('bad'))),
+ # Type is list elements is int; unable to convert this string
+ ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [1, "bad"]}, "is of type {0} and we were unable to convert to int: {0} cannot be converted to "
+ "an int".format(type('int'))),
+ # Type is int; unable to convert float
+ ({'arg': {'type': 'int'}}, {'arg': 42.1}, "'float'> cannot be converted to an int"),
+ # Type is list, elements is int; unable to convert float
+ ({'arg': {'type': 'list', 'elements': 'int'}}, {'arg': [42.1, 32, 2]}, "'float'> cannot be converted to an int"),
+ # type is a callable that fails to convert
+ ({'arg': {'type': MOCK_VALIDATOR_FAIL}}, {'arg': "bad"}, "bad conversion"),
+ # type is a list, elements is callable that fails to convert
+ ({'arg': {'type': 'list', 'elements': MOCK_VALIDATOR_FAIL}}, {'arg': [1, "bad"]}, "bad conversion"),
+ # unknown parameter
+ ({'arg': {'type': 'int'}}, {'other': 'bad', '_ansible_module_name': 'ansible_unittest'},
+ 'Unsupported parameters for (ansible_unittest) module: other. Supported parameters include: arg.'),
+ ({'arg': {'type': 'int', 'aliases': ['argument']}}, {'other': 'bad', '_ansible_module_name': 'ansible_unittest'},
+ 'Unsupported parameters for (ansible_unittest) module: other. Supported parameters include: arg (argument).'),
+ # parameter is required
+ ({'arg': {'required': True}}, {}, 'missing required arguments: arg'),
+)
+
+BASIC_AUTH_VALID_ARGS = [
+ {'api_username': 'user1', 'api_password': 'password1', 'api_url': 'http://example.com', 'validate_certs': False},
+ {'api_username': 'user1', 'api_password': 'password1', 'api_url': 'http://example.com', 'validate_certs': True},
+]
+
+RATE_LIMIT_VALID_ARGS = [
+ {'rate': 1, 'rate_limit': 1},
+ {'rate': '1', 'rate_limit': 1},
+ {'rate': 1, 'rate_limit': '1'},
+ {'rate': '1', 'rate_limit': '1'},
+]
+
+RETRY_VALID_ARGS = [
+ {'retries': 1, 'retry_pause': 1.5},
+ {'retries': '1', 'retry_pause': '1.5'},
+ {'retries': 1, 'retry_pause': '1.5'},
+ {'retries': '1', 'retry_pause': 1.5},
+]
+
+
+@pytest.fixture
+def complex_argspec():
+ arg_spec = dict(
+ foo=dict(required=True, aliases=['dup']),
+ bar=dict(),
+ bam=dict(),
+ bing=dict(),
+ bang=dict(),
+ bong=dict(),
+ baz=dict(fallback=(basic.env_fallback, ['BAZ'])),
+ bar1=dict(type='bool'),
+ bar3=dict(type='list', elements='path'),
+ bar_str=dict(type='list', elements=str),
+ zardoz=dict(choices=['one', 'two']),
+ zardoz2=dict(type='list', choices=['one', 'two', 'three']),
+ zardoz3=dict(type='str', aliases=['zodraz'], deprecated_aliases=[dict(name='zodraz', version='9.99')]),
+ )
+ mut_ex = (('bar', 'bam'), ('bing', 'bang', 'bong'))
+ req_to = (('bam', 'baz'),)
+
+ kwargs = dict(
+ argument_spec=arg_spec,
+ mutually_exclusive=mut_ex,
+ required_together=req_to,
+ no_log=True,
+ add_file_common_args=True,
+ supports_check_mode=True,
+ )
+ return kwargs
+
+
+@pytest.fixture
+def options_argspec_list():
+ options_spec = dict(
+ foo=dict(required=True, aliases=['dup']),
+ bar=dict(),
+ bar1=dict(type='list', elements='str'),
+ bar2=dict(type='list', elements='int'),
+ bar3=dict(type='list', elements='float'),
+ bar4=dict(type='list', elements='path'),
+ bam=dict(),
+ baz=dict(fallback=(basic.env_fallback, ['BAZ'])),
+ bam1=dict(),
+ bam2=dict(default='test'),
+ bam3=dict(type='bool'),
+ bam4=dict(type='str'),
+ )
+
+ arg_spec = dict(
+ foobar=dict(
+ type='list',
+ elements='dict',
+ options=options_spec,
+ mutually_exclusive=[
+ ['bam', 'bam1'],
+ ],
+ required_if=[
+ ['foo', 'hello', ['bam']],
+ ['foo', 'bam2', ['bam2']]
+ ],
+ required_one_of=[
+ ['bar', 'bam']
+ ],
+ required_together=[
+ ['bam1', 'baz']
+ ],
+ required_by={
+ 'bam4': ('bam1', 'bam3'),
+ },
+ )
+ )
+
+ kwargs = dict(
+ argument_spec=arg_spec,
+ no_log=True,
+ add_file_common_args=True,
+ supports_check_mode=True
+ )
+ return kwargs
+
+
+@pytest.fixture
+def options_argspec_dict(options_argspec_list):
+ # should test ok, for options in dict format.
+ kwargs = options_argspec_list
+ kwargs['argument_spec']['foobar']['type'] = 'dict'
+ kwargs['argument_spec']['foobar']['elements'] = None
+
+ return kwargs
+
+
+#
+# Tests for one aspect of arg_spec
+#
+
+@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in VALID_SPECS],
+ indirect=['stdin'])
+def test_validator_basic_types(argspec, expected, stdin):
+
+ am = basic.AnsibleModule(argspec)
+
+ if 'type' in argspec['arg']:
+ if argspec['arg']['type'] == 'int':
+ type_ = integer_types
+ else:
+ type_ = getattr(builtins, argspec['arg']['type'])
+ else:
+ type_ = str
+
+ assert isinstance(am.params['arg'], type_)
+ assert am.params['arg'] == expected
+
+
+@pytest.mark.parametrize('stdin', [{'arg': 42}, {'arg': 18765432109876543210}], indirect=['stdin'])
+def test_validator_function(mocker, stdin):
+ # Type is a callable
+ MOCK_VALIDATOR_SUCCESS = mocker.MagicMock(return_value=27)
+ argspec = {'arg': {'type': MOCK_VALIDATOR_SUCCESS}}
+ am = basic.AnsibleModule(argspec)
+
+ assert isinstance(am.params['arg'], integer_types)
+ assert am.params['arg'] == 27
+
+
+@pytest.mark.parametrize('stdin', BASIC_AUTH_VALID_ARGS, indirect=['stdin'])
+def test_validate_basic_auth_arg(mocker, stdin):
+ kwargs = dict(
+ argument_spec=basic_auth_argument_spec()
+ )
+ am = basic.AnsibleModule(**kwargs)
+ assert isinstance(am.params['api_username'], string_types)
+ assert isinstance(am.params['api_password'], string_types)
+ assert isinstance(am.params['api_url'], string_types)
+ assert isinstance(am.params['validate_certs'], bool)
+
+
+@pytest.mark.parametrize('stdin', RATE_LIMIT_VALID_ARGS, indirect=['stdin'])
+def test_validate_rate_limit_argument_spec(mocker, stdin):
+ kwargs = dict(
+ argument_spec=rate_limit_argument_spec()
+ )
+ am = basic.AnsibleModule(**kwargs)
+ assert isinstance(am.params['rate'], integer_types)
+ assert isinstance(am.params['rate_limit'], integer_types)
+
+
+@pytest.mark.parametrize('stdin', RETRY_VALID_ARGS, indirect=['stdin'])
+def test_validate_retry_argument_spec(mocker, stdin):
+ kwargs = dict(
+ argument_spec=retry_argument_spec()
+ )
+ am = basic.AnsibleModule(**kwargs)
+ assert isinstance(am.params['retries'], integer_types)
+ assert isinstance(am.params['retry_pause'], float)
+
+
+@pytest.mark.parametrize('stdin', [{'arg': '123'}, {'arg': 123}], indirect=['stdin'])
+def test_validator_string_type(mocker, stdin):
+ # Custom callable that is 'str'
+ argspec = {'arg': {'type': str}}
+ am = basic.AnsibleModule(argspec)
+
+ assert isinstance(am.params['arg'], string_types)
+ assert am.params['arg'] == '123'
+
+
+@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in INVALID_SPECS],
+ indirect=['stdin'])
+def test_validator_fail(stdin, capfd, argspec, expected):
+ with pytest.raises(SystemExit):
+ basic.AnsibleModule(argument_spec=argspec)
+
+ out, err = capfd.readouterr()
+ assert not err
+ assert expected in json.loads(out)['msg']
+ assert json.loads(out)['failed']
+
+
+class TestComplexArgSpecs:
+ """Test with a more complex arg_spec"""
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello'}, {'dup': 'hello'}], indirect=['stdin'])
+ def test_complex_required(self, stdin, complex_argspec):
+ """Test that the complex argspec works if we give it its required param as either the canonical or aliased name"""
+ am = basic.AnsibleModule(**complex_argspec)
+ assert isinstance(am.params['foo'], str)
+ assert am.params['foo'] == 'hello'
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello1', 'dup': 'hello2'}], indirect=['stdin'])
+ def test_complex_duplicate_warning(self, stdin, complex_argspec):
+ """Test that the complex argspec issues a warning if we specify an option both with its canonical name and its alias"""
+ am = basic.AnsibleModule(**complex_argspec)
+ assert isinstance(am.params['foo'], str)
+ assert 'Both option foo and its alias dup are set.' in get_warning_messages()
+ assert am.params['foo'] == 'hello2'
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'test'}], indirect=['stdin'])
+ def test_complex_type_fallback(self, mocker, stdin, complex_argspec):
+ """Test that the complex argspec works if we get a required parameter via fallback"""
+ environ = os.environ.copy()
+ environ['BAZ'] = 'test data'
+ mocker.patch('ansible.module_utils.basic.os.environ', environ)
+
+ am = basic.AnsibleModule(**complex_argspec)
+
+ assert isinstance(am.params['baz'], str)
+ assert am.params['baz'] == 'test data'
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'bad', 'bam': 'bad2', 'bing': 'a', 'bang': 'b', 'bong': 'c'}], indirect=['stdin'])
+ def test_fail_mutually_exclusive(self, capfd, stdin, complex_argspec):
+ """Fail because of mutually exclusive parameters"""
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**complex_argspec)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert results['msg'] == "parameters are mutually exclusive: bar|bam, bing|bang|bong"
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'bad2'}], indirect=['stdin'])
+ def test_fail_required_together(self, capfd, stdin, complex_argspec):
+ """Fail because only one of a required_together pair of parameters was specified"""
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**complex_argspec)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert results['msg'] == "parameters are required together: bam, baz"
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'hi'}], indirect=['stdin'])
+ def test_fail_required_together_and_default(self, capfd, stdin, complex_argspec):
+ """Fail because one of a required_together pair of parameters has a default and the other was not specified"""
+ complex_argspec['argument_spec']['baz'] = {'default': 42}
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**complex_argspec)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert results['msg'] == "parameters are required together: bam, baz"
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello'}], indirect=['stdin'])
+ def test_fail_required_together_and_fallback(self, capfd, mocker, stdin, complex_argspec):
+ """Fail because one of a required_together pair of parameters has a fallback and the other was not specified"""
+ environ = os.environ.copy()
+ environ['BAZ'] = 'test data'
+ mocker.patch('ansible.module_utils.basic.os.environ', environ)
+
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**complex_argspec)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert results['msg'] == "parameters are required together: bam, baz"
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zardoz2': ['one', 'four', 'five']}], indirect=['stdin'])
+ def test_fail_list_with_choices(self, capfd, mocker, stdin, complex_argspec):
+ """Fail because one of the items is not in the choice"""
+ with pytest.raises(SystemExit):
+ basic.AnsibleModule(**complex_argspec)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert results['msg'] == "value of zardoz2 must be one or more of: one, two, three. Got no match for: four, five"
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zardoz2': ['one', 'three']}], indirect=['stdin'])
+ def test_list_with_choices(self, capfd, mocker, stdin, complex_argspec):
+ """Test choices with list"""
+ am = basic.AnsibleModule(**complex_argspec)
+ assert isinstance(am.params['zardoz2'], list)
+ assert am.params['zardoz2'] == ['one', 'three']
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar3': ['~/test', 'test/']}], indirect=['stdin'])
+ def test_list_with_elements_path(self, capfd, mocker, stdin, complex_argspec):
+ """Test choices with list"""
+ am = basic.AnsibleModule(**complex_argspec)
+ assert isinstance(am.params['bar3'], list)
+ assert am.params['bar3'][0].startswith('/')
+ assert am.params['bar3'][1] == 'test/'
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'zodraz': 'one'}], indirect=['stdin'])
+ def test_deprecated_alias(self, capfd, mocker, stdin, complex_argspec, monkeypatch):
+ """Test a deprecated alias"""
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ am = basic.AnsibleModule(**complex_argspec)
+
+ assert "Alias 'zodraz' is deprecated." in get_deprecation_messages()[0]['msg']
+ assert get_deprecation_messages()[0]['version'] == '9.99'
+
+ @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar_str': [867, '5309']}], indirect=['stdin'])
+ def test_list_with_elements_callable_str(self, capfd, mocker, stdin, complex_argspec):
+ """Test choices with list"""
+ am = basic.AnsibleModule(**complex_argspec)
+ assert isinstance(am.params['bar_str'], list)
+ assert isinstance(am.params['bar_str'][0], string_types)
+ assert isinstance(am.params['bar_str'][1], string_types)
+ assert am.params['bar_str'][0] == '867'
+ assert am.params['bar_str'][1] == '5309'
+
+
+class TestComplexOptions:
+ """Test arg spec options"""
+
+ # (Parameters, expected value of module.params['foobar'])
+ OPTIONS_PARAMS_LIST = (
+ ({'foobar': [{"foo": "hello", "bam": "good"}, {"foo": "test", "bar": "good"}]},
+ [{'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None,
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None},
+ {'foo': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None,
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}]
+ ),
+ # Alias for required param
+ ({'foobar': [{"dup": "test", "bar": "good"}]},
+ [{'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None,
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}]
+ ),
+ # Required_if utilizing default value of the requirement
+ ({'foobar': [{"foo": "bam2", "bar": "required_one_of"}]},
+ [{'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2',
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}]
+ ),
+ # Check that a bool option is converted
+ ({"foobar": [{"foo": "required", "bam": "good", "bam3": "yes"}]},
+ [{'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bam4': None, 'bar': None, 'baz': None, 'foo': 'required',
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}]
+ ),
+ # Check required_by options
+ ({"foobar": [{"foo": "required", "bar": "good", "baz": "good", "bam4": "required_by", "bam1": "ok", "bam3": "yes"}]},
+ [{'bar': 'good', 'baz': 'good', 'bam1': 'ok', 'bam2': 'test', 'bam3': True, 'bam4': 'required_by', 'bam': None, 'foo': 'required',
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}]
+ ),
+ # Check for elements in sub-options
+ ({"foobar": [{"foo": "good", "bam": "required_one_of", "bar1": [1, "good", "yes"], "bar2": ['1', 1], "bar3":['1.3', 1.3, 1]}]},
+ [{'foo': 'good', 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': None, 'baz': None, 'bam': 'required_one_of',
+ 'bar1': ["1", "good", "yes"], 'bar2': [1, 1], 'bar3': [1.3, 1.3, 1.0], 'bar4': None}]
+ ),
+ )
+
+ # (Parameters, expected value of module.params['foobar'])
+ OPTIONS_PARAMS_DICT = (
+ ({'foobar': {"foo": "hello", "bam": "good"}},
+ {'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None,
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}
+ ),
+ # Alias for required param
+ ({'foobar': {"dup": "test", "bar": "good"}},
+ {'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None, 'bam4': None,
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}
+ ),
+ # Required_if utilizing default value of the requirement
+ ({'foobar': {"foo": "bam2", "bar": "required_one_of"}},
+ {'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2',
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}
+ ),
+ # Check that a bool option is converted
+ ({"foobar": {"foo": "required", "bam": "good", "bam3": "yes"}},
+ {'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bam4': None, 'bar': None, 'baz': None, 'foo': 'required',
+ 'bar1': None, 'bar2': None, 'bar3': None, 'bar4': None}
+ ),
+ # Check required_by options
+ ({"foobar": {"foo": "required", "bar": "good", "baz": "good", "bam4": "required_by", "bam1": "ok", "bam3": "yes"}},
+ {'bar': 'good', 'baz': 'good', 'bam1': 'ok', 'bam2': 'test', 'bam3': True, 'bam4': 'required_by', 'bam': None,
+ 'foo': 'required', 'bar1': None, 'bar3': None, 'bar2': None, 'bar4': None}
+ ),
+ # Check for elements in sub-options
+ ({"foobar": {"foo": "good", "bam": "required_one_of", "bar1": [1, "good", "yes"],
+ "bar2": ['1', 1], "bar3": ['1.3', 1.3, 1]}},
+ {'foo': 'good', 'bam1': None, 'bam2': 'test', 'bam3': None, 'bam4': None, 'bar': None,
+ 'baz': None, 'bam': 'required_one_of',
+ 'bar1': ["1", "good", "yes"], 'bar2': [1, 1], 'bar3': [1.3, 1.3, 1.0], 'bar4': None}
+ ),
+ )
+
+ # (Parameters, failure message)
+ FAILING_PARAMS_LIST = (
+ # Missing required option
+ ({'foobar': [{}]}, 'missing required arguments: foo found in foobar'),
+ # Invalid option
+ ({'foobar': [{"foo": "hello", "bam": "good", "invalid": "bad"}]}, 'module: foobar.invalid. Supported parameters include'),
+ # Mutually exclusive options found
+ ({'foobar': [{"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}]},
+ 'parameters are mutually exclusive: bam|bam1 found in foobar'),
+ # required_if fails
+ ({'foobar': [{"foo": "hello", "bar": "bad"}]},
+ 'foo is hello but all of the following are missing: bam found in foobar'),
+ # Missing required_one_of option
+ ({'foobar': [{"foo": "test"}]},
+ 'one of the following is required: bar, bam found in foobar'),
+ # Missing required_together option
+ ({'foobar': [{"foo": "test", "bar": "required_one_of", "bam1": "bad"}]},
+ 'parameters are required together: bam1, baz found in foobar'),
+ # Missing required_by options
+ ({'foobar': [{"foo": "test", "bar": "required_one_of", "bam4": "required_by"}]},
+ "missing parameter(s) required by 'bam4': bam1, bam3"),
+ )
+
+ # (Parameters, failure message)
+ FAILING_PARAMS_DICT = (
+ # Missing required option
+ ({'foobar': {}}, 'missing required arguments: foo found in foobar'),
+ # Invalid option
+ ({'foobar': {"foo": "hello", "bam": "good", "invalid": "bad"}},
+ 'module: foobar.invalid. Supported parameters include'),
+ # Mutually exclusive options found
+ ({'foobar': {"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}},
+ 'parameters are mutually exclusive: bam|bam1 found in foobar'),
+ # required_if fails
+ ({'foobar': {"foo": "hello", "bar": "bad"}},
+ 'foo is hello but all of the following are missing: bam found in foobar'),
+ # Missing required_one_of option
+ ({'foobar': {"foo": "test"}},
+ 'one of the following is required: bar, bam found in foobar'),
+ # Missing required_together option
+ ({'foobar': {"foo": "test", "bar": "required_one_of", "bam1": "bad"}},
+ 'parameters are required together: bam1, baz found in foobar'),
+ # Missing required_by options
+ ({'foobar': {"foo": "test", "bar": "required_one_of", "bam4": "required_by"}},
+ "missing parameter(s) required by 'bam4': bam1, bam3"),
+ )
+
+ @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_DICT, indirect=['stdin'])
+ def test_options_type_dict(self, stdin, options_argspec_dict, expected):
+ """Test that a basic creation with required and required_if works"""
+ # should test ok, tests basic foo requirement and required_if
+ am = basic.AnsibleModule(**options_argspec_dict)
+
+ assert isinstance(am.params['foobar'], dict)
+ assert am.params['foobar'] == expected
+
+ @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_LIST, indirect=['stdin'])
+ def test_options_type_list(self, stdin, options_argspec_list, expected):
+ """Test that a basic creation with required and required_if works"""
+ # should test ok, tests basic foo requirement and required_if
+ am = basic.AnsibleModule(**options_argspec_list)
+
+ assert isinstance(am.params['foobar'], list)
+ assert am.params['foobar'] == expected
+
+ @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_DICT, indirect=['stdin'])
+ def test_fail_validate_options_dict(self, capfd, stdin, options_argspec_dict, expected):
+ """Fail because one of a required_together pair of parameters has a default and the other was not specified"""
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**options_argspec_dict)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert expected in results['msg']
+
+ @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_LIST, indirect=['stdin'])
+ def test_fail_validate_options_list(self, capfd, stdin, options_argspec_list, expected):
+ """Fail because one of a required_together pair of parameters has a default and the other was not specified"""
+ with pytest.raises(SystemExit):
+ am = basic.AnsibleModule(**options_argspec_list)
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert results['failed']
+ assert expected in results['msg']
+
+ @pytest.mark.parametrize('stdin', [{'foobar': {'foo': 'required', 'bam1': 'test', 'bar': 'case'}}], indirect=['stdin'])
+ def test_fallback_in_option(self, mocker, stdin, options_argspec_dict):
+ """Test that the complex argspec works if we get a required parameter via fallback"""
+ environ = os.environ.copy()
+ environ['BAZ'] = 'test data'
+ mocker.patch('ansible.module_utils.basic.os.environ', environ)
+
+ am = basic.AnsibleModule(**options_argspec_dict)
+
+ assert isinstance(am.params['foobar']['baz'], str)
+ assert am.params['foobar']['baz'] == 'test data'
+
+ @pytest.mark.parametrize('stdin',
+ [{'foobar': {'foo': 'required', 'bam1': 'test', 'baz': 'data', 'bar': 'case', 'bar4': '~/test'}}],
+ indirect=['stdin'])
+ def test_elements_path_in_option(self, mocker, stdin, options_argspec_dict):
+ """Test that the complex argspec works with elements path type"""
+
+ am = basic.AnsibleModule(**options_argspec_dict)
+
+ assert isinstance(am.params['foobar']['bar4'][0], str)
+ assert am.params['foobar']['bar4'][0].startswith('/')
+
+ @pytest.mark.parametrize('stdin,spec,expected', [
+ ({},
+ {'one': {'type': 'dict', 'apply_defaults': True, 'options': {'two': {'default': True, 'type': 'bool'}}}},
+ {'two': True}),
+ ({},
+ {'one': {'type': 'dict', 'options': {'two': {'default': True, 'type': 'bool'}}}},
+ None),
+ ], indirect=['stdin'])
+ def test_subspec_not_required_defaults(self, stdin, spec, expected):
+ # Check that top level not required, processed subspec defaults
+ am = basic.AnsibleModule(spec)
+ assert am.params['one'] == expected
+
+
+class TestLoadFileCommonArguments:
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_smoketest_load_file_common_args(self, am):
+ """With no file arguments, an empty dict is returned"""
+ am.selinux_mls_enabled = MagicMock()
+ am.selinux_mls_enabled.return_value = True
+ am.selinux_default_context = MagicMock()
+ am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3)
+
+ assert am.load_file_common_arguments(params={}) == {}
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_load_file_common_args(self, am, mocker):
+ am.selinux_mls_enabled = MagicMock()
+ am.selinux_mls_enabled.return_value = True
+ am.selinux_default_context = MagicMock()
+ am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3)
+
+ base_params = dict(
+ path='/path/to/file',
+ mode=0o600,
+ owner='root',
+ group='root',
+ seuser='_default',
+ serole='_default',
+ setype='_default',
+ selevel='_default',
+ )
+
+ extended_params = base_params.copy()
+ extended_params.update(dict(
+ follow=True,
+ foo='bar',
+ ))
+
+ final_params = base_params.copy()
+ final_params.update(dict(
+ path='/path/to/real_file',
+ secontext=['unconfined_u', 'object_r', 'default_t', 's0'],
+ attributes=None,
+ ))
+
+ # with the proper params specified, the returned dictionary should represent
+ # only those params which have something to do with the file arguments, excluding
+ # other params and updated as required with proper values which may have been
+ # massaged by the method
+ mocker.patch('os.path.islink', return_value=True)
+ mocker.patch('os.path.realpath', return_value='/path/to/real_file')
+
+ res = am.load_file_common_arguments(params=extended_params)
+
+ assert res == final_params
+
+
+@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"])
+def test_no_log_true(stdin, capfd):
+ """Explicitly mask an argument (no_log=True)."""
+ arg_spec = {
+ "arg_pass": {"no_log": True}
+ }
+ am = basic.AnsibleModule(arg_spec)
+ # no_log=True is picked up by both am._log_invocation and list_no_log_values
+ # (called by am._handle_no_log_values). As a result, we can check for the
+ # value in am.no_log_values.
+ assert "testing" in am.no_log_values
+
+
+@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"])
+def test_no_log_false(stdin, capfd):
+ """Explicitly log and display an argument (no_log=False)."""
+ arg_spec = {
+ "arg_pass": {"no_log": False}
+ }
+ am = basic.AnsibleModule(arg_spec)
+ assert "testing" not in am.no_log_values and not get_warning_messages()
+
+
+@pytest.mark.parametrize("stdin", [{"arg_pass": "testing"}], indirect=["stdin"])
+def test_no_log_none(stdin, capfd):
+ """Allow Ansible to make the decision by matching the argument name
+ against PASSWORD_MATCH."""
+ arg_spec = {
+ "arg_pass": {}
+ }
+ am = basic.AnsibleModule(arg_spec)
+ # Omitting no_log is only picked up by _log_invocation, so the value never
+ # makes it into am.no_log_values. Instead we can check for the warning
+ # emitted by am._log_invocation.
+ assert len(get_warning_messages()) > 0
+
+
+@pytest.mark.parametrize("stdin", [{"pass": "testing"}], indirect=["stdin"])
+def test_no_log_alias(stdin, capfd):
+ """Given module parameters that use an alias for a parameter that matches
+ PASSWORD_MATCH and has no_log=True set, a warning should not be issued.
+ """
+ arg_spec = {
+ "other_pass": {"no_log": True, "aliases": ["pass"]},
+ }
+ am = basic.AnsibleModule(arg_spec)
+
+ assert len(get_warning_messages()) == 0
diff --git a/test/units/module_utils/basic/test_atomic_move.py b/test/units/module_utils/basic/test_atomic_move.py
new file mode 100644
index 0000000..bbdb051
--- /dev/null
+++ b/test/units/module_utils/basic/test_atomic_move.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import os
+import errno
+import json
+from itertools import product
+
+import pytest
+
+from ansible.module_utils import basic
+
+
+@pytest.fixture
+def atomic_am(am, mocker):
+ am.selinux_enabled = mocker.MagicMock()
+ am.selinux_context = mocker.MagicMock()
+ am.selinux_default_context = mocker.MagicMock()
+ am.set_context_if_different = mocker.MagicMock()
+ am._unsafe_writes = mocker.MagicMock()
+
+ yield am
+
+
+@pytest.fixture
+def atomic_mocks(mocker, monkeypatch):
+ environ = dict()
+ mocks = {
+ 'chmod': mocker.patch('os.chmod'),
+ 'chown': mocker.patch('os.chown'),
+ 'close': mocker.patch('os.close'),
+ 'environ': mocker.patch('os.environ', environ),
+ 'getlogin': mocker.patch('os.getlogin'),
+ 'getuid': mocker.patch('os.getuid'),
+ 'path_exists': mocker.patch('os.path.exists'),
+ 'rename': mocker.patch('os.rename'),
+ 'stat': mocker.patch('os.stat'),
+ 'umask': mocker.patch('os.umask'),
+ 'getpwuid': mocker.patch('pwd.getpwuid'),
+ 'copy2': mocker.patch('shutil.copy2'),
+ 'copyfileobj': mocker.patch('shutil.copyfileobj'),
+ 'move': mocker.patch('shutil.move'),
+ 'mkstemp': mocker.patch('tempfile.mkstemp'),
+ }
+
+ mocks['getlogin'].return_value = 'root'
+ mocks['getuid'].return_value = 0
+ mocks['getpwuid'].return_value = ('root', '', 0, 0, '', '', '')
+ mocks['umask'].side_effect = [18, 0]
+ mocks['rename'].return_value = None
+
+ # normalize OS specific features
+ monkeypatch.delattr(os, 'chflags', raising=False)
+
+ yield mocks
+
+
+@pytest.fixture
+def fake_stat(mocker):
+ stat1 = mocker.MagicMock()
+ stat1.st_mode = 0o0644
+ stat1.st_uid = 0
+ stat1.st_gid = 0
+ stat1.st_flags = 0
+ yield stat1
+
+
+@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
+def test_new_file(atomic_am, atomic_mocks, mocker, selinux):
+ # test destination does not exist, login name = 'root', no environment, os.rename() succeeds
+ mock_context = atomic_am.selinux_default_context.return_value
+ atomic_mocks['path_exists'].return_value = False
+ atomic_am.selinux_enabled.return_value = selinux
+
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
+ assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]
+
+ if selinux:
+ assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')]
+ assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
+ else:
+ assert not atomic_am.selinux_default_context.called
+ assert not atomic_am.set_context_if_different.called
+
+
+@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
+def test_existing_file(atomic_am, atomic_mocks, fake_stat, mocker, selinux):
+ # Test destination already present
+ mock_context = atomic_am.selinux_context.return_value
+ atomic_mocks['stat'].return_value = fake_stat
+ atomic_mocks['path_exists'].return_value = True
+ atomic_am.selinux_enabled.return_value = selinux
+
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
+ assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
+
+ if selinux:
+ assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
+ assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
+ else:
+ assert not atomic_am.selinux_default_context.called
+ assert not atomic_am.set_context_if_different.called
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_no_tty_fallback(atomic_am, atomic_mocks, fake_stat, mocker):
+ """Raise OSError when using getlogin() to simulate no tty cornercase"""
+ mock_context = atomic_am.selinux_context.return_value
+ atomic_mocks['stat'].return_value = fake_stat
+ atomic_mocks['path_exists'].return_value = True
+ atomic_am.selinux_enabled.return_value = True
+ atomic_mocks['getlogin'].side_effect = OSError()
+ atomic_mocks['environ']['LOGNAME'] = 'root'
+
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
+ assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
+
+ assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
+ assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_existing_file_stat_failure(atomic_am, atomic_mocks, mocker):
+ """Failure to stat an existing file in order to copy permissions propogates the error (unless EPERM)"""
+ atomic_mocks['stat'].side_effect = OSError()
+ atomic_mocks['path_exists'].return_value = True
+
+ with pytest.raises(OSError):
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_existing_file_stat_perms_failure(atomic_am, atomic_mocks, mocker):
+ """Failure to stat an existing file to copy the permissions due to permissions passes fine"""
+ # and now have os.stat return EPERM, which should not fail
+ mock_context = atomic_am.selinux_context.return_value
+ atomic_mocks['stat'].side_effect = OSError(errno.EPERM, 'testing os stat with EPERM')
+ atomic_mocks['path_exists'].return_value = True
+ atomic_am.selinux_enabled.return_value = True
+
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
+ # FIXME: Should atomic_move() set a default permission value when it cannot retrieve the
+ # existing file's permissions? (Right now it's up to the calling code.
+ # assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
+ assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
+ assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_rename_failure(atomic_am, atomic_mocks, mocker, capfd):
+ """Test os.rename fails with EIO, causing it to bail out"""
+ atomic_mocks['path_exists'].side_effect = [False, False]
+ atomic_mocks['rename'].side_effect = OSError(errno.EIO, 'failing with EIO')
+
+ with pytest.raises(SystemExit):
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert 'Could not replace file' in results['msg']
+ assert 'failing with EIO' in results['msg']
+ assert results['failed']
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_rename_perms_fail_temp_creation_fails(atomic_am, atomic_mocks, mocker, capfd):
+ """Test os.rename fails with EPERM working but failure in mkstemp"""
+ atomic_mocks['path_exists'].return_value = False
+ atomic_mocks['close'].return_value = None
+ atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
+ atomic_mocks['mkstemp'].return_value = None
+ atomic_mocks['mkstemp'].side_effect = OSError()
+ atomic_am.selinux_enabled.return_value = False
+
+ with pytest.raises(SystemExit):
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+
+ out, err = capfd.readouterr()
+ results = json.loads(out)
+
+ assert 'is not writable by the current user' in results['msg']
+ assert results['failed']
+
+
+@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
+def test_rename_perms_fail_temp_succeeds(atomic_am, atomic_mocks, fake_stat, mocker, selinux):
+ """Test os.rename raising an error but fallback to using mkstemp works"""
+ mock_context = atomic_am.selinux_default_context.return_value
+ atomic_mocks['path_exists'].return_value = False
+ atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
+ atomic_mocks['stat'].return_value = fake_stat
+ atomic_mocks['stat'].side_effect = None
+ atomic_mocks['mkstemp'].return_value = (None, '/path/to/tempfile')
+ atomic_mocks['mkstemp'].side_effect = None
+ atomic_am.selinux_enabled.return_value = selinux
+
+ atomic_am.atomic_move('/path/to/src', '/path/to/dest')
+ assert atomic_mocks['rename'].call_args_list == [mocker.call(b'/path/to/src', b'/path/to/dest'),
+ mocker.call(b'/path/to/tempfile', b'/path/to/dest')]
+ assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]
+
+ if selinux:
+ assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')]
+ assert atomic_am.set_context_if_different.call_args_list == [mocker.call(b'/path/to/tempfile', mock_context, False),
+ mocker.call('/path/to/dest', mock_context, False)]
+ else:
+ assert not atomic_am.selinux_default_context.called
+ assert not atomic_am.set_context_if_different.called
diff --git a/test/units/module_utils/basic/test_command_nonexisting.py b/test/units/module_utils/basic/test_command_nonexisting.py
new file mode 100644
index 0000000..6ed7f91
--- /dev/null
+++ b/test/units/module_utils/basic/test_command_nonexisting.py
@@ -0,0 +1,31 @@
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import sys
+import pytest
+import json
+import sys
+import pytest
+import subprocess
+import ansible.module_utils.basic
+from ansible.module_utils._text import to_bytes
+from ansible.module_utils import basic
+
+
+def test_run_non_existent_command(monkeypatch):
+ """ Test that `command` returns std{out,err} even if the executable is not found """
+ def fail_json(msg, **kwargs):
+ assert kwargs["stderr"] == b''
+ assert kwargs["stdout"] == b''
+ sys.exit(1)
+
+ def popen(*args, **kwargs):
+ raise OSError()
+
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ monkeypatch.setattr(subprocess, 'Popen', popen)
+
+ am = basic.AnsibleModule(argument_spec={})
+ monkeypatch.setattr(am, 'fail_json', fail_json)
+ with pytest.raises(SystemExit):
+ am.run_command("lecho", "whatever")
diff --git a/test/units/module_utils/basic/test_deprecate_warn.py b/test/units/module_utils/basic/test_deprecate_warn.py
new file mode 100644
index 0000000..581ba6d
--- /dev/null
+++ b/test/units/module_utils/basic/test_deprecate_warn.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.module_utils.common import warnings
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_warn(am, capfd):
+
+ am.warn('warning1')
+
+ with pytest.raises(SystemExit):
+ am.exit_json(warnings=['warning2'])
+ out, err = capfd.readouterr()
+ assert json.loads(out)['warnings'] == ['warning1', 'warning2']
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_deprecate(am, capfd, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ am.deprecate('deprecation1')
+ am.deprecate('deprecation2', '2.3') # pylint: disable=ansible-deprecated-no-collection-name
+ am.deprecate('deprecation3', version='2.4') # pylint: disable=ansible-deprecated-no-collection-name
+ am.deprecate('deprecation4', date='2020-03-10') # pylint: disable=ansible-deprecated-no-collection-name
+ am.deprecate('deprecation5', collection_name='ansible.builtin')
+ am.deprecate('deprecation6', '2.3', collection_name='ansible.builtin')
+ am.deprecate('deprecation7', version='2.4', collection_name='ansible.builtin')
+ am.deprecate('deprecation8', date='2020-03-10', collection_name='ansible.builtin')
+
+ with pytest.raises(SystemExit):
+ am.exit_json(deprecations=['deprecation9', ('deprecation10', '2.4')])
+
+ out, err = capfd.readouterr()
+ output = json.loads(out)
+ assert ('warnings' not in output or output['warnings'] == [])
+ assert output['deprecations'] == [
+ {u'msg': u'deprecation1', u'version': None, u'collection_name': None},
+ {u'msg': u'deprecation2', u'version': '2.3', u'collection_name': None},
+ {u'msg': u'deprecation3', u'version': '2.4', u'collection_name': None},
+ {u'msg': u'deprecation4', u'date': '2020-03-10', u'collection_name': None},
+ {u'msg': u'deprecation5', u'version': None, u'collection_name': 'ansible.builtin'},
+ {u'msg': u'deprecation6', u'version': '2.3', u'collection_name': 'ansible.builtin'},
+ {u'msg': u'deprecation7', u'version': '2.4', u'collection_name': 'ansible.builtin'},
+ {u'msg': u'deprecation8', u'date': '2020-03-10', u'collection_name': 'ansible.builtin'},
+ {u'msg': u'deprecation9', u'version': None, u'collection_name': None},
+ {u'msg': u'deprecation10', u'version': '2.4', u'collection_name': None},
+ ]
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_deprecate_without_list(am, capfd):
+ with pytest.raises(SystemExit):
+ am.exit_json(deprecations='Simple deprecation warning')
+
+ out, err = capfd.readouterr()
+ output = json.loads(out)
+ assert ('warnings' not in output or output['warnings'] == [])
+ assert output['deprecations'] == [
+ {u'msg': u'Simple deprecation warning', u'version': None, u'collection_name': None},
+ ]
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_deprecate_without_list_version_date_not_set(am, capfd):
+ with pytest.raises(AssertionError) as ctx:
+ am.deprecate('Simple deprecation warning', date='', version='')
+ assert ctx.value.args[0] == "implementation error -- version and date must not both be set"
diff --git a/test/units/module_utils/basic/test_dict_converters.py b/test/units/module_utils/basic/test_dict_converters.py
new file mode 100644
index 0000000..f63ed9c
--- /dev/null
+++ b/test/units/module_utils/basic/test_dict_converters.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from units.mock.procenv import ModuleTestCase
+
+from ansible.module_utils.six.moves import builtins
+
+realimport = builtins.__import__
+
+
+class TestTextifyContainers(ModuleTestCase):
+ def test_module_utils_basic_json_dict_converters(self):
+ from ansible.module_utils.basic import json_dict_unicode_to_bytes, json_dict_bytes_to_unicode
+
+ test_data = dict(
+ item1=u"Fóo",
+ item2=[u"Bár", u"Bam"],
+ item3=dict(sub1=u"Súb"),
+ item4=(u"föo", u"bär", u"©"),
+ item5=42,
+ )
+ res = json_dict_unicode_to_bytes(test_data)
+ res2 = json_dict_bytes_to_unicode(res)
+
+ self.assertEqual(test_data, res2)
diff --git a/test/units/module_utils/basic/test_exit_json.py b/test/units/module_utils/basic/test_exit_json.py
new file mode 100644
index 0000000..4afcb27
--- /dev/null
+++ b/test/units/module_utils/basic/test_exit_json.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2015-2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import sys
+import datetime
+
+import pytest
+
+from ansible.module_utils.common import warnings
+
+EMPTY_INVOCATION = {u'module_args': {}}
+DATETIME = datetime.datetime.strptime('2020-07-13 12:50:00', '%Y-%m-%d %H:%M:%S')
+
+
+class TestAnsibleModuleExitJson:
+ """
+ Test that various means of calling exitJson and FailJson return the messages they've been given
+ """
+ DATA = (
+ ({}, {'invocation': EMPTY_INVOCATION}),
+ ({'msg': 'message'}, {'msg': 'message', 'invocation': EMPTY_INVOCATION}),
+ ({'msg': 'success', 'changed': True},
+ {'msg': 'success', 'changed': True, 'invocation': EMPTY_INVOCATION}),
+ ({'msg': 'nochange', 'changed': False},
+ {'msg': 'nochange', 'changed': False, 'invocation': EMPTY_INVOCATION}),
+ ({'msg': 'message', 'date': DATETIME.date()},
+ {'msg': 'message', 'date': DATETIME.date().isoformat(), 'invocation': EMPTY_INVOCATION}),
+ ({'msg': 'message', 'datetime': DATETIME},
+ {'msg': 'message', 'datetime': DATETIME.isoformat(), 'invocation': EMPTY_INVOCATION}),
+ )
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ # pylint: disable=undefined-variable
+ @pytest.mark.parametrize('args, expected, stdin', ((a, e, {}) for a, e in DATA), indirect=['stdin'])
+ def test_exit_json_exits(self, am, capfd, args, expected, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ with pytest.raises(SystemExit) as ctx:
+ am.exit_json(**args)
+ assert ctx.value.code == 0
+
+ out, err = capfd.readouterr()
+ return_val = json.loads(out)
+ assert return_val == expected
+
+ # Fail_json is only legal if it's called with a message
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('args, expected, stdin',
+ ((a, e, {}) for a, e in DATA if 'msg' in a), # pylint: disable=undefined-variable
+ indirect=['stdin'])
+ def test_fail_json_exits(self, am, capfd, args, expected, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ with pytest.raises(SystemExit) as ctx:
+ am.fail_json(**args)
+ assert ctx.value.code == 1
+
+ out, err = capfd.readouterr()
+ return_val = json.loads(out)
+ # Fail_json should add failed=True
+ expected['failed'] = True
+ assert return_val == expected
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_fail_json_msg_positional(self, am, capfd, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ with pytest.raises(SystemExit) as ctx:
+ am.fail_json('This is the msg')
+ assert ctx.value.code == 1
+
+ out, err = capfd.readouterr()
+ return_val = json.loads(out)
+ # Fail_json should add failed=True
+ assert return_val == {'msg': 'This is the msg', 'failed': True,
+ 'invocation': EMPTY_INVOCATION}
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_fail_json_msg_as_kwarg_after(self, am, capfd, monkeypatch):
+ """Test that msg as a kwarg after other kwargs works"""
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+
+ with pytest.raises(SystemExit) as ctx:
+ am.fail_json(arbitrary=42, msg='This is the msg')
+ assert ctx.value.code == 1
+
+ out, err = capfd.readouterr()
+ return_val = json.loads(out)
+ # Fail_json should add failed=True
+ assert return_val == {'msg': 'This is the msg', 'failed': True,
+ 'arbitrary': 42,
+ 'invocation': EMPTY_INVOCATION}
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_fail_json_no_msg(self, am):
+ with pytest.raises(TypeError) as ctx:
+ am.fail_json()
+
+ if sys.version_info < (3,):
+ error_msg = "fail_json() takes exactly 2 arguments (1 given)"
+ elif sys.version_info >= (3, 10):
+ error_msg = "AnsibleModule.fail_json() missing 1 required positional argument: 'msg'"
+ else:
+ error_msg = "fail_json() missing 1 required positional argument: 'msg'"
+
+ assert ctx.value.args[0] == error_msg
+
+
+class TestAnsibleModuleExitValuesRemoved:
+ """
+ Test that ExitJson and FailJson remove password-like values
+ """
+ OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+
+ DATA = (
+ (
+ dict(username='person', password='$ecret k3y'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader', msg='here'),
+ dict(one=1, pwd=OMIT, url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader', msg='here',
+ invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))),
+ ),
+ (
+ dict(username='person', password='password12345'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
+ not_secret='following the leader', msg='here'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:********@foo.com/login/',
+ not_secret='following the leader', msg='here',
+ invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))),
+ ),
+ (
+ dict(username='person', password='$ecret k3y'),
+ dict(one=1, pwd='$ecret k3y', url='https://username:$ecret k3y@foo.com/login/',
+ not_secret='following the leader', msg='here'),
+ dict(one=1, pwd=OMIT, url='https://username:********@foo.com/login/',
+ not_secret='following the leader', msg='here',
+ invocation=dict(module_args=dict(password=OMIT, token=None, username='person'))),
+ ),
+ )
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('am, stdin, return_val, expected',
+ (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e)
+ for s, r, e in DATA), # pylint: disable=undefined-variable
+ indirect=['am', 'stdin'])
+ def test_exit_json_removes_values(self, am, capfd, return_val, expected, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+ with pytest.raises(SystemExit):
+ am.exit_json(**return_val)
+ out, err = capfd.readouterr()
+
+ assert json.loads(out) == expected
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('am, stdin, return_val, expected',
+ (({'username': {}, 'password': {'no_log': True}, 'token': {'no_log': True}}, s, r, e)
+ for s, r, e in DATA), # pylint: disable=undefined-variable
+ indirect=['am', 'stdin'])
+ def test_fail_json_removes_values(self, am, capfd, return_val, expected, monkeypatch):
+ monkeypatch.setattr(warnings, '_global_deprecations', [])
+ expected['failed'] = True
+ with pytest.raises(SystemExit):
+ am.fail_json(**return_val) == expected
+ out, err = capfd.readouterr()
+
+ assert json.loads(out) == expected
diff --git a/test/units/module_utils/basic/test_filesystem.py b/test/units/module_utils/basic/test_filesystem.py
new file mode 100644
index 0000000..f09cecf
--- /dev/null
+++ b/test/units/module_utils/basic/test_filesystem.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from units.mock.procenv import ModuleTestCase
+
+from units.compat.mock import patch, MagicMock
+from ansible.module_utils.six.moves import builtins
+
+realimport = builtins.__import__
+
+
+class TestOtherFilesystem(ModuleTestCase):
+ def test_module_utils_basic_ansible_module_user_and_group(self):
+ from ansible.module_utils import basic
+ basic._ANSIBLE_ARGS = None
+
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ mock_stat = MagicMock()
+ mock_stat.st_uid = 0
+ mock_stat.st_gid = 0
+
+ with patch('os.lstat', return_value=mock_stat):
+ self.assertEqual(am.user_and_group('/path/to/file'), (0, 0))
+
+ def test_module_utils_basic_ansible_module_find_mount_point(self):
+ from ansible.module_utils import basic
+ basic._ANSIBLE_ARGS = None
+
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ def _mock_ismount(path):
+ if path == b'/':
+ return True
+ return False
+
+ with patch('os.path.ismount', side_effect=_mock_ismount):
+ self.assertEqual(am.find_mount_point('/root/fs/../mounted/path/to/whatever'), '/')
+
+ def _mock_ismount(path):
+ if path == b'/subdir/mount':
+ return True
+ if path == b'/':
+ return True
+ return False
+
+ with patch('os.path.ismount', side_effect=_mock_ismount):
+ self.assertEqual(am.find_mount_point('/subdir/mount/path/to/whatever'), '/subdir/mount')
+
+ def test_module_utils_basic_ansible_module_set_owner_if_different(self):
+ from ansible.module_utils import basic
+ basic._ANSIBLE_ARGS = None
+
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ self.assertEqual(am.set_owner_if_different('/path/to/file', None, True), True)
+ self.assertEqual(am.set_owner_if_different('/path/to/file', None, False), False)
+
+ am.user_and_group = MagicMock(return_value=(500, 500))
+
+ with patch('os.lchown', return_value=None) as m:
+ self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True)
+ m.assert_called_with(b'/path/to/file', 0, -1)
+
+ def _mock_getpwnam(*args, **kwargs):
+ mock_pw = MagicMock()
+ mock_pw.pw_uid = 0
+ return mock_pw
+
+ m.reset_mock()
+ with patch('pwd.getpwnam', side_effect=_mock_getpwnam):
+ self.assertEqual(am.set_owner_if_different('/path/to/file', 'root', False), True)
+ m.assert_called_with(b'/path/to/file', 0, -1)
+
+ with patch('pwd.getpwnam', side_effect=KeyError):
+ self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False)
+
+ m.reset_mock()
+ am.check_mode = True
+ self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True)
+ self.assertEqual(m.called, False)
+ am.check_mode = False
+
+ with patch('os.lchown', side_effect=OSError) as m:
+ self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False)
+
+ def test_module_utils_basic_ansible_module_set_group_if_different(self):
+ from ansible.module_utils import basic
+ basic._ANSIBLE_ARGS = None
+
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ self.assertEqual(am.set_group_if_different('/path/to/file', None, True), True)
+ self.assertEqual(am.set_group_if_different('/path/to/file', None, False), False)
+
+ am.user_and_group = MagicMock(return_value=(500, 500))
+
+ with patch('os.lchown', return_value=None) as m:
+ self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True)
+ m.assert_called_with(b'/path/to/file', -1, 0)
+
+ def _mock_getgrnam(*args, **kwargs):
+ mock_gr = MagicMock()
+ mock_gr.gr_gid = 0
+ return mock_gr
+
+ m.reset_mock()
+ with patch('grp.getgrnam', side_effect=_mock_getgrnam):
+ self.assertEqual(am.set_group_if_different('/path/to/file', 'root', False), True)
+ m.assert_called_with(b'/path/to/file', -1, 0)
+
+ with patch('grp.getgrnam', side_effect=KeyError):
+ self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False)
+
+ m.reset_mock()
+ am.check_mode = True
+ self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True)
+ self.assertEqual(m.called, False)
+ am.check_mode = False
+
+ with patch('os.lchown', side_effect=OSError) as m:
+ self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False)
+
+ def test_module_utils_basic_ansible_module_set_directory_attributes_if_different(self):
+ from ansible.module_utils import basic
+ basic._ANSIBLE_ARGS = None
+
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ file_args = {
+ 'path': '/path/to/file',
+ 'mode': None,
+ 'owner': None,
+ 'group': None,
+ 'seuser': None,
+ 'serole': None,
+ 'setype': None,
+ 'selevel': None,
+ 'secontext': [None, None, None],
+ 'attributes': None,
+ }
+
+ self.assertEqual(am.set_directory_attributes_if_different(file_args, True), True)
+ self.assertEqual(am.set_directory_attributes_if_different(file_args, False), False)
diff --git a/test/units/module_utils/basic/test_get_file_attributes.py b/test/units/module_utils/basic/test_get_file_attributes.py
new file mode 100644
index 0000000..836529c
--- /dev/null
+++ b/test/units/module_utils/basic/test_get_file_attributes.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# Copyright:
+# (c) 2017, Pierre-Louis Bonicoli <pierre-louis@libregerbil.fr>
+# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from itertools import product
+
+from ansible.module_utils.basic import AnsibleModule
+
+import pytest
+
+
+DATA = (
+ (
+ '3353595900 --------------e---- /usr/lib32',
+ {'attr_flags': 'e', 'version': '3353595900', 'attributes': ['extents']}
+ ),
+ # with e2fsprogs < 1.43, output isn't aligned
+ (
+ '78053594 -----------I--e---- /usr/lib',
+ {'attr_flags': 'Ie', 'version': '78053594', 'attributes': ['indexed', 'extents']}
+ ),
+ (
+ '15711607 -------A------e---- /tmp/test',
+ {'attr_flags': 'Ae', 'version': '15711607', 'attributes': ['noatime', 'extents']}
+ ),
+ # with e2fsprogs >= 1.43, output is aligned
+ (
+ '78053594 -----------I--e---- /usr/lib',
+ {'attr_flags': 'Ie', 'version': '78053594', 'attributes': ['indexed', 'extents']}
+ ),
+ (
+ '15711607 -------A------e---- /tmp/test',
+ {'attr_flags': 'Ae', 'version': '15711607', 'attributes': ['noatime', 'extents']}
+ ),
+)
+
+NO_VERSION_DATA = (
+ (
+ '--------------e---- /usr/lib32',
+ {'attr_flags': 'e', 'attributes': ['extents']}
+ ),
+ (
+ '-----------I--e---- /usr/lib',
+ {'attr_flags': 'Ie', 'attributes': ['indexed', 'extents']}
+ ),
+ (
+ '-------A------e---- /tmp/test',
+ {'attr_flags': 'Ae', 'attributes': ['noatime', 'extents']}
+ ),
+)
+
+
+@pytest.mark.parametrize('stdin, data', product(({},), DATA), indirect=['stdin'])
+def test_get_file_attributes(am, stdin, mocker, data):
+ # Test #18731
+ mocker.patch.object(AnsibleModule, 'get_bin_path', return_value=(0, '/usr/bin/lsattr', ''))
+ mocker.patch.object(AnsibleModule, 'run_command', return_value=(0, data[0], ''))
+ result = am.get_file_attributes('/path/to/file')
+ for key, value in data[1].items():
+ assert key in result and result[key] == value
+
+
+@pytest.mark.parametrize('stdin, data', product(({},), NO_VERSION_DATA), indirect=['stdin'])
+def test_get_file_attributes_no_version(am, stdin, mocker, data):
+ # Test #18731
+ mocker.patch.object(AnsibleModule, 'get_bin_path', return_value=(0, '/usr/bin/lsattr', ''))
+ mocker.patch.object(AnsibleModule, 'run_command', return_value=(0, data[0], ''))
+ result = am.get_file_attributes('/path/to/file', include_version=False)
+ for key, value in data[1].items():
+ assert key in result and result[key] == value
diff --git a/test/units/module_utils/basic/test_get_module_path.py b/test/units/module_utils/basic/test_get_module_path.py
new file mode 100644
index 0000000..6ff4a3b
--- /dev/null
+++ b/test/units/module_utils/basic/test_get_module_path.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from units.mock.procenv import ModuleTestCase
+
+from units.compat.mock import patch
+from ansible.module_utils.six.moves import builtins
+
+realimport = builtins.__import__
+
+
+class TestGetModulePath(ModuleTestCase):
+ def test_module_utils_basic_get_module_path(self):
+ from ansible.module_utils.basic import get_module_path
+ with patch('os.path.realpath', return_value='/path/to/foo/'):
+ self.assertEqual(get_module_path(), '/path/to/foo')
diff --git a/test/units/module_utils/basic/test_heuristic_log_sanitize.py b/test/units/module_utils/basic/test_heuristic_log_sanitize.py
new file mode 100644
index 0000000..664b8a5
--- /dev/null
+++ b/test/units/module_utils/basic/test_heuristic_log_sanitize.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from units.compat import unittest
+from ansible.module_utils.basic import heuristic_log_sanitize
+
+
+class TestHeuristicLogSanitize(unittest.TestCase):
+ def setUp(self):
+ self.URL_SECRET = 'http://username:pas:word@foo.com/data'
+ self.SSH_SECRET = 'username:pas:word@foo.com/data'
+ self.clean_data = repr(self._gen_data(3, True, True, 'no_secret_here'))
+ self.url_data = repr(self._gen_data(3, True, True, self.URL_SECRET))
+ self.ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET))
+
+ def _gen_data(self, records, per_rec, top_level, secret_text):
+ hostvars = {'hostvars': {}}
+ for i in range(1, records, 1):
+ host_facts = {
+ 'host%s' % i: {
+ 'pstack': {
+ 'running': '875.1',
+ 'symlinked': '880.0',
+ 'tars': [],
+ 'versions': ['885.0']
+ },
+ }
+ }
+ if per_rec:
+ host_facts['host%s' % i]['secret'] = secret_text
+ hostvars['hostvars'].update(host_facts)
+ if top_level:
+ hostvars['secret'] = secret_text
+ return hostvars
+
+ def test_did_not_hide_too_much(self):
+ self.assertEqual(heuristic_log_sanitize(self.clean_data), self.clean_data)
+
+ def test_hides_url_secrets(self):
+ url_output = heuristic_log_sanitize(self.url_data)
+ # Basic functionality: Successfully hid the password
+ self.assertNotIn('pas:word', url_output)
+
+ # Slightly more advanced, we hid all of the password despite the ":"
+ self.assertNotIn('pas', url_output)
+
+ # In this implementation we replace the password with 8 "*" which is
+ # also the length of our password. The url fields should be able to
+ # accurately detect where the password ends so the length should be
+ # the same:
+ self.assertEqual(len(url_output), len(self.url_data))
+
+ def test_hides_ssh_secrets(self):
+ ssh_output = heuristic_log_sanitize(self.ssh_data)
+ self.assertNotIn('pas:word', ssh_output)
+
+ # Slightly more advanced, we hid all of the password despite the ":"
+ self.assertNotIn('pas', ssh_output)
+
+ # ssh checking is harder as the heuristic is overzealous in many
+ # cases. Since the input will have at least one ":" present before
+ # the password we can tell some things about the beginning and end of
+ # the data, though:
+ self.assertTrue(ssh_output.startswith("{'"))
+ self.assertTrue(ssh_output.endswith("}"))
+ self.assertIn(":********@foo.com/data'", ssh_output)
+
+ def test_hides_parameter_secrets(self):
+ output = heuristic_log_sanitize('token="secret", user="person", token_entry="test=secret"', frozenset(['secret']))
+ self.assertNotIn('secret', output)
+
+ def test_no_password(self):
+ self.assertEqual(heuristic_log_sanitize('foo@bar'), 'foo@bar')
diff --git a/test/units/module_utils/basic/test_imports.py b/test/units/module_utils/basic/test_imports.py
new file mode 100644
index 0000000..d1a5f37
--- /dev/null
+++ b/test/units/module_utils/basic/test_imports.py
@@ -0,0 +1,128 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import sys
+
+from units.mock.procenv import ModuleTestCase
+
+from units.compat import unittest
+from units.compat.mock import patch
+from ansible.module_utils.six.moves import builtins
+
+realimport = builtins.__import__
+
+
+class TestImports(ModuleTestCase):
+
+ def clear_modules(self, mods):
+ for mod in mods:
+ if mod in sys.modules:
+ del sys.modules[mod]
+
+ @patch.object(builtins, '__import__')
+ def test_module_utils_basic_import_syslog(self, mock_import):
+ def _mock_import(name, *args, **kwargs):
+ if name == 'syslog':
+ raise ImportError
+ return realimport(name, *args, **kwargs)
+
+ self.clear_modules(['syslog', 'ansible.module_utils.basic'])
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertTrue(mod.module_utils.basic.HAS_SYSLOG)
+
+ self.clear_modules(['syslog', 'ansible.module_utils.basic'])
+ mock_import.side_effect = _mock_import
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertFalse(mod.module_utils.basic.HAS_SYSLOG)
+
+ @patch.object(builtins, '__import__')
+ def test_module_utils_basic_import_selinux(self, mock_import):
+ def _mock_import(name, globals=None, locals=None, fromlist=tuple(), level=0, **kwargs):
+ if name == 'ansible.module_utils.compat' and fromlist == ('selinux',):
+ raise ImportError
+ return realimport(name, globals=globals, locals=locals, fromlist=fromlist, level=level, **kwargs)
+
+ try:
+ self.clear_modules(['ansible.module_utils.compat.selinux', 'ansible.module_utils.basic'])
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertTrue(mod.module_utils.basic.HAVE_SELINUX)
+ except ImportError:
+ # no selinux on test system, so skip
+ pass
+
+ self.clear_modules(['ansible.module_utils.compat.selinux', 'ansible.module_utils.basic'])
+ mock_import.side_effect = _mock_import
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertFalse(mod.module_utils.basic.HAVE_SELINUX)
+
+ @patch.object(builtins, '__import__')
+ def test_module_utils_basic_import_json(self, mock_import):
+ def _mock_import(name, *args, **kwargs):
+ if name == 'ansible.module_utils.common._json_compat':
+ raise ImportError
+ return realimport(name, *args, **kwargs)
+
+ self.clear_modules(['json', 'ansible.module_utils.basic'])
+ builtins.__import__('ansible.module_utils.basic')
+ self.clear_modules(['json', 'ansible.module_utils.basic'])
+ mock_import.side_effect = _mock_import
+ with self.assertRaises(SystemExit):
+ builtins.__import__('ansible.module_utils.basic')
+
+ # FIXME: doesn't work yet
+ # @patch.object(builtins, 'bytes')
+ # def test_module_utils_basic_bytes(self, mock_bytes):
+ # mock_bytes.side_effect = NameError()
+ # from ansible.module_utils import basic
+
+ @patch.object(builtins, '__import__')
+ @unittest.skipIf(sys.version_info[0] >= 3, "literal_eval is available in every version of Python3")
+ def test_module_utils_basic_import_literal_eval(self, mock_import):
+ def _mock_import(name, *args, **kwargs):
+ try:
+ fromlist = kwargs.get('fromlist', args[2])
+ except IndexError:
+ fromlist = []
+ if name == 'ast' and 'literal_eval' in fromlist:
+ raise ImportError
+ return realimport(name, *args, **kwargs)
+
+ mock_import.side_effect = _mock_import
+ self.clear_modules(['ast', 'ansible.module_utils.basic'])
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1")
+ self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1)
+ self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1)
+ self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1, 2, 3))
+ self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1])
+ self.assertEqual(mod.module_utils.basic.literal_eval("True"), True)
+ self.assertEqual(mod.module_utils.basic.literal_eval("False"), False)
+ self.assertEqual(mod.module_utils.basic.literal_eval("None"), None)
+ # self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1))
+ self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf")
+
+ @patch.object(builtins, '__import__')
+ def test_module_utils_basic_import_systemd_journal(self, mock_import):
+ def _mock_import(name, *args, **kwargs):
+ try:
+ fromlist = kwargs.get('fromlist', args[2])
+ except IndexError:
+ fromlist = []
+ if name == 'systemd' and 'journal' in fromlist:
+ raise ImportError
+ return realimport(name, *args, **kwargs)
+
+ self.clear_modules(['systemd', 'ansible.module_utils.basic'])
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertTrue(mod.module_utils.basic.has_journal)
+
+ self.clear_modules(['systemd', 'ansible.module_utils.basic'])
+ mock_import.side_effect = _mock_import
+ mod = builtins.__import__('ansible.module_utils.basic')
+ self.assertFalse(mod.module_utils.basic.has_journal)
diff --git a/test/units/module_utils/basic/test_log.py b/test/units/module_utils/basic/test_log.py
new file mode 100644
index 0000000..f3f764f
--- /dev/null
+++ b/test/units/module_utils/basic/test_log.py
@@ -0,0 +1,152 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import syslog
+from itertools import product
+
+import pytest
+
+import ansible.module_utils.basic
+from ansible.module_utils.six import PY3
+
+
+class TestAnsibleModuleLogSmokeTest:
+ DATA = [u'Text string', u'Toshio くらとみ non-ascii test']
+ DATA = DATA + [d.encode('utf-8') for d in DATA]
+ DATA += [b'non-utf8 :\xff: test']
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable
+ def test_smoketest_syslog(self, am, mocker, msg):
+ # These talk to the live daemons on the system. Need to do this to
+ # show that what we send doesn't cause an issue once it gets to the
+ # daemon. These are just smoketests to test that we don't fail.
+ mocker.patch('ansible.module_utils.basic.has_journal', False)
+
+ am.log(u'Text string')
+ am.log(u'Toshio くらとみ non-ascii test')
+
+ am.log(b'Byte string')
+ am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
+ am.log(b'non-utf8 :\xff: test')
+
+ @pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed')
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('msg, stdin', ((m, {}) for m in DATA), indirect=['stdin']) # pylint: disable=undefined-variable
+ def test_smoketest_journal(self, am, mocker, msg):
+ # These talk to the live daemons on the system. Need to do this to
+ # show that what we send doesn't cause an issue once it gets to the
+ # daemon. These are just smoketests to test that we don't fail.
+ mocker.patch('ansible.module_utils.basic.has_journal', True)
+
+ am.log(u'Text string')
+ am.log(u'Toshio くらとみ non-ascii test')
+
+ am.log(b'Byte string')
+ am.log(u'Toshio くらとみ non-ascii test'.encode('utf-8'))
+ am.log(b'non-utf8 :\xff: test')
+
+
+class TestAnsibleModuleLogSyslog:
+ """Test the AnsibleModule Log Method"""
+
+ PY2_OUTPUT_DATA = [
+ (u'Text string', b'Text string'),
+ (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'.encode('utf-8')),
+ (b'Byte string', b'Byte string'),
+ (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'.encode('utf-8')),
+ (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace').encode('utf-8')),
+ ]
+
+ PY3_OUTPUT_DATA = [
+ (u'Text string', u'Text string'),
+ (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'),
+ (b'Byte string', u'Byte string'),
+ (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'),
+ (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace')),
+ ]
+
+ OUTPUT_DATA = PY3_OUTPUT_DATA if PY3 else PY2_OUTPUT_DATA
+
+ @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin'])
+ def test_no_log(self, am, mocker, no_log):
+ """Test that when no_log is set, logging does not occur"""
+ mock_syslog = mocker.patch('syslog.syslog', autospec=True)
+ mocker.patch('ansible.module_utils.basic.has_journal', False)
+ am.no_log = no_log
+ am.log('unittest no_log')
+ if no_log:
+ assert not mock_syslog.called
+ else:
+ mock_syslog.assert_called_once_with(syslog.LOG_INFO, 'unittest no_log')
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('msg, param, stdin',
+ ((m, p, {}) for m, p in OUTPUT_DATA), # pylint: disable=undefined-variable
+ indirect=['stdin'])
+ def test_output_matches(self, am, mocker, msg, param):
+ """Check that log messages are sent correctly"""
+ mocker.patch('ansible.module_utils.basic.has_journal', False)
+ mock_syslog = mocker.patch('syslog.syslog', autospec=True)
+
+ am.log(msg)
+ mock_syslog.assert_called_once_with(syslog.LOG_INFO, param)
+
+
+@pytest.mark.skipif(not ansible.module_utils.basic.has_journal, reason='python systemd bindings not installed')
+class TestAnsibleModuleLogJournal:
+ """Test the AnsibleModule Log Method"""
+
+ OUTPUT_DATA = [
+ (u'Text string', u'Text string'),
+ (u'Toshio くらとみ non-ascii test', u'Toshio くらとみ non-ascii test'),
+ (b'Byte string', u'Byte string'),
+ (u'Toshio くらとみ non-ascii test'.encode('utf-8'), u'Toshio くらとみ non-ascii test'),
+ (b'non-utf8 :\xff: test', b'non-utf8 :\xff: test'.decode('utf-8', 'replace')),
+ ]
+
+ @pytest.mark.parametrize('no_log, stdin', (product((True, False), [{}])), indirect=['stdin'])
+ def test_no_log(self, am, mocker, no_log):
+ journal_send = mocker.patch('systemd.journal.send')
+ am.no_log = no_log
+ am.log('unittest no_log')
+ if no_log:
+ assert not journal_send.called
+ else:
+ assert journal_send.called == 1
+ # Message
+ # call_args is a 2-tuple of (arg_list, kwarg_dict)
+ assert journal_send.call_args[1]['MESSAGE'].endswith('unittest no_log'), 'Message was not sent to log'
+ # log adds this journal field
+ assert 'MODULE' in journal_send.call_args[1]
+ assert 'basic.py' in journal_send.call_args[1]['MODULE']
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('msg, param, stdin',
+ ((m, p, {}) for m, p in OUTPUT_DATA), # pylint: disable=undefined-variable
+ indirect=['stdin'])
+ def test_output_matches(self, am, mocker, msg, param):
+ journal_send = mocker.patch('systemd.journal.send')
+ am.log(msg)
+ assert journal_send.call_count == 1, 'journal.send not called exactly once'
+ assert journal_send.call_args[1]['MESSAGE'].endswith(param)
+
+ @pytest.mark.parametrize('stdin', ({},), indirect=['stdin'])
+ def test_log_args(self, am, mocker):
+ journal_send = mocker.patch('systemd.journal.send')
+ am.log('unittest log_args', log_args=dict(TEST='log unittest'))
+ assert journal_send.called == 1
+ assert journal_send.call_args[1]['MESSAGE'].endswith('unittest log_args'), 'Message was not sent to log'
+
+ # log adds this journal field
+ assert 'MODULE' in journal_send.call_args[1]
+ assert 'basic.py' in journal_send.call_args[1]['MODULE']
+
+ # We added this journal field
+ assert 'TEST' in journal_send.call_args[1]
+ assert 'log unittest' in journal_send.call_args[1]['TEST']
diff --git a/test/units/module_utils/basic/test_no_log.py b/test/units/module_utils/basic/test_no_log.py
new file mode 100644
index 0000000..c479702
--- /dev/null
+++ b/test/units/module_utils/basic/test_no_log.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from units.compat import unittest
+
+from ansible.module_utils.basic import remove_values
+from ansible.module_utils.common.parameters import _return_datastructure_name
+
+
+class TestReturnValues(unittest.TestCase):
+ dataset = (
+ ('string', frozenset(['string'])),
+ ('', frozenset()),
+ (1, frozenset(['1'])),
+ (1.0, frozenset(['1.0'])),
+ (False, frozenset()),
+ (['1', '2', '3'], frozenset(['1', '2', '3'])),
+ (('1', '2', '3'), frozenset(['1', '2', '3'])),
+ ({'one': 1, 'two': 'dos'}, frozenset(['1', 'dos'])),
+ (
+ {
+ 'one': 1,
+ 'two': 'dos',
+ 'three': [
+ 'amigos', 'musketeers', None, {
+ 'ping': 'pong',
+ 'base': (
+ 'balls', 'raquets'
+ )
+ }
+ ]
+ },
+ frozenset(['1', 'dos', 'amigos', 'musketeers', 'pong', 'balls', 'raquets'])
+ ),
+ (u'Toshio くらとみ', frozenset(['Toshio くらとみ'])),
+ ('Toshio くらとみ', frozenset(['Toshio くらとみ'])),
+ )
+
+ def test_return_datastructure_name(self):
+ for data, expected in self.dataset:
+ self.assertEqual(frozenset(_return_datastructure_name(data)), expected)
+
+ def test_unknown_type(self):
+ self.assertRaises(TypeError, frozenset, _return_datastructure_name(object()))
+
+
+class TestRemoveValues(unittest.TestCase):
+ OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
+ dataset_no_remove = (
+ ('string', frozenset(['nope'])),
+ (1234, frozenset(['4321'])),
+ (False, frozenset(['4321'])),
+ (1.0, frozenset(['4321'])),
+ (['string', 'strang', 'strung'], frozenset(['nope'])),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['nope'])),
+ (
+ {
+ 'one': 1,
+ 'two': 'dos',
+ 'three': [
+ 'amigos', 'musketeers', None, {
+ 'ping': 'pong', 'base': ['balls', 'raquets']
+ }
+ ]
+ },
+ frozenset(['nope'])
+ ),
+ (u'Toshio くら'.encode('utf-8'), frozenset([u'とみ'.encode('utf-8')])),
+ (u'Toshio くら', frozenset([u'とみ'])),
+ )
+ dataset_remove = (
+ ('string', frozenset(['string']), OMIT),
+ (1234, frozenset(['1234']), OMIT),
+ (1234, frozenset(['23']), OMIT),
+ (1.0, frozenset(['1.0']), OMIT),
+ (['string', 'strang', 'strung'], frozenset(['strang']), ['string', OMIT, 'strung']),
+ (['string', 'strang', 'strung'], frozenset(['strang', 'string', 'strung']), [OMIT, OMIT, OMIT]),
+ (('string', 'strang', 'strung'), frozenset(['string', 'strung']), [OMIT, 'strang', OMIT]),
+ ((1234567890, 345678, 987654321), frozenset(['1234567890']), [OMIT, 345678, 987654321]),
+ ((1234567890, 345678, 987654321), frozenset(['345678']), [OMIT, OMIT, 987654321]),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key']), {'one': 1, 'two': 'dos', 'secret': OMIT}),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']), {'one': OMIT, 'two': OMIT, 'secret': OMIT}),
+ ({'one': 1, 'two': 'dos', 'secret': 'key'}, frozenset(['key', 'dos', '1']), {'one': OMIT, 'two': OMIT, 'secret': OMIT}),
+ (
+ {
+ 'one': 1,
+ 'two': 'dos',
+ 'three': [
+ 'amigos', 'musketeers', None, {
+ 'ping': 'pong', 'base': [
+ 'balls', 'raquets'
+ ]
+ }
+ ]
+ },
+ frozenset(['balls', 'base', 'pong', 'amigos']),
+ {
+ 'one': 1,
+ 'two': 'dos',
+ 'three': [
+ OMIT, 'musketeers', None, {
+ 'ping': OMIT,
+ 'base': [
+ OMIT, 'raquets'
+ ]
+ }
+ ]
+ }
+ ),
+ (
+ 'This sentence has an enigma wrapped in a mystery inside of a secret. - mr mystery',
+ frozenset(['enigma', 'mystery', 'secret']),
+ 'This sentence has an ******** wrapped in a ******** inside of a ********. - mr ********'
+ ),
+ (u'Toshio くらとみ'.encode('utf-8'), frozenset([u'くらとみ'.encode('utf-8')]), u'Toshio ********'.encode('utf-8')),
+ (u'Toshio くらとみ', frozenset([u'くらとみ']), u'Toshio ********'),
+ )
+
+ def test_no_removal(self):
+ for value, no_log_strings in self.dataset_no_remove:
+ self.assertEqual(remove_values(value, no_log_strings), value)
+
+ def test_strings_to_remove(self):
+ for value, no_log_strings, expected in self.dataset_remove:
+ self.assertEqual(remove_values(value, no_log_strings), expected)
+
+ def test_unknown_type(self):
+ self.assertRaises(TypeError, remove_values, object(), frozenset())
+
+ def test_hit_recursion_limit(self):
+ """ Check that we do not hit a recursion limit"""
+ data_list = []
+ inner_list = data_list
+ for i in range(0, 10000):
+ new_list = []
+ inner_list.append(new_list)
+ inner_list = new_list
+ inner_list.append('secret')
+
+ # Check that this does not hit a recursion limit
+ actual_data_list = remove_values(data_list, frozenset(('secret',)))
+
+ levels = 0
+ inner_list = actual_data_list
+ while inner_list:
+ if isinstance(inner_list, list):
+ self.assertEqual(len(inner_list), 1)
+ else:
+ levels -= 1
+ break
+ inner_list = inner_list[0]
+ levels += 1
+
+ self.assertEqual(inner_list, self.OMIT)
+ self.assertEqual(levels, 10000)
diff --git a/test/units/module_utils/basic/test_platform_distribution.py b/test/units/module_utils/basic/test_platform_distribution.py
new file mode 100644
index 0000000..3c1afb7
--- /dev/null
+++ b/test/units/module_utils/basic/test_platform_distribution.py
@@ -0,0 +1,188 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017-2018 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import pytest
+
+from units.compat.mock import patch
+
+from ansible.module_utils.six.moves import builtins
+
+# Functions being tested
+from ansible.module_utils.basic import get_platform
+from ansible.module_utils.basic import get_all_subclasses
+from ansible.module_utils.basic import get_distribution
+from ansible.module_utils.basic import get_distribution_version
+from ansible.module_utils.basic import load_platform_subclass
+
+
+realimport = builtins.__import__
+
+
+@pytest.fixture
+def platform_linux(mocker):
+ mocker.patch('platform.system', return_value='Linux')
+
+
+#
+# get_platform tests
+#
+
+def test_get_platform():
+ with patch('platform.system', return_value='foo'):
+ assert get_platform() == 'foo'
+
+
+#
+# get_distribution tests
+#
+
+@pytest.mark.usefixtures("platform_linux")
+class TestGetDistribution:
+ """Tests for get_distribution that have to find something"""
+ def test_distro_known(self):
+ with patch('ansible.module_utils.distro.id', return_value="alpine"):
+ assert get_distribution() == "Alpine"
+
+ with patch('ansible.module_utils.distro.id', return_value="arch"):
+ assert get_distribution() == "Arch"
+
+ with patch('ansible.module_utils.distro.id', return_value="centos"):
+ assert get_distribution() == "Centos"
+
+ with patch('ansible.module_utils.distro.id', return_value="clear-linux-os"):
+ assert get_distribution() == "Clear-linux-os"
+
+ with patch('ansible.module_utils.distro.id', return_value="coreos"):
+ assert get_distribution() == "Coreos"
+
+ with patch('ansible.module_utils.distro.id', return_value="debian"):
+ assert get_distribution() == "Debian"
+
+ with patch('ansible.module_utils.distro.id', return_value="flatcar"):
+ assert get_distribution() == "Flatcar"
+
+ with patch('ansible.module_utils.distro.id', return_value="linuxmint"):
+ assert get_distribution() == "Linuxmint"
+
+ with patch('ansible.module_utils.distro.id', return_value="opensuse"):
+ assert get_distribution() == "Opensuse"
+
+ with patch('ansible.module_utils.distro.id', return_value="oracle"):
+ assert get_distribution() == "Oracle"
+
+ with patch('ansible.module_utils.distro.id', return_value="raspian"):
+ assert get_distribution() == "Raspian"
+
+ with patch('ansible.module_utils.distro.id', return_value="rhel"):
+ assert get_distribution() == "Redhat"
+
+ with patch('ansible.module_utils.distro.id', return_value="ubuntu"):
+ assert get_distribution() == "Ubuntu"
+
+ with patch('ansible.module_utils.distro.id', return_value="virtuozzo"):
+ assert get_distribution() == "Virtuozzo"
+
+ with patch('ansible.module_utils.distro.id', return_value="foo"):
+ assert get_distribution() == "Foo"
+
+ def test_distro_unknown(self):
+ with patch('ansible.module_utils.distro.id', return_value=""):
+ assert get_distribution() == "OtherLinux"
+
+ def test_distro_amazon_linux_short(self):
+ with patch('ansible.module_utils.distro.id', return_value="amzn"):
+ assert get_distribution() == "Amazon"
+
+ def test_distro_amazon_linux_long(self):
+ with patch('ansible.module_utils.distro.id', return_value="amazon"):
+ assert get_distribution() == "Amazon"
+
+
+#
+# get_distribution_version tests
+#
+
+
+@pytest.mark.usefixtures("platform_linux")
+def test_distro_found():
+ with patch('ansible.module_utils.distro.version', return_value="1"):
+ assert get_distribution_version() == "1"
+
+
+#
+# Tests for LoadPlatformSubclass
+#
+
+class TestLoadPlatformSubclass:
+ class LinuxTest:
+ pass
+
+ class Foo(LinuxTest):
+ platform = "Linux"
+ distribution = None
+
+ class Bar(LinuxTest):
+ platform = "Linux"
+ distribution = "Bar"
+
+ def test_not_linux(self):
+ # if neither match, the fallback should be the top-level class
+ with patch('platform.system', return_value="Foo"):
+ with patch('ansible.module_utils.common.sys_info.get_distribution', return_value=None):
+ assert isinstance(load_platform_subclass(self.LinuxTest), self.LinuxTest)
+
+ @pytest.mark.usefixtures("platform_linux")
+ def test_get_distribution_none(self):
+ # match just the platform class, not a specific distribution
+ with patch('ansible.module_utils.common.sys_info.get_distribution', return_value=None):
+ assert isinstance(load_platform_subclass(self.LinuxTest), self.Foo)
+
+ @pytest.mark.usefixtures("platform_linux")
+ def test_get_distribution_found(self):
+ # match both the distribution and platform class
+ with patch('ansible.module_utils.common.sys_info.get_distribution', return_value="Bar"):
+ assert isinstance(load_platform_subclass(self.LinuxTest), self.Bar)
+
+
+#
+# Tests for get_all_subclasses
+#
+
+class TestGetAllSubclasses:
+ class Base:
+ pass
+
+ class BranchI(Base):
+ pass
+
+ class BranchII(Base):
+ pass
+
+ class BranchIA(BranchI):
+ pass
+
+ class BranchIB(BranchI):
+ pass
+
+ class BranchIIA(BranchII):
+ pass
+
+ class BranchIIB(BranchII):
+ pass
+
+ def test_bottom_level(self):
+ assert get_all_subclasses(self.BranchIIB) == []
+
+ def test_one_inheritance(self):
+ assert set(get_all_subclasses(self.BranchII)) == set([self.BranchIIA, self.BranchIIB])
+
+ def test_toplevel(self):
+ assert set(get_all_subclasses(self.Base)) == set([self.BranchI, self.BranchII,
+ self.BranchIA, self.BranchIB,
+ self.BranchIIA, self.BranchIIB])
diff --git a/test/units/module_utils/basic/test_run_command.py b/test/units/module_utils/basic/test_run_command.py
new file mode 100644
index 0000000..04211e2
--- /dev/null
+++ b/test/units/module_utils/basic/test_run_command.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import errno
+from itertools import product
+from io import BytesIO
+
+import pytest
+
+from ansible.module_utils._text import to_native
+from ansible.module_utils.six import PY2
+from ansible.module_utils.compat import selectors
+
+
+class OpenBytesIO(BytesIO):
+ """BytesIO with dummy close() method
+
+ So that you can inspect the content after close() was called.
+ """
+
+ def close(self):
+ pass
+
+
+@pytest.fixture
+def mock_os(mocker):
+ def mock_os_chdir(path):
+ if path == '/inaccessible':
+ raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")
+
+ def mock_os_abspath(path):
+ if path.startswith('/'):
+ return path
+ else:
+ return os.getcwd.return_value + '/' + path
+
+ os = mocker.patch('ansible.module_utils.basic.os')
+
+ os.path.expandvars.side_effect = lambda x: x
+ os.path.expanduser.side_effect = lambda x: x
+ os.environ = {'PATH': '/bin'}
+ os.getcwd.return_value = '/home/foo'
+ os.path.isdir.return_value = True
+ os.chdir.side_effect = mock_os_chdir
+ os.path.abspath.side_effect = mock_os_abspath
+
+ yield os
+
+
+class DummyFileObj():
+ def __init__(self, fileobj):
+ self.fileobj = fileobj
+
+
+class SpecialBytesIO(BytesIO):
+ def __init__(self, *args, **kwargs):
+ fh = kwargs.pop('fh', None)
+ super(SpecialBytesIO, self).__init__(*args, **kwargs)
+ self.fh = fh
+
+ def fileno(self):
+ return self.fh
+
+ # We need to do this because some of our tests create a new value for stdout and stderr
+ # The new value is able to affect the string that is returned by the subprocess stdout and
+ # stderr but by the time the test gets it, it is too late to change the SpecialBytesIO that
+ # subprocess.Popen returns for stdout and stderr. If we could figure out how to change those as
+ # well, then we wouldn't need this.
+ def __eq__(self, other):
+ if id(self) == id(other) or self.fh == other.fileno():
+ return True
+ return False
+
+
+class DummyKey:
+ def __init__(self, fileobj):
+ self.fileobj = fileobj
+
+
+@pytest.fixture
+def mock_subprocess(mocker):
+
+ class MockSelector(selectors.BaseSelector):
+ def __init__(self):
+ super(MockSelector, self).__init__()
+ self._file_objs = []
+
+ def register(self, fileobj, events, data=None):
+ self._file_objs.append(fileobj)
+
+ def unregister(self, fileobj):
+ self._file_objs.remove(fileobj)
+
+ def select(self, timeout=None):
+ ready = []
+ for file_obj in self._file_objs:
+ ready.append((DummyKey(subprocess._output[file_obj.fileno()]), selectors.EVENT_READ))
+ return ready
+
+ def get_map(self):
+ return self._file_objs
+
+ def close(self):
+ super(MockSelector, self).close()
+ self._file_objs = []
+
+ selectors.DefaultSelector = MockSelector
+
+ subprocess = mocker.patch('ansible.module_utils.basic.subprocess')
+ subprocess._output = {mocker.sentinel.stdout: SpecialBytesIO(b'', fh=mocker.sentinel.stdout),
+ mocker.sentinel.stderr: SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
+
+ cmd = mocker.MagicMock()
+ cmd.returncode = 0
+ cmd.stdin = OpenBytesIO()
+ cmd.stdout = subprocess._output[mocker.sentinel.stdout]
+ cmd.stderr = subprocess._output[mocker.sentinel.stderr]
+ subprocess.Popen.return_value = cmd
+
+ yield subprocess
+
+
+@pytest.fixture()
+def rc_am(mocker, am, mock_os, mock_subprocess):
+ am.fail_json = mocker.MagicMock(side_effect=SystemExit)
+ am._os = mock_os
+ am._subprocess = mock_subprocess
+ yield am
+
+
+class TestRunCommandArgs:
+ # Format is command as passed to run_command, command to Popen as list, command to Popen as string
+ ARGS_DATA = (
+ (['/bin/ls', 'a', 'b', 'c'], [b'/bin/ls', b'a', b'b', b'c'], b'/bin/ls a b c'),
+ ('/bin/ls a " b" "c "', [b'/bin/ls', b'a', b' b', b'c '], b'/bin/ls a " b" "c "'),
+ )
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ # pylint: disable=undefined-variable
+ @pytest.mark.parametrize('cmd, expected, shell, stdin',
+ ((arg, cmd_str if sh else cmd_lst, sh, {})
+ for (arg, cmd_lst, cmd_str), sh in product(ARGS_DATA, (True, False))),
+ indirect=['stdin'])
+ def test_args(self, cmd, expected, shell, rc_am):
+ rc_am.run_command(cmd, use_unsafe_shell=shell)
+ assert rc_am._subprocess.Popen.called
+ args, kwargs = rc_am._subprocess.Popen.call_args
+ assert args == (expected, )
+ assert kwargs['shell'] == shell
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_tuple_as_args(self, rc_am):
+ with pytest.raises(SystemExit):
+ rc_am.run_command(('ls', '/'))
+ assert rc_am.fail_json.called
+
+
+class TestRunCommandCwd:
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_cwd(self, mocker, rc_am):
+ rc_am.run_command('/bin/ls', cwd='/new')
+ assert rc_am._subprocess.Popen.mock_calls[0][2]['cwd'] == b'/new'
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_cwd_relative_path(self, mocker, rc_am):
+ rc_am.run_command('/bin/ls', cwd='sub-dir')
+ assert rc_am._subprocess.Popen.mock_calls[0][2]['cwd'] == b'/home/foo/sub-dir'
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_cwd_not_a_dir(self, mocker, rc_am):
+ rc_am.run_command('/bin/ls', cwd='/not-a-dir')
+ assert rc_am._subprocess.Popen.mock_calls[0][2]['cwd'] == b'/not-a-dir'
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_cwd_not_a_dir_noignore(self, rc_am):
+ rc_am._os.path.isdir.side_effect = lambda d: d != b'/not-a-dir'
+ with pytest.raises(SystemExit):
+ rc_am.run_command('/bin/ls', cwd='/not-a-dir', ignore_invalid_cwd=False)
+ assert rc_am.fail_json.called
+
+
+class TestRunCommandPrompt:
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_prompt_bad_regex(self, rc_am):
+ with pytest.raises(SystemExit):
+ rc_am.run_command('foo', prompt_regex='[pP)assword:')
+ assert rc_am.fail_json.called
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_prompt_no_match(self, mocker, rc_am):
+ rc_am._os._cmd_out[mocker.sentinel.stdout] = BytesIO(b'hello')
+ (rc, _, _) = rc_am.run_command('foo', prompt_regex='[pP]assword:')
+ assert rc == 0
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_prompt_match_wo_data(self, mocker, rc_am):
+ rc_am._subprocess._output = {mocker.sentinel.stdout:
+ SpecialBytesIO(b'Authentication required!\nEnter password: ',
+ fh=mocker.sentinel.stdout),
+ mocker.sentinel.stderr:
+ SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
+ (rc, _, _) = rc_am.run_command('foo', prompt_regex=r'[pP]assword:', data=None)
+ assert rc == 257
+
+
+class TestRunCommandRc:
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_check_rc_false(self, rc_am):
+ rc_am._subprocess.Popen.return_value.returncode = 1
+ (rc, _, _) = rc_am.run_command('/bin/false', check_rc=False)
+ assert rc == 1
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_check_rc_true(self, rc_am):
+ rc_am._subprocess.Popen.return_value.returncode = 1
+ with pytest.raises(SystemExit):
+ rc_am.run_command('/bin/false', check_rc=True)
+ assert rc_am.fail_json.called
+ args, kwargs = rc_am.fail_json.call_args
+ assert kwargs['rc'] == 1
+
+
+class TestRunCommandOutput:
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_text_stdin(self, rc_am):
+ (rc, stdout, stderr) = rc_am.run_command('/bin/foo', data='hello world')
+ assert rc_am._subprocess.Popen.return_value.stdin.getvalue() == b'hello world\n'
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_ascii_stdout(self, mocker, rc_am):
+ rc_am._subprocess._output = {mocker.sentinel.stdout:
+ SpecialBytesIO(b'hello', fh=mocker.sentinel.stdout),
+ mocker.sentinel.stderr:
+ SpecialBytesIO(b'', fh=mocker.sentinel.stderr)}
+ (rc, stdout, stderr) = rc_am.run_command('/bin/cat hello.txt')
+ assert rc == 0
+ # module_utils function. On py3 it returns text and py2 it returns
+ # bytes because it's returning native strings
+ assert stdout == 'hello'
+
+ @pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+ def test_utf8_output(self, mocker, rc_am):
+ rc_am._subprocess._output = {mocker.sentinel.stdout:
+ SpecialBytesIO(u'Žarn§'.encode('utf-8'),
+ fh=mocker.sentinel.stdout),
+ mocker.sentinel.stderr:
+ SpecialBytesIO(u'لرئيسية'.encode('utf-8'),
+ fh=mocker.sentinel.stderr)}
+ (rc, stdout, stderr) = rc_am.run_command('/bin/something_ugly')
+ assert rc == 0
+ # module_utils function. On py3 it returns text and py2 it returns
+ # bytes because it's returning native strings
+ assert stdout == to_native(u'Žarn§')
+ assert stderr == to_native(u'لرئيسية')
+
+
+@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
+def test_run_command_fds(mocker, rc_am):
+ subprocess_mock = mocker.patch('ansible.module_utils.basic.subprocess')
+ subprocess_mock.Popen.side_effect = AssertionError
+
+ try:
+ rc_am.run_command('synchronize', pass_fds=(101, 42))
+ except SystemExit:
+ pass
+
+ if PY2:
+ assert subprocess_mock.Popen.call_args[1]['close_fds'] is False
+ assert 'pass_fds' not in subprocess_mock.Popen.call_args[1]
+
+ else:
+ assert subprocess_mock.Popen.call_args[1]['pass_fds'] == (101, 42)
+ assert subprocess_mock.Popen.call_args[1]['close_fds'] is True
diff --git a/test/units/module_utils/basic/test_safe_eval.py b/test/units/module_utils/basic/test_safe_eval.py
new file mode 100644
index 0000000..e8538ca
--- /dev/null
+++ b/test/units/module_utils/basic/test_safe_eval.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# (c) 2015-2017, Toshio Kuratomi <tkuratomi@ansible.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from itertools import chain
+import pytest
+
+
+# Strings that should be converted into a typed value
+VALID_STRINGS = (
+ ("'a'", 'a'),
+ ("'1'", '1'),
+ ("1", 1),
+ ("True", True),
+ ("False", False),
+ ("{}", {}),
+)
+
+# Passing things that aren't strings should just return the object
+NONSTRINGS = (
+ ({'a': 1}, {'a': 1}),
+)
+
+# These strings are not basic types. For security, these should not be
+# executed. We return the same string and get an exception for some
+INVALID_STRINGS = (
+ ("a=1", "a=1", SyntaxError),
+ ("a.foo()", "a.foo()", None),
+ ("import foo", "import foo", None),
+ ("__import__('foo')", "__import__('foo')", ValueError),
+)
+
+
+@pytest.mark.parametrize('code, expected, stdin',
+ ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)),
+ indirect=['stdin'])
+def test_simple_types(am, code, expected):
+ # test some basic usage for various types
+ assert am.safe_eval(code) == expected
+
+
+@pytest.mark.parametrize('code, expected, stdin',
+ ((c, e, {}) for c, e in chain(VALID_STRINGS, NONSTRINGS)),
+ indirect=['stdin'])
+def test_simple_types_with_exceptions(am, code, expected):
+ # Test simple types with exceptions requested
+ assert am.safe_eval(code, include_exceptions=True), (expected, None)
+
+
+@pytest.mark.parametrize('code, expected, stdin',
+ ((c, e, {}) for c, e, dummy in INVALID_STRINGS),
+ indirect=['stdin'])
+def test_invalid_strings(am, code, expected):
+ assert am.safe_eval(code) == expected
+
+
+@pytest.mark.parametrize('code, expected, exception, stdin',
+ ((c, e, ex, {}) for c, e, ex in INVALID_STRINGS),
+ indirect=['stdin'])
+def test_invalid_strings_with_exceptions(am, code, expected, exception):
+ res = am.safe_eval(code, include_exceptions=True)
+ assert res[0] == expected
+ if exception is None:
+ assert res[1] == exception
+ else:
+ assert type(res[1]) == exception
diff --git a/test/units/module_utils/basic/test_sanitize_keys.py b/test/units/module_utils/basic/test_sanitize_keys.py
new file mode 100644
index 0000000..180f866
--- /dev/null
+++ b/test/units/module_utils/basic/test_sanitize_keys.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+# (c) 2020, Red Hat
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+from ansible.module_utils.basic import sanitize_keys
+
+
+def test_sanitize_keys_non_dict_types():
+ """ Test that non-dict-like objects return the same data. """
+
+ type_exception = 'Unsupported type for key sanitization.'
+ no_log_strings = set()
+
+ assert 'string value' == sanitize_keys('string value', no_log_strings)
+
+ assert sanitize_keys(None, no_log_strings) is None
+
+ assert set(['x', 'y']) == sanitize_keys(set(['x', 'y']), no_log_strings)
+
+ assert not sanitize_keys(False, no_log_strings)
+
+
+def _run_comparison(obj):
+ no_log_strings = set(['secret', 'password'])
+
+ ret = sanitize_keys(obj, no_log_strings)
+
+ expected = [
+ None,
+ True,
+ 100,
+ "some string",
+ set([1, 2]),
+ [1, 2],
+
+ {'key1': ['value1a', 'value1b'],
+ 'some-********': 'value-for-some-password',
+ 'key2': {'key3': set(['value3a', 'value3b']),
+ 'i-have-a-********': {'********-********': 'value-for-secret-password', 'key4': 'value4'}
+ }
+ },
+
+ {'foo': [{'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER': 1}]}
+ ]
+
+ assert ret == expected
+
+
+def test_sanitize_keys_dict():
+ """ Test that santize_keys works with a dict. """
+
+ d = [
+ None,
+ True,
+ 100,
+ "some string",
+ set([1, 2]),
+ [1, 2],
+
+ {'key1': ['value1a', 'value1b'],
+ 'some-password': 'value-for-some-password',
+ 'key2': {'key3': set(['value3a', 'value3b']),
+ 'i-have-a-secret': {'secret-password': 'value-for-secret-password', 'key4': 'value4'}
+ }
+ },
+
+ {'foo': [{'secret': 1}]}
+ ]
+
+ _run_comparison(d)
+
+
+def test_sanitize_keys_with_ignores():
+ """ Test that we can actually ignore keys. """
+
+ no_log_strings = set(['secret', 'rc'])
+ ignore_keys = set(['changed', 'rc', 'status'])
+
+ value = {'changed': True,
+ 'rc': 0,
+ 'test-rc': 1,
+ 'another-secret': 2,
+ 'status': 'okie dokie'}
+
+ # We expect to change 'test-rc' but NOT 'rc'.
+ expected = {'changed': True,
+ 'rc': 0,
+ 'test-********': 1,
+ 'another-********': 2,
+ 'status': 'okie dokie'}
+
+ ret = sanitize_keys(value, no_log_strings, ignore_keys)
+ assert ret == expected
diff --git a/test/units/module_utils/basic/test_selinux.py b/test/units/module_utils/basic/test_selinux.py
new file mode 100644
index 0000000..d855768
--- /dev/null
+++ b/test/units/module_utils/basic/test_selinux.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+import errno
+import json
+import pytest
+
+from units.compat.mock import mock_open, patch
+
+from ansible.module_utils import basic
+from ansible.module_utils.common.text.converters import to_bytes
+from ansible.module_utils.six.moves import builtins
+
+
+@pytest.fixture
+def no_args_module_exec():
+ with patch.object(basic, '_ANSIBLE_ARGS', b'{"ANSIBLE_MODULE_ARGS": {}}'):
+ yield # we're patching the global module object, so nothing to yield
+
+
+def no_args_module(selinux_enabled=None, selinux_mls_enabled=None):
+ am = basic.AnsibleModule(argument_spec={})
+ # just dirty-patch the wrappers on the object instance since it's short-lived
+ if isinstance(selinux_enabled, bool):
+ patch.object(am, 'selinux_enabled', return_value=selinux_enabled).start()
+ if isinstance(selinux_mls_enabled, bool):
+ patch.object(am, 'selinux_mls_enabled', return_value=selinux_mls_enabled).start()
+ return am
+
+
+# test AnsibleModule selinux wrapper methods
+@pytest.mark.usefixtures('no_args_module_exec')
+class TestSELinuxMU:
+ def test_selinux_enabled(self):
+ # test selinux unavailable
+ # selinux unavailable, should return false
+ with patch.object(basic, 'HAVE_SELINUX', False):
+ assert no_args_module().selinux_enabled() is False
+
+ # test selinux present/not-enabled
+ disabled_mod = no_args_module()
+ with patch('ansible.module_utils.compat.selinux.is_selinux_enabled', return_value=0):
+ assert disabled_mod.selinux_enabled() is False
+ # ensure value is cached (same answer after unpatching)
+ assert disabled_mod.selinux_enabled() is False
+ # and present / enabled
+ enabled_mod = no_args_module()
+ with patch('ansible.module_utils.compat.selinux.is_selinux_enabled', return_value=1):
+ assert enabled_mod.selinux_enabled() is True
+ # ensure value is cached (same answer after unpatching)
+ assert enabled_mod.selinux_enabled() is True
+
+ def test_selinux_mls_enabled(self):
+ # selinux unavailable, should return false
+ with patch.object(basic, 'HAVE_SELINUX', False):
+ assert no_args_module().selinux_mls_enabled() is False
+ # selinux disabled, should return false
+ with patch('ansible.module_utils.compat.selinux.is_selinux_mls_enabled', return_value=0):
+ assert no_args_module(selinux_enabled=False).selinux_mls_enabled() is False
+ # selinux enabled, should pass through the value of is_selinux_mls_enabled
+ with patch('ansible.module_utils.compat.selinux.is_selinux_mls_enabled', return_value=1):
+ assert no_args_module(selinux_enabled=True).selinux_mls_enabled() is True
+
+ def test_selinux_initial_context(self):
+ # selinux missing/disabled/enabled sans MLS is 3-element None
+ assert no_args_module(selinux_enabled=False, selinux_mls_enabled=False).selinux_initial_context() == [None, None, None]
+ assert no_args_module(selinux_enabled=True, selinux_mls_enabled=False).selinux_initial_context() == [None, None, None]
+ # selinux enabled with MLS is 4-element None
+ assert no_args_module(selinux_enabled=True, selinux_mls_enabled=True).selinux_initial_context() == [None, None, None, None]
+
+ def test_selinux_default_context(self):
+ # selinux unavailable
+ with patch.object(basic, 'HAVE_SELINUX', False):
+ assert no_args_module().selinux_default_context(path='/foo/bar') == [None, None, None]
+
+ am = no_args_module(selinux_enabled=True, selinux_mls_enabled=True)
+ # matchpathcon success
+ with patch('ansible.module_utils.compat.selinux.matchpathcon', return_value=[0, 'unconfined_u:object_r:default_t:s0']):
+ assert am.selinux_default_context(path='/foo/bar') == ['unconfined_u', 'object_r', 'default_t', 's0']
+
+ # matchpathcon fail (return initial context value)
+ with patch('ansible.module_utils.compat.selinux.matchpathcon', return_value=[-1, '']):
+ assert am.selinux_default_context(path='/foo/bar') == [None, None, None, None]
+
+ # matchpathcon OSError
+ with patch('ansible.module_utils.compat.selinux.matchpathcon', side_effect=OSError):
+ assert am.selinux_default_context(path='/foo/bar') == [None, None, None, None]
+
+ def test_selinux_context(self):
+ # selinux unavailable
+ with patch.object(basic, 'HAVE_SELINUX', False):
+ assert no_args_module().selinux_context(path='/foo/bar') == [None, None, None]
+
+ am = no_args_module(selinux_enabled=True, selinux_mls_enabled=True)
+ # lgetfilecon_raw passthru
+ with patch('ansible.module_utils.compat.selinux.lgetfilecon_raw', return_value=[0, 'unconfined_u:object_r:default_t:s0']):
+ assert am.selinux_context(path='/foo/bar') == ['unconfined_u', 'object_r', 'default_t', 's0']
+
+ # lgetfilecon_raw returned a failure
+ with patch('ansible.module_utils.compat.selinux.lgetfilecon_raw', return_value=[-1, '']):
+ assert am.selinux_context(path='/foo/bar') == [None, None, None, None]
+
+ # lgetfilecon_raw OSError (should bomb the module)
+ with patch('ansible.module_utils.compat.selinux.lgetfilecon_raw', side_effect=OSError(errno.ENOENT, 'NotFound')):
+ with pytest.raises(SystemExit):
+ am.selinux_context(path='/foo/bar')
+
+ with patch('ansible.module_utils.compat.selinux.lgetfilecon_raw', side_effect=OSError()):
+ with pytest.raises(SystemExit):
+ am.selinux_context(path='/foo/bar')
+
+ def test_is_special_selinux_path(self):
+ args = to_bytes(json.dumps(dict(ANSIBLE_MODULE_ARGS={'_ansible_selinux_special_fs': "nfs,nfsd,foos",
+ '_ansible_remote_tmp': "/tmp",
+ '_ansible_keep_remote_files': False})))
+
+ with patch.object(basic, '_ANSIBLE_ARGS', args):
+ am = basic.AnsibleModule(
+ argument_spec=dict(),
+ )
+
+ def _mock_find_mount_point(path):
+ if path.startswith('/some/path'):
+ return '/some/path'
+ elif path.startswith('/weird/random/fstype'):
+ return '/weird/random/fstype'
+ return '/'
+
+ am.find_mount_point = _mock_find_mount_point
+ am.selinux_context = lambda path: ['foo_u', 'foo_r', 'foo_t', 's0']
+
+ m = mock_open()
+ m.side_effect = OSError
+
+ with patch.object(builtins, 'open', m, create=True):
+ assert am.is_special_selinux_path('/some/path/that/should/be/nfs') == (False, None)
+
+ mount_data = [
+ '/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n',
+ '10.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n',
+ 'whatever /weird/random/fstype foos rw 0 0\n',
+ ]
+
+ # mock_open has a broken readlines() implementation apparently...
+ # this should work by default but doesn't, so we fix it
+ m = mock_open(read_data=''.join(mount_data))
+ m.return_value.readlines.return_value = mount_data
+
+ with patch.object(builtins, 'open', m, create=True):
+ assert am.is_special_selinux_path('/some/random/path') == (False, None)
+ assert am.is_special_selinux_path('/some/path/that/should/be/nfs') == (True, ['foo_u', 'foo_r', 'foo_t', 's0'])
+ assert am.is_special_selinux_path('/weird/random/fstype/path') == (True, ['foo_u', 'foo_r', 'foo_t', 's0'])
+
+ def test_set_context_if_different(self):
+ am = no_args_module(selinux_enabled=False)
+ assert am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) is True
+ assert am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False) is False
+
+ am = no_args_module(selinux_enabled=True, selinux_mls_enabled=True)
+ am.selinux_context = lambda path: ['bar_u', 'bar_r', None, None]
+ am.is_special_selinux_path = lambda path: (False, None)
+
+ with patch('ansible.module_utils.compat.selinux.lsetfilecon', return_value=0) as m:
+ assert am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False) is True
+ m.assert_called_with('/path/to/file', 'foo_u:foo_r:foo_t:s0')
+ m.reset_mock()
+ am.check_mode = True
+ assert am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False) is True
+ assert not m.called
+ am.check_mode = False
+
+ with patch('ansible.module_utils.compat.selinux.lsetfilecon', return_value=1):
+ with pytest.raises(SystemExit):
+ am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True)
+
+ with patch('ansible.module_utils.compat.selinux.lsetfilecon', side_effect=OSError):
+ with pytest.raises(SystemExit):
+ am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True)
+
+ am.is_special_selinux_path = lambda path: (True, ['sp_u', 'sp_r', 'sp_t', 's0'])
+
+ with patch('ansible.module_utils.compat.selinux.lsetfilecon', return_value=0) as m:
+ assert am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False) is True
+ m.assert_called_with('/path/to/file', 'sp_u:sp_r:sp_t:s0')
diff --git a/test/units/module_utils/basic/test_set_cwd.py b/test/units/module_utils/basic/test_set_cwd.py
new file mode 100644
index 0000000..159236b
--- /dev/null
+++ b/test/units/module_utils/basic/test_set_cwd.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2018 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import os
+import shutil
+import tempfile
+
+import pytest
+
+from units.compat.mock import patch, MagicMock
+from ansible.module_utils._text import to_bytes
+
+from ansible.module_utils import basic
+
+
+class TestAnsibleModuleSetCwd:
+
+ def test_set_cwd(self, monkeypatch):
+
+ '''make sure /tmp is used'''
+
+ def mock_getcwd():
+ return '/tmp'
+
+ def mock_access(path, perm):
+ return True
+
+ def mock_chdir(path):
+ pass
+
+ monkeypatch.setattr(os, 'getcwd', mock_getcwd)
+ monkeypatch.setattr(os, 'access', mock_access)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+
+ result = am._set_cwd()
+ assert result == '/tmp'
+
+ def test_set_cwd_unreadable_use_self_tmpdir(self, monkeypatch):
+
+ '''pwd is not readable, use instance's tmpdir property'''
+
+ def mock_getcwd():
+ return '/tmp'
+
+ def mock_access(path, perm):
+ if path == '/tmp' and perm == 4:
+ return False
+ return True
+
+ def mock_expandvars(var):
+ if var == '$HOME':
+ return '/home/foobar'
+ return var
+
+ def mock_gettempdir():
+ return '/tmp/testdir'
+
+ def mock_chdir(path):
+ if path == '/tmp':
+ raise Exception()
+ return
+
+ monkeypatch.setattr(os, 'getcwd', mock_getcwd)
+ monkeypatch.setattr(os, 'chdir', mock_chdir)
+ monkeypatch.setattr(os, 'access', mock_access)
+ monkeypatch.setattr(os.path, 'expandvars', mock_expandvars)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+
+ am._tmpdir = '/tmp2'
+ result = am._set_cwd()
+ assert result == am._tmpdir
+
+ def test_set_cwd_unreadable_use_home(self, monkeypatch):
+
+ '''cwd and instance tmpdir are unreadable, use home'''
+
+ def mock_getcwd():
+ return '/tmp'
+
+ def mock_access(path, perm):
+ if path in ['/tmp', '/tmp2'] and perm == 4:
+ return False
+ return True
+
+ def mock_expandvars(var):
+ if var == '$HOME':
+ return '/home/foobar'
+ return var
+
+ def mock_gettempdir():
+ return '/tmp/testdir'
+
+ def mock_chdir(path):
+ if path == '/tmp':
+ raise Exception()
+ return
+
+ monkeypatch.setattr(os, 'getcwd', mock_getcwd)
+ monkeypatch.setattr(os, 'chdir', mock_chdir)
+ monkeypatch.setattr(os, 'access', mock_access)
+ monkeypatch.setattr(os.path, 'expandvars', mock_expandvars)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+
+ am._tmpdir = '/tmp2'
+ result = am._set_cwd()
+ assert result == '/home/foobar'
+
+ def test_set_cwd_unreadable_use_gettempdir(self, monkeypatch):
+
+ '''fallback to tempfile.gettempdir'''
+
+ thisdir = None
+
+ def mock_getcwd():
+ return '/tmp'
+
+ def mock_access(path, perm):
+ if path in ['/tmp', '/tmp2', '/home/foobar'] and perm == 4:
+ return False
+ return True
+
+ def mock_expandvars(var):
+ if var == '$HOME':
+ return '/home/foobar'
+ return var
+
+ def mock_gettempdir():
+ return '/tmp3'
+
+ def mock_chdir(path):
+ if path == '/tmp':
+ raise Exception()
+ thisdir = path
+
+ monkeypatch.setattr(os, 'getcwd', mock_getcwd)
+ monkeypatch.setattr(os, 'chdir', mock_chdir)
+ monkeypatch.setattr(os, 'access', mock_access)
+ monkeypatch.setattr(os.path, 'expandvars', mock_expandvars)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+
+ am._tmpdir = '/tmp2'
+ monkeypatch.setattr(tempfile, 'gettempdir', mock_gettempdir)
+ result = am._set_cwd()
+ assert result == '/tmp3'
+
+ def test_set_cwd_unreadable_use_None(self, monkeypatch):
+
+ '''all paths are unreable, should return None and not an exception'''
+
+ def mock_getcwd():
+ return '/tmp'
+
+ def mock_access(path, perm):
+ if path in ['/tmp', '/tmp2', '/tmp3', '/home/foobar'] and perm == 4:
+ return False
+ return True
+
+ def mock_expandvars(var):
+ if var == '$HOME':
+ return '/home/foobar'
+ return var
+
+ def mock_gettempdir():
+ return '/tmp3'
+
+ def mock_chdir(path):
+ if path == '/tmp':
+ raise Exception()
+
+ monkeypatch.setattr(os, 'getcwd', mock_getcwd)
+ monkeypatch.setattr(os, 'chdir', mock_chdir)
+ monkeypatch.setattr(os, 'access', mock_access)
+ monkeypatch.setattr(os.path, 'expandvars', mock_expandvars)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': {}})))
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+
+ am._tmpdir = '/tmp2'
+ monkeypatch.setattr(tempfile, 'gettempdir', mock_gettempdir)
+ result = am._set_cwd()
+ assert result is None
diff --git a/test/units/module_utils/basic/test_set_mode_if_different.py b/test/units/module_utils/basic/test_set_mode_if_different.py
new file mode 100644
index 0000000..5fec331
--- /dev/null
+++ b/test/units/module_utils/basic/test_set_mode_if_different.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import errno
+import os
+
+from itertools import product
+
+try:
+ import builtins
+except ImportError:
+ import __builtin__ as builtins
+
+import pytest
+
+
+SYNONYMS_0660 = (
+ 0o660,
+ '0o660',
+ '660',
+ 'u+rw-x,g+rw-x,o-rwx',
+ 'u=rw,g=rw,o-rwx',
+)
+
+
+@pytest.fixture
+def mock_stats(mocker):
+ mock_stat1 = mocker.MagicMock()
+ mock_stat1.st_mode = 0o444
+ mock_stat2 = mocker.MagicMock()
+ mock_stat2.st_mode = 0o660
+ yield {"before": mock_stat1, "after": mock_stat2}
+
+
+@pytest.fixture
+def am_check_mode(am):
+ am.check_mode = True
+ yield am
+ am.check_mode = False
+
+
+@pytest.fixture
+def mock_lchmod(mocker):
+ m_lchmod = mocker.patch('ansible.module_utils.basic.os.lchmod', return_value=None, create=True)
+ yield m_lchmod
+
+
+@pytest.mark.parametrize('previous_changes, check_mode, exists, stdin',
+ product((True, False), (True, False), (True, False), ({},)),
+ indirect=['stdin'])
+def test_no_mode_given_returns_previous_changes(am, mock_stats, mock_lchmod, mocker, previous_changes, check_mode, exists):
+ am.check_mode = check_mode
+ mocker.patch('os.lstat', side_effect=[mock_stats['before']])
+ m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
+ m_path_exists = mocker.patch('os.path.exists', return_value=exists)
+
+ assert am.set_mode_if_different('/path/to/file', None, previous_changes) == previous_changes
+ assert not m_lchmod.called
+ assert not m_path_exists.called
+
+
+@pytest.mark.parametrize('mode, check_mode, stdin',
+ product(SYNONYMS_0660, (True, False), ({},)),
+ indirect=['stdin'])
+def test_mode_changed_to_0660(am, mock_stats, mocker, mode, check_mode):
+ # Note: This is for checking that all the different ways of specifying
+ # 0660 mode work. It cannot be used to check that setting a mode that is
+ # not equivalent to 0660 works.
+ am.check_mode = check_mode
+ mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after'], mock_stats['after']])
+ m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
+ mocker.patch('os.path.exists', return_value=True)
+
+ assert am.set_mode_if_different('/path/to/file', mode, False)
+ if check_mode:
+ assert not m_lchmod.called
+ else:
+ m_lchmod.assert_called_with(b'/path/to/file', 0o660)
+
+
+@pytest.mark.parametrize('mode, check_mode, stdin',
+ product(SYNONYMS_0660, (True, False), ({},)),
+ indirect=['stdin'])
+def test_mode_unchanged_when_already_0660(am, mock_stats, mocker, mode, check_mode):
+ # Note: This is for checking that all the different ways of specifying
+ # 0660 mode work. It cannot be used to check that setting a mode that is
+ # not equivalent to 0660 works.
+ am.check_mode = check_mode
+ mocker.patch('os.lstat', side_effect=[mock_stats['after'], mock_stats['after'], mock_stats['after']])
+ m_lchmod = mocker.patch('os.lchmod', return_value=None, create=True)
+ mocker.patch('os.path.exists', return_value=True)
+
+ assert not am.set_mode_if_different('/path/to/file', mode, False)
+ assert not m_lchmod.called
+
+
+@pytest.mark.parametrize('mode, stdin', product(SYNONYMS_0660, ({},)), indirect=['stdin'])
+def test_mode_changed_to_0660_check_mode_no_file(am, mocker, mode):
+ am.check_mode = True
+ mocker.patch('os.path.exists', return_value=False)
+ assert am.set_mode_if_different('/path/to/file', mode, False)
+
+
+@pytest.mark.parametrize('check_mode, stdin',
+ product((True, False), ({},)),
+ indirect=['stdin'])
+def test_missing_lchmod_is_not_link(am, mock_stats, mocker, monkeypatch, check_mode):
+ """Some platforms have lchmod (*BSD) others do not (Linux)"""
+
+ am.check_mode = check_mode
+ original_hasattr = hasattr
+
+ monkeypatch.delattr(os, 'lchmod', raising=False)
+
+ mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']])
+ mocker.patch('os.path.islink', return_value=False)
+ mocker.patch('os.path.exists', return_value=True)
+ m_chmod = mocker.patch('os.chmod', return_value=None)
+
+ assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False)
+ if check_mode:
+ assert not m_chmod.called
+ else:
+ m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660)
+
+
+@pytest.mark.parametrize('check_mode, stdin',
+ product((True, False), ({},)),
+ indirect=['stdin'])
+def test_missing_lchmod_is_link(am, mock_stats, mocker, monkeypatch, check_mode):
+ """Some platforms have lchmod (*BSD) others do not (Linux)"""
+
+ am.check_mode = check_mode
+ original_hasattr = hasattr
+
+ monkeypatch.delattr(os, 'lchmod', raising=False)
+
+ mocker.patch('os.lstat', side_effect=[mock_stats['before'], mock_stats['after']])
+ mocker.patch('os.path.islink', return_value=True)
+ mocker.patch('os.path.exists', return_value=True)
+ m_chmod = mocker.patch('os.chmod', return_value=None)
+ mocker.patch('os.stat', return_value=mock_stats['after'])
+
+ assert am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False)
+ if check_mode:
+ assert not m_chmod.called
+ else:
+ m_chmod.assert_called_with(b'/path/to/file/no_lchmod', 0o660)
+
+ mocker.resetall()
+ mocker.stopall()
+
+
+@pytest.mark.parametrize('stdin,',
+ ({},),
+ indirect=['stdin'])
+def test_missing_lchmod_is_link_in_sticky_dir(am, mock_stats, mocker):
+ """Some platforms have lchmod (*BSD) others do not (Linux)"""
+
+ am.check_mode = False
+ original_hasattr = hasattr
+
+ def _hasattr(obj, name):
+ if obj == os and name == 'lchmod':
+ return False
+ return original_hasattr(obj, name)
+
+ mock_lstat = mocker.MagicMock()
+ mock_lstat.st_mode = 0o777
+
+ mocker.patch('os.lstat', side_effect=[mock_lstat, mock_lstat])
+ mocker.patch.object(builtins, 'hasattr', side_effect=_hasattr)
+ mocker.patch('os.path.islink', return_value=True)
+ mocker.patch('os.path.exists', return_value=True)
+ m_stat = mocker.patch('os.stat', side_effect=OSError(errno.EACCES, 'Permission denied'))
+ m_chmod = mocker.patch('os.chmod', return_value=None)
+
+ # not changed: can't set mode on symbolic links
+ assert not am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False)
+ m_stat.assert_called_with(b'/path/to/file/no_lchmod')
+ m_chmod.assert_not_called()
+
+ mocker.resetall()
+ mocker.stopall()
diff --git a/test/units/module_utils/basic/test_tmpdir.py b/test/units/module_utils/basic/test_tmpdir.py
new file mode 100644
index 0000000..818cb9b
--- /dev/null
+++ b/test/units/module_utils/basic/test_tmpdir.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2018 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import os
+import shutil
+import tempfile
+
+import pytest
+
+from units.compat.mock import patch, MagicMock
+from ansible.module_utils._text import to_bytes
+
+from ansible.module_utils import basic
+
+
+class TestAnsibleModuleTmpDir:
+
+ DATA = (
+ (
+ {
+ "_ansible_tmpdir": "/path/to/dir",
+ "_ansible_remote_tmp": "/path/tmpdir",
+ "_ansible_keep_remote_files": False,
+ },
+ True,
+ "/path/to/dir"
+ ),
+ (
+ {
+ "_ansible_tmpdir": None,
+ "_ansible_remote_tmp": "/path/tmpdir",
+ "_ansible_keep_remote_files": False
+ },
+ False,
+ "/path/tmpdir/ansible-moduletmp-42-"
+ ),
+ (
+ {
+ "_ansible_tmpdir": None,
+ "_ansible_remote_tmp": "/path/tmpdir",
+ "_ansible_keep_remote_files": False
+ },
+ True,
+ "/path/tmpdir/ansible-moduletmp-42-"
+ ),
+ (
+ {
+ "_ansible_tmpdir": None,
+ "_ansible_remote_tmp": "$HOME/.test",
+ "_ansible_keep_remote_files": False
+ },
+ False,
+ os.path.join(os.environ['HOME'], ".test/ansible-moduletmp-42-")
+ ),
+ )
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ # pylint: disable=undefined-variable
+ @pytest.mark.parametrize('args, expected, stat_exists', ((s, e, t) for s, t, e in DATA))
+ def test_tmpdir_property(self, monkeypatch, args, expected, stat_exists):
+ makedirs = {'called': False}
+
+ def mock_mkdtemp(prefix, dir):
+ return os.path.join(dir, prefix)
+
+ def mock_makedirs(path, mode):
+ makedirs['called'] = True
+ makedirs['path'] = path
+ makedirs['mode'] = mode
+ return
+
+ monkeypatch.setattr(tempfile, 'mkdtemp', mock_mkdtemp)
+ monkeypatch.setattr(os.path, 'exists', lambda x: stat_exists)
+ monkeypatch.setattr(os, 'makedirs', mock_makedirs)
+ monkeypatch.setattr(shutil, 'rmtree', lambda x: None)
+ monkeypatch.setattr(basic, '_ANSIBLE_ARGS', to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': args})))
+
+ with patch('time.time', return_value=42):
+ am = basic.AnsibleModule(argument_spec={})
+ actual_tmpdir = am.tmpdir
+
+ assert actual_tmpdir == expected
+
+ # verify subsequent calls always produces the same tmpdir
+ assert am.tmpdir == actual_tmpdir
+
+ if not stat_exists:
+ assert makedirs['called']
+ expected = os.path.expanduser(os.path.expandvars(am._remote_tmp))
+ assert makedirs['path'] == expected
+ assert makedirs['mode'] == 0o700
+
+ @pytest.mark.parametrize('stdin', ({"_ansible_tmpdir": None,
+ "_ansible_remote_tmp": "$HOME/.test",
+ "_ansible_keep_remote_files": True},),
+ indirect=['stdin'])
+ def test_tmpdir_makedirs_failure(self, am, monkeypatch):
+
+ mock_mkdtemp = MagicMock(return_value="/tmp/path")
+ mock_makedirs = MagicMock(side_effect=OSError("Some OS Error here"))
+
+ monkeypatch.setattr(tempfile, 'mkdtemp', mock_mkdtemp)
+ monkeypatch.setattr(os.path, 'exists', lambda x: False)
+ monkeypatch.setattr(os, 'makedirs', mock_makedirs)
+
+ actual = am.tmpdir
+ assert actual == "/tmp/path"
+ assert mock_makedirs.call_args[0] == (os.path.expanduser(os.path.expandvars("$HOME/.test")),)
+ assert mock_makedirs.call_args[1] == {"mode": 0o700}
+
+ # because makedirs failed the dir should be None so it uses the System tmp
+ assert mock_mkdtemp.call_args[1]['dir'] is None
+ assert mock_mkdtemp.call_args[1]['prefix'].startswith("ansible-moduletmp-")