summaryrefslogtreecommitdiffstats
path: root/deluge/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:38:38 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:38:38 +0000
commit2e2851dc13d73352530dd4495c7e05603b2e520d (patch)
tree622b9cd8e5d32091c9aa9e4937b533975a40356c /deluge/tests
parentInitial commit. (diff)
downloaddeluge-upstream.tar.xz
deluge-upstream.zip
Adding upstream version 2.1.2~dev0+20240219.upstream/2.1.2_dev0+20240219upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'deluge/tests')
-rw-r--r--deluge/tests/__init__.py17
-rw-r--r--deluge/tests/common.py363
-rw-r--r--deluge/tests/common_web.py57
-rw-r--r--deluge/tests/daemon_base.py67
-rw-r--r--deluge/tests/data/deluge.pngbin0 -> 1126 bytes
-rw-r--r--deluge/tests/data/dir_with_6_files.torrentbin0 -> 9630 bytes
-rw-r--r--deluge/tests/data/dir_with_single_file.torrent1
-rw-r--r--deluge/tests/data/filehash_field.torrent2
-rw-r--r--deluge/tests/data/google.icobin0 -> 5430 bytes
-rw-r--r--deluge/tests/data/md5sum.torrent1
-rw-r--r--deluge/tests/data/seo.svg1
-rw-r--r--deluge/tests/data/test.torrent2
-rw-r--r--deluge/tests/data/test_torrent.file.torrent2
-rw-r--r--deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrentbin0 -> 28184 bytes
-rw-r--r--deluge/tests/data/unicode_file.torrent1
-rw-r--r--deluge/tests/data/unicode_filenames.torrentbin0 -> 34298 bytes
-rw-r--r--deluge/tests/data/utf8_filename_torrents.state85
-rw-r--r--deluge/tests/data/v2_hybrid.torrentbin0 -> 613 bytes
-rw-r--r--deluge/tests/data/v2_test.torrentbin0 -> 345 bytes
-rw-r--r--deluge/tests/test_alertmanager.py102
-rw-r--r--deluge/tests/test_authmanager.py23
-rw-r--r--deluge/tests/test_bencode.py32
-rw-r--r--deluge/tests/test_client.py192
-rw-r--r--deluge/tests/test_common.py227
-rw-r--r--deluge/tests/test_component.py192
-rw-r--r--deluge/tests/test_config.py274
-rw-r--r--deluge/tests/test_core.py511
-rw-r--r--deluge/tests/test_decorators.py48
-rw-r--r--deluge/tests/test_error.py39
-rw-r--r--deluge/tests/test_files_tab.py163
-rw-r--r--deluge/tests/test_httpdownloader.py275
-rw-r--r--deluge/tests/test_json_api.py267
-rw-r--r--deluge/tests/test_log.py47
-rw-r--r--deluge/tests/test_maketorrent.py85
-rw-r--r--deluge/tests/test_maybe_coroutine.py207
-rw-r--r--deluge/tests/test_metafile.py112
-rw-r--r--deluge/tests/test_plugin_metadata.py43
-rw-r--r--deluge/tests/test_rpcserver.py108
-rw-r--r--deluge/tests/test_security.py158
-rw-r--r--deluge/tests/test_sessionproxy.py154
-rw-r--r--deluge/tests/test_torrent.py388
-rw-r--r--deluge/tests/test_torrentmanager.py146
-rw-r--r--deluge/tests/test_torrentview.py224
-rw-r--r--deluge/tests/test_tracker_icons.py71
-rw-r--r--deluge/tests/test_transfer.py398
-rw-r--r--deluge/tests/test_ui_common.py290
-rw-r--r--deluge/tests/test_ui_console.py80
-rw-r--r--deluge/tests/test_ui_entry.py440
-rw-r--r--deluge/tests/test_ui_gtk3.py30
-rw-r--r--deluge/tests/test_web_api.py202
-rw-r--r--deluge/tests/test_web_auth.py33
-rw-r--r--deluge/tests/test_webserver.py108
52 files changed, 6268 insertions, 0 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py
new file mode 100644
index 0000000..7b6afa1
--- /dev/null
+++ b/deluge/tests/__init__.py
@@ -0,0 +1,17 @@
+# Increase open file descriptor limit to allow tests to run
+# without getting error: what(): epoll: Too many open files
+from deluge.i18n import setup_translation
+
+try:
+ import resource
+except ImportError: # Does not exist on Windows
+ pass
+else:
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))
+ except (ValueError, resource.error) as ex:
+ error = 'Failed to raise file descriptor limit: %s' % ex
+ # print(error)
+
+# Initialize gettext
+setup_translation()
diff --git a/deluge/tests/common.py b/deluge/tests/common.py
new file mode 100644
index 0000000..b594156
--- /dev/null
+++ b/deluge/tests/common.py
@@ -0,0 +1,363 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import sys
+import traceback
+
+import pytest
+from twisted.internet import defer, protocol, reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import CannotListenError
+
+import deluge.configmanager
+import deluge.core.preferencesmanager
+import deluge.log
+from deluge.common import get_localhost_auth
+from deluge.error import DelugeError
+from deluge.ui.client import Client
+
+# This sets log level to critical, so use log.critical() to debug while running unit tests
+deluge.log.setup_logger('none')
+
+
+def disable_new_release_check():
+ deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False
+
+
+def setup_test_logger(level='info', prefix='deluge'):
+ deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False)
+
+
+def get_test_data_file(filename):
+ return os.path.join(os.path.join(os.path.dirname(__file__), 'data'), filename)
+
+
+def todo_test(caller):
+ # If we are using the delugereporter we can set todo mark on the test
+ # Without the delugereporter the todo would print a stack trace, so in
+ # that case we rely only on skipTest
+ if os.environ.get('DELUGE_REPORTER', None):
+ getattr(caller, caller._testMethodName).__func__.todo = 'To be fixed'
+
+ filename = os.path.basename(traceback.extract_stack(None, 2)[0][0])
+ funcname = traceback.extract_stack(None, 2)[0][2]
+ pytest.skip(f'TODO: {filename}:{funcname}')
+
+
+def add_watchdog(deferred, timeout=0.05, message=None):
+ def callback(value):
+ if not watchdog.called and not watchdog.cancelled:
+ watchdog.cancel()
+ if not deferred.called:
+ if message:
+ print(message)
+ deferred.cancel()
+ return value
+
+ deferred.addBoth(callback)
+ watchdog = reactor.callLater(timeout, defer.Deferred.addTimeout, deferred)
+ return watchdog
+
+
+class ReactorOverride:
+ """Class used to patch reactor while running unit tests
+ to avoid starting and stopping the twisted reactor
+ """
+
+ def __getattr__(self, attr):
+ if attr == 'run':
+ return self._run
+ if attr == 'stop':
+ return self._stop
+ return getattr(reactor, attr)
+
+ def _run(self):
+ pass
+
+ def _stop(self):
+ pass
+
+ def addReader(self, arg): # NOQA: N802
+ pass
+
+
+class ProcessOutputHandler(protocol.ProcessProtocol):
+ def __init__(
+ self,
+ script,
+ shutdown_func,
+ callbacks,
+ logfile=None,
+ print_stdout=True,
+ print_stderr=True,
+ ):
+ """Executes a script and handle the process' output to stdout and stderr.
+
+ Args:
+ script (str): The script to execute.
+ shutdown_func (func): A function which will gracefully stop the called script.
+ callbacks (list): Callbacks to trigger if the expected output if found.
+ logfile (str, optional): Filename to wrote the process' output.
+ print_stderr (bool): Print the process' stderr output to stdout.
+ print_stdout (bool): Print the process' stdout output to stdout.
+
+ """
+ self.callbacks = callbacks
+ self.script = script
+ self.shutdown_func = shutdown_func
+ self.log_output = ''
+ self.stderr_out = ''
+ self.logfile = logfile
+ self.print_stdout = print_stdout
+ self.print_stderr = print_stderr
+ self.quit_d = None
+ self.killed = False
+ self.watchdogs = []
+
+ def connectionMade(self): # NOQA: N802
+ self.transport.write(self.script)
+ self.transport.closeStdin()
+
+ def outConnectionLost(self): # NOQA: N802
+ if not self.logfile:
+ return
+ with open(self.logfile, 'w') as f:
+ f.write(self.log_output)
+
+ @defer.inlineCallbacks
+ def kill(self):
+ """Kill the running process.
+
+ Returns:
+ Deferred: A deferred that is triggered when the process has quit.
+
+ """
+ if self.killed:
+ return
+ self.killed = True
+ self._kill_watchdogs()
+ self.quit_d = Deferred()
+ shutdown = self.shutdown_func()
+ shutdown.addTimeout(5, reactor)
+ try:
+ yield shutdown
+ except Exception:
+ self.transport.signalProcess('TERM')
+ result = yield self.quit_d
+ return result
+
+ def _kill_watchdogs(self):
+ """Cancel all watchdogs"""
+ for w in self.watchdogs:
+ if not w.called and not w.cancelled:
+ w.cancel()
+
+ def processEnded(self, status): # NOQA: N802
+ self.transport.loseConnection()
+ if self.quit_d is None:
+ return
+ if status.value.exitCode == 0:
+ self.quit_d.callback(True)
+ else:
+ self.quit_d.errback(status)
+
+ def check_callbacks(self, data, cb_type='stdout'):
+ ret = False
+ for c in self.callbacks:
+ if cb_type not in c['types'] or c['deferred'].called:
+ continue
+ for trigger in c['triggers']:
+ if trigger['expr'] in data:
+ ret = True
+ if 'cb' in trigger:
+ trigger['cb'](self, c['deferred'], data, self.log_output)
+ elif 'value' not in trigger:
+ raise Exception('Trigger must specify either "cb" or "value"')
+ else:
+ val = trigger['value'](self, data, self.log_output)
+ if trigger.get('type', 'callback') == 'errback':
+ c['deferred'].errback(val)
+ else:
+ c['deferred'].callback(val)
+ return ret
+
+ def outReceived(self, data): # NOQA: N802
+ """Process output from stdout"""
+ data = data.decode('utf8')
+ self.log_output += data
+ if self.check_callbacks(data):
+ pass
+ elif '[ERROR' in data:
+ if not self.print_stdout:
+ return
+ print(data, end=' ')
+
+ def errReceived(self, data): # NOQA: N802
+ """Process output from stderr"""
+ data = data.decode('utf8')
+ self.log_output += data
+ self.stderr_out += data
+ self.check_callbacks(data, cb_type='stderr')
+ if not self.print_stderr:
+ return
+ data = '\n%s' % data.strip()
+ prefixed = data.replace('\n', '\nSTDERR: ')
+ print('\n%s' % prefixed)
+
+
+def start_core(
+ listen_port=58900,
+ logfile=None,
+ timeout=10,
+ timeout_msg=None,
+ custom_script='',
+ print_stdout=True,
+ print_stderr=True,
+ extra_callbacks=None,
+ config_directory='',
+):
+ """Start the deluge core as a daemon.
+
+ Args:
+ listen_port (int, optional): The port the daemon listens for client connections.
+ logfile (str, optional): Logfile name to write the output from the process.
+ timeout (int): If none of the callbacks have been triggered before the timeout, the process is killed.
+ timeout_msg (str): The message to print when the timeout expires.
+ custom_script (str): Extra python code to insert into the daemon process script.
+ print_stderr (bool): If the output from the process' stderr should be printed to stdout.
+ print_stdout (bool): If the output from the process' stdout should be printed to stdout.
+ extra_callbacks (list): A list of dictionaries specifying extra callbacks.
+
+ Returns:
+ tuple(Deferred, ProcessOutputHandler):
+
+ The Deferred is fired when the core callback is triggered either after the default
+ output triggers are matched (daemon successfully started, or failed to start),
+ or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process.
+
+ """
+ daemon_script = """
+import sys
+import deluge.core.daemon_entry
+
+from deluge.common import windows_check
+
+if windows_check():
+ sys.argv.extend(['-c', '%(dir)s', '-L', 'info', '-p', '%(port)d'])
+else:
+ sys.argv.extend(['-d', '-c', '%(dir)s', '-L', 'info', '-p', '%(port)d'])
+
+try:
+ daemon = deluge.core.daemon_entry.start_daemon(skip_start=True)
+ %(script)s
+ daemon.start()
+except Exception:
+ import traceback
+ sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc())
+""" % {
+ 'dir': config_directory.as_posix(),
+ 'port': listen_port,
+ 'script': custom_script,
+ }
+
+ callbacks = []
+ default_core_cb = {'deferred': Deferred(), 'types': 'stdout'}
+ if timeout:
+ default_core_cb['timeout'] = timeout
+
+ # Specify the triggers for daemon log output
+ default_core_cb['triggers'] = [
+ {'expr': 'Finished loading ', 'value': lambda reader, data, data_all: reader},
+ {
+ 'expr': 'Cannot start deluged, listen port in use.',
+ 'type': 'errback',
+ 'value': lambda reader, data, data_all: CannotListenError(
+ 'localhost',
+ listen_port,
+ 'Could not start deluge test client!\n%s' % data,
+ ),
+ },
+ {
+ 'expr': 'Traceback',
+ 'type': 'errback',
+ 'value': lambda reader, data, data_all: DelugeError(
+ 'Traceback found when starting daemon:\n%s' % data
+ ),
+ },
+ ]
+
+ callbacks.append(default_core_cb)
+ if extra_callbacks:
+ callbacks.extend(extra_callbacks)
+
+ @defer.inlineCallbacks
+ def shutdown_daemon():
+ username, password = get_localhost_auth()
+ client = Client()
+ yield client.connect(
+ 'localhost', listen_port, username=username, password=password
+ )
+ yield client.daemon.shutdown()
+
+ process_protocol = start_process(
+ daemon_script, shutdown_daemon, callbacks, logfile, print_stdout, print_stderr
+ )
+ return default_core_cb['deferred'], process_protocol
+
+
+def start_process(
+ script, shutdown_func, callbacks, logfile=None, print_stdout=True, print_stderr=True
+):
+ """
+ Starts an external python process which executes the given script.
+
+ Args:
+ script (str): The content of the script to execute.
+ shutdown_func (func): A function which will gracefully end the called script.
+ callbacks (list): list of dictionaries specifying callbacks.
+ logfile (str, optional): Logfile name to write the output from the process.
+ print_stderr (bool): If the output from the process' stderr should be printed to stdout.
+ print_stdout (bool): If the output from the process' stdout should be printed to stdout.
+
+ Returns:
+ ProcessOutputHandler: The handler for the process's output.
+
+ Each entry in the callbacks list is a dictionary with the following keys:
+ * "deferred": The deferred to be called when matched.
+ * "types": The output this callback should be matched against.
+ Possible values: ["stdout", "stderr"]
+ * "timeout" (optional): A timeout in seconds for the deferred.
+ * "triggers": A list of dictionaries, each specifying specifying a trigger:
+ * "expr": A string to match against the log output.
+ * "value": A function to produce the result to be passed to the callback.
+ * "type" (optional): A string that specifies wether to trigger a regular callback or errback.
+
+ """
+ cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ process_protocol = ProcessOutputHandler(
+ script.encode('utf8'),
+ shutdown_func,
+ callbacks,
+ logfile,
+ print_stdout,
+ print_stderr,
+ )
+
+ # Add timeouts to deferreds
+ for c in callbacks:
+ if 'timeout' in c:
+ w = add_watchdog(
+ c['deferred'], timeout=c['timeout'], message=c.get('timeout_msg', None)
+ )
+ process_protocol.watchdogs.append(w)
+
+ reactor.spawnProcess(
+ process_protocol, sys.executable, args=[sys.executable], path=cwd
+ )
+ return process_protocol
diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py
new file mode 100644
index 0000000..f7da577
--- /dev/null
+++ b/deluge/tests/common_web.py
@@ -0,0 +1,57 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import pytest
+
+import deluge.common
+import deluge.ui.web.auth
+import deluge.ui.web.server
+from deluge import configmanager
+from deluge.conftest import BaseTestCase
+from deluge.ui.web.server import DelugeWeb
+
+from .common import ReactorOverride
+
+
+@pytest.mark.usefixtures('daemon', 'component')
+class WebServerTestBase(BaseTestCase):
+ """
+ Base class for tests that need a running webapi
+
+ """
+
+ def set_up(self):
+ self.host_id = None
+ deluge.ui.web.server.reactor = ReactorOverride()
+ return self.start_webapi(None)
+
+ def start_webapi(self, arg):
+ config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy()
+ config_defaults['port'] = 8999
+ self.config = configmanager.ConfigManager('web.conf', config_defaults)
+
+ self.deluge_web = DelugeWeb(daemon=False)
+
+ host = list(self.deluge_web.web_api.hostlist.config['hosts'][0])
+ host[2] = self.listen_port
+ self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host)
+ self.host_id = host[0]
+ self.deluge_web.start()
+
+
+class WebServerMockBase:
+ """
+ Class with utility functions for mocking with tests using the webserver
+
+ """
+
+ def mock_authentication_ignore(self, auth):
+ def check_request(request, method=None, level=None):
+ pass
+
+ self.patch(auth, 'check_request', check_request)
diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py
new file mode 100644
index 0000000..707570f
--- /dev/null
+++ b/deluge/tests/daemon_base.py
@@ -0,0 +1,67 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import pytest
+from twisted.internet import defer
+from twisted.internet.error import CannotListenError
+
+import deluge.component as component
+
+from . import common
+
+
+@pytest.mark.usefixtures('config_dir')
+class DaemonBase:
+ def common_set_up(self):
+ self.listen_port = 58900
+ self.core = None
+ return component.start()
+
+ def terminate_core(self, *args):
+ if args[0] is not None:
+ if hasattr(args[0], 'getTraceback'):
+ print('terminate_core: Errback Exception: %s' % args[0].getTraceback())
+
+ if not self.core.killed:
+ d = self.core.kill()
+ return d
+
+ @defer.inlineCallbacks
+ def start_core(
+ self,
+ arg,
+ custom_script='',
+ logfile='',
+ print_stdout=True,
+ print_stderr=True,
+ timeout=5,
+ port_range=10,
+ extra_callbacks=None,
+ ):
+ logfile = f'daemon_{self.id()}.log' if logfile == '' else logfile
+
+ for dummy in range(port_range):
+ try:
+ d, self.core = common.start_core(
+ listen_port=self.listen_port,
+ logfile=logfile,
+ timeout=timeout,
+ timeout_msg='Timeout!',
+ custom_script=custom_script,
+ print_stdout=print_stdout,
+ print_stderr=print_stderr,
+ extra_callbacks=extra_callbacks,
+ config_directory=self.config_dir,
+ )
+ yield d
+ except CannotListenError as ex:
+ exception_error = ex
+ self.listen_port += 1
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ else:
+ return
+ raise exception_error
diff --git a/deluge/tests/data/deluge.png b/deluge/tests/data/deluge.png
new file mode 100644
index 0000000..6787fa3
--- /dev/null
+++ b/deluge/tests/data/deluge.png
Binary files differ
diff --git a/deluge/tests/data/dir_with_6_files.torrent b/deluge/tests/data/dir_with_6_files.torrent
new file mode 100644
index 0000000..2c6b5fb
--- /dev/null
+++ b/deluge/tests/data/dir_with_6_files.torrent
Binary files differ
diff --git a/deluge/tests/data/dir_with_single_file.torrent b/deluge/tests/data/dir_with_single_file.torrent
new file mode 100644
index 0000000..33fec2c
--- /dev/null
+++ b/deluge/tests/data/dir_with_single_file.torrent
@@ -0,0 +1 @@
+d10:created by13:mktorrent 1.113:creation datei1684991433e4:infod5:filesld6:lengthi9e4:pathl15:single_file.txteee4:name20:dir_with_single_file12:piece lengthi262144e6:pieces20:Wi,=35Yhee \ No newline at end of file
diff --git a/deluge/tests/data/filehash_field.torrent b/deluge/tests/data/filehash_field.torrent
new file mode 100644
index 0000000..99e41f0
--- /dev/null
+++ b/deluge/tests/data/filehash_field.torrent
@@ -0,0 +1,2 @@
+d13:creation datei1476342472e4:infod5:filesld4:ed2k16:2M2&XE 18:filehash20:f4^S96P՝R6:lengthi54e4:pathl8:tull.txteed4:ed2k16:_G L@8:filehash20:vXd/n136:lengthi54e4:pathl56:還在一個人無聊嗎~還不趕緊上來聊天美.txteee4:name16:torrent_filehash12:piece lengthi32768e6:pieces20:G@g\&\ fB
+ee \ No newline at end of file
diff --git a/deluge/tests/data/google.ico b/deluge/tests/data/google.ico
new file mode 100644
index 0000000..82339b3
--- /dev/null
+++ b/deluge/tests/data/google.ico
Binary files differ
diff --git a/deluge/tests/data/md5sum.torrent b/deluge/tests/data/md5sum.torrent
new file mode 100644
index 0000000..0e8c93f
--- /dev/null
+++ b/deluge/tests/data/md5sum.torrent
@@ -0,0 +1 @@
+d8:announce25:lol.this.is.not.a.tracker7:comment36:created with py3createtorrent v0.9.610:created by23:py3createtorrent v0.9.613:creation datei1590076175e4:infod5:filesld6:lengthi4e6:md5sum32:59bcc3ad6775562f845953cf016242254:pathl3:loleed6:lengthi5e6:md5sum32:10245815f893d79f3d779690774f0b434:pathl4:rofleee4:name4:test12:piece lengthi16384e6:pieces20:86Aڲ-Y>+S]\/ee \ No newline at end of file
diff --git a/deluge/tests/data/seo.svg b/deluge/tests/data/seo.svg
new file mode 100644
index 0000000..fc96f74
--- /dev/null
+++ b/deluge/tests/data/seo.svg
@@ -0,0 +1 @@
+<?xml version="1.0" encoding="UTF-8"?> <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 95.63 110.42" width="10cm" height="10cm"><defs><style>.cls-1{fill:#ec5728;}.cls-2{fill:#fff;}</style></defs><title>seocom-target</title><g id="Layer_2" data-name="Layer 2"><g id="Layer_1-2" data-name="Layer 1"><polygon class="cls-1" points="95.63 82.81 47.81 110.42 0 82.81 0 27.61 47.81 0 95.63 27.61 95.63 82.81"></polygon><path class="cls-2" d="M47.81,18.64A36.57,36.57,0,1,0,84.38,55.21,36.57,36.57,0,0,0,47.81,18.64Zm0,63.92A27.35,27.35,0,1,1,75.16,55.21,27.35,27.35,0,0,1,47.81,82.56Z"></path><path class="cls-2" d="M47.81,39.46A15.75,15.75,0,1,0,63.56,55.21,15.75,15.75,0,0,0,47.81,39.46Zm0,24.25a8.5,8.5,0,1,1,8.5-8.5A8.51,8.51,0,0,1,47.81,63.71Z"></path></g></g></svg> \ No newline at end of file
diff --git a/deluge/tests/data/test.torrent b/deluge/tests/data/test.torrent
new file mode 100644
index 0000000..847ec58
--- /dev/null
+++ b/deluge/tests/data/test.torrent
@@ -0,0 +1,2 @@
+d8:announce40:http://tracker.aelitis.com:6969/announce13:announce-listll40:http://tracker.aelitis.com:6969/announceel41:http://tracker.aelitis.com:16969/announceee18:azureus_propertiesd17:dht_backup_enablei0ee7:comment34:provided by http://getazureus.com/13:comment.utf-834:provided by http://getazureus.com/10:created by19:Azureus/2.5.0.3_CVS13:creation datei1169429806e4:infod4:ed2k16:>pl]K^\;;e6:lengthi307949e4:name22:azcvsupdater_2.6.2.jar10:name.utf-822:azcvsupdater_2.6.2.jar12:piece lengthi32768e6:pieces200:B'bsu97u<w\fԛ}:qv"7 y! lv{/"PD8-MFx+S`%%
+;ϐ /F>gA2|ڂb#wIfWswKrGWiد#u E?:ub(oj ^AJ?֌&7:privatei0e4:sha120:2ζ"Կ^Khŷee \ No newline at end of file
diff --git a/deluge/tests/data/test_torrent.file.torrent b/deluge/tests/data/test_torrent.file.torrent
new file mode 100644
index 0000000..fca14ca
--- /dev/null
+++ b/deluge/tests/data/test_torrent.file.torrent
@@ -0,0 +1,2 @@
+d7:comment17:Test torrent file10:created by11:Deluge Team13:creation datei1411826665e8:encoding5:UTF-84:infod6:lengthi512000e4:name17:test_torrent.file12:piece lengthi32768e6:pieces320:$2Tj >hU.--~–ޔBzuEB1@ͥ/K"zF0֣[asV1B^Wp-S׶F`M9)},4niW jQI̗(,t؋chi*M}^WS7 h:-beTXa3m|J"]0$}l@L V,4˫zMLģJ*
+\AP&I9}20֎H:_8<V2JYb2'2h0\*_j7:privatei0eee \ No newline at end of file
diff --git a/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent
new file mode 100644
index 0000000..b55c9ae
--- /dev/null
+++ b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent
Binary files differ
diff --git a/deluge/tests/data/unicode_file.torrent b/deluge/tests/data/unicode_file.torrent
new file mode 100644
index 0000000..e62fb1f
--- /dev/null
+++ b/deluge/tests/data/unicode_file.torrent
@@ -0,0 +1 @@
+d13:creation datei1627211242e8:encoding5:UTF-84:infod6:lengthi32e4:name35:সুকুমার রায়.txt12:piece lengthi32768e6:pieces20:",.xe2U7:privatei0eee
diff --git a/deluge/tests/data/unicode_filenames.torrent b/deluge/tests/data/unicode_filenames.torrent
new file mode 100644
index 0000000..e34f055
--- /dev/null
+++ b/deluge/tests/data/unicode_filenames.torrent
Binary files differ
diff --git a/deluge/tests/data/utf8_filename_torrents.state b/deluge/tests/data/utf8_filename_torrents.state
new file mode 100644
index 0000000..0e9c33d
--- /dev/null
+++ b/deluge/tests/data/utf8_filename_torrents.state
@@ -0,0 +1,85 @@
+(ideluge.core.torrentmanager
+TorrentManagerState
+p1
+(dp2
+S'torrents'
+p3
+(lp4
+(ideluge.core.torrentmanager
+TorrentState
+p5
+(dp6
+S'max_download_speed'
+p7
+I-1
+sS'move_completed_path'
+p8
+S'/home/calum/Downloads'
+p9
+sS'paused'
+p10
+I00
+sS'max_upload_slots'
+p11
+I-1
+sS'prioritize_first_last'
+p12
+I00
+sS'max_connections'
+p13
+I-1
+sS'compact'
+p14
+I00
+sS'queue'
+p15
+I0
+sS'file_priorities'
+p16
+(lp17
+I4
+asS'filename'
+p18
+S'\xc2\xa2.torrent'
+p19
+sS'max_upload_speed'
+p20
+I-1
+sS'save_path'
+p21
+S'/home/calum/Downloads'
+p22
+sS'time_added'
+p23
+F1573563097.749759
+sS'total_uploaded'
+p24
+I0
+sS'torrent_id'
+p25
+S'80d81d55ef3b85f3c1b634c362e014b35594dc71'
+p26
+sS'auto_managed'
+p27
+I01
+sS'stop_at_ratio'
+p28
+I00
+sS'move_completed'
+p29
+I00
+sS'trackers'
+p30
+(lp31
+sS'magnet'
+p32
+NsS'remove_at_ratio'
+p33
+I00
+sS'stop_ratio'
+p34
+F2
+sS'is_finished'
+p35
+I00
+sbasb.
diff --git a/deluge/tests/data/v2_hybrid.torrent b/deluge/tests/data/v2_hybrid.torrent
new file mode 100644
index 0000000..e58057c
--- /dev/null
+++ b/deluge/tests/data/v2_hybrid.torrent
Binary files differ
diff --git a/deluge/tests/data/v2_test.torrent b/deluge/tests/data/v2_test.torrent
new file mode 100644
index 0000000..fe6cbd0
--- /dev/null
+++ b/deluge/tests/data/v2_test.torrent
Binary files differ
diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py
new file mode 100644
index 0000000..2d18f4b
--- /dev/null
+++ b/deluge/tests/test_alertmanager.py
@@ -0,0 +1,102 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+from dataclasses import dataclass
+
+import pytest
+
+from deluge.core.core import Core
+
+
+class LtSessionMock:
+ def __init__(self):
+ self.alerts = []
+
+ def push_alerts(self, alerts):
+ self.alerts = alerts
+
+ def wait_for_alert(self, timeout):
+ return self.alerts[0] if len(self.alerts) > 0 else None
+
+ def pop_alerts(self):
+ alerts = self.alerts
+ self.alerts = []
+ return alerts
+
+
+@dataclass
+class LtAlertMock:
+ type: int
+ name: str
+ message: str
+
+ def message(self):
+ return self.message
+
+ def what(self):
+ return self.name
+
+
+@pytest.fixture
+def mock_alert1():
+ return LtAlertMock(type=1, name='mock_alert1', message='Alert 1')
+
+
+@pytest.fixture
+def mock_alert2():
+ return LtAlertMock(type=2, name='mock_alert2', message='Alert 2')
+
+
+class TestAlertManager:
+ @pytest.fixture(autouse=True)
+ def set_up(self, component):
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.am = component.get('AlertManager')
+ self.am.session = LtSessionMock()
+
+ component.start(['AlertManager'])
+
+ def test_register_handler(self):
+ def handler(alert):
+ ...
+
+ self.am.register_handler('dummy1', handler)
+ self.am.register_handler('dummy2_alert', handler)
+ assert self.am.handlers['dummy1'] == [handler]
+ assert self.am.handlers['dummy2'] == [handler]
+
+ async def test_pop_alert(self, mock_callback, mock_alert1, mock_alert2):
+ self.am.register_handler('mock_alert1', mock_callback)
+
+ self.am.session.push_alerts([mock_alert1, mock_alert2])
+
+ await mock_callback.deferred
+
+ mock_callback.assert_called_once_with(mock_alert1)
+
+ async def test_pause_not_pop_alert(
+ self, component, mock_alert1, mock_alert2, mock_callback
+ ):
+ await component.pause(['AlertManager'])
+
+ self.am.register_handler('mock_alert1', mock_callback)
+ self.am.session.push_alerts([mock_alert1, mock_alert2])
+
+ await mock_callback.deferred
+
+ mock_callback.assert_not_called()
+ assert not self.am._event.is_set()
+ assert len(self.am.session.alerts) == 2
+
+ def test_deregister_handler(self):
+ def handler(alert):
+ ...
+
+ self.am.register_handler('dummy1', handler)
+ self.am.register_handler('dummy2_alert', handler)
+ self.am.deregister_handler(handler)
+ assert self.am.handlers['dummy1'] == []
+ assert self.am.handlers['dummy2'] == []
diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py
new file mode 100644
index 0000000..aa86fdb
--- /dev/null
+++ b/deluge/tests/test_authmanager.py
@@ -0,0 +1,23 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import deluge.component as component
+from deluge.common import get_localhost_auth
+from deluge.conftest import BaseTestCase
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager
+
+
+class TestAuthManager(BaseTestCase):
+ def set_up(self):
+ self.auth = AuthManager()
+ self.auth.start()
+
+ def tear_down(self):
+ # We must ensure that the components in component registry are removed
+ return component.shutdown()
+
+ def test_authorize(self):
+ assert self.auth.authorize(*get_localhost_auth()) == AUTH_LEVEL_ADMIN
diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py
new file mode 100644
index 0000000..a4a7681
--- /dev/null
+++ b/deluge/tests/test_bencode.py
@@ -0,0 +1,32 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import pytest
+
+from deluge import bencode
+
+from . import common
+
+
+class TestBencode:
+ def test_bencode_unicode_metainfo(self):
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ metainfo = bencode.bdecode(_file.read())[b'info']
+ bencode.bencode({b'info': metainfo})
+
+ def test_bencode_unicode_value(self):
+ assert bencode.bencode(b'abc') == b'3:abc'
+ assert bencode.bencode('abc') == b'3:abc'
+
+ def test_bdecode(self):
+ assert bencode.bdecode(b'3:dEf') == b'dEf'
+ with pytest.raises(bencode.BTFailure):
+ bencode.bdecode('dEf')
+ with pytest.raises(bencode.BTFailure):
+ bencode.bdecode(b'dEf')
+ with pytest.raises(bencode.BTFailure):
+ bencode.bdecode({'dEf': 123})
diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py
new file mode 100644
index 0000000..763d43c
--- /dev/null
+++ b/deluge/tests/test_client.py
@@ -0,0 +1,192 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import pytest
+import pytest_twisted
+from twisted.internet import defer
+
+from deluge import error
+from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, get_version
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.ui.client import Client, DaemonSSLProxy, client
+
+
+class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
+ def authenticate(self, username, password):
+ self.login_deferred = defer.Deferred()
+ d = self.call('daemon.login', username, password)
+ d.addCallback(self.__on_login, username)
+ d.addErrback(self.__on_login_fail)
+ return self.login_deferred
+
+ def __on_login(self, result, username):
+ self.login_deferred.callback(result)
+
+ def __on_login_fail(self, result):
+ self.login_deferred.errback(result)
+
+
+class NoVersionSendingClient(Client):
+ def connect(
+ self,
+ host='127.0.0.1',
+ port=58846,
+ username='',
+ password='',
+ skip_authentication=False,
+ ):
+ self._daemon_proxy = NoVersionSendingDaemonSSLProxy()
+ self._daemon_proxy.set_disconnect_callback(self.__on_disconnect)
+
+ d = self._daemon_proxy.connect(host, port)
+
+ def on_connect_fail(reason):
+ self.disconnect()
+ return reason
+
+ def on_authenticate(result, daemon_info):
+ return result
+
+ def on_authenticate_fail(reason):
+ return reason
+
+ def on_connected(daemon_version):
+ return daemon_version
+
+ def authenticate(daemon_version, username, password):
+ d = self._daemon_proxy.authenticate(username, password)
+ d.addCallback(on_authenticate, daemon_version)
+ d.addErrback(on_authenticate_fail)
+ return d
+
+ d.addCallback(on_connected)
+ d.addErrback(on_connect_fail)
+ if not skip_authentication:
+ d.addCallback(authenticate, username, password)
+ return d
+
+ def __on_disconnect(self):
+ if self.disconnect_callback:
+ self.disconnect_callback()
+
+
+@pytest.mark.usefixtures('daemon', 'client')
+class TestClient:
+ def test_connect_no_credentials(self):
+ d = client.connect('localhost', self.listen_port, username='', password='')
+
+ def on_connect(result):
+ assert client.get_auth_level() == AUTH_LEVEL_ADMIN
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_localclient(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_connect(result):
+ assert client.get_auth_level() == AUTH_LEVEL_ADMIN
+ return result
+
+ d.addCallbacks(on_connect, self.fail)
+ return d
+
+ def test_connect_bad_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect(
+ 'localhost', self.listen_port, username=username, password=password + '1'
+ )
+
+ def on_failure(failure):
+ assert failure.trap(error.BadLoginError) == error.BadLoginError
+ assert failure.value.message == 'Password does not match'
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_invalid_user(self):
+ d = client.connect('localhost', self.listen_port, username='invalid-user')
+
+ def on_failure(failure):
+ assert failure.trap(error.BadLoginError) == error.BadLoginError
+ assert failure.value.message == 'Username does not exist'
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ def test_connect_without_password(self):
+ username, password = get_localhost_auth()
+ d = client.connect('localhost', self.listen_port, username=username)
+
+ def on_failure(failure):
+ assert (
+ failure.trap(error.AuthenticationRequired)
+ == error.AuthenticationRequired
+ )
+ assert failure.value.username == username
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ @pytest_twisted.inlineCallbacks
+ def test_connect_with_password(self):
+ username, password = get_localhost_auth()
+ yield client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+ yield client.core.create_account('testuser', 'testpw', 'DEFAULT')
+ yield client.disconnect()
+ ret = yield client.connect(
+ 'localhost', self.listen_port, username='testuser', password='testpw'
+ )
+ assert ret == AUTH_LEVEL_NORMAL
+
+ @pytest_twisted.inlineCallbacks
+ def test_invalid_rpc_method_call(self):
+ yield client.connect('localhost', self.listen_port, username='', password='')
+ d = client.core.invalid_method()
+
+ def on_failure(failure):
+ assert failure.trap(error.WrappedException) == error.WrappedException
+
+ d.addCallbacks(self.fail, on_failure)
+ yield d
+
+ def test_connect_without_sending_client_version_fails(self):
+ username, password = get_localhost_auth()
+ no_version_sending_client = NoVersionSendingClient()
+ d = no_version_sending_client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ def on_failure(failure):
+ assert failure.trap(error.IncompatibleClient) == error.IncompatibleClient
+
+ d.addCallbacks(self.fail, on_failure)
+ return d
+
+ @pytest_twisted.inlineCallbacks
+ def test_daemon_version(self):
+ username, password = get_localhost_auth()
+ yield client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ assert client.daemon_version == get_version()
+
+ @pytest_twisted.inlineCallbacks
+ def test_daemon_version_check_min(self):
+ username, password = get_localhost_auth()
+ yield client.connect(
+ 'localhost', self.listen_port, username=username, password=password
+ )
+
+ assert client.daemon_version_check_min(get_version())
+ assert not client.daemon_version_check_min(f'{get_version()}1')
+ assert client.daemon_version_check_min('0.1.0')
diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py
new file mode 100644
index 0000000..a1af6cc
--- /dev/null
+++ b/deluge/tests/test_common.py
@@ -0,0 +1,227 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import sys
+import tarfile
+from urllib.parse import quote_plus
+
+import pytest
+
+from deluge.common import (
+ VersionSplit,
+ archive_files,
+ fdate,
+ fpcnt,
+ fpeer,
+ fsize,
+ fspeed,
+ ftime,
+ get_magnet_info,
+ get_path_size,
+ is_infohash,
+ is_interface,
+ is_interface_name,
+ is_ip,
+ is_ipv4,
+ is_ipv6,
+ is_magnet,
+ is_url,
+ parse_human_size,
+ windows_check,
+)
+
+from .common import get_test_data_file
+
+
+class TestCommon:
+ def test_fsize(self):
+ assert fsize(0) == '0 B'
+ assert fsize(100) == '100 B'
+ assert fsize(1023) == '1023 B'
+ assert fsize(1024) == '1.0 KiB'
+ assert fsize(1048575) == '1024.0 KiB'
+ assert fsize(1048576) == '1.0 MiB'
+ assert fsize(1073741823) == '1024.0 MiB'
+ assert fsize(1073741824) == '1.0 GiB'
+ assert fsize(112245) == '109.6 KiB'
+ assert fsize(110723441824) == '103.1 GiB'
+ assert fsize(1099511627775) == '1024.0 GiB'
+ assert fsize(1099511627777) == '1.0 TiB'
+ assert fsize(766148267453245) == '696.8 TiB'
+
+ def test_fpcnt(self):
+ assert fpcnt(0.9311) == '93.11%'
+
+ def test_fspeed(self):
+ assert fspeed(43134) == '42.1 KiB/s'
+
+ def test_fpeer(self):
+ assert fpeer(10, 20) == '10 (20)'
+ assert fpeer(10, -1) == '10'
+
+ def test_ftime(self):
+ assert ftime(0) == ''
+ assert ftime(5) == '5s'
+ assert ftime(100) == '1m 40s'
+ assert ftime(3789) == '1h 3m'
+ assert ftime(23011) == '6h 23m'
+ assert ftime(391187) == '4d 12h'
+ assert ftime(604800) == '1w 0d'
+ assert ftime(13893086) == '22w 6d'
+ assert ftime(59740269) == '1y 46w'
+ assert ftime(61.25) == '1m 1s'
+ assert ftime(119.9) == '1m 59s'
+
+ def test_fdate(self):
+ assert fdate(-1) == ''
+
+ def test_is_url(self):
+ assert is_url('http://deluge-torrent.org')
+ assert not is_url('file://test.torrent')
+
+ def test_is_magnet(self):
+ assert is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN')
+ assert not is_magnet(None)
+
+ def test_is_infohash(self):
+ assert is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973')
+
+ def test_get_path_size(self):
+ if windows_check() and sys.version_info < (3, 8):
+ # https://bugs.python.org/issue1311
+ pytest.skip('os.devnull returns False on Windows')
+ assert get_path_size(os.devnull) == 0
+ assert get_path_size('non-existant.file') == -1
+
+ def test_is_ip(self):
+ assert is_ip('192.0.2.0')
+ assert not is_ip('192..0.0')
+ assert is_ip('2001:db8::')
+ assert not is_ip('2001:db8:')
+
+ def test_is_ipv4(self):
+ assert is_ipv4('192.0.2.0')
+ assert not is_ipv4('192..0.0')
+
+ def test_is_ipv6(self):
+ assert is_ipv6('2001:db8::')
+ assert not is_ipv6('2001:db8:')
+
+ def test_is_interface_name(self):
+ if windows_check():
+ assert not is_interface_name('2001:db8:')
+ assert not is_interface_name('{THIS0000-IS00-ONLY-FOR0-TESTING00000}')
+ else:
+ assert is_interface_name('lo')
+ assert not is_interface_name('127.0.0.1')
+ assert not is_interface_name('eth01101')
+
+ def test_is_interface(self):
+ if windows_check():
+ assert is_interface('127.0.0.1')
+ assert not is_interface('127')
+ assert not is_interface('{THIS0000-IS00-ONLY-FOR0-TESTING00000}')
+ else:
+ assert is_interface('lo')
+ assert is_interface('127.0.0.1')
+ assert not is_interface('127.')
+ assert not is_interface('eth01101')
+
+ def test_version_split(self):
+ assert VersionSplit('1.2.2') == VersionSplit('1.2.2')
+ assert VersionSplit('1.2.1') < VersionSplit('1.2.2')
+ assert VersionSplit('1.1.9') < VersionSplit('1.2.2')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.1')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0')
+ assert VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2')
+ assert VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2')
+ assert VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2')
+ assert VersionSplit('0.14.9') == VersionSplit('0.14.9')
+ assert VersionSplit('0.14.9') > VersionSplit('0.14.5')
+ assert VersionSplit('0.14.10') >= VersionSplit('0.14.9')
+ assert VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123')
+ assert VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2')
+ assert VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123')
+ assert VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123')
+ assert VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0')
+ assert VersionSplit('1.4.0a1') < VersionSplit('1.4.0')
+
+ @pytest.mark.parametrize(
+ ('human_size', 'expected'),
+ [
+ ('1', 1),
+ ('10 bytes', 10),
+ ('2048 bytes', 2048),
+ ('1MiB', 2 ** (10 * 2)),
+ ('1 MiB', 2 ** (10 * 2)),
+ ('1 GiB', 2 ** (10 * 3)),
+ ('1 TiB', 2 ** (10 * 4)),
+ ('1M', 10**6),
+ ('1p', 10**15),
+ ('1MB', 10**6),
+ ('1 GB', 10**9),
+ ('1 TB', 10**12),
+ ],
+ )
+ def test_parse_human_size(self, human_size, expected):
+ parsed = parse_human_size(human_size)
+ assert parsed == expected, 'Mismatch when converting: %s' % human_size
+
+ def test_archive_files(self):
+ arc_filelist = [
+ get_test_data_file('test.torrent'),
+ get_test_data_file('deluge.png'),
+ ]
+ arc_filepath = archive_files('test-arc', arc_filelist)
+
+ with tarfile.open(arc_filepath, 'r') as tar:
+ for tar_info in tar:
+ assert tar_info.isfile()
+ assert tar_info.name in [
+ os.path.basename(arcf) for arcf in arc_filelist
+ ]
+
+ def test_archive_files_missing(self):
+ """Archive exists even with file not found."""
+ filelist = ['test.torrent', 'deluge.png', 'missing.file']
+ arc_filepath = archive_files(
+ 'test-arc', [get_test_data_file(f) for f in filelist]
+ )
+ filelist.remove('missing.file')
+
+ with tarfile.open(arc_filepath, 'r') as tar:
+ assert tar.getnames() == filelist
+ assert all(tarinfo.isfile() for tarinfo in tar)
+
+ def test_archive_files_message(self):
+ filelist = ['test.torrent', 'deluge.png']
+ arc_filepath = archive_files(
+ 'test-arc', [get_test_data_file(f) for f in filelist], message='test'
+ )
+
+ result_files = filelist + ['archive_message.txt']
+ with tarfile.open(arc_filepath, 'r') as tar:
+ assert tar.getnames() == result_files
+ for tar_info in tar:
+ assert tar_info.isfile()
+ if tar_info.name == 'archive_message.txt':
+ result = tar.extractfile(tar_info).read().decode()
+ assert result == 'test'
+
+ def test_get_magnet_info_tiers(self):
+ tracker1 = 'udp://tracker1.example.com'
+ tracker2 = 'udp://tracker2.example.com'
+ magnet = (
+ 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN'
+ f'&tr.1={quote_plus(tracker1)}'
+ f'&tr.2={quote_plus(tracker2)}'
+ )
+ result = get_magnet_info(magnet)
+ assert result['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert result['trackers'][tracker1] == 1
+ assert result['trackers'][tracker2] == 2
diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py
new file mode 100644
index 0000000..907d50b
--- /dev/null
+++ b/deluge/tests/test_component.py
@@ -0,0 +1,192 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import time
+from unittest.mock import Mock
+
+import pytest
+import pytest_twisted
+from twisted.internet import defer, threads
+
+import deluge.component as component
+
+
+class ComponentTester(component.Component):
+ def __init__(self, name, depend=None):
+ super().__init__(name, depend=depend)
+ event_methods = ('start', 'update', 'pause', 'resume', 'stop', 'shutdown')
+ for event_method in event_methods:
+ setattr(self, event_method, Mock())
+
+
+class ComponentTesterDelayStart(ComponentTester):
+ def __init__(self, name, depend=None):
+ super().__init__(name, depend=depend)
+ self.start = Mock(side_effect=self.delay)
+
+ @pytest_twisted.inlineCallbacks
+ def delay(self):
+ yield threads.deferToThread(time.sleep, 0.5)
+
+
+@pytest.mark.usefixtures('component')
+class TestComponent:
+ async def test_start_component(self):
+ c = ComponentTester('test_start')
+ await component.start(['test_start'])
+
+ assert c._component_state == 'Started'
+ assert c.start.call_count == 1
+
+ async def test_start_stop_depends(self):
+ c1 = ComponentTester('test_start_depends_c1')
+ c2 = ComponentTester('test_start_depends_c2', depend=['test_start_depends_c1'])
+
+ await component.start('test_start_depends_c2')
+
+ assert c1._component_state == 'Started'
+ assert c2._component_state == 'Started'
+ assert c1.start.call_count == 1
+ assert c2.start.call_count == 1
+
+ await component.stop(['test_start_depends_c1'])
+
+ assert c1._component_state == 'Stopped'
+ assert c2._component_state == 'Stopped'
+ assert c1.stop.call_count == 1
+ assert c2.stop.call_count == 1
+
+ async def start_with_depends(self):
+ c1 = ComponentTesterDelayStart('test_start_all_c1')
+ c2 = ComponentTester('test_start_all_c2', depend=['test_start_all_c4'])
+ c3 = ComponentTesterDelayStart(
+ 'test_start_all_c3', depend=['test_start_all_c5', 'test_start_all_c1']
+ )
+ c4 = ComponentTester('test_start_all_c4', depend=['test_start_all_c3'])
+ c5 = ComponentTester('test_start_all_c5')
+
+ await component.start()
+ return c1, c2, c3, c4, c5
+
+ def finish_start_with_depends(self, *args):
+ for c in args[1:]:
+ component.deregister(c)
+
+ async def test_start_all(self):
+ components = await self.start_with_depends()
+ for c in components:
+ assert c._component_state == 'Started'
+ assert c.start.call_count == 1
+
+ self.finish_start_with_depends(components)
+
+ def test_register_exception(self):
+ ComponentTester('test_register_exception')
+ with pytest.raises(component.ComponentAlreadyRegistered):
+ ComponentTester(
+ 'test_register_exception',
+ )
+
+ async def test_stop(self):
+ c = ComponentTester('test_stop')
+
+ await component.start(['test_stop'])
+
+ assert c._component_state == 'Started'
+
+ await component.stop(['test_stop'])
+
+ assert c._component_state == 'Stopped'
+ assert not c._component_timer.running
+ assert c.stop.call_count == 1
+
+ async def test_stop_all(self):
+ components = await self.start_with_depends()
+ assert all(c._component_state == 'Started' for c in components)
+
+ component.stop()
+ for c in components:
+ assert c._component_state == 'Stopped'
+ assert c.stop.call_count == 1
+
+ self.finish_start_with_depends(components)
+
+ async def test_update(self):
+ c = ComponentTester('test_update')
+ init_update_count = int(c.update.call_count)
+ await component.start(['test_update'])
+
+ assert c._component_timer
+ assert c._component_timer.running
+ assert c.update.call_count != init_update_count
+ await component.stop()
+
+ async def test_pause(self):
+ c = ComponentTester('test_pause')
+ init_update_count = int(c.update.call_count)
+
+ await component.start(['test_pause'])
+
+ assert c._component_timer
+ assert c.update.call_count != init_update_count
+
+ await component.pause(['test_pause'])
+
+ assert c._component_state == 'Paused'
+ assert c.pause.call_count == 1
+ assert c.update.call_count != init_update_count
+ assert not c._component_timer.running
+
+ async def test_resume(self):
+ c = ComponentTester('test_resume')
+
+ await component.start(['test_resume'])
+
+ assert c._component_state == 'Started'
+
+ await component.pause(['test_resume'])
+
+ assert c._component_state == 'Paused'
+
+ await component.resume(['test_resume'])
+
+ assert c._component_state == 'Started'
+ assert c.resume.call_count == 1
+ assert c._component_timer.running
+
+ async def test_component_start_error(self):
+ ComponentTester('test_start_error')
+ await component.start(['test_start_error'])
+ await component.pause(['test_start_error'])
+ test_comp = component.get('test_start_error')
+ with pytest.raises(component.ComponentException, match='Current state: Paused'):
+ await test_comp._component_start()
+
+ async def test_start_paused_error(self):
+ name = 'test_pause_error'
+ ComponentTester(name)
+ await component.start([name])
+ await component.pause([name])
+
+ (failure, error), *_ = await component.start()
+ assert (failure, error.type, error.value.message) == (
+ defer.FAILURE,
+ component.ComponentException,
+ (
+ f'Trying to start component "{name}" but it is '
+ 'not in a stopped state. Current state: Paused'
+ ),
+ )
+
+ async def test_shutdown(self):
+ c = ComponentTester('test_shutdown')
+
+ await component.start(['test_shutdown'])
+ await component.shutdown()
+
+ assert c.shutdown.call_count == 1
+ assert c._component_state == 'Stopped'
+ assert not c._component_timer.running
+ assert c.stop.call_count == 1
diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py
new file mode 100644
index 0000000..146a5c9
--- /dev/null
+++ b/deluge/tests/test_config.py
@@ -0,0 +1,274 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import json
+import logging
+import os
+from codecs import getwriter
+
+import pytest
+from twisted.internet import task
+
+from deluge.common import JSON_FORMAT
+from deluge.config import Config
+from deluge.ui.hostlist import mask_hosts_password
+
+DEFAULTS = {
+ 'string': 'foobar',
+ 'int': 1,
+ 'float': 0.435,
+ 'bool': True,
+ 'unicode': 'foobar',
+ 'password': 'abc123*\\[!]?/<>#{@}=|"+$%(^)~',
+ 'hosts': [
+ ('host1', 'port', '', 'password1234'),
+ ('host2', 'port', '', 'password5678'),
+ ],
+}
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class TestConfig:
+ def test_init(self):
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ assert DEFAULTS == config.config
+
+ config = Config('test.conf', config_dir=self.config_dir)
+ assert {} == config.config
+
+ def test_set_get_item(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config['foo'] = 1
+ assert config['foo'] == 1
+ with pytest.raises(ValueError):
+ config.set_item('foo', 'bar')
+
+ config['foo'] = 2
+ assert config.get_item('foo') == 2
+
+ config['foo'] = '3'
+ assert config.get_item('foo') == 3
+
+ config['unicode'] = 'ВИДЕОФИЛЬМЫ'
+ assert config['unicode'] == 'ВИДЕОФИЛЬМЫ'
+
+ config['unicode'] = b'foostring'
+ assert not isinstance(config.get_item('unicode'), bytes)
+
+ config._save_timer.cancel()
+
+ def test_set_get_item_none(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+
+ config['foo'] = None
+ assert config['foo'] is None
+ assert isinstance(config['foo'], type(None))
+
+ config['foo'] = 1
+ assert config.get('foo') == 1
+
+ config['foo'] = None
+ assert config['foo'] is None
+
+ config['bar'] = None
+ assert config['bar'] is None
+
+ config['bar'] = None
+ assert config['bar'] is None
+
+ config._save_timer.cancel()
+
+ async def test_on_changed_callback(self, mock_callback):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config.register_change_callback(mock_callback)
+ config['foo'] = 1
+ assert config['foo'] == 1
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('foo', 1)
+
+ async def test_key_function_callback(self, mock_callback):
+ config = Config(
+ 'test.conf', defaults={'foo': 1, 'bar': 1}, config_dir=self.config_dir
+ )
+
+ assert config['foo'] == 1
+ config.register_set_function('foo', mock_callback)
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('foo', 1)
+
+ mock_callback.reset_mock()
+ config.register_set_function('bar', mock_callback, apply_now=False)
+ mock_callback.assert_not_called()
+ config['bar'] = 2
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('bar', 2)
+
+ def test_get(self):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config['foo'] = 1
+ assert config.get('foo') == 1
+ assert config.get('foobar') is None
+ assert config.get('foobar', 2) == 2
+ config['foobar'] = 5
+ assert config.get('foobar', 2) == 5
+
+ def test_set_log_mask_funcs(self, caplog):
+ """Test mask func masks key in log"""
+ caplog.set_level(logging.DEBUG)
+ config = Config(
+ 'test.conf',
+ config_dir=self.config_dir,
+ log_mask_funcs={'hosts': mask_hosts_password},
+ )
+ config['hosts'] = DEFAULTS['hosts']
+ assert isinstance(config['hosts'], list)
+ assert 'host1' in caplog.text
+ assert 'host2' in caplog.text
+ assert 'password1234' not in caplog.text
+ assert 'password5678' not in caplog.text
+ assert '*' * 10 in caplog.text
+
+ def test_load_log_mask_funcs(self, caplog):
+ """Test mask func masks key in log"""
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ config = Config(
+ 'test.conf',
+ config_dir=self.config_dir,
+ log_mask_funcs={'hosts': mask_hosts_password},
+ )
+ with caplog.at_level(logging.DEBUG):
+ config.load(os.path.join(self.config_dir, 'test.conf'))
+ assert 'host1' in caplog.text
+ assert 'host2' in caplog.text
+ assert 'foobar' in caplog.text
+ assert 'password1234' not in caplog.text
+ assert 'password5678' not in caplog.text
+ assert '*' * 10 in caplog.text
+
+ def test_load(self):
+ def check_config():
+ config = Config('test.conf', config_dir=self.config_dir)
+
+ assert config['string'] == 'foobar'
+ assert config['float'] == 0.435
+ assert config['password'] == 'abc123*\\[!]?/<>#{@}=|"+$%(^)~'
+
+ # Test opening a previous 1.2 config file of just a json object
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ # Test opening a previous 1.2 config file of having the format versions
+ # as ints
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ _file.write(b'1\n')
+ _file.write(b'1\n')
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ # Test the 1.2 config format
+ version = {'format': 1, 'file': 1}
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(version, getwriter('utf8')(_file), **JSON_FORMAT)
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ check_config()
+
+ def test_save(self):
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ # We do this twice because the first time we need to save the file to disk
+ # and the second time we do a compare and we should not write
+ ret = config.save()
+ assert ret
+ ret = config.save()
+ assert ret
+
+ config['string'] = 'baz'
+ config['int'] = 2
+ ret = config.save()
+ assert ret
+ del config
+
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ assert config['string'] == 'baz'
+ assert config['int'] == 2
+
+ def test_save_timer(self):
+ clock = task.Clock()
+
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ config.callLater = clock.callLater
+ config['string'] = 'baz'
+ config['int'] = 2
+ assert config._save_timer.active()
+
+ # Timeout set for 5 seconds in config, so lets move clock by 5 seconds
+ clock.advance(5)
+
+ def check_config(config):
+ assert not config._save_timer.active()
+ del config
+ config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ assert config['string'] == 'baz'
+ assert config['int'] == 2
+
+ check_config(config)
+
+ def test_find_json_objects(self):
+ s = """{
+ "file": 1,
+ "format": 1
+}{
+ "ssl": true,
+ "enabled": false,
+ "port": 8115
+}\n"""
+
+ from deluge.config import find_json_objects
+
+ objects = find_json_objects(s)
+ assert len(objects) == 2
+
+ def test_find_json_objects_curly_brace(self):
+ """Test with string containing curly brace"""
+ s = """{
+ "file": 1,
+ "format": 1
+}{
+ "ssl": true,
+ "enabled": false,
+ "port": 8115,
+ "password": "abc{def"
+}"""
+
+ from deluge.config import find_json_objects
+
+ objects = find_json_objects(s)
+ assert len(objects) == 2
+
+ def test_find_json_objects_double_quote(self):
+ """Test with string containing double quote"""
+ s = r"""{
+ "file": 1,
+ "format": 1
+}{
+ "ssl": true,
+ "enabled": false,
+ "port": 8115,
+ "password": "abc\"def"
+}
+"""
+
+ from deluge.config import find_json_objects
+
+ objects = find_json_objects(s)
+ assert len(objects) == 2
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
new file mode 100644
index 0000000..28b5902
--- /dev/null
+++ b/deluge/tests/test_core.py
@@ -0,0 +1,511 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import base64
+import os
+from base64 import b64encode
+from hashlib import sha1 as sha
+
+import pytest
+import pytest_twisted
+from twisted.internet import defer, reactor, task
+from twisted.internet.error import CannotListenError
+from twisted.web.http import FORBIDDEN
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory, Site
+from twisted.web.static import File
+
+import deluge.common
+import deluge.component as component
+import deluge.core.torrent
+from deluge._libtorrent import lt
+from deluge.conftest import BaseTestCase
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.error import AddTorrentError, InvalidTorrentError
+
+from . import common
+
+common.disable_new_release_check()
+
+
+class CookieResource(Resource):
+ def render(self, request):
+ if request.getCookie(b'password') != b'deluge':
+ request.setResponseCode(FORBIDDEN)
+ return
+
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ return data
+
+
+class PartialDownload(Resource):
+ def getChild(self, path, request): # NOQA: N802
+ return EncodingResourceWrapper(self, [GzipEncoderFactory()])
+
+ def render(self, request):
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ request.setHeader(b'Content-Length', str(len(data)))
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ return data
+
+
+class RedirectResource(Resource):
+ def render(self, request):
+ request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent')
+ return b''
+
+
+class TopLevelResource(Resource):
+ def __init__(self):
+ Resource.__init__(self)
+ self.putChild(b'cookie', CookieResource())
+ self.putChild(b'partial', PartialDownload())
+ self.putChild(b'redirect', RedirectResource())
+ self.putChild(
+ b'ubuntu-9.04-desktop-i386.iso.torrent',
+ File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')),
+ )
+
+
+class TestCore(BaseTestCase):
+ def set_up(self):
+ self.rpcserver = RPCServer(listen=False)
+ self.core: Core = Core()
+ self.core.config.config['lsd'] = False
+ self.clock = task.Clock()
+ self.core.torrentmanager.clock = self.clock
+ self.listen_port = 51242
+ return component.start().addCallback(self.start_web_server)
+
+ def start_web_server(self, result):
+ website = Site(TopLevelResource())
+ for dummy in range(10):
+ try:
+ self.webserver = reactor.listenTCP(self.listen_port, website)
+ except CannotListenError as ex:
+ error = ex
+ self.listen_port += 1
+ else:
+ break
+ else:
+ raise error
+
+ return result
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+ return self.webserver.stopListening()
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def add_torrent(self, filename, paused=False):
+ if not paused:
+ # Patch libtorrent flags starting torrents paused
+ self.patch(
+ deluge.core.torrentmanager,
+ 'LT_DEFAULT_ADD_TORRENT_FLAGS',
+ lt.torrent_flags.auto_managed
+ | lt.torrent_flags.update_subscribe
+ | lt.torrent_flags.apply_ip_filter,
+ )
+ options = {'add_paused': paused, 'auto_managed': False}
+ filepath = common.get_test_data_file(filename)
+ with open(filepath, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ return torrent_id
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_files(self):
+ options = {}
+ filenames = ['test.torrent', 'test_torrent.file.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ assert len(errors) == 0
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_files_error_duplicate(self):
+ options = {}
+ filenames = ['test.torrent', 'test.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ assert len(errors) == 1
+ assert str(errors[0]).startswith('Torrent already in session')
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_file(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ # Get the info hash from the test.torrent
+ from deluge.bencode import bdecode, bencode
+
+ with open(filename, 'rb') as _file:
+ info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest()
+ assert torrent_id == info_hash
+
+ def test_add_torrent_file_invalid_filedump(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with pytest.raises(AddTorrentError):
+ self.core.add_torrent_file(filename, False, options)
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_url(self, mock_mkstemp):
+ url = (
+ 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent'
+ % self.listen_port
+ )
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ torrent_id = yield self.core.add_torrent_url(url, options)
+ assert torrent_id == info_hash
+ assert not os.path.isfile(mock_mkstemp[1])
+
+ async def test_add_torrent_url_with_cookie(self):
+ url = 'http://localhost:%d/cookie' % self.listen_port
+ options = {}
+ headers = {'Cookie': 'password=deluge'}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ with pytest.raises(Exception):
+ await self.core.add_torrent_url(url, options)
+
+ result = await self.core.add_torrent_url(url, options, headers)
+ assert result == info_hash
+
+ async def test_add_torrent_url_with_redirect(self):
+ url = 'http://localhost:%d/redirect' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
+
+ async def test_add_torrent_url_with_partial_download(self):
+ url = 'http://localhost:%d/partial' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_magnet(self):
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+ tracker = 'udp://tracker.example.com'
+ name = 'test magnet'
+ uri = deluge.common.create_magnet_uri(info_hash, name=name, trackers=[tracker])
+ options = {}
+ torrent_id = yield self.core.add_torrent_magnet(uri, options)
+ assert torrent_id == info_hash
+ torrent_status = self.core.get_torrent_status(torrent_id, ['name', 'trackers'])
+ assert torrent_status['trackers'][0]['url'] == tracker
+ assert torrent_status['name'] == name
+
+ def test_resume_torrent(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ # Assert paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ self.core.resume_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_resume_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent', paused=True)
+ self.core.resume_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert not result['paused']
+
+ def test_resume_torrents(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_resume_torrents_all(self):
+ """With no torrent_ids param, resume all torrents"""
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_pause_torrent(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+ # Assert not paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ self.core.pause_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ def test_pause_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent')
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert not result['paused']
+ self.core.pause_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert result['paused']
+
+ def test_pause_torrents(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ def test_pause_torrents_all(self):
+ """With no torrent_ids param, pause all torrents"""
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ @pytest_twisted.inlineCallbacks
+ def test_prefetch_metadata_existing(self):
+ """Check another call with same magnet returns existing deferred."""
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
+
+ d1 = self.core.prefetch_magnet_metadata(magnet)
+ d2 = self.core.prefetch_magnet_metadata(magnet)
+ dg = defer.gatherResults([d1, d2], consumeErrors=True)
+ self.clock.advance(30)
+ result = yield dg
+ assert result == [expected] * 2
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrent(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+ removed = self.core.remove_torrent(torrent_id, True)
+ assert removed
+ assert len(self.core.get_session_state()) == 0
+
+ def test_remove_torrent_invalid(self):
+ with pytest.raises(InvalidTorrentError):
+ self.core.remove_torrent(
+ 'torrentidthatdoesntexist',
+ True,
+ )
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrents(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ filename2 = common.get_test_data_file('unicode_filenames.torrent')
+ with open(filename2, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id2 = yield self.core.add_torrent_file_async(
+ filename2, filedump, options
+ )
+ d = self.core.remove_torrents([torrent_id, torrent_id2], True)
+
+ def test_ret(val):
+ assert val == []
+
+ d.addCallback(test_ret)
+
+ def test_session_state(val):
+ assert len(self.core.get_session_state()) == 0
+
+ d.addCallback(test_session_state)
+ yield d
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrents_invalid(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(
+ filename, filedump, options
+ )
+ val = yield self.core.remove_torrents(
+ ['invalidid1', 'invalidid2', torrent_id], False
+ )
+ assert len(val) == 2
+ assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.')
+ assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.')
+
+ def test_get_session_status(self):
+ status = self.core.get_session_status(
+ ['net.recv_tracker_bytes', 'net.sent_tracker_bytes']
+ )
+ assert isinstance(status, dict)
+ assert status['net.recv_tracker_bytes'] == 0
+ assert status['net.sent_tracker_bytes'] == 0
+
+ def test_get_session_status_all(self):
+ status = self.core.get_session_status([])
+ assert isinstance(status, dict)
+ assert 'upload_rate' in status
+ assert 'net.recv_bytes' in status
+
+ def test_get_session_status_depr(self):
+ status = self.core.get_session_status(['num_peers', 'num_unchoked'])
+ assert isinstance(status, dict)
+ assert status['num_peers'] == 0
+ assert status['num_unchoked'] == 0
+
+ def test_get_session_status_rates(self):
+ status = self.core.get_session_status(['upload_rate', 'download_rate'])
+ assert isinstance(status, dict)
+ assert status['upload_rate'] == 0
+
+ def test_get_session_status_ratio(self):
+ status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio'])
+ assert isinstance(status, dict)
+ assert status['write_hit_ratio'] == 0.0
+ assert status['read_hit_ratio'] == 0.0
+
+ def test_get_free_space(self):
+ space = self.core.get_free_space('.')
+ assert isinstance(space, int)
+ assert space >= 0
+ assert self.core.get_free_space('/someinvalidpath') == -1
+
+ @pytest.mark.slow
+ def test_test_listen_port(self):
+ d = self.core.test_listen_port()
+
+ def result(r):
+ assert r in (True, False)
+
+ d.addCallback(result)
+ return d
+
+ def test_sanitize_filepath(self):
+ pathlist = {
+ '\\backslash\\path\\': 'backslash/path',
+ ' single_file ': 'single_file',
+ '..': '',
+ '/../..../': '',
+ ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file',
+ '/ test /\\.. /.file/': 'test/.file',
+ 'mytorrent/subfold/file1': 'mytorrent/subfold/file1',
+ 'Torrent/folder/': 'Torrent/folder',
+ }
+
+ for key in pathlist:
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=False)
+ == pathlist[key]
+ )
+
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=True)
+ == pathlist[key] + '/'
+ )
+
+ def test_get_set_config_values(self):
+ assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None}
+ assert self.core.get_config_value('foobar') is None
+ self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'})
+ assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'}
+ assert self.core.get_config_value('foobar') == 'barfoo'
+
+ def test_read_only_config_keys(self):
+ key = 'max_upload_speed'
+ self.core.read_only_config_keys = [key]
+
+ old_value = self.core.get_config_value(key)
+ self.core.set_config({key: old_value + 10})
+ new_value = self.core.get_config_value(key)
+ assert old_value == new_value
+
+ self.core.read_only_config_keys = None
+
+ def test__create_peer_id(self):
+ assert self.core._create_peer_id('2.0.0') == '-DE200s-'
+ assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-'
+ assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-'
+ assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-'
+ assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-'
+
+ @pytest.mark.parametrize(
+ 'path',
+ [
+ common.get_test_data_file('deluge.png'),
+ os.path.dirname(common.get_test_data_file('deluge.png')),
+ ],
+ )
+ @pytest.mark.parametrize('piece_length', [2**14, 2**16])
+ @pytest_twisted.inlineCallbacks
+ def test_create_torrent(self, path, tmp_path, piece_length):
+ target = tmp_path / 'test.torrent'
+
+ filename, filedump = yield self.core.create_torrent(
+ path=path,
+ tracker=None,
+ piece_length=piece_length,
+ target=target,
+ add_to_session=False,
+ )
+ filecontent = base64.b64decode(filedump)
+
+ with open(target, 'rb') as f:
+ assert f.read() == filecontent
+
+ lt.torrent_info(filecontent)
diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py
new file mode 100644
index 0000000..d2ecd1a
--- /dev/null
+++ b/deluge/tests/test_decorators.py
@@ -0,0 +1,48 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+
+from deluge.decorators import proxy
+
+
+class TestDecorators:
+ def test_proxy_with_simple_functions(self):
+ def negate(func, *args, **kwargs):
+ return not func(*args, **kwargs)
+
+ @proxy(negate)
+ def something(_bool):
+ return _bool
+
+ @proxy(negate)
+ @proxy(negate)
+ def double_nothing(_bool):
+ return _bool
+
+ assert something(False)
+ assert not something(True)
+ assert double_nothing(True)
+ assert not double_nothing(False)
+
+ def test_proxy_with_class_method(self):
+ def negate(func, *args, **kwargs):
+ return -func(*args, **kwargs)
+
+ class Test:
+ def __init__(self, number):
+ self.number = number
+
+ @proxy(negate)
+ def diff(self, number):
+ return self.number - number
+
+ @proxy(negate)
+ def no_diff(self, number):
+ return self.diff(number)
+
+ t = Test(5)
+ assert t.diff(1) == -4
+ assert t.no_diff(1) == 4
diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py
new file mode 100644
index 0000000..a87d6a2
--- /dev/null
+++ b/deluge/tests/test_error.py
@@ -0,0 +1,39 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import deluge.error
+
+
+class TestError:
+ def test_deluge_error(self):
+ msg = 'Some message'
+ e = deluge.error.DelugeError(msg)
+ assert str(e) == msg
+ from twisted.internet.defer import DebugInfo
+
+ del DebugInfo.__del__ # Hides all errors
+ assert e._args == (msg,)
+ assert e._kwargs == {}
+
+ def test_incompatible_client(self):
+ version = '1.3.6'
+ e = deluge.error.IncompatibleClient(version)
+ assert (
+ str(e) == 'Your deluge client is not compatible with the daemon. '
+ 'Please upgrade your client to %s' % version
+ )
+
+ def test_not_authorized_error(self):
+ current_level = 5
+ required_level = 10
+ e = deluge.error.NotAuthorizedError(current_level, required_level)
+ assert str(e) == 'Auth level too low: %d < %d' % (current_level, required_level)
+
+ def test_bad_login_error(self):
+ message = 'Login failed'
+ username = 'deluge'
+ e = deluge.error.BadLoginError(message, username)
+ assert str(e) == message
diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py
new file mode 100644
index 0000000..1e97cbb
--- /dev/null
+++ b/deluge/tests/test_files_tab.py
@@ -0,0 +1,163 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import pytest
+
+import deluge.component as component
+from deluge.configmanager import ConfigManager
+from deluge.conftest import BaseTestCase
+from deluge.i18n import setup_translation
+
+libs_available = True
+# Allow running other tests without GTKUI dependencies available
+try:
+ from deluge.ui.gtk3.files_tab import FilesTab
+ from deluge.ui.gtk3.gtkui import DEFAULT_PREFS
+ from deluge.ui.gtk3.mainwindow import MainWindow
+except (ImportError, ValueError):
+ # gi.require_version gives ValueError if library not available
+ libs_available = False
+
+setup_translation()
+
+
+@pytest.mark.gtkui
+class TestFilesTab(BaseTestCase):
+ def set_up(self):
+ if libs_available is False:
+ pytest.skip('GTKUI dependencies not available')
+
+ ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
+ self.mainwindow = MainWindow()
+ self.filestab = FilesTab()
+ self.t_id = '1'
+ self.filestab.torrent_id = self.t_id
+ self.index = 1
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def print_treestore(self, title, treestore):
+ root = treestore.get_iter_first()
+ level = 1
+
+ def p_level(s, lvl):
+ print('{}{}'.format(' ' * lvl, s))
+
+ def _print_treestore_children(i, lvl):
+ while i:
+ p_level(treestore[i][0], lvl)
+ if treestore.iter_children(i):
+ _print_treestore_children(treestore.iter_children(i), lvl + 2)
+ i = treestore.iter_next(i)
+
+ print('\n%s' % title)
+ _print_treestore_children(root, level)
+ print('')
+
+ def verify_treestore(self, treestore, tree):
+ def _verify_treestore(itr, tree_values):
+ i = 0
+ while itr:
+ values = tree_values[i]
+ if treestore[itr][0] != values[0]:
+ return False
+ if treestore.iter_children(itr):
+ if not _verify_treestore(treestore.iter_children(itr), values[1]):
+ return False
+ itr = treestore.iter_next(itr)
+ i += 1
+ return True
+
+ return _verify_treestore(treestore.get_iter_first(), tree)
+
+ def test_files_tab(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '2/test_100.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['test_10.txt']]], ['2/', [['test_100.txt']]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ assert ret
+
+ def test_files_tab2(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/1/test_101.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['1/', [['test_100.txt'], ['test_101.txt']]]]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ assert ret
+
+ def test_files_tab3(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/test_101.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]]
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ assert ret
+
+ def test_files_tab4(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '1/test_101.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/2/test_101.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore,
+ [['1/', [['2/', [['test_101.txt']]], ['test_100.txt']]]],
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ assert ret
+
+ def test_files_tab5(self):
+ self.filestab.files_list[self.t_id] = (
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '2/test_101.txt', 'offset': 13, 'size': 14},
+ )
+ self.filestab.update_files()
+ self.filestab._on_torrentfilerenamed_event(
+ self.t_id, self.index, '1/test_101.txt'
+ )
+
+ ret = self.verify_treestore(
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]]
+ )
+ if not ret:
+ self.print_treestore('Treestore not expected:', self.filestab.treestore)
+ assert ret
diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py
new file mode 100644
index 0000000..1c27045
--- /dev/null
+++ b/deluge/tests/test_httpdownloader.py
@@ -0,0 +1,275 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import tempfile
+from email.utils import formatdate
+
+import pytest
+import pytest_twisted
+from twisted.internet import reactor
+from twisted.internet.error import CannotListenError
+from twisted.web.error import Error, PageRedirect
+from twisted.web.http import NOT_MODIFIED
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory, Site
+from twisted.web.util import redirectTo
+
+from deluge.httpdownloader import download_file
+from deluge.log import setup_logger
+
+temp_dir = tempfile.mkdtemp()
+
+
+def fname(name):
+ return os.path.join(temp_dir, name)
+
+
+class RedirectResource(Resource):
+ def render(self, request):
+ url = self.get_url().encode('utf8')
+ return redirectTo(url, request)
+
+
+class RenameResource(Resource):
+ def render(self, request):
+ filename = request.args.get(b'filename', [b'renamed_file'])[0]
+ request.setHeader(b'Content-Type', b'text/plain')
+ request.setHeader(b'Content-Disposition', b'attachment; filename=' + filename)
+ return b'This file should be called ' + filename
+
+
+class AttachmentResource(Resource):
+ def render(self, request):
+ content_type = b'text/plain'
+ charset = request.getHeader(b'content-charset')
+ if charset:
+ content_type += b'; charset=' + charset
+ request.setHeader(b'Content-Type', content_type)
+ request.setHeader(b'Content-Disposition', b'attachment')
+ append = request.getHeader(b'content-append') or b''
+ content = 'Attachment with no filename set{}'.format(append.decode('utf8'))
+ return (
+ content.encode(charset.decode('utf8'))
+ if charset
+ else content.encode('utf8')
+ )
+
+
+class TorrentResource(Resource):
+ def render(self, request):
+ content_type = b'application/x-bittorrent'
+ charset = request.getHeader(b'content-charset')
+ if charset:
+ content_type += b'; charset=' + charset
+ request.setHeader(b'Content-Type', content_type)
+ request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent')
+ return 'Binary attachment ignore charset 世丕且\n'.encode()
+
+
+class CookieResource(Resource):
+ def render(self, request):
+ request.setHeader(b'Content-Type', b'text/plain')
+ if request.getCookie(b'password') is None:
+ return b'Password cookie not set!'
+
+ if request.getCookie(b'password') == b'deluge':
+ return b'COOKIE MONSTER!'
+
+ return request.getCookie('password')
+
+
+class GzipResource(Resource):
+ def getChild(self, path, request): # NOQA: N802
+ return EncodingResourceWrapper(self, [GzipEncoderFactory()])
+
+ def render(self, request):
+ message = request.args.get(b'msg', [b'EFFICIENCY!'])[0]
+ request.setHeader(b'Content-Type', b'text/plain')
+ return message
+
+
+class PartialDownloadResource(Resource):
+ def __init__(self, *args, **kwargs):
+ Resource.__init__(self)
+ self.render_count = 0
+
+ def render(self, request):
+ # encoding = request.requestHeaders._rawHeaders.get('accept-encoding', None)
+ if self.render_count == 0:
+ request.setHeader(b'content-length', b'5')
+ else:
+ request.setHeader(b'content-length', b'3')
+
+ # if encoding == "deflate, gzip, x-gzip":
+ request.write('abc')
+ self.render_count += 1
+ return ''
+
+
+class TopLevelResource(Resource):
+ def __init__(self):
+ Resource.__init__(self)
+ self.putChild(b'cookie', CookieResource())
+ self.putChild(b'gzip', GzipResource())
+ self.redirect_rsrc = RedirectResource()
+ self.putChild(b'redirect', self.redirect_rsrc)
+ self.putChild(b'rename', RenameResource())
+ self.putChild(b'attachment', AttachmentResource())
+ self.putChild(b'torrent', TorrentResource())
+ self.putChild(b'partial', PartialDownloadResource())
+
+ def getChild(self, path, request): # NOQA: N802
+ if not path:
+ return self
+ else:
+ return Resource.getChild(self, path, request)
+
+ def render(self, request):
+ if request.getHeader(b'If-Modified-Since'):
+ request.setResponseCode(NOT_MODIFIED)
+ return b'<h1>Deluge HTTP Downloader tests webserver here</h1>'
+
+
+class TestDownloadFile:
+ def get_url(self, path=''):
+ return 'http://localhost:%d/%s' % (self.listen_port, path)
+
+ @pytest_twisted.async_yield_fixture(autouse=True)
+ async def setUp(self, request): # NOQA
+ self = request.instance
+ setup_logger('warning', fname('log_file'))
+ self.website = Site(TopLevelResource())
+ self.listen_port = 51242
+ self.website.resource.redirect_rsrc.get_url = self.get_url
+ for dummy in range(10):
+ try:
+ self.webserver = reactor.listenTCP(self.listen_port, self.website)
+ except CannotListenError as ex:
+ error = ex
+ self.listen_port += 1
+ else:
+ break
+ else:
+ raise error
+
+ yield
+
+ await self.webserver.stopListening()
+
+ def assert_contains(self, filename, contents):
+ with open(filename, encoding='utf8') as _file:
+ try:
+ assert _file.read() == contents
+ except Exception as ex:
+ pytest.fail(ex)
+ return filename
+
+ def assert_not_contains(self, filename, contents, file_mode=''):
+ with open(filename, encoding='utf8') as _file:
+ try:
+ assert _file.read() != contents
+ except Exception as ex:
+ pytest.fail(ex)
+ return filename
+
+ async def test_download(self):
+ filename = await download_file(self.get_url(), fname('index.html'))
+ assert filename == fname('index.html')
+
+ async def test_download_without_required_cookies(self):
+ url = self.get_url('cookie')
+ filename = await download_file(url, fname('none'))
+ self.assert_contains(filename, 'Password cookie not set!')
+
+ async def test_download_with_required_cookies(self):
+ url = self.get_url('cookie')
+ cookie = {'cookie': 'password=deluge'}
+ filename = await download_file(url, fname('monster'), headers=cookie)
+ assert filename == fname('monster')
+ self.assert_contains(filename, 'COOKIE MONSTER!')
+
+ async def test_download_with_rename(self):
+ url = self.get_url('rename?filename=renamed')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed')
+ self.assert_contains(filename, 'This file should be called renamed')
+
+ async def test_download_with_rename_exists(self):
+ open(fname('renamed'), 'w').close()
+ url = self.get_url('rename?filename=renamed')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed-1')
+ self.assert_contains(filename, 'This file should be called renamed')
+
+ async def test_download_with_rename_sanitised(self):
+ url = self.get_url('rename?filename=/etc/passwd')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('passwd')
+ self.assert_contains(filename, 'This file should be called /etc/passwd')
+
+ async def test_download_with_attachment_no_filename(self):
+ url = self.get_url('attachment')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('original')
+ self.assert_contains(filename, 'Attachment with no filename set')
+
+ async def test_download_with_rename_prevented(self):
+ url = self.get_url('rename?filename=spam')
+ filename = await download_file(url, fname('forced'), force_filename=True)
+ assert filename == fname('forced')
+ self.assert_contains(filename, 'This file should be called spam')
+
+ async def test_download_with_gzip_encoding(self):
+ url = self.get_url('gzip?msg=success')
+ filename = await download_file(url, fname('gzip_encoded'))
+ self.assert_contains(filename, 'success')
+
+ async def test_download_with_gzip_encoding_disabled(self):
+ url = self.get_url('gzip?msg=unzip')
+ filename = await download_file(
+ url, fname('gzip_encoded'), allow_compression=False
+ )
+ self.assert_contains(filename, 'unzip')
+
+ async def test_page_redirect_unhandled(self):
+ url = self.get_url('redirect')
+ with pytest.raises(PageRedirect):
+ await download_file(url, fname('none'), handle_redirects=False)
+
+ async def test_page_redirect(self):
+ url = self.get_url('redirect')
+ filename = await download_file(url, fname('none'), handle_redirects=True)
+ assert filename == fname('none')
+
+ async def test_page_not_found(self):
+ with pytest.raises(Error):
+ await download_file(self.get_url('page/not/found'), fname('none'))
+
+ @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.")
+ async def test_page_not_modified(self):
+ headers = {'If-Modified-Since': formatdate(usegmt=True)}
+ with pytest.raises(Error) as exc_info:
+ await download_file(self.get_url(), fname('index.html'), headers=headers)
+ assert exc_info.value.status == NOT_MODIFIED
+
+ async def test_download_text_reencode_charset(self):
+ """Re-encode as UTF-8 specified charset for text content-type header"""
+ url = self.get_url('attachment')
+ filepath = fname('test.txt')
+ headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'}
+ filename = await download_file(url, filepath, headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Attachment with no filename setбвгде')
+
+ async def test_download_binary_ignore_charset(self):
+ """Ignore charset for binary content-type header e.g. torrent files"""
+ url = self.get_url('torrent')
+ headers = {'content-charset': 'Windows-1251'}
+ filepath = fname('test.torrent')
+ filename = await download_file(url, fname('test.torrent'), headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n')
diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py
new file mode 100644
index 0000000..ef21e94
--- /dev/null
+++ b/deluge/tests/test_json_api.py
@@ -0,0 +1,267 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import json as json_lib
+from unittest.mock import MagicMock
+
+import pytest
+import pytest_twisted
+from twisted.internet.defer import Deferred
+from twisted.web import server
+from twisted.web.http import Request
+
+import deluge.common
+import deluge.ui.web.auth
+import deluge.ui.web.json_api
+from deluge.error import DelugeError
+from deluge.ui.web.auth import Auth
+from deluge.ui.web.json_api import JSON, JSONException
+
+from . import common
+from .common_web import WebServerMockBase
+
+common.disable_new_release_check()
+
+
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestJSON:
+ async def test_get_remote_methods(self):
+ json = JSON()
+ methods = await json.get_remote_methods()
+ assert type(methods) == tuple
+ assert len(methods) > 0
+
+ def test_render_fail_disconnected(self):
+ json = JSON()
+ request = MagicMock()
+ request.method = b'POST'
+ request._disconnected = True
+ # When disconnected, returns empty string
+ assert json.render(request) == ''
+
+ def test_render_fail(self):
+ json = JSON()
+ request = MagicMock()
+ request.method = b'POST'
+
+ def write(response_str):
+ request.write_was_called = True
+ response = json_lib.loads(response_str.decode())
+ assert response['result'] is None
+ assert response['id'] is None
+ assert response['error']['message'] == 'JSONException: JSON not decodable'
+ assert response['error']['code'] == 5
+
+ request.write = write
+ request.write_was_called = False
+ request._disconnected = False
+ request.getHeader.return_value = b'application/json'
+ assert json.render(request) == server.NOT_DONE_YET
+ assert request.write_was_called
+
+ def test_handle_request_invalid_method(self):
+ json = JSON()
+ request = MagicMock()
+ json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ assert error == {'message': 'Unknown method', 'code': 2}
+
+ def test_handle_request_invalid_json_request(self):
+ json = JSON()
+ request = MagicMock()
+ json_data = {'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ with pytest.raises(JSONException):
+ json._handle_request(request)
+ json_data = {'method': 'some.method', 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ with pytest.raises(JSONException):
+ json._handle_request(request)
+ json_data = {'method': 'some.method', 'id': 0}
+ request.json = json_lib.dumps(json_data).encode()
+ with pytest.raises(JSONException):
+ json._handle_request(request)
+
+ def test_on_json_request_invalid_content_type(self):
+ """Test for exception with content type not application/json"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'text/plain'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ with pytest.raises(JSONException):
+ json._on_json_request(request)
+
+ def test_on_json_request_valid_content_type(self):
+ """Ensure content-type application/json is accepted"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'application/json'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ json._on_json_request(request)
+
+ def test_on_json_request_valid_content_type_with_charset(self):
+ """Ensure content-type parameters such as charset are ignored"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'application/json;charset=utf-8'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ json._on_json_request(request)
+
+
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestJSONCustomUserTestCase:
+ @pytest_twisted.inlineCallbacks
+ def test_handle_request_auth_error(self):
+ json = JSON()
+ auth_conf = {'session_timeout': 10, 'sessions': {}}
+ Auth(auth_conf) # Must create the component
+
+ # Must be called to update remote methods in json object
+ yield json.get_remote_methods()
+
+ request = MagicMock()
+ request.getCookie = MagicMock(return_value=b'bad_value')
+ json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ assert error == {'message': 'Not authenticated', 'code': 1}
+
+
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestRPCRaiseDelugeErrorJSON:
+ daemon_custom_script = """
+ from deluge.error import DelugeError
+ from deluge.core.rpcserver import export
+ class TestClass(object):
+ @export()
+ def test(self):
+ raise DelugeError('DelugeERROR')
+
+ test = TestClass()
+ daemon.rpcserver.register_object(test)
+"""
+
+ async def test_handle_request_method_raise_delugeerror(self):
+ json = JSON()
+
+ def get_session_id(s_id):
+ return s_id
+
+ self.patch(deluge.ui.web.auth, 'get_session_id', get_session_id)
+ auth_conf = {'session_timeout': 10, 'sessions': {}}
+ auth = Auth(auth_conf)
+ request = Request(MagicMock(), False)
+ request.base = b''
+ auth._create_session(request)
+ methods = await json.get_remote_methods()
+ # Verify the function has been registered
+ assert 'testclass.test' in methods
+
+ request = MagicMock()
+ session_id = list(auth.config['sessions'])[0]
+ request.getCookie = MagicMock(return_value=session_id.encode())
+ json_data = {'method': 'testclass.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ request_id, result, error = json._handle_request(request)
+ with pytest.raises(DelugeError):
+ await result
+
+
+class TestJSONRequestFailed(WebServerMockBase):
+ @pytest_twisted.async_yield_fixture(autouse=True)
+ async def set_up(self, config_dir):
+ custom_script = """
+ from deluge.error import DelugeError
+ from deluge.core.rpcserver import export
+ from twisted.internet import reactor, task
+ class TestClass(object):
+ @export()
+ def test(self):
+ def test_raise_error():
+ raise DelugeError('DelugeERROR')
+
+ return task.deferLater(reactor, 1, test_raise_error)
+
+ test = TestClass()
+ daemon.rpcserver.register_object(test)
+"""
+
+ extra_callback = {
+ 'deferred': Deferred(),
+ 'types': ['stderr'],
+ 'timeout': 10,
+ 'triggers': [
+ {
+ 'expr': 'in test_raise_error',
+ 'value': lambda reader, data, data_all: 'Test',
+ }
+ ],
+ }
+
+ def on_test_raise(*args):
+ assert 'Unhandled error in Deferred:' in daemon.stderr_out
+ assert 'in test_raise_error' in daemon.stderr_out
+
+ d, daemon = common.start_core(
+ custom_script=custom_script,
+ print_stdout=True,
+ print_stderr=False,
+ timeout=5,
+ extra_callbacks=[extra_callback],
+ config_directory=config_dir,
+ )
+ extra_callback['deferred'].addCallback(on_test_raise, daemon)
+
+ await d
+ yield
+ await daemon.kill()
+
+ @pytest_twisted.inlineCallbacks
+ def test_render_on_rpc_request_failed(self, component, client):
+ json = JSON()
+
+ methods = yield json.get_remote_methods()
+ # Verify the function has been registered
+ assert 'testclass.test' in methods
+
+ request = MagicMock()
+
+ # Circumvent authentication
+ auth = Auth({})
+ self.mock_authentication_ignore(auth)
+
+ def write(response_str):
+ request.write_was_called = True
+ response = json_lib.loads(response_str.decode())
+ assert response['result'] is None, 'BAD RESULT'
+ assert response['id'] == 0
+ assert (
+ response['error']['message']
+ == 'Failure: [Failure instance: Traceback (failure with no frames):'
+ " <class 'deluge.error.DelugeError'>: DelugeERROR\n]"
+ )
+ assert response['error']['code'] == 4
+
+ request.write = write
+ request.write_was_called = False
+ request._disconnected = False
+ request.getHeader.return_value = b'application/json'
+ json_data = {'method': 'testclass.test', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ d = json._on_json_request(request)
+
+ def on_success(arg):
+ assert arg == server.NOT_DONE_YET
+ return True
+
+ d.addCallbacks(on_success, pytest.fail)
+ yield d
diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py
new file mode 100644
index 0000000..f0dcbee
--- /dev/null
+++ b/deluge/tests/test_log.py
@@ -0,0 +1,47 @@
+#
+# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
+# Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import logging
+import warnings
+
+from deluge.conftest import BaseTestCase
+from deluge.log import setup_logger
+
+
+class TestLog(BaseTestCase):
+ def set_up(self):
+ setup_logger(logging.DEBUG)
+
+ def tear_down(self):
+ setup_logger('none')
+
+ def test_old_log_deprecation_warning(self):
+ from deluge.log import LOG
+
+ with warnings.catch_warnings(record=True) as w:
+ # Cause all warnings to always be triggered.
+ warnings.simplefilter('always')
+ LOG.debug('foo')
+ assert w[-1].category == DeprecationWarning
+
+ # def test_twisted_error_log(self):
+ # from twisted.internet import defer
+ # import deluge.component as component
+ # from deluge.core.eventmanager import EventManager
+ # EventManager()
+ #
+ # d = component.start()
+ #
+ # @defer.inlineCallbacks
+ # def call(*args):
+ # yield component.pause(["EventManager"])
+ # yield component.start(["EventManager"])
+ #
+ # d.addCallback(call)
+ # return d
diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py
new file mode 100644
index 0000000..a2e473f
--- /dev/null
+++ b/deluge/tests/test_maketorrent.py
@@ -0,0 +1,85 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import tempfile
+
+from deluge import maketorrent
+
+
+def check_torrent(filename):
+ # Test loading with libtorrent to make sure it's valid
+ from deluge._libtorrent import lt
+
+ lt.torrent_info(filename)
+
+ # Test loading with our internal TorrentInfo class
+ from deluge.ui.common import TorrentInfo
+
+ TorrentInfo(filename)
+
+
+class TestMakeTorrent:
+ def test_save_multifile(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file:
+ _file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file:
+ _file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file:
+ _file.write(b'c' * (11 * 1024))
+
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_path
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+
+ def test_save_singlefile(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_data = tmp_dir + '/data'
+ with open(tmp_data, 'wb') as _file:
+ _file.write(b'a' * (2314 * 1024))
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_data
+ tmp_file = tmp_dir + '/.torrent'
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ def test_save_multifile_padded(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file:
+ _file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file:
+ _file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file:
+ _file.write(b'c' * (11 * 1024))
+
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_path
+ t.pad_files = True
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
diff --git a/deluge/tests/test_maybe_coroutine.py b/deluge/tests/test_maybe_coroutine.py
new file mode 100644
index 0000000..afaf171
--- /dev/null
+++ b/deluge/tests/test_maybe_coroutine.py
@@ -0,0 +1,207 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import pytest
+import pytest_twisted
+import twisted.python.failure
+from twisted.internet import defer, reactor, task
+from twisted.internet.defer import maybeDeferred
+
+from deluge.decorators import maybe_coroutine
+
+
+@defer.inlineCallbacks
+def inline_func():
+ result = yield task.deferLater(reactor, 0, lambda: 'function_result')
+ return result
+
+
+@defer.inlineCallbacks
+def inline_error():
+ raise Exception('function_error')
+ yield
+
+
+@maybe_coroutine
+async def coro_func():
+ result = await task.deferLater(reactor, 0, lambda: 'function_result')
+ return result
+
+
+@maybe_coroutine
+async def coro_error():
+ raise Exception('function_error')
+
+
+@defer.inlineCallbacks
+def coro_func_from_inline():
+ result = yield coro_func()
+ return result
+
+
+@defer.inlineCallbacks
+def coro_error_from_inline():
+ result = yield coro_error()
+ return result
+
+
+@maybe_coroutine
+async def coro_func_from_coro():
+ return await coro_func()
+
+
+@maybe_coroutine
+async def coro_error_from_coro():
+ return await coro_error()
+
+
+@maybe_coroutine
+async def inline_func_from_coro():
+ return await inline_func()
+
+
+@maybe_coroutine
+async def inline_error_from_coro():
+ return await inline_error()
+
+
+@pytest_twisted.inlineCallbacks
+def test_standard_twisted():
+ """Sanity check that twisted tests work how we expect.
+
+ Not really testing deluge code at all.
+ """
+ result = yield inline_func()
+ assert result == 'function_result'
+
+ with pytest.raises(Exception, match='function_error'):
+ yield inline_error()
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_func,
+ coro_func,
+ coro_func_from_coro,
+ coro_func_from_inline,
+ inline_func_from_coro,
+ ],
+)
+@pytest_twisted.inlineCallbacks
+def test_from_inline(function):
+ """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds."""
+ result = yield function()
+ assert result == 'function_result'
+
+ def cb(result):
+ assert result == 'function_result'
+
+ d = function()
+ d.addCallback(cb)
+ yield d
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_error,
+ coro_error,
+ coro_error_from_coro,
+ coro_error_from_inline,
+ inline_error_from_coro,
+ ],
+)
+@pytest_twisted.inlineCallbacks
+def test_error_from_inline(function):
+ """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds that raise."""
+ with pytest.raises(Exception, match='function_error'):
+ yield function()
+
+ def eb(result):
+ assert isinstance(result, twisted.python.failure.Failure)
+ assert result.getErrorMessage() == 'function_error'
+
+ d = function()
+ d.addErrback(eb)
+ yield d
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_func,
+ coro_func,
+ coro_func_from_coro,
+ coro_func_from_inline,
+ inline_func_from_coro,
+ ],
+)
+async def test_from_coro(function):
+ """Test our coroutines wrapped with maybe_coroutine work from another coroutine."""
+ result = await function()
+ assert result == 'function_result'
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_error,
+ coro_error,
+ coro_error_from_coro,
+ coro_error_from_inline,
+ inline_error_from_coro,
+ ],
+)
+async def test_error_from_coro(function):
+ """Test our coroutines wrapped with maybe_coroutine work from another coroutine with errors."""
+ with pytest.raises(Exception, match='function_error'):
+ await function()
+
+
+async def test_tracebacks_preserved():
+ with pytest.raises(Exception) as exc:
+ await coro_error_from_coro()
+ traceback_lines = [
+ 'await coro_error_from_coro()',
+ 'return await coro_error()',
+ "raise Exception('function_error')",
+ ]
+ # If each coroutine got wrapped with ensureDeferred, the traceback will be mangled
+ # verify the coroutines passed through by checking the traceback.
+ for expected, actual in zip(traceback_lines, exc.traceback):
+ assert expected in str(actual)
+
+
+async def test_maybe_deferred_coroutine():
+ result = await maybeDeferred(coro_func)
+ assert result == 'function_result'
+
+
+async def test_callback_before_await():
+ def cb(res):
+ assert res == 'function_result'
+ return res
+
+ d = coro_func()
+ d.addCallback(cb)
+ result = await d
+ assert result == 'function_result'
+
+
+async def test_callback_after_await():
+ """If it has already been used as a coroutine, can't be retroactively turned into a Deferred.
+ This limitation could be fixed, but the extra complication doesn't feel worth it.
+ """
+
+ def cb(res):
+ pass
+
+ d = coro_func()
+ await d
+ with pytest.raises(
+ Exception, match='Cannot add callbacks to an already awaited coroutine'
+ ):
+ d.addCallback(cb)
diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py
new file mode 100644
index 0000000..1b16750
--- /dev/null
+++ b/deluge/tests/test_metafile.py
@@ -0,0 +1,112 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import tempfile
+
+import pytest
+
+from deluge import metafile
+from deluge._libtorrent import LT_VERSION
+from deluge.common import VersionSplit
+
+from . import common
+
+
+def check_torrent(filename):
+ # Test loading with libtorrent to make sure it's valid
+ from deluge._libtorrent import lt
+
+ lt.torrent_info(filename)
+
+ # Test loading with our internal TorrentInfo class
+ from deluge.ui.common import TorrentInfo
+
+ TorrentInfo(filename)
+
+
+class TestMetafile:
+ def test_save_multifile(self):
+ # Create a temporary folder for torrent creation
+ tmp_path = tempfile.mkdtemp()
+ with open(os.path.join(tmp_path, 'file_A'), 'wb') as tmp_file:
+ tmp_file.write(b'a' * (312 * 1024))
+ with open(os.path.join(tmp_path, 'file_B'), 'wb') as tmp_file:
+ tmp_file.write(b'b' * (2354 * 1024))
+ with open(os.path.join(tmp_path, 'file_C'), 'wb') as tmp_file:
+ tmp_file.write(b'c' * (11 * 1024))
+
+ tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
+ metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file)
+
+ check_torrent(tmp_file)
+
+ os.remove(os.path.join(tmp_path, 'file_A'))
+ os.remove(os.path.join(tmp_path, 'file_B'))
+ os.remove(os.path.join(tmp_path, 'file_C'))
+ os.rmdir(tmp_path)
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+
+ def test_save_singlefile(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_data = tmp_dir + '/testdata'
+ with open(tmp_data, 'wb') as tmp_file:
+ tmp_file.write(b'a' * (2314 * 1024))
+
+ tmp_torrent = tmp_dir + '/.torrent'
+ metafile.make_meta_file(tmp_data, '', 32768, target=tmp_torrent)
+
+ check_torrent(tmp_torrent)
+
+ @pytest.mark.parametrize(
+ 'path',
+ [
+ common.get_test_data_file('deluge.png'),
+ common.get_test_data_file('unicode_filenames.torrent'),
+ os.path.dirname(common.get_test_data_file('deluge.png')),
+ ],
+ )
+ @pytest.mark.parametrize(
+ 'torrent_format',
+ [
+ metafile.TorrentFormat.V1,
+ metafile.TorrentFormat.V2,
+ metafile.TorrentFormat.HYBRID,
+ ],
+ )
+ @pytest.mark.parametrize('piece_length', [2**14, 2**15, 2**16])
+ @pytest.mark.parametrize('private', [True, False])
+ def test_create_info(self, path, torrent_format, piece_length, private):
+ our_info, our_piece_layers = metafile.makeinfo(
+ path,
+ piece_length,
+ metafile.dummy,
+ private=private,
+ torrent_format=torrent_format,
+ )
+ lt_info, lt_piece_layers = metafile.makeinfo_lt(
+ path,
+ piece_length,
+ private=private,
+ torrent_format=torrent_format,
+ )
+
+ if (
+ torrent_format == metafile.TorrentFormat.HYBRID
+ and os.path.isdir(path)
+ and VersionSplit(LT_VERSION) <= VersionSplit('2.0.7.0')
+ ):
+ # Libtorrent didn't correctly follow the standard until version 2.0.7 included
+ # https://github.com/arvidn/libtorrent/commit/74d82a0cd7c2e9e3c4294901d7eb65e247050df4
+ # If last file is a padding, ignore that file and the last piece.
+ if our_info[b'files'][-1][b'path'][0] == b'.pad':
+ our_info[b'files'] = our_info[b'files'][:-1]
+ our_info[b'pieces'] = our_info[b'pieces'][:-32]
+ lt_info[b'pieces'] = lt_info[b'pieces'][:-32]
+
+ assert our_info == lt_info
+ assert our_piece_layers == lt_piece_layers
diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py
new file mode 100644
index 0000000..adf115d
--- /dev/null
+++ b/deluge/tests/test_plugin_metadata.py
@@ -0,0 +1,43 @@
+#
+# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from deluge.pluginmanagerbase import PluginManagerBase
+
+
+class TestPluginManagerBase:
+ def test_get_plugin_info(self):
+ pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
+ for p in pm.get_available_plugins():
+ for key, value in pm.get_plugin_info(p).items():
+ assert isinstance(key, str)
+ assert isinstance(value, str)
+
+ def test_get_plugin_info_invalid_name(self):
+ pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
+ for key, value in pm.get_plugin_info('random').items():
+ result = 'not available' if key in ('Name', 'Version') else ''
+ assert value == result
+
+ def test_parse_pkg_info_metadata_2_1(self):
+ pkg_info = """Metadata-Version: 2.1
+Name: AutoAdd
+Version: 1.8
+Summary: Monitors folders for .torrent files.
+Home-page: http://dev.deluge-torrent.org/wiki/Plugins/AutoAdd
+Author: Chase Sterling, Pedro Algarvio
+Author-email: chase.sterling@gmail.com, pedro@algarvio.me
+License: GPLv3
+Platform: UNKNOWN
+
+Monitors folders for .torrent files.
+ """
+ plugin_info = PluginManagerBase.parse_pkg_info(pkg_info)
+ for value in plugin_info.values():
+ assert value != ''
+ result = 'Monitors folders for .torrent files.'
+ assert plugin_info['Description'] == result
diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py
new file mode 100644
index 0000000..77c9f1e
--- /dev/null
+++ b/deluge/tests/test_rpcserver.py
@@ -0,0 +1,108 @@
+#
+# Copyright (C) 2013 Bro <bro.development@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import deluge.component as component
+import deluge.error
+from deluge.common import get_localhost_auth
+from deluge.conftest import BaseTestCase
+from deluge.core import rpcserver
+from deluge.core.authmanager import AuthManager
+from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer
+from deluge.log import setup_logger
+
+setup_logger('none')
+
+
+class DelugeRPCProtocolTester(DelugeRPCProtocol):
+ messages = []
+
+ def transfer_message(self, data):
+ self.messages.append(data)
+
+
+class TestRPCServer(BaseTestCase):
+ def set_up(self):
+ self.rpcserver = RPCServer(listen=False)
+ self.rpcserver.factory.protocol = DelugeRPCProtocolTester
+ self.factory = self.rpcserver.factory
+ self.session_id = '0'
+ self.request_id = 11
+ self.protocol = self.rpcserver.factory.protocol()
+ self.protocol.factory = self.factory
+ self.protocol.transport = self.protocol
+ self.factory.session_protocols[self.session_id] = self.protocol
+ self.factory.authorized_sessions[self.session_id] = None
+ self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent']
+ self.protocol.sessionno = self.session_id
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def test_emit_event_for_session_id(self):
+ torrent_id = '12'
+ from deluge.event import TorrentFolderRenamedEvent
+
+ data = [torrent_id, 'new name', 'old name']
+ e = TorrentFolderRenamedEvent(*data)
+ self.rpcserver.emit_event_for_session_id(self.session_id, e)
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_EVENT, str(msg)
+ assert msg[1] == 'TorrentFolderRenamedEvent', str(msg)
+ assert msg[2] == data, str(msg)
+
+ def test_invalid_client_login(self):
+ self.protocol.dispatch(self.request_id, 'daemon.login', [1], {})
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
+
+ def test_valid_client_login(self):
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(
+ self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
+ )
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_RESPONSE, str(msg)
+ assert msg[1] == self.request_id, str(msg)
+ assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg)
+
+ def test_client_login_error(self):
+ # This test causes error log prints while running the test...
+ self.protocol.transport = None # This should cause AttributeError
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(
+ self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
+ )
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
+ assert msg[2] == 'WrappedException'
+ assert msg[3][1] == 'AttributeError'
+
+ def test_client_invalid_method_call(self):
+ self.authmanager = AuthManager()
+ auth = get_localhost_auth()
+ self.protocol.dispatch(self.request_id, 'invalid_function', auth, {})
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
+ assert msg[2] == 'WrappedException'
+ assert msg[3][1] == 'AttributeError'
+
+ def test_daemon_info(self):
+ self.protocol.dispatch(self.request_id, 'daemon.info', [], {})
+ msg = self.protocol.messages.pop()
+ assert msg[0] == rpcserver.RPC_RESPONSE, str(msg)
+ assert msg[1] == self.request_id, str(msg)
+ assert msg[2] == deluge.common.get_version(), str(msg)
diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py
new file mode 100644
index 0000000..c472d16
--- /dev/null
+++ b/deluge/tests/test_security.py
@@ -0,0 +1,158 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+
+import pytest
+from twisted.internet.utils import getProcessOutputAndValue
+
+import deluge.component as component
+import deluge.ui.web.server
+from deluge import configmanager
+from deluge.common import windows_check
+from deluge.conftest import BaseTestCase
+from deluge.ui.web.server import DelugeWeb
+
+from .common import get_test_data_file
+from .common_web import WebServerTestBase
+from .daemon_base import DaemonBase
+
+SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False))
+
+
+# TODO: This whole module has not been tested since migrating tests fully to pytest
+class SecurityBaseTestCase:
+ @pytest.fixture(autouse=True)
+ def setvars(self):
+ self.home_dir = os.path.expanduser('~')
+ self.port = 8112
+
+ def _run_test(self, test):
+ d = getProcessOutputAndValue(
+ 'bash',
+ [
+ get_test_data_file('testssl.sh'),
+ '--quiet',
+ '--nodns',
+ 'none',
+ '--color',
+ '0',
+ test,
+ '127.0.0.1:%d' % self.port,
+ ],
+ )
+
+ def on_result(results):
+ if test == '-e':
+ results = results[0].split(b'\n')[7:-6]
+ assert len(results) > 3
+ else:
+ assert b'OK' in results[0]
+ assert b'NOT ok' not in results[0]
+
+ d.addCallback(on_result)
+ return d
+
+ def test_secured_webserver_protocol(self):
+ return self._run_test('-p')
+
+ def test_secured_webserver_standard_ciphers(self):
+ return self._run_test('-s')
+
+ def test_secured_webserver_heartbleed_vulnerability(self):
+ return self._run_test('-H')
+
+ def test_secured_webserver_css_injection_vulnerability(self):
+ return self._run_test('-I')
+
+ def test_secured_webserver_renegotiation_vulnerabilities(self):
+ return self._run_test('-R')
+
+ def test_secured_webserver_crime_vulnerability(self):
+ return self._run_test('-C')
+
+ def test_secured_webserver_poodle_vulnerability(self):
+ return self._run_test('-O')
+
+ def test_secured_webserver_tls_fallback_scsv_mitigation_vulnerability(self):
+ return self._run_test('-Z')
+
+ def test_secured_webserver_sweet32_vulnerability(self):
+ return self._run_test('-W')
+
+ def test_secured_webserver_beast_vulnerability(self):
+ return self._run_test('-A')
+
+ def test_secured_webserver_lucky13_vulnerability(self):
+ return self._run_test('-L')
+
+ def test_secured_webserver_freak_vulnerability(self):
+ return self._run_test('-F')
+
+ def test_secured_webserver_logjam_vulnerability(self):
+ return self._run_test('-J')
+
+ def test_secured_webserver_drown_vulnerability(self):
+ return self._run_test('-D')
+
+ def test_secured_webserver_forward_secrecy_settings(self):
+ return self._run_test('-f')
+
+ def test_secured_webserver_rc4_ciphers(self):
+ return self._run_test('-4')
+
+ def test_secured_webserver_preference(self):
+ return self._run_test('-P')
+
+ def test_secured_webserver_ciphers(self):
+ return self._run_test('-e')
+
+
+@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files')
+@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests')
+@pytest.mark.security
+class TestDaemonSecurity(BaseTestCase, DaemonBase, SecurityBaseTestCase):
+ def set_up(self):
+ d = self.common_set_up()
+ self.port = self.listen_port
+ d.addCallback(self.start_core)
+ d.addErrback(self.terminate_core)
+ return d
+
+ def tear_down(self):
+ d = component.shutdown()
+ d.addCallback(self.terminate_core)
+ return d
+
+
+@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files')
+@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests')
+@pytest.mark.security
+class TestWebUISecurity(WebServerTestBase, SecurityBaseTestCase):
+ def start_webapi(self, arg):
+ self.port = self.deluge_web.port = 8999
+
+ config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy()
+ config_defaults['port'] = self.deluge_web.port
+ config_defaults['https'] = True
+ self.config = configmanager.ConfigManager('web.conf', config_defaults)
+
+ self.deluge_web = DelugeWeb(daemon=False)
+
+ host = list(self.deluge_web.web_api.hostlist.config['hosts'][0])
+ host[2] = self.listen_port
+ self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host)
+ self.host_id = host[0]
+ self.deluge_web.start()
+
+ def test_secured_webserver_headers(self):
+ return self._run_test('-h')
+
+ def test_secured_webserver_breach_vulnerability(self):
+ return self._run_test('-B')
+
+ def test_secured_webserver_ticketbleed_vulnerability(self):
+ return self._run_test('-T')
diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py
new file mode 100644
index 0000000..86289cc
--- /dev/null
+++ b/deluge/tests/test_sessionproxy.py
@@ -0,0 +1,154 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+from twisted.internet.defer import maybeDeferred, succeed
+from twisted.internet.task import Clock
+
+import deluge.component as component
+import deluge.ui.sessionproxy
+from deluge.conftest import BaseTestCase
+
+
+class Core:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.torrents = {}
+ self.torrents['a'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.torrents['b'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.torrents['c'] = {'key1': 1, 'key2': 2, 'key3': 3}
+ self.prev_status = {}
+
+ def get_session_state(self):
+ return maybeDeferred(self.torrents.keys)
+
+ def get_torrent_status(self, torrent_id, keys, diff=False):
+ if not keys:
+ keys = list(self.torrents[torrent_id])
+
+ if not diff:
+ ret = {}
+ for key in keys:
+ ret[key] = self.torrents[torrent_id][key]
+
+ return succeed(ret)
+
+ else:
+ ret = {}
+ if torrent_id in self.prev_status:
+ for key in keys:
+ if (
+ self.prev_status[torrent_id][key]
+ != self.torrents[torrent_id][key]
+ ):
+ ret[key] = self.torrents[torrent_id][key]
+ else:
+ ret = self.torrents[torrent_id]
+ self.prev_status[torrent_id] = dict(self.torrents[torrent_id])
+ return succeed(ret)
+
+ def get_torrents_status(self, filter_dict, keys, diff=False):
+ if not filter_dict:
+ filter_dict['id'] = list(self.torrents)
+ if not keys:
+ keys = list(self.torrents['a'])
+ if not diff:
+ if 'id' in filter_dict:
+ torrents = filter_dict['id']
+ ret = {}
+ for torrent in torrents:
+ ret[torrent] = {}
+ for key in keys:
+ ret[torrent][key] = self.torrents[torrent][key]
+ return succeed(ret)
+ else:
+ if 'id' in filter_dict:
+ torrents = filter_dict['id']
+ ret = {}
+ for torrent in torrents:
+ ret[torrent] = {}
+ if torrent in self.prev_status:
+ for key in self.prev_status[torrent]:
+ if (
+ self.prev_status[torrent][key]
+ != self.torrents[torrent][key]
+ ):
+ ret[torrent][key] = self.torrents[torrent][key]
+ else:
+ ret[torrent] = dict(self.torrents[torrent])
+
+ self.prev_status[torrent] = dict(self.torrents[torrent])
+ return succeed(ret)
+
+
+class Client:
+ def __init__(self):
+ self.core = Core()
+
+ def __noop__(self, *args, **kwargs):
+ return None
+
+ def __getattr__(self, *args, **kwargs):
+ return self.__noop__
+
+
+client = Client()
+
+
+class TestSessionProxy(BaseTestCase):
+ def set_up(self):
+ self.clock = Clock()
+ self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds)
+ self.patch(deluge.ui.sessionproxy, 'client', client)
+ self.sp = deluge.ui.sessionproxy.SessionProxy()
+ client.core.reset()
+ d = self.sp.start()
+
+ def do_get_torrents_status(torrent_ids):
+ inital_keys = ['key1']
+ # Advance clock to expire the cache times
+ self.clock.advance(2)
+ return self.sp.get_torrents_status({'id': torrent_ids}, inital_keys)
+
+ d.addCallback(do_get_torrents_status)
+ return d
+
+ def tear_down(self):
+ return component.deregister(self.sp)
+
+ def test_startup(self):
+ assert client.core.torrents['a'] == self.sp.torrents['a'][1]
+
+ async def test_get_torrent_status_no_change(self):
+ result = await self.sp.get_torrent_status('a', [])
+ assert result == client.core.torrents['a']
+
+ async def test_get_torrent_status_change_with_cache(self):
+ client.core.torrents['a']['key1'] = 2
+ result = await self.sp.get_torrent_status('a', ['key1'])
+ assert result == {'key1': 1}
+
+ async def test_get_torrent_status_change_without_cache(self):
+ client.core.torrents['a']['key1'] = 2
+ self.clock.advance(self.sp.cache_time + 0.1)
+ result = await self.sp.get_torrent_status('a', [])
+ assert result == client.core.torrents['a']
+
+ async def test_get_torrent_status_key_not_updated(self):
+ self.clock.advance(self.sp.cache_time + 0.1)
+ self.sp.get_torrent_status('a', ['key1'])
+ client.core.torrents['a']['key2'] = 99
+ result = await self.sp.get_torrent_status('a', ['key2'])
+ assert result == {'key2': 99}
+
+ async def test_get_torrents_status_key_not_updated(self):
+ self.clock.advance(self.sp.cache_time + 0.1)
+ self.sp.get_torrents_status({'id': ['a']}, ['key1'])
+ client.core.torrents['a']['key2'] = 99
+ result = await self.sp.get_torrents_status({'id': ['a']}, ['key2'])
+ assert result == {'a': {'key2': 99}}
diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py
new file mode 100644
index 0000000..6288615
--- /dev/null
+++ b/deluge/tests/test_torrent.py
@@ -0,0 +1,388 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import itertools
+import os
+import time
+from base64 import b64encode
+from unittest import mock
+
+import pytest
+from twisted.internet import defer, reactor
+from twisted.internet.task import deferLater
+
+import deluge.component as component
+import deluge.core.torrent
+import deluge.tests.common as common
+from deluge._libtorrent import lt
+from deluge.common import VersionSplit, utf8_encode_structure
+from deluge.conftest import BaseTestCase
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.core.torrent import Torrent
+from deluge.core.torrentmanager import TorrentManager, TorrentState
+
+try:
+ from unittest.mock import AsyncMock
+except ImportError:
+ from mock import AsyncMock
+
+
+class TestTorrent(BaseTestCase):
+ def setup_config(self):
+ core_config = deluge.config.Config(
+ 'core.conf',
+ defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
+ config_dir=self.config_dir,
+ )
+ core_config.save()
+
+ def set_up(self):
+ self.setup_config()
+ self.rpcserver = RPCServer(listen=False)
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.core.config.config['new_release_check'] = False
+ self.session = self.core.session
+ self.torrent = None
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def print_priority_list(self, priorities):
+ tmp = ''
+ for i, p in enumerate(priorities):
+ if i % 100 == 0:
+ print(tmp)
+ tmp = ''
+ tmp += '%s' % p
+ print(tmp)
+
+ def assert_state(self, torrent, state):
+ """Assert torrent state matches expected state"""
+ torrent.update_state()
+ assert torrent.state == state
+
+ def assert_state_wait(self, torrent, expected, timeout=1, interval=0.2):
+ """Assert state but retry with timeout e.g. Allow for async lt alerts"""
+ start = time.time()
+
+ while time.time() - start < timeout:
+ torrent.update_state()
+ time.sleep(interval)
+ if torrent.state == expected:
+ break
+ else:
+ assert torrent.state == expected
+
+ def get_torrent_atp(self, filename):
+ filename = common.get_test_data_file(filename)
+ with open(filename, 'rb') as _file:
+ info = lt.torrent_info(lt.bdecode(_file.read()))
+ atp = {
+ 'ti': info,
+ 'save_path': os.getcwd(),
+ 'storage_mode': lt.storage_mode_t.storage_mode_sparse,
+ 'flags': (
+ lt.torrent_flags.auto_managed
+ | lt.torrent_flags.duplicate_is_error & ~lt.torrent_flags.paused
+ ),
+ }
+ return atp
+
+ async def test_set_file_priorities(self):
+ if getattr(lt, 'file_prio_alert', None):
+ # Libtorrent 2.0.3 and later has a file_prio_alert
+ prios_set = defer.Deferred()
+ prios_set.addTimeout(1.5, reactor)
+ component.get('AlertManager').register_handler(
+ 'file_prio_alert', lambda a: prios_set.callback(True)
+ )
+ else:
+ # On older libtorrent, we just wait a while
+ prios_set = deferLater(reactor, 0.8)
+
+ atp = self.get_torrent_atp('dir_with_6_files.torrent')
+ handle = self.session.add_torrent(atp)
+ torrent = Torrent(handle, {})
+
+ result = torrent.get_file_priorities()
+ assert all(x == 4 for x in result)
+
+ new_priorities = [3, 1, 2, 0, 5, 6, 7]
+ torrent.set_file_priorities(new_priorities)
+ assert torrent.get_file_priorities() == new_priorities
+
+ # Test with handle.piece_priorities as handle.file_priorities async
+ # updates and will return old value. Also need to remove a priority
+ # value as one file is much smaller than piece size so doesn't show.
+ await prios_set # Delay to wait for alert from lt
+ piece_prio = handle.get_piece_priorities()
+ result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7])
+ assert result
+
+ def test_set_prioritize_first_last_pieces(self):
+ piece_indexes = [
+ 0,
+ 1,
+ 50,
+ 51,
+ 52,
+ 110,
+ 111,
+ 112,
+ 113,
+ 200,
+ 201,
+ 202,
+ 212,
+ 213,
+ 214,
+ 215,
+ 216,
+ 217,
+ 457,
+ 458,
+ 459,
+ 460,
+ 461,
+ 462,
+ ]
+ self.run_test_set_prioritize_first_last_pieces(
+ 'dir_with_6_files.torrent', piece_indexes
+ )
+
+ def run_test_set_prioritize_first_last_pieces(
+ self, torrent_file, prioritized_piece_indexes
+ ):
+ atp = self.get_torrent_atp(torrent_file)
+ handle = self.session.add_torrent(atp)
+
+ self.torrent = Torrent(handle, {})
+ priorities_original = handle.get_piece_priorities()
+ self.torrent.set_prioritize_first_last_pieces(True)
+ priorities = handle.get_piece_priorities()
+
+ # The length of the list of new priorites is the same as the original
+ assert len(priorities_original) == len(priorities)
+
+ # Test the priority of all the pieces against the calculated indexes.
+ for idx, priority in enumerate(priorities):
+ if idx in prioritized_piece_indexes:
+ assert priorities[idx] == 7
+ else:
+ assert priorities[idx] == 4
+
+ # self.print_priority_list(priorities)
+
+ def test_set_prioritize_first_last_pieces_false(self):
+ atp = self.get_torrent_atp('dir_with_6_files.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ # First set some pieces prioritized
+ self.torrent.set_prioritize_first_last_pieces(True)
+ # Reset pirorities
+ self.torrent.set_prioritize_first_last_pieces(False)
+ priorities = handle.get_piece_priorities()
+
+ # Test the priority of the prioritized pieces
+ for i in priorities:
+ assert priorities[i] == 4
+
+ # self.print_priority_list(priorities)
+
+ def test_torrent_error_data_missing(self):
+ options = {'seed_mode': True}
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ # Inital check will fail and return to download state
+ self.assert_state_wait(torrent, 'Downloading')
+
+ # Force an error by reading (non-existant) piece from disk
+ torrent.handle.read_piece(0)
+ self.assert_state_wait(torrent, 'Error')
+
+ def test_torrent_error_resume_original_state(self):
+ options = {'seed_mode': True, 'add_paused': True}
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ orig_state = 'Paused'
+ self.assert_state(torrent, orig_state)
+
+ # Force an error by reading (non-existant) piece from disk
+ torrent.handle.read_piece(0)
+ self.assert_state_wait(torrent, 'Error')
+
+ # Clear error and verify returned to original state
+ torrent.force_recheck()
+
+ def test_torrent_error_resume_data_unaltered(self):
+ if VersionSplit(lt.__version__) >= VersionSplit('1.2.0.0'):
+ pytest.skip('Test not working as expected on lt 1.2 or greater')
+
+ resume_data = {
+ 'active_time': 13399,
+ 'num_incomplete': 16777215,
+ 'announce_to_lsd': 1,
+ 'seed_mode': 0,
+ 'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01',
+ 'paused': 0,
+ 'seeding_time': 13399,
+ 'last_scrape': 13399,
+ 'info-hash': '-\xc5\xd0\xe7\x1af\xfeid\x9ad\r9\xcb\x00\xa2YpIs',
+ 'max_uploads': 16777215,
+ 'max_connections': 16777215,
+ 'num_downloaders': 16777215,
+ 'total_downloaded': 0,
+ 'file-format': 'libtorrent resume file',
+ 'peers6': '',
+ 'added_time': 1411826665,
+ 'banned_peers6': '',
+ 'file_priority': [1],
+ 'last_seen_complete': 0,
+ 'total_uploaded': 0,
+ 'piece_priority': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01',
+ 'file-version': 1,
+ 'announce_to_dht': 1,
+ 'auto_managed': 1,
+ 'upload_rate_limit': 0,
+ 'completed_time': 1411826665,
+ 'allocation': 'sparse',
+ 'blocks per piece': 2,
+ 'download_rate_limit': 0,
+ 'libtorrent-version': '0.16.17.0',
+ 'banned_peers': '',
+ 'num_seeds': 16777215,
+ 'sequential_download': 0,
+ 'announce_to_trackers': 1,
+ 'peers': '\n\x00\x02\x0f=\xc6SC\x17]\xd8}\x7f\x00\x00\x01=\xc6',
+ 'finished_time': 13399,
+ 'last_upload': 13399,
+ 'trackers': [[]],
+ 'super_seeding': 0,
+ 'file sizes': [[512000, 1411826586]],
+ 'last_download': 13399,
+ }
+ torrent_state = TorrentState(
+ torrent_id='2dc5d0e71a66fe69649a640d39cb00a259704973',
+ filename='test_torrent.file.torrent',
+ name='',
+ save_path='/home/ubuntu/Downloads',
+ file_priorities=[1],
+ is_finished=True,
+ )
+
+ filename = common.get_test_data_file('test_torrent.file.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = _file.read()
+ resume_data = utf8_encode_structure(resume_data)
+ torrent_id = self.core.torrentmanager.add(
+ state=torrent_state, filedump=filedump, resume_data=lt.bencode(resume_data)
+ )
+ torrent = self.core.torrentmanager.torrents[torrent_id]
+
+ def assert_resume_data():
+ self.assert_state(torrent, 'Error')
+ tm_resume_data = lt.bdecode(
+ self.core.torrentmanager.resume_data[torrent.torrent_id]
+ )
+ assert tm_resume_data == resume_data
+
+ return deferLater(reactor, 0.5, assert_resume_data)
+
+ def test_get_eta_seeding(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ assert self.torrent.get_eta() == 0
+ self.torrent.status = mock.MagicMock()
+
+ self.torrent.status.upload_payload_rate = 5000
+ self.torrent.status.download_payload_rate = 0
+ self.torrent.status.all_time_download = 10000
+ self.torrent.status.all_time_upload = 500
+ self.torrent.is_finished = True
+ self.torrent.options = {'stop_at_ratio': False}
+ # Test finished and uploading but no stop_at_ratio set.
+ assert self.torrent.get_eta() == 0
+
+ self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5}
+ result = self.torrent.get_eta()
+ assert result == 2
+ assert isinstance(result, int)
+
+ def test_get_eta_downloading(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ assert self.torrent.get_eta() == 0
+
+ self.torrent.status = mock.MagicMock()
+ self.torrent.status.download_payload_rate = 50
+ self.torrent.status.total_wanted = 10000
+ self.torrent.status.total_wanted_done = 5000
+
+ result = self.torrent.get_eta()
+ assert result == 100
+ assert isinstance(result, int)
+
+ def test_get_name_unicode(self):
+ """Test retrieving a unicode torrent name from libtorrent."""
+ atp = self.get_torrent_atp('unicode_file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ assert self.torrent.get_name() == 'সুকুমার রায়.txt'
+
+ def test_rename_unicode(self):
+ """Test renaming file/folders with unicode filenames."""
+ atp = self.get_torrent_atp('unicode_filenames.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ # Ignore TorrentManager method call
+ TorrentManager.save_resume_data = AsyncMock()
+
+ result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв')
+ assert isinstance(result, defer.DeferredList)
+
+ result = self.torrent.rename_files([[0, 'new_рбачёв']])
+ assert result is None
+
+ def test_connect_peer_port(self):
+ """Test to ensure port is int for libtorrent"""
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ self.torrent = Torrent(handle, {})
+ assert not self.torrent.connect_peer('127.0.0.1', 'text')
+ assert self.torrent.connect_peer('127.0.0.1', '1234')
+
+ def test_status_cache(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ mock_time = mock.Mock(return_value=time.time())
+ with mock.patch('time.time', mock_time):
+ torrent = Torrent(handle, {})
+ counter = itertools.count()
+ handle.status = mock.Mock(side_effect=counter.__next__)
+ first_status = torrent.get_lt_status()
+ assert first_status == 0, 'sanity check'
+ assert first_status == torrent.status, 'cached status should be used'
+ assert torrent.get_lt_status() == 1, 'status should update'
+ assert torrent.status == 1
+ # Advance time and verify cache expires and updates
+ mock_time.return_value += 10
+ assert torrent.status == 2
diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py
new file mode 100644
index 0000000..1a5e3a9
--- /dev/null
+++ b/deluge/tests/test_torrentmanager.py
@@ -0,0 +1,146 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import os
+import shutil
+import warnings
+from base64 import b64encode
+from unittest import mock
+
+import pytest
+import pytest_twisted
+from twisted.internet import reactor, task
+
+from deluge import component
+from deluge.bencode import bencode
+from deluge.conftest import BaseTestCase
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.error import InvalidTorrentError
+
+from . import common
+
+warnings.filterwarnings('ignore', category=RuntimeWarning)
+warnings.resetwarnings()
+
+
+class TestTorrentmanager(BaseTestCase):
+ def set_up(self):
+ self.rpcserver = RPCServer(listen=False)
+ self.core = Core()
+ self.core.config.config['lsd'] = False
+ self.clock = task.Clock()
+ self.tm = self.core.torrentmanager
+ self.tm.callLater = self.clock.callLater
+ return component.start()
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrent(self):
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = _file.read()
+ torrent_id = yield self.core.add_torrent_file_async(
+ filename, b64encode(filedump), {}
+ )
+ assert self.tm.remove(torrent_id, False)
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_magnet(self):
+ """Test remove magnet before received metadata and delete_copies is True"""
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ options = {}
+ self.core.config.config['copy_torrent_file'] = True
+ self.core.config.config['del_copy_torrent_file'] = True
+ torrent_id = yield self.core.add_torrent_magnet(magnet, options)
+ assert self.tm.remove(torrent_id, False)
+
+ async def test_prefetch_metadata(self):
+ from deluge._libtorrent import lt
+
+ with open(common.get_test_data_file('test.torrent'), 'rb') as _file:
+ t_info = lt.torrent_info(lt.bdecode(_file.read()))
+ mock_alert = mock.MagicMock()
+ mock_alert.handle.info_hash = mock.MagicMock(
+ return_value='ab570cdd5a17ea1b61e970bb72047de141bce173'
+ )
+ mock_alert.handle.get_torrent_info = mock.MagicMock(return_value=t_info)
+
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ d = self.tm.prefetch_metadata(magnet, 30)
+ # Make sure to use calllater, because the above prefetch call won't
+ # actually start running until we await it.
+ reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert)
+
+ expected = (
+ 'ab570cdd5a17ea1b61e970bb72047de141bce173',
+ b64encode(
+ bencode(
+ {
+ b'piece length': 32768,
+ b'sha1': (
+ b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh'
+ b'\x9d\xc5\xb7\xac\xdd'
+ ),
+ b'name': b'azcvsupdater_2.6.2.jar',
+ b'private': 0,
+ b'pieces': (
+ b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u"
+ b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd'
+ b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7'
+ b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3'
+ b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0'
+ b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8'
+ b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed'
+ b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA'
+ b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f'
+ b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee'
+ b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc'
+ b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu'
+ b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?'
+ b'\xa1\xd6\x8c\x83\x9e&'
+ ),
+ b'length': 307949,
+ b'name.utf-8': b'azcvsupdater_2.6.2.jar',
+ b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7',
+ }
+ )
+ ),
+ )
+ assert expected == await d
+
+ async def test_prefetch_metadata_timeout(self):
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ d = self.tm.prefetch_metadata(magnet, 30)
+ self.clock.advance(30)
+ result = await d
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
+ assert result == expected
+
+ @pytest.mark.todo
+ def test_remove_torrent_false(self):
+ """Test when remove_torrent returns False"""
+ common.todo_test(self)
+
+ def test_remove_invalid_torrent(self):
+ with pytest.raises(InvalidTorrentError):
+ self.tm.remove('torrentidthatdoesntexist')
+
+ def test_open_state(self):
+ """Open a state with a UTF-8 encoded torrent filename."""
+ shutil.copy(
+ common.get_test_data_file('utf8_filename_torrents.state'),
+ os.path.join(self.config_dir, 'state', 'torrents.state'),
+ )
+
+ state = self.tm.open_state()
+ assert len(state.torrents) == 1
diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py
new file mode 100644
index 0000000..9da99d8
--- /dev/null
+++ b/deluge/tests/test_torrentview.py
@@ -0,0 +1,224 @@
+#
+# Copyright (C) 2014 Bro <bro.development@gmail.com>
+# Copyright (C) 2014 Calum Lind <calumlind@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import pytest
+
+import deluge.component as component
+from deluge.configmanager import ConfigManager
+from deluge.conftest import BaseTestCase
+from deluge.i18n import setup_translation
+
+# Allow running other tests without GTKUI dependencies available
+try:
+ # pylint: disable=ungrouped-imports
+ from gi.repository.GObject import TYPE_UINT64
+
+ from deluge.ui.gtk3.gtkui import DEFAULT_PREFS
+ from deluge.ui.gtk3.mainwindow import MainWindow
+ from deluge.ui.gtk3.menubar import MenuBar
+ from deluge.ui.gtk3.torrentdetails import TorrentDetails
+ from deluge.ui.gtk3.torrentview import TorrentView
+except (ImportError, ValueError):
+ libs_available = False
+ TYPE_UINT64 = 'Whatever'
+else:
+ libs_available = True
+
+setup_translation()
+
+
+@pytest.mark.gtkui
+class TestTorrentview(BaseTestCase):
+ default_column_index = [
+ 'filter',
+ 'torrent_id',
+ 'dirty',
+ '#',
+ 'Name',
+ 'Size',
+ 'Downloaded',
+ 'Uploaded',
+ 'Remaining',
+ 'Progress',
+ 'Seeds',
+ 'Peers',
+ 'Seeds:Peers',
+ 'Down Speed',
+ 'Up Speed',
+ 'Down Limit',
+ 'Up Limit',
+ 'ETA',
+ 'Ratio',
+ 'Avail',
+ 'Added',
+ 'Completed',
+ 'Complete Seen',
+ 'Last Transfer',
+ 'Tracker',
+ 'Download Folder',
+ 'Owner',
+ 'Shared',
+ ]
+ default_liststore_columns = [
+ bool,
+ str,
+ bool,
+ int,
+ str,
+ str, # Name
+ TYPE_UINT64,
+ TYPE_UINT64,
+ TYPE_UINT64,
+ TYPE_UINT64,
+ float,
+ str, # Progress
+ int,
+ int,
+ int,
+ int,
+ float, # Seeds, Peers
+ int,
+ int,
+ float,
+ float,
+ int,
+ float,
+ float, # ETA, Ratio, Avail
+ int,
+ int,
+ int,
+ int,
+ str,
+ str, # Tracker
+ str,
+ str,
+ bool,
+ ] # shared
+
+ def set_up(self):
+ if libs_available is False:
+ pytest.skip('GTKUI dependencies not available')
+
+ # MainWindow loads this config file, so lets make sure it contains the defaults
+ ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
+ self.mainwindow = MainWindow()
+ self.torrentview = TorrentView()
+ self.torrentdetails = TorrentDetails()
+ self.menubar = MenuBar()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_torrentview_columns(self):
+ assert self.torrentview.column_index == self.default_column_index
+ assert self.torrentview.liststore_columns == self.default_liststore_columns
+ assert self.torrentview.columns['Download Folder'].column_indices == [30]
+
+ def test_add_column(self):
+ # Add a text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 1
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col
+ assert self.torrentview.columns[test_col].column_indices == [33]
+
+ def test_add_columns(self):
+ # Add a text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+
+ # Add a second text column
+ test_col2 = 'Test column2'
+ self.torrentview.add_text_column(test_col2, status_field=['label2'])
+
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 2
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 2
+ # test_col
+ assert self.torrentview.column_index[-2] == test_col
+ assert self.torrentview.columns[test_col].column_indices == [33]
+
+ # test_col2
+ assert self.torrentview.column_index[-1] == test_col2
+ assert self.torrentview.columns[test_col2].column_indices == [34]
+
+ def test_remove_column(self):
+ # Add and remove text column
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ self.torrentview.remove_column(test_col)
+
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
+
+ def test_remove_columns(self):
+ # Add two columns
+ test_col = 'Test column'
+ self.torrentview.add_text_column(test_col, status_field=['label'])
+ test_col2 = 'Test column2'
+ self.torrentview.add_text_column(test_col2, status_field=['label2'])
+
+ # Remove test_col
+ self.torrentview.remove_column(test_col)
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 1
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col2
+ assert self.torrentview.columns[test_col2].column_indices == [33]
+
+ # Remove test_col2
+ self.torrentview.remove_column(test_col2)
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
+
+ def test_add_remove_column_multiple_types(self):
+ # Add a column with multiple column types
+ test_col3 = 'Test column3'
+ self.torrentview.add_progress_column(
+ test_col3, status_field=['progress', 'label3'], col_types=[float, str]
+ )
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 2
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col3
+ assert self.torrentview.columns[test_col3].column_indices == [33, 34]
+
+ # Remove multiple column-types column
+ self.torrentview.remove_column(test_col3)
+
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
+ )
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py
new file mode 100644
index 0000000..57cc138
--- /dev/null
+++ b/deluge/tests/test_tracker_icons.py
@@ -0,0 +1,71 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import os.path
+
+import pytest
+
+import deluge.component as component
+import deluge.ui.tracker_icons
+from deluge.conftest import BaseTestCase
+from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons
+
+from . import common
+
+common.disable_new_release_check()
+
+
+@pytest.mark.internet
+class TestTrackerIcons(BaseTestCase):
+ def set_up(self):
+ # Disable resizing with Pillow for consistency.
+ self.patch(deluge.ui.tracker_icons, 'Image', None)
+ self.icons = TrackerIcons()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ async def test_get_deluge_png(self, mock_mkstemp):
+ # Deluge has a png favicon link
+ icon = TrackerIcon(common.get_test_data_file('deluge.png'))
+ result = await self.icons.fetch('deluge-torrent.org')
+ assert result == icon
+ assert not os.path.isfile(mock_mkstemp[1])
+
+ async def test_get_google_ico(self):
+ # Google doesn't have any icon links
+ # So instead we'll grab its favicon.ico
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ result = await self.icons.fetch('www.google.com')
+ assert result == icon
+
+ async def test_get_google_ico_hebrew(self):
+ """Test that Google.co.il page is read as UTF-8"""
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ result = await self.icons.fetch('www.google.co.il')
+ assert result == icon
+
+ async def test_get_google_ico_with_redirect(self):
+ # google.com redirects to www.google.com
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ result = await self.icons.fetch('google.com')
+ assert result == icon
+
+ @pytest.mark.skip(reason='Site removed favicon, new SNI test will be needed')
+ async def test_get_seo_svg_with_sni(self):
+ # seo using certificates with SNI support only
+ icon = TrackerIcon(common.get_test_data_file('seo.svg'))
+ result = await self.icons.fetch('www.seo.com')
+ assert result == icon
+
+ async def test_get_empty_string_tracker(self):
+ result = await self.icons.fetch('')
+ assert result is None
+
+ async def test_invalid_host(self, mock_mkstemp):
+ """Test that TrackerIcon can handle invalid hostname"""
+ result = await self.icons.fetch('deluge.example.com')
+ assert not result
+ assert not os.path.isfile(mock_mkstemp[1])
diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py
new file mode 100644
index 0000000..92e349b
--- /dev/null
+++ b/deluge/tests/test_transfer.py
@@ -0,0 +1,398 @@
+#
+# Copyright (C) 2012 Bro <bro.development@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import base64
+
+import pytest
+import rencode
+
+import deluge.log
+from deluge.transfer import DelugeTransferProtocol
+
+deluge.log.setup_logger('none')
+
+
+class TransferTestClass(DelugeTransferProtocol):
+ def __init__(self):
+ DelugeTransferProtocol.__init__(self)
+ self.transport = self
+ self.messages_out = []
+ self.messages_in = []
+ self.packet_count = 0
+
+ def write(self, message):
+ """
+ Called by DelugeTransferProtocol class
+ This simulates the write method of the self.transport in DelugeTransferProtocol.
+ """
+ self.messages_out.append(message)
+
+ def message_received(self, message):
+ """
+ This method overrides message_received is DelugeTransferProtocol and is
+ called with the complete message as it was sent by DelugeRPCProtocol
+ """
+ self.messages_in.append(message)
+
+ def get_messages_out_joined(self):
+ return b''.join(self.messages_out)
+
+ def get_messages_in(self):
+ return self.messages_in
+
+ def data_received_old_protocol(self, data):
+ """
+ This is the original method logic (as close as possible) for handling data receival on the client
+
+ :param data: a zlib compressed string encoded with rencode.
+
+ """
+ import zlib
+
+ print('\n=== New Data Received ===\nBytes received:', len(data))
+
+ if self._buffer:
+ # We have some data from the last dataReceived() so lets prepend it
+ print('Current buffer:', len(self._buffer) if self._buffer else '0')
+ data = self._buffer + data
+ self._buffer = None
+
+ self.packet_count += 1
+ self._bytes_received += len(data)
+
+ while data:
+ print('\n-- Handle packet data --')
+
+ print('Bytes received:', self._bytes_received)
+ print('Current data:', len(data))
+
+ if self._message_length == 0:
+ # handle_new_message uses _buffer so set data to _buffer.
+ self._buffer = data
+ self._handle_new_message()
+ data = self._buffer
+ self._buffer = None
+ self.packet_count = 1
+ print('New message of length:', self._message_length)
+
+ dobj = zlib.decompressobj()
+ try:
+ request = rencode.loads(dobj.decompress(data))
+ print('Successfully loaded message', end=' ')
+ print(
+ ' - Buffer length: %d, data length: %d, unused length: %d'
+ % (
+ len(data),
+ len(data) - len(dobj.unused_data),
+ len(dobj.unused_data),
+ )
+ )
+ print('Packet count:', self.packet_count)
+ except Exception as ex:
+ # log.debug('Received possible invalid message (%r): %s', data, e)
+ # This could be cut-off data, so we'll save this in the buffer
+ # and try to prepend it on the next dataReceived()
+ self._buffer = data
+ print(
+ 'Failed to load buffer (size %d): %s' % (len(self._buffer), str(ex))
+ )
+ return
+ else:
+ data = dobj.unused_data
+ self._message_length = 0
+
+ self.message_received(request)
+
+
+class TestDelugeTransferProtocol:
+ @pytest.fixture(autouse=True)
+ def set_up(self):
+ """
+ The expected messages corresponds to the test messages (msg1, msg2) after they've been processed
+ by DelugeTransferProtocol.send, which means that they've first been encoded with rencode,
+ and then compressed with zlib.
+ The expected messages are encoded in base64 to easily including it here in the source.
+ So before comparing the results with the expected messages, the expected messages must be decoded,
+ or the result message be encoded in base64.
+
+ """
+ self.transfer = TransferTestClass()
+ self.msg1 = (
+ 0,
+ 1,
+ {'key_int': 1242429423},
+ {'key_str': b'some string'},
+ {'key_bool': True},
+ )
+ self.msg2 = (
+ 2,
+ 3,
+ {'key_float': 12424.29423},
+ {'key_unicode': 'some string'},
+ {'key_dict_with_tuple': {'key_tuple': (1, 2, 3)}},
+ {'keylist': [4, '5', 6.7]},
+ )
+
+ self.msg1_expected_compressed_base64 = (
+ b'AQAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU'
+ b'XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ'
+ )
+ self.msg2_expected_compressed_base64 = (
+ b'AQAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3il+ZlJuenpH'
+ b'YX5+emKhSXFGXmpadPBkmkZCaXxJdnlmTEl5QW5KRCdIOZhxmB'
+ b'hrUDuTmZxSWHWRpNnRyupaUBAHYlJxI='
+ )
+
+ def test_send_one_message(self):
+ """
+ Send one message and test that it has been sent correctoly to the
+ method 'write' in self.transport.
+
+ """
+ self.transfer.transfer_message(self.msg1)
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_out_joined()
+ base64_encoded = base64.b64encode(messages)
+ assert base64_encoded == self.msg1_expected_compressed_base64
+
+ def test_receive_one_message(self):
+ """
+ Receive one message and test that it has been sent to the
+ method 'message_received'.
+
+ """
+ self.transfer.dataReceived(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(messages)
+
+ def test_receive_old_message(self):
+ """
+ Receive an old message (with no header) and verify that the data is discarded.
+
+ """
+ self.transfer.dataReceived(rencode.dumps(self.msg1))
+ assert len(self.transfer.get_messages_in()) == 0
+ assert self.transfer._message_length == 0
+ assert len(self.transfer._buffer) == 0
+
+ def test_receive_two_concatenated_messages(self):
+ """
+ This test simply concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data as one string.
+
+ """
+ two_concatenated = base64.b64decode(
+ self.msg1_expected_compressed_base64
+ ) + base64.b64decode(self.msg2_expected_compressed_base64)
+ self.transfer.dataReceived(two_concatenated)
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
+ message2 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
+
+ def test_receive_three_messages_in_parts(self):
+ """
+ This test concatenates three messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in multiple parts.
+
+ """
+ msg_bytes = (
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ + base64.b64decode(self.msg2_expected_compressed_base64)
+ + base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ packet_size = 40
+
+ one_message_byte_count = len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ two_messages_byte_count = one_message_byte_count + len(
+ base64.b64decode(self.msg2_expected_compressed_base64)
+ )
+ three_messages_byte_count = two_messages_byte_count + len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+
+ for d in self.receive_parts_helper(msg_bytes, packet_size):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ assert expected_msgs_received_count == len(self.transfer.get_messages_in())
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
+ message2 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
+ message3 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message3)
+
+ # Remove underscore to enable test, or run the test directly:
+ def _test_rencode_fail_protocol(self):
+ """
+ This test tries to test the protocol that relies on errors from rencode.
+
+ """
+ msg_bytes = (
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ + base64.b64decode(self.msg2_expected_compressed_base64)
+ + base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ packet_size = 149
+
+ one_message_byte_count = len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+ two_messages_byte_count = one_message_byte_count + len(
+ base64.b64decode(self.msg2_expected_compressed_base64)
+ )
+ three_messages_byte_count = two_messages_byte_count + len(
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ )
+
+ print()
+
+ print(
+ 'Msg1 size:',
+ len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4,
+ )
+ print(
+ 'Msg2 size:',
+ len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4,
+ )
+ print(
+ 'Msg3 size:',
+ len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4,
+ )
+
+ print('one_message_byte_count:', one_message_byte_count)
+ print('two_messages_byte_count:', two_messages_byte_count)
+ print('three_messages_byte_count:', three_messages_byte_count)
+
+ for d in self.receive_parts_helper(
+ msg_bytes, packet_size, self.transfer.data_received_old_protocol
+ ):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ if expected_msgs_received_count != len(self.transfer.get_messages_in()):
+ print(
+ 'Expected number of messages received is %d, but %d have been received.'
+ % (
+ expected_msgs_received_count,
+ len(self.transfer.get_messages_in()),
+ )
+ )
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
+ message2 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
+ message3 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message3)
+
+ def test_receive_middle_of_header(self):
+ """
+ This test concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in two parts.
+ The first part contains the first message, plus two bytes of the next message.
+ The next part contains the rest of the message.
+
+ This is a special case, as DelugeTransferProtocol can't start parsing
+ a message until it has at least 5 bytes (the size of the header) to be able
+ to read and parse the size of the payload.
+
+ """
+ two_concatenated = base64.b64decode(
+ self.msg1_expected_compressed_base64
+ ) + base64.b64decode(self.msg2_expected_compressed_base64)
+ first_len = len(base64.b64decode(self.msg1_expected_compressed_base64))
+
+ # Now found the entire first message, and half the header of the next message (2 bytes into the header)
+ self.transfer.dataReceived(two_concatenated[: first_len + 2])
+
+ # Should be 1 message in the list
+ assert 1 == len(self.transfer.get_messages_in())
+
+ # Send the rest
+ self.transfer.dataReceived(two_concatenated[first_len + 2 :])
+
+ # Should be 2 messages in the list
+ assert 2 == len(self.transfer.get_messages_in())
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
+ message2 = self.transfer.get_messages_in().pop(0)
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
+
+ # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
+ # def test_simulate_big_transfer(self):
+ # filename = '../deluge.torrentlist'
+ #
+ # f = open(filename, 'r')
+ # data = f.read()
+ # message_to_send = eval(data)
+ # self.transfer.transfer_message(message_to_send)
+ #
+ # Get the data as sent to the network by DelugeTransferProtocol
+ # compressed_data = self.transfer.get_messages_out_joined()
+ # packet_size = 16000 # Or something smaller...
+ #
+ # for d in self.receive_parts_helper(compressed_data, packet_size):
+ # bytes_recv = self.transfer.get_bytes_recv()
+ # if bytes_recv < len(compressed_data):
+ # self.assertEqual(len(self.transfer.get_messages_in()), 0)
+ # else:
+ # self.assertEqual(len(self.transfer.get_messages_in()), 1)
+ # Get the data as received by DelugeTransferProtocol
+ # transfered_message = self.transfer.get_messages_in().pop(0)
+ # Test that the data structures are equal
+ # self.assertEqual(transfered_message, message_to_send)
+ # self.assertTrue(transfered_message == message_to_send)
+ #
+ # f.close()
+ # f = open('rencode.torrentlist', 'w')
+ # f.write(str(transfered_message))
+ # f.close()
+
+ def receive_parts_helper(self, data, packet_size, receive_func=None):
+ byte_count = len(data)
+ sent_bytes = 0
+ while byte_count > 0:
+ to_receive = packet_size if byte_count > packet_size else byte_count
+ sent_bytes += to_receive
+ byte_count -= to_receive
+ if receive_func:
+ receive_func(data[:to_receive])
+ else:
+ self.transfer.dataReceived(data[:to_receive])
+ data = data[to_receive:]
+ yield
diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py
new file mode 100644
index 0000000..87a4a2c
--- /dev/null
+++ b/deluge/tests/test_ui_common.py
@@ -0,0 +1,290 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from deluge.ui.common import TorrentInfo
+
+from . import common
+
+
+class TestUICommon:
+ def test_hash_optional_single_file(self):
+ """Ensure single file with `ed2k` and `sha1` keys are not in filetree output."""
+ filename = common.get_test_data_file('test.torrent')
+ files_tree = {'azcvsupdater_2.6.2.jar': (0, 307949, True)}
+ ti = TorrentInfo(filename, filetree=1)
+ assert ti.files_tree == files_tree
+
+ files_tree2 = {
+ 'contents': {
+ 'azcvsupdater_2.6.2.jar': {
+ 'type': 'file',
+ 'index': 0,
+ 'length': 307949,
+ 'download': True,
+ }
+ }
+ }
+ ti = TorrentInfo(filename, filetree=2)
+ assert ti.files_tree == files_tree2
+
+ def test_hash_optional_multi_file(self):
+ """Ensure multi-file with `filehash` and `ed2k` are keys not in filetree output."""
+ filename = common.get_test_data_file('filehash_field.torrent')
+ files_tree = {
+ 'torrent_filehash': {
+ 'tull.txt': (0, 54, True),
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (1, 54, True),
+ }
+ }
+ ti = TorrentInfo(filename, filetree=1)
+ assert ti.files_tree == files_tree
+
+ files_tree2 = {
+ 'contents': {
+ 'torrent_filehash': {
+ 'type': 'dir',
+ 'contents': {
+ 'tull.txt': {
+ 'type': 'file',
+ 'path': 'torrent_filehash/tull.txt',
+ 'length': 54,
+ 'index': 0,
+ 'download': True,
+ },
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': {
+ 'type': 'file',
+ 'path': 'torrent_filehash/還在一個人無聊嗎~還不趕緊上來聊天美.txt',
+ 'length': 54,
+ 'index': 1,
+ 'download': True,
+ },
+ },
+ 'length': 108,
+ 'download': True,
+ }
+ },
+ 'type': 'dir',
+ }
+ ti = TorrentInfo(filename, filetree=2)
+ assert ti.files_tree == files_tree2
+
+ def test_hash_optional_md5sum(self):
+ # Ensure `md5sum` key is not included in filetree output
+ filename = common.get_test_data_file('md5sum.torrent')
+ files_tree = {'test': {'lol': (0, 4, True), 'rofl': (1, 5, True)}}
+ ti = TorrentInfo(filename, filetree=1)
+ assert ti.files_tree == files_tree
+ ti = TorrentInfo(filename, filetree=2)
+ files_tree2 = {
+ 'contents': {
+ 'test': {
+ 'type': 'dir',
+ 'contents': {
+ 'lol': {
+ 'type': 'file',
+ 'path': 'test/lol',
+ 'index': 0,
+ 'length': 4,
+ 'download': True,
+ },
+ 'rofl': {
+ 'type': 'file',
+ 'path': 'test/rofl',
+ 'index': 1,
+ 'length': 5,
+ 'download': True,
+ },
+ },
+ 'length': 9,
+ 'download': True,
+ }
+ },
+ 'type': 'dir',
+ }
+ assert ti.files_tree == files_tree2
+
+ def test_utf8_encoded_paths(self):
+ filename = common.get_test_data_file('test.torrent')
+ ti = TorrentInfo(filename)
+ assert 'azcvsupdater_2.6.2.jar' in ti.files_tree
+
+ def test_utf8_encoded_paths2(self):
+ filename = common.get_test_data_file('unicode_filenames.torrent')
+ filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv'
+ filepath2 = (
+ '\u041c\u0438\u0445\u0430\u0438\u043b \u0413\u043e'
+ '\u0440\u0431\u0430\u0447\u0451\u0432.mkv'
+ )
+ filepath3 = "Alisher ibn G'iyosiddin Navoiy.mkv"
+ filepath4 = 'Ascii title.mkv'
+ filepath5 = '\u09b8\u09c1\u0995\u09c1\u09ae\u09be\u09b0 \u09b0\u09be\u09df.mkv'
+
+ ti = TorrentInfo(filename)
+ files_tree = ti.files_tree['unicode_filenames']
+ assert filepath1 in files_tree
+ assert filepath2 in files_tree
+ assert filepath3 in files_tree
+ assert filepath4 in files_tree
+ assert filepath5 in files_tree
+
+ result_files = [
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath3,
+ 'size': 126158658,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath4,
+ 'size': 189321363,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath2,
+ 'size': 106649699,
+ },
+ {
+ 'download': True,
+ 'path': 'unicode_filenames/' + filepath5,
+ 'size': 21590269,
+ },
+ {'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771},
+ ]
+
+ assert len(ti.files) == len(result_files)
+
+ def test_directory_with_single_file(self):
+ filename = common.get_test_data_file('dir_with_single_file.torrent')
+
+ ti = TorrentInfo(filename)
+ expected_file_tree = {'dir_with_single_file': {'single_file.txt': (0, 9, True)}}
+ assert ti.files_tree == expected_file_tree
+
+ result_files = [
+ {
+ 'path': 'dir_with_single_file/single_file.txt',
+ 'size': 9,
+ 'download': True,
+ }
+ ]
+ assert ti.files == result_files
+
+ def test_bittorrent_v2_path(self):
+ filename = common.get_test_data_file('v2_test.torrent')
+ files_tree = {
+ 'torrent_test': {
+ 'small.txt': (0, 22, True),
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (1, 32, True),
+ }
+ }
+ ti = TorrentInfo(filename, filetree=1)
+ assert ti.files_tree == files_tree
+
+ files_tree2 = {
+ 'contents': {
+ 'torrent_test': {
+ 'type': 'dir',
+ 'contents': {
+ 'small.txt': {
+ 'type': 'file',
+ 'path': 'torrent_test/small.txt',
+ 'length': 22,
+ 'index': 0,
+ 'download': True,
+ },
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': {
+ 'type': 'file',
+ 'path': 'torrent_test/還在一個人無聊嗎~還不趕緊上來聊天美.txt',
+ 'length': 32,
+ 'index': 1,
+ 'download': True,
+ },
+ },
+ 'length': 54,
+ 'download': True,
+ }
+ },
+ 'type': 'dir',
+ }
+ ti = TorrentInfo(filename, filetree=2)
+ assert ti.files_tree == files_tree2
+
+ def test_bittorrent_v2_hybrid_path(self):
+ filename = common.get_test_data_file('v2_hybrid.torrent')
+ files_tree = {
+ 'torrent_test': {
+ 'small.txt': (0, 22, True),
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (2, 32, True),
+ '.pad': {
+ '16362': (1, 16362, True),
+ '16352': (3, 16352, True),
+ },
+ }
+ }
+ ti = TorrentInfo(filename, filetree=1, force_bt_version=1)
+ assert ti.files_tree == files_tree
+ del files_tree['torrent_test']['.pad']
+ files_tree['torrent_test']['還在一個人無聊嗎~還不趕緊上來聊天美.txt'] = (1, 32, True)
+ ti = TorrentInfo(filename, filetree=1, force_bt_version=2)
+ assert ti.files_tree == files_tree
+
+ files_tree2 = {
+ 'contents': {
+ 'torrent_test': {
+ 'type': 'dir',
+ 'contents': {
+ 'small.txt': {
+ 'type': 'file',
+ 'path': 'torrent_test/small.txt',
+ 'length': 22,
+ 'index': 0,
+ 'download': True,
+ },
+ '還在一個人無聊嗎~還不趕緊上來聊天美.txt': {
+ 'type': 'file',
+ 'path': 'torrent_test/還在一個人無聊嗎~還不趕緊上來聊天美.txt',
+ 'length': 32,
+ 'index': 2,
+ 'download': True,
+ },
+ '.pad': {
+ 'type': 'dir',
+ 'contents': {
+ '16362': {
+ 'type': 'file',
+ 'path': 'torrent_test/.pad/16362',
+ 'length': 16362,
+ 'index': 1,
+ 'download': True,
+ },
+ '16352': {
+ 'type': 'file',
+ 'path': 'torrent_test/.pad/16352',
+ 'length': 16352,
+ 'index': 3,
+ 'download': True,
+ },
+ },
+ 'length': 32714,
+ 'download': True,
+ },
+ },
+ 'length': 32768,
+ 'download': True,
+ }
+ },
+ 'type': 'dir',
+ }
+ ti = TorrentInfo(filename, filetree=2, force_bt_version=1)
+ assert ti.files_tree == files_tree2
+ torrent_test = files_tree2['contents']['torrent_test']
+ torrent_test['length'] -= torrent_test['contents']['.pad']['length']
+ del torrent_test['contents']['.pad']
+ torrent_test['contents']['還在一個人無聊嗎~還不趕緊上來聊天美.txt']['index'] = 1
+ ti = TorrentInfo(filename, filetree=2, force_bt_version=2)
+ assert ti.files_tree == files_tree2
diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py
new file mode 100644
index 0000000..34398ee
--- /dev/null
+++ b/deluge/tests/test_ui_console.py
@@ -0,0 +1,80 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import argparse
+
+import pytest
+
+from deluge.ui.console.cmdline.commands.add import Command
+from deluge.ui.console.cmdline.commands.config import json_eval
+from deluge.ui.console.widgets.fields import TextInput
+
+
+class MockParent:
+ def __init__(self):
+ self.border_off_x = 1
+ self.pane_width = 20
+ self.encoding = 'utf8'
+
+
+class TestUIConsoleField:
+ @pytest.fixture(autouse=True)
+ def set_up(self):
+ self.parent = MockParent()
+
+ def test_text_input(self):
+ def move_func(self, r, c):
+ self._cursor_row = r
+ self._cursor_col = c
+
+ t = TextInput(
+ self.parent,
+ 'name',
+ 'message',
+ move_func,
+ 20,
+ '/text/field/file/path',
+ complete=False,
+ )
+ assert t
+ assert t.handle_read(33)
+
+
+class TestUIConsoleCommands:
+ def test_add_move_completed(self):
+ completed_path = 'completed_path'
+ parser = argparse.ArgumentParser()
+ cmd = Command()
+ cmd.add_arguments(parser)
+ args = parser.parse_args(['torrent', '-m', completed_path])
+ assert args.move_completed_path == completed_path
+ args = parser.parse_args(['torrent', '--move-path', completed_path])
+ assert args.move_completed_path == completed_path
+
+ def test_config_json_eval(self):
+ assert json_eval('/downloads') == '/downloads'
+ assert json_eval('/dir/with space') == '/dir/with space'
+ assert json_eval('c:\\\\downloads') == 'c:\\\\downloads'
+ assert json_eval('c:/downloads') == 'c:/downloads'
+ # Ensure newlines are split and only first setting is used.
+ assert json_eval('setting\nwithneline') == 'setting'
+ # Allow both parentheses and square brackets.
+ assert json_eval('(8000, 8001)') == [8000, 8001]
+ assert json_eval('[8000, 8001]') == [8000, 8001]
+ assert json_eval('["abc", "def"]') == ['abc', 'def']
+ assert json_eval('{"foo": "bar"}') == {'foo': 'bar'}
+ assert json_eval('{"number": 1234}') == {'number': 1234}
+ # Hex string for peer_tos.
+ assert json_eval('0x00') == '0x00'
+ assert json_eval('1000') == 1000
+ assert json_eval('-6') == -6
+ assert json_eval('10.5') == 10.5
+ assert json_eval('True')
+ assert not json_eval('false')
+ assert json_eval('none') is None
+ # Empty values to clear config key.
+ assert json_eval('[]') == []
+ assert json_eval('') == ''
diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py
new file mode 100644
index 0000000..9a1330e
--- /dev/null
+++ b/deluge/tests/test_ui_entry.py
@@ -0,0 +1,440 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import argparse
+import sys
+from io import StringIO
+from unittest import mock
+
+import pytest
+import pytest_twisted
+from twisted.internet import defer
+
+import deluge
+import deluge.component as component
+import deluge.ui.console
+import deluge.ui.console.cmdline.commands.quit
+import deluge.ui.console.main
+import deluge.ui.web.server
+from deluge.common import get_localhost_auth, windows_check
+from deluge.conftest import BaseTestCase
+from deluge.ui import ui_entry
+from deluge.ui.web.server import DelugeWeb
+
+from . import common
+from .daemon_base import DaemonBase
+
+DEBUG_COMMAND = False
+
+sys_stdout = sys.stdout
+# To catch output to stdout/stderr while running unit tests, we patch
+# the file descriptors in sys and argparse._sys with StringFileDescriptor.
+# Regular print statements from such tests will therefore write to the
+# StringFileDescriptor object instead of the terminal.
+# To print to terminal from the tests, use: print('Message...', file=sys_stdout)
+
+
+class StringFileDescriptor:
+ """File descriptor that writes to string buffer"""
+
+ def __init__(self, fd):
+ self.out = StringIO()
+ self.fd = fd
+ for a in ['encoding']:
+ setattr(self, a, getattr(sys_stdout, a))
+
+ def write(self, *data, **kwargs):
+ data_string = str(*data)
+ print(data_string, file=self.out, end='')
+
+ def flush(self):
+ self.out.flush()
+
+
+class UIBaseTestCase:
+ def set_up(self):
+ common.setup_test_logger(level='info', prefix=self.config_dir / self.id())
+ return component.start()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def exec_command(self):
+ if DEBUG_COMMAND:
+ print('Executing: %s\n' % sys.argv, file=sys_stdout)
+ return self.var['start_cmd']()
+
+
+class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase):
+ """Subclass for test that require a deluged daemon"""
+
+ def set_up(self):
+ d = self.common_set_up()
+ common.setup_test_logger(level='info', prefix=self.config_dir / self.id())
+ return d
+
+
+class TestDelugeEntry(BaseTestCase):
+ def set_up(self):
+ return component.start()
+
+ def tear_down(self):
+ return component.shutdown()
+
+ def test_deluge_help(self):
+ self.patch(sys, 'argv', ['./deluge', '-h'])
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ with pytest.raises(SystemExit):
+ ui_entry.start_ui()
+ assert 'usage: deluge' in fd.out.getvalue()
+ assert 'UI Options:' in fd.out.getvalue()
+ assert '* console' in fd.out.getvalue()
+
+ def test_start_default(self):
+ self.patch(sys, 'argv', ['./deluge'])
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ ui_entry.start_ui()
+
+ def test_start_with_log_level(self):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', ['./deluge', '-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ ui_entry.start_ui()
+
+ assert _level[0] == 'info'
+
+
+class GtkUIBaseTestCase(UIBaseTestCase):
+ """Implement all GtkUI tests here"""
+
+ def test_start_gtk3ui(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+
+ from deluge.ui.gtk3 import gtkui
+
+ with mock.patch.object(gtkui.GtkUI, 'start', autospec=True):
+ self.exec_command()
+
+
+@pytest.mark.gtkui
+class TestGtkUIDelugeScriptEntry(BaseTestCase, GtkUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge gtk',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'gtk'],
+ }
+
+
+@pytest.mark.gtkui
+class TestGtkUIScriptEntry(BaseTestCase, GtkUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ from deluge.ui import gtk3
+
+ request.cls.var = {
+ 'cmd_name': 'deluge-gtk',
+ 'start_cmd': gtk3.start,
+ 'sys_arg_cmd': ['./deluge-gtk'],
+ }
+
+
+class DelugeWebMock(DelugeWeb):
+ def __init__(self, *args, **kwargs):
+ kwargs['daemon'] = False
+ DelugeWeb.__init__(self, *args, **kwargs)
+
+
+class WebUIBaseTestCase(UIBaseTestCase):
+ """Implement all WebUI tests here"""
+
+ def test_start_webserver(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+ self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock)
+ self.exec_command()
+
+ def test_start_web_with_log_level(self):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'web'
+ config.save()
+
+ self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock)
+ self.exec_command()
+ assert _level[0] == 'info'
+
+
+class TestWebUIScriptEntry(BaseTestCase, WebUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-web',
+ 'start_cmd': deluge.ui.web.start,
+ 'sys_arg_cmd': ['./deluge-web'],
+ }
+ if not windows_check():
+ request.cls.var['sys_arg_cmd'].append('--do-not-daemonize')
+
+
+class TestWebUIDelugeScriptEntry(BaseTestCase, WebUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge web',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'web'],
+ }
+ if not windows_check():
+ request.cls.var['sys_arg_cmd'].append('--do-not-daemonize')
+
+
+class ConsoleUIBaseTestCase(UIBaseTestCase):
+ """Implement Console tests that do not require a running daemon"""
+
+ def test_start_console(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'])
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.exec_command()
+
+ def test_start_console_with_log_level(self, request):
+ _level = []
+
+ def setup_logger(
+ level='error',
+ filename=None,
+ filemode='w',
+ logrotate=None,
+ output_stream=sys.stdout,
+ ):
+ _level.append(level)
+
+ self.patch(deluge.log, 'setup_logger', setup_logger)
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info'])
+
+ config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS)
+ config.config['default_ui'] = 'console'
+ config.save()
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ # Just test that no exception is raised
+ self.exec_command()
+
+ assert _level[0] == 'info'
+
+ def test_console_help(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ with pytest.raises(SystemExit):
+ self.exec_command()
+ std_output = fd.out.getvalue()
+ assert (
+ 'usage: %s' % self.var['cmd_name']
+ ) in std_output # Check command name
+ assert 'Common Options:' in std_output
+ assert 'Console Options:' in std_output
+ assert (
+ 'Console Commands:\n The following console commands are available:'
+ in std_output
+ )
+ assert 'The following console commands are available:' in std_output
+
+ def test_console_command_info(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ self.exec_command()
+
+ def test_console_command_info_help(self):
+ self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info', '-h'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stdout', fd)
+
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ with pytest.raises(SystemExit):
+ self.exec_command()
+ std_output = fd.out.getvalue()
+ assert 'usage: info' in std_output
+ assert 'Show information about the torrents' in std_output
+
+ def test_console_unrecognized_arguments(self):
+ self.patch(
+ sys, 'argv', ['./deluge', '--ui', 'console']
+ ) # --ui is not longer supported
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(argparse._sys, 'stderr', fd)
+ with mock.patch('deluge.ui.console.main.ConsoleUI'):
+ with pytest.raises(SystemExit):
+ self.exec_command()
+ assert 'unrecognized arguments: --ui' in fd.out.getvalue()
+
+
+class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
+ """Implement Console tests that require a running daemon"""
+
+ def set_up(self):
+ # Avoid calling reactor.shutdown after commands are executed by main.exec_args()
+ deluge.ui.console.main.reactor = common.ReactorOverride()
+ return UIWithDaemonBaseTestCase.set_up(self)
+
+ def patch_arg_command(self, command):
+ if type(command) == str:
+ command = [command]
+ username, password = get_localhost_auth()
+ self.patch(
+ sys,
+ 'argv',
+ self.var['sys_arg_cmd']
+ + ['--port']
+ + [str(self.listen_port)]
+ + ['--username']
+ + [username]
+ + ['--password']
+ + [password]
+ + command,
+ )
+
+ @pytest_twisted.inlineCallbacks
+ def test_console_command_add(self):
+ filename = common.get_test_data_file('test.torrent')
+ self.patch_arg_command([f'add "{filename}"'])
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+
+ std_output = fd.out.getvalue()
+ assert (
+ std_output
+ == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n'
+ )
+
+ @pytest_twisted.inlineCallbacks
+ def test_console_command_add_move_completed(self):
+ filename = common.get_test_data_file('test.torrent')
+ tmp_path = 'c:\\tmp' if windows_check() else '/tmp'
+ self.patch_arg_command(
+ [
+ f'add --move-path "{tmp_path}" "{filename}" ; status'
+ ' ; manage'
+ ' ab570cdd5a17ea1b61e970bb72047de141bce173'
+ ' move_completed'
+ ' move_completed_path'
+ ]
+ )
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+
+ std_output = fd.out.getvalue()
+ assert std_output.endswith(
+ f'move_completed: True\nmove_completed_path: {tmp_path}\n'
+ ) or std_output.endswith(
+ f'move_completed_path: {tmp_path}\nmove_completed: True\n'
+ )
+
+ async def test_console_command_status(self):
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch_arg_command(['status'])
+ self.patch(sys, 'stdout', fd)
+
+ await self.exec_command()
+
+ std_output = fd.out.getvalue()
+ assert std_output.startswith('Total upload: ')
+ assert std_output.endswith(' Moving: 0\n')
+
+ @defer.inlineCallbacks
+ def test_console_command_config_set_download_location(self):
+ fd = StringFileDescriptor(sys.stdout)
+ self.patch_arg_command(['config --set download_location /downloads'])
+ self.patch(sys, 'stdout', fd)
+
+ yield self.exec_command()
+ std_output = fd.out.getvalue()
+ assert std_output.startswith('Setting "download_location" to: \'/downloads\'')
+ assert std_output.endswith('Configuration value successfully updated.\n')
+
+
+@pytest.mark.usefixtures('daemon', 'client')
+class TestConsoleScriptEntryWithDaemon(BaseTestCase, ConsoleUIWithDaemonBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-console',
+ 'start_cmd': deluge.ui.console.test_start,
+ 'sys_arg_cmd': ['./deluge-console'],
+ }
+
+
+class TestConsoleScriptEntry(BaseTestCase, ConsoleUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-console',
+ 'start_cmd': deluge.ui.console.start,
+ 'sys_arg_cmd': ['./deluge-console'],
+ }
+
+
+class TestConsoleDelugeScriptEntry(BaseTestCase, ConsoleUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge console',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'console'],
+ }
diff --git a/deluge/tests/test_ui_gtk3.py b/deluge/tests/test_ui_gtk3.py
new file mode 100644
index 0000000..e6d025c
--- /dev/null
+++ b/deluge/tests/test_ui_gtk3.py
@@ -0,0 +1,30 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import sys
+from unittest import mock
+
+import pytest
+
+
+@pytest.mark.gtkui
+class TestGTK3Common:
+ def setUp(self):
+ sys.modules['gi.repository'] = mock.MagicMock()
+
+ def tearDown(self):
+ pass
+
+ def test_cmp(self):
+ from deluge.ui.gtk3.common import cmp
+
+ assert cmp(None, None) == 0
+ assert cmp(1, None) == 1
+ assert cmp(0, None) == 1
+ assert cmp(None, 7) == -1
+ assert cmp(None, 'bar') == -1
+ assert cmp('foo', None) == 1
+ assert cmp('', None) == 1
diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py
new file mode 100644
index 0000000..814fecf
--- /dev/null
+++ b/deluge/tests/test_web_api.py
@@ -0,0 +1,202 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import json
+from io import BytesIO
+
+import pytest
+import pytest_twisted
+from twisted.internet import defer, reactor
+from twisted.web.client import Agent, FileBodyProducer
+from twisted.web.http_headers import Headers
+from twisted.web.static import File
+
+import deluge.component as component
+
+from . import common
+from .common_web import WebServerTestBase
+
+common.disable_new_release_check()
+
+
+class TestWebAPI(WebServerTestBase):
+ @pytest.mark.xfail(reason='This just logs an error at the moment.')
+ async def test_connect_invalid_host(self):
+ with pytest.raises(Exception):
+ await self.deluge_web.web_api.connect('id')
+
+ def test_connect(self, client):
+ d = self.deluge_web.web_api.connect(self.host_id)
+
+ def on_connect(result):
+ assert type(result) == tuple
+ assert len(result) > 0
+ return result
+
+ d.addCallback(on_connect)
+ d.addErrback(self.fail)
+ return d
+
+ def test_disconnect(self):
+ d = self.deluge_web.web_api.connect(self.host_id)
+
+ @defer.inlineCallbacks
+ def on_connect(result):
+ assert self.deluge_web.web_api.connected()
+ yield self.deluge_web.web_api.disconnect()
+ assert not self.deluge_web.web_api.connected()
+
+ d.addCallback(on_connect)
+ d.addErrback(self.fail)
+ return d
+
+ def test_get_config(self):
+ config = self.deluge_web.web_api.get_config()
+ assert self.deluge_web.port == config['port']
+
+ def test_set_config(self):
+ config = self.deluge_web.web_api.get_config()
+ config['pwd_salt'] = 'new_salt'
+ config['pwd_sha1'] = 'new_sha'
+ config['sessions'] = {
+ '233f23632af0a74748bc5dd1d8717564748877baa16420e6898e17e8aa365e6e': {
+ 'login': 'skrot',
+ 'expires': 1460030877.0,
+ 'level': 10,
+ }
+ }
+ self.deluge_web.web_api.set_config(config)
+ web_config = component.get('DelugeWeb').config.config
+ assert config['pwd_salt'] != web_config['pwd_salt']
+ assert config['pwd_sha1'] != web_config['pwd_sha1']
+ assert config['sessions'] != web_config['sessions']
+
+ @defer.inlineCallbacks
+ def get_host_status(self):
+ host = list(self.deluge_web.web_api._get_host(self.host_id))
+ host[3] = 'Online'
+ host[4] = '2.0.0.dev562'
+ status = yield self.deluge_web.web_api.get_host_status(self.host_id)
+ assert status == tuple(status)
+
+ def test_get_host(self):
+ assert not self.deluge_web.web_api._get_host('invalid_id')
+ conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0])
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
+
+ def test_add_host(self):
+ conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123']
+ assert not self.deluge_web.web_api._get_host(conn[0])
+ # Add valid host
+ result, host_id = self.deluge_web.web_api.add_host(
+ conn[1], conn[2], conn[3], conn[4]
+ )
+ assert result
+ conn[0] = host_id
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
+
+ # Add already existing host
+ ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
+ assert ret == (False, 'Host details already in hostlist')
+
+ # Add invalid port
+ conn[2] = 'bad port'
+ ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
+ assert ret == (False, 'Invalid port. Must be an integer')
+
+ def test_remove_host(self):
+ conn = ['connection_id', '', 0, '', '']
+ self.deluge_web.web_api.hostlist.config['hosts'].append(conn)
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
+ # Remove valid host
+ assert self.deluge_web.web_api.remove_host(conn[0])
+ assert not self.deluge_web.web_api._get_host(conn[0])
+ # Remove non-existing host
+ assert not self.deluge_web.web_api.remove_host(conn[0])
+
+ def test_get_torrent_info(self):
+ filename = common.get_test_data_file('test.torrent')
+ ret = self.deluge_web.web_api.get_torrent_info(filename)
+ assert ret['name'] == 'azcvsupdater_2.6.2.jar'
+ assert ret['info_hash'] == 'ab570cdd5a17ea1b61e970bb72047de141bce173'
+ assert 'files_tree' in ret
+
+ def test_get_torrent_info_with_md5(self):
+ filename = common.get_test_data_file('md5sum.torrent')
+ ret = self.deluge_web.web_api.get_torrent_info(filename)
+ # JSON dumping happens during response creation in normal usage
+ # JSON serialization may fail if any of the dictionary items are byte arrays rather than strings
+ ret = json.loads(json.dumps(ret))
+ assert ret['name'] == 'test'
+ assert ret['info_hash'] == 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5'
+ assert 'files_tree' in ret
+
+ def test_get_magnet_info(self):
+ ret = self.deluge_web.web_api.get_magnet_info(
+ 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN'
+ )
+ assert ret['name'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert ret['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert 'files_tree' in ret
+
+ @pytest_twisted.inlineCallbacks
+ def test_get_torrent_files(self):
+ yield self.deluge_web.web_api.connect(self.host_id)
+ filename = common.get_test_data_file('test.torrent')
+ torrents = [
+ {'path': filename, 'options': {'download_location': '/home/deluge/'}}
+ ]
+ yield self.deluge_web.web_api.add_torrents(torrents)
+ ret = yield self.deluge_web.web_api.get_torrent_files(
+ 'ab570cdd5a17ea1b61e970bb72047de141bce173'
+ )
+ assert ret['type'] == 'dir'
+ assert ret['contents'] == {
+ 'azcvsupdater_2.6.2.jar': {
+ 'priority': 4,
+ 'index': 0,
+ 'offset': 0,
+ 'progress': 0.0,
+ 'path': 'azcvsupdater_2.6.2.jar',
+ 'type': 'file',
+ 'size': 307949,
+ }
+ }
+
+ @pytest_twisted.inlineCallbacks
+ def test_download_torrent_from_url(self):
+ filename = 'ubuntu-9.04-desktop-i386.iso.torrent'
+ self.deluge_web.top_level.putChild(
+ filename.encode(), File(common.get_test_data_file(filename))
+ )
+ url = 'http://localhost:%d/%s' % (self.deluge_web.port, filename)
+ res = yield self.deluge_web.web_api.download_torrent_from_url(url)
+ assert res.endswith(filename)
+
+ @pytest_twisted.inlineCallbacks
+ def test_invalid_json(self):
+ """
+ If json_api._send_response does not return server.NOT_DONE_YET
+ this error is thrown when json is invalid:
+ exceptions.RuntimeError: Request.write called on a request after Request.finish was called.
+
+ """
+ agent = Agent(reactor)
+ bad_body = b'{ method": "auth.login" }'
+ d = yield agent.request(
+ b'POST',
+ b'http://127.0.0.1:%i/json' % self.deluge_web.port,
+ Headers(
+ {
+ b'User-Agent': [b'Twisted Web Client Example'],
+ b'Content-Type': [b'application/json'],
+ }
+ ),
+ FileBodyProducer(BytesIO(bad_body)),
+ )
+ yield d
diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py
new file mode 100644
index 0000000..39d66c1
--- /dev/null
+++ b/deluge/tests/test_web_auth.py
@@ -0,0 +1,33 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+from unittest.mock import patch
+
+from deluge.ui.web import auth
+
+
+class MockConfig:
+ def __init__(self, config):
+ self.config = config
+
+ def __getitem__(self, key):
+ return self.config[key]
+
+ def __setitem__(self, key, value):
+ self.config[key] = value
+
+
+class TestWebAuth:
+ @patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None)
+ def test_change_password(self, mock_json):
+ config = MockConfig(
+ {
+ 'pwd_sha1': '8d8ff487626141d2b91025901d3ab57211180b48',
+ 'pwd_salt': '7555d757710158655bd1646e207dee21a89e9226',
+ }
+ )
+ webauth = auth.Auth(config)
+ assert webauth.change_password('deluge', 'deluge_new')
diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py
new file mode 100644
index 0000000..9503f50
--- /dev/null
+++ b/deluge/tests/test_webserver.py
@@ -0,0 +1,108 @@
+#
+# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import json as json_lib
+from io import BytesIO
+
+import pytest
+import twisted.web.client
+from twisted.internet import reactor
+from twisted.web.client import Agent, FileBodyProducer
+from twisted.web.http_headers import Headers
+
+from . import common
+from .common import get_test_data_file
+from .common_web import WebServerMockBase, WebServerTestBase
+
+common.disable_new_release_check()
+
+
+class TestWebServer(WebServerTestBase, WebServerMockBase):
+ async def test_get_torrent_info(self):
+ agent = Agent(reactor)
+
+ self.mock_authentication_ignore(self.deluge_web.auth)
+
+ # This torrent file contains an uncommon field 'filehash' which must be hex
+ # encoded to allow dumping the torrent info to json. Otherwise it will fail with:
+ # UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte
+ filename = get_test_data_file('filehash_field.torrent')
+ input_file = (
+ '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}'
+ % filename.replace('\\', '\\\\')
+ )
+ headers = {
+ b'User-Agent': ['Twisted Web Client Example'],
+ b'Content-Type': ['application/json'],
+ }
+ url = 'http://127.0.0.1:%s/json' % self.deluge_web.port
+
+ response = await agent.request(
+ b'POST',
+ url.encode(),
+ Headers(headers),
+ FileBodyProducer(BytesIO(input_file.encode())),
+ )
+ body = await twisted.web.client.readBody(response)
+
+ try:
+ json = json_lib.loads(body.decode())
+ except Exception:
+ print('aoeu')
+ assert json['error'] is None
+ assert 'torrent_filehash' == json['result']['name']
+
+ @pytest.mark.parametrize('base', ['', '/', 'deluge'])
+ async def test_base_with_config(self, base):
+ agent = Agent(reactor)
+ root_url = f'http://127.0.0.1:{self.deluge_web.port}'
+ base_url = f'{root_url}/{base}'
+
+ self.deluge_web.base = base
+
+ response = await agent.request(b'GET', root_url.encode())
+ assert response.code == 200
+ body = await twisted.web.client.readBody(response)
+ assert 'Deluge WebUI' in body.decode()
+
+ response = await agent.request(b'GET', base_url.encode())
+ assert response.code == 200
+
+ @pytest.mark.parametrize('base', ['/', 'deluge'])
+ async def test_base_with_config_recurring_basepath(self, base):
+ agent = Agent(reactor)
+ base_url = f'http://127.0.0.1:{self.deluge_web.port}/{base}'
+
+ self.deluge_web.base = base
+
+ response = await agent.request(b'GET', base_url.encode())
+ assert response.code == 200
+
+ recursive_url = f'{base_url}/{base}'
+ response = await agent.request(b'GET', recursive_url.encode())
+ assert response.code == 404 if base.strip('/') else 200
+
+ recursive_url = f'{recursive_url}/{base}'
+ response = await agent.request(b'GET', recursive_url.encode())
+ assert response.code == 404 if base.strip('/') else 200
+
+ async def test_base_with_deluge_header(self):
+ """Ensure base path is set and HTML contains path"""
+ agent = Agent(reactor)
+ base = 'deluge'
+ url = f'http://127.0.0.1:{self.deluge_web.port}'
+ headers = Headers({'X-Deluge-Base': [base]})
+
+ response = await agent.request(b'GET', url.encode(), headers)
+ body = await twisted.web.client.readBody(response)
+ assert f'href="/{base}/' in body.decode()
+
+ # Header only changes HTML base path so ensure no resource at server path
+ url = f'{url}/{base}'
+ response = await agent.request(b'GET', url.encode(), headers)
+ assert response.code == 404