diff options
Diffstat (limited to 'deluge/core')
-rw-r--r-- | deluge/core/__init__.py | 0 | ||||
-rw-r--r-- | deluge/core/alertmanager.py | 193 | ||||
-rw-r--r-- | deluge/core/authmanager.py | 285 | ||||
-rw-r--r-- | deluge/core/core.py | 1302 | ||||
-rw-r--r-- | deluge/core/daemon.py | 203 | ||||
-rw-r--r-- | deluge/core/daemon_entry.py | 140 | ||||
-rw-r--r-- | deluge/core/eventmanager.py | 66 | ||||
-rw-r--r-- | deluge/core/filtermanager.py | 274 | ||||
-rw-r--r-- | deluge/core/pluginmanager.py | 105 | ||||
-rw-r--r-- | deluge/core/preferencesmanager.py | 476 | ||||
-rw-r--r-- | deluge/core/rpcserver.py | 598 | ||||
-rw-r--r-- | deluge/core/torrent.py | 1563 | ||||
-rw-r--r-- | deluge/core/torrentmanager.py | 1700 |
13 files changed, 6905 insertions, 0 deletions
diff --git a/deluge/core/__init__.py b/deluge/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/deluge/core/__init__.py diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py new file mode 100644 index 0000000..cf541f0 --- /dev/null +++ b/deluge/core/alertmanager.py @@ -0,0 +1,193 @@ +# +# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +""" + +The AlertManager handles all the libtorrent alerts. + +This should typically only be used by the Core. Plugins should utilize the +`:mod:EventManager` for similar functionality. + +""" +import contextlib +import logging +import threading +import time +from collections import defaultdict +from functools import partial +from typing import Any, Callable + +from twisted.internet import reactor, task, threads + +import deluge.component as component +from deluge._libtorrent import lt +from deluge.common import decode_bytes + +log = logging.getLogger(__name__) + + +class AlertManager(component.Component): + """AlertManager fetches and processes libtorrent alerts""" + + def __init__(self): + log.debug('AlertManager init...') + component.Component.__init__(self, 'AlertManager') + self.session = component.get('Core').session + + # Increase the alert queue size so that alerts don't get lost. + self.alert_queue_size = 10000 + self.set_alert_queue_size(self.alert_queue_size) + + alert_mask = ( + lt.alert.category_t.error_notification + | lt.alert.category_t.port_mapping_notification + | lt.alert.category_t.storage_notification + | lt.alert.category_t.tracker_notification + | lt.alert.category_t.status_notification + | lt.alert.category_t.ip_block_notification + | lt.alert.category_t.performance_warning + | lt.alert.category_t.file_progress_notification + ) + + self.session.apply_settings({'alert_mask': alert_mask}) + + # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} + self.handlers = defaultdict(list) + self.handlers_timeout_secs = 2 + self.delayed_calls = [] + self._event = threading.Event() + + def update(self): + pass + + def start(self): + thread = threading.Thread( + target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True + ) + thread.start() + self._event.set() + + def stop(self): + self.cancel_delayed_calls() + + def pause(self): + self._event.clear() + + def resume(self): + self._event.set() + + def wait_for_alert_in_thread(self): + while self._component_state not in ('Stopping', 'Stopped'): + if self.check_delayed_calls(): + time.sleep(0.05) + continue + + if self.session.wait_for_alert(1000) is None: + continue + if self._event.wait(): + threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) + + def on_delayed_call_timeout(self, result, timeout, **kwargs): + log.warning('Alert handler was timed-out before being called %s', kwargs) + + def cancel_delayed_calls(self): + """Cancel all delayed handlers.""" + for delayed_call in self.delayed_calls: + delayed_call.cancel() + self.delayed_calls = [] + + def check_delayed_calls(self) -> bool: + """Returns True if any handler calls are delayed.""" + self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called] + return len(self.delayed_calls) > 0 + + def maybe_handle_alerts(self) -> None: + if self._component_state != 'Started': + return + + self.handle_alerts() + + def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: + """ + Registers a function that will be called when 'alert_type' is pop'd + in handle_alerts. The handler function should look like: handler(alert) + Where 'alert' is the actual alert object from libtorrent. + + Args: + alert_type: String representation of the libtorrent alert name. + Can be supplied with or without `_alert` suffix. + handler: Callback function when the alert is raised. + """ + if alert_type and alert_type.endswith('_alert'): + alert_type = alert_type[: -len('_alert')] + + self.handlers[alert_type].append(handler) + log.debug('Registered handler for alert %s', alert_type) + + def deregister_handler(self, handler: Callable[[Any], None]): + """ + De-registers the `handler` function from all alert types. + + Args: + handler: The handler function to deregister. + """ + for alert_type_handlers in self.handlers.values(): + with contextlib.suppress(ValueError): + alert_type_handlers.remove(handler) + + def handle_alerts(self): + """ + Pops all libtorrent alerts in the session queue and handles them appropriately. + """ + alerts = self.session.pop_alerts() + if not alerts: + return + + num_alerts = len(alerts) + if log.isEnabledFor(logging.DEBUG): + log.debug('Alerts queued: %s', num_alerts) + if num_alerts > 0.9 * self.alert_queue_size: + log.warning( + 'Warning total alerts queued, %s, passes 90%% of queue size.', + num_alerts, + ) + + for alert in alerts: + alert_type = alert.what() + + # Display the alert message + if log.isEnabledFor(logging.DEBUG): + log.debug('%s: %s', alert_type, decode_bytes(alert.message())) + + if alert_type not in self.handlers: + continue + + # Call any handlers for this alert type + for handler in self.handlers[alert_type]: + if log.isEnabledFor(logging.DEBUG): + log.debug('Handling alert: %s', alert_type) + d = task.deferLater(reactor, 0, handler, alert) + on_handler_timeout = partial( + self.on_delayed_call_timeout, + handler=handler.__qualname__, + alert_type=alert_type, + ) + d.addTimeout( + self.handlers_timeout_secs, + reactor, + onTimeoutCancel=on_handler_timeout, + ) + self.delayed_calls.append(d) + + def set_alert_queue_size(self, queue_size): + """Sets the maximum size of the libtorrent alert queue""" + log.info('Alert Queue Size set to %s', queue_size) + self.alert_queue_size = queue_size + component.get('Core').apply_session_setting( + 'alert_queue_size', self.alert_queue_size + ) diff --git a/deluge/core/authmanager.py b/deluge/core/authmanager.py new file mode 100644 index 0000000..3ff8a3a --- /dev/null +++ b/deluge/core/authmanager.py @@ -0,0 +1,285 @@ +# +# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com> +# Copyright (C) 2011 Pedro Algarvio <pedro@algarvio.me> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import logging +import os +import shutil + +import deluge.component as component +import deluge.configmanager as configmanager +from deluge.common import ( + AUTH_LEVEL_ADMIN, + AUTH_LEVEL_DEFAULT, + AUTH_LEVEL_NONE, + AUTH_LEVEL_NORMAL, + AUTH_LEVEL_READONLY, + create_localclient_account, +) +from deluge.error import AuthenticationRequired, AuthManagerError, BadLoginError + +log = logging.getLogger(__name__) + +AUTH_LEVELS_MAPPING = { + 'NONE': AUTH_LEVEL_NONE, + 'READONLY': AUTH_LEVEL_READONLY, + 'DEFAULT': AUTH_LEVEL_DEFAULT, + 'NORMAL': AUTH_LEVEL_NORMAL, + 'ADMIN': AUTH_LEVEL_ADMIN, +} +AUTH_LEVELS_MAPPING_REVERSE = {v: k for k, v in AUTH_LEVELS_MAPPING.items()} + + +class Account: + __slots__ = ('username', 'password', 'authlevel') + + def __init__(self, username, password, authlevel): + self.username = username + self.password = password + self.authlevel = authlevel + + def data(self): + return { + 'username': self.username, + 'password': self.password, + 'authlevel': AUTH_LEVELS_MAPPING_REVERSE[self.authlevel], + 'authlevel_int': self.authlevel, + } + + def __repr__(self): + return '<Account username="{username}" authlevel={authlevel}>'.format( + username=self.username, + authlevel=self.authlevel, + ) + + +class AuthManager(component.Component): + def __init__(self): + component.Component.__init__(self, 'AuthManager', interval=10) + self.__auth = {} + self.__auth_modification_time = None + + def start(self): + self.__load_auth_file() + + def stop(self): + self.__auth = {} + + def shutdown(self): + pass + + def update(self): + auth_file = configmanager.get_config_dir('auth') + # Check for auth file and create if necessary + if not os.path.isfile(auth_file): + log.info('Authfile not found, recreating it.') + self.__load_auth_file() + return + + auth_file_modification_time = os.stat(auth_file).st_mtime + if self.__auth_modification_time != auth_file_modification_time: + log.info('Auth file changed, reloading it!') + self.__load_auth_file() + + def authorize(self, username, password): + """Authorizes users based on username and password. + + Args: + username (str): Username + password (str): Password + + Returns: + int: The auth level for this user. + + Raises: + AuthenticationRequired: If additional details are required to authenticate. + BadLoginError: If the username does not exist or password does not match. + + """ + if not username: + raise AuthenticationRequired( + 'Username and Password are required.', username + ) + + if username not in self.__auth: + # Let's try to re-load the file.. Maybe it's been updated + self.__load_auth_file() + if username not in self.__auth: + raise BadLoginError('Username does not exist', username) + + if self.__auth[username].password == password: + # Return the users auth level + return self.__auth[username].authlevel + elif not password and self.__auth[username].password: + raise AuthenticationRequired('Password is required', username) + else: + raise BadLoginError('Password does not match', username) + + def has_account(self, username): + return username in self.__auth + + def get_known_accounts(self): + """Returns a list of known deluge usernames.""" + self.__load_auth_file() + return [account.data() for account in self.__auth.values()] + + def create_account(self, username, password, authlevel): + if username in self.__auth: + raise AuthManagerError('Username in use.', username) + if authlevel not in AUTH_LEVELS_MAPPING: + raise AuthManagerError('Invalid auth level: %s' % authlevel) + try: + self.__auth[username] = Account( + username, password, AUTH_LEVELS_MAPPING[authlevel] + ) + self.write_auth_file() + return True + except Exception as ex: + log.exception(ex) + raise ex + + def update_account(self, username, password, authlevel): + if username not in self.__auth: + raise AuthManagerError('Username not known', username) + if authlevel not in AUTH_LEVELS_MAPPING: + raise AuthManagerError('Invalid auth level: %s' % authlevel) + try: + self.__auth[username].username = username + self.__auth[username].password = password + self.__auth[username].authlevel = AUTH_LEVELS_MAPPING[authlevel] + self.write_auth_file() + return True + except Exception as ex: + log.exception(ex) + raise ex + + def remove_account(self, username): + if username not in self.__auth: + raise AuthManagerError('Username not known', username) + elif username == component.get('RPCServer').get_session_user(): + raise AuthManagerError( + 'You cannot delete your own account while logged in!', username + ) + + del self.__auth[username] + self.write_auth_file() + return True + + def write_auth_file(self): + filename = 'auth' + filepath = os.path.join(configmanager.get_config_dir(), filename) + filepath_bak = filepath + '.bak' + filepath_tmp = filepath + '.tmp' + + try: + if os.path.isfile(filepath): + log.debug('Creating backup of %s at: %s', filename, filepath_bak) + shutil.copy2(filepath, filepath_bak) + except OSError as ex: + log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) + else: + log.info('Saving the %s at: %s', filename, filepath) + try: + with open(filepath_tmp, 'w', encoding='utf8') as _file: + for account in self.__auth.values(): + _file.write( + '%(username)s:%(password)s:%(authlevel_int)s\n' + % account.data() + ) + _file.flush() + os.fsync(_file.fileno()) + shutil.move(filepath_tmp, filepath) + except OSError as ex: + log.error('Unable to save %s: %s', filename, ex) + if os.path.isfile(filepath_bak): + log.info('Restoring backup of %s from: %s', filename, filepath_bak) + shutil.move(filepath_bak, filepath) + + self.__load_auth_file() + + def __load_auth_file(self): + save_and_reload = False + filename = 'auth' + auth_file = configmanager.get_config_dir(filename) + auth_file_bak = auth_file + '.bak' + + # Check for auth file and create if necessary + if not os.path.isfile(auth_file): + create_localclient_account() + return self.__load_auth_file() + + auth_file_modification_time = os.stat(auth_file).st_mtime + if self.__auth_modification_time is None: + self.__auth_modification_time = auth_file_modification_time + elif self.__auth_modification_time == auth_file_modification_time: + # File didn't change, no need for re-parsing's + return + + for _filepath in (auth_file, auth_file_bak): + log.info('Opening %s for load: %s', filename, _filepath) + try: + with open(_filepath, encoding='utf8') as _file: + file_data = _file.readlines() + except OSError as ex: + log.warning('Unable to load %s: %s', _filepath, ex) + file_data = [] + else: + log.info('Successfully loaded %s: %s', filename, _filepath) + break + + # Load the auth file into a dictionary: {username: Account(...)} + for line in file_data: + line = line.strip() + if line.startswith('#') or not line: + # This line is a comment or empty + continue + lsplit = line.split(':') + if len(lsplit) == 2: + username, password = lsplit + log.warning( + 'Your auth entry for %s contains no auth level, ' + 'using AUTH_LEVEL_DEFAULT(%s)..', + username, + AUTH_LEVEL_DEFAULT, + ) + if username == 'localclient': + authlevel = AUTH_LEVEL_ADMIN + else: + authlevel = AUTH_LEVEL_DEFAULT + # This is probably an old auth file + save_and_reload = True + elif len(lsplit) == 3: + username, password, authlevel = lsplit + else: + log.error('Your auth file is malformed: Incorrect number of fields!') + continue + + username = username.strip() + password = password.strip() + try: + authlevel = int(authlevel) + except ValueError: + try: + authlevel = AUTH_LEVELS_MAPPING[authlevel] + except KeyError: + log.error( + 'Your auth file is malformed: %r is not a valid auth level', + authlevel, + ) + continue + + self.__auth[username] = Account(username, password, authlevel) + + if 'localclient' not in self.__auth: + create_localclient_account(True) + return self.__load_auth_file() + + if save_and_reload: + log.info('Re-writing auth file (upgrade)') + self.write_auth_file() + self.__auth_modification_time = auth_file_modification_time diff --git a/deluge/core/core.py b/deluge/core/core.py new file mode 100644 index 0000000..e2130f5 --- /dev/null +++ b/deluge/core/core.py @@ -0,0 +1,1302 @@ +# +# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> +# Copyright (C) 2011 Pedro Algarvio <pedro@algarvio.me> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import glob +import logging +import os +import shutil +import tempfile +from base64 import b64decode, b64encode +from typing import Any, Dict, List, Optional, Tuple, Union +from urllib.request import URLError, urlopen + +from twisted.internet import defer, reactor, task, threads +from twisted.web.client import Agent, readBody + +import deluge.common +import deluge.component as component +from deluge import metafile, path_chooser_common +from deluge._libtorrent import LT_VERSION, lt +from deluge.configmanager import ConfigManager, get_config_dir +from deluge.core.alertmanager import AlertManager +from deluge.core.authmanager import ( + AUTH_LEVEL_ADMIN, + AUTH_LEVEL_NONE, + AUTH_LEVELS_MAPPING, + AUTH_LEVELS_MAPPING_REVERSE, + AuthManager, +) +from deluge.core.eventmanager import EventManager +from deluge.core.filtermanager import FilterManager +from deluge.core.pluginmanager import PluginManager +from deluge.core.preferencesmanager import PreferencesManager +from deluge.core.rpcserver import export +from deluge.core.torrentmanager import TorrentManager +from deluge.decorators import deprecated, maybe_coroutine +from deluge.error import ( + AddTorrentError, + DelugeError, + InvalidPathError, + InvalidTorrentError, +) +from deluge.event import ( + NewVersionAvailableEvent, + SessionPausedEvent, + SessionResumedEvent, + TorrentQueueChangedEvent, +) +from deluge.httpdownloader import download_file + +log = logging.getLogger(__name__) + +DEPR_SESSION_STATUS_KEYS = { + # 'active_requests': None, # In dht_stats_alert, if required. + 'allowed_upload_slots': 'ses.num_unchoke_slots', + # 'dht_global_nodes': None, + 'dht_node_cache': 'dht.dht_node_cache', + 'dht_nodes': 'dht.dht_nodes', + 'dht_torrents': 'dht.dht_torrents', + # 'dht_total_allocations': None, + 'down_bandwidth_bytes_queue': 'net.limiter_down_bytes', + 'down_bandwidth_queue': 'net.limiter_down_queue', + 'has_incoming_connections': 'net.has_incoming_connections', + 'num_peers': 'peer.num_peers_connected', + 'num_unchoked': 'peer.num_peers_up_unchoked', + # 'optimistic_unchoke_counter': None, # lt.settings_pack + 'total_dht_download': 'dht.dht_bytes_in', + 'total_dht_upload': 'dht.dht_bytes_out', + 'total_download': 'net.recv_bytes', + 'total_failed_bytes': 'net.recv_failed_bytes', + 'total_ip_overhead_download': 'net.recv_ip_overhead_bytes', + 'total_ip_overhead_upload': 'net.sent_ip_overhead_bytes', + 'total_payload_download': 'net.recv_payload_bytes', + 'total_payload_upload': 'net.sent_payload_bytes', + 'total_redundant_bytes': 'net.recv_redundant_bytes', + 'total_tracker_download': 'net.recv_tracker_bytes', + 'total_tracker_upload': 'net.sent_tracker_bytes', + 'total_upload': 'net.sent_bytes', + # 'unchoke_counter': None, # lt.settings_pack + 'up_bandwidth_bytes_queue': 'net.limiter_up_bytes', + 'up_bandwidth_queue': 'net.limiter_up_queue', + # 'utp_stats': None +} + +# Session status rate keys associated with session status counters. +SESSION_RATES_MAPPING = { + 'dht_download_rate': 'dht.dht_bytes_in', + 'dht_upload_rate': 'dht.dht_bytes_out', + 'ip_overhead_download_rate': 'net.recv_ip_overhead_bytes', + 'ip_overhead_upload_rate': 'net.sent_ip_overhead_bytes', + 'payload_download_rate': 'net.recv_payload_bytes', + 'payload_upload_rate': 'net.sent_payload_bytes', + 'tracker_download_rate': 'net.recv_tracker_bytes', + 'tracker_upload_rate': 'net.sent_tracker_bytes', + 'download_rate': 'net.recv_bytes', + 'upload_rate': 'net.sent_bytes', +} + +DELUGE_VER = deluge.common.get_version() + + +class Core(component.Component): + def __init__( + self, listen_interface=None, outgoing_interface=None, read_only_config_keys=None + ): + component.Component.__init__(self, 'Core') + + # Start the libtorrent session. + user_agent = f'Deluge/{DELUGE_VER} libtorrent/{LT_VERSION}' + peer_id = self._create_peer_id(DELUGE_VER) + log.debug('Starting session (peer_id: %s, user_agent: %s)', peer_id, user_agent) + settings_pack = { + 'peer_fingerprint': peer_id, + 'user_agent': user_agent, + 'ignore_resume_timestamps': True, + } + self.session = lt.session(settings_pack, flags=0) + + # Load the settings, if available. + self._load_session_state() + + # Enable libtorrent extensions + # Allows peers to download the metadata from the swarm directly + self.session.add_extension('ut_metadata') + # Ban peers that sends bad data + self.session.add_extension('smart_ban') + + # Create the components + self.eventmanager = EventManager() + self.preferencesmanager = PreferencesManager() + self.alertmanager = AlertManager() + self.pluginmanager = PluginManager(self) + self.torrentmanager = TorrentManager() + self.filtermanager = FilterManager(self) + self.authmanager = AuthManager() + + # New release check information + self.new_release = None + + # External IP Address from libtorrent + self.external_ip = None + self.eventmanager.register_event_handler( + 'ExternalIPEvent', self._on_external_ip_event + ) + + # GeoIP instance with db loaded + self.geoip_instance = None + + # These keys will be dropped from the set_config() RPC and are + # configurable from the command-line. + self.read_only_config_keys = read_only_config_keys + log.debug('read_only_config_keys: %s', read_only_config_keys) + + # Get the core config + self.config = ConfigManager('core.conf') + self.config.save() + + # If there was an interface value from the command line, use it, but + # store the one in the config so we can restore it on shutdown + self._old_listen_interface = None + if listen_interface: + if deluge.common.is_interface(listen_interface): + self._old_listen_interface = self.config['listen_interface'] + self.config['listen_interface'] = listen_interface + else: + log.error( + 'Invalid listen interface (must be IP Address or Interface Name): %s', + listen_interface, + ) + + self._old_outgoing_interface = None + if outgoing_interface: + if deluge.common.is_interface(outgoing_interface): + self._old_outgoing_interface = self.config['outgoing_interface'] + self.config['outgoing_interface'] = outgoing_interface + else: + log.error( + 'Invalid outgoing interface (must be IP Address or Interface Name): %s', + outgoing_interface, + ) + + # New release check information + self.__new_release = None + + # Session status timer + self.session_status = {k.name: 0 for k in lt.session_stats_metrics()} + self._session_prev_bytes = {k: 0 for k in SESSION_RATES_MAPPING} + # Initiate other session status keys. + self.session_status.update(self._session_prev_bytes) + hit_ratio_keys = ['write_hit_ratio', 'read_hit_ratio'] + self.session_status.update({k: 0.0 for k in hit_ratio_keys}) + + self.session_status_timer_interval = 0.5 + self.session_status_timer = task.LoopingCall(self.session.post_session_stats) + self.alertmanager.register_handler( + 'session_stats', self._on_alert_session_stats + ) + self.session_rates_timer_interval = 2 + self.session_rates_timer = task.LoopingCall(self._update_session_rates) + + def start(self): + """Starts the core""" + self.session_status_timer.start(self.session_status_timer_interval) + self.session_rates_timer.start(self.session_rates_timer_interval, now=False) + + def stop(self): + log.debug('Core stopping...') + + if self.session_status_timer.running: + self.session_status_timer.stop() + + if self.session_rates_timer.running: + self.session_rates_timer.stop() + + # Save the libtorrent session state + self._save_session_state() + + # We stored a copy of the old interface value + if self._old_listen_interface is not None: + self.config['listen_interface'] = self._old_listen_interface + + if self._old_outgoing_interface is not None: + self.config['outgoing_interface'] = self._old_outgoing_interface + + # Make sure the config file has been saved + self.config.save() + + def shutdown(self): + pass + + def apply_session_setting(self, key, value): + self.apply_session_settings({key: value}) + + def apply_session_settings(self, settings): + """Apply libtorrent session settings. + + Args: + settings: A dict of lt session settings to apply. + """ + self.session.apply_settings(settings) + + @staticmethod + def _create_peer_id(version: str) -> str: + """Create a peer_id fingerprint. + + This creates the peer_id and modifies the release char to identify + pre-release and development version. Using ``D`` for dev, daily or + nightly builds, ``a, b, r`` for pre-releases and ``s`` for + stable releases. + + Examples: + ``--<client><client><major><minor><micro><release>--`` + ``--DE200D--`` (development version of 2.0.0) + ``--DE200s--`` (stable release of v2.0.0) + ``--DE201b--`` (beta pre-release of v2.0.1) + + Args: + version: The version string in PEP440 dotted notation. + + Returns: + The formatted peer_id with Deluge prefix e.g. '--DE200s--' + """ + split = deluge.common.VersionSplit(version) + # Fill list with zeros to length of 4 and use lt to create fingerprint. + version_list = split.version + [0] * (4 - len(split.version)) + peer_id = lt.generate_fingerprint('DE', *version_list) + + def substitute_chr(string, idx, char): + """Fast substitute single char in string.""" + return string[:idx] + char + string[idx + 1 :] + + if split.dev: + release_chr = 'D' + elif split.suffix: + # a (alpha), b (beta) or r (release candidate). + release_chr = split.suffix[0].lower() + else: + release_chr = 's' + peer_id = substitute_chr(peer_id, 6, release_chr) + + return peer_id + + def _save_session_state(self): + """Saves the libtorrent session state""" + filename = 'session.state' + filepath = get_config_dir(filename) + filepath_bak = filepath + '.bak' + filepath_tmp = filepath + '.tmp' + + try: + if os.path.isfile(filepath): + log.debug('Creating backup of %s at: %s', filename, filepath_bak) + shutil.copy2(filepath, filepath_bak) + except OSError as ex: + log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) + else: + log.info('Saving the %s at: %s', filename, filepath) + try: + with open(filepath_tmp, 'wb') as _file: + _file.write(lt.bencode(self.session.save_state())) + _file.flush() + os.fsync(_file.fileno()) + shutil.move(filepath_tmp, filepath) + except (OSError, EOFError) as ex: + log.error('Unable to save %s: %s', filename, ex) + if os.path.isfile(filepath_bak): + log.info('Restoring backup of %s from: %s', filename, filepath_bak) + shutil.move(filepath_bak, filepath) + + def _load_session_state(self) -> dict: + """Loads the libtorrent session state + + Returns: + A libtorrent sesion state, empty dict if unable to load it. + """ + filename = 'session.state' + filepath = get_config_dir(filename) + filepath_bak = filepath + '.bak' + + for _filepath in (filepath, filepath_bak): + log.debug('Opening %s for load: %s', filename, _filepath) + try: + with open(_filepath, 'rb') as _file: + state = lt.bdecode(_file.read()) + except (OSError, EOFError, RuntimeError) as ex: + log.warning('Unable to load %s: %s', _filepath, ex) + else: + log.info('Successfully loaded %s: %s', filename, _filepath) + self.session.load_state(state) + + def _on_alert_session_stats(self, alert): + """The handler for libtorrent session stats alert""" + self.session_status.update(alert.values) + self._update_session_cache_hit_ratio() + + def _update_session_cache_hit_ratio(self): + """Calculates the cache read/write hit ratios for session_status.""" + blocks_written = self.session_status['disk.num_blocks_written'] + blocks_read = self.session_status['disk.num_blocks_read'] + + if blocks_written: + self.session_status['write_hit_ratio'] = ( + blocks_written - self.session_status['disk.num_write_ops'] + ) / blocks_written + else: + self.session_status['write_hit_ratio'] = 0.0 + + if blocks_read: + self.session_status['read_hit_ratio'] = ( + blocks_read - self.session_status['disk.num_read_ops'] + ) / blocks_read + else: + self.session_status['read_hit_ratio'] = 0.0 + + def _update_session_rates(self): + """Calculate session status rates. + + Uses polling interval and counter difference for session_status rates. + """ + for rate_key, prev_bytes in list(self._session_prev_bytes.items()): + new_bytes = self.session_status[SESSION_RATES_MAPPING[rate_key]] + self.session_status[rate_key] = ( + new_bytes - prev_bytes + ) / self.session_rates_timer_interval + # Store current value for next update. + self._session_prev_bytes[rate_key] = new_bytes + + def get_new_release(self): + log.debug('get_new_release') + try: + self.new_release = ( + urlopen('http://download.deluge-torrent.org/version-2.0') + .read() + .decode() + .strip() + ) + except URLError as ex: + log.debug('Unable to get release info from website: %s', ex) + else: + self.check_new_release() + + def check_new_release(self): + if self.new_release: + log.debug('new_release: %s', self.new_release) + if deluge.common.VersionSplit( + self.new_release + ) > deluge.common.VersionSplit(deluge.common.get_version()): + component.get('EventManager').emit( + NewVersionAvailableEvent(self.new_release) + ) + return self.new_release + return False + + # Exported Methods + @export + def add_torrent_file_async( + self, filename: str, filedump: str, options: dict, save_state: bool = True + ) -> 'defer.Deferred[Optional[str]]': + """Adds a torrent file to the session asynchronously. + + Args: + filename: The filename of the torrent. + filedump: A base64 encoded string of torrent file contents. + options: The options to apply to the torrent upon adding. + save_state: If the state should be saved after adding the file. + + Returns: + The torrent ID or None. + """ + try: + filedump = b64decode(filedump) + except TypeError as ex: + log.error('There was an error decoding the filedump string: %s', ex) + + try: + d = self.torrentmanager.add_async( + filedump=filedump, + options=options, + filename=filename, + save_state=save_state, + ) + except RuntimeError as ex: + log.error('There was an error adding the torrent file %s: %s', filename, ex) + raise + else: + return d + + @export + @maybe_coroutine + async def prefetch_magnet_metadata( + self, magnet: str, timeout: int = 30 + ) -> Tuple[str, bytes]: + """Download magnet metadata without adding to Deluge session. + + Used by UIs to get magnet files for selection before adding to session. + + The metadata is bencoded and for transfer base64 encoded. + + Args: + magnet: The magnet URI. + timeout: Number of seconds to wait before canceling request. + + Returns: + A tuple of (torrent_id, metadata) for the magnet. + + """ + return await self.torrentmanager.prefetch_metadata(magnet, timeout) + + @export + def add_torrent_file( + self, filename: str, filedump: Union[str, bytes], options: dict + ) -> Optional[str]: + """Adds a torrent file to the session. + + Args: + filename: The filename of the torrent. + filedump: A base64 encoded string of the torrent file contents. + options: The options to apply to the torrent upon adding. + + Returns: + The torrent_id or None. + """ + try: + filedump = b64decode(filedump) + except Exception as ex: + log.error('There was an error decoding the filedump string: %s', ex) + + try: + return self.torrentmanager.add( + filedump=filedump, options=options, filename=filename + ) + except RuntimeError as ex: + log.error('There was an error adding the torrent file %s: %s', filename, ex) + raise + + @export + def add_torrent_files( + self, torrent_files: List[Tuple[str, Union[str, bytes], dict]] + ) -> 'defer.Deferred[List[AddTorrentError]]': + """Adds multiple torrent files to the session asynchronously. + + Args: + torrent_files: Torrent files as tuple of + ``(filename, filedump, options)``. + + Returns: + A list of errors (if there were any) + """ + + @maybe_coroutine + async def add_torrents(): + errors = [] + last_index = len(torrent_files) - 1 + for idx, torrent in enumerate(torrent_files): + try: + await self.add_torrent_file_async( + torrent[0], torrent[1], torrent[2], save_state=idx == last_index + ) + except AddTorrentError as ex: + log.warning('Error when adding torrent: %s', ex) + errors.append(ex) + defer.returnValue(errors) + + return task.deferLater(reactor, 0, add_torrents) + + @export + @maybe_coroutine + async def add_torrent_url( + self, url: str, options: dict, headers: dict = None + ) -> 'defer.Deferred[Optional[str]]': + """Adds a torrent from a URL. Deluge will attempt to fetch the torrent + from the URL prior to adding it to the session. + + Args: + url: the URL pointing to the torrent file + options: the options to apply to the torrent on add + headers: any optional headers to send + + Returns: + a Deferred which returns the torrent_id as a str or None + """ + log.info('Attempting to add URL %s', url) + + tmp_fd, tmp_file = tempfile.mkstemp(prefix='deluge_url.', suffix='.torrent') + try: + filename = await download_file( + url, tmp_file, headers=headers, force_filename=True + ) + except Exception: + log.error('Failed to add torrent from URL %s', url) + raise + else: + with open(filename, 'rb') as _file: + data = _file.read() + return self.add_torrent_file(filename, b64encode(data), options) + finally: + try: + os.close(tmp_fd) + os.remove(tmp_file) + except OSError as ex: + log.warning(f'Unable to delete temp file {tmp_file}: , {ex}') + + @export + def add_torrent_magnet(self, uri: str, options: dict) -> str: + """Adds a torrent from a magnet link. + + Args: + uri: the magnet link + options: the options to apply to the torrent on add + + Returns: + the torrent_id + """ + log.debug('Attempting to add by magnet URI: %s', uri) + + return self.torrentmanager.add(magnet=uri, options=options) + + @export + def remove_torrent(self, torrent_id: str, remove_data: bool) -> bool: + """Removes a single torrent from the session. + + Args: + torrent_id: The torrent ID to remove. + remove_data: If True, also remove the downloaded data. + + Returns: + True if removed successfully. + + Raises: + InvalidTorrentError: If the torrent ID does not exist in the session. + """ + log.debug('Removing torrent %s from the core.', torrent_id) + return self.torrentmanager.remove(torrent_id, remove_data) + + @export + def remove_torrents( + self, torrent_ids: List[str], remove_data: bool + ) -> 'defer.Deferred[List[Tuple[str, str]]]': + """Remove multiple torrents from the session. + + Args: + torrent_ids: The torrent IDs to remove. + remove_data: If True, also remove the downloaded data. + + Returns: + An empty list if no errors occurred otherwise the list contains + tuples of strings, a torrent ID and an error message. For example: + + [('<torrent_id>', 'Error removing torrent')] + """ + log.info('Removing %d torrents from core.', len(torrent_ids)) + + def do_remove_torrents(): + errors = [] + for torrent_id in torrent_ids: + try: + self.torrentmanager.remove( + torrent_id, remove_data=remove_data, save_state=False + ) + except InvalidTorrentError as ex: + errors.append((torrent_id, str(ex))) + # Save the session state + self.torrentmanager.save_state() + if errors: + log.warning( + 'Failed to remove %d of %d torrents.', len(errors), len(torrent_ids) + ) + return errors + + return task.deferLater(reactor, 0, do_remove_torrents) + + @export + def get_session_status(self, keys: List[str]) -> Dict[str, Union[int, float]]: + """Gets the session status values for 'keys', these keys are taking + from libtorrent's session status. + + See: http://www.rasterbar.com/products/libtorrent/manual.html#status + + Args: + keys: the keys for which we want values + + Returns: + a dictionary of {key: value, ...} + """ + if not keys: + return self.session_status + + status = {} + for key in keys: + try: + status[key] = self.session_status[key] + except KeyError: + if key in DEPR_SESSION_STATUS_KEYS: + new_key = DEPR_SESSION_STATUS_KEYS[key] + log.debug( + 'Deprecated session status key %s, please use %s', key, new_key + ) + status[key] = self.session_status[new_key] + else: + log.debug('Session status key not valid: %s', key) + return status + + @export + def force_reannounce(self, torrent_ids: List[str]) -> None: + log.debug('Forcing reannouncment to: %s', torrent_ids) + for torrent_id in torrent_ids: + self.torrentmanager[torrent_id].force_reannounce() + + @export + def pause_torrent(self, torrent_id: str) -> None: + """Pauses a torrent""" + log.debug('Pausing: %s', torrent_id) + if not isinstance(torrent_id, str): + self.pause_torrents(torrent_id) + else: + self.torrentmanager[torrent_id].pause() + + @export + def pause_torrents(self, torrent_ids: List[str] = None) -> None: + """Pauses a list of torrents""" + if not torrent_ids: + torrent_ids = self.torrentmanager.get_torrent_list() + for torrent_id in torrent_ids: + self.pause_torrent(torrent_id) + + @export + def connect_peer(self, torrent_id: str, ip: str, port: int): + log.debug('adding peer %s to %s', ip, torrent_id) + if not self.torrentmanager[torrent_id].connect_peer(ip, port): + log.warning('Error adding peer %s:%s to %s', ip, port, torrent_id) + + @export + def move_storage(self, torrent_ids: List[str], dest: str): + log.debug('Moving storage %s to %s', torrent_ids, dest) + for torrent_id in torrent_ids: + if not self.torrentmanager[torrent_id].move_storage(dest): + log.warning('Error moving torrent %s to %s', torrent_id, dest) + + @export + def pause_session(self) -> None: + """Pause the entire session""" + if not self.session.is_paused(): + self.session.pause() + component.get('EventManager').emit(SessionPausedEvent()) + + @export + def resume_session(self) -> None: + """Resume the entire session""" + if self.session.is_paused(): + self.session.resume() + for torrent_id in self.torrentmanager.torrents: + self.torrentmanager[torrent_id].update_state() + component.get('EventManager').emit(SessionResumedEvent()) + + @export + def is_session_paused(self) -> bool: + """Returns the activity of the session""" + return self.session.is_paused() + + @export + def resume_torrent(self, torrent_id: str) -> None: + """Resumes a torrent""" + log.debug('Resuming: %s', torrent_id) + if not isinstance(torrent_id, str): + self.resume_torrents(torrent_id) + else: + self.torrentmanager[torrent_id].resume() + + @export + def resume_torrents(self, torrent_ids: List[str] = None) -> None: + """Resumes a list of torrents""" + if not torrent_ids: + torrent_ids = self.torrentmanager.get_torrent_list() + for torrent_id in torrent_ids: + self.resume_torrent(torrent_id) + + def create_torrent_status( + self, + torrent_id, + torrent_keys, + plugin_keys, + diff=False, + update=False, + all_keys=False, + ): + try: + status = self.torrentmanager[torrent_id].get_status( + torrent_keys, diff, update=update, all_keys=all_keys + ) + except KeyError: + import traceback + + traceback.print_exc() + # Torrent was probably removed meanwhile + return {} + + # Ask the plugin manager to fill in the plugin keys + if len(plugin_keys) > 0 or all_keys: + status.update(self.pluginmanager.get_status(torrent_id, plugin_keys)) + return status + + @export + def get_torrent_status( + self, torrent_id: str, keys: List[str], diff: bool = False + ) -> dict: + torrent_keys, plugin_keys = self.torrentmanager.separate_keys( + keys, [torrent_id] + ) + return self.create_torrent_status( + torrent_id, + torrent_keys, + plugin_keys, + diff=diff, + update=True, + all_keys=not keys, + ) + + @export + @maybe_coroutine + async def get_torrents_status( + self, filter_dict: dict, keys: List[str], diff: bool = False + ) -> dict: + """returns all torrents , optionally filtered by filter_dict.""" + all_keys = not keys + torrent_ids = self.filtermanager.filter_torrent_ids(filter_dict) + status_dict, plugin_keys = await self.torrentmanager.torrents_status_update( + torrent_ids, keys, diff=diff + ) + # Ask the plugin manager to fill in the plugin keys + if len(plugin_keys) > 0 or all_keys: + for key in status_dict: + status_dict[key].update(self.pluginmanager.get_status(key, plugin_keys)) + return status_dict + + @export + def get_filter_tree( + self, show_zero_hits: bool = True, hide_cat: List[str] = None + ) -> Dict: + """returns {field: [(value,count)] } + for use in sidebar(s) + """ + return self.filtermanager.get_filter_tree(show_zero_hits, hide_cat) + + @export + def get_session_state(self) -> List[str]: + """Returns a list of torrent_ids in the session.""" + # Get the torrent list from the TorrentManager + return self.torrentmanager.get_torrent_list() + + @export + def get_config(self) -> dict: + """Get all the preferences as a dictionary""" + return self.config.config + + @export + def get_config_value(self, key: str) -> Any: + """Get the config value for key""" + return self.config.get(key) + + @export + def get_config_values(self, keys: List[str]) -> Dict[str, Any]: + """Get the config values for the entered keys""" + return {key: self.config.get(key) for key in keys} + + @export + def set_config(self, config: Dict[str, Any]): + """Set the config with values from dictionary""" + # Load all the values into the configuration + for key in config: + if self.read_only_config_keys and key in self.read_only_config_keys: + continue + self.config[key] = config[key] + + @export + def get_listen_port(self) -> int: + """Returns the active listen port""" + return self.session.listen_port() + + @export + def get_proxy(self) -> Dict[str, Any]: + """Returns the proxy settings + + Returns: + Proxy settings. + + Notes: + Proxy type names: + 0: None, 1: Socks4, 2: Socks5, 3: Socks5 w Auth, 4: HTTP, 5: HTTP w Auth, 6: I2P + """ + + settings = self.session.get_settings() + proxy_type = settings['proxy_type'] + proxy_hostname = ( + settings['i2p_hostname'] if proxy_type == 6 else settings['proxy_hostname'] + ) + proxy_port = settings['i2p_port'] if proxy_type == 6 else settings['proxy_port'] + proxy_dict = { + 'type': proxy_type, + 'hostname': proxy_hostname, + 'username': settings['proxy_username'], + 'password': settings['proxy_password'], + 'port': proxy_port, + 'proxy_hostnames': settings['proxy_hostnames'], + 'proxy_peer_connections': settings['proxy_peer_connections'], + 'proxy_tracker_connections': settings['proxy_tracker_connections'], + } + + return proxy_dict + + @export + def get_available_plugins(self) -> List[str]: + """Returns a list of plugins available in the core""" + return self.pluginmanager.get_available_plugins() + + @export + def get_enabled_plugins(self) -> List[str]: + """Returns a list of enabled plugins in the core""" + return self.pluginmanager.get_enabled_plugins() + + @export + def enable_plugin(self, plugin: str) -> 'defer.Deferred[bool]': + return self.pluginmanager.enable_plugin(plugin) + + @export + def disable_plugin(self, plugin: str) -> 'defer.Deferred[bool]': + return self.pluginmanager.disable_plugin(plugin) + + @export + def force_recheck(self, torrent_ids: List[str]) -> None: + """Forces a data recheck on torrent_ids""" + for torrent_id in torrent_ids: + self.torrentmanager[torrent_id].force_recheck() + + @export + def set_torrent_options( + self, torrent_ids: List[str], options: Dict[str, Any] + ) -> None: + """Sets the torrent options for torrent_ids + + Args: + torrent_ids: A list of torrent_ids to set the options for. + options: A dict of torrent options to set. See + ``torrent.TorrentOptions`` class for valid keys. + """ + if 'owner' in options and not self.authmanager.has_account(options['owner']): + raise DelugeError('Username "%s" is not known.' % options['owner']) + + if isinstance(torrent_ids, str): + torrent_ids = [torrent_ids] + + for torrent_id in torrent_ids: + self.torrentmanager[torrent_id].set_options(options) + + @export + def set_torrent_trackers( + self, torrent_id: str, trackers: List[Dict[str, Any]] + ) -> None: + """Sets a torrents tracker list. trackers will be ``[{"url", "tier"}]``""" + return self.torrentmanager[torrent_id].set_trackers(trackers) + + @export + def get_magnet_uri(self, torrent_id: str) -> str: + return self.torrentmanager[torrent_id].get_magnet_uri() + + @deprecated + @export + def set_torrent_max_connections(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'max_connections'""" + self.set_torrent_options([torrent_id], {'max_connections': value}) + + @deprecated + @export + def set_torrent_max_upload_slots(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'max_upload_slots'""" + self.set_torrent_options([torrent_id], {'max_upload_slots': value}) + + @deprecated + @export + def set_torrent_max_upload_speed(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'max_upload_speed'""" + self.set_torrent_options([torrent_id], {'max_upload_speed': value}) + + @deprecated + @export + def set_torrent_max_download_speed(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'max_download_speed'""" + self.set_torrent_options([torrent_id], {'max_download_speed': value}) + + @deprecated + @export + def set_torrent_file_priorities(self, torrent_id, priorities): + """Deprecated: Use set_torrent_options with 'file_priorities'""" + self.set_torrent_options([torrent_id], {'file_priorities': priorities}) + + @deprecated + @export + def set_torrent_prioritize_first_last(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'prioritize_first_last'""" + self.set_torrent_options([torrent_id], {'prioritize_first_last_pieces': value}) + + @deprecated + @export + def set_torrent_auto_managed(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'auto_managed'""" + self.set_torrent_options([torrent_id], {'auto_managed': value}) + + @deprecated + @export + def set_torrent_stop_at_ratio(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'stop_at_ratio'""" + self.set_torrent_options([torrent_id], {'stop_at_ratio': value}) + + @deprecated + @export + def set_torrent_stop_ratio(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'stop_ratio'""" + self.set_torrent_options([torrent_id], {'stop_ratio': value}) + + @deprecated + @export + def set_torrent_remove_at_ratio(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'remove_at_ratio'""" + self.set_torrent_options([torrent_id], {'remove_at_ratio': value}) + + @deprecated + @export + def set_torrent_move_completed(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'move_completed'""" + self.set_torrent_options([torrent_id], {'move_completed': value}) + + @deprecated + @export + def set_torrent_move_completed_path(self, torrent_id, value): + """Deprecated: Use set_torrent_options with 'move_completed_path'""" + self.set_torrent_options([torrent_id], {'move_completed_path': value}) + + @export + def get_path_size(self, path): + """Returns the size of the file or folder 'path' and -1 if the path is + inaccessible (non-existent or insufficient privileges)""" + return deluge.common.get_path_size(path) + + @export + def create_torrent( + self, + path, + tracker, + piece_length, + comment=None, + target=None, + webseeds=None, + private=False, + created_by=None, + trackers=None, + add_to_session=False, + torrent_format=metafile.TorrentFormat.V1, + ): + if isinstance(torrent_format, str): + torrent_format = metafile.TorrentFormat(torrent_format) + + log.debug('creating torrent..') + return threads.deferToThread( + self._create_torrent_thread, + path, + tracker, + piece_length, + comment=comment, + target=target, + webseeds=webseeds, + private=private, + created_by=created_by, + trackers=trackers, + add_to_session=add_to_session, + torrent_format=torrent_format, + ) + + def _create_torrent_thread( + self, + path, + tracker, + piece_length, + comment, + target, + webseeds, + private, + created_by, + trackers, + add_to_session, + torrent_format, + ): + from deluge import metafile + + filecontent = metafile.make_meta_file_content( + path, + tracker, + piece_length, + comment=comment, + webseeds=webseeds, + private=private, + created_by=created_by, + trackers=trackers, + torrent_format=torrent_format, + ) + + write_file = False + if target or not add_to_session: + write_file = True + + if not target: + target = metafile.default_meta_file_path(path) + filename = os.path.split(target)[-1] + + if write_file: + with open(target, 'wb') as _file: + _file.write(filecontent) + + filedump = b64encode(filecontent) + log.debug('torrent created!') + if add_to_session: + options = {} + options['download_location'] = os.path.split(path)[0] + self.add_torrent_file(filename, filedump, options) + return filename, filedump + + @export + def upload_plugin(self, filename: str, filedump: Union[str, bytes]) -> None: + """This method is used to upload new plugins to the daemon. It is used + when connecting to the daemon remotely and installing a new plugin on + the client side. ``plugin_data`` is a ``xmlrpc.Binary`` object of the file data, + i.e. ``plugin_file.read()``""" + + try: + filedump = b64decode(filedump) + except TypeError as ex: + log.error('There was an error decoding the filedump string!') + log.exception(ex) + return + + with open(os.path.join(get_config_dir(), 'plugins', filename), 'wb') as _file: + _file.write(filedump) + component.get('CorePluginManager').scan_for_plugins() + + @export + def rescan_plugins(self) -> None: + """Re-scans the plugin folders for new plugins""" + component.get('CorePluginManager').scan_for_plugins() + + @export + def rename_files( + self, torrent_id: str, filenames: List[Tuple[int, str]] + ) -> defer.Deferred: + """Rename files in ``torrent_id``. Since this is an asynchronous operation by + libtorrent, watch for the TorrentFileRenamedEvent to know when the + files have been renamed. + + Args: + torrent_id: the torrent_id to rename files + filenames: a list of index, filename pairs + + Raises: + InvalidTorrentError: if torrent_id is invalid + """ + if torrent_id not in self.torrentmanager.torrents: + raise InvalidTorrentError('torrent_id is not in session') + + def rename(): + self.torrentmanager[torrent_id].rename_files(filenames) + + return task.deferLater(reactor, 0, rename) + + @export + def rename_folder( + self, torrent_id: str, folder: str, new_folder: str + ) -> defer.Deferred: + """Renames the 'folder' to 'new_folder' in 'torrent_id'. Watch for the + TorrentFolderRenamedEvent which is emitted when the folder has been + renamed successfully. + + Args: + torrent_id: the torrent to rename folder in + folder: the folder to rename + new_folder: the new folder name + + Raises: + InvalidTorrentError: if the torrent_id is invalid + """ + if torrent_id not in self.torrentmanager.torrents: + raise InvalidTorrentError('torrent_id is not in session') + + return self.torrentmanager[torrent_id].rename_folder(folder, new_folder) + + @export + def queue_top(self, torrent_ids: List[str]) -> None: + log.debug('Attempting to queue %s to top', torrent_ids) + # torrent_ids must be sorted in reverse before moving to preserve order + for torrent_id in sorted( + torrent_ids, key=self.torrentmanager.get_queue_position, reverse=True + ): + try: + # If the queue method returns True, then we should emit a signal + if self.torrentmanager.queue_top(torrent_id): + component.get('EventManager').emit(TorrentQueueChangedEvent()) + except KeyError: + log.warning('torrent_id: %s does not exist in the queue', torrent_id) + + @export + def queue_up(self, torrent_ids: List[str]) -> None: + log.debug('Attempting to queue %s to up', torrent_ids) + torrents = ( + (self.torrentmanager.get_queue_position(torrent_id), torrent_id) + for torrent_id in torrent_ids + ) + torrent_moved = True + prev_queue_position = None + # torrent_ids must be sorted before moving. + for queue_position, torrent_id in sorted(torrents): + # Move the torrent if and only if there is space (by not moving it we preserve the order) + if torrent_moved or queue_position - prev_queue_position > 1: + try: + torrent_moved = self.torrentmanager.queue_up(torrent_id) + except KeyError: + log.warning( + 'torrent_id: %s does not exist in the queue', torrent_id + ) + # If the torrent moved, then we should emit a signal + if torrent_moved: + component.get('EventManager').emit(TorrentQueueChangedEvent()) + else: + prev_queue_position = queue_position + + @export + def queue_down(self, torrent_ids: List[str]) -> None: + log.debug('Attempting to queue %s to down', torrent_ids) + torrents = ( + (self.torrentmanager.get_queue_position(torrent_id), torrent_id) + for torrent_id in torrent_ids + ) + torrent_moved = True + prev_queue_position = None + # torrent_ids must be sorted before moving. + for queue_position, torrent_id in sorted(torrents, reverse=True): + # Move the torrent if and only if there is space (by not moving it we preserve the order) + if torrent_moved or prev_queue_position - queue_position > 1: + try: + torrent_moved = self.torrentmanager.queue_down(torrent_id) + except KeyError: + log.warning( + 'torrent_id: %s does not exist in the queue', torrent_id + ) + # If the torrent moved, then we should emit a signal + if torrent_moved: + component.get('EventManager').emit(TorrentQueueChangedEvent()) + else: + prev_queue_position = queue_position + + @export + def queue_bottom(self, torrent_ids: List[str]) -> None: + log.debug('Attempting to queue %s to bottom', torrent_ids) + # torrent_ids must be sorted before moving to preserve order + for torrent_id in sorted( + torrent_ids, key=self.torrentmanager.get_queue_position + ): + try: + # If the queue method returns True, then we should emit a signal + if self.torrentmanager.queue_bottom(torrent_id): + component.get('EventManager').emit(TorrentQueueChangedEvent()) + except KeyError: + log.warning('torrent_id: %s does not exist in the queue', torrent_id) + + @export + def glob(self, path: str) -> List[str]: + return glob.glob(path) + + @export + def test_listen_port(self) -> 'defer.Deferred[Optional[bool]]': + """Checks if the active port is open + + Returns: + True if the port is open, False if not + """ + port = self.get_listen_port() + url = 'https://deluge-torrent.org/test_port.php?port=%s' % port + agent = Agent(reactor, connectTimeout=30) + d = agent.request(b'GET', url.encode()) + + def on_get_page(body): + return bool(int(body)) + + def on_error(failure): + log.warning('Error testing listen port: %s', failure) + + d.addCallback(readBody).addCallback(on_get_page) + d.addErrback(on_error) + + return d + + @export + def get_free_space(self, path: str = None) -> int: + """Returns the number of free bytes at path + + Args: + path: the path to check free space at, if None, use the default download location + + Returns: + the number of free bytes at path + + Raises: + InvalidPathError: if the path is invalid + """ + if not path: + path = self.config['download_location'] + try: + return deluge.common.free_space(path) + except InvalidPathError: + return -1 + + def _on_external_ip_event(self, external_ip): + self.external_ip = external_ip + + @export + def get_external_ip(self) -> str: + """Returns the external IP address received from libtorrent.""" + return self.external_ip + + @export + def get_libtorrent_version(self) -> str: + """Returns the libtorrent version. + + Returns: + the version + """ + return LT_VERSION + + @export + def get_completion_paths(self, args: Dict[str, Any]) -> Dict[str, Any]: + """Returns the available path completions for the input value.""" + return path_chooser_common.get_completion_paths(args) + + @export(AUTH_LEVEL_ADMIN) + def get_known_accounts(self) -> List[Dict[str, Any]]: + return self.authmanager.get_known_accounts() + + @export(AUTH_LEVEL_NONE) + def get_auth_levels_mappings(self) -> Tuple[Dict[str, int], Dict[int, str]]: + return (AUTH_LEVELS_MAPPING, AUTH_LEVELS_MAPPING_REVERSE) + + @export(AUTH_LEVEL_ADMIN) + def create_account(self, username: str, password: str, authlevel: str) -> bool: + return self.authmanager.create_account(username, password, authlevel) + + @export(AUTH_LEVEL_ADMIN) + def update_account(self, username: str, password: str, authlevel: str) -> bool: + return self.authmanager.update_account(username, password, authlevel) + + @export(AUTH_LEVEL_ADMIN) + def remove_account(self, username: str) -> bool: + return self.authmanager.remove_account(username) diff --git a/deluge/core/daemon.py b/deluge/core/daemon.py new file mode 100644 index 0000000..0185dd8 --- /dev/null +++ b/deluge/core/daemon.py @@ -0,0 +1,203 @@ +# +# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +"""The Deluge daemon""" +import logging +import os +import socket + +from twisted.internet import reactor + +import deluge.component as component +from deluge.common import get_version, is_ip, is_process_running, windows_check +from deluge.configmanager import get_config_dir +from deluge.core.core import Core +from deluge.core.rpcserver import RPCServer, export +from deluge.error import DaemonRunningError + +if windows_check(): + from win32api import SetConsoleCtrlHandler + from win32con import CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT + +log = logging.getLogger(__name__) + + +def is_daemon_running(pid_file): + """ + Check for another running instance of the daemon using the same pid file. + + Args: + pid_file: The location of the file with pid, port values. + + Returns: + bool: True is daemon is running, False otherwise. + + """ + + try: + with open(pid_file) as _file: + pid, port = (int(x) for x in _file.readline().strip().split(';')) + except (OSError, ValueError): + return False + + if is_process_running(pid): + # Ensure it's a deluged process by trying to open a socket to it's port. + _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + _socket.connect(('127.0.0.1', port)) + except OSError: + # Can't connect, so pid is not a deluged process. + return False + else: + # This is a deluged process! + _socket.close() + return True + + +class Daemon: + """The Deluge Daemon class""" + + def __init__( + self, + listen_interface=None, + outgoing_interface=None, + interface=None, + port=None, + standalone=False, + read_only_config_keys=None, + ): + """ + Args: + listen_interface (str, optional): The IP address to listen to + BitTorrent connections on. + outgoing_interface (str, optional): The network interface name or + IP address to open outgoing BitTorrent connections on. + interface (str, optional): The IP address the daemon will + listen for UI connections on. + port (int, optional): The port the daemon will listen for UI + connections on. + standalone (bool, optional): If True the client is in Standalone + mode otherwise, if False, start the daemon as separate process. + read_only_config_keys (list of str, optional): A list of config + keys that will not be altered by core.set_config() RPC method. + """ + self.standalone = standalone + self.pid_file = get_config_dir('deluged.pid') + log.info('Deluge daemon %s', get_version()) + if is_daemon_running(self.pid_file): + raise DaemonRunningError( + 'Deluge daemon already running with this config directory!' + ) + + # Twisted catches signals to terminate, so just have it call the shutdown method. + reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) + + # Catch some Windows specific signals + if windows_check(): + + def win_handler(ctrl_type): + """Handle the Windows shutdown or close events.""" + log.debug('windows handler ctrl_type: %s', ctrl_type) + if ctrl_type == CTRL_CLOSE_EVENT or ctrl_type == CTRL_SHUTDOWN_EVENT: + self._shutdown() + return 1 + + SetConsoleCtrlHandler(win_handler) + + # Start the core as a thread and join it until it's done + self.core = Core( + listen_interface=listen_interface, + outgoing_interface=outgoing_interface, + read_only_config_keys=read_only_config_keys, + ) + + if port is None: + port = self.core.config['daemon_port'] + self.port = port + + if interface and not is_ip(interface): + log.error('Invalid UI interface (must be IP Address): %s', interface) + interface = None + + self.rpcserver = RPCServer( + port=port, + allow_remote=self.core.config['allow_remote'], + listen=not standalone, + interface=interface, + ) + + log.debug( + 'Listening to UI on: %s:%s and bittorrent on: %s Making connections out on: %s', + interface, + port, + listen_interface, + outgoing_interface, + ) + + def start(self): + # Register the daemon and the core RPCs + self.rpcserver.register_object(self.core) + self.rpcserver.register_object(self) + + # Make sure we start the PreferencesManager first + component.start('PreferencesManager') + + if not self.standalone: + log.info('Deluge daemon starting...') + # Create pid file to track if deluged is running, also includes the port number. + pid = os.getpid() + log.debug('Storing pid %s & port %s in: %s', pid, self.port, self.pid_file) + with open(self.pid_file, 'w') as _file: + _file.write(f'{pid};{self.port}\n') + + component.start() + + try: + reactor.run() + finally: + log.debug('Remove pid file: %s', self.pid_file) + os.remove(self.pid_file) + log.info('Deluge daemon shutdown successfully') + + @export() + def shutdown(self, *args, **kwargs): + log.debug('Deluge daemon shutdown requested...') + reactor.callLater(0, reactor.stop) + + def _shutdown(self, *args, **kwargs): + log.info('Deluge daemon shutting down, waiting for components to shutdown...') + if not self.standalone: + return component.shutdown() + + @export() + def get_method_list(self): + """Returns a list of the exported methods.""" + return self.rpcserver.get_method_list() + + @export() + def get_version(self): + """Returns the daemon version""" + return get_version() + + @export(1) + def authorized_call(self, rpc): + """Determines if session auth_level is authorized to call RPC. + + Args: + rpc (str): A RPC, e.g. core.get_torrents_status + + Returns: + bool: True if authorized to call RPC, otherwise False. + """ + if rpc not in self.get_method_list(): + return False + + return ( + self.rpcserver.get_session_auth_level() + >= self.rpcserver.get_rpc_auth_level(rpc) + ) diff --git a/deluge/core/daemon_entry.py b/deluge/core/daemon_entry.py new file mode 100644 index 0000000..c49fd2a --- /dev/null +++ b/deluge/core/daemon_entry.py @@ -0,0 +1,140 @@ +# +# Copyright (C) 2007 Andrew Resch <andrewresch@gmail.com> +# Copyright (C) 2010 Pedro Algarvio <pedro@algarvio.me> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import os +import sys +from logging import DEBUG, FileHandler, getLogger + +from twisted.internet.error import CannotListenError + +from deluge.argparserbase import ArgParserBase +from deluge.common import run_profiled +from deluge.configmanager import get_config_dir +from deluge.i18n import setup_mock_translation + + +def add_daemon_options(parser): + group = parser.add_argument_group(_('Daemon Options')) + group.add_argument( + '-u', + '--ui-interface', + metavar='<ip-addr>', + action='store', + help=_('IP address to listen for UI connections'), + ) + group.add_argument( + '-p', + '--port', + metavar='<port>', + action='store', + type=int, + help=_('Port to listen for UI connections on'), + ) + group.add_argument( + '-i', + '--interface', + metavar='<ip-addr>', + dest='listen_interface', + action='store', + help=_('IP address to listen for BitTorrent connections'), + ) + group.add_argument( + '-o', + '--outgoing-interface', + metavar='<interface>', + dest='outgoing_interface', + action='store', + help=_( + 'The network interface name or IP address for outgoing BitTorrent connections.' + ), + ) + group.add_argument( + '--read-only-config-keys', + metavar='<comma-separated-keys>', + action='store', + help=_('Config keys to be unmodified by `set_config` RPC'), + type=str, + default='', + ) + parser.add_process_arg_group() + + +def start_daemon(skip_start=False): + """ + Entry point for daemon script + + Args: + skip_start (bool): If starting daemon should be skipped. + + Returns: + deluge.core.daemon.Daemon: A new daemon object + + """ + setup_mock_translation() + + # Setup the argument parser + parser = ArgParserBase() + add_daemon_options(parser) + + options = parser.parse_args() + + # Check for any daemons running with this same config + from deluge.core.daemon import is_daemon_running + + pid_file = get_config_dir('deluged.pid') + if is_daemon_running(pid_file): + print( + 'Cannot run multiple daemons with same config directory.\n' + 'If you believe this is an error, force starting by deleting: %s' % pid_file + ) + sys.exit(1) + + log = getLogger(__name__) + + # If no logfile specified add logging to default location (as well as stdout) + if not options.logfile: + options.logfile = get_config_dir('deluged.log') + file_handler = FileHandler(options.logfile) + log.addHandler(file_handler) + + def run_daemon(options): + try: + from deluge.core.daemon import Daemon + + daemon = Daemon( + listen_interface=options.listen_interface, + outgoing_interface=options.outgoing_interface, + interface=options.ui_interface, + port=options.port, + read_only_config_keys=options.read_only_config_keys.split(','), + ) + if skip_start: + return daemon + else: + daemon.start() + except CannotListenError as ex: + log.error( + 'Cannot start deluged, listen port in use.\n' + ' Check for other running daemons or services using this port: %s:%s', + ex.interface, + ex.port, + ) + sys.exit(1) + except Exception as ex: + log.error('Unable to start deluged: %s', ex) + if log.isEnabledFor(DEBUG): + log.exception(ex) + sys.exit(1) + finally: + log.info('Exiting...') + if options.pidfile: + os.remove(options.pidfile) + + return run_profiled( + run_daemon, options, output_file=options.profile, do_profile=options.profile + ) diff --git a/deluge/core/eventmanager.py b/deluge/core/eventmanager.py new file mode 100644 index 0000000..d43847a --- /dev/null +++ b/deluge/core/eventmanager.py @@ -0,0 +1,66 @@ +# +# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import logging + +import deluge.component as component + +log = logging.getLogger(__name__) + + +class EventManager(component.Component): + def __init__(self): + component.Component.__init__(self, 'EventManager') + self.handlers = {} + + def emit(self, event): + """ + Emits the event to interested clients. + + :param event: DelugeEvent + """ + # Emit the event to the interested clients + component.get('RPCServer').emit_event(event) + # Call any handlers for the event + if event.name in self.handlers: + for handler in self.handlers[event.name]: + # log.debug('Running handler %s for event %s with args: %s', event.name, handler, event.args) + try: + handler(*event.args) + except Exception as ex: + log.error( + 'Event handler %s failed in %s with exception %s', + event.name, + handler, + ex, + ) + + def register_event_handler(self, event, handler): + """ + Registers a function to be called when a `:param:event` is emitted. + + :param event: str, the event name + :param handler: function, to be called when `:param:event` is emitted + + """ + if event not in self.handlers: + self.handlers[event] = [] + + if handler not in self.handlers[event]: + self.handlers[event].append(handler) + + def deregister_event_handler(self, event, handler): + """ + Deregisters an event handler function. + + :param event: str, the event name + :param handler: function, currently registered to handle `:param:event` + + """ + if event in self.handlers and handler in self.handlers[event]: + self.handlers[event].remove(handler) diff --git a/deluge/core/filtermanager.py b/deluge/core/filtermanager.py new file mode 100644 index 0000000..a60cc5b --- /dev/null +++ b/deluge/core/filtermanager.py @@ -0,0 +1,274 @@ +# +# Copyright (C) 2008 Martijn Voncken <mvoncken@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +import logging + +import deluge.component as component +from deluge.common import TORRENT_STATE + +log = logging.getLogger(__name__) + +STATE_SORT = ['All', 'Active'] + TORRENT_STATE + + +# Special purpose filters: +def filter_keywords(torrent_ids, values): + # Cleanup + keywords = ','.join([v.lower() for v in values]) + keywords = keywords.split(',') + + for keyword in keywords: + torrent_ids = filter_one_keyword(torrent_ids, keyword) + return torrent_ids + + +def filter_one_keyword(torrent_ids, keyword): + """ + search torrent on keyword. + searches title,state,tracker-status,tracker,files + """ + all_torrents = component.get('TorrentManager').torrents + + for torrent_id in torrent_ids: + torrent = all_torrents[torrent_id] + if keyword in torrent.filename.lower(): + yield torrent_id + elif keyword in torrent.state.lower(): + yield torrent_id + elif torrent.trackers and keyword in torrent.trackers[0]['url']: + yield torrent_id + elif keyword in torrent_id: + yield torrent_id + # Want to find broken torrents (search on "error", or "unregistered") + elif keyword in torrent.tracker_status.lower(): + yield torrent_id + else: + for t_file in torrent.get_files(): + if keyword in t_file['path'].lower(): + yield torrent_id + break + + +def filter_by_name(torrent_ids, search_string): + all_torrents = component.get('TorrentManager').torrents + try: + search_string, match_case = search_string[0].split('::match') + except ValueError: + search_string = search_string[0] + match_case = False + + if match_case is False: + search_string = search_string.lower() + + for torrent_id in torrent_ids: + torrent_name = all_torrents[torrent_id].get_name() + if match_case is False: + torrent_name = all_torrents[torrent_id].get_name().lower() + else: + torrent_name = all_torrents[torrent_id].get_name() + + if search_string in torrent_name: + yield torrent_id + + +def tracker_error_filter(torrent_ids, values): + filtered_torrent_ids = [] + tm = component.get('TorrentManager') + + # If this is a tracker_host, then we need to filter on it + if values[0] != 'Error': + for torrent_id in torrent_ids: + if values[0] == tm[torrent_id].get_status(['tracker_host'])['tracker_host']: + filtered_torrent_ids.append(torrent_id) + return filtered_torrent_ids + + # Check torrent's tracker_status for 'Error:' and return those torrent_ids + for torrent_id in torrent_ids: + if 'Error:' in tm[torrent_id].get_status(['tracker_status'])['tracker_status']: + filtered_torrent_ids.append(torrent_id) + return filtered_torrent_ids + + +class FilterManager(component.Component): + """FilterManager""" + + def __init__(self, core): + component.Component.__init__(self, 'FilterManager') + log.debug('FilterManager init..') + self.core = core + self.torrents = core.torrentmanager + self.registered_filters = {} + self.register_filter('keyword', filter_keywords) + self.register_filter('name', filter_by_name) + self.tree_fields = {} + + self.register_tree_field('state', self._init_state_tree) + + def _init_tracker_tree(): + return {'Error': 0} + + self.register_tree_field('tracker_host', _init_tracker_tree) + + self.register_filter('tracker_host', tracker_error_filter) + + def _init_users_tree(): + return {'': 0} + + self.register_tree_field('owner', _init_users_tree) + + def filter_torrent_ids(self, filter_dict): + """ + returns a list of torrent_id's matching filter_dict. + core filter method + """ + if not filter_dict: + return self.torrents.get_torrent_list() + + # Sanitize input: filter-value must be a list of strings + for key, value in filter_dict.items(): + if isinstance(value, str): + filter_dict[key] = [value] + + # Optimized filter for id + if 'id' in filter_dict: + torrent_ids = list(filter_dict['id']) + del filter_dict['id'] + else: + torrent_ids = self.torrents.get_torrent_list() + + # Return if there's nothing more to filter + if not filter_dict: + return torrent_ids + + # Special purpose, state=Active. + if 'state' in filter_dict: + # We need to make sure this is a list for the logic below + filter_dict['state'] = list(filter_dict['state']) + + if 'state' in filter_dict and 'Active' in filter_dict['state']: + filter_dict['state'].remove('Active') + if not filter_dict['state']: + del filter_dict['state'] + torrent_ids = self.filter_state_active(torrent_ids) + + if not filter_dict: + return torrent_ids + + # Registered filters + for field, values in list(filter_dict.items()): + if field in self.registered_filters: + # Filters out doubles + torrent_ids = list( + set(self.registered_filters[field](torrent_ids, values)) + ) + del filter_dict[field] + + if not filter_dict: + return torrent_ids + + torrent_keys, plugin_keys = self.torrents.separate_keys( + list(filter_dict), torrent_ids + ) + # Leftover filter arguments, default filter on status fields. + for torrent_id in list(torrent_ids): + status = self.core.create_torrent_status( + torrent_id, torrent_keys, plugin_keys + ) + for field, values in filter_dict.items(): + if field in status and status[field] in values: + continue + elif torrent_id in torrent_ids: + torrent_ids.remove(torrent_id) + return torrent_ids + + def get_filter_tree(self, show_zero_hits=True, hide_cat=None): + """ + returns {field: [(value,count)] } + for use in sidebar. + """ + torrent_ids = self.torrents.get_torrent_list() + tree_keys = list(self.tree_fields) + if hide_cat: + for cat in hide_cat: + tree_keys.remove(cat) + + torrent_keys, plugin_keys = self.torrents.separate_keys(tree_keys, torrent_ids) + items = {field: self.tree_fields[field]() for field in tree_keys} + + for torrent_id in list(torrent_ids): + status = self.core.create_torrent_status( + torrent_id, torrent_keys, plugin_keys + ) # status={key:value} + for field in tree_keys: + value = status[field] + items[field][value] = items[field].get(value, 0) + 1 + + if 'tracker_host' in items: + items['tracker_host']['All'] = len(torrent_ids) + items['tracker_host']['Error'] = len( + tracker_error_filter(torrent_ids, ('Error',)) + ) + + if not show_zero_hits: + for cat in ['state', 'owner', 'tracker_host']: + if cat in tree_keys: + self._hide_state_items(items[cat]) + + # Return a dict of tuples: + sorted_items = {field: sorted(items[field].items()) for field in tree_keys} + + if 'state' in tree_keys: + sorted_items['state'].sort(key=self._sort_state_item) + + return sorted_items + + def _init_state_tree(self): + init_state = {} + init_state['All'] = len(self.torrents.get_torrent_list()) + for state in TORRENT_STATE: + init_state[state] = 0 + init_state['Active'] = len( + self.filter_state_active(self.torrents.get_torrent_list()) + ) + return init_state + + def register_filter(self, filter_id, filter_func, filter_value=None): + self.registered_filters[filter_id] = filter_func + + def deregister_filter(self, filter_id): + del self.registered_filters[filter_id] + + def register_tree_field(self, field, init_func=lambda: {}): + self.tree_fields[field] = init_func + + def deregister_tree_field(self, field): + if field in self.tree_fields: + del self.tree_fields[field] + + def filter_state_active(self, torrent_ids): + for torrent_id in list(torrent_ids): + status = self.torrents[torrent_id].get_status( + ['download_payload_rate', 'upload_payload_rate'] + ) + if status['download_payload_rate'] or status['upload_payload_rate']: + pass + else: + torrent_ids.remove(torrent_id) + return torrent_ids + + def _hide_state_items(self, state_items): + """For hide(show)-zero hits""" + for value, count in list(state_items.items()): + if value != 'All' and count == 0: + del state_items[value] + + def _sort_state_item(self, item): + try: + return STATE_SORT.index(item[0]) + except ValueError: + return 99 diff --git a/deluge/core/pluginmanager.py b/deluge/core/pluginmanager.py new file mode 100644 index 0000000..0482b16 --- /dev/null +++ b/deluge/core/pluginmanager.py @@ -0,0 +1,105 @@ +# +# Copyright (C) 2007 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + + +"""PluginManager for Core""" +import logging + +from twisted.internet import defer + +import deluge.component as component +import deluge.pluginmanagerbase +from deluge.event import PluginDisabledEvent, PluginEnabledEvent + +log = logging.getLogger(__name__) + + +class PluginManager(deluge.pluginmanagerbase.PluginManagerBase, component.Component): + """PluginManager handles the loading of plugins and provides plugins with + functions to access parts of the core.""" + + def __init__(self, core): + component.Component.__init__(self, 'CorePluginManager') + + self.status_fields = {} + + # Call the PluginManagerBase constructor + deluge.pluginmanagerbase.PluginManagerBase.__init__( + self, 'core.conf', 'deluge.plugin.core' + ) + + def start(self): + # Enable plugins that are enabled in the config + self.enable_plugins() + + def stop(self): + # Disable all enabled plugins + self.disable_plugins() + + def shutdown(self): + self.stop() + + def update_plugins(self): + for plugin in self.plugins: + if hasattr(self.plugins[plugin], 'update'): + try: + self.plugins[plugin].update() + except Exception as ex: + log.exception(ex) + + def enable_plugin(self, name): + d = defer.succeed(True) + if name not in self.plugins: + d = deluge.pluginmanagerbase.PluginManagerBase.enable_plugin(self, name) + + def on_enable_plugin(result): + if result is True and name in self.plugins: + component.get('EventManager').emit(PluginEnabledEvent(name)) + return result + + d.addBoth(on_enable_plugin) + return d + + def disable_plugin(self, name): + d = defer.succeed(True) + if name in self.plugins: + d = deluge.pluginmanagerbase.PluginManagerBase.disable_plugin(self, name) + + def on_disable_plugin(result): + if name not in self.plugins: + component.get('EventManager').emit(PluginDisabledEvent(name)) + return result + + d.addBoth(on_disable_plugin) + return d + + def get_status(self, torrent_id, fields): + """Return the value of status fields for the selected torrent_id.""" + status = {} + if len(fields) == 0: + fields = list(self.status_fields) + for field in fields: + try: + status[field] = self.status_fields[field](torrent_id) + except KeyError: + pass + return status + + def register_status_field(self, field, function): + """Register a new status field. This can be used in the same way the + client requests other status information from core.""" + log.debug('Registering status field %s with PluginManager', field) + self.status_fields[field] = function + + def deregister_status_field(self, field): + """Deregisters a status field""" + log.debug('Deregistering status field %s with PluginManager', field) + try: + del self.status_fields[field] + except Exception: + log.warning('Unable to deregister status field %s', field) diff --git a/deluge/core/preferencesmanager.py b/deluge/core/preferencesmanager.py new file mode 100644 index 0000000..7e5c207 --- /dev/null +++ b/deluge/core/preferencesmanager.py @@ -0,0 +1,476 @@ +# +# Copyright (C) 2008-2010 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + + +import logging +import os +import platform +import random +import threading +from urllib.parse import quote_plus +from urllib.request import urlopen + +from twisted.internet.task import LoopingCall + +import deluge.common +import deluge.component as component +import deluge.configmanager +from deluge._libtorrent import lt +from deluge.event import ConfigValueChangedEvent + +GeoIP = None +try: + from GeoIP import GeoIP +except ImportError: + try: + from pygeoip import GeoIP + except ImportError: + pass + +log = logging.getLogger(__name__) + +DEFAULT_PREFS = { + 'send_info': False, + 'info_sent': 0.0, + 'daemon_port': 58846, + 'allow_remote': False, + 'pre_allocate_storage': False, + 'download_location': deluge.common.get_default_download_dir(), + 'listen_ports': [6881, 6891], + 'listen_interface': '', + 'outgoing_interface': '', + 'random_port': True, + 'listen_random_port': None, + 'listen_use_sys_port': False, + 'listen_reuse_port': True, + 'outgoing_ports': [0, 0], + 'random_outgoing_ports': True, + 'copy_torrent_file': False, + 'del_copy_torrent_file': False, + 'torrentfiles_location': deluge.common.get_default_download_dir(), + 'plugins_location': os.path.join(deluge.configmanager.get_config_dir(), 'plugins'), + 'prioritize_first_last_pieces': False, + 'sequential_download': False, + 'dht': True, + 'upnp': True, + 'natpmp': True, + 'utpex': True, + 'lsd': True, + 'enc_in_policy': 1, + 'enc_out_policy': 1, + 'enc_level': 2, + 'max_connections_global': 200, + 'max_upload_speed': -1.0, + 'max_download_speed': -1.0, + 'max_upload_slots_global': 4, + 'max_half_open_connections': ( + lambda: deluge.common.windows_check() + and (lambda: deluge.common.vista_check() and 4 or 8)() + or 50 + )(), + 'max_connections_per_second': 20, + 'ignore_limits_on_local_network': True, + 'max_connections_per_torrent': -1, + 'max_upload_slots_per_torrent': -1, + 'max_upload_speed_per_torrent': -1, + 'max_download_speed_per_torrent': -1, + 'enabled_plugins': [], + 'add_paused': False, + 'max_active_seeding': 5, + 'max_active_downloading': 3, + 'max_active_limit': 8, + 'dont_count_slow_torrents': False, + 'queue_new_to_top': False, + 'stop_seed_at_ratio': False, + 'remove_seed_at_ratio': False, + 'stop_seed_ratio': 2.00, + 'share_ratio_limit': 2.00, + 'seed_time_ratio_limit': 7.00, + 'seed_time_limit': 180, + 'auto_managed': True, + 'move_completed': False, + 'move_completed_path': deluge.common.get_default_download_dir(), + 'move_completed_paths_list': [], + 'download_location_paths_list': [], + 'path_chooser_show_chooser_button_on_localhost': True, + 'path_chooser_auto_complete_enabled': True, + 'path_chooser_accelerator_string': 'Tab', + 'path_chooser_max_popup_rows': 20, + 'path_chooser_show_hidden_files': False, + 'new_release_check': True, + 'proxy': { + 'type': 0, + 'hostname': '', + 'username': '', + 'password': '', + 'port': 8080, + 'proxy_hostnames': True, + 'proxy_peer_connections': True, + 'proxy_tracker_connections': True, + 'force_proxy': False, + 'anonymous_mode': False, + }, + 'peer_tos': '0x00', + 'rate_limit_ip_overhead': True, + 'geoip_db_location': '/usr/share/GeoIP/GeoIP.dat', + 'cache_size': 512, + 'cache_expiry': 60, + 'auto_manage_prefer_seeds': False, + 'shared': False, + 'super_seeding': False, +} + + +class PreferencesManager(component.Component): + def __init__(self): + component.Component.__init__(self, 'PreferencesManager') + self.config = deluge.configmanager.ConfigManager('core.conf', DEFAULT_PREFS) + if 'proxies' in self.config: + log.warning( + 'Updating config file for proxy, using "peer" values to fill new "proxy" setting' + ) + self.config['proxy'].update(self.config['proxies']['peer']) + log.warning('New proxy config is: %s', self.config['proxy']) + del self.config['proxies'] + if 'i2p_proxy' in self.config and self.config['i2p_proxy']['hostname']: + self.config['proxy'].update(self.config['i2p_proxy']) + self.config['proxy']['type'] = 6 + del self.config['i2p_proxy'] + if 'anonymous_mode' in self.config: + self.config['proxy']['anonymous_mode'] = self.config['anonymous_mode'] + del self.config['anonymous_mode'] + if 'proxy' in self.config: + for key in DEFAULT_PREFS['proxy']: + if key not in self.config['proxy']: + self.config['proxy'][key] = DEFAULT_PREFS['proxy'][key] + + self.core = component.get('Core') + self.new_release_timer = None + + def start(self): + # Set the initial preferences on start-up + for key in DEFAULT_PREFS: + self.do_config_set_func(key, self.config[key]) + + self.config.register_change_callback(self._on_config_value_change) + + def stop(self): + if self.new_release_timer and self.new_release_timer.running: + self.new_release_timer.stop() + + # Config set functions + def do_config_set_func(self, key, value): + on_set_func = getattr(self, '_on_set_' + key, None) + if on_set_func: + if log.isEnabledFor(logging.DEBUG): + log.debug('Config key: %s set to %s..', key, value) + on_set_func(key, value) + + def _on_config_value_change(self, key, value): + if self.get_state() == 'Started': + self.do_config_set_func(key, value) + component.get('EventManager').emit(ConfigValueChangedEvent(key, value)) + + def _on_set_torrentfiles_location(self, key, value): + if self.config['copy_torrent_file']: + try: + os.makedirs(value) + except OSError as ex: + log.debug('Unable to make directory: %s', ex) + + def _on_set_listen_ports(self, key, value): + self.__set_listen_on() + + def _on_set_listen_interface(self, key, value): + self.__set_listen_on() + + def _on_set_outgoing_interface(self, key, value): + """Set interface name or IP address for outgoing BitTorrent connections.""" + value = value.strip() if value else '' + self.core.apply_session_settings({'outgoing_interfaces': value}) + + def _on_set_random_port(self, key, value): + self.__set_listen_on() + + def __set_listen_on(self): + """Set the ports and interface address to listen for incoming connections on.""" + if self.config['random_port']: + if not self.config['listen_random_port']: + self.config['listen_random_port'] = random.randrange(49152, 65525) + listen_ports = [ + self.config['listen_random_port'] + ] * 2 # use single port range + else: + self.config['listen_random_port'] = None + listen_ports = self.config['listen_ports'] + + if self.config['listen_interface']: + interface = self.config['listen_interface'].strip() + else: + interface = '0.0.0.0' + + log.debug( + 'Listen Interface: %s, Ports: %s with use_sys_port: %s', + interface, + listen_ports, + self.config['listen_use_sys_port'], + ) + interfaces = [ + f'{interface}:{port}' + for port in range(listen_ports[0], listen_ports[1] + 1) + ] + self.core.apply_session_settings( + { + 'listen_system_port_fallback': self.config['listen_use_sys_port'], + 'listen_interfaces': ','.join(interfaces), + } + ) + + def _on_set_outgoing_ports(self, key, value): + self.__set_outgoing_ports() + + def _on_set_random_outgoing_ports(self, key, value): + self.__set_outgoing_ports() + + def __set_outgoing_ports(self): + port = ( + 0 + if self.config['random_outgoing_ports'] + else self.config['outgoing_ports'][0] + ) + if port: + num_ports = ( + self.config['outgoing_ports'][1] - self.config['outgoing_ports'][0] + ) + num_ports = num_ports if num_ports > 1 else 5 + else: + num_ports = 0 + log.debug('Outgoing port set to %s with range: %s', port, num_ports) + self.core.apply_session_settings( + {'outgoing_port': port, 'num_outgoing_ports': num_ports} + ) + + def _on_set_peer_tos(self, key, value): + try: + self.core.apply_session_setting('peer_tos', int(value, 16)) + except ValueError as ex: + log.error('Invalid tos byte: %s', ex) + + def _on_set_dht(self, key, value): + lt_bootstraps = self.core.session.get_settings()['dht_bootstrap_nodes'] + # Update list of lt bootstraps, using set to remove duplicates. + dht_bootstraps = set( + lt_bootstraps.split(',') + + [ + 'router.bittorrent.com:6881', + 'router.utorrent.com:6881', + 'router.bitcomet.com:6881', + 'dht.transmissionbt.com:6881', + 'dht.aelitis.com:6881', + ] + ) + self.core.apply_session_settings( + {'dht_bootstrap_nodes': ','.join(dht_bootstraps), 'enable_dht': value} + ) + + def _on_set_upnp(self, key, value): + self.core.apply_session_setting('enable_upnp', value) + + def _on_set_natpmp(self, key, value): + self.core.apply_session_setting('enable_natpmp', value) + + def _on_set_lsd(self, key, value): + self.core.apply_session_setting('enable_lsd', value) + + def _on_set_utpex(self, key, value): + if value: + self.core.session.add_extension('ut_pex') + + def _on_set_enc_in_policy(self, key, value): + self._on_set_encryption(key, value) + + def _on_set_enc_out_policy(self, key, value): + self._on_set_encryption(key, value) + + def _on_set_enc_level(self, key, value): + self._on_set_encryption(key, value) + + def _on_set_encryption(self, key, value): + # Convert Deluge enc_level values to libtorrent enc_level values. + pe_enc_level = { + 0: lt.enc_level.plaintext, + 1: lt.enc_level.rc4, + 2: lt.enc_level.both, + } + self.core.apply_session_settings( + { + 'out_enc_policy': lt.enc_policy(self.config['enc_out_policy']), + 'in_enc_policy': lt.enc_policy(self.config['enc_in_policy']), + 'allowed_enc_level': lt.enc_level( + pe_enc_level[self.config['enc_level']] + ), + 'prefer_rc4': True, + } + ) + + def _on_set_max_connections_global(self, key, value): + self.core.apply_session_setting('connections_limit', value) + + def _on_set_max_upload_speed(self, key, value): + # We need to convert Kb/s to B/s + value = -1 if value < 0 else int(value * 1024) + self.core.apply_session_setting('upload_rate_limit', value) + + def _on_set_max_download_speed(self, key, value): + # We need to convert Kb/s to B/s + value = -1 if value < 0 else int(value * 1024) + self.core.apply_session_setting('download_rate_limit', value) + + def _on_set_max_upload_slots_global(self, key, value): + self.core.apply_session_setting('unchoke_slots_limit', value) + + def _on_set_max_half_open_connections(self, key, value): + self.core.apply_session_setting('half_open_limit', value) + + def _on_set_max_connections_per_second(self, key, value): + self.core.apply_session_setting('connection_speed', value) + + def _on_set_ignore_limits_on_local_network(self, key, value): + self.core.apply_session_setting('ignore_limits_on_local_network', value) + + def _on_set_share_ratio_limit(self, key, value): + # This value is a float percentage in deluge, but libtorrent needs int percentage. + self.core.apply_session_setting('share_ratio_limit', int(value * 100)) + + def _on_set_seed_time_ratio_limit(self, key, value): + # This value is a float percentage in deluge, but libtorrent needs int percentage. + self.core.apply_session_setting('seed_time_ratio_limit', int(value * 100)) + + def _on_set_seed_time_limit(self, key, value): + # This value is stored in minutes in deluge, but libtorrent wants seconds + self.core.apply_session_setting('seed_time_limit', int(value * 60)) + + def _on_set_max_active_downloading(self, key, value): + self.core.apply_session_setting('active_downloads', value) + + def _on_set_max_active_seeding(self, key, value): + self.core.apply_session_setting('active_seeds', value) + + def _on_set_max_active_limit(self, key, value): + self.core.apply_session_setting('active_limit', value) + + def _on_set_dont_count_slow_torrents(self, key, value): + self.core.apply_session_setting('dont_count_slow_torrents', value) + + def _on_set_send_info(self, key, value): + """sends anonymous stats home""" + log.debug('Sending anonymous stats..') + + class SendInfoThread(threading.Thread): + def __init__(self, config): + self.config = config + threading.Thread.__init__(self) + + def run(self): + import time + + now = time.time() + # check if we've done this within the last week or never + if (now - self.config['info_sent']) >= (60 * 60 * 24 * 7): + try: + url = ( + 'http://deluge-torrent.org/stats_get.php?processor=' + + platform.machine() + + '&python=' + + platform.python_version() + + '&deluge=' + + deluge.common.get_version() + + '&os=' + + platform.system() + + '&plugins=' + + quote_plus(':'.join(self.config['enabled_plugins'])) + ) + urlopen(url) + except OSError as ex: + log.debug('Network error while trying to send info: %s', ex) + else: + self.config['info_sent'] = now + + if value: + SendInfoThread(self.config).start() + + def _on_set_new_release_check(self, key, value): + if value: + log.debug('Checking for new release..') + threading.Thread(target=self.core.get_new_release).start() + if self.new_release_timer and self.new_release_timer.running: + self.new_release_timer.stop() + # Set a timer to check for a new release every 3 days + self.new_release_timer = LoopingCall( + self._on_set_new_release_check, 'new_release_check', True + ) + self.new_release_timer.start(72 * 60 * 60, False) + else: + if self.new_release_timer and self.new_release_timer.running: + self.new_release_timer.stop() + + def _on_set_proxy(self, key, value): + # Initialise with type none and blank hostnames. + proxy_settings = { + 'proxy_type': lt.proxy_type_t.none, + 'i2p_hostname': '', + 'proxy_hostname': '', + 'proxy_hostnames': value['proxy_hostnames'], + 'proxy_peer_connections': value['proxy_peer_connections'], + 'proxy_tracker_connections': value['proxy_tracker_connections'], + 'force_proxy': value['force_proxy'], + 'anonymous_mode': value['anonymous_mode'], + } + + if value['type'] == lt.proxy_type_t.i2p_proxy: + proxy_settings.update( + { + 'proxy_type': lt.proxy_type_t.i2p_proxy, + 'i2p_hostname': value['hostname'], + 'i2p_port': value['port'], + } + ) + elif value['type'] != lt.proxy_type_t.none: + proxy_settings.update( + { + 'proxy_type': value['type'], + 'proxy_hostname': value['hostname'], + 'proxy_port': value['port'], + 'proxy_username': value['username'], + 'proxy_password': value['password'], + } + ) + + self.core.apply_session_settings(proxy_settings) + + def _on_set_rate_limit_ip_overhead(self, key, value): + self.core.apply_session_setting('rate_limit_ip_overhead', value) + + def _on_set_geoip_db_location(self, key, geoipdb_path): + # Load the GeoIP DB for country look-ups if available + if os.path.exists(geoipdb_path): + try: + self.core.geoip_instance = GeoIP(geoipdb_path, 0) + except Exception as ex: + log.warning('GeoIP Unavailable: %s', ex) + else: + log.warning('Unable to find GeoIP database file: %s', geoipdb_path) + + def _on_set_cache_size(self, key, value): + self.core.apply_session_setting('cache_size', value) + + def _on_set_cache_expiry(self, key, value): + self.core.apply_session_setting('cache_expiry', value) + + def _on_auto_manage_prefer_seeds(self, key, value): + self.core.apply_session_setting('auto_manage_prefer_seeds', value) diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py new file mode 100644 index 0000000..81ab2e0 --- /dev/null +++ b/deluge/core/rpcserver.py @@ -0,0 +1,598 @@ +# +# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +"""RPCServer Module""" +import logging +import os +import sys +import traceback +from collections import namedtuple +from types import FunctionType +from typing import Callable, TypeVar, overload + +from twisted.internet import defer, reactor +from twisted.internet.protocol import Factory, connectionDone + +import deluge.component as component +import deluge.configmanager +from deluge.core.authmanager import ( + AUTH_LEVEL_ADMIN, + AUTH_LEVEL_DEFAULT, + AUTH_LEVEL_NONE, +) +from deluge.crypto_utils import check_ssl_keys, get_context_factory +from deluge.error import ( + DelugeError, + IncompatibleClient, + NotAuthorizedError, + WrappedException, + _ClientSideRecreateError, +) +from deluge.event import ClientDisconnectedEvent +from deluge.transfer import DelugeTransferProtocol + +RPC_RESPONSE = 1 +RPC_ERROR = 2 +RPC_EVENT = 3 + +log = logging.getLogger(__name__) + +TCallable = TypeVar('TCallable', bound=Callable) + + +@overload +def export(func: TCallable) -> TCallable: + ... + + +@overload +def export(auth_level: int) -> Callable[[TCallable], TCallable]: + ... + + +def export(auth_level=AUTH_LEVEL_DEFAULT): + """ + Decorator function to register an object's method as an RPC. The object + will need to be registered with an :class:`RPCServer` to be effective. + + :param func: the function to export + :type func: function + :param auth_level: the auth level required to call this method + :type auth_level: int + + """ + + def wrap(func, *args, **kwargs): + func._rpcserver_export = True + func._rpcserver_auth_level = auth_level + + rpc_text = '**RPC exported method** (*Auth level: %s*)' % auth_level + + # Append the RPC text while ensuring correct docstring formatting. + if func.__doc__: + if func.__doc__.endswith(' '): + indent = func.__doc__.split('\n')[-1] + func.__doc__ += f'\n{indent}' + else: + func.__doc__ += '\n\n' + func.__doc__ += rpc_text + else: + func.__doc__ = rpc_text + + return func + + if isinstance(auth_level, FunctionType): + func = auth_level + auth_level = AUTH_LEVEL_DEFAULT + return wrap(func) + else: + return wrap + + +def format_request(call): + """ + Format the RPCRequest message for debug printing + + :param call: the request + :type call: a RPCRequest + + :returns: a formatted string for printing + :rtype: str + + """ + try: + s = call[1] + '(' + if call[2]: + s += ', '.join([str(x) for x in call[2]]) + if call[3]: + if call[2]: + s += ', ' + s += ', '.join([key + '=' + str(value) for key, value in call[3].items()]) + s += ')' + except UnicodeEncodeError: + return 'UnicodeEncodeError, call: %s' % call + else: + return s + + +class DelugeRPCProtocol(DelugeTransferProtocol): + def __init__(self): + super().__init__() + # namedtuple subclass with auth_level, username for the connected session. + self.AuthLevel = namedtuple('SessionAuthlevel', 'auth_level, username') + + def message_received(self, request): + """ + This method is called whenever a message is received from a client. The + only message that a client sends to the server is a RPC Request message. + If the RPC Request message is valid, then the method is called in + :meth:`dispatch`. + + :param request: the request from the client. + :type data: tuple + + """ + if not isinstance(request, tuple): + log.debug('Received invalid message: type is not tuple') + return + + if len(request) < 1: + log.debug('Received invalid message: there are no items') + return + + for call in request: + if len(call) != 4: + log.debug( + 'Received invalid rpc request: number of items ' 'in request is %s', + len(call), + ) + continue + # log.debug('RPCRequest: %s', format_request(call)) + reactor.callLater(0, self.dispatch, *call) + + def sendData(self, data): # NOQA: N802 + """ + Sends the data to the client. + + :param data: the object that is to be sent to the client. This should + be one of the RPC message types. + :type data: object + + """ + try: + self.transfer_message(data) + except Exception as ex: + log.warning('Error occurred when sending message: %s.', ex) + log.exception(ex) + raise + + def connectionMade(self): # NOQA: N802 + """ + This method is called when a new client connects. + """ + peer = self.transport.getPeer() + log.info('Deluge Client connection made from: %s:%s', peer.host, peer.port) + # Set the initial auth level of this session to AUTH_LEVEL_NONE + self.factory.authorized_sessions[self.transport.sessionno] = self.AuthLevel( + AUTH_LEVEL_NONE, '' + ) + + def connectionLost(self, reason=connectionDone): # NOQA: N802 + """ + This method is called when the client is disconnected. + + :param reason: the reason the client disconnected. + :type reason: str + + """ + + # We need to remove this session from various dicts + del self.factory.authorized_sessions[self.transport.sessionno] + if self.transport.sessionno in self.factory.session_protocols: + del self.factory.session_protocols[self.transport.sessionno] + if self.transport.sessionno in self.factory.interested_events: + del self.factory.interested_events[self.transport.sessionno] + + if self.factory.state == 'running': + component.get('EventManager').emit( + ClientDisconnectedEvent(self.factory.session_id) + ) + log.info('Deluge client disconnected: %s', reason.value) + + def valid_session(self): + return self.transport.sessionno in self.factory.authorized_sessions + + def dispatch(self, request_id, method, args, kwargs): + """ + This method is run when a RPC Request is made. It will run the local method + and will send either a RPC Response or RPC Error back to the client. + + :param request_id: the request_id from the client (sent in the RPC Request) + :type request_id: int + :param method: the local method to call. It must be registered with + the :class:`RPCServer`. + :type method: str + :param args: the arguments to pass to `method` + :type args: list + :param kwargs: the keyword-arguments to pass to `method` + :type kwargs: dict + + """ + + def send_error(): + """ + Sends an error response with the contents of the exception that was raised. + """ + exc_type, exc_value, dummy_exc_trace = sys.exc_info() + formated_tb = traceback.format_exc() + try: + self.sendData( + ( + RPC_ERROR, + request_id, + exc_type.__name__, + exc_value._args, + exc_value._kwargs, + formated_tb, + ) + ) + except AttributeError: + # This is not a deluge exception (object has no attribute '_args), let's wrap it + log.warning( + 'An exception occurred while sending RPC_ERROR to ' + 'client. Wrapping it and resending. Error to ' + 'send(causing exception goes next):\n%s', + formated_tb, + ) + try: + raise WrappedException( + str(exc_value), exc_type.__name__, formated_tb + ) + except WrappedException: + send_error() + except Exception as ex: + log.error( + 'An exception occurred while sending RPC_ERROR to client: %s', ex + ) + + if method == 'daemon.info': + # This is a special case and used in the initial connection process + self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version())) + return + elif method == 'daemon.login': + # This is a special case and used in the initial connection process + # We need to authenticate the user here + log.debug('RPC dispatch daemon.login') + try: + client_version = kwargs.pop('client_version', None) + if client_version is None: + raise IncompatibleClient(deluge.common.get_version()) + ret = component.get('AuthManager').authorize(*args, **kwargs) + if ret: + self.factory.authorized_sessions[ + self.transport.sessionno + ] = self.AuthLevel(ret, args[0]) + self.factory.session_protocols[self.transport.sessionno] = self + except Exception as ex: + send_error() + if not isinstance(ex, _ClientSideRecreateError): + log.exception(ex) + else: + self.sendData((RPC_RESPONSE, request_id, (ret))) + if not ret: + self.transport.loseConnection() + return + + # Anything below requires a valid session + if not self.valid_session(): + return + + if method == 'daemon.set_event_interest': + log.debug('RPC dispatch daemon.set_event_interest') + # This special case is to allow clients to set which events they are + # interested in receiving. + # We are expecting a sequence from the client. + try: + if self.transport.sessionno not in self.factory.interested_events: + self.factory.interested_events[self.transport.sessionno] = [] + self.factory.interested_events[self.transport.sessionno].extend(args[0]) + except Exception: + send_error() + else: + self.sendData((RPC_RESPONSE, request_id, (True))) + return + + if method not in self.factory.methods: + try: + # Raise exception to be sent back to client + raise AttributeError('RPC call on invalid function: %s' % method) + except AttributeError: + send_error() + return + + log.debug('RPC dispatch %s', method) + try: + method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level + auth_level = self.factory.authorized_sessions[ + self.transport.sessionno + ].auth_level + if auth_level < method_auth_requirement: + # This session is not allowed to call this method + log.debug( + 'Session %s is attempting an unauthorized method call!', + self.transport.sessionno, + ) + raise NotAuthorizedError(auth_level, method_auth_requirement) + # Set the session_id in the factory so that methods can know + # which session is calling it. + self.factory.session_id = self.transport.sessionno + ret = self.factory.methods[method](*args, **kwargs) + except Exception as ex: + send_error() + # Don't bother printing out DelugeErrors, because they are just + # for the client + if not isinstance(ex, DelugeError): + log.exception('Exception calling RPC request: %s', ex) + else: + # Check if the return value is a deferred, since we'll need to + # wait for it to fire before sending the RPC_RESPONSE + if isinstance(ret, defer.Deferred): + + def on_success(result): + try: + self.sendData((RPC_RESPONSE, request_id, result)) + except Exception: + send_error() + return result + + def on_fail(failure): + try: + failure.raiseException() + except Exception: + send_error() + return failure + + ret.addCallbacks(on_success, on_fail) + else: + self.sendData((RPC_RESPONSE, request_id, ret)) + + +class RPCServer(component.Component): + """ + This class is used to handle rpc requests from the client. Objects are + registered with this class and their methods are exported using the export + decorator. + + :param port: the port the RPCServer will listen on + :type port: int + :param interface: the interface to listen on, this may override the `allow_remote` setting + :type interface: str + :param allow_remote: set True if the server should allow remote connections + :type allow_remote: bool + :param listen: if False, will not start listening.. This is only useful in Classic Mode + :type listen: bool + """ + + def __init__(self, port=58846, interface='', allow_remote=False, listen=True): + component.Component.__init__(self, 'RPCServer') + + self.factory = Factory() + self.factory.protocol = DelugeRPCProtocol + self.factory.session_id = -1 + self.factory.state = 'running' + + # Holds the registered methods + self.factory.methods = {} + # Holds the session_ids and auth levels + self.factory.authorized_sessions = {} + # Holds the protocol objects with the session_id as key + self.factory.session_protocols = {} + # Holds the interested event list for the sessions + self.factory.interested_events = {} + + self.listen = listen + if not listen: + return + + if allow_remote: + hostname = '' + else: + hostname = 'localhost' + + if interface: + hostname = interface + + log.info('Starting DelugeRPC server %s:%s', hostname, port) + + # Check for SSL keys and generate some if needed + check_ssl_keys() + + cert = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.cert') + pkey = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.pkey') + + try: + reactor.listenSSL( + port, self.factory, get_context_factory(cert, pkey), interface=hostname + ) + except Exception as ex: + log.debug('Daemon already running or port not available.: %s', ex) + raise + + def register_object(self, obj, name=None): + """ + Registers an object to export it's rpc methods. These methods should + be exported with the export decorator prior to registering the object. + + :param obj: the object that we want to export + :type obj: object + :param name: the name to use, if None, it will be the class name of the object + :type name: str + """ + if not name: + name = obj.__class__.__name__.lower() + + for d in dir(obj): + if d[0] == '_': + continue + if getattr(getattr(obj, d), '_rpcserver_export', False): + log.debug('Registering method: %s', name + '.' + d) + self.factory.methods[name + '.' + d] = getattr(obj, d) + + def deregister_object(self, obj): + """ + Deregisters an objects exported rpc methods. + + :param obj: the object that was previously registered + + """ + for key, value in self.factory.methods.items(): + if value.__self__ == obj: + del self.factory.methods[key] + + def get_object_method(self, name): + """ + Returns a registered method. + + :param name: the name of the method, usually in the form of 'object.method' + :type name: str + + :returns: method + + :raises KeyError: if `name` is not registered + + """ + return self.factory.methods[name] + + def get_method_list(self): + """ + Returns a list of the exported methods. + + :returns: the exported methods + :rtype: list + """ + return list(self.factory.methods) + + def get_session_id(self): + """ + Returns the session id of the current RPC. + + :returns: the session id, this will be -1 if no connections have been made + :rtype: int + + """ + return self.factory.session_id + + def get_session_user(self): + """ + Returns the username calling the current RPC. + + :returns: the username of the user calling the current RPC + :rtype: string + + """ + if not self.listen: + return 'localclient' + session_id = self.get_session_id() + if session_id > -1 and session_id in self.factory.authorized_sessions: + return self.factory.authorized_sessions[session_id].username + else: + # No connections made yet + return '' + + def get_session_auth_level(self): + """ + Returns the auth level of the user calling the current RPC. + + :returns: the auth level + :rtype: int + """ + if not self.listen or not self.is_session_valid(self.get_session_id()): + return AUTH_LEVEL_ADMIN + return self.factory.authorized_sessions[self.get_session_id()].auth_level + + def get_rpc_auth_level(self, rpc): + """ + Returns the auth level requirement for an exported rpc. + + :returns: the auth level + :rtype: int + """ + return self.factory.methods[rpc]._rpcserver_auth_level + + def is_session_valid(self, session_id): + """ + Checks if the session is still valid, eg, if the client is still connected. + + :param session_id: the session id + :type session_id: int + + :returns: True if the session is valid + :rtype: bool + + """ + return session_id in self.factory.authorized_sessions + + def emit_event(self, event): + """ + Emits the event to interested clients. + + :param event: the event to emit + :type event: :class:`deluge.event.DelugeEvent` + """ + log.debug('intevents: %s', self.factory.interested_events) + # Use copy of `interested_events` since it can mutate while iterating. + for session_id, interest in self.factory.interested_events.copy().items(): + if event.name in interest: + log.debug('Emit Event: %s %s', event.name, event.args) + # This session is interested so send a RPC_EVENT + self.factory.session_protocols[session_id].sendData( + (RPC_EVENT, event.name, event.args) + ) + + def emit_event_for_session_id(self, session_id, event): + """ + Emits the event to specified session_id. + + :param session_id: the event to emit + :type session_id: int + :param event: the event to emit + :type event: :class:`deluge.event.DelugeEvent` + """ + if not self.is_session_valid(session_id): + log.debug( + 'Session ID %s is not valid. Not sending event "%s".', + session_id, + event.name, + ) + return + if session_id not in self.factory.interested_events: + log.debug( + 'Session ID %s is not interested in any events. Not sending event "%s".', + session_id, + event.name, + ) + return + if event.name not in self.factory.interested_events[session_id]: + log.debug( + 'Session ID %s is not interested in event "%s". Not sending it.', + session_id, + event.name, + ) + return + log.debug( + 'Sending event "%s" with args "%s" to session id "%s".', + event.name, + event.args, + session_id, + ) + self.factory.session_protocols[session_id].sendData( + (RPC_EVENT, event.name, event.args) + ) + + def stop(self): + self.factory.state = 'stopping' diff --git a/deluge/core/torrent.py b/deluge/core/torrent.py new file mode 100644 index 0000000..57ec26f --- /dev/null +++ b/deluge/core/torrent.py @@ -0,0 +1,1563 @@ +# +# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +"""Internal Torrent class + +Attributes: + LT_TORRENT_STATE_MAP (dict): Maps the torrent state from libtorrent to Deluge state. + +""" + +import logging +import os +import socket +import time +from typing import Optional +from urllib.parse import urlparse + +from twisted.internet.defer import Deferred, DeferredList + +import deluge.component as component +from deluge._libtorrent import lt +from deluge.common import decode_bytes +from deluge.configmanager import ConfigManager, get_config_dir +from deluge.core.authmanager import AUTH_LEVEL_ADMIN +from deluge.decorators import deprecated +from deluge.event import ( + TorrentFolderRenamedEvent, + TorrentStateChangedEvent, + TorrentTrackerStatusEvent, +) + +log = logging.getLogger(__name__) + +LT_TORRENT_STATE_MAP = { + 'queued_for_checking': 'Checking', + 'checking_files': 'Checking', + 'downloading_metadata': 'Downloading', + 'downloading': 'Downloading', + 'finished': 'Seeding', + 'seeding': 'Seeding', + 'allocating': 'Allocating', + 'checking_resume_data': 'Checking', +} + + +def sanitize_filepath(filepath, folder=False): + """Returns a sanitized filepath to pass to libtorrent rename_file(). + + The filepath will have backslashes substituted along with whitespace + padding and duplicate slashes stripped. + + Args: + folder (bool): A trailing slash is appended to the returned filepath. + """ + + def clean_filename(filename): + """Strips whitespace and discards dotted filenames""" + filename = filename.strip() + if filename.replace('.', '') == '': + return '' + return filename + + if '\\' in filepath or '/' in filepath: + folderpath = filepath.replace('\\', '/').split('/') + folderpath = [clean_filename(x) for x in folderpath] + newfilepath = '/'.join([path for path in folderpath if path]) + else: + newfilepath = clean_filename(filepath) + + if folder is True: + newfilepath += '/' + + return newfilepath + + +def convert_lt_files(files): + """Indexes and decodes files from libtorrent get_files(). + + Args: + files (file_storage): The libtorrent torrent files. + + Returns: + list of dict: The files. + + The format for the file dict:: + + { + "index": int, + "path": str, + "size": int, + "offset": int + } + """ + filelist = [] + for index in range(files.num_files()): + try: + file_path = files.file_path(index).decode('utf8') + except AttributeError: + file_path = files.file_path(index) + + filelist.append( + { + 'index': index, + 'path': file_path.replace('\\', '/'), + 'size': files.file_size(index), + 'offset': files.file_offset(index), + } + ) + + return filelist + + +class TorrentOptions(dict): + """TorrentOptions create a dict of the torrent options. + + Attributes: + add_paused (bool): Add the torrrent in a paused state. + auto_managed (bool): Set torrent to auto managed mode, i.e. will be started or queued automatically. + download_location (str): The path for the torrent data to be stored while downloading. + file_priorities (list of int): The priority for files in torrent, range is [0..7] however + only [0, 1, 4, 7] are normally used and correspond to [Skip, Low, Normal, High] + mapped_files (dict): A mapping of the renamed filenames in 'index:filename' pairs. + max_connections (int): Sets maximum number of connections this torrent will open. + This must be at least 2. The default is unlimited (-1). + max_download_speed (float): Will limit the download bandwidth used by this torrent to the + limit you set.The default is unlimited (-1) but will not exceed global limit. + max_upload_slots (int): Sets the maximum number of peers that are + unchoked at the same time on this torrent. This defaults to infinite (-1). + max_upload_speed (float): Will limit the upload bandwidth used by this torrent to the limit + you set. The default is unlimited (-1) but will not exceed global limit. + move_completed (bool): Move the torrent when downloading has finished. + move_completed_path (str): The path to move torrent to when downloading has finished. + name (str): The display name of the torrent. + owner (str): The user this torrent belongs to. + pre_allocate_storage (bool): When adding the torrent should all files be pre-allocated. + prioritize_first_last_pieces (bool): Prioritize the first and last pieces in the torrent. + remove_at_ratio (bool): Remove the torrent when it has reached the stop_ratio. + seed_mode (bool): Assume that all files are present for this torrent (Only used when adding a torent). + sequential_download (bool): Download the pieces of the torrent in order. + shared (bool): Enable the torrent to be seen by other Deluge users. + stop_at_ratio (bool): Stop the torrent when it has reached stop_ratio. + stop_ratio (float): The seeding ratio to stop (or remove) the torrent at. + super_seeding (bool): Enable super seeding/initial seeding. + """ + + def __init__(self): + super().__init__() + config = ConfigManager('core.conf').config + options_conf_map = { + 'add_paused': 'add_paused', + 'auto_managed': 'auto_managed', + 'download_location': 'download_location', + 'max_connections': 'max_connections_per_torrent', + 'max_download_speed': 'max_download_speed_per_torrent', + 'max_upload_slots': 'max_upload_slots_per_torrent', + 'max_upload_speed': 'max_upload_speed_per_torrent', + 'move_completed': 'move_completed', + 'move_completed_path': 'move_completed_path', + 'pre_allocate_storage': 'pre_allocate_storage', + 'prioritize_first_last_pieces': 'prioritize_first_last_pieces', + 'remove_at_ratio': 'remove_seed_at_ratio', + 'sequential_download': 'sequential_download', + 'shared': 'shared', + 'stop_at_ratio': 'stop_seed_at_ratio', + 'stop_ratio': 'stop_seed_ratio', + 'super_seeding': 'super_seeding', + } + for opt_k, conf_k in options_conf_map.items(): + self[opt_k] = config[conf_k] + self['file_priorities'] = [] + self['mapped_files'] = {} + self['name'] = '' + self['owner'] = '' + self['seed_mode'] = False + + +class TorrentError: + def __init__(self, error_message, was_paused=False, restart_to_resume=False): + self.error_message = error_message + self.was_paused = was_paused + self.restart_to_resume = restart_to_resume + + +class Torrent: + """Torrent holds information about torrents added to the libtorrent session. + + Args: + handle: The libtorrent torrent handle. + options (dict): The torrent options. + state (TorrentState): The torrent state. + filename (str): The filename of the torrent file. + magnet (str): The magnet URI. + + Attributes: + torrent_id (str): The torrent_id for this torrent + handle: Holds the libtorrent torrent handle + magnet (str): The magnet URI used to add this torrent (if available). + status: Holds status info so that we don"t need to keep getting it from libtorrent. + torrent_info: store the torrent info. + has_metadata (bool): True if the metadata for the torrent is available, False otherwise. + status_funcs (dict): The function mappings to get torrent status + prev_status (dict): Previous status dicts returned for this torrent. We use this to return + dicts that only contain changes from the previous. + {session_id: status_dict, ...} + waiting_on_folder_rename (list of dict): A list of Deferreds for file indexes we're waiting for file_rename + alerts on. This is so we can send one folder_renamed signal instead of multiple file_renamed signals. + [{index: Deferred, ...}, ...] + options (dict): The torrent options. + filename (str): The filename of the torrent file in case it is required. + is_finished (bool): Keep track if torrent is finished to prevent some weird things on state load. + statusmsg (str): Status message holds error/extra info about the torrent. + state (str): The torrent's state + trackers (list of dict): The torrent's trackers + tracker_status (str): Status message of currently connected tracker + tracker_host (str): Hostname of the currently connected tracker + forcing_recheck (bool): Keep track if we're forcing a recheck of the torrent + forcing_recheck_paused (bool): Keep track if we're forcing a recheck of the torrent so that + we can re-pause it after its done if necessary + forced_error (TorrentError): Keep track if we have forced this torrent to be in Error state. + """ + + def __init__(self, handle, options, state=None, filename=None, magnet=None): + self.torrent_id = str(handle.info_hash()) + if log.isEnabledFor(logging.DEBUG): + log.debug('Creating torrent object %s', self.torrent_id) + + # Get the core config + self.config = ConfigManager('core.conf') + self.rpcserver = component.get('RPCServer') + + self.handle = handle + + self.magnet = magnet + self._status: Optional['lt.torrent_status'] = None + self._status_last_update: float = 0.0 + + self.torrent_info = self.handle.torrent_file() + self.has_metadata = self.status.has_metadata + + self.options = TorrentOptions() + self.options.update(options) + + # Load values from state if we have it + if state: + self.set_trackers(state.trackers) + self.is_finished = state.is_finished + self.filename = state.filename + else: + self.set_trackers() + self.is_finished = False + self.filename = filename + + if not self.filename: + self.filename = '' + + self.forced_error = None + self.statusmsg = None + self.state = None + self.moving_storage_dest_path = None + self.tracker_status = '' + self.tracker_host = None + self.forcing_recheck = False + self.forcing_recheck_paused = False + self.status_funcs = None + self.prev_status = {} + self.waiting_on_folder_rename = [] + + self._create_status_funcs() + self.set_options(self.options) + self.update_state() + + if log.isEnabledFor(logging.DEBUG): + log.debug('Torrent object created.') + + def _set_handle_flags(self, flag: lt.torrent_flags, set_flag: bool): + """set or unset a flag to the lt handle + + Args: + flag (lt.torrent_flags): the flag to set/unset + set_flag (bool): True for setting the flag, False for unsetting it + """ + if set_flag: + self.handle.set_flags(flag) + else: + self.handle.unset_flags(flag) + + def on_metadata_received(self): + """Process the metadata received alert for this torrent""" + self.has_metadata = True + self.torrent_info = self.handle.get_torrent_info() + if self.options['prioritize_first_last_pieces']: + self.set_prioritize_first_last_pieces(True) + self.write_torrentfile() + + # --- Options methods --- + def set_options(self, options): + """Set the torrent options. + + Args: + options (dict): Torrent options, see TorrentOptions class for valid keys. + """ + + # Skip set_prioritize_first_last if set_file_priorities is in options as it also calls the method. + if 'file_priorities' in options and 'prioritize_first_last_pieces' in options: + self.options['prioritize_first_last_pieces'] = options.pop( + 'prioritize_first_last_pieces' + ) + + for key, value in options.items(): + if key in self.options: + options_set_func = getattr(self, 'set_' + key, None) + if options_set_func: + options_set_func(value) + else: + # Update config options that do not have funcs + self.options[key] = value + + def get_options(self): + """Get the torrent options. + + Returns: + dict: the torrent options. + """ + return self.options + + def set_max_connections(self, max_connections): + """Sets maximum number of connections this torrent will open. + + Args: + max_connections (int): Maximum number of connections + + Note: + The minimum value for handle.max_connections is 2 (or -1 for unlimited connections). + This is enforced by libtorrent and values 0 or 1 raise an assert with lt debug builds. + """ + + if max_connections == 0: + max_connections = -1 + elif max_connections == 1: + max_connections = 2 + + self.options['max_connections'] = max_connections + self.handle.set_max_connections(max_connections) + + def set_max_upload_slots(self, max_slots): + """Sets maximum number of upload slots for this torrent. + + Args: + max_slots (int): Maximum upload slots + """ + self.options['max_upload_slots'] = max_slots + self.handle.set_max_uploads(max_slots) + + def set_max_upload_speed(self, m_up_speed): + """Sets maximum upload speed for this torrent. + + Args: + m_up_speed (float): Maximum upload speed in KiB/s. + """ + self.options['max_upload_speed'] = m_up_speed + if m_up_speed < 0: + value = -1 + else: + value = int(m_up_speed * 1024) + self.handle.set_upload_limit(value) + + def set_max_download_speed(self, m_down_speed): + """Sets maximum download speed for this torrent. + + Args: + m_down_speed (float): Maximum download speed in KiB/s. + """ + self.options['max_download_speed'] = m_down_speed + if m_down_speed < 0: + value = -1 + else: + value = int(m_down_speed * 1024) + self.handle.set_download_limit(value) + + @deprecated + def set_prioritize_first_last(self, prioritize): + """Deprecated: Use set_prioritize_first_last_pieces.""" + self.set_prioritize_first_last_pieces(prioritize) + + def set_prioritize_first_last_pieces(self, prioritize): + """Prioritize the first and last pieces in the torrent. + + Args: + prioritize (bool): Prioritize the first and last pieces. + + """ + if not self.has_metadata: + return + + self.options['prioritize_first_last_pieces'] = prioritize + if not prioritize: + # If we are turning off this option, call set_file_priorities to + # reset all the piece priorities + self.set_file_priorities(self.options['file_priorities']) + return + + # A list of priorities for each piece in the torrent + priorities = self.handle.get_piece_priorities() + + def get_file_piece(idx, byte_offset): + return self.torrent_info.map_file(idx, byte_offset, 0).piece + + for idx in range(self.torrent_info.num_files()): + file_size = self.torrent_info.files().file_size(idx) + two_percent_bytes = int(0.02 * file_size) + # Get the pieces for the byte offsets + first_start = get_file_piece(idx, 0) + first_end = get_file_piece(idx, two_percent_bytes) + 1 + last_start = get_file_piece(idx, file_size - two_percent_bytes) + last_end = get_file_piece(idx, max(file_size - 1, 0)) + 1 + + # Set the pieces in first and last ranges to priority 7 + # if they are not marked as do not download + priorities[first_start:first_end] = [ + p and 7 for p in priorities[first_start:first_end] + ] + priorities[last_start:last_end] = [ + p and 7 for p in priorities[last_start:last_end] + ] + + # Setting the priorites for all the pieces of this torrent + self.handle.prioritize_pieces(priorities) + + def set_sequential_download(self, sequential): + """Sets whether to download the pieces of the torrent in order. + + Args: + sequential (bool): Enable sequential downloading. + """ + self.options['sequential_download'] = sequential + self._set_handle_flags( + flag=lt.torrent_flags.sequential_download, + set_flag=sequential, + ) + + def set_auto_managed(self, auto_managed): + """Set auto managed mode, i.e. will be started or queued automatically. + + Args: + auto_managed (bool): Enable auto managed. + """ + self.options['auto_managed'] = auto_managed + if not (self.status.paused and not self.status.auto_managed): + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=auto_managed, + ) + self.update_state() + + def set_super_seeding(self, super_seeding): + """Set super seeding/initial seeding. + + Args: + super_seeding (bool): Enable super seeding. + """ + self.options['super_seeding'] = super_seeding + self._set_handle_flags( + flag=lt.torrent_flags.super_seeding, + set_flag=super_seeding, + ) + + def set_stop_ratio(self, stop_ratio): + """The seeding ratio to stop (or remove) the torrent at. + + Args: + stop_ratio (float): The seeding ratio. + """ + self.options['stop_ratio'] = stop_ratio + + def set_stop_at_ratio(self, stop_at_ratio): + """Stop the torrent when it has reached stop_ratio. + + Args: + stop_at_ratio (bool): Stop the torrent. + """ + self.options['stop_at_ratio'] = stop_at_ratio + + def set_remove_at_ratio(self, remove_at_ratio): + """Remove the torrent when it has reached the stop_ratio. + + Args: + remove_at_ratio (bool): Remove the torrent. + """ + self.options['remove_at_ratio'] = remove_at_ratio + + def set_move_completed(self, move_completed): + """Set whether to move the torrent when downloading has finished. + + Args: + move_completed (bool): Move the torrent. + + """ + self.options['move_completed'] = move_completed + + def set_move_completed_path(self, move_completed_path): + """Set the path to move torrent to when downloading has finished. + + Args: + move_completed_path (str): The move path. + """ + self.options['move_completed_path'] = move_completed_path + + def set_file_priorities(self, file_priorities): + """Sets the file priotities. + + Args: + file_priorities (list of int): List of file priorities. + """ + if not self.has_metadata: + return + + if log.isEnabledFor(logging.DEBUG): + log.debug( + 'Setting %s file priorities to: %s', self.torrent_id, file_priorities + ) + + if file_priorities and len(file_priorities) == len(self.get_files()): + self.handle.prioritize_files(file_priorities) + else: + log.debug('Unable to set new file priorities.') + file_priorities = self.handle.get_file_priorities() + + if 0 in self.options['file_priorities']: + # Previously marked a file 'skip' so check for any 0's now >0. + for index, priority in enumerate(self.options['file_priorities']): + if priority == 0 and file_priorities[index] > 0: + # Changed priority from skip to download so update state. + self.is_finished = False + self.update_state() + break + + # Store the priorities. + self.options['file_priorities'] = file_priorities + + # Set the first/last priorities if needed. + if self.options['prioritize_first_last_pieces']: + self.set_prioritize_first_last_pieces(True) + + @deprecated + def set_save_path(self, download_location): + """Deprecated: Use set_download_location.""" + self.set_download_location(download_location) + + def set_download_location(self, download_location): + """The location for downloading torrent data.""" + self.options['download_location'] = download_location + + def set_owner(self, account): + """Sets the owner of this torrent. + + Args: + account (str): The new owner account name. + + Notes: + Only a user with admin level auth can change this value. + + """ + + if self.rpcserver.get_session_auth_level() == AUTH_LEVEL_ADMIN: + self.options['owner'] = account + + # End Options methods # + + def set_trackers(self, trackers=None): + """Sets the trackers for this torrent. + + Args: + trackers (list of dicts): A list of trackers. + """ + if trackers is None: + self.trackers = list(self.handle.trackers()) + self.tracker_host = None + return + + if log.isEnabledFor(logging.DEBUG): + log.debug('Setting trackers for %s: %s', self.torrent_id, trackers) + + tracker_list = [] + + for tracker in trackers: + new_entry = lt.announce_entry(str(tracker['url'])) + new_entry.tier = tracker['tier'] + tracker_list.append(new_entry) + self.handle.replace_trackers(tracker_list) + + # Print out the trackers + if log.isEnabledFor(logging.DEBUG): + log.debug('Trackers set for %s:', self.torrent_id) + for tracker in self.handle.trackers(): + log.debug(' [tier %s]: %s', tracker['tier'], tracker['url']) + # Set the tracker list in the torrent object + self.trackers = trackers + if len(trackers) > 0: + # Force a re-announce if there is at least 1 tracker + self.force_reannounce() + self.tracker_host = None + + def set_tracker_status(self, status): + """Sets the tracker status. + + Args: + status (str): The tracker status. + + Emits: + TorrentTrackerStatusEvent upon tracker status change. + + """ + + self.tracker_host = None + + if self.tracker_status != status: + self.tracker_status = status + component.get('EventManager').emit( + TorrentTrackerStatusEvent(self.torrent_id, self.tracker_status) + ) + + def merge_trackers(self, torrent_info): + """Merges new trackers in torrent_info into torrent""" + log.info( + 'Adding any new trackers to torrent (%s) already in session...', + self.torrent_id, + ) + if not torrent_info: + return + # Don't merge trackers if either torrent has private flag set. + if torrent_info.priv() or self.get_status(['private'])['private']: + log.info('Adding trackers aborted: Torrent has private flag set.') + else: + for tracker in torrent_info.trackers(): + self.handle.add_tracker({'url': tracker.url, 'tier': tracker.tier}) + # Update torrent.trackers from libtorrent handle. + self.set_trackers() + + def update_state(self): + """Updates the state, based on libtorrent's torrent state""" + status = self.get_lt_status() + session_paused = component.get('Core').session.is_paused() + old_state = self.state + self.set_status_message() + status_error = status.errc.message() if status.errc.value() else '' + + if self.forced_error: + self.state = 'Error' + self.set_status_message(self.forced_error.error_message) + elif status_error: + self.state = 'Error' + # auto-manage status will be reverted upon resuming. + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=False, + ) + self.set_status_message(decode_bytes(status_error)) + elif status.moving_storage: + self.state = 'Moving' + elif not session_paused and status.paused and status.auto_managed: + self.state = 'Queued' + elif session_paused or status.paused: + self.state = 'Paused' + else: + self.state = LT_TORRENT_STATE_MAP.get(str(status.state), str(status.state)) + + if self.state != old_state: + component.get('EventManager').emit( + TorrentStateChangedEvent(self.torrent_id, self.state) + ) + + if log.isEnabledFor(logging.DEBUG): + log.debug( + 'State from lt was: %s | Session is paused: %s\nTorrent state set from "%s" to "%s" (%s)', + 'error' if status_error else status.state, + session_paused, + old_state, + self.state, + self.torrent_id, + ) + if self.forced_error: + log.debug( + 'Torrent Error state message: %s', self.forced_error.error_message + ) + + def set_status_message(self, message=None): + """Sets the torrent status message. + + Calling method without a message will reset the message to 'OK'. + + Args: + message (str, optional): The status message. + + """ + if not message: + message = 'OK' + self.statusmsg = message + + def force_error_state(self, message, restart_to_resume=True): + """Forces the torrent into an error state. + + For setting an error state not covered by libtorrent. + + Args: + message (str): The error status message. + restart_to_resume (bool, optional): Prevent resuming clearing the error, only restarting + session can resume. + """ + status = self.get_lt_status() + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=False, + ) + self.forced_error = TorrentError(message, status.paused, restart_to_resume) + if not status.paused: + self.handle.pause() + self.update_state() + + def clear_forced_error_state(self, update_state=True): + if not self.forced_error: + return + + if self.forced_error.restart_to_resume: + log.error('Restart deluge to clear this torrent error') + + if not self.forced_error.was_paused and self.options['auto_managed']: + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=True, + ) + self.forced_error = None + self.set_status_message('OK') + if update_state: + self.update_state() + + def get_eta(self): + """Get the ETA for this torrent. + + Returns: + int: The ETA in seconds. + + """ + status = self.status + eta = 0 + if ( + self.is_finished + and self.options['stop_at_ratio'] + and status.upload_payload_rate + ): + # We're a seed, so calculate the time to the 'stop_share_ratio' + eta = ( + int(status.all_time_download * self.options['stop_ratio']) + - status.all_time_upload + ) // status.upload_payload_rate + elif status.download_payload_rate: + left = status.total_wanted - status.total_wanted_done + if left > 0: + eta = left // status.download_payload_rate + + # Limit to 1 year, avoid excessive values and prevent GTK int overflow. + return eta if eta < 31557600 else -1 + + def get_ratio(self): + """Get the ratio of upload/download for this torrent. + + Returns: + float: The ratio or -1.0 (for infinity). + + """ + if self.status.total_done > 0: + return self.status.all_time_upload / self.status.total_done + else: + return -1.0 + + def get_files(self): + """Get the files this torrent contains. + + Returns: + list of dict: The files. + + """ + if not self.has_metadata: + return [] + + files = self.torrent_info.files() + return convert_lt_files(files) + + def get_orig_files(self): + """Get the original filenames of files in this torrent. + + Returns: + list of dict: The files with original filenames. + + """ + if not self.has_metadata: + return [] + + files = self.torrent_info.orig_files() + return convert_lt_files(files) + + def get_peers(self): + """Get the peers for this torrent. + + A list of peers and various information about them. + + Returns: + list of dict: The peers. + + The format for the peer dict:: + + { + "client": str, + "country": str, + "down_speed": int, + "ip": str, + "progress": float, + "seed": bool, + "up_speed": int + } + """ + ret = [] + peers = self.handle.get_peer_info() + + for peer in peers: + # We do not want to report peers that are half-connected + if peer.flags & peer.connecting or peer.flags & peer.handshake: + continue + + try: + client = decode_bytes(peer.client) + except UnicodeDecodeError: + # libtorrent on Py3 can raise UnicodeDecodeError for peer_info.client + client = 'unknown' + + try: + country = component.get('Core').geoip_instance.country_code_by_addr( + peer.ip[0] + ) + except AttributeError: + country = '' + else: + try: + country = ''.join( + [char if char.isalpha() else ' ' for char in country] + ) + except TypeError: + country = '' + + ret.append( + { + 'client': client, + 'country': country, + 'down_speed': peer.payload_down_speed, + 'ip': f'{peer.ip[0]}:{peer.ip[1]}', + 'progress': peer.progress, + 'seed': peer.flags & peer.seed, + 'up_speed': peer.payload_up_speed, + } + ) + + return ret + + def get_queue_position(self): + """Get the torrents queue position + + Returns: + int: queue position + """ + return self.handle.queue_position() + + def get_file_priorities(self): + """Return the file priorities""" + if not self.handle.status().has_metadata: + return [] + + if not self.options['file_priorities']: + # Ensure file_priorities option is populated. + self.set_file_priorities([]) + + return self.options['file_priorities'] + + def get_file_progress(self): + """Calculates the file progress as a percentage. + + Returns: + list of floats: The file progress (0.0 -> 1.0), empty list if n/a. + """ + if not self.has_metadata: + return [] + + try: + files_progresses = zip( + self.handle.file_progress(), self.torrent_info.files() + ) + except Exception: + # Handle libtorrent >=2.0.0,<=2.0.4 file_progress error + files_progresses = zip(iter(lambda: 0, 1), self.torrent_info.files()) + + return [ + progress / _file.size if _file.size else 0.0 + for progress, _file in files_progresses + ] + + def get_tracker_host(self): + """Get the hostname of the currently connected tracker. + + If no tracker is connected, it uses the 1st tracker. + + Returns: + str: The tracker host + """ + if self.tracker_host: + return self.tracker_host + + tracker = self.status.current_tracker + if not tracker and self.trackers: + tracker = self.trackers[0]['url'] + + if tracker: + url = urlparse(tracker.replace('udp://', 'http://')) + if hasattr(url, 'hostname'): + host = url.hostname or 'DHT' + # Check if hostname is an IP address and just return it if that's the case + try: + socket.inet_aton(host) + except OSError: + pass + else: + # This is an IP address because an exception wasn't raised + return url.hostname + + parts = host.split('.') + if len(parts) > 2: + if parts[-2] in ('co', 'com', 'net', 'org') or parts[-1] == 'uk': + host = '.'.join(parts[-3:]) + else: + host = '.'.join(parts[-2:]) + self.tracker_host = host + return host + return '' + + def get_magnet_uri(self): + """Returns a magnet URI for this torrent""" + return lt.make_magnet_uri(self.handle) + + def get_name(self): + """The name of the torrent (distinct from the filenames). + + Note: + Can be manually set in options through `name` key. If the key is + reset to empty string "" it will return the original torrent name. + + Returns: + str: the name of the torrent. + + """ + if self.options['name']: + return self.options['name'] + + if self.has_metadata: + # Use the top-level folder as torrent name. + filename = decode_bytes(self.torrent_info.files().file_path(0)) + name = filename.replace('\\', '/', 1).split('/', 1)[0] + else: + name = decode_bytes(self.handle.status().name) + + if not name: + name = self.torrent_id + + return name + + def get_progress(self): + """The progress of this torrent's current task. + + Returns: + float: The progress percentage (0 to 100). + + """ + + def get_size(files, path): + """Returns total size of 'files' currently located in 'path'""" + files = [os.path.join(path, f) for f in files] + return sum(os.stat(f).st_size for f in files if os.path.exists(f)) + + if self.state == 'Error': + progress = 100.0 + elif self.state == 'Moving': + # Check if torrent has downloaded any data yet. + if self.status.total_done: + torrent_files = [f['path'] for f in self.get_files()] + dest_path_size = get_size(torrent_files, self.moving_storage_dest_path) + progress = dest_path_size / self.status.total_done * 100 + else: + progress = 100.0 + else: + progress = self.status.progress * 100 + + return progress + + def get_time_since_transfer(self): + """The time since either upload/download from peers""" + time_since = (self.status.time_since_download, self.status.time_since_upload) + try: + return min(x for x in time_since if x != -1) + except ValueError: + return -1 + + def get_status(self, keys, diff=False, update=False, all_keys=False): + """Returns the status of the torrent based on the keys provided + + Args: + keys (list of str): the keys to get the status on + diff (bool): Will return a diff of the changes since the last + call to get_status based on the session_id + update (bool): If True the status will be updated from libtorrent + if False, the cached values will be returned + all_keys (bool): If True return all keys while ignoring the keys param + if False, return only the requested keys + + Returns: + dict: a dictionary of the status keys and their values + """ + if update: + self.get_lt_status() + + if all_keys: + keys = list(self.status_funcs) + + status_dict = {} + + for key in keys: + status_dict[key] = self.status_funcs[key]() + + if diff: + session_id = self.rpcserver.get_session_id() + if session_id in self.prev_status: + # We have a previous status dict, so lets make a diff + status_diff = {} + for key, value in status_dict.items(): + if key in self.prev_status[session_id]: + if value != self.prev_status[session_id][key]: + status_diff[key] = value + else: + status_diff[key] = value + + self.prev_status[session_id] = status_dict + return status_diff + + self.prev_status[session_id] = status_dict + return status_dict + + return status_dict + + def get_lt_status(self) -> 'lt.torrent_status': + """Get the torrent status fresh, not from cache. + + This should be used when a guaranteed fresh status is needed rather than + `torrent.handle.status()` because it will update the cache as well. + """ + self.status = self.handle.status() + return self.status + + @property + def status(self) -> 'lt.torrent_status': + """Cached copy of the libtorrent status for this torrent. + + If it has not been updated within the last five seconds, it will be + automatically refreshed. + """ + if self._status_last_update < (time.time() - 5): + self.status = self.handle.status() + return self._status + + @status.setter + def status(self, status: 'lt.torrent_status') -> None: + """Updates the cached status. + + Args: + status: a libtorrent torrent status + """ + self._status = status + self._status_last_update = time.time() + + def _create_status_funcs(self): + """Creates the functions for getting torrent status""" + self.status_funcs = { + 'active_time': lambda: self.status.active_time, + 'seeding_time': lambda: self.status.seeding_time, + 'finished_time': lambda: self.status.finished_time, + 'all_time_download': lambda: self.status.all_time_download, + 'storage_mode': lambda: self.status.storage_mode.name.split('_')[ + 2 + ], # sparse or allocate + 'distributed_copies': lambda: max(0.0, self.status.distributed_copies), + 'download_payload_rate': lambda: self.status.download_payload_rate, + 'file_priorities': self.get_file_priorities, + 'hash': lambda: self.torrent_id, + 'auto_managed': lambda: self.options['auto_managed'], + 'is_auto_managed': lambda: self.options['auto_managed'], + 'is_finished': lambda: self.is_finished, + 'max_connections': lambda: self.options['max_connections'], + 'max_download_speed': lambda: self.options['max_download_speed'], + 'max_upload_slots': lambda: self.options['max_upload_slots'], + 'max_upload_speed': lambda: self.options['max_upload_speed'], + 'message': lambda: self.statusmsg, + 'move_on_completed_path': lambda: self.options[ + 'move_completed_path' + ], # Deprecated: move_completed_path + 'move_on_completed': lambda: self.options[ + 'move_completed' + ], # Deprecated: Use move_completed + 'move_completed_path': lambda: self.options['move_completed_path'], + 'move_completed': lambda: self.options['move_completed'], + 'next_announce': lambda: self.status.next_announce.seconds, + 'num_peers': lambda: self.status.num_peers - self.status.num_seeds, + 'num_seeds': lambda: self.status.num_seeds, + 'owner': lambda: self.options['owner'], + 'paused': lambda: self.status.paused, + 'prioritize_first_last': lambda: self.options[ + 'prioritize_first_last_pieces' + ], + # Deprecated: Use prioritize_first_last_pieces + 'prioritize_first_last_pieces': lambda: self.options[ + 'prioritize_first_last_pieces' + ], + 'sequential_download': lambda: self.options['sequential_download'], + 'progress': self.get_progress, + 'shared': lambda: self.options['shared'], + 'remove_at_ratio': lambda: self.options['remove_at_ratio'], + 'save_path': lambda: self.options[ + 'download_location' + ], # Deprecated: Use download_location + 'download_location': lambda: self.options['download_location'], + 'seeds_peers_ratio': lambda: -1.0 + if self.status.num_incomplete == 0 + else ( # Use -1.0 to signify infinity + self.status.num_complete / self.status.num_incomplete + ), + 'seed_rank': lambda: self.status.seed_rank, + 'state': lambda: self.state, + 'stop_at_ratio': lambda: self.options['stop_at_ratio'], + 'stop_ratio': lambda: self.options['stop_ratio'], + 'time_added': lambda: self.status.added_time, + 'total_done': lambda: self.status.total_done, + 'total_payload_download': lambda: self.status.total_payload_download, + 'total_payload_upload': lambda: self.status.total_payload_upload, + 'total_peers': lambda: self.status.num_incomplete, + 'total_seeds': lambda: self.status.num_complete, + 'total_uploaded': lambda: self.status.all_time_upload, + 'total_wanted': lambda: self.status.total_wanted, + 'total_remaining': lambda: self.status.total_wanted + - self.status.total_wanted_done, + 'tracker': lambda: self.status.current_tracker, + 'tracker_host': self.get_tracker_host, + 'trackers': lambda: self.trackers, + 'tracker_status': lambda: self.tracker_status, + 'upload_payload_rate': lambda: self.status.upload_payload_rate, + 'comment': lambda: decode_bytes(self.torrent_info.comment()) + if self.has_metadata + else '', + 'creator': lambda: decode_bytes(self.torrent_info.creator()) + if self.has_metadata + else '', + 'num_files': lambda: self.torrent_info.num_files() + if self.has_metadata + else 0, + 'num_pieces': lambda: self.torrent_info.num_pieces() + if self.has_metadata + else 0, + 'piece_length': lambda: self.torrent_info.piece_length() + if self.has_metadata + else 0, + 'private': lambda: self.torrent_info.priv() if self.has_metadata else False, + 'total_size': lambda: self.torrent_info.total_size() + if self.has_metadata + else 0, + 'eta': self.get_eta, + 'file_progress': self.get_file_progress, + 'files': self.get_files, + 'orig_files': self.get_orig_files, + 'is_seed': lambda: self.status.is_seeding, + 'peers': self.get_peers, + 'queue': lambda: self.status.queue_position, + 'ratio': self.get_ratio, + 'completed_time': lambda: self.status.completed_time, + 'last_seen_complete': lambda: self.status.last_seen_complete, + 'name': self.get_name, + 'pieces': self._get_pieces_info, + 'seed_mode': lambda: self.status.seed_mode, + 'super_seeding': lambda: self.status.super_seeding, + 'time_since_download': lambda: self.status.time_since_download, + 'time_since_upload': lambda: self.status.time_since_upload, + 'time_since_transfer': self.get_time_since_transfer, + } + + def pause(self): + """Pause this torrent. + + Returns: + bool: True is successful, otherwise False. + + """ + # Turn off auto-management so the torrent will not be unpaused by lt queueing + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=False, + ) + if self.state == 'Error': + log.debug('Unable to pause torrent while in Error state') + elif self.status.paused: + # This torrent was probably paused due to being auto managed by lt + # Since we turned auto_managed off, we should update the state which should + # show it as 'Paused'. We need to emit a torrent_paused signal because + # the torrent_paused alert from libtorrent will not be generated. + self.update_state() + component.get('EventManager').emit( + TorrentStateChangedEvent(self.torrent_id, 'Paused') + ) + else: + try: + self.handle.pause() + except RuntimeError as ex: + log.debug('Unable to pause torrent: %s', ex) + + def resume(self): + """Resumes this torrent.""" + if self.status.paused and self.status.auto_managed: + log.debug('Resume not possible for auto-managed torrent!') + elif self.forced_error and self.forced_error.was_paused: + log.debug( + 'Resume skipped for forced_error torrent as it was originally paused.' + ) + elif ( + self.status.is_finished + and self.options['stop_at_ratio'] + and self.get_ratio() >= self.options['stop_ratio'] + ): + log.debug('Resume skipped for torrent as it has reached "stop_seed_ratio".') + else: + # Check if torrent was originally being auto-managed. + if self.options['auto_managed']: + self._set_handle_flags( + flag=lt.torrent_flags.auto_managed, + set_flag=True, + ) + try: + self.handle.resume() + except RuntimeError as ex: + log.debug('Unable to resume torrent: %s', ex) + + # Clear torrent error state. + if self.forced_error and not self.forced_error.restart_to_resume: + self.clear_forced_error_state() + elif self.state == 'Error' and not self.forced_error: + self.handle.clear_error() + + def connect_peer(self, peer_ip, peer_port): + """Manually add a peer to the torrent + + Args: + peer_ip (str) : Peer IP Address + peer_port (int): Peer Port + + Returns: + bool: True is successful, otherwise False + """ + try: + self.handle.connect_peer((peer_ip, int(peer_port)), 0) + except (RuntimeError, ValueError) as ex: + log.debug('Unable to connect to peer: %s', ex) + return False + return True + + def move_storage(self, dest): + """Move a torrent's storage location + + Args: + dest (str): The destination folder for the torrent data + + Returns: + bool: True if successful, otherwise False + + """ + dest = decode_bytes(dest) + + if not os.path.exists(dest): + try: + os.makedirs(dest) + except OSError as ex: + log.error( + 'Could not move storage for torrent %s since %s does ' + 'not exist and could not create the directory: %s', + self.torrent_id, + dest, + ex, + ) + return False + + try: + # lt needs utf8 byte-string. Otherwise if wstrings enabled, unicode string. + # Keyword argument flags=2 (dont_replace) dont overwrite target files but delete source. + try: + self.handle.move_storage(dest.encode('utf8'), flags=2) + except TypeError: + self.handle.move_storage(dest, flags=2) + except RuntimeError as ex: + log.error('Error calling libtorrent move_storage: %s', ex) + return False + self.moving_storage_dest_path = dest + self.update_state() + return True + + def save_resume_data(self, flush_disk_cache=False): + """Signals libtorrent to build resume data for this torrent. + + Args: + flush_disk_cache (bool): Avoids potential issue with file timestamps + and is only needed when stopping the session. + + Returns: + None: The response with resume data is returned in a libtorrent save_resume_data_alert. + + """ + if log.isEnabledFor(logging.DEBUG): + log.debug('Requesting save_resume_data for torrent: %s', self.torrent_id) + flags = lt.save_resume_flags_t.flush_disk_cache if flush_disk_cache else 0 + # Don't generate fastresume data if torrent is in a Deluge Error state. + if self.forced_error: + component.get('TorrentManager').waiting_on_resume_data[ + self.torrent_id + ].errback(UserWarning('Skipped creating resume_data while in Error state')) + else: + self.handle.save_resume_data(flags) + + def write_torrentfile(self, filedump=None): + """Writes the torrent file to the state dir and optional 'copy of' dir. + + Args: + filedump (str, optional): bencoded filedump of a torrent file. + + """ + + def write_file(filepath, filedump): + """Write out the torrent file""" + log.debug('Writing torrent file to: %s', filepath) + try: + with open(filepath, 'wb') as save_file: + save_file.write(filedump) + except OSError as ex: + log.error('Unable to save torrent file to: %s', ex) + + filepath = os.path.join(get_config_dir(), 'state', self.torrent_id + '.torrent') + + if filedump is None: + lt_ct = lt.create_torrent(self.torrent_info) + filedump = lt.bencode(lt_ct.generate()) + + write_file(filepath, filedump) + + # If the user has requested a copy of the torrent be saved elsewhere we need to do that. + if self.config['copy_torrent_file']: + if not self.filename: + self.filename = self.get_name() + '.torrent' + filepath = os.path.join(self.config['torrentfiles_location'], self.filename) + write_file(filepath, filedump) + + def delete_torrentfile(self, delete_copies=False): + """Deletes the .torrent file in the state directory in config""" + torrent_files = [ + os.path.join(get_config_dir(), 'state', self.torrent_id + '.torrent') + ] + if delete_copies and self.filename: + torrent_files.append( + os.path.join(self.config['torrentfiles_location'], self.filename) + ) + + for torrent_file in torrent_files: + log.debug('Deleting torrent file: %s', torrent_file) + try: + os.remove(torrent_file) + except OSError as ex: + log.warning('Unable to delete the torrent file: %s', ex) + + def force_reannounce(self): + """Force a tracker reannounce""" + try: + self.handle.force_reannounce() + except RuntimeError as ex: + log.debug('Unable to force reannounce: %s', ex) + return False + return True + + def scrape_tracker(self): + """Scrape the tracker + + A scrape request queries the tracker for statistics such as total + number of incomplete peers, complete peers, number of downloads etc. + """ + try: + self.handle.scrape_tracker() + except RuntimeError as ex: + log.debug('Unable to scrape tracker: %s', ex) + return False + return True + + def force_recheck(self): + """Forces a recheck of the torrent's pieces""" + if self.forced_error: + self.forcing_recheck_paused = self.forced_error.was_paused + self.clear_forced_error_state(update_state=False) + else: + self.forcing_recheck_paused = self.status.paused + + try: + self.handle.force_recheck() + self.handle.resume() + self.forcing_recheck = True + except RuntimeError as ex: + log.debug('Unable to force recheck: %s', ex) + self.forcing_recheck = False + return self.forcing_recheck + + def rename_files(self, filenames): + """Renames files in the torrent. + + Args: + filenames (list): A list of (index, filename) pairs. + """ + for index, filename in filenames: + # Make sure filename is a sanitized unicode string. + filename = sanitize_filepath(decode_bytes(filename)) + # lt needs utf8 byte-string. Otherwise if wstrings enabled, unicode string. + try: + self.handle.rename_file(index, filename.encode('utf8')) + except (UnicodeDecodeError, TypeError): + self.handle.rename_file(index, filename) + + def rename_folder(self, folder, new_folder): + """Renames a folder within a torrent. + + This basically does a file rename on all of the folders children. + + Args: + folder (str): The original folder name + new_folder (str): The new folder name + + Returns: + twisted.internet.defer.Deferred: A deferred which fires when the rename is complete + """ + log.debug('Attempting to rename folder: %s to %s', folder, new_folder) + + # Empty string means remove the dir and move its content to the parent + if len(new_folder) > 0: + new_folder = sanitize_filepath(new_folder, folder=True) + + def on_file_rename_complete(dummy_result, wait_dict, index): + """File rename complete""" + wait_dict.pop(index, None) + + wait_on_folder = {} + self.waiting_on_folder_rename.append(wait_on_folder) + for _file in self.get_files(): + if _file['path'].startswith(folder): + # Keep track of filerenames we're waiting on + wait_on_folder[_file['index']] = Deferred().addBoth( + on_file_rename_complete, wait_on_folder, _file['index'] + ) + new_path = _file['path'].replace(folder, new_folder, 1) + try: + self.handle.rename_file(_file['index'], new_path.encode('utf8')) + except (UnicodeDecodeError, TypeError): + self.handle.rename_file(_file['index'], new_path) + + def on_folder_rename_complete(dummy_result, torrent, folder, new_folder): + """Folder rename complete""" + component.get('EventManager').emit( + TorrentFolderRenamedEvent(torrent.torrent_id, folder, new_folder) + ) + # Empty folders are removed after libtorrent folder renames + self.remove_empty_folders(folder) + torrent.waiting_on_folder_rename = [ + _dir for _dir in torrent.waiting_on_folder_rename if _dir + ] + component.get('TorrentManager').save_resume_data((self.torrent_id,)) + + d = DeferredList(list(wait_on_folder.values())) + d.addBoth(on_folder_rename_complete, self, folder, new_folder) + return d + + def remove_empty_folders(self, folder): + """Recursively removes folders but only if they are empty. + + This cleans up after libtorrent folder renames. + + Args: + folder (str): The folder to recursively check + """ + # Removes leading slashes that can cause join to ignore download_location + download_location = self.options['download_location'] + folder_full_path = os.path.normpath( + os.path.join(download_location, folder.lstrip('\\/')) + ) + + try: + if not os.listdir(folder_full_path): + os.removedirs(folder_full_path) + log.debug('Removed Empty Folder %s', folder_full_path) + else: + for root, dirs, dummy_files in os.walk(folder_full_path, topdown=False): + for name in dirs: + try: + os.removedirs(os.path.join(root, name)) + log.debug( + 'Removed Empty Folder %s', os.path.join(root, name) + ) + except OSError as ex: + log.debug(ex) + + except OSError as ex: + log.debug('Cannot Remove Folder: %s', ex) + + def cleanup_prev_status(self): + """Checks the validity of the keys in the prev_status dict. + + If the key is no longer valid, the dict will be deleted. + """ + # Dict will be modified so iterate over generated list + for key in list(self.prev_status): + if not self.rpcserver.is_session_valid(key): + del self.prev_status[key] + + def _get_pieces_info(self): + """Get the pieces for this torrent.""" + if not self.has_metadata or self.status.is_seeding: + pieces = None + else: + pieces = [] + for piece, avail_piece in zip( + self.status.pieces, self.handle.piece_availability() + ): + if piece: + pieces.append(3) # Completed. + elif avail_piece: + pieces.append( + 1 + ) # Available, just not downloaded nor being downloaded. + else: + pieces.append( + 0 + ) # Missing, no known peer with piece, or not asked for yet. + + for peer_info in self.handle.get_peer_info(): + if peer_info.downloading_piece_index >= 0: + pieces[ + peer_info.downloading_piece_index + ] = 2 # Being downloaded from peer. + + return pieces diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py new file mode 100644 index 0000000..c43a7a2 --- /dev/null +++ b/deluge/core/torrentmanager.py @@ -0,0 +1,1700 @@ +# +# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# + +"""TorrentManager handles Torrent objects""" +import datetime +import logging +import operator +import os +import pickle +import time +from base64 import b64encode +from tempfile import gettempdir +from typing import Dict, List, NamedTuple, Tuple + +from twisted.internet import defer, reactor, threads +from twisted.internet.defer import Deferred, DeferredList +from twisted.internet.task import LoopingCall + +import deluge.component as component +from deluge._libtorrent import LT_VERSION, lt +from deluge.common import ( + VersionSplit, + archive_files, + decode_bytes, + get_magnet_info, + is_magnet, +) +from deluge.configmanager import ConfigManager, get_config_dir +from deluge.core.authmanager import AUTH_LEVEL_ADMIN +from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath +from deluge.decorators import maybe_coroutine +from deluge.error import AddTorrentError, InvalidTorrentError +from deluge.event import ( + ExternalIPEvent, + PreTorrentRemovedEvent, + SessionStartedEvent, + TorrentAddedEvent, + TorrentFileCompletedEvent, + TorrentFileRenamedEvent, + TorrentFinishedEvent, + TorrentRemovedEvent, + TorrentResumedEvent, +) + +log = logging.getLogger(__name__) + +LT_DEFAULT_ADD_TORRENT_FLAGS = ( + lt.torrent_flags.paused + | lt.torrent_flags.auto_managed + | lt.torrent_flags.update_subscribe + | lt.torrent_flags.apply_ip_filter +) + + +class PrefetchQueueItem(NamedTuple): + alert_deferred: Deferred + result_queue: List[Deferred] + + +class TorrentState: # pylint: disable=old-style-class + """Create a torrent state. + + Note: + This must be old style class to avoid breaking torrent.state file. + + """ + + def __init__( + self, + torrent_id=None, + filename=None, + trackers=None, + storage_mode='sparse', + paused=False, + save_path=None, + max_connections=-1, + max_upload_slots=-1, + max_upload_speed=-1.0, + max_download_speed=-1.0, + prioritize_first_last=False, + sequential_download=False, + file_priorities=None, + queue=None, + auto_managed=True, + is_finished=False, + stop_ratio=2.00, + stop_at_ratio=False, + remove_at_ratio=False, + move_completed=False, + move_completed_path=None, + magnet=None, + owner=None, + shared=False, + super_seeding=False, + name=None, + ): + # Build the class attribute list from args + for key, value in locals().items(): + if key == 'self': + continue + setattr(self, key, value) + + def __eq__(self, other): + return isinstance(other, TorrentState) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self == other + + +class TorrentManagerState: # pylint: disable=old-style-class + """TorrentManagerState holds a list of TorrentState objects. + + Note: + This must be old style class to avoid breaking torrent.state file. + + """ + + def __init__(self): + self.torrents = [] + + def __eq__(self, other): + return ( + isinstance(other, TorrentManagerState) and self.torrents == other.torrents + ) + + def __ne__(self, other): + return not self == other + + +class TorrentManager(component.Component): + """TorrentManager contains a list of torrents in the current libtorrent session. + + This object is also responsible for saving the state of the session for use on restart. + + """ + + # This is used in the test to mock out timeouts + clock = reactor + + def __init__(self): + component.Component.__init__( + self, + 'TorrentManager', + interval=5, + depend=['CorePluginManager', 'AlertManager'], + ) + log.debug('TorrentManager init...') + # Set the libtorrent session + self.session = component.get('Core').session + # Set the alertmanager + self.alerts = component.get('AlertManager') + # Get the core config + self.config = ConfigManager('core.conf') + + # Make sure the state folder has been created + self.state_dir = os.path.join(get_config_dir(), 'state') + if not os.path.exists(self.state_dir): + os.makedirs(self.state_dir) + self.temp_file = os.path.join(self.state_dir, '.safe_state_check') + + # Create the torrents dict { torrent_id: Torrent } + self.torrents = {} + self.queued_torrents = set() + self.is_saving_state = False + self.save_resume_data_file_lock = defer.DeferredLock() + self.torrents_loading = {} + self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {} + + # This is a map of torrent_ids to Deferreds used to track needed resume data. + # The Deferreds will be completed when resume data has been saved. + self.waiting_on_resume_data = {} + + # Keep track of torrents finished but moving storage + self.waiting_on_finish_moving = [] + + # Keeps track of resume data + self.resume_data = {} + + self.torrents_status_requests = [] + self.status_dict = {} + self.last_state_update_alert_ts = 0 + + # Keep the previous saved state + self.prev_saved_state = None + + # Register set functions + set_config_keys = [ + 'max_connections_per_torrent', + 'max_upload_slots_per_torrent', + 'max_upload_speed_per_torrent', + 'max_download_speed_per_torrent', + ] + + for config_key in set_config_keys: + on_set_func = getattr(self, ''.join(['on_set_', config_key])) + self.config.register_set_function(config_key, on_set_func) + + # Register alert functions + alert_handles = [ + 'external_ip', + 'performance', + 'add_torrent', + 'metadata_received', + 'torrent_finished', + 'torrent_paused', + 'torrent_checked', + 'torrent_resumed', + 'tracker_reply', + 'tracker_announce', + 'tracker_warning', + 'tracker_error', + 'file_renamed', + 'file_error', + 'file_completed', + 'storage_moved', + 'storage_moved_failed', + 'state_update', + 'state_changed', + 'save_resume_data', + 'save_resume_data_failed', + 'fastresume_rejected', + ] + + for alert_handle in alert_handles: + on_alert_func = getattr(self, ''.join(['on_alert_', alert_handle])) + self.alerts.register_handler(alert_handle, on_alert_func) + + # Define timers + self.save_state_timer = LoopingCall(self.save_state) + self.save_resume_data_timer = LoopingCall(self.save_resume_data) + self.prev_status_cleanup_loop = LoopingCall(self.cleanup_torrents_prev_status) + + def start(self): + # Check for old temp file to verify safe shutdown + if os.path.isfile(self.temp_file): + self.archive_state('Bad shutdown detected so archiving state files') + os.remove(self.temp_file) + + with open(self.temp_file, 'a'): + os.utime(self.temp_file, None) + + # Try to load the state from file + self.load_state() + + # Save the state periodically + self.save_state_timer.start(200, False) + self.save_resume_data_timer.start(190, False) + self.prev_status_cleanup_loop.start(10) + + @maybe_coroutine + async def stop(self): + # Stop timers + if self.save_state_timer.running: + self.save_state_timer.stop() + + if self.save_resume_data_timer.running: + self.save_resume_data_timer.stop() + + if self.prev_status_cleanup_loop.running: + self.prev_status_cleanup_loop.stop() + + # Save state on shutdown + await self.save_state() + + self.session.pause() + + result = await self.save_resume_data(flush_disk_cache=True) + # Remove the temp_file to signify successfully saved state + if result and os.path.isfile(self.temp_file): + os.remove(self.temp_file) + + def update(self): + for torrent_id, torrent in self.torrents.items(): + # XXX: Should the state check be those that _can_ be stopped at ratio + if torrent.options['stop_at_ratio'] and torrent.state not in ( + 'Checking', + 'Allocating', + 'Paused', + 'Queued', + ): + if ( + torrent.get_ratio() >= torrent.options['stop_ratio'] + and torrent.is_finished + ): + if torrent.options['remove_at_ratio']: + self.remove(torrent_id) + break + + torrent.pause() + + def __getitem__(self, torrent_id): + """Return the Torrent with torrent_id. + + Args: + torrent_id (str): The torrent_id. + + Returns: + Torrent: A torrent object. + + """ + return self.torrents[torrent_id] + + def get_torrent_list(self): + """Creates a list of torrent_ids, owned by current user and any marked shared. + + Returns: + list: A list of torrent_ids. + + """ + torrent_ids = list(self.torrents) + if component.get('RPCServer').get_session_auth_level() == AUTH_LEVEL_ADMIN: + return torrent_ids + + current_user = component.get('RPCServer').get_session_user() + for torrent_id in torrent_ids[:]: + torrent_status = self.torrents[torrent_id].get_status(['owner', 'shared']) + if torrent_status['owner'] != current_user and not torrent_status['shared']: + torrent_ids.pop(torrent_ids.index(torrent_id)) + return torrent_ids + + def get_torrent_info_from_file(self, filepath): + """Retrieves torrent_info from the file specified. + + Args: + filepath (str): The filepath to extract torrent info from. + + Returns: + lt.torrent_info: A libtorrent torrent_info dict or None if invalid file or data. + + """ + # Get the torrent data from the torrent file + if log.isEnabledFor(logging.DEBUG): + log.debug('Attempting to extract torrent_info from %s', filepath) + try: + torrent_info = lt.torrent_info(filepath) + except RuntimeError as ex: + log.warning('Unable to open torrent file %s: %s', filepath, ex) + else: + return torrent_info + + @maybe_coroutine + async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]: + """Download the metadata for a magnet URI. + + Args: + magnet: A magnet URI to download the metadata for. + timeout: Number of seconds to wait before canceling. + + Returns: + A tuple of (torrent_id, metadata) + + """ + + torrent_id = get_magnet_info(magnet)['info_hash'] + if torrent_id in self.prefetching_metadata: + d = Deferred() + self.prefetching_metadata[torrent_id].result_queue.append(d) + return await d + + add_torrent_params = lt.parse_magnet_uri(magnet) + add_torrent_params.save_path = gettempdir() + add_torrent_params.flags = ( + ( + LT_DEFAULT_ADD_TORRENT_FLAGS + | lt.torrent_flags.duplicate_is_error + | lt.torrent_flags.upload_mode + ) + ^ lt.torrent_flags.auto_managed + ^ lt.torrent_flags.paused + ) + + torrent_handle = self.session.add_torrent(add_torrent_params) + + d = Deferred() + # Cancel the defer if timeout reached. + d.addTimeout(timeout, self.clock) + self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, []) + + try: + torrent_info = await d + except (defer.TimeoutError, defer.CancelledError): + log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.') + metadata = b'' + else: + log.debug('prefetch metadata received') + if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'): + metadata = torrent_info.metadata() + else: + metadata = torrent_info.info_section() + + log.debug('remove prefetch magnet from session') + result_queue = self.prefetching_metadata.pop(torrent_id).result_queue + self.session.remove_torrent(torrent_handle, 1) + result = torrent_id, b64encode(metadata) + + for d in result_queue: + d.callback(result) + return result + + def _build_torrent_options(self, options): + """Load default options and update if needed.""" + _options = TorrentOptions() + if options: + _options.update(options) + options = _options + + if not options['owner']: + options['owner'] = component.get('RPCServer').get_session_user() + if not component.get('AuthManager').has_account(options['owner']): + options['owner'] = 'localclient' + + return options + + def _build_torrent_params( + self, torrent_info=None, magnet=None, options=None, resume_data=None + ): + """Create the add_torrent_params dict for adding torrent to libtorrent.""" + add_torrent_params = {} + if torrent_info: + add_torrent_params['ti'] = torrent_info + name = torrent_info.name() + if not name: + name = ( + torrent_info.file_at(0).path.replace('\\', '/', 1).split('/', 1)[0] + ) + add_torrent_params['name'] = name + torrent_id = str(torrent_info.info_hash()) + elif magnet: + magnet_info = get_magnet_info(magnet) + if magnet_info: + add_torrent_params['name'] = magnet_info['name'] + add_torrent_params['trackers'] = list(magnet_info['trackers']) + torrent_id = magnet_info['info_hash'] + add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id)) + else: + raise AddTorrentError( + 'Unable to add magnet, invalid magnet info: %s' % magnet + ) + + # Check for existing torrent in session. + if torrent_id in self.get_torrent_list(): + # Attempt merge trackers before returning. + self.torrents[torrent_id].merge_trackers(torrent_info) + raise AddTorrentError('Torrent already in session (%s).' % torrent_id) + elif torrent_id in self.torrents_loading: + raise AddTorrentError('Torrent already being added (%s).' % torrent_id) + elif torrent_id in self.prefetching_metadata: + # Cancel and remove metadata fetching torrent. + self.prefetching_metadata[torrent_id].alert_deferred.cancel() + + # Check for renamed files and if so, rename them in the torrent_info before adding. + if options['mapped_files'] and torrent_info: + for index, fname in options['mapped_files'].items(): + fname = sanitize_filepath(decode_bytes(fname)) + if log.isEnabledFor(logging.DEBUG): + log.debug('renaming file index %s to %s', index, fname) + try: + torrent_info.rename_file(index, fname.encode('utf8')) + except TypeError: + torrent_info.rename_file(index, fname) + add_torrent_params['ti'] = torrent_info + + if log.isEnabledFor(logging.DEBUG): + log.debug('options: %s', options) + + # Fill in the rest of the add_torrent_params dictionary. + add_torrent_params['save_path'] = options['download_location'].encode('utf8') + if options['name']: + add_torrent_params['name'] = options['name'] + if options['pre_allocate_storage']: + add_torrent_params['storage_mode'] = lt.storage_mode_t.storage_mode_allocate + if resume_data: + add_torrent_params['resume_data'] = resume_data + + # Set flags: enable duplicate_is_error & override_resume_data, disable auto_managed. + add_torrent_params['flags'] = ( + LT_DEFAULT_ADD_TORRENT_FLAGS | lt.torrent_flags.duplicate_is_error + ) ^ lt.torrent_flags.auto_managed + if options['seed_mode']: + add_torrent_params['flags'] |= lt.torrent_flags.seed_mode + if options['super_seeding']: + add_torrent_params['flags'] |= lt.torrent_flags.super_seeding + + return torrent_id, add_torrent_params + + def add( + self, + torrent_info=None, + state=None, + options=None, + save_state=True, + filedump=None, + filename=None, + magnet=None, + resume_data=None, + ): + """Adds a torrent to the torrent manager. + + Args: + torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object. + state (TorrentState, optional): The torrent state. + options (dict, optional): The options to apply to the torrent on adding. + save_state (bool, optional): If True save the session state after adding torrent, defaults to True. + filedump (str, optional): bencoded filedump of a torrent file. + filename (str, optional): The filename of the torrent file. + magnet (str, optional): The magnet URI. + resume_data (lt.entry, optional): libtorrent fast resume data. + + Returns: + str: If successful the torrent_id of the added torrent, None if adding the torrent failed. + + Emits: + TorrentAddedEvent: Torrent with torrent_id added to session. + + """ + if not torrent_info and not filedump and not magnet: + raise AddTorrentError( + 'You must specify a valid torrent_info, torrent state or magnet.' + ) + + if filedump: + try: + torrent_info = lt.torrent_info(lt.bdecode(filedump)) + except RuntimeError as ex: + raise AddTorrentError( + 'Unable to add torrent, decoding filedump failed: %s' % ex + ) + + options = self._build_torrent_options(options) + __, add_torrent_params = self._build_torrent_params( + torrent_info, magnet, options, resume_data + ) + + # We need to pause the AlertManager momentarily to prevent alerts + # for this torrent being generated before a Torrent object is created. + component.pause('AlertManager') + + try: + handle = self.session.add_torrent(add_torrent_params) + if not handle.is_valid(): + raise InvalidTorrentError('Torrent handle is invalid!') + except (RuntimeError, InvalidTorrentError) as ex: + component.resume('AlertManager') + raise AddTorrentError('Unable to add torrent to session: %s' % ex) + + torrent = self._add_torrent_obj( + handle, options, state, filename, magnet, resume_data, filedump, save_state + ) + return torrent.torrent_id + + def add_async( + self, + torrent_info=None, + state=None, + options=None, + save_state=True, + filedump=None, + filename=None, + magnet=None, + resume_data=None, + ): + """Adds a torrent to the torrent manager using libtorrent async add torrent method. + + Args: + torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object. + state (TorrentState, optional): The torrent state. + options (dict, optional): The options to apply to the torrent on adding. + save_state (bool, optional): If True save the session state after adding torrent, defaults to True. + filedump (str, optional): bencoded filedump of a torrent file. + filename (str, optional): The filename of the torrent file. + magnet (str, optional): The magnet URI. + resume_data (lt.entry, optional): libtorrent fast resume data. + + Returns: + Deferred: If successful the torrent_id of the added torrent, None if adding the torrent failed. + + Emits: + TorrentAddedEvent: Torrent with torrent_id added to session. + + """ + if not torrent_info and not filedump and not magnet: + raise AddTorrentError( + 'You must specify a valid torrent_info, torrent state or magnet.' + ) + + if filedump: + try: + torrent_info = lt.torrent_info(lt.bdecode(filedump)) + except RuntimeError as ex: + raise AddTorrentError( + 'Unable to add torrent, decoding filedump failed: %s' % ex + ) + + options = self._build_torrent_options(options) + torrent_id, add_torrent_params = self._build_torrent_params( + torrent_info, magnet, options, resume_data + ) + + d = Deferred() + self.torrents_loading[torrent_id] = ( + d, + options, + state, + filename, + magnet, + resume_data, + filedump, + save_state, + ) + try: + self.session.async_add_torrent(add_torrent_params) + except RuntimeError as ex: + raise AddTorrentError('Unable to add torrent to session: %s' % ex) + return d + + def _add_torrent_obj( + self, + handle, + options, + state, + filename, + magnet, + resume_data, + filedump, + save_state, + ): + # For magnets added with metadata, filename is used so set as magnet. + if not magnet and is_magnet(filename): + magnet = filename + filename = None + + # Create a Torrent object and add to the dictionary. + torrent = Torrent(handle, options, state, filename, magnet) + self.torrents[torrent.torrent_id] = torrent + + # Resume AlertManager if paused for adding torrent to libtorrent. + component.resume('AlertManager') + + # Store the original resume_data, in case of errors. + if resume_data: + self.resume_data[torrent.torrent_id] = resume_data + + # Add to queued torrents set. + self.queued_torrents.add(torrent.torrent_id) + if self.config['queue_new_to_top']: + self.queue_top(torrent.torrent_id) + + # Resume the torrent if needed. + if not options['add_paused']: + torrent.resume() + + # Emit torrent_added signal. + from_state = state is not None + component.get('EventManager').emit( + TorrentAddedEvent(torrent.torrent_id, from_state) + ) + + if log.isEnabledFor(logging.DEBUG): + log.debug('Torrent added: %s', str(handle.info_hash())) + if log.isEnabledFor(logging.INFO): + name_and_owner = torrent.get_status(['name', 'owner']) + log.info( + 'Torrent %s from user "%s" %s', + name_and_owner['name'], + name_and_owner['owner'], + from_state and 'loaded' or 'added', + ) + + # Write the .torrent file to the state directory. + if filedump: + torrent.write_torrentfile(filedump) + + # Save the session state. + if save_state: + self.save_state() + + return torrent + + def add_async_callback( + self, + handle, + d, + options, + state, + filename, + magnet, + resume_data, + filedump, + save_state, + ): + torrent = self._add_torrent_obj( + handle, options, state, filename, magnet, resume_data, filedump, save_state + ) + + d.callback(torrent.torrent_id) + + def remove(self, torrent_id, remove_data=False, save_state=True): + """Remove a torrent from the session. + + Args: + torrent_id (str): The torrent ID to remove. + remove_data (bool, optional): If True, remove the downloaded data, defaults to False. + save_state (bool, optional): If True, save the session state after removal, defaults to True. + + Returns: + bool: True if removed successfully, False if not. + + Emits: + PreTorrentRemovedEvent: Torrent is about to be removed from session. + TorrentRemovedEvent: Torrent with torrent_id removed from session. + + Raises: + InvalidTorrentError: If the torrent_id is not in the session. + + """ + try: + torrent = self.torrents[torrent_id] + except KeyError: + raise InvalidTorrentError('torrent_id %s not in session.' % torrent_id) + + torrent_name = torrent.get_status(['name'])['name'] + + # Emit the signal to the clients + component.get('EventManager').emit(PreTorrentRemovedEvent(torrent_id)) + + try: + self.session.remove_torrent(torrent.handle, 1 if remove_data else 0) + except RuntimeError as ex: + log.warning('Error removing torrent: %s', ex) + return False + + # Remove fastresume data if it is exists + self.resume_data.pop(torrent_id, None) + + # Remove the .torrent file in the state and copy location, if user requested. + delete_copies = ( + self.config['copy_torrent_file'] and self.config['del_copy_torrent_file'] + ) + torrent.delete_torrentfile(delete_copies) + + # Remove from set if it wasn't finished + if not torrent.is_finished: + try: + self.queued_torrents.remove(torrent_id) + except KeyError: + log.debug('%s is not in queued torrents set.', torrent_id) + raise InvalidTorrentError( + '%s is not in queued torrents set.' % torrent_id + ) + + # Remove the torrent from deluge's session + del self.torrents[torrent_id] + + if save_state: + self.save_state() + + # Emit the signal to the clients + component.get('EventManager').emit(TorrentRemovedEvent(torrent_id)) + log.info( + 'Torrent %s removed by user: %s', + torrent_name, + component.get('RPCServer').get_session_user(), + ) + return True + + def fixup_state(self, state): + """Fixup an old state by adding missing TorrentState options and assigning default values. + + Args: + state (TorrentManagerState): A torrentmanager state containing torrent details. + + Returns: + TorrentManagerState: A fixedup TorrentManager state. + + """ + if state.torrents: + t_state_tmp = TorrentState() + if dir(state.torrents[0]) != dir(t_state_tmp): + self.archive_state('Migration of TorrentState required.') + try: + for attr in set(dir(t_state_tmp)) - set(dir(state.torrents[0])): + for t_state in state.torrents: + setattr(t_state, attr, getattr(t_state_tmp, attr, None)) + except AttributeError as ex: + log.error( + 'Unable to update state file to a compatible version: %s', ex + ) + return state + + def open_state(self): + """Open the torrents.state file containing a TorrentManager state with session torrents. + + Returns: + TorrentManagerState: The TorrentManager state. + + """ + torrents_state = os.path.join(self.state_dir, 'torrents.state') + state = None + for filepath in (torrents_state, torrents_state + '.bak'): + log.info('Loading torrent state: %s', filepath) + if not os.path.isfile(filepath): + continue + + try: + with open(filepath, 'rb') as _file: + state = pickle.load(_file, encoding='utf8') + except (OSError, EOFError, pickle.UnpicklingError) as ex: + message = f'Unable to load {filepath}: {ex}' + log.error(message) + if not filepath.endswith('.bak'): + self.archive_state(message) + else: + log.info('Successfully loaded %s', filepath) + break + + return state if state else TorrentManagerState() + + def load_state(self): + """Load all the torrents from TorrentManager state into session. + + Emits: + SessionStartedEvent: Emitted after all torrents are added to the session. + + """ + start = datetime.datetime.now() + state = self.open_state() + state = self.fixup_state(state) + + # Reorder the state.torrents list to add torrents in the correct queue order. + state.torrents.sort( + key=operator.attrgetter('queue'), reverse=self.config['queue_new_to_top'] + ) + resume_data = self.load_resume_data_file() + + deferreds = [] + for t_state in state.torrents: + # Populate the options dict from state + options = TorrentOptions() + for option in options: + try: + options[option] = getattr(t_state, option) + except AttributeError: + pass + # Manually update unmatched attributes + options['download_location'] = t_state.save_path + options['pre_allocate_storage'] = t_state.storage_mode == 'allocate' + options['prioritize_first_last_pieces'] = t_state.prioritize_first_last + options['add_paused'] = t_state.paused + + magnet = t_state.magnet + torrent_info = self.get_torrent_info_from_file( + os.path.join(self.state_dir, t_state.torrent_id + '.torrent') + ) + + try: + d = self.add_async( + torrent_info=torrent_info, + state=t_state, + options=options, + save_state=False, + magnet=magnet, + resume_data=resume_data.get(t_state.torrent_id), + ) + except AddTorrentError as ex: + log.warning( + 'Error when adding torrent "%s" to session: %s', + t_state.torrent_id, + ex, + ) + else: + deferreds.append(d) + + deferred_list = DeferredList(deferreds, consumeErrors=False) + + def on_complete(result): + log.info( + 'Finished loading %d torrents in %s', + len(state.torrents), + str(datetime.datetime.now() - start), + ) + component.get('EventManager').emit(SessionStartedEvent()) + + deferred_list.addCallback(on_complete) + + def create_state(self): + """Create a state of all the torrents in TorrentManager. + + Returns: + TorrentManagerState: The TorrentManager state. + + """ + state = TorrentManagerState() + # Create the state for each Torrent and append to the list + for torrent in self.torrents.values(): + if self.session.is_paused(): + paused = torrent.handle.is_paused() + elif torrent.forced_error: + paused = torrent.forced_error.was_paused + elif torrent.state == 'Paused': + paused = True + else: + paused = False + + torrent_state = TorrentState( + torrent.torrent_id, + torrent.filename, + torrent.trackers, + torrent.get_status(['storage_mode'])['storage_mode'], + paused, + torrent.options['download_location'], + torrent.options['max_connections'], + torrent.options['max_upload_slots'], + torrent.options['max_upload_speed'], + torrent.options['max_download_speed'], + torrent.options['prioritize_first_last_pieces'], + torrent.options['sequential_download'], + torrent.options['file_priorities'], + torrent.get_queue_position(), + torrent.options['auto_managed'], + torrent.is_finished, + torrent.options['stop_ratio'], + torrent.options['stop_at_ratio'], + torrent.options['remove_at_ratio'], + torrent.options['move_completed'], + torrent.options['move_completed_path'], + torrent.magnet, + torrent.options['owner'], + torrent.options['shared'], + torrent.options['super_seeding'], + torrent.options['name'], + ) + state.torrents.append(torrent_state) + return state + + def save_state(self): + """Run the save state task in a separate thread to avoid blocking main thread. + + Note: + If a save task is already running, this call is ignored. + + """ + if self.is_saving_state: + return defer.succeed(None) + self.is_saving_state = True + d = threads.deferToThread(self._save_state) + + def on_state_saved(arg): + self.is_saving_state = False + if self.save_state_timer.running: + self.save_state_timer.reset() + + d.addBoth(on_state_saved) + return d + + def _save_state(self): + """Save the state of the TorrentManager to the torrents.state file.""" + state = self.create_state() + + # If the state hasn't changed, no need to save it + if self.prev_saved_state == state: + return + + filename = 'torrents.state' + filepath = os.path.join(self.state_dir, filename) + filepath_bak = filepath + '.bak' + filepath_tmp = filepath + '.tmp' + + try: + log.debug('Creating the temporary file: %s', filepath_tmp) + with open(filepath_tmp, 'wb', 0) as _file: + pickle.dump(state, _file, protocol=2) + _file.flush() + os.fsync(_file.fileno()) + except (OSError, pickle.PicklingError) as ex: + log.error('Unable to save %s: %s', filename, ex) + return + + try: + log.debug('Creating backup of %s at: %s', filename, filepath_bak) + if os.path.isfile(filepath_bak): + os.remove(filepath_bak) + if os.path.isfile(filepath): + os.rename(filepath, filepath_bak) + except OSError as ex: + log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) + return + + try: + log.debug('Saving %s to: %s', filename, filepath) + os.rename(filepath_tmp, filepath) + self.prev_saved_state = state + except OSError as ex: + log.error('Failed to set new state file %s: %s', filepath, ex) + if os.path.isfile(filepath_bak): + log.info('Restoring backup of state from: %s', filepath_bak) + os.rename(filepath_bak, filepath) + + def save_resume_data(self, torrent_ids=None, flush_disk_cache=False): + """Saves torrents resume data. + + Args: + torrent_ids (list of str): A list of torrents to save the resume data for, defaults + to None which saves all torrents resume data. + flush_disk_cache (bool, optional): If True flushes the disk cache which avoids potential + issue with file timestamps, defaults to False. This is only needed when stopping the session. + + Returns: + t.i.d.DeferredList: A list of twisted Deferred callbacks to be invoked when save is complete. + + """ + if torrent_ids is None: + torrent_ids = ( + tid + for tid, t in self.torrents.items() + if t.handle.need_save_resume_data() + ) + + def on_torrent_resume_save(dummy_result, torrent_id): + """Received torrent resume_data alert so remove from waiting list""" + self.waiting_on_resume_data.pop(torrent_id, None) + + deferreds = [] + for torrent_id in torrent_ids: + d = self.waiting_on_resume_data.get(torrent_id) + if not d: + d = Deferred().addBoth(on_torrent_resume_save, torrent_id) + self.waiting_on_resume_data[torrent_id] = d + deferreds.append(d) + self.torrents[torrent_id].save_resume_data(flush_disk_cache) + + def on_all_resume_data_finished(dummy_result): + """Saves resume data file when no more torrents waiting for resume data. + + Returns: + bool: True if fastresume file is saved. + + This return value determines removal of `self.temp_file` in `self.stop()`. + + """ + # Use flush_disk_cache as a marker for shutdown so fastresume is + # saved even if torrents are waiting. + if not self.waiting_on_resume_data or flush_disk_cache: + return self.save_resume_data_file(queue_task=flush_disk_cache) + + return DeferredList(deferreds).addBoth(on_all_resume_data_finished) + + def load_resume_data_file(self): + """Load the resume data from file for all torrents. + + Returns: + dict: A dict of torrents and their resume_data. + + """ + filename = 'torrents.fastresume' + filepath = os.path.join(self.state_dir, filename) + filepath_bak = filepath + '.bak' + old_data_filepath = os.path.join(get_config_dir(), filename) + + for _filepath in (filepath, filepath_bak, old_data_filepath): + log.info('Opening %s for load: %s', filename, _filepath) + try: + with open(_filepath, 'rb') as _file: + resume_data = lt.bdecode(_file.read()) + except (OSError, EOFError, RuntimeError) as ex: + if self.torrents: + log.warning('Unable to load %s: %s', _filepath, ex) + resume_data = None + else: + # lt.bdecode returns the dict keys as bytes so decode them. + resume_data = {k.decode(): v for k, v in resume_data.items()} + log.info('Successfully loaded %s: %s', filename, _filepath) + break + + # If the libtorrent bdecode doesn't happen properly, it will return None + # so we need to make sure we return a {} + if resume_data is None: + return {} + else: + return resume_data + + def save_resume_data_file(self, queue_task=False): + """Save resume data to file in a separate thread to avoid blocking main thread. + + Args: + queue_task (bool): If True and a save task is already running then queue + this save task to run next. Default is to not queue save tasks. + + Returns: + Deferred: Fires with arg, True if save task was successful, False if + not and None if task was not performed. + + """ + if not queue_task and self.save_resume_data_file_lock.locked: + return defer.succeed(None) + + def on_lock_aquired(): + d = threads.deferToThread(self._save_resume_data_file) + + def on_resume_data_file_saved(arg): + if self.save_resume_data_timer.running: + self.save_resume_data_timer.reset() + return arg + + d.addBoth(on_resume_data_file_saved) + return d + + return self.save_resume_data_file_lock.run(on_lock_aquired) + + def _save_resume_data_file(self): + """Saves the resume data file with the contents of self.resume_data""" + if not self.resume_data: + return True + + filename = 'torrents.fastresume' + filepath = os.path.join(self.state_dir, filename) + filepath_bak = filepath + '.bak' + filepath_tmp = filepath + '.tmp' + + try: + log.debug('Creating the temporary file: %s', filepath_tmp) + with open(filepath_tmp, 'wb', 0) as _file: + _file.write(lt.bencode(self.resume_data)) + _file.flush() + os.fsync(_file.fileno()) + except (OSError, EOFError) as ex: + log.error('Unable to save %s: %s', filename, ex) + return False + + try: + log.debug('Creating backup of %s at: %s', filename, filepath_bak) + if os.path.isfile(filepath_bak): + os.remove(filepath_bak) + if os.path.isfile(filepath): + os.rename(filepath, filepath_bak) + except OSError as ex: + log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex) + return False + + try: + log.debug('Saving %s to: %s', filename, filepath) + os.rename(filepath_tmp, filepath) + except OSError as ex: + log.error('Failed to set new file %s: %s', filepath, ex) + if os.path.isfile(filepath_bak): + log.info('Restoring backup from: %s', filepath_bak) + os.rename(filepath_bak, filepath) + else: + # Sync the rename operations for the directory + if hasattr(os, 'O_DIRECTORY'): + dirfd = os.open(os.path.dirname(filepath), os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + return True + + def archive_state(self, message): + log.warning(message) + arc_filepaths = [] + for filename in ('torrents.fastresume', 'torrents.state'): + filepath = os.path.join(self.state_dir, filename) + arc_filepaths.extend([filepath, filepath + '.bak']) + + archive_files('state', arc_filepaths, message=message) + + def get_queue_position(self, torrent_id): + """Get queue position of torrent""" + return self.torrents[torrent_id].get_queue_position() + + def queue_top(self, torrent_id): + """Queue torrent to top""" + if self.torrents[torrent_id].get_queue_position() == 0: + return False + + self.torrents[torrent_id].handle.queue_position_top() + return True + + def queue_up(self, torrent_id): + """Queue torrent up one position""" + if self.torrents[torrent_id].get_queue_position() == 0: + return False + + self.torrents[torrent_id].handle.queue_position_up() + return True + + def queue_down(self, torrent_id): + """Queue torrent down one position""" + if self.torrents[torrent_id].get_queue_position() == ( + len(self.queued_torrents) - 1 + ): + return False + + self.torrents[torrent_id].handle.queue_position_down() + return True + + def queue_bottom(self, torrent_id): + """Queue torrent to bottom""" + if self.torrents[torrent_id].get_queue_position() == ( + len(self.queued_torrents) - 1 + ): + return False + + self.torrents[torrent_id].handle.queue_position_bottom() + return True + + def cleanup_torrents_prev_status(self): + """Run cleanup_prev_status for each registered torrent""" + for torrent in self.torrents.values(): + torrent.cleanup_prev_status() + + def on_set_max_connections_per_torrent(self, key, value): + """Sets the per-torrent connection limit""" + log.debug('max_connections_per_torrent set to %s...', value) + for key in self.torrents: + self.torrents[key].set_max_connections(value) + + def on_set_max_upload_slots_per_torrent(self, key, value): + """Sets the per-torrent upload slot limit""" + log.debug('max_upload_slots_per_torrent set to %s...', value) + for key in self.torrents: + self.torrents[key].set_max_upload_slots(value) + + def on_set_max_upload_speed_per_torrent(self, key, value): + """Sets the per-torrent upload speed limit""" + log.debug('max_upload_speed_per_torrent set to %s...', value) + for key in self.torrents: + self.torrents[key].set_max_upload_speed(value) + + def on_set_max_download_speed_per_torrent(self, key, value): + """Sets the per-torrent download speed limit""" + log.debug('max_download_speed_per_torrent set to %s...', value) + for key in self.torrents: + self.torrents[key].set_max_download_speed(value) + + # --- Alert handlers --- + def on_alert_add_torrent(self, alert): + """Alert handler for libtorrent add_torrent_alert""" + if not alert.handle.is_valid(): + log.warning('Torrent handle is invalid: %s', alert.error.message()) + return + + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError as ex: + log.warning('Failed to get torrent id from handle: %s', ex) + return + + try: + add_async_params = self.torrents_loading.pop(torrent_id) + except KeyError as ex: + log.warning('Torrent id not in torrents loading list: %s', ex) + return + + self.add_async_callback(alert.handle, *add_async_params) + + def on_alert_torrent_finished(self, alert): + """Alert handler for libtorrent torrent_finished_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + # If total_download is 0, do not move, it's likely the torrent wasn't downloaded, but just added. + # Get fresh data from libtorrent, the cache isn't always up to date + total_download = torrent.get_status(['total_payload_download'], update=True)[ + 'total_payload_download' + ] + + if log.isEnabledFor(logging.DEBUG): + log.debug('Finished %s ', torrent_id) + log.debug( + 'Torrent settings: is_finished: %s, total_download: %s, move_completed: %s, move_path: %s', + torrent.is_finished, + total_download, + torrent.options['move_completed'], + torrent.options['move_completed_path'], + ) + + torrent.update_state() + if not torrent.is_finished and total_download: + # Move completed download to completed folder if needed + if ( + torrent.options['move_completed'] + and torrent.options['download_location'] + != torrent.options['move_completed_path'] + ): + self.waiting_on_finish_moving.append(torrent_id) + torrent.move_storage(torrent.options['move_completed_path']) + else: + torrent.is_finished = True + component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) + else: + torrent.is_finished = True + + # Torrent is no longer part of the queue + try: + self.queued_torrents.remove(torrent_id) + except KeyError: + # Sometimes libtorrent fires a TorrentFinishedEvent twice + if log.isEnabledFor(logging.DEBUG): + log.debug('%s is not in queued torrents set.', torrent_id) + + # Only save resume data if it was actually downloaded something. Helps + # on startup with big queues with lots of seeding torrents. Libtorrent + # emits alert_torrent_finished for them, but there seems like nothing + # worth really to save in resume data, we just read it up in + # self.load_state(). + if total_download: + self.save_resume_data((torrent_id,)) + + def on_alert_torrent_paused(self, alert): + """Alert handler for libtorrent torrent_paused_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + torrent.update_state() + # Write the fastresume file if we are not waiting on a bulk write + if torrent_id not in self.waiting_on_resume_data: + self.save_resume_data((torrent_id,)) + + def on_alert_torrent_checked(self, alert): + """Alert handler for libtorrent torrent_checked_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + + # Check to see if we're forcing a recheck and set it back to paused if necessary. + if torrent.forcing_recheck: + torrent.forcing_recheck = False + if torrent.forcing_recheck_paused: + torrent.handle.pause() + + torrent.update_state() + + def on_alert_tracker_reply(self, alert): + """Alert handler for libtorrent tracker_reply_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + + # Set the tracker status for the torrent + torrent.set_tracker_status('Announce OK') + + # Check for peer information from the tracker, if none then send a scrape request. + torrent.get_lt_status() + if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1: + torrent.scrape_tracker() + + def on_alert_tracker_announce(self, alert): + """Alert handler for libtorrent tracker_announce_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + + # Set the tracker status for the torrent + torrent.set_tracker_status('Announce Sent') + + def on_alert_tracker_warning(self, alert): + """Alert handler for libtorrent tracker_warning_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + # Set the tracker status for the torrent + torrent.set_tracker_status('Warning: %s' % decode_bytes(alert.message())) + + def on_alert_tracker_error(self, alert): + """Alert handler for libtorrent tracker_error_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + + error_message = decode_bytes(alert.error_message()) + if not error_message: + error_message = decode_bytes(alert.error.message()) + log.debug( + 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message + ) + # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates + # we will need to verify that at least one endpoint to the errored tracker is working + for tracker in torrent.handle.trackers(): + if tracker['url'] == alert.url: + if any( + endpoint['last_error']['value'] == 0 + for endpoint in tracker['endpoints'] + ): + torrent.set_tracker_status('Announce OK') + else: + torrent.set_tracker_status('Error: ' + error_message) + break + + def on_alert_storage_moved(self, alert): + """Alert handler for libtorrent storage_moved_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + torrent.set_download_location(os.path.normpath(alert.storage_path())) + torrent.set_move_completed(False) + torrent.update_state() + + if torrent_id in self.waiting_on_finish_moving: + self.waiting_on_finish_moving.remove(torrent_id) + torrent.is_finished = True + component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) + + def on_alert_storage_moved_failed(self, alert): + """Alert handler for libtorrent storage_moved_failed_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + log.warning('on_alert_storage_moved_failed: %s', decode_bytes(alert.message())) + # Set an Error message and pause the torrent + alert_msg = decode_bytes(alert.message()).split(':', 1)[1].strip() + torrent.force_error_state('Failed to move download folder: %s' % alert_msg) + + if torrent_id in self.waiting_on_finish_moving: + self.waiting_on_finish_moving.remove(torrent_id) + torrent.is_finished = True + component.get('EventManager').emit(TorrentFinishedEvent(torrent_id)) + + def on_alert_torrent_resumed(self, alert): + """Alert handler for libtorrent torrent_resumed_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + torrent.update_state() + component.get('EventManager').emit(TorrentResumedEvent(torrent_id)) + + def on_alert_state_changed(self, alert): + """Alert handler for libtorrent state_changed_alert. + + Emits: + TorrentStateChangedEvent: The state has changed. + + """ + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + torrent.update_state() + # Torrent may need to download data after checking. + if torrent.state in ('Checking', 'Downloading'): + torrent.is_finished = False + self.queued_torrents.add(torrent_id) + + def on_alert_save_resume_data(self, alert): + """Alert handler for libtorrent save_resume_data_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError: + return + if torrent_id in self.torrents: + # libtorrent add_torrent expects bencoded resume_data. + self.resume_data[torrent_id] = lt.bencode( + lt.write_resume_data(alert.params) + ) + + if torrent_id in self.waiting_on_resume_data: + self.waiting_on_resume_data[torrent_id].callback(None) + + def on_alert_save_resume_data_failed(self, alert): + """Alert handler for libtorrent save_resume_data_failed_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError: + return + + if torrent_id in self.waiting_on_resume_data: + self.waiting_on_resume_data[torrent_id].errback( + Exception(decode_bytes(alert.message())) + ) + + def on_alert_fastresume_rejected(self, alert): + """Alert handler for libtorrent fastresume_rejected_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + alert_msg = decode_bytes(alert.message()) + log.error('on_alert_fastresume_rejected: %s', alert_msg) + if alert.error.value() == 134: + if not os.path.isdir(torrent.options['download_location']): + error_msg = 'Unable to locate Download Folder!' + else: + error_msg = 'Missing or invalid torrent data!' + else: + error_msg = ( + 'Problem with resume data: %s' % alert_msg.split(':', 1)[1].strip() + ) + torrent.force_error_state(error_msg, restart_to_resume=True) + + def on_alert_file_renamed(self, alert): + """Alert handler for libtorrent file_renamed_alert. + + Emits: + TorrentFileRenamedEvent: Files in the torrent have been renamed. + + """ + try: + torrent_id = str(alert.handle.info_hash()) + torrent = self.torrents[torrent_id] + except (RuntimeError, KeyError): + return + + new_name = decode_bytes(alert.new_name()) + log.debug('index: %s name: %s', alert.index, new_name) + + # We need to see if this file index is in a waiting_on_folder dict + for wait_on_folder in torrent.waiting_on_folder_rename: + if alert.index in wait_on_folder: + wait_on_folder[alert.index].callback(None) + break + else: + # This is just a regular file rename so send the signal + component.get('EventManager').emit( + TorrentFileRenamedEvent(torrent_id, alert.index, new_name) + ) + self.save_resume_data((torrent_id,)) + + def on_alert_metadata_received(self, alert): + """Alert handler for libtorrent metadata_received_alert""" + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError: + return + + try: + torrent = self.torrents[torrent_id] + except KeyError: + pass + else: + return torrent.on_metadata_received() + + # Try callback to prefetch_metadata method. + try: + d = self.prefetching_metadata[torrent_id].alert_deferred + except KeyError: + pass + else: + torrent_info = alert.handle.get_torrent_info() + return d.callback(torrent_info) + + def on_alert_file_error(self, alert): + """Alert handler for libtorrent file_error_alert""" + try: + torrent = self.torrents[str(alert.handle.info_hash())] + except (RuntimeError, KeyError): + return + torrent.update_state() + + def on_alert_file_completed(self, alert): + """Alert handler for libtorrent file_completed_alert + + Emits: + TorrentFileCompletedEvent: When an individual file completes downloading. + + """ + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError: + return + if torrent_id in self.torrents: + component.get('EventManager').emit( + TorrentFileCompletedEvent(torrent_id, alert.index) + ) + + def on_alert_state_update(self, alert): + """Alert handler for libtorrent state_update_alert + + Result of a session.post_torrent_updates() call and contains the torrent status + of all torrents that changed since last time this was posted. + + """ + self.last_state_update_alert_ts = time.time() + + for t_status in alert.status: + try: + torrent_id = str(t_status.info_hash) + except RuntimeError: + continue + if torrent_id in self.torrents: + self.torrents[torrent_id].status = t_status + + self.handle_torrents_status_callback(self.torrents_status_requests.pop()) + + def on_alert_external_ip(self, alert): + """Alert handler for libtorrent external_ip_alert""" + log.info('on_alert_external_ip: %s', alert.external_address) + component.get('EventManager').emit(ExternalIPEvent(alert.external_address)) + + def on_alert_performance(self, alert): + """Alert handler for libtorrent performance_alert""" + log.warning( + 'on_alert_performance: %s, %s', + decode_bytes(alert.message()), + alert.warning_code, + ) + if alert.warning_code == lt.performance_warning_t.send_buffer_watermark_too_low: + max_send_buffer_watermark = 3 * 1024 * 1024 # 3MiB + settings = self.session.get_settings() + send_buffer_watermark = settings['send_buffer_watermark'] + + # If send buffer is too small, try increasing its size by 512KiB (up to max_send_buffer_watermark) + if send_buffer_watermark < max_send_buffer_watermark: + value = send_buffer_watermark + (500 * 1024) + log.info( + 'Increasing send_buffer_watermark from %s to %s Bytes', + send_buffer_watermark, + value, + ) + component.get('Core').apply_session_setting( + 'send_buffer_watermark', value + ) + else: + log.warning( + 'send_buffer_watermark reached maximum value: %s Bytes', + max_send_buffer_watermark, + ) + + def separate_keys(self, keys, torrent_ids): + """Separates the input keys into torrent class keys and plugins keys""" + if self.torrents: + for torrent_id in torrent_ids: + if torrent_id in self.torrents: + status_keys = list(self.torrents[torrent_id].status_funcs) + leftover_keys = list(set(keys) - set(status_keys)) + torrent_keys = list(set(keys) - set(leftover_keys)) + return torrent_keys, leftover_keys + return [], [] + + def handle_torrents_status_callback(self, status_request): + """Build the status dictionary with torrent values""" + d, torrent_ids, keys, diff = status_request + status_dict = {}.fromkeys(torrent_ids) + torrent_keys, plugin_keys = self.separate_keys(keys, torrent_ids) + + # Get the torrent status for each torrent_id + for torrent_id in torrent_ids: + if torrent_id not in self.torrents: + # The torrent_id does not exist in the dict. + # Could be the clients cache (sessionproxy) isn't up to speed. + del status_dict[torrent_id] + else: + status_dict[torrent_id] = self.torrents[torrent_id].get_status( + torrent_keys, diff, all_keys=not keys + ) + self.status_dict = status_dict + d.callback((status_dict, plugin_keys)) + + def torrents_status_update(self, torrent_ids, keys, diff=False): + """Returns status dict for the supplied torrent_ids async. + + Note: + If torrent states was updated recently post_torrent_updates is not called and + instead cached state is used. + + Args: + torrent_ids (list of str): The torrent IDs to get the status of. + keys (list of str): The keys to get the status on. + diff (bool, optional): If True, will return a diff of the changes since the + last call to get_status based on the session_id, defaults to False. + + Returns: + dict: A status dictionary for the requested torrents. + + """ + d = Deferred() + now = time.time() + # If last update was recent, use cached data instead of request updates from libtorrent + if (now - self.last_state_update_alert_ts) < 1.5: + reactor.callLater( + 0, self.handle_torrents_status_callback, (d, torrent_ids, keys, diff) + ) + else: + # Ask libtorrent for status update + self.torrents_status_requests.insert(0, (d, torrent_ids, keys, diff)) + self.session.post_torrent_updates() + return d |