diff options
Diffstat (limited to 'test/units/plugins/connection')
-rw-r--r-- | test/units/plugins/connection/__init__.py | 0 | ||||
-rw-r--r-- | test/units/plugins/connection/test_connection.py | 163 | ||||
-rw-r--r-- | test/units/plugins/connection/test_local.py | 40 | ||||
-rw-r--r-- | test/units/plugins/connection/test_paramiko.py | 56 | ||||
-rw-r--r-- | test/units/plugins/connection/test_psrp.py | 233 | ||||
-rw-r--r-- | test/units/plugins/connection/test_ssh.py | 696 | ||||
-rw-r--r-- | test/units/plugins/connection/test_winrm.py | 443 |
7 files changed, 1631 insertions, 0 deletions
diff --git a/test/units/plugins/connection/__init__.py b/test/units/plugins/connection/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/units/plugins/connection/__init__.py diff --git a/test/units/plugins/connection/test_connection.py b/test/units/plugins/connection/test_connection.py new file mode 100644 index 0000000..38d6691 --- /dev/null +++ b/test/units/plugins/connection/test_connection.py @@ -0,0 +1,163 @@ +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from io import StringIO + +from units.compat import unittest +from ansible.playbook.play_context import PlayContext +from ansible.plugins.connection import ConnectionBase +from ansible.plugins.loader import become_loader + + +class TestConnectionBaseClass(unittest.TestCase): + + def setUp(self): + self.play_context = PlayContext() + self.play_context.prompt = ( + '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + ) + self.in_stream = StringIO() + + def tearDown(self): + pass + + def test_subclass_error(self): + class ConnectionModule1(ConnectionBase): + pass + with self.assertRaises(TypeError): + ConnectionModule1() # pylint: disable=abstract-class-instantiated + + class ConnectionModule2(ConnectionBase): + def get(self, key): + super(ConnectionModule2, self).get(key) + + with self.assertRaises(TypeError): + ConnectionModule2() # pylint: disable=abstract-class-instantiated + + def test_subclass_success(self): + class ConnectionModule3(ConnectionBase): + + @property + def transport(self): + pass + + def _connect(self): + pass + + def exec_command(self): + pass + + def put_file(self): + pass + + def fetch_file(self): + pass + + def close(self): + pass + + self.assertIsInstance(ConnectionModule3(self.play_context, self.in_stream), ConnectionModule3) + + def test_check_password_prompt(self): + local = ( + b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n' + b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n' + ) + + ssh_pipelining_vvvv = b''' +debug3: mux_master_read_cb: channel 1 packet type 0x10000002 len 251 +debug2: process_mux_new_session: channel 1: request tty 0, X 1, agent 1, subsys 0, term "xterm-256color", cmd "/bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'", env 0 +debug3: process_mux_new_session: got fds stdin 9, stdout 10, stderr 11 +debug2: client_session2_setup: id 2 +debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0' +debug2: channel 2: request exec confirm 1 +debug2: channel 2: rcvd ext data 67 +[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: debug2: channel 2: written 67 to efd 11 +BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq +debug3: receive packet: type 98 +''' # noqa + + ssh_nopipelining_vvvv = b''' +debug3: mux_master_read_cb: channel 1 packet type 0x10000002 len 251 +debug2: process_mux_new_session: channel 1: request tty 1, X 1, agent 1, subsys 0, term "xterm-256color", cmd "/bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'", env 0 +debug3: mux_client_request_session: session request sent +debug3: send packet: type 98 +debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0' +debug2: channel 2: request exec confirm 1 +debug2: exec request accepted on channel 2 +[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: debug3: receive packet: type 2 +debug3: Received SSH2_MSG_IGNORE +debug3: Received SSH2_MSG_IGNORE + +BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq +debug3: receive packet: type 98 +''' # noqa + + ssh_novvvv = ( + b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n' + b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n' + ) + + dns_issue = ( + b'timeout waiting for privilege escalation password prompt:\n' + b'sudo: sudo: unable to resolve host tcloud014\n' + b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n' + b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n' + ) + + nothing = b'' + + in_front = b''' +debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo +''' + + class ConnectionFoo(ConnectionBase): + + @property + def transport(self): + pass + + def _connect(self): + pass + + def exec_command(self): + pass + + def put_file(self): + pass + + def fetch_file(self): + pass + + def close(self): + pass + + c = ConnectionFoo(self.play_context, self.in_stream) + c.set_become_plugin(become_loader.get('sudo')) + c.become.prompt = '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + + self.assertTrue(c.become.check_password_prompt(local)) + self.assertTrue(c.become.check_password_prompt(ssh_pipelining_vvvv)) + self.assertTrue(c.become.check_password_prompt(ssh_nopipelining_vvvv)) + self.assertTrue(c.become.check_password_prompt(ssh_novvvv)) + self.assertTrue(c.become.check_password_prompt(dns_issue)) + self.assertFalse(c.become.check_password_prompt(nothing)) + self.assertFalse(c.become.check_password_prompt(in_front)) diff --git a/test/units/plugins/connection/test_local.py b/test/units/plugins/connection/test_local.py new file mode 100644 index 0000000..e552585 --- /dev/null +++ b/test/units/plugins/connection/test_local.py @@ -0,0 +1,40 @@ +# +# (c) 2020 Red Hat Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from io import StringIO +import pytest + +from units.compat import unittest +from ansible.plugins.connection import local +from ansible.playbook.play_context import PlayContext + + +class TestLocalConnectionClass(unittest.TestCase): + + def test_local_connection_module(self): + play_context = PlayContext() + play_context.prompt = ( + '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + ) + in_stream = StringIO() + + self.assertIsInstance(local.Connection(play_context, in_stream), local.Connection) diff --git a/test/units/plugins/connection/test_paramiko.py b/test/units/plugins/connection/test_paramiko.py new file mode 100644 index 0000000..dcf3177 --- /dev/null +++ b/test/units/plugins/connection/test_paramiko.py @@ -0,0 +1,56 @@ +# +# (c) 2020 Red Hat Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from io import StringIO +import pytest + +from ansible.plugins.connection import paramiko_ssh +from ansible.playbook.play_context import PlayContext + + +@pytest.fixture +def play_context(): + play_context = PlayContext() + play_context.prompt = ( + '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + ) + + return play_context + + +@pytest.fixture() +def in_stream(): + return StringIO() + + +def test_paramiko_connection_module(play_context, in_stream): + assert isinstance( + paramiko_ssh.Connection(play_context, in_stream), + paramiko_ssh.Connection) + + +def test_paramiko_connect(play_context, in_stream, mocker): + mocker.patch.object(paramiko_ssh.Connection, '_connect_uncached') + connection = paramiko_ssh.Connection(play_context, in_stream)._connect() + + assert isinstance(connection, paramiko_ssh.Connection) + assert connection._connected is True diff --git a/test/units/plugins/connection/test_psrp.py b/test/units/plugins/connection/test_psrp.py new file mode 100644 index 0000000..38052e8 --- /dev/null +++ b/test/units/plugins/connection/test_psrp.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +# (c) 2018, Jordan Borean <jborean@redhat.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest +import sys + +from io import StringIO +from unittest.mock import MagicMock + +from ansible.playbook.play_context import PlayContext +from ansible.plugins.loader import connection_loader +from ansible.utils.display import Display + + +@pytest.fixture(autouse=True) +def psrp_connection(): + """Imports the psrp connection plugin with a mocked pypsrp module for testing""" + + # Take a snapshot of sys.modules before we manipulate it + orig_modules = sys.modules.copy() + try: + fake_pypsrp = MagicMock() + fake_pypsrp.FEATURES = [ + 'wsman_locale', + 'wsman_read_timeout', + 'wsman_reconnections', + ] + + fake_wsman = MagicMock() + fake_wsman.AUTH_KWARGS = { + "certificate": ["certificate_key_pem", "certificate_pem"], + "credssp": ["credssp_auth_mechanism", "credssp_disable_tlsv1_2", + "credssp_minimum_version"], + "negotiate": ["negotiate_delegate", "negotiate_hostname_override", + "negotiate_send_cbt", "negotiate_service"], + "mock": ["mock_test1", "mock_test2"], + } + + sys.modules["pypsrp"] = fake_pypsrp + sys.modules["pypsrp.complex_objects"] = MagicMock() + sys.modules["pypsrp.exceptions"] = MagicMock() + sys.modules["pypsrp.host"] = MagicMock() + sys.modules["pypsrp.powershell"] = MagicMock() + sys.modules["pypsrp.shell"] = MagicMock() + sys.modules["pypsrp.wsman"] = fake_wsman + sys.modules["requests.exceptions"] = MagicMock() + + from ansible.plugins.connection import psrp + + # Take a copy of the original import state vars before we set to an ok import + orig_has_psrp = psrp.HAS_PYPSRP + orig_psrp_imp_err = psrp.PYPSRP_IMP_ERR + + yield psrp + + psrp.HAS_PYPSRP = orig_has_psrp + psrp.PYPSRP_IMP_ERR = orig_psrp_imp_err + finally: + # Restore sys.modules back to our pre-shenanigans + sys.modules = orig_modules + + +class TestConnectionPSRP(object): + + OPTIONS_DATA = ( + # default options + ( + {'_extras': {}}, + { + '_psrp_auth': 'negotiate', + '_psrp_cert_validation': True, + '_psrp_configuration_name': 'Microsoft.PowerShell', + '_psrp_connection_timeout': 30, + '_psrp_message_encryption': 'auto', + '_psrp_host': 'inventory_hostname', + '_psrp_conn_kwargs': { + 'server': 'inventory_hostname', + 'port': 5986, + 'username': None, + 'password': None, + 'ssl': True, + 'path': 'wsman', + 'auth': 'negotiate', + 'cert_validation': True, + 'connection_timeout': 30, + 'encryption': 'auto', + 'proxy': None, + 'no_proxy': False, + 'max_envelope_size': 153600, + 'operation_timeout': 20, + 'certificate_key_pem': None, + 'certificate_pem': None, + 'credssp_auth_mechanism': 'auto', + 'credssp_disable_tlsv1_2': False, + 'credssp_minimum_version': 2, + 'negotiate_delegate': None, + 'negotiate_hostname_override': None, + 'negotiate_send_cbt': True, + 'negotiate_service': 'WSMAN', + 'read_timeout': 30, + 'reconnection_backoff': 2.0, + 'reconnection_retries': 0, + }, + '_psrp_max_envelope_size': 153600, + '_psrp_ignore_proxy': False, + '_psrp_operation_timeout': 20, + '_psrp_pass': None, + '_psrp_path': 'wsman', + '_psrp_port': 5986, + '_psrp_proxy': None, + '_psrp_protocol': 'https', + '_psrp_user': None + }, + ), + # ssl=False when port defined to 5985 + ( + {'_extras': {}, 'ansible_port': '5985'}, + { + '_psrp_port': 5985, + '_psrp_protocol': 'http' + }, + ), + # ssl=True when port defined to not 5985 + ( + {'_extras': {}, 'ansible_port': 1234}, + { + '_psrp_port': 1234, + '_psrp_protocol': 'https' + }, + ), + # port 5986 when ssl=True + ( + {'_extras': {}, 'ansible_psrp_protocol': 'https'}, + { + '_psrp_port': 5986, + '_psrp_protocol': 'https' + }, + ), + # port 5985 when ssl=False + ( + {'_extras': {}, 'ansible_psrp_protocol': 'http'}, + { + '_psrp_port': 5985, + '_psrp_protocol': 'http' + }, + ), + # psrp extras + ( + {'_extras': {'ansible_psrp_mock_test1': True}}, + { + '_psrp_conn_kwargs': { + 'server': 'inventory_hostname', + 'port': 5986, + 'username': None, + 'password': None, + 'ssl': True, + 'path': 'wsman', + 'auth': 'negotiate', + 'cert_validation': True, + 'connection_timeout': 30, + 'encryption': 'auto', + 'proxy': None, + 'no_proxy': False, + 'max_envelope_size': 153600, + 'operation_timeout': 20, + 'certificate_key_pem': None, + 'certificate_pem': None, + 'credssp_auth_mechanism': 'auto', + 'credssp_disable_tlsv1_2': False, + 'credssp_minimum_version': 2, + 'negotiate_delegate': None, + 'negotiate_hostname_override': None, + 'negotiate_send_cbt': True, + 'negotiate_service': 'WSMAN', + 'read_timeout': 30, + 'reconnection_backoff': 2.0, + 'reconnection_retries': 0, + 'mock_test1': True + }, + }, + ), + # cert validation through string repr of bool + ( + {'_extras': {}, 'ansible_psrp_cert_validation': 'ignore'}, + { + '_psrp_cert_validation': False + }, + ), + # cert validation path + ( + {'_extras': {}, 'ansible_psrp_cert_trust_path': '/path/cert.pem'}, + { + '_psrp_cert_validation': '/path/cert.pem' + }, + ), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('options, expected', + ((o, e) for o, e in OPTIONS_DATA)) + def test_set_options(self, options, expected): + pc = PlayContext() + new_stdin = StringIO() + + conn = connection_loader.get('psrp', pc, new_stdin) + conn.set_options(var_options=options) + conn._build_kwargs() + + for attr, expected in expected.items(): + actual = getattr(conn, attr) + assert actual == expected, \ + "psrp attr '%s', actual '%s' != expected '%s'"\ + % (attr, actual, expected) + + def test_set_invalid_extras_options(self, monkeypatch): + pc = PlayContext() + new_stdin = StringIO() + + conn = connection_loader.get('psrp', pc, new_stdin) + conn.set_options(var_options={'_extras': {'ansible_psrp_mock_test3': True}}) + + mock_display = MagicMock() + monkeypatch.setattr(Display, "warning", mock_display) + conn._build_kwargs() + + assert mock_display.call_args[0][0] == \ + 'ansible_psrp_mock_test3 is unsupported by the current psrp version installed' diff --git a/test/units/plugins/connection/test_ssh.py b/test/units/plugins/connection/test_ssh.py new file mode 100644 index 0000000..662dff9 --- /dev/null +++ b/test/units/plugins/connection/test_ssh.py @@ -0,0 +1,696 @@ +# -*- coding: utf-8 -*- +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from io import StringIO +import pytest + + +from ansible import constants as C +from ansible.errors import AnsibleAuthenticationFailure +from units.compat import unittest +from unittest.mock import patch, MagicMock, PropertyMock +from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound +from ansible.module_utils.compat.selectors import SelectorKey, EVENT_READ +from ansible.module_utils.six.moves import shlex_quote +from ansible.module_utils._text import to_bytes +from ansible.playbook.play_context import PlayContext +from ansible.plugins.connection import ssh +from ansible.plugins.loader import connection_loader, become_loader + + +class TestConnectionBaseClass(unittest.TestCase): + + def test_plugins_connection_ssh_module(self): + play_context = PlayContext() + play_context.prompt = ( + '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + ) + in_stream = StringIO() + + self.assertIsInstance(ssh.Connection(play_context, in_stream), ssh.Connection) + + def test_plugins_connection_ssh_basic(self): + pc = PlayContext() + new_stdin = StringIO() + conn = ssh.Connection(pc, new_stdin) + + # connect just returns self, so assert that + res = conn._connect() + self.assertEqual(conn, res) + + ssh.SSHPASS_AVAILABLE = False + self.assertFalse(conn._sshpass_available()) + + ssh.SSHPASS_AVAILABLE = True + self.assertTrue(conn._sshpass_available()) + + with patch('subprocess.Popen') as p: + ssh.SSHPASS_AVAILABLE = None + p.return_value = MagicMock() + self.assertTrue(conn._sshpass_available()) + + ssh.SSHPASS_AVAILABLE = None + p.return_value = None + p.side_effect = OSError() + self.assertFalse(conn._sshpass_available()) + + conn.close() + self.assertFalse(conn._connected) + + def test_plugins_connection_ssh__build_command(self): + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('ssh', pc, new_stdin) + conn.get_option = MagicMock() + conn.get_option.return_value = "" + conn._build_command('ssh', 'ssh') + + def test_plugins_connection_ssh_exec_command(self): + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('ssh', pc, new_stdin) + + conn._build_command = MagicMock() + conn._build_command.return_value = 'ssh something something' + conn._run = MagicMock() + conn._run.return_value = (0, 'stdout', 'stderr') + conn.get_option = MagicMock() + conn.get_option.return_value = True + + res, stdout, stderr = conn.exec_command('ssh') + res, stdout, stderr = conn.exec_command('ssh', 'this is some data') + + def test_plugins_connection_ssh__examine_output(self): + pc = PlayContext() + new_stdin = StringIO() + become_success_token = b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' + + conn = connection_loader.get('ssh', pc, new_stdin) + conn.set_become_plugin(become_loader.get('sudo')) + + conn.become.check_password_prompt = MagicMock() + conn.become.check_success = MagicMock() + conn.become.check_incorrect_password = MagicMock() + conn.become.check_missing_password = MagicMock() + + def _check_password_prompt(line): + return b'foo' in line + + def _check_become_success(line): + return become_success_token in line + + def _check_incorrect_password(line): + return b'incorrect password' in line + + def _check_missing_password(line): + return b'bad password' in line + + # test examining output for prompt + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = True + + # override become plugin + conn.become.prompt = True + conn.become.check_password_prompt = MagicMock(side_effect=_check_password_prompt) + conn.become.check_success = MagicMock(side_effect=_check_become_success) + conn.become.check_incorrect_password = MagicMock(side_effect=_check_incorrect_password) + conn.become.check_missing_password = MagicMock(side_effect=_check_missing_password) + + def get_option(option): + if option == 'become_pass': + return 'password' + return None + + conn.become.get_option = get_option + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nfoo\nline 3\nthis should be the remainder', False) + self.assertEqual(output, b'line 1\nline 2\nline 3\n') + self.assertEqual(unprocessed, b'this should be the remainder') + self.assertTrue(conn._flags['become_prompt']) + self.assertFalse(conn._flags['become_success']) + self.assertFalse(conn._flags['become_error']) + self.assertFalse(conn._flags['become_nopasswd_error']) + + # test examining output for become prompt + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = False + conn.become.prompt = False + pc.success_key = str(become_success_token) + conn.become.success = str(become_success_token) + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\n%s\nline 3\n' % become_success_token, False) + self.assertEqual(output, b'line 1\nline 2\nline 3\n') + self.assertEqual(unprocessed, b'') + self.assertFalse(conn._flags['become_prompt']) + self.assertTrue(conn._flags['become_success']) + self.assertFalse(conn._flags['become_error']) + self.assertFalse(conn._flags['become_nopasswd_error']) + + # test we dont detect become success from ssh debug: lines + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = False + conn.become.prompt = True + pc.success_key = str(become_success_token) + conn.become.success = str(become_success_token) + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token, False) + self.assertEqual(output, b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token) + self.assertEqual(unprocessed, b'') + self.assertFalse(conn._flags['become_success']) + + # test examining output for become failure + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = False + conn.become.prompt = False + pc.success_key = None + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nincorrect password\n', True) + self.assertEqual(output, b'line 1\nline 2\nincorrect password\n') + self.assertEqual(unprocessed, b'') + self.assertFalse(conn._flags['become_prompt']) + self.assertFalse(conn._flags['become_success']) + self.assertTrue(conn._flags['become_error']) + self.assertFalse(conn._flags['become_nopasswd_error']) + + # test examining output for missing password + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = False + conn.become.prompt = False + pc.success_key = None + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nbad password\n', True) + self.assertEqual(output, b'line 1\nbad password\n') + self.assertEqual(unprocessed, b'') + self.assertFalse(conn._flags['become_prompt']) + self.assertFalse(conn._flags['become_success']) + self.assertFalse(conn._flags['become_error']) + self.assertTrue(conn._flags['become_nopasswd_error']) + + @patch('time.sleep') + @patch('os.path.exists') + def test_plugins_connection_ssh_put_file(self, mock_ospe, mock_sleep): + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('ssh', pc, new_stdin) + conn._build_command = MagicMock() + conn._bare_run = MagicMock() + + mock_ospe.return_value = True + conn._build_command.return_value = 'some command to run' + conn._bare_run.return_value = (0, '', '') + conn.host = "some_host" + + conn.set_option('reconnection_retries', 9) + conn.set_option('ssh_transfer_method', None) # unless set to None scp_if_ssh is ignored + + # Test with SCP_IF_SSH set to smart + # Test when SFTP works + conn.set_option('scp_if_ssh', 'smart') + expected_in_data = b' '.join((b'put', to_bytes(shlex_quote('/path/to/in/file')), to_bytes(shlex_quote('/path/to/dest/file')))) + b'\n' + conn.put_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + # Test when SFTP doesn't work but SCP does + conn._bare_run.side_effect = [(1, 'stdout', 'some errors'), (0, '', '')] + conn.put_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + conn._bare_run.side_effect = None + + # test with SCP_IF_SSH enabled + conn.set_option('scp_if_ssh', True) + conn.put_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + + conn.put_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + + # test with SCPP_IF_SSH disabled + conn.set_option('scp_if_ssh', False) + expected_in_data = b' '.join((b'put', to_bytes(shlex_quote('/path/to/in/file')), to_bytes(shlex_quote('/path/to/dest/file')))) + b'\n' + conn.put_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + expected_in_data = b' '.join((b'put', + to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')), + to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n' + conn.put_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + # test that a non-zero rc raises an error + conn._bare_run.return_value = (1, 'stdout', 'some errors') + self.assertRaises(AnsibleError, conn.put_file, '/path/to/bad/file', '/remote/path/to/file') + + # test that a not-found path raises an error + mock_ospe.return_value = False + conn._bare_run.return_value = (0, 'stdout', '') + self.assertRaises(AnsibleFileNotFound, conn.put_file, '/path/to/bad/file', '/remote/path/to/file') + + @patch('time.sleep') + def test_plugins_connection_ssh_fetch_file(self, mock_sleep): + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('ssh', pc, new_stdin) + conn._build_command = MagicMock() + conn._bare_run = MagicMock() + conn._load_name = 'ssh' + + conn._build_command.return_value = 'some command to run' + conn._bare_run.return_value = (0, '', '') + conn.host = "some_host" + + conn.set_option('reconnection_retries', 9) + conn.set_option('ssh_transfer_method', None) # unless set to None scp_if_ssh is ignored + + # Test with SCP_IF_SSH set to smart + # Test when SFTP works + conn.set_option('scp_if_ssh', 'smart') + expected_in_data = b' '.join((b'get', to_bytes(shlex_quote('/path/to/in/file')), to_bytes(shlex_quote('/path/to/dest/file')))) + b'\n' + conn.set_options({}) + conn.fetch_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + # Test when SFTP doesn't work but SCP does + conn._bare_run.side_effect = [(1, 'stdout', 'some errors'), (0, '', '')] + conn.fetch_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + + # test with SCP_IF_SSH enabled + conn._bare_run.side_effect = None + conn.set_option('ssh_transfer_method', None) # unless set to None scp_if_ssh is ignored + conn.set_option('scp_if_ssh', 'True') + conn.fetch_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + + conn.fetch_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩') + conn._bare_run.assert_called_with('some command to run', None, checkrc=False) + + # test with SCP_IF_SSH disabled + conn.set_option('scp_if_ssh', False) + expected_in_data = b' '.join((b'get', to_bytes(shlex_quote('/path/to/in/file')), to_bytes(shlex_quote('/path/to/dest/file')))) + b'\n' + conn.fetch_file('/path/to/in/file', '/path/to/dest/file') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + expected_in_data = b' '.join((b'get', + to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')), + to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n' + conn.fetch_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩') + conn._bare_run.assert_called_with('some command to run', expected_in_data, checkrc=False) + + # test that a non-zero rc raises an error + conn._bare_run.return_value = (1, 'stdout', 'some errors') + self.assertRaises(AnsibleError, conn.fetch_file, '/path/to/bad/file', '/remote/path/to/file') + + +class MockSelector(object): + def __init__(self): + self.files_watched = 0 + self.register = MagicMock(side_effect=self._register) + self.unregister = MagicMock(side_effect=self._unregister) + self.close = MagicMock() + self.get_map = MagicMock(side_effect=self._get_map) + self.select = MagicMock() + + def _register(self, *args, **kwargs): + self.files_watched += 1 + + def _unregister(self, *args, **kwargs): + self.files_watched -= 1 + + def _get_map(self, *args, **kwargs): + return self.files_watched + + +@pytest.fixture +def mock_run_env(request, mocker): + pc = PlayContext() + new_stdin = StringIO() + + conn = connection_loader.get('ssh', pc, new_stdin) + conn.set_become_plugin(become_loader.get('sudo')) + conn._send_initial_data = MagicMock() + conn._examine_output = MagicMock() + conn._terminate_process = MagicMock() + conn._load_name = 'ssh' + conn.sshpass_pipe = [MagicMock(), MagicMock()] + + request.cls.pc = pc + request.cls.conn = conn + + mock_popen_res = MagicMock() + mock_popen_res.poll = MagicMock() + mock_popen_res.wait = MagicMock() + mock_popen_res.stdin = MagicMock() + mock_popen_res.stdin.fileno.return_value = 1000 + mock_popen_res.stdout = MagicMock() + mock_popen_res.stdout.fileno.return_value = 1001 + mock_popen_res.stderr = MagicMock() + mock_popen_res.stderr.fileno.return_value = 1002 + mock_popen_res.returncode = 0 + request.cls.mock_popen_res = mock_popen_res + + mock_popen = mocker.patch('subprocess.Popen', return_value=mock_popen_res) + request.cls.mock_popen = mock_popen + + request.cls.mock_selector = MockSelector() + mocker.patch('ansible.module_utils.compat.selectors.DefaultSelector', lambda: request.cls.mock_selector) + + request.cls.mock_openpty = mocker.patch('pty.openpty') + + mocker.patch('fcntl.fcntl') + mocker.patch('os.write') + mocker.patch('os.close') + + +@pytest.mark.usefixtures('mock_run_env') +class TestSSHConnectionRun(object): + # FIXME: + # These tests are little more than a smoketest. Need to enhance them + # a bit to check that they're calling the relevant functions and making + # complete coverage of the code paths + def test_no_escalation(self): + self.mock_popen_res.stdout.read.side_effect = [b"my_stdout\n", b"second_line"] + self.mock_popen_res.stderr.read.side_effect = [b"my_stderr"] + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + []] + self.mock_selector.get_map.side_effect = lambda: True + + return_code, b_stdout, b_stderr = self.conn._run("ssh", "this is input data") + assert return_code == 0 + assert b_stdout == b'my_stdout\nsecond_line' + assert b_stderr == b'my_stderr' + assert self.mock_selector.register.called is True + assert self.mock_selector.register.call_count == 2 + assert self.conn._send_initial_data.called is True + assert self.conn._send_initial_data.call_count == 1 + assert self.conn._send_initial_data.call_args[0][1] == 'this is input data' + + def test_with_password(self): + # test with a password set to trigger the sshpass write + self.pc.password = '12345' + self.mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""] + self.mock_popen_res.stderr.read.side_effect = [b""] + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + []] + self.mock_selector.get_map.side_effect = lambda: True + + return_code, b_stdout, b_stderr = self.conn._run(["ssh", "is", "a", "cmd"], "this is more data") + assert return_code == 0 + assert b_stdout == b'some data' + assert b_stderr == b'' + assert self.mock_selector.register.called is True + assert self.mock_selector.register.call_count == 2 + assert self.conn._send_initial_data.called is True + assert self.conn._send_initial_data.call_count == 1 + assert self.conn._send_initial_data.call_args[0][1] == 'this is more data' + + def _password_with_prompt_examine_output(self, sourice, state, b_chunk, sudoable): + if state == 'awaiting_prompt': + self.conn._flags['become_prompt'] = True + elif state == 'awaiting_escalation': + self.conn._flags['become_success'] = True + return (b'', b'') + + def test_password_with_prompt(self): + # test with password prompting enabled + self.pc.password = None + self.conn.become.prompt = b'Password:' + self.conn._examine_output.side_effect = self._password_with_prompt_examine_output + self.mock_popen_res.stdout.read.side_effect = [b"Password:", b"Success", b""] + self.mock_popen_res.stderr.read.side_effect = [b""] + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ), + (SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + []] + self.mock_selector.get_map.side_effect = lambda: True + + return_code, b_stdout, b_stderr = self.conn._run("ssh", "this is input data") + assert return_code == 0 + assert b_stdout == b'' + assert b_stderr == b'' + assert self.mock_selector.register.called is True + assert self.mock_selector.register.call_count == 2 + assert self.conn._send_initial_data.called is True + assert self.conn._send_initial_data.call_count == 1 + assert self.conn._send_initial_data.call_args[0][1] == 'this is input data' + + def test_password_with_become(self): + # test with some become settings + self.pc.prompt = b'Password:' + self.conn.become.prompt = b'Password:' + self.pc.become = True + self.pc.success_key = 'BECOME-SUCCESS-abcdefg' + self.conn.become._id = 'abcdefg' + self.conn._examine_output.side_effect = self._password_with_prompt_examine_output + self.mock_popen_res.stdout.read.side_effect = [b"Password:", b"BECOME-SUCCESS-abcdefg", b"abc"] + self.mock_popen_res.stderr.read.side_effect = [b"123"] + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + []] + self.mock_selector.get_map.side_effect = lambda: True + + return_code, b_stdout, b_stderr = self.conn._run("ssh", "this is input data") + self.mock_popen_res.stdin.flush.assert_called_once_with() + assert return_code == 0 + assert b_stdout == b'abc' + assert b_stderr == b'123' + assert self.mock_selector.register.called is True + assert self.mock_selector.register.call_count == 2 + assert self.conn._send_initial_data.called is True + assert self.conn._send_initial_data.call_count == 1 + assert self.conn._send_initial_data.call_args[0][1] == 'this is input data' + + def test_pasword_without_data(self): + # simulate no data input but Popen using new pty's fails + self.mock_popen.return_value = None + self.mock_popen.side_effect = [OSError(), self.mock_popen_res] + + # simulate no data input + self.mock_openpty.return_value = (98, 99) + self.mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""] + self.mock_popen_res.stderr.read.side_effect = [b""] + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + []] + self.mock_selector.get_map.side_effect = lambda: True + + return_code, b_stdout, b_stderr = self.conn._run("ssh", "") + assert return_code == 0 + assert b_stdout == b'some data' + assert b_stderr == b'' + assert self.mock_selector.register.called is True + assert self.mock_selector.register.call_count == 2 + assert self.conn._send_initial_data.called is False + + +@pytest.mark.usefixtures('mock_run_env') +class TestSSHConnectionRetries(object): + def test_incorrect_password(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 5) + monkeypatch.setattr('time.sleep', lambda x: None) + + self.mock_popen_res.stdout.read.side_effect = [b''] + self.mock_popen_res.stderr.read.side_effect = [b'Permission denied, please try again.\r\n'] + type(self.mock_popen_res).returncode = PropertyMock(side_effect=[5] * 4) + + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [], + ] + + self.mock_selector.get_map.side_effect = lambda: True + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = [b'sshpass', b'-d41', b'ssh', b'-C'] + + exception_info = pytest.raises(AnsibleAuthenticationFailure, self.conn.exec_command, 'sshpass', 'some data') + assert exception_info.value.message == ('Invalid/incorrect username/password. Skipping remaining 5 retries to prevent account lockout: ' + 'Permission denied, please try again.') + assert self.mock_popen.call_count == 1 + + def test_retry_then_success(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 3) + + monkeypatch.setattr('time.sleep', lambda x: None) + + self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"second_line"] + self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"] + type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 3 + [0] * 4) + + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [] + ] + self.mock_selector.get_map.side_effect = lambda: True + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = 'ssh' + + return_code, b_stdout, b_stderr = self.conn.exec_command('ssh', 'some data') + assert return_code == 0 + assert b_stdout == b'my_stdout\nsecond_line' + assert b_stderr == b'my_stderr' + + def test_multiple_failures(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 9) + + monkeypatch.setattr('time.sleep', lambda x: None) + + self.mock_popen_res.stdout.read.side_effect = [b""] * 10 + self.mock_popen_res.stderr.read.side_effect = [b""] * 10 + type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 30) + + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [], + ] * 10 + self.mock_selector.get_map.side_effect = lambda: True + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = 'ssh' + + pytest.raises(AnsibleConnectionFailure, self.conn.exec_command, 'ssh', 'some data') + assert self.mock_popen.call_count == 10 + + def test_abitrary_exceptions(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 9) + + monkeypatch.setattr('time.sleep', lambda x: None) + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = 'ssh' + + self.mock_popen.side_effect = [Exception('bad')] * 10 + pytest.raises(Exception, self.conn.exec_command, 'ssh', 'some data') + assert self.mock_popen.call_count == 10 + + def test_put_file_retries(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 3) + + monkeypatch.setattr('time.sleep', lambda x: None) + monkeypatch.setattr('ansible.plugins.connection.ssh.os.path.exists', lambda x: True) + + self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"second_line"] + self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"] + type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 4 + [0] * 4) + + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [] + ] + self.mock_selector.get_map.side_effect = lambda: True + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = 'sftp' + + return_code, b_stdout, b_stderr = self.conn.put_file('/path/to/in/file', '/path/to/dest/file') + assert return_code == 0 + assert b_stdout == b"my_stdout\nsecond_line" + assert b_stderr == b"my_stderr" + assert self.mock_popen.call_count == 2 + + def test_fetch_file_retries(self, monkeypatch): + self.conn.set_option('host_key_checking', False) + self.conn.set_option('reconnection_retries', 3) + + monkeypatch.setattr('time.sleep', lambda x: None) + monkeypatch.setattr('ansible.plugins.connection.ssh.os.path.exists', lambda x: True) + + self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"second_line"] + self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"] + type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 4 + [0] * 4) + + self.mock_selector.select.side_effect = [ + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)], + [(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)], + [] + ] + self.mock_selector.get_map.side_effect = lambda: True + + self.conn._build_command = MagicMock() + self.conn._build_command.return_value = 'sftp' + + return_code, b_stdout, b_stderr = self.conn.fetch_file('/path/to/in/file', '/path/to/dest/file') + assert return_code == 0 + assert b_stdout == b"my_stdout\nsecond_line" + assert b_stderr == b"my_stderr" + assert self.mock_popen.call_count == 2 diff --git a/test/units/plugins/connection/test_winrm.py b/test/units/plugins/connection/test_winrm.py new file mode 100644 index 0000000..cb52814 --- /dev/null +++ b/test/units/plugins/connection/test_winrm.py @@ -0,0 +1,443 @@ +# -*- coding: utf-8 -*- +# (c) 2018, Jordan Borean <jborean@redhat.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +import pytest + +from io import StringIO + +from unittest.mock import MagicMock +from ansible.errors import AnsibleConnectionFailure +from ansible.module_utils._text import to_bytes +from ansible.playbook.play_context import PlayContext +from ansible.plugins.loader import connection_loader +from ansible.plugins.connection import winrm + +pytest.importorskip("winrm") + + +class TestConnectionWinRM(object): + + OPTIONS_DATA = ( + # default options + ( + {'_extras': {}}, + {}, + { + '_kerb_managed': False, + '_kinit_cmd': 'kinit', + '_winrm_connection_timeout': None, + '_winrm_host': 'inventory_hostname', + '_winrm_kwargs': {'username': None, 'password': None}, + '_winrm_pass': None, + '_winrm_path': '/wsman', + '_winrm_port': 5986, + '_winrm_scheme': 'https', + '_winrm_transport': ['ssl'], + '_winrm_user': None + }, + False + ), + # http through port + ( + {'_extras': {}, 'ansible_port': 5985}, + {}, + { + '_winrm_kwargs': {'username': None, 'password': None}, + '_winrm_port': 5985, + '_winrm_scheme': 'http', + '_winrm_transport': ['plaintext'], + }, + False + ), + # kerberos user with kerb present + ( + {'_extras': {}, 'ansible_user': 'user@domain.com'}, + {}, + { + '_kerb_managed': False, + '_kinit_cmd': 'kinit', + '_winrm_kwargs': {'username': 'user@domain.com', + 'password': None}, + '_winrm_pass': None, + '_winrm_transport': ['kerberos', 'ssl'], + '_winrm_user': 'user@domain.com' + }, + True + ), + # kerberos user without kerb present + ( + {'_extras': {}, 'ansible_user': 'user@domain.com'}, + {}, + { + '_kerb_managed': False, + '_kinit_cmd': 'kinit', + '_winrm_kwargs': {'username': 'user@domain.com', + 'password': None}, + '_winrm_pass': None, + '_winrm_transport': ['ssl'], + '_winrm_user': 'user@domain.com' + }, + False + ), + # kerberos user with managed ticket (implicit) + ( + {'_extras': {}, 'ansible_user': 'user@domain.com'}, + {'remote_password': 'pass'}, + { + '_kerb_managed': True, + '_kinit_cmd': 'kinit', + '_winrm_kwargs': {'username': 'user@domain.com', + 'password': 'pass'}, + '_winrm_pass': 'pass', + '_winrm_transport': ['kerberos', 'ssl'], + '_winrm_user': 'user@domain.com' + }, + True + ), + # kerb with managed ticket (explicit) + ( + {'_extras': {}, 'ansible_user': 'user@domain.com', + 'ansible_winrm_kinit_mode': 'managed'}, + {'password': 'pass'}, + { + '_kerb_managed': True, + }, + True + ), + # kerb with unmanaged ticket (explicit)) + ( + {'_extras': {}, 'ansible_user': 'user@domain.com', + 'ansible_winrm_kinit_mode': 'manual'}, + {'password': 'pass'}, + { + '_kerb_managed': False, + }, + True + ), + # transport override (single) + ( + {'_extras': {}, 'ansible_user': 'user@domain.com', + 'ansible_winrm_transport': 'ntlm'}, + {}, + { + '_winrm_kwargs': {'username': 'user@domain.com', + 'password': None}, + '_winrm_pass': None, + '_winrm_transport': ['ntlm'], + }, + False + ), + # transport override (list) + ( + {'_extras': {}, 'ansible_user': 'user@domain.com', + 'ansible_winrm_transport': ['ntlm', 'certificate']}, + {}, + { + '_winrm_kwargs': {'username': 'user@domain.com', + 'password': None}, + '_winrm_pass': None, + '_winrm_transport': ['ntlm', 'certificate'], + }, + False + ), + # winrm extras + ( + {'_extras': {'ansible_winrm_server_cert_validation': 'ignore', + 'ansible_winrm_service': 'WSMAN'}}, + {}, + { + '_winrm_kwargs': {'username': None, 'password': None, + 'server_cert_validation': 'ignore', + 'service': 'WSMAN'}, + }, + False + ), + # direct override + ( + {'_extras': {}, 'ansible_winrm_connection_timeout': 5}, + {'connection_timeout': 10}, + { + '_winrm_connection_timeout': 10, + }, + False + ), + # password as ansible_password + ( + {'_extras': {}, 'ansible_password': 'pass'}, + {}, + { + '_winrm_pass': 'pass', + '_winrm_kwargs': {'username': None, 'password': 'pass'} + }, + False + ), + # password as ansible_winrm_pass + ( + {'_extras': {}, 'ansible_winrm_pass': 'pass'}, + {}, + { + '_winrm_pass': 'pass', + '_winrm_kwargs': {'username': None, 'password': 'pass'} + }, + False + ), + + # password as ansible_winrm_password + ( + {'_extras': {}, 'ansible_winrm_password': 'pass'}, + {}, + { + '_winrm_pass': 'pass', + '_winrm_kwargs': {'username': None, 'password': 'pass'} + }, + False + ), + ) + + # pylint bug: https://github.com/PyCQA/pylint/issues/511 + # pylint: disable=undefined-variable + @pytest.mark.parametrize('options, direct, expected, kerb', + ((o, d, e, k) for o, d, e, k in OPTIONS_DATA)) + def test_set_options(self, options, direct, expected, kerb): + winrm.HAVE_KERBEROS = kerb + + pc = PlayContext() + new_stdin = StringIO() + + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options=options, direct=direct) + conn._build_winrm_kwargs() + + for attr, expected in expected.items(): + actual = getattr(conn, attr) + assert actual == expected, \ + "winrm attr '%s', actual '%s' != expected '%s'"\ + % (attr, actual, expected) + + +class TestWinRMKerbAuth(object): + + @pytest.mark.parametrize('options, expected', [ + [{"_extras": {}}, + (["kinit", "user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'}, + (["kinit2", "user@domain"],)], + [{"_extras": {'ansible_winrm_kerberos_delegation': True}}, + (["kinit", "-f", "user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kinit_args': '-f -p'}, + (["kinit", "-f", "-p", "user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kerberos_delegation': True, 'ansible_winrm_kinit_args': '-p'}, + (["kinit", "-p", "user@domain"],)] + ]) + def test_kinit_success_subprocess(self, monkeypatch, options, expected): + def mock_communicate(input=None, timeout=None): + return b"", b"" + + mock_popen = MagicMock() + mock_popen.return_value.communicate = mock_communicate + mock_popen.return_value.returncode = 0 + monkeypatch.setattr("subprocess.Popen", mock_popen) + + winrm.HAS_PEXPECT = False + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options=options) + conn._build_winrm_kwargs() + + conn._kerb_auth("user@domain", "pass") + mock_calls = mock_popen.mock_calls + assert len(mock_calls) == 1 + assert mock_calls[0][1] == expected + actual_env = mock_calls[0][2]['env'] + assert sorted(list(actual_env.keys())) == ['KRB5CCNAME', 'PATH'] + assert actual_env['KRB5CCNAME'].startswith("FILE:/") + assert actual_env['PATH'] == os.environ['PATH'] + + @pytest.mark.parametrize('options, expected', [ + [{"_extras": {}}, + ("kinit", ["user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'}, + ("kinit2", ["user@domain"],)], + [{"_extras": {'ansible_winrm_kerberos_delegation': True}}, + ("kinit", ["-f", "user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kinit_args': '-f -p'}, + ("kinit", ["-f", "-p", "user@domain"],)], + [{"_extras": {}, 'ansible_winrm_kerberos_delegation': True, 'ansible_winrm_kinit_args': '-p'}, + ("kinit", ["-p", "user@domain"],)] + ]) + def test_kinit_success_pexpect(self, monkeypatch, options, expected): + pytest.importorskip("pexpect") + mock_pexpect = MagicMock() + mock_pexpect.return_value.exitstatus = 0 + monkeypatch.setattr("pexpect.spawn", mock_pexpect) + + winrm.HAS_PEXPECT = True + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options=options) + conn._build_winrm_kwargs() + + conn._kerb_auth("user@domain", "pass") + mock_calls = mock_pexpect.mock_calls + assert mock_calls[0][1] == expected + actual_env = mock_calls[0][2]['env'] + assert sorted(list(actual_env.keys())) == ['KRB5CCNAME', 'PATH'] + assert actual_env['KRB5CCNAME'].startswith("FILE:/") + assert actual_env['PATH'] == os.environ['PATH'] + assert mock_calls[0][2]['echo'] is False + assert mock_calls[1][0] == "().expect" + assert mock_calls[1][1] == (".*:",) + assert mock_calls[2][0] == "().sendline" + assert mock_calls[2][1] == ("pass",) + assert mock_calls[3][0] == "().read" + assert mock_calls[4][0] == "().wait" + + def test_kinit_with_missing_executable_subprocess(self, monkeypatch): + expected_err = "[Errno 2] No such file or directory: " \ + "'/fake/kinit': '/fake/kinit'" + mock_popen = MagicMock(side_effect=OSError(expected_err)) + + monkeypatch.setattr("subprocess.Popen", mock_popen) + + winrm.HAS_PEXPECT = False + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"} + conn.set_options(var_options=options) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("user@domain", "pass") + assert str(err.value) == "Kerberos auth failure when calling " \ + "kinit cmd '/fake/kinit': %s" % expected_err + + def test_kinit_with_missing_executable_pexpect(self, monkeypatch): + pexpect = pytest.importorskip("pexpect") + + expected_err = "The command was not found or was not " \ + "executable: /fake/kinit" + mock_pexpect = \ + MagicMock(side_effect=pexpect.ExceptionPexpect(expected_err)) + + monkeypatch.setattr("pexpect.spawn", mock_pexpect) + + winrm.HAS_PEXPECT = True + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"} + conn.set_options(var_options=options) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("user@domain", "pass") + assert str(err.value) == "Kerberos auth failure when calling " \ + "kinit cmd '/fake/kinit': %s" % expected_err + + def test_kinit_error_subprocess(self, monkeypatch): + expected_err = "kinit: krb5_parse_name: " \ + "Configuration file does not specify default realm" + + def mock_communicate(input=None, timeout=None): + return b"", to_bytes(expected_err) + + mock_popen = MagicMock() + mock_popen.return_value.communicate = mock_communicate + mock_popen.return_value.returncode = 1 + monkeypatch.setattr("subprocess.Popen", mock_popen) + + winrm.HAS_PEXPECT = False + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options={"_extras": {}}) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("invaliduser", "pass") + + assert str(err.value) == \ + "Kerberos auth failure for principal invaliduser with " \ + "subprocess: %s" % (expected_err) + + def test_kinit_error_pexpect(self, monkeypatch): + pytest.importorskip("pexpect") + + expected_err = "Configuration file does not specify default realm" + mock_pexpect = MagicMock() + mock_pexpect.return_value.expect = MagicMock(side_effect=OSError) + mock_pexpect.return_value.read.return_value = to_bytes(expected_err) + mock_pexpect.return_value.exitstatus = 1 + + monkeypatch.setattr("pexpect.spawn", mock_pexpect) + + winrm.HAS_PEXPECT = True + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options={"_extras": {}}) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("invaliduser", "pass") + + assert str(err.value) == \ + "Kerberos auth failure for principal invaliduser with " \ + "pexpect: %s" % (expected_err) + + def test_kinit_error_pass_in_output_subprocess(self, monkeypatch): + def mock_communicate(input=None, timeout=None): + return b"", b"Error with kinit\n" + input + + mock_popen = MagicMock() + mock_popen.return_value.communicate = mock_communicate + mock_popen.return_value.returncode = 1 + monkeypatch.setattr("subprocess.Popen", mock_popen) + + winrm.HAS_PEXPECT = False + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options={"_extras": {}}) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("username", "password") + assert str(err.value) == \ + "Kerberos auth failure for principal username with subprocess: " \ + "Error with kinit\n<redacted>" + + def test_kinit_error_pass_in_output_pexpect(self, monkeypatch): + pytest.importorskip("pexpect") + + mock_pexpect = MagicMock() + mock_pexpect.return_value.expect = MagicMock() + mock_pexpect.return_value.read.return_value = \ + b"Error with kinit\npassword\n" + mock_pexpect.return_value.exitstatus = 1 + + monkeypatch.setattr("pexpect.spawn", mock_pexpect) + + winrm.HAS_PEXPECT = True + pc = PlayContext() + pc = PlayContext() + new_stdin = StringIO() + conn = connection_loader.get('winrm', pc, new_stdin) + conn.set_options(var_options={"_extras": {}}) + conn._build_winrm_kwargs() + + with pytest.raises(AnsibleConnectionFailure) as err: + conn._kerb_auth("username", "password") + assert str(err.value) == \ + "Kerberos auth failure for principal username with pexpect: " \ + "Error with kinit\n<redacted>" |