diff options
Diffstat (limited to 'deluge/tests')
47 files changed, 5825 insertions, 0 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py new file mode 100644 index 0000000..d3bf10d --- /dev/null +++ b/deluge/tests/__init__.py @@ -0,0 +1,19 @@ +# Increase open file descriptor limit to allow tests to run +# without getting error: what(): epoll: Too many open files +from __future__ import print_function, unicode_literals + +from deluge.i18n import setup_translation + +try: + import resource +except ImportError: # Does not exist on Windows + pass +else: + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536)) + except (ValueError, resource.error) as ex: + error = 'Failed to raise file descriptor limit: %s' % ex + # print(error) + +# Initialize gettext +setup_translation() diff --git a/deluge/tests/basetest.py b/deluge/tests/basetest.py new file mode 100644 index 0000000..11ca18e --- /dev/null +++ b/deluge/tests/basetest.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import warnings + +from twisted.internet.defer import maybeDeferred +from twisted.trial import unittest + +import deluge.component as component + + +class BaseTestCase(unittest.TestCase): + """This is the base class that should be used for all test classes + that create classes that inherit from deluge.component.Component. It + ensures that the component registry has been cleaned up when tests + have finished. + + """ + + def setUp(self): # NOQA: N803 + + if len(component._ComponentRegistry.components) != 0: + warnings.warn( + 'The component._ComponentRegistry.components is not empty on test setup.\n' + 'This is probably caused by another test that did not clean up after finishing!: %s' + % component._ComponentRegistry.components + ) + d = maybeDeferred(self.set_up) + + def on_setup_error(error): + warnings.warn('Error caught in test setup!\n%s' % error.getTraceback()) + self.fail() + + return d.addErrback(on_setup_error) + + def tearDown(self): # NOQA: N803 + d = maybeDeferred(self.tear_down) + + def on_teardown_failed(error): + warnings.warn('Error caught in test teardown!\n%s' % error.getTraceback()) + self.fail() + + def on_teardown_complete(result): + component._ComponentRegistry.components.clear() + component._ComponentRegistry.dependents.clear() + + return d.addCallbacks(on_teardown_complete, on_teardown_failed) + + def set_up(self): + pass + + def tear_down(self): + pass diff --git a/deluge/tests/common.py b/deluge/tests/common.py new file mode 100644 index 0000000..e92cc0f --- /dev/null +++ b/deluge/tests/common.py @@ -0,0 +1,341 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import os +import sys +import tempfile +import traceback + +from twisted.internet import defer, protocol, reactor +from twisted.internet.defer import Deferred +from twisted.internet.error import CannotListenError +from twisted.trial import unittest + +import deluge.configmanager +import deluge.core.preferencesmanager +import deluge.log +from deluge.error import DelugeError + +# This sets log level to critical, so use log.critical() to debug while running unit tests +deluge.log.setup_logger('none') + + +def disable_new_release_check(): + deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False + + +def set_tmp_config_dir(): + config_directory = tempfile.mkdtemp() + deluge.configmanager.set_config_dir(config_directory) + return config_directory + + +def setup_test_logger(level='info', prefix='deluge'): + deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False) + + +def get_test_data_file(filename): + return os.path.join(os.path.join(os.path.dirname(__file__), 'data'), filename) + + +def todo_test(caller): + # If we are using the delugereporter we can set todo mark on the test + # Without the delugereporter the todo would print a stack trace, so in + # that case we rely only on skipTest + if os.environ.get('DELUGE_REPORTER', None): + getattr(caller, caller._testMethodName).__func__.todo = 'To be fixed' + + filename = os.path.basename(traceback.extract_stack(None, 2)[0][0]) + funcname = traceback.extract_stack(None, 2)[0][2] + raise unittest.SkipTest('TODO: %s:%s' % (filename, funcname)) + + +def add_watchdog(deferred, timeout=0.05, message=None): + def callback(value): + if not watchdog.called and not watchdog.cancelled: + watchdog.cancel() + if not deferred.called: + if message: + print(message) + deferred.cancel() + return value + + deferred.addBoth(callback) + watchdog = reactor.callLater(timeout, defer.timeout, deferred) + return watchdog + + +class ReactorOverride(object): + """Class used to patch reactor while running unit tests + to avoid starting and stopping the twisted reactor + """ + + def __getattr__(self, attr): + if attr == 'run': + return self._run + if attr == 'stop': + return self._stop + return getattr(reactor, attr) + + def _run(self): + pass + + def _stop(self): + pass + + def addReader(self, arg): # NOQA: N802 + pass + + +class ProcessOutputHandler(protocol.ProcessProtocol): + def __init__( + self, script, callbacks, logfile=None, print_stdout=True, print_stderr=True + ): + """Executes a script and handle the process' output to stdout and stderr. + + Args: + script (str): The script to execute. + callbacks (list): Callbacks to trigger if the expected output if found. + logfile (str, optional): Filename to wrote the process' output. + print_stderr (bool): Print the process' stderr output to stdout. + print_stdout (bool): Print the process' stdout output to stdout. + + """ + self.callbacks = callbacks + self.script = script + self.log_output = '' + self.stderr_out = '' + self.logfile = logfile + self.print_stdout = print_stdout + self.print_stderr = print_stderr + self.quit_d = None + self.killed = False + self.watchdogs = [] + + def connectionMade(self): # NOQA: N802 + self.transport.write(self.script) + self.transport.closeStdin() + + def outConnectionLost(self): # NOQA: N802 + if not self.logfile: + return + with open(self.logfile, 'w') as f: + f.write(self.log_output) + + def kill(self): + """Kill the running process. + + Returns: + Deferred: A deferred that is triggered when the process has quit. + + """ + if self.killed: + return + self.killed = True + self._kill_watchdogs() + self.quit_d = Deferred() + self.transport.signalProcess('INT') + return self.quit_d + + def _kill_watchdogs(self): + """"Cancel all watchdogs""" + for w in self.watchdogs: + if not w.called and not w.cancelled: + w.cancel() + + def processEnded(self, status): # NOQA: N802 + self.transport.loseConnection() + if self.quit_d is None: + return + if status.value.exitCode == 0: + self.quit_d.callback(True) + else: + self.quit_d.errback(status) + + def check_callbacks(self, data, cb_type='stdout'): + ret = False + for c in self.callbacks: + if cb_type not in c['types'] or c['deferred'].called: + continue + for trigger in c['triggers']: + if trigger['expr'] in data: + ret = True + if 'cb' in trigger: + trigger['cb'](self, c['deferred'], data, self.log_output) + elif 'value' not in trigger: + raise Exception('Trigger must specify either "cb" or "value"') + else: + val = trigger['value'](self, data, self.log_output) + if trigger.get('type', 'callback') == 'errback': + c['deferred'].errback(val) + else: + c['deferred'].callback(val) + return ret + + def outReceived(self, data): # NOQA: N802 + """Process output from stdout""" + data = data.decode('utf8') + self.log_output += data + if self.check_callbacks(data): + pass + elif '[ERROR' in data: + if not self.print_stdout: + return + print(data, end=' ') + + def errReceived(self, data): # NOQA: N802 + """Process output from stderr""" + data = data.decode('utf8') + self.log_output += data + self.stderr_out += data + self.check_callbacks(data, cb_type='stderr') + if not self.print_stderr: + return + data = '\n%s' % data.strip() + prefixed = data.replace('\n', '\nSTDERR: ') + print('\n%s' % prefixed) + + +def start_core( + listen_port=58846, + logfile=None, + timeout=10, + timeout_msg=None, + custom_script='', + print_stdout=True, + print_stderr=True, + extra_callbacks=None, +): + """Start the deluge core as a daemon. + + Args: + listen_port (int, optional): The port the daemon listens for client connections. + logfile (str, optional): Logfile name to write the output from the process. + timeout (int): If none of the callbacks have been triggered before the imeout, the process is killed. + timeout_msg (str): The message to print when the timeout expires. + custom_script (str): Extra python code to insert into the daemon process script. + print_stderr (bool): If the output from the process' stderr should be printed to stdout. + print_stdout (bool): If the output from the process' stdout should be printed to stdout. + extra_callbacks (list): A list of dictionaries specifying extra callbacks. + + Returns: + tuple(Deferred, ProcessOutputHandler): + + The Deferred is fired when the core callback is triggered either after the default + output triggers are matched (daemon successfully started, or failed to start), + or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process. + + """ + config_directory = set_tmp_config_dir() + daemon_script = """ +import sys +import deluge.core.daemon_entry + +from deluge.common import windows_check + +if windows_check(): + sys.argv.extend(['-c', '%(dir)s', '-L', 'info', '-p', '%(port)d']) +else: + sys.argv.extend(['-d', '-c', '%(dir)s', '-L', 'info', '-p', '%(port)d']) + +try: + daemon = deluge.core.daemon_entry.start_daemon(skip_start=True) + %(script)s + daemon.start() +except Exception: + import traceback + sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc()) +""" % { + 'dir': config_directory, + 'port': listen_port, + 'script': custom_script, + } + + callbacks = [] + default_core_cb = {'deferred': Deferred(), 'types': 'stdout'} + if timeout: + default_core_cb['timeout'] = timeout + + # Specify the triggers for daemon log output + default_core_cb['triggers'] = [ + {'expr': 'Finished loading ', 'value': lambda reader, data, data_all: reader}, + { + 'expr': 'Could not listen on localhost:%d' % (listen_port), + 'type': 'errback', # Error from libtorrent + 'value': lambda reader, data, data_all: CannotListenError( + 'localhost', + listen_port, + 'Could not start deluge test client!\n%s' % data, + ), + }, + { + 'expr': 'Traceback', + 'type': 'errback', + 'value': lambda reader, data, data_all: DelugeError( + 'Traceback found when starting daemon:\n%s' % data + ), + }, + ] + + callbacks.append(default_core_cb) + if extra_callbacks: + callbacks.extend(extra_callbacks) + + process_protocol = start_process( + daemon_script, callbacks, logfile, print_stdout, print_stderr + ) + return default_core_cb['deferred'], process_protocol + + +def start_process( + script, callbacks, logfile=None, print_stdout=True, print_stderr=True +): + """ + Starts an external python process which executes the given script. + + Args: + script (str): The content of the script to execute. + callbacks (list): list of dictionaries specifying callbacks. + logfile (str, optional): Logfile name to write the output from the process. + print_stderr (bool): If the output from the process' stderr should be printed to stdout. + print_stdout (bool): If the output from the process' stdout should be printed to stdout. + + Returns: + ProcessOutputHandler: The handler for the process's output. + + Each entry in the callbacks list is a dictionary with the following keys: + * "deferred": The deferred to be called when matched. + * "types": The output this callback should be matched against. + Possible values: ["stdout", "stderr"] + * "timeout" (optional): A timeout in seconds for the deferred. + * "triggers": A list of dictionaries, each specifying specifying a trigger: + * "expr": A string to match against the log output. + * "value": A function to produce the result to be passed to the callback. + * "type" (optional): A string that specifies wether to trigger a regular callback or errback. + + """ + cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + process_protocol = ProcessOutputHandler( + script.encode('utf8'), callbacks, logfile, print_stdout, print_stderr + ) + + # Add timeouts to deferreds + for c in callbacks: + if 'timeout' in c: + w = add_watchdog( + c['deferred'], timeout=c['timeout'], message=c.get('timeout_msg', None) + ) + process_protocol.watchdogs.append(w) + + reactor.spawnProcess( + process_protocol, sys.executable, args=[sys.executable], path=cwd + ) + return process_protocol diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py new file mode 100644 index 0000000..706eb8d --- /dev/null +++ b/deluge/tests/common_web.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import deluge.common +import deluge.component as component +import deluge.ui.web.auth +import deluge.ui.web.server +from deluge import configmanager +from deluge.ui.web.server import DelugeWeb + +from .basetest import BaseTestCase +from .common import ReactorOverride +from .daemon_base import DaemonBase + + +class WebServerTestBase(BaseTestCase, DaemonBase): + """ + Base class for tests that need a running webapi + + """ + + def set_up(self): + self.host_id = None + deluge.ui.web.server.reactor = ReactorOverride() + d = self.common_set_up() + d.addCallback(self.start_core) + d.addCallback(self.start_webapi) + return d + + def start_webapi(self, arg): + self.webserver_listen_port = 8999 + + config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy() + config_defaults['port'] = self.webserver_listen_port + self.config = configmanager.ConfigManager('web.conf', config_defaults) + + self.deluge_web = DelugeWeb(daemon=False) + + host = list(self.deluge_web.web_api.hostlist.config['hosts'][0]) + host[2] = self.listen_port + self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host) + self.host_id = host[0] + self.deluge_web.start() + + def tear_down(self): + d = component.shutdown() + d.addCallback(self.terminate_core) + return d + + +class WebServerMockBase(object): + """ + Class with utility functions for mocking with tests using the webserver + + """ + + def mock_authentication_ignore(self, auth): + def check_request(request, method=None, level=None): + pass + + self.patch(auth, 'check_request', check_request) diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py new file mode 100644 index 0000000..eda2193 --- /dev/null +++ b/deluge/tests/daemon_base.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import os.path + +import pytest +from twisted.internet import defer +from twisted.internet.error import CannotListenError + +import deluge.component as component +from deluge.common import windows_check + +from . import common + + +class DaemonBase(object): + + if windows_check(): + skip = 'windows cant start_core not enough arguments for format string' + + def common_set_up(self): + common.set_tmp_config_dir() + self.listen_port = 58900 + self.core = None + return component.start() + + def terminate_core(self, *args): + if args[0] is not None: + if hasattr(args[0], 'getTraceback'): + print('terminate_core: Errback Exception: %s' % args[0].getTraceback()) + + if not self.core.killed: + d = self.core.kill() + return d + + @defer.inlineCallbacks + def start_core( + self, + arg, + custom_script='', + logfile='', + print_stdout=True, + print_stderr=True, + timeout=5, + port_range=10, + extra_callbacks=None, + ): + if logfile == '': + logfile = 'daemon_%s.log' % self.id() + + # We are running py.test + if hasattr(pytest, 'config'): + # Put log file in the py.test --basetemp argument + basetemp = pytest.config.option.basetemp + if basetemp: + if not os.path.exists(basetemp): + os.makedirs(basetemp) + logfile = os.path.join(basetemp, logfile) + + for dummy in range(port_range): + try: + d, self.core = common.start_core( + listen_port=self.listen_port, + logfile=logfile, + timeout=timeout, + timeout_msg='Timeout!', + custom_script=custom_script, + print_stdout=print_stdout, + print_stderr=print_stderr, + extra_callbacks=extra_callbacks, + ) + yield d + except CannotListenError as ex: + exception_error = ex + self.listen_port += 1 + except (KeyboardInterrupt, SystemExit): + raise + else: + return + raise exception_error diff --git a/deluge/tests/data/deluge.png b/deluge/tests/data/deluge.png Binary files differnew file mode 100644 index 0000000..e39cd0c --- /dev/null +++ b/deluge/tests/data/deluge.png diff --git a/deluge/tests/data/dir_with_6_files.torrent b/deluge/tests/data/dir_with_6_files.torrent Binary files differnew file mode 100644 index 0000000..2c6b5fb --- /dev/null +++ b/deluge/tests/data/dir_with_6_files.torrent diff --git a/deluge/tests/data/filehash_field.torrent b/deluge/tests/data/filehash_field.torrent new file mode 100644 index 0000000..99e41f0 --- /dev/null +++ b/deluge/tests/data/filehash_field.torrent @@ -0,0 +1,2 @@ +d13:creation datei1476342472e4:infod5:filesld4:ed2k16:2M2&XE 18:filehash20:f4^S96P՝R6:lengthi54e4:pathl8:tull.txteed4:ed2k16:_G
L@8:filehash20:vXd/n136:lengthi54e4:pathl56:還在一個人無聊嗎~還不趕緊上來聊天美.txteee4:name16:torrent_filehash12:piece lengthi32768e6:pieces20:G@g\&\
fB +ee
\ No newline at end of file diff --git a/deluge/tests/data/google.ico b/deluge/tests/data/google.ico Binary files differnew file mode 100644 index 0000000..82339b3 --- /dev/null +++ b/deluge/tests/data/google.ico diff --git a/deluge/tests/data/seo.ico b/deluge/tests/data/seo.ico Binary files differnew file mode 100644 index 0000000..841e528 --- /dev/null +++ b/deluge/tests/data/seo.ico diff --git a/deluge/tests/data/test.torrent b/deluge/tests/data/test.torrent new file mode 100644 index 0000000..847ec58 --- /dev/null +++ b/deluge/tests/data/test.torrent @@ -0,0 +1,2 @@ +d8:announce40:http://tracker.aelitis.com:6969/announce13:announce-listll40:http://tracker.aelitis.com:6969/announceel41:http://tracker.aelitis.com:16969/announceee18:azureus_propertiesd17:dht_backup_enablei0ee7:comment34:provided by http://getazureus.com/13:comment.utf-834:provided by http://getazureus.com/10:created by19:Azureus/2.5.0.3_CVS13:creation datei1169429806e4:infod4:ed2k16:>pl]K^\;;e6:lengthi307949e4:name22:azcvsupdater_2.6.2.jar10:name.utf-822:azcvsupdater_2.6.2.jar12:piece lengthi32768e6:pieces200:B'bsu97u<w\fԛ}:qv"7y!
lv{/"PD8-MFx+S`%% +;ϐ/F>gA2|ڂb#wIfWswKrGWiد#uE?:ub(oj
^AJ?&7:privatei0e4:sha120:2ζ"Կ^Khŷee
\ No newline at end of file diff --git a/deluge/tests/data/test_torrent.file.torrent b/deluge/tests/data/test_torrent.file.torrent new file mode 100644 index 0000000..fca14ca --- /dev/null +++ b/deluge/tests/data/test_torrent.file.torrent @@ -0,0 +1,2 @@ +d7:comment17:Test torrent file10:created by11:Deluge Team13:creation datei1411826665e8:encoding5:UTF-84:infod6:lengthi512000e4:name17:test_torrent.file12:piece lengthi32768e6:pieces320:$2Tj
>hU.--~ޔBzuEB1@ͥ/K"zF0֣[asV1B^Wp-SF`M9)},4niW
jQI̗(,t؋chi*M}^WS7 h:-beTXa3m|J"]0$}l@L V,4˫zMLģJ* +\AP&I9}20֎H:_8<V2JYb2'2h0\*_j7:privatei0eee
\ No newline at end of file diff --git a/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent Binary files differnew file mode 100644 index 0000000..b55c9ae --- /dev/null +++ b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent diff --git a/deluge/tests/data/unicode_file.torrent b/deluge/tests/data/unicode_file.torrent new file mode 100644 index 0000000..41db239 --- /dev/null +++ b/deluge/tests/data/unicode_file.torrent @@ -0,0 +1 @@ +d13:creation datei1540200743e8:encoding5:UTF-84:infod6:lengthi0e4:name35:সুকুমার রায়.mkv12:piece lengthi32768e6:pieces0:7:privatei0eee diff --git a/deluge/tests/data/unicode_filenames.torrent b/deluge/tests/data/unicode_filenames.torrent Binary files differnew file mode 100644 index 0000000..e34f055 --- /dev/null +++ b/deluge/tests/data/unicode_filenames.torrent diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py new file mode 100644 index 0000000..f197882 --- /dev/null +++ b/deluge/tests/test_alertmanager.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import deluge.component as component +from deluge.core.core import Core + +from .basetest import BaseTestCase + + +class AlertManagerTestCase(BaseTestCase): + def set_up(self): + self.core = Core() + self.core.config.config['lsd'] = False + self.am = component.get('AlertManager') + return component.start(['AlertManager']) + + def tear_down(self): + return component.shutdown() + + def test_register_handler(self): + def handler(alert): + return + + self.am.register_handler('dummy_alert', handler) + self.assertEqual(self.am.handlers['dummy_alert'], [handler]) + + def test_deregister_handler(self): + def handler(alert): + return + + self.am.register_handler('dummy_alert', handler) + self.am.deregister_handler(handler) + self.assertEqual(self.am.handlers['dummy_alert'], []) diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py new file mode 100644 index 0000000..91e122f --- /dev/null +++ b/deluge/tests/test_authmanager.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import deluge.component as component +from deluge.common import get_localhost_auth +from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager + +from .basetest import BaseTestCase + + +class AuthManagerTestCase(BaseTestCase): + def set_up(self): + self.auth = AuthManager() + self.auth.start() + + def tear_down(self): + # We must ensure that the components in component registry are removed + return component.shutdown() + + def test_authorize(self): + self.assertEqual(self.auth.authorize(*get_localhost_auth()), AUTH_LEVEL_ADMIN) diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py new file mode 100644 index 0000000..b49c21f --- /dev/null +++ b/deluge/tests/test_bencode.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +from __future__ import unicode_literals + +from twisted.trial import unittest + +from deluge import bencode + +from . import common + + +class BencodeTestCase(unittest.TestCase): + def test_bencode_unicode_metainfo(self): + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + metainfo = bencode.bdecode(_file.read())[b'info'] + bencode.bencode({b'info': metainfo}) + + def test_bencode_unicode_value(self): + self.assertEqual(bencode.bencode(b'abc'), b'3:abc') + self.assertEqual(bencode.bencode('abc'), b'3:abc') + + def test_bdecode(self): + self.assertEqual(bencode.bdecode(b'3:dEf'), b'dEf') + with self.assertRaises(bencode.BTFailure): + bencode.bdecode('dEf') + with self.assertRaises(bencode.BTFailure): + bencode.bdecode(b'dEf') + with self.assertRaises(bencode.BTFailure): + bencode.bdecode({'dEf': 123}) diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py new file mode 100644 index 0000000..c89ad53 --- /dev/null +++ b/deluge/tests/test_client.py @@ -0,0 +1,204 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from twisted.internet import defer + +import deluge.component as component +from deluge import error +from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, windows_check +from deluge.core.authmanager import AUTH_LEVEL_ADMIN +from deluge.ui.client import Client, DaemonSSLProxy, client + +from .basetest import BaseTestCase +from .daemon_base import DaemonBase + + +class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy): + def authenticate(self, username, password): + self.login_deferred = defer.Deferred() + d = self.call('daemon.login', username, password) + d.addCallback(self.__on_login, username) + d.addErrback(self.__on_login_fail) + return self.login_deferred + + def __on_login(self, result, username): + self.login_deferred.callback(result) + + def __on_login_fail(self, result): + self.login_deferred.errback(result) + + +class NoVersionSendingClient(Client): + def connect( + self, + host='127.0.0.1', + port=58846, + username='', + password='', + skip_authentication=False, + ): + self._daemon_proxy = NoVersionSendingDaemonSSLProxy() + self._daemon_proxy.set_disconnect_callback(self.__on_disconnect) + + d = self._daemon_proxy.connect(host, port) + + def on_connect_fail(reason): + self.disconnect() + return reason + + def on_authenticate(result, daemon_info): + return result + + def on_authenticate_fail(reason): + return reason + + def on_connected(daemon_version): + return daemon_version + + def authenticate(daemon_version, username, password): + d = self._daemon_proxy.authenticate(username, password) + d.addCallback(on_authenticate, daemon_version) + d.addErrback(on_authenticate_fail) + return d + + d.addCallback(on_connected) + d.addErrback(on_connect_fail) + if not skip_authentication: + d.addCallback(authenticate, username, password) + return d + + def __on_disconnect(self): + if self.disconnect_callback: + self.disconnect_callback() + + +class ClientTestCase(BaseTestCase, DaemonBase): + + if windows_check(): + skip = 'windows cant start_core not enough arguments for format string' + + def set_up(self): + d = self.common_set_up() + d.addCallback(self.start_core) + d.addErrback(self.terminate_core) + return d + + def tear_down(self): + d = component.shutdown() + d.addCallback(self.terminate_core) + return d + + def test_connect_no_credentials(self): + d = client.connect('localhost', self.listen_port, username='', password='') + + def on_connect(result): + self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN) + self.addCleanup(client.disconnect) + return result + + d.addCallbacks(on_connect, self.fail) + return d + + def test_connect_localclient(self): + username, password = get_localhost_auth() + d = client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + def on_connect(result): + self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN) + self.addCleanup(client.disconnect) + return result + + d.addCallbacks(on_connect, self.fail) + return d + + def test_connect_bad_password(self): + username, password = get_localhost_auth() + d = client.connect( + 'localhost', self.listen_port, username=username, password=password + '1' + ) + + def on_failure(failure): + self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError) + self.assertEqual(failure.value.message, 'Password does not match') + self.addCleanup(client.disconnect) + + d.addCallbacks(self.fail, on_failure) + return d + + def test_connect_invalid_user(self): + username, password = get_localhost_auth() + d = client.connect('localhost', self.listen_port, username='invalid-user') + + def on_failure(failure): + self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError) + self.assertEqual(failure.value.message, 'Username does not exist') + self.addCleanup(client.disconnect) + + d.addCallbacks(self.fail, on_failure) + return d + + def test_connect_without_password(self): + username, password = get_localhost_auth() + d = client.connect('localhost', self.listen_port, username=username) + + def on_failure(failure): + self.assertEqual( + failure.trap(error.AuthenticationRequired), error.AuthenticationRequired + ) + self.assertEqual(failure.value.username, username) + self.addCleanup(client.disconnect) + + d.addCallbacks(self.fail, on_failure) + return d + + @defer.inlineCallbacks + def test_connect_with_password(self): + username, password = get_localhost_auth() + yield client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + yield client.core.create_account('testuser', 'testpw', 'DEFAULT') + yield client.disconnect() + ret = yield client.connect( + 'localhost', self.listen_port, username='testuser', password='testpw' + ) + self.assertEqual(ret, AUTH_LEVEL_NORMAL) + yield + + @defer.inlineCallbacks + def test_invalid_rpc_method_call(self): + yield client.connect('localhost', self.listen_port, username='', password='') + d = client.core.invalid_method() + + def on_failure(failure): + self.assertEqual( + failure.trap(error.WrappedException), error.WrappedException + ) + self.addCleanup(client.disconnect) + + d.addCallbacks(self.fail, on_failure) + yield d + + def test_connect_without_sending_client_version_fails(self): + username, password = get_localhost_auth() + no_version_sending_client = NoVersionSendingClient() + d = no_version_sending_client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + def on_failure(failure): + self.assertEqual( + failure.trap(error.IncompatibleClient), error.IncompatibleClient + ) + self.addCleanup(no_version_sending_client.disconnect) + + d.addCallbacks(self.fail, on_failure) + return d diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py new file mode 100644 index 0000000..3cecb64 --- /dev/null +++ b/deluge/tests/test_common.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import os +import tarfile + +from twisted.trial import unittest + +from deluge.common import ( + VersionSplit, + archive_files, + fdate, + fpcnt, + fpeer, + fsize, + fspeed, + ftime, + get_path_size, + is_infohash, + is_ip, + is_ipv4, + is_ipv6, + is_magnet, + is_url, + windows_check, +) +from deluge.i18n import setup_translation + +from .common import get_test_data_file, set_tmp_config_dir + + +class CommonTestCase(unittest.TestCase): + def setUp(self): # NOQA + self.config_dir = set_tmp_config_dir() + setup_translation() + + def tearDown(self): # NOQA + pass + + def test_fsize(self): + self.assertEqual(fsize(0), '0 B') + self.assertEqual(fsize(100), '100 B') + self.assertEqual(fsize(1023), '1023 B') + self.assertEqual(fsize(1024), '1.0 KiB') + self.assertEqual(fsize(1048575), '1024.0 KiB') + self.assertEqual(fsize(1048576), '1.0 MiB') + self.assertEqual(fsize(1073741823), '1024.0 MiB') + self.assertEqual(fsize(1073741824), '1.0 GiB') + self.assertEqual(fsize(112245), '109.6 KiB') + self.assertEqual(fsize(110723441824), '103.1 GiB') + self.assertEqual(fsize(1099511627775), '1024.0 GiB') + self.assertEqual(fsize(1099511627777), '1.0 TiB') + self.assertEqual(fsize(766148267453245), '696.8 TiB') + + def test_fpcnt(self): + self.assertTrue(fpcnt(0.9311) == '93.11%') + + def test_fspeed(self): + self.assertTrue(fspeed(43134) == '42.1 KiB/s') + + def test_fpeer(self): + self.assertTrue(fpeer(10, 20) == '10 (20)') + self.assertTrue(fpeer(10, -1) == '10') + + def test_ftime(self): + self.assertEqual(ftime(0), '') + self.assertEqual(ftime(5), '5s') + self.assertEqual(ftime(100), '1m 40s') + self.assertEqual(ftime(3789), '1h 3m') + self.assertEqual(ftime(23011), '6h 23m') + self.assertEqual(ftime(391187), '4d 12h') + self.assertEqual(ftime(604800), '1w 0d') + self.assertEqual(ftime(13893086), '22w 6d') + self.assertEqual(ftime(59740269), '1y 46w') + self.assertEqual(ftime(61.25), '1m 1s') + self.assertEqual(ftime(119.9), '1m 59s') + + def test_fdate(self): + self.assertTrue(fdate(-1) == '') + + def test_is_url(self): + self.assertTrue(is_url('http://deluge-torrent.org')) + self.assertFalse(is_url('file://test.torrent')) + + def test_is_magnet(self): + self.assertTrue( + is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN') + ) + self.assertFalse(is_magnet(None)) + + def test_is_infohash(self): + self.assertTrue(is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973')) + + def test_get_path_size(self): + if windows_check(): + raise unittest.SkipTest('os devnull is different on windows') + self.assertTrue(get_path_size(os.devnull) == 0) + self.assertTrue(get_path_size('non-existant.file') == -1) + + def test_is_ip(self): + self.assertTrue(is_ip('192.0.2.0')) + self.assertFalse(is_ip('192..0.0')) + self.assertTrue(is_ip('2001:db8::')) + self.assertFalse(is_ip('2001:db8:')) + + def test_is_ipv4(self): + self.assertTrue(is_ipv4('192.0.2.0')) + self.assertFalse(is_ipv4('192..0.0')) + + def test_is_ipv6(self): + self.assertTrue(is_ipv6('2001:db8::')) + self.assertFalse(is_ipv6('2001:db8:')) + + def test_version_split(self): + self.assertTrue(VersionSplit('1.2.2') == VersionSplit('1.2.2')) + self.assertTrue(VersionSplit('1.2.1') < VersionSplit('1.2.2')) + self.assertTrue(VersionSplit('1.1.9') < VersionSplit('1.2.2')) + self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.1')) + self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0')) + self.assertTrue(VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2')) + self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2')) + self.assertTrue(VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2')) + self.assertTrue(VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2')) + self.assertTrue(VersionSplit('0.14.9') == VersionSplit('0.14.9')) + self.assertTrue(VersionSplit('0.14.9') > VersionSplit('0.14.5')) + self.assertTrue(VersionSplit('0.14.10') >= VersionSplit('0.14.9')) + self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123')) + self.assertTrue(VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2')) + self.assertTrue(VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123')) + self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123')) + self.assertTrue(VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0')) + self.assertTrue(VersionSplit('1.4.0a1') < VersionSplit('1.4.0')) + + def test_parse_human_size(self): + from deluge.common import parse_human_size + + sizes = [ + ('1', 1), + ('10 bytes', 10), + ('2048 bytes', 2048), + ('1MiB', 2 ** (10 * 2)), + ('1 MiB', 2 ** (10 * 2)), + ('1 GiB', 2 ** (10 * 3)), + ('1 GiB', 2 ** (10 * 3)), + ('1M', 10 ** 6), + ('1MB', 10 ** 6), + ('1 GB', 10 ** 9), + ('1 TB', 10 ** 12), + ] + + for human_size, byte_size in sizes: + parsed = parse_human_size(human_size) + self.assertEqual( + parsed, byte_size, 'Mismatch when converting: %s' % human_size + ) + + def test_archive_files(self): + arc_filelist = [ + get_test_data_file('test.torrent'), + get_test_data_file('deluge.png'), + ] + arc_filepath = archive_files('test-arc', arc_filelist) + + with tarfile.open(arc_filepath, 'r') as tar: + for tar_info in tar: + self.assertTrue(tar_info.isfile()) + self.assertTrue( + tar_info.name in [os.path.basename(arcf) for arcf in arc_filelist] + ) + + def test_archive_files_missing(self): + """Archive exists even with file not found.""" + filelist = ['test.torrent', 'deluge.png', 'missing.file'] + arc_filepath = archive_files( + 'test-arc', [get_test_data_file(f) for f in filelist] + ) + filelist.remove('missing.file') + + with tarfile.open(arc_filepath, 'r') as tar: + self.assertEqual(tar.getnames(), filelist) + self.assertTrue(all(tarinfo.isfile() for tarinfo in tar)) + + def test_archive_files_message(self): + filelist = ['test.torrent', 'deluge.png'] + arc_filepath = archive_files( + 'test-arc', [get_test_data_file(f) for f in filelist], message='test' + ) + + result_files = filelist + ['archive_message.txt'] + with tarfile.open(arc_filepath, 'r') as tar: + self.assertEqual(tar.getnames(), result_files) + for tar_info in tar: + self.assertTrue(tar_info.isfile()) + if tar_info.name == 'archive_message.txt': + result = tar.extractfile(tar_info).read().decode() + self.assertEqual(result, 'test') diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py new file mode 100644 index 0000000..26f24ad --- /dev/null +++ b/deluge/tests/test_component.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from twisted.internet import defer, threads +from twisted.trial.unittest import SkipTest + +import deluge.component as component + +from .basetest import BaseTestCase + + +class ComponentTester(component.Component): + def __init__(self, name, depend=None): + component.Component.__init__(self, name, depend=depend) + self.start_count = 0 + self.stop_count = 0 + + def start(self): + self.start_count += 1 + + def stop(self): + self.stop_count += 1 + + +class ComponentTesterDelayStart(ComponentTester): + def start(self): + def do_sleep(): + import time + + time.sleep(1) + + d = threads.deferToThread(do_sleep) + + def on_done(result): + self.start_count += 1 + + return d.addCallback(on_done) + + +class ComponentTesterUpdate(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + self.counter = 0 + self.start_count = 0 + self.stop_count = 0 + + def update(self): + self.counter += 1 + + def stop(self): + self.stop_count += 1 + + +class ComponentTesterShutdown(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + self.shutdowned = False + self.stop_count = 0 + + def shutdown(self): + self.shutdowned = True + + def stop(self): + self.stop_count += 1 + + +class ComponentTestClass(BaseTestCase): + def tear_down(self): + return component.shutdown() + + def test_start_component(self): + def on_start(result, c): + self.assertEqual(c._component_state, 'Started') + self.assertEqual(c.start_count, 1) + + c = ComponentTester('test_start_c1') + d = component.start(['test_start_c1']) + d.addCallback(on_start, c) + return d + + def test_start_stop_depends(self): + def on_stop(result, c1, c2): + self.assertEqual(c1._component_state, 'Stopped') + self.assertEqual(c2._component_state, 'Stopped') + self.assertEqual(c1.stop_count, 1) + self.assertEqual(c2.stop_count, 1) + + def on_start(result, c1, c2): + self.assertEqual(c1._component_state, 'Started') + self.assertEqual(c2._component_state, 'Started') + self.assertEqual(c1.start_count, 1) + self.assertEqual(c2.start_count, 1) + return component.stop(['test_start_depends_c1']).addCallback( + on_stop, c1, c2 + ) + + c1 = ComponentTester('test_start_depends_c1') + c2 = ComponentTester('test_start_depends_c2', depend=['test_start_depends_c1']) + + d = component.start(['test_start_depends_c2']) + d.addCallback(on_start, c1, c2) + return d + + def start_with_depends(self): + c1 = ComponentTesterDelayStart('test_start_all_c1') + c2 = ComponentTester('test_start_all_c2', depend=['test_start_all_c4']) + c3 = ComponentTesterDelayStart( + 'test_start_all_c3', depend=['test_start_all_c5', 'test_start_all_c1'] + ) + c4 = ComponentTester('test_start_all_c4', depend=['test_start_all_c3']) + c5 = ComponentTester('test_start_all_c5') + + d = component.start() + return (d, c1, c2, c3, c4, c5) + + def finish_start_with_depends(self, *args): + for c in args[1:]: + component.deregister(c) + + def test_start_all(self): + def on_start(*args): + for c in args[1:]: + self.assertEqual(c._component_state, 'Started') + self.assertEqual(c.start_count, 1) + + ret = self.start_with_depends() + ret[0].addCallback(on_start, *ret[1:]) + ret[0].addCallback(self.finish_start_with_depends, *ret[1:]) + return ret[0] + + def test_register_exception(self): + ComponentTester('test_register_exception_c1') + self.assertRaises( + component.ComponentAlreadyRegistered, + ComponentTester, + 'test_register_exception_c1', + ) + + def test_stop_component(self): + def on_stop(result, c): + self.assertEqual(c._component_state, 'Stopped') + self.assertFalse(c._component_timer.running) + self.assertEqual(c.stop_count, 1) + + def on_start(result, c): + self.assertEqual(c._component_state, 'Started') + return component.stop(['test_stop_component_c1']).addCallback(on_stop, c) + + c = ComponentTesterUpdate('test_stop_component_c1') + d = component.start(['test_stop_component_c1']) + d.addCallback(on_start, c) + return d + + def test_stop_all(self): + def on_stop(result, *args): + for c in args: + self.assertEqual(c._component_state, 'Stopped') + self.assertEqual(c.stop_count, 1) + + def on_start(result, *args): + for c in args: + self.assertEqual(c._component_state, 'Started') + return component.stop().addCallback(on_stop, *args) + + ret = self.start_with_depends() + ret[0].addCallback(on_start, *ret[1:]) + ret[0].addCallback(self.finish_start_with_depends, *ret[1:]) + return ret[0] + + def test_update(self): + def on_start(result, c1, counter): + self.assertTrue(c1._component_timer) + self.assertTrue(c1._component_timer.running) + self.assertNotEqual(c1.counter, counter) + return component.stop() + + c1 = ComponentTesterUpdate('test_update_c1') + cnt = int(c1.counter) + d = component.start(['test_update_c1']) + + d.addCallback(on_start, c1, cnt) + return d + + def test_pause(self): + def on_pause(result, c1, counter): + self.assertEqual(c1._component_state, 'Paused') + self.assertNotEqual(c1.counter, counter) + self.assertFalse(c1._component_timer.running) + + def on_start(result, c1, counter): + self.assertTrue(c1._component_timer) + self.assertNotEqual(c1.counter, counter) + d = component.pause(['test_pause_c1']) + d.addCallback(on_pause, c1, counter) + return d + + c1 = ComponentTesterUpdate('test_pause_c1') + cnt = int(c1.counter) + d = component.start(['test_pause_c1']) + + d.addCallback(on_start, c1, cnt) + return d + + @defer.inlineCallbacks + def test_component_start_error(self): + ComponentTesterUpdate('test_pause_c1') + yield component.start(['test_pause_c1']) + yield component.pause(['test_pause_c1']) + test_comp = component.get('test_pause_c1') + try: + result = self.failureResultOf(test_comp._component_start()) + except AttributeError: + raise SkipTest( + 'This test requires trial failureResultOf() in Twisted version >= 13' + ) + self.assertEqual( + result.check(component.ComponentException), component.ComponentException + ) + + @defer.inlineCallbacks + def test_start_paused_error(self): + ComponentTesterUpdate('test_pause_c1') + yield component.start(['test_pause_c1']) + yield component.pause(['test_pause_c1']) + + # Deferreds that fail in component have to error handler which results in + # twisted doing a log.err call which causes the test to fail. + # Prevent failure by ignoring the exception + self._observer._ignoreErrors(component.ComponentException) + + result = yield component.start() + self.assertEqual( + [(result[0][0], result[0][1].value)], + [ + ( + defer.FAILURE, + component.ComponentException( + 'Trying to start component "%s" but it is ' + 'not in a stopped state. Current state: %s' + % ('test_pause_c1', 'Paused'), + '', + ), + ) + ], + ) + + def test_shutdown(self): + def on_shutdown(result, c1): + self.assertTrue(c1.shutdowned) + self.assertEqual(c1._component_state, 'Stopped') + self.assertEqual(c1.stop_count, 1) + + def on_start(result, c1): + d = component.shutdown() + d.addCallback(on_shutdown, c1) + return d + + c1 = ComponentTesterShutdown('test_shutdown_c1') + d = component.start(['test_shutdown_c1']) + d.addCallback(on_start, c1) + return d diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py new file mode 100644 index 0000000..270cc5a --- /dev/null +++ b/deluge/tests/test_config.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import os +from codecs import getwriter + +from twisted.internet import task +from twisted.trial import unittest + +import deluge.config +from deluge.common import JSON_FORMAT +from deluge.config import Config + +from .common import set_tmp_config_dir + +DEFAULTS = { + 'string': 'foobar', + 'int': 1, + 'float': 0.435, + 'bool': True, + 'unicode': 'foobar', +} + + +class ConfigTestCase(unittest.TestCase): + def setUp(self): # NOQA: N803 + self.config_dir = set_tmp_config_dir() + + def test_init(self): + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + self.assertEqual(DEFAULTS, config.config) + + config = Config('test.conf', config_dir=self.config_dir) + self.assertEqual({}, config.config) + + def test_set_get_item(self): + config = Config('test.conf', config_dir=self.config_dir) + config['foo'] = 1 + self.assertEqual(config['foo'], 1) + self.assertRaises(ValueError, config.set_item, 'foo', 'bar') + + config['foo'] = 2 + self.assertEqual(config.get_item('foo'), 2) + + config['foo'] = '3' + self.assertEqual(config.get_item('foo'), 3) + + config['unicode'] = 'ВИДЕОФИЛЬМЫ' + self.assertEqual(config['unicode'], 'ВИДЕОФИЛЬМЫ') + + config['unicode'] = b'foostring' + self.assertFalse(isinstance(config.get_item('unicode'), bytes)) + + config._save_timer.cancel() + + def test_set_get_item_none(self): + config = Config('test.conf', config_dir=self.config_dir) + + config['foo'] = None + self.assertIsNone(config['foo']) + self.assertIsInstance(config['foo'], type(None)) + + config['foo'] = 1 + self.assertEqual(config.get('foo'), 1) + + config['foo'] = None + self.assertIsNone(config['foo']) + + config['bar'] = None + self.assertIsNone(config['bar']) + + config['bar'] = None + self.assertIsNone(config['bar']) + + config._save_timer.cancel() + + def test_get(self): + config = Config('test.conf', config_dir=self.config_dir) + config['foo'] = 1 + self.assertEqual(config.get('foo'), 1) + self.assertEqual(config.get('foobar'), None) + self.assertEqual(config.get('foobar', 2), 2) + config['foobar'] = 5 + self.assertEqual(config.get('foobar', 2), 5) + + def test_load(self): + def check_config(): + config = Config('test.conf', config_dir=self.config_dir) + + self.assertEqual(config['string'], 'foobar') + self.assertEqual(config['float'], 0.435) + + # Test opening a previous 1.2 config file of just a json object + import json + + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + # Test opening a previous 1.2 config file of having the format versions + # as ints + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + _file.write(bytes(1) + b'\n') + _file.write(bytes(1) + b'\n') + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + # Test the 1.2 config format + version = {'format': 1, 'file': 1} + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(version, getwriter('utf8')(_file), **JSON_FORMAT) + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + def test_save(self): + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + # We do this twice because the first time we need to save the file to disk + # and the second time we do a compare and we should not write + ret = config.save() + self.assertTrue(ret) + ret = config.save() + self.assertTrue(ret) + + config['string'] = 'baz' + config['int'] = 2 + ret = config.save() + self.assertTrue(ret) + del config + + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + self.assertEqual(config['string'], 'baz') + self.assertEqual(config['int'], 2) + + def test_save_timer(self): + self.clock = task.Clock() + deluge.config.callLater = self.clock.callLater + + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + config['string'] = 'baz' + config['int'] = 2 + self.assertTrue(config._save_timer.active()) + + # Timeout set for 5 seconds in config, so lets move clock by 5 seconds + self.clock.advance(5) + + def check_config(config): + self.assertTrue(not config._save_timer.active()) + del config + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + self.assertEqual(config['string'], 'baz') + self.assertEqual(config['int'], 2) + + check_config(config) + + def test_find_json_objects(self): + s = """{ + "file": 1, + "format": 1 +}{ + "ssl": true, + "enabled": false, + "port": 8115 +}\n""" + + from deluge.config import find_json_objects + + objects = find_json_objects(s) + self.assertEqual(len(objects), 2) + + def test_find_json_objects_curly_brace(self): + """Test with string containing curly brace""" + s = """{ + "file": 1, + "format": 1 +}{ + "ssl": true, + "enabled": false, + "port": 8115 + "password": "abc{def" +}\n""" + + from deluge.config import find_json_objects + + objects = find_json_objects(s) + self.assertEqual(len(objects), 2) diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py new file mode 100644 index 0000000..15fbc1b --- /dev/null +++ b/deluge/tests/test_core.py @@ -0,0 +1,498 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from base64 import b64encode +from hashlib import sha1 as sha + +import pytest +from six import integer_types +from twisted.internet import defer, reactor, task +from twisted.internet.error import CannotListenError +from twisted.python.failure import Failure +from twisted.web.http import FORBIDDEN +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory, Site +from twisted.web.static import File + +import deluge.common +import deluge.component as component +import deluge.core.torrent +from deluge._libtorrent import lt +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.error import AddTorrentError, InvalidTorrentError + +from . import common +from .basetest import BaseTestCase + +common.disable_new_release_check() + + +class CookieResource(Resource): + def render(self, request): + if request.getCookie(b'password') != b'deluge': + request.setResponseCode(FORBIDDEN) + return + + request.setHeader(b'Content-Type', b'application/x-bittorrent') + with open( + common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb' + ) as _file: + data = _file.read() + return data + + +class PartialDownload(Resource): + def getChild(self, path, request): # NOQA: N802 + return EncodingResourceWrapper(self, [GzipEncoderFactory()]) + + def render(self, request): + with open( + common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb' + ) as _file: + data = _file.read() + request.setHeader(b'Content-Length', str(len(data))) + request.setHeader(b'Content-Type', b'application/x-bittorrent') + return data + + +class RedirectResource(Resource): + def render(self, request): + request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent') + return b'' + + +class TopLevelResource(Resource): + def __init__(self): + Resource.__init__(self) + self.putChild(b'cookie', CookieResource()) + self.putChild(b'partial', PartialDownload()) + self.putChild(b'redirect', RedirectResource()) + self.putChild( + b'ubuntu-9.04-desktop-i386.iso.torrent', + File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')), + ) + + +class CoreTestCase(BaseTestCase): + def set_up(self): + common.set_tmp_config_dir() + self.rpcserver = RPCServer(listen=False) + self.core = Core() + self.core.config.config['lsd'] = False + self.clock = task.Clock() + self.core.torrentmanager.callLater = self.clock.callLater + self.listen_port = 51242 + return component.start().addCallback(self.start_web_server) + + def start_web_server(self, result): + website = Site(TopLevelResource()) + for dummy in range(10): + try: + self.webserver = reactor.listenTCP(self.listen_port, website) + except CannotListenError as ex: + error = ex + self.listen_port += 1 + else: + break + else: + raise error + + return result + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + return self.webserver.stopListening() + + return component.shutdown().addCallback(on_shutdown) + + def add_torrent(self, filename, paused=False): + if not paused: + # Patch libtorrent flags starting torrents paused + self.patch( + deluge.core.torrentmanager, + 'LT_DEFAULT_ADD_TORRENT_FLAGS', + lt.add_torrent_params_flags_t.flag_auto_managed + | lt.add_torrent_params_flags_t.flag_update_subscribe + | lt.add_torrent_params_flags_t.flag_apply_ip_filter, + ) + options = {'add_paused': paused, 'auto_managed': False} + filepath = common.get_test_data_file(filename) + with open(filepath, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + return torrent_id + + @defer.inlineCallbacks + def test_add_torrent_files(self): + options = {} + filenames = ['test.torrent', 'test_torrent.file.torrent'] + files_to_add = [] + for f in filenames: + filename = common.get_test_data_file(f) + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + self.assertEqual(len(errors), 0) + + @defer.inlineCallbacks + def test_add_torrent_files_error_duplicate(self): + options = {} + filenames = ['test.torrent', 'test.torrent'] + files_to_add = [] + for f in filenames: + filename = common.get_test_data_file(f) + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + self.assertEqual(len(errors), 1) + self.assertTrue(str(errors[0]).startswith('Torrent already in session')) + + @defer.inlineCallbacks + def test_add_torrent_file(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + + # Get the info hash from the test.torrent + from deluge.bencode import bdecode, bencode + + with open(filename, 'rb') as _file: + info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest() + self.assertEqual(torrent_id, info_hash) + + def test_add_torrent_file_invalid_filedump(self): + options = {} + filename = common.get_test_data_file('test.torrent') + self.assertRaises( + AddTorrentError, self.core.add_torrent_file, filename, False, options + ) + + @defer.inlineCallbacks + def test_add_torrent_url(self): + url = ( + 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent' + % self.listen_port + ) + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + torrent_id = yield self.core.add_torrent_url(url, options) + self.assertEqual(torrent_id, info_hash) + + def test_add_torrent_url_with_cookie(self): + url = 'http://localhost:%d/cookie' % self.listen_port + options = {} + headers = {'Cookie': 'password=deluge'} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + d = self.core.add_torrent_url(url, options) + d.addCallbacks(self.fail, self.assertIsInstance, errbackArgs=(Failure,)) + + d = self.core.add_torrent_url(url, options, headers) + d.addCallbacks(self.assertEqual, self.fail, callbackArgs=(info_hash,)) + + return d + + def test_add_torrent_url_with_redirect(self): + url = 'http://localhost:%d/redirect' % self.listen_port + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + d = self.core.add_torrent_url(url, options) + d.addCallback(self.assertEqual, info_hash) + return d + + def test_add_torrent_url_with_partial_download(self): + url = 'http://localhost:%d/partial' % self.listen_port + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + d = self.core.add_torrent_url(url, options) + d.addCallback(self.assertEqual, info_hash) + return d + + @defer.inlineCallbacks + def test_add_torrent_magnet(self): + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + uri = deluge.common.create_magnet_uri(info_hash) + options = {} + torrent_id = yield self.core.add_torrent_magnet(uri, options) + self.assertEqual(torrent_id, info_hash) + + def test_resume_torrent(self): + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + # Assert paused + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertTrue(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertTrue(r2['paused']) + + self.core.resume_torrent(tid2) + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertTrue(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertFalse(r2['paused']) + + def test_resume_torrent_list(self): + """Backward compatibility for list of torrent_ids.""" + torrent_id = self.add_torrent('test.torrent', paused=True) + self.core.resume_torrent([torrent_id]) + result = self.core.get_torrent_status(torrent_id, ['paused']) + self.assertFalse(result['paused']) + + def test_resume_torrents(self): + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + self.core.resume_torrents([tid1, tid2]) + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertFalse(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertFalse(r2['paused']) + + def test_resume_torrents_all(self): + """With no torrent_ids param, resume all torrents""" + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + self.core.resume_torrents() + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertFalse(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertFalse(r2['paused']) + + def test_pause_torrent(self): + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + # Assert not paused + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertFalse(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertFalse(r2['paused']) + + self.core.pause_torrent(tid2) + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertFalse(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertTrue(r2['paused']) + + def test_pause_torrent_list(self): + """Backward compatibility for list of torrent_ids.""" + torrent_id = self.add_torrent('test.torrent') + result = self.core.get_torrent_status(torrent_id, ['paused']) + self.assertFalse(result['paused']) + self.core.pause_torrent([torrent_id]) + result = self.core.get_torrent_status(torrent_id, ['paused']) + self.assertTrue(result['paused']) + + def test_pause_torrents(self): + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + + self.core.pause_torrents([tid1, tid2]) + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertTrue(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertTrue(r2['paused']) + + def test_pause_torrents_all(self): + """With no torrent_ids param, pause all torrents""" + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + + self.core.pause_torrents() + r1 = self.core.get_torrent_status(tid1, ['paused']) + self.assertTrue(r1['paused']) + r2 = self.core.get_torrent_status(tid2, ['paused']) + self.assertTrue(r2['paused']) + + def test_prefetch_metadata_existing(self): + """Check another call with same magnet returns existing deferred.""" + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None) + + def on_result(result): + self.assertEqual(result, expected) + + d = self.core.prefetch_magnet_metadata(magnet) + d.addCallback(on_result) + d2 = self.core.prefetch_magnet_metadata(magnet) + d2.addCallback(on_result) + self.clock.advance(30) + return defer.DeferredList([d, d2]) + + @defer.inlineCallbacks + def test_remove_torrent(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + removed = self.core.remove_torrent(torrent_id, True) + self.assertTrue(removed) + self.assertEqual(len(self.core.get_session_state()), 0) + + def test_remove_torrent_invalid(self): + self.assertRaises( + InvalidTorrentError, + self.core.remove_torrent, + 'torrentidthatdoesntexist', + True, + ) + + @defer.inlineCallbacks + def test_remove_torrents(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + + filename2 = common.get_test_data_file('unicode_filenames.torrent') + with open(filename2, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id2 = yield self.core.add_torrent_file_async( + filename2, filedump, options + ) + d = self.core.remove_torrents([torrent_id, torrent_id2], True) + + def test_ret(val): + self.assertTrue(val == []) + + d.addCallback(test_ret) + + def test_session_state(val): + self.assertEqual(len(self.core.get_session_state()), 0) + + d.addCallback(test_session_state) + yield d + + @defer.inlineCallbacks + def test_remove_torrents_invalid(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async( + filename, filedump, options + ) + val = yield self.core.remove_torrents( + ['invalidid1', 'invalidid2', torrent_id], False + ) + self.assertEqual(len(val), 2) + self.assertEqual( + val[0], ('invalidid1', 'torrent_id invalidid1 not in session.') + ) + self.assertEqual( + val[1], ('invalidid2', 'torrent_id invalidid2 not in session.') + ) + + def test_get_session_status(self): + status = self.core.get_session_status( + ['net.recv_tracker_bytes', 'net.sent_tracker_bytes'] + ) + self.assertIsInstance(status, dict) + self.assertEqual(status['net.recv_tracker_bytes'], 0) + self.assertEqual(status['net.sent_tracker_bytes'], 0) + + def test_get_session_status_all(self): + status = self.core.get_session_status([]) + self.assertIsInstance(status, dict) + self.assertIn('upload_rate', status) + self.assertIn('net.recv_bytes', status) + + def test_get_session_status_depr(self): + status = self.core.get_session_status(['num_peers', 'num_unchoked']) + self.assertIsInstance(status, dict) + self.assertEqual(status['num_peers'], 0) + self.assertEqual(status['num_unchoked'], 0) + + def test_get_session_status_rates(self): + status = self.core.get_session_status(['upload_rate', 'download_rate']) + self.assertIsInstance(status, dict) + self.assertEqual(status['upload_rate'], 0) + + def test_get_session_status_ratio(self): + status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio']) + self.assertIsInstance(status, dict) + self.assertEqual(status['write_hit_ratio'], 0.0) + self.assertEqual(status['read_hit_ratio'], 0.0) + + def test_get_free_space(self): + space = self.core.get_free_space('.') + # get_free_space returns long on Python 2 (32-bit). + self.assertTrue(isinstance(space, integer_types)) + self.assertTrue(space >= 0) + self.assertEqual(self.core.get_free_space('/someinvalidpath'), -1) + + @pytest.mark.slow + def test_test_listen_port(self): + d = self.core.test_listen_port() + + def result(r): + self.assertTrue(r in (True, False)) + + d.addCallback(result) + return d + + def test_sanitize_filepath(self): + pathlist = { + '\\backslash\\path\\': 'backslash/path', + ' single_file ': 'single_file', + '..': '', + '/../..../': '', + ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file', + '/ test /\\.. /.file/': 'test/.file', + 'mytorrent/subfold/file1': 'mytorrent/subfold/file1', + 'Torrent/folder/': 'Torrent/folder', + } + + for key in pathlist: + self.assertEqual( + deluge.core.torrent.sanitize_filepath(key, folder=False), pathlist[key] + ) + self.assertEqual( + deluge.core.torrent.sanitize_filepath(key, folder=True), + pathlist[key] + '/', + ) + + def test_get_set_config_values(self): + self.assertEqual( + self.core.get_config_values(['abc', 'foo']), {'foo': None, 'abc': None} + ) + self.assertEqual(self.core.get_config_value('foobar'), None) + self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'}) + self.assertEqual( + self.core.get_config_values(['foo', 'abc']), {'foo': 10, 'abc': 'def'} + ) + self.assertEqual(self.core.get_config_value('foobar'), 'barfoo') + + def test_read_only_config_keys(self): + key = 'max_upload_speed' + self.core.read_only_config_keys = [key] + + old_value = self.core.get_config_value(key) + self.core.set_config({key: old_value + 10}) + new_value = self.core.get_config_value(key) + self.assertEqual(old_value, new_value) + + self.core.read_only_config_keys = None + + def test__create_peer_id(self): + self.assertEqual(self.core._create_peer_id('2.0.0'), '-DE200s-') + self.assertEqual(self.core._create_peer_id('2.0.0.dev15'), '-DE200D-') + self.assertEqual(self.core._create_peer_id('2.0.1rc1'), '-DE201r-') + self.assertEqual(self.core._create_peer_id('2.11.0b2'), '-DE2B0b-') + self.assertEqual(self.core._create_peer_id('2.4.12b2.dev3'), '-DE24CD-') diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py new file mode 100644 index 0000000..7d4bd98 --- /dev/null +++ b/deluge/tests/test_decorators.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from twisted.trial import unittest + +from deluge.decorators import proxy + + +class DecoratorsTestCase(unittest.TestCase): + def test_proxy_with_simple_functions(self): + def negate(func, *args, **kwargs): + return not func(*args, **kwargs) + + @proxy(negate) + def something(_bool): + return _bool + + @proxy(negate) + @proxy(negate) + def double_nothing(_bool): + return _bool + + self.assertTrue(something(False)) + self.assertFalse(something(True)) + self.assertTrue(double_nothing(True)) + self.assertFalse(double_nothing(False)) + + def test_proxy_with_class_method(self): + def negate(func, *args, **kwargs): + return -func(*args, **kwargs) + + class Test(object): + def __init__(self, number): + self.number = number + + @proxy(negate) + def diff(self, number): + return self.number - number + + @proxy(negate) + def no_diff(self, number): + return self.diff(number) + + t = Test(5) + self.assertEqual(t.diff(1), -4) + self.assertEqual(t.no_diff(1), 4) diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py new file mode 100644 index 0000000..c552e94 --- /dev/null +++ b/deluge/tests/test_error.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from twisted.trial import unittest + +import deluge.error + + +class ErrorTestCase(unittest.TestCase): + def setUp(self): # NOQA: N803 + pass + + def tearDown(self): # NOQA: N803 + pass + + def test_deluge_error(self): + msg = 'Some message' + e = deluge.error.DelugeError(msg) + self.assertEqual(str(e), msg) + from twisted.internet.defer import DebugInfo + + del DebugInfo.__del__ # Hides all errors + self.assertEqual(e._args, (msg,)) + self.assertEqual(e._kwargs, {}) + + def test_incompatible_client(self): + version = '1.3.6' + e = deluge.error.IncompatibleClient(version) + self.assertEqual( + str(e), + 'Your deluge client is not compatible with the daemon. \ +Please upgrade your client to %s' + % version, + ) + + def test_not_authorized_error(self): + current_level = 5 + required_level = 10 + e = deluge.error.NotAuthorizedError(current_level, required_level) + self.assertEqual( + str(e), 'Auth level too low: %d < %d' % (current_level, required_level) + ) + + def test_bad_login_error(self): + message = 'Login failed' + username = 'deluge' + e = deluge.error.BadLoginError(message, username) + self.assertEqual(str(e), message) diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py new file mode 100644 index 0000000..23865d7 --- /dev/null +++ b/deluge/tests/test_files_tab.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import pytest +from twisted.trial import unittest + +import deluge.component as component +from deluge.common import windows_check +from deluge.configmanager import ConfigManager +from deluge.i18n import setup_translation + +from . import common +from .basetest import BaseTestCase + +libs_available = True +# Allow running other tests without GTKUI dependencies available +try: + from deluge.ui.gtk3.files_tab import FilesTab + from deluge.ui.gtk3.gtkui import DEFAULT_PREFS + from deluge.ui.gtk3.mainwindow import MainWindow +except ImportError: + libs_available = False + +setup_translation() + + +@pytest.mark.gtkui +class FilesTabTestCase(BaseTestCase): + def set_up(self): + if libs_available is False: + raise unittest.SkipTest('GTKUI dependencies not available') + + common.set_tmp_config_dir() + ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) + self.mainwindow = MainWindow() + self.filestab = FilesTab() + self.t_id = '1' + self.filestab.torrent_id = self.t_id + self.index = 1 + + def tear_down(self): + return component.shutdown() + + def print_treestore(self, title, treestore): + root = treestore.get_iter_first() + level = 1 + + def p_level(s, l): + print('%s%s' % (' ' * l, s)) + + def _print_treestore_children(i, lvl): + while i: + p_level(treestore[i][0], lvl) + if treestore.iter_children(i): + _print_treestore_children(treestore.iter_children(i), lvl + 2) + i = treestore.iter_next(i) + + print('\n%s' % title) + _print_treestore_children(root, level) + print('') + + def verify_treestore(self, treestore, tree): + def _verify_treestore(itr, tree_values): + i = 0 + while itr: + values = tree_values[i] + if treestore[itr][0] != values[0]: + return False + if treestore.iter_children(itr): + if not _verify_treestore(treestore.iter_children(itr), values[1]): + return False + itr = treestore.iter_next(itr) + i += 1 + return True + + return _verify_treestore(treestore.get_iter_first(), tree) + + def test_files_tab(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '2/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['test_10.txt']]], ['2/', [['test_100.txt']]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + self.assertTrue(ret) + + def test_files_tab2(self): + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/1/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['1/', [['test_100.txt'], ['test_10.txt']]]]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + self.assertTrue(ret) + + def test_files_tab3(self): + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]] + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + self.assertTrue(ret) + + def test_files_tab4(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '1/test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/2/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['2/', [['test_100.txt']]], ['test_10.txt']]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + self.assertTrue(ret) + + def test_files_tab5(self): + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '2/test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]] + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + self.assertTrue(ret) diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py new file mode 100644 index 0000000..a503e46 --- /dev/null +++ b/deluge/tests/test_httpdownloader.py @@ -0,0 +1,266 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import tempfile +from email.utils import formatdate + +from twisted.internet import reactor +from twisted.internet.error import CannotListenError +from twisted.python.failure import Failure +from twisted.trial import unittest +from twisted.web.error import PageRedirect +from twisted.web.http import NOT_MODIFIED +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory, Site +from twisted.web.util import redirectTo + +from deluge.common import windows_check +from deluge.httpdownloader import download_file +from deluge.log import setup_logger + +temp_dir = tempfile.mkdtemp() + + +def fname(name): + return '%s/%s' % (temp_dir, name) + + +class RedirectResource(Resource): + def render(self, request): + url = self.get_url().encode('utf8') + return redirectTo(url, request) + + +class RenameResource(Resource): + def render(self, request): + filename = request.args.get(b'filename', [b'renamed_file'])[0] + request.setHeader(b'Content-Type', b'text/plain') + request.setHeader(b'Content-Disposition', b'attachment; filename=' + filename) + return b'This file should be called ' + filename + + +class AttachmentResource(Resource): + def render(self, request): + request.setHeader(b'Content-Type', b'text/plain') + request.setHeader(b'Content-Disposition', b'attachment') + return b'Attachement with no filename set' + + +class CookieResource(Resource): + def render(self, request): + request.setHeader(b'Content-Type', b'text/plain') + if request.getCookie(b'password') is None: + return b'Password cookie not set!' + + if request.getCookie(b'password') == b'deluge': + return b'COOKIE MONSTER!' + + return request.getCookie('password') + + +class GzipResource(Resource): + def getChild(self, path, request): # NOQA: N802 + return EncodingResourceWrapper(self, [GzipEncoderFactory()]) + + def render(self, request): + message = request.args.get(b'msg', [b'EFFICIENCY!'])[0] + request.setHeader(b'Content-Type', b'text/plain') + return message + + +class PartialDownloadResource(Resource): + def __init__(self, *args, **kwargs): + Resource.__init__(self) + self.render_count = 0 + + def render(self, request): + # encoding = request.requestHeaders._rawHeaders.get('accept-encoding', None) + if self.render_count == 0: + request.setHeader(b'content-length', b'5') + else: + request.setHeader(b'content-length', b'3') + + # if encoding == "deflate, gzip, x-gzip": + request.write('abc') + self.render_count += 1 + return '' + + +class TopLevelResource(Resource): + def __init__(self): + Resource.__init__(self) + self.putChild(b'cookie', CookieResource()) + self.putChild(b'gzip', GzipResource()) + self.redirect_rsrc = RedirectResource() + self.putChild(b'redirect', self.redirect_rsrc) + self.putChild(b'rename', RenameResource()) + self.putChild(b'attachment', AttachmentResource()) + self.putChild(b'partial', PartialDownloadResource()) + + def getChild(self, path, request): # NOQA: N802 + if not path: + return self + else: + return Resource.getChild(self, path, request) + + def render(self, request): + if request.getHeader('If-Modified-Since'): + request.setResponseCode(NOT_MODIFIED) + return b'<h1>Deluge HTTP Downloader tests webserver here</h1>' + + +class DownloadFileTestCase(unittest.TestCase): + def get_url(self, path=''): + return 'http://localhost:%d/%s' % (self.listen_port, path) + + def setUp(self): # NOQA + setup_logger('warning', fname('log_file')) + self.website = Site(TopLevelResource()) + self.listen_port = 51242 + self.website.resource.redirect_rsrc.get_url = self.get_url + for dummy in range(10): + try: + self.webserver = reactor.listenTCP(self.listen_port, self.website) + except CannotListenError as ex: + error = ex + self.listen_port += 1 + else: + break + else: + raise error + + def tearDown(self): # NOQA + return self.webserver.stopListening() + + def assertContains(self, filename, contents): # NOQA + with open(filename) as _file: + try: + self.assertEqual(_file.read(), contents) + except Exception as ex: + self.fail(ex) + return filename + + def assertNotContains(self, filename, contents, file_mode=''): # NOQA + with open(filename, file_mode) as _file: + try: + self.assertNotEqual(_file.read(), contents) + except Exception as ex: + self.fail(ex) + return filename + + def test_download(self): + d = download_file(self.get_url(), fname('index.html')) + d.addCallback(self.assertEqual, fname('index.html')) + return d + + def test_download_without_required_cookies(self): + url = self.get_url('cookie') + d = download_file(url, fname('none')) + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d + + def test_download_with_required_cookies(self): + url = self.get_url('cookie') + cookie = {'cookie': 'password=deluge'} + d = download_file(url, fname('monster'), headers=cookie) + d.addCallback(self.assertEqual, fname('monster')) + d.addCallback(self.assertContains, 'COOKIE MONSTER!') + return d + + def test_download_with_rename(self): + + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + + url = self.get_url('rename?filename=renamed') + d = download_file(url, fname('original')) + d.addCallback(self.assertEqual, fname('renamed')) + d.addCallback(self.assertContains, 'This file should be called renamed') + return d + + def test_download_with_rename_exists(self): + + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + + open(fname('renamed'), 'w').close() + url = self.get_url('rename?filename=renamed') + d = download_file(url, fname('original')) + d.addCallback(self.assertEqual, fname('renamed-1')) + d.addCallback(self.assertContains, 'This file should be called renamed') + return d + + def test_download_with_rename_sanitised(self): + + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + + url = self.get_url('rename?filename=/etc/passwd') + d = download_file(url, fname('original')) + d.addCallback(self.assertEqual, fname('passwd')) + d.addCallback(self.assertContains, 'This file should be called /etc/passwd') + return d + + def test_download_with_attachment_no_filename(self): + url = self.get_url('attachment') + d = download_file(url, fname('original')) + d.addCallback(self.assertEqual, fname('original')) + d.addCallback(self.assertContains, 'Attachement with no filename set') + return d + + def test_download_with_rename_prevented(self): + url = self.get_url('rename?filename=spam') + d = download_file(url, fname('forced'), force_filename=True) + d.addCallback(self.assertEqual, fname('forced')) + d.addCallback(self.assertContains, 'This file should be called spam') + return d + + def test_download_with_gzip_encoding(self): + url = self.get_url('gzip?msg=success') + d = download_file(url, fname('gzip_encoded')) + d.addCallback(self.assertContains, 'success') + return d + + def test_download_with_gzip_encoding_disabled(self): + url = self.get_url('gzip?msg=unzip') + d = download_file(url, fname('gzip_encoded'), allow_compression=False) + d.addCallback(self.assertContains, 'unzip') + return d + + def test_page_redirect_unhandled(self): + url = self.get_url('redirect') + d = download_file(url, fname('none')) + d.addCallback(self.fail) + + def on_redirect(failure): + self.assertTrue(type(failure), PageRedirect) + + d.addErrback(on_redirect) + return d + + def test_page_redirect(self): + url = self.get_url('redirect') + d = download_file(url, fname('none'), handle_redirects=True) + d.addCallback(self.assertEqual, fname('none')) + d.addErrback(self.fail) + return d + + def test_page_not_found(self): + d = download_file(self.get_url('page/not/found'), fname('none')) + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d + + def test_page_not_modified(self): + headers = {'If-Modified-Since': formatdate(usegmt=True)} + d = download_file(self.get_url(), fname('index.html'), headers=headers) + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py new file mode 100644 index 0000000..1da64bf --- /dev/null +++ b/deluge/tests/test_json_api.py @@ -0,0 +1,291 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import json as json_lib + +from mock import MagicMock +from twisted.internet import defer +from twisted.web import server +from twisted.web.http import Request + +import deluge.common +import deluge.component as component +import deluge.ui.web.auth +import deluge.ui.web.json_api +from deluge.error import DelugeError +from deluge.ui.client import client +from deluge.ui.web.auth import Auth +from deluge.ui.web.json_api import JSON, JSONException + +from . import common +from .basetest import BaseTestCase +from .common_web import WebServerMockBase +from .daemon_base import DaemonBase + +common.disable_new_release_check() + + +class JSONBase(BaseTestCase, DaemonBase): + def connect_client(self, *args, **kwargs): + return client.connect( + 'localhost', + self.listen_port, + username=kwargs.get('user', ''), + password=kwargs.get('password', ''), + ) + + def disconnect_client(self, *args): + return client.disconnect() + + def tear_down(self): + d = component.shutdown() + d.addCallback(self.disconnect_client) + d.addCallback(self.terminate_core) + return d + + +class JSONTestCase(JSONBase): + def set_up(self): + d = self.common_set_up() + d.addCallback(self.start_core) + d.addCallbacks(self.connect_client, self.terminate_core) + return d + + @defer.inlineCallbacks + def test_get_remote_methods(self): + json = JSON() + methods = yield json.get_remote_methods() + self.assertEqual(type(methods), tuple) + self.assertTrue(len(methods) > 0) + + def test_render_fail_disconnected(self): + json = JSON() + request = MagicMock() + request.method = b'POST' + request._disconnected = True + # When disconnected, returns empty string + self.assertEqual(json.render(request), '') + + def test_render_fail(self): + json = JSON() + request = MagicMock() + request.method = b'POST' + + def write(response_str): + request.write_was_called = True + response = json_lib.loads(response_str.decode()) + self.assertEqual(response['result'], None) + self.assertEqual(response['id'], None) + self.assertEqual( + response['error']['message'], 'JSONException: JSON not decodable' + ) + self.assertEqual(response['error']['code'], 5) + + request.write = write + request.write_was_called = False + request._disconnected = False + request.getHeader.return_value = b'application/json' + self.assertEqual(json.render(request), server.NOT_DONE_YET) + self.assertTrue(request.write_was_called) + + def test_handle_request_invalid_method(self): + json = JSON() + request = MagicMock() + json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + self.assertEqual(error, {'message': 'Unknown method', 'code': 2}) + + def test_handle_request_invalid_json_request(self): + json = JSON() + request = MagicMock() + json_data = {'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + self.assertRaises(JSONException, json._handle_request, request) + json_data = {'method': 'some.method', 'params': []} + request.json = json_lib.dumps(json_data).encode() + self.assertRaises(JSONException, json._handle_request, request) + json_data = {'method': 'some.method', 'id': 0} + request.json = json_lib.dumps(json_data).encode() + self.assertRaises(JSONException, json._handle_request, request) + + def test_on_json_request_invalid_content_type(self): + """Test for exception with content type not application/json""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'text/plain' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + self.assertRaises(JSONException, json._on_json_request, request) + + +class JSONCustomUserTestCase(JSONBase): + def set_up(self): + d = self.common_set_up() + d.addCallback(self.start_core) + return d + + @defer.inlineCallbacks + def test_handle_request_auth_error(self): + yield self.connect_client() + json = JSON() + auth_conf = {'session_timeout': 10, 'sessions': {}} + Auth(auth_conf) # Must create the component + + # Must be called to update remote methods in json object + yield json.get_remote_methods() + + request = MagicMock() + request.getCookie = MagicMock(return_value=b'bad_value') + json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + self.assertEqual(error, {'message': 'Not authenticated', 'code': 1}) + + +class RPCRaiseDelugeErrorJSONTestCase(JSONBase): + def set_up(self): + d = self.common_set_up() + custom_script = """ + from deluge.error import DelugeError + from deluge.core.rpcserver import export + class TestClass(object): + @export() + def test(self): + raise DelugeError('DelugeERROR') + + test = TestClass() + daemon.rpcserver.register_object(test) +""" + d.addCallback(self.start_core, custom_script=custom_script) + d.addCallbacks(self.connect_client, self.terminate_core) + return d + + @defer.inlineCallbacks + def test_handle_request_method_raise_delugeerror(self): + json = JSON() + + def get_session_id(s_id): + return s_id + + self.patch(deluge.ui.web.auth, 'get_session_id', get_session_id) + auth_conf = {'session_timeout': 10, 'sessions': {}} + auth = Auth(auth_conf) + request = Request(MagicMock(), False) + request.base = b'' + auth._create_session(request) + methods = yield json.get_remote_methods() + # Verify the function has been registered + self.assertTrue('testclass.test' in methods) + + request = MagicMock() + session_id = list(auth.config['sessions'])[0] + request.getCookie = MagicMock(return_value=session_id.encode()) + json_data = {'method': 'testclass.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + result.addCallback(self.fail) + + def on_error(error): + self.assertEqual(error.type, DelugeError) + + result.addErrback(on_error) + yield result + + +class JSONRequestFailedTestCase(JSONBase, WebServerMockBase): + def set_up(self): + d = self.common_set_up() + custom_script = """ + from deluge.error import DelugeError + from deluge.core.rpcserver import export + from twisted.internet import reactor, task + class TestClass(object): + @export() + def test(self): + def test_raise_error(): + raise DelugeError('DelugeERROR') + + return task.deferLater(reactor, 1, test_raise_error) + + test = TestClass() + daemon.rpcserver.register_object(test) +""" + from twisted.internet.defer import Deferred + + extra_callback = { + 'deferred': Deferred(), + 'types': ['stderr'], + 'timeout': 10, + 'triggers': [ + { + 'expr': 'in test_raise_error', + 'value': lambda reader, data, data_all: 'Test', + } + ], + } + + def on_test_raise(*args): + self.assertTrue('Unhandled error in Deferred:' in self.core.stderr_out) + self.assertTrue('in test_raise_error' in self.core.stderr_out) + + extra_callback['deferred'].addCallback(on_test_raise) + d.addCallback( + self.start_core, + custom_script=custom_script, + print_stdout=False, + print_stderr=False, + timeout=5, + extra_callbacks=[extra_callback], + ) + d.addCallbacks(self.connect_client, self.terminate_core) + return d + + @defer.inlineCallbacks + def test_render_on_rpc_request_failed(self): + json = JSON() + + methods = yield json.get_remote_methods() + # Verify the function has been registered + self.assertTrue('testclass.test' in methods) + + request = MagicMock() + + # Circumvent authentication + auth = Auth({}) + self.mock_authentication_ignore(auth) + + def write(response_str): + request.write_was_called = True + response = json_lib.loads(response_str.decode()) + self.assertEqual(response['result'], None, 'BAD RESULT') + self.assertEqual(response['id'], 0) + self.assertEqual( + response['error']['message'], + 'Failure: [Failure instance: Traceback (failure with no frames):' + " <class 'deluge.error.DelugeError'>: DelugeERROR\n]", + ) + self.assertEqual(response['error']['code'], 4) + + request.write = write + request.write_was_called = False + request._disconnected = False + request.getHeader.return_value = b'application/json' + json_data = {'method': 'testclass.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + d = json._on_json_request(request) + + def on_success(arg): + self.assertEqual(arg, server.NOT_DONE_YET) + return True + + d.addCallbacks(on_success, self.fail) + yield d diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py new file mode 100644 index 0000000..572693b --- /dev/null +++ b/deluge/tests/test_log.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 Calum Lind <calumlind@gmail.com> +# Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import logging +import warnings + +from deluge.log import setup_logger + +from .basetest import BaseTestCase + + +class LogTestCase(BaseTestCase): + def set_up(self): + setup_logger(logging.DEBUG) + + def tear_down(self): + setup_logger('none') + + def test_old_log_deprecation_warning(self): + from deluge.log import LOG + + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter('always') + LOG.debug('foo') + self.assertEqual(w[-1].category, DeprecationWarning) + + # def test_twisted_error_log(self): + # from twisted.internet import defer + # import deluge.component as component + # from deluge.core.eventmanager import EventManager + # EventManager() + # + # d = component.start() + # + # @defer.inlineCallbacks + # def call(*args): + # yield component.pause(["EventManager"]) + # yield component.start(["EventManager"]) + # + # d.addCallback(call) + # return d diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py new file mode 100644 index 0000000..4e00996 --- /dev/null +++ b/deluge/tests/test_maketorrent.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import os +import tempfile + +from twisted.trial import unittest + +from deluge import maketorrent +from deluge.common import windows_check + + +def check_torrent(filename): + # Test loading with libtorrent to make sure it's valid + from deluge._libtorrent import lt + + lt.torrent_info(filename) + + # Test loading with our internal TorrentInfo class + from deluge.ui.common import TorrentInfo + + TorrentInfo(filename) + + +class MakeTorrentTestCase(unittest.TestCase): + def test_save_multifile(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file: + _file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file: + _file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file: + _file.write(b'c' * (11 * 1024)) + + t = maketorrent.TorrentMetadata() + t.data_path = tmp_path + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + t.save(tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) + + def test_save_singlefile(self): + if windows_check(): + raise unittest.SkipTest('on windows file not released') + tmp_data = tempfile.mkstemp('testdata')[1] + with open(tmp_data, 'wb') as _file: + _file.write(b'a' * (2314 * 1024)) + t = maketorrent.TorrentMetadata() + t.data_path = tmp_data + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + t.save(tmp_file) + + check_torrent(tmp_file) + + os.remove(tmp_data) + os.close(tmp_fd) + os.remove(tmp_file) + + def test_save_multifile_padded(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file: + _file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file: + _file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file: + _file.write(b'c' * (11 * 1024)) + + t = maketorrent.TorrentMetadata() + t.data_path = tmp_path + t.pad_files = True + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + t.save(tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py new file mode 100644 index 0000000..fc6507c --- /dev/null +++ b/deluge/tests/test_metafile.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import os +import tempfile + +from twisted.trial import unittest + +from deluge import metafile +from deluge.common import windows_check + + +def check_torrent(filename): + # Test loading with libtorrent to make sure it's valid + from deluge._libtorrent import lt + + lt.torrent_info(filename) + + # Test loading with our internal TorrentInfo class + from deluge.ui.common import TorrentInfo + + TorrentInfo(filename) + + +class MetafileTestCase(unittest.TestCase): + def test_save_multifile(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as tmp_file: + tmp_file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as tmp_file: + tmp_file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as tmp_file: + tmp_file.write(b'c' * (11 * 1024)) + + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) + + def test_save_singlefile(self): + if windows_check(): + raise unittest.SkipTest('on windows \\ != / for path names') + tmp_path = tempfile.mkstemp('testdata')[1] + with open(tmp_path, 'wb') as tmp_file: + tmp_file.write(b'a' * (2314 * 1024)) + + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file) + + check_torrent(tmp_file) + + os.remove(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py new file mode 100644 index 0000000..436fc2c --- /dev/null +++ b/deluge/tests/test_plugin_metadata.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 Calum Lind <calumlind@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from deluge.pluginmanagerbase import PluginManagerBase + +from . import common +from .basetest import BaseTestCase + + +class PluginManagerBaseTestCase(BaseTestCase): + def set_up(self): + common.set_tmp_config_dir() + + def test_get_plugin_info(self): + pm = PluginManagerBase('core.conf', 'deluge.plugin.core') + for p in pm.get_available_plugins(): + for key, value in pm.get_plugin_info(p).items(): + self.assertTrue(isinstance('%s: %s' % (key, value), ''.__class__)) + + def test_get_plugin_info_invalid_name(self): + pm = PluginManagerBase('core.conf', 'deluge.plugin.core') + for key, value in pm.get_plugin_info('random').items(): + self.assertEqual(value, 'not available') diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py new file mode 100644 index 0000000..02f9af0 --- /dev/null +++ b/deluge/tests/test_rpcserver.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Bro <bro.development@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import deluge.component as component +import deluge.error +from deluge.common import get_localhost_auth +from deluge.core import rpcserver +from deluge.core.authmanager import AuthManager +from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer +from deluge.log import setup_logger + +from .basetest import BaseTestCase + +setup_logger('none') + + +class DelugeRPCProtocolTester(DelugeRPCProtocol): + + messages = [] + + def transfer_message(self, data): + self.messages.append(data) + + +class RPCServerTestCase(BaseTestCase): + def set_up(self): + self.rpcserver = RPCServer(listen=False) + self.rpcserver.factory.protocol = DelugeRPCProtocolTester + self.factory = self.rpcserver.factory + self.session_id = '0' + self.request_id = 11 + self.protocol = self.rpcserver.factory.protocol() + self.protocol.factory = self.factory + self.protocol.transport = self.protocol + self.factory.session_protocols[self.session_id] = self.protocol + self.factory.authorized_sessions[self.session_id] = None + self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent'] + self.protocol.sessionno = self.session_id + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + + return component.shutdown().addCallback(on_shutdown) + + def test_emit_event_for_session_id(self): + torrent_id = '12' + from deluge.event import TorrentFolderRenamedEvent + + data = [torrent_id, 'new name', 'old name'] + e = TorrentFolderRenamedEvent(*data) + self.rpcserver.emit_event_for_session_id(self.session_id, e) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_EVENT, str(msg)) + self.assertEqual(msg[1], 'TorrentFolderRenamedEvent', str(msg)) + self.assertEqual(msg[2], data, str(msg)) + + def test_invalid_client_login(self): + self.protocol.dispatch(self.request_id, 'daemon.login', [1], {}) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_ERROR) + self.assertEqual(msg[1], self.request_id) + + def test_valid_client_login(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg)) + self.assertEqual(msg[1], self.request_id, str(msg)) + self.assertEqual(msg[2], rpcserver.AUTH_LEVEL_ADMIN, str(msg)) + + def test_client_login_error(self): + # This test causes error log prints while running the test... + self.protocol.transport = None # This should cause AttributeError + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_ERROR) + self.assertEqual(msg[1], self.request_id) + self.assertEqual(msg[2], 'WrappedException') + self.assertEqual(msg[3][1], 'AttributeError') + + def test_client_invalid_method_call(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch(self.request_id, 'invalid_function', auth, {}) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_ERROR) + self.assertEqual(msg[1], self.request_id) + self.assertEqual(msg[2], 'WrappedException') + self.assertEqual(msg[3][1], 'AttributeError') + + def test_daemon_info(self): + self.protocol.dispatch(self.request_id, 'daemon.info', [], {}) + msg = self.protocol.messages.pop() + self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg)) + self.assertEqual(msg[1], self.request_id, str(msg)) + self.assertEqual(msg[2], deluge.common.get_version(), str(msg)) diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py new file mode 100644 index 0000000..3794049 --- /dev/null +++ b/deluge/tests/test_security.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import os + +import pytest +from twisted.internet.utils import getProcessOutputAndValue + +import deluge.component as component +import deluge.ui.web.server +from deluge import configmanager +from deluge.common import windows_check +from deluge.ui.web.server import DelugeWeb + +from .basetest import BaseTestCase +from .common import get_test_data_file +from .common_web import WebServerTestBase +from .daemon_base import DaemonBase + +SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False)) + + +class SecurityBaseTestCase(object): + if windows_check(): + skip = 'windows can`t run .sh files' + elif not SECURITY_TESTS: + skip = 'Skipping security tests' + + http_err = 'can\'t run http tests on daemon' + + def __init__(self): + self.home_dir = os.path.expanduser('~') + self.port = 8112 + + def _run_test(self, test): + d = getProcessOutputAndValue( + 'bash', + [ + get_test_data_file('testssl.sh'), + '--quiet', + '--nodns', + '--color', + '0', + test, + '127.0.0.1:%d' % self.port, + ], + ) + + def on_result(results): + + if test == '-e': + results = results[0].split('\n')[7:-6] + self.assertTrue(len(results) > 3) + else: + self.assertIn('OK', results[0]) + self.assertNotIn('NOT ok', results[0]) + + d.addCallback(on_result) + return d + + def test_secured_webserver_protocol(self): + return self._run_test('-p') + + def test_secured_webserver_standard_ciphers(self): + return self._run_test('-s') + + def test_secured_webserver_heartbleed_vulnerability(self): + return self._run_test('-H') + + def test_secured_webserver_css_injection_vulnerability(self): + return self._run_test('-I') + + def test_secured_webserver_ticketbleed_vulnerability(self): + return self._run_test('-T') + + def test_secured_webserver_renegotiation_vulnerabilities(self): + return self._run_test('-R') + + def test_secured_webserver_crime_vulnerability(self): + return self._run_test('-C') + + def test_secured_webserver_breach_vulnerability(self): + return self._run_test('-B') + + def test_secured_webserver_poodle_vulnerability(self): + return self._run_test('-O') + + def test_secured_webserver_tls_fallback_scsv_mitigation_vulnerability(self): + return self._run_test('-Z') + + def test_secured_webserver_sweet32_vulnerability(self): + return self._run_test('-W') + + def test_secured_webserver_beast_vulnerability(self): + return self._run_test('-A') + + def test_secured_webserver_lucky13_vulnerability(self): + return self._run_test('-L') + + def test_secured_webserver_freak_vulnerability(self): + return self._run_test('-F') + + def test_secured_webserver_logjam_vulnerability(self): + return self._run_test('-J') + + def test_secured_webserver_drown_vulnerability(self): + return self._run_test('-D') + + def test_secured_webserver_forward_secrecy_settings(self): + return self._run_test('-f') + + def test_secured_webserver_rc4_ciphers(self): + return self._run_test('-4') + + def test_secured_webserver_preference(self): + return self._run_test('-P') + + def test_secured_webserver_headers(self): + return self._run_test('-h') + + def test_secured_webserver_ciphers(self): + return self._run_test('-e') + + +@pytest.mark.security +class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase): + + if windows_check(): + skip = 'windows can\'t start_core not enough arguments for format string' + + def __init__(self, testname): + super(DaemonSecurityTestCase, self).__init__(testname) + DaemonBase.__init__(self) + SecurityBaseTestCase.__init__(self) + + def setUp(self): + skip = False + for not_http_test in ('breach', 'headers', 'ticketbleed'): + if not_http_test in self.id().split('.')[-1]: + self.skipTest(SecurityBaseTestCase.http_err) + skip = True + if not skip: + super(DaemonSecurityTestCase, self).setUp() + + def set_up(self): + d = self.common_set_up() + self.port = self.listen_port + d.addCallback(self.start_core) + d.addErrback(self.terminate_core) + return d + + def tear_down(self): + d = component.shutdown() + d.addCallback(self.terminate_core) + return d + + +@pytest.mark.security +class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase): + def __init__(self, testname): + super(WebUISecurityTestBase, self).__init__(testname) + SecurityBaseTestCase.__init__(self) + + def start_webapi(self, arg): + self.port = self.webserver_listen_port = 8999 + + config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy() + config_defaults['port'] = self.webserver_listen_port + config_defaults['https'] = True + self.config = configmanager.ConfigManager('web.conf', config_defaults) + + self.deluge_web = DelugeWeb(daemon=False) + + host = list(self.deluge_web.web_api.hostlist.config['hosts'][0]) + host[2] = self.listen_port + self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host) + self.host_id = host[0] + self.deluge_web.start() diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py new file mode 100644 index 0000000..03f3cc2 --- /dev/null +++ b/deluge/tests/test_sessionproxy.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from twisted.internet.defer import maybeDeferred, succeed +from twisted.internet.task import Clock + +import deluge.component as component +import deluge.ui.sessionproxy + +from .basetest import BaseTestCase + + +class Core(object): + def __init__(self): + self.reset() + + def reset(self): + self.torrents = {} + self.torrents['a'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.torrents['b'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.torrents['c'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.prev_status = {} + + def get_session_state(self): + return maybeDeferred(self.torrents.keys) + + def get_torrent_status(self, torrent_id, keys, diff=False): + if not keys: + keys = list(self.torrents[torrent_id]) + + if not diff: + ret = {} + for key in keys: + ret[key] = self.torrents[torrent_id][key] + + return succeed(ret) + + else: + ret = {} + if torrent_id in self.prev_status: + for key in keys: + if ( + self.prev_status[torrent_id][key] + != self.torrents[torrent_id][key] + ): + ret[key] = self.torrents[torrent_id][key] + else: + ret = self.torrents[torrent_id] + self.prev_status[torrent_id] = dict(self.torrents[torrent_id]) + return succeed(ret) + + def get_torrents_status(self, filter_dict, keys, diff=False): + if not filter_dict: + filter_dict['id'] = list(self.torrents) + if not keys: + keys = list(self.torrents['a']) + if not diff: + if 'id' in filter_dict: + torrents = filter_dict['id'] + ret = {} + for torrent in torrents: + ret[torrent] = {} + for key in keys: + ret[torrent][key] = self.torrents[torrent][key] + return succeed(ret) + else: + if 'id' in filter_dict: + torrents = filter_dict['id'] + ret = {} + for torrent in torrents: + ret[torrent] = {} + if torrent in self.prev_status: + for key in self.prev_status[torrent]: + if ( + self.prev_status[torrent][key] + != self.torrents[torrent][key] + ): + ret[torrent][key] = self.torrents[torrent][key] + else: + ret[torrent] = dict(self.torrents[torrent]) + + self.prev_status[torrent] = dict(self.torrents[torrent]) + return succeed(ret) + + +class Client(object): + def __init__(self): + self.core = Core() + + def __noop__(self, *args, **kwargs): + return None + + def __getattr__(self, *args, **kwargs): + return self.__noop__ + + +client = Client() + + +class SessionProxyTestCase(BaseTestCase): + def set_up(self): + self.clock = Clock() + self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds) + self.patch(deluge.ui.sessionproxy, 'client', client) + self.sp = deluge.ui.sessionproxy.SessionProxy() + client.core.reset() + d = self.sp.start() + + def do_get_torrents_status(torrent_ids): + inital_keys = ['key1'] + # Advance clock to expire the cache times + self.clock.advance(2) + return self.sp.get_torrents_status({'id': torrent_ids}, inital_keys) + + d.addCallback(do_get_torrents_status) + return d + + def tear_down(self): + return component.deregister(self.sp) + + def test_startup(self): + self.assertEqual(client.core.torrents['a'], self.sp.torrents['a'][1]) + + def test_get_torrent_status_no_change(self): + d = self.sp.get_torrent_status('a', []) + d.addCallback(self.assertEqual, client.core.torrents['a']) + return d + + def test_get_torrent_status_change_with_cache(self): + client.core.torrents['a']['key1'] = 2 + d = self.sp.get_torrent_status('a', ['key1']) + d.addCallback(self.assertEqual, {'key1': 1}) + return d + + def test_get_torrent_status_change_without_cache(self): + client.core.torrents['a']['key1'] = 2 + self.clock.advance(self.sp.cache_time + 0.1) + d = self.sp.get_torrent_status('a', []) + d.addCallback(self.assertEqual, client.core.torrents['a']) + return d + + def test_get_torrent_status_key_not_updated(self): + self.clock.advance(self.sp.cache_time + 0.1) + self.sp.get_torrent_status('a', ['key1']) + client.core.torrents['a']['key2'] = 99 + d = self.sp.get_torrent_status('a', ['key2']) + d.addCallback(self.assertEqual, {'key2': 99}) + return d + + def test_get_torrents_status_key_not_updated(self): + self.clock.advance(self.sp.cache_time + 0.1) + self.sp.get_torrents_status({'id': ['a']}, ['key1']) + client.core.torrents['a']['key2'] = 99 + d = self.sp.get_torrents_status({'id': ['a']}, ['key2']) + d.addCallback(self.assertEqual, {'a': {'key2': 99}}) + return d diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py new file mode 100644 index 0000000..70fec47 --- /dev/null +++ b/deluge/tests/test_torrent.py @@ -0,0 +1,347 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import os +import time +from base64 import b64encode + +import mock +from twisted.internet import reactor +from twisted.internet.task import defer, deferLater +from twisted.trial import unittest + +import deluge.component as component +import deluge.core.torrent +import deluge.tests.common as common +from deluge._libtorrent import lt +from deluge.common import utf8_encode_structure, windows_check +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.core.torrent import Torrent +from deluge.core.torrentmanager import TorrentManager, TorrentState + +from .basetest import BaseTestCase + + +class TorrentTestCase(BaseTestCase): + def setup_config(self): + config_dir = common.set_tmp_config_dir() + core_config = deluge.config.Config( + 'core.conf', + defaults=deluge.core.preferencesmanager.DEFAULT_PREFS, + config_dir=config_dir, + ) + core_config.save() + + def set_up(self): + self.setup_config() + self.rpcserver = RPCServer(listen=False) + self.core = Core() + self.core.config.config['lsd'] = False + self.core.config.config['new_release_check'] = False + self.session = self.core.session + self.torrent = None + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + + return component.shutdown().addCallback(on_shutdown) + + def print_priority_list(self, priorities): + tmp = '' + for i, p in enumerate(priorities): + if i % 100 == 0: + print(tmp) + tmp = '' + tmp += '%s' % p + print(tmp) + + def assert_state(self, torrent, state): + torrent.update_state() + self.assertEqual(torrent.state, state) + + def get_torrent_atp(self, filename): + filename = common.get_test_data_file(filename) + with open(filename, 'rb') as _file: + info = lt.torrent_info(lt.bdecode(_file.read())) + atp = { + 'ti': info, + 'save_path': os.getcwd(), + 'storage_mode': lt.storage_mode_t.storage_mode_sparse, + 'flags': ( + lt.add_torrent_params_flags_t.flag_auto_managed + | lt.add_torrent_params_flags_t.flag_duplicate_is_error + & ~lt.add_torrent_params_flags_t.flag_paused + ), + } + return atp + + def test_set_file_priorities(self): + atp = self.get_torrent_atp('dir_with_6_files.torrent') + handle = self.session.add_torrent(atp) + torrent = Torrent(handle, {}) + + result = torrent.get_file_priorities() + self.assertTrue(all(x == 4 for x in result)) + + new_priorities = [3, 1, 2, 0, 5, 6, 7] + torrent.set_file_priorities(new_priorities) + self.assertEqual(torrent.get_file_priorities(), new_priorities) + + # Test with handle.piece_priorities as handle.file_priorities async + # updates and will return old value. Also need to remove a priority + # value as one file is much smaller than piece size so doesn't show. + piece_prio = handle.piece_priorities() + result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7]) + self.assertTrue(result) + + def test_set_prioritize_first_last_pieces(self): + piece_indexes = [ + 0, + 1, + 50, + 51, + 52, + 110, + 111, + 112, + 113, + 200, + 201, + 202, + 212, + 213, + 214, + 215, + 216, + 217, + 457, + 458, + 459, + 460, + 461, + 462, + ] + self.run_test_set_prioritize_first_last_pieces( + 'dir_with_6_files.torrent', piece_indexes + ) + + def run_test_set_prioritize_first_last_pieces( + self, torrent_file, prioritized_piece_indexes + ): + atp = self.get_torrent_atp(torrent_file) + handle = self.session.add_torrent(atp) + + self.torrent = Torrent(handle, {}) + priorities_original = handle.piece_priorities() + self.torrent.set_prioritize_first_last_pieces(True) + priorities = handle.piece_priorities() + + # The length of the list of new priorites is the same as the original + self.assertEqual(len(priorities_original), len(priorities)) + + # Test the priority of all the pieces against the calculated indexes. + for idx, priority in enumerate(priorities): + if idx in prioritized_piece_indexes: + self.assertEqual(priorities[idx], 7) + else: + self.assertEqual(priorities[idx], 4) + + # self.print_priority_list(priorities) + + def test_set_prioritize_first_last_pieces_false(self): + atp = self.get_torrent_atp('dir_with_6_files.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + # First set some pieces prioritized + self.torrent.set_prioritize_first_last_pieces(True) + # Reset pirorities + self.torrent.set_prioritize_first_last_pieces(False) + priorities = handle.piece_priorities() + + # Test the priority of the prioritized pieces + for i in priorities: + self.assertEqual(priorities[i], 4) + + # self.print_priority_list(priorities) + + def test_torrent_error_data_missing(self): + if windows_check(): + raise unittest.SkipTest('unexpected end of file in bencoded string') + options = {'seed_mode': True} + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + torrent = self.core.torrentmanager.torrents[torrent_id] + + time.sleep(0.5) # Delay to wait for lt to finish check on Travis. + self.assert_state(torrent, 'Seeding') + + # Force an error by reading (non-existant) piece from disk + torrent.handle.read_piece(0) + time.sleep(0.2) # Delay to wait for alert from lt + self.assert_state(torrent, 'Error') + + def test_torrent_error_resume_original_state(self): + if windows_check(): + raise unittest.SkipTest('unexpected end of file in bencoded string') + options = {'seed_mode': True, 'add_paused': True} + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + torrent = self.core.torrentmanager.torrents[torrent_id] + + orig_state = 'Paused' + self.assert_state(torrent, orig_state) + + # Force an error by reading (non-existant) piece from disk + torrent.handle.read_piece(0) + time.sleep(0.2) # Delay to wait for alert from lt + self.assert_state(torrent, 'Error') + + # Clear error and verify returned to original state + torrent.force_recheck() + + def test_torrent_error_resume_data_unaltered(self): + if windows_check(): + raise unittest.SkipTest('unexpected end of file in bencoded string') + if lt.__version__.split('.')[1] == '2': + raise unittest.SkipTest('Test not working as expected on lt 1.2') + + resume_data = { + 'active_time': 13399, + 'num_incomplete': 16777215, + 'announce_to_lsd': 1, + 'seed_mode': 0, + 'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', + 'paused': 0, + 'seeding_time': 13399, + 'last_scrape': 13399, + 'info-hash': '-\xc5\xd0\xe7\x1af\xfeid\x9ad\r9\xcb\x00\xa2YpIs', + 'max_uploads': 16777215, + 'max_connections': 16777215, + 'num_downloaders': 16777215, + 'total_downloaded': 0, + 'file-format': 'libtorrent resume file', + 'peers6': '', + 'added_time': 1411826665, + 'banned_peers6': '', + 'file_priority': [1], + 'last_seen_complete': 0, + 'total_uploaded': 0, + 'piece_priority': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', + 'file-version': 1, + 'announce_to_dht': 1, + 'auto_managed': 1, + 'upload_rate_limit': 0, + 'completed_time': 1411826665, + 'allocation': 'sparse', + 'blocks per piece': 2, + 'download_rate_limit': 0, + 'libtorrent-version': '0.16.17.0', + 'banned_peers': '', + 'num_seeds': 16777215, + 'sequential_download': 0, + 'announce_to_trackers': 1, + 'peers': '\n\x00\x02\x0f=\xc6SC\x17]\xd8}\x7f\x00\x00\x01=\xc6', + 'finished_time': 13399, + 'last_upload': 13399, + 'trackers': [[]], + 'super_seeding': 0, + 'file sizes': [[512000, 1411826586]], + 'last_download': 13399, + } + torrent_state = TorrentState( + torrent_id='2dc5d0e71a66fe69649a640d39cb00a259704973', + filename='test_torrent.file.torrent', + name='', + save_path='/home/ubuntu/Downloads', + file_priorities=[1], + is_finished=True, + ) + + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = _file.read() + resume_data = utf8_encode_structure(resume_data) + torrent_id = self.core.torrentmanager.add( + state=torrent_state, filedump=filedump, resume_data=lt.bencode(resume_data) + ) + torrent = self.core.torrentmanager.torrents[torrent_id] + + def assert_resume_data(): + self.assert_state(torrent, 'Error') + tm_resume_data = lt.bdecode( + self.core.torrentmanager.resume_data[torrent.torrent_id] + ) + self.assertEqual(tm_resume_data, resume_data) + + return deferLater(reactor, 0.5, assert_resume_data) + + def test_get_eta_seeding(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + self.assertEqual(self.torrent.get_eta(), 0) + self.torrent.status = mock.MagicMock() + + self.torrent.status.upload_payload_rate = 5000 + self.torrent.status.download_payload_rate = 0 + self.torrent.status.all_time_download = 10000 + self.torrent.status.all_time_upload = 500 + self.torrent.is_finished = True + self.torrent.options = {'stop_at_ratio': False} + # Test finished and uploading but no stop_at_ratio set. + self.assertEqual(self.torrent.get_eta(), 0) + + self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5} + result = self.torrent.get_eta() + self.assertEqual(result, 2) + self.assertIsInstance(result, int) + + def test_get_eta_downloading(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + self.assertEqual(self.torrent.get_eta(), 0) + + self.torrent.status = mock.MagicMock() + self.torrent.status.download_payload_rate = 50 + self.torrent.status.total_wanted = 10000 + self.torrent.status.total_wanted_done = 5000 + + result = self.torrent.get_eta() + self.assertEqual(result, 100) + self.assertIsInstance(result, int) + + def test_get_name_unicode(self): + """Test retrieving a unicode torrent name from libtorrent.""" + atp = self.get_torrent_atp('unicode_file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + self.assertEqual(self.torrent.get_name(), 'সুকুমার রায়.mkv') + + def test_rename_unicode(self): + """Test renaming file/folders with unicode filenames.""" + atp = self.get_torrent_atp('unicode_filenames.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + # Ignore TorrentManager method call + TorrentManager.save_resume_data = mock.MagicMock + + result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв') + self.assertIsInstance(result, defer.DeferredList) + + result = self.torrent.rename_files([[0, 'new_рбачёв']]) + self.assertIsNone(result) diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py new file mode 100644 index 0000000..bf84f45 --- /dev/null +++ b/deluge/tests/test_torrentmanager.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import warnings +from base64 import b64encode + +import mock +import pytest +from twisted.internet import defer, task + +from deluge import component +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.error import InvalidTorrentError + +from . import common +from .basetest import BaseTestCase + +warnings.filterwarnings('ignore', category=RuntimeWarning) +warnings.resetwarnings() + + +class TorrentmanagerTestCase(BaseTestCase): + def set_up(self): + common.set_tmp_config_dir() + self.rpcserver = RPCServer(listen=False) + self.core = Core() + self.core.config.config['lsd'] = False + self.clock = task.Clock() + self.tm = self.core.torrentmanager + self.tm.callLater = self.clock.callLater + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + + return component.shutdown().addCallback(on_shutdown) + + @defer.inlineCallbacks + def test_remove_torrent(self): + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = _file.read() + torrent_id = yield self.core.add_torrent_file_async( + filename, b64encode(filedump), {} + ) + self.assertTrue(self.tm.remove(torrent_id, False)) + + def test_prefetch_metadata(self): + from deluge._libtorrent import lt + + with open(common.get_test_data_file('test.torrent'), 'rb') as _file: + t_info = lt.torrent_info(lt.bdecode(_file.read())) + mock_alert = mock.MagicMock() + mock_alert.handle.info_hash = mock.MagicMock( + return_value='ab570cdd5a17ea1b61e970bb72047de141bce173' + ) + mock_alert.handle.get_torrent_info = mock.MagicMock(return_value=t_info) + + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet, 30) + self.tm.on_alert_metadata_received(mock_alert) + + expected = ( + 'ab570cdd5a17ea1b61e970bb72047de141bce173', + { + b'piece length': 32768, + b'sha1': ( + b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh' + b'\x9d\xc5\xb7\xac\xdd' + ), + b'name': b'azcvsupdater_2.6.2.jar', + b'private': 0, + b'pieces': ( + b'\xdb\x04B\x05\xc3\'\xdab\xb8su97\xa9u' + b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd' + b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7' + b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3' + b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0' + b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8' + b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed' + b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA' + b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f' + b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee' + b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc' + b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu' + b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?' + b'\xa1\xd6\x8c\x83\x9e&' + ), + b'length': 307949, + b'name.utf-8': b'azcvsupdater_2.6.2.jar', + b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7', + }, + ) + self.assertEqual(expected, self.successResultOf(d)) + + def test_prefetch_metadata_timeout(self): + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet, 30) + self.clock.advance(30) + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None) + return d.addCallback(self.assertEqual, expected) + + @pytest.mark.todo + def test_remove_torrent_false(self): + """Test when remove_torrent returns False""" + common.todo_test(self) + + def test_remove_invalid_torrent(self): + self.assertRaises( + InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist' + ) diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py new file mode 100644 index 0000000..590760d --- /dev/null +++ b/deluge/tests/test_torrentview.py @@ -0,0 +1,285 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2014 Bro <bro.development@gmail.com> +# Copyright (C) 2014 Calum Lind <calumlind@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import pytest +from twisted.trial import unittest + +import deluge.component as component +from deluge.configmanager import ConfigManager +from deluge.i18n import setup_translation + +from . import common +from .basetest import BaseTestCase + +# Allow running other tests without GTKUI dependencies available +try: + # pylint: disable=ungrouped-imports + from gi.repository.GObject import TYPE_UINT64 + + from deluge.ui.gtk3.gtkui import DEFAULT_PREFS + from deluge.ui.gtk3.mainwindow import MainWindow + from deluge.ui.gtk3.menubar import MenuBar + from deluge.ui.gtk3.torrentdetails import TorrentDetails + from deluge.ui.gtk3.torrentview import TorrentView +except (ImportError, ValueError): + libs_available = False + TYPE_UINT64 = 'Whatever' +else: + libs_available = True + +setup_translation() + + +@pytest.mark.gtkui +class TorrentviewTestCase(BaseTestCase): + + default_column_index = [ + 'filter', + 'torrent_id', + 'dirty', + '#', + 'Name', + 'Size', + 'Downloaded', + 'Uploaded', + 'Remaining', + 'Progress', + 'Seeds', + 'Peers', + 'Seeds:Peers', + 'Down Speed', + 'Up Speed', + 'Down Limit', + 'Up Limit', + 'ETA', + 'Ratio', + 'Avail', + 'Added', + 'Completed', + 'Complete Seen', + 'Tracker', + 'Download Folder', + 'Owner', + 'Shared', + ] + default_liststore_columns = [ + bool, + str, + bool, + int, + str, + str, # Name + TYPE_UINT64, + TYPE_UINT64, + TYPE_UINT64, + TYPE_UINT64, + float, + str, # Progress + int, + int, + int, + int, + float, # Seeds, Peers + int, + int, + float, + float, + int, + float, + float, # ETA, Ratio, Avail + int, + int, + int, + str, + str, # Tracker + str, + str, + bool, + ] # shared + + def set_up(self): + if libs_available is False: + raise unittest.SkipTest('GTKUI dependencies not available') + + common.set_tmp_config_dir() + # MainWindow loads this config file, so lets make sure it contains the defaults + ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) + self.mainwindow = MainWindow() + self.torrentview = TorrentView() + self.torrentdetails = TorrentDetails() + self.menubar = MenuBar() + + def tear_down(self): + return component.shutdown() + + def test_torrentview_columns(self): + + self.assertEqual( + self.torrentview.column_index, TorrentviewTestCase.default_column_index + ) + self.assertEqual( + self.torrentview.liststore_columns, + TorrentviewTestCase.default_liststore_columns, + ) + self.assertEqual( + self.torrentview.columns['Download Folder'].column_indices, [29] + ) + + def test_add_column(self): + + # Add a text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns) + 1, + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index) + 1, + ) + self.assertEqual(self.torrentview.column_index[-1], test_col) + self.assertEqual(self.torrentview.columns[test_col].column_indices, [32]) + + def test_add_columns(self): + + # Add a text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + + # Add a second text column + test_col2 = 'Test column2' + self.torrentview.add_text_column(test_col2, status_field=['label2']) + + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns) + 2, + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index) + 2, + ) + # test_col + self.assertEqual(self.torrentview.column_index[-2], test_col) + self.assertEqual(self.torrentview.columns[test_col].column_indices, [32]) + + # test_col2 + self.assertEqual(self.torrentview.column_index[-1], test_col2) + self.assertEqual(self.torrentview.columns[test_col2].column_indices, [33]) + + def test_remove_column(self): + + # Add and remove text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + self.torrentview.remove_column(test_col) + + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns), + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index), + ) + self.assertEqual( + self.torrentview.column_index[-1], + TorrentviewTestCase.default_column_index[-1], + ) + self.assertEqual( + self.torrentview.columns[ + TorrentviewTestCase.default_column_index[-1] + ].column_indices, + [31], + ) + + def test_remove_columns(self): + + # Add two columns + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + test_col2 = 'Test column2' + self.torrentview.add_text_column(test_col2, status_field=['label2']) + + # Remove test_col + self.torrentview.remove_column(test_col) + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns) + 1, + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index) + 1, + ) + self.assertEqual(self.torrentview.column_index[-1], test_col2) + self.assertEqual(self.torrentview.columns[test_col2].column_indices, [32]) + + # Remove test_col2 + self.torrentview.remove_column(test_col2) + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns), + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index), + ) + self.assertEqual( + self.torrentview.column_index[-1], + TorrentviewTestCase.default_column_index[-1], + ) + self.assertEqual( + self.torrentview.columns[ + TorrentviewTestCase.default_column_index[-1] + ].column_indices, + [31], + ) + + def test_add_remove_column_multiple_types(self): + + # Add a column with multiple column types + test_col3 = 'Test column3' + self.torrentview.add_progress_column( + test_col3, status_field=['progress', 'label3'], col_types=[float, str] + ) + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns) + 2, + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index) + 1, + ) + self.assertEqual(self.torrentview.column_index[-1], test_col3) + self.assertEqual(self.torrentview.columns[test_col3].column_indices, [32, 33]) + + # Remove multiple column-types column + self.torrentview.remove_column(test_col3) + + self.assertEqual( + len(self.torrentview.liststore_columns), + len(TorrentviewTestCase.default_liststore_columns), + ) + self.assertEqual( + len(self.torrentview.column_index), + len(TorrentviewTestCase.default_column_index), + ) + self.assertEqual( + self.torrentview.column_index[-1], + TorrentviewTestCase.default_column_index[-1], + ) + self.assertEqual( + self.torrentview.columns[ + TorrentviewTestCase.default_column_index[-1] + ].column_indices, + [31], + ) diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py new file mode 100644 index 0000000..e18d339 --- /dev/null +++ b/deluge/tests/test_tracker_icons.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import pytest +from twisted.trial.unittest import SkipTest + +import deluge.component as component +import deluge.ui.tracker_icons +from deluge.common import windows_check +from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons + +from . import common +from .basetest import BaseTestCase + +common.set_tmp_config_dir() +deluge.ui.tracker_icons.PIL_INSTALLED = False +common.disable_new_release_check() + + +@pytest.mark.internet +class TrackerIconsTestCase(BaseTestCase): + + if windows_check(): + skip = 'cannot use os.path.samefile to compair on windows(unix only)' + + def set_up(self): + # Disable resizing with Pillow for consistency. + self.patch(deluge.ui.tracker_icons, 'Image', None) + self.icons = TrackerIcons() + + def tear_down(self): + return component.shutdown() + + def test_get_deluge_png(self): + # Deluge has a png favicon link + icon = TrackerIcon(common.get_test_data_file('deluge.png')) + d = self.icons.fetch('deluge-torrent.org') + d.addCallback(self.assertNotIdentical, None) + d.addCallback(self.assertEqual, icon) + return d + + def test_get_google_ico(self): + # Google doesn't have any icon links + # So instead we'll grab its favicon.ico + icon = TrackerIcon(common.get_test_data_file('google.ico')) + d = self.icons.fetch('www.google.com') + d.addCallback(self.assertNotIdentical, None) + d.addCallback(self.assertEqual, icon) + return d + + def test_get_google_ico_with_redirect(self): + # google.com redirects to www.google.com + icon = TrackerIcon(common.get_test_data_file('google.ico')) + d = self.icons.fetch('google.com') + d.addCallback(self.assertNotIdentical, None) + d.addCallback(self.assertEqual, icon) + return d + + def test_get_seo_ico_with_sni(self): + # seo using certificates with SNI support only + raise SkipTest('Site certificate expired') + icon = TrackerIcon(common.get_test_data_file('seo.ico')) + d = self.icons.fetch('www.seo.com') + d.addCallback(self.assertNotIdentical, None) + d.addCallback(self.assertEqual, icon) + return d + + def test_get_empty_string_tracker(self): + d = self.icons.fetch('') + d.addCallback(self.assertIdentical, None) + return d diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py new file mode 100644 index 0000000..a048303 --- /dev/null +++ b/deluge/tests/test_transfer.py @@ -0,0 +1,403 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2012 Bro <bro.development@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import base64 + +import rencode +from twisted.trial import unittest + +import deluge.log +from deluge.transfer import DelugeTransferProtocol + +deluge.log.setup_logger('none') + + +class TransferTestClass(DelugeTransferProtocol): + def __init__(self): + DelugeTransferProtocol.__init__(self) + self.transport = self + self.messages_out = [] + self.messages_in = [] + self.packet_count = 0 + + def write(self, message): + """ + Called by DelugeTransferProtocol class + This simulates the write method of the self.transport in DelugeTransferProtocol. + """ + self.messages_out.append(message) + + def message_received(self, message): + """ + This method overrides message_received is DelugeTransferProtocol and is + called with the complete message as it was sent by DelugeRPCProtocol + """ + self.messages_in.append(message) + + def get_messages_out_joined(self): + return b''.join(self.messages_out) + + def get_messages_in(self): + return self.messages_in + + def data_received_old_protocol(self, data): + """ + This is the original method logic (as close as possible) for handling data receival on the client + + :param data: a zlib compressed string encoded with rencode. + + """ + import zlib + + print('\n=== New Data Received ===\nBytes received:', len(data)) + + if self._buffer: + # We have some data from the last dataReceived() so lets prepend it + print('Current buffer:', len(self._buffer) if self._buffer else '0') + data = self._buffer + data + self._buffer = None + + self.packet_count += 1 + self._bytes_received += len(data) + + while data: + print('\n-- Handle packet data --') + + print('Bytes received:', self._bytes_received) + print('Current data:', len(data)) + + if self._message_length == 0: + # handle_new_message uses _buffer so set data to _buffer. + self._buffer = data + self._handle_new_message() + data = self._buffer + self._buffer = None + self.packet_count = 1 + print('New message of length:', self._message_length) + + dobj = zlib.decompressobj() + try: + request = rencode.loads(dobj.decompress(data)) + print('Successfully loaded message', end=' ') + print( + ' - Buffer length: %d, data length: %d, unused length: %d' + % ( + len(data), + len(data) - len(dobj.unused_data), + len(dobj.unused_data), + ) + ) + print('Packet count:', self.packet_count) + except Exception as ex: + # log.debug('Received possible invalid message (%r): %s', data, e) + # This could be cut-off data, so we'll save this in the buffer + # and try to prepend it on the next dataReceived() + self._buffer = data + print( + 'Failed to load buffer (size %d): %s' % (len(self._buffer), str(ex)) + ) + return + else: + data = dobj.unused_data + self._message_length = 0 + + self.message_received(request) + + +class DelugeTransferProtocolTestCase(unittest.TestCase): + def setUp(self): # NOQA: N803 + """ + The expected messages corresponds to the test messages (msg1, msg2) after they've been processed + by DelugeTransferProtocol.send, which means that they've first been encoded with rencode, + and then compressed with zlib. + The expected messages are encoded in base64 to easily including it here in the source. + So before comparing the results with the expected messages, the expected messages must be decoded, + or the result message be encoded in base64. + + """ + self.transfer = TransferTestClass() + self.msg1 = ( + 0, + 1, + {'key_int': 1242429423}, + {'key_str': b'some string'}, + {'key_bool': True}, + ) + self.msg2 = ( + 2, + 3, + {'key_float': 12424.29423}, + {'key_unicode': 'some string'}, + {'key_dict_with_tuple': {'key_tuple': (1, 2, 3)}}, + {'keylist': [4, '5', 6.7]}, + ) + + self.msg1_expected_compressed_base64 = ( + b'AQAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU' + b'XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ' + ) + self.msg2_expected_compressed_base64 = ( + b'AQAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3il+ZlJuenpH' + b'YX5+emKhSXFGXmpadPBkmkZCaXxJdnlmTEl5QW5KRCdIOZhxmB' + b'hrUDuTmZxSWHWRpNnRyupaUBAHYlJxI=' + ) + + def test_send_one_message(self): + """ + Send one message and test that it has been sent correctoly to the + method 'write' in self.transport. + + """ + self.transfer.transfer_message(self.msg1) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_out_joined() + base64_encoded = base64.b64encode(messages) + self.assertEqual(base64_encoded, self.msg1_expected_compressed_base64) + + def test_receive_one_message(self): + """ + Receive one message and test that it has been sent to the + method 'message_received'. + + """ + self.transfer.dataReceived( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(messages)) + + def test_receive_old_message(self): + """ + Receive an old message (with no header) and verify that the data is discarded. + + """ + self.transfer.dataReceived(rencode.dumps(self.msg1)) + self.assertEqual(len(self.transfer.get_messages_in()), 0) + self.assertEqual(self.transfer._message_length, 0) + self.assertEqual(len(self.transfer._buffer), 0) + + def test_receive_two_concatenated_messages(self): + """ + This test simply concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data as one string. + + """ + two_concatenated = base64.b64decode( + self.msg1_expected_compressed_base64 + ) + base64.b64decode(self.msg2_expected_compressed_base64) + self.transfer.dataReceived(two_concatenated) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + + def test_receive_three_messages_in_parts(self): + """ + This test concatenates three messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in multiple parts. + + """ + msg_bytes = ( + base64.b64decode(self.msg1_expected_compressed_base64) + + base64.b64decode(self.msg2_expected_compressed_base64) + + base64.b64decode(self.msg1_expected_compressed_base64) + ) + packet_size = 40 + + one_message_byte_count = len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + two_messages_byte_count = one_message_byte_count + len( + base64.b64decode(self.msg2_expected_compressed_base64) + ) + three_messages_byte_count = two_messages_byte_count + len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + + for d in self.receive_parts_helper(msg_bytes, packet_size): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + self.assertEqual( + expected_msgs_received_count, len(self.transfer.get_messages_in()) + ) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + message3 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3)) + + # Remove underscore to enable test, or run the test directly: + # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol + def _test_rencode_fail_protocol(self): + """ + This test tries to test the protocol that relies on errors from rencode. + + """ + msg_bytes = ( + base64.b64decode(self.msg1_expected_compressed_base64) + + base64.b64decode(self.msg2_expected_compressed_base64) + + base64.b64decode(self.msg1_expected_compressed_base64) + ) + packet_size = 149 + + one_message_byte_count = len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + two_messages_byte_count = one_message_byte_count + len( + base64.b64decode(self.msg2_expected_compressed_base64) + ) + three_messages_byte_count = two_messages_byte_count + len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + + print() + + print( + 'Msg1 size:', + len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4, + ) + print( + 'Msg2 size:', + len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4, + ) + print( + 'Msg3 size:', + len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4, + ) + + print('one_message_byte_count:', one_message_byte_count) + print('two_messages_byte_count:', two_messages_byte_count) + print('three_messages_byte_count:', three_messages_byte_count) + + for d in self.receive_parts_helper( + msg_bytes, packet_size, self.transfer.data_received_old_protocol + ): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + if expected_msgs_received_count != len(self.transfer.get_messages_in()): + print( + 'Expected number of messages received is %d, but %d have been received.' + % ( + expected_msgs_received_count, + len(self.transfer.get_messages_in()), + ) + ) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + message3 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3)) + + def test_receive_middle_of_header(self): + """ + This test concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in two parts. + The first part contains the first message, plus two bytes of the next message. + The next part contains the rest of the message. + + This is a special case, as DelugeTransferProtocol can't start parsing + a message until it has at least 5 bytes (the size of the header) to be able + to read and parse the size of the payload. + + """ + two_concatenated = base64.b64decode( + self.msg1_expected_compressed_base64 + ) + base64.b64decode(self.msg2_expected_compressed_base64) + first_len = len(base64.b64decode(self.msg1_expected_compressed_base64)) + + # Now found the entire first message, and half the header of the next message (2 bytes into the header) + self.transfer.dataReceived(two_concatenated[: first_len + 2]) + + # Should be 1 message in the list + self.assertEqual(1, len(self.transfer.get_messages_in())) + + # Send the rest + self.transfer.dataReceived(two_concatenated[first_len + 2 :]) + + # Should be 2 messages in the list + self.assertEqual(2, len(self.transfer.get_messages_in())) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + + # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon + # def test_simulate_big_transfer(self): + # filename = '../deluge.torrentlist' + # + # f = open(filename, 'r') + # data = f.read() + # message_to_send = eval(data) + # self.transfer.transfer_message(message_to_send) + # + # Get the data as sent to the network by DelugeTransferProtocol + # compressed_data = self.transfer.get_messages_out_joined() + # packet_size = 16000 # Or something smaller... + # + # for d in self.receive_parts_helper(compressed_data, packet_size): + # bytes_recv = self.transfer.get_bytes_recv() + # if bytes_recv < len(compressed_data): + # self.assertEqual(len(self.transfer.get_messages_in()), 0) + # else: + # self.assertEqual(len(self.transfer.get_messages_in()), 1) + # Get the data as received by DelugeTransferProtocol + # transfered_message = self.transfer.get_messages_in().pop(0) + # Test that the data structures are equal + # self.assertEqual(transfered_message, message_to_send) + # self.assertTrue(transfered_message == message_to_send) + # + # f.close() + # f = open('rencode.torrentlist', 'w') + # f.write(str(transfered_message)) + # f.close() + + def receive_parts_helper(self, data, packet_size, receive_func=None): + byte_count = len(data) + sent_bytes = 0 + while byte_count > 0: + to_receive = packet_size if byte_count > packet_size else byte_count + sent_bytes += to_receive + byte_count -= to_receive + if receive_func: + receive_func(data[:to_receive]) + else: + self.transfer.dataReceived(data[:to_receive]) + data = data[to_receive:] + yield diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py new file mode 100644 index 0000000..dffd884 --- /dev/null +++ b/deluge/tests/test_ui_common.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +from __future__ import unicode_literals + +from six import assertCountEqual +from twisted.trial import unittest + +from deluge.common import windows_check +from deluge.ui.common import TorrentInfo + +from . import common + + +class UICommonTestCase(unittest.TestCase): + def setUp(self): # NOQA: N803 + pass + + def tearDown(self): # NOQA: N803 + pass + + def test_utf8_encoded_paths(self): + filename = common.get_test_data_file('test.torrent') + ti = TorrentInfo(filename) + self.assertTrue('azcvsupdater_2.6.2.jar' in ti.files_tree) + + def test_utf8_encoded_paths2(self): + if windows_check(): + raise unittest.SkipTest('on windows KeyError: unicode_filenames') + filename = common.get_test_data_file('unicode_filenames.torrent') + filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv' + filepath2 = ( + '\u041c\u0438\u0445\u0430\u0438\u043b \u0413\u043e' + '\u0440\u0431\u0430\u0447\u0451\u0432.mkv' + ) + filepath3 = "Alisher ibn G'iyosiddin Navoiy.mkv" + filepath4 = 'Ascii title.mkv' + filepath5 = '\u09b8\u09c1\u0995\u09c1\u09ae\u09be\u09b0 \u09b0\u09be\u09df.mkv' + + ti = TorrentInfo(filename) + files_tree = ti.files_tree['unicode_filenames'] + self.assertIn(filepath1, files_tree) + self.assertIn(filepath2, files_tree) + self.assertIn(filepath3, files_tree) + self.assertIn(filepath4, files_tree) + self.assertIn(filepath5, files_tree) + + result_files = [ + { + 'download': True, + 'path': 'unicode_filenames/' + filepath3, + 'size': 126158658, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath4, + 'size': 189321363, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath2, + 'size': 106649699, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath5, + 'size': 21590269, + }, + {'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771}, + ] + + assertCountEqual(self, ti.files, result_files) diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py new file mode 100644 index 0000000..8c67322 --- /dev/null +++ b/deluge/tests/test_ui_console.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import argparse + +from deluge.common import windows_check +from deluge.ui.console.cmdline.commands.add import Command +from deluge.ui.console.widgets.fields import TextInput + +from .basetest import BaseTestCase + + +class MockParent(object): + def __init__(self): + self.border_off_x = 1 + self.pane_width = 20 + self.encoding = 'utf8' + + +class UIConsoleFieldTestCase(BaseTestCase): + def setUp(self): # NOQA: N803 + self.parent = MockParent() + + def tearDown(self): # NOQA: N803 + pass + + def test_text_input(self): + def move_func(self, r, c): + self._cursor_row = r + self._cursor_col = c + + t = TextInput( + self.parent, + 'name', + 'message', + move_func, + 20, + '/text/field/file/path', + complete=False, + ) + self.assertTrue(t) + if not windows_check(): + self.assertTrue(t.handle_read(33)) + + +class UIConsoleCommandsTestCase(BaseTestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_add_move_completed(self): + completed_path = 'completed_path' + parser = argparse.ArgumentParser() + cmd = Command() + cmd.add_arguments(parser) + args = parser.parse_args(['torrent', '-m', completed_path]) + self.assertEqual(args.move_completed_path, completed_path) + args = parser.parse_args(['torrent', '--move-path', completed_path]) + self.assertEqual(args.move_completed_path, completed_path) diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py new file mode 100644 index 0000000..1d405a1 --- /dev/null +++ b/deluge/tests/test_ui_entry.py @@ -0,0 +1,513 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import print_function, unicode_literals + +import argparse +import sys +from io import StringIO + +import mock +import pytest +from twisted.internet import defer + +import deluge +import deluge.component as component +import deluge.ui.web.server +from deluge.common import PY2, get_localhost_auth, windows_check +from deluge.ui import ui_entry +from deluge.ui.web.server import DelugeWeb + +from . import common +from .basetest import BaseTestCase +from .daemon_base import DaemonBase + +if not windows_check(): + import deluge.ui.console + import deluge.ui.console.cmdline.commands.quit + import deluge.ui.console.main + +DEBUG_COMMAND = False + +sys_stdout = sys.stdout +# To catch output to stdout/stderr while running unit tests, we patch +# the file descriptors in sys and argparse._sys with StringFileDescriptor. +# Regular print statements from such tests will therefore write to the +# StringFileDescriptor object instead of the terminal. +# To print to terminal from the tests, use: print('Message...', file=sys_stdout) + + +class StringFileDescriptor(object): + """File descriptor that writes to string buffer""" + + def __init__(self, fd): + self.out = StringIO() + self.fd = fd + for a in ['encoding']: + setattr(self, a, getattr(sys_stdout, a)) + + def write(self, *data, **kwargs): + # io.StringIO requires unicode strings. + data_string = str(*data) + if PY2: + data_string = data_string.decode() + print(data_string, file=self.out, end='') + + def flush(self): + self.out.flush() + + +class UIBaseTestCase(object): + def __init__(self): + self.var = {} + + def set_up(self): + common.set_tmp_config_dir() + common.setup_test_logger(level='info', prefix=self.id()) + return component.start() + + def tear_down(self): + return component.shutdown() + + def exec_command(self): + if DEBUG_COMMAND: + print('Executing: %s\n' % sys.argv, file=sys_stdout) + return self.var['start_cmd']() + + +class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase): + """Subclass for test that require a deluged daemon""" + + def __init__(self): + UIBaseTestCase.__init__(self) + + def set_up(self): + d = self.common_set_up() + common.setup_test_logger(level='info', prefix=self.id()) + d.addCallback(self.start_core) + return d + + def tear_down(self): + d = UIBaseTestCase.tear_down(self) + d.addCallback(self.terminate_core) + return d + + +class DelugeEntryTestCase(BaseTestCase): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def set_up(self): + common.set_tmp_config_dir() + return component.start() + + def tear_down(self): + return component.shutdown() + + def test_deluge_help(self): + self.patch(sys, 'argv', ['./deluge', '-h']) + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.assertRaises(SystemExit, ui_entry.start_ui) + self.assertTrue('usage: deluge' in fd.out.getvalue()) + self.assertTrue('UI Options:' in fd.out.getvalue()) + self.assertTrue('* console' in fd.out.getvalue()) + + def test_start_default(self): + self.patch(sys, 'argv', ['./deluge']) + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + ui_entry.start_ui() + + def test_start_with_log_level(self): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', ['./deluge', '-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + ui_entry.start_ui() + + self.assertEqual(_level[0], 'info') + + +class GtkUIBaseTestCase(UIBaseTestCase): + """Implement all GtkUI tests here""" + + def test_start_gtk3ui(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + + from deluge.ui.gtk3 import gtkui + + with mock.patch.object(gtkui.GtkUI, 'start', autospec=True): + self.exec_command() + + +@pytest.mark.gtkui +class GtkUIDelugeScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase): + def __init__(self, testname): + super(GtkUIDelugeScriptEntryTestCase, self).__init__(testname) + GtkUIBaseTestCase.__init__(self) + + self.var['cmd_name'] = 'deluge gtk' + self.var['start_cmd'] = ui_entry.start_ui + self.var['sys_arg_cmd'] = ['./deluge', 'gtk'] + + def set_up(self): + return GtkUIBaseTestCase.set_up(self) + + def tear_down(self): + return GtkUIBaseTestCase.tear_down(self) + + +@pytest.mark.gtkui +class GtkUIScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase): + def __init__(self, testname): + super(GtkUIScriptEntryTestCase, self).__init__(testname) + GtkUIBaseTestCase.__init__(self) + from deluge.ui import gtk3 + + self.var['cmd_name'] = 'deluge-gtk' + self.var['start_cmd'] = gtk3.start + self.var['sys_arg_cmd'] = ['./deluge-gtk'] + + def set_up(self): + return GtkUIBaseTestCase.set_up(self) + + def tear_down(self): + return GtkUIBaseTestCase.tear_down(self) + + +class DelugeWebMock(DelugeWeb): + def __init__(self, *args, **kwargs): + kwargs['daemon'] = False + DelugeWeb.__init__(self, *args, **kwargs) + + +class WebUIBaseTestCase(UIBaseTestCase): + """Implement all WebUI tests here""" + + def test_start_webserver(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock) + self.exec_command() + + def test_start_web_with_log_level(self): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'web' + config.save() + + self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock) + self.exec_command() + self.assertEqual(_level[0], 'info') + + +class WebUIScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def __init__(self, testname): + super(WebUIScriptEntryTestCase, self).__init__(testname) + WebUIBaseTestCase.__init__(self) + self.var['cmd_name'] = 'deluge-web' + self.var['start_cmd'] = deluge.ui.web.start + self.var['sys_arg_cmd'] = ['./deluge-web', '--do-not-daemonize'] + + def set_up(self): + return WebUIBaseTestCase.set_up(self) + + def tear_down(self): + return WebUIBaseTestCase.tear_down(self) + + +class WebUIDelugeScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def __init__(self, testname): + super(WebUIDelugeScriptEntryTestCase, self).__init__(testname) + WebUIBaseTestCase.__init__(self) + self.var['cmd_name'] = 'deluge web' + self.var['start_cmd'] = ui_entry.start_ui + self.var['sys_arg_cmd'] = ['./deluge', 'web', '--do-not-daemonize'] + + def set_up(self): + return WebUIBaseTestCase.set_up(self) + + def tear_down(self): + return WebUIBaseTestCase.tear_down(self) + + +class ConsoleUIBaseTestCase(UIBaseTestCase): + """Implement Console tests that do not require a running daemon""" + + def test_start_console(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.exec_command() + + def test_start_console_with_log_level(self): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + self.exec_command() + + self.assertEqual(_level[0], 'info') + + def test_console_help(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.assertRaises(SystemExit, self.exec_command) + std_output = fd.out.getvalue() + self.assertTrue( + ('usage: %s' % self.var['cmd_name']) in std_output + ) # Check command name + self.assertTrue('Common Options:' in std_output) + self.assertTrue('Console Options:' in std_output) + self.assertTrue( + 'Console Commands:\n The following console commands are available:' + in std_output + ) + self.assertTrue( + 'The following console commands are available:' in std_output + ) + + def test_console_command_info(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.exec_command() + + def test_console_command_info_help(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info', '-h']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.assertRaises(SystemExit, self.exec_command) + std_output = fd.out.getvalue() + self.assertTrue('usage: info' in std_output) + self.assertTrue('Show information about the torrents' in std_output) + + def test_console_unrecognized_arguments(self): + self.patch( + sys, 'argv', ['./deluge', '--ui', 'console'] + ) # --ui is not longer supported + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stderr', fd) + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.assertRaises(SystemExit, self.exec_command) + self.assertTrue('unrecognized arguments: --ui' in fd.out.getvalue()) + + +class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): + """Implement Console tests that require a running daemon""" + + def set_up(self): + # Avoid calling reactor.shutdown after commands are executed by main.exec_args() + deluge.ui.console.main.reactor = common.ReactorOverride() + return UIWithDaemonBaseTestCase.set_up(self) + + def patch_arg_command(self, command): + if type(command) == str: + command = [command] + username, password = get_localhost_auth() + self.patch( + sys, + 'argv', + self.var['sys_arg_cmd'] + + ['--port'] + + ['58900'] + + ['--username'] + + [username] + + ['--password'] + + [password] + + command, + ) + + @defer.inlineCallbacks + def test_console_command_add(self): + filename = common.get_test_data_file('test.torrent') + self.patch_arg_command(['add ' + filename]) + fd = StringFileDescriptor(sys.stdout) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + + std_output = fd.out.getvalue() + self.assertTrue( + std_output + == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n' + ) + + @defer.inlineCallbacks + def test_console_command_add_move_completed(self): + filename = common.get_test_data_file('test.torrent') + self.patch_arg_command( + [ + 'add --move-path /tmp ' + filename + ' ; status' + ' ; manage' + ' ab570cdd5a17ea1b61e970bb72047de141bce173' + ' move_completed' + ' move_completed_path' + ] + ) + fd = StringFileDescriptor(sys.stdout) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + + std_output = fd.out.getvalue() + self.assertTrue( + std_output.endswith('move_completed: True\nmove_completed_path: /tmp\n') + or std_output.endswith('move_completed_path: /tmp\nmove_completed: True\n') + ) + + @defer.inlineCallbacks + def test_console_command_status(self): + fd = StringFileDescriptor(sys.stdout) + self.patch_arg_command(['status']) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + + std_output = fd.out.getvalue() + self.assertTrue( + std_output.startswith('Total upload: ') + and std_output.endswith(' Moving: 0\n') + ) + + +class ConsoleScriptEntryWithDaemonTestCase( + BaseTestCase, ConsoleUIWithDaemonBaseTestCase +): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def __init__(self, testname): + super(ConsoleScriptEntryWithDaemonTestCase, self).__init__(testname) + ConsoleUIWithDaemonBaseTestCase.__init__(self) + self.var['cmd_name'] = 'deluge-console' + self.var['sys_arg_cmd'] = ['./deluge-console'] + + def set_up(self): + from deluge.ui.console.console import Console + + def start_console(): + return Console().start() + + self.patch(deluge.ui.console, 'start', start_console) + self.var['start_cmd'] = deluge.ui.console.start + + return ConsoleUIWithDaemonBaseTestCase.set_up(self) + + def tear_down(self): + return ConsoleUIWithDaemonBaseTestCase.tear_down(self) + + +class ConsoleScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def __init__(self, testname): + super(ConsoleScriptEntryTestCase, self).__init__(testname) + ConsoleUIBaseTestCase.__init__(self) + self.var['cmd_name'] = 'deluge-console' + self.var['start_cmd'] = deluge.ui.console.start + self.var['sys_arg_cmd'] = ['./deluge-console'] + + def set_up(self): + return ConsoleUIBaseTestCase.set_up(self) + + def tear_down(self): + return ConsoleUIBaseTestCase.tear_down(self) + + +class ConsoleDelugeScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase): + + if windows_check(): + skip = 'cannot test console ui on windows' + + def __init__(self, testname): + super(ConsoleDelugeScriptEntryTestCase, self).__init__(testname) + ConsoleUIBaseTestCase.__init__(self) + self.var['cmd_name'] = 'deluge console' + self.var['start_cmd'] = ui_entry.start_ui + self.var['sys_arg_cmd'] = ['./deluge', 'console'] + + def set_up(self): + return ConsoleUIBaseTestCase.set_up(self) + + def tear_down(self): + return ConsoleUIBaseTestCase.tear_down(self) diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py new file mode 100644 index 0000000..982a93b --- /dev/null +++ b/deluge/tests/test_web_api.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +from io import BytesIO + +from twisted.internet import defer, reactor +from twisted.python.failure import Failure +from twisted.web.client import Agent, FileBodyProducer +from twisted.web.http_headers import Headers +from twisted.web.static import File + +import deluge.component as component +from deluge.ui.client import client + +from . import common +from .common_web import WebServerTestBase + +common.disable_new_release_check() + + +class WebAPITestCase(WebServerTestBase): + def test_connect_invalid_host(self): + d = self.deluge_web.web_api.connect('id') + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d + + def test_connect(self): + d = self.deluge_web.web_api.connect(self.host_id) + + def on_connect(result): + self.assertEqual(type(result), tuple) + self.assertTrue(len(result) > 0) + self.addCleanup(client.disconnect) + return result + + d.addCallback(on_connect) + d.addErrback(self.fail) + return d + + def test_disconnect(self): + d = self.deluge_web.web_api.connect(self.host_id) + + @defer.inlineCallbacks + def on_connect(result): + self.assertTrue(self.deluge_web.web_api.connected()) + yield self.deluge_web.web_api.disconnect() + self.assertFalse(self.deluge_web.web_api.connected()) + + d.addCallback(on_connect) + d.addErrback(self.fail) + return d + + def test_get_config(self): + config = self.deluge_web.web_api.get_config() + self.assertEqual(self.webserver_listen_port, config['port']) + + def test_set_config(self): + config = self.deluge_web.web_api.get_config() + config['pwd_salt'] = 'new_salt' + config['pwd_sha1'] = 'new_sha' + config['sessions'] = { + '233f23632af0a74748bc5dd1d8717564748877baa16420e6898e17e8aa365e6e': { + 'login': 'skrot', + 'expires': 1460030877.0, + 'level': 10, + } + } + self.deluge_web.web_api.set_config(config) + web_config = component.get('DelugeWeb').config.config + self.assertNotEquals(config['pwd_salt'], web_config['pwd_salt']) + self.assertNotEquals(config['pwd_sha1'], web_config['pwd_sha1']) + self.assertNotEquals(config['sessions'], web_config['sessions']) + + @defer.inlineCallbacks + def get_host_status(self): + host = list(self.deluge_web.web_api._get_host(self.host_id)) + host[3] = 'Online' + host[4] = '2.0.0.dev562' + status = yield self.deluge_web.web_api.get_host_status(self.host_id) + self.assertEqual(status, tuple(status)) + + def test_get_host(self): + self.assertFalse(self.deluge_web.web_api._get_host('invalid_id')) + conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0]) + self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + + def test_add_host(self): + conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123'] + self.assertFalse(self.deluge_web.web_api._get_host(conn[0])) + # Add valid host + result, host_id = self.deluge_web.web_api.add_host( + conn[1], conn[2], conn[3], conn[4] + ) + self.assertEqual(result, True) + conn[0] = host_id + self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + + # Add already existing host + ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) + self.assertEqual(ret, (False, 'Host details already in hostlist')) + + # Add invalid port + conn[2] = 'bad port' + ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) + self.assertEqual(ret, (False, 'Invalid port. Must be an integer')) + + def test_remove_host(self): + conn = ['connection_id', '', 0, '', ''] + self.deluge_web.web_api.hostlist.config['hosts'].append(conn) + self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + # Remove valid host + self.assertTrue(self.deluge_web.web_api.remove_host(conn[0])) + self.assertFalse(self.deluge_web.web_api._get_host(conn[0])) + # Remove non-existing host + self.assertFalse(self.deluge_web.web_api.remove_host(conn[0])) + + def test_get_torrent_info(self): + filename = common.get_test_data_file('test.torrent') + ret = self.deluge_web.web_api.get_torrent_info(filename) + self.assertEqual(ret['name'], 'azcvsupdater_2.6.2.jar') + self.assertEqual(ret['info_hash'], 'ab570cdd5a17ea1b61e970bb72047de141bce173') + self.assertTrue('files_tree' in ret) + + def test_get_magnet_info(self): + ret = self.deluge_web.web_api.get_magnet_info( + 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN' + ) + self.assertEqual(ret['name'], '953bad769164e8482c7785a21d12166f94b9e14d') + self.assertEqual(ret['info_hash'], '953bad769164e8482c7785a21d12166f94b9e14d') + self.assertTrue('files_tree' in ret) + + @defer.inlineCallbacks + def test_get_torrent_files(self): + yield self.deluge_web.web_api.connect(self.host_id) + filename = common.get_test_data_file('test.torrent') + torrents = [ + {'path': filename, 'options': {'download_location': '/home/deluge/'}} + ] + yield self.deluge_web.web_api.add_torrents(torrents) + ret = yield self.deluge_web.web_api.get_torrent_files( + 'ab570cdd5a17ea1b61e970bb72047de141bce173' + ) + self.assertEqual(ret['type'], 'dir') + self.assertEqual( + ret['contents'], + { + 'azcvsupdater_2.6.2.jar': { + 'priority': 4, + 'index': 0, + 'offset': 0, + 'progress': 0.0, + 'path': 'azcvsupdater_2.6.2.jar', + 'type': 'file', + 'size': 307949, + } + }, + ) + + @defer.inlineCallbacks + def test_download_torrent_from_url(self): + filename = 'ubuntu-9.04-desktop-i386.iso.torrent' + self.deluge_web.top_level.putChild( + filename.encode(), File(common.get_test_data_file(filename)) + ) + url = 'http://localhost:%d/%s' % (self.webserver_listen_port, filename) + res = yield self.deluge_web.web_api.download_torrent_from_url(url) + self.assertTrue(res.endswith(filename)) + + @defer.inlineCallbacks + def test_invalid_json(self): + """ + If json_api._send_response does not return server.NOT_DONE_YET + this error is thrown when json is invalid: + exceptions.RuntimeError: Request.write called on a request after Request.finish was called. + + """ + agent = Agent(reactor) + bad_body = b'{ method": "auth.login" }' + d = yield agent.request( + b'POST', + b'http://127.0.0.1:%i/json' % self.webserver_listen_port, + Headers( + { + b'User-Agent': [b'Twisted Web Client Example'], + b'Content-Type': [b'application/json'], + } + ), + FileBodyProducer(BytesIO(bad_body)), + ) + yield d diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py new file mode 100644 index 0000000..a518573 --- /dev/null +++ b/deluge/tests/test_web_auth.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +from __future__ import unicode_literals + +from mock import patch +from twisted.trial import unittest + +from deluge.ui.web import auth + + +class MockConfig(object): + def __init__(self, config): + self.config = config + + def __getitem__(self, key): + return self.config[key] + + def __setitem__(self, key, value): + self.config[key] = value + + +class WebAuthTestCase(unittest.TestCase): + @patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None) + def test_change_password(self, mock_json): + config = MockConfig( + { + 'pwd_sha1': '8d8ff487626141d2b91025901d3ab57211180b48', + 'pwd_salt': '7555d757710158655bd1646e207dee21a89e9226', + } + ) + webauth = auth.Auth(config) + self.assertTrue(webauth.change_password('deluge', 'deluge_new')) diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py new file mode 100644 index 0000000..d9684ba --- /dev/null +++ b/deluge/tests/test_webserver.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import json as json_lib +from io import BytesIO + +import twisted.web.client +from twisted.internet import defer, reactor +from twisted.web.client import Agent, FileBodyProducer +from twisted.web.http_headers import Headers + +from . import common +from .common import get_test_data_file +from .common_web import WebServerMockBase, WebServerTestBase + +common.disable_new_release_check() + + +class WebServerTestCase(WebServerTestBase, WebServerMockBase): + @defer.inlineCallbacks + def test_get_torrent_info(self): + + agent = Agent(reactor) + + self.mock_authentication_ignore(self.deluge_web.auth) + + # This torrent file contains an uncommon field 'filehash' which must be hex + # encoded to allow dumping the torrent info to json. Otherwise it will fail with: + # UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte + filename = get_test_data_file('filehash_field.torrent') + input_file = ( + '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' % filename + ) + headers = { + b'User-Agent': ['Twisted Web Client Example'], + b'Content-Type': ['application/json'], + } + url = 'http://127.0.0.1:%s/json' % self.webserver_listen_port + + d = yield agent.request( + b'POST', + url.encode('utf-8'), + Headers(headers), + FileBodyProducer(BytesIO(input_file.encode('utf-8'))), + ) + + body = yield twisted.web.client.readBody(d) + + json = json_lib.loads(body.decode()) + self.assertEqual(None, json['error']) + self.assertEqual('torrent_filehash', json['result']['name']) diff --git a/deluge/tests/twisted/plugins/delugereporter.py b/deluge/tests/twisted/plugins/delugereporter.py new file mode 100644 index 0000000..c2a7b52 --- /dev/null +++ b/deluge/tests/twisted/plugins/delugereporter.py @@ -0,0 +1,50 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from __future__ import unicode_literals + +import os + +from twisted.plugin import IPlugin +from twisted.trial.itrial import IReporter +from twisted.trial.reporter import TreeReporter +from zope.interface import implements + + +class _Reporter(object): + implements(IPlugin, IReporter) + + def __init__( + self, name, module, description, longOpt, shortOpt, klass # noqa: N803 + ): + self.name = name + self.module = module + self.description = description + self.longOpt = longOpt + self.shortOpt = shortOpt + self.klass = klass + + +deluge = _Reporter( + 'Deluge reporter that suppresses Stacktrace from TODO tests', + 'twisted.plugins.delugereporter', + description='Deluge Reporter', + longOpt='deluge-reporter', + shortOpt=None, + klass='DelugeReporter', +) + + +class DelugeReporter(TreeReporter): + def __init__(self, *args, **kwargs): + os.environ['DELUGE_REPORTER'] = 'true' + TreeReporter.__init__(self, *args, **kwargs) + + def addExpectedFailure(self, *args): # NOQA: N802 + # super(TreeReporter, self).addExpectedFailure(*args) + self.endLine('[TODO]', self.TODO) |