summaryrefslogtreecommitdiffstats
path: root/deluge/tests/test_core.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests/test_core.py')
-rw-r--r--deluge/tests/test_core.py511
1 files changed, 511 insertions, 0 deletions
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
new file mode 100644
index 0000000..28b5902
--- /dev/null
+++ b/deluge/tests/test_core.py
@@ -0,0 +1,511 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import base64
+import os
+from base64 import b64encode
+from hashlib import sha1 as sha
+
+import pytest
+import pytest_twisted
+from twisted.internet import defer, reactor, task
+from twisted.internet.error import CannotListenError
+from twisted.web.http import FORBIDDEN
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory, Site
+from twisted.web.static import File
+
+import deluge.common
+import deluge.component as component
+import deluge.core.torrent
+from deluge._libtorrent import lt
+from deluge.conftest import BaseTestCase
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer
+from deluge.error import AddTorrentError, InvalidTorrentError
+
+from . import common
+
+common.disable_new_release_check()
+
+
+class CookieResource(Resource):
+ def render(self, request):
+ if request.getCookie(b'password') != b'deluge':
+ request.setResponseCode(FORBIDDEN)
+ return
+
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ return data
+
+
+class PartialDownload(Resource):
+ def getChild(self, path, request): # NOQA: N802
+ return EncodingResourceWrapper(self, [GzipEncoderFactory()])
+
+ def render(self, request):
+ with open(
+ common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
+ ) as _file:
+ data = _file.read()
+ request.setHeader(b'Content-Length', str(len(data)))
+ request.setHeader(b'Content-Type', b'application/x-bittorrent')
+ return data
+
+
+class RedirectResource(Resource):
+ def render(self, request):
+ request.redirect(b'/ubuntu-9.04-desktop-i386.iso.torrent')
+ return b''
+
+
+class TopLevelResource(Resource):
+ def __init__(self):
+ Resource.__init__(self)
+ self.putChild(b'cookie', CookieResource())
+ self.putChild(b'partial', PartialDownload())
+ self.putChild(b'redirect', RedirectResource())
+ self.putChild(
+ b'ubuntu-9.04-desktop-i386.iso.torrent',
+ File(common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent')),
+ )
+
+
+class TestCore(BaseTestCase):
+ def set_up(self):
+ self.rpcserver = RPCServer(listen=False)
+ self.core: Core = Core()
+ self.core.config.config['lsd'] = False
+ self.clock = task.Clock()
+ self.core.torrentmanager.clock = self.clock
+ self.listen_port = 51242
+ return component.start().addCallback(self.start_web_server)
+
+ def start_web_server(self, result):
+ website = Site(TopLevelResource())
+ for dummy in range(10):
+ try:
+ self.webserver = reactor.listenTCP(self.listen_port, website)
+ except CannotListenError as ex:
+ error = ex
+ self.listen_port += 1
+ else:
+ break
+ else:
+ raise error
+
+ return result
+
+ def tear_down(self):
+ def on_shutdown(result):
+ del self.rpcserver
+ del self.core
+ return self.webserver.stopListening()
+
+ return component.shutdown().addCallback(on_shutdown)
+
+ def add_torrent(self, filename, paused=False):
+ if not paused:
+ # Patch libtorrent flags starting torrents paused
+ self.patch(
+ deluge.core.torrentmanager,
+ 'LT_DEFAULT_ADD_TORRENT_FLAGS',
+ lt.torrent_flags.auto_managed
+ | lt.torrent_flags.update_subscribe
+ | lt.torrent_flags.apply_ip_filter,
+ )
+ options = {'add_paused': paused, 'auto_managed': False}
+ filepath = common.get_test_data_file(filename)
+ with open(filepath, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = self.core.add_torrent_file(filename, filedump, options)
+ return torrent_id
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_files(self):
+ options = {}
+ filenames = ['test.torrent', 'test_torrent.file.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ assert len(errors) == 0
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_files_error_duplicate(self):
+ options = {}
+ filenames = ['test.torrent', 'test.torrent']
+ files_to_add = []
+ for f in filenames:
+ filename = common.get_test_data_file(f)
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ files_to_add.append((filename, filedump, options))
+ errors = yield self.core.add_torrent_files(files_to_add)
+ assert len(errors) == 1
+ assert str(errors[0]).startswith('Torrent already in session')
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_file(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ # Get the info hash from the test.torrent
+ from deluge.bencode import bdecode, bencode
+
+ with open(filename, 'rb') as _file:
+ info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest()
+ assert torrent_id == info_hash
+
+ def test_add_torrent_file_invalid_filedump(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with pytest.raises(AddTorrentError):
+ self.core.add_torrent_file(filename, False, options)
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_url(self, mock_mkstemp):
+ url = (
+ 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent'
+ % self.listen_port
+ )
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ torrent_id = yield self.core.add_torrent_url(url, options)
+ assert torrent_id == info_hash
+ assert not os.path.isfile(mock_mkstemp[1])
+
+ async def test_add_torrent_url_with_cookie(self):
+ url = 'http://localhost:%d/cookie' % self.listen_port
+ options = {}
+ headers = {'Cookie': 'password=deluge'}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ with pytest.raises(Exception):
+ await self.core.add_torrent_url(url, options)
+
+ result = await self.core.add_torrent_url(url, options, headers)
+ assert result == info_hash
+
+ async def test_add_torrent_url_with_redirect(self):
+ url = 'http://localhost:%d/redirect' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
+
+ async def test_add_torrent_url_with_partial_download(self):
+ url = 'http://localhost:%d/partial' % self.listen_port
+ options = {}
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
+
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_magnet(self):
+ info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
+ tracker = 'udp://tracker.example.com'
+ name = 'test magnet'
+ uri = deluge.common.create_magnet_uri(info_hash, name=name, trackers=[tracker])
+ options = {}
+ torrent_id = yield self.core.add_torrent_magnet(uri, options)
+ assert torrent_id == info_hash
+ torrent_status = self.core.get_torrent_status(torrent_id, ['name', 'trackers'])
+ assert torrent_status['trackers'][0]['url'] == tracker
+ assert torrent_status['name'] == name
+
+ def test_resume_torrent(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ # Assert paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ self.core.resume_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_resume_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent', paused=True)
+ self.core.resume_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert not result['paused']
+
+ def test_resume_torrents(self):
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_resume_torrents_all(self):
+ """With no torrent_ids param, resume all torrents"""
+ tid1 = self.add_torrent('test.torrent', paused=True)
+ tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
+ self.core.resume_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ def test_pause_torrent(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+ # Assert not paused
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert not r2['paused']
+
+ self.core.pause_torrent(tid2)
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert not r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ def test_pause_torrent_list(self):
+ """Backward compatibility for list of torrent_ids."""
+ torrent_id = self.add_torrent('test.torrent')
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert not result['paused']
+ self.core.pause_torrent([torrent_id])
+ result = self.core.get_torrent_status(torrent_id, ['paused'])
+ assert result['paused']
+
+ def test_pause_torrents(self):
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents([tid1, tid2])
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ def test_pause_torrents_all(self):
+ """With no torrent_ids param, pause all torrents"""
+ tid1 = self.add_torrent('test.torrent')
+ tid2 = self.add_torrent('test_torrent.file.torrent')
+
+ self.core.pause_torrents()
+ r1 = self.core.get_torrent_status(tid1, ['paused'])
+ assert r1['paused']
+ r2 = self.core.get_torrent_status(tid2, ['paused'])
+ assert r2['paused']
+
+ @pytest_twisted.inlineCallbacks
+ def test_prefetch_metadata_existing(self):
+ """Check another call with same magnet returns existing deferred."""
+ magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
+
+ d1 = self.core.prefetch_magnet_metadata(magnet)
+ d2 = self.core.prefetch_magnet_metadata(magnet)
+ dg = defer.gatherResults([d1, d2], consumeErrors=True)
+ self.clock.advance(30)
+ result = yield dg
+ assert result == [expected] * 2
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrent(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+ removed = self.core.remove_torrent(torrent_id, True)
+ assert removed
+ assert len(self.core.get_session_state()) == 0
+
+ def test_remove_torrent_invalid(self):
+ with pytest.raises(InvalidTorrentError):
+ self.core.remove_torrent(
+ 'torrentidthatdoesntexist',
+ True,
+ )
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrents(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
+
+ filename2 = common.get_test_data_file('unicode_filenames.torrent')
+ with open(filename2, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id2 = yield self.core.add_torrent_file_async(
+ filename2, filedump, options
+ )
+ d = self.core.remove_torrents([torrent_id, torrent_id2], True)
+
+ def test_ret(val):
+ assert val == []
+
+ d.addCallback(test_ret)
+
+ def test_session_state(val):
+ assert len(self.core.get_session_state()) == 0
+
+ d.addCallback(test_session_state)
+ yield d
+
+ @pytest_twisted.inlineCallbacks
+ def test_remove_torrents_invalid(self):
+ options = {}
+ filename = common.get_test_data_file('test.torrent')
+ with open(filename, 'rb') as _file:
+ filedump = b64encode(_file.read())
+ torrent_id = yield self.core.add_torrent_file_async(
+ filename, filedump, options
+ )
+ val = yield self.core.remove_torrents(
+ ['invalidid1', 'invalidid2', torrent_id], False
+ )
+ assert len(val) == 2
+ assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.')
+ assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.')
+
+ def test_get_session_status(self):
+ status = self.core.get_session_status(
+ ['net.recv_tracker_bytes', 'net.sent_tracker_bytes']
+ )
+ assert isinstance(status, dict)
+ assert status['net.recv_tracker_bytes'] == 0
+ assert status['net.sent_tracker_bytes'] == 0
+
+ def test_get_session_status_all(self):
+ status = self.core.get_session_status([])
+ assert isinstance(status, dict)
+ assert 'upload_rate' in status
+ assert 'net.recv_bytes' in status
+
+ def test_get_session_status_depr(self):
+ status = self.core.get_session_status(['num_peers', 'num_unchoked'])
+ assert isinstance(status, dict)
+ assert status['num_peers'] == 0
+ assert status['num_unchoked'] == 0
+
+ def test_get_session_status_rates(self):
+ status = self.core.get_session_status(['upload_rate', 'download_rate'])
+ assert isinstance(status, dict)
+ assert status['upload_rate'] == 0
+
+ def test_get_session_status_ratio(self):
+ status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio'])
+ assert isinstance(status, dict)
+ assert status['write_hit_ratio'] == 0.0
+ assert status['read_hit_ratio'] == 0.0
+
+ def test_get_free_space(self):
+ space = self.core.get_free_space('.')
+ assert isinstance(space, int)
+ assert space >= 0
+ assert self.core.get_free_space('/someinvalidpath') == -1
+
+ @pytest.mark.slow
+ def test_test_listen_port(self):
+ d = self.core.test_listen_port()
+
+ def result(r):
+ assert r in (True, False)
+
+ d.addCallback(result)
+ return d
+
+ def test_sanitize_filepath(self):
+ pathlist = {
+ '\\backslash\\path\\': 'backslash/path',
+ ' single_file ': 'single_file',
+ '..': '',
+ '/../..../': '',
+ ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file',
+ '/ test /\\.. /.file/': 'test/.file',
+ 'mytorrent/subfold/file1': 'mytorrent/subfold/file1',
+ 'Torrent/folder/': 'Torrent/folder',
+ }
+
+ for key in pathlist:
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=False)
+ == pathlist[key]
+ )
+
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=True)
+ == pathlist[key] + '/'
+ )
+
+ def test_get_set_config_values(self):
+ assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None}
+ assert self.core.get_config_value('foobar') is None
+ self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'})
+ assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'}
+ assert self.core.get_config_value('foobar') == 'barfoo'
+
+ def test_read_only_config_keys(self):
+ key = 'max_upload_speed'
+ self.core.read_only_config_keys = [key]
+
+ old_value = self.core.get_config_value(key)
+ self.core.set_config({key: old_value + 10})
+ new_value = self.core.get_config_value(key)
+ assert old_value == new_value
+
+ self.core.read_only_config_keys = None
+
+ def test__create_peer_id(self):
+ assert self.core._create_peer_id('2.0.0') == '-DE200s-'
+ assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-'
+ assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-'
+ assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-'
+ assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-'
+
+ @pytest.mark.parametrize(
+ 'path',
+ [
+ common.get_test_data_file('deluge.png'),
+ os.path.dirname(common.get_test_data_file('deluge.png')),
+ ],
+ )
+ @pytest.mark.parametrize('piece_length', [2**14, 2**16])
+ @pytest_twisted.inlineCallbacks
+ def test_create_torrent(self, path, tmp_path, piece_length):
+ target = tmp_path / 'test.torrent'
+
+ filename, filedump = yield self.core.create_torrent(
+ path=path,
+ tracker=None,
+ piece_length=piece_length,
+ target=target,
+ add_to_session=False,
+ )
+ filecontent = base64.b64decode(filedump)
+
+ with open(target, 'rb') as f:
+ assert f.read() == filecontent
+
+ lt.torrent_info(filecontent)