summaryrefslogtreecommitdiffstats
path: root/deluge/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests/test_client.py')
-rw-r--r--deluge/tests/test_client.py204
1 files changed, 204 insertions, 0 deletions
diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py
new file mode 100644
index 0000000..c89ad53
--- /dev/null
+++ b/deluge/tests/test_client.py
@@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.internet import defer
+
+import deluge.component as component
+from deluge import error
+from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, windows_check
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.ui.client import Client, DaemonSSLProxy, client
+
+from .basetest import BaseTestCase
+from .daemon_base import DaemonBase
+
+
+class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
+ def authenticate(self, username, password):
+ self.login_deferred = defer.Deferred()
+ d = self.call('daemon.login', username, password)
+ d.addCallback(self.__on_login, username)
+ d.addErrback(self.__on_login_fail)
+ return self.login_deferred
+
+ def __on_login(self, result, username):
+ self.login_deferred.callback(result)
+
+ def __on_login_fail(self, result):
+ self.login_deferred.errback(result)
+
+
+class NoVersionSendingClient(Client):
+ def connect(
+ self,
+ host='127.0.0.1',
+ port=58846,
+ username='',
+ password='',
+ skip_authentication=False,
+ ):
+ self._daemon_proxy = NoVersionSendingDaemonSSLProxy()
+ self._daemon_proxy.set_disconnect_callback(self.__on_disconnect)
+
+ d = self._daemon_proxy.connect(host, port)
+
+ def on_connect_fail(reason):
+ self.disconnect()
+ return reason
+
+ def on_authenticate(result, daemon_info):
+ return result
+
+ def on_authenticate_fail(reason):
+ return reason
+
+ def on_connected(daemon_version):
+ return daemon_version
+
+ def authenticate(daemon_version, username, password):
+ d = self._daemon_proxy.authenticate(username, password)
+ d.addCallback(on_authenticate, daemon_version)
+ d.addErrback(on_authenticate_fail)
+ return d
+
+ d.addCallback(on_connected)
+ d.addErrback(on_connect_fail)
+ if not skip_authentication:
+ d.addCallback(authenticate, username, password)
+ return d
+
+ def __on_disconnect(self):
+ if self.disconnect_callback:
+ self.disconnect_callback()
+
+
+class ClientTestCase(BaseTestCase, DaemonBase):
+
+ if windows_check():
+ skip = 'windows cant start_core not enough arguments for format string'
+
+ def set_up(self):
+ d = self.common_set_up()
+ d.addCallback(self.start_core)
+ d.addErrback(self.terminate_core)
+ return d
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.terminate_core)
+ return d
+
+ def test_connect_no_credentials(self):
+ d = client.connect('localhost', self.listen_port, username='', password='')
+
+ def on_connect(result):
+ self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
+ self.addCleanup(client.disconnect)
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_localclient(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_connect(result):
+ self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
+ self.addCleanup(client.disconnect)
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_bad_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password + '1'
+ )
+
+ def on_failure(failure):
+ self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
+ self.assertEqual(failure.value.message, 'Password does not match')
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_invalid_user(self):
+ username, password = get_localhost_auth()
+ d = client.connect('localhost', self.listen_port, username='invalid-user')
+
+ def on_failure(failure):
+ self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
+ self.assertEqual(failure.value.message, 'Username does not exist')
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_without_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect('localhost', self.listen_port, username=username)
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.AuthenticationRequired), error.AuthenticationRequired
+ )
+ self.assertEqual(failure.value.username, username)
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ @defer.inlineCallbacks
+ def test_connect_with_password(self):
+ username, password = get_localhost_auth()
+ yield client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+ yield client.core.create_account('testuser', 'testpw', 'DEFAULT')
+ yield client.disconnect()
+ ret = yield client.connect(
+ 'localhost', self.listen_port, username='testuser', password='testpw'
+ )
+ self.assertEqual(ret, AUTH_LEVEL_NORMAL)
+ yield
+
+ @defer.inlineCallbacks
+ def test_invalid_rpc_method_call(self):
+ yield client.connect('localhost', self.listen_port, username='', password='')
+ d = client.core.invalid_method()
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.WrappedException), error.WrappedException
+ )
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ yield d
+
+ def test_connect_without_sending_client_version_fails(self):
+ username, password = get_localhost_auth()
+ no_version_sending_client = NoVersionSendingClient()
+ d = no_version_sending_client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.IncompatibleClient), error.IncompatibleClient
+ )
+ self.addCleanup(no_version_sending_client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d