diff options
Diffstat (limited to 'tests/test_kex_gss.py')
-rw-r--r-- | tests/test_kex_gss.py | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py new file mode 100644 index 0000000..a81b195 --- /dev/null +++ b/tests/test_kex_gss.py @@ -0,0 +1,154 @@ +# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> +# Copyright (C) 2013-2014 science + computing ag +# Author: Sebastian Deiss <sebastian.deiss@t-online.de> +# +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user +authentication +""" + + +import socket +import threading +import unittest + +import paramiko + +from ._util import needs_gssapi, KerberosTestCase, update_env, _support + + +class NullServer(paramiko.ServerInterface): + def get_allowed_auths(self, username): + return "gssapi-keyex" + + def check_auth_gssapi_keyex( + self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None + ): + if gss_authenticated == paramiko.AUTH_SUCCESSFUL: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def enable_auth_gssapi(self): + UseGSSAPI = True + return UseGSSAPI + + def check_channel_request(self, kind, chanid): + return paramiko.OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != b"yes": + return False + return True + + +@needs_gssapi +class GSSKexTest(KerberosTestCase): + def setUp(self): + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) + self.sockl = socket.socket() + self.sockl.bind((self.realm.hostname, 0)) + self.sockl.listen(1) + self.addr, self.port = self.sockl.getsockname() + self.event = threading.Event() + update_env(self, self.realm.env) + thread = threading.Thread(target=self._run) + thread.start() + + def tearDown(self): + for attr in "tc ts socks sockl".split(): + if hasattr(self, attr): + getattr(self, attr).close() + + def _run(self): + self.socks, addr = self.sockl.accept() + self.ts = paramiko.Transport(self.socks, gss_kex=True) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) + self.ts.add_server_key(host_key) + self.ts.set_gss_host(self.realm.hostname) + try: + self.ts.load_server_moduli() + except: + print("(Failed to load moduli -- gex will be unsupported.)") + server = NullServer() + self.ts.start_server(self.event, server) + + def _test_gsskex_and_auth(self, gss_host, rekey=False): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated + Diffie-Hellman Key Exchange and user authentication with the GSS-API + context created during key exchange. + """ + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add( + f"[{self.hostname}]:{self.port}", "ssh-rsa", public_host_key + ) + self.tc.connect( + self.hostname, + self.port, + username=self.username, + gss_auth=True, + gss_kex=True, + gss_host=gss_host, + ) + + self.event.wait(1.0) + self.assert_(self.event.is_set()) + self.assert_(self.ts.is_active()) + self.assertEquals(self.username, self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + self.assertEquals(True, self.tc.get_transport().gss_kex_used) + + stdin, stdout, stderr = self.tc.exec_command("yes") + schan = self.ts.accept(1.0) + if rekey: + self.tc.get_transport().renegotiate_keys() + + schan.send("Hello there.\n") + schan.send_stderr("This is on stderr.\n") + schan.close() + + self.assertEquals("Hello there.\n", stdout.readline()) + self.assertEquals("", stdout.readline()) + self.assertEquals("This is on stderr.\n", stderr.readline()) + self.assertEquals("", stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() + + def test_gsskex_and_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated + Diffie-Hellman Key Exchange and user authentication with the GSS-API + context created during key exchange. + """ + self._test_gsskex_and_auth(gss_host=None) + + # To be investigated, see https://github.com/paramiko/paramiko/issues/1312 + @unittest.expectedFailure + def test_gsskex_and_auth_rekey(self): + """ + Verify that Paramiko can rekey. + """ + self._test_gsskex_and_auth(gss_host=None, rekey=True) |