summaryrefslogtreecommitdiffstats
path: root/deluge/tests
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests')
-rw-r--r--deluge/tests/__init__.py19
-rw-r--r--deluge/tests/basetest.py59
-rw-r--r--deluge/tests/common.py341
-rw-r--r--deluge/tests/common_web.py69
-rw-r--r--deluge/tests/daemon_base.py86
-rw-r--r--deluge/tests/data/deluge.pngbin0 -> 722 bytes
-rw-r--r--deluge/tests/data/dir_with_6_files.torrentbin0 -> 9630 bytes
-rw-r--r--deluge/tests/data/filehash_field.torrent2
-rw-r--r--deluge/tests/data/google.icobin0 -> 5430 bytes
-rw-r--r--deluge/tests/data/seo.icobin0 -> 1150 bytes
-rw-r--r--deluge/tests/data/test.torrent2
-rw-r--r--deluge/tests/data/test_torrent.file.torrent2
-rw-r--r--deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrentbin0 -> 28184 bytes
-rw-r--r--deluge/tests/data/unicode_file.torrent1
-rw-r--r--deluge/tests/data/unicode_filenames.torrentbin0 -> 34298 bytes
-rw-r--r--deluge/tests/test_alertmanager.py39
-rw-r--r--deluge/tests/test_authmanager.py27
-rw-r--r--deluge/tests/test_bencode.py34
-rw-r--r--deluge/tests/test_client.py204
-rw-r--r--deluge/tests/test_common.py202
-rw-r--r--deluge/tests/test_component.py267
-rw-r--r--deluge/tests/test_config.py194
-rw-r--r--deluge/tests/test_core.py498
-rw-r--r--deluge/tests/test_decorators.py52
-rw-r--r--deluge/tests/test_error.py54
-rw-r--r--deluge/tests/test_files_tab.py176
-rw-r--r--deluge/tests/test_httpdownloader.py266
-rw-r--r--deluge/tests/test_json_api.py291
-rw-r--r--deluge/tests/test_log.py51
-rw-r--r--deluge/tests/test_maketorrent.py96
-rw-r--r--deluge/tests/test_metafile.py68
-rw-r--r--deluge/tests/test_plugin_metadata.py31
-rw-r--r--deluge/tests/test_rpcserver.py113
-rw-r--r--deluge/tests/test_security.py184
-rw-r--r--deluge/tests/test_sessionproxy.py164
-rw-r--r--deluge/tests/test_torrent.py347
-rw-r--r--deluge/tests/test_torrentmanager.py120
-rw-r--r--deluge/tests/test_torrentview.py285
-rw-r--r--deluge/tests/test_tracker_icons.py77
-rw-r--r--deluge/tests/test_transfer.py403
-rw-r--r--deluge/tests/test_ui_common.py77
-rw-r--r--deluge/tests/test_ui_console.py67
-rw-r--r--deluge/tests/test_ui_entry.py513
-rw-r--r--deluge/tests/test_web_api.py199
-rw-r--r--deluge/tests/test_web_auth.py36
-rw-r--r--deluge/tests/test_webserver.py59
-rw-r--r--deluge/tests/twisted/plugins/delugereporter.py50
47 files changed, 5825 insertions, 0 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py
new file mode 100644
index 0000000..d3bf10d
--- /dev/null
+++ b/deluge/tests/__init__.py
@@ -0,0 +1,19 @@
+# Increase open file descriptor limit to allow tests to run
+# without getting error: what(): epoll: Too many open files
+from __future__ import print_function, unicode_literals
+
+from deluge.i18n import setup_translation
+
+try:
+ import resource
+except ImportError: # Does not exist on Windows
+ pass
+else:
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))
+ except (ValueError, resource.error) as ex:
+ error = 'Failed to raise file descriptor limit: %s' % ex
+ # print(error)
+
+# Initialize gettext
+setup_translation()
diff --git a/deluge/tests/basetest.py b/deluge/tests/basetest.py
new file mode 100644
index 0000000..11ca18e
--- /dev/null
+++ b/deluge/tests/basetest.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import warnings
+
+from twisted.internet.defer import maybeDeferred
+from twisted.trial import unittest
+
+import deluge.component as component
+
+
+class BaseTestCase(unittest.TestCase):
+ """This is the base class that should be used for all test classes
+ that create classes that inherit from deluge.component.Component. It
+ ensures that the component registry has been cleaned up when tests
+ have finished.
+
+ """
+
+ def setUp(self): # NOQA: N803
+
+ if len(component._ComponentRegistry.components) != 0:
+ warnings.warn(
+ 'The component._ComponentRegistry.components is not empty on test setup.\n'
+ 'This is probably caused by another test that did not clean up after finishing!: %s'
+ % component._ComponentRegistry.components
+ )
+ d = maybeDeferred(self.set_up)
+
+ def on_setup_error(error):
+ warnings.warn('Error caught in test setup!\n%s' % error.getTraceback())
+ self.fail()
+
+ return d.addErrback(on_setup_error)
+
+ def tearDown(self): # NOQA: N803
+ d = maybeDeferred(self.tear_down)
+
+ def on_teardown_failed(error):
+ warnings.warn('Error caught in test teardown!\n%s' % error.getTraceback())
+ self.fail()
+
+ def on_teardown_complete(result):
+ component._ComponentRegistry.components.clear()
+ component._ComponentRegistry.dependents.clear()
+
+ return d.addCallbacks(on_teardown_complete, on_teardown_failed)
+
+ def set_up(self):
+ pass
+
+ def tear_down(self):
+ pass
diff --git a/deluge/tests/common.py b/deluge/tests/common.py
new file mode 100644
index 0000000..e92cc0f
--- /dev/null
+++ b/deluge/tests/common.py
@@ -0,0 +1,341 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import os
+import sys
+import tempfile
+import traceback
+
+from twisted.internet import defer, protocol, reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import CannotListenError
+from twisted.trial import unittest
+
+import deluge.configmanager
+import deluge.core.preferencesmanager
+import deluge.log
+from deluge.error import DelugeError
+
+# This sets log level to critical, so use log.critical() to debug while running unit tests
+deluge.log.setup_logger('none')
+
+
+def disable_new_release_check():
+ deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False
+
+
+def set_tmp_config_dir():
+ config_directory = tempfile.mkdtemp()
+ deluge.configmanager.set_config_dir(config_directory)
+ return config_directory
+
+
+def setup_test_logger(level='info', prefix='deluge'):
+ deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False)
+
+
+def get_test_data_file(filename):
+ return os.path.join(os.path.join(os.path.dirname(__file__), 'data'), filename)
+
+
+def todo_test(caller):
+ # If we are using the delugereporter we can set todo mark on the test
+ # Without the delugereporter the todo would print a stack trace, so in
+ # that case we rely only on skipTest
+ if os.environ.get('DELUGE_REPORTER', None):
+ getattr(caller, caller._testMethodName).__func__.todo = 'To be fixed'
+
+ filename = os.path.basename(traceback.extract_stack(None, 2)[0][0])
+ funcname = traceback.extract_stack(None, 2)[0][2]
+ raise unittest.SkipTest('TODO: %s:%s' % (filename, funcname))
+
+
+def add_watchdog(deferred, timeout=0.05, message=None):
+ def callback(value):
+ if not watchdog.called and not watchdog.cancelled:
+ watchdog.cancel()
+ if not deferred.called:
+ if message:
+ print(message)
+ deferred.cancel()
+ return value
+
+ deferred.addBoth(callback)
+ watchdog = reactor.callLater(timeout, defer.timeout, deferred)
+ return watchdog
+
+
+class ReactorOverride(object):
+ """Class used to patch reactor while running unit tests
+ to avoid starting and stopping the twisted reactor
+ """
+
+ def __getattr__(self, attr):
+ if attr == 'run':
+ return self._run
+ if attr == 'stop':
+ return self._stop
+ return getattr(reactor, attr)
+
+ def _run(self):
+ pass
+
+ def _stop(self):
+ pass
+
+ def addReader(self, arg): # NOQA: N802
+ pass
+
+
+class ProcessOutputHandler(protocol.ProcessProtocol):
+ def __init__(
+ self, script, callbacks, logfile=None, print_stdout=True, print_stderr=True
+ ):
+ """Executes a script and handle the process' output to stdout and stderr.
+
+ Args:
+ script (str): The script to execute.
+ callbacks (list): Callbacks to trigger if the expected output if found.
+ logfile (str, optional): Filename to wrote the process' output.
+ print_stderr (bool): Print the process' stderr output to stdout.
+ print_stdout (bool): Print the process' stdout output to stdout.
+
+ """
+ self.callbacks = callbacks
+ self.script = script
+ self.log_output = ''
+ self.stderr_out = ''
+ self.logfile = logfile
+ self.print_stdout = print_stdout
+ self.print_stderr = print_stderr
+ self.quit_d = None
+ self.killed = False
+ self.watchdogs = []
+
+ def connectionMade(self): # NOQA: N802
+ self.transport.write(self.script)
+ self.transport.closeStdin()
+
+ def outConnectionLost(self): # NOQA: N802
+ if not self.logfile:
+ return
+ with open(self.logfile, 'w') as f:
+ f.write(self.log_output)
+
+ def kill(self):
+ """Kill the running process.
+
+ Returns:
+ Deferred: A deferred that is triggered when the process has quit.
+
+ """
+ if self.killed:
+ return
+ self.killed = True
+ self._kill_watchdogs()
+ self.quit_d = Deferred()
+ self.transport.signalProcess('INT')
+ return self.quit_d
+
+ def _kill_watchdogs(self):
+ """"Cancel all watchdogs"""
+ for w in self.watchdogs:
+ if not w.called and not w.cancelled:
+ w.cancel()
+
+ def processEnded(self, status): # NOQA: N802
+ self.transport.loseConnection()
+ if self.quit_d is None:
+ return
+ if status.value.exitCode == 0:
+ self.quit_d.callback(True)
+ else:
+ self.quit_d.errback(status)
+
+ def check_callbacks(self, data, cb_type='stdout'):
+ ret = False
+ for c in self.callbacks:
+ if cb_type not in c['types'] or c['deferred'].called:
+ continue
+ for trigger in c['triggers']:
+ if trigger['expr'] in data:
+ ret = True
+ if 'cb' in trigger:
+ trigger['cb'](self, c['deferred'], data, self.log_output)
+ elif 'value' not in trigger:
+ raise Exception('Trigger must specify either "cb" or "value"')
+ else:
+ val = trigger['value'](self, data, self.log_output)
+ if trigger.get('type', 'callback') == 'errback':
+ c['deferred'].errback(val)
+ else:
+ c['deferred'].callback(val)
+ return ret
+
+ def outReceived(self, data): # NOQA: N802
+ """Process output from stdout"""
+ data = data.decode('utf8')
+ self.log_output += data
+ if self.check_callbacks(data):
+ pass
+ elif '[ERROR' in data:
+ if not self.print_stdout:
+ return
+ print(data, end=' ')
+
+ def errReceived(self, data): # NOQA: N802
+ """Process output from stderr"""
+ data = data.decode('utf8')
+ self.log_output += data
+ self.stderr_out += data
+ self.check_callbacks(data, cb_type='stderr')
+ if not self.print_stderr:
+ return
+ data = '\n%s' % data.strip()
+ prefixed = data.replace('\n', '\nSTDERR: ')
+ print('\n%s' % prefixed)
+
+
+def start_core(
+ listen_port=58846,
+ logfile=None,
+ timeout=10,
+ timeout_msg=None,
+ custom_script='',
+ print_stdout=True,
+ print_stderr=True,
+ extra_callbacks=None,
+):
+ """Start the deluge core as a daemon.
+
+ Args:
+ listen_port (int, optional): The port the daemon listens for client connections.
+ logfile (str, optional): Logfile name to write the output from the process.
+ timeout (int): If none of the callbacks have been triggered before the imeout, the process is killed.
+ timeout_msg (str): The message to print when the timeout expires.
+ custom_script (str): Extra python code to insert into the daemon process script.
+ print_stderr (bool): If the output from the process' stderr should be printed to stdout.
+ print_stdout (bool): If the output from the process' stdout should be printed to stdout.
+ extra_callbacks (list): A list of dictionaries specifying extra callbacks.
+
+ Returns:
+ tuple(Deferred, ProcessOutputHandler):
+
+ The Deferred is fired when the core callback is triggered either after the default
+ output triggers are matched (daemon successfully started, or failed to start),
+ or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process.
+
+ """
+ config_directory = set_tmp_config_dir()
+ daemon_script = """
+import sys
+import deluge.core.daemon_entry
+
+from deluge.common import windows_check
+
+if windows_check():
+ sys.argv.extend(['-c', '%(dir)s', '-L', 'info', '-p', '%(port)d'])
+else:
+ sys.argv.extend(['-d', '-c', '%(dir)s', '-L', 'info', '-p', '%(port)d'])
+
+try:
+ daemon = deluge.core.daemon_entry.start_daemon(skip_start=True)
+ %(script)s
+ daemon.start()
+except Exception:
+ import traceback
+ sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc())
+""" % {
+ 'dir': config_directory,
+ 'port': listen_port,
+ 'script': custom_script,
+ }
+
+ callbacks = []
+ default_core_cb = {'deferred': Deferred(), 'types': 'stdout'}
+ if timeout:
+ default_core_cb['timeout'] = timeout
+
+ # Specify the triggers for daemon log output
+ default_core_cb['triggers'] = [
+ {'expr': 'Finished loading ', 'value': lambda reader, data, data_all: reader},
+ {
+ 'expr': 'Could not listen on localhost:%d' % (listen_port),
+ 'type': 'errback', # Error from libtorrent
+ 'value': lambda reader, data, data_all: CannotListenError(
+ 'localhost',
+ listen_port,
+ 'Could not start deluge test client!\n%s' % data,
+ ),
+ },
+ {
+ 'expr': 'Traceback',
+ 'type': 'errback',
+ 'value': lambda reader, data, data_all: DelugeError(
+ 'Traceback found when starting daemon:\n%s' % data
+ ),
+ },
+ ]
+
+ callbacks.append(default_core_cb)
+ if extra_callbacks:
+ callbacks.extend(extra_callbacks)
+
+ process_protocol = start_process(
+ daemon_script, callbacks, logfile, print_stdout, print_stderr
+ )
+ return default_core_cb['deferred'], process_protocol
+
+
+def start_process(
+ script, callbacks, logfile=None, print_stdout=True, print_stderr=True
+):
+ """
+ Starts an external python process which executes the given script.
+
+ Args:
+ script (str): The content of the script to execute.
+ callbacks (list): list of dictionaries specifying callbacks.
+ logfile (str, optional): Logfile name to write the output from the process.
+ print_stderr (bool): If the output from the process' stderr should be printed to stdout.
+ print_stdout (bool): If the output from the process' stdout should be printed to stdout.
+
+ Returns:
+ ProcessOutputHandler: The handler for the process's output.
+
+ Each entry in the callbacks list is a dictionary with the following keys:
+ * "deferred": The deferred to be called when matched.
+ * "types": The output this callback should be matched against.
+ Possible values: ["stdout", "stderr"]
+ * "timeout" (optional): A timeout in seconds for the deferred.
+ * "triggers": A list of dictionaries, each specifying specifying a trigger:
+ * "expr": A string to match against the log output.
+ * "value": A function to produce the result to be passed to the callback.
+ * "type" (optional): A string that specifies wether to trigger a regular callback or errback.
+
+ """
+ cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ process_protocol = ProcessOutputHandler(
+ script.encode('utf8'), callbacks, logfile, print_stdout, print_stderr
+ )
+
+ # Add timeouts to deferreds
+ for c in callbacks:
+ if 'timeout' in c:
+ w = add_watchdog(
+ c['deferred'], timeout=c['timeout'], message=c.get('timeout_msg', None)
+ )
+ process_protocol.watchdogs.append(w)
+
+ reactor.spawnProcess(
+ process_protocol, sys.executable, args=[sys.executable], path=cwd
+ )
+ return process_protocol
diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py
new file mode 100644
index 0000000..706eb8d
--- /dev/null
+++ b/deluge/tests/common_web.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import deluge.common
+import deluge.component as component
+import deluge.ui.web.auth
+import deluge.ui.web.server
+from deluge import configmanager
+from deluge.ui.web.server import DelugeWeb
+
+from .basetest import BaseTestCase
+from .common import ReactorOverride
+from .daemon_base import DaemonBase
+
+
+class WebServerTestBase(BaseTestCase, DaemonBase):
+ """
+ Base class for tests that need a running webapi
+
+ """
+
+ def set_up(self):
+ self.host_id = None
+ deluge.ui.web.server.reactor = ReactorOverride()
+ d = self.common_set_up()
+ d.addCallback(self.start_core)
+ d.addCallback(self.start_webapi)
+ return d
+
+ def start_webapi(self, arg):
+ self.webserver_listen_port = 8999
+
+ config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy()
+ config_defaults['port'] = self.webserver_listen_port
+ self.config = configmanager.ConfigManager('web.conf', config_defaults)
+
+ self.deluge_web = DelugeWeb(daemon=False)
+
+ host = list(self.deluge_web.web_api.hostlist.config['hosts'][0])
+ host[2] = self.listen_port
+ self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host)
+ self.host_id = host[0]
+ self.deluge_web.start()
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.terminate_core)
+ return d
+
+
+class WebServerMockBase(object):
+ """
+ Class with utility functions for mocking with tests using the webserver
+
+ """
+
+ def mock_authentication_ignore(self, auth):
+ def check_request(request, method=None, level=None):
+ pass
+
+ self.patch(auth, 'check_request', check_request)
diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py
new file mode 100644
index 0000000..eda2193
--- /dev/null
+++ b/deluge/tests/daemon_base.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import os.path
+
+import pytest
+from twisted.internet import defer
+from twisted.internet.error import CannotListenError
+
+import deluge.component as component
+from deluge.common import windows_check
+
+from . import common
+
+
+class DaemonBase(object):
+
+ if windows_check():
+ skip = 'windows cant start_core not enough arguments for format string'
+
+ def common_set_up(self):
+ common.set_tmp_config_dir()
+ self.listen_port = 58900
+ self.core = None
+ return component.start()
+
+ def terminate_core(self, *args):
+ if args[0] is not None:
+ if hasattr(args[0], 'getTraceback'):
+ print('terminate_core: Errback Exception: %s' % args[0].getTraceback())
+
+ if not self.core.killed:
+ d = self.core.kill()
+ return d
+
+ @defer.inlineCallbacks
+ def start_core(
+ self,
+ arg,
+ custom_script='',
+ logfile='',
+ print_stdout=True,
+ print_stderr=True,
+ timeout=5,
+ port_range=10,
+ extra_callbacks=None,
+ ):
+ if logfile == '':
+ logfile = 'daemon_%s.log' % self.id()
+
+ # We are running py.test
+ if hasattr(pytest, 'config'):
+ # Put log file in the py.test --basetemp argument
+ basetemp = pytest.config.option.basetemp
+ if basetemp:
+ if not os.path.exists(basetemp):
+ os.makedirs(basetemp)
+ logfile = os.path.join(basetemp, logfile)
+
+ for dummy in range(port_range):
+ try:
+ d, self.core = common.start_core(
+ listen_port=self.listen_port,
+ logfile=logfile,
+ timeout=timeout,
+ timeout_msg='Timeout!',
+ custom_script=custom_script,
+ print_stdout=print_stdout,
+ print_stderr=print_stderr,
+ extra_callbacks=extra_callbacks,
+ )
+ yield d
+ except CannotListenError as ex:
+ exception_error = ex
+ self.listen_port += 1
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ else:
+ return
+ raise exception_error
diff --git a/deluge/tests/data/deluge.png b/deluge/tests/data/deluge.png
new file mode 100644
index 0000000..e39cd0c
--- /dev/null
+++ b/deluge/tests/data/deluge.png
Binary files differ
diff --git a/deluge/tests/data/dir_with_6_files.torrent b/deluge/tests/data/dir_with_6_files.torrent
new file mode 100644
index 0000000..2c6b5fb
--- /dev/null
+++ b/deluge/tests/data/dir_with_6_files.torrent
Binary files differ
diff --git a/deluge/tests/data/filehash_field.torrent b/deluge/tests/data/filehash_field.torrent
new file mode 100644
index 0000000..99e41f0
--- /dev/null
+++ b/deluge/tests/data/filehash_field.torrent
@@ -0,0 +1,2 @@
+d13:creation datei1476342472e4:infod5:filesld4:ed2k16:2M2&XE 18:filehash20:f4^S96P՝R6:lengthi54e4:pathl8:tull.txteed4:ed2k16:_G L@8:filehash20:vXd/n136:lengthi54e4:pathl56:還在一個人無聊嗎~還不趕緊上來聊天美.txteee4:name16:torrent_filehash12:piece lengthi32768e6:pieces20:G@g\&\ fB
+ee \ No newline at end of file
diff --git a/deluge/tests/data/google.ico b/deluge/tests/data/google.ico
new file mode 100644
index 0000000..82339b3
--- /dev/null
+++ b/deluge/tests/data/google.ico
Binary files differ
diff --git a/deluge/tests/data/seo.ico b/deluge/tests/data/seo.ico
new file mode 100644
index 0000000..841e528
--- /dev/null
+++ b/deluge/tests/data/seo.ico
Binary files differ
diff --git a/deluge/tests/data/test.torrent b/deluge/tests/data/test.torrent
new file mode 100644
index 0000000..847ec58
--- /dev/null
+++ b/deluge/tests/data/test.torrent
@@ -0,0 +1,2 @@
+d8:announce40:http://tracker.aelitis.com:6969/announce13:announce-listll40:http://tracker.aelitis.com:6969/announceel41:http://tracker.aelitis.com:16969/announceee18:azureus_propertiesd17:dht_backup_enablei0ee7:comment34:provided by http://getazureus.com/13:comment.utf-834:provided by http://getazureus.com/10:created by19:Azureus/2.5.0.3_CVS13:creation datei1169429806e4:infod4:ed2k16:>pl]K^\;;e6:lengthi307949e4:name22:azcvsupdater_2.6.2.jar10:name.utf-822:azcvsupdater_2.6.2.jar12:piece lengthi32768e6:pieces200:B'bsu97u<w\fԛ}:qv"7 y! lv{/"PD8-MFx+S`%%
+;ϐ /F>gA2|ڂb#wIfWswKrGWiد#u E?:ub(oj ^AJ?֌&7:privatei0e4:sha120:2ζ"Կ^Khŷee \ No newline at end of file
diff --git a/deluge/tests/data/test_torrent.file.torrent b/deluge/tests/data/test_torrent.file.torrent
new file mode 100644
index 0000000..fca14ca
--- /dev/null
+++ b/deluge/tests/data/test_torrent.file.torrent
@@ -0,0 +1,2 @@
+d7:comment17:Test torrent file10:created by11:Deluge Team13:creation datei1411826665e8:encoding5:UTF-84:infod6:lengthi512000e4:name17:test_torrent.file12:piece lengthi32768e6:pieces320:$2Tj >hU.--~–ޔBzuEB1@ͥ/K"zF0֣[asV1B^Wp-S׶F`M9)},4niW jQI̗(,t؋chi*M}^WS7 h:-beTXa3m|J"]0$}l@L V,4˫zMLģJ*
+\AP&I9}20֎H:_8<V2JYb2'2h0\*_j7:privatei0eee \ No newline at end of file
diff --git a/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent
new file mode 100644
index 0000000..b55c9ae
--- /dev/null
+++ b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent
Binary files differ
diff --git a/deluge/tests/data/unicode_file.torrent b/deluge/tests/data/unicode_file.torrent
new file mode 100644
index 0000000..41db239
--- /dev/null
+++ b/deluge/tests/data/unicode_file.torrent
@@ -0,0 +1 @@
+d13:creation datei1540200743e8:encoding5:UTF-84:infod6:lengthi0e4:name35:সুকুমার রায়.mkv12:piece lengthi32768e6:pieces0:7:privatei0eee
diff --git a/deluge/tests/data/unicode_filenames.torrent b/deluge/tests/data/unicode_filenames.torrent
new file mode 100644
index 0000000..e34f055
--- /dev/null
+++ b/deluge/tests/data/unicode_filenames.torrent
Binary files differ
diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py
new file mode 100644
index 0000000..f197882
--- /dev/null
+++ b/deluge/tests/test_alertmanager.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import deluge.component as component
+from deluge.core.core import Core
+
+from .basetest import BaseTestCase
+
+
+class AlertManagerTestCase(BaseTestCase):
+ def set_up(self):
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.am = component.get('AlertManager')
+ return component.start(['AlertManager'])
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_register_handler(self):
+ def handler(alert):
+ return
+
+ self.am.register_handler('dummy_alert', handler)
+ self.assertEqual(self.am.handlers['dummy_alert'], [handler])
+
+ def test_deregister_handler(self):
+ def handler(alert):
+ return
+
+ self.am.register_handler('dummy_alert', handler)
+ self.am.deregister_handler(handler)
+ self.assertEqual(self.am.handlers['dummy_alert'], [])
diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py
new file mode 100644
index 0000000..91e122f
--- /dev/null
+++ b/deluge/tests/test_authmanager.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import deluge.component as component
+from deluge.common import get_localhost_auth
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager
+
+from .basetest import BaseTestCase
+
+
+class AuthManagerTestCase(BaseTestCase):
+ def set_up(self):
+ self.auth = AuthManager()
+ self.auth.start()
+
+ def tear_down(self):
+ # We must ensure that the components in component registry are removed
+ return component.shutdown()
+
+ def test_authorize(self):
+ self.assertEqual(self.auth.authorize(*get_localhost_auth()), AUTH_LEVEL_ADMIN)
diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py
new file mode 100644
index 0000000..b49c21f
--- /dev/null
+++ b/deluge/tests/test_bencode.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+from __future__ import unicode_literals
+
+from twisted.trial import unittest
+
+from deluge import bencode
+
+from . import common
+
+
+class BencodeTestCase(unittest.TestCase):
+ def test_bencode_unicode_metainfo(self):
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ metainfo = bencode.bdecode(_file.read())[b'info']
+ bencode.bencode({b'info': metainfo})
+
+ def test_bencode_unicode_value(self):
+ self.assertEqual(bencode.bencode(b'abc'), b'3:abc')
+ self.assertEqual(bencode.bencode('abc'), b'3:abc')
+
+ def test_bdecode(self):
+ self.assertEqual(bencode.bdecode(b'3:dEf'), b'dEf')
+ with self.assertRaises(bencode.BTFailure):
+ bencode.bdecode('dEf')
+ with self.assertRaises(bencode.BTFailure):
+ bencode.bdecode(b'dEf')
+ with self.assertRaises(bencode.BTFailure):
+ bencode.bdecode({'dEf': 123})
diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py
new file mode 100644
index 0000000..c89ad53
--- /dev/null
+++ b/deluge/tests/test_client.py
@@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.internet import defer
+
+import deluge.component as component
+from deluge import error
+from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, windows_check
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.ui.client import Client, DaemonSSLProxy, client
+
+from .basetest import BaseTestCase
+from .daemon_base import DaemonBase
+
+
+class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
+ def authenticate(self, username, password):
+ self.login_deferred = defer.Deferred()
+ d = self.call('daemon.login', username, password)
+ d.addCallback(self.__on_login, username)
+ d.addErrback(self.__on_login_fail)
+ return self.login_deferred
+
+ def __on_login(self, result, username):
+ self.login_deferred.callback(result)
+
+ def __on_login_fail(self, result):
+ self.login_deferred.errback(result)
+
+
+class NoVersionSendingClient(Client):
+ def connect(
+ self,
+ host='127.0.0.1',
+ port=58846,
+ username='',
+ password='',
+ skip_authentication=False,
+ ):
+ self._daemon_proxy = NoVersionSendingDaemonSSLProxy()
+ self._daemon_proxy.set_disconnect_callback(self.__on_disconnect)
+
+ d = self._daemon_proxy.connect(host, port)
+
+ def on_connect_fail(reason):
+ self.disconnect()
+ return reason
+
+ def on_authenticate(result, daemon_info):
+ return result
+
+ def on_authenticate_fail(reason):
+ return reason
+
+ def on_connected(daemon_version):
+ return daemon_version
+
+ def authenticate(daemon_version, username, password):
+ d = self._daemon_proxy.authenticate(username, password)
+ d.addCallback(on_authenticate, daemon_version)
+ d.addErrback(on_authenticate_fail)
+ return d
+
+ d.addCallback(on_connected)
+ d.addErrback(on_connect_fail)
+ if not skip_authentication:
+ d.addCallback(authenticate, username, password)
+ return d
+
+ def __on_disconnect(self):
+ if self.disconnect_callback:
+ self.disconnect_callback()
+
+
+class ClientTestCase(BaseTestCase, DaemonBase):
+
+ if windows_check():
+ skip = 'windows cant start_core not enough arguments for format string'
+
+ def set_up(self):
+ d = self.common_set_up()
+ d.addCallback(self.start_core)
+ d.addErrback(self.terminate_core)
+ return d
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.terminate_core)
+ return d
+
+ def test_connect_no_credentials(self):
+ d = client.connect('localhost', self.listen_port, username='', password='')
+
+ def on_connect(result):
+ self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
+ self.addCleanup(client.disconnect)
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_localclient(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_connect(result):
+ self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
+ self.addCleanup(client.disconnect)
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_bad_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password + '1'
+ )
+
+ def on_failure(failure):
+ self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
+ self.assertEqual(failure.value.message, 'Password does not match')
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_invalid_user(self):
+ username, password = get_localhost_auth()
+ d = client.connect('localhost', self.listen_port, username='invalid-user')
+
+ def on_failure(failure):
+ self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
+ self.assertEqual(failure.value.message, 'Username does not exist')
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_without_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect('localhost', self.listen_port, username=username)
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.AuthenticationRequired), error.AuthenticationRequired
+ )
+ self.assertEqual(failure.value.username, username)
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ @defer.inlineCallbacks
+ def test_connect_with_password(self):
+ username, password = get_localhost_auth()
+ yield client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+ yield client.core.create_account('testuser', 'testpw', 'DEFAULT')
+ yield client.disconnect()
+ ret = yield client.connect(
+ 'localhost', self.listen_port, username='testuser', password='testpw'
+ )
+ self.assertEqual(ret, AUTH_LEVEL_NORMAL)
+ yield
+
+ @defer.inlineCallbacks
+ def test_invalid_rpc_method_call(self):
+ yield client.connect('localhost', self.listen_port, username='', password='')
+ d = client.core.invalid_method()
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.WrappedException), error.WrappedException
+ )
+ self.addCleanup(client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ yield d
+
+ def test_connect_without_sending_client_version_fails(self):
+ username, password = get_localhost_auth()
+ no_version_sending_client = NoVersionSendingClient()
+ d = no_version_sending_client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_failure(failure):
+ self.assertEqual(
+ failure.trap(error.IncompatibleClient), error.IncompatibleClient
+ )
+ self.addCleanup(no_version_sending_client.disconnect)
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py
new file mode 100644
index 0000000..3cecb64
--- /dev/null
+++ b/deluge/tests/test_common.py
@@ -0,0 +1,202 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import os
+import tarfile
+
+from twisted.trial import unittest
+
+from deluge.common import (
+ VersionSplit,
+ archive_files,
+ fdate,
+ fpcnt,
+ fpeer,
+ fsize,
+ fspeed,
+ ftime,
+ get_path_size,
+ is_infohash,
+ is_ip,
+ is_ipv4,
+ is_ipv6,
+ is_magnet,
+ is_url,
+ windows_check,
+)
+from deluge.i18n import setup_translation
+
+from .common import get_test_data_file, set_tmp_config_dir
+
+
+class CommonTestCase(unittest.TestCase):
+ def setUp(self): # NOQA
+ self.config_dir = set_tmp_config_dir()
+ setup_translation()
+
+ def tearDown(self): # NOQA
+ pass
+
+ def test_fsize(self):
+ self.assertEqual(fsize(0), '0 B')
+ self.assertEqual(fsize(100), '100 B')
+ self.assertEqual(fsize(1023), '1023 B')
+ self.assertEqual(fsize(1024), '1.0 KiB')
+ self.assertEqual(fsize(1048575), '1024.0 KiB')
+ self.assertEqual(fsize(1048576), '1.0 MiB')
+ self.assertEqual(fsize(1073741823), '1024.0 MiB')
+ self.assertEqual(fsize(1073741824), '1.0 GiB')
+ self.assertEqual(fsize(112245), '109.6 KiB')
+ self.assertEqual(fsize(110723441824), '103.1 GiB')
+ self.assertEqual(fsize(1099511627775), '1024.0 GiB')
+ self.assertEqual(fsize(1099511627777), '1.0 TiB')
+ self.assertEqual(fsize(766148267453245), '696.8 TiB')
+
+ def test_fpcnt(self):
+ self.assertTrue(fpcnt(0.9311) == '93.11%')
+
+ def test_fspeed(self):
+ self.assertTrue(fspeed(43134) == '42.1 KiB/s')
+
+ def test_fpeer(self):
+ self.assertTrue(fpeer(10, 20) == '10 (20)')
+ self.assertTrue(fpeer(10, -1) == '10')
+
+ def test_ftime(self):
+ self.assertEqual(ftime(0), '')
+ self.assertEqual(ftime(5), '5s')
+ self.assertEqual(ftime(100), '1m 40s')
+ self.assertEqual(ftime(3789), '1h 3m')
+ self.assertEqual(ftime(23011), '6h 23m')
+ self.assertEqual(ftime(391187), '4d 12h')
+ self.assertEqual(ftime(604800), '1w 0d')
+ self.assertEqual(ftime(13893086), '22w 6d')
+ self.assertEqual(ftime(59740269), '1y 46w')
+ self.assertEqual(ftime(61.25), '1m 1s')
+ self.assertEqual(ftime(119.9), '1m 59s')
+
+ def test_fdate(self):
+ self.assertTrue(fdate(-1) == '')
+
+ def test_is_url(self):
+ self.assertTrue(is_url('http://deluge-torrent.org'))
+ self.assertFalse(is_url('file://test.torrent'))
+
+ def test_is_magnet(self):
+ self.assertTrue(
+ is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN')
+ )
+ self.assertFalse(is_magnet(None))
+
+ def test_is_infohash(self):
+ self.assertTrue(is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973'))
+
+ def test_get_path_size(self):
+ if windows_check():
+ raise unittest.SkipTest('os devnull is different on windows')
+ self.assertTrue(get_path_size(os.devnull) == 0)
+ self.assertTrue(get_path_size('non-existant.file') == -1)
+
+ def test_is_ip(self):
+ self.assertTrue(is_ip('192.0.2.0'))
+ self.assertFalse(is_ip('192..0.0'))
+ self.assertTrue(is_ip('2001:db8::'))
+ self.assertFalse(is_ip('2001:db8:'))
+
+ def test_is_ipv4(self):
+ self.assertTrue(is_ipv4('192.0.2.0'))
+ self.assertFalse(is_ipv4('192..0.0'))
+
+ def test_is_ipv6(self):
+ self.assertTrue(is_ipv6('2001:db8::'))
+ self.assertFalse(is_ipv6('2001:db8:'))
+
+ def test_version_split(self):
+ self.assertTrue(VersionSplit('1.2.2') == VersionSplit('1.2.2'))
+ self.assertTrue(VersionSplit('1.2.1') < VersionSplit('1.2.2'))
+ self.assertTrue(VersionSplit('1.1.9') < VersionSplit('1.2.2'))
+ self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.1'))
+ self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0'))
+ self.assertTrue(VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2'))
+ self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2'))
+ self.assertTrue(VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2'))
+ self.assertTrue(VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2'))
+ self.assertTrue(VersionSplit('0.14.9') == VersionSplit('0.14.9'))
+ self.assertTrue(VersionSplit('0.14.9') > VersionSplit('0.14.5'))
+ self.assertTrue(VersionSplit('0.14.10') >= VersionSplit('0.14.9'))
+ self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123'))
+ self.assertTrue(VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2'))
+ self.assertTrue(VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123'))
+ self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123'))
+ self.assertTrue(VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0'))
+ self.assertTrue(VersionSplit('1.4.0a1') < VersionSplit('1.4.0'))
+
+ def test_parse_human_size(self):
+ from deluge.common import parse_human_size
+
+ sizes = [
+ ('1', 1),
+ ('10 bytes', 10),
+ ('2048 bytes', 2048),
+ ('1MiB', 2 ** (10 * 2)),
+ ('1 MiB', 2 ** (10 * 2)),
+ ('1 GiB', 2 ** (10 * 3)),
+ ('1 GiB', 2 ** (10 * 3)),
+ ('1M', 10 ** 6),
+ ('1MB', 10 ** 6),
+ ('1 GB', 10 ** 9),
+ ('1 TB', 10 ** 12),
+ ]
+
+ for human_size, byte_size in sizes:
+ parsed = parse_human_size(human_size)
+ self.assertEqual(
+ parsed, byte_size, 'Mismatch when converting: %s' % human_size
+ )
+
+ def test_archive_files(self):
+ arc_filelist = [
+ get_test_data_file('test.torrent'),
+ get_test_data_file('deluge.png'),
+ ]
+ arc_filepath = archive_files('test-arc', arc_filelist)
+
+ with tarfile.open(arc_filepath, 'r') as tar:
+ for tar_info in tar:
+ self.assertTrue(tar_info.isfile())
+ self.assertTrue(
+ tar_info.name in [os.path.basename(arcf) for arcf in arc_filelist]
+ )
+
+ def test_archive_files_missing(self):
+ """Archive exists even with file not found."""
+ filelist = ['test.torrent', 'deluge.png', 'missing.file']
+ arc_filepath = archive_files(
+ 'test-arc', [get_test_data_file(f) for f in filelist]
+ )
+ filelist.remove('missing.file')
+
+ with tarfile.open(arc_filepath, 'r') as tar:
+ self.assertEqual(tar.getnames(), filelist)
+ self.assertTrue(all(tarinfo.isfile() for tarinfo in tar))
+
+ def test_archive_files_message(self):
+ filelist = ['test.torrent', 'deluge.png']
+ arc_filepath = archive_files(
+ 'test-arc', [get_test_data_file(f) for f in filelist], message='test'
+ )
+
+ result_files = filelist + ['archive_message.txt']
+ with tarfile.open(arc_filepath, 'r') as tar:
+ self.assertEqual(tar.getnames(), result_files)
+ for tar_info in tar:
+ self.assertTrue(tar_info.isfile())
+ if tar_info.name == 'archive_message.txt':
+ result = tar.extractfile(tar_info).read().decode()
+ self.assertEqual(result, 'test')
diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py
new file mode 100644
index 0000000..26f24ad
--- /dev/null
+++ b/deluge/tests/test_component.py
@@ -0,0 +1,267 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.internet import defer, threads
+from twisted.trial.unittest import SkipTest
+
+import deluge.component as component
+
+from .basetest import BaseTestCase
+
+
+class ComponentTester(component.Component):
+ def __init__(self, name, depend=None):
+ component.Component.__init__(self, name, depend=depend)
+ self.start_count = 0
+ self.stop_count = 0
+
+ def start(self):
+ self.start_count += 1
+
+ def stop(self):
+ self.stop_count += 1
+
+
+class ComponentTesterDelayStart(ComponentTester):
+ def start(self):
+ def do_sleep():
+ import time
+
+ time.sleep(1)
+
+ d = threads.deferToThread(do_sleep)
+
+ def on_done(result):
+ self.start_count += 1
+
+ return d.addCallback(on_done)
+
+
+class ComponentTesterUpdate(component.Component):
+ def __init__(self, name):
+ component.Component.__init__(self, name)
+ self.counter = 0
+ self.start_count = 0
+ self.stop_count = 0
+
+ def update(self):
+ self.counter += 1
+
+ def stop(self):
+ self.stop_count += 1
+
+
+class ComponentTesterShutdown(component.Component):
+ def __init__(self, name):
+ component.Component.__init__(self, name)
+ self.shutdowned = False
+ self.stop_count = 0
+
+ def shutdown(self):
+ self.shutdowned = True
+
+ def stop(self):
+ self.stop_count += 1
+
+
+class ComponentTestClass(BaseTestCase):
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_start_component(self):
+ def on_start(result, c):
+ self.assertEqual(c._component_state, 'Started')
+ self.assertEqual(c.start_count, 1)
+
+ c = ComponentTester('test_start_c1')
+ d = component.start(['test_start_c1'])
+ d.addCallback(on_start, c)
+ return d
+
+ def test_start_stop_depends(self):
+ def on_stop(result, c1, c2):
+ self.assertEqual(c1._component_state, 'Stopped')
+ self.assertEqual(c2._component_state, 'Stopped')
+ self.assertEqual(c1.stop_count, 1)
+ self.assertEqual(c2.stop_count, 1)
+
+ def on_start(result, c1, c2):
+ self.assertEqual(c1._component_state, 'Started')
+ self.assertEqual(c2._component_state, 'Started')
+ self.assertEqual(c1.start_count, 1)
+ self.assertEqual(c2.start_count, 1)
+ return component.stop(['test_start_depends_c1']).addCallback(
+ on_stop, c1, c2
+ )
+
+ c1 = ComponentTester('test_start_depends_c1')
+ c2 = ComponentTester('test_start_depends_c2', depend=['test_start_depends_c1'])
+
+ d = component.start(['test_start_depends_c2'])
+ d.addCallback(on_start, c1, c2)
+ return d
+
+ def start_with_depends(self):
+ c1 = ComponentTesterDelayStart('test_start_all_c1')
+ c2 = ComponentTester('test_start_all_c2', depend=['test_start_all_c4'])
+ c3 = ComponentTesterDelayStart(
+ 'test_start_all_c3', depend=['test_start_all_c5', 'test_start_all_c1']
+ )
+ c4 = ComponentTester('test_start_all_c4', depend=['test_start_all_c3'])
+ c5 = ComponentTester('test_start_all_c5')
+
+ d = component.start()
+ return (d, c1, c2, c3, c4, c5)
+
+ def finish_start_with_depends(self, *args):
+ for c in args[1:]:
+ component.deregister(c)
+
+ def test_start_all(self):
+ def on_start(*args):
+ for c in args[1:]:
+ self.assertEqual(c._component_state, 'Started')
+ self.assertEqual(c.start_count, 1)
+
+ ret = self.start_with_depends()
+ ret[0].addCallback(on_start, *ret[1:])
+ ret[0].addCallback(self.finish_start_with_depends, *ret[1:])
+ return ret[0]
+
+ def test_register_exception(self):
+ ComponentTester('test_register_exception_c1')
+ self.assertRaises(
+ component.ComponentAlreadyRegistered,
+ ComponentTester,
+ 'test_register_exception_c1',
+ )
+
+ def test_stop_component(self):
+ def on_stop(result, c):
+ self.assertEqual(c._component_state, 'Stopped')
+ self.assertFalse(c._component_timer.running)
+ self.assertEqual(c.stop_count, 1)
+
+ def on_start(result, c):
+ self.assertEqual(c._component_state, 'Started')
+ return component.stop(['test_stop_component_c1']).addCallback(on_stop, c)
+
+ c = ComponentTesterUpdate('test_stop_component_c1')
+ d = component.start(['test_stop_component_c1'])
+ d.addCallback(on_start, c)
+ return d
+
+ def test_stop_all(self):
+ def on_stop(result, *args):
+ for c in args:
+ self.assertEqual(c._component_state, 'Stopped')
+ self.assertEqual(c.stop_count, 1)
+
+ def on_start(result, *args):
+ for c in args:
+ self.assertEqual(c._component_state, 'Started')
+ return component.stop().addCallback(on_stop, *args)
+
+ ret = self.start_with_depends()
+ ret[0].addCallback(on_start, *ret[1:])
+ ret[0].addCallback(self.finish_start_with_depends, *ret[1:])
+ return ret[0]
+
+ def test_update(self):
+ def on_start(result, c1, counter):
+ self.assertTrue(c1._component_timer)
+ self.assertTrue(c1._component_timer.running)
+ self.assertNotEqual(c1.counter, counter)
+ return component.stop()
+
+ c1 = ComponentTesterUpdate('test_update_c1')
+ cnt = int(c1.counter)
+ d = component.start(['test_update_c1'])
+
+ d.addCallback(on_start, c1, cnt)
+ return d
+
+ def test_pause(self):
+ def on_pause(result, c1, counter):
+ self.assertEqual(c1._component_state, 'Paused')
+ self.assertNotEqual(c1.counter, counter)
+ self.assertFalse(c1._component_timer.running)
+
+ def on_start(result, c1, counter):
+ self.assertTrue(c1._component_timer)
+ self.assertNotEqual(c1.counter, counter)
+ d = component.pause(['test_pause_c1'])
+ d.addCallback(on_pause, c1, counter)
+ return d
+
+ c1 = ComponentTesterUpdate('test_pause_c1')
+ cnt = int(c1.counter)
+ d = component.start(['test_pause_c1'])
+
+ d.addCallback(on_start, c1, cnt)
+ return d
+
+ @defer.inlineCallbacks
+ def test_component_start_error(self):
+ ComponentTesterUpdate('test_pause_c1')
+ yield component.start(['test_pause_c1'])
+ yield component.pause(['test_pause_c1'])
+ test_comp = component.get('test_pause_c1')
+ try:
+ result = self.failureResultOf(test_comp._component_start())
+ except AttributeError:
+ raise SkipTest(
+ 'This test requires trial failureResultOf() in Twisted version >= 13'
+ )
+ self.assertEqual(
+ result.check(component.ComponentException), component.ComponentException
+ )
+
+ @defer.inlineCallbacks
+ def test_start_paused_error(self):
+ ComponentTesterUpdate('test_pause_c1')
+ yield component.start(['test_pause_c1'])
+ yield component.pause(['test_pause_c1'])
+
+ # Deferreds that fail in component have to error handler which results in
+ # twisted doing a log.err call which causes the test to fail.
+ # Prevent failure by ignoring the exception
+ self._observer._ignoreErrors(component.ComponentException)
+
+ result = yield component.start()
+ self.assertEqual(
+ [(result[0][0], result[0][1].value)],
+ [
+ (
+ defer.FAILURE,
+ component.ComponentException(
+ 'Trying to start component "%s" but it is '
+ 'not in a stopped state. Current state: %s'
+ % ('test_pause_c1', 'Paused'),
+ '',
+ ),
+ )
+ ],
+ )
+
+ def test_shutdown(self):
+ def on_shutdown(result, c1):
+ self.assertTrue(c1.shutdowned)
+ self.assertEqual(c1._component_state, 'Stopped')
+ self.assertEqual(c1.stop_count, 1)
+
+ def on_start(result, c1):
+ d = component.shutdown()
+ d.addCallback(on_shutdown, c1)
+ return d
+
+ c1 = ComponentTesterShutdown('test_shutdown_c1')
+ d = component.start(['test_shutdown_c1'])
+ d.addCallback(on_start, c1)
+ return d
diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py
new file mode 100644
index 0000000..270cc5a
--- /dev/null
+++ b/deluge/tests/test_config.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import os
+from codecs import getwriter
+
+from twisted.internet import task
+from twisted.trial import unittest
+
+import deluge.config
+from deluge.common import JSON_FORMAT
+from deluge.config import Config
+
+from .common import set_tmp_config_dir
+
+DEFAULTS = {
+ 'string': 'foobar',
+ 'int': 1,
+ 'float': 0.435,
+ 'bool': True,
+ 'unicode': 'foobar',
+}
+
+
+class ConfigTestCase(unittest.TestCase):
+ def setUp(self): # NOQA: N803
+ self.config_dir = set_tmp_config_dir()
+
+ def test_init(self):
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ self.assertEqual(DEFAULTS, config.config)
+
+ config = Config('test.conf', config_dir=self.config_dir)
+ self.assertEqual({}, config.config)
+
+ def test_set_get_item(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config['foo'] = 1
+ self.assertEqual(config['foo'], 1)
+ self.assertRaises(ValueError, config.set_item, 'foo', 'bar')
+
+ config['foo'] = 2
+ self.assertEqual(config.get_item('foo'), 2)
+
+ config['foo'] = '3'
+ self.assertEqual(config.get_item('foo'), 3)
+
+ config['unicode'] = 'ВИДЕОФИЛЬМЫ'
+ self.assertEqual(config['unicode'], 'ВИДЕОФИЛЬМЫ')
+
+ config['unicode'] = b'foostring'
+ self.assertFalse(isinstance(config.get_item('unicode'), bytes))
+
+ config._save_timer.cancel()
+
+ def test_set_get_item_none(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+
+ config['foo'] = None
+ self.assertIsNone(config['foo'])
+ self.assertIsInstance(config['foo'], type(None))
+
+ config['foo'] = 1
+ self.assertEqual(config.get('foo'), 1)
+
+ config['foo'] = None
+ self.assertIsNone(config['foo'])
+
+ config['bar'] = None
+ self.assertIsNone(config['bar'])
+
+ config['bar'] = None
+ self.assertIsNone(config['bar'])
+
+ config._save_timer.cancel()
+
+ def test_get(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config['foo'] = 1
+ self.assertEqual(config.get('foo'), 1)
+ self.assertEqual(config.get('foobar'), None)
+ self.assertEqual(config.get('foobar', 2), 2)
+ config['foobar'] = 5
+ self.assertEqual(config.get('foobar', 2), 5)
+
+ def test_load(self):
+ def check_config():
+ config = Config('test.conf', config_dir=self.config_dir)
+
+ self.assertEqual(config['string'], 'foobar')
+ self.assertEqual(config['float'], 0.435)
+
+ # Test opening a previous 1.2 config file of just a json object
+ import json
+
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ # Test opening a previous 1.2 config file of having the format versions
+ # as ints
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ _file.write(bytes(1) + b'\n')
+ _file.write(bytes(1) + b'\n')
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ # Test the 1.2 config format
+ version = {'format': 1, 'file': 1}
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(version, getwriter('utf8')(_file), **JSON_FORMAT)
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ def test_save(self):
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ # We do this twice because the first time we need to save the file to disk
+ # and the second time we do a compare and we should not write
+ ret = config.save()
+ self.assertTrue(ret)
+ ret = config.save()
+ self.assertTrue(ret)
+
+ config['string'] = 'baz'
+ config['int'] = 2
+ ret = config.save()
+ self.assertTrue(ret)
+ del config
+
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ self.assertEqual(config['string'], 'baz')
+ self.assertEqual(config['int'], 2)
+
+ def test_save_timer(self):
+ self.clock = task.Clock()
+ deluge.config.callLater = self.clock.callLater
+
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ config['string'] = 'baz'
+ config['int'] = 2
+ self.assertTrue(config._save_timer.active())
+
+ # Timeout set for 5 seconds in config, so lets move clock by 5 seconds
+ self.clock.advance(5)
+
+ def check_config(config):
+ self.assertTrue(not config._save_timer.active())
+ del config
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ self.assertEqual(config['string'], 'baz')
+ self.assertEqual(config['int'], 2)
+
+ check_config(config)
+
+ def test_find_json_objects(self):
+ s = """{
+ "file": 1,
+ "format": 1
+}{
+ "ssl": true,
+ "enabled": false,
+ "port": 8115
+}\n"""
+
+ from deluge.config import find_json_objects
+
+ objects = find_json_objects(s)
+ self.assertEqual(len(objects), 2)
+
+ def test_find_json_objects_curly_brace(self):
+ """Test with string containing curly brace"""
+ s = """{
+ "file": 1,
+ "format": 1
+}{
+ "ssl": true,
+ "enabled": false,
+ "port": 8115
+ "password": "abc{def"
+}\n"""
+
+ from deluge.config import find_json_objects
+
+ objects = find_json_objects(s)
+ self.assertEqual(len(objects), 2)
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
new file mode 100644
index 0000000..15fbc1b
--- /dev/null
+++ b/deluge/tests/test_core.py
@@ -0,0 +1,498 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from base64 import b64encode
+from hashlib import sha1 as sha
+
+import pytest
+from six import integer_types
+from twisted.internet import defer, reactor, task
+from twisted.internet.error import CannotListenError
+from twisted.python.failure import Failure
+from twisted.web.http import FORBIDDEN
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory, Site
+from twisted.web.static import File
+
+import deluge.common
+import deluge.component as component
+import deluge.core.torrent
+from deluge._libtorrent import lt
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.error import AddTorrentError, InvalidTorrentError
+
+from . import common
+from .basetest import BaseTestCase
+
+common.disable_new_release_check()
+
+
+class CookieResource(Resource):
+ def render(self, request):
+ if request.getCookie(b'password') != b'deluge':
+ request.setResponseCode(FORBIDDEN)
+ return
+
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ return data
+
+
+class PartialDownload(Resource):
+ def getChild(self, path, request): # NOQA: N802
+ return EncodingResourceWrapper(self, [GzipEncoderFactory()])
+
+ def render(self, request):
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ request.setHeader(b'Content-Length', str(len(data)))
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ return data
+
+
+class RedirectResource(Resource):
+ def render(self, request):
+ request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent')
+ return b''
+
+
+class TopLevelResource(Resource):
+ def __init__(self):
+ Resource.__init__(self)
+ self.putChild(b'cookie', CookieResource())
+ self.putChild(b'partial', PartialDownload())
+ self.putChild(b'redirect', RedirectResource())
+ self.putChild(
+ b'ubuntu-9.04-desktop-i386.iso.torrent',
+ File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')),
+ )
+
+
+class CoreTestCase(BaseTestCase):
+ def set_up(self):
+ common.set_tmp_config_dir()
+ self.rpcserver = RPCServer(listen=False)
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.clock = task.Clock()
+ self.core.torrentmanager.callLater = self.clock.callLater
+ self.listen_port = 51242
+ return component.start().addCallback(self.start_web_server)
+
+ def start_web_server(self, result):
+ website = Site(TopLevelResource())
+ for dummy in range(10):
+ try:
+ self.webserver = reactor.listenTCP(self.listen_port, website)
+ except CannotListenError as ex:
+ error = ex
+ self.listen_port += 1
+ else:
+ break
+ else:
+ raise error
+
+ return result
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+ return self.webserver.stopListening()
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def add_torrent(self, filename, paused=False):
+ if not paused:
+ # Patch libtorrent flags starting torrents paused
+ self.patch(
+ deluge.core.torrentmanager,
+ 'LT_DEFAULT_ADD_TORRENT_FLAGS',
+ lt.add_torrent_params_flags_t.flag_auto_managed
+ | lt.add_torrent_params_flags_t.flag_update_subscribe
+ | lt.add_torrent_params_flags_t.flag_apply_ip_filter,
+ )
+ options = {'add_paused': paused, 'auto_managed': False}
+ filepath = common.get_test_data_file(filename)
+ with open(filepath, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ return torrent_id
+
+ @defer.inlineCallbacks
+ def test_add_torrent_files(self):
+ options = {}
+ filenames = ['test.torrent', 'test_torrent.file.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ self.assertEqual(len(errors), 0)
+
+ @defer.inlineCallbacks
+ def test_add_torrent_files_error_duplicate(self):
+ options = {}
+ filenames = ['test.torrent', 'test.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ self.assertEqual(len(errors), 1)
+ self.assertTrue(str(errors[0]).startswith('Torrent already in session'))
+
+ @defer.inlineCallbacks
+ def test_add_torrent_file(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ # Get the info hash from the test.torrent
+ from deluge.bencode import bdecode, bencode
+
+ with open(filename, 'rb') as _file:
+ info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest()
+ self.assertEqual(torrent_id, info_hash)
+
+ def test_add_torrent_file_invalid_filedump(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ self.assertRaises(
+ AddTorrentError, self.core.add_torrent_file, filename, False, options
+ )
+
+ @defer.inlineCallbacks
+ def test_add_torrent_url(self):
+ url = (
+ 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent'
+ % self.listen_port
+ )
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ torrent_id = yield self.core.add_torrent_url(url, options)
+ self.assertEqual(torrent_id, info_hash)
+
+ def test_add_torrent_url_with_cookie(self):
+ url = 'http://localhost:%d/cookie' % self.listen_port
+ options = {}
+ headers = {'Cookie': 'password=deluge'}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ d = self.core.add_torrent_url(url, options)
+ d.addCallbacks(self.fail, self.assertIsInstance, errbackArgs=(Failure,))
+
+ d = self.core.add_torrent_url(url, options, headers)
+ d.addCallbacks(self.assertEqual, self.fail, callbackArgs=(info_hash,))
+
+ return d
+
+ def test_add_torrent_url_with_redirect(self):
+ url = 'http://localhost:%d/redirect' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ d = self.core.add_torrent_url(url, options)
+ d.addCallback(self.assertEqual, info_hash)
+ return d
+
+ def test_add_torrent_url_with_partial_download(self):
+ url = 'http://localhost:%d/partial' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ d = self.core.add_torrent_url(url, options)
+ d.addCallback(self.assertEqual, info_hash)
+ return d
+
+ @defer.inlineCallbacks
+ def test_add_torrent_magnet(self):
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+ uri = deluge.common.create_magnet_uri(info_hash)
+ options = {}
+ torrent_id = yield self.core.add_torrent_magnet(uri, options)
+ self.assertEqual(torrent_id, info_hash)
+
+ def test_resume_torrent(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ # Assert paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertTrue(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertTrue(r2['paused'])
+
+ self.core.resume_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertTrue(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertFalse(r2['paused'])
+
+ def test_resume_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent', paused=True)
+ self.core.resume_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ self.assertFalse(result['paused'])
+
+ def test_resume_torrents(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertFalse(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertFalse(r2['paused'])
+
+ def test_resume_torrents_all(self):
+ """With no torrent_ids param, resume all torrents"""
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertFalse(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertFalse(r2['paused'])
+
+ def test_pause_torrent(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+ # Assert not paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertFalse(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertFalse(r2['paused'])
+
+ self.core.pause_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertFalse(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertTrue(r2['paused'])
+
+ def test_pause_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent')
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ self.assertFalse(result['paused'])
+ self.core.pause_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ self.assertTrue(result['paused'])
+
+ def test_pause_torrents(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertTrue(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertTrue(r2['paused'])
+
+ def test_pause_torrents_all(self):
+ """With no torrent_ids param, pause all torrents"""
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ self.assertTrue(r1['paused'])
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ self.assertTrue(r2['paused'])
+
+ def test_prefetch_metadata_existing(self):
+ """Check another call with same magnet returns existing deferred."""
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None)
+
+ def on_result(result):
+ self.assertEqual(result, expected)
+
+ d = self.core.prefetch_magnet_metadata(magnet)
+ d.addCallback(on_result)
+ d2 = self.core.prefetch_magnet_metadata(magnet)
+ d2.addCallback(on_result)
+ self.clock.advance(30)
+ return defer.DeferredList([d, d2])
+
+ @defer.inlineCallbacks
+ def test_remove_torrent(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+ removed = self.core.remove_torrent(torrent_id, True)
+ self.assertTrue(removed)
+ self.assertEqual(len(self.core.get_session_state()), 0)
+
+ def test_remove_torrent_invalid(self):
+ self.assertRaises(
+ InvalidTorrentError,
+ self.core.remove_torrent,
+ 'torrentidthatdoesntexist',
+ True,
+ )
+
+ @defer.inlineCallbacks
+ def test_remove_torrents(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ filename2 = common.get_test_data_file('unicode_filenames.torrent')
+ with open(filename2, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id2 = yield self.core.add_torrent_file_async(
+ filename2, filedump, options
+ )
+ d = self.core.remove_torrents([torrent_id, torrent_id2], True)
+
+ def test_ret(val):
+ self.assertTrue(val == [])
+
+ d.addCallback(test_ret)
+
+ def test_session_state(val):
+ self.assertEqual(len(self.core.get_session_state()), 0)
+
+ d.addCallback(test_session_state)
+ yield d
+
+ @defer.inlineCallbacks
+ def test_remove_torrents_invalid(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(
+ filename, filedump, options
+ )
+ val = yield self.core.remove_torrents(
+ ['invalidid1', 'invalidid2', torrent_id], False
+ )
+ self.assertEqual(len(val), 2)
+ self.assertEqual(
+ val[0], ('invalidid1', 'torrent_id invalidid1 not in session.')
+ )
+ self.assertEqual(
+ val[1], ('invalidid2', 'torrent_id invalidid2 not in session.')
+ )
+
+ def test_get_session_status(self):
+ status = self.core.get_session_status(
+ ['net.recv_tracker_bytes', 'net.sent_tracker_bytes']
+ )
+ self.assertIsInstance(status, dict)
+ self.assertEqual(status['net.recv_tracker_bytes'], 0)
+ self.assertEqual(status['net.sent_tracker_bytes'], 0)
+
+ def test_get_session_status_all(self):
+ status = self.core.get_session_status([])
+ self.assertIsInstance(status, dict)
+ self.assertIn('upload_rate', status)
+ self.assertIn('net.recv_bytes', status)
+
+ def test_get_session_status_depr(self):
+ status = self.core.get_session_status(['num_peers', 'num_unchoked'])
+ self.assertIsInstance(status, dict)
+ self.assertEqual(status['num_peers'], 0)
+ self.assertEqual(status['num_unchoked'], 0)
+
+ def test_get_session_status_rates(self):
+ status = self.core.get_session_status(['upload_rate', 'download_rate'])
+ self.assertIsInstance(status, dict)
+ self.assertEqual(status['upload_rate'], 0)
+
+ def test_get_session_status_ratio(self):
+ status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio'])
+ self.assertIsInstance(status, dict)
+ self.assertEqual(status['write_hit_ratio'], 0.0)
+ self.assertEqual(status['read_hit_ratio'], 0.0)
+
+ def test_get_free_space(self):
+ space = self.core.get_free_space('.')
+ # get_free_space returns long on Python 2 (32-bit).
+ self.assertTrue(isinstance(space, integer_types))
+ self.assertTrue(space >= 0)
+ self.assertEqual(self.core.get_free_space('/someinvalidpath'), -1)
+
+ @pytest.mark.slow
+ def test_test_listen_port(self):
+ d = self.core.test_listen_port()
+
+ def result(r):
+ self.assertTrue(r in (True, False))
+
+ d.addCallback(result)
+ return d
+
+ def test_sanitize_filepath(self):
+ pathlist = {
+ '\\backslash\\path\\': 'backslash/path',
+ ' single_file ': 'single_file',
+ '..': '',
+ '/../..../': '',
+ ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file',
+ '/ test /\\.. /.file/': 'test/.file',
+ 'mytorrent/subfold/file1': 'mytorrent/subfold/file1',
+ 'Torrent/folder/': 'Torrent/folder',
+ }
+
+ for key in pathlist:
+ self.assertEqual(
+ deluge.core.torrent.sanitize_filepath(key, folder=False), pathlist[key]
+ )
+ self.assertEqual(
+ deluge.core.torrent.sanitize_filepath(key, folder=True),
+ pathlist[key] + '/',
+ )
+
+ def test_get_set_config_values(self):
+ self.assertEqual(
+ self.core.get_config_values(['abc', 'foo']), {'foo': None, 'abc': None}
+ )
+ self.assertEqual(self.core.get_config_value('foobar'), None)
+ self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'})
+ self.assertEqual(
+ self.core.get_config_values(['foo', 'abc']), {'foo': 10, 'abc': 'def'}
+ )
+ self.assertEqual(self.core.get_config_value('foobar'), 'barfoo')
+
+ def test_read_only_config_keys(self):
+ key = 'max_upload_speed'
+ self.core.read_only_config_keys = [key]
+
+ old_value = self.core.get_config_value(key)
+ self.core.set_config({key: old_value + 10})
+ new_value = self.core.get_config_value(key)
+ self.assertEqual(old_value, new_value)
+
+ self.core.read_only_config_keys = None
+
+ def test__create_peer_id(self):
+ self.assertEqual(self.core._create_peer_id('2.0.0'), '-DE200s-')
+ self.assertEqual(self.core._create_peer_id('2.0.0.dev15'), '-DE200D-')
+ self.assertEqual(self.core._create_peer_id('2.0.1rc1'), '-DE201r-')
+ self.assertEqual(self.core._create_peer_id('2.11.0b2'), '-DE2B0b-')
+ self.assertEqual(self.core._create_peer_id('2.4.12b2.dev3'), '-DE24CD-')
diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py
new file mode 100644
index 0000000..7d4bd98
--- /dev/null
+++ b/deluge/tests/test_decorators.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.trial import unittest
+
+from deluge.decorators import proxy
+
+
+class DecoratorsTestCase(unittest.TestCase):
+ def test_proxy_with_simple_functions(self):
+ def negate(func, *args, **kwargs):
+ return not func(*args, **kwargs)
+
+ @proxy(negate)
+ def something(_bool):
+ return _bool
+
+ @proxy(negate)
+ @proxy(negate)
+ def double_nothing(_bool):
+ return _bool
+
+ self.assertTrue(something(False))
+ self.assertFalse(something(True))
+ self.assertTrue(double_nothing(True))
+ self.assertFalse(double_nothing(False))
+
+ def test_proxy_with_class_method(self):
+ def negate(func, *args, **kwargs):
+ return -func(*args, **kwargs)
+
+ class Test(object):
+ def __init__(self, number):
+ self.number = number
+
+ @proxy(negate)
+ def diff(self, number):
+ return self.number - number
+
+ @proxy(negate)
+ def no_diff(self, number):
+ return self.diff(number)
+
+ t = Test(5)
+ self.assertEqual(t.diff(1), -4)
+ self.assertEqual(t.no_diff(1), 4)
diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py
new file mode 100644
index 0000000..c552e94
--- /dev/null
+++ b/deluge/tests/test_error.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.trial import unittest
+
+import deluge.error
+
+
+class ErrorTestCase(unittest.TestCase):
+ def setUp(self): # NOQA: N803
+ pass
+
+ def tearDown(self): # NOQA: N803
+ pass
+
+ def test_deluge_error(self):
+ msg = 'Some message'
+ e = deluge.error.DelugeError(msg)
+ self.assertEqual(str(e), msg)
+ from twisted.internet.defer import DebugInfo
+
+ del DebugInfo.__del__ # Hides all errors
+ self.assertEqual(e._args, (msg,))
+ self.assertEqual(e._kwargs, {})
+
+ def test_incompatible_client(self):
+ version = '1.3.6'
+ e = deluge.error.IncompatibleClient(version)
+ self.assertEqual(
+ str(e),
+ 'Your deluge client is not compatible with the daemon. \
+Please upgrade your client to %s'
+ % version,
+ )
+
+ def test_not_authorized_error(self):
+ current_level = 5
+ required_level = 10
+ e = deluge.error.NotAuthorizedError(current_level, required_level)
+ self.assertEqual(
+ str(e), 'Auth level too low: %d < %d' % (current_level, required_level)
+ )
+
+ def test_bad_login_error(self):
+ message = 'Login failed'
+ username = 'deluge'
+ e = deluge.error.BadLoginError(message, username)
+ self.assertEqual(str(e), message)
diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py
new file mode 100644
index 0000000..23865d7
--- /dev/null
+++ b/deluge/tests/test_files_tab.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import pytest
+from twisted.trial import unittest
+
+import deluge.component as component
+from deluge.common import windows_check
+from deluge.configmanager import ConfigManager
+from deluge.i18n import setup_translation
+
+from . import common
+from .basetest import BaseTestCase
+
+libs_available = True
+# Allow running other tests without GTKUI dependencies available
+try:
+ from deluge.ui.gtk3.files_tab import FilesTab
+ from deluge.ui.gtk3.gtkui import DEFAULT_PREFS
+ from deluge.ui.gtk3.mainwindow import MainWindow
+except ImportError:
+ libs_available = False
+
+setup_translation()
+
+
+@pytest.mark.gtkui
+class FilesTabTestCase(BaseTestCase):
+ def set_up(self):
+ if libs_available is False:
+ raise unittest.SkipTest('GTKUI dependencies not available')
+
+ common.set_tmp_config_dir()
+ ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
+ self.mainwindow = MainWindow()
+ self.filestab = FilesTab()
+ self.t_id = '1'
+ self.filestab.torrent_id = self.t_id
+ self.index = 1
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def print_treestore(self, title, treestore):
+ root = treestore.get_iter_first()
+ level = 1
+
+ def p_level(s, l):
+ print('%s%s' % (' ' * l, s))
+
+ def _print_treestore_children(i, lvl):
+ while i:
+ p_level(treestore[i][0], lvl)
+ if treestore.iter_children(i):
+ _print_treestore_children(treestore.iter_children(i), lvl + 2)
+ i = treestore.iter_next(i)
+
+ print('\n%s' % title)
+ _print_treestore_children(root, level)
+ print('')
+
+ def verify_treestore(self, treestore, tree):
+ def _verify_treestore(itr, tree_values):
+ i = 0
+ while itr:
+ values = tree_values[i]
+ if treestore[itr][0] != values[0]:
+ return False
+ if treestore.iter_children(itr):
+ if not _verify_treestore(treestore.iter_children(itr), values[1]):
+ return False
+ itr = treestore.iter_next(itr)
+ i += 1
+ return True
+
+ return _verify_treestore(treestore.get_iter_first(), tree)
+
+ def test_files_tab(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '2/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['test_10.txt']]], ['2/', [['test_100.txt']]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ self.assertTrue(ret)
+
+ def test_files_tab2(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/1/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['1/', [['test_100.txt'], ['test_10.txt']]]]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ self.assertTrue(ret)
+
+ def test_files_tab3(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]]
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ self.assertTrue(ret)
+
+ def test_files_tab4(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '1/test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/2/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['2/', [['test_100.txt']]], ['test_10.txt']]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ self.assertTrue(ret)
+
+ def test_files_tab5(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '2/test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]]
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ self.assertTrue(ret)
diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py
new file mode 100644
index 0000000..a503e46
--- /dev/null
+++ b/deluge/tests/test_httpdownloader.py
@@ -0,0 +1,266 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import tempfile
+from email.utils import formatdate
+
+from twisted.internet import reactor
+from twisted.internet.error import CannotListenError
+from twisted.python.failure import Failure
+from twisted.trial import unittest
+from twisted.web.error import PageRedirect
+from twisted.web.http import NOT_MODIFIED
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory, Site
+from twisted.web.util import redirectTo
+
+from deluge.common import windows_check
+from deluge.httpdownloader import download_file
+from deluge.log import setup_logger
+
+temp_dir = tempfile.mkdtemp()
+
+
+def fname(name):
+ return '%s/%s' % (temp_dir, name)
+
+
+class RedirectResource(Resource):
+ def render(self, request):
+ url = self.get_url().encode('utf8')
+ return redirectTo(url, request)
+
+
+class RenameResource(Resource):
+ def render(self, request):
+ filename = request.args.get(b'filename', [b'renamed_file'])[0]
+ request.setHeader(b'Content-Type', b'text/plain')
+ request.setHeader(b'Content-Disposition', b'attachment; filename=' + filename)
+ return b'This file should be called ' + filename
+
+
+class AttachmentResource(Resource):
+ def render(self, request):
+ request.setHeader(b'Content-Type', b'text/plain')
+ request.setHeader(b'Content-Disposition', b'attachment')
+ return b'Attachement with no filename set'
+
+
+class CookieResource(Resource):
+ def render(self, request):
+ request.setHeader(b'Content-Type', b'text/plain')
+ if request.getCookie(b'password') is None:
+ return b'Password cookie not set!'
+
+ if request.getCookie(b'password') == b'deluge':
+ return b'COOKIE MONSTER!'
+
+ return request.getCookie('password')
+
+
+class GzipResource(Resource):
+ def getChild(self, path, request): # NOQA: N802
+ return EncodingResourceWrapper(self, [GzipEncoderFactory()])
+
+ def render(self, request):
+ message = request.args.get(b'msg', [b'EFFICIENCY!'])[0]
+ request.setHeader(b'Content-Type', b'text/plain')
+ return message
+
+
+class PartialDownloadResource(Resource):
+ def __init__(self, *args, **kwargs):
+ Resource.__init__(self)
+ self.render_count = 0
+
+ def render(self, request):
+ # encoding = request.requestHeaders._rawHeaders.get('accept-encoding', None)
+ if self.render_count == 0:
+ request.setHeader(b'content-length', b'5')
+ else:
+ request.setHeader(b'content-length', b'3')
+
+ # if encoding == "deflate, gzip, x-gzip":
+ request.write('abc')
+ self.render_count += 1
+ return ''
+
+
+class TopLevelResource(Resource):
+ def __init__(self):
+ Resource.__init__(self)
+ self.putChild(b'cookie', CookieResource())
+ self.putChild(b'gzip', GzipResource())
+ self.redirect_rsrc = RedirectResource()
+ self.putChild(b'redirect', self.redirect_rsrc)
+ self.putChild(b'rename', RenameResource())
+ self.putChild(b'attachment', AttachmentResource())
+ self.putChild(b'partial', PartialDownloadResource())
+
+ def getChild(self, path, request): # NOQA: N802
+ if not path:
+ return self
+ else:
+ return Resource.getChild(self, path, request)
+
+ def render(self, request):
+ if request.getHeader('If-Modified-Since'):
+ request.setResponseCode(NOT_MODIFIED)
+ return b'<h1>Deluge HTTP Downloader tests webserver here</h1>'
+
+
+class DownloadFileTestCase(unittest.TestCase):
+ def get_url(self, path=''):
+ return 'http://localhost:%d/%s' % (self.listen_port, path)
+
+ def setUp(self): # NOQA
+ setup_logger('warning', fname('log_file'))
+ self.website = Site(TopLevelResource())
+ self.listen_port = 51242
+ self.website.resource.redirect_rsrc.get_url = self.get_url
+ for dummy in range(10):
+ try:
+ self.webserver = reactor.listenTCP(self.listen_port, self.website)
+ except CannotListenError as ex:
+ error = ex
+ self.listen_port += 1
+ else:
+ break
+ else:
+ raise error
+
+ def tearDown(self): # NOQA
+ return self.webserver.stopListening()
+
+ def assertContains(self, filename, contents): # NOQA
+ with open(filename) as _file:
+ try:
+ self.assertEqual(_file.read(), contents)
+ except Exception as ex:
+ self.fail(ex)
+ return filename
+
+ def assertNotContains(self, filename, contents, file_mode=''): # NOQA
+ with open(filename, file_mode) as _file:
+ try:
+ self.assertNotEqual(_file.read(), contents)
+ except Exception as ex:
+ self.fail(ex)
+ return filename
+
+ def test_download(self):
+ d = download_file(self.get_url(), fname('index.html'))
+ d.addCallback(self.assertEqual, fname('index.html'))
+ return d
+
+ def test_download_without_required_cookies(self):
+ url = self.get_url('cookie')
+ d = download_file(url, fname('none'))
+ d.addCallback(self.fail)
+ d.addErrback(self.assertIsInstance, Failure)
+ return d
+
+ def test_download_with_required_cookies(self):
+ url = self.get_url('cookie')
+ cookie = {'cookie': 'password=deluge'}
+ d = download_file(url, fname('monster'), headers=cookie)
+ d.addCallback(self.assertEqual, fname('monster'))
+ d.addCallback(self.assertContains, 'COOKIE MONSTER!')
+ return d
+
+ def test_download_with_rename(self):
+
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+
+ url = self.get_url('rename?filename=renamed')
+ d = download_file(url, fname('original'))
+ d.addCallback(self.assertEqual, fname('renamed'))
+ d.addCallback(self.assertContains, 'This file should be called renamed')
+ return d
+
+ def test_download_with_rename_exists(self):
+
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+
+ open(fname('renamed'), 'w').close()
+ url = self.get_url('rename?filename=renamed')
+ d = download_file(url, fname('original'))
+ d.addCallback(self.assertEqual, fname('renamed-1'))
+ d.addCallback(self.assertContains, 'This file should be called renamed')
+ return d
+
+ def test_download_with_rename_sanitised(self):
+
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+
+ url = self.get_url('rename?filename=/etc/passwd')
+ d = download_file(url, fname('original'))
+ d.addCallback(self.assertEqual, fname('passwd'))
+ d.addCallback(self.assertContains, 'This file should be called /etc/passwd')
+ return d
+
+ def test_download_with_attachment_no_filename(self):
+ url = self.get_url('attachment')
+ d = download_file(url, fname('original'))
+ d.addCallback(self.assertEqual, fname('original'))
+ d.addCallback(self.assertContains, 'Attachement with no filename set')
+ return d
+
+ def test_download_with_rename_prevented(self):
+ url = self.get_url('rename?filename=spam')
+ d = download_file(url, fname('forced'), force_filename=True)
+ d.addCallback(self.assertEqual, fname('forced'))
+ d.addCallback(self.assertContains, 'This file should be called spam')
+ return d
+
+ def test_download_with_gzip_encoding(self):
+ url = self.get_url('gzip?msg=success')
+ d = download_file(url, fname('gzip_encoded'))
+ d.addCallback(self.assertContains, 'success')
+ return d
+
+ def test_download_with_gzip_encoding_disabled(self):
+ url = self.get_url('gzip?msg=unzip')
+ d = download_file(url, fname('gzip_encoded'), allow_compression=False)
+ d.addCallback(self.assertContains, 'unzip')
+ return d
+
+ def test_page_redirect_unhandled(self):
+ url = self.get_url('redirect')
+ d = download_file(url, fname('none'))
+ d.addCallback(self.fail)
+
+ def on_redirect(failure):
+ self.assertTrue(type(failure), PageRedirect)
+
+ d.addErrback(on_redirect)
+ return d
+
+ def test_page_redirect(self):
+ url = self.get_url('redirect')
+ d = download_file(url, fname('none'), handle_redirects=True)
+ d.addCallback(self.assertEqual, fname('none'))
+ d.addErrback(self.fail)
+ return d
+
+ def test_page_not_found(self):
+ d = download_file(self.get_url('page/not/found'), fname('none'))
+ d.addCallback(self.fail)
+ d.addErrback(self.assertIsInstance, Failure)
+ return d
+
+ def test_page_not_modified(self):
+ headers = {'If-Modified-Since': formatdate(usegmt=True)}
+ d = download_file(self.get_url(), fname('index.html'), headers=headers)
+ d.addCallback(self.fail)
+ d.addErrback(self.assertIsInstance, Failure)
+ return d
diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py
new file mode 100644
index 0000000..1da64bf
--- /dev/null
+++ b/deluge/tests/test_json_api.py
@@ -0,0 +1,291 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import json as json_lib
+
+from mock import MagicMock
+from twisted.internet import defer
+from twisted.web import server
+from twisted.web.http import Request
+
+import deluge.common
+import deluge.component as component
+import deluge.ui.web.auth
+import deluge.ui.web.json_api
+from deluge.error import DelugeError
+from deluge.ui.client import client
+from deluge.ui.web.auth import Auth
+from deluge.ui.web.json_api import JSON, JSONException
+
+from . import common
+from .basetest import BaseTestCase
+from .common_web import WebServerMockBase
+from .daemon_base import DaemonBase
+
+common.disable_new_release_check()
+
+
+class JSONBase(BaseTestCase, DaemonBase):
+ def connect_client(self, *args, **kwargs):
+ return client.connect(
+ 'localhost',
+ self.listen_port,
+ username=kwargs.get('user', ''),
+ password=kwargs.get('password', ''),
+ )
+
+ def disconnect_client(self, *args):
+ return client.disconnect()
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.disconnect_client)
+ d.addCallback(self.terminate_core)
+ return d
+
+
+class JSONTestCase(JSONBase):
+ def set_up(self):
+ d = self.common_set_up()
+ d.addCallback(self.start_core)
+ d.addCallbacks(self.connect_client, self.terminate_core)
+ return d
+
+ @defer.inlineCallbacks
+ def test_get_remote_methods(self):
+ json = JSON()
+ methods = yield json.get_remote_methods()
+ self.assertEqual(type(methods), tuple)
+ self.assertTrue(len(methods) > 0)
+
+ def test_render_fail_disconnected(self):
+ json = JSON()
+ request = MagicMock()
+ request.method = b'POST'
+ request._disconnected = True
+ # When disconnected, returns empty string
+ self.assertEqual(json.render(request), '')
+
+ def test_render_fail(self):
+ json = JSON()
+ request = MagicMock()
+ request.method = b'POST'
+
+ def write(response_str):
+ request.write_was_called = True
+ response = json_lib.loads(response_str.decode())
+ self.assertEqual(response['result'], None)
+ self.assertEqual(response['id'], None)
+ self.assertEqual(
+ response['error']['message'], 'JSONException: JSON not decodable'
+ )
+ self.assertEqual(response['error']['code'], 5)
+
+ request.write = write
+ request.write_was_called = False
+ request._disconnected = False
+ request.getHeader.return_value = b'application/json'
+ self.assertEqual(json.render(request), server.NOT_DONE_YET)
+ self.assertTrue(request.write_was_called)
+
+ def test_handle_request_invalid_method(self):
+ json = JSON()
+ request = MagicMock()
+ json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ self.assertEqual(error, {'message': 'Unknown method', 'code': 2})
+
+ def test_handle_request_invalid_json_request(self):
+ json = JSON()
+ request = MagicMock()
+ json_data = {'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ self.assertRaises(JSONException, json._handle_request, request)
+ json_data = {'method': 'some.method', 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ self.assertRaises(JSONException, json._handle_request, request)
+ json_data = {'method': 'some.method', 'id': 0}
+ request.json = json_lib.dumps(json_data).encode()
+ self.assertRaises(JSONException, json._handle_request, request)
+
+ def test_on_json_request_invalid_content_type(self):
+ """Test for exception with content type not application/json"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'text/plain'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ self.assertRaises(JSONException, json._on_json_request, request)
+
+
+class JSONCustomUserTestCase(JSONBase):
+ def set_up(self):
+ d = self.common_set_up()
+ d.addCallback(self.start_core)
+ return d
+
+ @defer.inlineCallbacks
+ def test_handle_request_auth_error(self):
+ yield self.connect_client()
+ json = JSON()
+ auth_conf = {'session_timeout': 10, 'sessions': {}}
+ Auth(auth_conf) # Must create the component
+
+ # Must be called to update remote methods in json object
+ yield json.get_remote_methods()
+
+ request = MagicMock()
+ request.getCookie = MagicMock(return_value=b'bad_value')
+ json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ self.assertEqual(error, {'message': 'Not authenticated', 'code': 1})
+
+
+class RPCRaiseDelugeErrorJSONTestCase(JSONBase):
+ def set_up(self):
+ d = self.common_set_up()
+ custom_script = """
+ from deluge.error import DelugeError
+ from deluge.core.rpcserver import export
+ class TestClass(object):
+ @export()
+ def test(self):
+ raise DelugeError('DelugeERROR')
+
+ test = TestClass()
+ daemon.rpcserver.register_object(test)
+"""
+ d.addCallback(self.start_core, custom_script=custom_script)
+ d.addCallbacks(self.connect_client, self.terminate_core)
+ return d
+
+ @defer.inlineCallbacks
+ def test_handle_request_method_raise_delugeerror(self):
+ json = JSON()
+
+ def get_session_id(s_id):
+ return s_id
+
+ self.patch(deluge.ui.web.auth, 'get_session_id', get_session_id)
+ auth_conf = {'session_timeout': 10, 'sessions': {}}
+ auth = Auth(auth_conf)
+ request = Request(MagicMock(), False)
+ request.base = b''
+ auth._create_session(request)
+ methods = yield json.get_remote_methods()
+ # Verify the function has been registered
+ self.assertTrue('testclass.test' in methods)
+
+ request = MagicMock()
+ session_id = list(auth.config['sessions'])[0]
+ request.getCookie = MagicMock(return_value=session_id.encode())
+ json_data = {'method': 'testclass.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ result.addCallback(self.fail)
+
+ def on_error(error):
+ self.assertEqual(error.type, DelugeError)
+
+ result.addErrback(on_error)
+ yield result
+
+
+class JSONRequestFailedTestCase(JSONBase, WebServerMockBase):
+ def set_up(self):
+ d = self.common_set_up()
+ custom_script = """
+ from deluge.error import DelugeError
+ from deluge.core.rpcserver import export
+ from twisted.internet import reactor, task
+ class TestClass(object):
+ @export()
+ def test(self):
+ def test_raise_error():
+ raise DelugeError('DelugeERROR')
+
+ return task.deferLater(reactor, 1, test_raise_error)
+
+ test = TestClass()
+ daemon.rpcserver.register_object(test)
+"""
+ from twisted.internet.defer import Deferred
+
+ extra_callback = {
+ 'deferred': Deferred(),
+ 'types': ['stderr'],
+ 'timeout': 10,
+ 'triggers': [
+ {
+ 'expr': 'in test_raise_error',
+ 'value': lambda reader, data, data_all: 'Test',
+ }
+ ],
+ }
+
+ def on_test_raise(*args):
+ self.assertTrue('Unhandled error in Deferred:' in self.core.stderr_out)
+ self.assertTrue('in test_raise_error' in self.core.stderr_out)
+
+ extra_callback['deferred'].addCallback(on_test_raise)
+ d.addCallback(
+ self.start_core,
+ custom_script=custom_script,
+ print_stdout=False,
+ print_stderr=False,
+ timeout=5,
+ extra_callbacks=[extra_callback],
+ )
+ d.addCallbacks(self.connect_client, self.terminate_core)
+ return d
+
+ @defer.inlineCallbacks
+ def test_render_on_rpc_request_failed(self):
+ json = JSON()
+
+ methods = yield json.get_remote_methods()
+ # Verify the function has been registered
+ self.assertTrue('testclass.test' in methods)
+
+ request = MagicMock()
+
+ # Circumvent authentication
+ auth = Auth({})
+ self.mock_authentication_ignore(auth)
+
+ def write(response_str):
+ request.write_was_called = True
+ response = json_lib.loads(response_str.decode())
+ self.assertEqual(response['result'], None, 'BAD RESULT')
+ self.assertEqual(response['id'], 0)
+ self.assertEqual(
+ response['error']['message'],
+ 'Failure: [Failure instance: Traceback (failure with no frames):'
+ " <class 'deluge.error.DelugeError'>: DelugeERROR\n]",
+ )
+ self.assertEqual(response['error']['code'], 4)
+
+ request.write = write
+ request.write_was_called = False
+ request._disconnected = False
+ request.getHeader.return_value = b'application/json'
+ json_data = {'method': 'testclass.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ d = json._on_json_request(request)
+
+ def on_success(arg):
+ self.assertEqual(arg, server.NOT_DONE_YET)
+ return True
+
+ d.addCallbacks(on_success, self.fail)
+ yield d
diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py
new file mode 100644
index 0000000..572693b
--- /dev/null
+++ b/deluge/tests/test_log.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
+# Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import logging
+import warnings
+
+from deluge.log import setup_logger
+
+from .basetest import BaseTestCase
+
+
+class LogTestCase(BaseTestCase):
+ def set_up(self):
+ setup_logger(logging.DEBUG)
+
+ def tear_down(self):
+ setup_logger('none')
+
+ def test_old_log_deprecation_warning(self):
+ from deluge.log import LOG
+
+ with warnings.catch_warnings(record=True) as w:
+ # Cause all warnings to always be triggered.
+ warnings.simplefilter('always')
+ LOG.debug('foo')
+ self.assertEqual(w[-1].category, DeprecationWarning)
+
+ # def test_twisted_error_log(self):
+ # from twisted.internet import defer
+ # import deluge.component as component
+ # from deluge.core.eventmanager import EventManager
+ # EventManager()
+ #
+ # d = component.start()
+ #
+ # @defer.inlineCallbacks
+ # def call(*args):
+ # yield component.pause(["EventManager"])
+ # yield component.start(["EventManager"])
+ #
+ # d.addCallback(call)
+ # return d
diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py
new file mode 100644
index 0000000..4e00996
--- /dev/null
+++ b/deluge/tests/test_maketorrent.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import os
+import tempfile
+
+from twisted.trial import unittest
+
+from deluge import maketorrent
+from deluge.common import windows_check
+
+
+def check_torrent(filename):
+ # Test loading with libtorrent to make sure it's valid
+ from deluge._libtorrent import lt
+
+ lt.torrent_info(filename)
+
+ # Test loading with our internal TorrentInfo class
+ from deluge.ui.common import TorrentInfo
+
+ TorrentInfo(filename)
+
+
+class MakeTorrentTestCase(unittest.TestCase):
+ def test_save_multifile(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file:
+ _file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file:
+ _file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file:
+ _file.write(b'c' * (11 * 1024))
+
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_path
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+
+ def test_save_singlefile(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows file not released')
+ tmp_data = tempfile.mkstemp('testdata')[1]
+ with open(tmp_data, 'wb') as _file:
+ _file.write(b'a' * (2314 * 1024))
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_data
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(tmp_data)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+
+ def test_save_multifile_padded(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file:
+ _file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file:
+ _file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file:
+ _file.write(b'c' * (11 * 1024))
+
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_path
+ t.pad_files = True
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py
new file mode 100644
index 0000000..fc6507c
--- /dev/null
+++ b/deluge/tests/test_metafile.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import os
+import tempfile
+
+from twisted.trial import unittest
+
+from deluge import metafile
+from deluge.common import windows_check
+
+
+def check_torrent(filename):
+ # Test loading with libtorrent to make sure it's valid
+ from deluge._libtorrent import lt
+
+ lt.torrent_info(filename)
+
+ # Test loading with our internal TorrentInfo class
+ from deluge.ui.common import TorrentInfo
+
+ TorrentInfo(filename)
+
+
+class MetafileTestCase(unittest.TestCase):
+ def test_save_multifile(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as tmp_file:
+ tmp_file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as tmp_file:
+ tmp_file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as tmp_file:
+ tmp_file.write(b'c' * (11 * 1024))
+
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+
+ def test_save_singlefile(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows \\ != / for path names')
+ tmp_path = tempfile.mkstemp('testdata')[1]
+ with open(tmp_path, 'wb') as tmp_file:
+ tmp_file.write(b'a' * (2314 * 1024))
+
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py
new file mode 100644
index 0000000..436fc2c
--- /dev/null
+++ b/deluge/tests/test_plugin_metadata.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from deluge.pluginmanagerbase import PluginManagerBase
+
+from . import common
+from .basetest import BaseTestCase
+
+
+class PluginManagerBaseTestCase(BaseTestCase):
+ def set_up(self):
+ common.set_tmp_config_dir()
+
+ def test_get_plugin_info(self):
+ pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
+ for p in pm.get_available_plugins():
+ for key, value in pm.get_plugin_info(p).items():
+ self.assertTrue(isinstance('%s: %s' % (key, value), ''.__class__))
+
+ def test_get_plugin_info_invalid_name(self):
+ pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
+ for key, value in pm.get_plugin_info('random').items():
+ self.assertEqual(value, 'not available')
diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py
new file mode 100644
index 0000000..02f9af0
--- /dev/null
+++ b/deluge/tests/test_rpcserver.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Bro <bro.development@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import deluge.component as component
+import deluge.error
+from deluge.common import get_localhost_auth
+from deluge.core import rpcserver
+from deluge.core.authmanager import AuthManager
+from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer
+from deluge.log import setup_logger
+
+from .basetest import BaseTestCase
+
+setup_logger('none')
+
+
+class DelugeRPCProtocolTester(DelugeRPCProtocol):
+
+ messages = []
+
+ def transfer_message(self, data):
+ self.messages.append(data)
+
+
+class RPCServerTestCase(BaseTestCase):
+ def set_up(self):
+ self.rpcserver = RPCServer(listen=False)
+ self.rpcserver.factory.protocol = DelugeRPCProtocolTester
+ self.factory = self.rpcserver.factory
+ self.session_id = '0'
+ self.request_id = 11
+ self.protocol = self.rpcserver.factory.protocol()
+ self.protocol.factory = self.factory
+ self.protocol.transport = self.protocol
+ self.factory.session_protocols[self.session_id] = self.protocol
+ self.factory.authorized_sessions[self.session_id] = None
+ self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent']
+ self.protocol.sessionno = self.session_id
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def test_emit_event_for_session_id(self):
+ torrent_id = '12'
+ from deluge.event import TorrentFolderRenamedEvent
+
+ data = [torrent_id, 'new name', 'old name']
+ e = TorrentFolderRenamedEvent(*data)
+ self.rpcserver.emit_event_for_session_id(self.session_id, e)
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_EVENT, str(msg))
+ self.assertEqual(msg[1], 'TorrentFolderRenamedEvent', str(msg))
+ self.assertEqual(msg[2], data, str(msg))
+
+ def test_invalid_client_login(self):
+ self.protocol.dispatch(self.request_id, 'daemon.login', [1], {})
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_ERROR)
+ self.assertEqual(msg[1], self.request_id)
+
+ def test_valid_client_login(self):
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(
+ self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
+ )
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg))
+ self.assertEqual(msg[1], self.request_id, str(msg))
+ self.assertEqual(msg[2], rpcserver.AUTH_LEVEL_ADMIN, str(msg))
+
+ def test_client_login_error(self):
+ # This test causes error log prints while running the test...
+ self.protocol.transport = None # This should cause AttributeError
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(
+ self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
+ )
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_ERROR)
+ self.assertEqual(msg[1], self.request_id)
+ self.assertEqual(msg[2], 'WrappedException')
+ self.assertEqual(msg[3][1], 'AttributeError')
+
+ def test_client_invalid_method_call(self):
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(self.request_id, 'invalid_function', auth, {})
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_ERROR)
+ self.assertEqual(msg[1], self.request_id)
+ self.assertEqual(msg[2], 'WrappedException')
+ self.assertEqual(msg[3][1], 'AttributeError')
+
+ def test_daemon_info(self):
+ self.protocol.dispatch(self.request_id, 'daemon.info', [], {})
+ msg = self.protocol.messages.pop()
+ self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg))
+ self.assertEqual(msg[1], self.request_id, str(msg))
+ self.assertEqual(msg[2], deluge.common.get_version(), str(msg))
diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py
new file mode 100644
index 0000000..3794049
--- /dev/null
+++ b/deluge/tests/test_security.py
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import os
+
+import pytest
+from twisted.internet.utils import getProcessOutputAndValue
+
+import deluge.component as component
+import deluge.ui.web.server
+from deluge import configmanager
+from deluge.common import windows_check
+from deluge.ui.web.server import DelugeWeb
+
+from .basetest import BaseTestCase
+from .common import get_test_data_file
+from .common_web import WebServerTestBase
+from .daemon_base import DaemonBase
+
+SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False))
+
+
+class SecurityBaseTestCase(object):
+ if windows_check():
+ skip = 'windows can`t run .sh files'
+ elif not SECURITY_TESTS:
+ skip = 'Skipping security tests'
+
+ http_err = 'can\'t run http tests on daemon'
+
+ def __init__(self):
+ self.home_dir = os.path.expanduser('~')
+ self.port = 8112
+
+ def _run_test(self, test):
+ d = getProcessOutputAndValue(
+ 'bash',
+ [
+ get_test_data_file('testssl.sh'),
+ '--quiet',
+ '--nodns',
+ '--color',
+ '0',
+ test,
+ '127.0.0.1:%d' % self.port,
+ ],
+ )
+
+ def on_result(results):
+
+ if test == '-e':
+ results = results[0].split('\n')[7:-6]
+ self.assertTrue(len(results) > 3)
+ else:
+ self.assertIn('OK', results[0])
+ self.assertNotIn('NOT ok', results[0])
+
+ d.addCallback(on_result)
+ return d
+
+ def test_secured_webserver_protocol(self):
+ return self._run_test('-p')
+
+ def test_secured_webserver_standard_ciphers(self):
+ return self._run_test('-s')
+
+ def test_secured_webserver_heartbleed_vulnerability(self):
+ return self._run_test('-H')
+
+ def test_secured_webserver_css_injection_vulnerability(self):
+ return self._run_test('-I')
+
+ def test_secured_webserver_ticketbleed_vulnerability(self):
+ return self._run_test('-T')
+
+ def test_secured_webserver_renegotiation_vulnerabilities(self):
+ return self._run_test('-R')
+
+ def test_secured_webserver_crime_vulnerability(self):
+ return self._run_test('-C')
+
+ def test_secured_webserver_breach_vulnerability(self):
+ return self._run_test('-B')
+
+ def test_secured_webserver_poodle_vulnerability(self):
+ return self._run_test('-O')
+
+ def test_secured_webserver_tls_fallback_scsv_mitigation_vulnerability(self):
+ return self._run_test('-Z')
+
+ def test_secured_webserver_sweet32_vulnerability(self):
+ return self._run_test('-W')
+
+ def test_secured_webserver_beast_vulnerability(self):
+ return self._run_test('-A')
+
+ def test_secured_webserver_lucky13_vulnerability(self):
+ return self._run_test('-L')
+
+ def test_secured_webserver_freak_vulnerability(self):
+ return self._run_test('-F')
+
+ def test_secured_webserver_logjam_vulnerability(self):
+ return self._run_test('-J')
+
+ def test_secured_webserver_drown_vulnerability(self):
+ return self._run_test('-D')
+
+ def test_secured_webserver_forward_secrecy_settings(self):
+ return self._run_test('-f')
+
+ def test_secured_webserver_rc4_ciphers(self):
+ return self._run_test('-4')
+
+ def test_secured_webserver_preference(self):
+ return self._run_test('-P')
+
+ def test_secured_webserver_headers(self):
+ return self._run_test('-h')
+
+ def test_secured_webserver_ciphers(self):
+ return self._run_test('-e')
+
+
+@pytest.mark.security
+class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase):
+
+ if windows_check():
+ skip = 'windows can\'t start_core not enough arguments for format string'
+
+ def __init__(self, testname):
+ super(DaemonSecurityTestCase, self).__init__(testname)
+ DaemonBase.__init__(self)
+ SecurityBaseTestCase.__init__(self)
+
+ def setUp(self):
+ skip = False
+ for not_http_test in ('breach', 'headers', 'ticketbleed'):
+ if not_http_test in self.id().split('.')[-1]:
+ self.skipTest(SecurityBaseTestCase.http_err)
+ skip = True
+ if not skip:
+ super(DaemonSecurityTestCase, self).setUp()
+
+ def set_up(self):
+ d = self.common_set_up()
+ self.port = self.listen_port
+ d.addCallback(self.start_core)
+ d.addErrback(self.terminate_core)
+ return d
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.terminate_core)
+ return d
+
+
+@pytest.mark.security
+class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase):
+ def __init__(self, testname):
+ super(WebUISecurityTestBase, self).__init__(testname)
+ SecurityBaseTestCase.__init__(self)
+
+ def start_webapi(self, arg):
+ self.port = self.webserver_listen_port = 8999
+
+ config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy()
+ config_defaults['port'] = self.webserver_listen_port
+ config_defaults['https'] = True
+ self.config = configmanager.ConfigManager('web.conf', config_defaults)
+
+ self.deluge_web = DelugeWeb(daemon=False)
+
+ host = list(self.deluge_web.web_api.hostlist.config['hosts'][0])
+ host[2] = self.listen_port
+ self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host)
+ self.host_id = host[0]
+ self.deluge_web.start()
diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py
new file mode 100644
index 0000000..03f3cc2
--- /dev/null
+++ b/deluge/tests/test_sessionproxy.py
@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from twisted.internet.defer import maybeDeferred, succeed
+from twisted.internet.task import Clock
+
+import deluge.component as component
+import deluge.ui.sessionproxy
+
+from .basetest import BaseTestCase
+
+
+class Core(object):
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.torrents = {}
+ self.torrents['a'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.torrents['b'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.torrents['c'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.prev_status = {}
+
+ def get_session_state(self):
+ return maybeDeferred(self.torrents.keys)
+
+ def get_torrent_status(self, torrent_id, keys, diff=False):
+ if not keys:
+ keys = list(self.torrents[torrent_id])
+
+ if not diff:
+ ret = {}
+ for key in keys:
+ ret[key] = self.torrents[torrent_id][key]
+
+ return succeed(ret)
+
+ else:
+ ret = {}
+ if torrent_id in self.prev_status:
+ for key in keys:
+ if (
+ self.prev_status[torrent_id][key]
+ != self.torrents[torrent_id][key]
+ ):
+ ret[key] = self.torrents[torrent_id][key]
+ else:
+ ret = self.torrents[torrent_id]
+ self.prev_status[torrent_id] = dict(self.torrents[torrent_id])
+ return succeed(ret)
+
+ def get_torrents_status(self, filter_dict, keys, diff=False):
+ if not filter_dict:
+ filter_dict['id'] = list(self.torrents)
+ if not keys:
+ keys = list(self.torrents['a'])
+ if not diff:
+ if 'id' in filter_dict:
+ torrents = filter_dict['id']
+ ret = {}
+ for torrent in torrents:
+ ret[torrent] = {}
+ for key in keys:
+ ret[torrent][key] = self.torrents[torrent][key]
+ return succeed(ret)
+ else:
+ if 'id' in filter_dict:
+ torrents = filter_dict['id']
+ ret = {}
+ for torrent in torrents:
+ ret[torrent] = {}
+ if torrent in self.prev_status:
+ for key in self.prev_status[torrent]:
+ if (
+ self.prev_status[torrent][key]
+ != self.torrents[torrent][key]
+ ):
+ ret[torrent][key] = self.torrents[torrent][key]
+ else:
+ ret[torrent] = dict(self.torrents[torrent])
+
+ self.prev_status[torrent] = dict(self.torrents[torrent])
+ return succeed(ret)
+
+
+class Client(object):
+ def __init__(self):
+ self.core = Core()
+
+ def __noop__(self, *args, **kwargs):
+ return None
+
+ def __getattr__(self, *args, **kwargs):
+ return self.__noop__
+
+
+client = Client()
+
+
+class SessionProxyTestCase(BaseTestCase):
+ def set_up(self):
+ self.clock = Clock()
+ self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds)
+ self.patch(deluge.ui.sessionproxy, 'client', client)
+ self.sp = deluge.ui.sessionproxy.SessionProxy()
+ client.core.reset()
+ d = self.sp.start()
+
+ def do_get_torrents_status(torrent_ids):
+ inital_keys = ['key1']
+ # Advance clock to expire the cache times
+ self.clock.advance(2)
+ return self.sp.get_torrents_status({'id': torrent_ids}, inital_keys)
+
+ d.addCallback(do_get_torrents_status)
+ return d
+
+ def tear_down(self):
+ return component.deregister(self.sp)
+
+ def test_startup(self):
+ self.assertEqual(client.core.torrents['a'], self.sp.torrents['a'][1])
+
+ def test_get_torrent_status_no_change(self):
+ d = self.sp.get_torrent_status('a', [])
+ d.addCallback(self.assertEqual, client.core.torrents['a'])
+ return d
+
+ def test_get_torrent_status_change_with_cache(self):
+ client.core.torrents['a']['key1'] = 2
+ d = self.sp.get_torrent_status('a', ['key1'])
+ d.addCallback(self.assertEqual, {'key1': 1})
+ return d
+
+ def test_get_torrent_status_change_without_cache(self):
+ client.core.torrents['a']['key1'] = 2
+ self.clock.advance(self.sp.cache_time + 0.1)
+ d = self.sp.get_torrent_status('a', [])
+ d.addCallback(self.assertEqual, client.core.torrents['a'])
+ return d
+
+ def test_get_torrent_status_key_not_updated(self):
+ self.clock.advance(self.sp.cache_time + 0.1)
+ self.sp.get_torrent_status('a', ['key1'])
+ client.core.torrents['a']['key2'] = 99
+ d = self.sp.get_torrent_status('a', ['key2'])
+ d.addCallback(self.assertEqual, {'key2': 99})
+ return d
+
+ def test_get_torrents_status_key_not_updated(self):
+ self.clock.advance(self.sp.cache_time + 0.1)
+ self.sp.get_torrents_status({'id': ['a']}, ['key1'])
+ client.core.torrents['a']['key2'] = 99
+ d = self.sp.get_torrents_status({'id': ['a']}, ['key2'])
+ d.addCallback(self.assertEqual, {'a': {'key2': 99}})
+ return d
diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py
new file mode 100644
index 0000000..70fec47
--- /dev/null
+++ b/deluge/tests/test_torrent.py
@@ -0,0 +1,347 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import os
+import time
+from base64 import b64encode
+
+import mock
+from twisted.internet import reactor
+from twisted.internet.task import defer, deferLater
+from twisted.trial import unittest
+
+import deluge.component as component
+import deluge.core.torrent
+import deluge.tests.common as common
+from deluge._libtorrent import lt
+from deluge.common import utf8_encode_structure, windows_check
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.core.torrent import Torrent
+from deluge.core.torrentmanager import TorrentManager, TorrentState
+
+from .basetest import BaseTestCase
+
+
+class TorrentTestCase(BaseTestCase):
+ def setup_config(self):
+ config_dir = common.set_tmp_config_dir()
+ core_config = deluge.config.Config(
+ 'core.conf',
+ defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
+ config_dir=config_dir,
+ )
+ core_config.save()
+
+ def set_up(self):
+ self.setup_config()
+ self.rpcserver = RPCServer(listen=False)
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.core.config.config['new_release_check'] = False
+ self.session = self.core.session
+ self.torrent = None
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def print_priority_list(self, priorities):
+ tmp = ''
+ for i, p in enumerate(priorities):
+ if i % 100 == 0:
+ print(tmp)
+ tmp = ''
+ tmp += '%s' % p
+ print(tmp)
+
+ def assert_state(self, torrent, state):
+ torrent.update_state()
+ self.assertEqual(torrent.state, state)
+
+ def get_torrent_atp(self, filename):
+ filename = common.get_test_data_file(filename)
+ with open(filename, 'rb') as _file:
+ info = lt.torrent_info(lt.bdecode(_file.read()))
+ atp = {
+ 'ti': info,
+ 'save_path': os.getcwd(),
+ 'storage_mode': lt.storage_mode_t.storage_mode_sparse,
+ 'flags': (
+ lt.add_torrent_params_flags_t.flag_auto_managed
+ | lt.add_torrent_params_flags_t.flag_duplicate_is_error
+ & ~lt.add_torrent_params_flags_t.flag_paused
+ ),
+ }
+ return atp
+
+ def test_set_file_priorities(self):
+ atp = self.get_torrent_atp('dir_with_6_files.torrent')
+ handle = self.session.add_torrent(atp)
+ torrent = Torrent(handle, {})
+
+ result = torrent.get_file_priorities()
+ self.assertTrue(all(x == 4 for x in result))
+
+ new_priorities = [3, 1, 2, 0, 5, 6, 7]
+ torrent.set_file_priorities(new_priorities)
+ self.assertEqual(torrent.get_file_priorities(), new_priorities)
+
+ # Test with handle.piece_priorities as handle.file_priorities async
+ # updates and will return old value. Also need to remove a priority
+ # value as one file is much smaller than piece size so doesn't show.
+ piece_prio = handle.piece_priorities()
+ result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7])
+ self.assertTrue(result)
+
+ def test_set_prioritize_first_last_pieces(self):
+ piece_indexes = [
+ 0,
+ 1,
+ 50,
+ 51,
+ 52,
+ 110,
+ 111,
+ 112,
+ 113,
+ 200,
+ 201,
+ 202,
+ 212,
+ 213,
+ 214,
+ 215,
+ 216,
+ 217,
+ 457,
+ 458,
+ 459,
+ 460,
+ 461,
+ 462,
+ ]
+ self.run_test_set_prioritize_first_last_pieces(
+ 'dir_with_6_files.torrent', piece_indexes
+ )
+
+ def run_test_set_prioritize_first_last_pieces(
+ self, torrent_file, prioritized_piece_indexes
+ ):
+ atp = self.get_torrent_atp(torrent_file)
+ handle = self.session.add_torrent(atp)
+
+ self.torrent = Torrent(handle, {})
+ priorities_original = handle.piece_priorities()
+ self.torrent.set_prioritize_first_last_pieces(True)
+ priorities = handle.piece_priorities()
+
+ # The length of the list of new priorites is the same as the original
+ self.assertEqual(len(priorities_original), len(priorities))
+
+ # Test the priority of all the pieces against the calculated indexes.
+ for idx, priority in enumerate(priorities):
+ if idx in prioritized_piece_indexes:
+ self.assertEqual(priorities[idx], 7)
+ else:
+ self.assertEqual(priorities[idx], 4)
+
+ # self.print_priority_list(priorities)
+
+ def test_set_prioritize_first_last_pieces_false(self):
+ atp = self.get_torrent_atp('dir_with_6_files.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ # First set some pieces prioritized
+ self.torrent.set_prioritize_first_last_pieces(True)
+ # Reset pirorities
+ self.torrent.set_prioritize_first_last_pieces(False)
+ priorities = handle.piece_priorities()
+
+ # Test the priority of the prioritized pieces
+ for i in priorities:
+ self.assertEqual(priorities[i], 4)
+
+ # self.print_priority_list(priorities)
+
+ def test_torrent_error_data_missing(self):
+ if windows_check():
+ raise unittest.SkipTest('unexpected end of file in bencoded string')
+ options = {'seed_mode': True}
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ time.sleep(0.5) # Delay to wait for lt to finish check on Travis.
+ self.assert_state(torrent, 'Seeding')
+
+ # Force an error by reading (non-existant) piece from disk
+ torrent.handle.read_piece(0)
+ time.sleep(0.2) # Delay to wait for alert from lt
+ self.assert_state(torrent, 'Error')
+
+ def test_torrent_error_resume_original_state(self):
+ if windows_check():
+ raise unittest.SkipTest('unexpected end of file in bencoded string')
+ options = {'seed_mode': True, 'add_paused': True}
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ orig_state = 'Paused'
+ self.assert_state(torrent, orig_state)
+
+ # Force an error by reading (non-existant) piece from disk
+ torrent.handle.read_piece(0)
+ time.sleep(0.2) # Delay to wait for alert from lt
+ self.assert_state(torrent, 'Error')
+
+ # Clear error and verify returned to original state
+ torrent.force_recheck()
+
+ def test_torrent_error_resume_data_unaltered(self):
+ if windows_check():
+ raise unittest.SkipTest('unexpected end of file in bencoded string')
+ if lt.__version__.split('.')[1] == '2':
+ raise unittest.SkipTest('Test not working as expected on lt 1.2')
+
+ resume_data = {
+ 'active_time': 13399,
+ 'num_incomplete': 16777215,
+ 'announce_to_lsd': 1,
+ 'seed_mode': 0,
+ 'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01',
+ 'paused': 0,
+ 'seeding_time': 13399,
+ 'last_scrape': 13399,
+ 'info-hash': '-\xc5\xd0\xe7\x1af\xfeid\x9ad\r9\xcb\x00\xa2YpIs',
+ 'max_uploads': 16777215,
+ 'max_connections': 16777215,
+ 'num_downloaders': 16777215,
+ 'total_downloaded': 0,
+ 'file-format': 'libtorrent resume file',
+ 'peers6': '',
+ 'added_time': 1411826665,
+ 'banned_peers6': '',
+ 'file_priority': [1],
+ 'last_seen_complete': 0,
+ 'total_uploaded': 0,
+ 'piece_priority': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01',
+ 'file-version': 1,
+ 'announce_to_dht': 1,
+ 'auto_managed': 1,
+ 'upload_rate_limit': 0,
+ 'completed_time': 1411826665,
+ 'allocation': 'sparse',
+ 'blocks per piece': 2,
+ 'download_rate_limit': 0,
+ 'libtorrent-version': '0.16.17.0',
+ 'banned_peers': '',
+ 'num_seeds': 16777215,
+ 'sequential_download': 0,
+ 'announce_to_trackers': 1,
+ 'peers': '\n\x00\x02\x0f=\xc6SC\x17]\xd8}\x7f\x00\x00\x01=\xc6',
+ 'finished_time': 13399,
+ 'last_upload': 13399,
+ 'trackers': [[]],
+ 'super_seeding': 0,
+ 'file sizes': [[512000, 1411826586]],
+ 'last_download': 13399,
+ }
+ torrent_state = TorrentState(
+ torrent_id='2dc5d0e71a66fe69649a640d39cb00a259704973',
+ filename='test_torrent.file.torrent',
+ name='',
+ save_path='/home/ubuntu/Downloads',
+ file_priorities=[1],
+ is_finished=True,
+ )
+
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = _file.read()
+ resume_data = utf8_encode_structure(resume_data)
+ torrent_id = self.core.torrentmanager.add(
+ state=torrent_state, filedump=filedump, resume_data=lt.bencode(resume_data)
+ )
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ def assert_resume_data():
+ self.assert_state(torrent, 'Error')
+ tm_resume_data = lt.bdecode(
+ self.core.torrentmanager.resume_data[torrent.torrent_id]
+ )
+ self.assertEqual(tm_resume_data, resume_data)
+
+ return deferLater(reactor, 0.5, assert_resume_data)
+
+ def test_get_eta_seeding(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ self.assertEqual(self.torrent.get_eta(), 0)
+ self.torrent.status = mock.MagicMock()
+
+ self.torrent.status.upload_payload_rate = 5000
+ self.torrent.status.download_payload_rate = 0
+ self.torrent.status.all_time_download = 10000
+ self.torrent.status.all_time_upload = 500
+ self.torrent.is_finished = True
+ self.torrent.options = {'stop_at_ratio': False}
+ # Test finished and uploading but no stop_at_ratio set.
+ self.assertEqual(self.torrent.get_eta(), 0)
+
+ self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5}
+ result = self.torrent.get_eta()
+ self.assertEqual(result, 2)
+ self.assertIsInstance(result, int)
+
+ def test_get_eta_downloading(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ self.assertEqual(self.torrent.get_eta(), 0)
+
+ self.torrent.status = mock.MagicMock()
+ self.torrent.status.download_payload_rate = 50
+ self.torrent.status.total_wanted = 10000
+ self.torrent.status.total_wanted_done = 5000
+
+ result = self.torrent.get_eta()
+ self.assertEqual(result, 100)
+ self.assertIsInstance(result, int)
+
+ def test_get_name_unicode(self):
+ """Test retrieving a unicode torrent name from libtorrent."""
+ atp = self.get_torrent_atp('unicode_file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ self.assertEqual(self.torrent.get_name(), 'সুকুমার রায়.mkv')
+
+ def test_rename_unicode(self):
+ """Test renaming file/folders with unicode filenames."""
+ atp = self.get_torrent_atp('unicode_filenames.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ # Ignore TorrentManager method call
+ TorrentManager.save_resume_data = mock.MagicMock
+
+ result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв')
+ self.assertIsInstance(result, defer.DeferredList)
+
+ result = self.torrent.rename_files([[0, 'new_рбачёв']])
+ self.assertIsNone(result)
diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py
new file mode 100644
index 0000000..bf84f45
--- /dev/null
+++ b/deluge/tests/test_torrentmanager.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import warnings
+from base64 import b64encode
+
+import mock
+import pytest
+from twisted.internet import defer, task
+
+from deluge import component
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.error import InvalidTorrentError
+
+from . import common
+from .basetest import BaseTestCase
+
+warnings.filterwarnings('ignore', category=RuntimeWarning)
+warnings.resetwarnings()
+
+
+class TorrentmanagerTestCase(BaseTestCase):
+ def set_up(self):
+ common.set_tmp_config_dir()
+ self.rpcserver = RPCServer(listen=False)
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.clock = task.Clock()
+ self.tm = self.core.torrentmanager
+ self.tm.callLater = self.clock.callLater
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ @defer.inlineCallbacks
+ def test_remove_torrent(self):
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = _file.read()
+ torrent_id = yield self.core.add_torrent_file_async(
+ filename, b64encode(filedump), {}
+ )
+ self.assertTrue(self.tm.remove(torrent_id, False))
+
+ def test_prefetch_metadata(self):
+ from deluge._libtorrent import lt
+
+ with open(common.get_test_data_file('test.torrent'), 'rb') as _file:
+ t_info = lt.torrent_info(lt.bdecode(_file.read()))
+ mock_alert = mock.MagicMock()
+ mock_alert.handle.info_hash = mock.MagicMock(
+ return_value='ab570cdd5a17ea1b61e970bb72047de141bce173'
+ )
+ mock_alert.handle.get_torrent_info = mock.MagicMock(return_value=t_info)
+
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ d = self.tm.prefetch_metadata(magnet, 30)
+ self.tm.on_alert_metadata_received(mock_alert)
+
+ expected = (
+ 'ab570cdd5a17ea1b61e970bb72047de141bce173',
+ {
+ b'piece length': 32768,
+ b'sha1': (
+ b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh'
+ b'\x9d\xc5\xb7\xac\xdd'
+ ),
+ b'name': b'azcvsupdater_2.6.2.jar',
+ b'private': 0,
+ b'pieces': (
+ b'\xdb\x04B\x05\xc3\'\xdab\xb8su97\xa9u'
+ b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd'
+ b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7'
+ b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3'
+ b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0'
+ b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8'
+ b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed'
+ b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA'
+ b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f'
+ b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee'
+ b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc'
+ b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu'
+ b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?'
+ b'\xa1\xd6\x8c\x83\x9e&'
+ ),
+ b'length': 307949,
+ b'name.utf-8': b'azcvsupdater_2.6.2.jar',
+ b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7',
+ },
+ )
+ self.assertEqual(expected, self.successResultOf(d))
+
+ def test_prefetch_metadata_timeout(self):
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ d = self.tm.prefetch_metadata(magnet, 30)
+ self.clock.advance(30)
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None)
+ return d.addCallback(self.assertEqual, expected)
+
+ @pytest.mark.todo
+ def test_remove_torrent_false(self):
+ """Test when remove_torrent returns False"""
+ common.todo_test(self)
+
+ def test_remove_invalid_torrent(self):
+ self.assertRaises(
+ InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist'
+ )
diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py
new file mode 100644
index 0000000..590760d
--- /dev/null
+++ b/deluge/tests/test_torrentview.py
@@ -0,0 +1,285 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2014 Bro <bro.development@gmail.com>
+# Copyright (C) 2014 Calum Lind <calumlind@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import pytest
+from twisted.trial import unittest
+
+import deluge.component as component
+from deluge.configmanager import ConfigManager
+from deluge.i18n import setup_translation
+
+from . import common
+from .basetest import BaseTestCase
+
+# Allow running other tests without GTKUI dependencies available
+try:
+ # pylint: disable=ungrouped-imports
+ from gi.repository.GObject import TYPE_UINT64
+
+ from deluge.ui.gtk3.gtkui import DEFAULT_PREFS
+ from deluge.ui.gtk3.mainwindow import MainWindow
+ from deluge.ui.gtk3.menubar import MenuBar
+ from deluge.ui.gtk3.torrentdetails import TorrentDetails
+ from deluge.ui.gtk3.torrentview import TorrentView
+except (ImportError, ValueError):
+ libs_available = False
+ TYPE_UINT64 = 'Whatever'
+else:
+ libs_available = True
+
+setup_translation()
+
+
+@pytest.mark.gtkui
+class TorrentviewTestCase(BaseTestCase):
+
+ default_column_index = [
+ 'filter',
+ 'torrent_id',
+ 'dirty',
+ '#',
+ 'Name',
+ 'Size',
+ 'Downloaded',
+ 'Uploaded',
+ 'Remaining',
+ 'Progress',
+ 'Seeds',
+ 'Peers',
+ 'Seeds:Peers',
+ 'Down Speed',
+ 'Up Speed',
+ 'Down Limit',
+ 'Up Limit',
+ 'ETA',
+ 'Ratio',
+ 'Avail',
+ 'Added',
+ 'Completed',
+ 'Complete Seen',
+ 'Tracker',
+ 'Download Folder',
+ 'Owner',
+ 'Shared',
+ ]
+ default_liststore_columns = [
+ bool,
+ str,
+ bool,
+ int,
+ str,
+ str, # Name
+ TYPE_UINT64,
+ TYPE_UINT64,
+ TYPE_UINT64,
+ TYPE_UINT64,
+ float,
+ str, # Progress
+ int,
+ int,
+ int,
+ int,
+ float, # Seeds, Peers
+ int,
+ int,
+ float,
+ float,
+ int,
+ float,
+ float, # ETA, Ratio, Avail
+ int,
+ int,
+ int,
+ str,
+ str, # Tracker
+ str,
+ str,
+ bool,
+ ] # shared
+
+ def set_up(self):
+ if libs_available is False:
+ raise unittest.SkipTest('GTKUI dependencies not available')
+
+ common.set_tmp_config_dir()
+ # MainWindow loads this config file, so lets make sure it contains the defaults
+ ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
+ self.mainwindow = MainWindow()
+ self.torrentview = TorrentView()
+ self.torrentdetails = TorrentDetails()
+ self.menubar = MenuBar()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_torrentview_columns(self):
+
+ self.assertEqual(
+ self.torrentview.column_index, TorrentviewTestCase.default_column_index
+ )
+ self.assertEqual(
+ self.torrentview.liststore_columns,
+ TorrentviewTestCase.default_liststore_columns,
+ )
+ self.assertEqual(
+ self.torrentview.columns['Download Folder'].column_indices, [29]
+ )
+
+ def test_add_column(self):
+
+ # Add a text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns) + 1,
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index) + 1,
+ )
+ self.assertEqual(self.torrentview.column_index[-1], test_col)
+ self.assertEqual(self.torrentview.columns[test_col].column_indices, [32])
+
+ def test_add_columns(self):
+
+ # Add a text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+
+ # Add a second text column
+ test_col2 = 'Test column2'
+ self.torrentview.add_text_column(test_col2, status_field=['label2'])
+
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns) + 2,
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index) + 2,
+ )
+ # test_col
+ self.assertEqual(self.torrentview.column_index[-2], test_col)
+ self.assertEqual(self.torrentview.columns[test_col].column_indices, [32])
+
+ # test_col2
+ self.assertEqual(self.torrentview.column_index[-1], test_col2)
+ self.assertEqual(self.torrentview.columns[test_col2].column_indices, [33])
+
+ def test_remove_column(self):
+
+ # Add and remove text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ self.torrentview.remove_column(test_col)
+
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns),
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index),
+ )
+ self.assertEqual(
+ self.torrentview.column_index[-1],
+ TorrentviewTestCase.default_column_index[-1],
+ )
+ self.assertEqual(
+ self.torrentview.columns[
+ TorrentviewTestCase.default_column_index[-1]
+ ].column_indices,
+ [31],
+ )
+
+ def test_remove_columns(self):
+
+ # Add two columns
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ test_col2 = 'Test column2'
+ self.torrentview.add_text_column(test_col2, status_field=['label2'])
+
+ # Remove test_col
+ self.torrentview.remove_column(test_col)
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns) + 1,
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index) + 1,
+ )
+ self.assertEqual(self.torrentview.column_index[-1], test_col2)
+ self.assertEqual(self.torrentview.columns[test_col2].column_indices, [32])
+
+ # Remove test_col2
+ self.torrentview.remove_column(test_col2)
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns),
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index),
+ )
+ self.assertEqual(
+ self.torrentview.column_index[-1],
+ TorrentviewTestCase.default_column_index[-1],
+ )
+ self.assertEqual(
+ self.torrentview.columns[
+ TorrentviewTestCase.default_column_index[-1]
+ ].column_indices,
+ [31],
+ )
+
+ def test_add_remove_column_multiple_types(self):
+
+ # Add a column with multiple column types
+ test_col3 = 'Test column3'
+ self.torrentview.add_progress_column(
+ test_col3, status_field=['progress', 'label3'], col_types=[float, str]
+ )
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns) + 2,
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index) + 1,
+ )
+ self.assertEqual(self.torrentview.column_index[-1], test_col3)
+ self.assertEqual(self.torrentview.columns[test_col3].column_indices, [32, 33])
+
+ # Remove multiple column-types column
+ self.torrentview.remove_column(test_col3)
+
+ self.assertEqual(
+ len(self.torrentview.liststore_columns),
+ len(TorrentviewTestCase.default_liststore_columns),
+ )
+ self.assertEqual(
+ len(self.torrentview.column_index),
+ len(TorrentviewTestCase.default_column_index),
+ )
+ self.assertEqual(
+ self.torrentview.column_index[-1],
+ TorrentviewTestCase.default_column_index[-1],
+ )
+ self.assertEqual(
+ self.torrentview.columns[
+ TorrentviewTestCase.default_column_index[-1]
+ ].column_indices,
+ [31],
+ )
diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py
new file mode 100644
index 0000000..e18d339
--- /dev/null
+++ b/deluge/tests/test_tracker_icons.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import pytest
+from twisted.trial.unittest import SkipTest
+
+import deluge.component as component
+import deluge.ui.tracker_icons
+from deluge.common import windows_check
+from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons
+
+from . import common
+from .basetest import BaseTestCase
+
+common.set_tmp_config_dir()
+deluge.ui.tracker_icons.PIL_INSTALLED = False
+common.disable_new_release_check()
+
+
+@pytest.mark.internet
+class TrackerIconsTestCase(BaseTestCase):
+
+ if windows_check():
+ skip = 'cannot use os.path.samefile to compair on windows(unix only)'
+
+ def set_up(self):
+ # Disable resizing with Pillow for consistency.
+ self.patch(deluge.ui.tracker_icons, 'Image', None)
+ self.icons = TrackerIcons()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_get_deluge_png(self):
+ # Deluge has a png favicon link
+ icon = TrackerIcon(common.get_test_data_file('deluge.png'))
+ d = self.icons.fetch('deluge-torrent.org')
+ d.addCallback(self.assertNotIdentical, None)
+ d.addCallback(self.assertEqual, icon)
+ return d
+
+ def test_get_google_ico(self):
+ # Google doesn't have any icon links
+ # So instead we'll grab its favicon.ico
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ d = self.icons.fetch('www.google.com')
+ d.addCallback(self.assertNotIdentical, None)
+ d.addCallback(self.assertEqual, icon)
+ return d
+
+ def test_get_google_ico_with_redirect(self):
+ # google.com redirects to www.google.com
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ d = self.icons.fetch('google.com')
+ d.addCallback(self.assertNotIdentical, None)
+ d.addCallback(self.assertEqual, icon)
+ return d
+
+ def test_get_seo_ico_with_sni(self):
+ # seo using certificates with SNI support only
+ raise SkipTest('Site certificate expired')
+ icon = TrackerIcon(common.get_test_data_file('seo.ico'))
+ d = self.icons.fetch('www.seo.com')
+ d.addCallback(self.assertNotIdentical, None)
+ d.addCallback(self.assertEqual, icon)
+ return d
+
+ def test_get_empty_string_tracker(self):
+ d = self.icons.fetch('')
+ d.addCallback(self.assertIdentical, None)
+ return d
diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py
new file mode 100644
index 0000000..a048303
--- /dev/null
+++ b/deluge/tests/test_transfer.py
@@ -0,0 +1,403 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2012 Bro <bro.development@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import base64
+
+import rencode
+from twisted.trial import unittest
+
+import deluge.log
+from deluge.transfer import DelugeTransferProtocol
+
+deluge.log.setup_logger('none')
+
+
+class TransferTestClass(DelugeTransferProtocol):
+ def __init__(self):
+ DelugeTransferProtocol.__init__(self)
+ self.transport = self
+ self.messages_out = []
+ self.messages_in = []
+ self.packet_count = 0
+
+ def write(self, message):
+ """
+ Called by DelugeTransferProtocol class
+ This simulates the write method of the self.transport in DelugeTransferProtocol.
+ """
+ self.messages_out.append(message)
+
+ def message_received(self, message):
+ """
+ This method overrides message_received is DelugeTransferProtocol and is
+ called with the complete message as it was sent by DelugeRPCProtocol
+ """
+ self.messages_in.append(message)
+
+ def get_messages_out_joined(self):
+ return b''.join(self.messages_out)
+
+ def get_messages_in(self):
+ return self.messages_in
+
+ def data_received_old_protocol(self, data):
+ """
+ This is the original method logic (as close as possible) for handling data receival on the client
+
+ :param data: a zlib compressed string encoded with rencode.
+
+ """
+ import zlib
+
+ print('\n=== New Data Received ===\nBytes received:', len(data))
+
+ if self._buffer:
+ # We have some data from the last dataReceived() so lets prepend it
+ print('Current buffer:', len(self._buffer) if self._buffer else '0')
+ data = self._buffer + data
+ self._buffer = None
+
+ self.packet_count += 1
+ self._bytes_received += len(data)
+
+ while data:
+ print('\n-- Handle packet data --')
+
+ print('Bytes received:', self._bytes_received)
+ print('Current data:', len(data))
+
+ if self._message_length == 0:
+ # handle_new_message uses _buffer so set data to _buffer.
+ self._buffer = data
+ self._handle_new_message()
+ data = self._buffer
+ self._buffer = None
+ self.packet_count = 1
+ print('New message of length:', self._message_length)
+
+ dobj = zlib.decompressobj()
+ try:
+ request = rencode.loads(dobj.decompress(data))
+ print('Successfully loaded message', end=' ')
+ print(
+ ' - Buffer length: %d, data length: %d, unused length: %d'
+ % (
+ len(data),
+ len(data) - len(dobj.unused_data),
+ len(dobj.unused_data),
+ )
+ )
+ print('Packet count:', self.packet_count)
+ except Exception as ex:
+ # log.debug('Received possible invalid message (%r): %s', data, e)
+ # This could be cut-off data, so we'll save this in the buffer
+ # and try to prepend it on the next dataReceived()
+ self._buffer = data
+ print(
+ 'Failed to load buffer (size %d): %s' % (len(self._buffer), str(ex))
+ )
+ return
+ else:
+ data = dobj.unused_data
+ self._message_length = 0
+
+ self.message_received(request)
+
+
+class DelugeTransferProtocolTestCase(unittest.TestCase):
+ def setUp(self): # NOQA: N803
+ """
+ The expected messages corresponds to the test messages (msg1, msg2) after they've been processed
+ by DelugeTransferProtocol.send, which means that they've first been encoded with rencode,
+ and then compressed with zlib.
+ The expected messages are encoded in base64 to easily including it here in the source.
+ So before comparing the results with the expected messages, the expected messages must be decoded,
+ or the result message be encoded in base64.
+
+ """
+ self.transfer = TransferTestClass()
+ self.msg1 = (
+ 0,
+ 1,
+ {'key_int': 1242429423},
+ {'key_str': b'some string'},
+ {'key_bool': True},
+ )
+ self.msg2 = (
+ 2,
+ 3,
+ {'key_float': 12424.29423},
+ {'key_unicode': 'some string'},
+ {'key_dict_with_tuple': {'key_tuple': (1, 2, 3)}},
+ {'keylist': [4, '5', 6.7]},
+ )
+
+ self.msg1_expected_compressed_base64 = (
+ b'AQAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU'
+ b'XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ'
+ )
+ self.msg2_expected_compressed_base64 = (
+ b'AQAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3il+ZlJuenpH'
+ b'YX5+emKhSXFGXmpadPBkmkZCaXxJdnlmTEl5QW5KRCdIOZhxmB'
+ b'hrUDuTmZxSWHWRpNnRyupaUBAHYlJxI='
+ )
+
+ def test_send_one_message(self):
+ """
+ Send one message and test that it has been sent correctoly to the
+ method 'write' in self.transport.
+
+ """
+ self.transfer.transfer_message(self.msg1)
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_out_joined()
+ base64_encoded = base64.b64encode(messages)
+ self.assertEqual(base64_encoded, self.msg1_expected_compressed_base64)
+
+ def test_receive_one_message(self):
+ """
+ Receive one message and test that it has been sent to the
+ method 'message_received'.
+
+ """
+ self.transfer.dataReceived(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(messages))
+
+ def test_receive_old_message(self):
+ """
+ Receive an old message (with no header) and verify that the data is discarded.
+
+ """
+ self.transfer.dataReceived(rencode.dumps(self.msg1))
+ self.assertEqual(len(self.transfer.get_messages_in()), 0)
+ self.assertEqual(self.transfer._message_length, 0)
+ self.assertEqual(len(self.transfer._buffer), 0)
+
+ def test_receive_two_concatenated_messages(self):
+ """
+ This test simply concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data as one string.
+
+ """
+ two_concatenated = base64.b64decode(
+ self.msg1_expected_compressed_base64
+ ) + base64.b64decode(self.msg2_expected_compressed_base64)
+ self.transfer.dataReceived(two_concatenated)
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+
+ def test_receive_three_messages_in_parts(self):
+ """
+ This test concatenates three messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in multiple parts.
+
+ """
+ msg_bytes = (
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ + base64.b64decode(self.msg2_expected_compressed_base64)
+ + base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ packet_size = 40
+
+ one_message_byte_count = len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ two_messages_byte_count = one_message_byte_count + len(
+ base64.b64decode(self.msg2_expected_compressed_base64)
+ )
+ three_messages_byte_count = two_messages_byte_count + len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+
+ for d in self.receive_parts_helper(msg_bytes, packet_size):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ self.assertEqual(
+ expected_msgs_received_count, len(self.transfer.get_messages_in())
+ )
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ message3 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3))
+
+ # Remove underscore to enable test, or run the test directly:
+ # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol
+ def _test_rencode_fail_protocol(self):
+ """
+ This test tries to test the protocol that relies on errors from rencode.
+
+ """
+ msg_bytes = (
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ + base64.b64decode(self.msg2_expected_compressed_base64)
+ + base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ packet_size = 149
+
+ one_message_byte_count = len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ two_messages_byte_count = one_message_byte_count + len(
+ base64.b64decode(self.msg2_expected_compressed_base64)
+ )
+ three_messages_byte_count = two_messages_byte_count + len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+
+ print()
+
+ print(
+ 'Msg1 size:',
+ len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4,
+ )
+ print(
+ 'Msg2 size:',
+ len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4,
+ )
+ print(
+ 'Msg3 size:',
+ len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4,
+ )
+
+ print('one_message_byte_count:', one_message_byte_count)
+ print('two_messages_byte_count:', two_messages_byte_count)
+ print('three_messages_byte_count:', three_messages_byte_count)
+
+ for d in self.receive_parts_helper(
+ msg_bytes, packet_size, self.transfer.data_received_old_protocol
+ ):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ if expected_msgs_received_count != len(self.transfer.get_messages_in()):
+ print(
+ 'Expected number of messages received is %d, but %d have been received.'
+ % (
+ expected_msgs_received_count,
+ len(self.transfer.get_messages_in()),
+ )
+ )
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ message3 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3))
+
+ def test_receive_middle_of_header(self):
+ """
+ This test concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in two parts.
+ The first part contains the first message, plus two bytes of the next message.
+ The next part contains the rest of the message.
+
+ This is a special case, as DelugeTransferProtocol can't start parsing
+ a message until it has at least 5 bytes (the size of the header) to be able
+ to read and parse the size of the payload.
+
+ """
+ two_concatenated = base64.b64decode(
+ self.msg1_expected_compressed_base64
+ ) + base64.b64decode(self.msg2_expected_compressed_base64)
+ first_len = len(base64.b64decode(self.msg1_expected_compressed_base64))
+
+ # Now found the entire first message, and half the header of the next message (2 bytes into the header)
+ self.transfer.dataReceived(two_concatenated[: first_len + 2])
+
+ # Should be 1 message in the list
+ self.assertEqual(1, len(self.transfer.get_messages_in()))
+
+ # Send the rest
+ self.transfer.dataReceived(two_concatenated[first_len + 2 :])
+
+ # Should be 2 messages in the list
+ self.assertEqual(2, len(self.transfer.get_messages_in()))
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+
+ # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
+ # def test_simulate_big_transfer(self):
+ # filename = '../deluge.torrentlist'
+ #
+ # f = open(filename, 'r')
+ # data = f.read()
+ # message_to_send = eval(data)
+ # self.transfer.transfer_message(message_to_send)
+ #
+ # Get the data as sent to the network by DelugeTransferProtocol
+ # compressed_data = self.transfer.get_messages_out_joined()
+ # packet_size = 16000 # Or something smaller...
+ #
+ # for d in self.receive_parts_helper(compressed_data, packet_size):
+ # bytes_recv = self.transfer.get_bytes_recv()
+ # if bytes_recv < len(compressed_data):
+ # self.assertEqual(len(self.transfer.get_messages_in()), 0)
+ # else:
+ # self.assertEqual(len(self.transfer.get_messages_in()), 1)
+ # Get the data as received by DelugeTransferProtocol
+ # transfered_message = self.transfer.get_messages_in().pop(0)
+ # Test that the data structures are equal
+ # self.assertEqual(transfered_message, message_to_send)
+ # self.assertTrue(transfered_message == message_to_send)
+ #
+ # f.close()
+ # f = open('rencode.torrentlist', 'w')
+ # f.write(str(transfered_message))
+ # f.close()
+
+ def receive_parts_helper(self, data, packet_size, receive_func=None):
+ byte_count = len(data)
+ sent_bytes = 0
+ while byte_count > 0:
+ to_receive = packet_size if byte_count > packet_size else byte_count
+ sent_bytes += to_receive
+ byte_count -= to_receive
+ if receive_func:
+ receive_func(data[:to_receive])
+ else:
+ self.transfer.dataReceived(data[:to_receive])
+ data = data[to_receive:]
+ yield
diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py
new file mode 100644
index 0000000..dffd884
--- /dev/null
+++ b/deluge/tests/test_ui_common.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+from __future__ import unicode_literals
+
+from six import assertCountEqual
+from twisted.trial import unittest
+
+from deluge.common import windows_check
+from deluge.ui.common import TorrentInfo
+
+from . import common
+
+
+class UICommonTestCase(unittest.TestCase):
+ def setUp(self): # NOQA: N803
+ pass
+
+ def tearDown(self): # NOQA: N803
+ pass
+
+ def test_utf8_encoded_paths(self):
+ filename = common.get_test_data_file('test.torrent')
+ ti = TorrentInfo(filename)
+ self.assertTrue('azcvsupdater_2.6.2.jar' in ti.files_tree)
+
+ def test_utf8_encoded_paths2(self):
+ if windows_check():
+ raise unittest.SkipTest('on windows KeyError: unicode_filenames')
+ filename = common.get_test_data_file('unicode_filenames.torrent')
+ filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv'
+ filepath2 = (
+ '\u041c\u0438\u0445\u0430\u0438\u043b \u0413\u043e'
+ '\u0440\u0431\u0430\u0447\u0451\u0432.mkv'
+ )
+ filepath3 = "Alisher ibn G'iyosiddin Navoiy.mkv"
+ filepath4 = 'Ascii title.mkv'
+ filepath5 = '\u09b8\u09c1\u0995\u09c1\u09ae\u09be\u09b0 \u09b0\u09be\u09df.mkv'
+
+ ti = TorrentInfo(filename)
+ files_tree = ti.files_tree['unicode_filenames']
+ self.assertIn(filepath1, files_tree)
+ self.assertIn(filepath2, files_tree)
+ self.assertIn(filepath3, files_tree)
+ self.assertIn(filepath4, files_tree)
+ self.assertIn(filepath5, files_tree)
+
+ result_files = [
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath3,
+ 'size': 126158658,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath4,
+ 'size': 189321363,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath2,
+ 'size': 106649699,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath5,
+ 'size': 21590269,
+ },
+ {'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771},
+ ]
+
+ assertCountEqual(self, ti.files, result_files)
diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py
new file mode 100644
index 0000000..8c67322
--- /dev/null
+++ b/deluge/tests/test_ui_console.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import argparse
+
+from deluge.common import windows_check
+from deluge.ui.console.cmdline.commands.add import Command
+from deluge.ui.console.widgets.fields import TextInput
+
+from .basetest import BaseTestCase
+
+
+class MockParent(object):
+ def __init__(self):
+ self.border_off_x = 1
+ self.pane_width = 20
+ self.encoding = 'utf8'
+
+
+class UIConsoleFieldTestCase(BaseTestCase):
+ def setUp(self): # NOQA: N803
+ self.parent = MockParent()
+
+ def tearDown(self): # NOQA: N803
+ pass
+
+ def test_text_input(self):
+ def move_func(self, r, c):
+ self._cursor_row = r
+ self._cursor_col = c
+
+ t = TextInput(
+ self.parent,
+ 'name',
+ 'message',
+ move_func,
+ 20,
+ '/text/field/file/path',
+ complete=False,
+ )
+ self.assertTrue(t)
+ if not windows_check():
+ self.assertTrue(t.handle_read(33))
+
+
+class UIConsoleCommandsTestCase(BaseTestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_add_move_completed(self):
+ completed_path = 'completed_path'
+ parser = argparse.ArgumentParser()
+ cmd = Command()
+ cmd.add_arguments(parser)
+ args = parser.parse_args(['torrent', '-m', completed_path])
+ self.assertEqual(args.move_completed_path, completed_path)
+ args = parser.parse_args(['torrent', '--move-path', completed_path])
+ self.assertEqual(args.move_completed_path, completed_path)
diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py
new file mode 100644
index 0000000..1d405a1
--- /dev/null
+++ b/deluge/tests/test_ui_entry.py
@@ -0,0 +1,513 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import print_function, unicode_literals
+
+import argparse
+import sys
+from io import StringIO
+
+import mock
+import pytest
+from twisted.internet import defer
+
+import deluge
+import deluge.component as component
+import deluge.ui.web.server
+from deluge.common import PY2, get_localhost_auth, windows_check
+from deluge.ui import ui_entry
+from deluge.ui.web.server import DelugeWeb
+
+from . import common
+from .basetest import BaseTestCase
+from .daemon_base import DaemonBase
+
+if not windows_check():
+ import deluge.ui.console
+ import deluge.ui.console.cmdline.commands.quit
+ import deluge.ui.console.main
+
+DEBUG_COMMAND = False
+
+sys_stdout = sys.stdout
+# To catch output to stdout/stderr while running unit tests, we patch
+# the file descriptors in sys and argparse._sys with StringFileDescriptor.
+# Regular print statements from such tests will therefore write to the
+# StringFileDescriptor object instead of the terminal.
+# To print to terminal from the tests, use: print('Message...', file=sys_stdout)
+
+
+class StringFileDescriptor(object):
+ """File descriptor that writes to string buffer"""
+
+ def __init__(self, fd):
+ self.out = StringIO()
+ self.fd = fd
+ for a in ['encoding']:
+ setattr(self, a, getattr(sys_stdout, a))
+
+ def write(self, *data, **kwargs):
+ # io.StringIO requires unicode strings.
+ data_string = str(*data)
+ if PY2:
+ data_string = data_string.decode()
+ print(data_string, file=self.out, end='')
+
+ def flush(self):
+ self.out.flush()
+
+
+class UIBaseTestCase(object):
+ def __init__(self):
+ self.var = {}
+
+ def set_up(self):
+ common.set_tmp_config_dir()
+ common.setup_test_logger(level='info', prefix=self.id())
+ return component.start()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def exec_command(self):
+ if DEBUG_COMMAND:
+ print('Executing: %s\n' % sys.argv, file=sys_stdout)
+ return self.var['start_cmd']()
+
+
+class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase):
+ """Subclass for test that require a deluged daemon"""
+
+ def __init__(self):
+ UIBaseTestCase.__init__(self)
+
+ def set_up(self):
+ d = self.common_set_up()
+ common.setup_test_logger(level='info', prefix=self.id())
+ d.addCallback(self.start_core)
+ return d
+
+ def tear_down(self):
+ d = UIBaseTestCase.tear_down(self)
+ d.addCallback(self.terminate_core)
+ return d
+
+
+class DelugeEntryTestCase(BaseTestCase):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def set_up(self):
+ common.set_tmp_config_dir()
+ return component.start()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_deluge_help(self):
+ self.patch(sys, 'argv', ['./deluge', '-h'])
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.assertRaises(SystemExit, ui_entry.start_ui)
+ self.assertTrue('usage: deluge' in fd.out.getvalue())
+ self.assertTrue('UI Options:' in fd.out.getvalue())
+ self.assertTrue('* console' in fd.out.getvalue())
+
+ def test_start_default(self):
+ self.patch(sys, 'argv', ['./deluge'])
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ ui_entry.start_ui()
+
+ def test_start_with_log_level(self):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', ['./deluge', '-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ ui_entry.start_ui()
+
+ self.assertEqual(_level[0], 'info')
+
+
+class GtkUIBaseTestCase(UIBaseTestCase):
+ """Implement all GtkUI tests here"""
+
+ def test_start_gtk3ui(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+
+ from deluge.ui.gtk3 import gtkui
+
+ with mock.patch.object(gtkui.GtkUI, 'start', autospec=True):
+ self.exec_command()
+
+
+@pytest.mark.gtkui
+class GtkUIDelugeScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase):
+ def __init__(self, testname):
+ super(GtkUIDelugeScriptEntryTestCase, self).__init__(testname)
+ GtkUIBaseTestCase.__init__(self)
+
+ self.var['cmd_name'] = 'deluge gtk'
+ self.var['start_cmd'] = ui_entry.start_ui
+ self.var['sys_arg_cmd'] = ['./deluge', 'gtk']
+
+ def set_up(self):
+ return GtkUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return GtkUIBaseTestCase.tear_down(self)
+
+
+@pytest.mark.gtkui
+class GtkUIScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase):
+ def __init__(self, testname):
+ super(GtkUIScriptEntryTestCase, self).__init__(testname)
+ GtkUIBaseTestCase.__init__(self)
+ from deluge.ui import gtk3
+
+ self.var['cmd_name'] = 'deluge-gtk'
+ self.var['start_cmd'] = gtk3.start
+ self.var['sys_arg_cmd'] = ['./deluge-gtk']
+
+ def set_up(self):
+ return GtkUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return GtkUIBaseTestCase.tear_down(self)
+
+
+class DelugeWebMock(DelugeWeb):
+ def __init__(self, *args, **kwargs):
+ kwargs['daemon'] = False
+ DelugeWeb.__init__(self, *args, **kwargs)
+
+
+class WebUIBaseTestCase(UIBaseTestCase):
+ """Implement all WebUI tests here"""
+
+ def test_start_webserver(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+ self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock)
+ self.exec_command()
+
+ def test_start_web_with_log_level(self):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'web'
+ config.save()
+
+ self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock)
+ self.exec_command()
+ self.assertEqual(_level[0], 'info')
+
+
+class WebUIScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def __init__(self, testname):
+ super(WebUIScriptEntryTestCase, self).__init__(testname)
+ WebUIBaseTestCase.__init__(self)
+ self.var['cmd_name'] = 'deluge-web'
+ self.var['start_cmd'] = deluge.ui.web.start
+ self.var['sys_arg_cmd'] = ['./deluge-web', '--do-not-daemonize']
+
+ def set_up(self):
+ return WebUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return WebUIBaseTestCase.tear_down(self)
+
+
+class WebUIDelugeScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def __init__(self, testname):
+ super(WebUIDelugeScriptEntryTestCase, self).__init__(testname)
+ WebUIBaseTestCase.__init__(self)
+ self.var['cmd_name'] = 'deluge web'
+ self.var['start_cmd'] = ui_entry.start_ui
+ self.var['sys_arg_cmd'] = ['./deluge', 'web', '--do-not-daemonize']
+
+ def set_up(self):
+ return WebUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return WebUIBaseTestCase.tear_down(self)
+
+
+class ConsoleUIBaseTestCase(UIBaseTestCase):
+ """Implement Console tests that do not require a running daemon"""
+
+ def test_start_console(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.exec_command()
+
+ def test_start_console_with_log_level(self):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ self.exec_command()
+
+ self.assertEqual(_level[0], 'info')
+
+ def test_console_help(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.assertRaises(SystemExit, self.exec_command)
+ std_output = fd.out.getvalue()
+ self.assertTrue(
+ ('usage: %s' % self.var['cmd_name']) in std_output
+ ) # Check command name
+ self.assertTrue('Common Options:' in std_output)
+ self.assertTrue('Console Options:' in std_output)
+ self.assertTrue(
+ 'Console Commands:\n The following console commands are available:'
+ in std_output
+ )
+ self.assertTrue(
+ 'The following console commands are available:' in std_output
+ )
+
+ def test_console_command_info(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.exec_command()
+
+ def test_console_command_info_help(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info', '-h'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.assertRaises(SystemExit, self.exec_command)
+ std_output = fd.out.getvalue()
+ self.assertTrue('usage: info' in std_output)
+ self.assertTrue('Show information about the torrents' in std_output)
+
+ def test_console_unrecognized_arguments(self):
+ self.patch(
+ sys, 'argv', ['./deluge', '--ui', 'console']
+ ) # --ui is not longer supported
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stderr', fd)
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.assertRaises(SystemExit, self.exec_command)
+ self.assertTrue('unrecognized arguments: --ui' in fd.out.getvalue())
+
+
+class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
+ """Implement Console tests that require a running daemon"""
+
+ def set_up(self):
+ # Avoid calling reactor.shutdown after commands are executed by main.exec_args()
+ deluge.ui.console.main.reactor = common.ReactorOverride()
+ return UIWithDaemonBaseTestCase.set_up(self)
+
+ def patch_arg_command(self, command):
+ if type(command) == str:
+ command = [command]
+ username, password = get_localhost_auth()
+ self.patch(
+ sys,
+ 'argv',
+ self.var['sys_arg_cmd']
+ + ['--port']
+ + ['58900']
+ + ['--username']
+ + [username]
+ + ['--password']
+ + [password]
+ + command,
+ )
+
+ @defer.inlineCallbacks
+ def test_console_command_add(self):
+ filename = common.get_test_data_file('test.torrent')
+ self.patch_arg_command(['add ' + filename])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+
+ std_output = fd.out.getvalue()
+ self.assertTrue(
+ std_output
+ == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n'
+ )
+
+ @defer.inlineCallbacks
+ def test_console_command_add_move_completed(self):
+ filename = common.get_test_data_file('test.torrent')
+ self.patch_arg_command(
+ [
+ 'add --move-path /tmp ' + filename + ' ; status'
+ ' ; manage'
+ ' ab570cdd5a17ea1b61e970bb72047de141bce173'
+ ' move_completed'
+ ' move_completed_path'
+ ]
+ )
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+
+ std_output = fd.out.getvalue()
+ self.assertTrue(
+ std_output.endswith('move_completed: True\nmove_completed_path: /tmp\n')
+ or std_output.endswith('move_completed_path: /tmp\nmove_completed: True\n')
+ )
+
+ @defer.inlineCallbacks
+ def test_console_command_status(self):
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch_arg_command(['status'])
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+
+ std_output = fd.out.getvalue()
+ self.assertTrue(
+ std_output.startswith('Total upload: ')
+ and std_output.endswith(' Moving: 0\n')
+ )
+
+
+class ConsoleScriptEntryWithDaemonTestCase(
+ BaseTestCase, ConsoleUIWithDaemonBaseTestCase
+):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def __init__(self, testname):
+ super(ConsoleScriptEntryWithDaemonTestCase, self).__init__(testname)
+ ConsoleUIWithDaemonBaseTestCase.__init__(self)
+ self.var['cmd_name'] = 'deluge-console'
+ self.var['sys_arg_cmd'] = ['./deluge-console']
+
+ def set_up(self):
+ from deluge.ui.console.console import Console
+
+ def start_console():
+ return Console().start()
+
+ self.patch(deluge.ui.console, 'start', start_console)
+ self.var['start_cmd'] = deluge.ui.console.start
+
+ return ConsoleUIWithDaemonBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return ConsoleUIWithDaemonBaseTestCase.tear_down(self)
+
+
+class ConsoleScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def __init__(self, testname):
+ super(ConsoleScriptEntryTestCase, self).__init__(testname)
+ ConsoleUIBaseTestCase.__init__(self)
+ self.var['cmd_name'] = 'deluge-console'
+ self.var['start_cmd'] = deluge.ui.console.start
+ self.var['sys_arg_cmd'] = ['./deluge-console']
+
+ def set_up(self):
+ return ConsoleUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return ConsoleUIBaseTestCase.tear_down(self)
+
+
+class ConsoleDelugeScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase):
+
+ if windows_check():
+ skip = 'cannot test console ui on windows'
+
+ def __init__(self, testname):
+ super(ConsoleDelugeScriptEntryTestCase, self).__init__(testname)
+ ConsoleUIBaseTestCase.__init__(self)
+ self.var['cmd_name'] = 'deluge console'
+ self.var['start_cmd'] = ui_entry.start_ui
+ self.var['sys_arg_cmd'] = ['./deluge', 'console']
+
+ def set_up(self):
+ return ConsoleUIBaseTestCase.set_up(self)
+
+ def tear_down(self):
+ return ConsoleUIBaseTestCase.tear_down(self)
diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py
new file mode 100644
index 0000000..982a93b
--- /dev/null
+++ b/deluge/tests/test_web_api.py
@@ -0,0 +1,199 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+from io import BytesIO
+
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
+from twisted.web.client import Agent, FileBodyProducer
+from twisted.web.http_headers import Headers
+from twisted.web.static import File
+
+import deluge.component as component
+from deluge.ui.client import client
+
+from . import common
+from .common_web import WebServerTestBase
+
+common.disable_new_release_check()
+
+
+class WebAPITestCase(WebServerTestBase):
+ def test_connect_invalid_host(self):
+ d = self.deluge_web.web_api.connect('id')
+ d.addCallback(self.fail)
+ d.addErrback(self.assertIsInstance, Failure)
+ return d
+
+ def test_connect(self):
+ d = self.deluge_web.web_api.connect(self.host_id)
+
+ def on_connect(result):
+ self.assertEqual(type(result), tuple)
+ self.assertTrue(len(result) > 0)
+ self.addCleanup(client.disconnect)
+ return result
+
+ d.addCallback(on_connect)
+ d.addErrback(self.fail)
+ return d
+
+ def test_disconnect(self):
+ d = self.deluge_web.web_api.connect(self.host_id)
+
+ @defer.inlineCallbacks
+ def on_connect(result):
+ self.assertTrue(self.deluge_web.web_api.connected())
+ yield self.deluge_web.web_api.disconnect()
+ self.assertFalse(self.deluge_web.web_api.connected())
+
+ d.addCallback(on_connect)
+ d.addErrback(self.fail)
+ return d
+
+ def test_get_config(self):
+ config = self.deluge_web.web_api.get_config()
+ self.assertEqual(self.webserver_listen_port, config['port'])
+
+ def test_set_config(self):
+ config = self.deluge_web.web_api.get_config()
+ config['pwd_salt'] = 'new_salt'
+ config['pwd_sha1'] = 'new_sha'
+ config['sessions'] = {
+ '233f23632af0a74748bc5dd1d8717564748877baa16420e6898e17e8aa365e6e': {
+ 'login': 'skrot',
+ 'expires': 1460030877.0,
+ 'level': 10,
+ }
+ }
+ self.deluge_web.web_api.set_config(config)
+ web_config = component.get('DelugeWeb').config.config
+ self.assertNotEquals(config['pwd_salt'], web_config['pwd_salt'])
+ self.assertNotEquals(config['pwd_sha1'], web_config['pwd_sha1'])
+ self.assertNotEquals(config['sessions'], web_config['sessions'])
+
+ @defer.inlineCallbacks
+ def get_host_status(self):
+ host = list(self.deluge_web.web_api._get_host(self.host_id))
+ host[3] = 'Online'
+ host[4] = '2.0.0.dev562'
+ status = yield self.deluge_web.web_api.get_host_status(self.host_id)
+ self.assertEqual(status, tuple(status))
+
+ def test_get_host(self):
+ self.assertFalse(self.deluge_web.web_api._get_host('invalid_id'))
+ conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0])
+ self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+
+ def test_add_host(self):
+ conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123']
+ self.assertFalse(self.deluge_web.web_api._get_host(conn[0]))
+ # Add valid host
+ result, host_id = self.deluge_web.web_api.add_host(
+ conn[1], conn[2], conn[3], conn[4]
+ )
+ self.assertEqual(result, True)
+ conn[0] = host_id
+ self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+
+ # Add already existing host
+ ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
+ self.assertEqual(ret, (False, 'Host details already in hostlist'))
+
+ # Add invalid port
+ conn[2] = 'bad port'
+ ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
+ self.assertEqual(ret, (False, 'Invalid port. Must be an integer'))
+
+ def test_remove_host(self):
+ conn = ['connection_id', '', 0, '', '']
+ self.deluge_web.web_api.hostlist.config['hosts'].append(conn)
+ self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+ # Remove valid host
+ self.assertTrue(self.deluge_web.web_api.remove_host(conn[0]))
+ self.assertFalse(self.deluge_web.web_api._get_host(conn[0]))
+ # Remove non-existing host
+ self.assertFalse(self.deluge_web.web_api.remove_host(conn[0]))
+
+ def test_get_torrent_info(self):
+ filename = common.get_test_data_file('test.torrent')
+ ret = self.deluge_web.web_api.get_torrent_info(filename)
+ self.assertEqual(ret['name'], 'azcvsupdater_2.6.2.jar')
+ self.assertEqual(ret['info_hash'], 'ab570cdd5a17ea1b61e970bb72047de141bce173')
+ self.assertTrue('files_tree' in ret)
+
+ def test_get_magnet_info(self):
+ ret = self.deluge_web.web_api.get_magnet_info(
+ 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN'
+ )
+ self.assertEqual(ret['name'], '953bad769164e8482c7785a21d12166f94b9e14d')
+ self.assertEqual(ret['info_hash'], '953bad769164e8482c7785a21d12166f94b9e14d')
+ self.assertTrue('files_tree' in ret)
+
+ @defer.inlineCallbacks
+ def test_get_torrent_files(self):
+ yield self.deluge_web.web_api.connect(self.host_id)
+ filename = common.get_test_data_file('test.torrent')
+ torrents = [
+ {'path': filename, 'options': {'download_location': '/home/deluge/'}}
+ ]
+ yield self.deluge_web.web_api.add_torrents(torrents)
+ ret = yield self.deluge_web.web_api.get_torrent_files(
+ 'ab570cdd5a17ea1b61e970bb72047de141bce173'
+ )
+ self.assertEqual(ret['type'], 'dir')
+ self.assertEqual(
+ ret['contents'],
+ {
+ 'azcvsupdater_2.6.2.jar': {
+ 'priority': 4,
+ 'index': 0,
+ 'offset': 0,
+ 'progress': 0.0,
+ 'path': 'azcvsupdater_2.6.2.jar',
+ 'type': 'file',
+ 'size': 307949,
+ }
+ },
+ )
+
+ @defer.inlineCallbacks
+ def test_download_torrent_from_url(self):
+ filename = 'ubuntu-9.04-desktop-i386.iso.torrent'
+ self.deluge_web.top_level.putChild(
+ filename.encode(), File(common.get_test_data_file(filename))
+ )
+ url = 'http://localhost:%d/%s' % (self.webserver_listen_port, filename)
+ res = yield self.deluge_web.web_api.download_torrent_from_url(url)
+ self.assertTrue(res.endswith(filename))
+
+ @defer.inlineCallbacks
+ def test_invalid_json(self):
+ """
+ If json_api._send_response does not return server.NOT_DONE_YET
+ this error is thrown when json is invalid:
+ exceptions.RuntimeError: Request.write called on a request after Request.finish was called.
+
+ """
+ agent = Agent(reactor)
+ bad_body = b'{ method": "auth.login" }'
+ d = yield agent.request(
+ b'POST',
+ b'http://127.0.0.1:%i/json' % self.webserver_listen_port,
+ Headers(
+ {
+ b'User-Agent': [b'Twisted Web Client Example'],
+ b'Content-Type': [b'application/json'],
+ }
+ ),
+ FileBodyProducer(BytesIO(bad_body)),
+ )
+ yield d
diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py
new file mode 100644
index 0000000..a518573
--- /dev/null
+++ b/deluge/tests/test_web_auth.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+from __future__ import unicode_literals
+
+from mock import patch
+from twisted.trial import unittest
+
+from deluge.ui.web import auth
+
+
+class MockConfig(object):
+ def __init__(self, config):
+ self.config = config
+
+ def __getitem__(self, key):
+ return self.config[key]
+
+ def __setitem__(self, key, value):
+ self.config[key] = value
+
+
+class WebAuthTestCase(unittest.TestCase):
+ @patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None)
+ def test_change_password(self, mock_json):
+ config = MockConfig(
+ {
+ 'pwd_sha1': '8d8ff487626141d2b91025901d3ab57211180b48',
+ 'pwd_salt': '7555d757710158655bd1646e207dee21a89e9226',
+ }
+ )
+ webauth = auth.Auth(config)
+ self.assertTrue(webauth.change_password('deluge', 'deluge_new'))
diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py
new file mode 100644
index 0000000..d9684ba
--- /dev/null
+++ b/deluge/tests/test_webserver.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import json as json_lib
+from io import BytesIO
+
+import twisted.web.client
+from twisted.internet import defer, reactor
+from twisted.web.client import Agent, FileBodyProducer
+from twisted.web.http_headers import Headers
+
+from . import common
+from .common import get_test_data_file
+from .common_web import WebServerMockBase, WebServerTestBase
+
+common.disable_new_release_check()
+
+
+class WebServerTestCase(WebServerTestBase, WebServerMockBase):
+ @defer.inlineCallbacks
+ def test_get_torrent_info(self):
+
+ agent = Agent(reactor)
+
+ self.mock_authentication_ignore(self.deluge_web.auth)
+
+ # This torrent file contains an uncommon field 'filehash' which must be hex
+ # encoded to allow dumping the torrent info to json. Otherwise it will fail with:
+ # UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte
+ filename = get_test_data_file('filehash_field.torrent')
+ input_file = (
+ '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' % filename
+ )
+ headers = {
+ b'User-Agent': ['Twisted Web Client Example'],
+ b'Content-Type': ['application/json'],
+ }
+ url = 'http://127.0.0.1:%s/json' % self.webserver_listen_port
+
+ d = yield agent.request(
+ b'POST',
+ url.encode('utf-8'),
+ Headers(headers),
+ FileBodyProducer(BytesIO(input_file.encode('utf-8'))),
+ )
+
+ body = yield twisted.web.client.readBody(d)
+
+ json = json_lib.loads(body.decode())
+ self.assertEqual(None, json['error'])
+ self.assertEqual('torrent_filehash', json['result']['name'])
diff --git a/deluge/tests/twisted/plugins/delugereporter.py b/deluge/tests/twisted/plugins/delugereporter.py
new file mode 100644
index 0000000..c2a7b52
--- /dev/null
+++ b/deluge/tests/twisted/plugins/delugereporter.py
@@ -0,0 +1,50 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from __future__ import unicode_literals
+
+import os
+
+from twisted.plugin import IPlugin
+from twisted.trial.itrial import IReporter
+from twisted.trial.reporter import TreeReporter
+from zope.interface import implements
+
+
+class _Reporter(object):
+ implements(IPlugin, IReporter)
+
+ def __init__(
+ self, name, module, description, longOpt, shortOpt, klass # noqa: N803
+ ):
+ self.name = name
+ self.module = module
+ self.description = description
+ self.longOpt = longOpt
+ self.shortOpt = shortOpt
+ self.klass = klass
+
+
+deluge = _Reporter(
+ 'Deluge reporter that suppresses Stacktrace from TODO tests',
+ 'twisted.plugins.delugereporter',
+ description='Deluge Reporter',
+ longOpt='deluge-reporter',
+ shortOpt=None,
+ klass='DelugeReporter',
+)
+
+
+class DelugeReporter(TreeReporter):
+ def __init__(self, *args, **kwargs):
+ os.environ['DELUGE_REPORTER'] = 'true'
+ TreeReporter.__init__(self, *args, **kwargs)
+
+ def addExpectedFailure(self, *args): # NOQA: N802
+ # super(TreeReporter, self).addExpectedFailure(*args)
+ self.endLine('[TODO]', self.TODO)