# # Copyright (C) 2013 Bro # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # import deluge.component as component import deluge.error from deluge.common import get_localhost_auth from deluge.conftest import BaseTestCase from deluge.core import rpcserver from deluge.core.authmanager import AuthManager from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer from deluge.log import setup_logger setup_logger('none') class DelugeRPCProtocolTester(DelugeRPCProtocol): messages = [] def transfer_message(self, data): self.messages.append(data) class TestRPCServer(BaseTestCase): def set_up(self): self.rpcserver = RPCServer(listen=False) self.rpcserver.factory.protocol = DelugeRPCProtocolTester self.factory = self.rpcserver.factory self.session_id = '0' self.request_id = 11 self.protocol = self.rpcserver.factory.protocol() self.protocol.factory = self.factory self.protocol.transport = self.protocol self.factory.session_protocols[self.session_id] = self.protocol self.factory.authorized_sessions[self.session_id] = None self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent'] self.protocol.sessionno = self.session_id return component.start() def tear_down(self): def on_shutdown(result): del self.rpcserver return component.shutdown().addCallback(on_shutdown) def test_emit_event_for_session_id(self): torrent_id = '12' from deluge.event import TorrentFolderRenamedEvent data = [torrent_id, 'new name', 'old name'] e = TorrentFolderRenamedEvent(*data) self.rpcserver.emit_event_for_session_id(self.session_id, e) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_EVENT, str(msg) assert msg[1] == 'TorrentFolderRenamedEvent', str(msg) assert msg[2] == data, str(msg) def test_invalid_client_login(self): self.protocol.dispatch(self.request_id, 'daemon.login', [1], {}) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_ERROR assert msg[1] == self.request_id def test_valid_client_login(self): self.authmanager = AuthManager() auth = get_localhost_auth() self.protocol.dispatch( self.request_id, 'daemon.login', auth, {'client_version': 'Test'} ) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) assert msg[1] == self.request_id, str(msg) assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg) def test_client_login_error(self): # This test causes error log prints while running the test... self.protocol.transport = None # This should cause AttributeError self.authmanager = AuthManager() auth = get_localhost_auth() self.protocol.dispatch( self.request_id, 'daemon.login', auth, {'client_version': 'Test'} ) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_ERROR assert msg[1] == self.request_id assert msg[2] == 'WrappedException' assert msg[3][1] == 'AttributeError' def test_client_invalid_method_call(self): self.authmanager = AuthManager() auth = get_localhost_auth() self.protocol.dispatch(self.request_id, 'invalid_function', auth, {}) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_ERROR assert msg[1] == self.request_id assert msg[2] == 'WrappedException' assert msg[3][1] == 'AttributeError' def test_daemon_info(self): self.protocol.dispatch(self.request_id, 'daemon.info', [], {}) msg = self.protocol.messages.pop() assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) assert msg[1] == self.request_id, str(msg) assert msg[2] == deluge.common.get_version(), str(msg)