From 2e2851dc13d73352530dd4495c7e05603b2e520d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 10 Apr 2024 23:38:38 +0200 Subject: Adding upstream version 2.1.2~dev0+20240219. Signed-off-by: Daniel Baumann --- deluge/tests/test_rpcserver.py | 108 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 deluge/tests/test_rpcserver.py (limited to 'deluge/tests/test_rpcserver.py') diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py new file mode 100644 index 0000000..77c9f1e --- /dev/null +++ b/deluge/tests/test_rpcserver.py @@ -0,0 +1,108 @@ +# +# Copyright (C) 2013 Bro +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import deluge.component as component +import deluge.error +from deluge.common import get_localhost_auth +from deluge.conftest import BaseTestCase +from deluge.core import rpcserver +from deluge.core.authmanager import AuthManager +from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer +from deluge.log import setup_logger + +setup_logger('none') + + +class DelugeRPCProtocolTester(DelugeRPCProtocol): + messages = [] + + def transfer_message(self, data): + self.messages.append(data) + + +class TestRPCServer(BaseTestCase): + def set_up(self): + self.rpcserver = RPCServer(listen=False) + self.rpcserver.factory.protocol = DelugeRPCProtocolTester + self.factory = self.rpcserver.factory + self.session_id = '0' + self.request_id = 11 + self.protocol = self.rpcserver.factory.protocol() + self.protocol.factory = self.factory + self.protocol.transport = self.protocol + self.factory.session_protocols[self.session_id] = self.protocol + self.factory.authorized_sessions[self.session_id] = None + self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent'] + self.protocol.sessionno = self.session_id + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + + return component.shutdown().addCallback(on_shutdown) + + def test_emit_event_for_session_id(self): + torrent_id = '12' + from deluge.event import TorrentFolderRenamedEvent + + data = [torrent_id, 'new name', 'old name'] + e = TorrentFolderRenamedEvent(*data) + self.rpcserver.emit_event_for_session_id(self.session_id, e) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_EVENT, str(msg) + assert msg[1] == 'TorrentFolderRenamedEvent', str(msg) + assert msg[2] == data, str(msg) + + def test_invalid_client_login(self): + self.protocol.dispatch(self.request_id, 'daemon.login', [1], {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + + def test_valid_client_login(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg) + + def test_client_login_error(self): + # This test causes error log prints while running the test... + self.protocol.transport = None # This should cause AttributeError + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' + + def test_client_invalid_method_call(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch(self.request_id, 'invalid_function', auth, {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' + + def test_daemon_info(self): + self.protocol.dispatch(self.request_id, 'daemon.info', [], {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == deluge.common.get_version(), str(msg) -- cgit v1.2.3