diff options
Diffstat (limited to 'deluge/core/torrentmanager.py')
-rw-r--r-- | deluge/core/torrentmanager.py | 175 |
1 files changed, 91 insertions, 84 deletions
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index a7df501..5609df4 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> # @@ -8,27 +7,33 @@ # """TorrentManager handles Torrent objects""" -from __future__ import unicode_literals - import datetime import logging import operator import os +import pickle import time -from collections import namedtuple +from base64 import b64encode from tempfile import gettempdir +from typing import Dict, List, NamedTuple, Tuple -import six.moves.cPickle as pickle # noqa: N813 -from twisted.internet import defer, error, reactor, threads +from twisted.internet import defer, reactor, threads from twisted.internet.defer import Deferred, DeferredList from twisted.internet.task import LoopingCall import deluge.component as component -from deluge._libtorrent import lt -from deluge.common import archive_files, decode_bytes, get_magnet_info, is_magnet +from deluge._libtorrent import LT_VERSION, lt +from deluge.common import ( + VersionSplit, + archive_files, + decode_bytes, + get_magnet_info, + is_magnet, +) from deluge.configmanager import ConfigManager, get_config_dir from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath +from deluge.decorators import maybe_coroutine from deluge.error import AddTorrentError, InvalidTorrentError from deluge.event import ( ExternalIPEvent, @@ -52,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = ( ) +class PrefetchQueueItem(NamedTuple): + alert_deferred: Deferred + result_queue: List[Deferred] + + class TorrentState: # pylint: disable=old-style-class """Create a torrent state. @@ -89,7 +99,7 @@ class TorrentState: # pylint: disable=old-style-class super_seeding=False, name=None, ): - # Build the class atrribute list from args + # Build the class attribute list from args for key, value in locals().items(): if key == 'self': continue @@ -129,7 +139,8 @@ class TorrentManager(component.Component): """ - callLater = reactor.callLater # noqa: N815 + # This is used in the test to mock out timeouts + clock = reactor def __init__(self): component.Component.__init__( @@ -158,7 +169,7 @@ class TorrentManager(component.Component): self.is_saving_state = False self.save_resume_data_file_lock = defer.DeferredLock() self.torrents_loading = {} - self.prefetching_metadata = {} + self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {} # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. @@ -243,8 +254,8 @@ class TorrentManager(component.Component): self.save_resume_data_timer.start(190, False) self.prev_status_cleanup_loop.start(10) - @defer.inlineCallbacks - def stop(self): + @maybe_coroutine + async def stop(self): # Stop timers if self.save_state_timer.running: self.save_state_timer.stop() @@ -256,11 +267,11 @@ class TorrentManager(component.Component): self.prev_status_cleanup_loop.stop() # Save state on shutdown - yield self.save_state() + await self.save_state() self.session.pause() - result = yield self.save_resume_data(flush_disk_cache=True) + result = await self.save_resume_data(flush_disk_cache=True) # Remove the temp_file to signify successfully saved state if result and os.path.isfile(self.temp_file): os.remove(self.temp_file) @@ -274,11 +285,6 @@ class TorrentManager(component.Component): 'Paused', 'Queued', ): - # If the global setting is set, but the per-torrent isn't... - # Just skip to the next torrent. - # This is so that a user can turn-off the stop at ratio option on a per-torrent basis - if not torrent.options['stop_at_ratio']: - continue if ( torrent.get_ratio() >= torrent.options['stop_ratio'] and torrent.is_finished @@ -286,7 +292,7 @@ class TorrentManager(component.Component): if torrent.options['remove_at_ratio']: self.remove(torrent_id) break - if not torrent.handle.status().paused: + if not torrent.status.paused: torrent.pause() def __getitem__(self, torrent_id): @@ -339,26 +345,28 @@ class TorrentManager(component.Component): else: return torrent_info - def prefetch_metadata(self, magnet, timeout): - """Download the metadata for a magnet uri. + @maybe_coroutine + async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]: + """Download the metadata for a magnet URI. Args: - magnet (str): A magnet uri to download the metadata for. - timeout (int): Number of seconds to wait before cancelling. + magnet: A magnet URI to download the metadata for. + timeout: Number of seconds to wait before canceling. Returns: - Deferred: A tuple of (torrent_id (str), metadata (dict)) + A tuple of (torrent_id, metadata) """ torrent_id = get_magnet_info(magnet)['info_hash'] if torrent_id in self.prefetching_metadata: - return self.prefetching_metadata[torrent_id].defer + d = Deferred() + self.prefetching_metadata[torrent_id].result_queue.append(d) + return await d - add_torrent_params = {} - add_torrent_params['save_path'] = gettempdir() - add_torrent_params['url'] = magnet.strip().encode('utf8') - add_torrent_params['flags'] = ( + add_torrent_params = lt.parse_magnet_uri(magnet) + add_torrent_params.save_path = gettempdir() + add_torrent_params.flags = ( ( LT_DEFAULT_ADD_TORRENT_FLAGS | lt.add_torrent_params_flags_t.flag_duplicate_is_error @@ -372,33 +380,29 @@ class TorrentManager(component.Component): d = Deferred() # Cancel the defer if timeout reached. - defer_timeout = self.callLater(timeout, d.cancel) - d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout) - Prefetch = namedtuple('Prefetch', 'defer handle') - self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle) - return d + d.addTimeout(timeout, self.clock) + self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, []) - def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout): - # Cancel reactor.callLater. try: - defer_timeout.cancel() - except error.AlreadyCalled: - pass - - log.debug('remove prefetch magnet from session') - try: - torrent_handle = self.prefetching_metadata.pop(torrent_id).handle - except KeyError: - pass + torrent_info = await d + except (defer.TimeoutError, defer.CancelledError): + log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.') + metadata = b'' else: - self.session.remove_torrent(torrent_handle, 1) - - metadata = None - if isinstance(torrent_info, lt.torrent_info): log.debug('prefetch metadata received') - metadata = lt.bdecode(torrent_info.metadata()) + if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'): + metadata = torrent_info.metadata() + else: + metadata = torrent_info.info_section() + + log.debug('remove prefetch magnet from session') + result_queue = self.prefetching_metadata.pop(torrent_id).result_queue + self.session.remove_torrent(torrent_handle, 1) + result = torrent_id, b64encode(metadata) - return torrent_id, metadata + for d in result_queue: + d.callback(result) + return result def _build_torrent_options(self, options): """Load default options and update if needed.""" @@ -431,9 +435,10 @@ class TorrentManager(component.Component): elif magnet: magnet_info = get_magnet_info(magnet) if magnet_info: - add_torrent_params['url'] = magnet.strip().encode('utf8') add_torrent_params['name'] = magnet_info['name'] + add_torrent_params['trackers'] = list(magnet_info['trackers']) torrent_id = magnet_info['info_hash'] + add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id)) else: raise AddTorrentError( 'Unable to add magnet, invalid magnet info: %s' % magnet @@ -448,7 +453,7 @@ class TorrentManager(component.Component): raise AddTorrentError('Torrent already being added (%s).' % torrent_id) elif torrent_id in self.prefetching_metadata: # Cancel and remove metadata fetching torrent. - self.prefetching_metadata[torrent_id].defer.cancel() + self.prefetching_metadata[torrent_id].alert_deferred.cancel() # Check for renamed files and if so, rename them in the torrent_info before adding. if options['mapped_files'] and torrent_info: @@ -509,7 +514,7 @@ class TorrentManager(component.Component): save_state (bool, optional): If True save the session state after adding torrent, defaults to True. filedump (str, optional): bencoded filedump of a torrent file. filename (str, optional): The filename of the torrent file. - magnet (str, optional): The magnet uri. + magnet (str, optional): The magnet URI. resume_data (lt.entry, optional): libtorrent fast resume data. Returns: @@ -574,7 +579,7 @@ class TorrentManager(component.Component): save_state (bool, optional): If True save the session state after adding torrent, defaults to True. filedump (str, optional): bencoded filedump of a torrent file. filename (str, optional): The filename of the torrent file. - magnet (str, optional): The magnet uri. + magnet (str, optional): The magnet URI. resume_data (lt.entry, optional): libtorrent fast resume data. Returns: @@ -642,7 +647,7 @@ class TorrentManager(component.Component): # Resume AlertManager if paused for adding torrent to libtorrent. component.resume('AlertManager') - # Store the orignal resume_data, in case of errors. + # Store the original resume_data, in case of errors. if resume_data: self.resume_data[torrent.torrent_id] = resume_data @@ -809,9 +814,9 @@ class TorrentManager(component.Component): try: with open(filepath, 'rb') as _file: - state = pickle.load(_file) - except (IOError, EOFError, pickle.UnpicklingError) as ex: - message = 'Unable to load {}: {}'.format(filepath, ex) + state = pickle.load(_file, encoding='utf8') + except (OSError, EOFError, pickle.UnpicklingError) as ex: + message = f'Unable to load {filepath}: {ex}' log.error(message) if not filepath.endswith('.bak'): self.archive_state(message) @@ -1022,7 +1027,7 @@ class TorrentManager(component.Component): ) def on_torrent_resume_save(dummy_result, torrent_id): - """Recieved torrent resume_data alert so remove from waiting list""" + """Received torrent resume_data alert so remove from waiting list""" self.waiting_on_resume_data.pop(torrent_id, None) deferreds = [] @@ -1067,7 +1072,7 @@ class TorrentManager(component.Component): try: with open(_filepath, 'rb') as _file: resume_data = lt.bdecode(_file.read()) - except (IOError, EOFError, RuntimeError) as ex: + except (OSError, EOFError, RuntimeError) as ex: if self.torrents: log.warning('Unable to load %s: %s', _filepath, ex) resume_data = None @@ -1240,7 +1245,7 @@ class TorrentManager(component.Component): def on_alert_add_torrent(self, alert): """Alert handler for libtorrent add_torrent_alert""" if not alert.handle.is_valid(): - log.warning('Torrent handle is invalid!') + log.warning('Torrent handle is invalid: %s', alert.error.message()) return try: @@ -1351,10 +1356,8 @@ class TorrentManager(component.Component): torrent.set_tracker_status('Announce OK') # Check for peer information from the tracker, if none then send a scrape request. - if ( - alert.handle.status().num_complete == -1 - or alert.handle.status().num_incomplete == -1 - ): + torrent.get_lt_status() + if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1: torrent.scrape_tracker() def on_alert_tracker_announce(self, alert): @@ -1389,7 +1392,18 @@ class TorrentManager(component.Component): log.debug( 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message ) - torrent.set_tracker_status('Error: ' + error_message) + # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates + # we will need to verify that at least one endpoint to the errored tracker is working + for tracker in torrent.handle.trackers(): + if tracker['url'] == alert.url: + if any( + endpoint['last_error']['value'] == 0 + for endpoint in tracker['endpoints'] + ): + torrent.set_tracker_status('Announce OK') + else: + torrent.set_tracker_status('Error: ' + error_message) + break def on_alert_storage_moved(self, alert): """Alert handler for libtorrent storage_moved_alert""" @@ -1463,7 +1477,9 @@ class TorrentManager(component.Component): return if torrent_id in self.torrents: # libtorrent add_torrent expects bencoded resume_data. - self.resume_data[torrent_id] = lt.bencode(alert.resume_data) + self.resume_data[torrent_id] = lt.bencode( + lt.write_resume_data(alert.params) + ) if torrent_id in self.waiting_on_resume_data: self.waiting_on_resume_data[torrent_id].callback(None) @@ -1545,7 +1561,7 @@ class TorrentManager(component.Component): # Try callback to prefetch_metadata method. try: - d = self.prefetching_metadata[torrent_id].defer + d = self.prefetching_metadata[torrent_id].alert_deferred except KeyError: pass else: @@ -1591,23 +1607,14 @@ class TorrentManager(component.Component): except RuntimeError: continue if torrent_id in self.torrents: - self.torrents[torrent_id].update_status(t_status) + self.torrents[torrent_id].status = t_status self.handle_torrents_status_callback(self.torrents_status_requests.pop()) def on_alert_external_ip(self, alert): - """Alert handler for libtorrent external_ip_alert - - Note: - The alert.message IPv4 address format is: - 'external IP received: 0.0.0.0' - and IPv6 address format is: - 'external IP received: 0:0:0:0:0:0:0:0' - """ - - external_ip = decode_bytes(alert.message()).split(' ')[-1] - log.info('on_alert_external_ip: %s', external_ip) - component.get('EventManager').emit(ExternalIPEvent(external_ip)) + """Alert handler for libtorrent external_ip_alert""" + log.info('on_alert_external_ip: %s', alert.external_address) + component.get('EventManager').emit(ExternalIPEvent(alert.external_address)) def on_alert_performance(self, alert): """Alert handler for libtorrent performance_alert""" |