diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 837 |
1 files changed, 837 insertions, 0 deletions
diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..1c0c6c8 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,837 @@ +# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Some unit tests for SSHClient. +""" + + +import gc +import os +import platform +import socket +import threading +import time +import unittest +import warnings +import weakref +from tempfile import mkstemp + +import pytest +from pytest_relaxed import raises +from unittest.mock import patch, Mock + +import paramiko +from paramiko import SSHClient +from paramiko.pkey import PublicBlob +from paramiko.ssh_exception import SSHException, AuthenticationException + +from ._util import _support, requires_sha1_signing, slow + + +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) + +FINGERPRINTS = { + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa + "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', +} + + +class NullServer(paramiko.ServerInterface): + def __init__(self, *args, **kwargs): + # Allow tests to enable/disable specific key types + self.__allowed_keys = kwargs.pop("allowed_keys", []) + # And allow them to set a (single...meh) expected public blob (cert) + self.__expected_public_blob = kwargs.pop("public_blob", None) + super().__init__(*args, **kwargs) + + def get_allowed_auths(self, username): + if username == "slowdive": + return "publickey,password" + return "publickey" + + def check_auth_password(self, username, password): + if (username == "slowdive") and (password == "pygmalion"): + return paramiko.AUTH_SUCCESSFUL + if (username == "slowdive") and (password == "unresponsive-server"): + time.sleep(5) + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_auth_publickey(self, username, key): + try: + expected = FINGERPRINTS[key.get_name()] + except KeyError: + return paramiko.AUTH_FAILED + # Base check: allowed auth type & fingerprint matches + happy = ( + key.get_name() in self.__allowed_keys + and key.get_fingerprint() == expected + ) + # Secondary check: if test wants assertions about cert data + if ( + self.__expected_public_blob is not None + and key.public_blob != self.__expected_public_blob + ): + happy = False + return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED + + def check_channel_request(self, kind, chanid): + return paramiko.OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != b"yes": + return False + return True + + def check_channel_env_request(self, channel, name, value): + if name == "INVALID_ENV": + return False + + if not hasattr(channel, "env"): + setattr(channel, "env", {}) + + channel.env[name] = value + return True + + +class ClientTest(unittest.TestCase): + def setUp(self): + self.sockl = socket.socket() + self.sockl.bind(("localhost", 0)) + self.sockl.listen(1) + self.addr, self.port = self.sockl.getsockname() + self.connect_kwargs = dict( + hostname=self.addr, + port=self.port, + username="slowdive", + look_for_keys=False, + ) + self.event = threading.Event() + self.kill_event = threading.Event() + + def tearDown(self): + # Shut down client Transport + if hasattr(self, "tc"): + self.tc.close() + # Shut down shared socket + if hasattr(self, "sockl"): + # Signal to server thread that it should shut down early; it checks + # this immediately after accept(). (In scenarios where connection + # actually succeeded during the test, this becomes a no-op.) + self.kill_event.set() + # Forcibly connect to server sock in case the server thread is + # hanging out in its accept() (e.g. if the client side of the test + # fails before it even gets to connecting); there's no other good + # way to force an accept() to exit. + put_a_sock_in_it = socket.socket() + put_a_sock_in_it.connect((self.addr, self.port)) + put_a_sock_in_it.close() + # Then close "our" end of the socket (which _should_ cause the + # accept() to bail out, but does not, for some reason. I blame + # threading.) + self.sockl.close() + + def _run( + self, + allowed_keys=None, + delay=0, + public_blob=None, + kill_event=None, + server_name=None, + ): + if allowed_keys is None: + allowed_keys = FINGERPRINTS.keys() + self.socks, addr = self.sockl.accept() + # If the kill event was set at this point, it indicates an early + # shutdown, so bail out now and don't even try setting up a Transport + # (which will just verbosely die.) + if kill_event and kill_event.is_set(): + self.socks.close() + return + self.ts = paramiko.Transport(self.socks) + if server_name is not None: + self.ts.local_version = server_name + keypath = _support("rsa.key") + host_key = paramiko.RSAKey.from_private_key_file(keypath) + self.ts.add_server_key(host_key) + keypath = _support("ecdsa-256.key") + host_key = paramiko.ECDSAKey.from_private_key_file(keypath) + self.ts.add_server_key(host_key) + server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) + if delay: + time.sleep(delay) + self.ts.start_server(self.event, server) + + def _test_connection(self, **kwargs): + """ + (Most) kwargs get passed directly into SSHClient.connect(). + + The exceptions are ``allowed_keys``/``public_blob``/``server_name`` + which are stripped and handed to the ``NullServer`` used for testing. + """ + run_kwargs = {"kill_event": self.kill_event} + for key in ("allowed_keys", "public_blob", "server_name"): + run_kwargs[key] = kwargs.pop(key, None) + # Server setup + threading.Thread(target=self._run, kwargs=run_kwargs).start() + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + # Client setup + self.tc = SSHClient() + self.tc.get_host_keys().add( + f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key + ) + + # Actual connection + self.tc.connect(**dict(self.connect_kwargs, **kwargs)) + + # Authentication successful? + self.event.wait(1.0) + self.assertTrue(self.event.is_set()) + self.assertTrue(self.ts.is_active()) + self.assertEqual( + self.connect_kwargs["username"], self.ts.get_username() + ) + self.assertEqual(True, self.ts.is_authenticated()) + self.assertEqual(False, self.tc.get_transport().gss_kex_used) + + # Command execution functions? + stdin, stdout, stderr = self.tc.exec_command("yes") + schan = self.ts.accept(1.0) + + # Nobody else tests the API of exec_command so let's do it here for + # now. :weary: + assert isinstance(stdin, paramiko.ChannelStdinFile) + assert isinstance(stdout, paramiko.ChannelFile) + assert isinstance(stderr, paramiko.ChannelStderrFile) + + schan.send("Hello there.\n") + schan.send_stderr("This is on stderr.\n") + schan.close() + + self.assertEqual("Hello there.\n", stdout.readline()) + self.assertEqual("", stdout.readline()) + self.assertEqual("This is on stderr.\n", stderr.readline()) + self.assertEqual("", stderr.readline()) + + # Cleanup + stdin.close() + stdout.close() + stderr.close() + + +class SSHClientTest(ClientTest): + @requires_sha1_signing + def test_client(self): + """ + verify that the SSHClient stuff works too. + """ + self._test_connection(password="pygmalion") + + @requires_sha1_signing + def test_client_dsa(self): + """ + verify that SSHClient works with a DSA key. + """ + self._test_connection(key_filename=_support("dss.key")) + + @requires_sha1_signing + def test_client_rsa(self): + """ + verify that SSHClient works with an RSA key. + """ + self._test_connection(key_filename=_support("rsa.key")) + + @requires_sha1_signing + def test_client_ecdsa(self): + """ + verify that SSHClient works with an ECDSA key. + """ + self._test_connection(key_filename=_support("ecdsa-256.key")) + + @requires_sha1_signing + def test_client_ed25519(self): + self._test_connection(key_filename=_support("ed25519.key")) + + @requires_sha1_signing + def test_multiple_key_files(self): + """ + verify that SSHClient accepts and tries multiple key files. + """ + # This is dumb :( + types_ = { + "rsa": "ssh-rsa", + "dss": "ssh-dss", + "ecdsa": "ecdsa-sha2-nistp256", + } + # Various combos of attempted & valid keys + # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) + for attempt, accept in ( + (["rsa", "dss"], ["dss"]), # Original test #3 + (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success + ): + try: + self._test_connection( + key_filename=[ + _support("{}.key".format(x)) for x in attempt + ], + allowed_keys=[types_[x] for x in accept], + ) + finally: + # Clean up to avoid occasional gc-related deadlocks. + # TODO: use nose test generators after nose port + self.tearDown() + self.setUp() + + @requires_sha1_signing + def test_multiple_key_files_failure(self): + """ + Expect failure when multiple keys in play and none are accepted + """ + # Until #387 is fixed we have to catch a high-up exception since + # various platforms trigger different errors here >_< + self.assertRaises( + SSHException, + self._test_connection, + key_filename=[_support("rsa.key")], + allowed_keys=["ecdsa-sha2-nistp256"], + ) + + @requires_sha1_signing + def test_certs_allowed_as_key_filename_values(self): + # NOTE: giving cert path here, not key path. (Key path test is below. + # They're similar except for which path is given; the expected auth and + # server-side behavior is 100% identical.) + # NOTE: only bothered whipping up one cert per overall class/family. + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") + self._test_connection( + key_filename=key_path, + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), + ) + + @requires_sha1_signing + def test_certs_implicitly_loaded_alongside_key_filename_keys(self): + # NOTE: a regular test_connection() w/ rsa.key would incidentally + # test this (because test_xxx.key-cert.pub exists) but incidental tests + # stink, so NullServer and friends were updated to allow assertions + # about the server-side key object's public blob. Thus, we can prove + # that a specific cert was found, along with regular authorization + # succeeding proving that the overall flow works. + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") + self._test_connection( + key_filename=key_path, + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), + ) + + def _cert_algo_test(self, ver, alg): + # Issue #2017; see auth_handler.py + self.connect_kwargs["username"] = "somecertuser" # neuter pw auth + self._test_connection( + # NOTE: SSHClient is able to take either the key or the cert & will + # set up its internals as needed + key_filename=_support("rsa.key-cert.pub"), + server_name="SSH-2.0-OpenSSH_{}".format(ver), + ) + assert ( + self.tc._transport._agreed_pubkey_algorithm + == "{}-cert-v01@openssh.com".format(alg) + ) + + @requires_sha1_signing + def test_old_openssh_needs_ssh_rsa_for_certs_not_rsa_sha2(self): + self._cert_algo_test(ver="7.7", alg="ssh-rsa") + + @requires_sha1_signing + def test_newer_openssh_uses_rsa_sha2_for_certs_not_ssh_rsa(self): + # NOTE: 512 happens to be first in our list and is thus chosen + self._cert_algo_test(ver="7.8", alg="rsa-sha2-512") + + def test_default_key_locations_trigger_cert_loads_if_found(self): + # TODO: what it says on the tin: ~/.ssh/id_rsa tries to load + # ~/.ssh/id_rsa-cert.pub. Right now no other tests actually test that + # code path (!) so we're punting too, sob. + pass + + def test_auto_add_policy(self): + """ + verify that SSHClient's AutoAddPolicy works. + """ + threading.Thread(target=self._run).start() + hostname = f"[{self.addr}]:{self.port}" + key_file = _support("ecdsa-256.key") + public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.tc.connect(password="pygmalion", **self.connect_kwargs) + + self.event.wait(1.0) + self.assertTrue(self.event.is_set()) + self.assertTrue(self.ts.is_active()) + self.assertEqual("slowdive", self.ts.get_username()) + self.assertEqual(True, self.ts.is_authenticated()) + self.assertEqual(1, len(self.tc.get_host_keys())) + new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] + self.assertEqual(public_host_key, new_host_key) + + def test_save_host_keys(self): + """ + verify that SSHClient correctly saves a known_hosts file. + """ + warnings.filterwarnings("ignore", "tempnam.*") + + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + fd, localname = mkstemp() + os.close(fd) + + client = SSHClient() + assert len(client.get_host_keys()) == 0 + + host_id = f"[{self.addr}]:{self.port}" + + client.get_host_keys().add(host_id, "ssh-rsa", public_host_key) + assert len(client.get_host_keys()) == 1 + assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"] + + client.save_host_keys(localname) + + with open(localname) as fd: + assert host_id in fd.read() + + os.unlink(localname) + + def test_cleanup(self): + """ + verify that when an SSHClient is collected, its transport (and the + transport's packetizer) is closed. + """ + # Skipped on PyPy because it fails on CI for unknown reasons + if platform.python_implementation() == "PyPy": + return + + threading.Thread(target=self._run).start() + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert len(self.tc.get_host_keys()) == 0 + self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) + + self.event.wait(1.0) + assert self.event.is_set() + assert self.ts.is_active() + + p = weakref.ref(self.tc._transport.packetizer) + assert p() is not None + self.tc.close() + del self.tc + + # force a collection to see whether the SSHClient object is deallocated + # 2 GCs are needed on PyPy, time is needed for Python 3 + # TODO 4.0: this still fails randomly under CircleCI under Python 3.7, + # 3.8 at the very least. bumped sleep 0.3->1.0s but the underlying + # functionality should get reevaluated now we've dropped Python 2. + time.sleep(1) + gc.collect() + gc.collect() + + assert p() is None + + @patch("paramiko.client.socket.socket") + @patch("paramiko.client.socket.getaddrinfo") + def test_closes_socket_on_socket_errors(self, getaddrinfo, mocket): + getaddrinfo.return_value = ( + ("irrelevant", None, None, None, "whatever"), + ) + + class SocksToBeYou(socket.error): + pass + + my_socket = mocket.return_value + my_socket.connect.side_effect = SocksToBeYou + client = SSHClient() + with pytest.raises(SocksToBeYou): + client.connect(hostname="nope") + my_socket.close.assert_called_once_with() + + def test_client_can_be_used_as_context_manager(self): + """ + verify that an SSHClient can be used a context manager + """ + threading.Thread(target=self._run).start() + + with SSHClient() as tc: + self.tc = tc + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert len(self.tc.get_host_keys()) == 0 + self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) + + self.event.wait(1.0) + self.assertTrue(self.event.is_set()) + self.assertTrue(self.ts.is_active()) + + self.assertTrue(self.tc._transport is not None) + + self.assertTrue(self.tc._transport is None) + + def test_banner_timeout(self): + """ + verify that the SSHClient has a configurable banner timeout. + """ + # Start the thread with a 1 second wait. + threading.Thread(target=self._run, kwargs={"delay": 1}).start() + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = SSHClient() + self.tc.get_host_keys().add( + f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key + ) + # Connect with a half second banner timeout. + kwargs = dict(self.connect_kwargs, banner_timeout=0.5) + self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) + + @requires_sha1_signing + def test_auth_trickledown(self): + """ + Failed key auth doesn't prevent subsequent pw auth from succeeding + """ + # NOTE: re #387, re #394 + # If pkey module used within Client._auth isn't correctly handling auth + # errors (e.g. if it allows things like ValueError to bubble up as per + # midway through #394) client.connect() will fail (at key load step) + # instead of succeeding (at password step) + kwargs = dict( + # Password-protected key whose passphrase is not 'pygmalion' (it's + # 'television' as per tests/test_pkey.py). NOTE: must use + # key_filename, loading the actual key here with PKey will except + # immediately; we're testing the try/except crap within Client. + key_filename=[_support("test_rsa_password.key")], + # Actual password for default 'slowdive' user + password="pygmalion", + ) + self._test_connection(**kwargs) + + @requires_sha1_signing + @slow + def test_auth_timeout(self): + """ + verify that the SSHClient has a configurable auth timeout + """ + # Connect with a half second auth timeout + self.assertRaises( + AuthenticationException, + self._test_connection, + password="unresponsive-server", + auth_timeout=0.5, + ) + + @patch.object( + paramiko.Channel, + "_set_remote_channel", + lambda *args, **kwargs: time.sleep(100), + ) + def test_channel_timeout(self): + """ + verify that the SSHClient has a configurable channel timeout + """ + threading.Thread(target=self._run).start() + # Client setup + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # Actual connection + self.tc.connect( + **dict( + self.connect_kwargs, password="pygmalion", channel_timeout=0.5 + ) + ) + self.event.wait(1.0) + + self.assertRaises(paramiko.SSHException, self.tc.open_sftp) + + @requires_gss_auth + def test_auth_trickledown_gsskex(self): + """ + Failed gssapi-keyex doesn't prevent subsequent key from succeeding + """ + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) + self._test_connection(**kwargs) + + @requires_gss_auth + def test_auth_trickledown_gssauth(self): + """ + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding + """ + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) + self._test_connection(**kwargs) + + def test_reject_policy(self): + """ + verify that SSHClient's RejectPolicy works. + """ + threading.Thread(target=self._run).start() + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.assertRaises( + paramiko.SSHException, + self.tc.connect, + password="pygmalion", + **self.connect_kwargs, + ) + + @requires_gss_auth + def test_reject_policy_gsskex(self): + """ + verify that SSHClient's RejectPolicy works, + even if gssapi-keyex was enabled but not used. + """ + # Test for a bug present in paramiko versions released before + # 2017-08-01 + threading.Thread(target=self._run).start() + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.assertRaises( + paramiko.SSHException, + self.tc.connect, + password="pygmalion", + gss_kex=True, + **self.connect_kwargs, + ) + + def _client_host_key_bad(self, host_key): + threading.Thread(target=self._run).start() + hostname = f"[{self.addr}]:{self.port}" + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) + known_hosts = self.tc.get_host_keys() + known_hosts.add(hostname, host_key.get_name(), host_key) + + self.assertRaises( + paramiko.BadHostKeyException, + self.tc.connect, + password="pygmalion", + **self.connect_kwargs, + ) + + def _client_host_key_good(self, ktype, kfile): + threading.Thread(target=self._run).start() + hostname = f"[{self.addr}]:{self.port}" + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + host_key = ktype.from_private_key_file(_support(kfile)) + known_hosts = self.tc.get_host_keys() + known_hosts.add(hostname, host_key.get_name(), host_key) + + self.tc.connect(password="pygmalion", **self.connect_kwargs) + self.event.wait(1.0) + self.assertTrue(self.event.is_set()) + self.assertTrue(self.ts.is_active()) + self.assertEqual(True, self.ts.is_authenticated()) + + def test_host_key_negotiation_1(self): + host_key = paramiko.ECDSAKey.generate() + self._client_host_key_bad(host_key) + + @requires_sha1_signing + def test_host_key_negotiation_2(self): + host_key = paramiko.RSAKey.generate(2048) + self._client_host_key_bad(host_key) + + def test_host_key_negotiation_3(self): + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") + + @requires_sha1_signing + def test_host_key_negotiation_4(self): + self._client_host_key_good(paramiko.RSAKey, "rsa.key") + + def _setup_for_env(self): + threading.Thread(target=self._run).start() + + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.tc.connect( + self.addr, self.port, username="slowdive", password="pygmalion" + ) + + self.event.wait(1.0) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() + target_env = {b"A": b"B", b"C": b"d"} + + self.tc.exec_command("yes", environment=target_env) + schan = self.ts.accept(1.0) + self.assertEqual(target_env, getattr(schan, "env", {})) + schan.close() + + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: + # Verify that a rejection by the server can be detected + self.tc.exec_command("yes", environment={b"INVALID_ENV": b""}) + self.assertTrue( + "INVALID_ENV" in str(manager.exception), + "Expected variable name in error message", + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + "Expected original SSHException in exception", + ) + + def test_missing_key_policy_accepts_classes_or_instances(self): + """ + Client.missing_host_key_policy() can take classes or instances. + """ + # AN ACTUAL UNIT TEST?! GOOD LORD + # (But then we have to test a private API...meh.) + client = SSHClient() + # Default + assert isinstance(client._policy, paramiko.RejectPolicy) + # Hand in an instance (classic behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + # Hand in just the class (new behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + + @patch("paramiko.client.Transport") + def test_disabled_algorithms_defaults_to_None(self, Transport): + SSHClient().connect("host", sock=Mock(), password="no") + assert Transport.call_args[1]["disabled_algorithms"] is None + + @patch("paramiko.client.Transport") + def test_disabled_algorithms_passed_directly_if_given(self, Transport): + SSHClient().connect( + "host", + sock=Mock(), + password="no", + disabled_algorithms={"keys": ["ssh-dss"]}, + ) + call_arg = Transport.call_args[1]["disabled_algorithms"] + assert call_arg == {"keys": ["ssh-dss"]} + + @patch("paramiko.client.Transport") + def test_transport_factory_defaults_to_Transport(self, Transport): + sock, kex, creds, algos = Mock(), Mock(), Mock(), Mock() + SSHClient().connect( + "host", + sock=sock, + password="no", + gss_kex=kex, + gss_deleg_creds=creds, + disabled_algorithms=algos, + ) + Transport.assert_called_once_with( + sock, gss_kex=kex, gss_deleg_creds=creds, disabled_algorithms=algos + ) + + @patch("paramiko.client.Transport") + def test_transport_factory_may_be_specified(self, Transport): + factory = Mock() + sock, kex, creds, algos = Mock(), Mock(), Mock(), Mock() + SSHClient().connect( + "host", + sock=sock, + password="no", + gss_kex=kex, + gss_deleg_creds=creds, + disabled_algorithms=algos, + transport_factory=factory, + ) + factory.assert_called_once_with( + sock, gss_kex=kex, gss_deleg_creds=creds, disabled_algorithms=algos + ) + # Safety check + assert not Transport.called + + +class PasswordPassphraseTests(ClientTest): + # TODO: most of these could reasonably be set up to use mocks/assertions + # (e.g. "gave passphrase -> expect PKey was given it as the passphrase") + # instead of suffering a real connection cycle. + # TODO: in that case, move the below to be part of an integration suite? + + @requires_sha1_signing + def test_password_kwarg_works_for_password_auth(self): + # Straightforward / duplicate of earlier basic password test. + self._test_connection(password="pygmalion") + + # TODO: more granular exception pending #387; should be signaling "no auth + # methods available" because no key and no password + @raises(SSHException) + @requires_sha1_signing + def test_passphrase_kwarg_not_used_for_password_auth(self): + # Using the "right" password in the "wrong" field shouldn't work. + self._test_connection(passphrase="pygmalion") + + @requires_sha1_signing + def test_passphrase_kwarg_used_for_key_passphrase(self): + # Straightforward again, with new passphrase kwarg. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + passphrase="television", + ) + + @requires_sha1_signing + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given( + self, + ): # noqa + # Backwards compatibility: passphrase in the password field. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + ) + + @raises(AuthenticationException) # TODO: more granular + @requires_sha1_signing + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa + self, + ): + # Sanity: if we're given both fields, the password field is NOT used as + # a passphrase. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + passphrase="wat? lol no", + ) |