summaryrefslogtreecommitdiffstats
path: root/tests/test_kex_gss.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_kex_gss.py')
-rw-r--r--tests/test_kex_gss.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
new file mode 100644
index 0000000..a81b195
--- /dev/null
+++ b/tests/test_kex_gss.py
@@ -0,0 +1,154 @@
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user
+authentication
+"""
+
+
+import socket
+import threading
+import unittest
+
+import paramiko
+
+from ._util import needs_gssapi, KerberosTestCase, update_env, _support
+
+
+class NullServer(paramiko.ServerInterface):
+ def get_allowed_auths(self, username):
+ return "gssapi-keyex"
+
+ def check_auth_gssapi_keyex(
+ self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
+ ):
+ if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def enable_auth_gssapi(self):
+ UseGSSAPI = True
+ return UseGSSAPI
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != b"yes":
+ return False
+ return True
+
+
+@needs_gssapi
+class GSSKexTest(KerberosTestCase):
+ def setUp(self):
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
+ self.sockl = socket.socket()
+ self.sockl.bind((self.realm.hostname, 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.event = threading.Event()
+ update_env(self, self.realm.env)
+ thread = threading.Thread(target=self._run)
+ thread.start()
+
+ def tearDown(self):
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
+
+ def _run(self):
+ self.socks, addr = self.sockl.accept()
+ self.ts = paramiko.Transport(self.socks, gss_kex=True)
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
+ self.ts.add_server_key(host_key)
+ self.ts.set_gss_host(self.realm.hostname)
+ try:
+ self.ts.load_server_moduli()
+ except:
+ print("(Failed to load moduli -- gex will be unsupported.)")
+ server = NullServer()
+ self.ts.start_server(self.event, server)
+
+ def _test_gsskex_and_auth(self, gss_host, rekey=False):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
+ Diffie-Hellman Key Exchange and user authentication with the GSS-API
+ context created during key exchange.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add(
+ f"[{self.hostname}]:{self.port}", "ssh-rsa", public_host_key
+ )
+ self.tc.connect(
+ self.hostname,
+ self.port,
+ username=self.username,
+ gss_auth=True,
+ gss_kex=True,
+ gss_host=gss_host,
+ )
+
+ self.event.wait(1.0)
+ self.assert_(self.event.is_set())
+ self.assert_(self.ts.is_active())
+ self.assertEquals(self.username, self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+ self.assertEquals(True, self.tc.get_transport().gss_kex_used)
+
+ stdin, stdout, stderr = self.tc.exec_command("yes")
+ schan = self.ts.accept(1.0)
+ if rekey:
+ self.tc.get_transport().renegotiate_keys()
+
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
+ schan.close()
+
+ self.assertEquals("Hello there.\n", stdout.readline())
+ self.assertEquals("", stdout.readline())
+ self.assertEquals("This is on stderr.\n", stderr.readline())
+ self.assertEquals("", stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
+
+ def test_gsskex_and_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
+ Diffie-Hellman Key Exchange and user authentication with the GSS-API
+ context created during key exchange.
+ """
+ self._test_gsskex_and_auth(gss_host=None)
+
+ # To be investigated, see https://github.com/paramiko/paramiko/issues/1312
+ @unittest.expectedFailure
+ def test_gsskex_and_auth_rekey(self):
+ """
+ Verify that Paramiko can rekey.
+ """
+ self._test_gsskex_and_auth(gss_host=None, rekey=True)