summaryrefslogtreecommitdiffstats
path: root/test/units/executor/module_common
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/executor/module_common')
-rw-r--r--test/units/executor/module_common/test_modify_module.py43
-rw-r--r--test/units/executor/module_common/test_module_common.py200
-rw-r--r--test/units/executor/module_common/test_recursive_finder.py130
3 files changed, 373 insertions, 0 deletions
diff --git a/test/units/executor/module_common/test_modify_module.py b/test/units/executor/module_common/test_modify_module.py
new file mode 100644
index 0000000..dceef76
--- /dev/null
+++ b/test/units/executor/module_common/test_modify_module.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2018 Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# -*- coding: utf-8 -*-
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible.executor.module_common import modify_module
+from ansible.module_utils.six import PY2
+
+from test_module_common import templar
+
+
+FAKE_OLD_MODULE = b'''#!/usr/bin/python
+import sys
+print('{"result": "%s"}' % sys.executable)
+'''
+
+
+@pytest.fixture
+def fake_old_module_open(mocker):
+ m = mocker.mock_open(read_data=FAKE_OLD_MODULE)
+ if PY2:
+ mocker.patch('__builtin__.open', m)
+ else:
+ mocker.patch('builtins.open', m)
+
+# this test no longer makes sense, since a Python module will always either have interpreter discovery run or
+# an explicit interpreter passed (so we'll never default to the module shebang)
+# def test_shebang(fake_old_module_open, templar):
+# (data, style, shebang) = modify_module('fake_module', 'fake_path', {}, templar)
+# assert shebang == '#!/usr/bin/python'
+
+
+def test_shebang_task_vars(fake_old_module_open, templar):
+ task_vars = {
+ 'ansible_python_interpreter': '/usr/bin/python3'
+ }
+
+ (data, style, shebang) = modify_module('fake_module', 'fake_path', {}, templar, task_vars=task_vars)
+ assert shebang == '#!/usr/bin/python3'
diff --git a/test/units/executor/module_common/test_module_common.py b/test/units/executor/module_common/test_module_common.py
new file mode 100644
index 0000000..fa6add8
--- /dev/null
+++ b/test/units/executor/module_common/test_module_common.py
@@ -0,0 +1,200 @@
+# (c) 2017, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os.path
+
+import pytest
+
+import ansible.errors
+
+from ansible.executor import module_common as amc
+from ansible.executor.interpreter_discovery import InterpreterDiscoveryRequiredError
+from ansible.module_utils.six import PY2
+
+
+class TestStripComments:
+ def test_no_changes(self):
+ no_comments = u"""def some_code():
+ return False"""
+ assert amc._strip_comments(no_comments) == no_comments
+
+ def test_all_comments(self):
+ all_comments = u"""# This is a test
+ # Being as it is
+ # To be
+ """
+ assert amc._strip_comments(all_comments) == u""
+
+ def test_all_whitespace(self):
+ # Note: Do not remove the spaces on the blank lines below. They're
+ # test data to show that the lines get removed despite having spaces
+ # on them
+ all_whitespace = u"""
+
+
+
+\t\t\r\n
+ """ # nopep8
+ assert amc._strip_comments(all_whitespace) == u""
+
+ def test_somewhat_normal(self):
+ mixed = u"""#!/usr/bin/python
+
+# here we go
+def test(arg):
+ # this is a thing
+ thing = '# test'
+ return thing
+# End
+"""
+ mixed_results = u"""def test(arg):
+ thing = '# test'
+ return thing"""
+ assert amc._strip_comments(mixed) == mixed_results
+
+
+class TestSlurp:
+ def test_slurp_nonexistent(self, mocker):
+ mocker.patch('os.path.exists', side_effect=lambda x: False)
+ with pytest.raises(ansible.errors.AnsibleError):
+ amc._slurp('no_file')
+
+ def test_slurp_file(self, mocker):
+ mocker.patch('os.path.exists', side_effect=lambda x: True)
+ m = mocker.mock_open(read_data='This is a test')
+ if PY2:
+ mocker.patch('__builtin__.open', m)
+ else:
+ mocker.patch('builtins.open', m)
+ assert amc._slurp('some_file') == 'This is a test'
+
+ def test_slurp_file_with_newlines(self, mocker):
+ mocker.patch('os.path.exists', side_effect=lambda x: True)
+ m = mocker.mock_open(read_data='#!/usr/bin/python\ndef test(args):\nprint("hi")\n')
+ if PY2:
+ mocker.patch('__builtin__.open', m)
+ else:
+ mocker.patch('builtins.open', m)
+ assert amc._slurp('some_file') == '#!/usr/bin/python\ndef test(args):\nprint("hi")\n'
+
+
+@pytest.fixture
+def templar():
+ class FakeTemplar:
+ def template(self, template_string, *args, **kwargs):
+ return template_string
+
+ return FakeTemplar()
+
+
+class TestGetShebang:
+ """Note: We may want to change the API of this function in the future. It isn't a great API"""
+ def test_no_interpreter_set(self, templar):
+ # normally this would return /usr/bin/python, but so long as we're defaulting to auto python discovery, we'll get
+ # an InterpreterDiscoveryRequiredError here instead
+ with pytest.raises(InterpreterDiscoveryRequiredError):
+ amc._get_shebang(u'/usr/bin/python', {}, templar)
+
+ def test_python_interpreter(self, templar):
+ assert amc._get_shebang(u'/usr/bin/python3.8', {}, templar) == ('#!/usr/bin/python3.8', u'/usr/bin/python3.8')
+
+ def test_non_python_interpreter(self, templar):
+ assert amc._get_shebang(u'/usr/bin/ruby', {}, templar) == ('#!/usr/bin/ruby', u'/usr/bin/ruby')
+
+ def test_interpreter_set_in_task_vars(self, templar):
+ assert amc._get_shebang(u'/usr/bin/python', {u'ansible_python_interpreter': u'/usr/bin/pypy'}, templar) == \
+ (u'#!/usr/bin/pypy', u'/usr/bin/pypy')
+
+ def test_non_python_interpreter_in_task_vars(self, templar):
+ assert amc._get_shebang(u'/usr/bin/ruby', {u'ansible_ruby_interpreter': u'/usr/local/bin/ruby'}, templar) == \
+ (u'#!/usr/local/bin/ruby', u'/usr/local/bin/ruby')
+
+ def test_with_args(self, templar):
+ assert amc._get_shebang(u'/usr/bin/python', {u'ansible_python_interpreter': u'/usr/bin/python3'}, templar, args=('-tt', '-OO')) == \
+ (u'#!/usr/bin/python3 -tt -OO', u'/usr/bin/python3')
+
+ def test_python_via_env(self, templar):
+ assert amc._get_shebang(u'/usr/bin/python', {u'ansible_python_interpreter': u'/usr/bin/env python'}, templar) == \
+ (u'#!/usr/bin/env python', u'/usr/bin/env python')
+
+
+class TestDetectionRegexes:
+ ANSIBLE_MODULE_UTIL_STRINGS = (
+ # Absolute collection imports
+ b'import ansible_collections.my_ns.my_col.plugins.module_utils.my_util',
+ b'from ansible_collections.my_ns.my_col.plugins.module_utils import my_util',
+ b'from ansible_collections.my_ns.my_col.plugins.module_utils.my_util import my_func',
+ # Absolute core imports
+ b'import ansible.module_utils.basic',
+ b'from ansible.module_utils import basic',
+ b'from ansible.module_utils.basic import AnsibleModule',
+ # Relative imports
+ b'from ..module_utils import basic',
+ b'from .. module_utils import basic',
+ b'from ....module_utils import basic',
+ b'from ..module_utils.basic import AnsibleModule',
+ )
+ NOT_ANSIBLE_MODULE_UTIL_STRINGS = (
+ b'from ansible import release',
+ b'from ..release import __version__',
+ b'from .. import release',
+ b'from ansible.modules.system import ping',
+ b'from ansible_collecitons.my_ns.my_col.plugins.modules import function',
+ )
+
+ OFFSET = os.path.dirname(os.path.dirname(amc.__file__))
+ CORE_PATHS = (
+ ('%s/modules/from_role.py' % OFFSET, 'ansible/modules/from_role'),
+ ('%s/modules/system/ping.py' % OFFSET, 'ansible/modules/system/ping'),
+ ('%s/modules/cloud/amazon/s3.py' % OFFSET, 'ansible/modules/cloud/amazon/s3'),
+ )
+
+ COLLECTION_PATHS = (
+ ('/root/ansible_collections/ns/col/plugins/modules/ping.py',
+ 'ansible_collections/ns/col/plugins/modules/ping'),
+ ('/root/ansible_collections/ns/col/plugins/modules/subdir/ping.py',
+ 'ansible_collections/ns/col/plugins/modules/subdir/ping'),
+ )
+
+ @pytest.mark.parametrize('testcase', ANSIBLE_MODULE_UTIL_STRINGS)
+ def test_detect_new_style_python_module_re(self, testcase):
+ assert amc.NEW_STYLE_PYTHON_MODULE_RE.search(testcase)
+
+ @pytest.mark.parametrize('testcase', NOT_ANSIBLE_MODULE_UTIL_STRINGS)
+ def test_no_detect_new_style_python_module_re(self, testcase):
+ assert not amc.NEW_STYLE_PYTHON_MODULE_RE.search(testcase)
+
+ # pylint bug: https://github.com/PyCQA/pylint/issues/511
+ @pytest.mark.parametrize('testcase, result', CORE_PATHS) # pylint: disable=undefined-variable
+ def test_detect_core_library_path_re(self, testcase, result):
+ assert amc.CORE_LIBRARY_PATH_RE.search(testcase).group('path') == result
+
+ @pytest.mark.parametrize('testcase', (p[0] for p in COLLECTION_PATHS)) # pylint: disable=undefined-variable
+ def test_no_detect_core_library_path_re(self, testcase):
+ assert not amc.CORE_LIBRARY_PATH_RE.search(testcase)
+
+ @pytest.mark.parametrize('testcase, result', COLLECTION_PATHS) # pylint: disable=undefined-variable
+ def test_detect_collection_path_re(self, testcase, result):
+ assert amc.COLLECTION_PATH_RE.search(testcase).group('path') == result
+
+ @pytest.mark.parametrize('testcase', (p[0] for p in CORE_PATHS)) # pylint: disable=undefined-variable
+ def test_no_detect_collection_path_re(self, testcase):
+ assert not amc.COLLECTION_PATH_RE.search(testcase)
diff --git a/test/units/executor/module_common/test_recursive_finder.py b/test/units/executor/module_common/test_recursive_finder.py
new file mode 100644
index 0000000..8136a00
--- /dev/null
+++ b/test/units/executor/module_common/test_recursive_finder.py
@@ -0,0 +1,130 @@
+# (c) 2017, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import pytest
+import zipfile
+
+from collections import namedtuple
+from io import BytesIO
+
+import ansible.errors
+
+from ansible.executor.module_common import recursive_finder
+
+
+# These are the modules that are brought in by module_utils/basic.py This may need to be updated
+# when basic.py gains new imports
+# We will remove these when we modify AnsiBallZ to store its args in a separate file instead of in
+# basic.py
+
+MODULE_UTILS_BASIC_FILES = frozenset(('ansible/__init__.py',
+ 'ansible/module_utils/__init__.py',
+ 'ansible/module_utils/_text.py',
+ 'ansible/module_utils/basic.py',
+ 'ansible/module_utils/six/__init__.py',
+ 'ansible/module_utils/_text.py',
+ 'ansible/module_utils/common/_collections_compat.py',
+ 'ansible/module_utils/common/_json_compat.py',
+ 'ansible/module_utils/common/collections.py',
+ 'ansible/module_utils/common/parameters.py',
+ 'ansible/module_utils/common/warnings.py',
+ 'ansible/module_utils/parsing/convert_bool.py',
+ 'ansible/module_utils/common/__init__.py',
+ 'ansible/module_utils/common/file.py',
+ 'ansible/module_utils/common/locale.py',
+ 'ansible/module_utils/common/process.py',
+ 'ansible/module_utils/common/sys_info.py',
+ 'ansible/module_utils/common/text/__init__.py',
+ 'ansible/module_utils/common/text/converters.py',
+ 'ansible/module_utils/common/text/formatters.py',
+ 'ansible/module_utils/common/validation.py',
+ 'ansible/module_utils/common/_utils.py',
+ 'ansible/module_utils/common/arg_spec.py',
+ 'ansible/module_utils/compat/__init__.py',
+ 'ansible/module_utils/compat/_selectors2.py',
+ 'ansible/module_utils/compat/selectors.py',
+ 'ansible/module_utils/compat/selinux.py',
+ 'ansible/module_utils/distro/__init__.py',
+ 'ansible/module_utils/distro/_distro.py',
+ 'ansible/module_utils/errors.py',
+ 'ansible/module_utils/parsing/__init__.py',
+ 'ansible/module_utils/parsing/convert_bool.py',
+ 'ansible/module_utils/pycompat24.py',
+ 'ansible/module_utils/six/__init__.py',
+ ))
+
+ONLY_BASIC_FILE = frozenset(('ansible/module_utils/basic.py',))
+
+ANSIBLE_LIB = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), 'lib', 'ansible')
+
+
+@pytest.fixture
+def finder_containers():
+ FinderContainers = namedtuple('FinderContainers', ['zf'])
+
+ zipoutput = BytesIO()
+ zf = zipfile.ZipFile(zipoutput, mode='w', compression=zipfile.ZIP_STORED)
+
+ return FinderContainers(zf)
+
+
+class TestRecursiveFinder(object):
+ def test_no_module_utils(self, finder_containers):
+ name = 'ping'
+ data = b'#!/usr/bin/python\nreturn \'{\"changed\": false}\''
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'ping.py'), data, *finder_containers)
+ assert frozenset(finder_containers.zf.namelist()) == MODULE_UTILS_BASIC_FILES
+
+ def test_module_utils_with_syntax_error(self, finder_containers):
+ name = 'fake_module'
+ data = b'#!/usr/bin/python\ndef something(:\n pass\n'
+ with pytest.raises(ansible.errors.AnsibleError) as exec_info:
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'fake_module.py'), data, *finder_containers)
+ assert 'Unable to import fake_module due to invalid syntax' in str(exec_info.value)
+
+ def test_module_utils_with_identation_error(self, finder_containers):
+ name = 'fake_module'
+ data = b'#!/usr/bin/python\n def something():\n pass\n'
+ with pytest.raises(ansible.errors.AnsibleError) as exec_info:
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'fake_module.py'), data, *finder_containers)
+ assert 'Unable to import fake_module due to unexpected indent' in str(exec_info.value)
+
+ #
+ # Test importing six with many permutations because it is not a normal module
+ #
+ def test_from_import_six(self, finder_containers):
+ name = 'ping'
+ data = b'#!/usr/bin/python\nfrom ansible.module_utils import six'
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'ping.py'), data, *finder_containers)
+ assert frozenset(finder_containers.zf.namelist()) == frozenset(('ansible/module_utils/six/__init__.py', )).union(MODULE_UTILS_BASIC_FILES)
+
+ def test_import_six(self, finder_containers):
+ name = 'ping'
+ data = b'#!/usr/bin/python\nimport ansible.module_utils.six'
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'ping.py'), data, *finder_containers)
+ assert frozenset(finder_containers.zf.namelist()) == frozenset(('ansible/module_utils/six/__init__.py', )).union(MODULE_UTILS_BASIC_FILES)
+
+ def test_import_six_from_many_submodules(self, finder_containers):
+ name = 'ping'
+ data = b'#!/usr/bin/python\nfrom ansible.module_utils.six.moves.urllib.parse import urlparse'
+ recursive_finder(name, os.path.join(ANSIBLE_LIB, 'modules', 'system', 'ping.py'), data, *finder_containers)
+ assert frozenset(finder_containers.zf.namelist()) == frozenset(('ansible/module_utils/six/__init__.py',)).union(MODULE_UTILS_BASIC_FILES)