summaryrefslogtreecommitdiffstats
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py837
1 files changed, 837 insertions, 0 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
new file mode 100644
index 0000000..1c0c6c8
--- /dev/null
+++ b/tests/test_client.py
@@ -0,0 +1,837 @@
+# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Some unit tests for SSHClient.
+"""
+
+
+import gc
+import os
+import platform
+import socket
+import threading
+import time
+import unittest
+import warnings
+import weakref
+from tempfile import mkstemp
+
+import pytest
+from pytest_relaxed import raises
+from unittest.mock import patch, Mock
+
+import paramiko
+from paramiko import SSHClient
+from paramiko.pkey import PublicBlob
+from paramiko.ssh_exception import SSHException, AuthenticationException
+
+from ._util import _support, requires_sha1_signing, slow
+
+
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
+
+FINGERPRINTS = {
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa
+ "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
+}
+
+
+class NullServer(paramiko.ServerInterface):
+ def __init__(self, *args, **kwargs):
+ # Allow tests to enable/disable specific key types
+ self.__allowed_keys = kwargs.pop("allowed_keys", [])
+ # And allow them to set a (single...meh) expected public blob (cert)
+ self.__expected_public_blob = kwargs.pop("public_blob", None)
+ super().__init__(*args, **kwargs)
+
+ def get_allowed_auths(self, username):
+ if username == "slowdive":
+ return "publickey,password"
+ return "publickey"
+
+ def check_auth_password(self, username, password):
+ if (username == "slowdive") and (password == "pygmalion"):
+ return paramiko.AUTH_SUCCESSFUL
+ if (username == "slowdive") and (password == "unresponsive-server"):
+ time.sleep(5)
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def check_auth_publickey(self, username, key):
+ try:
+ expected = FINGERPRINTS[key.get_name()]
+ except KeyError:
+ return paramiko.AUTH_FAILED
+ # Base check: allowed auth type & fingerprint matches
+ happy = (
+ key.get_name() in self.__allowed_keys
+ and key.get_fingerprint() == expected
+ )
+ # Secondary check: if test wants assertions about cert data
+ if (
+ self.__expected_public_blob is not None
+ and key.public_blob != self.__expected_public_blob
+ ):
+ happy = False
+ return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != b"yes":
+ return False
+ return True
+
+ def check_channel_env_request(self, channel, name, value):
+ if name == "INVALID_ENV":
+ return False
+
+ if not hasattr(channel, "env"):
+ setattr(channel, "env", {})
+
+ channel.env[name] = value
+ return True
+
+
+class ClientTest(unittest.TestCase):
+ def setUp(self):
+ self.sockl = socket.socket()
+ self.sockl.bind(("localhost", 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.connect_kwargs = dict(
+ hostname=self.addr,
+ port=self.port,
+ username="slowdive",
+ look_for_keys=False,
+ )
+ self.event = threading.Event()
+ self.kill_event = threading.Event()
+
+ def tearDown(self):
+ # Shut down client Transport
+ if hasattr(self, "tc"):
+ self.tc.close()
+ # Shut down shared socket
+ if hasattr(self, "sockl"):
+ # Signal to server thread that it should shut down early; it checks
+ # this immediately after accept(). (In scenarios where connection
+ # actually succeeded during the test, this becomes a no-op.)
+ self.kill_event.set()
+ # Forcibly connect to server sock in case the server thread is
+ # hanging out in its accept() (e.g. if the client side of the test
+ # fails before it even gets to connecting); there's no other good
+ # way to force an accept() to exit.
+ put_a_sock_in_it = socket.socket()
+ put_a_sock_in_it.connect((self.addr, self.port))
+ put_a_sock_in_it.close()
+ # Then close "our" end of the socket (which _should_ cause the
+ # accept() to bail out, but does not, for some reason. I blame
+ # threading.)
+ self.sockl.close()
+
+ def _run(
+ self,
+ allowed_keys=None,
+ delay=0,
+ public_blob=None,
+ kill_event=None,
+ server_name=None,
+ ):
+ if allowed_keys is None:
+ allowed_keys = FINGERPRINTS.keys()
+ self.socks, addr = self.sockl.accept()
+ # If the kill event was set at this point, it indicates an early
+ # shutdown, so bail out now and don't even try setting up a Transport
+ # (which will just verbosely die.)
+ if kill_event and kill_event.is_set():
+ self.socks.close()
+ return
+ self.ts = paramiko.Transport(self.socks)
+ if server_name is not None:
+ self.ts.local_version = server_name
+ keypath = _support("rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file(keypath)
+ self.ts.add_server_key(host_key)
+ keypath = _support("ecdsa-256.key")
+ host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
+ self.ts.add_server_key(host_key)
+ server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
+ if delay:
+ time.sleep(delay)
+ self.ts.start_server(self.event, server)
+
+ def _test_connection(self, **kwargs):
+ """
+ (Most) kwargs get passed directly into SSHClient.connect().
+
+ The exceptions are ``allowed_keys``/``public_blob``/``server_name``
+ which are stripped and handed to the ``NullServer`` used for testing.
+ """
+ run_kwargs = {"kill_event": self.kill_event}
+ for key in ("allowed_keys", "public_blob", "server_name"):
+ run_kwargs[key] = kwargs.pop(key, None)
+ # Server setup
+ threading.Thread(target=self._run, kwargs=run_kwargs).start()
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ # Client setup
+ self.tc = SSHClient()
+ self.tc.get_host_keys().add(
+ f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key
+ )
+
+ # Actual connection
+ self.tc.connect(**dict(self.connect_kwargs, **kwargs))
+
+ # Authentication successful?
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual(
+ self.connect_kwargs["username"], self.ts.get_username()
+ )
+ self.assertEqual(True, self.ts.is_authenticated())
+ self.assertEqual(False, self.tc.get_transport().gss_kex_used)
+
+ # Command execution functions?
+ stdin, stdout, stderr = self.tc.exec_command("yes")
+ schan = self.ts.accept(1.0)
+
+ # Nobody else tests the API of exec_command so let's do it here for
+ # now. :weary:
+ assert isinstance(stdin, paramiko.ChannelStdinFile)
+ assert isinstance(stdout, paramiko.ChannelFile)
+ assert isinstance(stderr, paramiko.ChannelStderrFile)
+
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
+ schan.close()
+
+ self.assertEqual("Hello there.\n", stdout.readline())
+ self.assertEqual("", stdout.readline())
+ self.assertEqual("This is on stderr.\n", stderr.readline())
+ self.assertEqual("", stderr.readline())
+
+ # Cleanup
+ stdin.close()
+ stdout.close()
+ stderr.close()
+
+
+class SSHClientTest(ClientTest):
+ @requires_sha1_signing
+ def test_client(self):
+ """
+ verify that the SSHClient stuff works too.
+ """
+ self._test_connection(password="pygmalion")
+
+ @requires_sha1_signing
+ def test_client_dsa(self):
+ """
+ verify that SSHClient works with a DSA key.
+ """
+ self._test_connection(key_filename=_support("dss.key"))
+
+ @requires_sha1_signing
+ def test_client_rsa(self):
+ """
+ verify that SSHClient works with an RSA key.
+ """
+ self._test_connection(key_filename=_support("rsa.key"))
+
+ @requires_sha1_signing
+ def test_client_ecdsa(self):
+ """
+ verify that SSHClient works with an ECDSA key.
+ """
+ self._test_connection(key_filename=_support("ecdsa-256.key"))
+
+ @requires_sha1_signing
+ def test_client_ed25519(self):
+ self._test_connection(key_filename=_support("ed25519.key"))
+
+ @requires_sha1_signing
+ def test_multiple_key_files(self):
+ """
+ verify that SSHClient accepts and tries multiple key files.
+ """
+ # This is dumb :(
+ types_ = {
+ "rsa": "ssh-rsa",
+ "dss": "ssh-dss",
+ "ecdsa": "ecdsa-sha2-nistp256",
+ }
+ # Various combos of attempted & valid keys
+ # TODO: try every possible combo using itertools functions
+ # TODO: use new key(s) fixture(s)
+ for attempt, accept in (
+ (["rsa", "dss"], ["dss"]), # Original test #3
+ (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly
+ (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail
+ (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success
+ ):
+ try:
+ self._test_connection(
+ key_filename=[
+ _support("{}.key".format(x)) for x in attempt
+ ],
+ allowed_keys=[types_[x] for x in accept],
+ )
+ finally:
+ # Clean up to avoid occasional gc-related deadlocks.
+ # TODO: use nose test generators after nose port
+ self.tearDown()
+ self.setUp()
+
+ @requires_sha1_signing
+ def test_multiple_key_files_failure(self):
+ """
+ Expect failure when multiple keys in play and none are accepted
+ """
+ # Until #387 is fixed we have to catch a high-up exception since
+ # various platforms trigger different errors here >_<
+ self.assertRaises(
+ SSHException,
+ self._test_connection,
+ key_filename=[_support("rsa.key")],
+ allowed_keys=["ecdsa-sha2-nistp256"],
+ )
+
+ @requires_sha1_signing
+ def test_certs_allowed_as_key_filename_values(self):
+ # NOTE: giving cert path here, not key path. (Key path test is below.
+ # They're similar except for which path is given; the expected auth and
+ # server-side behavior is 100% identical.)
+ # NOTE: only bothered whipping up one cert per overall class/family.
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
+ self._test_connection(
+ key_filename=key_path,
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
+ )
+
+ @requires_sha1_signing
+ def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
+ # NOTE: a regular test_connection() w/ rsa.key would incidentally
+ # test this (because test_xxx.key-cert.pub exists) but incidental tests
+ # stink, so NullServer and friends were updated to allow assertions
+ # about the server-side key object's public blob. Thus, we can prove
+ # that a specific cert was found, along with regular authorization
+ # succeeding proving that the overall flow works.
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
+ self._test_connection(
+ key_filename=key_path,
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
+ )
+
+ def _cert_algo_test(self, ver, alg):
+ # Issue #2017; see auth_handler.py
+ self.connect_kwargs["username"] = "somecertuser" # neuter pw auth
+ self._test_connection(
+ # NOTE: SSHClient is able to take either the key or the cert & will
+ # set up its internals as needed
+ key_filename=_support("rsa.key-cert.pub"),
+ server_name="SSH-2.0-OpenSSH_{}".format(ver),
+ )
+ assert (
+ self.tc._transport._agreed_pubkey_algorithm
+ == "{}-cert-v01@openssh.com".format(alg)
+ )
+
+ @requires_sha1_signing
+ def test_old_openssh_needs_ssh_rsa_for_certs_not_rsa_sha2(self):
+ self._cert_algo_test(ver="7.7", alg="ssh-rsa")
+
+ @requires_sha1_signing
+ def test_newer_openssh_uses_rsa_sha2_for_certs_not_ssh_rsa(self):
+ # NOTE: 512 happens to be first in our list and is thus chosen
+ self._cert_algo_test(ver="7.8", alg="rsa-sha2-512")
+
+ def test_default_key_locations_trigger_cert_loads_if_found(self):
+ # TODO: what it says on the tin: ~/.ssh/id_rsa tries to load
+ # ~/.ssh/id_rsa-cert.pub. Right now no other tests actually test that
+ # code path (!) so we're punting too, sob.
+ pass
+
+ def test_auto_add_policy(self):
+ """
+ verify that SSHClient's AutoAddPolicy works.
+ """
+ threading.Thread(target=self._run).start()
+ hostname = f"[{self.addr}]:{self.port}"
+ key_file = _support("ecdsa-256.key")
+ public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ self.assertEqual(0, len(self.tc.get_host_keys()))
+ self.tc.connect(password="pygmalion", **self.connect_kwargs)
+
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual("slowdive", self.ts.get_username())
+ self.assertEqual(True, self.ts.is_authenticated())
+ self.assertEqual(1, len(self.tc.get_host_keys()))
+ new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
+ self.assertEqual(public_host_key, new_host_key)
+
+ def test_save_host_keys(self):
+ """
+ verify that SSHClient correctly saves a known_hosts file.
+ """
+ warnings.filterwarnings("ignore", "tempnam.*")
+
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ fd, localname = mkstemp()
+ os.close(fd)
+
+ client = SSHClient()
+ assert len(client.get_host_keys()) == 0
+
+ host_id = f"[{self.addr}]:{self.port}"
+
+ client.get_host_keys().add(host_id, "ssh-rsa", public_host_key)
+ assert len(client.get_host_keys()) == 1
+ assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"]
+
+ client.save_host_keys(localname)
+
+ with open(localname) as fd:
+ assert host_id in fd.read()
+
+ os.unlink(localname)
+
+ def test_cleanup(self):
+ """
+ verify that when an SSHClient is collected, its transport (and the
+ transport's packetizer) is closed.
+ """
+ # Skipped on PyPy because it fails on CI for unknown reasons
+ if platform.python_implementation() == "PyPy":
+ return
+
+ threading.Thread(target=self._run).start()
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert len(self.tc.get_host_keys()) == 0
+ self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
+
+ self.event.wait(1.0)
+ assert self.event.is_set()
+ assert self.ts.is_active()
+
+ p = weakref.ref(self.tc._transport.packetizer)
+ assert p() is not None
+ self.tc.close()
+ del self.tc
+
+ # force a collection to see whether the SSHClient object is deallocated
+ # 2 GCs are needed on PyPy, time is needed for Python 3
+ # TODO 4.0: this still fails randomly under CircleCI under Python 3.7,
+ # 3.8 at the very least. bumped sleep 0.3->1.0s but the underlying
+ # functionality should get reevaluated now we've dropped Python 2.
+ time.sleep(1)
+ gc.collect()
+ gc.collect()
+
+ assert p() is None
+
+ @patch("paramiko.client.socket.socket")
+ @patch("paramiko.client.socket.getaddrinfo")
+ def test_closes_socket_on_socket_errors(self, getaddrinfo, mocket):
+ getaddrinfo.return_value = (
+ ("irrelevant", None, None, None, "whatever"),
+ )
+
+ class SocksToBeYou(socket.error):
+ pass
+
+ my_socket = mocket.return_value
+ my_socket.connect.side_effect = SocksToBeYou
+ client = SSHClient()
+ with pytest.raises(SocksToBeYou):
+ client.connect(hostname="nope")
+ my_socket.close.assert_called_once_with()
+
+ def test_client_can_be_used_as_context_manager(self):
+ """
+ verify that an SSHClient can be used a context manager
+ """
+ threading.Thread(target=self._run).start()
+
+ with SSHClient() as tc:
+ self.tc = tc
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert len(self.tc.get_host_keys()) == 0
+ self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
+
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+
+ self.assertTrue(self.tc._transport is not None)
+
+ self.assertTrue(self.tc._transport is None)
+
+ def test_banner_timeout(self):
+ """
+ verify that the SSHClient has a configurable banner timeout.
+ """
+ # Start the thread with a 1 second wait.
+ threading.Thread(target=self._run, kwargs={"delay": 1}).start()
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = SSHClient()
+ self.tc.get_host_keys().add(
+ f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key
+ )
+ # Connect with a half second banner timeout.
+ kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
+ self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
+
+ @requires_sha1_signing
+ def test_auth_trickledown(self):
+ """
+ Failed key auth doesn't prevent subsequent pw auth from succeeding
+ """
+ # NOTE: re #387, re #394
+ # If pkey module used within Client._auth isn't correctly handling auth
+ # errors (e.g. if it allows things like ValueError to bubble up as per
+ # midway through #394) client.connect() will fail (at key load step)
+ # instead of succeeding (at password step)
+ kwargs = dict(
+ # Password-protected key whose passphrase is not 'pygmalion' (it's
+ # 'television' as per tests/test_pkey.py). NOTE: must use
+ # key_filename, loading the actual key here with PKey will except
+ # immediately; we're testing the try/except crap within Client.
+ key_filename=[_support("test_rsa_password.key")],
+ # Actual password for default 'slowdive' user
+ password="pygmalion",
+ )
+ self._test_connection(**kwargs)
+
+ @requires_sha1_signing
+ @slow
+ def test_auth_timeout(self):
+ """
+ verify that the SSHClient has a configurable auth timeout
+ """
+ # Connect with a half second auth timeout
+ self.assertRaises(
+ AuthenticationException,
+ self._test_connection,
+ password="unresponsive-server",
+ auth_timeout=0.5,
+ )
+
+ @patch.object(
+ paramiko.Channel,
+ "_set_remote_channel",
+ lambda *args, **kwargs: time.sleep(100),
+ )
+ def test_channel_timeout(self):
+ """
+ verify that the SSHClient has a configurable channel timeout
+ """
+ threading.Thread(target=self._run).start()
+ # Client setup
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ # Actual connection
+ self.tc.connect(
+ **dict(
+ self.connect_kwargs, password="pygmalion", channel_timeout=0.5
+ )
+ )
+ self.event.wait(1.0)
+
+ self.assertRaises(paramiko.SSHException, self.tc.open_sftp)
+
+ @requires_gss_auth
+ def test_auth_trickledown_gsskex(self):
+ """
+ Failed gssapi-keyex doesn't prevent subsequent key from succeeding
+ """
+ kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")])
+ self._test_connection(**kwargs)
+
+ @requires_gss_auth
+ def test_auth_trickledown_gssauth(self):
+ """
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
+ """
+ kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")])
+ self._test_connection(**kwargs)
+
+ def test_reject_policy(self):
+ """
+ verify that SSHClient's RejectPolicy works.
+ """
+ threading.Thread(target=self._run).start()
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
+ self.assertEqual(0, len(self.tc.get_host_keys()))
+ self.assertRaises(
+ paramiko.SSHException,
+ self.tc.connect,
+ password="pygmalion",
+ **self.connect_kwargs,
+ )
+
+ @requires_gss_auth
+ def test_reject_policy_gsskex(self):
+ """
+ verify that SSHClient's RejectPolicy works,
+ even if gssapi-keyex was enabled but not used.
+ """
+ # Test for a bug present in paramiko versions released before
+ # 2017-08-01
+ threading.Thread(target=self._run).start()
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
+ self.assertEqual(0, len(self.tc.get_host_keys()))
+ self.assertRaises(
+ paramiko.SSHException,
+ self.tc.connect,
+ password="pygmalion",
+ gss_kex=True,
+ **self.connect_kwargs,
+ )
+
+ def _client_host_key_bad(self, host_key):
+ threading.Thread(target=self._run).start()
+ hostname = f"[{self.addr}]:{self.port}"
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.assertRaises(
+ paramiko.BadHostKeyException,
+ self.tc.connect,
+ password="pygmalion",
+ **self.connect_kwargs,
+ )
+
+ def _client_host_key_good(self, ktype, kfile):
+ threading.Thread(target=self._run).start()
+ hostname = f"[{self.addr}]:{self.port}"
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
+ host_key = ktype.from_private_key_file(_support(kfile))
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.tc.connect(password="pygmalion", **self.connect_kwargs)
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual(True, self.ts.is_authenticated())
+
+ def test_host_key_negotiation_1(self):
+ host_key = paramiko.ECDSAKey.generate()
+ self._client_host_key_bad(host_key)
+
+ @requires_sha1_signing
+ def test_host_key_negotiation_2(self):
+ host_key = paramiko.RSAKey.generate(2048)
+ self._client_host_key_bad(host_key)
+
+ def test_host_key_negotiation_3(self):
+ self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key")
+
+ @requires_sha1_signing
+ def test_host_key_negotiation_4(self):
+ self._client_host_key_good(paramiko.RSAKey, "rsa.key")
+
+ def _setup_for_env(self):
+ threading.Thread(target=self._run).start()
+
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ self.assertEqual(0, len(self.tc.get_host_keys()))
+ self.tc.connect(
+ self.addr, self.port, username="slowdive", password="pygmalion"
+ )
+
+ self.event.wait(1.0)
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
+
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
+ target_env = {b"A": b"B", b"C": b"d"}
+
+ self.tc.exec_command("yes", environment=target_env)
+ schan = self.ts.accept(1.0)
+ self.assertEqual(target_env, getattr(schan, "env", {}))
+ schan.close()
+
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
+ # Verify that a rejection by the server can be detected
+ self.tc.exec_command("yes", environment={b"INVALID_ENV": b""})
+ self.assertTrue(
+ "INVALID_ENV" in str(manager.exception),
+ "Expected variable name in error message",
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ "Expected original SSHException in exception",
+ )
+
+ def test_missing_key_policy_accepts_classes_or_instances(self):
+ """
+ Client.missing_host_key_policy() can take classes or instances.
+ """
+ # AN ACTUAL UNIT TEST?! GOOD LORD
+ # (But then we have to test a private API...meh.)
+ client = SSHClient()
+ # Default
+ assert isinstance(client._policy, paramiko.RejectPolicy)
+ # Hand in an instance (classic behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ # Hand in just the class (new behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_defaults_to_None(self, Transport):
+ SSHClient().connect("host", sock=Mock(), password="no")
+ assert Transport.call_args[1]["disabled_algorithms"] is None
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_passed_directly_if_given(self, Transport):
+ SSHClient().connect(
+ "host",
+ sock=Mock(),
+ password="no",
+ disabled_algorithms={"keys": ["ssh-dss"]},
+ )
+ call_arg = Transport.call_args[1]["disabled_algorithms"]
+ assert call_arg == {"keys": ["ssh-dss"]}
+
+ @patch("paramiko.client.Transport")
+ def test_transport_factory_defaults_to_Transport(self, Transport):
+ sock, kex, creds, algos = Mock(), Mock(), Mock(), Mock()
+ SSHClient().connect(
+ "host",
+ sock=sock,
+ password="no",
+ gss_kex=kex,
+ gss_deleg_creds=creds,
+ disabled_algorithms=algos,
+ )
+ Transport.assert_called_once_with(
+ sock, gss_kex=kex, gss_deleg_creds=creds, disabled_algorithms=algos
+ )
+
+ @patch("paramiko.client.Transport")
+ def test_transport_factory_may_be_specified(self, Transport):
+ factory = Mock()
+ sock, kex, creds, algos = Mock(), Mock(), Mock(), Mock()
+ SSHClient().connect(
+ "host",
+ sock=sock,
+ password="no",
+ gss_kex=kex,
+ gss_deleg_creds=creds,
+ disabled_algorithms=algos,
+ transport_factory=factory,
+ )
+ factory.assert_called_once_with(
+ sock, gss_kex=kex, gss_deleg_creds=creds, disabled_algorithms=algos
+ )
+ # Safety check
+ assert not Transport.called
+
+
+class PasswordPassphraseTests(ClientTest):
+ # TODO: most of these could reasonably be set up to use mocks/assertions
+ # (e.g. "gave passphrase -> expect PKey was given it as the passphrase")
+ # instead of suffering a real connection cycle.
+ # TODO: in that case, move the below to be part of an integration suite?
+
+ @requires_sha1_signing
+ def test_password_kwarg_works_for_password_auth(self):
+ # Straightforward / duplicate of earlier basic password test.
+ self._test_connection(password="pygmalion")
+
+ # TODO: more granular exception pending #387; should be signaling "no auth
+ # methods available" because no key and no password
+ @raises(SSHException)
+ @requires_sha1_signing
+ def test_passphrase_kwarg_not_used_for_password_auth(self):
+ # Using the "right" password in the "wrong" field shouldn't work.
+ self._test_connection(passphrase="pygmalion")
+
+ @requires_sha1_signing
+ def test_passphrase_kwarg_used_for_key_passphrase(self):
+ # Straightforward again, with new passphrase kwarg.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ passphrase="television",
+ )
+
+ @requires_sha1_signing
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(
+ self,
+ ): # noqa
+ # Backwards compatibility: passphrase in the password field.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ )
+
+ @raises(AuthenticationException) # TODO: more granular
+ @requires_sha1_signing
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
+ self,
+ ):
+ # Sanity: if we're given both fields, the password field is NOT used as
+ # a passphrase.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ passphrase="wat? lol no",
+ )