summaryrefslogtreecommitdiffstats
path: root/deluge/core/torrentmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core/torrentmanager.py')
-rw-r--r--deluge/core/torrentmanager.py175
1 files changed, 91 insertions, 84 deletions
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
index a7df501..5609df4 100644
--- a/deluge/core/torrentmanager.py
+++ b/deluge/core/torrentmanager.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
#
@@ -8,27 +7,33 @@
#
"""TorrentManager handles Torrent objects"""
-from __future__ import unicode_literals
-
import datetime
import logging
import operator
import os
+import pickle
import time
-from collections import namedtuple
+from base64 import b64encode
from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
-import six.moves.cPickle as pickle # noqa: N813
-from twisted.internet import defer, error, reactor, threads
+from twisted.internet import defer, reactor, threads
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
import deluge.component as component
-from deluge._libtorrent import lt
-from deluge.common import archive_files, decode_bytes, get_magnet_info, is_magnet
+from deluge._libtorrent import LT_VERSION, lt
+from deluge.common import (
+ VersionSplit,
+ archive_files,
+ decode_bytes,
+ get_magnet_info,
+ is_magnet,
+)
from deluge.configmanager import ConfigManager, get_config_dir
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
+from deluge.decorators import maybe_coroutine
from deluge.error import AddTorrentError, InvalidTorrentError
from deluge.event import (
ExternalIPEvent,
@@ -52,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = (
)
+class PrefetchQueueItem(NamedTuple):
+ alert_deferred: Deferred
+ result_queue: List[Deferred]
+
+
class TorrentState: # pylint: disable=old-style-class
"""Create a torrent state.
@@ -89,7 +99,7 @@ class TorrentState: # pylint: disable=old-style-class
super_seeding=False,
name=None,
):
- # Build the class atrribute list from args
+ # Build the class attribute list from args
for key, value in locals().items():
if key == 'self':
continue
@@ -129,7 +139,8 @@ class TorrentManager(component.Component):
"""
- callLater = reactor.callLater # noqa: N815
+ # This is used in the test to mock out timeouts
+ clock = reactor
def __init__(self):
component.Component.__init__(
@@ -158,7 +169,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {}
- self.prefetching_metadata = {}
+ self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@@ -243,8 +254,8 @@ class TorrentManager(component.Component):
self.save_resume_data_timer.start(190, False)
self.prev_status_cleanup_loop.start(10)
- @defer.inlineCallbacks
- def stop(self):
+ @maybe_coroutine
+ async def stop(self):
# Stop timers
if self.save_state_timer.running:
self.save_state_timer.stop()
@@ -256,11 +267,11 @@ class TorrentManager(component.Component):
self.prev_status_cleanup_loop.stop()
# Save state on shutdown
- yield self.save_state()
+ await self.save_state()
self.session.pause()
- result = yield self.save_resume_data(flush_disk_cache=True)
+ result = await self.save_resume_data(flush_disk_cache=True)
# Remove the temp_file to signify successfully saved state
if result and os.path.isfile(self.temp_file):
os.remove(self.temp_file)
@@ -274,11 +285,6 @@ class TorrentManager(component.Component):
'Paused',
'Queued',
):
- # If the global setting is set, but the per-torrent isn't...
- # Just skip to the next torrent.
- # This is so that a user can turn-off the stop at ratio option on a per-torrent basis
- if not torrent.options['stop_at_ratio']:
- continue
if (
torrent.get_ratio() >= torrent.options['stop_ratio']
and torrent.is_finished
@@ -286,7 +292,7 @@ class TorrentManager(component.Component):
if torrent.options['remove_at_ratio']:
self.remove(torrent_id)
break
- if not torrent.handle.status().paused:
+ if not torrent.status.paused:
torrent.pause()
def __getitem__(self, torrent_id):
@@ -339,26 +345,28 @@ class TorrentManager(component.Component):
else:
return torrent_info
- def prefetch_metadata(self, magnet, timeout):
- """Download the metadata for a magnet uri.
+ @maybe_coroutine
+ async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
+ """Download the metadata for a magnet URI.
Args:
- magnet (str): A magnet uri to download the metadata for.
- timeout (int): Number of seconds to wait before cancelling.
+ magnet: A magnet URI to download the metadata for.
+ timeout: Number of seconds to wait before canceling.
Returns:
- Deferred: A tuple of (torrent_id (str), metadata (dict))
+ A tuple of (torrent_id, metadata)
"""
torrent_id = get_magnet_info(magnet)['info_hash']
if torrent_id in self.prefetching_metadata:
- return self.prefetching_metadata[torrent_id].defer
+ d = Deferred()
+ self.prefetching_metadata[torrent_id].result_queue.append(d)
+ return await d
- add_torrent_params = {}
- add_torrent_params['save_path'] = gettempdir()
- add_torrent_params['url'] = magnet.strip().encode('utf8')
- add_torrent_params['flags'] = (
+ add_torrent_params = lt.parse_magnet_uri(magnet)
+ add_torrent_params.save_path = gettempdir()
+ add_torrent_params.flags = (
(
LT_DEFAULT_ADD_TORRENT_FLAGS
| lt.add_torrent_params_flags_t.flag_duplicate_is_error
@@ -372,33 +380,29 @@ class TorrentManager(component.Component):
d = Deferred()
# Cancel the defer if timeout reached.
- defer_timeout = self.callLater(timeout, d.cancel)
- d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout)
- Prefetch = namedtuple('Prefetch', 'defer handle')
- self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
- return d
+ d.addTimeout(timeout, self.clock)
+ self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
- def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
- # Cancel reactor.callLater.
try:
- defer_timeout.cancel()
- except error.AlreadyCalled:
- pass
-
- log.debug('remove prefetch magnet from session')
- try:
- torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
- except KeyError:
- pass
+ torrent_info = await d
+ except (defer.TimeoutError, defer.CancelledError):
+ log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
+ metadata = b''
else:
- self.session.remove_torrent(torrent_handle, 1)
-
- metadata = None
- if isinstance(torrent_info, lt.torrent_info):
log.debug('prefetch metadata received')
- metadata = lt.bdecode(torrent_info.metadata())
+ if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
+ metadata = torrent_info.metadata()
+ else:
+ metadata = torrent_info.info_section()
+
+ log.debug('remove prefetch magnet from session')
+ result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
+ self.session.remove_torrent(torrent_handle, 1)
+ result = torrent_id, b64encode(metadata)
- return torrent_id, metadata
+ for d in result_queue:
+ d.callback(result)
+ return result
def _build_torrent_options(self, options):
"""Load default options and update if needed."""
@@ -431,9 +435,10 @@ class TorrentManager(component.Component):
elif magnet:
magnet_info = get_magnet_info(magnet)
if magnet_info:
- add_torrent_params['url'] = magnet.strip().encode('utf8')
add_torrent_params['name'] = magnet_info['name']
+ add_torrent_params['trackers'] = list(magnet_info['trackers'])
torrent_id = magnet_info['info_hash']
+ add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id))
else:
raise AddTorrentError(
'Unable to add magnet, invalid magnet info: %s' % magnet
@@ -448,7 +453,7 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent.
- self.prefetching_metadata[torrent_id].defer.cancel()
+ self.prefetching_metadata[torrent_id].alert_deferred.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info:
@@ -509,7 +514,7 @@ class TorrentManager(component.Component):
save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
filedump (str, optional): bencoded filedump of a torrent file.
filename (str, optional): The filename of the torrent file.
- magnet (str, optional): The magnet uri.
+ magnet (str, optional): The magnet URI.
resume_data (lt.entry, optional): libtorrent fast resume data.
Returns:
@@ -574,7 +579,7 @@ class TorrentManager(component.Component):
save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
filedump (str, optional): bencoded filedump of a torrent file.
filename (str, optional): The filename of the torrent file.
- magnet (str, optional): The magnet uri.
+ magnet (str, optional): The magnet URI.
resume_data (lt.entry, optional): libtorrent fast resume data.
Returns:
@@ -642,7 +647,7 @@ class TorrentManager(component.Component):
# Resume AlertManager if paused for adding torrent to libtorrent.
component.resume('AlertManager')
- # Store the orignal resume_data, in case of errors.
+ # Store the original resume_data, in case of errors.
if resume_data:
self.resume_data[torrent.torrent_id] = resume_data
@@ -809,9 +814,9 @@ class TorrentManager(component.Component):
try:
with open(filepath, 'rb') as _file:
- state = pickle.load(_file)
- except (IOError, EOFError, pickle.UnpicklingError) as ex:
- message = 'Unable to load {}: {}'.format(filepath, ex)
+ state = pickle.load(_file, encoding='utf8')
+ except (OSError, EOFError, pickle.UnpicklingError) as ex:
+ message = f'Unable to load {filepath}: {ex}'
log.error(message)
if not filepath.endswith('.bak'):
self.archive_state(message)
@@ -1022,7 +1027,7 @@ class TorrentManager(component.Component):
)
def on_torrent_resume_save(dummy_result, torrent_id):
- """Recieved torrent resume_data alert so remove from waiting list"""
+ """Received torrent resume_data alert so remove from waiting list"""
self.waiting_on_resume_data.pop(torrent_id, None)
deferreds = []
@@ -1067,7 +1072,7 @@ class TorrentManager(component.Component):
try:
with open(_filepath, 'rb') as _file:
resume_data = lt.bdecode(_file.read())
- except (IOError, EOFError, RuntimeError) as ex:
+ except (OSError, EOFError, RuntimeError) as ex:
if self.torrents:
log.warning('Unable to load %s: %s', _filepath, ex)
resume_data = None
@@ -1240,7 +1245,7 @@ class TorrentManager(component.Component):
def on_alert_add_torrent(self, alert):
"""Alert handler for libtorrent add_torrent_alert"""
if not alert.handle.is_valid():
- log.warning('Torrent handle is invalid!')
+ log.warning('Torrent handle is invalid: %s', alert.error.message())
return
try:
@@ -1351,10 +1356,8 @@ class TorrentManager(component.Component):
torrent.set_tracker_status('Announce OK')
# Check for peer information from the tracker, if none then send a scrape request.
- if (
- alert.handle.status().num_complete == -1
- or alert.handle.status().num_incomplete == -1
- ):
+ torrent.get_lt_status()
+ if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1:
torrent.scrape_tracker()
def on_alert_tracker_announce(self, alert):
@@ -1389,7 +1392,18 @@ class TorrentManager(component.Component):
log.debug(
'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message
)
- torrent.set_tracker_status('Error: ' + error_message)
+ # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates
+ # we will need to verify that at least one endpoint to the errored tracker is working
+ for tracker in torrent.handle.trackers():
+ if tracker['url'] == alert.url:
+ if any(
+ endpoint['last_error']['value'] == 0
+ for endpoint in tracker['endpoints']
+ ):
+ torrent.set_tracker_status('Announce OK')
+ else:
+ torrent.set_tracker_status('Error: ' + error_message)
+ break
def on_alert_storage_moved(self, alert):
"""Alert handler for libtorrent storage_moved_alert"""
@@ -1463,7 +1477,9 @@ class TorrentManager(component.Component):
return
if torrent_id in self.torrents:
# libtorrent add_torrent expects bencoded resume_data.
- self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
+ self.resume_data[torrent_id] = lt.bencode(
+ lt.write_resume_data(alert.params)
+ )
if torrent_id in self.waiting_on_resume_data:
self.waiting_on_resume_data[torrent_id].callback(None)
@@ -1545,7 +1561,7 @@ class TorrentManager(component.Component):
# Try callback to prefetch_metadata method.
try:
- d = self.prefetching_metadata[torrent_id].defer
+ d = self.prefetching_metadata[torrent_id].alert_deferred
except KeyError:
pass
else:
@@ -1591,23 +1607,14 @@ class TorrentManager(component.Component):
except RuntimeError:
continue
if torrent_id in self.torrents:
- self.torrents[torrent_id].update_status(t_status)
+ self.torrents[torrent_id].status = t_status
self.handle_torrents_status_callback(self.torrents_status_requests.pop())
def on_alert_external_ip(self, alert):
- """Alert handler for libtorrent external_ip_alert
-
- Note:
- The alert.message IPv4 address format is:
- 'external IP received: 0.0.0.0'
- and IPv6 address format is:
- 'external IP received: 0:0:0:0:0:0:0:0'
- """
-
- external_ip = decode_bytes(alert.message()).split(' ')[-1]
- log.info('on_alert_external_ip: %s', external_ip)
- component.get('EventManager').emit(ExternalIPEvent(external_ip))
+ """Alert handler for libtorrent external_ip_alert"""
+ log.info('on_alert_external_ip: %s', alert.external_address)
+ component.get('EventManager').emit(ExternalIPEvent(alert.external_address))
def on_alert_performance(self, alert):
"""Alert handler for libtorrent performance_alert"""