summaryrefslogtreecommitdiffstats
path: root/deluge/core/torrentmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core/torrentmanager.py')
-rw-r--r--deluge/core/torrentmanager.py1700
1 files changed, 1700 insertions, 0 deletions
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
new file mode 100644
index 0000000..c43a7a2
--- /dev/null
+++ b/deluge/core/torrentmanager.py
@@ -0,0 +1,1700 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""TorrentManager handles Torrent objects"""
+import datetime
+import logging
+import operator
+import os
+import pickle
+import time
+from base64 import b64encode
+from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
+
+from twisted.internet import defer, reactor, threads
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.task import LoopingCall
+
+import deluge.component as component
+from deluge._libtorrent import LT_VERSION, lt
+from deluge.common import (
+ VersionSplit,
+ archive_files,
+ decode_bytes,
+ get_magnet_info,
+ is_magnet,
+)
+from deluge.configmanager import ConfigManager, get_config_dir
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
+from deluge.decorators import maybe_coroutine
+from deluge.error import AddTorrentError, InvalidTorrentError
+from deluge.event import (
+ ExternalIPEvent,
+ PreTorrentRemovedEvent,
+ SessionStartedEvent,
+ TorrentAddedEvent,
+ TorrentFileCompletedEvent,
+ TorrentFileRenamedEvent,
+ TorrentFinishedEvent,
+ TorrentRemovedEvent,
+ TorrentResumedEvent,
+)
+
+log = logging.getLogger(__name__)
+
+LT_DEFAULT_ADD_TORRENT_FLAGS = (
+ lt.torrent_flags.paused
+ | lt.torrent_flags.auto_managed
+ | lt.torrent_flags.update_subscribe
+ | lt.torrent_flags.apply_ip_filter
+)
+
+
+class PrefetchQueueItem(NamedTuple):
+ alert_deferred: Deferred
+ result_queue: List[Deferred]
+
+
+class TorrentState: # pylint: disable=old-style-class
+ """Create a torrent state.
+
+ Note:
+ This must be old style class to avoid breaking torrent.state file.
+
+ """
+
+ def __init__(
+ self,
+ torrent_id=None,
+ filename=None,
+ trackers=None,
+ storage_mode='sparse',
+ paused=False,
+ save_path=None,
+ max_connections=-1,
+ max_upload_slots=-1,
+ max_upload_speed=-1.0,
+ max_download_speed=-1.0,
+ prioritize_first_last=False,
+ sequential_download=False,
+ file_priorities=None,
+ queue=None,
+ auto_managed=True,
+ is_finished=False,
+ stop_ratio=2.00,
+ stop_at_ratio=False,
+ remove_at_ratio=False,
+ move_completed=False,
+ move_completed_path=None,
+ magnet=None,
+ owner=None,
+ shared=False,
+ super_seeding=False,
+ name=None,
+ ):
+ # Build the class attribute list from args
+ for key, value in locals().items():
+ if key == 'self':
+ continue
+ setattr(self, key, value)
+
+ def __eq__(self, other):
+ return isinstance(other, TorrentState) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not self == other
+
+
+class TorrentManagerState: # pylint: disable=old-style-class
+ """TorrentManagerState holds a list of TorrentState objects.
+
+ Note:
+ This must be old style class to avoid breaking torrent.state file.
+
+ """
+
+ def __init__(self):
+ self.torrents = []
+
+ def __eq__(self, other):
+ return (
+ isinstance(other, TorrentManagerState) and self.torrents == other.torrents
+ )
+
+ def __ne__(self, other):
+ return not self == other
+
+
+class TorrentManager(component.Component):
+ """TorrentManager contains a list of torrents in the current libtorrent session.
+
+ This object is also responsible for saving the state of the session for use on restart.
+
+ """
+
+ # This is used in the test to mock out timeouts
+ clock = reactor
+
+ def __init__(self):
+ component.Component.__init__(
+ self,
+ 'TorrentManager',
+ interval=5,
+ depend=['CorePluginManager', 'AlertManager'],
+ )
+ log.debug('TorrentManager init...')
+ # Set the libtorrent session
+ self.session = component.get('Core').session
+ # Set the alertmanager
+ self.alerts = component.get('AlertManager')
+ # Get the core config
+ self.config = ConfigManager('core.conf')
+
+ # Make sure the state folder has been created
+ self.state_dir = os.path.join(get_config_dir(), 'state')
+ if not os.path.exists(self.state_dir):
+ os.makedirs(self.state_dir)
+ self.temp_file = os.path.join(self.state_dir, '.safe_state_check')
+
+ # Create the torrents dict { torrent_id: Torrent }
+ self.torrents = {}
+ self.queued_torrents = set()
+ self.is_saving_state = False
+ self.save_resume_data_file_lock = defer.DeferredLock()
+ self.torrents_loading = {}
+ self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
+
+ # This is a map of torrent_ids to Deferreds used to track needed resume data.
+ # The Deferreds will be completed when resume data has been saved.
+ self.waiting_on_resume_data = {}
+
+ # Keep track of torrents finished but moving storage
+ self.waiting_on_finish_moving = []
+
+ # Keeps track of resume data
+ self.resume_data = {}
+
+ self.torrents_status_requests = []
+ self.status_dict = {}
+ self.last_state_update_alert_ts = 0
+
+ # Keep the previous saved state
+ self.prev_saved_state = None
+
+ # Register set functions
+ set_config_keys = [
+ 'max_connections_per_torrent',
+ 'max_upload_slots_per_torrent',
+ 'max_upload_speed_per_torrent',
+ 'max_download_speed_per_torrent',
+ ]
+
+ for config_key in set_config_keys:
+ on_set_func = getattr(self, ''.join(['on_set_', config_key]))
+ self.config.register_set_function(config_key, on_set_func)
+
+ # Register alert functions
+ alert_handles = [
+ 'external_ip',
+ 'performance',
+ 'add_torrent',
+ 'metadata_received',
+ 'torrent_finished',
+ 'torrent_paused',
+ 'torrent_checked',
+ 'torrent_resumed',
+ 'tracker_reply',
+ 'tracker_announce',
+ 'tracker_warning',
+ 'tracker_error',
+ 'file_renamed',
+ 'file_error',
+ 'file_completed',
+ 'storage_moved',
+ 'storage_moved_failed',
+ 'state_update',
+ 'state_changed',
+ 'save_resume_data',
+ 'save_resume_data_failed',
+ 'fastresume_rejected',
+ ]
+
+ for alert_handle in alert_handles:
+ on_alert_func = getattr(self, ''.join(['on_alert_', alert_handle]))
+ self.alerts.register_handler(alert_handle, on_alert_func)
+
+ # Define timers
+ self.save_state_timer = LoopingCall(self.save_state)
+ self.save_resume_data_timer = LoopingCall(self.save_resume_data)
+ self.prev_status_cleanup_loop = LoopingCall(self.cleanup_torrents_prev_status)
+
+ def start(self):
+ # Check for old temp file to verify safe shutdown
+ if os.path.isfile(self.temp_file):
+ self.archive_state('Bad shutdown detected so archiving state files')
+ os.remove(self.temp_file)
+
+ with open(self.temp_file, 'a'):
+ os.utime(self.temp_file, None)
+
+ # Try to load the state from file
+ self.load_state()
+
+ # Save the state periodically
+ self.save_state_timer.start(200, False)
+ self.save_resume_data_timer.start(190, False)
+ self.prev_status_cleanup_loop.start(10)
+
+ @maybe_coroutine
+ async def stop(self):
+ # Stop timers
+ if self.save_state_timer.running:
+ self.save_state_timer.stop()
+
+ if self.save_resume_data_timer.running:
+ self.save_resume_data_timer.stop()
+
+ if self.prev_status_cleanup_loop.running:
+ self.prev_status_cleanup_loop.stop()
+
+ # Save state on shutdown
+ await self.save_state()
+
+ self.session.pause()
+
+ result = await self.save_resume_data(flush_disk_cache=True)
+ # Remove the temp_file to signify successfully saved state
+ if result and os.path.isfile(self.temp_file):
+ os.remove(self.temp_file)
+
+ def update(self):
+ for torrent_id, torrent in self.torrents.items():
+ # XXX: Should the state check be those that _can_ be stopped at ratio
+ if torrent.options['stop_at_ratio'] and torrent.state not in (
+ 'Checking',
+ 'Allocating',
+ 'Paused',
+ 'Queued',
+ ):
+ if (
+ torrent.get_ratio() >= torrent.options['stop_ratio']
+ and torrent.is_finished
+ ):
+ if torrent.options['remove_at_ratio']:
+ self.remove(torrent_id)
+ break
+
+ torrent.pause()
+
+ def __getitem__(self, torrent_id):
+ """Return the Torrent with torrent_id.
+
+ Args:
+ torrent_id (str): The torrent_id.
+
+ Returns:
+ Torrent: A torrent object.
+
+ """
+ return self.torrents[torrent_id]
+
+ def get_torrent_list(self):
+ """Creates a list of torrent_ids, owned by current user and any marked shared.
+
+ Returns:
+ list: A list of torrent_ids.
+
+ """
+ torrent_ids = list(self.torrents)
+ if component.get('RPCServer').get_session_auth_level() == AUTH_LEVEL_ADMIN:
+ return torrent_ids
+
+ current_user = component.get('RPCServer').get_session_user()
+ for torrent_id in torrent_ids[:]:
+ torrent_status = self.torrents[torrent_id].get_status(['owner', 'shared'])
+ if torrent_status['owner'] != current_user and not torrent_status['shared']:
+ torrent_ids.pop(torrent_ids.index(torrent_id))
+ return torrent_ids
+
+ def get_torrent_info_from_file(self, filepath):
+ """Retrieves torrent_info from the file specified.
+
+ Args:
+ filepath (str): The filepath to extract torrent info from.
+
+ Returns:
+ lt.torrent_info: A libtorrent torrent_info dict or None if invalid file or data.
+
+ """
+ # Get the torrent data from the torrent file
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('Attempting to extract torrent_info from %s', filepath)
+ try:
+ torrent_info = lt.torrent_info(filepath)
+ except RuntimeError as ex:
+ log.warning('Unable to open torrent file %s: %s', filepath, ex)
+ else:
+ return torrent_info
+
+ @maybe_coroutine
+ async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
+ """Download the metadata for a magnet URI.
+
+ Args:
+ magnet: A magnet URI to download the metadata for.
+ timeout: Number of seconds to wait before canceling.
+
+ Returns:
+ A tuple of (torrent_id, metadata)
+
+ """
+
+ torrent_id = get_magnet_info(magnet)['info_hash']
+ if torrent_id in self.prefetching_metadata:
+ d = Deferred()
+ self.prefetching_metadata[torrent_id].result_queue.append(d)
+ return await d
+
+ add_torrent_params = lt.parse_magnet_uri(magnet)
+ add_torrent_params.save_path = gettempdir()
+ add_torrent_params.flags = (
+ (
+ LT_DEFAULT_ADD_TORRENT_FLAGS
+ | lt.torrent_flags.duplicate_is_error
+ | lt.torrent_flags.upload_mode
+ )
+ ^ lt.torrent_flags.auto_managed
+ ^ lt.torrent_flags.paused
+ )
+
+ torrent_handle = self.session.add_torrent(add_torrent_params)
+
+ d = Deferred()
+ # Cancel the defer if timeout reached.
+ d.addTimeout(timeout, self.clock)
+ self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
+
+ try:
+ torrent_info = await d
+ except (defer.TimeoutError, defer.CancelledError):
+ log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
+ metadata = b''
+ else:
+ log.debug('prefetch metadata received')
+ if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
+ metadata = torrent_info.metadata()
+ else:
+ metadata = torrent_info.info_section()
+
+ log.debug('remove prefetch magnet from session')
+ result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
+ self.session.remove_torrent(torrent_handle, 1)
+ result = torrent_id, b64encode(metadata)
+
+ for d in result_queue:
+ d.callback(result)
+ return result
+
+ def _build_torrent_options(self, options):
+ """Load default options and update if needed."""
+ _options = TorrentOptions()
+ if options:
+ _options.update(options)
+ options = _options
+
+ if not options['owner']:
+ options['owner'] = component.get('RPCServer').get_session_user()
+ if not component.get('AuthManager').has_account(options['owner']):
+ options['owner'] = 'localclient'
+
+ return options
+
+ def _build_torrent_params(
+ self, torrent_info=None, magnet=None, options=None, resume_data=None
+ ):
+ """Create the add_torrent_params dict for adding torrent to libtorrent."""
+ add_torrent_params = {}
+ if torrent_info:
+ add_torrent_params['ti'] = torrent_info
+ name = torrent_info.name()
+ if not name:
+ name = (
+ torrent_info.file_at(0).path.replace('\\', '/', 1).split('/', 1)[0]
+ )
+ add_torrent_params['name'] = name
+ torrent_id = str(torrent_info.info_hash())
+ elif magnet:
+ magnet_info = get_magnet_info(magnet)
+ if magnet_info:
+ add_torrent_params['name'] = magnet_info['name']
+ add_torrent_params['trackers'] = list(magnet_info['trackers'])
+ torrent_id = magnet_info['info_hash']
+ add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id))
+ else:
+ raise AddTorrentError(
+ 'Unable to add magnet, invalid magnet info: %s' % magnet
+ )
+
+ # Check for existing torrent in session.
+ if torrent_id in self.get_torrent_list():
+ # Attempt merge trackers before returning.
+ self.torrents[torrent_id].merge_trackers(torrent_info)
+ raise AddTorrentError('Torrent already in session (%s).' % torrent_id)
+ elif torrent_id in self.torrents_loading:
+ raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
+ elif torrent_id in self.prefetching_metadata:
+ # Cancel and remove metadata fetching torrent.
+ self.prefetching_metadata[torrent_id].alert_deferred.cancel()
+
+ # Check for renamed files and if so, rename them in the torrent_info before adding.
+ if options['mapped_files'] and torrent_info:
+ for index, fname in options['mapped_files'].items():
+ fname = sanitize_filepath(decode_bytes(fname))
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('renaming file index %s to %s', index, fname)
+ try:
+ torrent_info.rename_file(index, fname.encode('utf8'))
+ except TypeError:
+ torrent_info.rename_file(index, fname)
+ add_torrent_params['ti'] = torrent_info
+
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('options: %s', options)
+
+ # Fill in the rest of the add_torrent_params dictionary.
+ add_torrent_params['save_path'] = options['download_location'].encode('utf8')
+ if options['name']:
+ add_torrent_params['name'] = options['name']
+ if options['pre_allocate_storage']:
+ add_torrent_params['storage_mode'] = lt.storage_mode_t.storage_mode_allocate
+ if resume_data:
+ add_torrent_params['resume_data'] = resume_data
+
+ # Set flags: enable duplicate_is_error & override_resume_data, disable auto_managed.
+ add_torrent_params['flags'] = (
+ LT_DEFAULT_ADD_TORRENT_FLAGS | lt.torrent_flags.duplicate_is_error
+ ) ^ lt.torrent_flags.auto_managed
+ if options['seed_mode']:
+ add_torrent_params['flags'] |= lt.torrent_flags.seed_mode
+ if options['super_seeding']:
+ add_torrent_params['flags'] |= lt.torrent_flags.super_seeding
+
+ return torrent_id, add_torrent_params
+
+ def add(
+ self,
+ torrent_info=None,
+ state=None,
+ options=None,
+ save_state=True,
+ filedump=None,
+ filename=None,
+ magnet=None,
+ resume_data=None,
+ ):
+ """Adds a torrent to the torrent manager.
+
+ Args:
+ torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
+ state (TorrentState, optional): The torrent state.
+ options (dict, optional): The options to apply to the torrent on adding.
+ save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
+ filedump (str, optional): bencoded filedump of a torrent file.
+ filename (str, optional): The filename of the torrent file.
+ magnet (str, optional): The magnet URI.
+ resume_data (lt.entry, optional): libtorrent fast resume data.
+
+ Returns:
+ str: If successful the torrent_id of the added torrent, None if adding the torrent failed.
+
+ Emits:
+ TorrentAddedEvent: Torrent with torrent_id added to session.
+
+ """
+ if not torrent_info and not filedump and not magnet:
+ raise AddTorrentError(
+ 'You must specify a valid torrent_info, torrent state or magnet.'
+ )
+
+ if filedump:
+ try:
+ torrent_info = lt.torrent_info(lt.bdecode(filedump))
+ except RuntimeError as ex:
+ raise AddTorrentError(
+ 'Unable to add torrent, decoding filedump failed: %s' % ex
+ )
+
+ options = self._build_torrent_options(options)
+ __, add_torrent_params = self._build_torrent_params(
+ torrent_info, magnet, options, resume_data
+ )
+
+ # We need to pause the AlertManager momentarily to prevent alerts
+ # for this torrent being generated before a Torrent object is created.
+ component.pause('AlertManager')
+
+ try:
+ handle = self.session.add_torrent(add_torrent_params)
+ if not handle.is_valid():
+ raise InvalidTorrentError('Torrent handle is invalid!')
+ except (RuntimeError, InvalidTorrentError) as ex:
+ component.resume('AlertManager')
+ raise AddTorrentError('Unable to add torrent to session: %s' % ex)
+
+ torrent = self._add_torrent_obj(
+ handle, options, state, filename, magnet, resume_data, filedump, save_state
+ )
+ return torrent.torrent_id
+
+ def add_async(
+ self,
+ torrent_info=None,
+ state=None,
+ options=None,
+ save_state=True,
+ filedump=None,
+ filename=None,
+ magnet=None,
+ resume_data=None,
+ ):
+ """Adds a torrent to the torrent manager using libtorrent async add torrent method.
+
+ Args:
+ torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
+ state (TorrentState, optional): The torrent state.
+ options (dict, optional): The options to apply to the torrent on adding.
+ save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
+ filedump (str, optional): bencoded filedump of a torrent file.
+ filename (str, optional): The filename of the torrent file.
+ magnet (str, optional): The magnet URI.
+ resume_data (lt.entry, optional): libtorrent fast resume data.
+
+ Returns:
+ Deferred: If successful the torrent_id of the added torrent, None if adding the torrent failed.
+
+ Emits:
+ TorrentAddedEvent: Torrent with torrent_id added to session.
+
+ """
+ if not torrent_info and not filedump and not magnet:
+ raise AddTorrentError(
+ 'You must specify a valid torrent_info, torrent state or magnet.'
+ )
+
+ if filedump:
+ try:
+ torrent_info = lt.torrent_info(lt.bdecode(filedump))
+ except RuntimeError as ex:
+ raise AddTorrentError(
+ 'Unable to add torrent, decoding filedump failed: %s' % ex
+ )
+
+ options = self._build_torrent_options(options)
+ torrent_id, add_torrent_params = self._build_torrent_params(
+ torrent_info, magnet, options, resume_data
+ )
+
+ d = Deferred()
+ self.torrents_loading[torrent_id] = (
+ d,
+ options,
+ state,
+ filename,
+ magnet,
+ resume_data,
+ filedump,
+ save_state,
+ )
+ try:
+ self.session.async_add_torrent(add_torrent_params)
+ except RuntimeError as ex:
+ raise AddTorrentError('Unable to add torrent to session: %s' % ex)
+ return d
+
+ def _add_torrent_obj(
+ self,
+ handle,
+ options,
+ state,
+ filename,
+ magnet,
+ resume_data,
+ filedump,
+ save_state,
+ ):
+ # For magnets added with metadata, filename is used so set as magnet.
+ if not magnet and is_magnet(filename):
+ magnet = filename
+ filename = None
+
+ # Create a Torrent object and add to the dictionary.
+ torrent = Torrent(handle, options, state, filename, magnet)
+ self.torrents[torrent.torrent_id] = torrent
+
+ # Resume AlertManager if paused for adding torrent to libtorrent.
+ component.resume('AlertManager')
+
+ # Store the original resume_data, in case of errors.
+ if resume_data:
+ self.resume_data[torrent.torrent_id] = resume_data
+
+ # Add to queued torrents set.
+ self.queued_torrents.add(torrent.torrent_id)
+ if self.config['queue_new_to_top']:
+ self.queue_top(torrent.torrent_id)
+
+ # Resume the torrent if needed.
+ if not options['add_paused']:
+ torrent.resume()
+
+ # Emit torrent_added signal.
+ from_state = state is not None
+ component.get('EventManager').emit(
+ TorrentAddedEvent(torrent.torrent_id, from_state)
+ )
+
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('Torrent added: %s', str(handle.info_hash()))
+ if log.isEnabledFor(logging.INFO):
+ name_and_owner = torrent.get_status(['name', 'owner'])
+ log.info(
+ 'Torrent %s from user "%s" %s',
+ name_and_owner['name'],
+ name_and_owner['owner'],
+ from_state and 'loaded' or 'added',
+ )
+
+ # Write the .torrent file to the state directory.
+ if filedump:
+ torrent.write_torrentfile(filedump)
+
+ # Save the session state.
+ if save_state:
+ self.save_state()
+
+ return torrent
+
+ def add_async_callback(
+ self,
+ handle,
+ d,
+ options,
+ state,
+ filename,
+ magnet,
+ resume_data,
+ filedump,
+ save_state,
+ ):
+ torrent = self._add_torrent_obj(
+ handle, options, state, filename, magnet, resume_data, filedump, save_state
+ )
+
+ d.callback(torrent.torrent_id)
+
+ def remove(self, torrent_id, remove_data=False, save_state=True):
+ """Remove a torrent from the session.
+
+ Args:
+ torrent_id (str): The torrent ID to remove.
+ remove_data (bool, optional): If True, remove the downloaded data, defaults to False.
+ save_state (bool, optional): If True, save the session state after removal, defaults to True.
+
+ Returns:
+ bool: True if removed successfully, False if not.
+
+ Emits:
+ PreTorrentRemovedEvent: Torrent is about to be removed from session.
+ TorrentRemovedEvent: Torrent with torrent_id removed from session.
+
+ Raises:
+ InvalidTorrentError: If the torrent_id is not in the session.
+
+ """
+ try:
+ torrent = self.torrents[torrent_id]
+ except KeyError:
+ raise InvalidTorrentError('torrent_id %s not in session.' % torrent_id)
+
+ torrent_name = torrent.get_status(['name'])['name']
+
+ # Emit the signal to the clients
+ component.get('EventManager').emit(PreTorrentRemovedEvent(torrent_id))
+
+ try:
+ self.session.remove_torrent(torrent.handle, 1 if remove_data else 0)
+ except RuntimeError as ex:
+ log.warning('Error removing torrent: %s', ex)
+ return False
+
+ # Remove fastresume data if it is exists
+ self.resume_data.pop(torrent_id, None)
+
+ # Remove the .torrent file in the state and copy location, if user requested.
+ delete_copies = (
+ self.config['copy_torrent_file'] and self.config['del_copy_torrent_file']
+ )
+ torrent.delete_torrentfile(delete_copies)
+
+ # Remove from set if it wasn't finished
+ if not torrent.is_finished:
+ try:
+ self.queued_torrents.remove(torrent_id)
+ except KeyError:
+ log.debug('%s is not in queued torrents set.', torrent_id)
+ raise InvalidTorrentError(
+ '%s is not in queued torrents set.' % torrent_id
+ )
+
+ # Remove the torrent from deluge's session
+ del self.torrents[torrent_id]
+
+ if save_state:
+ self.save_state()
+
+ # Emit the signal to the clients
+ component.get('EventManager').emit(TorrentRemovedEvent(torrent_id))
+ log.info(
+ 'Torrent %s removed by user: %s',
+ torrent_name,
+ component.get('RPCServer').get_session_user(),
+ )
+ return True
+
+ def fixup_state(self, state):
+ """Fixup an old state by adding missing TorrentState options and assigning default values.
+
+ Args:
+ state (TorrentManagerState): A torrentmanager state containing torrent details.
+
+ Returns:
+ TorrentManagerState: A fixedup TorrentManager state.
+
+ """
+ if state.torrents:
+ t_state_tmp = TorrentState()
+ if dir(state.torrents[0]) != dir(t_state_tmp):
+ self.archive_state('Migration of TorrentState required.')
+ try:
+ for attr in set(dir(t_state_tmp)) - set(dir(state.torrents[0])):
+ for t_state in state.torrents:
+ setattr(t_state, attr, getattr(t_state_tmp, attr, None))
+ except AttributeError as ex:
+ log.error(
+ 'Unable to update state file to a compatible version: %s', ex
+ )
+ return state
+
+ def open_state(self):
+ """Open the torrents.state file containing a TorrentManager state with session torrents.
+
+ Returns:
+ TorrentManagerState: The TorrentManager state.
+
+ """
+ torrents_state = os.path.join(self.state_dir, 'torrents.state')
+ state = None
+ for filepath in (torrents_state, torrents_state + '.bak'):
+ log.info('Loading torrent state: %s', filepath)
+ if not os.path.isfile(filepath):
+ continue
+
+ try:
+ with open(filepath, 'rb') as _file:
+ state = pickle.load(_file, encoding='utf8')
+ except (OSError, EOFError, pickle.UnpicklingError) as ex:
+ message = f'Unable to load {filepath}: {ex}'
+ log.error(message)
+ if not filepath.endswith('.bak'):
+ self.archive_state(message)
+ else:
+ log.info('Successfully loaded %s', filepath)
+ break
+
+ return state if state else TorrentManagerState()
+
+ def load_state(self):
+ """Load all the torrents from TorrentManager state into session.
+
+ Emits:
+ SessionStartedEvent: Emitted after all torrents are added to the session.
+
+ """
+ start = datetime.datetime.now()
+ state = self.open_state()
+ state = self.fixup_state(state)
+
+ # Reorder the state.torrents list to add torrents in the correct queue order.
+ state.torrents.sort(
+ key=operator.attrgetter('queue'), reverse=self.config['queue_new_to_top']
+ )
+ resume_data = self.load_resume_data_file()
+
+ deferreds = []
+ for t_state in state.torrents:
+ # Populate the options dict from state
+ options = TorrentOptions()
+ for option in options:
+ try:
+ options[option] = getattr(t_state, option)
+ except AttributeError:
+ pass
+ # Manually update unmatched attributes
+ options['download_location'] = t_state.save_path
+ options['pre_allocate_storage'] = t_state.storage_mode == 'allocate'
+ options['prioritize_first_last_pieces'] = t_state.prioritize_first_last
+ options['add_paused'] = t_state.paused
+
+ magnet = t_state.magnet
+ torrent_info = self.get_torrent_info_from_file(
+ os.path.join(self.state_dir, t_state.torrent_id + '.torrent')
+ )
+
+ try:
+ d = self.add_async(
+ torrent_info=torrent_info,
+ state=t_state,
+ options=options,
+ save_state=False,
+ magnet=magnet,
+ resume_data=resume_data.get(t_state.torrent_id),
+ )
+ except AddTorrentError as ex:
+ log.warning(
+ 'Error when adding torrent "%s" to session: %s',
+ t_state.torrent_id,
+ ex,
+ )
+ else:
+ deferreds.append(d)
+
+ deferred_list = DeferredList(deferreds, consumeErrors=False)
+
+ def on_complete(result):
+ log.info(
+ 'Finished loading %d torrents in %s',
+ len(state.torrents),
+ str(datetime.datetime.now() - start),
+ )
+ component.get('EventManager').emit(SessionStartedEvent())
+
+ deferred_list.addCallback(on_complete)
+
+ def create_state(self):
+ """Create a state of all the torrents in TorrentManager.
+
+ Returns:
+ TorrentManagerState: The TorrentManager state.
+
+ """
+ state = TorrentManagerState()
+ # Create the state for each Torrent and append to the list
+ for torrent in self.torrents.values():
+ if self.session.is_paused():
+ paused = torrent.handle.is_paused()
+ elif torrent.forced_error:
+ paused = torrent.forced_error.was_paused
+ elif torrent.state == 'Paused':
+ paused = True
+ else:
+ paused = False
+
+ torrent_state = TorrentState(
+ torrent.torrent_id,
+ torrent.filename,
+ torrent.trackers,
+ torrent.get_status(['storage_mode'])['storage_mode'],
+ paused,
+ torrent.options['download_location'],
+ torrent.options['max_connections'],
+ torrent.options['max_upload_slots'],
+ torrent.options['max_upload_speed'],
+ torrent.options['max_download_speed'],
+ torrent.options['prioritize_first_last_pieces'],
+ torrent.options['sequential_download'],
+ torrent.options['file_priorities'],
+ torrent.get_queue_position(),
+ torrent.options['auto_managed'],
+ torrent.is_finished,
+ torrent.options['stop_ratio'],
+ torrent.options['stop_at_ratio'],
+ torrent.options['remove_at_ratio'],
+ torrent.options['move_completed'],
+ torrent.options['move_completed_path'],
+ torrent.magnet,
+ torrent.options['owner'],
+ torrent.options['shared'],
+ torrent.options['super_seeding'],
+ torrent.options['name'],
+ )
+ state.torrents.append(torrent_state)
+ return state
+
+ def save_state(self):
+ """Run the save state task in a separate thread to avoid blocking main thread.
+
+ Note:
+ If a save task is already running, this call is ignored.
+
+ """
+ if self.is_saving_state:
+ return defer.succeed(None)
+ self.is_saving_state = True
+ d = threads.deferToThread(self._save_state)
+
+ def on_state_saved(arg):
+ self.is_saving_state = False
+ if self.save_state_timer.running:
+ self.save_state_timer.reset()
+
+ d.addBoth(on_state_saved)
+ return d
+
+ def _save_state(self):
+ """Save the state of the TorrentManager to the torrents.state file."""
+ state = self.create_state()
+
+ # If the state hasn't changed, no need to save it
+ if self.prev_saved_state == state:
+ return
+
+ filename = 'torrents.state'
+ filepath = os.path.join(self.state_dir, filename)
+ filepath_bak = filepath + '.bak'
+ filepath_tmp = filepath + '.tmp'
+
+ try:
+ log.debug('Creating the temporary file: %s', filepath_tmp)
+ with open(filepath_tmp, 'wb', 0) as _file:
+ pickle.dump(state, _file, protocol=2)
+ _file.flush()
+ os.fsync(_file.fileno())
+ except (OSError, pickle.PicklingError) as ex:
+ log.error('Unable to save %s: %s', filename, ex)
+ return
+
+ try:
+ log.debug('Creating backup of %s at: %s', filename, filepath_bak)
+ if os.path.isfile(filepath_bak):
+ os.remove(filepath_bak)
+ if os.path.isfile(filepath):
+ os.rename(filepath, filepath_bak)
+ except OSError as ex:
+ log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
+ return
+
+ try:
+ log.debug('Saving %s to: %s', filename, filepath)
+ os.rename(filepath_tmp, filepath)
+ self.prev_saved_state = state
+ except OSError as ex:
+ log.error('Failed to set new state file %s: %s', filepath, ex)
+ if os.path.isfile(filepath_bak):
+ log.info('Restoring backup of state from: %s', filepath_bak)
+ os.rename(filepath_bak, filepath)
+
+ def save_resume_data(self, torrent_ids=None, flush_disk_cache=False):
+ """Saves torrents resume data.
+
+ Args:
+ torrent_ids (list of str): A list of torrents to save the resume data for, defaults
+ to None which saves all torrents resume data.
+ flush_disk_cache (bool, optional): If True flushes the disk cache which avoids potential
+ issue with file timestamps, defaults to False. This is only needed when stopping the session.
+
+ Returns:
+ t.i.d.DeferredList: A list of twisted Deferred callbacks to be invoked when save is complete.
+
+ """
+ if torrent_ids is None:
+ torrent_ids = (
+ tid
+ for tid, t in self.torrents.items()
+ if t.handle.need_save_resume_data()
+ )
+
+ def on_torrent_resume_save(dummy_result, torrent_id):
+ """Received torrent resume_data alert so remove from waiting list"""
+ self.waiting_on_resume_data.pop(torrent_id, None)
+
+ deferreds = []
+ for torrent_id in torrent_ids:
+ d = self.waiting_on_resume_data.get(torrent_id)
+ if not d:
+ d = Deferred().addBoth(on_torrent_resume_save, torrent_id)
+ self.waiting_on_resume_data[torrent_id] = d
+ deferreds.append(d)
+ self.torrents[torrent_id].save_resume_data(flush_disk_cache)
+
+ def on_all_resume_data_finished(dummy_result):
+ """Saves resume data file when no more torrents waiting for resume data.
+
+ Returns:
+ bool: True if fastresume file is saved.
+
+ This return value determines removal of `self.temp_file` in `self.stop()`.
+
+ """
+ # Use flush_disk_cache as a marker for shutdown so fastresume is
+ # saved even if torrents are waiting.
+ if not self.waiting_on_resume_data or flush_disk_cache:
+ return self.save_resume_data_file(queue_task=flush_disk_cache)
+
+ return DeferredList(deferreds).addBoth(on_all_resume_data_finished)
+
+ def load_resume_data_file(self):
+ """Load the resume data from file for all torrents.
+
+ Returns:
+ dict: A dict of torrents and their resume_data.
+
+ """
+ filename = 'torrents.fastresume'
+ filepath = os.path.join(self.state_dir, filename)
+ filepath_bak = filepath + '.bak'
+ old_data_filepath = os.path.join(get_config_dir(), filename)
+
+ for _filepath in (filepath, filepath_bak, old_data_filepath):
+ log.info('Opening %s for load: %s', filename, _filepath)
+ try:
+ with open(_filepath, 'rb') as _file:
+ resume_data = lt.bdecode(_file.read())
+ except (OSError, EOFError, RuntimeError) as ex:
+ if self.torrents:
+ log.warning('Unable to load %s: %s', _filepath, ex)
+ resume_data = None
+ else:
+ # lt.bdecode returns the dict keys as bytes so decode them.
+ resume_data = {k.decode(): v for k, v in resume_data.items()}
+ log.info('Successfully loaded %s: %s', filename, _filepath)
+ break
+
+ # If the libtorrent bdecode doesn't happen properly, it will return None
+ # so we need to make sure we return a {}
+ if resume_data is None:
+ return {}
+ else:
+ return resume_data
+
+ def save_resume_data_file(self, queue_task=False):
+ """Save resume data to file in a separate thread to avoid blocking main thread.
+
+ Args:
+ queue_task (bool): If True and a save task is already running then queue
+ this save task to run next. Default is to not queue save tasks.
+
+ Returns:
+ Deferred: Fires with arg, True if save task was successful, False if
+ not and None if task was not performed.
+
+ """
+ if not queue_task and self.save_resume_data_file_lock.locked:
+ return defer.succeed(None)
+
+ def on_lock_aquired():
+ d = threads.deferToThread(self._save_resume_data_file)
+
+ def on_resume_data_file_saved(arg):
+ if self.save_resume_data_timer.running:
+ self.save_resume_data_timer.reset()
+ return arg
+
+ d.addBoth(on_resume_data_file_saved)
+ return d
+
+ return self.save_resume_data_file_lock.run(on_lock_aquired)
+
+ def _save_resume_data_file(self):
+ """Saves the resume data file with the contents of self.resume_data"""
+ if not self.resume_data:
+ return True
+
+ filename = 'torrents.fastresume'
+ filepath = os.path.join(self.state_dir, filename)
+ filepath_bak = filepath + '.bak'
+ filepath_tmp = filepath + '.tmp'
+
+ try:
+ log.debug('Creating the temporary file: %s', filepath_tmp)
+ with open(filepath_tmp, 'wb', 0) as _file:
+ _file.write(lt.bencode(self.resume_data))
+ _file.flush()
+ os.fsync(_file.fileno())
+ except (OSError, EOFError) as ex:
+ log.error('Unable to save %s: %s', filename, ex)
+ return False
+
+ try:
+ log.debug('Creating backup of %s at: %s', filename, filepath_bak)
+ if os.path.isfile(filepath_bak):
+ os.remove(filepath_bak)
+ if os.path.isfile(filepath):
+ os.rename(filepath, filepath_bak)
+ except OSError as ex:
+ log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
+ return False
+
+ try:
+ log.debug('Saving %s to: %s', filename, filepath)
+ os.rename(filepath_tmp, filepath)
+ except OSError as ex:
+ log.error('Failed to set new file %s: %s', filepath, ex)
+ if os.path.isfile(filepath_bak):
+ log.info('Restoring backup from: %s', filepath_bak)
+ os.rename(filepath_bak, filepath)
+ else:
+ # Sync the rename operations for the directory
+ if hasattr(os, 'O_DIRECTORY'):
+ dirfd = os.open(os.path.dirname(filepath), os.O_DIRECTORY)
+ os.fsync(dirfd)
+ os.close(dirfd)
+ return True
+
+ def archive_state(self, message):
+ log.warning(message)
+ arc_filepaths = []
+ for filename in ('torrents.fastresume', 'torrents.state'):
+ filepath = os.path.join(self.state_dir, filename)
+ arc_filepaths.extend([filepath, filepath + '.bak'])
+
+ archive_files('state', arc_filepaths, message=message)
+
+ def get_queue_position(self, torrent_id):
+ """Get queue position of torrent"""
+ return self.torrents[torrent_id].get_queue_position()
+
+ def queue_top(self, torrent_id):
+ """Queue torrent to top"""
+ if self.torrents[torrent_id].get_queue_position() == 0:
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_top()
+ return True
+
+ def queue_up(self, torrent_id):
+ """Queue torrent up one position"""
+ if self.torrents[torrent_id].get_queue_position() == 0:
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_up()
+ return True
+
+ def queue_down(self, torrent_id):
+ """Queue torrent down one position"""
+ if self.torrents[torrent_id].get_queue_position() == (
+ len(self.queued_torrents) - 1
+ ):
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_down()
+ return True
+
+ def queue_bottom(self, torrent_id):
+ """Queue torrent to bottom"""
+ if self.torrents[torrent_id].get_queue_position() == (
+ len(self.queued_torrents) - 1
+ ):
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_bottom()
+ return True
+
+ def cleanup_torrents_prev_status(self):
+ """Run cleanup_prev_status for each registered torrent"""
+ for torrent in self.torrents.values():
+ torrent.cleanup_prev_status()
+
+ def on_set_max_connections_per_torrent(self, key, value):
+ """Sets the per-torrent connection limit"""
+ log.debug('max_connections_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_connections(value)
+
+ def on_set_max_upload_slots_per_torrent(self, key, value):
+ """Sets the per-torrent upload slot limit"""
+ log.debug('max_upload_slots_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_upload_slots(value)
+
+ def on_set_max_upload_speed_per_torrent(self, key, value):
+ """Sets the per-torrent upload speed limit"""
+ log.debug('max_upload_speed_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_upload_speed(value)
+
+ def on_set_max_download_speed_per_torrent(self, key, value):
+ """Sets the per-torrent download speed limit"""
+ log.debug('max_download_speed_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_download_speed(value)
+
+ # --- Alert handlers ---
+ def on_alert_add_torrent(self, alert):
+ """Alert handler for libtorrent add_torrent_alert"""
+ if not alert.handle.is_valid():
+ log.warning('Torrent handle is invalid: %s', alert.error.message())
+ return
+
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError as ex:
+ log.warning('Failed to get torrent id from handle: %s', ex)
+ return
+
+ try:
+ add_async_params = self.torrents_loading.pop(torrent_id)
+ except KeyError as ex:
+ log.warning('Torrent id not in torrents loading list: %s', ex)
+ return
+
+ self.add_async_callback(alert.handle, *add_async_params)
+
+ def on_alert_torrent_finished(self, alert):
+ """Alert handler for libtorrent torrent_finished_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ # If total_download is 0, do not move, it's likely the torrent wasn't downloaded, but just added.
+ # Get fresh data from libtorrent, the cache isn't always up to date
+ total_download = torrent.get_status(['total_payload_download'], update=True)[
+ 'total_payload_download'
+ ]
+
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('Finished %s ', torrent_id)
+ log.debug(
+ 'Torrent settings: is_finished: %s, total_download: %s, move_completed: %s, move_path: %s',
+ torrent.is_finished,
+ total_download,
+ torrent.options['move_completed'],
+ torrent.options['move_completed_path'],
+ )
+
+ torrent.update_state()
+ if not torrent.is_finished and total_download:
+ # Move completed download to completed folder if needed
+ if (
+ torrent.options['move_completed']
+ and torrent.options['download_location']
+ != torrent.options['move_completed_path']
+ ):
+ self.waiting_on_finish_moving.append(torrent_id)
+ torrent.move_storage(torrent.options['move_completed_path'])
+ else:
+ torrent.is_finished = True
+ component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
+ else:
+ torrent.is_finished = True
+
+ # Torrent is no longer part of the queue
+ try:
+ self.queued_torrents.remove(torrent_id)
+ except KeyError:
+ # Sometimes libtorrent fires a TorrentFinishedEvent twice
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('%s is not in queued torrents set.', torrent_id)
+
+ # Only save resume data if it was actually downloaded something. Helps
+ # on startup with big queues with lots of seeding torrents. Libtorrent
+ # emits alert_torrent_finished for them, but there seems like nothing
+ # worth really to save in resume data, we just read it up in
+ # self.load_state().
+ if total_download:
+ self.save_resume_data((torrent_id,))
+
+ def on_alert_torrent_paused(self, alert):
+ """Alert handler for libtorrent torrent_paused_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+ # Write the fastresume file if we are not waiting on a bulk write
+ if torrent_id not in self.waiting_on_resume_data:
+ self.save_resume_data((torrent_id,))
+
+ def on_alert_torrent_checked(self, alert):
+ """Alert handler for libtorrent torrent_checked_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Check to see if we're forcing a recheck and set it back to paused if necessary.
+ if torrent.forcing_recheck:
+ torrent.forcing_recheck = False
+ if torrent.forcing_recheck_paused:
+ torrent.handle.pause()
+
+ torrent.update_state()
+
+ def on_alert_tracker_reply(self, alert):
+ """Alert handler for libtorrent tracker_reply_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Announce OK')
+
+ # Check for peer information from the tracker, if none then send a scrape request.
+ torrent.get_lt_status()
+ if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1:
+ torrent.scrape_tracker()
+
+ def on_alert_tracker_announce(self, alert):
+ """Alert handler for libtorrent tracker_announce_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Announce Sent')
+
+ def on_alert_tracker_warning(self, alert):
+ """Alert handler for libtorrent tracker_warning_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Warning: %s' % decode_bytes(alert.message()))
+
+ def on_alert_tracker_error(self, alert):
+ """Alert handler for libtorrent tracker_error_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ error_message = decode_bytes(alert.error_message())
+ if not error_message:
+ error_message = decode_bytes(alert.error.message())
+ log.debug(
+ 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message
+ )
+ # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates
+ # we will need to verify that at least one endpoint to the errored tracker is working
+ for tracker in torrent.handle.trackers():
+ if tracker['url'] == alert.url:
+ if any(
+ endpoint['last_error']['value'] == 0
+ for endpoint in tracker['endpoints']
+ ):
+ torrent.set_tracker_status('Announce OK')
+ else:
+ torrent.set_tracker_status('Error: ' + error_message)
+ break
+
+ def on_alert_storage_moved(self, alert):
+ """Alert handler for libtorrent storage_moved_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ torrent.set_download_location(os.path.normpath(alert.storage_path()))
+ torrent.set_move_completed(False)
+ torrent.update_state()
+
+ if torrent_id in self.waiting_on_finish_moving:
+ self.waiting_on_finish_moving.remove(torrent_id)
+ torrent.is_finished = True
+ component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
+
+ def on_alert_storage_moved_failed(self, alert):
+ """Alert handler for libtorrent storage_moved_failed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ log.warning('on_alert_storage_moved_failed: %s', decode_bytes(alert.message()))
+ # Set an Error message and pause the torrent
+ alert_msg = decode_bytes(alert.message()).split(':', 1)[1].strip()
+ torrent.force_error_state('Failed to move download folder: %s' % alert_msg)
+
+ if torrent_id in self.waiting_on_finish_moving:
+ self.waiting_on_finish_moving.remove(torrent_id)
+ torrent.is_finished = True
+ component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
+
+ def on_alert_torrent_resumed(self, alert):
+ """Alert handler for libtorrent torrent_resumed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+ component.get('EventManager').emit(TorrentResumedEvent(torrent_id))
+
+ def on_alert_state_changed(self, alert):
+ """Alert handler for libtorrent state_changed_alert.
+
+ Emits:
+ TorrentStateChangedEvent: The state has changed.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ torrent.update_state()
+ # Torrent may need to download data after checking.
+ if torrent.state in ('Checking', 'Downloading'):
+ torrent.is_finished = False
+ self.queued_torrents.add(torrent_id)
+
+ def on_alert_save_resume_data(self, alert):
+ """Alert handler for libtorrent save_resume_data_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+ if torrent_id in self.torrents:
+ # libtorrent add_torrent expects bencoded resume_data.
+ self.resume_data[torrent_id] = lt.bencode(
+ lt.write_resume_data(alert.params)
+ )
+
+ if torrent_id in self.waiting_on_resume_data:
+ self.waiting_on_resume_data[torrent_id].callback(None)
+
+ def on_alert_save_resume_data_failed(self, alert):
+ """Alert handler for libtorrent save_resume_data_failed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+
+ if torrent_id in self.waiting_on_resume_data:
+ self.waiting_on_resume_data[torrent_id].errback(
+ Exception(decode_bytes(alert.message()))
+ )
+
+ def on_alert_fastresume_rejected(self, alert):
+ """Alert handler for libtorrent fastresume_rejected_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ alert_msg = decode_bytes(alert.message())
+ log.error('on_alert_fastresume_rejected: %s', alert_msg)
+ if alert.error.value() == 134:
+ if not os.path.isdir(torrent.options['download_location']):
+ error_msg = 'Unable to locate Download Folder!'
+ else:
+ error_msg = 'Missing or invalid torrent data!'
+ else:
+ error_msg = (
+ 'Problem with resume data: %s' % alert_msg.split(':', 1)[1].strip()
+ )
+ torrent.force_error_state(error_msg, restart_to_resume=True)
+
+ def on_alert_file_renamed(self, alert):
+ """Alert handler for libtorrent file_renamed_alert.
+
+ Emits:
+ TorrentFileRenamedEvent: Files in the torrent have been renamed.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ new_name = decode_bytes(alert.new_name())
+ log.debug('index: %s name: %s', alert.index, new_name)
+
+ # We need to see if this file index is in a waiting_on_folder dict
+ for wait_on_folder in torrent.waiting_on_folder_rename:
+ if alert.index in wait_on_folder:
+ wait_on_folder[alert.index].callback(None)
+ break
+ else:
+ # This is just a regular file rename so send the signal
+ component.get('EventManager').emit(
+ TorrentFileRenamedEvent(torrent_id, alert.index, new_name)
+ )
+ self.save_resume_data((torrent_id,))
+
+ def on_alert_metadata_received(self, alert):
+ """Alert handler for libtorrent metadata_received_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+
+ try:
+ torrent = self.torrents[torrent_id]
+ except KeyError:
+ pass
+ else:
+ return torrent.on_metadata_received()
+
+ # Try callback to prefetch_metadata method.
+ try:
+ d = self.prefetching_metadata[torrent_id].alert_deferred
+ except KeyError:
+ pass
+ else:
+ torrent_info = alert.handle.get_torrent_info()
+ return d.callback(torrent_info)
+
+ def on_alert_file_error(self, alert):
+ """Alert handler for libtorrent file_error_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+
+ def on_alert_file_completed(self, alert):
+ """Alert handler for libtorrent file_completed_alert
+
+ Emits:
+ TorrentFileCompletedEvent: When an individual file completes downloading.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+ if torrent_id in self.torrents:
+ component.get('EventManager').emit(
+ TorrentFileCompletedEvent(torrent_id, alert.index)
+ )
+
+ def on_alert_state_update(self, alert):
+ """Alert handler for libtorrent state_update_alert
+
+ Result of a session.post_torrent_updates() call and contains the torrent status
+ of all torrents that changed since last time this was posted.
+
+ """
+ self.last_state_update_alert_ts = time.time()
+
+ for t_status in alert.status:
+ try:
+ torrent_id = str(t_status.info_hash)
+ except RuntimeError:
+ continue
+ if torrent_id in self.torrents:
+ self.torrents[torrent_id].status = t_status
+
+ self.handle_torrents_status_callback(self.torrents_status_requests.pop())
+
+ def on_alert_external_ip(self, alert):
+ """Alert handler for libtorrent external_ip_alert"""
+ log.info('on_alert_external_ip: %s', alert.external_address)
+ component.get('EventManager').emit(ExternalIPEvent(alert.external_address))
+
+ def on_alert_performance(self, alert):
+ """Alert handler for libtorrent performance_alert"""
+ log.warning(
+ 'on_alert_performance: %s, %s',
+ decode_bytes(alert.message()),
+ alert.warning_code,
+ )
+ if alert.warning_code == lt.performance_warning_t.send_buffer_watermark_too_low:
+ max_send_buffer_watermark = 3 * 1024 * 1024 # 3MiB
+ settings = self.session.get_settings()
+ send_buffer_watermark = settings['send_buffer_watermark']
+
+ # If send buffer is too small, try increasing its size by 512KiB (up to max_send_buffer_watermark)
+ if send_buffer_watermark < max_send_buffer_watermark:
+ value = send_buffer_watermark + (500 * 1024)
+ log.info(
+ 'Increasing send_buffer_watermark from %s to %s Bytes',
+ send_buffer_watermark,
+ value,
+ )
+ component.get('Core').apply_session_setting(
+ 'send_buffer_watermark', value
+ )
+ else:
+ log.warning(
+ 'send_buffer_watermark reached maximum value: %s Bytes',
+ max_send_buffer_watermark,
+ )
+
+ def separate_keys(self, keys, torrent_ids):
+ """Separates the input keys into torrent class keys and plugins keys"""
+ if self.torrents:
+ for torrent_id in torrent_ids:
+ if torrent_id in self.torrents:
+ status_keys = list(self.torrents[torrent_id].status_funcs)
+ leftover_keys = list(set(keys) - set(status_keys))
+ torrent_keys = list(set(keys) - set(leftover_keys))
+ return torrent_keys, leftover_keys
+ return [], []
+
+ def handle_torrents_status_callback(self, status_request):
+ """Build the status dictionary with torrent values"""
+ d, torrent_ids, keys, diff = status_request
+ status_dict = {}.fromkeys(torrent_ids)
+ torrent_keys, plugin_keys = self.separate_keys(keys, torrent_ids)
+
+ # Get the torrent status for each torrent_id
+ for torrent_id in torrent_ids:
+ if torrent_id not in self.torrents:
+ # The torrent_id does not exist in the dict.
+ # Could be the clients cache (sessionproxy) isn't up to speed.
+ del status_dict[torrent_id]
+ else:
+ status_dict[torrent_id] = self.torrents[torrent_id].get_status(
+ torrent_keys, diff, all_keys=not keys
+ )
+ self.status_dict = status_dict
+ d.callback((status_dict, plugin_keys))
+
+ def torrents_status_update(self, torrent_ids, keys, diff=False):
+ """Returns status dict for the supplied torrent_ids async.
+
+ Note:
+ If torrent states was updated recently post_torrent_updates is not called and
+ instead cached state is used.
+
+ Args:
+ torrent_ids (list of str): The torrent IDs to get the status of.
+ keys (list of str): The keys to get the status on.
+ diff (bool, optional): If True, will return a diff of the changes since the
+ last call to get_status based on the session_id, defaults to False.
+
+ Returns:
+ dict: A status dictionary for the requested torrents.
+
+ """
+ d = Deferred()
+ now = time.time()
+ # If last update was recent, use cached data instead of request updates from libtorrent
+ if (now - self.last_state_update_alert_ts) < 1.5:
+ reactor.callLater(
+ 0, self.handle_torrents_status_callback, (d, torrent_ids, keys, diff)
+ )
+ else:
+ # Ask libtorrent for status update
+ self.torrents_status_requests.insert(0, (d, torrent_ids, keys, diff))
+ self.session.post_torrent_updates()
+ return d