diff options
Diffstat (limited to 'deluge/tests')
52 files changed, 6268 insertions, 0 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py new file mode 100644 index 0000000..7b6afa1 --- /dev/null +++ b/deluge/tests/__init__.py @@ -0,0 +1,17 @@ +# Increase open file descriptor limit to allow tests to run +# without getting error: what(): epoll: Too many open files +from deluge.i18n import setup_translation + +try: + import resource +except ImportError: # Does not exist on Windows + pass +else: + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536)) + except (ValueError, resource.error) as ex: + error = 'Failed to raise file descriptor limit: %s' % ex + # print(error) + +# Initialize gettext +setup_translation() diff --git a/deluge/tests/common.py b/deluge/tests/common.py new file mode 100644 index 0000000..b594156 --- /dev/null +++ b/deluge/tests/common.py @@ -0,0 +1,363 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import sys +import traceback + +import pytest +from twisted.internet import defer, protocol, reactor +from twisted.internet.defer import Deferred +from twisted.internet.error import CannotListenError + +import deluge.configmanager +import deluge.core.preferencesmanager +import deluge.log +from deluge.common import get_localhost_auth +from deluge.error import DelugeError +from deluge.ui.client import Client + +# This sets log level to critical, so use log.critical() to debug while running unit tests +deluge.log.setup_logger('none') + + +def disable_new_release_check(): + deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False + + +def setup_test_logger(level='info', prefix='deluge'): + deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False) + + +def get_test_data_file(filename): + return os.path.join(os.path.join(os.path.dirname(__file__), 'data'), filename) + + +def todo_test(caller): + # If we are using the delugereporter we can set todo mark on the test + # Without the delugereporter the todo would print a stack trace, so in + # that case we rely only on skipTest + if os.environ.get('DELUGE_REPORTER', None): + getattr(caller, caller._testMethodName).__func__.todo = 'To be fixed' + + filename = os.path.basename(traceback.extract_stack(None, 2)[0][0]) + funcname = traceback.extract_stack(None, 2)[0][2] + pytest.skip(f'TODO: {filename}:{funcname}') + + +def add_watchdog(deferred, timeout=0.05, message=None): + def callback(value): + if not watchdog.called and not watchdog.cancelled: + watchdog.cancel() + if not deferred.called: + if message: + print(message) + deferred.cancel() + return value + + deferred.addBoth(callback) + watchdog = reactor.callLater(timeout, defer.Deferred.addTimeout, deferred) + return watchdog + + +class ReactorOverride: + """Class used to patch reactor while running unit tests + to avoid starting and stopping the twisted reactor + """ + + def __getattr__(self, attr): + if attr == 'run': + return self._run + if attr == 'stop': + return self._stop + return getattr(reactor, attr) + + def _run(self): + pass + + def _stop(self): + pass + + def addReader(self, arg): # NOQA: N802 + pass + + +class ProcessOutputHandler(protocol.ProcessProtocol): + def __init__( + self, + script, + shutdown_func, + callbacks, + logfile=None, + print_stdout=True, + print_stderr=True, + ): + """Executes a script and handle the process' output to stdout and stderr. + + Args: + script (str): The script to execute. + shutdown_func (func): A function which will gracefully stop the called script. + callbacks (list): Callbacks to trigger if the expected output if found. + logfile (str, optional): Filename to wrote the process' output. + print_stderr (bool): Print the process' stderr output to stdout. + print_stdout (bool): Print the process' stdout output to stdout. + + """ + self.callbacks = callbacks + self.script = script + self.shutdown_func = shutdown_func + self.log_output = '' + self.stderr_out = '' + self.logfile = logfile + self.print_stdout = print_stdout + self.print_stderr = print_stderr + self.quit_d = None + self.killed = False + self.watchdogs = [] + + def connectionMade(self): # NOQA: N802 + self.transport.write(self.script) + self.transport.closeStdin() + + def outConnectionLost(self): # NOQA: N802 + if not self.logfile: + return + with open(self.logfile, 'w') as f: + f.write(self.log_output) + + @defer.inlineCallbacks + def kill(self): + """Kill the running process. + + Returns: + Deferred: A deferred that is triggered when the process has quit. + + """ + if self.killed: + return + self.killed = True + self._kill_watchdogs() + self.quit_d = Deferred() + shutdown = self.shutdown_func() + shutdown.addTimeout(5, reactor) + try: + yield shutdown + except Exception: + self.transport.signalProcess('TERM') + result = yield self.quit_d + return result + + def _kill_watchdogs(self): + """Cancel all watchdogs""" + for w in self.watchdogs: + if not w.called and not w.cancelled: + w.cancel() + + def processEnded(self, status): # NOQA: N802 + self.transport.loseConnection() + if self.quit_d is None: + return + if status.value.exitCode == 0: + self.quit_d.callback(True) + else: + self.quit_d.errback(status) + + def check_callbacks(self, data, cb_type='stdout'): + ret = False + for c in self.callbacks: + if cb_type not in c['types'] or c['deferred'].called: + continue + for trigger in c['triggers']: + if trigger['expr'] in data: + ret = True + if 'cb' in trigger: + trigger['cb'](self, c['deferred'], data, self.log_output) + elif 'value' not in trigger: + raise Exception('Trigger must specify either "cb" or "value"') + else: + val = trigger['value'](self, data, self.log_output) + if trigger.get('type', 'callback') == 'errback': + c['deferred'].errback(val) + else: + c['deferred'].callback(val) + return ret + + def outReceived(self, data): # NOQA: N802 + """Process output from stdout""" + data = data.decode('utf8') + self.log_output += data + if self.check_callbacks(data): + pass + elif '[ERROR' in data: + if not self.print_stdout: + return + print(data, end=' ') + + def errReceived(self, data): # NOQA: N802 + """Process output from stderr""" + data = data.decode('utf8') + self.log_output += data + self.stderr_out += data + self.check_callbacks(data, cb_type='stderr') + if not self.print_stderr: + return + data = '\n%s' % data.strip() + prefixed = data.replace('\n', '\nSTDERR: ') + print('\n%s' % prefixed) + + +def start_core( + listen_port=58900, + logfile=None, + timeout=10, + timeout_msg=None, + custom_script='', + print_stdout=True, + print_stderr=True, + extra_callbacks=None, + config_directory='', +): + """Start the deluge core as a daemon. + + Args: + listen_port (int, optional): The port the daemon listens for client connections. + logfile (str, optional): Logfile name to write the output from the process. + timeout (int): If none of the callbacks have been triggered before the timeout, the process is killed. + timeout_msg (str): The message to print when the timeout expires. + custom_script (str): Extra python code to insert into the daemon process script. + print_stderr (bool): If the output from the process' stderr should be printed to stdout. + print_stdout (bool): If the output from the process' stdout should be printed to stdout. + extra_callbacks (list): A list of dictionaries specifying extra callbacks. + + Returns: + tuple(Deferred, ProcessOutputHandler): + + The Deferred is fired when the core callback is triggered either after the default + output triggers are matched (daemon successfully started, or failed to start), + or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process. + + """ + daemon_script = """ +import sys +import deluge.core.daemon_entry + +from deluge.common import windows_check + +if windows_check(): + sys.argv.extend(['-c', '%(dir)s', '-L', 'info', '-p', '%(port)d']) +else: + sys.argv.extend(['-d', '-c', '%(dir)s', '-L', 'info', '-p', '%(port)d']) + +try: + daemon = deluge.core.daemon_entry.start_daemon(skip_start=True) + %(script)s + daemon.start() +except Exception: + import traceback + sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc()) +""" % { + 'dir': config_directory.as_posix(), + 'port': listen_port, + 'script': custom_script, + } + + callbacks = [] + default_core_cb = {'deferred': Deferred(), 'types': 'stdout'} + if timeout: + default_core_cb['timeout'] = timeout + + # Specify the triggers for daemon log output + default_core_cb['triggers'] = [ + {'expr': 'Finished loading ', 'value': lambda reader, data, data_all: reader}, + { + 'expr': 'Cannot start deluged, listen port in use.', + 'type': 'errback', + 'value': lambda reader, data, data_all: CannotListenError( + 'localhost', + listen_port, + 'Could not start deluge test client!\n%s' % data, + ), + }, + { + 'expr': 'Traceback', + 'type': 'errback', + 'value': lambda reader, data, data_all: DelugeError( + 'Traceback found when starting daemon:\n%s' % data + ), + }, + ] + + callbacks.append(default_core_cb) + if extra_callbacks: + callbacks.extend(extra_callbacks) + + @defer.inlineCallbacks + def shutdown_daemon(): + username, password = get_localhost_auth() + client = Client() + yield client.connect( + 'localhost', listen_port, username=username, password=password + ) + yield client.daemon.shutdown() + + process_protocol = start_process( + daemon_script, shutdown_daemon, callbacks, logfile, print_stdout, print_stderr + ) + return default_core_cb['deferred'], process_protocol + + +def start_process( + script, shutdown_func, callbacks, logfile=None, print_stdout=True, print_stderr=True +): + """ + Starts an external python process which executes the given script. + + Args: + script (str): The content of the script to execute. + shutdown_func (func): A function which will gracefully end the called script. + callbacks (list): list of dictionaries specifying callbacks. + logfile (str, optional): Logfile name to write the output from the process. + print_stderr (bool): If the output from the process' stderr should be printed to stdout. + print_stdout (bool): If the output from the process' stdout should be printed to stdout. + + Returns: + ProcessOutputHandler: The handler for the process's output. + + Each entry in the callbacks list is a dictionary with the following keys: + * "deferred": The deferred to be called when matched. + * "types": The output this callback should be matched against. + Possible values: ["stdout", "stderr"] + * "timeout" (optional): A timeout in seconds for the deferred. + * "triggers": A list of dictionaries, each specifying specifying a trigger: + * "expr": A string to match against the log output. + * "value": A function to produce the result to be passed to the callback. + * "type" (optional): A string that specifies wether to trigger a regular callback or errback. + + """ + cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + process_protocol = ProcessOutputHandler( + script.encode('utf8'), + shutdown_func, + callbacks, + logfile, + print_stdout, + print_stderr, + ) + + # Add timeouts to deferreds + for c in callbacks: + if 'timeout' in c: + w = add_watchdog( + c['deferred'], timeout=c['timeout'], message=c.get('timeout_msg', None) + ) + process_protocol.watchdogs.append(w) + + reactor.spawnProcess( + process_protocol, sys.executable, args=[sys.executable], path=cwd + ) + return process_protocol diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py new file mode 100644 index 0000000..f7da577 --- /dev/null +++ b/deluge/tests/common_web.py @@ -0,0 +1,57 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import pytest + +import deluge.common +import deluge.ui.web.auth +import deluge.ui.web.server +from deluge import configmanager +from deluge.conftest import BaseTestCase +from deluge.ui.web.server import DelugeWeb + +from .common import ReactorOverride + + +@pytest.mark.usefixtures('daemon', 'component') +class WebServerTestBase(BaseTestCase): + """ + Base class for tests that need a running webapi + + """ + + def set_up(self): + self.host_id = None + deluge.ui.web.server.reactor = ReactorOverride() + return self.start_webapi(None) + + def start_webapi(self, arg): + config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy() + config_defaults['port'] = 8999 + self.config = configmanager.ConfigManager('web.conf', config_defaults) + + self.deluge_web = DelugeWeb(daemon=False) + + host = list(self.deluge_web.web_api.hostlist.config['hosts'][0]) + host[2] = self.listen_port + self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host) + self.host_id = host[0] + self.deluge_web.start() + + +class WebServerMockBase: + """ + Class with utility functions for mocking with tests using the webserver + + """ + + def mock_authentication_ignore(self, auth): + def check_request(request, method=None, level=None): + pass + + self.patch(auth, 'check_request', check_request) diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py new file mode 100644 index 0000000..707570f --- /dev/null +++ b/deluge/tests/daemon_base.py @@ -0,0 +1,67 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import pytest +from twisted.internet import defer +from twisted.internet.error import CannotListenError + +import deluge.component as component + +from . import common + + +@pytest.mark.usefixtures('config_dir') +class DaemonBase: + def common_set_up(self): + self.listen_port = 58900 + self.core = None + return component.start() + + def terminate_core(self, *args): + if args[0] is not None: + if hasattr(args[0], 'getTraceback'): + print('terminate_core: Errback Exception: %s' % args[0].getTraceback()) + + if not self.core.killed: + d = self.core.kill() + return d + + @defer.inlineCallbacks + def start_core( + self, + arg, + custom_script='', + logfile='', + print_stdout=True, + print_stderr=True, + timeout=5, + port_range=10, + extra_callbacks=None, + ): + logfile = f'daemon_{self.id()}.log' if logfile == '' else logfile + + for dummy in range(port_range): + try: + d, self.core = common.start_core( + listen_port=self.listen_port, + logfile=logfile, + timeout=timeout, + timeout_msg='Timeout!', + custom_script=custom_script, + print_stdout=print_stdout, + print_stderr=print_stderr, + extra_callbacks=extra_callbacks, + config_directory=self.config_dir, + ) + yield d + except CannotListenError as ex: + exception_error = ex + self.listen_port += 1 + except (KeyboardInterrupt, SystemExit): + raise + else: + return + raise exception_error diff --git a/deluge/tests/data/deluge.png b/deluge/tests/data/deluge.png Binary files differnew file mode 100644 index 0000000..6787fa3 --- /dev/null +++ b/deluge/tests/data/deluge.png diff --git a/deluge/tests/data/dir_with_6_files.torrent b/deluge/tests/data/dir_with_6_files.torrent Binary files differnew file mode 100644 index 0000000..2c6b5fb --- /dev/null +++ b/deluge/tests/data/dir_with_6_files.torrent diff --git a/deluge/tests/data/dir_with_single_file.torrent b/deluge/tests/data/dir_with_single_file.torrent new file mode 100644 index 0000000..33fec2c --- /dev/null +++ b/deluge/tests/data/dir_with_single_file.torrent @@ -0,0 +1 @@ +d10:created by13:mktorrent 1.113:creation datei1684991433e4:infod5:filesld6:lengthi9e4:pathl15:single_file.txteee4:name20:dir_with_single_file12:piece lengthi262144e6:pieces20:Wi,=35Yhee
\ No newline at end of file diff --git a/deluge/tests/data/filehash_field.torrent b/deluge/tests/data/filehash_field.torrent new file mode 100644 index 0000000..99e41f0 --- /dev/null +++ b/deluge/tests/data/filehash_field.torrent @@ -0,0 +1,2 @@ +d13:creation datei1476342472e4:infod5:filesld4:ed2k16:2M2&XE 18:filehash20:f4^S96P՝R6:lengthi54e4:pathl8:tull.txteed4:ed2k16:_G
L@8:filehash20:vXd/n136:lengthi54e4:pathl56:還在一個人無聊嗎~還不趕緊上來聊天美.txteee4:name16:torrent_filehash12:piece lengthi32768e6:pieces20:G@g\&\
fB +ee
\ No newline at end of file diff --git a/deluge/tests/data/google.ico b/deluge/tests/data/google.ico Binary files differnew file mode 100644 index 0000000..82339b3 --- /dev/null +++ b/deluge/tests/data/google.ico diff --git a/deluge/tests/data/md5sum.torrent b/deluge/tests/data/md5sum.torrent new file mode 100644 index 0000000..0e8c93f --- /dev/null +++ b/deluge/tests/data/md5sum.torrent @@ -0,0 +1 @@ +d8:announce25:lol.this.is.not.a.tracker7:comment36:created with py3createtorrent v0.9.610:created by23:py3createtorrent v0.9.613:creation datei1590076175e4:infod5:filesld6:lengthi4e6:md5sum32:59bcc3ad6775562f845953cf016242254:pathl3:loleed6:lengthi5e6:md5sum32:10245815f893d79f3d779690774f0b434:pathl4:rofleee4:name4:test12:piece lengthi16384e6:pieces20:86Aڲ-Y>+S]\/ee
\ No newline at end of file diff --git a/deluge/tests/data/seo.svg b/deluge/tests/data/seo.svg new file mode 100644 index 0000000..fc96f74 --- /dev/null +++ b/deluge/tests/data/seo.svg @@ -0,0 +1 @@ +<?xml version="1.0" encoding="UTF-8"?> <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 95.63 110.42" width="10cm" height="10cm"><defs><style>.cls-1{fill:#ec5728;}.cls-2{fill:#fff;}</style></defs><title>seocom-target</title><g id="Layer_2" data-name="Layer 2"><g id="Layer_1-2" data-name="Layer 1"><polygon class="cls-1" points="95.63 82.81 47.81 110.42 0 82.81 0 27.61 47.81 0 95.63 27.61 95.63 82.81"></polygon><path class="cls-2" d="M47.81,18.64A36.57,36.57,0,1,0,84.38,55.21,36.57,36.57,0,0,0,47.81,18.64Zm0,63.92A27.35,27.35,0,1,1,75.16,55.21,27.35,27.35,0,0,1,47.81,82.56Z"></path><path class="cls-2" d="M47.81,39.46A15.75,15.75,0,1,0,63.56,55.21,15.75,15.75,0,0,0,47.81,39.46Zm0,24.25a8.5,8.5,0,1,1,8.5-8.5A8.51,8.51,0,0,1,47.81,63.71Z"></path></g></g></svg>
\ No newline at end of file diff --git a/deluge/tests/data/test.torrent b/deluge/tests/data/test.torrent new file mode 100644 index 0000000..847ec58 --- /dev/null +++ b/deluge/tests/data/test.torrent @@ -0,0 +1,2 @@ +d8:announce40:http://tracker.aelitis.com:6969/announce13:announce-listll40:http://tracker.aelitis.com:6969/announceel41:http://tracker.aelitis.com:16969/announceee18:azureus_propertiesd17:dht_backup_enablei0ee7:comment34:provided by http://getazureus.com/13:comment.utf-834:provided by http://getazureus.com/10:created by19:Azureus/2.5.0.3_CVS13:creation datei1169429806e4:infod4:ed2k16:>pl]K^\;;e6:lengthi307949e4:name22:azcvsupdater_2.6.2.jar10:name.utf-822:azcvsupdater_2.6.2.jar12:piece lengthi32768e6:pieces200:B'bsu97u<w\fԛ}:qv"7y!
lv{/"PD8-MFx+S`%% +;ϐ/F>gA2|ڂb#wIfWswKrGWiد#uE?:ub(oj
^AJ?&7:privatei0e4:sha120:2ζ"Կ^Khŷee
\ No newline at end of file diff --git a/deluge/tests/data/test_torrent.file.torrent b/deluge/tests/data/test_torrent.file.torrent new file mode 100644 index 0000000..fca14ca --- /dev/null +++ b/deluge/tests/data/test_torrent.file.torrent @@ -0,0 +1,2 @@ +d7:comment17:Test torrent file10:created by11:Deluge Team13:creation datei1411826665e8:encoding5:UTF-84:infod6:lengthi512000e4:name17:test_torrent.file12:piece lengthi32768e6:pieces320:$2Tj
>hU.--~ޔBzuEB1@ͥ/K"zF0֣[asV1B^Wp-SF`M9)},4niW
jQI̗(,t؋chi*M}^WS7 h:-beTXa3m|J"]0$}l@L V,4˫zMLģJ* +\AP&I9}20֎H:_8<V2JYb2'2h0\*_j7:privatei0eee
\ No newline at end of file diff --git a/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent Binary files differnew file mode 100644 index 0000000..b55c9ae --- /dev/null +++ b/deluge/tests/data/ubuntu-9.04-desktop-i386.iso.torrent diff --git a/deluge/tests/data/unicode_file.torrent b/deluge/tests/data/unicode_file.torrent new file mode 100644 index 0000000..e62fb1f --- /dev/null +++ b/deluge/tests/data/unicode_file.torrent @@ -0,0 +1 @@ +d13:creation datei1627211242e8:encoding5:UTF-84:infod6:lengthi32e4:name35:সুকুমার রায়.txt12:piece lengthi32768e6:pieces20:",.xe2U7:privatei0eee diff --git a/deluge/tests/data/unicode_filenames.torrent b/deluge/tests/data/unicode_filenames.torrent Binary files differnew file mode 100644 index 0000000..e34f055 --- /dev/null +++ b/deluge/tests/data/unicode_filenames.torrent diff --git a/deluge/tests/data/utf8_filename_torrents.state b/deluge/tests/data/utf8_filename_torrents.state new file mode 100644 index 0000000..0e9c33d --- /dev/null +++ b/deluge/tests/data/utf8_filename_torrents.state @@ -0,0 +1,85 @@ +(ideluge.core.torrentmanager +TorrentManagerState +p1 +(dp2 +S'torrents' +p3 +(lp4 +(ideluge.core.torrentmanager +TorrentState +p5 +(dp6 +S'max_download_speed' +p7 +I-1 +sS'move_completed_path' +p8 +S'/home/calum/Downloads' +p9 +sS'paused' +p10 +I00 +sS'max_upload_slots' +p11 +I-1 +sS'prioritize_first_last' +p12 +I00 +sS'max_connections' +p13 +I-1 +sS'compact' +p14 +I00 +sS'queue' +p15 +I0 +sS'file_priorities' +p16 +(lp17 +I4 +asS'filename' +p18 +S'\xc2\xa2.torrent' +p19 +sS'max_upload_speed' +p20 +I-1 +sS'save_path' +p21 +S'/home/calum/Downloads' +p22 +sS'time_added' +p23 +F1573563097.749759 +sS'total_uploaded' +p24 +I0 +sS'torrent_id' +p25 +S'80d81d55ef3b85f3c1b634c362e014b35594dc71' +p26 +sS'auto_managed' +p27 +I01 +sS'stop_at_ratio' +p28 +I00 +sS'move_completed' +p29 +I00 +sS'trackers' +p30 +(lp31 +sS'magnet' +p32 +NsS'remove_at_ratio' +p33 +I00 +sS'stop_ratio' +p34 +F2 +sS'is_finished' +p35 +I00 +sbasb. diff --git a/deluge/tests/data/v2_hybrid.torrent b/deluge/tests/data/v2_hybrid.torrent Binary files differnew file mode 100644 index 0000000..e58057c --- /dev/null +++ b/deluge/tests/data/v2_hybrid.torrent diff --git a/deluge/tests/data/v2_test.torrent b/deluge/tests/data/v2_test.torrent Binary files differnew file mode 100644 index 0000000..fe6cbd0 --- /dev/null +++ b/deluge/tests/data/v2_test.torrent diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py new file mode 100644 index 0000000..2d18f4b --- /dev/null +++ b/deluge/tests/test_alertmanager.py @@ -0,0 +1,102 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +from dataclasses import dataclass + +import pytest + +from deluge.core.core import Core + + +class LtSessionMock: + def __init__(self): + self.alerts = [] + + def push_alerts(self, alerts): + self.alerts = alerts + + def wait_for_alert(self, timeout): + return self.alerts[0] if len(self.alerts) > 0 else None + + def pop_alerts(self): + alerts = self.alerts + self.alerts = [] + return alerts + + +@dataclass +class LtAlertMock: + type: int + name: str + message: str + + def message(self): + return self.message + + def what(self): + return self.name + + +@pytest.fixture +def mock_alert1(): + return LtAlertMock(type=1, name='mock_alert1', message='Alert 1') + + +@pytest.fixture +def mock_alert2(): + return LtAlertMock(type=2, name='mock_alert2', message='Alert 2') + + +class TestAlertManager: + @pytest.fixture(autouse=True) + def set_up(self, component): + self.core = Core() + self.core.config.config['lsd'] = False + self.am = component.get('AlertManager') + self.am.session = LtSessionMock() + + component.start(['AlertManager']) + + def test_register_handler(self): + def handler(alert): + ... + + self.am.register_handler('dummy1', handler) + self.am.register_handler('dummy2_alert', handler) + assert self.am.handlers['dummy1'] == [handler] + assert self.am.handlers['dummy2'] == [handler] + + async def test_pop_alert(self, mock_callback, mock_alert1, mock_alert2): + self.am.register_handler('mock_alert1', mock_callback) + + self.am.session.push_alerts([mock_alert1, mock_alert2]) + + await mock_callback.deferred + + mock_callback.assert_called_once_with(mock_alert1) + + async def test_pause_not_pop_alert( + self, component, mock_alert1, mock_alert2, mock_callback + ): + await component.pause(['AlertManager']) + + self.am.register_handler('mock_alert1', mock_callback) + self.am.session.push_alerts([mock_alert1, mock_alert2]) + + await mock_callback.deferred + + mock_callback.assert_not_called() + assert not self.am._event.is_set() + assert len(self.am.session.alerts) == 2 + + def test_deregister_handler(self): + def handler(alert): + ... + + self.am.register_handler('dummy1', handler) + self.am.register_handler('dummy2_alert', handler) + self.am.deregister_handler(handler) + assert self.am.handlers['dummy1'] == [] + assert self.am.handlers['dummy2'] == [] diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py new file mode 100644 index 0000000..aa86fdb --- /dev/null +++ b/deluge/tests/test_authmanager.py @@ -0,0 +1,23 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import deluge.component as component +from deluge.common import get_localhost_auth +from deluge.conftest import BaseTestCase +from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager + + +class TestAuthManager(BaseTestCase): + def set_up(self): + self.auth = AuthManager() + self.auth.start() + + def tear_down(self): + # We must ensure that the components in component registry are removed + return component.shutdown() + + def test_authorize(self): + assert self.auth.authorize(*get_localhost_auth()) == AUTH_LEVEL_ADMIN diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py new file mode 100644 index 0000000..a4a7681 --- /dev/null +++ b/deluge/tests/test_bencode.py @@ -0,0 +1,32 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import pytest + +from deluge import bencode + +from . import common + + +class TestBencode: + def test_bencode_unicode_metainfo(self): + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + metainfo = bencode.bdecode(_file.read())[b'info'] + bencode.bencode({b'info': metainfo}) + + def test_bencode_unicode_value(self): + assert bencode.bencode(b'abc') == b'3:abc' + assert bencode.bencode('abc') == b'3:abc' + + def test_bdecode(self): + assert bencode.bdecode(b'3:dEf') == b'dEf' + with pytest.raises(bencode.BTFailure): + bencode.bdecode('dEf') + with pytest.raises(bencode.BTFailure): + bencode.bdecode(b'dEf') + with pytest.raises(bencode.BTFailure): + bencode.bdecode({'dEf': 123}) diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py new file mode 100644 index 0000000..763d43c --- /dev/null +++ b/deluge/tests/test_client.py @@ -0,0 +1,192 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import pytest +import pytest_twisted +from twisted.internet import defer + +from deluge import error +from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, get_version +from deluge.core.authmanager import AUTH_LEVEL_ADMIN +from deluge.ui.client import Client, DaemonSSLProxy, client + + +class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy): + def authenticate(self, username, password): + self.login_deferred = defer.Deferred() + d = self.call('daemon.login', username, password) + d.addCallback(self.__on_login, username) + d.addErrback(self.__on_login_fail) + return self.login_deferred + + def __on_login(self, result, username): + self.login_deferred.callback(result) + + def __on_login_fail(self, result): + self.login_deferred.errback(result) + + +class NoVersionSendingClient(Client): + def connect( + self, + host='127.0.0.1', + port=58846, + username='', + password='', + skip_authentication=False, + ): + self._daemon_proxy = NoVersionSendingDaemonSSLProxy() + self._daemon_proxy.set_disconnect_callback(self.__on_disconnect) + + d = self._daemon_proxy.connect(host, port) + + def on_connect_fail(reason): + self.disconnect() + return reason + + def on_authenticate(result, daemon_info): + return result + + def on_authenticate_fail(reason): + return reason + + def on_connected(daemon_version): + return daemon_version + + def authenticate(daemon_version, username, password): + d = self._daemon_proxy.authenticate(username, password) + d.addCallback(on_authenticate, daemon_version) + d.addErrback(on_authenticate_fail) + return d + + d.addCallback(on_connected) + d.addErrback(on_connect_fail) + if not skip_authentication: + d.addCallback(authenticate, username, password) + return d + + def __on_disconnect(self): + if self.disconnect_callback: + self.disconnect_callback() + + +@pytest.mark.usefixtures('daemon', 'client') +class TestClient: + def test_connect_no_credentials(self): + d = client.connect('localhost', self.listen_port, username='', password='') + + def on_connect(result): + assert client.get_auth_level() == AUTH_LEVEL_ADMIN + return result + + d.addCallbacks(on_connect, self.fail) + return d + + def test_connect_localclient(self): + username, password = get_localhost_auth() + d = client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + def on_connect(result): + assert client.get_auth_level() == AUTH_LEVEL_ADMIN + return result + + d.addCallbacks(on_connect, self.fail) + return d + + def test_connect_bad_password(self): + username, password = get_localhost_auth() + d = client.connect( + 'localhost', self.listen_port, username=username, password=password + '1' + ) + + def on_failure(failure): + assert failure.trap(error.BadLoginError) == error.BadLoginError + assert failure.value.message == 'Password does not match' + + d.addCallbacks(self.fail, on_failure) + return d + + def test_connect_invalid_user(self): + d = client.connect('localhost', self.listen_port, username='invalid-user') + + def on_failure(failure): + assert failure.trap(error.BadLoginError) == error.BadLoginError + assert failure.value.message == 'Username does not exist' + + d.addCallbacks(self.fail, on_failure) + return d + + def test_connect_without_password(self): + username, password = get_localhost_auth() + d = client.connect('localhost', self.listen_port, username=username) + + def on_failure(failure): + assert ( + failure.trap(error.AuthenticationRequired) + == error.AuthenticationRequired + ) + assert failure.value.username == username + + d.addCallbacks(self.fail, on_failure) + return d + + @pytest_twisted.inlineCallbacks + def test_connect_with_password(self): + username, password = get_localhost_auth() + yield client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + yield client.core.create_account('testuser', 'testpw', 'DEFAULT') + yield client.disconnect() + ret = yield client.connect( + 'localhost', self.listen_port, username='testuser', password='testpw' + ) + assert ret == AUTH_LEVEL_NORMAL + + @pytest_twisted.inlineCallbacks + def test_invalid_rpc_method_call(self): + yield client.connect('localhost', self.listen_port, username='', password='') + d = client.core.invalid_method() + + def on_failure(failure): + assert failure.trap(error.WrappedException) == error.WrappedException + + d.addCallbacks(self.fail, on_failure) + yield d + + def test_connect_without_sending_client_version_fails(self): + username, password = get_localhost_auth() + no_version_sending_client = NoVersionSendingClient() + d = no_version_sending_client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + def on_failure(failure): + assert failure.trap(error.IncompatibleClient) == error.IncompatibleClient + + d.addCallbacks(self.fail, on_failure) + return d + + @pytest_twisted.inlineCallbacks + def test_daemon_version(self): + username, password = get_localhost_auth() + yield client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + assert client.daemon_version == get_version() + + @pytest_twisted.inlineCallbacks + def test_daemon_version_check_min(self): + username, password = get_localhost_auth() + yield client.connect( + 'localhost', self.listen_port, username=username, password=password + ) + + assert client.daemon_version_check_min(get_version()) + assert not client.daemon_version_check_min(f'{get_version()}1') + assert client.daemon_version_check_min('0.1.0') diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py new file mode 100644 index 0000000..a1af6cc --- /dev/null +++ b/deluge/tests/test_common.py @@ -0,0 +1,227 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import sys +import tarfile +from urllib.parse import quote_plus + +import pytest + +from deluge.common import ( + VersionSplit, + archive_files, + fdate, + fpcnt, + fpeer, + fsize, + fspeed, + ftime, + get_magnet_info, + get_path_size, + is_infohash, + is_interface, + is_interface_name, + is_ip, + is_ipv4, + is_ipv6, + is_magnet, + is_url, + parse_human_size, + windows_check, +) + +from .common import get_test_data_file + + +class TestCommon: + def test_fsize(self): + assert fsize(0) == '0 B' + assert fsize(100) == '100 B' + assert fsize(1023) == '1023 B' + assert fsize(1024) == '1.0 KiB' + assert fsize(1048575) == '1024.0 KiB' + assert fsize(1048576) == '1.0 MiB' + assert fsize(1073741823) == '1024.0 MiB' + assert fsize(1073741824) == '1.0 GiB' + assert fsize(112245) == '109.6 KiB' + assert fsize(110723441824) == '103.1 GiB' + assert fsize(1099511627775) == '1024.0 GiB' + assert fsize(1099511627777) == '1.0 TiB' + assert fsize(766148267453245) == '696.8 TiB' + + def test_fpcnt(self): + assert fpcnt(0.9311) == '93.11%' + + def test_fspeed(self): + assert fspeed(43134) == '42.1 KiB/s' + + def test_fpeer(self): + assert fpeer(10, 20) == '10 (20)' + assert fpeer(10, -1) == '10' + + def test_ftime(self): + assert ftime(0) == '' + assert ftime(5) == '5s' + assert ftime(100) == '1m 40s' + assert ftime(3789) == '1h 3m' + assert ftime(23011) == '6h 23m' + assert ftime(391187) == '4d 12h' + assert ftime(604800) == '1w 0d' + assert ftime(13893086) == '22w 6d' + assert ftime(59740269) == '1y 46w' + assert ftime(61.25) == '1m 1s' + assert ftime(119.9) == '1m 59s' + + def test_fdate(self): + assert fdate(-1) == '' + + def test_is_url(self): + assert is_url('http://deluge-torrent.org') + assert not is_url('file://test.torrent') + + def test_is_magnet(self): + assert is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN') + assert not is_magnet(None) + + def test_is_infohash(self): + assert is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973') + + def test_get_path_size(self): + if windows_check() and sys.version_info < (3, 8): + # https://bugs.python.org/issue1311 + pytest.skip('os.devnull returns False on Windows') + assert get_path_size(os.devnull) == 0 + assert get_path_size('non-existant.file') == -1 + + def test_is_ip(self): + assert is_ip('192.0.2.0') + assert not is_ip('192..0.0') + assert is_ip('2001:db8::') + assert not is_ip('2001:db8:') + + def test_is_ipv4(self): + assert is_ipv4('192.0.2.0') + assert not is_ipv4('192..0.0') + + def test_is_ipv6(self): + assert is_ipv6('2001:db8::') + assert not is_ipv6('2001:db8:') + + def test_is_interface_name(self): + if windows_check(): + assert not is_interface_name('2001:db8:') + assert not is_interface_name('{THIS0000-IS00-ONLY-FOR0-TESTING00000}') + else: + assert is_interface_name('lo') + assert not is_interface_name('127.0.0.1') + assert not is_interface_name('eth01101') + + def test_is_interface(self): + if windows_check(): + assert is_interface('127.0.0.1') + assert not is_interface('127') + assert not is_interface('{THIS0000-IS00-ONLY-FOR0-TESTING00000}') + else: + assert is_interface('lo') + assert is_interface('127.0.0.1') + assert not is_interface('127.') + assert not is_interface('eth01101') + + def test_version_split(self): + assert VersionSplit('1.2.2') == VersionSplit('1.2.2') + assert VersionSplit('1.2.1') < VersionSplit('1.2.2') + assert VersionSplit('1.1.9') < VersionSplit('1.2.2') + assert VersionSplit('1.2.2') > VersionSplit('1.2.1') + assert VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0') + assert VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2') + assert VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2') + assert VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2') + assert VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2') + assert VersionSplit('0.14.9') == VersionSplit('0.14.9') + assert VersionSplit('0.14.9') > VersionSplit('0.14.5') + assert VersionSplit('0.14.10') >= VersionSplit('0.14.9') + assert VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123') + assert VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2') + assert VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123') + assert VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123') + assert VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0') + assert VersionSplit('1.4.0a1') < VersionSplit('1.4.0') + + @pytest.mark.parametrize( + ('human_size', 'expected'), + [ + ('1', 1), + ('10 bytes', 10), + ('2048 bytes', 2048), + ('1MiB', 2 ** (10 * 2)), + ('1 MiB', 2 ** (10 * 2)), + ('1 GiB', 2 ** (10 * 3)), + ('1 TiB', 2 ** (10 * 4)), + ('1M', 10**6), + ('1p', 10**15), + ('1MB', 10**6), + ('1 GB', 10**9), + ('1 TB', 10**12), + ], + ) + def test_parse_human_size(self, human_size, expected): + parsed = parse_human_size(human_size) + assert parsed == expected, 'Mismatch when converting: %s' % human_size + + def test_archive_files(self): + arc_filelist = [ + get_test_data_file('test.torrent'), + get_test_data_file('deluge.png'), + ] + arc_filepath = archive_files('test-arc', arc_filelist) + + with tarfile.open(arc_filepath, 'r') as tar: + for tar_info in tar: + assert tar_info.isfile() + assert tar_info.name in [ + os.path.basename(arcf) for arcf in arc_filelist + ] + + def test_archive_files_missing(self): + """Archive exists even with file not found.""" + filelist = ['test.torrent', 'deluge.png', 'missing.file'] + arc_filepath = archive_files( + 'test-arc', [get_test_data_file(f) for f in filelist] + ) + filelist.remove('missing.file') + + with tarfile.open(arc_filepath, 'r') as tar: + assert tar.getnames() == filelist + assert all(tarinfo.isfile() for tarinfo in tar) + + def test_archive_files_message(self): + filelist = ['test.torrent', 'deluge.png'] + arc_filepath = archive_files( + 'test-arc', [get_test_data_file(f) for f in filelist], message='test' + ) + + result_files = filelist + ['archive_message.txt'] + with tarfile.open(arc_filepath, 'r') as tar: + assert tar.getnames() == result_files + for tar_info in tar: + assert tar_info.isfile() + if tar_info.name == 'archive_message.txt': + result = tar.extractfile(tar_info).read().decode() + assert result == 'test' + + def test_get_magnet_info_tiers(self): + tracker1 = 'udp://tracker1.example.com' + tracker2 = 'udp://tracker2.example.com' + magnet = ( + 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN' + f'&tr.1={quote_plus(tracker1)}' + f'&tr.2={quote_plus(tracker2)}' + ) + result = get_magnet_info(magnet) + assert result['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert result['trackers'][tracker1] == 1 + assert result['trackers'][tracker2] == 2 diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py new file mode 100644 index 0000000..907d50b --- /dev/null +++ b/deluge/tests/test_component.py @@ -0,0 +1,192 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import time +from unittest.mock import Mock + +import pytest +import pytest_twisted +from twisted.internet import defer, threads + +import deluge.component as component + + +class ComponentTester(component.Component): + def __init__(self, name, depend=None): + super().__init__(name, depend=depend) + event_methods = ('start', 'update', 'pause', 'resume', 'stop', 'shutdown') + for event_method in event_methods: + setattr(self, event_method, Mock()) + + +class ComponentTesterDelayStart(ComponentTester): + def __init__(self, name, depend=None): + super().__init__(name, depend=depend) + self.start = Mock(side_effect=self.delay) + + @pytest_twisted.inlineCallbacks + def delay(self): + yield threads.deferToThread(time.sleep, 0.5) + + +@pytest.mark.usefixtures('component') +class TestComponent: + async def test_start_component(self): + c = ComponentTester('test_start') + await component.start(['test_start']) + + assert c._component_state == 'Started' + assert c.start.call_count == 1 + + async def test_start_stop_depends(self): + c1 = ComponentTester('test_start_depends_c1') + c2 = ComponentTester('test_start_depends_c2', depend=['test_start_depends_c1']) + + await component.start('test_start_depends_c2') + + assert c1._component_state == 'Started' + assert c2._component_state == 'Started' + assert c1.start.call_count == 1 + assert c2.start.call_count == 1 + + await component.stop(['test_start_depends_c1']) + + assert c1._component_state == 'Stopped' + assert c2._component_state == 'Stopped' + assert c1.stop.call_count == 1 + assert c2.stop.call_count == 1 + + async def start_with_depends(self): + c1 = ComponentTesterDelayStart('test_start_all_c1') + c2 = ComponentTester('test_start_all_c2', depend=['test_start_all_c4']) + c3 = ComponentTesterDelayStart( + 'test_start_all_c3', depend=['test_start_all_c5', 'test_start_all_c1'] + ) + c4 = ComponentTester('test_start_all_c4', depend=['test_start_all_c3']) + c5 = ComponentTester('test_start_all_c5') + + await component.start() + return c1, c2, c3, c4, c5 + + def finish_start_with_depends(self, *args): + for c in args[1:]: + component.deregister(c) + + async def test_start_all(self): + components = await self.start_with_depends() + for c in components: + assert c._component_state == 'Started' + assert c.start.call_count == 1 + + self.finish_start_with_depends(components) + + def test_register_exception(self): + ComponentTester('test_register_exception') + with pytest.raises(component.ComponentAlreadyRegistered): + ComponentTester( + 'test_register_exception', + ) + + async def test_stop(self): + c = ComponentTester('test_stop') + + await component.start(['test_stop']) + + assert c._component_state == 'Started' + + await component.stop(['test_stop']) + + assert c._component_state == 'Stopped' + assert not c._component_timer.running + assert c.stop.call_count == 1 + + async def test_stop_all(self): + components = await self.start_with_depends() + assert all(c._component_state == 'Started' for c in components) + + component.stop() + for c in components: + assert c._component_state == 'Stopped' + assert c.stop.call_count == 1 + + self.finish_start_with_depends(components) + + async def test_update(self): + c = ComponentTester('test_update') + init_update_count = int(c.update.call_count) + await component.start(['test_update']) + + assert c._component_timer + assert c._component_timer.running + assert c.update.call_count != init_update_count + await component.stop() + + async def test_pause(self): + c = ComponentTester('test_pause') + init_update_count = int(c.update.call_count) + + await component.start(['test_pause']) + + assert c._component_timer + assert c.update.call_count != init_update_count + + await component.pause(['test_pause']) + + assert c._component_state == 'Paused' + assert c.pause.call_count == 1 + assert c.update.call_count != init_update_count + assert not c._component_timer.running + + async def test_resume(self): + c = ComponentTester('test_resume') + + await component.start(['test_resume']) + + assert c._component_state == 'Started' + + await component.pause(['test_resume']) + + assert c._component_state == 'Paused' + + await component.resume(['test_resume']) + + assert c._component_state == 'Started' + assert c.resume.call_count == 1 + assert c._component_timer.running + + async def test_component_start_error(self): + ComponentTester('test_start_error') + await component.start(['test_start_error']) + await component.pause(['test_start_error']) + test_comp = component.get('test_start_error') + with pytest.raises(component.ComponentException, match='Current state: Paused'): + await test_comp._component_start() + + async def test_start_paused_error(self): + name = 'test_pause_error' + ComponentTester(name) + await component.start([name]) + await component.pause([name]) + + (failure, error), *_ = await component.start() + assert (failure, error.type, error.value.message) == ( + defer.FAILURE, + component.ComponentException, + ( + f'Trying to start component "{name}" but it is ' + 'not in a stopped state. Current state: Paused' + ), + ) + + async def test_shutdown(self): + c = ComponentTester('test_shutdown') + + await component.start(['test_shutdown']) + await component.shutdown() + + assert c.shutdown.call_count == 1 + assert c._component_state == 'Stopped' + assert not c._component_timer.running + assert c.stop.call_count == 1 diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py new file mode 100644 index 0000000..146a5c9 --- /dev/null +++ b/deluge/tests/test_config.py @@ -0,0 +1,274 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import json +import logging +import os +from codecs import getwriter + +import pytest +from twisted.internet import task + +from deluge.common import JSON_FORMAT +from deluge.config import Config +from deluge.ui.hostlist import mask_hosts_password + +DEFAULTS = { + 'string': 'foobar', + 'int': 1, + 'float': 0.435, + 'bool': True, + 'unicode': 'foobar', + 'password': 'abc123*\\[!]?/<>#{@}=|"+$%(^)~', + 'hosts': [ + ('host1', 'port', '', 'password1234'), + ('host2', 'port', '', 'password5678'), + ], +} + + +LOGGER = logging.getLogger(__name__) + + +class TestConfig: + def test_init(self): + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + assert DEFAULTS == config.config + + config = Config('test.conf', config_dir=self.config_dir) + assert {} == config.config + + def test_set_get_item(self): + config = Config('test.conf', config_dir=self.config_dir) + config['foo'] = 1 + assert config['foo'] == 1 + with pytest.raises(ValueError): + config.set_item('foo', 'bar') + + config['foo'] = 2 + assert config.get_item('foo') == 2 + + config['foo'] = '3' + assert config.get_item('foo') == 3 + + config['unicode'] = 'ВИДЕОФИЛЬМЫ' + assert config['unicode'] == 'ВИДЕОФИЛЬМЫ' + + config['unicode'] = b'foostring' + assert not isinstance(config.get_item('unicode'), bytes) + + config._save_timer.cancel() + + def test_set_get_item_none(self): + config = Config('test.conf', config_dir=self.config_dir) + + config['foo'] = None + assert config['foo'] is None + assert isinstance(config['foo'], type(None)) + + config['foo'] = 1 + assert config.get('foo') == 1 + + config['foo'] = None + assert config['foo'] is None + + config['bar'] = None + assert config['bar'] is None + + config['bar'] = None + assert config['bar'] is None + + config._save_timer.cancel() + + async def test_on_changed_callback(self, mock_callback): + config = Config('test.conf', config_dir=self.config_dir) + config.register_change_callback(mock_callback) + config['foo'] = 1 + assert config['foo'] == 1 + await mock_callback.deferred + mock_callback.assert_called_once_with('foo', 1) + + async def test_key_function_callback(self, mock_callback): + config = Config( + 'test.conf', defaults={'foo': 1, 'bar': 1}, config_dir=self.config_dir + ) + + assert config['foo'] == 1 + config.register_set_function('foo', mock_callback) + await mock_callback.deferred + mock_callback.assert_called_once_with('foo', 1) + + mock_callback.reset_mock() + config.register_set_function('bar', mock_callback, apply_now=False) + mock_callback.assert_not_called() + config['bar'] = 2 + await mock_callback.deferred + mock_callback.assert_called_once_with('bar', 2) + + def test_get(self): + config = Config('test.conf', config_dir=self.config_dir) + config['foo'] = 1 + assert config.get('foo') == 1 + assert config.get('foobar') is None + assert config.get('foobar', 2) == 2 + config['foobar'] = 5 + assert config.get('foobar', 2) == 5 + + def test_set_log_mask_funcs(self, caplog): + """Test mask func masks key in log""" + caplog.set_level(logging.DEBUG) + config = Config( + 'test.conf', + config_dir=self.config_dir, + log_mask_funcs={'hosts': mask_hosts_password}, + ) + config['hosts'] = DEFAULTS['hosts'] + assert isinstance(config['hosts'], list) + assert 'host1' in caplog.text + assert 'host2' in caplog.text + assert 'password1234' not in caplog.text + assert 'password5678' not in caplog.text + assert '*' * 10 in caplog.text + + def test_load_log_mask_funcs(self, caplog): + """Test mask func masks key in log""" + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + config = Config( + 'test.conf', + config_dir=self.config_dir, + log_mask_funcs={'hosts': mask_hosts_password}, + ) + with caplog.at_level(logging.DEBUG): + config.load(os.path.join(self.config_dir, 'test.conf')) + assert 'host1' in caplog.text + assert 'host2' in caplog.text + assert 'foobar' in caplog.text + assert 'password1234' not in caplog.text + assert 'password5678' not in caplog.text + assert '*' * 10 in caplog.text + + def test_load(self): + def check_config(): + config = Config('test.conf', config_dir=self.config_dir) + + assert config['string'] == 'foobar' + assert config['float'] == 0.435 + assert config['password'] == 'abc123*\\[!]?/<>#{@}=|"+$%(^)~' + + # Test opening a previous 1.2 config file of just a json object + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + # Test opening a previous 1.2 config file of having the format versions + # as ints + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + _file.write(b'1\n') + _file.write(b'1\n') + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + # Test the 1.2 config format + version = {'format': 1, 'file': 1} + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(version, getwriter('utf8')(_file), **JSON_FORMAT) + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + check_config() + + def test_save(self): + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + # We do this twice because the first time we need to save the file to disk + # and the second time we do a compare and we should not write + ret = config.save() + assert ret + ret = config.save() + assert ret + + config['string'] = 'baz' + config['int'] = 2 + ret = config.save() + assert ret + del config + + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + assert config['string'] == 'baz' + assert config['int'] == 2 + + def test_save_timer(self): + clock = task.Clock() + + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + config.callLater = clock.callLater + config['string'] = 'baz' + config['int'] = 2 + assert config._save_timer.active() + + # Timeout set for 5 seconds in config, so lets move clock by 5 seconds + clock.advance(5) + + def check_config(config): + assert not config._save_timer.active() + del config + config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + assert config['string'] == 'baz' + assert config['int'] == 2 + + check_config(config) + + def test_find_json_objects(self): + s = """{ + "file": 1, + "format": 1 +}{ + "ssl": true, + "enabled": false, + "port": 8115 +}\n""" + + from deluge.config import find_json_objects + + objects = find_json_objects(s) + assert len(objects) == 2 + + def test_find_json_objects_curly_brace(self): + """Test with string containing curly brace""" + s = """{ + "file": 1, + "format": 1 +}{ + "ssl": true, + "enabled": false, + "port": 8115, + "password": "abc{def" +}""" + + from deluge.config import find_json_objects + + objects = find_json_objects(s) + assert len(objects) == 2 + + def test_find_json_objects_double_quote(self): + """Test with string containing double quote""" + s = r"""{ + "file": 1, + "format": 1 +}{ + "ssl": true, + "enabled": false, + "port": 8115, + "password": "abc\"def" +} +""" + + from deluge.config import find_json_objects + + objects = find_json_objects(s) + assert len(objects) == 2 diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py new file mode 100644 index 0000000..28b5902 --- /dev/null +++ b/deluge/tests/test_core.py @@ -0,0 +1,511 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import base64 +import os +from base64 import b64encode +from hashlib import sha1 as sha + +import pytest +import pytest_twisted +from twisted.internet import defer, reactor, task +from twisted.internet.error import CannotListenError +from twisted.web.http import FORBIDDEN +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory, Site +from twisted.web.static import File + +import deluge.common +import deluge.component as component +import deluge.core.torrent +from deluge._libtorrent import lt +from deluge.conftest import BaseTestCase +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.error import AddTorrentError, InvalidTorrentError + +from . import common + +common.disable_new_release_check() + + +class CookieResource(Resource): + def render(self, request): + if request.getCookie(b'password') != b'deluge': + request.setResponseCode(FORBIDDEN) + return + + request.setHeader(b'Content-Type', b'application/x-bittorrent') + with open( + common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb' + ) as _file: + data = _file.read() + return data + + +class PartialDownload(Resource): + def getChild(self, path, request): # NOQA: N802 + return EncodingResourceWrapper(self, [GzipEncoderFactory()]) + + def render(self, request): + with open( + common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb' + ) as _file: + data = _file.read() + request.setHeader(b'Content-Length', str(len(data))) + request.setHeader(b'Content-Type', b'application/x-bittorrent') + return data + + +class RedirectResource(Resource): + def render(self, request): + request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent') + return b'' + + +class TopLevelResource(Resource): + def __init__(self): + Resource.__init__(self) + self.putChild(b'cookie', CookieResource()) + self.putChild(b'partial', PartialDownload()) + self.putChild(b'redirect', RedirectResource()) + self.putChild( + b'ubuntu-9.04-desktop-i386.iso.torrent', + File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')), + ) + + +class TestCore(BaseTestCase): + def set_up(self): + self.rpcserver = RPCServer(listen=False) + self.core: Core = Core() + self.core.config.config['lsd'] = False + self.clock = task.Clock() + self.core.torrentmanager.clock = self.clock + self.listen_port = 51242 + return component.start().addCallback(self.start_web_server) + + def start_web_server(self, result): + website = Site(TopLevelResource()) + for dummy in range(10): + try: + self.webserver = reactor.listenTCP(self.listen_port, website) + except CannotListenError as ex: + error = ex + self.listen_port += 1 + else: + break + else: + raise error + + return result + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + return self.webserver.stopListening() + + return component.shutdown().addCallback(on_shutdown) + + def add_torrent(self, filename, paused=False): + if not paused: + # Patch libtorrent flags starting torrents paused + self.patch( + deluge.core.torrentmanager, + 'LT_DEFAULT_ADD_TORRENT_FLAGS', + lt.torrent_flags.auto_managed + | lt.torrent_flags.update_subscribe + | lt.torrent_flags.apply_ip_filter, + ) + options = {'add_paused': paused, 'auto_managed': False} + filepath = common.get_test_data_file(filename) + with open(filepath, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + return torrent_id + + @pytest_twisted.inlineCallbacks + def test_add_torrent_files(self): + options = {} + filenames = ['test.torrent', 'test_torrent.file.torrent'] + files_to_add = [] + for f in filenames: + filename = common.get_test_data_file(f) + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + assert len(errors) == 0 + + @pytest_twisted.inlineCallbacks + def test_add_torrent_files_error_duplicate(self): + options = {} + filenames = ['test.torrent', 'test.torrent'] + files_to_add = [] + for f in filenames: + filename = common.get_test_data_file(f) + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + assert len(errors) == 1 + assert str(errors[0]).startswith('Torrent already in session') + + @pytest_twisted.inlineCallbacks + def test_add_torrent_file(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + + # Get the info hash from the test.torrent + from deluge.bencode import bdecode, bencode + + with open(filename, 'rb') as _file: + info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest() + assert torrent_id == info_hash + + def test_add_torrent_file_invalid_filedump(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with pytest.raises(AddTorrentError): + self.core.add_torrent_file(filename, False, options) + + @pytest_twisted.inlineCallbacks + def test_add_torrent_url(self, mock_mkstemp): + url = ( + 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent' + % self.listen_port + ) + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + torrent_id = yield self.core.add_torrent_url(url, options) + assert torrent_id == info_hash + assert not os.path.isfile(mock_mkstemp[1]) + + async def test_add_torrent_url_with_cookie(self): + url = 'http://localhost:%d/cookie' % self.listen_port + options = {} + headers = {'Cookie': 'password=deluge'} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + with pytest.raises(Exception): + await self.core.add_torrent_url(url, options) + + result = await self.core.add_torrent_url(url, options, headers) + assert result == info_hash + + async def test_add_torrent_url_with_redirect(self): + url = 'http://localhost:%d/redirect' % self.listen_port + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + result = await self.core.add_torrent_url(url, options) + assert result == info_hash + + async def test_add_torrent_url_with_partial_download(self): + url = 'http://localhost:%d/partial' % self.listen_port + options = {} + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + + result = await self.core.add_torrent_url(url, options) + assert result == info_hash + + @pytest_twisted.inlineCallbacks + def test_add_torrent_magnet(self): + info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' + tracker = 'udp://tracker.example.com' + name = 'test magnet' + uri = deluge.common.create_magnet_uri(info_hash, name=name, trackers=[tracker]) + options = {} + torrent_id = yield self.core.add_torrent_magnet(uri, options) + assert torrent_id == info_hash + torrent_status = self.core.get_torrent_status(torrent_id, ['name', 'trackers']) + assert torrent_status['trackers'][0]['url'] == tracker + assert torrent_status['name'] == name + + def test_resume_torrent(self): + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + # Assert paused + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert r2['paused'] + + self.core.resume_torrent(tid2) + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert not r2['paused'] + + def test_resume_torrent_list(self): + """Backward compatibility for list of torrent_ids.""" + torrent_id = self.add_torrent('test.torrent', paused=True) + self.core.resume_torrent([torrent_id]) + result = self.core.get_torrent_status(torrent_id, ['paused']) + assert not result['paused'] + + def test_resume_torrents(self): + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + self.core.resume_torrents([tid1, tid2]) + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert not r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert not r2['paused'] + + def test_resume_torrents_all(self): + """With no torrent_ids param, resume all torrents""" + tid1 = self.add_torrent('test.torrent', paused=True) + tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) + self.core.resume_torrents() + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert not r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert not r2['paused'] + + def test_pause_torrent(self): + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + # Assert not paused + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert not r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert not r2['paused'] + + self.core.pause_torrent(tid2) + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert not r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert r2['paused'] + + def test_pause_torrent_list(self): + """Backward compatibility for list of torrent_ids.""" + torrent_id = self.add_torrent('test.torrent') + result = self.core.get_torrent_status(torrent_id, ['paused']) + assert not result['paused'] + self.core.pause_torrent([torrent_id]) + result = self.core.get_torrent_status(torrent_id, ['paused']) + assert result['paused'] + + def test_pause_torrents(self): + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + + self.core.pause_torrents([tid1, tid2]) + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert r2['paused'] + + def test_pause_torrents_all(self): + """With no torrent_ids param, pause all torrents""" + tid1 = self.add_torrent('test.torrent') + tid2 = self.add_torrent('test_torrent.file.torrent') + + self.core.pause_torrents() + r1 = self.core.get_torrent_status(tid1, ['paused']) + assert r1['paused'] + r2 = self.core.get_torrent_status(tid2, ['paused']) + assert r2['paused'] + + @pytest_twisted.inlineCallbacks + def test_prefetch_metadata_existing(self): + """Check another call with same magnet returns existing deferred.""" + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') + + d1 = self.core.prefetch_magnet_metadata(magnet) + d2 = self.core.prefetch_magnet_metadata(magnet) + dg = defer.gatherResults([d1, d2], consumeErrors=True) + self.clock.advance(30) + result = yield dg + assert result == [expected] * 2 + + @pytest_twisted.inlineCallbacks + def test_remove_torrent(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + removed = self.core.remove_torrent(torrent_id, True) + assert removed + assert len(self.core.get_session_state()) == 0 + + def test_remove_torrent_invalid(self): + with pytest.raises(InvalidTorrentError): + self.core.remove_torrent( + 'torrentidthatdoesntexist', + True, + ) + + @pytest_twisted.inlineCallbacks + def test_remove_torrents(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) + + filename2 = common.get_test_data_file('unicode_filenames.torrent') + with open(filename2, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id2 = yield self.core.add_torrent_file_async( + filename2, filedump, options + ) + d = self.core.remove_torrents([torrent_id, torrent_id2], True) + + def test_ret(val): + assert val == [] + + d.addCallback(test_ret) + + def test_session_state(val): + assert len(self.core.get_session_state()) == 0 + + d.addCallback(test_session_state) + yield d + + @pytest_twisted.inlineCallbacks + def test_remove_torrents_invalid(self): + options = {} + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = yield self.core.add_torrent_file_async( + filename, filedump, options + ) + val = yield self.core.remove_torrents( + ['invalidid1', 'invalidid2', torrent_id], False + ) + assert len(val) == 2 + assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.') + assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.') + + def test_get_session_status(self): + status = self.core.get_session_status( + ['net.recv_tracker_bytes', 'net.sent_tracker_bytes'] + ) + assert isinstance(status, dict) + assert status['net.recv_tracker_bytes'] == 0 + assert status['net.sent_tracker_bytes'] == 0 + + def test_get_session_status_all(self): + status = self.core.get_session_status([]) + assert isinstance(status, dict) + assert 'upload_rate' in status + assert 'net.recv_bytes' in status + + def test_get_session_status_depr(self): + status = self.core.get_session_status(['num_peers', 'num_unchoked']) + assert isinstance(status, dict) + assert status['num_peers'] == 0 + assert status['num_unchoked'] == 0 + + def test_get_session_status_rates(self): + status = self.core.get_session_status(['upload_rate', 'download_rate']) + assert isinstance(status, dict) + assert status['upload_rate'] == 0 + + def test_get_session_status_ratio(self): + status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio']) + assert isinstance(status, dict) + assert status['write_hit_ratio'] == 0.0 + assert status['read_hit_ratio'] == 0.0 + + def test_get_free_space(self): + space = self.core.get_free_space('.') + assert isinstance(space, int) + assert space >= 0 + assert self.core.get_free_space('/someinvalidpath') == -1 + + @pytest.mark.slow + def test_test_listen_port(self): + d = self.core.test_listen_port() + + def result(r): + assert r in (True, False) + + d.addCallback(result) + return d + + def test_sanitize_filepath(self): + pathlist = { + '\\backslash\\path\\': 'backslash/path', + ' single_file ': 'single_file', + '..': '', + '/../..../': '', + ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file', + '/ test /\\.. /.file/': 'test/.file', + 'mytorrent/subfold/file1': 'mytorrent/subfold/file1', + 'Torrent/folder/': 'Torrent/folder', + } + + for key in pathlist: + assert ( + deluge.core.torrent.sanitize_filepath(key, folder=False) + == pathlist[key] + ) + + assert ( + deluge.core.torrent.sanitize_filepath(key, folder=True) + == pathlist[key] + '/' + ) + + def test_get_set_config_values(self): + assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None} + assert self.core.get_config_value('foobar') is None + self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'}) + assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'} + assert self.core.get_config_value('foobar') == 'barfoo' + + def test_read_only_config_keys(self): + key = 'max_upload_speed' + self.core.read_only_config_keys = [key] + + old_value = self.core.get_config_value(key) + self.core.set_config({key: old_value + 10}) + new_value = self.core.get_config_value(key) + assert old_value == new_value + + self.core.read_only_config_keys = None + + def test__create_peer_id(self): + assert self.core._create_peer_id('2.0.0') == '-DE200s-' + assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-' + assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-' + assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-' + assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-' + + @pytest.mark.parametrize( + 'path', + [ + common.get_test_data_file('deluge.png'), + os.path.dirname(common.get_test_data_file('deluge.png')), + ], + ) + @pytest.mark.parametrize('piece_length', [2**14, 2**16]) + @pytest_twisted.inlineCallbacks + def test_create_torrent(self, path, tmp_path, piece_length): + target = tmp_path / 'test.torrent' + + filename, filedump = yield self.core.create_torrent( + path=path, + tracker=None, + piece_length=piece_length, + target=target, + add_to_session=False, + ) + filecontent = base64.b64decode(filedump) + + with open(target, 'rb') as f: + assert f.read() == filecontent + + lt.torrent_info(filecontent) diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py new file mode 100644 index 0000000..d2ecd1a --- /dev/null +++ b/deluge/tests/test_decorators.py @@ -0,0 +1,48 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + + +from deluge.decorators import proxy + + +class TestDecorators: + def test_proxy_with_simple_functions(self): + def negate(func, *args, **kwargs): + return not func(*args, **kwargs) + + @proxy(negate) + def something(_bool): + return _bool + + @proxy(negate) + @proxy(negate) + def double_nothing(_bool): + return _bool + + assert something(False) + assert not something(True) + assert double_nothing(True) + assert not double_nothing(False) + + def test_proxy_with_class_method(self): + def negate(func, *args, **kwargs): + return -func(*args, **kwargs) + + class Test: + def __init__(self, number): + self.number = number + + @proxy(negate) + def diff(self, number): + return self.number - number + + @proxy(negate) + def no_diff(self, number): + return self.diff(number) + + t = Test(5) + assert t.diff(1) == -4 + assert t.no_diff(1) == 4 diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py new file mode 100644 index 0000000..a87d6a2 --- /dev/null +++ b/deluge/tests/test_error.py @@ -0,0 +1,39 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import deluge.error + + +class TestError: + def test_deluge_error(self): + msg = 'Some message' + e = deluge.error.DelugeError(msg) + assert str(e) == msg + from twisted.internet.defer import DebugInfo + + del DebugInfo.__del__ # Hides all errors + assert e._args == (msg,) + assert e._kwargs == {} + + def test_incompatible_client(self): + version = '1.3.6' + e = deluge.error.IncompatibleClient(version) + assert ( + str(e) == 'Your deluge client is not compatible with the daemon. ' + 'Please upgrade your client to %s' % version + ) + + def test_not_authorized_error(self): + current_level = 5 + required_level = 10 + e = deluge.error.NotAuthorizedError(current_level, required_level) + assert str(e) == 'Auth level too low: %d < %d' % (current_level, required_level) + + def test_bad_login_error(self): + message = 'Login failed' + username = 'deluge' + e = deluge.error.BadLoginError(message, username) + assert str(e) == message diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py new file mode 100644 index 0000000..1e97cbb --- /dev/null +++ b/deluge/tests/test_files_tab.py @@ -0,0 +1,163 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import pytest + +import deluge.component as component +from deluge.configmanager import ConfigManager +from deluge.conftest import BaseTestCase +from deluge.i18n import setup_translation + +libs_available = True +# Allow running other tests without GTKUI dependencies available +try: + from deluge.ui.gtk3.files_tab import FilesTab + from deluge.ui.gtk3.gtkui import DEFAULT_PREFS + from deluge.ui.gtk3.mainwindow import MainWindow +except (ImportError, ValueError): + # gi.require_version gives ValueError if library not available + libs_available = False + +setup_translation() + + +@pytest.mark.gtkui +class TestFilesTab(BaseTestCase): + def set_up(self): + if libs_available is False: + pytest.skip('GTKUI dependencies not available') + + ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) + self.mainwindow = MainWindow() + self.filestab = FilesTab() + self.t_id = '1' + self.filestab.torrent_id = self.t_id + self.index = 1 + + def tear_down(self): + return component.shutdown() + + def print_treestore(self, title, treestore): + root = treestore.get_iter_first() + level = 1 + + def p_level(s, lvl): + print('{}{}'.format(' ' * lvl, s)) + + def _print_treestore_children(i, lvl): + while i: + p_level(treestore[i][0], lvl) + if treestore.iter_children(i): + _print_treestore_children(treestore.iter_children(i), lvl + 2) + i = treestore.iter_next(i) + + print('\n%s' % title) + _print_treestore_children(root, level) + print('') + + def verify_treestore(self, treestore, tree): + def _verify_treestore(itr, tree_values): + i = 0 + while itr: + values = tree_values[i] + if treestore[itr][0] != values[0]: + return False + if treestore.iter_children(itr): + if not _verify_treestore(treestore.iter_children(itr), values[1]): + return False + itr = treestore.iter_next(itr) + i += 1 + return True + + return _verify_treestore(treestore.get_iter_first(), tree) + + def test_files_tab(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '2/test_100.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['test_10.txt']]], ['2/', [['test_100.txt']]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + assert ret + + def test_files_tab2(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/1/test_101.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['1/', [['test_100.txt'], ['test_101.txt']]]]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + assert ret + + def test_files_tab3(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/test_101.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]] + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + assert ret + + def test_files_tab4(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '1/test_101.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/2/test_101.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, + [['1/', [['2/', [['test_101.txt']]], ['test_100.txt']]]], + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + assert ret + + def test_files_tab5(self): + self.filestab.files_list[self.t_id] = ( + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '2/test_101.txt', 'offset': 13, 'size': 14}, + ) + self.filestab.update_files() + self.filestab._on_torrentfilerenamed_event( + self.t_id, self.index, '1/test_101.txt' + ) + + ret = self.verify_treestore( + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]] + ) + if not ret: + self.print_treestore('Treestore not expected:', self.filestab.treestore) + assert ret diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py new file mode 100644 index 0000000..1c27045 --- /dev/null +++ b/deluge/tests/test_httpdownloader.py @@ -0,0 +1,275 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import tempfile +from email.utils import formatdate + +import pytest +import pytest_twisted +from twisted.internet import reactor +from twisted.internet.error import CannotListenError +from twisted.web.error import Error, PageRedirect +from twisted.web.http import NOT_MODIFIED +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory, Site +from twisted.web.util import redirectTo + +from deluge.httpdownloader import download_file +from deluge.log import setup_logger + +temp_dir = tempfile.mkdtemp() + + +def fname(name): + return os.path.join(temp_dir, name) + + +class RedirectResource(Resource): + def render(self, request): + url = self.get_url().encode('utf8') + return redirectTo(url, request) + + +class RenameResource(Resource): + def render(self, request): + filename = request.args.get(b'filename', [b'renamed_file'])[0] + request.setHeader(b'Content-Type', b'text/plain') + request.setHeader(b'Content-Disposition', b'attachment; filename=' + filename) + return b'This file should be called ' + filename + + +class AttachmentResource(Resource): + def render(self, request): + content_type = b'text/plain' + charset = request.getHeader(b'content-charset') + if charset: + content_type += b'; charset=' + charset + request.setHeader(b'Content-Type', content_type) + request.setHeader(b'Content-Disposition', b'attachment') + append = request.getHeader(b'content-append') or b'' + content = 'Attachment with no filename set{}'.format(append.decode('utf8')) + return ( + content.encode(charset.decode('utf8')) + if charset + else content.encode('utf8') + ) + + +class TorrentResource(Resource): + def render(self, request): + content_type = b'application/x-bittorrent' + charset = request.getHeader(b'content-charset') + if charset: + content_type += b'; charset=' + charset + request.setHeader(b'Content-Type', content_type) + request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent') + return 'Binary attachment ignore charset 世丕且\n'.encode() + + +class CookieResource(Resource): + def render(self, request): + request.setHeader(b'Content-Type', b'text/plain') + if request.getCookie(b'password') is None: + return b'Password cookie not set!' + + if request.getCookie(b'password') == b'deluge': + return b'COOKIE MONSTER!' + + return request.getCookie('password') + + +class GzipResource(Resource): + def getChild(self, path, request): # NOQA: N802 + return EncodingResourceWrapper(self, [GzipEncoderFactory()]) + + def render(self, request): + message = request.args.get(b'msg', [b'EFFICIENCY!'])[0] + request.setHeader(b'Content-Type', b'text/plain') + return message + + +class PartialDownloadResource(Resource): + def __init__(self, *args, **kwargs): + Resource.__init__(self) + self.render_count = 0 + + def render(self, request): + # encoding = request.requestHeaders._rawHeaders.get('accept-encoding', None) + if self.render_count == 0: + request.setHeader(b'content-length', b'5') + else: + request.setHeader(b'content-length', b'3') + + # if encoding == "deflate, gzip, x-gzip": + request.write('abc') + self.render_count += 1 + return '' + + +class TopLevelResource(Resource): + def __init__(self): + Resource.__init__(self) + self.putChild(b'cookie', CookieResource()) + self.putChild(b'gzip', GzipResource()) + self.redirect_rsrc = RedirectResource() + self.putChild(b'redirect', self.redirect_rsrc) + self.putChild(b'rename', RenameResource()) + self.putChild(b'attachment', AttachmentResource()) + self.putChild(b'torrent', TorrentResource()) + self.putChild(b'partial', PartialDownloadResource()) + + def getChild(self, path, request): # NOQA: N802 + if not path: + return self + else: + return Resource.getChild(self, path, request) + + def render(self, request): + if request.getHeader(b'If-Modified-Since'): + request.setResponseCode(NOT_MODIFIED) + return b'<h1>Deluge HTTP Downloader tests webserver here</h1>' + + +class TestDownloadFile: + def get_url(self, path=''): + return 'http://localhost:%d/%s' % (self.listen_port, path) + + @pytest_twisted.async_yield_fixture(autouse=True) + async def setUp(self, request): # NOQA + self = request.instance + setup_logger('warning', fname('log_file')) + self.website = Site(TopLevelResource()) + self.listen_port = 51242 + self.website.resource.redirect_rsrc.get_url = self.get_url + for dummy in range(10): + try: + self.webserver = reactor.listenTCP(self.listen_port, self.website) + except CannotListenError as ex: + error = ex + self.listen_port += 1 + else: + break + else: + raise error + + yield + + await self.webserver.stopListening() + + def assert_contains(self, filename, contents): + with open(filename, encoding='utf8') as _file: + try: + assert _file.read() == contents + except Exception as ex: + pytest.fail(ex) + return filename + + def assert_not_contains(self, filename, contents, file_mode=''): + with open(filename, encoding='utf8') as _file: + try: + assert _file.read() != contents + except Exception as ex: + pytest.fail(ex) + return filename + + async def test_download(self): + filename = await download_file(self.get_url(), fname('index.html')) + assert filename == fname('index.html') + + async def test_download_without_required_cookies(self): + url = self.get_url('cookie') + filename = await download_file(url, fname('none')) + self.assert_contains(filename, 'Password cookie not set!') + + async def test_download_with_required_cookies(self): + url = self.get_url('cookie') + cookie = {'cookie': 'password=deluge'} + filename = await download_file(url, fname('monster'), headers=cookie) + assert filename == fname('monster') + self.assert_contains(filename, 'COOKIE MONSTER!') + + async def test_download_with_rename(self): + url = self.get_url('rename?filename=renamed') + filename = await download_file(url, fname('original')) + assert filename == fname('renamed') + self.assert_contains(filename, 'This file should be called renamed') + + async def test_download_with_rename_exists(self): + open(fname('renamed'), 'w').close() + url = self.get_url('rename?filename=renamed') + filename = await download_file(url, fname('original')) + assert filename == fname('renamed-1') + self.assert_contains(filename, 'This file should be called renamed') + + async def test_download_with_rename_sanitised(self): + url = self.get_url('rename?filename=/etc/passwd') + filename = await download_file(url, fname('original')) + assert filename == fname('passwd') + self.assert_contains(filename, 'This file should be called /etc/passwd') + + async def test_download_with_attachment_no_filename(self): + url = self.get_url('attachment') + filename = await download_file(url, fname('original')) + assert filename == fname('original') + self.assert_contains(filename, 'Attachment with no filename set') + + async def test_download_with_rename_prevented(self): + url = self.get_url('rename?filename=spam') + filename = await download_file(url, fname('forced'), force_filename=True) + assert filename == fname('forced') + self.assert_contains(filename, 'This file should be called spam') + + async def test_download_with_gzip_encoding(self): + url = self.get_url('gzip?msg=success') + filename = await download_file(url, fname('gzip_encoded')) + self.assert_contains(filename, 'success') + + async def test_download_with_gzip_encoding_disabled(self): + url = self.get_url('gzip?msg=unzip') + filename = await download_file( + url, fname('gzip_encoded'), allow_compression=False + ) + self.assert_contains(filename, 'unzip') + + async def test_page_redirect_unhandled(self): + url = self.get_url('redirect') + with pytest.raises(PageRedirect): + await download_file(url, fname('none'), handle_redirects=False) + + async def test_page_redirect(self): + url = self.get_url('redirect') + filename = await download_file(url, fname('none'), handle_redirects=True) + assert filename == fname('none') + + async def test_page_not_found(self): + with pytest.raises(Error): + await download_file(self.get_url('page/not/found'), fname('none')) + + @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.") + async def test_page_not_modified(self): + headers = {'If-Modified-Since': formatdate(usegmt=True)} + with pytest.raises(Error) as exc_info: + await download_file(self.get_url(), fname('index.html'), headers=headers) + assert exc_info.value.status == NOT_MODIFIED + + async def test_download_text_reencode_charset(self): + """Re-encode as UTF-8 specified charset for text content-type header""" + url = self.get_url('attachment') + filepath = fname('test.txt') + headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'} + filename = await download_file(url, filepath, headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Attachment with no filename setбвгде') + + async def test_download_binary_ignore_charset(self): + """Ignore charset for binary content-type header e.g. torrent files""" + url = self.get_url('torrent') + headers = {'content-charset': 'Windows-1251'} + filepath = fname('test.torrent') + filename = await download_file(url, fname('test.torrent'), headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n') diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py new file mode 100644 index 0000000..ef21e94 --- /dev/null +++ b/deluge/tests/test_json_api.py @@ -0,0 +1,267 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import json as json_lib +from unittest.mock import MagicMock + +import pytest +import pytest_twisted +from twisted.internet.defer import Deferred +from twisted.web import server +from twisted.web.http import Request + +import deluge.common +import deluge.ui.web.auth +import deluge.ui.web.json_api +from deluge.error import DelugeError +from deluge.ui.web.auth import Auth +from deluge.ui.web.json_api import JSON, JSONException + +from . import common +from .common_web import WebServerMockBase + +common.disable_new_release_check() + + +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestJSON: + async def test_get_remote_methods(self): + json = JSON() + methods = await json.get_remote_methods() + assert type(methods) == tuple + assert len(methods) > 0 + + def test_render_fail_disconnected(self): + json = JSON() + request = MagicMock() + request.method = b'POST' + request._disconnected = True + # When disconnected, returns empty string + assert json.render(request) == '' + + def test_render_fail(self): + json = JSON() + request = MagicMock() + request.method = b'POST' + + def write(response_str): + request.write_was_called = True + response = json_lib.loads(response_str.decode()) + assert response['result'] is None + assert response['id'] is None + assert response['error']['message'] == 'JSONException: JSON not decodable' + assert response['error']['code'] == 5 + + request.write = write + request.write_was_called = False + request._disconnected = False + request.getHeader.return_value = b'application/json' + assert json.render(request) == server.NOT_DONE_YET + assert request.write_was_called + + def test_handle_request_invalid_method(self): + json = JSON() + request = MagicMock() + json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + assert error == {'message': 'Unknown method', 'code': 2} + + def test_handle_request_invalid_json_request(self): + json = JSON() + request = MagicMock() + json_data = {'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + with pytest.raises(JSONException): + json._handle_request(request) + json_data = {'method': 'some.method', 'params': []} + request.json = json_lib.dumps(json_data).encode() + with pytest.raises(JSONException): + json._handle_request(request) + json_data = {'method': 'some.method', 'id': 0} + request.json = json_lib.dumps(json_data).encode() + with pytest.raises(JSONException): + json._handle_request(request) + + def test_on_json_request_invalid_content_type(self): + """Test for exception with content type not application/json""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'text/plain' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + with pytest.raises(JSONException): + json._on_json_request(request) + + def test_on_json_request_valid_content_type(self): + """Ensure content-type application/json is accepted""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'application/json' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + json._on_json_request(request) + + def test_on_json_request_valid_content_type_with_charset(self): + """Ensure content-type parameters such as charset are ignored""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'application/json;charset=utf-8' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + json._on_json_request(request) + + +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestJSONCustomUserTestCase: + @pytest_twisted.inlineCallbacks + def test_handle_request_auth_error(self): + json = JSON() + auth_conf = {'session_timeout': 10, 'sessions': {}} + Auth(auth_conf) # Must create the component + + # Must be called to update remote methods in json object + yield json.get_remote_methods() + + request = MagicMock() + request.getCookie = MagicMock(return_value=b'bad_value') + json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + assert error == {'message': 'Not authenticated', 'code': 1} + + +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestRPCRaiseDelugeErrorJSON: + daemon_custom_script = """ + from deluge.error import DelugeError + from deluge.core.rpcserver import export + class TestClass(object): + @export() + def test(self): + raise DelugeError('DelugeERROR') + + test = TestClass() + daemon.rpcserver.register_object(test) +""" + + async def test_handle_request_method_raise_delugeerror(self): + json = JSON() + + def get_session_id(s_id): + return s_id + + self.patch(deluge.ui.web.auth, 'get_session_id', get_session_id) + auth_conf = {'session_timeout': 10, 'sessions': {}} + auth = Auth(auth_conf) + request = Request(MagicMock(), False) + request.base = b'' + auth._create_session(request) + methods = await json.get_remote_methods() + # Verify the function has been registered + assert 'testclass.test' in methods + + request = MagicMock() + session_id = list(auth.config['sessions'])[0] + request.getCookie = MagicMock(return_value=session_id.encode()) + json_data = {'method': 'testclass.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + request_id, result, error = json._handle_request(request) + with pytest.raises(DelugeError): + await result + + +class TestJSONRequestFailed(WebServerMockBase): + @pytest_twisted.async_yield_fixture(autouse=True) + async def set_up(self, config_dir): + custom_script = """ + from deluge.error import DelugeError + from deluge.core.rpcserver import export + from twisted.internet import reactor, task + class TestClass(object): + @export() + def test(self): + def test_raise_error(): + raise DelugeError('DelugeERROR') + + return task.deferLater(reactor, 1, test_raise_error) + + test = TestClass() + daemon.rpcserver.register_object(test) +""" + + extra_callback = { + 'deferred': Deferred(), + 'types': ['stderr'], + 'timeout': 10, + 'triggers': [ + { + 'expr': 'in test_raise_error', + 'value': lambda reader, data, data_all: 'Test', + } + ], + } + + def on_test_raise(*args): + assert 'Unhandled error in Deferred:' in daemon.stderr_out + assert 'in test_raise_error' in daemon.stderr_out + + d, daemon = common.start_core( + custom_script=custom_script, + print_stdout=True, + print_stderr=False, + timeout=5, + extra_callbacks=[extra_callback], + config_directory=config_dir, + ) + extra_callback['deferred'].addCallback(on_test_raise, daemon) + + await d + yield + await daemon.kill() + + @pytest_twisted.inlineCallbacks + def test_render_on_rpc_request_failed(self, component, client): + json = JSON() + + methods = yield json.get_remote_methods() + # Verify the function has been registered + assert 'testclass.test' in methods + + request = MagicMock() + + # Circumvent authentication + auth = Auth({}) + self.mock_authentication_ignore(auth) + + def write(response_str): + request.write_was_called = True + response = json_lib.loads(response_str.decode()) + assert response['result'] is None, 'BAD RESULT' + assert response['id'] == 0 + assert ( + response['error']['message'] + == 'Failure: [Failure instance: Traceback (failure with no frames):' + " <class 'deluge.error.DelugeError'>: DelugeERROR\n]" + ) + assert response['error']['code'] == 4 + + request.write = write + request.write_was_called = False + request._disconnected = False + request.getHeader.return_value = b'application/json' + json_data = {'method': 'testclass.test', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + d = json._on_json_request(request) + + def on_success(arg): + assert arg == server.NOT_DONE_YET + return True + + d.addCallbacks(on_success, pytest.fail) + yield d diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py new file mode 100644 index 0000000..f0dcbee --- /dev/null +++ b/deluge/tests/test_log.py @@ -0,0 +1,47 @@ +# +# Copyright (C) 2015 Calum Lind <calumlind@gmail.com> +# Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import logging +import warnings + +from deluge.conftest import BaseTestCase +from deluge.log import setup_logger + + +class TestLog(BaseTestCase): + def set_up(self): + setup_logger(logging.DEBUG) + + def tear_down(self): + setup_logger('none') + + def test_old_log_deprecation_warning(self): + from deluge.log import LOG + + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter('always') + LOG.debug('foo') + assert w[-1].category == DeprecationWarning + + # def test_twisted_error_log(self): + # from twisted.internet import defer + # import deluge.component as component + # from deluge.core.eventmanager import EventManager + # EventManager() + # + # d = component.start() + # + # @defer.inlineCallbacks + # def call(*args): + # yield component.pause(["EventManager"]) + # yield component.start(["EventManager"]) + # + # d.addCallback(call) + # return d diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py new file mode 100644 index 0000000..a2e473f --- /dev/null +++ b/deluge/tests/test_maketorrent.py @@ -0,0 +1,85 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import tempfile + +from deluge import maketorrent + + +def check_torrent(filename): + # Test loading with libtorrent to make sure it's valid + from deluge._libtorrent import lt + + lt.torrent_info(filename) + + # Test loading with our internal TorrentInfo class + from deluge.ui.common import TorrentInfo + + TorrentInfo(filename) + + +class TestMakeTorrent: + def test_save_multifile(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file: + _file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file: + _file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file: + _file.write(b'c' * (11 * 1024)) + + t = maketorrent.TorrentMetadata() + t.data_path = tmp_path + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + t.save(tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) + + def test_save_singlefile(self): + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_data = tmp_dir + '/data' + with open(tmp_data, 'wb') as _file: + _file.write(b'a' * (2314 * 1024)) + t = maketorrent.TorrentMetadata() + t.data_path = tmp_data + tmp_file = tmp_dir + '/.torrent' + t.save(tmp_file) + + check_torrent(tmp_file) + + def test_save_multifile_padded(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as _file: + _file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as _file: + _file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as _file: + _file.write(b'c' * (11 * 1024)) + + t = maketorrent.TorrentMetadata() + t.data_path = tmp_path + t.pad_files = True + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + t.save(tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) diff --git a/deluge/tests/test_maybe_coroutine.py b/deluge/tests/test_maybe_coroutine.py new file mode 100644 index 0000000..afaf171 --- /dev/null +++ b/deluge/tests/test_maybe_coroutine.py @@ -0,0 +1,207 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import pytest +import pytest_twisted +import twisted.python.failure +from twisted.internet import defer, reactor, task +from twisted.internet.defer import maybeDeferred + +from deluge.decorators import maybe_coroutine + + +@defer.inlineCallbacks +def inline_func(): + result = yield task.deferLater(reactor, 0, lambda: 'function_result') + return result + + +@defer.inlineCallbacks +def inline_error(): + raise Exception('function_error') + yield + + +@maybe_coroutine +async def coro_func(): + result = await task.deferLater(reactor, 0, lambda: 'function_result') + return result + + +@maybe_coroutine +async def coro_error(): + raise Exception('function_error') + + +@defer.inlineCallbacks +def coro_func_from_inline(): + result = yield coro_func() + return result + + +@defer.inlineCallbacks +def coro_error_from_inline(): + result = yield coro_error() + return result + + +@maybe_coroutine +async def coro_func_from_coro(): + return await coro_func() + + +@maybe_coroutine +async def coro_error_from_coro(): + return await coro_error() + + +@maybe_coroutine +async def inline_func_from_coro(): + return await inline_func() + + +@maybe_coroutine +async def inline_error_from_coro(): + return await inline_error() + + +@pytest_twisted.inlineCallbacks +def test_standard_twisted(): + """Sanity check that twisted tests work how we expect. + + Not really testing deluge code at all. + """ + result = yield inline_func() + assert result == 'function_result' + + with pytest.raises(Exception, match='function_error'): + yield inline_error() + + +@pytest.mark.parametrize( + 'function', + [ + inline_func, + coro_func, + coro_func_from_coro, + coro_func_from_inline, + inline_func_from_coro, + ], +) +@pytest_twisted.inlineCallbacks +def test_from_inline(function): + """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds.""" + result = yield function() + assert result == 'function_result' + + def cb(result): + assert result == 'function_result' + + d = function() + d.addCallback(cb) + yield d + + +@pytest.mark.parametrize( + 'function', + [ + inline_error, + coro_error, + coro_error_from_coro, + coro_error_from_inline, + inline_error_from_coro, + ], +) +@pytest_twisted.inlineCallbacks +def test_error_from_inline(function): + """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds that raise.""" + with pytest.raises(Exception, match='function_error'): + yield function() + + def eb(result): + assert isinstance(result, twisted.python.failure.Failure) + assert result.getErrorMessage() == 'function_error' + + d = function() + d.addErrback(eb) + yield d + + +@pytest.mark.parametrize( + 'function', + [ + inline_func, + coro_func, + coro_func_from_coro, + coro_func_from_inline, + inline_func_from_coro, + ], +) +async def test_from_coro(function): + """Test our coroutines wrapped with maybe_coroutine work from another coroutine.""" + result = await function() + assert result == 'function_result' + + +@pytest.mark.parametrize( + 'function', + [ + inline_error, + coro_error, + coro_error_from_coro, + coro_error_from_inline, + inline_error_from_coro, + ], +) +async def test_error_from_coro(function): + """Test our coroutines wrapped with maybe_coroutine work from another coroutine with errors.""" + with pytest.raises(Exception, match='function_error'): + await function() + + +async def test_tracebacks_preserved(): + with pytest.raises(Exception) as exc: + await coro_error_from_coro() + traceback_lines = [ + 'await coro_error_from_coro()', + 'return await coro_error()', + "raise Exception('function_error')", + ] + # If each coroutine got wrapped with ensureDeferred, the traceback will be mangled + # verify the coroutines passed through by checking the traceback. + for expected, actual in zip(traceback_lines, exc.traceback): + assert expected in str(actual) + + +async def test_maybe_deferred_coroutine(): + result = await maybeDeferred(coro_func) + assert result == 'function_result' + + +async def test_callback_before_await(): + def cb(res): + assert res == 'function_result' + return res + + d = coro_func() + d.addCallback(cb) + result = await d + assert result == 'function_result' + + +async def test_callback_after_await(): + """If it has already been used as a coroutine, can't be retroactively turned into a Deferred. + This limitation could be fixed, but the extra complication doesn't feel worth it. + """ + + def cb(res): + pass + + d = coro_func() + await d + with pytest.raises( + Exception, match='Cannot add callbacks to an already awaited coroutine' + ): + d.addCallback(cb) diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py new file mode 100644 index 0000000..1b16750 --- /dev/null +++ b/deluge/tests/test_metafile.py @@ -0,0 +1,112 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import tempfile + +import pytest + +from deluge import metafile +from deluge._libtorrent import LT_VERSION +from deluge.common import VersionSplit + +from . import common + + +def check_torrent(filename): + # Test loading with libtorrent to make sure it's valid + from deluge._libtorrent import lt + + lt.torrent_info(filename) + + # Test loading with our internal TorrentInfo class + from deluge.ui.common import TorrentInfo + + TorrentInfo(filename) + + +class TestMetafile: + def test_save_multifile(self): + # Create a temporary folder for torrent creation + tmp_path = tempfile.mkdtemp() + with open(os.path.join(tmp_path, 'file_A'), 'wb') as tmp_file: + tmp_file.write(b'a' * (312 * 1024)) + with open(os.path.join(tmp_path, 'file_B'), 'wb') as tmp_file: + tmp_file.write(b'b' * (2354 * 1024)) + with open(os.path.join(tmp_path, 'file_C'), 'wb') as tmp_file: + tmp_file.write(b'c' * (11 * 1024)) + + tmp_fd, tmp_file = tempfile.mkstemp('.torrent') + metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file) + + check_torrent(tmp_file) + + os.remove(os.path.join(tmp_path, 'file_A')) + os.remove(os.path.join(tmp_path, 'file_B')) + os.remove(os.path.join(tmp_path, 'file_C')) + os.rmdir(tmp_path) + os.close(tmp_fd) + os.remove(tmp_file) + + def test_save_singlefile(self): + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_data = tmp_dir + '/testdata' + with open(tmp_data, 'wb') as tmp_file: + tmp_file.write(b'a' * (2314 * 1024)) + + tmp_torrent = tmp_dir + '/.torrent' + metafile.make_meta_file(tmp_data, '', 32768, target=tmp_torrent) + + check_torrent(tmp_torrent) + + @pytest.mark.parametrize( + 'path', + [ + common.get_test_data_file('deluge.png'), + common.get_test_data_file('unicode_filenames.torrent'), + os.path.dirname(common.get_test_data_file('deluge.png')), + ], + ) + @pytest.mark.parametrize( + 'torrent_format', + [ + metafile.TorrentFormat.V1, + metafile.TorrentFormat.V2, + metafile.TorrentFormat.HYBRID, + ], + ) + @pytest.mark.parametrize('piece_length', [2**14, 2**15, 2**16]) + @pytest.mark.parametrize('private', [True, False]) + def test_create_info(self, path, torrent_format, piece_length, private): + our_info, our_piece_layers = metafile.makeinfo( + path, + piece_length, + metafile.dummy, + private=private, + torrent_format=torrent_format, + ) + lt_info, lt_piece_layers = metafile.makeinfo_lt( + path, + piece_length, + private=private, + torrent_format=torrent_format, + ) + + if ( + torrent_format == metafile.TorrentFormat.HYBRID + and os.path.isdir(path) + and VersionSplit(LT_VERSION) <= VersionSplit('2.0.7.0') + ): + # Libtorrent didn't correctly follow the standard until version 2.0.7 included + # https://github.com/arvidn/libtorrent/commit/74d82a0cd7c2e9e3c4294901d7eb65e247050df4 + # If last file is a padding, ignore that file and the last piece. + if our_info[b'files'][-1][b'path'][0] == b'.pad': + our_info[b'files'] = our_info[b'files'][:-1] + our_info[b'pieces'] = our_info[b'pieces'][:-32] + lt_info[b'pieces'] = lt_info[b'pieces'][:-32] + + assert our_info == lt_info + assert our_piece_layers == lt_piece_layers diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py new file mode 100644 index 0000000..adf115d --- /dev/null +++ b/deluge/tests/test_plugin_metadata.py @@ -0,0 +1,43 @@ +# +# Copyright (C) 2015 Calum Lind <calumlind@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from deluge.pluginmanagerbase import PluginManagerBase + + +class TestPluginManagerBase: + def test_get_plugin_info(self): + pm = PluginManagerBase('core.conf', 'deluge.plugin.core') + for p in pm.get_available_plugins(): + for key, value in pm.get_plugin_info(p).items(): + assert isinstance(key, str) + assert isinstance(value, str) + + def test_get_plugin_info_invalid_name(self): + pm = PluginManagerBase('core.conf', 'deluge.plugin.core') + for key, value in pm.get_plugin_info('random').items(): + result = 'not available' if key in ('Name', 'Version') else '' + assert value == result + + def test_parse_pkg_info_metadata_2_1(self): + pkg_info = """Metadata-Version: 2.1 +Name: AutoAdd +Version: 1.8 +Summary: Monitors folders for .torrent files. +Home-page: http://dev.deluge-torrent.org/wiki/Plugins/AutoAdd +Author: Chase Sterling, Pedro Algarvio +Author-email: chase.sterling@gmail.com, pedro@algarvio.me +License: GPLv3 +Platform: UNKNOWN + +Monitors folders for .torrent files. + """ + plugin_info = PluginManagerBase.parse_pkg_info(pkg_info) + for value in plugin_info.values(): + assert value != '' + result = 'Monitors folders for .torrent files.' + assert plugin_info['Description'] == result diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py new file mode 100644 index 0000000..77c9f1e --- /dev/null +++ b/deluge/tests/test_rpcserver.py @@ -0,0 +1,108 @@ +# +# Copyright (C) 2013 Bro <bro.development@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import deluge.component as component +import deluge.error +from deluge.common import get_localhost_auth +from deluge.conftest import BaseTestCase +from deluge.core import rpcserver +from deluge.core.authmanager import AuthManager +from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer +from deluge.log import setup_logger + +setup_logger('none') + + +class DelugeRPCProtocolTester(DelugeRPCProtocol): + messages = [] + + def transfer_message(self, data): + self.messages.append(data) + + +class TestRPCServer(BaseTestCase): + def set_up(self): + self.rpcserver = RPCServer(listen=False) + self.rpcserver.factory.protocol = DelugeRPCProtocolTester + self.factory = self.rpcserver.factory + self.session_id = '0' + self.request_id = 11 + self.protocol = self.rpcserver.factory.protocol() + self.protocol.factory = self.factory + self.protocol.transport = self.protocol + self.factory.session_protocols[self.session_id] = self.protocol + self.factory.authorized_sessions[self.session_id] = None + self.factory.interested_events[self.session_id] = ['TorrentFolderRenamedEvent'] + self.protocol.sessionno = self.session_id + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + + return component.shutdown().addCallback(on_shutdown) + + def test_emit_event_for_session_id(self): + torrent_id = '12' + from deluge.event import TorrentFolderRenamedEvent + + data = [torrent_id, 'new name', 'old name'] + e = TorrentFolderRenamedEvent(*data) + self.rpcserver.emit_event_for_session_id(self.session_id, e) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_EVENT, str(msg) + assert msg[1] == 'TorrentFolderRenamedEvent', str(msg) + assert msg[2] == data, str(msg) + + def test_invalid_client_login(self): + self.protocol.dispatch(self.request_id, 'daemon.login', [1], {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + + def test_valid_client_login(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg) + + def test_client_login_error(self): + # This test causes error log prints while running the test... + self.protocol.transport = None # This should cause AttributeError + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch( + self.request_id, 'daemon.login', auth, {'client_version': 'Test'} + ) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' + + def test_client_invalid_method_call(self): + self.authmanager = AuthManager() + auth = get_localhost_auth() + self.protocol.dispatch(self.request_id, 'invalid_function', auth, {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' + + def test_daemon_info(self): + self.protocol.dispatch(self.request_id, 'daemon.info', [], {}) + msg = self.protocol.messages.pop() + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == deluge.common.get_version(), str(msg) diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py new file mode 100644 index 0000000..c472d16 --- /dev/null +++ b/deluge/tests/test_security.py @@ -0,0 +1,158 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os + +import pytest +from twisted.internet.utils import getProcessOutputAndValue + +import deluge.component as component +import deluge.ui.web.server +from deluge import configmanager +from deluge.common import windows_check +from deluge.conftest import BaseTestCase +from deluge.ui.web.server import DelugeWeb + +from .common import get_test_data_file +from .common_web import WebServerTestBase +from .daemon_base import DaemonBase + +SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False)) + + +# TODO: This whole module has not been tested since migrating tests fully to pytest +class SecurityBaseTestCase: + @pytest.fixture(autouse=True) + def setvars(self): + self.home_dir = os.path.expanduser('~') + self.port = 8112 + + def _run_test(self, test): + d = getProcessOutputAndValue( + 'bash', + [ + get_test_data_file('testssl.sh'), + '--quiet', + '--nodns', + 'none', + '--color', + '0', + test, + '127.0.0.1:%d' % self.port, + ], + ) + + def on_result(results): + if test == '-e': + results = results[0].split(b'\n')[7:-6] + assert len(results) > 3 + else: + assert b'OK' in results[0] + assert b'NOT ok' not in results[0] + + d.addCallback(on_result) + return d + + def test_secured_webserver_protocol(self): + return self._run_test('-p') + + def test_secured_webserver_standard_ciphers(self): + return self._run_test('-s') + + def test_secured_webserver_heartbleed_vulnerability(self): + return self._run_test('-H') + + def test_secured_webserver_css_injection_vulnerability(self): + return self._run_test('-I') + + def test_secured_webserver_renegotiation_vulnerabilities(self): + return self._run_test('-R') + + def test_secured_webserver_crime_vulnerability(self): + return self._run_test('-C') + + def test_secured_webserver_poodle_vulnerability(self): + return self._run_test('-O') + + def test_secured_webserver_tls_fallback_scsv_mitigation_vulnerability(self): + return self._run_test('-Z') + + def test_secured_webserver_sweet32_vulnerability(self): + return self._run_test('-W') + + def test_secured_webserver_beast_vulnerability(self): + return self._run_test('-A') + + def test_secured_webserver_lucky13_vulnerability(self): + return self._run_test('-L') + + def test_secured_webserver_freak_vulnerability(self): + return self._run_test('-F') + + def test_secured_webserver_logjam_vulnerability(self): + return self._run_test('-J') + + def test_secured_webserver_drown_vulnerability(self): + return self._run_test('-D') + + def test_secured_webserver_forward_secrecy_settings(self): + return self._run_test('-f') + + def test_secured_webserver_rc4_ciphers(self): + return self._run_test('-4') + + def test_secured_webserver_preference(self): + return self._run_test('-P') + + def test_secured_webserver_ciphers(self): + return self._run_test('-e') + + +@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files') +@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests') +@pytest.mark.security +class TestDaemonSecurity(BaseTestCase, DaemonBase, SecurityBaseTestCase): + def set_up(self): + d = self.common_set_up() + self.port = self.listen_port + d.addCallback(self.start_core) + d.addErrback(self.terminate_core) + return d + + def tear_down(self): + d = component.shutdown() + d.addCallback(self.terminate_core) + return d + + +@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files') +@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests') +@pytest.mark.security +class TestWebUISecurity(WebServerTestBase, SecurityBaseTestCase): + def start_webapi(self, arg): + self.port = self.deluge_web.port = 8999 + + config_defaults = deluge.ui.web.server.CONFIG_DEFAULTS.copy() + config_defaults['port'] = self.deluge_web.port + config_defaults['https'] = True + self.config = configmanager.ConfigManager('web.conf', config_defaults) + + self.deluge_web = DelugeWeb(daemon=False) + + host = list(self.deluge_web.web_api.hostlist.config['hosts'][0]) + host[2] = self.listen_port + self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host) + self.host_id = host[0] + self.deluge_web.start() + + def test_secured_webserver_headers(self): + return self._run_test('-h') + + def test_secured_webserver_breach_vulnerability(self): + return self._run_test('-B') + + def test_secured_webserver_ticketbleed_vulnerability(self): + return self._run_test('-T') diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py new file mode 100644 index 0000000..86289cc --- /dev/null +++ b/deluge/tests/test_sessionproxy.py @@ -0,0 +1,154 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +from twisted.internet.defer import maybeDeferred, succeed +from twisted.internet.task import Clock + +import deluge.component as component +import deluge.ui.sessionproxy +from deluge.conftest import BaseTestCase + + +class Core: + def __init__(self): + self.reset() + + def reset(self): + self.torrents = {} + self.torrents['a'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.torrents['b'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.torrents['c'] = {'key1': 1, 'key2': 2, 'key3': 3} + self.prev_status = {} + + def get_session_state(self): + return maybeDeferred(self.torrents.keys) + + def get_torrent_status(self, torrent_id, keys, diff=False): + if not keys: + keys = list(self.torrents[torrent_id]) + + if not diff: + ret = {} + for key in keys: + ret[key] = self.torrents[torrent_id][key] + + return succeed(ret) + + else: + ret = {} + if torrent_id in self.prev_status: + for key in keys: + if ( + self.prev_status[torrent_id][key] + != self.torrents[torrent_id][key] + ): + ret[key] = self.torrents[torrent_id][key] + else: + ret = self.torrents[torrent_id] + self.prev_status[torrent_id] = dict(self.torrents[torrent_id]) + return succeed(ret) + + def get_torrents_status(self, filter_dict, keys, diff=False): + if not filter_dict: + filter_dict['id'] = list(self.torrents) + if not keys: + keys = list(self.torrents['a']) + if not diff: + if 'id' in filter_dict: + torrents = filter_dict['id'] + ret = {} + for torrent in torrents: + ret[torrent] = {} + for key in keys: + ret[torrent][key] = self.torrents[torrent][key] + return succeed(ret) + else: + if 'id' in filter_dict: + torrents = filter_dict['id'] + ret = {} + for torrent in torrents: + ret[torrent] = {} + if torrent in self.prev_status: + for key in self.prev_status[torrent]: + if ( + self.prev_status[torrent][key] + != self.torrents[torrent][key] + ): + ret[torrent][key] = self.torrents[torrent][key] + else: + ret[torrent] = dict(self.torrents[torrent]) + + self.prev_status[torrent] = dict(self.torrents[torrent]) + return succeed(ret) + + +class Client: + def __init__(self): + self.core = Core() + + def __noop__(self, *args, **kwargs): + return None + + def __getattr__(self, *args, **kwargs): + return self.__noop__ + + +client = Client() + + +class TestSessionProxy(BaseTestCase): + def set_up(self): + self.clock = Clock() + self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds) + self.patch(deluge.ui.sessionproxy, 'client', client) + self.sp = deluge.ui.sessionproxy.SessionProxy() + client.core.reset() + d = self.sp.start() + + def do_get_torrents_status(torrent_ids): + inital_keys = ['key1'] + # Advance clock to expire the cache times + self.clock.advance(2) + return self.sp.get_torrents_status({'id': torrent_ids}, inital_keys) + + d.addCallback(do_get_torrents_status) + return d + + def tear_down(self): + return component.deregister(self.sp) + + def test_startup(self): + assert client.core.torrents['a'] == self.sp.torrents['a'][1] + + async def test_get_torrent_status_no_change(self): + result = await self.sp.get_torrent_status('a', []) + assert result == client.core.torrents['a'] + + async def test_get_torrent_status_change_with_cache(self): + client.core.torrents['a']['key1'] = 2 + result = await self.sp.get_torrent_status('a', ['key1']) + assert result == {'key1': 1} + + async def test_get_torrent_status_change_without_cache(self): + client.core.torrents['a']['key1'] = 2 + self.clock.advance(self.sp.cache_time + 0.1) + result = await self.sp.get_torrent_status('a', []) + assert result == client.core.torrents['a'] + + async def test_get_torrent_status_key_not_updated(self): + self.clock.advance(self.sp.cache_time + 0.1) + self.sp.get_torrent_status('a', ['key1']) + client.core.torrents['a']['key2'] = 99 + result = await self.sp.get_torrent_status('a', ['key2']) + assert result == {'key2': 99} + + async def test_get_torrents_status_key_not_updated(self): + self.clock.advance(self.sp.cache_time + 0.1) + self.sp.get_torrents_status({'id': ['a']}, ['key1']) + client.core.torrents['a']['key2'] = 99 + result = await self.sp.get_torrents_status({'id': ['a']}, ['key2']) + assert result == {'a': {'key2': 99}} diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py new file mode 100644 index 0000000..6288615 --- /dev/null +++ b/deluge/tests/test_torrent.py @@ -0,0 +1,388 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import itertools +import os +import time +from base64 import b64encode +from unittest import mock + +import pytest +from twisted.internet import defer, reactor +from twisted.internet.task import deferLater + +import deluge.component as component +import deluge.core.torrent +import deluge.tests.common as common +from deluge._libtorrent import lt +from deluge.common import VersionSplit, utf8_encode_structure +from deluge.conftest import BaseTestCase +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.core.torrent import Torrent +from deluge.core.torrentmanager import TorrentManager, TorrentState + +try: + from unittest.mock import AsyncMock +except ImportError: + from mock import AsyncMock + + +class TestTorrent(BaseTestCase): + def setup_config(self): + core_config = deluge.config.Config( + 'core.conf', + defaults=deluge.core.preferencesmanager.DEFAULT_PREFS, + config_dir=self.config_dir, + ) + core_config.save() + + def set_up(self): + self.setup_config() + self.rpcserver = RPCServer(listen=False) + self.core = Core() + self.core.config.config['lsd'] = False + self.core.config.config['new_release_check'] = False + self.session = self.core.session + self.torrent = None + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + + return component.shutdown().addCallback(on_shutdown) + + def print_priority_list(self, priorities): + tmp = '' + for i, p in enumerate(priorities): + if i % 100 == 0: + print(tmp) + tmp = '' + tmp += '%s' % p + print(tmp) + + def assert_state(self, torrent, state): + """Assert torrent state matches expected state""" + torrent.update_state() + assert torrent.state == state + + def assert_state_wait(self, torrent, expected, timeout=1, interval=0.2): + """Assert state but retry with timeout e.g. Allow for async lt alerts""" + start = time.time() + + while time.time() - start < timeout: + torrent.update_state() + time.sleep(interval) + if torrent.state == expected: + break + else: + assert torrent.state == expected + + def get_torrent_atp(self, filename): + filename = common.get_test_data_file(filename) + with open(filename, 'rb') as _file: + info = lt.torrent_info(lt.bdecode(_file.read())) + atp = { + 'ti': info, + 'save_path': os.getcwd(), + 'storage_mode': lt.storage_mode_t.storage_mode_sparse, + 'flags': ( + lt.torrent_flags.auto_managed + | lt.torrent_flags.duplicate_is_error & ~lt.torrent_flags.paused + ), + } + return atp + + async def test_set_file_priorities(self): + if getattr(lt, 'file_prio_alert', None): + # Libtorrent 2.0.3 and later has a file_prio_alert + prios_set = defer.Deferred() + prios_set.addTimeout(1.5, reactor) + component.get('AlertManager').register_handler( + 'file_prio_alert', lambda a: prios_set.callback(True) + ) + else: + # On older libtorrent, we just wait a while + prios_set = deferLater(reactor, 0.8) + + atp = self.get_torrent_atp('dir_with_6_files.torrent') + handle = self.session.add_torrent(atp) + torrent = Torrent(handle, {}) + + result = torrent.get_file_priorities() + assert all(x == 4 for x in result) + + new_priorities = [3, 1, 2, 0, 5, 6, 7] + torrent.set_file_priorities(new_priorities) + assert torrent.get_file_priorities() == new_priorities + + # Test with handle.piece_priorities as handle.file_priorities async + # updates and will return old value. Also need to remove a priority + # value as one file is much smaller than piece size so doesn't show. + await prios_set # Delay to wait for alert from lt + piece_prio = handle.get_piece_priorities() + result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7]) + assert result + + def test_set_prioritize_first_last_pieces(self): + piece_indexes = [ + 0, + 1, + 50, + 51, + 52, + 110, + 111, + 112, + 113, + 200, + 201, + 202, + 212, + 213, + 214, + 215, + 216, + 217, + 457, + 458, + 459, + 460, + 461, + 462, + ] + self.run_test_set_prioritize_first_last_pieces( + 'dir_with_6_files.torrent', piece_indexes + ) + + def run_test_set_prioritize_first_last_pieces( + self, torrent_file, prioritized_piece_indexes + ): + atp = self.get_torrent_atp(torrent_file) + handle = self.session.add_torrent(atp) + + self.torrent = Torrent(handle, {}) + priorities_original = handle.get_piece_priorities() + self.torrent.set_prioritize_first_last_pieces(True) + priorities = handle.get_piece_priorities() + + # The length of the list of new priorites is the same as the original + assert len(priorities_original) == len(priorities) + + # Test the priority of all the pieces against the calculated indexes. + for idx, priority in enumerate(priorities): + if idx in prioritized_piece_indexes: + assert priorities[idx] == 7 + else: + assert priorities[idx] == 4 + + # self.print_priority_list(priorities) + + def test_set_prioritize_first_last_pieces_false(self): + atp = self.get_torrent_atp('dir_with_6_files.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + # First set some pieces prioritized + self.torrent.set_prioritize_first_last_pieces(True) + # Reset pirorities + self.torrent.set_prioritize_first_last_pieces(False) + priorities = handle.get_piece_priorities() + + # Test the priority of the prioritized pieces + for i in priorities: + assert priorities[i] == 4 + + # self.print_priority_list(priorities) + + def test_torrent_error_data_missing(self): + options = {'seed_mode': True} + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + torrent = self.core.torrentmanager.torrents[torrent_id] + + # Inital check will fail and return to download state + self.assert_state_wait(torrent, 'Downloading') + + # Force an error by reading (non-existant) piece from disk + torrent.handle.read_piece(0) + self.assert_state_wait(torrent, 'Error') + + def test_torrent_error_resume_original_state(self): + options = {'seed_mode': True, 'add_paused': True} + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = b64encode(_file.read()) + torrent_id = self.core.add_torrent_file(filename, filedump, options) + torrent = self.core.torrentmanager.torrents[torrent_id] + + orig_state = 'Paused' + self.assert_state(torrent, orig_state) + + # Force an error by reading (non-existant) piece from disk + torrent.handle.read_piece(0) + self.assert_state_wait(torrent, 'Error') + + # Clear error and verify returned to original state + torrent.force_recheck() + + def test_torrent_error_resume_data_unaltered(self): + if VersionSplit(lt.__version__) >= VersionSplit('1.2.0.0'): + pytest.skip('Test not working as expected on lt 1.2 or greater') + + resume_data = { + 'active_time': 13399, + 'num_incomplete': 16777215, + 'announce_to_lsd': 1, + 'seed_mode': 0, + 'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', + 'paused': 0, + 'seeding_time': 13399, + 'last_scrape': 13399, + 'info-hash': '-\xc5\xd0\xe7\x1af\xfeid\x9ad\r9\xcb\x00\xa2YpIs', + 'max_uploads': 16777215, + 'max_connections': 16777215, + 'num_downloaders': 16777215, + 'total_downloaded': 0, + 'file-format': 'libtorrent resume file', + 'peers6': '', + 'added_time': 1411826665, + 'banned_peers6': '', + 'file_priority': [1], + 'last_seen_complete': 0, + 'total_uploaded': 0, + 'piece_priority': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', + 'file-version': 1, + 'announce_to_dht': 1, + 'auto_managed': 1, + 'upload_rate_limit': 0, + 'completed_time': 1411826665, + 'allocation': 'sparse', + 'blocks per piece': 2, + 'download_rate_limit': 0, + 'libtorrent-version': '0.16.17.0', + 'banned_peers': '', + 'num_seeds': 16777215, + 'sequential_download': 0, + 'announce_to_trackers': 1, + 'peers': '\n\x00\x02\x0f=\xc6SC\x17]\xd8}\x7f\x00\x00\x01=\xc6', + 'finished_time': 13399, + 'last_upload': 13399, + 'trackers': [[]], + 'super_seeding': 0, + 'file sizes': [[512000, 1411826586]], + 'last_download': 13399, + } + torrent_state = TorrentState( + torrent_id='2dc5d0e71a66fe69649a640d39cb00a259704973', + filename='test_torrent.file.torrent', + name='', + save_path='/home/ubuntu/Downloads', + file_priorities=[1], + is_finished=True, + ) + + filename = common.get_test_data_file('test_torrent.file.torrent') + with open(filename, 'rb') as _file: + filedump = _file.read() + resume_data = utf8_encode_structure(resume_data) + torrent_id = self.core.torrentmanager.add( + state=torrent_state, filedump=filedump, resume_data=lt.bencode(resume_data) + ) + torrent = self.core.torrentmanager.torrents[torrent_id] + + def assert_resume_data(): + self.assert_state(torrent, 'Error') + tm_resume_data = lt.bdecode( + self.core.torrentmanager.resume_data[torrent.torrent_id] + ) + assert tm_resume_data == resume_data + + return deferLater(reactor, 0.5, assert_resume_data) + + def test_get_eta_seeding(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + assert self.torrent.get_eta() == 0 + self.torrent.status = mock.MagicMock() + + self.torrent.status.upload_payload_rate = 5000 + self.torrent.status.download_payload_rate = 0 + self.torrent.status.all_time_download = 10000 + self.torrent.status.all_time_upload = 500 + self.torrent.is_finished = True + self.torrent.options = {'stop_at_ratio': False} + # Test finished and uploading but no stop_at_ratio set. + assert self.torrent.get_eta() == 0 + + self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5} + result = self.torrent.get_eta() + assert result == 2 + assert isinstance(result, int) + + def test_get_eta_downloading(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + assert self.torrent.get_eta() == 0 + + self.torrent.status = mock.MagicMock() + self.torrent.status.download_payload_rate = 50 + self.torrent.status.total_wanted = 10000 + self.torrent.status.total_wanted_done = 5000 + + result = self.torrent.get_eta() + assert result == 100 + assert isinstance(result, int) + + def test_get_name_unicode(self): + """Test retrieving a unicode torrent name from libtorrent.""" + atp = self.get_torrent_atp('unicode_file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + assert self.torrent.get_name() == 'সুকুমার রায়.txt' + + def test_rename_unicode(self): + """Test renaming file/folders with unicode filenames.""" + atp = self.get_torrent_atp('unicode_filenames.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + # Ignore TorrentManager method call + TorrentManager.save_resume_data = AsyncMock() + + result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв') + assert isinstance(result, defer.DeferredList) + + result = self.torrent.rename_files([[0, 'new_рбачёв']]) + assert result is None + + def test_connect_peer_port(self): + """Test to ensure port is int for libtorrent""" + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + self.torrent = Torrent(handle, {}) + assert not self.torrent.connect_peer('127.0.0.1', 'text') + assert self.torrent.connect_peer('127.0.0.1', '1234') + + def test_status_cache(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + mock_time = mock.Mock(return_value=time.time()) + with mock.patch('time.time', mock_time): + torrent = Torrent(handle, {}) + counter = itertools.count() + handle.status = mock.Mock(side_effect=counter.__next__) + first_status = torrent.get_lt_status() + assert first_status == 0, 'sanity check' + assert first_status == torrent.status, 'cached status should be used' + assert torrent.get_lt_status() == 1, 'status should update' + assert torrent.status == 1 + # Advance time and verify cache expires and updates + mock_time.return_value += 10 + assert torrent.status == 2 diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py new file mode 100644 index 0000000..1a5e3a9 --- /dev/null +++ b/deluge/tests/test_torrentmanager.py @@ -0,0 +1,146 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import os +import shutil +import warnings +from base64 import b64encode +from unittest import mock + +import pytest +import pytest_twisted +from twisted.internet import reactor, task + +from deluge import component +from deluge.bencode import bencode +from deluge.conftest import BaseTestCase +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer +from deluge.error import InvalidTorrentError + +from . import common + +warnings.filterwarnings('ignore', category=RuntimeWarning) +warnings.resetwarnings() + + +class TestTorrentmanager(BaseTestCase): + def set_up(self): + self.rpcserver = RPCServer(listen=False) + self.core = Core() + self.core.config.config['lsd'] = False + self.clock = task.Clock() + self.tm = self.core.torrentmanager + self.tm.callLater = self.clock.callLater + return component.start() + + def tear_down(self): + def on_shutdown(result): + del self.rpcserver + del self.core + + return component.shutdown().addCallback(on_shutdown) + + @pytest_twisted.inlineCallbacks + def test_remove_torrent(self): + filename = common.get_test_data_file('test.torrent') + with open(filename, 'rb') as _file: + filedump = _file.read() + torrent_id = yield self.core.add_torrent_file_async( + filename, b64encode(filedump), {} + ) + assert self.tm.remove(torrent_id, False) + + @pytest_twisted.inlineCallbacks + def test_remove_magnet(self): + """Test remove magnet before received metadata and delete_copies is True""" + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + options = {} + self.core.config.config['copy_torrent_file'] = True + self.core.config.config['del_copy_torrent_file'] = True + torrent_id = yield self.core.add_torrent_magnet(magnet, options) + assert self.tm.remove(torrent_id, False) + + async def test_prefetch_metadata(self): + from deluge._libtorrent import lt + + with open(common.get_test_data_file('test.torrent'), 'rb') as _file: + t_info = lt.torrent_info(lt.bdecode(_file.read())) + mock_alert = mock.MagicMock() + mock_alert.handle.info_hash = mock.MagicMock( + return_value='ab570cdd5a17ea1b61e970bb72047de141bce173' + ) + mock_alert.handle.get_torrent_info = mock.MagicMock(return_value=t_info) + + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet, 30) + # Make sure to use calllater, because the above prefetch call won't + # actually start running until we await it. + reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert) + + expected = ( + 'ab570cdd5a17ea1b61e970bb72047de141bce173', + b64encode( + bencode( + { + b'piece length': 32768, + b'sha1': ( + b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh' + b'\x9d\xc5\xb7\xac\xdd' + ), + b'name': b'azcvsupdater_2.6.2.jar', + b'private': 0, + b'pieces': ( + b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u" + b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd' + b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7' + b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3' + b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0' + b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8' + b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed' + b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA' + b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f' + b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee' + b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc' + b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu' + b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?' + b'\xa1\xd6\x8c\x83\x9e&' + ), + b'length': 307949, + b'name.utf-8': b'azcvsupdater_2.6.2.jar', + b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7', + } + ) + ), + ) + assert expected == await d + + async def test_prefetch_metadata_timeout(self): + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet, 30) + self.clock.advance(30) + result = await d + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') + assert result == expected + + @pytest.mark.todo + def test_remove_torrent_false(self): + """Test when remove_torrent returns False""" + common.todo_test(self) + + def test_remove_invalid_torrent(self): + with pytest.raises(InvalidTorrentError): + self.tm.remove('torrentidthatdoesntexist') + + def test_open_state(self): + """Open a state with a UTF-8 encoded torrent filename.""" + shutil.copy( + common.get_test_data_file('utf8_filename_torrents.state'), + os.path.join(self.config_dir, 'state', 'torrents.state'), + ) + + state = self.tm.open_state() + assert len(state.torrents) == 1 diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py new file mode 100644 index 0000000..9da99d8 --- /dev/null +++ b/deluge/tests/test_torrentview.py @@ -0,0 +1,224 @@ +# +# Copyright (C) 2014 Bro <bro.development@gmail.com> +# Copyright (C) 2014 Calum Lind <calumlind@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import pytest + +import deluge.component as component +from deluge.configmanager import ConfigManager +from deluge.conftest import BaseTestCase +from deluge.i18n import setup_translation + +# Allow running other tests without GTKUI dependencies available +try: + # pylint: disable=ungrouped-imports + from gi.repository.GObject import TYPE_UINT64 + + from deluge.ui.gtk3.gtkui import DEFAULT_PREFS + from deluge.ui.gtk3.mainwindow import MainWindow + from deluge.ui.gtk3.menubar import MenuBar + from deluge.ui.gtk3.torrentdetails import TorrentDetails + from deluge.ui.gtk3.torrentview import TorrentView +except (ImportError, ValueError): + libs_available = False + TYPE_UINT64 = 'Whatever' +else: + libs_available = True + +setup_translation() + + +@pytest.mark.gtkui +class TestTorrentview(BaseTestCase): + default_column_index = [ + 'filter', + 'torrent_id', + 'dirty', + '#', + 'Name', + 'Size', + 'Downloaded', + 'Uploaded', + 'Remaining', + 'Progress', + 'Seeds', + 'Peers', + 'Seeds:Peers', + 'Down Speed', + 'Up Speed', + 'Down Limit', + 'Up Limit', + 'ETA', + 'Ratio', + 'Avail', + 'Added', + 'Completed', + 'Complete Seen', + 'Last Transfer', + 'Tracker', + 'Download Folder', + 'Owner', + 'Shared', + ] + default_liststore_columns = [ + bool, + str, + bool, + int, + str, + str, # Name + TYPE_UINT64, + TYPE_UINT64, + TYPE_UINT64, + TYPE_UINT64, + float, + str, # Progress + int, + int, + int, + int, + float, # Seeds, Peers + int, + int, + float, + float, + int, + float, + float, # ETA, Ratio, Avail + int, + int, + int, + int, + str, + str, # Tracker + str, + str, + bool, + ] # shared + + def set_up(self): + if libs_available is False: + pytest.skip('GTKUI dependencies not available') + + # MainWindow loads this config file, so lets make sure it contains the defaults + ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) + self.mainwindow = MainWindow() + self.torrentview = TorrentView() + self.torrentdetails = TorrentDetails() + self.menubar = MenuBar() + + def tear_down(self): + return component.shutdown() + + def test_torrentview_columns(self): + assert self.torrentview.column_index == self.default_column_index + assert self.torrentview.liststore_columns == self.default_liststore_columns + assert self.torrentview.columns['Download Folder'].column_indices == [30] + + def test_add_column(self): + # Add a text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 1 + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col + assert self.torrentview.columns[test_col].column_indices == [33] + + def test_add_columns(self): + # Add a text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + + # Add a second text column + test_col2 = 'Test column2' + self.torrentview.add_text_column(test_col2, status_field=['label2']) + + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 2 + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 2 + # test_col + assert self.torrentview.column_index[-2] == test_col + assert self.torrentview.columns[test_col].column_indices == [33] + + # test_col2 + assert self.torrentview.column_index[-1] == test_col2 + assert self.torrentview.columns[test_col2].column_indices == [34] + + def test_remove_column(self): + # Add and remove text column + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + self.torrentview.remove_column(test_col) + + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] + + def test_remove_columns(self): + # Add two columns + test_col = 'Test column' + self.torrentview.add_text_column(test_col, status_field=['label']) + test_col2 = 'Test column2' + self.torrentview.add_text_column(test_col2, status_field=['label2']) + + # Remove test_col + self.torrentview.remove_column(test_col) + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 1 + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col2 + assert self.torrentview.columns[test_col2].column_indices == [33] + + # Remove test_col2 + self.torrentview.remove_column(test_col2) + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] + + def test_add_remove_column_multiple_types(self): + # Add a column with multiple column types + test_col3 = 'Test column3' + self.torrentview.add_progress_column( + test_col3, status_field=['progress', 'label3'], col_types=[float, str] + ) + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 2 + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col3 + assert self.torrentview.columns[test_col3].column_indices == [33, 34] + + # Remove multiple column-types column + self.torrentview.remove_column(test_col3) + + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns + ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py new file mode 100644 index 0000000..57cc138 --- /dev/null +++ b/deluge/tests/test_tracker_icons.py @@ -0,0 +1,71 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import os.path + +import pytest + +import deluge.component as component +import deluge.ui.tracker_icons +from deluge.conftest import BaseTestCase +from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons + +from . import common + +common.disable_new_release_check() + + +@pytest.mark.internet +class TestTrackerIcons(BaseTestCase): + def set_up(self): + # Disable resizing with Pillow for consistency. + self.patch(deluge.ui.tracker_icons, 'Image', None) + self.icons = TrackerIcons() + + def tear_down(self): + return component.shutdown() + + async def test_get_deluge_png(self, mock_mkstemp): + # Deluge has a png favicon link + icon = TrackerIcon(common.get_test_data_file('deluge.png')) + result = await self.icons.fetch('deluge-torrent.org') + assert result == icon + assert not os.path.isfile(mock_mkstemp[1]) + + async def test_get_google_ico(self): + # Google doesn't have any icon links + # So instead we'll grab its favicon.ico + icon = TrackerIcon(common.get_test_data_file('google.ico')) + result = await self.icons.fetch('www.google.com') + assert result == icon + + async def test_get_google_ico_hebrew(self): + """Test that Google.co.il page is read as UTF-8""" + icon = TrackerIcon(common.get_test_data_file('google.ico')) + result = await self.icons.fetch('www.google.co.il') + assert result == icon + + async def test_get_google_ico_with_redirect(self): + # google.com redirects to www.google.com + icon = TrackerIcon(common.get_test_data_file('google.ico')) + result = await self.icons.fetch('google.com') + assert result == icon + + @pytest.mark.skip(reason='Site removed favicon, new SNI test will be needed') + async def test_get_seo_svg_with_sni(self): + # seo using certificates with SNI support only + icon = TrackerIcon(common.get_test_data_file('seo.svg')) + result = await self.icons.fetch('www.seo.com') + assert result == icon + + async def test_get_empty_string_tracker(self): + result = await self.icons.fetch('') + assert result is None + + async def test_invalid_host(self, mock_mkstemp): + """Test that TrackerIcon can handle invalid hostname""" + result = await self.icons.fetch('deluge.example.com') + assert not result + assert not os.path.isfile(mock_mkstemp[1]) diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py new file mode 100644 index 0000000..92e349b --- /dev/null +++ b/deluge/tests/test_transfer.py @@ -0,0 +1,398 @@ +# +# Copyright (C) 2012 Bro <bro.development@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import base64 + +import pytest +import rencode + +import deluge.log +from deluge.transfer import DelugeTransferProtocol + +deluge.log.setup_logger('none') + + +class TransferTestClass(DelugeTransferProtocol): + def __init__(self): + DelugeTransferProtocol.__init__(self) + self.transport = self + self.messages_out = [] + self.messages_in = [] + self.packet_count = 0 + + def write(self, message): + """ + Called by DelugeTransferProtocol class + This simulates the write method of the self.transport in DelugeTransferProtocol. + """ + self.messages_out.append(message) + + def message_received(self, message): + """ + This method overrides message_received is DelugeTransferProtocol and is + called with the complete message as it was sent by DelugeRPCProtocol + """ + self.messages_in.append(message) + + def get_messages_out_joined(self): + return b''.join(self.messages_out) + + def get_messages_in(self): + return self.messages_in + + def data_received_old_protocol(self, data): + """ + This is the original method logic (as close as possible) for handling data receival on the client + + :param data: a zlib compressed string encoded with rencode. + + """ + import zlib + + print('\n=== New Data Received ===\nBytes received:', len(data)) + + if self._buffer: + # We have some data from the last dataReceived() so lets prepend it + print('Current buffer:', len(self._buffer) if self._buffer else '0') + data = self._buffer + data + self._buffer = None + + self.packet_count += 1 + self._bytes_received += len(data) + + while data: + print('\n-- Handle packet data --') + + print('Bytes received:', self._bytes_received) + print('Current data:', len(data)) + + if self._message_length == 0: + # handle_new_message uses _buffer so set data to _buffer. + self._buffer = data + self._handle_new_message() + data = self._buffer + self._buffer = None + self.packet_count = 1 + print('New message of length:', self._message_length) + + dobj = zlib.decompressobj() + try: + request = rencode.loads(dobj.decompress(data)) + print('Successfully loaded message', end=' ') + print( + ' - Buffer length: %d, data length: %d, unused length: %d' + % ( + len(data), + len(data) - len(dobj.unused_data), + len(dobj.unused_data), + ) + ) + print('Packet count:', self.packet_count) + except Exception as ex: + # log.debug('Received possible invalid message (%r): %s', data, e) + # This could be cut-off data, so we'll save this in the buffer + # and try to prepend it on the next dataReceived() + self._buffer = data + print( + 'Failed to load buffer (size %d): %s' % (len(self._buffer), str(ex)) + ) + return + else: + data = dobj.unused_data + self._message_length = 0 + + self.message_received(request) + + +class TestDelugeTransferProtocol: + @pytest.fixture(autouse=True) + def set_up(self): + """ + The expected messages corresponds to the test messages (msg1, msg2) after they've been processed + by DelugeTransferProtocol.send, which means that they've first been encoded with rencode, + and then compressed with zlib. + The expected messages are encoded in base64 to easily including it here in the source. + So before comparing the results with the expected messages, the expected messages must be decoded, + or the result message be encoded in base64. + + """ + self.transfer = TransferTestClass() + self.msg1 = ( + 0, + 1, + {'key_int': 1242429423}, + {'key_str': b'some string'}, + {'key_bool': True}, + ) + self.msg2 = ( + 2, + 3, + {'key_float': 12424.29423}, + {'key_unicode': 'some string'}, + {'key_dict_with_tuple': {'key_tuple': (1, 2, 3)}}, + {'keylist': [4, '5', 6.7]}, + ) + + self.msg1_expected_compressed_base64 = ( + b'AQAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU' + b'XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ' + ) + self.msg2_expected_compressed_base64 = ( + b'AQAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3il+ZlJuenpH' + b'YX5+emKhSXFGXmpadPBkmkZCaXxJdnlmTEl5QW5KRCdIOZhxmB' + b'hrUDuTmZxSWHWRpNnRyupaUBAHYlJxI=' + ) + + def test_send_one_message(self): + """ + Send one message and test that it has been sent correctoly to the + method 'write' in self.transport. + + """ + self.transfer.transfer_message(self.msg1) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_out_joined() + base64_encoded = base64.b64encode(messages) + assert base64_encoded == self.msg1_expected_compressed_base64 + + def test_receive_one_message(self): + """ + Receive one message and test that it has been sent to the + method 'message_received'. + + """ + self.transfer.dataReceived( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(messages) + + def test_receive_old_message(self): + """ + Receive an old message (with no header) and verify that the data is discarded. + + """ + self.transfer.dataReceived(rencode.dumps(self.msg1)) + assert len(self.transfer.get_messages_in()) == 0 + assert self.transfer._message_length == 0 + assert len(self.transfer._buffer) == 0 + + def test_receive_two_concatenated_messages(self): + """ + This test simply concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data as one string. + + """ + two_concatenated = base64.b64decode( + self.msg1_expected_compressed_base64 + ) + base64.b64decode(self.msg2_expected_compressed_base64) + self.transfer.dataReceived(two_concatenated) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) + message2 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) + + def test_receive_three_messages_in_parts(self): + """ + This test concatenates three messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in multiple parts. + + """ + msg_bytes = ( + base64.b64decode(self.msg1_expected_compressed_base64) + + base64.b64decode(self.msg2_expected_compressed_base64) + + base64.b64decode(self.msg1_expected_compressed_base64) + ) + packet_size = 40 + + one_message_byte_count = len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + two_messages_byte_count = one_message_byte_count + len( + base64.b64decode(self.msg2_expected_compressed_base64) + ) + three_messages_byte_count = two_messages_byte_count + len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + + for d in self.receive_parts_helper(msg_bytes, packet_size): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + assert expected_msgs_received_count == len(self.transfer.get_messages_in()) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) + message2 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) + message3 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message3) + + # Remove underscore to enable test, or run the test directly: + def _test_rencode_fail_protocol(self): + """ + This test tries to test the protocol that relies on errors from rencode. + + """ + msg_bytes = ( + base64.b64decode(self.msg1_expected_compressed_base64) + + base64.b64decode(self.msg2_expected_compressed_base64) + + base64.b64decode(self.msg1_expected_compressed_base64) + ) + packet_size = 149 + + one_message_byte_count = len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + two_messages_byte_count = one_message_byte_count + len( + base64.b64decode(self.msg2_expected_compressed_base64) + ) + three_messages_byte_count = two_messages_byte_count + len( + base64.b64decode(self.msg1_expected_compressed_base64) + ) + + print() + + print( + 'Msg1 size:', + len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4, + ) + print( + 'Msg2 size:', + len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4, + ) + print( + 'Msg3 size:', + len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4, + ) + + print('one_message_byte_count:', one_message_byte_count) + print('two_messages_byte_count:', two_messages_byte_count) + print('three_messages_byte_count:', three_messages_byte_count) + + for d in self.receive_parts_helper( + msg_bytes, packet_size, self.transfer.data_received_old_protocol + ): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + if expected_msgs_received_count != len(self.transfer.get_messages_in()): + print( + 'Expected number of messages received is %d, but %d have been received.' + % ( + expected_msgs_received_count, + len(self.transfer.get_messages_in()), + ) + ) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) + message2 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) + message3 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message3) + + def test_receive_middle_of_header(self): + """ + This test concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in two parts. + The first part contains the first message, plus two bytes of the next message. + The next part contains the rest of the message. + + This is a special case, as DelugeTransferProtocol can't start parsing + a message until it has at least 5 bytes (the size of the header) to be able + to read and parse the size of the payload. + + """ + two_concatenated = base64.b64decode( + self.msg1_expected_compressed_base64 + ) + base64.b64decode(self.msg2_expected_compressed_base64) + first_len = len(base64.b64decode(self.msg1_expected_compressed_base64)) + + # Now found the entire first message, and half the header of the next message (2 bytes into the header) + self.transfer.dataReceived(two_concatenated[: first_len + 2]) + + # Should be 1 message in the list + assert 1 == len(self.transfer.get_messages_in()) + + # Send the rest + self.transfer.dataReceived(two_concatenated[first_len + 2 :]) + + # Should be 2 messages in the list + assert 2 == len(self.transfer.get_messages_in()) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) + message2 = self.transfer.get_messages_in().pop(0) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) + + # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon + # def test_simulate_big_transfer(self): + # filename = '../deluge.torrentlist' + # + # f = open(filename, 'r') + # data = f.read() + # message_to_send = eval(data) + # self.transfer.transfer_message(message_to_send) + # + # Get the data as sent to the network by DelugeTransferProtocol + # compressed_data = self.transfer.get_messages_out_joined() + # packet_size = 16000 # Or something smaller... + # + # for d in self.receive_parts_helper(compressed_data, packet_size): + # bytes_recv = self.transfer.get_bytes_recv() + # if bytes_recv < len(compressed_data): + # self.assertEqual(len(self.transfer.get_messages_in()), 0) + # else: + # self.assertEqual(len(self.transfer.get_messages_in()), 1) + # Get the data as received by DelugeTransferProtocol + # transfered_message = self.transfer.get_messages_in().pop(0) + # Test that the data structures are equal + # self.assertEqual(transfered_message, message_to_send) + # self.assertTrue(transfered_message == message_to_send) + # + # f.close() + # f = open('rencode.torrentlist', 'w') + # f.write(str(transfered_message)) + # f.close() + + def receive_parts_helper(self, data, packet_size, receive_func=None): + byte_count = len(data) + sent_bytes = 0 + while byte_count > 0: + to_receive = packet_size if byte_count > packet_size else byte_count + sent_bytes += to_receive + byte_count -= to_receive + if receive_func: + receive_func(data[:to_receive]) + else: + self.transfer.dataReceived(data[:to_receive]) + data = data[to_receive:] + yield diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py new file mode 100644 index 0000000..87a4a2c --- /dev/null +++ b/deluge/tests/test_ui_common.py @@ -0,0 +1,290 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from deluge.ui.common import TorrentInfo + +from . import common + + +class TestUICommon: + def test_hash_optional_single_file(self): + """Ensure single file with `ed2k` and `sha1` keys are not in filetree output.""" + filename = common.get_test_data_file('test.torrent') + files_tree = {'azcvsupdater_2.6.2.jar': (0, 307949, True)} + ti = TorrentInfo(filename, filetree=1) + assert ti.files_tree == files_tree + + files_tree2 = { + 'contents': { + 'azcvsupdater_2.6.2.jar': { + 'type': 'file', + 'index': 0, + 'length': 307949, + 'download': True, + } + } + } + ti = TorrentInfo(filename, filetree=2) + assert ti.files_tree == files_tree2 + + def test_hash_optional_multi_file(self): + """Ensure multi-file with `filehash` and `ed2k` are keys not in filetree output.""" + filename = common.get_test_data_file('filehash_field.torrent') + files_tree = { + 'torrent_filehash': { + 'tull.txt': (0, 54, True), + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (1, 54, True), + } + } + ti = TorrentInfo(filename, filetree=1) + assert ti.files_tree == files_tree + + files_tree2 = { + 'contents': { + 'torrent_filehash': { + 'type': 'dir', + 'contents': { + 'tull.txt': { + 'type': 'file', + 'path': 'torrent_filehash/tull.txt', + 'length': 54, + 'index': 0, + 'download': True, + }, + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': { + 'type': 'file', + 'path': 'torrent_filehash/還在一個人無聊嗎~還不趕緊上來聊天美.txt', + 'length': 54, + 'index': 1, + 'download': True, + }, + }, + 'length': 108, + 'download': True, + } + }, + 'type': 'dir', + } + ti = TorrentInfo(filename, filetree=2) + assert ti.files_tree == files_tree2 + + def test_hash_optional_md5sum(self): + # Ensure `md5sum` key is not included in filetree output + filename = common.get_test_data_file('md5sum.torrent') + files_tree = {'test': {'lol': (0, 4, True), 'rofl': (1, 5, True)}} + ti = TorrentInfo(filename, filetree=1) + assert ti.files_tree == files_tree + ti = TorrentInfo(filename, filetree=2) + files_tree2 = { + 'contents': { + 'test': { + 'type': 'dir', + 'contents': { + 'lol': { + 'type': 'file', + 'path': 'test/lol', + 'index': 0, + 'length': 4, + 'download': True, + }, + 'rofl': { + 'type': 'file', + 'path': 'test/rofl', + 'index': 1, + 'length': 5, + 'download': True, + }, + }, + 'length': 9, + 'download': True, + } + }, + 'type': 'dir', + } + assert ti.files_tree == files_tree2 + + def test_utf8_encoded_paths(self): + filename = common.get_test_data_file('test.torrent') + ti = TorrentInfo(filename) + assert 'azcvsupdater_2.6.2.jar' in ti.files_tree + + def test_utf8_encoded_paths2(self): + filename = common.get_test_data_file('unicode_filenames.torrent') + filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv' + filepath2 = ( + '\u041c\u0438\u0445\u0430\u0438\u043b \u0413\u043e' + '\u0440\u0431\u0430\u0447\u0451\u0432.mkv' + ) + filepath3 = "Alisher ibn G'iyosiddin Navoiy.mkv" + filepath4 = 'Ascii title.mkv' + filepath5 = '\u09b8\u09c1\u0995\u09c1\u09ae\u09be\u09b0 \u09b0\u09be\u09df.mkv' + + ti = TorrentInfo(filename) + files_tree = ti.files_tree['unicode_filenames'] + assert filepath1 in files_tree + assert filepath2 in files_tree + assert filepath3 in files_tree + assert filepath4 in files_tree + assert filepath5 in files_tree + + result_files = [ + { + 'download': True, + 'path': 'unicode_filenames/' + filepath3, + 'size': 126158658, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath4, + 'size': 189321363, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath2, + 'size': 106649699, + }, + { + 'download': True, + 'path': 'unicode_filenames/' + filepath5, + 'size': 21590269, + }, + {'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771}, + ] + + assert len(ti.files) == len(result_files) + + def test_directory_with_single_file(self): + filename = common.get_test_data_file('dir_with_single_file.torrent') + + ti = TorrentInfo(filename) + expected_file_tree = {'dir_with_single_file': {'single_file.txt': (0, 9, True)}} + assert ti.files_tree == expected_file_tree + + result_files = [ + { + 'path': 'dir_with_single_file/single_file.txt', + 'size': 9, + 'download': True, + } + ] + assert ti.files == result_files + + def test_bittorrent_v2_path(self): + filename = common.get_test_data_file('v2_test.torrent') + files_tree = { + 'torrent_test': { + 'small.txt': (0, 22, True), + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (1, 32, True), + } + } + ti = TorrentInfo(filename, filetree=1) + assert ti.files_tree == files_tree + + files_tree2 = { + 'contents': { + 'torrent_test': { + 'type': 'dir', + 'contents': { + 'small.txt': { + 'type': 'file', + 'path': 'torrent_test/small.txt', + 'length': 22, + 'index': 0, + 'download': True, + }, + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': { + 'type': 'file', + 'path': 'torrent_test/還在一個人無聊嗎~還不趕緊上來聊天美.txt', + 'length': 32, + 'index': 1, + 'download': True, + }, + }, + 'length': 54, + 'download': True, + } + }, + 'type': 'dir', + } + ti = TorrentInfo(filename, filetree=2) + assert ti.files_tree == files_tree2 + + def test_bittorrent_v2_hybrid_path(self): + filename = common.get_test_data_file('v2_hybrid.torrent') + files_tree = { + 'torrent_test': { + 'small.txt': (0, 22, True), + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': (2, 32, True), + '.pad': { + '16362': (1, 16362, True), + '16352': (3, 16352, True), + }, + } + } + ti = TorrentInfo(filename, filetree=1, force_bt_version=1) + assert ti.files_tree == files_tree + del files_tree['torrent_test']['.pad'] + files_tree['torrent_test']['還在一個人無聊嗎~還不趕緊上來聊天美.txt'] = (1, 32, True) + ti = TorrentInfo(filename, filetree=1, force_bt_version=2) + assert ti.files_tree == files_tree + + files_tree2 = { + 'contents': { + 'torrent_test': { + 'type': 'dir', + 'contents': { + 'small.txt': { + 'type': 'file', + 'path': 'torrent_test/small.txt', + 'length': 22, + 'index': 0, + 'download': True, + }, + '還在一個人無聊嗎~還不趕緊上來聊天美.txt': { + 'type': 'file', + 'path': 'torrent_test/還在一個人無聊嗎~還不趕緊上來聊天美.txt', + 'length': 32, + 'index': 2, + 'download': True, + }, + '.pad': { + 'type': 'dir', + 'contents': { + '16362': { + 'type': 'file', + 'path': 'torrent_test/.pad/16362', + 'length': 16362, + 'index': 1, + 'download': True, + }, + '16352': { + 'type': 'file', + 'path': 'torrent_test/.pad/16352', + 'length': 16352, + 'index': 3, + 'download': True, + }, + }, + 'length': 32714, + 'download': True, + }, + }, + 'length': 32768, + 'download': True, + } + }, + 'type': 'dir', + } + ti = TorrentInfo(filename, filetree=2, force_bt_version=1) + assert ti.files_tree == files_tree2 + torrent_test = files_tree2['contents']['torrent_test'] + torrent_test['length'] -= torrent_test['contents']['.pad']['length'] + del torrent_test['contents']['.pad'] + torrent_test['contents']['還在一個人無聊嗎~還不趕緊上來聊天美.txt']['index'] = 1 + ti = TorrentInfo(filename, filetree=2, force_bt_version=2) + assert ti.files_tree == files_tree2 diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py new file mode 100644 index 0000000..34398ee --- /dev/null +++ b/deluge/tests/test_ui_console.py @@ -0,0 +1,80 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import argparse + +import pytest + +from deluge.ui.console.cmdline.commands.add import Command +from deluge.ui.console.cmdline.commands.config import json_eval +from deluge.ui.console.widgets.fields import TextInput + + +class MockParent: + def __init__(self): + self.border_off_x = 1 + self.pane_width = 20 + self.encoding = 'utf8' + + +class TestUIConsoleField: + @pytest.fixture(autouse=True) + def set_up(self): + self.parent = MockParent() + + def test_text_input(self): + def move_func(self, r, c): + self._cursor_row = r + self._cursor_col = c + + t = TextInput( + self.parent, + 'name', + 'message', + move_func, + 20, + '/text/field/file/path', + complete=False, + ) + assert t + assert t.handle_read(33) + + +class TestUIConsoleCommands: + def test_add_move_completed(self): + completed_path = 'completed_path' + parser = argparse.ArgumentParser() + cmd = Command() + cmd.add_arguments(parser) + args = parser.parse_args(['torrent', '-m', completed_path]) + assert args.move_completed_path == completed_path + args = parser.parse_args(['torrent', '--move-path', completed_path]) + assert args.move_completed_path == completed_path + + def test_config_json_eval(self): + assert json_eval('/downloads') == '/downloads' + assert json_eval('/dir/with space') == '/dir/with space' + assert json_eval('c:\\\\downloads') == 'c:\\\\downloads' + assert json_eval('c:/downloads') == 'c:/downloads' + # Ensure newlines are split and only first setting is used. + assert json_eval('setting\nwithneline') == 'setting' + # Allow both parentheses and square brackets. + assert json_eval('(8000, 8001)') == [8000, 8001] + assert json_eval('[8000, 8001]') == [8000, 8001] + assert json_eval('["abc", "def"]') == ['abc', 'def'] + assert json_eval('{"foo": "bar"}') == {'foo': 'bar'} + assert json_eval('{"number": 1234}') == {'number': 1234} + # Hex string for peer_tos. + assert json_eval('0x00') == '0x00' + assert json_eval('1000') == 1000 + assert json_eval('-6') == -6 + assert json_eval('10.5') == 10.5 + assert json_eval('True') + assert not json_eval('false') + assert json_eval('none') is None + # Empty values to clear config key. + assert json_eval('[]') == [] + assert json_eval('') == '' diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py new file mode 100644 index 0000000..9a1330e --- /dev/null +++ b/deluge/tests/test_ui_entry.py @@ -0,0 +1,440 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import argparse +import sys +from io import StringIO +from unittest import mock + +import pytest +import pytest_twisted +from twisted.internet import defer + +import deluge +import deluge.component as component +import deluge.ui.console +import deluge.ui.console.cmdline.commands.quit +import deluge.ui.console.main +import deluge.ui.web.server +from deluge.common import get_localhost_auth, windows_check +from deluge.conftest import BaseTestCase +from deluge.ui import ui_entry +from deluge.ui.web.server import DelugeWeb + +from . import common +from .daemon_base import DaemonBase + +DEBUG_COMMAND = False + +sys_stdout = sys.stdout +# To catch output to stdout/stderr while running unit tests, we patch +# the file descriptors in sys and argparse._sys with StringFileDescriptor. +# Regular print statements from such tests will therefore write to the +# StringFileDescriptor object instead of the terminal. +# To print to terminal from the tests, use: print('Message...', file=sys_stdout) + + +class StringFileDescriptor: + """File descriptor that writes to string buffer""" + + def __init__(self, fd): + self.out = StringIO() + self.fd = fd + for a in ['encoding']: + setattr(self, a, getattr(sys_stdout, a)) + + def write(self, *data, **kwargs): + data_string = str(*data) + print(data_string, file=self.out, end='') + + def flush(self): + self.out.flush() + + +class UIBaseTestCase: + def set_up(self): + common.setup_test_logger(level='info', prefix=self.config_dir / self.id()) + return component.start() + + def tear_down(self): + return component.shutdown() + + def exec_command(self): + if DEBUG_COMMAND: + print('Executing: %s\n' % sys.argv, file=sys_stdout) + return self.var['start_cmd']() + + +class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase): + """Subclass for test that require a deluged daemon""" + + def set_up(self): + d = self.common_set_up() + common.setup_test_logger(level='info', prefix=self.config_dir / self.id()) + return d + + +class TestDelugeEntry(BaseTestCase): + def set_up(self): + return component.start() + + def tear_down(self): + return component.shutdown() + + def test_deluge_help(self): + self.patch(sys, 'argv', ['./deluge', '-h']) + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + with pytest.raises(SystemExit): + ui_entry.start_ui() + assert 'usage: deluge' in fd.out.getvalue() + assert 'UI Options:' in fd.out.getvalue() + assert '* console' in fd.out.getvalue() + + def test_start_default(self): + self.patch(sys, 'argv', ['./deluge']) + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + ui_entry.start_ui() + + def test_start_with_log_level(self): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', ['./deluge', '-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + ui_entry.start_ui() + + assert _level[0] == 'info' + + +class GtkUIBaseTestCase(UIBaseTestCase): + """Implement all GtkUI tests here""" + + def test_start_gtk3ui(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + + from deluge.ui.gtk3 import gtkui + + with mock.patch.object(gtkui.GtkUI, 'start', autospec=True): + self.exec_command() + + +@pytest.mark.gtkui +class TestGtkUIDelugeScriptEntry(BaseTestCase, GtkUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge gtk', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'gtk'], + } + + +@pytest.mark.gtkui +class TestGtkUIScriptEntry(BaseTestCase, GtkUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + from deluge.ui import gtk3 + + request.cls.var = { + 'cmd_name': 'deluge-gtk', + 'start_cmd': gtk3.start, + 'sys_arg_cmd': ['./deluge-gtk'], + } + + +class DelugeWebMock(DelugeWeb): + def __init__(self, *args, **kwargs): + kwargs['daemon'] = False + DelugeWeb.__init__(self, *args, **kwargs) + + +class WebUIBaseTestCase(UIBaseTestCase): + """Implement all WebUI tests here""" + + def test_start_webserver(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock) + self.exec_command() + + def test_start_web_with_log_level(self): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'web' + config.save() + + self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock) + self.exec_command() + assert _level[0] == 'info' + + +class TestWebUIScriptEntry(BaseTestCase, WebUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-web', + 'start_cmd': deluge.ui.web.start, + 'sys_arg_cmd': ['./deluge-web'], + } + if not windows_check(): + request.cls.var['sys_arg_cmd'].append('--do-not-daemonize') + + +class TestWebUIDelugeScriptEntry(BaseTestCase, WebUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge web', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'web'], + } + if not windows_check(): + request.cls.var['sys_arg_cmd'].append('--do-not-daemonize') + + +class ConsoleUIBaseTestCase(UIBaseTestCase): + """Implement Console tests that do not require a running daemon""" + + def test_start_console(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd']) + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.exec_command() + + def test_start_console_with_log_level(self, request): + _level = [] + + def setup_logger( + level='error', + filename=None, + filemode='w', + logrotate=None, + output_stream=sys.stdout, + ): + _level.append(level) + + self.patch(deluge.log, 'setup_logger', setup_logger) + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-L', 'info']) + + config = deluge.configmanager.ConfigManager('ui.conf', ui_entry.DEFAULT_PREFS) + config.config['default_ui'] = 'console' + config.save() + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + # Just test that no exception is raised + self.exec_command() + + assert _level[0] == 'info' + + def test_console_help(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + with pytest.raises(SystemExit): + self.exec_command() + std_output = fd.out.getvalue() + assert ( + 'usage: %s' % self.var['cmd_name'] + ) in std_output # Check command name + assert 'Common Options:' in std_output + assert 'Console Options:' in std_output + assert ( + 'Console Commands:\n The following console commands are available:' + in std_output + ) + assert 'The following console commands are available:' in std_output + + def test_console_command_info(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + self.exec_command() + + def test_console_command_info_help(self): + self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info', '-h']) + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stdout', fd) + + with mock.patch('deluge.ui.console.main.ConsoleUI'): + with pytest.raises(SystemExit): + self.exec_command() + std_output = fd.out.getvalue() + assert 'usage: info' in std_output + assert 'Show information about the torrents' in std_output + + def test_console_unrecognized_arguments(self): + self.patch( + sys, 'argv', ['./deluge', '--ui', 'console'] + ) # --ui is not longer supported + fd = StringFileDescriptor(sys.stdout) + self.patch(argparse._sys, 'stderr', fd) + with mock.patch('deluge.ui.console.main.ConsoleUI'): + with pytest.raises(SystemExit): + self.exec_command() + assert 'unrecognized arguments: --ui' in fd.out.getvalue() + + +class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): + """Implement Console tests that require a running daemon""" + + def set_up(self): + # Avoid calling reactor.shutdown after commands are executed by main.exec_args() + deluge.ui.console.main.reactor = common.ReactorOverride() + return UIWithDaemonBaseTestCase.set_up(self) + + def patch_arg_command(self, command): + if type(command) == str: + command = [command] + username, password = get_localhost_auth() + self.patch( + sys, + 'argv', + self.var['sys_arg_cmd'] + + ['--port'] + + [str(self.listen_port)] + + ['--username'] + + [username] + + ['--password'] + + [password] + + command, + ) + + @pytest_twisted.inlineCallbacks + def test_console_command_add(self): + filename = common.get_test_data_file('test.torrent') + self.patch_arg_command([f'add "{filename}"']) + fd = StringFileDescriptor(sys.stdout) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + + std_output = fd.out.getvalue() + assert ( + std_output + == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n' + ) + + @pytest_twisted.inlineCallbacks + def test_console_command_add_move_completed(self): + filename = common.get_test_data_file('test.torrent') + tmp_path = 'c:\\tmp' if windows_check() else '/tmp' + self.patch_arg_command( + [ + f'add --move-path "{tmp_path}" "{filename}" ; status' + ' ; manage' + ' ab570cdd5a17ea1b61e970bb72047de141bce173' + ' move_completed' + ' move_completed_path' + ] + ) + fd = StringFileDescriptor(sys.stdout) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + + std_output = fd.out.getvalue() + assert std_output.endswith( + f'move_completed: True\nmove_completed_path: {tmp_path}\n' + ) or std_output.endswith( + f'move_completed_path: {tmp_path}\nmove_completed: True\n' + ) + + async def test_console_command_status(self): + fd = StringFileDescriptor(sys.stdout) + self.patch_arg_command(['status']) + self.patch(sys, 'stdout', fd) + + await self.exec_command() + + std_output = fd.out.getvalue() + assert std_output.startswith('Total upload: ') + assert std_output.endswith(' Moving: 0\n') + + @defer.inlineCallbacks + def test_console_command_config_set_download_location(self): + fd = StringFileDescriptor(sys.stdout) + self.patch_arg_command(['config --set download_location /downloads']) + self.patch(sys, 'stdout', fd) + + yield self.exec_command() + std_output = fd.out.getvalue() + assert std_output.startswith('Setting "download_location" to: \'/downloads\'') + assert std_output.endswith('Configuration value successfully updated.\n') + + +@pytest.mark.usefixtures('daemon', 'client') +class TestConsoleScriptEntryWithDaemon(BaseTestCase, ConsoleUIWithDaemonBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-console', + 'start_cmd': deluge.ui.console.test_start, + 'sys_arg_cmd': ['./deluge-console'], + } + + +class TestConsoleScriptEntry(BaseTestCase, ConsoleUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-console', + 'start_cmd': deluge.ui.console.start, + 'sys_arg_cmd': ['./deluge-console'], + } + + +class TestConsoleDelugeScriptEntry(BaseTestCase, ConsoleUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge console', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'console'], + } diff --git a/deluge/tests/test_ui_gtk3.py b/deluge/tests/test_ui_gtk3.py new file mode 100644 index 0000000..e6d025c --- /dev/null +++ b/deluge/tests/test_ui_gtk3.py @@ -0,0 +1,30 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import sys +from unittest import mock + +import pytest + + +@pytest.mark.gtkui +class TestGTK3Common: + def setUp(self): + sys.modules['gi.repository'] = mock.MagicMock() + + def tearDown(self): + pass + + def test_cmp(self): + from deluge.ui.gtk3.common import cmp + + assert cmp(None, None) == 0 + assert cmp(1, None) == 1 + assert cmp(0, None) == 1 + assert cmp(None, 7) == -1 + assert cmp(None, 'bar') == -1 + assert cmp('foo', None) == 1 + assert cmp('', None) == 1 diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py new file mode 100644 index 0000000..814fecf --- /dev/null +++ b/deluge/tests/test_web_api.py @@ -0,0 +1,202 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import json +from io import BytesIO + +import pytest +import pytest_twisted +from twisted.internet import defer, reactor +from twisted.web.client import Agent, FileBodyProducer +from twisted.web.http_headers import Headers +from twisted.web.static import File + +import deluge.component as component + +from . import common +from .common_web import WebServerTestBase + +common.disable_new_release_check() + + +class TestWebAPI(WebServerTestBase): + @pytest.mark.xfail(reason='This just logs an error at the moment.') + async def test_connect_invalid_host(self): + with pytest.raises(Exception): + await self.deluge_web.web_api.connect('id') + + def test_connect(self, client): + d = self.deluge_web.web_api.connect(self.host_id) + + def on_connect(result): + assert type(result) == tuple + assert len(result) > 0 + return result + + d.addCallback(on_connect) + d.addErrback(self.fail) + return d + + def test_disconnect(self): + d = self.deluge_web.web_api.connect(self.host_id) + + @defer.inlineCallbacks + def on_connect(result): + assert self.deluge_web.web_api.connected() + yield self.deluge_web.web_api.disconnect() + assert not self.deluge_web.web_api.connected() + + d.addCallback(on_connect) + d.addErrback(self.fail) + return d + + def test_get_config(self): + config = self.deluge_web.web_api.get_config() + assert self.deluge_web.port == config['port'] + + def test_set_config(self): + config = self.deluge_web.web_api.get_config() + config['pwd_salt'] = 'new_salt' + config['pwd_sha1'] = 'new_sha' + config['sessions'] = { + '233f23632af0a74748bc5dd1d8717564748877baa16420e6898e17e8aa365e6e': { + 'login': 'skrot', + 'expires': 1460030877.0, + 'level': 10, + } + } + self.deluge_web.web_api.set_config(config) + web_config = component.get('DelugeWeb').config.config + assert config['pwd_salt'] != web_config['pwd_salt'] + assert config['pwd_sha1'] != web_config['pwd_sha1'] + assert config['sessions'] != web_config['sessions'] + + @defer.inlineCallbacks + def get_host_status(self): + host = list(self.deluge_web.web_api._get_host(self.host_id)) + host[3] = 'Online' + host[4] = '2.0.0.dev562' + status = yield self.deluge_web.web_api.get_host_status(self.host_id) + assert status == tuple(status) + + def test_get_host(self): + assert not self.deluge_web.web_api._get_host('invalid_id') + conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0]) + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] + + def test_add_host(self): + conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123'] + assert not self.deluge_web.web_api._get_host(conn[0]) + # Add valid host + result, host_id = self.deluge_web.web_api.add_host( + conn[1], conn[2], conn[3], conn[4] + ) + assert result + conn[0] = host_id + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] + + # Add already existing host + ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) + assert ret == (False, 'Host details already in hostlist') + + # Add invalid port + conn[2] = 'bad port' + ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) + assert ret == (False, 'Invalid port. Must be an integer') + + def test_remove_host(self): + conn = ['connection_id', '', 0, '', ''] + self.deluge_web.web_api.hostlist.config['hosts'].append(conn) + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] + # Remove valid host + assert self.deluge_web.web_api.remove_host(conn[0]) + assert not self.deluge_web.web_api._get_host(conn[0]) + # Remove non-existing host + assert not self.deluge_web.web_api.remove_host(conn[0]) + + def test_get_torrent_info(self): + filename = common.get_test_data_file('test.torrent') + ret = self.deluge_web.web_api.get_torrent_info(filename) + assert ret['name'] == 'azcvsupdater_2.6.2.jar' + assert ret['info_hash'] == 'ab570cdd5a17ea1b61e970bb72047de141bce173' + assert 'files_tree' in ret + + def test_get_torrent_info_with_md5(self): + filename = common.get_test_data_file('md5sum.torrent') + ret = self.deluge_web.web_api.get_torrent_info(filename) + # JSON dumping happens during response creation in normal usage + # JSON serialization may fail if any of the dictionary items are byte arrays rather than strings + ret = json.loads(json.dumps(ret)) + assert ret['name'] == 'test' + assert ret['info_hash'] == 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5' + assert 'files_tree' in ret + + def test_get_magnet_info(self): + ret = self.deluge_web.web_api.get_magnet_info( + 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN' + ) + assert ret['name'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert ret['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert 'files_tree' in ret + + @pytest_twisted.inlineCallbacks + def test_get_torrent_files(self): + yield self.deluge_web.web_api.connect(self.host_id) + filename = common.get_test_data_file('test.torrent') + torrents = [ + {'path': filename, 'options': {'download_location': '/home/deluge/'}} + ] + yield self.deluge_web.web_api.add_torrents(torrents) + ret = yield self.deluge_web.web_api.get_torrent_files( + 'ab570cdd5a17ea1b61e970bb72047de141bce173' + ) + assert ret['type'] == 'dir' + assert ret['contents'] == { + 'azcvsupdater_2.6.2.jar': { + 'priority': 4, + 'index': 0, + 'offset': 0, + 'progress': 0.0, + 'path': 'azcvsupdater_2.6.2.jar', + 'type': 'file', + 'size': 307949, + } + } + + @pytest_twisted.inlineCallbacks + def test_download_torrent_from_url(self): + filename = 'ubuntu-9.04-desktop-i386.iso.torrent' + self.deluge_web.top_level.putChild( + filename.encode(), File(common.get_test_data_file(filename)) + ) + url = 'http://localhost:%d/%s' % (self.deluge_web.port, filename) + res = yield self.deluge_web.web_api.download_torrent_from_url(url) + assert res.endswith(filename) + + @pytest_twisted.inlineCallbacks + def test_invalid_json(self): + """ + If json_api._send_response does not return server.NOT_DONE_YET + this error is thrown when json is invalid: + exceptions.RuntimeError: Request.write called on a request after Request.finish was called. + + """ + agent = Agent(reactor) + bad_body = b'{ method": "auth.login" }' + d = yield agent.request( + b'POST', + b'http://127.0.0.1:%i/json' % self.deluge_web.port, + Headers( + { + b'User-Agent': [b'Twisted Web Client Example'], + b'Content-Type': [b'application/json'], + } + ), + FileBodyProducer(BytesIO(bad_body)), + ) + yield d diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py new file mode 100644 index 0000000..39d66c1 --- /dev/null +++ b/deluge/tests/test_web_auth.py @@ -0,0 +1,33 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +from unittest.mock import patch + +from deluge.ui.web import auth + + +class MockConfig: + def __init__(self, config): + self.config = config + + def __getitem__(self, key): + return self.config[key] + + def __setitem__(self, key, value): + self.config[key] = value + + +class TestWebAuth: + @patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None) + def test_change_password(self, mock_json): + config = MockConfig( + { + 'pwd_sha1': '8d8ff487626141d2b91025901d3ab57211180b48', + 'pwd_salt': '7555d757710158655bd1646e207dee21a89e9226', + } + ) + webauth = auth.Auth(config) + assert webauth.change_password('deluge', 'deluge_new') diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py new file mode 100644 index 0000000..9503f50 --- /dev/null +++ b/deluge/tests/test_webserver.py @@ -0,0 +1,108 @@ +# +# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import json as json_lib +from io import BytesIO + +import pytest +import twisted.web.client +from twisted.internet import reactor +from twisted.web.client import Agent, FileBodyProducer +from twisted.web.http_headers import Headers + +from . import common +from .common import get_test_data_file +from .common_web import WebServerMockBase, WebServerTestBase + +common.disable_new_release_check() + + +class TestWebServer(WebServerTestBase, WebServerMockBase): + async def test_get_torrent_info(self): + agent = Agent(reactor) + + self.mock_authentication_ignore(self.deluge_web.auth) + + # This torrent file contains an uncommon field 'filehash' which must be hex + # encoded to allow dumping the torrent info to json. Otherwise it will fail with: + # UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte + filename = get_test_data_file('filehash_field.torrent') + input_file = ( + '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' + % filename.replace('\\', '\\\\') + ) + headers = { + b'User-Agent': ['Twisted Web Client Example'], + b'Content-Type': ['application/json'], + } + url = 'http://127.0.0.1:%s/json' % self.deluge_web.port + + response = await agent.request( + b'POST', + url.encode(), + Headers(headers), + FileBodyProducer(BytesIO(input_file.encode())), + ) + body = await twisted.web.client.readBody(response) + + try: + json = json_lib.loads(body.decode()) + except Exception: + print('aoeu') + assert json['error'] is None + assert 'torrent_filehash' == json['result']['name'] + + @pytest.mark.parametrize('base', ['', '/', 'deluge']) + async def test_base_with_config(self, base): + agent = Agent(reactor) + root_url = f'http://127.0.0.1:{self.deluge_web.port}' + base_url = f'{root_url}/{base}' + + self.deluge_web.base = base + + response = await agent.request(b'GET', root_url.encode()) + assert response.code == 200 + body = await twisted.web.client.readBody(response) + assert 'Deluge WebUI' in body.decode() + + response = await agent.request(b'GET', base_url.encode()) + assert response.code == 200 + + @pytest.mark.parametrize('base', ['/', 'deluge']) + async def test_base_with_config_recurring_basepath(self, base): + agent = Agent(reactor) + base_url = f'http://127.0.0.1:{self.deluge_web.port}/{base}' + + self.deluge_web.base = base + + response = await agent.request(b'GET', base_url.encode()) + assert response.code == 200 + + recursive_url = f'{base_url}/{base}' + response = await agent.request(b'GET', recursive_url.encode()) + assert response.code == 404 if base.strip('/') else 200 + + recursive_url = f'{recursive_url}/{base}' + response = await agent.request(b'GET', recursive_url.encode()) + assert response.code == 404 if base.strip('/') else 200 + + async def test_base_with_deluge_header(self): + """Ensure base path is set and HTML contains path""" + agent = Agent(reactor) + base = 'deluge' + url = f'http://127.0.0.1:{self.deluge_web.port}' + headers = Headers({'X-Deluge-Base': [base]}) + + response = await agent.request(b'GET', url.encode(), headers) + body = await twisted.web.client.readBody(response) + assert f'href="/{base}/' in body.decode() + + # Header only changes HTML base path so ensure no resource at server path + url = f'{url}/{base}' + response = await agent.request(b'GET', url.encode(), headers) + assert response.code == 404 |